下面列出了怎么用io.grpc.ForwardingServerCallListener的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,
ServerCallHandler<ReqT, RespT> serverCallHandler) {
String resourceName = serverCall.getMethodDescriptor().getFullMethodName();
// Remote address: serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
Entry entry = null;
try {
ContextUtil.enter(resourceName);
entry = SphU.entry(resourceName, EntryType.IN);
// Allow access, forward the call.
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
serverCallHandler.startCall(
new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(serverCall) {
@Override
public void close(Status status, Metadata trailers) {
super.close(status, trailers);
// Record the exception metrics.
if (!status.isOk()) {
recordException(status.asRuntimeException());
}
}
}, metadata)) {};
} catch (BlockException e) {
serverCall.close(FLOW_CONTROL_BLOCK, new Metadata());
return new ServerCall.Listener<ReqT>() {};
} finally {
if (entry != null) {
entry.exit();
}
ContextUtil.exit();
}
}
/**
* @param call: ServerCall
* @param requestHeaders : Metadata request headers
* @param next: ServerCallHandler
* @param <R>: Request
* @param <S>: Response
* @return {@link Contexts}
*/
@Override
public <R, S> Listener<R> interceptCall(
ServerCall<R, S> call, Metadata requestHeaders, ServerCallHandler<R, S> next) {
LOGGER.trace("Headers : {}", requestHeaders);
String methodName = call.getMethodDescriptor().getFullMethodName();
if (!ModelDBConstants.GRPC_HEALTH_CHECK_METHOD_NAME.equalsIgnoreCase(methodName)) {
LOGGER.info("methodName: {}", methodName);
}
Context context =
Context.current()
.withValue(METADATA_INFO, requestHeaders)
.withValue(METHOD_NAME, methodName);
ServerCall.Listener<R> delegate = Contexts.interceptCall(context, call, requestHeaders, next);
ACTIVE_REQUEST_COUNT.incrementAndGet();
LOGGER.trace("Active Request count {}", ACTIVE_REQUEST_COUNT.get());
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<R>(delegate) {
@Override
public void onCancel() {
ACTIVE_REQUEST_COUNT.decrementAndGet();
LOGGER.trace("Decrease Request count oon onCancel()");
LOGGER.trace("Active Request count {}", ACTIVE_REQUEST_COUNT.get());
super.onCancel();
}
@Override
public void onComplete() {
ACTIVE_REQUEST_COUNT.decrementAndGet();
LOGGER.trace("Decrease Request count on onComplete()");
LOGGER.trace("Active Request count {}", ACTIVE_REQUEST_COUNT.get());
super.onComplete();
}
};
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
assertNotNull(tracer.activeSpan());
call =
new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata headers) {
assertNotNull(tracer.activeSpan());
super.sendHeaders(headers);
}
@Override
public void sendMessage(RespT message) {
assertNotNull(tracer.activeSpan());
super.sendMessage(message);
}
@Override
public void close(Status status, Metadata trailers) {
assertNotNull(tracer.activeSpan());
super.close(status, trailers);
}
};
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall(call, headers)) {
@Override
public void onReady() {
assertNotNull(tracer.activeSpan());
super.onReady();
}
@Override
public void onMessage(ReqT message) {
assertNotNull(tracer.activeSpan());
super.onMessage(message);
}
@Override
public void onHalfClose() {
assertNotNull(tracer.activeSpan());
super.onHalfClose();
}
@Override
public void onCancel() {
assertNotNull(tracer.activeSpan());
super.onCancel();
}
@Override
public void onComplete() {
assertNotNull(tracer.activeSpan());
super.onComplete();
}
};
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
final StreamExecutorRejectedExecutionRequestScheduler.Listener scheduleListener = this.scheduler.schedule(call);
if (logger.isInfoEnabled()) {
logger.info("Initialize schedule listener. {} {}, headers={}, initNumMessages={}, scheduler={}, listener={}",
this.name, call.getMethodDescriptor().getFullMethodName(), headers, initNumMessages, scheduler, scheduleListener);
}
final ServerCall.Listener<ReqT> listener = next.startCall(call, headers);
// Init MessageDeframer.pendingDeliveries
call.request(initNumMessages);
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(listener) {
@Override
public void onMessage(final ReqT message) {
try {
executor.execute(new Runnable() {
@Override
public void run() {
delegate().onMessage(message);
}
});
} catch (RejectedExecutionException ree) {
// Defense code, need log ?
scheduleListener.onRejectedExecution();
// logger.warn("Failed to request. Rejected execution, count={}", scheduleListener.getRejectedExecutionCount());
}
}
@Override
public void onCancel() {
scheduleListener.onCancel();
delegate().onCancel();
}
@Override
public void onComplete() {
scheduleListener.onCancel();
delegate().onComplete();
}
};
}