下面列出了io.netty.channel.embedded.EmbeddedChannel#newPromise ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testRemoveAndFailAllReentrantFailAll() {
EmbeddedChannel channel = newChannel();
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
ChannelPromise promise = channel.newPromise();
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
queue.removeAndFailAll(new IllegalStateException());
}
});
queue.add(1L, promise);
ChannelPromise promise2 = channel.newPromise();
queue.add(2L, promise2);
queue.removeAndFailAll(new Exception());
assertTrue(promise.isDone());
assertFalse(promise.isSuccess());
assertTrue(promise2.isDone());
assertFalse(promise2.isSuccess());
assertFalse(channel.finish());
}
@Test
public void testRemoveAndFailAllReentrance() {
EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
ChannelPromise promise = channel.newPromise();
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
queue.removeAndFailAll(new IllegalStateException());
}
});
queue.add(1L, promise);
ChannelPromise promise2 = channel.newPromise();
queue.add(2L, promise2);
queue.removeAndFailAll(new Exception());
assertFalse(promise.isSuccess());
assertFalse(promise2.isSuccess());
assertFalse(channel.finish());
}
private void uploadsShouldWork(boolean casUpload, EmbeddedChannel ch, HttpResponseStatus status)
throws Exception {
ByteArrayInputStream data = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5});
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(new UploadCommand(CACHE_URI, casUpload, "abcdef", data, 5), writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request.method()).isEqualTo(HttpMethod.PUT);
assertThat(request.headers().get(HttpHeaders.CONNECTION))
.isEqualTo(HttpHeaderValues.KEEP_ALIVE.toString());
HttpChunkedInput content = ch.readOutbound();
assertThat(content.readChunk(ByteBufAllocator.DEFAULT).content().readableBytes()).isEqualTo(5);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ch.writeInbound(response);
assertThat(writePromise.isDone()).isTrue();
assertThat(ch.isOpen()).isTrue();
}
/** Test that the handler correctly supports http error codes i.e. 404 (NOT FOUND). */
@Test
public void httpErrorsAreSupported() {
EmbeddedChannel ch = new EmbeddedChannel(new HttpUploadHandler(null, ImmutableList.of()));
ByteArrayInputStream data = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5});
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(new UploadCommand(CACHE_URI, true, "abcdef", data, 5), writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request).isInstanceOf(HttpRequest.class);
HttpChunkedInput content = ch.readOutbound();
assertThat(content).isInstanceOf(HttpChunkedInput.class);
FullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN);
response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.CLOSE);
ch.writeInbound(response);
assertThat(writePromise.isDone()).isTrue();
assertThat(writePromise.cause()).isInstanceOf(HttpException.class);
assertThat(((HttpException) writePromise.cause()).response().status())
.isEqualTo(HttpResponseStatus.FORBIDDEN);
assertThat(ch.isOpen()).isFalse();
}
/**
* Test that the handler correctly supports http error codes i.e. 404 (NOT FOUND) with a
* Content-Length header.
*/
@Test
public void httpErrorsWithContentAreSupported() {
EmbeddedChannel ch = new EmbeddedChannel(new HttpUploadHandler(null, ImmutableList.of()));
ByteArrayInputStream data = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5});
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(new UploadCommand(CACHE_URI, true, "abcdef", data, 5), writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request).isInstanceOf(HttpRequest.class);
HttpChunkedInput content = ch.readOutbound();
assertThat(content).isInstanceOf(HttpChunkedInput.class);
ByteBuf errorMsg = ByteBufUtil.writeAscii(ch.alloc(), "error message");
FullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, errorMsg);
response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ch.writeInbound(response);
assertThat(writePromise.isDone()).isTrue();
assertThat(writePromise.cause()).isInstanceOf(HttpException.class);
assertThat(((HttpException) writePromise.cause()).response().status())
.isEqualTo(HttpResponseStatus.NOT_FOUND);
assertThat(ch.isOpen()).isTrue();
}
@Test
public void extraHeadersAreIncluded() throws Exception {
URI uri = new URI("http://does.not.exist:8080/foo");
ImmutableList<Entry<String, String>> remoteHeaders =
ImmutableList.of(
Maps.immutableEntry("key1", "value1"), Maps.immutableEntry("key2", "value2"));
EmbeddedChannel ch =
new EmbeddedChannel(new HttpDownloadHandler(/* credentials= */ null, remoteHeaders));
DownloadCommand cmd =
new DownloadCommand(uri, /* casDownload= */ true, DIGEST, new ByteArrayOutputStream());
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(cmd, writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request.headers().get("key1")).isEqualTo("value1");
assertThat(request.headers().get("key2")).isEqualTo("value2");
}
@Test
public void multipleExtraHeadersAreSupported() throws Exception {
URI uri = new URI("http://does.not.exist:8080/foo");
ImmutableList<Entry<String, String>> remoteHeaders =
ImmutableList.of(
Maps.immutableEntry("key", "value1"), Maps.immutableEntry("key", "value2"));
EmbeddedChannel ch =
new EmbeddedChannel(new HttpDownloadHandler(/* credentials= */ null, remoteHeaders));
DownloadCommand cmd =
new DownloadCommand(uri, /* casDownload= */ true, DIGEST, new ByteArrayOutputStream());
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(cmd, writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request.headers().getAll("key")).isEqualTo(Arrays.asList("value1", "value2"));
}
/** Test that the handler correctly supports http error codes i.e. 404 (NOT FOUND). */
@Test
public void httpErrorsAreSupported() throws IOException {
EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null, ImmutableList.of()));
ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream());
DownloadCommand cmd = new DownloadCommand(CACHE_URI, true, DIGEST, out);
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(cmd, writePromise);
HttpResponse response =
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
response.headers().set(HttpHeaders.HOST, "localhost");
response.headers().set(HttpHeaders.CONTENT_LENGTH, 0);
response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ch.writeInbound(response);
ch.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT);
assertThat(writePromise.isDone()).isTrue();
assertThat(writePromise.cause()).isInstanceOf(HttpException.class);
assertThat(((HttpException) writePromise.cause()).response().status())
.isEqualTo(HttpResponseStatus.NOT_FOUND);
// No data should have been written to the OutputStream and it should have been closed.
assertThat(out.size()).isEqualTo(0);
// The caller is responsible for closing the stream.
verify(out, never()).close();
assertThat(ch.isOpen()).isTrue();
}
@Test
public void testIncompleteWriteDoesNotCompletePromisePrematurely() throws NoSuchAlgorithmException {
SSLEngine engine = SSLContext.getDefault().createSSLEngine();
engine.setUseClientMode(false);
EmbeddedChannel ch = new EmbeddedChannel(new SslHandler(engine));
ChannelPromise promise = ch.newPromise();
ByteBuf buf = Unpooled.buffer(10).writeZero(10);
ch.writeAndFlush(buf, promise);
assertFalse(promise.isDone());
assertTrue(ch.finishAndReleaseAll());
assertTrue(promise.isDone());
assertThat(promise.cause(), is(instanceOf(SSLException.class)));
}
@Test
public void testRemoveAndWriteAllReentrance() {
EmbeddedChannel channel = newChannel();
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
ChannelPromise promise = channel.newPromise();
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
queue.removeAndWriteAll();
}
});
queue.add(1L, promise);
ChannelPromise promise2 = channel.newPromise();
queue.add(2L, promise2);
queue.removeAndWriteAll();
channel.flush();
assertTrue(promise.isSuccess());
assertTrue(promise2.isSuccess());
assertTrue(channel.finish());
assertEquals(1L, channel.readOutbound());
assertEquals(2L, channel.readOutbound());
assertNull(channel.readOutbound());
assertNull(channel.readInbound());
}
@Test
public void testCloseChannelOnCreation() {
EmbeddedChannel channel = newChannel();
ChannelHandlerContext context = channel.pipeline().firstContext();
channel.close().syncUninterruptibly();
final PendingWriteQueue queue = new PendingWriteQueue(context);
IllegalStateException ex = new IllegalStateException();
ChannelPromise promise = channel.newPromise();
queue.add(1L, promise);
queue.removeAndFailAll(ex);
assertSame(ex, promise.cause());
}
@Test
public void testRemoveAndWriteAllReentrance() {
EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
ChannelPromise promise = channel.newPromise();
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
queue.removeAndWriteAll();
}
});
queue.add(1L, promise);
ChannelPromise promise2 = channel.newPromise();
queue.add(2L, promise2);
queue.removeAndWriteAll();
channel.flush();
assertTrue(promise.isSuccess());
assertTrue(promise2.isSuccess());
assertTrue(channel.finish());
assertEquals(1L, channel.readOutbound());
assertEquals(2L, channel.readOutbound());
assertNull(channel.readOutbound());
assertNull(channel.readInbound());
}
@Test
public void basicAuthShouldWork() throws Exception {
URI uri = new URI("http://user:[email protected]/foo");
EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null, ImmutableList.of()));
DownloadCommand cmd = new DownloadCommand(uri, true, DIGEST, new ByteArrayOutputStream());
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(cmd, writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request.headers().get(HttpHeaderNames.AUTHORIZATION))
.isEqualTo("Basic dXNlcjpwYXNzd29yZA==");
}
@Test
public void basicAuthShouldNotEnabled() throws Exception {
URI uri = new URI("http://does.not.exist/foo");
EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null, ImmutableList.of()));
DownloadCommand cmd = new DownloadCommand(uri, true, DIGEST, new ByteArrayOutputStream());
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(cmd, writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request.headers().contains(HttpHeaderNames.AUTHORIZATION)).isFalse();
}
@Test
public void hostDoesntIncludePortHttp() throws Exception {
URI uri = new URI("http://does.not.exist/foo");
EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null, ImmutableList.of()));
DownloadCommand cmd = new DownloadCommand(uri, true, DIGEST, new ByteArrayOutputStream());
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(cmd, writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request.headers().get(HttpHeaderNames.HOST)).isEqualTo("does.not.exist");
}
@Test
public void hostDoesntIncludePortHttps() throws Exception {
URI uri = new URI("https://does.not.exist/foo");
EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null, ImmutableList.of()));
DownloadCommand cmd = new DownloadCommand(uri, true, DIGEST, new ByteArrayOutputStream());
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(cmd, writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request.headers().get(HttpHeaderNames.HOST)).isEqualTo("does.not.exist");
}
@Test
public void hostDoesIncludePort() throws Exception {
URI uri = new URI("http://does.not.exist:8080/foo");
EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null, ImmutableList.of()));
DownloadCommand cmd = new DownloadCommand(uri, true, DIGEST, new ByteArrayOutputStream());
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(cmd, writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request.headers().get(HttpHeaderNames.HOST)).isEqualTo("does.not.exist:8080");
}
@Test
public void headersDoIncludeUserAgent() throws Exception {
URI uri = new URI("http://does.not.exist:8080/foo");
EmbeddedChannel ch =
new EmbeddedChannel(new HttpDownloadHandler(/* credentials= */ null, ImmutableList.of()));
DownloadCommand cmd =
new DownloadCommand(uri, /* casDownload= */ true, DIGEST, new ByteArrayOutputStream());
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(cmd, writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request.headers().get(HttpHeaderNames.USER_AGENT)).isEqualTo("bazel/");
}
private void downloadShouldWork(boolean casDownload, EmbeddedChannel ch) throws IOException {
ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream());
DownloadCommand cmd = new DownloadCommand(CACHE_URI, casDownload, DIGEST, out);
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(cmd, writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request.method()).isEqualTo(HttpMethod.GET);
assertThat(request.headers().get(HttpHeaderNames.HOST)).isEqualTo(CACHE_URI.getHost());
if (casDownload) {
assertThat(request.uri()).isEqualTo("/cache-bucket/cas/" + DIGEST.getHash());
} else {
assertThat(request.uri()).isEqualTo("/cache-bucket/ac/" + DIGEST.getHash());
}
assertThat(writePromise.isDone()).isFalse();
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.headers().set(HttpHeaders.CONTENT_LENGTH, 5);
response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ch.writeInbound(response);
ByteBuf content = Unpooled.buffer();
content.writeBytes(new byte[] {1, 2, 3, 4, 5});
ch.writeInbound(new DefaultLastHttpContent(content));
assertThat(writePromise.isDone()).isTrue();
assertThat(out.toByteArray()).isEqualTo(new byte[] {1, 2, 3, 4, 5});
verify(out, never()).close();
assertThat(ch.isActive()).isTrue();
}
/**
* Test that the handler correctly supports http error codes i.e. 404 (NOT FOUND) with a
* Content-Length header.
*/
@Test
public void httpErrorsWithContentAreSupported() throws IOException {
EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null, ImmutableList.of()));
ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream());
DownloadCommand cmd = new DownloadCommand(CACHE_URI, true, DIGEST, out);
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(cmd, writePromise);
HttpResponse response =
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
ByteBuf errorMessage = ByteBufUtil.writeAscii(ch.alloc(), "Error message");
response.headers().set(HttpHeaders.HOST, "localhost");
response
.headers()
.set(HttpHeaders.CONTENT_LENGTH, String.valueOf(errorMessage.readableBytes()));
response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.CLOSE);
ch.writeInbound(response);
// The promise must not be done because we haven't received the error message yet.
assertThat(writePromise.isDone()).isFalse();
ch.writeInbound(new DefaultHttpContent(errorMessage));
ch.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT);
assertThat(writePromise.isDone()).isTrue();
assertThat(writePromise.cause()).isInstanceOf(HttpException.class);
assertThat(((HttpException) writePromise.cause()).response().status())
.isEqualTo(HttpResponseStatus.NOT_FOUND);
// No data should have been written to the OutputStream and it should have been closed.
assertThat(out.size()).isEqualTo(0);
// The caller is responsible for closing the stream.
verify(out, never()).close();
assertThat(ch.isOpen()).isFalse();
}