下面列出了 io.grpc.Server#shutdownNow() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Stops the gRPC listener server: a graceful {@code shutdown()} first, then a
 * forced {@code shutdownNow()} if the server has not terminated within one second.
 *
 * @throws IllegalStateException if the server reference is already {@code null}
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws RuntimeException if the server has still not terminated after the forced shutdown
 */
public void stop() throws InterruptedException {
    final Server current = server;
    if (current == null) {
        throw new IllegalStateException("gRPC Listener Server is already stopped");
    }
    // Clear the field up front so that a subsequent call hits the guard above.
    server = null;

    current.shutdown();
    final boolean stoppedGracefully = current.awaitTermination(1, TimeUnit.SECONDS);
    if (stoppedGracefully) {
        log.debug("gRPC Listener Server stopped");
        return;
    }

    // Graceful phase timed out; force termination and wait once more.
    current.shutdownNow();
    final boolean stoppedForcibly = current.awaitTermination(1, TimeUnit.SECONDS);
    if (!stoppedForcibly) {
        throw new RuntimeException("Unable to shutdown gRPC Listener Server");
    }
}
/**
 * Post-test cleanup: checks that the fake clock has no pending tasks, force-stops
 * every server and channel (each must terminate within one second), and checks
 * that {@code checkCalled} is false on every health implementation.
 */
@After
public void teardown() throws Exception {
    // All scheduled tasks have been accounted for
    assertThat(clock.getPendingTasks()).isEmpty();

    // Health-check streams are usually not closed in the tests because handleSubchannelState() is
    // faked. Force closing for clean up.
    for (Server srv : servers) {
        srv.shutdownNow();
        boolean serverTerminated = srv.awaitTermination(1, TimeUnit.SECONDS);
        assertThat(serverTerminated).isTrue();
    }

    for (ManagedChannel ch : channels) {
        ch.shutdownNow();
        boolean channelTerminated = ch.awaitTermination(1, TimeUnit.SECONDS);
        assertThat(channelTerminated).isTrue();
    }

    for (HealthImpl healthImpl : healthImpls) {
        assertThat(healthImpl.checkCalled).isFalse();
    }
}
/**
 * Wraps a grpc-java {@link Server} in a {@code TestServerContext}.
 *
 * <p>{@code close()} attempts a graceful shutdown and falls back to
 * {@code shutdownNow()} when the server has not terminated within
 * {@code DEFAULT_TIMEOUT_SECONDS}. {@code listenAddress()} returns the first
 * listen socket reported by the server.
 *
 * @param server the server to wrap
 * @return a context that delegates to {@code server}
 */
static TestServerContext fromGrpcJavaServer(final Server server) {
    return new TestServerContext() {
        @Override
        public void close() {
            try {
                if (!server.shutdown().awaitTermination(DEFAULT_TIMEOUT_SECONDS, SECONDS)) {
                    server.shutdownNow();
                }
            } catch (final InterruptedException e) {
                // The graceful wait was cut short: force termination so the server is not
                // left running, and restore the interrupt flag before surfacing the failure
                // so code further up the stack can still observe the interruption.
                server.shutdownNow();
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        @Override
        public SocketAddress listenAddress() {
            return server.getListenSockets().get(0);
        }
    };
}
/**
 * Shuts the server down in two phases, waiting up to one second after each:
 * graceful {@code shutdown()}, then {@code shutdownNow()}.
 *
 * @throws IllegalStateException if the server field is already {@code null}
 * @throws InterruptedException if interrupted while awaiting termination
 * @throws RuntimeException if the server does not terminate after either phase
 */
private void stopServer() throws InterruptedException {
    final Server toStop = server;
    if (toStop == null) {
        throw new IllegalStateException("Already stopped");
    }
    server = null;

    toStop.shutdown();
    boolean terminated = toStop.awaitTermination(1, TimeUnit.SECONDS);
    if (!terminated) {
        // Graceful phase timed out; escalate to a forced shutdown.
        toStop.shutdownNow();
        terminated = toStop.awaitTermination(1, TimeUnit.SECONDS);
    }
    if (!terminated) {
        throw new RuntimeException("Unable to shutdown server");
    }
}
/**
 * Demo entry point: starts an in-process server hosting {@code FlakyNumberService},
 * subscribes to its {@code oneToMany} stream through a {@code GrpcRetryFlux},
 * prints each received number, then force-stops channel and server after one second.
 */
public static void main(String[] args) throws Exception {
    final String serverName = "ResumeStreamReactorDemo";

    Server server = InProcessServerBuilder.forName(serverName)
            .addService(new FlakyNumberService())
            .build()
            .start();
    ManagedChannel channel = InProcessChannelBuilder.forName(serverName)
            .usePlaintext()
            .build();

    ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(channel);

    // Keep retrying the stream until you get ten in a row with no error
    new GrpcRetryFlux<>(() -> stub.oneToMany(Mono.just(Message.getDefaultInstance())))
            .map(Message::getNumber)
            .subscribe(System.out::println);

    // Give the subscription one second to run before tearing everything down.
    TimeUnit.SECONDS.sleep(1);
    channel.shutdownNow();
    server.shutdownNow();
}
/**
 * Demo entry point: starts an in-process server hosting {@code FlakyNumberService},
 * consumes its {@code oneToMany} stream through a {@code GrpcRetryFlowable},
 * prints each number, and force-stops channel and server after a one-second pause.
 */
public static void main(String[] args) throws Exception {
    final String inProcessName = "ResumeStreamReactorDemo";
    final long runMillis = TimeUnit.SECONDS.toMillis(1);

    Server server = InProcessServerBuilder
            .forName(inProcessName)
            .addService(new FlakyNumberService())
            .build()
            .start();
    ManagedChannel channel = InProcessChannelBuilder
            .forName(inProcessName)
            .usePlaintext()
            .build();
    RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(channel);

    // Keep retrying the stream until you get ten in a row with no error
    new GrpcRetryFlowable<>(() -> stub.oneToMany(Single.just(Message.getDefaultInstance())))
            .map(Message::getNumber)
            .subscribe(System.out::println);

    Thread.sleep(runMillis);

    channel.shutdownNow();
    server.shutdownNow();
}
/**
 * Shutdown the gRPC {@link Server} when this object is closed.
 *
 * <p>A graceful {@code shutdown()} is requested first and given up to
 * {@code shutdownWaitTimeInMillis} milliseconds; {@code shutdownNow()} is then always
 * invoked and the server reference is cleared. If the waiting thread is interrupted,
 * the interrupt status is restored instead of propagating
 * {@link InterruptedException}, as {@link AutoCloseable#close()} advises.
 *
 * @throws Exception declared for interface compatibility
 */
@Override
public void close() throws Exception {
    final Server server = server();
    if (server != null) {
        server.shutdown();
        try {
            server.awaitTermination(shutdownWaitTimeInMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Do not let close() throw InterruptedException; keep the interrupt visible
            // to the caller through the thread's interrupt flag instead.
            Thread.currentThread().interrupt();
        } finally {
            server.shutdownNow();
            this.server = null;
        }
    }
}
/**
 * Requests a graceful {@link Server#shutdown()}, waits up to the given timeout for
 * termination, and then unconditionally issues {@link Server#shutdownNow()} so the
 * hard stop is performed whether the wait succeeded, timed out, or was interrupted.
 *
 * @param server the server to be shutdown; must not be {@code null}
 * @param timeout the max amount of time to wait for graceful shutdown; must be positive
 * @param unit the time unit of {@code timeout}; must not be {@code null}
 * @return the given server
 * @throws InterruptedException if waiting for termination is interrupted
 */
public static Server shutdownGracefully(Server server, long timeout, TimeUnit unit) throws InterruptedException {
    // Validate in declaration order: server, timeout, unit.
    final Server target = Preconditions.checkNotNull(server, "server");
    Preconditions.checkArgument(timeout > 0, "timeout must be greater than 0");
    final TimeUnit timeoutUnit = Preconditions.checkNotNull(unit, "unit");

    target.shutdown();
    try {
        target.awaitTermination(timeout, timeoutUnit);
    } finally {
        target.shutdownNow();
    }
    return target;
}
/**
 * Post-test cleanup: asserts the fake clock has nothing pending, force-stops all
 * servers and channels (each must terminate within one second), and asserts that
 * no health implementation has {@code checkCalled} set.
 */
@After
public void teardown() throws Exception {
    // All scheduled tasks have been accounted for
    assertThat(clock.getPendingTasks()).isEmpty();

    // Health-check streams are usually not closed in the tests because onSubchannelState() is
    // faked. Force closing for clean up.
    for (Server each : servers) {
        each.shutdownNow();
        boolean done = each.awaitTermination(1, TimeUnit.SECONDS);
        assertThat(done).isTrue();
    }

    for (ManagedChannel each : channels) {
        each.shutdownNow();
        boolean done = each.awaitTermination(1, TimeUnit.SECONDS);
        assertThat(done).isTrue();
    }

    for (HealthImpl each : healthImpls) {
        assertThat(each.checkCalled).isFalse();
    }
}
/**
 * Shuts down a {@code Server} in two phases: {@code shutdown} followed by a wait of
 * one fifth of the timeout, then, if necessary, {@code shutdownNow} followed by a
 * wait for the remaining time.
 *
 * @param server the server to stop; {@code null} is treated as already stopped
 * @param timeoutMillis total time budget in milliseconds across both phases
 * @return {@code true} if {@code server} is {@code null} or terminated within the
 *         budget; {@code false} if it did not terminate or the wait was interrupted
 */
public static boolean shutdownAndAwaitTermination(final Server server, final long timeoutMillis) {
    if (server == null) {
        return true;
    }

    final long gracefulMillis = timeoutMillis / 5;
    final long forcedMillis = timeoutMillis - gracefulMillis;

    // Phase one: graceful shutdown request.
    server.shutdown();
    try {
        boolean terminated = server.awaitTermination(gracefulMillis, TimeUnit.MILLISECONDS);
        if (!terminated) {
            // Phase two: force termination and wait out the rest of the budget.
            server.shutdownNow();
            terminated = server.awaitTermination(forcedMillis, TimeUnit.MILLISECONDS);
            if (!terminated) {
                LOG.warn("Fail to shutdown grpc server: {}.", server);
            }
        }
        return terminated;
    } catch (final InterruptedException e) {
        // Interrupted while waiting: force the shutdown and keep the interrupt status.
        server.shutdownNow();
        Thread.currentThread().interrupt();
        return false;
    }
}
/**
 * Force-stops the first server (in {@code eventsMap} key iteration order) whose
 * event collection is non-empty.
 *
 * @return the port of the server that was shut down
 * @throws IllegalStateException if every event collection is empty
 */
private int killServerReceivedMessage() {
    for (int candidatePort : eventsMap.keySet()) {
        if (eventsMap.get(candidatePort).isEmpty()) {
            continue;
        }
        Server victim = servers.get(candidatePort);
        victim.shutdownNow();
        return candidatePort;
    }
    throw new IllegalStateException("None of the servers received any message");
}
/**
 * Starts a Greeter server on port 8086, performs one blocking {@code sayHello} call,
 * releases the channel and server, and then checks the span count for the
 * {@code java-grpc} component via {@code TestUtil.checkSpan}.
 *
 * @param args unused
 * @throws InterruptedException propagated from the calls made by this method
 * @throws IOException if the server fails to start
 */
public static void main(final String[] args) throws InterruptedException, IOException {
    final CountDownLatch latch = TestUtil.initExpectedSpanLatch(2);
    final Server server = ServerBuilder.forPort(8086).addService(new GreeterImpl()).build().start();
    ManagedChannel channel = null;
    try {
        channel = ManagedChannelBuilder.forAddress("localhost", 8086).usePlaintext(true).build();
        final GreeterBlockingStub greeterBlockingStub = GreeterGrpc.newBlockingStub(channel);
        greeterBlockingStub.sayHello(HelloRequest.newBuilder().setName("world").build()).getMessage();
    } finally {
        // Release both ends even when the RPC fails; the channel was previously never
        // shut down, leaving its resources alive after the server was stopped.
        if (channel != null) {
            channel.shutdownNow();
        }
        server.shutdownNow();
    }
    TestUtil.checkSpan(latch, new ComponentSpanCount("java-grpc", 2));
}
/**
 * Walks the ports in {@code eventsMap} and calls {@code shutdownNow()} on the server
 * of the first port that has recorded at least one event.
 *
 * @return the port whose server was stopped
 * @throws IllegalStateException when no port has any recorded event
 */
private int killServerReceivedMessage() {
    for (int listeningPort : eventsMap.keySet()) {
        boolean receivedSomething = !eventsMap.get(listeningPort).isEmpty();
        if (receivedSomething) {
            Server target = servers.get(listeningPort);
            target.shutdownNow();
            return listeningPort;
        }
    }
    throw new IllegalStateException("None of the servers received any message");
}
/**
 * Starts a Greeter server, calls {@code sayHello} through the CompletableFuture stub,
 * and waits up to three seconds for a response containing the random name.
 * Channel and server are shut down in {@code finally} blocks so they are released
 * even when the assertion fails.
 */
@Test
public void serverRunsAndRespondsCorrectly() throws ExecutionException,
        IOException,
        InterruptedException,
        TimeoutException {
    final String name = UUID.randomUUID().toString();

    // Bind to port 0 so a free port is picked instead of a fixed one that may already
    // be in use; the channel below already asks the server for its actual port.
    Server server = ServerBuilder.forPort(0)
            .addService(new GreeterImpl())
            .build();
    server.start();
    try {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", server.getPort())
                .usePlaintext(true)
                .build();
        try {
            GreeterGrpc8.GreeterCompletableFutureStub stub = GreeterGrpc8.newCompletableFutureStub(channel);
            CompletableFuture<HelloResponse> response = stub.sayHello(HelloRequest.newBuilder().setName(name).build());

            await().atMost(3, TimeUnit.SECONDS).until(() -> response.isDone() && response.get().getMessage().contains(name));
        } finally {
            channel.shutdown();
            channel.awaitTermination(1, TimeUnit.MINUTES);
            channel.shutdownNow();
        }
    } finally {
        server.shutdown();
        server.awaitTermination(1, TimeUnit.MINUTES);
        server.shutdownNow();
    }
}
/**
 * Shuts down every server, starts a send on a background thread, and expects that
 * thread to end up alive in the WAITING state with no events recorded for ports
 * 8080 and 8090. The servers are then restarted and the test waits for connection
 * and event records to appear on at least one of the two ports.
 */
@Test
public void stopSendingWhenClusterIsDown() throws Exception {
// Take the whole cluster down before anything is sent.
for(Server server:servers.values()) {
server.shutdownNow();
}
messageSender.onConnected();
// Send from a separate thread so the test thread can observe the sender's state.
// NOTE(review): presumably send() blocks while no server is reachable - confirm.
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
messageSender.send(event);
}
});
thread.start();
// we don't want to keep sending on cluster down
// Wait up to 2 seconds for the sender thread to be alive and in the WAITING state.
await().atMost(2, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return thread.isAlive() && thread.getState().equals(WAITING);
}
});
// Nothing may have been recorded for either port while the servers were down.
assertThat(eventsMap.get(8080).isEmpty(), is(true));
assertThat(eventsMap.get(8090).isEmpty(), is(true));
// TODO: on Windows a short wait seems to be needed for the reconnect mechanism to work.
Thread.sleep(2000);
startServerOnPort(8080);
startServerOnPort(8090);
// After the restart, one of the two ports must reach two connection records ...
await().atMost(2, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return connected.get(8080).size() == 2 || connected.get(8090).size() == 2;
}
});
// ... and one of the two ports must have recorded exactly one event.
await().atMost(2, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return eventsMap.get(8080).size() == 1 || eventsMap.get(8090).size() == 1;
}
});
}
/**
 * Shuts down every server, starts a send on a background thread, and expects that
 * thread to terminate within 10 seconds with no events recorded for ports 8080 and
 * 8090. The servers are then restarted, the event is sent again from the test
 * thread, and the test waits for connection and event records on at least one port.
 */
@Test
public void stopSendingWhenClusterIsDown() throws Exception {
// Take the whole cluster down before anything is sent.
for(Server server:servers.values()) {
server.shutdownNow();
}
messageSender.onConnected();
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
messageSender.send(event);
} catch (OmegaException ex) {
// NOTE(review): this assertion runs on the spawned thread, so a failure here
// may not be reported to the test runner - confirm whether that is acceptable.
// If send() throws nothing, the assertion is skipped entirely.
assertThat(ex.getMessage().endsWith("all alpha server is down."), is(true));
}
}
});
thread.start();
// we don't want to keep sending on cluster down
// Wait up to 10 seconds for the sender thread to finish.
await().atMost(10, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return thread.getState().equals(TERMINATED);
}
});
// Nothing may have been recorded for either port while the servers were down.
assertThat(eventsMap.get(8080).isEmpty(), is(true));
assertThat(eventsMap.get(8090).isEmpty(), is(true));
// TODO: on Windows a short wait seems to be needed for the reconnect mechanism to work.
Thread.sleep(2000);
startServerOnPort(8080);
startServerOnPort(8090);
// Send again from the test thread now that the servers are back.
messageSender.send(event);
// One of the two ports must reach two connection records ...
await().atMost(2, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return connected.get(8080).size() == 2 || connected.get(8090).size() == 2;
}
});
// ... and one of the two ports must have recorded exactly one event.
await().atMost(2, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return eventsMap.get(8080).size() == 1 || eventsMap.get(8090).size() == 1;
}
});
}
/**
 * Exercises client keep-alive against two servers: the createStream call through the
 * first controller is expected to fail with RetriesExhaustedException, while the same
 * call through a channel to a second server that permits a 5-second keep-alive time
 * is expected to succeed.
 */
@Test
public void testKeepAlive() throws IOException, ExecutionException, InterruptedException {
// Verify that keep-alive timeout less than permissible by the server results in a failure.
// The channel targets serverPort with a 10-second client keep-alive time.
// NOTE(review): the server on serverPort is configured outside this method - its
// permitted keep-alive time cannot be confirmed from here.
NettyChannelBuilder builder = NettyChannelBuilder.forAddress("localhost", serverPort)
.keepAliveTime(10, TimeUnit.SECONDS);
if (testSecure) {
builder = builder.sslContext(GrpcSslContexts.forClient().trustManager(
new File(SecurityConfigDefaults.TLS_CA_CERT_PATH)).build());
} else {
builder = builder.usePlaintext();
}
@Cleanup
final ControllerImpl controller = new ControllerImpl(builder,
ControllerImplConfig.builder().clientConfig(ClientConfig.builder()
.trustStore(SecurityConfigDefaults.TLS_CA_CERT_PATH)
.controllerURI(URI.create((testSecure ? "tls://" : "tcp://") + "localhost:" + serverPort))
.build())
.retryAttempts(1).build(),
this.executor);
CompletableFuture<Boolean> createStreamStatus = controller.createStream("scope1", "streamdelayed", StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(1))
.build());
AssertExtensions.assertFutureThrows("Should throw RetriesExhaustedException", createStreamStatus,
throwable -> throwable instanceof RetriesExhaustedException);
// Verify that the same RPC with permissible keepalive time succeeds.
// A second server is started on a fresh port and permits keep-alive pings every 5 seconds.
int serverPort2 = TestUtils.getAvailableListenPort();
NettyServerBuilder testServerBuilder = NettyServerBuilder.forPort(serverPort2)
.addService(testServerImpl)
.permitKeepAliveTime(5, TimeUnit.SECONDS);
if (testSecure) {
testServerBuilder = testServerBuilder.useTransportSecurity(
new File(SecurityConfigDefaults.TLS_SERVER_CERT_PATH),
new File(SecurityConfigDefaults.TLS_SERVER_PRIVATE_KEY_PATH));
}
Server testServer = testServerBuilder.build()
.start();
// Rebuild the client channel, this time pointing at the second server.
builder = NettyChannelBuilder.forAddress("localhost", serverPort2)
.keepAliveTime(10, TimeUnit.SECONDS);
if (testSecure) {
builder = builder.sslContext(GrpcSslContexts.forClient().trustManager(
new File(SecurityConfigDefaults.TLS_CA_CERT_PATH)).build());
} else {
builder = builder.usePlaintext();
}
// NOTE(review): controllerURI below still uses serverPort although the channel builder
// targets serverPort2 - confirm the URI is not used for the connection here.
@Cleanup
final ControllerImpl controller1 = new ControllerImpl(builder,
ControllerImplConfig.builder().clientConfig(ClientConfig.builder()
.trustStore(SecurityConfigDefaults.TLS_CA_CERT_PATH)
.controllerURI(URI.create((testSecure ? "tls://" : "tcp://") + "localhost:" + serverPort))
.build())
.retryAttempts(1).build(), this.executor);
createStreamStatus = controller1.createStream("scope1", "streamdelayed", StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(1))
.build());
assertTrue(createStreamStatus.get());
// NOTE(review): not in a finally block, so testServer stays up if the assertion above fails.
testServer.shutdownNow();
}
/**
 * Serves a single canned {@code PullResponse} from an in-process fake subscriber and
 * checks that {@code client.pull} returns exactly one message carrying the expected
 * ack id, payload, record id, request time and timestamp, and that the fake received
 * exactly the expected {@code PullRequest}.
 */
@Test
public void pullOneMessage() throws IOException {
    String expectedSubscription = SUBSCRIPTION.getPath();
    final PullRequest expectedRequest =
        PullRequest.newBuilder()
            .setSubscription(expectedSubscription)
            .setReturnImmediately(true)
            .setMaxMessages(10)
            .build();
    // PUB_TIME is split as milliseconds (/ 1000 gives seconds). The sub-second remainder
    // is therefore in milliseconds and needs a factor of 1,000,000 to become the
    // nanoseconds that setNanos expects; the previous factor of 1000 yielded microseconds.
    Timestamp timestamp =
        Timestamp.newBuilder()
            .setSeconds(PUB_TIME / 1000)
            .setNanos((int) (PUB_TIME % 1000) * 1000 * 1000)
            .build();
    PubsubMessage expectedPubsubMessage =
        PubsubMessage.newBuilder()
            .setMessageId(MESSAGE_ID)
            .setData(ByteString.copyFrom(DATA.getBytes(StandardCharsets.UTF_8)))
            .setPublishTime(timestamp)
            .putAllAttributes(ATTRIBUTES)
            .putAllAttributes(
                ImmutableMap.of(
                    TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME), ID_ATTRIBUTE, RECORD_ID))
            .build();
    ReceivedMessage expectedReceivedMessage =
        ReceivedMessage.newBuilder().setMessage(expectedPubsubMessage).setAckId(ACK_ID).build();
    final PullResponse response =
        PullResponse.newBuilder()
            .addAllReceivedMessages(ImmutableList.of(expectedReceivedMessage))
            .build();

    // Fake subscriber: records every request and answers with the canned response.
    final List<PullRequest> requestsReceived = new ArrayList<>();
    SubscriberImplBase subscriberImplBase =
        new SubscriberImplBase() {
            @Override
            public void pull(PullRequest request, StreamObserver<PullResponse> responseObserver) {
                requestsReceived.add(request);
                responseObserver.onNext(response);
                responseObserver.onCompleted();
            }
        };
    Server server =
        InProcessServerBuilder.forName(channelName).addService(subscriberImplBase).build().start();
    try {
        List<IncomingMessage> actualMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
        assertEquals(1, actualMessages.size());
        IncomingMessage actualMessage = actualMessages.get(0);
        assertEquals(ACK_ID, actualMessage.ackId());
        assertEquals(DATA, actualMessage.message().getData().toStringUtf8());
        assertEquals(RECORD_ID, actualMessage.recordId());
        assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch());
        assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch());
        assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived));
    } finally {
        // Always stop the in-process server, even when an assertion fails.
        server.shutdownNow();
    }
}
/**
 * Publishes one message through {@code client.publish} against an in-process fake
 * publisher and checks that one message is reported as published and that the fake
 * received exactly the expected {@code PublishRequest}.
 */
@Test
public void publishOneMessage() throws IOException {
    final String topicPath = TOPIC.getPath();

    // The message the fake publisher is expected to receive on the wire.
    final PubsubMessage wireMessage =
        PubsubMessage.newBuilder()
            .setData(ByteString.copyFrom(DATA.getBytes(StandardCharsets.UTF_8)))
            .putAllAttributes(ATTRIBUTES)
            .putAllAttributes(
                ImmutableMap.of(
                    TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME), ID_ATTRIBUTE, RECORD_ID))
            .build();
    final PublishRequest expectedRequest =
        PublishRequest.newBuilder()
            .setTopic(topicPath)
            .addAllMessages(ImmutableList.of(wireMessage))
            .build();
    final PublishResponse cannedResponse =
        PublishResponse.newBuilder().addAllMessageIds(ImmutableList.of(MESSAGE_ID)).build();

    // Fake publisher: captures each request and replies with the canned response.
    final List<PublishRequest> capturedRequests = new ArrayList<>();
    final PublisherImplBase fakePublisher =
        new PublisherImplBase() {
            @Override
            public void publish(
                PublishRequest request, StreamObserver<PublishResponse> responseObserver) {
                capturedRequests.add(request);
                responseObserver.onNext(cannedResponse);
                responseObserver.onCompleted();
            }
        };

    final Server inProcessServer =
        InProcessServerBuilder.forName(channelName).addService(fakePublisher).build().start();
    try {
        final OutgoingMessage outgoing =
            OutgoingMessage.of(
                com.google.pubsub.v1.PubsubMessage.newBuilder()
                    .setData(ByteString.copyFromUtf8(DATA))
                    .putAllAttributes(ATTRIBUTES)
                    .build(),
                MESSAGE_TIME,
                RECORD_ID);
        final int publishedCount = client.publish(TOPIC, ImmutableList.of(outgoing));
        assertEquals(1, publishedCount);
        assertEquals(expectedRequest, Iterables.getOnlyElement(capturedRequests));
    } finally {
        inProcessServer.shutdownNow();
    }
}