下面列出了怎么用io.grpc.ForwardingClientCall的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
protected Listener<RespT> delegate() {
callTime = System.nanoTime();
return super.delegate();
}
}, headers);
}
};
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
protected Listener<RespT> delegate() {
callTime = System.nanoTime();
return super.delegate();
}
}, headers);
}
};
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// TODO: Here set request metadata
Metadata.Key<String> email_key =
Metadata.Key.of("email", Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<String> dev_key =
Metadata.Key.of("developer_key", Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<String> source_key =
Metadata.Key.of("source", Metadata.ASCII_STRING_MARSHALLER);
headers.put(email_key, client1Email);
headers.put(dev_key, client1DevKey);
headers.put(source_key, "PythonClient");
super.start(responseListener, headers);
}
};
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// TODO: Here set request metadata
Metadata.Key<String> email_key =
Metadata.Key.of("email", Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<String> dev_key =
Metadata.Key.of("developer_key", Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<String> source_key =
Metadata.Key.of("source", Metadata.ASCII_STRING_MARSHALLER);
headers.put(email_key, client2Email);
headers.put(dev_key, client2DevKey);
headers.put(source_key, "PythonClient");
super.start(responseListener, headers);
}
};
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT,RespT> method, CallOptions callOptions, Channel next) {
LOGGER.info("Intercepted " + method.getFullMethodName());
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
call = new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
if (apiKey != null && !apiKey.isEmpty()) {
LOGGER.info("Attaching API Key: " + apiKey);
headers.put(API_KEY_HEADER, apiKey);
}
super.start(responseListener, headers);
}
};
return call;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT,RespT> method, CallOptions callOptions, Channel next) {
LOGGER.info("Intercepted " + method.getFullMethodName());
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
call = new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
if (apiKey != null && !apiKey.isEmpty()) {
LOGGER.info("Attaching API Key: " + apiKey);
headers.put(API_KEY_HEADER, apiKey);
}
if (authToken != null && !authToken.isEmpty()) {
System.out.println("Attaching auth token");
headers.put(AUTHORIZATION_HEADER, "Bearer " + authToken);
}
super.start(responseListener, headers);
}
};
return call;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// Inject the request with the current context
textFormat.inject(Context.current(), headers, setter);
// Perform the gRPC request
super.start(responseListener, headers);
}
};
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// Inject the request with the current context
textFormat.inject(Context.current(), headers, setter);
// Perform the gRPC request
super.start(responseListener, headers);
}
};
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions)) {
@Override
public void start(Listener<RespT> listener, Metadata metadata) {
metadata.put(Metadata.Key.of("token", ASCII_STRING_MARSHALLER), tokenValue);
super.start(listener, metadata);
}
};
}
@Test
public void test() throws InterruptedException, ExecutionException {
final CountDownLatch latch = new CountDownLatch(1);
final ClientBuilder builder = Client.builder().endpoints(cluster.getClientEndpoints())
.header("MyHeader1", "MyHeaderVal1").header("MyHeader2", "MyHeaderVal2").interceptor(new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(responseListener, headers);
assertThat(headers.get(Metadata.Key.of("MyHeader1", Metadata.ASCII_STRING_MARSHALLER)))
.isEqualTo("MyHeaderVal1");
assertThat(headers.get(Metadata.Key.of("MyHeader2", Metadata.ASCII_STRING_MARSHALLER)))
.isEqualTo("MyHeaderVal2");
latch.countDown();
}
};
}
});
try (Client client = builder.build()) {
CompletableFuture<PutResponse> future = client.getKVClient().put(bytesOf("sample_key"), bytesOf("sample_key"));
latch.await(1, TimeUnit.MINUTES);
future.get();
}
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
final ClientCall<ReqT, RespT> clientCall = next.newCall(method, callOptions);
final ClientCall<ReqT, RespT> forwardingClientCall = new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
for (Short code : supportCommandCodes) {
headers.put(Header.SUPPORT_COMMAND_CODE, String.valueOf(code));
}
super.start(responseListener, headers);
}
};
return forwardingClientCall;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
//这里和下面不在一个线程
if (RpcRunningState.isDebugMode()) {
LOGGER.info("[1]header send from client:");
}
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(
method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata requestHeader) {
RpcInvokeContext context = RpcInvokeContext.getContext();
SofaRequest sofaRequest = (SofaRequest) context.get(TripleContants.SOFA_REQUEST_KEY);
ConsumerConfig consumerConfig = (ConsumerConfig) context.get(TripleContants.SOFA_CONSUMER_CONFIG_KEY);
TripleTracerAdapter.beforeSend(sofaRequest, consumerConfig, requestHeader);
if (RpcRunningState.isDebugMode()) {
LOGGER.info("[2]prepare to send from client:{}", requestHeader);
}
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata responseHeader) {
// 客户端收到响应Header
if (RpcRunningState.isDebugMode()) {
LOGGER.info("[3]response header received from server:{}", responseHeader);
}
super.onHeaders(responseHeader);
}
@Override
public void onMessage(RespT message) {
if (RpcRunningState.isDebugMode()) {
LOGGER.info("[4]response message received from server:{}", message);
}
super.onMessage(message);
}
@Override
public void onClose(Status status, Metadata trailers) {
if (RpcRunningState.isDebugMode()) {
LOGGER.info("[5]response close received from server:{},trailers:{}", status, trailers);
}
super.onClose(status, trailers);
}
@Override
public void onReady() {
if (RpcRunningState.isDebugMode()) {
LOGGER.info("[5]client is ready");
}
super.onReady();
}
}, requestHeader);
}
};
}
/**
* Build an AsyncHandler instance
*
* @param _credentials A valid authentication token
* @param _host The handler host
* @param _port The handler port
* @param _certificate The handler certificate
* @return An Observable stream containing the newly built AsyncHandler wrapper
*/
public static Observable<AsyncHandler> from(AsyncOAuth2Token _credentials, String _host, int _port, InputStream _certificate) {
return Observable
.create((Subscriber<? super AsyncHandler> t) -> {
try {
t.onNext(new AsyncHandler(
ApplicationManagerGrpc.newFutureStub(
NettyChannelBuilder
.forAddress(_host, _port)
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts
.forClient()
.trustManager(_certificate)
.build()
)
.intercept(new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(ClientCall.Listener<RespT> responseListener, Metadata headers) {
/**
* Add auth header here
*/
headers.put(Metadata.Key.of("token", Metadata.ASCII_STRING_MARSHALLER), _credentials.getRawToken());
super.start(responseListener, headers);
}
};
}
})
.build()
)
));
t.onCompleted();
} catch (Exception ex) {
t.onError(ex);
}
});
}