下面列出了io.grpc.protobuf.ProtoUtils#io.grpc.stub.ServerCalls 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void startServer(ServerCalls.UnaryMethod<String, String> method) {
try {
server = NettyServerBuilder.forPort(0)
.addService(ServerInterceptors.intercept(
ServerServiceDefinition.builder("service")
.addMethod(METHOD_DESCRIPTOR, ServerCalls.asyncUnaryCall(method))
.build(),
ConcurrencyLimitServerInterceptor.newBuilder(limiter)
.build())
)
.build()
.start();
channel = NettyChannelBuilder.forAddress("localhost", server.getPort())
.usePlaintext(true)
.build();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public ServerMethodDefinition<?, ?> lookupMethod(String methodName, @Nullable String authority) {
return ServerMethodDefinition.create(
MethodDescriptor.<byte[], byte[]>newBuilder()
.setRequestMarshaller(new ByteArrayMarshaller())
.setResponseMarshaller(new ByteArrayMarshaller())
.setType(MethodType.UNARY)
.setFullMethodName(methodName)
.build(),
ServerCalls.asyncUnaryCall(new ProxyUnaryMethod(backend, methodName)));
}
@SuppressWarnings("unchecked")
@Override
public void registerProcessor(final RpcProcessor processor) {
final String interest = processor.interest();
final Message reqIns = Requires.requireNonNull(this.parserClasses.get(interest), "null default instance: " + interest);
final MethodDescriptor<Message, Message> method = MethodDescriptor //
.<Message, Message>newBuilder() //
.setType(MethodDescriptor.MethodType.UNARY) //
.setFullMethodName(
MethodDescriptor.generateFullMethodName(processor.interest(), GrpcRaftRpcFactory.FIXED_METHOD_NAME)) //
.setRequestMarshaller(ProtoUtils.marshaller(reqIns)) //
.setResponseMarshaller(ProtoUtils.marshaller(this.marshallerRegistry.findResponseInstanceByRequest(interest))) //
.build();
final ServerCallHandler<Message, Message> handler = ServerCalls.asyncUnaryCall(
(request, responseObserver) -> {
final SocketAddress remoteAddress = RemoteAddressInterceptor.getRemoteAddress();
final Connection conn = ConnectionInterceptor.getCurrentConnection(this.closedEventListeners);
final RpcContext rpcCtx = new RpcContext() {
@Override
public void sendResponse(final Object responseObj) {
try {
responseObserver.onNext((Message) responseObj);
responseObserver.onCompleted();
} catch (final Throwable t) {
LOG.warn("[GRPC] failed to send response: {}.", t);
}
}
@Override
public Connection getConnection() {
if (conn == null) {
throw new IllegalStateException("fail to get connection");
}
return conn;
}
@Override
public String getRemoteAddress() {
// Rely on GRPC's capabilities, not magic (netty channel)
return remoteAddress != null ? remoteAddress.toString() : null;
}
};
final RpcProcessor.ExecutorSelector selector = processor.executorSelector();
Executor executor;
if (selector != null && request instanceof RpcRequests.AppendEntriesRequest) {
final RpcRequests.AppendEntriesRequest req = (RpcRequests.AppendEntriesRequest) request;
final RpcRequests.AppendEntriesRequestHeader.Builder header = RpcRequests.AppendEntriesRequestHeader //
.newBuilder() //
.setGroupId(req.getGroupId()) //
.setPeerId(req.getPeerId()) //
.setServerId(req.getServerId());
executor = selector.select(interest, header.build());
} else {
executor = processor.executor();
}
if (executor == null) {
executor = this.defaultExecutor;
}
if (executor != null) {
executor.execute(() -> processor.handleRequest(rpcCtx, request));
} else {
processor.handleRequest(rpcCtx, request);
}
});
final ServerServiceDefinition serviceDef = ServerServiceDefinition //
.builder(interest) //
.addMethod(method, handler) //
.build();
this.handlerRegistry
.addService(ServerInterceptors.intercept(serviceDef, this.serverInterceptors.toArray(new ServerInterceptor[0])));
}
@Test
@Ignore
public void simulation() throws IOException {
Semaphore sem = new Semaphore(20, true);
Server server = NettyServerBuilder.forPort(0)
.addService(ServerServiceDefinition.builder("service")
.addMethod(METHOD_DESCRIPTOR, ServerCalls.asyncUnaryCall((req, observer) -> {
try {
sem.acquire();
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
} finally {
sem.release();
}
observer.onNext("response");
observer.onCompleted();
}))
.build())
.build()
.start();
Limiter<GrpcClientRequestContext> limiter = new GrpcClientLimiterBuilder()
.blockOnLimit(true)
.build();
Channel channel = NettyChannelBuilder.forTarget("localhost:" + server.getPort())
.usePlaintext(true)
.intercept(new ConcurrencyLimitClientInterceptor(limiter))
.build();
AtomicLong counter = new AtomicLong();
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
System.out.println(" " + counter.getAndSet(0) + " : " + limiter.toString());
}, 1, 1, TimeUnit.SECONDS);
for (int i = 0 ; i < 10000000; i++) {
counter.incrementAndGet();
ClientCalls.futureUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT), "request");
}
}
private TestServer(final Builder builder) throws IOException {
this.semaphore = new Semaphore(builder.concurrency, true);
ServerCallHandler<String, String> handler = ServerCalls.asyncUnaryCall(new UnaryMethod<String, String>() {
volatile int segment = 0;
{
Executors.newSingleThreadExecutor().execute(() -> {
while (true) {
Segment s = builder.segments.get(0);
Uninterruptibles.sleepUninterruptibly(s.duration(), TimeUnit.NANOSECONDS);
segment = segment++ % builder.segments.size();
}
});
}
@Override
public void invoke(String req, StreamObserver<String> observer) {
try {
long delay = builder.segments.get(0).latency();
semaphore.acquire();
TimeUnit.MILLISECONDS.sleep(delay);
observer.onNext("response");
observer.onCompleted();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
observer.onError(Status.UNKNOWN.asRuntimeException());
} finally {
semaphore.release();
}
}
});
this.server = NettyServerBuilder.forPort(0)
.addService(ServerInterceptors.intercept(ServerServiceDefinition.builder("service")
.addMethod(METHOD_DESCRIPTOR, handler) // Rate = Limit / Latency = 2 / 0.02 = 100
.build(), ConcurrencyLimitServerInterceptor.newBuilder(builder.limiter)
.build()
))
.build()
.start();
}
@Override
public ServerServiceDefinition export(Class<?> protocol, Object protocolImpl) {
Class<?> serivce = protocol;
Object serviceRef = protocolImpl;
String serviceName = protocol.getName();
ServerServiceDefinition.Builder serviceDefBuilder =
ServerServiceDefinition.builder(serviceName);
List<Method> methods = ReflectUtils.findAllPublicMethods(serivce);
if (methods.isEmpty()) {
throw new IllegalStateException(
"protocolClass " + serviceName + " not have export method" + serivce);
}
final ConcurrentMap<String, AtomicInteger> concurrents = Maps.newConcurrentMap();
for (Method method : methods) {
MethodDescriptor<Message, Message> methodDescriptor =
GrpcUtil.createMethodDescriptor(serivce, method);
GrpcMethodType grpcMethodType = method.getAnnotation(GrpcMethodType.class);
switch (grpcMethodType.methodType()) {
case UNARY:
serviceDefBuilder.addMethod(methodDescriptor,
ServerCalls.asyncUnaryCall(new ServerInvocation(serviceRef, method, grpcMethodType,
providerUrl, concurrents, clientServerMonitor)));
break;
case CLIENT_STREAMING:
serviceDefBuilder.addMethod(methodDescriptor,
ServerCalls.asyncClientStreamingCall(new ServerInvocation(serviceRef, method,
grpcMethodType, providerUrl, concurrents, clientServerMonitor)));
break;
case SERVER_STREAMING:
serviceDefBuilder.addMethod(methodDescriptor,
ServerCalls.asyncServerStreamingCall(new ServerInvocation(serviceRef, method,
grpcMethodType, providerUrl, concurrents, clientServerMonitor)));
break;
case BIDI_STREAMING:
serviceDefBuilder.addMethod(methodDescriptor,
ServerCalls.asyncBidiStreamingCall(new ServerInvocation(serviceRef, method,
grpcMethodType, providerUrl, concurrents, clientServerMonitor)));
break;
default:
RpcServiceException rpcFramwork =
new RpcServiceException(RpcErrorMsgConstant.SERVICE_UNFOUND);
throw rpcFramwork;
}
}
log.info("'{}' service has been registered.", serviceName);
return serviceDefBuilder.build();
}