下面列出了 io.grpc.internal.ServerStream 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Records the new transport (cast to {@link NettyServerTransport}) and returns a listener for it.
 *
 * <p>The returned listener installs an {@code EchoServerStreamListener} on every created stream,
 * writes empty headers, requests one message, and remembers the listener. It also records the
 * attributes passed to {@code transportReady} and returns them unchanged.
 *
 * @param transport the transport that was just created
 * @return the listener to attach to {@code transport}
 */
@Override
public ServerTransportListener transportCreated(final ServerTransport transport) {
  final NettyServerTransport nettyTransport = (NettyServerTransport) transport;
  transports.add(nettyTransport);

  final ServerTransportListener perTransportListener = new ServerTransportListener() {
    @Override
    public void transportTerminated() {}

    @Override
    public Attributes transportReady(Attributes transportAttrs) {
      serverTransportAttributesList.add(transportAttrs);
      return transportAttrs;
    }

    @Override
    public void streamCreated(ServerStream stream, String method, Metadata headers) {
      final EchoServerStreamListener echo = new EchoServerStreamListener(stream, method, headers);
      stream.setListener(echo);
      stream.writeHeaders(new Metadata());
      stream.request(1);
      streamListeners.add(echo);
    }
  };
  return perTransportListener;
}
/**
 * Opens one stream from client to server and checks that the authority reported by the
 * server-side stream equals {@code testAuthority(server)}.
 */
@Test
@SuppressWarnings("deprecation")
public void authorityPropagation() throws Exception {
  server.start(serverListener);
  client = newClientTransport(server);
  startTransport(client, mockClientTransportListener);
  final MockServerTransportListener acceptedTransport =
      serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);

  // Start a client stream with empty headers so the server observes a new stream.
  final ClientStream rpc = client.newStream(methodDescriptor, new Metadata(), callOptions);
  rpc.start(new ClientStreamListenerBase());

  final ServerStream acceptedStream =
      acceptedTransport.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS).stream;
  assertEquals(testAuthority(server), acceptedStream.getAuthority());
}
/**
 * Performs one complete RPC over a fresh client transport and waits for it to finish on both
 * sides. Negative tests can use it in place of a sleep, to give actions that must <em>not</em>
 * happen a chance to happen. Because it is a real RPC with real callbacks, it generally leaves
 * plenty of time for pending Runnables to execute, while being faster on fast machines and more
 * reliable on slow ones than a fixed delay.
 *
 * @param serverListener the server listener from which the accepted transport is taken
 */
private void doPingPong(MockServerListener serverListener) throws Exception {
  final ManagedClientTransport pingTransport = newClientTransport(server);
  startTransport(pingTransport, mock(ManagedClientTransport.Listener.class));

  final ClientStream pingStream =
      pingTransport.newStream(methodDescriptor, new Metadata(), callOptions);
  final ClientStreamListenerBase clientSide = new ClientStreamListenerBase();
  pingStream.start(clientSide);

  final StreamCreation accepted = serverListener
      .takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS)
      .takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
  final ServerStream pongStream = accepted.stream;
  final ServerStreamListenerBase serverSide = accepted.listener;

  // The server ends the RPC; then wait until both ends have observed the close.
  pongStream.close(Status.OK, new Metadata());
  assertNotNull(clientSide.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
  assertNotNull(clientSide.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
  assertNotNull(serverSide.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));

  pingTransport.shutdown(Status.UNAVAILABLE);
}
/**
 * Tracks the created transport as a {@link NettyServerTransport} and supplies its listener.
 *
 * <p>For each stream the listener attaches a new {@code EchoServerStreamListener}, sends empty
 * headers, requests a single message, and stores the stream listener. Attributes received in
 * {@code transportReady} are stored and returned as-is; termination is ignored.
 *
 * @param transport the newly created server transport
 * @return a listener bound to that transport
 */
@Override
public ServerTransportListener transportCreated(final ServerTransport transport) {
  transports.add((NettyServerTransport) transport);

  return new ServerTransportListener() {
    @Override
    public Attributes transportReady(Attributes transportAttrs) {
      serverTransportAttributesList.add(transportAttrs);
      return transportAttrs;
    }

    @Override
    public void transportTerminated() {}

    @Override
    public void streamCreated(ServerStream stream, String method, Metadata headers) {
      final EchoServerStreamListener streamEcho =
          new EchoServerStreamListener(stream, method, headers);
      stream.setListener(streamEcho);
      final Metadata emptyHeaders = new Metadata();
      stream.writeHeaders(emptyHeaders);
      stream.request(1);
      streamListeners.add(streamEcho);
    }
  };
}
/**
 * Exercises a stream on which no message is written by either side: the client half-closes,
 * the server writes headers and then closes with an OK status that carries a description.
 * Verifies the status and trailers seen by the client and the statuses recorded by the tracers.
 */
@Test
public void zeroMessageStream() throws Exception {
// Start the server, connect a client transport and obtain the server-side transport listener.
server.start(serverListener);
client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
// Open a client stream and wait for the matching server-side stream.
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
// The client half-closes without having written a message.
clientStream.halfClose();
assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS));
// The server responds with headers only, then closes.
serverStream.writeHeaders(new Metadata());
assertNotNull(clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Status status = Status.OK.withDescription("Nice talking to you");
serverStream.close(status, new Metadata());
assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
// Code and description must match what the server passed to close().
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals(status.getDescription(), clientStreamStatus.getDescription());
// The tracers must have seen the headers and the very same Status instances.
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertTrue(clientStreamTracer1.getInboundHeaders());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
assertSame(status, serverStreamTracer1.getStatus());
}
/**
 * The server writes headers and then closes the stream with an OK status (with a description and
 * a cause) while the client has not half-closed. Verifies that the client receives the code and
 * description but no cause, and that the tracers record the expected statuses.
 */
@Test
public void earlyServerClose_withServerHeaders() throws Exception {
// Start the server, connect a client transport and obtain the server-side transport listener.
server.start(serverListener);
client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
// Open a client stream and wait for the matching server-side stream.
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
// Headers are sent first and must reach the client before the close.
serverStream.writeHeaders(new Metadata());
assertNotNull(clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
// The status handed to close() carries a cause; the client is expected to see none.
Status strippedStatus = Status.OK.withDescription("Hello. Goodbye.");
Status status = strippedStatus.withCause(new Exception());
serverStream.close(status, new Metadata());
assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals("Hello. Goodbye.", clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
// The tracers must have seen the headers and the very same Status instances.
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertTrue(clientStreamTracer1.getInboundHeaders());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
assertSame(status, serverStreamTracer1.getStatus());
}
/**
 * The server closes the stream with an INTERNAL status (with a description and a cause) without
 * writing headers first. Verifies that the client receives the code and description but no
 * cause, and that the tracers record the expected statuses.
 */
@Test
public void earlyServerClose_serverFailure() throws Exception {
// Start the server, connect a client transport and obtain the server-side transport listener.
server.start(serverListener);
client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
// Open a client stream and wait for the matching server-side stream.
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
// Close immediately with a failure status; no headers have been written.
Status strippedStatus = Status.INTERNAL.withDescription("I'm not listening");
Status status = strippedStatus.withCause(new Exception());
serverStream.close(status, new Metadata());
// NOTE(review): the server listener is expected to report OK although close() was given
// INTERNAL; presumably its status reflects how the stream ended rather than the RPC status -
// confirm against the ServerStreamListener contract.
assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals(status.getDescription(), clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
// The tracers must have seen the outbound headers and the very same Status instances.
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
assertSame(status, serverStreamTracer1.getStatus());
}
/**
 * The server cancels the stream with a DEADLINE_EXCEEDED status. Verifies that the server
 * listener sees that status, that the client sees CANCELLED without a cause, that both tracer
 * factories were invoked, and that a second cancel followed by a ping-pong RPC still completes.
 */
@Test
public void serverCancel() throws Exception {
// Start the server, connect a client transport and obtain the server-side transport listener.
server.start(serverListener);
client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
// Open a client stream and wait for the matching server-side stream.
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
Status status = Status.DEADLINE_EXCEEDED.withDescription("It was bound to happen")
.withCause(new Exception());
serverStream.cancel(status);
// The server listener must report exactly the status passed to cancel().
assertEquals(status, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
// Presently we can't send much back to the client in this case. Verify that is the current
// behavior for consistency between transports.
assertCodeEquals(Status.CANCELLED, clientStreamStatus);
// Cause should not be transmitted between server and client
assertNull(clientStreamStatus.getCause());
verify(clientStreamTracerFactory).newClientStreamTracer(
any(CallOptions.class), any(Metadata.class));
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
verify(serverStreamTracerFactory).newServerStreamTracer(anyString(), any(Metadata.class));
assertSame(status, serverStreamTracer1.getStatus());
// Second cancellation shouldn't trigger additional callbacks
serverStream.cancel(status);
// Run a full RPC afterwards so that any unexpected callback has had time to fire.
doPingPong(serverListener);
}
/**
 * Checks the stream counters in the transport stats when the server closes a stream with OK:
 * both sides start with zero succeeded/failed streams and end with one succeeded, zero failed.
 * Returns early without checking anything when {@code haveTransportTracer()} is false.
 */
@Test
public void transportTracer_server_streamEnded_ok() throws Exception {
// Start the server, connect a client transport and open one stream.
server.start(serverListener);
client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
// Nothing to verify when this transport has no transport tracer.
if (!haveTransportTracer()) {
return;
}
// Baseline: no stream has ended yet on either side.
TransportStats serverBefore = getTransportStats(serverTransportListener.transport);
assertEquals(0, serverBefore.streamsSucceeded);
assertEquals(0, serverBefore.streamsFailed);
TransportStats clientBefore = getTransportStats(client);
assertEquals(0, clientBefore.streamsSucceeded);
assertEquals(0, clientBefore.streamsFailed);
clientStream.halfClose();
serverStream.close(Status.OK, new Metadata());
// do not validate stats until close() has been called on client
assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
// Exactly one successful stream must now be counted on both sides.
TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
assertEquals(1, serverAfter.streamsSucceeded);
assertEquals(0, serverAfter.streamsFailed);
TransportStats clientAfter = getTransportStats(client);
assertEquals(1, clientAfter.streamsSucceeded);
assertEquals(0, clientAfter.streamsFailed);
}
/**
 * Checks the stream counters in the transport stats when the server closes a stream with
 * UNKNOWN: both sides start with zero succeeded/failed streams and end with one failed, zero
 * succeeded. Returns early without checking anything when {@code haveTransportTracer()} is
 * false.
 */
@Test
public void transportTracer_server_streamEnded_nonOk() throws Exception {
// Start the server, connect a client transport and open one stream.
server.start(serverListener);
client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
// Nothing to verify when this transport has no transport tracer.
if (!haveTransportTracer()) {
return;
}
// Baseline: no stream has ended yet on either side.
TransportStats serverBefore = getTransportStats(serverTransportListener.transport);
assertEquals(0, serverBefore.streamsFailed);
assertEquals(0, serverBefore.streamsSucceeded);
TransportStats clientBefore = getTransportStats(client);
assertEquals(0, clientBefore.streamsFailed);
assertEquals(0, clientBefore.streamsSucceeded);
serverStream.close(Status.UNKNOWN, new Metadata());
// do not validate stats until close() has been called on client
assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
// Exactly one failed stream must now be counted on both sides.
TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
assertEquals(1, serverAfter.streamsFailed);
assertEquals(0, serverAfter.streamsSucceeded);
TransportStats clientAfter = getTransportStats(client);
assertEquals(1, clientAfter.streamsFailed);
assertEquals(0, clientAfter.streamsSucceeded);
client.shutdown(Status.UNAVAILABLE);
}
/**
 * Checks the message counters in the transport stats for a client-to-server message: the
 * server's received count and the client's sent count go from 0 to 1, and the corresponding
 * timestamps equal {@code fakeCurrentTimeNanos()}. Returns early without checking anything when
 * {@code haveTransportTracer()} is false.
 */
@Test
public void transportTracer_server_receive_msg() throws Exception {
// Start the server, connect a client transport and open one stream.
server.start(serverListener);
client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
// Nothing to verify when this transport has no transport tracer.
if (!haveTransportTracer()) {
return;
}
// Baseline: no message has been exchanged yet.
TransportStats serverBefore = getTransportStats(serverTransportListener.transport);
assertEquals(0, serverBefore.messagesReceived);
assertEquals(0, serverBefore.lastMessageReceivedTimeNanos);
TransportStats clientBefore = getTransportStats(client);
assertEquals(0, clientBefore.messagesSent);
assertEquals(0, clientBefore.lastMessageSentTimeNanos);
// The server asks for one message; the client sends it, flushes and half-closes.
serverStream.request(1);
clientStream.writeMessage(methodDescriptor.streamRequest("request"));
clientStream.flush();
clientStream.halfClose();
// Expect exactly one message on the server listener's queue before reading the stats.
verifyMessageCountAndClose(serverStreamListener.messageQueue, 1);
TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
assertEquals(1, serverAfter.messagesReceived);
assertEquals(fakeCurrentTimeNanos(), serverAfter.lastMessageReceivedTimeNanos);
TransportStats clientAfter = getTransportStats(client);
assertEquals(1, clientAfter.messagesSent);
assertEquals(fakeCurrentTimeNanos(), clientAfter.lastMessageSentTimeNanos);
serverStream.close(Status.OK, new Metadata());
}
/**
 * Checks the message counters in the transport stats for a server-to-client message: the
 * server's sent count and the client's received count go from 0 to 1, and the corresponding
 * timestamps equal {@code fakeCurrentTimeNanos()}. Returns early without checking anything when
 * {@code haveTransportTracer()} is false.
 */
@Test
public void transportTracer_server_send_msg() throws Exception {
// Start the server, connect a client transport and open one stream.
server.start(serverListener);
client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
// Nothing to verify when this transport has no transport tracer.
if (!haveTransportTracer()) {
return;
}
// Baseline: no message has been exchanged yet.
TransportStats serverBefore = getTransportStats(serverTransportListener.transport);
assertEquals(0, serverBefore.messagesSent);
assertEquals(0, serverBefore.lastMessageSentTimeNanos);
TransportStats clientBefore = getTransportStats(client);
assertEquals(0, clientBefore.messagesReceived);
assertEquals(0, clientBefore.lastMessageReceivedTimeNanos);
// The client asks for one message; the server sends headers, one message, and flushes.
clientStream.request(1);
serverStream.writeHeaders(new Metadata());
serverStream.writeMessage(methodDescriptor.streamResponse("response"));
serverStream.flush();
// Expect exactly one message on the client listener's queue before reading the stats.
verifyMessageCountAndClose(clientStreamListener.messageQueue, 1);
TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
assertEquals(1, serverAfter.messagesSent);
assertEquals(fakeCurrentTimeNanos(), serverAfter.lastMessageSentTimeNanos);
TransportStats clientAfter = getTransportStats(client);
assertEquals(1, clientAfter.messagesReceived);
assertEquals(fakeCurrentTimeNanos(), clientAfter.lastMessageReceivedTimeNanos);
serverStream.close(Status.OK, new Metadata());
}
/**
 * Verifies the socket stats reported by both transports: each side's local and remote addresses
 * must mirror the remote-address attributes seen on the opposite stream, and the socket options
 * must contain a linger value and an {@code SO_SNDBUF} entry.
 */
@Test
public void socketStats() throws Exception {
server.start(serverListener);
// NOTE(review): `client` is declared locally here, whereas other tests in this listing assign
// to a `client` they do not declare - confirm whether this local needs an explicit shutdown.
ManagedClientTransport client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
// Each stream's remote-address attribute is the address of the opposite side.
SocketAddress serverAddress = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
SocketAddress clientAddress = serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
SocketStats clientSocketStats = client.getStats().get();
assertEquals(clientAddress, clientSocketStats.local);
assertEquals(serverAddress, clientSocketStats.remote);
// very basic sanity check that socket options are populated
assertNotNull(clientSocketStats.socketOptions.lingerSeconds);
assertTrue(clientSocketStats.socketOptions.others.containsKey("SO_SNDBUF"));
SocketStats serverSocketStats = serverTransportListener.transport.getStats().get();
assertEquals(serverAddress, serverSocketStats.local);
assertEquals(clientAddress, serverSocketStats.remote);
// very basic sanity check that socket options are populated
assertNotNull(serverSocketStats.socketOptions.lingerSeconds);
assertTrue(serverSocketStats.socketOptions.others.containsKey("SO_SNDBUF"));
}
/**
 * Bundles everything known about a newly created server stream.
 *
 * @param stream the server stream
 * @param method the method name reported for the stream
 * @param headers the headers received with the stream
 * @param listener the listener associated with the stream
 */
public StreamCreation(
    ServerStream stream,
    String method,
    Metadata headers,
    ServerStreamListenerBase listener) {
  this.listener = listener;
  this.headers = headers;
  this.method = method;
  this.stream = stream;
}
/**
 * Continues the call in a context that carries the call's {@link ServerStream} under
 * {@code STREAM}, when {@code ServerStreamHelper} can provide one; otherwise the current
 * context is used unchanged.
 *
 * @param call the intercepted server call
 * @param headers the request headers
 * @param next the next handler in the chain
 * @return the listener produced by {@code Contexts.interceptCall}
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> call,
                                                             final Metadata headers,
                                                             final ServerCallHandler<ReqT, RespT> next) {
    final Context current = Context.current();
    final ServerStream stream = ServerStreamHelper.getServerStream(call);
    final Context effective = (stream == null) ? current : current.withValue(STREAM, stream);
    return Contexts.interceptCall(effective, call, headers, next);
}
/**
 * Looks up the {@link ServerStream} stored under {@code ConnectionInterceptor.STREAM} and
 * resolves the connection for it.
 *
 * @param listeners listeners passed on to {@code NettyConnectionHelper.getOrCreateConnection}
 * @return the connection for the current stream, or {@code null} when no stream is stored
 */
public static Connection getCurrentConnection(final List<ConnectionClosedEventListener> listeners) {
    final ServerStream stream = ConnectionInterceptor.STREAM.get();
    if (stream == null) {
        return null;
    }
    return NettyConnectionHelper.getOrCreateConnection(stream, listeners);
}
/**
 * Resolves the connection for a stream via its channel.
 *
 * @param stream the server stream; only {@code NettyServerStream} instances are supported
 * @param listeners listeners passed on to {@code attachChannel}
 * @return the result of {@code attachChannel}, or {@code null} when {@code stream} is not a
 *     {@code NettyServerStream}
 */
public static Connection getOrCreateConnection(final ServerStream stream,
                                               final List<ConnectionClosedEventListener> listeners) {
    if (!(stream instanceof NettyServerStream)) {
        return null;
    }
    final NettyServerStream nettyStream = (NettyServerStream) stream;
    return attachChannel(CHANNEL_GETTER.get(nettyStream), listeners);
}
/**
 * Extracts the {@code AsyncContext} attached to the server stream held by {@code object}.
 *
 * @param object expected to implement {@code ServerStreamGetter}
 * @return the async context, or {@code null} when {@code object} is not a
 *     {@code ServerStreamGetter} or its stream is not an {@code AsyncContextAccessor}
 */
AsyncContext getAsyncContext(Object object) {
    if (!(object instanceof ServerStreamGetter)) {
        return null;
    }
    final ServerStream held = ((ServerStreamGetter) object)._$PINPOINT$_getServerStream();
    if (!(held instanceof AsyncContextAccessor)) {
        return null;
    }
    final AsyncContextAccessor accessor = (AsyncContextAccessor) held;
    return accessor._$PINPOINT$_getAsyncContext();
}
/**
 * Checks that {@code args} has the shape {@code (ServerStream, String, Metadata)}.
 *
 * @param args the argument array to inspect
 * @return {@code true} only when {@code ArrayUtils.getLength(args)} is 3 and each element has
 *     the expected type
 */
static boolean validate(Object[] args) {
    if (ArrayUtils.getLength(args) != 3) {
        return false;
    }
    return args[0] instanceof ServerStream
            && args[1] instanceof String
            && args[2] instanceof Metadata;
}
/**
 * Attaches {@code streamListener}, taken from the enclosing scope, to the newly created stream.
 * The {@code method} and {@code headers} arguments are not used.
 */
@Override
public void streamCreated(ServerStream stream, String method, Metadata headers) {
stream.setListener(streamListener);
}
/**
 * Creates a listener that keeps the stream it is attached to together with the method name and
 * headers the stream was created with.
 *
 * @param stream the server stream
 * @param method the method name of the stream
 * @param headers the headers received with the stream
 */
EchoServerStreamListener(ServerStream stream, String method, Metadata headers) {
  this.headers = headers;
  this.method = method;
  this.stream = stream;
}
/**
 * With one stream open, shuts down the client transport, the server and the server transport,
 * and verifies that neither side terminates while the stream stays open and usable. After the
 * client cancels the stream, both sides are expected to terminate.
 */
@Test
public void openStreamPreventsTermination() throws Exception {
// Start the server, connect a client transport and obtain the server-side transport listener.
server.start(serverListener);
client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
// Open a stream; the client transport must report that it is in use.
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
// Shut down everything while the stream is still open. The references are nulled afterwards;
// presumably so that shared cleanup does not shut them down again - TODO confirm.
client.shutdown(Status.UNAVAILABLE);
client = null;
server.shutdown();
serverTransport.shutdown();
serverTransport = null;
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class));
assertTrue(serverListener.waitForShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS));
// A new server should be able to start listening, since the current server has given up
// resources. There may be cases this is impossible in the future, but for now it is a useful
// property.
serverListener = new MockServerListener();
server = newServer(server, Arrays.asList(serverStreamTracerFactory));
server.start(serverListener);
// Try to "flush" out any listener notifications on client and server. This also ensures that
// the stream still functions.
serverStream.writeHeaders(new Metadata());
clientStream.halfClose();
assertNotNull(clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS));
// While the stream is open, neither transport may have terminated or gone idle.
verify(mockClientTransportListener, never()).transportTerminated();
verify(mockClientTransportListener, never()).transportInUse(false);
assertFalse(serverTransportListener.isTerminated());
// Cancelling the stream is expected to let both transports terminate.
clientStream.cancel(Status.CANCELLED);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false);
assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
/**
 * The server closes the stream with an OK status (with a description and a cause) and custom
 * trailers, without writing headers first. Verifies that the client receives the code, the
 * description and all trailer values (including duplicates and the binary entry) but no cause,
 * and that the tracers record the expected statuses.
 */
@Test
public void earlyServerClose_noServerHeaders() throws Exception {
// Start the server, connect a client transport and obtain the server-side transport listener.
server.start(serverListener);
client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
// Open a client stream and wait for the matching server-side stream.
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
Status strippedStatus = Status.OK.withDescription("Hellogoodbye");
Status status = strippedStatus.withCause(new Exception());
// Trailers hold three values under the ASCII key (one of them repeated) and one value under
// the binary key.
Metadata trailers = new Metadata();
trailers.put(asciiKey, "trailers");
trailers.put(asciiKey, "dupvalue");
trailers.put(asciiKey, "dupvalue");
trailers.put(binaryKey, "äbinarytrailers");
serverStream.close(status, trailers);
assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals("Hellogoodbye", clientStreamStatus.getDescription());
// Cause should not be transmitted to the client.
assertNull(clientStreamStatus.getCause());
// Every trailer value must arrive, in order, for both keys.
assertEquals(
Lists.newArrayList(trailers.getAll(asciiKey)),
Lists.newArrayList(clientStreamTrailers.getAll(asciiKey)));
assertEquals(
Lists.newArrayList(trailers.getAll(binaryKey)),
Lists.newArrayList(clientStreamTrailers.getAll(binaryKey)));
// The tracers must have seen the outbound headers and the very same Status instances.
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
assertSame(status, serverStreamTracer1.getStatus());
}
/**
 * Records the new stream together with a fresh {@code ServerStreamListenerBase} in
 * {@code streams}, then installs that listener on the stream.
 */
@Override
public void streamCreated(ServerStream stream, String method, Metadata headers) {
  final ServerStreamListenerBase streamListener = new ServerStreamListenerBase();
  final StreamCreation creation = new StreamCreation(stream, method, headers, streamListener);
  // Record the creation before attaching the listener, keeping the original ordering.
  streams.add(creation);
  stream.setListener(streamListener);
}
/**
 * Builds a {@code GrpcServerStreamRequest} from a raw argument array.
 *
 * @param args expected to be {@code (ServerStream, String, Metadata)}
 * @return the request, or {@code null} when {@code validate(args)} rejects the array
 */
static GrpcServerStreamRequest create(Object[] args) {
    if (!validate(args)) {
        return null;
    }
    final ServerStream serverStream = (ServerStream) args[0];
    final String methodName = (String) args[1];
    final Metadata metadata = (Metadata) args[2];
    return new GrpcServerStreamRequest(serverStream, methodName, metadata);
}
GrpcServerStreamRequest(ServerStream serverStream, String methodName, Metadata metadata) {
this.serverStream = Assert.requireNonNull(serverStream, "serverStream");
this.methodName = Assert.requireNonNull(methodName, "methodName");
this.metadata = Assert.requireNonNull(metadata, "metadata");
}
/**
 * Returns the server stream held by this object.
 *
 * @return the stored {@link ServerStream}
 */
public ServerStream getServerStream() {
    return this.serverStream;
}
/**
 * Installs {@code streamListener}, taken from the enclosing scope, on the newly created stream.
 * The {@code method} and {@code headers} arguments are ignored.
 */
@Override
public void streamCreated(ServerStream stream, String method, Metadata headers) {
stream.setListener(streamListener);
}
/**
 * Stores the stream this listener belongs to along with the method name and headers that were
 * supplied when the stream was created.
 *
 * @param stream the server stream
 * @param method the method name of the stream
 * @param headers the headers received with the stream
 */
EchoServerStreamListener(
    ServerStream stream,
    String method,
    Metadata headers) {
  this.method = method;
  this.headers = headers;
  this.stream = stream;
}
/** No-op: newly created streams are ignored by this listener. */
@Override
public void streamCreated(ServerStream stream, String method, Metadata headers) {
  // Nothing to do.
}