下面列出了 io.grpc.Server#shutdown() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Stops the gRPC listener server in two phases.
 *
 * <p>The {@code server} reference is cleared first. A graceful {@code shutdown()} is then given
 * one second to complete; if it does not, {@code shutdownNow()} is issued and another second is
 * allowed for termination.
 *
 * @throws IllegalStateException if the {@code server} reference is already {@code null}
 * @throws InterruptedException if the thread is interrupted while waiting for termination
 * @throws RuntimeException if the server has not terminated after the forced shutdown
 */
public void stop() throws InterruptedException {
    final Server stopping = server;
    if (stopping == null) {
        throw new IllegalStateException("gRPC Listener Server is already stopped");
    }
    server = null;

    // Phase one: graceful shutdown.
    stopping.shutdown();
    boolean terminated = stopping.awaitTermination(1, TimeUnit.SECONDS);
    if (terminated) {
        log.debug("gRPC Listener Server stopped");
        return;
    }

    // Phase two: forced shutdown.
    stopping.shutdownNow();
    terminated = stopping.awaitTermination(1, TimeUnit.SECONDS);
    if (!terminated) {
        throw new RuntimeException("Unable to shutdown gRPC Listener Server");
    }
}
/**
 * Starts a Greeter server whose {@code sayHello} always fails with {@code Status.INTERNAL},
 * runs each of the client call styles against it, then shuts the channel and server down.
 *
 * <p>The channel and server are shut down in {@code finally} blocks so that a failing client
 * call does not leave them running.
 *
 * @throws Exception if the server cannot start, a client call fails, or waiting is interrupted
 */
void run() throws Exception {
    // Port 0 means that the operating system will pick an available port to use.
    Server server = ServerBuilder.forPort(0).addService(new GreeterGrpc.GreeterImplBase() {
        @Override
        public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
            responseObserver.onError(Status.INTERNAL
                .withDescription("Eggplant Xerxes Crybaby Overbite Narwhal").asRuntimeException());
        }
    }).build().start();
    try {
        channel =
            ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build();
        try {
            blockingCall();
            futureCallDirect();
            futureCallCallback();
            asyncCall();
            advancedAsyncCall();
        } finally {
            // Always release the channel, even when one of the calls above throws.
            channel.shutdown();
        }
    } finally {
        // Always stop the server, even when channel creation or a call throws.
        server.shutdown();
    }

    channel.awaitTermination(1, TimeUnit.SECONDS);
    server.awaitTermination();
}
/**
 * Starts a Greeter server whose {@code sayHello} always fails with {@code Status.INTERNAL} and
 * attaches {@code DEBUG_INFO} under {@code DEBUG_INFO_TRAILER_KEY} in the trailers, runs each of
 * the client call styles against it, then shuts the channel and server down.
 *
 * <p>The channel and server are shut down in {@code finally} blocks so that a failing client
 * call does not leave them running.
 *
 * @throws Exception if the server cannot start, a client call fails, or waiting is interrupted
 */
void run() throws Exception {
    // Port 0 lets the operating system pick an available port.
    Server server = ServerBuilder.forPort(0).addService(new GreeterGrpc.GreeterImplBase() {
        @Override
        public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
            Metadata trailers = new Metadata();
            trailers.put(DEBUG_INFO_TRAILER_KEY, DEBUG_INFO);
            responseObserver.onError(Status.INTERNAL.withDescription(DEBUG_DESC)
                .asRuntimeException(trailers));
        }
    }).build().start();
    try {
        channel =
            ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build();
        try {
            blockingCall();
            futureCallDirect();
            futureCallCallback();
            asyncCall();
            advancedAsyncCall();
        } finally {
            // Always release the channel, even when one of the calls above throws.
            channel.shutdown();
        }
    } finally {
        // Always stop the server, even when channel creation or a call throws.
        server.shutdown();
    }

    channel.awaitTermination(1, TimeUnit.SECONDS);
    server.awaitTermination();
}
/**
 * Starts and shuts down an in-process gRPC service based on the {@code serviceImpl} provided,
 * while allowing a test to execute using the {@link GrpcClient}.
 *
 * <p>The channel and server are shut down in {@code finally} blocks, so they are released even
 * when {@code clientConsumer} throws (for example on a failed assertion).
 *
 * @param serviceImpl implementation of the Spanner service. Typically, just the methods needed to
 *     execute the test.
 * @param clientConsumer consumer of the {@link GrpcClient} - the class under test.
 * @return a Mockito spy for the gRPC service for verification.
 * @throws IOException if the in-process server fails to start
 */
private SpannerImplBase doTest(SpannerGrpc.SpannerImplBase serviceImpl,
    Consumer<GrpcClient> clientConsumer)
    throws IOException {
    SpannerGrpc.SpannerImplBase serviceImplSpy = spy(serviceImpl);

    // A generated name keeps this in-process server separate from others.
    String serverName = InProcessServerBuilder.generateName();

    Server server = InProcessServerBuilder
        .forName(serverName).directExecutor().addService(serviceImplSpy).build().start();
    try {
        ManagedChannel channel =
            InProcessChannelBuilder.forName(serverName).directExecutor().build();
        try {
            clientConsumer.accept(new GrpcClient(SpannerGrpc.newStub(channel), null, null));
        } finally {
            channel.shutdown();
        }
    } finally {
        server.shutdown();
    }
    return serviceImplSpy;
}
/**
 * Stops the server referenced by {@code server}: clears the reference, requests a graceful
 * shutdown, and escalates to {@code shutdownNow()} if termination has not completed within one
 * second.
 *
 * @throws IllegalStateException if the {@code server} reference is already {@code null}
 * @throws InterruptedException if the thread is interrupted while waiting for termination
 * @throws RuntimeException if the server has not terminated one second after the forced shutdown
 */
private void stopServer() throws InterruptedException {
    final Server running = server;
    if (running == null) {
        throw new IllegalStateException("Already stopped");
    }
    server = null;

    running.shutdown();
    if (!running.awaitTermination(1, TimeUnit.SECONDS)) {
        // Graceful shutdown timed out; force it and wait once more.
        running.shutdownNow();
        if (!running.awaitTermination(1, TimeUnit.SECONDS)) {
            throw new RuntimeException("Unable to shutdown server");
        }
    }
}
/**
 * Shuts down the gRPC {@link Server} returned by {@code server()} when this object is closed.
 *
 * <p>Does nothing when {@code server()} returns {@code null}. Otherwise a graceful shutdown is
 * requested and awaited for up to {@code shutdownWaitTimeInMillis} milliseconds; afterwards
 * {@code shutdownNow()} is always invoked and the {@code server} field is cleared, even if the
 * wait throws.
 *
 * @throws Exception if waiting for termination fails, e.g. because the thread is interrupted
 */
@Override
public void close() throws Exception {
    final Server grpcServer = server();
    if (grpcServer == null) {
        return;
    }

    grpcServer.shutdown();
    try {
        // TODO: Maybe we should catch the InterruptedException from this?
        grpcServer.awaitTermination(shutdownWaitTimeInMillis, TimeUnit.MILLISECONDS);
    } finally {
        grpcServer.shutdownNow();
        this.server = null;
    }
}
/**
 * Requests a graceful {@link Server#shutdown()} and waits up to the given timeout for it to
 * complete. Whether or not the wait succeeds (or is interrupted), a hard
 * {@link Server#shutdownNow()} is issued afterwards.
 *
 * <p>Arguments are validated through {@code Preconditions} before the server is touched:
 * {@code server} and {@code unit} must be non-null and {@code timeout} must be positive.
 *
 * @param server the server to be shutdown
 * @param timeout the max amount of time to wait for graceful shutdown to occur; must be &gt; 0
 * @param unit the time unit denominating the shutdown timeout
 * @return the given server
 * @throws InterruptedException if waiting for termination is interrupted
 */
public static Server shutdownGracefully(Server server, long timeout, TimeUnit unit) throws InterruptedException {
    Preconditions.checkNotNull(server, "server");
    Preconditions.checkArgument(timeout > 0, "timeout must be greater than 0");
    Preconditions.checkNotNull(unit, "unit");

    server.shutdown();
    try {
        server.awaitTermination(timeout, unit);
        return server;
    } finally {
        // Runs on every path, including when the wait above is interrupted.
        server.shutdownNow();
    }
}
/**
 * Starts a Greeter server whose {@code sayHello} always fails with {@code Status.INTERNAL},
 * runs each of the client call styles against it, then shuts the channel and server down.
 *
 * <p>The channel and server are shut down in {@code finally} blocks so that a failing client
 * call does not leave them running.
 *
 * @throws Exception if the server cannot start, a client call fails, or waiting is interrupted
 */
void run() throws Exception {
    // Port 0 means that the operating system will pick an available port to use.
    Server server = ServerBuilder.forPort(0).addService(new GreeterGrpc.GreeterImplBase() {
        @Override
        public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
            responseObserver.onError(Status.INTERNAL
                .withDescription("Eggplant Xerxes Crybaby Overbite Narwhal").asRuntimeException());
        }
    }).build().start();
    try {
        channel =
            ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build();
        try {
            blockingCall();
            futureCallDirect();
            futureCallCallback();
            asyncCall();
            advancedAsyncCall();
        } finally {
            // Always release the channel, even when one of the calls above throws.
            channel.shutdown();
        }
    } finally {
        // Always stop the server, even when channel creation or a call throws.
        server.shutdown();
    }

    channel.awaitTermination(1, TimeUnit.SECONDS);
    server.awaitTermination();
}
/**
 * Starts a Greeter server whose {@code sayHello} always fails with {@code Status.INTERNAL} and
 * attaches {@code DEBUG_INFO} under {@code DEBUG_INFO_TRAILER_KEY} in the trailers, runs each of
 * the client call styles against it, then shuts the channel and server down.
 *
 * <p>The channel and server are shut down in {@code finally} blocks so that a failing client
 * call does not leave them running.
 *
 * @throws Exception if the server cannot start, a client call fails, or waiting is interrupted
 */
void run() throws Exception {
    // Port 0 lets the operating system pick an available port.
    Server server = ServerBuilder.forPort(0).addService(new GreeterGrpc.GreeterImplBase() {
        @Override
        public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
            Metadata trailers = new Metadata();
            trailers.put(DEBUG_INFO_TRAILER_KEY, DEBUG_INFO);
            responseObserver.onError(Status.INTERNAL.withDescription(DEBUG_DESC)
                .asRuntimeException(trailers));
        }
    }).build().start();
    try {
        channel =
            ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build();
        try {
            blockingCall();
            futureCallDirect();
            futureCallCallback();
            asyncCall();
            advancedAsyncCall();
        } finally {
            // Always release the channel, even when one of the calls above throws.
            channel.shutdown();
        }
    } finally {
        // Always stop the server, even when channel creation or a call throws.
        server.shutdown();
    }

    channel.awaitTermination(1, TimeUnit.SECONDS);
    server.awaitTermination();
}
/**
 * Shuts down a {@code Server} in two phases: first {@code shutdown} is called and one fifth of
 * the timeout is spent waiting for termination; if that is not enough, {@code shutdownNow} is
 * called and the remaining time is spent waiting again.
 *
 * <p>If the calling thread is interrupted while waiting, {@code shutdownNow} is issued, the
 * interrupt flag is restored and {@code false} is returned.
 *
 * @param server the server to shut down; {@code null} is treated as already terminated
 * @param timeoutMillis total time budget in milliseconds across both phases
 * @return {@code true} if {@code server} is {@code null} or terminated within the budget,
 *     {@code false} otherwise
 */
public static boolean shutdownAndAwaitTermination(final Server server, final long timeoutMillis) {
    if (server == null) {
        return true;
    }

    // Stop accepting new work.
    server.shutdown();

    final long gracefulMillis = timeoutMillis / 5;
    final long forcedMillis = timeoutMillis - gracefulMillis;
    boolean terminated = false;
    try {
        // Give in-flight work a chance to finish on its own.
        terminated = server.awaitTermination(gracefulMillis, TimeUnit.MILLISECONDS);
        if (!terminated) {
            server.shutdownNow();
            // Wait for cancelled work to wind down.
            terminated = server.awaitTermination(forcedMillis, TimeUnit.MILLISECONDS);
            if (!terminated) {
                LOG.warn("Fail to shutdown grpc server: {}.", server);
            }
        }
    } catch (final InterruptedException interrupted) {
        // (Re-)cancel, then preserve the interrupt status for the caller.
        server.shutdownNow();
        Thread.currentThread().interrupt();
    }
    return terminated;
}
/**
 * Requests an orderly shutdown of the gRPC server held in {@code this.server}, if any, and then
 * drops the reference. The method returns right after requesting shutdown; it does not wait for
 * termination.
 */
protected void stopAndReleaseGrpcServer() {
    final Server current = this.server;
    if (current == null) {
        // Nothing to stop.
        return;
    }
    current.shutdown();
    this.server = null;
    log.info("gRPC server shutdown.");
}
/**
 * Starts a Greeter server, sends one {@code sayHello} request carrying a random name through the
 * CompletableFuture stub, and waits up to three seconds for a response whose message contains
 * that name.
 *
 * <p>The server binds to port 0 so the operating system picks a free port; the client reads the
 * actual port from {@code server.getPort()}. Channel and server are torn down in {@code finally}
 * blocks so that a failed wait does not leave them running.
 */
@Test
public void serverRunsAndRespondsCorrectly() throws ExecutionException,
        IOException,
        InterruptedException,
        TimeoutException {
    final String name = UUID.randomUUID().toString();

    // Port 0 avoids clashes with other processes or tests that a fixed port would risk.
    Server server = ServerBuilder.forPort(0)
            .addService(new GreeterImpl())
            .build();
    server.start();
    try {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", server.getPort())
                .usePlaintext(true)
                .build();
        try {
            GreeterGrpc8.GreeterCompletableFutureStub stub = GreeterGrpc8.newCompletableFutureStub(channel);
            CompletableFuture<HelloResponse> response = stub.sayHello(HelloRequest.newBuilder().setName(name).build());

            await().atMost(3, TimeUnit.SECONDS).until(() -> response.isDone() && response.get().getMessage().contains(name));
        } finally {
            channel.shutdown();
            channel.awaitTermination(1, TimeUnit.MINUTES);
            channel.shutdownNow();
        }
    } finally {
        server.shutdown();
        server.awaitTermination(1, TimeUnit.MINUTES);
        server.shutdownNow();
    }
}
/**
 * Asks the gRPC server referenced by {@code this.server} to shut down in an orderly fashion and
 * releases the reference. No-op when the reference is {@code null}. Termination is not awaited.
 */
protected void stopAndReleaseGrpcServer() {
    final Server toStop = this.server;
    if (toStop == null) {
        return;
    }
    toStop.shutdown();
    // Drop the reference only after shutdown has been requested.
    this.server = null;
    log.info("gRPC server shutdown.");
}
/**
 * Requests shutdown of the gRPC server held in {@code this.server}, if present, then clears the
 * reference and logs that the server was stopped. Termination is not awaited.
 */
protected void stopAndReleaseGrpcServer() {
    Server active = this.server;
    if (active == null) {
        return;
    }
    active.shutdown();
    this.server = null;
    logger.info("gRPC server stopped");
}
/**
 * Requests shutdown of every server held in {@code servers} once the test class has finished.
 * Termination is not awaited.
 */
@AfterClass
public static void tearDown() throws Exception {
    servers.values().forEach(Server::shutdown);
}
/**
 * Requests shutdown of every server held in {@code servers} once the test class has finished.
 * Termination is not awaited.
 */
@AfterClass
public static void tearDown() {
    servers.values().forEach(Server::shutdown);
}
/**
 * Requests shutdown of every server held in {@code servers} once the test class has finished.
 * Termination is not awaited.
 */
@AfterClass
public static void shutdownServer() {
    servers.values().forEach(Server::shutdown);
}
/**
 * Checks that a client and server using freshly generated certificates can complete a mutual TLS
 * handshake: a Netty gRPC server requiring client auth is started, a client channel is built with
 * the matching certificates, one {@code processProposal} call is made, and the flag passed to
 * the mutual-TLS interceptor is asserted to be set.
 *
 * <p>The server and channel are shut down in a {@code finally} block so a failed call or
 * assertion does not leave them running.
 */
@Ignore
// issue when moved up to latest netty http://openjdk.5641.n7.nabble.com/sun-security-ssl-ProtocolVersion-valueOf-in-Java8-and-TLSv1-3-td350186.html
@Test
public void selfSignedTLSCertTest() throws Exception {
    AtomicBoolean handshakeOccurred = new AtomicBoolean(false);
    TLSCertificateBuilder certBuilder = new TLSCertificateBuilder();

    TLSCertificateKeyPair serverCert = certBuilder.serverCert("localhost");
    File serverCertFile = createFile("server-cert.pem", serverCert.getCertPEMBytes());
    File serverKeyFile = createFile("server-key.pem", serverCert.getKeyPemBytes());

    TLSCertificateKeyPair clientCert = certBuilder.clientCert();
    File clientCertFile = createFile("client-cert.pem", clientCert.getCertPEMBytes());
    File clientKeyFile = createFile("client-key.pem", clientCert.getKeyPemBytes());

    Server server = NettyServerBuilder.forPort(0).addService(new MockEndorser()).
            intercept(mutualTLSInterceptor(clientCert.getCertDERBytes(), handshakeOccurred))
            .sslContext(GrpcSslContexts.forServer(serverCertFile, serverKeyFile).protocols(TLS_PROTOCOL)
                    .trustManager(clientCertFile)
                    .clientAuth(ClientAuth.REQUIRE)
                    .build()).build();
    server.start();

    ManagedChannel chan = null;
    try {
        if (vendor.contains("IBM")) {
            // The TLS handshake doesn't work with IBM JRE, skipping
            return;
        }

        NettyChannelBuilder channelBuilder = NettyChannelBuilder
                .forAddress("localhost", server.getPort())
                .sslContext(getSslContextBuilder(clientCertFile, clientKeyFile, serverCertFile).protocols(TLS_PROTOCOL).build())
                .negotiationType(NegotiationType.TLS);
        chan = channelBuilder.build();

        ProposalPackage.SignedProposal prop = ProposalPackage.SignedProposal.getDefaultInstance();
        EndorserGrpc.newBlockingStub(chan).processProposal(prop);

        // Ensure that TLS handshake occurred
        Assert.assertTrue("Handshake didn't occur", handshakeOccurred.get());
    } finally {
        // Release the channel (if it was created) and the server on every path.
        if (chan != null) {
            chan.shutdown();
        }
        server.shutdown();
    }
}