xds: move orca to java package io.grpc.xds.orca (#9086)

1. move orca from xds and from service to io.grpc.xds.orca new package
2. keep CallMetricsRecorder and InternalCallMetricsRecorder in service
3. Added APIs for recording utilization/requestCost/cpuUtilization/memoryUtilzation for per-query requests, added internal data structure equivalent to OrcaLoadReport
This commit is contained in:
yifeizhuang 2022-04-29 14:56:44 -07:00 committed by GitHub
parent e147b5ebfb
commit 9a5467b2ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 227 additions and 57 deletions

View File

@ -24,6 +24,7 @@ dependencies {
project(':grpc-core')
implementation libraries.protobuf_util,
libraries.guava
runtimeOnly libraries.errorprone
compileOnly libraries.javax_annotation

View File

@ -16,6 +16,8 @@
package io.grpc.services;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Context;
import io.grpc.ExperimentalApi;
@ -36,11 +38,47 @@ public final class CallMetricRecorder {
static final Context.Key<CallMetricRecorder> CONTEXT_KEY =
Context.key("io.grpc.services.CallMetricRecorder");
private final AtomicReference<ConcurrentHashMap<String, Double>> metrics =
private final AtomicReference<ConcurrentHashMap<String, Double>> utilizationMetrics =
new AtomicReference<>();
private final AtomicReference<ConcurrentHashMap<String, Double>> requestCostMetrics =
new AtomicReference<>();
private double cpuUtilizationMetric = 0;
private double memoryUtilizationMetric = 0;
private volatile boolean disabled;
CallMetricRecorder() {
public static final class CallMetricReport {
private double cpuUtilization;
private double memoryUtilization;
private Map<String, Double> requestCostMetrics;
private Map<String, Double> utilizationMetrics;
/**
* Create a report for all backend metrics.
*/
CallMetricReport(double cpuUtilization, double memoryUtilization,
Map<String, Double> requestCostMetrics,
Map<String, Double> utilizationMetrics) {
this.cpuUtilization = cpuUtilization;
this.memoryUtilization = memoryUtilization;
this.requestCostMetrics = checkNotNull(requestCostMetrics, "requestCostMetrics");
this.utilizationMetrics = checkNotNull(utilizationMetrics, "utilizationMetrics");
}
public double getCpuUtilization() {
return cpuUtilization;
}
public double getMemoryUtilization() {
return memoryUtilization;
}
public Map<String, Double> getRequestCostMetrics() {
return requestCostMetrics;
}
public Map<String, Double> getUtilizationMetrics() {
return utilizationMetrics;
}
}
/**
@ -62,39 +100,116 @@ public final class CallMetricRecorder {
}
/**
* Records a call metric measurement. If RPC has already finished, this method is no-op.
* Records a call metric measurement for utilization.
* If RPC has already finished, this method is no-op.
*
* <p>A latter record will overwrite its former name-sakes.
*
* @return this recorder object
* @since 1.23.0
*/
public CallMetricRecorder recordUtilizationMetric(String name, double value) {
if (disabled) {
return this;
}
if (utilizationMetrics.get() == null) {
// The chance of race of creation of the map should be very small, so it should be fine
// to create these maps that might be discarded.
utilizationMetrics.compareAndSet(null, new ConcurrentHashMap<String, Double>());
}
utilizationMetrics.get().put(name, value);
return this;
}
/**
* Records a call metric measurement for request cost.
* If RPC has already finished, this method is no-op.
*
* <p>A latter record will overwrite its former name-sakes.
*
* @return this recorder object
* @since 1.47.0
*/
public CallMetricRecorder recordCallMetric(String name, double value) {
if (disabled) {
return this;
}
if (metrics.get() == null) {
if (requestCostMetrics.get() == null) {
// The chance of race of creation of the map should be very small, so it should be fine
// to create these maps that might be discarded.
metrics.compareAndSet(null, new ConcurrentHashMap<String, Double>());
requestCostMetrics.compareAndSet(null, new ConcurrentHashMap<String, Double>());
}
metrics.get().put(name, value);
requestCostMetrics.get().put(name, value);
return this;
}
/**
* Records a call metric measurement for CPU utilization.
* If RPC has already finished, this method is no-op.
*
* <p>A latter record will overwrite its former name-sakes.
*
* @return this recorder object
* @since 1.47.0
*/
public CallMetricRecorder recordCpuUtilizationMetric(double value) {
if (disabled) {
return this;
}
cpuUtilizationMetric = value;
return this;
}
/**
* Records a call metric measurement for memory utilization.
* If RPC has already finished, this method is no-op.
*
* <p>A latter record will overwrite its former name-sakes.
*
* @return this recorder object
* @since 1.47.0
*/
public CallMetricRecorder recordMemoryUtilizationMetric(double value) {
if (disabled) {
return this;
}
memoryUtilizationMetric = value;
return this;
}
/**
* Returns all request cost metric values. No more metric values will be recorded after this
* method is called. Calling this method multiple times returns the same collection of metric
* values.
*
* @return a map containing all saved metric name-value pairs.
*/
Map<String, Double> finalizeAndDump() {
disabled = true;
Map<String, Double> savedMetrics = requestCostMetrics.get();
if (savedMetrics == null) {
return Collections.emptyMap();
}
return Collections.unmodifiableMap(savedMetrics);
}
/**
* Returns all save metric values. No more metric values will be recorded after this method is
* called. Calling this method multiple times returns the same collection of metric values.
*
* @return a map containing all saved metric name-value pairs.
* @return a per-request ORCA reports containing all saved metrics.
*/
Map<String, Double> finalizeAndDump() {
disabled = true;
Map<String, Double> savedMetrics = metrics.get();
if (savedMetrics == null) {
return Collections.emptyMap();
CallMetricReport finalizeAndDump2() {
Map<String, Double> savedRequestCostMetrics = finalizeAndDump();
Map<String, Double> savedUtilizationMetrics = utilizationMetrics.get();
if (savedUtilizationMetrics == null) {
savedUtilizationMetrics = Collections.emptyMap();
}
return Collections.unmodifiableMap(savedMetrics);
return new CallMetricReport(cpuUtilizationMetric,
memoryUtilizationMetric, Collections.unmodifiableMap(savedRequestCostMetrics),
Collections.unmodifiableMap(savedUtilizationMetrics)
);
}
@VisibleForTesting

View File

@ -40,4 +40,8 @@ public final class InternalCallMetricRecorder {
public static Map<String, Double> finalizeAndDump(CallMetricRecorder recorder) {
return recorder.finalizeAndDump();
}
public static CallMetricRecorder.CallMetricReport finalizeAndDump2(CallMetricRecorder recorder) {
return recorder.finalizeAndDump2();
}
}

View File

@ -18,6 +18,7 @@ package io.grpc.services;
import static com.google.common.truth.Truth.assertThat;
import com.google.common.truth.Truth;
import io.grpc.Context;
import java.util.Map;
import org.junit.Test;
@ -37,19 +38,28 @@ public class CallMetricRecorderTest {
@Test
public void dumpDumpsAllSavedMetricValues() {
recorder.recordCallMetric("cost1", 154353.423);
recorder.recordCallMetric("cost2", 0.1367);
recorder.recordCallMetric("cost3", 1437.34);
recorder.recordUtilizationMetric("util1", 154353.423);
recorder.recordUtilizationMetric("util2", 0.1367);
recorder.recordUtilizationMetric("util3", 1437.34);
recorder.recordCallMetric("cost1", 37465.12);
recorder.recordCallMetric("cost2", 10293.0);
recorder.recordCallMetric("cost3", 1.0);
recorder.recordCpuUtilizationMetric(0.1928);
recorder.recordMemoryUtilizationMetric(47.4);
Map<String, Double> dump = recorder.finalizeAndDump();
assertThat(dump)
.containsExactly("cost1", 154353.423, "cost2", 0.1367, "cost3", 1437.34);
CallMetricRecorder.CallMetricReport dump = recorder.finalizeAndDump2();
Truth.assertThat(dump.getUtilizationMetrics())
.containsExactly("util1", 154353.423, "util2", 0.1367, "util3", 1437.34);
Truth.assertThat(dump.getRequestCostMetrics())
.containsExactly("cost1", 37465.12, "cost2", 10293.0, "cost3", 1.0);
Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0.1928);
Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(47.4);
}
@Test
public void noMetricsRecordedAfterSnapshot() {
Map<String, Double> initDump = recorder.finalizeAndDump();
recorder.recordCallMetric("cost", 154353.423);
recorder.recordUtilizationMetric("cost", 154353.423);
assertThat(recorder.finalizeAndDump()).isEqualTo(initDump);
}
@ -60,9 +70,19 @@ public class CallMetricRecorderTest {
recorder.recordCallMetric("cost1", 6441.341);
recorder.recordCallMetric("cost1", 4654.67);
recorder.recordCallMetric("cost2", 75.83);
Map<String, Double> dump = recorder.finalizeAndDump();
assertThat(dump)
recorder.recordMemoryUtilizationMetric(1.3);
recorder.recordMemoryUtilizationMetric(3.1);
recorder.recordUtilizationMetric("util1", 28374.21);
recorder.recordMemoryUtilizationMetric(9384.0);
recorder.recordUtilizationMetric("util1", 84323.3);
CallMetricRecorder.CallMetricReport dump = recorder.finalizeAndDump2();
Truth.assertThat(dump.getRequestCostMetrics())
.containsExactly("cost1", 4654.67, "cost2", 75.83);
Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(9384.0);
Truth.assertThat(dump.getUtilizationMetrics())
.containsExactly("util1", 84323.3);
Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0);
}
@Test

View File

@ -14,12 +14,13 @@
* limitations under the License.
*/
package io.grpc.xds;
package io.grpc.xds.orca;
import com.github.xds.data.orca.v3.OrcaLoadReport;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.ServerCall;
@ -30,7 +31,6 @@ import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.services.CallMetricRecorder;
import io.grpc.services.InternalCallMetricRecorder;
import java.util.Map;
/**
* A {@link ServerInterceptor} that intercepts a {@link ServerCall} by running server-side RPC
@ -40,7 +40,8 @@ import java.util.Map;
*
* @since 1.23.0
*/
final class OrcaMetricReportingServerInterceptor implements ServerInterceptor {
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9127")
public final class OrcaMetricReportingServerInterceptor implements ServerInterceptor {
private static final OrcaMetricReportingServerInterceptor INSTANCE =
new OrcaMetricReportingServerInterceptor();
@ -48,7 +49,7 @@ final class OrcaMetricReportingServerInterceptor implements ServerInterceptor {
@VisibleForTesting
static final Metadata.Key<OrcaLoadReport> ORCA_ENDPOINT_LOAD_METRICS_KEY =
Metadata.Key.of(
"x-endpoint-load-metrics-bin",
"endpoint-load-metrics-bin",
ProtoUtils.metadataMarshaller(OrcaLoadReport.getDefaultInstance()));
@VisibleForTesting
@ -73,12 +74,9 @@ final class OrcaMetricReportingServerInterceptor implements ServerInterceptor {
new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void close(Status status, Metadata trailers) {
Map<String, Double> metricValues =
InternalCallMetricRecorder.finalizeAndDump(finalCallMetricRecorder);
// Only attach a metric report if there are some metric values to be reported.
if (!metricValues.isEmpty()) {
OrcaLoadReport report =
OrcaLoadReport.newBuilder().putAllRequestCost(metricValues).build();
OrcaLoadReport report = fromInternalReport(
InternalCallMetricRecorder.finalizeAndDump2(finalCallMetricRecorder));
if (!report.equals(OrcaLoadReport.getDefaultInstance())) {
trailers.put(ORCA_ENDPOINT_LOAD_METRICS_KEY, report);
}
super.close(status, trailers);
@ -90,4 +88,14 @@ final class OrcaMetricReportingServerInterceptor implements ServerInterceptor {
headers,
next);
}
private static OrcaLoadReport fromInternalReport(
CallMetricRecorder.CallMetricReport internalReport) {
return OrcaLoadReport.newBuilder()
.setCpuUtilization(internalReport.getCpuUtilization())
.setMemUtilization(internalReport.getMemoryUtilization())
.putAllUtilization(internalReport.getUtilizationMetrics())
.putAllRequestCost(internalReport.getRequestCostMetrics())
.build();
}
}

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.grpc.xds;
package io.grpc.xds.orca;
import static com.google.common.base.Preconditions.checkNotNull;

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.grpc.xds;
package io.grpc.xds.orca;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@ -37,6 +37,7 @@ import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ClientCall;
import io.grpc.ConnectivityStateInfo;
import io.grpc.ExperimentalApi;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.Helper;
@ -68,7 +69,8 @@ import javax.annotation.Nullable;
* Utility class that provides method for {@link LoadBalancer} to install listeners to receive
* out-of-band backend cost metrics in the format of Open Request Cost Aggregation (ORCA).
*/
abstract class OrcaOobUtil {
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9129")
public abstract class OrcaOobUtil {
private static final Logger logger = Logger.getLogger(OrcaPerRequestUtil.class.getName());
private static final OrcaOobUtil DEFAULT_INSTANCE =

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.grpc.xds;
package io.grpc.xds.orca;
import static com.google.common.base.Preconditions.checkNotNull;
@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.ExperimentalApi;
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.internal.ForwardingClientStreamTracer;
@ -34,7 +35,8 @@ import java.util.List;
* Utility class that provides method for {@link LoadBalancer} to install listeners to receive
* per-request backend cost metrics in the format of Open Request Cost Aggregation (ORCA).
*/
abstract class OrcaPerRequestUtil {
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9128")
public abstract class OrcaPerRequestUtil {
private static final ClientStreamTracer NOOP_CLIENT_STREAM_TRACER = new ClientStreamTracer() {};
private static final ClientStreamTracer.Factory NOOP_CLIENT_STREAM_TRACER_FACTORY =
new ClientStreamTracer.Factory() {
@ -195,7 +197,7 @@ abstract class OrcaPerRequestUtil {
@VisibleForTesting
static final Metadata.Key<OrcaLoadReport> ORCA_ENDPOINT_LOAD_METRICS_KEY =
Metadata.Key.of(
"x-endpoint-load-metrics-bin",
"endpoint-load-metrics-bin",
ProtoUtils.metadataMarshaller(OrcaLoadReport.getDefaultInstance()));
private static final CallOptions.Key<OrcaReportBroker> ORCA_REPORT_BROKER_KEY =

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.grpc.xds;
package io.grpc.xds.orca;
import static com.google.common.base.Preconditions.checkNotNull;

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.grpc.xds;
package io.grpc.xds.orca;
import static com.google.common.truth.Truth.assertThat;
@ -69,7 +69,10 @@ public class OrcaMetricReportingServerInterceptorTest {
private static final SimpleRequest REQUEST =
SimpleRequest.newBuilder().setRequestMessage("Simple request").build();
private final Map<String, Double> applicationMetrics = new HashMap<>();
private final Map<String, Double> applicationUtilizationMetrics = new HashMap<>();
private final Map<String, Double> applicationCostMetrics = new HashMap<>();
private double cpuUtilizationMetrics = 0;
private double memoryUtilizationMetrics = 0;
private final AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
@ -82,9 +85,16 @@ public class OrcaMetricReportingServerInterceptorTest {
@Override
public void unaryRpc(
SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
for (Map.Entry<String, Double> entry : applicationMetrics.entrySet()) {
CallMetricRecorder.getCurrent().recordCallMetric(entry.getKey(), entry.getValue());
for (Map.Entry<String, Double> entry : applicationUtilizationMetrics.entrySet()) {
CallMetricRecorder.getCurrent().recordUtilizationMetric(entry.getKey(),
entry.getValue());
}
for (Map.Entry<String, Double> entry : applicationCostMetrics.entrySet()) {
CallMetricRecorder.getCurrent().recordCallMetric(entry.getKey(),
entry.getValue());
}
CallMetricRecorder.getCurrent().recordCpuUtilizationMetric(cpuUtilizationMetrics);
CallMetricRecorder.getCurrent().recordMemoryUtilizationMetric(memoryUtilizationMetrics);
SimpleResponse response =
SimpleResponse.newBuilder().setResponseMessage("Simple response").build();
responseObserver.onNext(response);
@ -111,8 +121,7 @@ public class OrcaMetricReportingServerInterceptorTest {
@Test
public void shareCallMetricRecorderInContext() throws IOException {
final CallMetricRecorder callMetricRecorder =
InternalCallMetricRecorder.newCallMetricRecorder();
final CallMetricRecorder callMetricRecorder = new CallMetricRecorder();
ServerStreamTracer.Factory callMetricRecorderSharingStreamTracerFactory =
new ServerStreamTracer.Factory() {
@Override
@ -169,15 +178,24 @@ public class OrcaMetricReportingServerInterceptorTest {
@Test
public void responseTrailersContainAllReportedMetrics() {
applicationMetrics.put("cost1", 1231.4543);
applicationMetrics.put("cost2", 0.1367);
applicationMetrics.put("cost3", 7614.145);
applicationCostMetrics.put("cost1", 1231.4543);
applicationCostMetrics.put("cost2", 0.1367);
applicationCostMetrics.put("cost3", 7614.145);
applicationUtilizationMetrics.put("util1", 0.1082);
applicationUtilizationMetrics.put("util2", 0.4936);
applicationUtilizationMetrics.put("util3", 0.5342);
cpuUtilizationMetrics = 0.3465;
memoryUtilizationMetrics = 0.764;
ClientCalls.blockingUnaryCall(channelToUse, SIMPLE_METHOD, CallOptions.DEFAULT, REQUEST);
Metadata receivedTrailers = trailersCapture.get();
OrcaLoadReport report =
receivedTrailers.get(OrcaMetricReportingServerInterceptor.ORCA_ENDPOINT_LOAD_METRICS_KEY);
assertThat(report.getUtilizationMap())
.containsExactly("util1", 0.1082, "util2", 0.4936, "util3", 0.5342);
assertThat(report.getRequestCostMap())
.containsExactly("cost1", 1231.4543, "cost2", 0.1367, "cost3", 7614.145);
assertThat(report.getCpuUtilization()).isEqualTo(0.3465);
assertThat(report.getMemUtilization()).isEqualTo(0.764);
}
private static final class TrailersCapturingClientInterceptor implements ClientInterceptor {

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.grpc.xds;
package io.grpc.xds.orca;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@ -61,10 +61,10 @@ import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.OrcaOobUtil.OrcaOobReportListener;
import io.grpc.xds.OrcaOobUtil.OrcaReportingConfig;
import io.grpc.xds.OrcaOobUtil.OrcaReportingHelperWrapper;
import io.grpc.xds.OrcaOobUtil.SubchannelImpl;
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
import io.grpc.xds.orca.OrcaOobUtil.OrcaReportingConfig;
import io.grpc.xds.orca.OrcaOobUtil.OrcaReportingHelperWrapper;
import io.grpc.xds.orca.OrcaOobUtil.SubchannelImpl;
import java.net.SocketAddress;
import java.text.MessageFormat;
import java.util.ArrayDeque;

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.grpc.xds;
package io.grpc.xds.orca;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.AdditionalAnswers.delegatesTo;
@ -30,8 +30,8 @@ import static org.mockito.Mockito.when;
import com.github.xds.data.orca.v3.OrcaLoadReport;
import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
import io.grpc.xds.OrcaPerRequestUtil.OrcaPerRequestReportListener;
import io.grpc.xds.OrcaPerRequestUtil.OrcaReportingTracerFactory;
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaReportingTracerFactory;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.grpc.xds;
package io.grpc.xds.orca;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;