下面列出了io.grpc.ServerInterceptor#io.grpc.Server 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public void stop() throws InterruptedException {
Server s = server;
if (s == null) {
throw new IllegalStateException("gRPC Listener Server is already stopped");
}
server = null;
s.shutdown();
if (s.awaitTermination(1, TimeUnit.SECONDS)) {
log.debug("gRPC Listener Server stopped");
return;
}
s.shutdownNow();
if (s.awaitTermination(1, TimeUnit.SECONDS)) {
return;
}
throw new RuntimeException("Unable to shutdown gRPC Listener Server");
}
void run() throws Exception {
Server server = ServerBuilder.forPort(0).addService(new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
Metadata trailers = new Metadata();
trailers.put(DEBUG_INFO_TRAILER_KEY, DEBUG_INFO);
responseObserver.onError(Status.INTERNAL.withDescription(DEBUG_DESC)
.asRuntimeException(trailers));
}
}).build().start();
channel =
ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build();
blockingCall();
futureCallDirect();
futureCallCallback();
asyncCall();
advancedAsyncCall();
channel.shutdown();
server.shutdown();
channel.awaitTermination(1, TimeUnit.SECONDS);
server.awaitTermination();
}
/**
* Provide a {@link GenieWebRpcInfo} bean if one hasn't already been defined.
*
* @param server The gRPC {@link Server} instance. Must not be {@link Server#isShutdown()} or
* {@link Server#isTerminated()}. Must be able to get the port the server is listening on.
* @return A {@link GenieWebRpcInfo} instance
* @throws IllegalStateException When an instance can't be created
*/
@Bean
@ConditionalOnMissingBean(
{
GenieWebRpcInfo.class
}
)
public GenieWebRpcInfo genieWebRpcInfo(final Server server) throws IllegalStateException {
if (server.isShutdown() || server.isTerminated()) {
throw new IllegalStateException("gRPC server is already shut down. Can't start.");
} else {
final int port = GRpcServerUtils.startServer(server);
if (port < 1) {
throw new IllegalStateException("gRPC server started on illegal port: " + port);
}
return new GenieWebRpcInfo(port);
}
}
@Test
public void singleServerCleanup() throws Throwable {
// setup
Server server = mock(Server.class);
Statement statement = mock(Statement.class);
InOrder inOrder = inOrder(statement, server);
GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
// run
grpcCleanup.register(server);
boolean awaitTerminationFailed = false;
try {
// will throw because channel.awaitTermination(long, TimeUnit) will return false;
grpcCleanup.apply(statement, null /* description*/).evaluate();
} catch (AssertionError e) {
awaitTerminationFailed = true;
}
// verify
assertTrue(awaitTerminationFailed);
inOrder.verify(statement).evaluate();
inOrder.verify(server).shutdown();
inOrder.verify(server).awaitTermination(anyLong(), any(TimeUnit.class));
inOrder.verify(server).shutdownNow();
}
@VisibleForTesting
ServerWatcherRunnable(
Server server,
long maxIdleSeconds,
boolean shutdownOnLowSysMem,
CommandManager commandManager,
LowMemoryChecker lowMemoryChecker) {
Preconditions.checkArgument(
maxIdleSeconds > 0,
"Expected to only check idleness when --max_idle_secs > 0 but it was %s",
maxIdleSeconds);
this.server = server;
this.maxIdleSeconds = maxIdleSeconds;
this.commandManager = commandManager;
this.lowMemoryChecker = lowMemoryChecker;
this.shutdownOnLowSysMem = shutdownOnLowSysMem;
}
public static void main(String[] arg) {
try {
Server server = ServerBuilder.forPort(8080)
.addService(new EmployeeService())
.build();
System.out.println("Starting gRPC Server Service ...");
server.start();
System.out.println("Server has started at port: 8080");
System.out.println("Following services are available: ");
server.getServices().stream()
.forEach(
s -> System.out.println("Service Name: " + s.getServiceDescriptor().getName())
);
server.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Starts and shuts down an in-process gRPC service based on the {@code serviceImpl} provided,
* while allowing a test to execute using the {@link GrpcClient}.
*
* @param serviceImpl implementation of the Spanner service. Typically, just the methods needed to
* execute the test.
* @param clientConsumer consumer of the {@link GrpcClient} - the class under test.
* @return a Mockito spy for the gRPC service for verification.
*/
private SpannerImplBase doTest(SpannerGrpc.SpannerImplBase serviceImpl,
Consumer<GrpcClient> clientConsumer)
throws IOException {
SpannerGrpc.SpannerImplBase serviceImplSpy = spy(serviceImpl);
String serverName = InProcessServerBuilder.generateName();
Server server = InProcessServerBuilder
.forName(serverName).directExecutor().addService(serviceImplSpy).build().start();
ManagedChannel channel =
InProcessChannelBuilder.forName(serverName).directExecutor().build();
clientConsumer.accept(new GrpcClient(SpannerGrpc.newStub(channel), null, null));
channel.shutdown();
server.shutdown();
return serviceImplSpy;
}
@Test
public void singleServerCleanup() throws Throwable {
// setup
Server server = mock(Server.class);
Statement statement = mock(Statement.class);
InOrder inOrder = inOrder(statement, server);
GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
// run
grpcCleanup.register(server);
boolean awaitTerminationFailed = false;
try {
// will throw because channel.awaitTermination(long, TimeUnit) will return false;
grpcCleanup.apply(statement, null /* description*/).evaluate();
} catch (AssertionError e) {
awaitTerminationFailed = true;
}
// verify
assertTrue(awaitTerminationFailed);
inOrder.verify(statement).evaluate();
inOrder.verify(server).shutdown();
inOrder.verify(server).awaitTermination(anyLong(), any(TimeUnit.class));
inOrder.verify(server).shutdownNow();
}
public Server startServer() throws IOException {
ServerInterceptor headersInterceptor = new TracingMetadataUtils.ServerHeadersInterceptor();
NettyServerBuilder b =
NettyServerBuilder.forPort(workerOptions.listenPort)
.addService(ServerInterceptors.intercept(actionCacheServer, headersInterceptor))
.addService(ServerInterceptors.intercept(bsServer, headersInterceptor))
.addService(ServerInterceptors.intercept(casServer, headersInterceptor))
.addService(ServerInterceptors.intercept(capabilitiesServer, headersInterceptor));
if (workerOptions.tlsCertificate != null) {
b.sslContext(getSslContextBuilder(workerOptions).build());
}
if (execServer != null) {
b.addService(ServerInterceptors.intercept(execServer, headersInterceptor));
} else {
logger.atInfo().log("Execution disabled, only serving cache requests");
}
Server server = b.build();
logger.atInfo().log("Starting gRPC server on port %d", workerOptions.listenPort);
server.start();
return server;
}
/**
* User beans override default beans.
*/
@Test
void expectedBeansExistWhenUserOverrides() {
this.contextRunner
.withUserConfiguration(UserConfig.class)
.run(
context -> {
Assertions.assertThat(context).hasSingleBean(GRpcServerProperties.class);
Assertions.assertThat(context).hasSingleBean(Server.class);
Assertions.assertThat(context).hasSingleBean(GRpcServerManager.class);
Assertions.assertThat(context.containsBean("userServer")).isTrue();
Assertions.assertThat(context.containsBean("userServerManager")).isTrue();
Assertions.assertThat(context.containsBean("gRpcServer")).isFalse();
Assertions.assertThat(context.containsBean("gRpcServerManager")).isFalse();
}
);
}
@Test
void shouldConsiderTransportConfigurers() {
var service = ServerServiceDefinition.builder("test").build();
new ApplicationContextRunner()
.withInitializer((ApplicationContextInitializer) new GRPCConfiguration())
.withPropertyValues(
"spring.profiles.active: gateway",
"grpc.port: 0"
)
.withInitializer(ctx -> {
var context = (GenericApplicationContext) ctx;
context.registerBean(LiiklusService.class, () -> Mockito.mock(LiiklusService.class));
context.registerBean(GRPCLiiklusTransportConfigurer.class, () -> builder -> builder.addService(() -> service));
})
.run(context -> {
assertThat(context).getBeans(GRPCLiiklusTransportConfigurer.class).isNotEmpty();
assertThat(context)
.getBean(Server.class)
.satisfies(server -> {
assertThat(server.getServices()).contains(service);
});
});
}
private void stopServer() throws InterruptedException {
Server s = server;
if (s == null) {
throw new IllegalStateException("Already stopped");
}
server = null;
s.shutdown();
if (s.awaitTermination(1, TimeUnit.SECONDS)) {
return;
}
s.shutdownNow();
if (s.awaitTermination(1, TimeUnit.SECONDS)) {
return;
}
throw new RuntimeException("Unable to shutdown server");
}
public static void main(String[] args) throws Exception {
Server server = InProcessServerBuilder
.forName("ResumeStreamReactorDemo")
.addService(new FlakyNumberService())
.build()
.start();
ManagedChannel channel = InProcessChannelBuilder
.forName("ResumeStreamReactorDemo")
.usePlaintext()
.build();
ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(channel);
// Keep retrying the stream until you get ten in a row with no error
new GrpcRetryFlux<>(() -> stub.oneToMany(Mono.just(Message.getDefaultInstance())))
.map(Message::getNumber)
.subscribe(System.out::println);
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
channel.shutdownNow();
server.shutdownNow();
}
private static Server startServer(String bindAddress, int port, boolean https, File confDir,
@Nullable ExecutorService confDirWatchExecutor, DownstreamServiceImpl downstreamService,
CollectorServiceImpl collectorService) throws IOException {
NettyServerBuilder builder =
NettyServerBuilder.forAddress(new InetSocketAddress(bindAddress, port));
if (https) {
builder.sslContext(
DelegatingSslContext.create(confDir, checkNotNull(confDirWatchExecutor)));
}
return builder.addService(collectorService.bindService())
.addService(downstreamService.bindService())
// need to override default max message size of 4mb until streaming is implemented
// for DownstreamService.EntriesResponse and FullTraceResponse
.maxInboundMessageSize(64 * 1024 * 1024)
// aggressive keep alive is used by agent to detect silently dropped connections
// (see org.glowroot.agent.central.CentralConnection)
.permitKeepAliveTime(20, SECONDS)
// aggressive max connection age forces agents to re-resolve DNS often for DNS-based
// load balancing (e.g. to pick up and spread load across new central collectors)
.maxConnectionAge(20, MINUTES)
.build()
.start();
}
/**
* Shutdown the gRPC {@link Server} when this object is closed.
*/
@Override
public void close() throws Exception {
final Server server = server();
if (server != null) {
server.shutdown();
try {
// TODO: Maybe we should catch the InterruptedException from this?
server.awaitTermination(shutdownWaitTimeInMillis, TimeUnit.MILLISECONDS);
} finally {
server.shutdownNow();
this.server = null;
}
}
}
@Test
public void startDoesNotStartServerWithoutServices() throws Exception {
final int port = ThreadLocalRandom.current().nextInt(1000, 10000);
final long shutdownWaitTimeInMillis = ThreadLocalRandom.current().nextLong(1000, 10000);
final ApplicationContext applicationContext = mock(ApplicationContext.class);
final Server server = mock(Server.class, new TriesToReturnSelf());
final GrpcServerFactory factory = mock(GrpcServerFactory.class);
when(server.getPort()).thenReturn(port);
// Configure application context to contain no gRPC services.
when(applicationContext.getBeansWithAnnotation(eq(GrpcService.class))).thenReturn(ImmutableMap.of());
GrpcServerHost runner = new GrpcServerHost(port, shutdownWaitTimeInMillis, factory);
runner.setApplicationContext(applicationContext);
assertThatThrownBy(runner::start).isInstanceOf(IOException.class);
// Make sure the server builder was not used.
verify(factory, never()).buildServerForServices(anyInt(), any());
assertThat(runner.server()).isNull();
}
@Override
public Void call() throws Exception {
OzoneConfiguration ozoneConfiguration = createOzoneConfiguration();
CsiConfig csiConfig = ozoneConfiguration.getObject(CsiConfig.class);
OzoneClient rpcClient = OzoneClientFactory.getRpcClient(ozoneConfiguration);
EpollEventLoopGroup group = new EpollEventLoopGroup();
if (csiConfig.getVolumeOwner().isEmpty()) {
throw new IllegalArgumentException(
"ozone.csi.owner is not set. You should set this configuration "
+ "variable to define which user should own all the created "
+ "buckets.");
}
Server server =
NettyServerBuilder
.forAddress(new DomainSocketAddress(csiConfig.getSocketPath()))
.channelType(EpollServerDomainSocketChannel.class)
.workerEventLoopGroup(group)
.bossEventLoopGroup(group)
.addService(new IdentitiyService())
.addService(new ControllerService(rpcClient,
csiConfig.getDefaultVolumeSize()))
.addService(new NodeService(csiConfig))
.build();
server.start();
server.awaitTermination();
rpcClient.close();
return null;
}
public static void main(String[] args) throws IOException, InterruptedException {
Server server = ServerBuilder.forPort(8081)
.addService(new GreetingServiceImpl())
.build();
server.start();
server.awaitTermination();
}
@Test
public void shutdownGracefullyThrowsIfMaxWaitTimeInMillisIsLessThanZero() {
final long maxWaitTimeInMillis = ThreadLocalRandom.current().nextLong(Long.MIN_VALUE, 0);
final Server server = mock(Server.class);
assertThatThrownBy(() -> Servers.shutdownGracefully(server, maxWaitTimeInMillis))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("timeout must be greater than 0");
}
@Test
public void registerNullServerThrowsNpe() {
Server server = null;
GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
thrown.expect(NullPointerException.class);
thrown.expectMessage("server");
grpcCleanup.register(server);
}
/**
* Default beans should be created.
*/
@Test
void expectedBeansExistIfGrpcEnabledAndNoUserBeans() {
this.contextRunner
.run(
context -> {
Assertions.assertThat(context).hasSingleBean(GRpcServerProperties.class);
Assertions.assertThat(context).hasSingleBean(Server.class);
Assertions.assertThat(context).hasSingleBean(GRpcServerManager.class);
}
);
}
static public void main(String [] args) throws IOException, InterruptedException {
Brave brave = Constant.brave("greeting-service");
Server greetingServer = ServerBuilder.forPort(8080)
.addService(ServerInterceptors.intercept(new GreetingServiceImpl(),
new BraveGrpcServerInterceptor(brave),
MonitoringServerInterceptor.create(Configuration.allMetrics())))
.build();
greetingServer.start();
System.out.println("Server started!");
greetingServer.awaitTermination();
}
@Override
public RpcServer createRpcServer(final Endpoint endpoint, final ConfigHelper<RpcServer> helper) {
final int port = Requires.requireNonNull(endpoint, "endpoint").getPort();
Requires.requireTrue(port > 0 && port < 0xFFFF, "port out of range:" + port);
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
final Server server = ServerBuilder.forPort(port) //
.fallbackHandlerRegistry(handlerRegistry) //
.directExecutor() //
.build();
final RpcServer rpcServer = new GrpcServer(server, handlerRegistry, this.parserClasses, getMarshallerRegistry());
if (helper != null) {
helper.config(rpcServer);
}
return rpcServer;
}
@Bean
Server mockGRpcServer() {
final Server server = Mockito.mock(Server.class);
Mockito.when(server.isTerminated()).thenReturn(false);
Mockito.when(server.isShutdown()).thenReturn(true);
return server;
}
public GrpcServer(Server server, MutableHandlerRegistry handlerRegistry, Map<String, Message> parserClasses,
MarshallerRegistry marshallerRegistry) {
this.server = server;
this.handlerRegistry = handlerRegistry;
this.parserClasses = parserClasses;
this.marshallerRegistry = marshallerRegistry;
registerDefaultServerInterceptor();
}
private static void startServerOnPort(int port) {
ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port);
serverBuilder.addService(new MyTxEventService(connected.get(port), eventsMap.get(port), delays.get(port)));
Server server = serverBuilder.build();
try {
server.start();
servers.put(port, server);
} catch (IOException e) {
fail(e.getMessage());
}
}
private int killServerReceivedMessage() {
for (int port : eventsMap.keySet()) {
if (!eventsMap.get(port).isEmpty()) {
Server serverToKill = servers.get(port);
serverToKill.shutdownNow();
return port;
}
}
throw new IllegalStateException("None of the servers received any message");
}
/**
* Initiates an orderly shutdown of the grpc server and releases the references to the server. This call does not
* wait for the server to be completely shut down.
*/
protected void stopAndReleaseGrpcServer() {
final Server localServer = this.server;
if (localServer != null) {
localServer.shutdown();
this.server = null;
log.info("gRPC server shutdown.");
}
}
public static void main(String[] args) throws Exception {
logger.info("Starting server on port " + DEMO_SERVER_PORT);
Server server = ServerBuilder.forPort(DEMO_SERVER_PORT)
.addService(ProtoReflectionService.newInstance())
.addService(new HelloServiceImpl())
.build()
.start();
server.awaitTermination();
}
/**
* main method (and init()) is not actually needed, but helpful for debugging
* purposes
*
* @param args
* @throws Exception
*/
public static void main(final String[] args) throws Exception {
init();
Server server = ServerBuilder.forPort(8080).addService(new PCSBasedOptimizerServiceImpl(evaluator, input)).build();
server.start();
server.awaitTermination();
}