下面列出了io.grpc.ServerInterceptors#intercept ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void registerProcessor(ProviderConfig providerConfig, Invoker instance) {
Object ref = providerConfig.getRef();
try {
final ServerServiceDefinition serviceDef;
if (SofaProtoUtils.isProtoClass(ref)) {
BindableService bindableService = (BindableService) providerConfig.getRef();
serviceDef = bindableService.bindService();
} else {
GenericServiceImpl genericService = new GenericServiceImpl(providerConfig);
serviceDef = buildSofaServiceDef(genericService, providerConfig, instance);
}
List<TripleServerInterceptor> interceptorList = buildInterceptorChain(serviceDef);
ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(
serviceDef, interceptorList);
serviceInfo.put(providerConfig, serviceDefinition);
handlerRegistry.addService(serviceDefinition);
invokerCnt.incrementAndGet();
} catch (Exception e) {
LOGGER.error("Register triple service error", e);
serviceInfo.remove(providerConfig);
}
}
@Override
public ServerServiceDefinition getObject() throws Exception {
// WARNING singleton
// final ServerInterceptor interceptor = FactoryBean<ServerInterceptor>.getObject();
final ServerInterceptor interceptor = serverInterceptor;
if (interceptor == null) {
return newServerServiceDefinition();
}
final ServerServiceDefinition serverServiceDefinition = newServerServiceDefinition();
return ServerInterceptors.intercept(serverServiceDefinition, interceptor);
}
private ServerServiceDefinition newSpanBindableService(Executor executor) throws Exception {
FactoryBean<ServerInterceptor> interceptorFactory = new StreamExecutorServerInterceptorFactory(executor, 100, Executors.newSingleThreadScheduledExecutor(), 1000, 10);
((StreamExecutorServerInterceptorFactory) interceptorFactory).setBeanName("SpanService");
ServerInterceptor interceptor = interceptorFactory.getObject();
SpanService spanService = new SpanService(new MockDispatchHandler(), new DefaultServerRequestFactory());
return ServerInterceptors.intercept(spanService, interceptor);
}
/**
* Starts the server with given transport params.
*
* @param name UDS pathname or server name for {@link InProcessServerBuilder}
* @param useUds creates a UDS based server if true.
* @param useInterceptor if true, uses {@link SdsServerInterceptor} to grab & save Jwt Token.
*/
void startServer(String name, boolean useUds, boolean useInterceptor) throws IOException {
checkNotNull(name, "name");
discoveryService = new SecretDiscoveryServiceImpl();
ServerServiceDefinition serviceDefinition = discoveryService.bindService();
if (useInterceptor) {
serviceDefinition =
ServerInterceptors.intercept(serviceDefinition, new SdsServerInterceptor());
}
if (useUds) {
elg = new EpollEventLoopGroup();
boss = new EpollEventLoopGroup(1);
server =
NettyServerBuilder.forAddress(new DomainSocketAddress(name))
.bossEventLoopGroup(boss)
.workerEventLoopGroup(elg)
.channelType(EpollServerDomainSocketChannel.class)
.addService(serviceDefinition)
.directExecutor()
.build()
.start();
} else {
server =
InProcessServerBuilder.forName(name)
.addService(serviceDefinition)
.directExecutor()
.build()
.start();
}
}
@Test
public void statusRuntimeExceptionTransmitter() {
final Status expectedStatus = Status.UNAVAILABLE;
final Metadata expectedMetadata = new Metadata();
FakeServerCall<Void, Void> call =
new FakeServerCall<Void, Void>(expectedStatus, expectedMetadata);
final StatusRuntimeException exception =
new StatusRuntimeException(expectedStatus, expectedMetadata);
listener = new VoidCallListener() {
@Override
public void onMessage(Void message) {
throw exception;
}
@Override
public void onHalfClose() {
throw exception;
}
@Override
public void onCancel() {
throw exception;
}
@Override
public void onComplete() {
throw exception;
}
@Override
public void onReady() {
throw exception;
}
};
ServerServiceDefinition intercepted = ServerInterceptors.intercept(
serviceDefinition,
Arrays.asList(TransmitStatusRuntimeExceptionInterceptor.instance()));
// The interceptor should have handled the error by directly closing the ServerCall
// and the exception should not propagate to the method's caller
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onMessage(null);
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onCancel();
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onComplete();
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onHalfClose();
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onReady();
assertEquals(5, call.numCloses);
}
@Test
public void statusRuntimeExceptionTransmitterIgnoresClosedCalls() {
final Status expectedStatus = Status.UNAVAILABLE;
final Status unexpectedStatus = Status.CANCELLED;
final Metadata expectedMetadata = new Metadata();
FakeServerCall<Void, Void> call =
new FakeServerCall<Void, Void>(expectedStatus, expectedMetadata);
final StatusRuntimeException exception =
new StatusRuntimeException(expectedStatus, expectedMetadata);
listener = new VoidCallListener() {
@Override
public void onMessage(Void message) {
throw exception;
}
@Override
public void onHalfClose() {
throw exception;
}
};
ServerServiceDefinition intercepted = ServerInterceptors.intercept(
serviceDefinition,
Arrays.asList(TransmitStatusRuntimeExceptionInterceptor.instance()));
ServerCall.Listener<Void> callDoubleSreListener =
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers);
callDoubleSreListener.onMessage(null); // the only close with our exception
callDoubleSreListener.onHalfClose(); // should not trigger a close
// this listener closes the call when it is initialized with startCall
listener = new VoidCallListener() {
@Override
public void onCall(ServerCall<Void, Void> call, Metadata headers) {
call.close(unexpectedStatus, headers);
}
@Override
public void onHalfClose() {
throw exception;
}
};
ServerCall.Listener<Void> callClosedListener =
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers);
// call is already closed, does not match exception
callClosedListener.onHalfClose(); // should not trigger a close
assertEquals(1, call.numCloses);
}
private ServerServiceDefinition build(List<ServerInterceptor> commonInterceptors) {
return ServerInterceptors.intercept(
serviceDefinition,
CollectionsExt.merge(commonInterceptors, interceptors)
);
}
@Override
protected void configure(ServerBuilder sb) {
sb.workerGroup(EventLoopGroups.newEventLoopGroup(1), true);
sb.maxRequestLength(MAX_MESSAGE_SIZE);
sb.idleTimeoutMillis(0);
sb.http(0);
sb.https(0);
sb.tlsSelfSigned();
final ServerServiceDefinition interceptService =
ServerInterceptors.intercept(
new TestServiceImpl(Executors.newSingleThreadScheduledExecutor()),
new ServerInterceptor() {
@Override
public <REQ, RESP> Listener<REQ> interceptCall(
ServerCall<REQ, RESP> call,
Metadata requestHeaders,
ServerCallHandler<REQ, RESP> next) {
final HttpHeadersBuilder fromClient = HttpHeaders.builder();
MetadataUtil.fillHeaders(requestHeaders, fromClient);
CLIENT_HEADERS_CAPTURE.set(fromClient.build());
return next.startCall(
new SimpleForwardingServerCall<REQ, RESP>(call) {
@Override
public void close(Status status, Metadata trailers) {
trailers.merge(requestHeaders);
super.close(status, trailers);
}
}, requestHeaders);
}
});
sb.serviceUnder("/",
GrpcService.builder()
.addService(interceptService)
.setMaxInboundMessageSizeBytes(MAX_MESSAGE_SIZE)
.setMaxOutboundMessageSizeBytes(MAX_MESSAGE_SIZE)
.useClientTimeoutHeader(false)
.build()
.decorate((client, ctx, req) -> {
final HttpResponse res = client.serve(ctx, req);
return new FilteredHttpResponse(res) {
private boolean headersReceived;
@Override
protected HttpObject filter(HttpObject obj) {
if (obj instanceof HttpHeaders) {
if (!headersReceived) {
headersReceived = true;
} else {
SERVER_TRAILERS_CAPTURE.set((HttpHeaders) obj);
}
}
return obj;
}
};
}));
}
private ServerServiceDefinition handlerInterceptorBind(BindableService handler, ServerInterceptor interceptor) {
return ServerInterceptors.intercept(handler, interceptor);
}
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) {
allArguments[0] = ServerInterceptors.intercept((ServerServiceDefinition) allArguments[0], new ServerInterceptor());
}
private ServerServiceDefinition newStatBindableService(Executor executor) throws Exception {
FactoryBean<ServerInterceptor> interceptorFactory = new StreamExecutorServerInterceptorFactory(executor, 100, Executors.newSingleThreadScheduledExecutor(), 1000, 10);
ServerInterceptor interceptor = interceptorFactory.getObject();
StatService statService = new StatService(new MockDispatchHandler(), new DefaultServerRequestFactory());
return ServerInterceptors.intercept(statService, interceptor);
}
@Test
public void statusRuntimeExceptionTransmitter() {
final Status expectedStatus = Status.UNAVAILABLE;
final Metadata expectedMetadata = new Metadata();
FakeServerCall<Void, Void> call =
new FakeServerCall<>(expectedStatus, expectedMetadata);
final StatusRuntimeException exception =
new StatusRuntimeException(expectedStatus, expectedMetadata);
listener = new VoidCallListener() {
@Override
public void onMessage(Void message) {
throw exception;
}
@Override
public void onHalfClose() {
throw exception;
}
@Override
public void onCancel() {
throw exception;
}
@Override
public void onComplete() {
throw exception;
}
@Override
public void onReady() {
throw exception;
}
};
ServerServiceDefinition intercepted = ServerInterceptors.intercept(
serviceDefinition,
Arrays.asList(TransmitStatusRuntimeExceptionInterceptor.instance()));
// The interceptor should have handled the error by directly closing the ServerCall
// and the exception should not propagate to the method's caller
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onMessage(null);
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onCancel();
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onComplete();
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onHalfClose();
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onReady();
assertEquals(5, call.numCloses);
}
@Test
public void statusRuntimeExceptionTransmitterIgnoresClosedCalls() {
final Status expectedStatus = Status.UNAVAILABLE;
final Status unexpectedStatus = Status.CANCELLED;
final Metadata expectedMetadata = new Metadata();
FakeServerCall<Void, Void> call =
new FakeServerCall<>(expectedStatus, expectedMetadata);
final StatusRuntimeException exception =
new StatusRuntimeException(expectedStatus, expectedMetadata);
listener = new VoidCallListener() {
@Override
public void onMessage(Void message) {
throw exception;
}
@Override
public void onHalfClose() {
throw exception;
}
};
ServerServiceDefinition intercepted = ServerInterceptors.intercept(
serviceDefinition,
Arrays.asList(TransmitStatusRuntimeExceptionInterceptor.instance()));
ServerCall.Listener<Void> callDoubleSreListener =
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers);
callDoubleSreListener.onMessage(null); // the only close with our exception
callDoubleSreListener.onHalfClose(); // should not trigger a close
// this listener closes the call when it is initialized with startCall
listener = new VoidCallListener() {
@Override
public void onCall(ServerCall<Void, Void> call, Metadata headers) {
call.close(unexpectedStatus, headers);
}
@Override
public void onHalfClose() {
throw exception;
}
};
ServerCall.Listener<Void> callClosedListener =
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers);
// call is already closed, does not match exception
callClosedListener.onHalfClose(); // should not trigger a close
assertEquals(1, call.numCloses);
}
/**
* Add tracing to all requests made to this service.
*
* @param serviceDef of the service to intercept
* @return the serviceDef with a tracing interceptor
*/
public ServerServiceDefinition intercept(ServerServiceDefinition serviceDef) {
return ServerInterceptors.intercept(serviceDef, this);
}
/**
* Add tracing to all requests made to this service.
*
* @param bindableService to intercept
* @return the serviceDef with a tracing interceptor
*/
public ServerServiceDefinition intercept(BindableService bindableService) {
return ServerInterceptors.intercept(bindableService, this);
}