下面列出了io.netty.channel.ChannelId#io.netty.channel.embedded.EmbeddedChannel 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testHttpUpgradeRequestInvalidUpgradeHeader() {
EmbeddedChannel ch = createChannel();
FullHttpRequest httpRequestWithEntity = new WebSocketRequestBuilder().httpVersion(HTTP_1_1)
.method(HttpMethod.GET)
.uri("/test")
.connection("Upgrade")
.version00()
.upgrade("BogusSocket")
.build();
ch.writeInbound(httpRequestWithEntity);
FullHttpResponse response = ReferenceCountUtil.releaseLater(responses.remove());
assertEquals(BAD_REQUEST, response.getStatus());
assertEquals("not a WebSocket handshake request: missing upgrade", getResponseMessage(response));
}
private void writeBytesToChannel(EmbeddedChannel ch, byte[] bytes, boolean randomlySlice) {
if (randomlySlice) {
long bytesWritten = 0;
List<List<Byte>> slices = NetTestUtils.getRandomByteSlices(bytes);
for (int s = 0; s<slices.size(); s++) {
List<Byte> slice = slices.get(s);
byte[] sliceBytes = Bytes.toArray(slice);
ch.writeInbound(Unpooled.wrappedBuffer(sliceBytes));
bytesWritten += sliceBytes.length;
}
assertThat(bytesWritten, equalTo((long)bytes.length));
} else {
ch.writeInbound(Unpooled.wrappedBuffer(bytes));
}
}
@Test
public void testSimpleUnmarshalling() throws IOException {
MarshallerFactory marshallerFactory = createMarshallerFactory();
MarshallingConfiguration configuration = createMarshallingConfig();
EmbeddedChannel ch = new EmbeddedChannel(createDecoder(Integer.MAX_VALUE));
ByteArrayOutputStream bout = new ByteArrayOutputStream();
Marshaller marshaller = marshallerFactory.createMarshaller(configuration);
marshaller.start(Marshalling.createByteOutput(bout));
marshaller.writeObject(testObject);
marshaller.finish();
marshaller.close();
byte[] testBytes = bout.toByteArray();
ch.writeInbound(input(testBytes));
assertTrue(ch.finish());
String unmarshalled = ch.readInbound();
assertEquals(testObject, unmarshalled);
assertNull(ch.readInbound());
}
@Test
public void testEncodeEmptyFullRequest() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(false));
assertTrue(ch.writeOutbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/hello/world")));
Http2HeadersFrame headersFrame = ch.readOutbound();
Http2Headers headers = headersFrame.headers();
assertThat(headers.scheme().toString(), is("http"));
assertThat(headers.method().toString(), is("GET"));
assertThat(headers.path().toString(), is("/hello/world"));
assertTrue(headersFrame.isEndStream());
assertThat(ch.readOutbound(), is(nullValue()));
assertFalse(ch.finish());
}
@Test
public void testUpgradeNonEmptyFullResponse() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(true));
ByteBuf hello = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
assertTrue(ch.writeOutbound(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, hello)));
Http2HeadersFrame headersFrame = ch.readOutbound();
assertThat(headersFrame.headers().status().toString(), is("200"));
assertFalse(headersFrame.isEndStream());
Http2DataFrame dataFrame = ch.readOutbound();
try {
assertThat(dataFrame.content().toString(CharsetUtil.UTF_8), is("hello world"));
assertTrue(dataFrame.isEndStream());
} finally {
dataFrame.release();
}
assertThat(ch.readOutbound(), is(nullValue()));
assertFalse(ch.finish());
}
@Test
public void testChannelReadPacket() throws Exception {
ByteBuf content = Unpooled.copiedBuffer("d=3:::{\"greetings\":\"Hello World!\"}", CharsetUtil.UTF_8);
HttpRequest request =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/socket.io/1/jsonp-polling", content);
String origin = "http://localhost:8080";
request.headers().add(HttpHeaderNames.ORIGIN, origin);
LastOutboundHandler lastOutboundHandler = new LastOutboundHandler();
EmbeddedChannel channel = new EmbeddedChannel(lastOutboundHandler, jsonpPollingHandler);
channel.writeInbound(request);
Object object = channel.readInbound();
assertTrue(object instanceof Packet);
Packet packet = (Packet) object;
assertEquals(origin, packet.getOrigin());
assertEquals("{\"greetings\":\"Hello World!\"}", packet.getData().toString(CharsetUtil.UTF_8));
channel.finish();
}
@Test
public void test_filter_ignored_topics() throws Exception {
final Topic anothertopic = new Topic("anothertopic", QoS.AT_LEAST_ONCE);
final List<SubscriptionResult> subscriptions = newArrayList(
subResult(new Topic("topic", QoS.AT_LEAST_ONCE), false),
subResult(anothertopic, false));
ignoredTopics.add(anothertopic);
final SendRetainedMessagesListener listener = createListener(subscriptions, ignoredTopics);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new PublishUserEventReceivedHandler());
embeddedChannel.attr(ChannelAttributes.CLIENT_ID).set("client");
when(retainedMessagePersistence.get("topic")).thenReturn(Futures.immediateFuture(
new RetainedMessage("test".getBytes(UTF_8), QoS.EXACTLY_ONCE, 1L,
MqttConfigurationDefaults.TTL_DISABLED)));
when(retainedMessagePersistence.get("anothertopic")).thenReturn(Futures.immediateFuture(
new RetainedMessage("test".getBytes(UTF_8), QoS.EXACTLY_ONCE, 1L,
MqttConfigurationDefaults.TTL_DISABLED)));
listener.operationComplete(embeddedChannel.newSucceededFuture());
embeddedChannel.runPendingTasks();
verify(queuePersistence).add(eq("client"), eq(false), anyList(), eq(true));
}
@Test
public void testEncoder() {
String writeData = "안녕하세요";
ByteBuf request = Unpooled.wrappedBuffer(writeData.getBytes());
Base64Encoder encoder = new Base64Encoder();
EmbeddedChannel embeddedChannel = new EmbeddedChannel(encoder);
embeddedChannel.writeOutbound(request);
ByteBuf response = (ByteBuf) embeddedChannel.readOutbound();
String expect = "7JWI64WV7ZWY7IS47JqU";
assertEquals(expect, response.toString(Charset.defaultCharset()));
embeddedChannel.finish();
}
@Test
public void testPassThroughOther() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(true));
Http2ResetFrame reset = new DefaultHttp2ResetFrame(0);
Http2GoAwayFrame goaway = new DefaultHttp2GoAwayFrame(0);
assertTrue(ch.writeInbound(reset));
assertTrue(ch.writeInbound(goaway.retain()));
assertEquals(reset, ch.readInbound());
Http2GoAwayFrame frame = ch.readInbound();
try {
assertEquals(goaway, frame);
assertThat(ch.readInbound(), is(nullValue()));
assertFalse(ch.finish());
} finally {
goaway.release();
frame.release();
}
}
@Before
public void setup() {
isTriggered.set(false);
MockitoAnnotations.initMocks(this);
executor = new PluginTaskExecutor(new AtomicLong());
executor.postConstruct();
channel = new EmbeddedChannel();
channel.attr(ChannelAttributes.CLIENT_ID).set("client");
channel.attr(ChannelAttributes.REQUEST_RESPONSE_INFORMATION).set(true);
channel.attr(ChannelAttributes.PLUGIN_CLIENT_CONTEXT).set(clientContext);
when(extension.getId()).thenReturn("extension");
configurationService = new TestConfigurationBootstrap().getFullConfigurationService();
final PluginOutPutAsyncer asyncer = new PluginOutputAsyncerImpl(Mockito.mock(ShutdownHooks.class));
final PluginTaskExecutorService pluginTaskExecutorService = new PluginTaskExecutorServiceImpl(() -> executor, mock(ShutdownHooks.class));
handler =
new UnsubscribeInboundInterceptorHandler(configurationService, asyncer, extensions,
pluginTaskExecutorService);
channel.pipeline().addFirst(handler);
}
private static void testUpgrade(Http2ConnectionHandler handler) {
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "*");
request.headers().set(HttpHeaderNames.HOST, "netty.io");
request.headers().set(HttpHeaderNames.CONNECTION, "Upgrade, HTTP2-Settings");
request.headers().set(HttpHeaderNames.UPGRADE, "h2c");
request.headers().set("HTTP2-Settings", "AAMAAABkAAQAAP__");
EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
ChannelHandlerContext ctx = channel.pipeline().firstContext();
Http2ServerUpgradeCodec codec = new Http2ServerUpgradeCodec("connectionHandler", handler);
assertTrue(codec.prepareUpgradeResponse(ctx, request, new DefaultHttpHeaders()));
codec.upgradeTo(ctx, request);
// Flush the channel to ensure we write out all buffered data
channel.flush();
assertSame(handler, channel.pipeline().remove("connectionHandler"));
assertNull(channel.pipeline().get(handler.getClass()));
assertTrue(channel.finish());
// Check that the preface was send (a.k.a the settings frame)
ByteBuf settingsBuffer = channel.readOutbound();
assertNotNull(settingsBuffer);
settingsBuffer.release();
assertNull(channel.readOutbound());
}
private MqttConnAckMessage executeNormalChannelRead0(String clientId, boolean cleanSession, ChannelId channelId)
throws Exception {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false,
10);
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader("MQTT", 4, true, true, true, 0, true,
cleanSession, 60);
MqttConnectPayload payload = new MqttConnectPayload(clientId, "willtopic", "willmessage", "username",
"password");
MqttConnectMessage msg = new MqttConnectMessage(fixedHeader, variableHeader, payload);
ChannelId cid = channelId == null ? TestUtil.newChannelId(clientId, false) : channelId;
EmbeddedChannel channel = new EmbeddedChannel(cid, new ConnectReceiver());
channel.writeInbound(msg);
return channel.readOutbound();
}
/**
* Test of a 200 OK response, without body.
*/
@Test
public void testSend200OkResponseWithoutBody() {
String expected = "RTSP/1.0 200 OK\r\n"
+ "server: Testserver\r\n"
+ "cseq: 1\r\n"
+ "session: 2547019973447939919\r\n"
+ "\r\n";
HttpResponse response = new DefaultHttpResponse(RtspVersions.RTSP_1_0,
RtspResponseStatuses.OK);
response.headers().add(RtspHeaderNames.SERVER, "Testserver");
response.headers().add(RtspHeaderNames.CSEQ, "1");
response.headers().add(RtspHeaderNames.SESSION, "2547019973447939919");
EmbeddedChannel ch = new EmbeddedChannel(new RtspEncoder());
ch.writeOutbound(response);
ByteBuf buf = ch.readOutbound();
String actual = buf.toString(CharsetUtil.UTF_8);
buf.release();
assertEquals(expected, actual);
}
@Test
public void testFullHttpRequest() {
// test that ContentDecoder can be used after the ObjectAggregator
HttpRequestDecoder decoder = new HttpRequestDecoder(4096, 4096, 5);
HttpObjectAggregator aggregator = new HttpObjectAggregator(1024);
HttpContentDecoder decompressor = new HttpContentDecompressor();
EmbeddedChannel channel = new EmbeddedChannel(decoder, aggregator, decompressor);
String headers = "POST / HTTP/1.1\r\n" +
"Content-Length: " + GZ_HELLO_WORLD.length + "\r\n" +
"Content-Encoding: gzip\r\n" +
"\r\n";
assertTrue(channel.writeInbound(Unpooled.copiedBuffer(headers.getBytes(), GZ_HELLO_WORLD)));
Queue<Object> req = channel.inboundMessages();
assertTrue(req.size() > 1);
int contentLength = 0;
contentLength = calculateContentLength(req, contentLength);
byte[] receivedContent = readContent(req, contentLength, true);
assertEquals(HELLO_WORLD, new String(receivedContent, CharsetUtil.US_ASCII));
assertHasInboundMessages(channel, true);
assertHasOutboundMessages(channel, false);
assertFalse(channel.finish());
}
@NotNull
private List<Record> collect10NetflowV5MessagesFromChannel(EmbeddedChannel ch, int packetLength) {
List<Record> records = new LinkedList<>();
for (int i=0; i<10; i++) {
Object object = ch.readInbound();
assertNotNull(object);
assertThat(object, is(instanceOf(NetflowV5Message.class)));
NetflowV5Message msg = (NetflowV5Message) object;
// fix packet length for test; it passes in MAX_LENGTH by default
msg.setLength(packetLength);
Record record = RecordCreator.create();
msg.populateRecord(record);
records.add(record);
}
return records;
}
@Test
public void testLastResponseWithHeaderRemoveTrailingSpaces() {
EmbeddedChannel ch = new EmbeddedChannel(new HttpResponseDecoder());
ch.writeInbound(Unpooled.copiedBuffer(
"HTTP/1.1 200 OK\r\nX-Header: h2=h2v2; Expires=Wed, 09-Jun-2021 10:18:14 GMT \r\n\r\n",
CharsetUtil.US_ASCII));
HttpResponse res = ch.readInbound();
assertThat(res.protocolVersion(), sameInstance(HttpVersion.HTTP_1_1));
assertThat(res.status(), is(HttpResponseStatus.OK));
assertThat(res.headers().get(of("X-Header")), is("h2=h2v2; Expires=Wed, 09-Jun-2021 10:18:14 GMT"));
assertThat(ch.readInbound(), is(nullValue()));
ch.writeInbound(Unpooled.wrappedBuffer(new byte[1024]));
HttpContent content = ch.readInbound();
assertThat(content.content().readableBytes(), is(1024));
content.release();
assertThat(ch.finish(), is(true));
LastHttpContent lastContent = ch.readInbound();
assertThat(lastContent.content().isReadable(), is(false));
lastContent.release();
assertThat(ch.readInbound(), is(nullValue()));
}
@Test
public void testDecode() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(
new DelimiterBasedFrameDecoder(8192, true, Delimiters.lineDelimiter()));
ch.writeInbound(Unpooled.copiedBuffer("first\r\nsecond\nthird", CharsetUtil.US_ASCII));
ByteBuf buf = ch.readInbound();
assertEquals("first", buf.toString(CharsetUtil.US_ASCII));
ByteBuf buf2 = ch.readInbound();
assertEquals("second", buf2.toString(CharsetUtil.US_ASCII));
assertNull(ch.readInbound());
ch.finish();
ReferenceCountUtil.release(ch.readInbound());
buf.release();
buf2.release();
}
@Test
public void readResponseTest() throws Exception {
EmbeddedChannel embedder = new EmbeddedChannel(new DatagramDnsResponseDecoder());
for (byte[] p: packets) {
ByteBuf packet = embedder.alloc().buffer(512).writeBytes(p);
embedder.writeInbound(new DatagramPacket(packet, null, new InetSocketAddress(0)));
AddressedEnvelope<DnsResponse, InetSocketAddress> envelope = embedder.readInbound();
assertThat(envelope, is(instanceOf(DatagramDnsResponse.class)));
DnsResponse response = envelope.content();
assertThat(response, is(sameInstance((Object) envelope)));
ByteBuf raw = Unpooled.wrappedBuffer(p);
assertThat(response.id(), is(raw.getUnsignedShort(0)));
assertThat(response.count(DnsSection.QUESTION), is(raw.getUnsignedShort(4)));
assertThat(response.count(DnsSection.ANSWER), is(raw.getUnsignedShort(6)));
assertThat(response.count(DnsSection.AUTHORITY), is(raw.getUnsignedShort(8)));
assertThat(response.count(DnsSection.ADDITIONAL), is(raw.getUnsignedShort(10)));
envelope.release();
}
}
@Test
public void testHttpActionAsyncPing() throws Exception {
initTestClient();
EmbeddedChannel channel = createTestChannel();
AsteriskPingGetRequest req = pingSetup(channel);
final boolean[] callback = {false};
req.execute(new AriCallback<AsteriskPing>() {
@Override
public void onSuccess(AsteriskPing res) {
pingValidate(channel, res);
callback[0] = true;
}
@Override
public void onFailure(RestException e) {
fail(e.toString());
}
});
channel.runPendingTasks();
assertTrue("No onSuccess Callback", callback[0]);
}
@Test
public void testBindMessageCanBeReadIfTypeForParamsIsUnknown() throws Exception {
PostgresWireProtocol ctx =
new PostgresWireProtocol(
sqlOperations,
sessionContext -> AccessControl.DISABLED,
new AlwaysOKNullAuthentication(),
null);
channel = new EmbeddedChannel(ctx.decoder, ctx.handler);
ByteBuf buffer = Unpooled.buffer();
ClientMessages.sendStartupMessage(buffer, "doc");
ClientMessages.sendParseMessage(buffer, "S1", "select ?, ?", new int[0]); // no type hints for parameters
List<Object> params = Arrays.asList(10, 20);
ClientMessages.sendBindMessage(buffer, "P1", "S1", params);
channel.writeInbound(buffer);
channel.releaseInbound();
Session session = sessions.get(0);
// If the query can be retrieved via portalName it means bind worked
assertThat(session.getQuery("P1"), is("select ?, ?"));
}
/**
* If the length of the content is 0 for sure, {@link HttpContentEncoder} should skip encoding.
*/
@Test
public void testEmptyFullContent() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new TestEncoder());
ch.writeInbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"));
FullHttpResponse res = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER);
ch.writeOutbound(res);
Object o = ch.readOutbound();
assertThat(o, is(instanceOf(FullHttpResponse.class)));
res = (FullHttpResponse) o;
assertThat(res.headers().get(Names.TRANSFER_ENCODING), is(nullValue()));
// Content encoding shouldn't be modified.
assertThat(res.headers().get(Names.CONTENT_ENCODING), is(nullValue()));
assertThat(res.content().readableBytes(), is(0));
assertThat(res.content().toString(CharsetUtil.US_ASCII), is(""));
res.release();
assertThat(ch.readOutbound(), is(nullValue()));
}
private void assertHasInboundMessages(EmbeddedChannel channel, boolean hasMessages) {
Object o;
if (hasMessages) {
while (true) {
o = channel.readInbound();
assertNotNull(o);
ReferenceCountUtil.release(o);
if (o instanceof LastHttpContent) {
break;
}
}
} else {
o = channel.readInbound();
assertNull(o);
}
}
@Test
public void testPrependLengthInLittleEndian() throws Exception {
final EmbeddedChannel ch = new EmbeddedChannel(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false));
ch.writeOutbound(msg);
ByteBuf buf = ch.readOutbound();
assertEquals(4, buf.readableBytes());
byte[] writtenBytes = new byte[buf.readableBytes()];
buf.getBytes(0, writtenBytes);
assertEquals(1, writtenBytes[0]);
assertEquals(0, writtenBytes[1]);
assertEquals(0, writtenBytes[2]);
assertEquals(0, writtenBytes[3]);
buf.release();
buf = ch.readOutbound();
assertSame(buf, msg);
buf.release();
assertFalse("The channel must have been completely read", ch.finish());
}
@Test
public void testUpgradeEmptyEnd() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(true));
LastHttpContent end = LastHttpContent.EMPTY_LAST_CONTENT;
assertTrue(ch.writeOutbound(end));
Http2DataFrame emptyFrame = ch.readOutbound();
try {
assertThat(emptyFrame.content().readableBytes(), is(0));
assertTrue(emptyFrame.isEndStream());
} finally {
emptyFrame.release();
}
assertThat(ch.readOutbound(), is(nullValue()));
assertFalse(ch.finish());
}
@Test
public void testSuccess_twoChannels_twoUserIds() {
// Set up another user.
final WhoisQuotaHandler otherHandler = new WhoisQuotaHandler(quotaManager, metrics);
final EmbeddedChannel otherChannel = new EmbeddedChannel(otherHandler);
final String otherRemoteAddress = "192.168.0.1";
otherChannel.attr(REMOTE_ADDRESS_KEY).set(otherRemoteAddress);
setProtocol(otherChannel);
final DateTime later = now.plus(Duration.standardSeconds(1));
when(quotaManager.acquireQuota(QuotaRequest.create(remoteAddress)))
.thenReturn(QuotaResponse.create(true, remoteAddress, now));
when(quotaManager.acquireQuota(QuotaRequest.create(otherRemoteAddress)))
.thenReturn(QuotaResponse.create(false, otherRemoteAddress, later));
// Allows the first user.
assertThat(channel.writeInbound(message)).isTrue();
assertThat((Object) channel.readInbound()).isEqualTo(message);
assertThat(channel.isActive()).isTrue();
// Blocks the second user.
OverQuotaException e =
assertThrows(OverQuotaException.class, () -> otherChannel.writeInbound(message));
assertThat(e).hasMessageThat().contains("none");
verify(metrics).registerQuotaRejection("whois", "none");
verifyNoMoreInteractions(metrics);
}
@Test
public void testCompressedFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibEncoder(ZlibWrapper.NONE, 9, 15, 8));
EmbeddedChannel decoderChannel = new EmbeddedChannel(new PerMessageDeflateDecoder(false));
// initialize
byte[] payload = new byte[300];
random.nextBytes(payload);
encoderChannel.writeOutbound(Unpooled.wrappedBuffer(payload));
ByteBuf compressedPayload = encoderChannel.readOutbound();
BinaryWebSocketFrame compressedFrame = new BinaryWebSocketFrame(true,
WebSocketExtension.RSV1 | WebSocketExtension.RSV3,
compressedPayload.slice(0, compressedPayload.readableBytes() - 4));
// execute
decoderChannel.writeInbound(compressedFrame);
BinaryWebSocketFrame uncompressedFrame = decoderChannel.readInbound();
// test
assertNotNull(uncompressedFrame);
assertNotNull(uncompressedFrame.content());
assertTrue(uncompressedFrame instanceof BinaryWebSocketFrame);
assertEquals(WebSocketExtension.RSV3, uncompressedFrame.rsv());
assertEquals(300, uncompressedFrame.content().readableBytes());
byte[] finalPayload = new byte[300];
uncompressedFrame.content().readBytes(finalPayload);
assertTrue(Arrays.equals(finalPayload, payload));
uncompressedFrame.release();
}
@Test
public void testFlushViaThresholdOutsideOfReadLoop() {
final AtomicInteger flushCount = new AtomicInteger();
EmbeddedChannel channel = newChannel(flushCount, true);
// After a given threshold, the async task should be bypassed and a flush should be triggered immediately
for (int i = 0; i < EXPLICIT_FLUSH_AFTER_FLUSHES; i++) {
channel.flush();
}
assertEquals(1, flushCount.get());
assertFalse(channel.finish());
}
@Before
@SuppressWarnings("unchecked")
public void setup() {
responseBuffer = new Disruptor<ResponseEvent>(new EventFactory<ResponseEvent>() {
@Override
public ResponseEvent newInstance() {
return new ResponseEvent();
}
}, 1024, Executors.newCachedThreadPool());
firedEvents = Collections.synchronizedList(new ArrayList<CouchbaseMessage>());
latch = new CountDownLatch(1);
responseBuffer.handleEventsWith(new EventHandler<ResponseEvent>() {
@Override
public void onEvent(ResponseEvent event, long sequence, boolean endOfBatch) throws Exception {
firedEvents.add(event.getMessage());
latch.countDown();
}
});
responseRingBuffer = responseBuffer.start();
CoreEnvironment environment = mock(CoreEnvironment.class);
when(environment.scheduler()).thenReturn(Schedulers.computation());
when(environment.maxRequestLifetime()).thenReturn(10000L); // 10 seconds
when(environment.autoreleaseAfter()).thenReturn(2000L);
when(environment.retryStrategy()).thenReturn(FailFastRetryStrategy.INSTANCE);
endpoint = mock(AbstractEndpoint.class);
when(endpoint.environment()).thenReturn(environment);
when(endpoint.context()).thenReturn(new CoreContext(environment, null, 1));
when(environment.userAgent()).thenReturn("Couchbase Client Mock");
queue = new ArrayDeque<ViewRequest>();
handler = new ViewHandler(endpoint, responseRingBuffer, queue, false, false);
channel = new EmbeddedChannel(handler);
}
protected void initializeChannel(Consumer<Channel> initializer) {
channel =
new EmbeddedChannel(
new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
initializer.accept(ch);
}
});
}
@Test
public void testResponseDecompression() {
// baseline test: response decoder, content decompressor && request aggregator work as expected
HttpResponseDecoder decoder = new HttpResponseDecoder();
HttpContentDecoder decompressor = new HttpContentDecompressor();
HttpObjectAggregator aggregator = new HttpObjectAggregator(1024);
EmbeddedChannel channel = new EmbeddedChannel(decoder, decompressor, aggregator);
String headers = "HTTP/1.1 200 OK\r\n" +
"Content-Length: " + GZ_HELLO_WORLD.length + "\r\n" +
"Content-Encoding: gzip\r\n" +
"\r\n";
ByteBuf buf = Unpooled.copiedBuffer(headers.getBytes(CharsetUtil.US_ASCII), GZ_HELLO_WORLD);
assertTrue(channel.writeInbound(buf));
Object o = channel.readInbound();
assertThat(o, is(instanceOf(FullHttpResponse.class)));
FullHttpResponse resp = (FullHttpResponse) o;
assertEquals(HELLO_WORLD.length(),
Integer.valueOf(resp.headers().get(HttpHeaders.Names.CONTENT_LENGTH)).intValue());
assertEquals(HELLO_WORLD, resp.content().toString(CharsetUtil.US_ASCII));
resp.release();
assertHasInboundMessages(channel, false);
assertHasOutboundMessages(channel, false);
assertFalse(channel.finish()); // assert that no messages are left in channel
}