下面列出了怎么用io.netty.util.CharsetUtil的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testEncodeDecode() throws Exception {
CallRequestFrame callRequestFrame = Fixtures.callRequest(42, false, Unpooled.wrappedBuffer("Hello, World!".getBytes(StandardCharsets.UTF_8)));
CallRequestFrame inboundCallRequestFrame =
(CallRequestFrame) MessageCodec.decode(
CodecTestUtil.encodeDecode(
MessageCodec.encode(
ByteBufAllocator.DEFAULT, callRequestFrame
)
)
);
assertEquals("Hello, World!", inboundCallRequestFrame.getPayload().toString(CharsetUtil.UTF_8));
inboundCallRequestFrame.getPayload().release();
}
@Test
public void shouldReleaseStoreRequestContentOnSuccess() throws Exception {
ByteBuf content = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8);
FullBinaryMemcacheResponse response = new DefaultFullBinaryMemcacheResponse(KEY, Unpooled.EMPTY_BUFFER,
content);
response.setStatus(BinaryMemcacheResponseStatus.SUCCESS);
UpsertRequest requestMock = mock(UpsertRequest.class);
ByteBuf requestContent = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8);
when(requestMock.bucket()).thenReturn("bucket");
when(requestMock.observable()).thenReturn(AsyncSubject.<CouchbaseResponse>create());
when(requestMock.content()).thenReturn(requestContent);
requestQueue.add(requestMock);
assertEquals(1, content.refCnt());
assertEquals(1, requestContent.refCnt());
channel.writeInbound(response);
assertEquals(1, content.refCnt());
assertEquals(0, requestContent.refCnt());
}
@Override
public void activeProcess(ProcessDto processDto) {
log.info("FindNodeProcessor activeProcess "+processDto.getMessageInfo()+"talbesize="+routingTables.size());
Map<String, Object> rMap = DHTUtil.getParamMap(processDto.getRawMap(), "r", "FIND_NODE,找不到r参数.map:" + processDto.getRawMap());
List<Node> nodeList = DHTUtil.getNodeListByRMap(rMap);
//为空退出
if (CollectionUtils.isEmpty(nodeList)) return;
//去重
Node[] nodes = nodeList.stream().distinct().toArray(Node[]::new);
//将nodes加入发送队列
for (Node node : nodes) {
FileUtil.wirteNode("nodes1=="+node.getIp()+","+node.getPort()+"\r\n");
findNodeTask.put(node.toAddress());
}
byte[] id = DHTUtil.getParamString(rMap, "id", "FIND_NODE,找不到id参数.map:" + processDto.getRawMap()).getBytes(CharsetUtil.ISO_8859_1);
//将发送消息的节点加入路由表
routingTables.get(processDto.getNum()).put(new Node(id, processDto.getSender(), NodeRankEnum.FIND_NODE_RECEIVE.getKey()));
}
@Override
public ServiceException decodeFromWire(int pos, Buffer buffer) {
int failureCode = buffer.getInt(pos);
pos += 4;
boolean isNull = buffer.getByte(pos) == (byte)0;
pos++;
String message;
if (!isNull) {
int strLength = buffer.getInt(pos);
pos += 4;
byte[] bytes = buffer.getBytes(pos, pos + strLength);
message = new String(bytes, CharsetUtil.UTF_8);
pos += strLength;
} else {
message = null;
}
JsonObject debugInfo = new JsonObject();
debugInfo.readFromBuffer(pos, buffer);
return new ServiceException(failureCode, message, debugInfo);
}
@Test
public void shouldNotCompressWithPoorCompressionRatio() throws Exception {
String text = "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo " +
"ligula eget dolor. Aenean massa. Cum sociis natoque penatibus et magnis dis parturient " +
"montes, nascetur ridiculus mus.";
channel.pipeline().addFirst(new SnappyFeatureHandler());
ByteBuf content = Unpooled.copiedBuffer(text, CharsetUtil.UTF_8);
UpsertRequest request = new UpsertRequest("key", content.copy(), "bucket");
request.partition((short) 512);
channel.writeOutbound(request);
FullBinaryMemcacheRequest outbound = (FullBinaryMemcacheRequest) channel.readOutbound();
assertNotNull(outbound);
assertEquals(0, outbound.getDataType());
ReferenceCountUtil.release(outbound);
}
protected int broadcast(String srcActor, String dstNodeType, String dstActor, int userCmd, String userData){
int failedNum = 0;
DFNodeList nodeLs = null;
if(dstNodeType == null){ //all node
nodeLs = _getAllNodeSafe();
}else{ //by type
nodeLs = _getNodeByTypeSafe(dstNodeType);
}
if(nodeLs != null){
byte[] bufUser = null;
if(userData != null){
bufUser = userData.getBytes(CharsetUtil.UTF_8);
}
//
DFNode curNode = null;
Iterator<DFNode> it = nodeLs.lsNode.iterator();
while(it.hasNext()){
curNode = it.next();
if(_sendToNode(srcActor, curNode.name, dstActor, null, 0, userCmd, bufUser, RpcParamType.STRING) != 0){
++failedNum;
}
}
}
return failedNum;
}
private static HttpResponse createStatusResponse(HttpResponseStatus responseStatus, HttpRequest request, String description) {
if (request != null && request.method() == HttpMethod.HEAD) {
return new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, responseStatus, Unpooled.EMPTY_BUFFER);
}
StringBuilder builder = new StringBuilder();
String message = responseStatus.toString();
builder.append("<!doctype html><title>").append(message).append("</title>").append("<h1 style=\"text-align: center\">").append(message).append("</h1>");
if (description != null) {
builder.append("<p>").append(description).append("</p>");
}
builder.append("<hr/><p style=\"text-align: center\">").append(StringUtil.notNullize(getServerHeaderValue())).append("</p>");
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, responseStatus, ByteBufUtil
.encodeString(ByteBufAllocator.DEFAULT, CharBuffer.wrap(builder), CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html");
return response;
}
private static void sendHttpResponse(
ChannelHandlerContext ctx, HttpRequest req, FullHttpResponse res) {
// Generate an error page if response status code is not OK (200).
if (res.getStatus().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
setContentLength(res, res.content().readableBytes());
}
// Send the response and close the connection if necessary.
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!isKeepAlive(req) || res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
@Test
public void deflateEncodingWriteLargeMessage() throws Exception {
final int BUFFER_SIZE = 1 << 12;
final byte[] bytes = new byte[BUFFER_SIZE];
new Random().nextBytes(bytes);
bootstrapEnv(BUFFER_SIZE);
final ByteBuf data = Unpooled.wrappedBuffer(bytes);
try {
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() throws Http2Exception {
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient());
clientHandler.flush(ctxClient());
}
});
awaitServer();
assertEquals(data.resetReaderIndex().toString(CharsetUtil.UTF_8),
serverOut.toString(CharsetUtil.UTF_8.name()));
} finally {
data.release();
}
}
@Test
public void shouldHandleDoubleInsert() throws Exception {
String key = "insert-key";
String content = "Hello World!";
InsertRequest insert = new InsertRequest(key, Unpooled.copiedBuffer(content, CharsetUtil.UTF_8), bucket());
InsertResponse insertResponse = cluster().<InsertResponse>send(insert).toBlocking().single();
assertEquals(ResponseStatus.SUCCESS, insertResponse.status());
ReferenceCountUtil.releaseLater(insertResponse.content());
assertValidMetadata(insertResponse.mutationToken());
insert = new InsertRequest(key, Unpooled.copiedBuffer(content, CharsetUtil.UTF_8), bucket());
insertResponse = cluster().<InsertResponse>send(insert).toBlocking().single();
assertEquals(ResponseStatus.EXISTS, insertResponse.status());
ReferenceCountUtil.releaseLater(insertResponse.content());
assertNull(insertResponse.mutationToken());
}
ByteBuf readPrivateKey(InputStream in) throws KeyException {
String content;
try {
content = readContent(in);
} catch (IOException e) {
throw new KeyException("failed to read key input stream", e);
}
Matcher m = KEY_PATTERN.matcher(content);
if (!m.find()) {
throw new KeyException(
"could not find a PKCS #8 private key in input stream"
+ " (see https://netty.io/wiki/sslcontextbuilder-and-private-key.html for more information)");
}
ByteBuf base64 = Unpooled.copiedBuffer(m.group(1), CharsetUtil.US_ASCII);
ByteBuf der = Base64.decode(base64);
base64.release();
return der;
}
public String readContent(InputStream in) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
byte[] buf = new byte[8192];
for (; ; ) {
int ret = in.read(buf);
if (ret < 0) {
break;
}
out.write(buf, 0, ret);
}
return out.toString(CharsetUtil.US_ASCII.name());
} finally {
out.close();
}
}
private static void sendHttpResponse(
ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
// Generate an error page if response getStatus code is not OK (200).
if (res.getStatus().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
HttpHeaders.setContentLength(res, res.content().readableBytes());
}
// Send the response and close the connection if necessary.
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
@Test
public void testDecodeDataAsClient() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(false));
ByteBuf hello = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
assertTrue(ch.writeInbound(new DefaultHttp2DataFrame(hello)));
HttpContent content = ch.readInbound();
try {
assertThat(content.content().toString(CharsetUtil.UTF_8), is("hello world"));
assertFalse(content instanceof LastHttpContent);
} finally {
content.release();
}
assertThat(ch.readInbound(), is(nullValue()));
assertFalse(ch.finish());
}
@Test
public void verify_proxy_router_call_works_for_ssl_downstream_system() throws JsonProcessingException {
SerializableObject widget = new SerializableObject(UUID.randomUUID().toString(), generateRandomBytes(32));
String requestPayload = objectMapper.writeValueAsString(widget);
String payloadHash = getHashForPayload(requestPayload.getBytes(CharsetUtil.UTF_8));
ExtractableResponse response =
given()
.baseUri("http://127.0.0.1")
.port(serverConfig.endpointsPort())
.basePath(DownstreamProxySsl.MATCHING_PATH)
.header(REQUEST_PAYLOAD_HASH_HEADER_KEY, payloadHash)
.body(requestPayload)
.log().all()
.when()
.post()
.then()
.log().all()
.statusCode(200)
.extract();
String responsePayload = response.asString();
assertThat(responsePayload).isEqualTo("success_proxy_downstream_endpoint_call_ssl_true");
}
@Test
public void testEncodeDataEndWithTrailersAsClient() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(false));
ByteBuf hello = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
LastHttpContent trailers = new DefaultLastHttpContent(hello, true);
HttpHeaders headers = trailers.trailingHeaders();
headers.set("key", "value");
assertTrue(ch.writeOutbound(trailers));
Http2DataFrame dataFrame = ch.readOutbound();
try {
assertThat(dataFrame.content().toString(CharsetUtil.UTF_8), is("hello world"));
assertFalse(dataFrame.isEndStream());
} finally {
dataFrame.release();
}
Http2HeadersFrame headerFrame = ch.readOutbound();
assertThat(headerFrame.headers().get("key").toString(), is("value"));
assertTrue(headerFrame.isEndStream());
assertThat(ch.readOutbound(), is(nullValue()));
assertFalse(ch.finish());
}
@Test
public void testFullContent() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new TestEncoder());
ch.writeInbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"));
FullHttpResponse res = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(new byte[42]));
ch.writeOutbound(res);
assertEncodedResponse(ch);
HttpContent c = ch.readOutbound();
assertThat(c.content().readableBytes(), is(2));
assertThat(c.content().toString(CharsetUtil.US_ASCII), is("42"));
c.release();
LastHttpContent last = ch.readOutbound();
assertThat(last.content().readableBytes(), is(0));
last.release();
assertThat(ch.readOutbound(), is(nullValue()));
}
private void sendError(HttpResponseStatus status, final Throwable error) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
Unpooled.copiedBuffer(error.getMessage(), CharsetUtil.UTF_8));
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
ChannelPromise promise = ctx.newPromise();
promise.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
throw new ContainerException(error);
}
}
});
ctx.writeAndFlush(response, promise);
}
@Test(
alwaysRun = true,
dependsOnMethods = {"testPredictionsValidRequestSize"})
public void testPredictionsDecodeRequest()
throws InterruptedException, NoSuchFieldException, IllegalAccessException {
Channel inferChannel = TestUtils.getInferenceChannel(configManager);
Channel mgmtChannel = TestUtils.getManagementChannel(configManager);
setConfiguration("decode_input_request", "true");
loadTests(mgmtChannel, "noop-v1.0-config-tests.mar", "noop-config");
TestUtils.setResult(null);
TestUtils.setLatch(new CountDownLatch(1));
DefaultFullHttpRequest req =
new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.POST, "/predictions/noop-config");
req.content().writeCharSequence("{\"data\": \"test\"}", CharsetUtil.UTF_8);
HttpUtil.setContentLength(req, req.content().readableBytes());
req.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
inferChannel.writeAndFlush(req);
TestUtils.getLatch().await();
Assert.assertEquals(TestUtils.getHttpStatus(), HttpResponseStatus.OK);
Assert.assertFalse(TestUtils.getResult().contains("bytearray"));
unloadTests(mgmtChannel, "noop-config");
}
@Test
public void testWillToNullOnNormalDisconnect() throws Exception {
String willTopic = "will";
String message = "ASTALAVISTA";
String clientId = TestUtil.newClientId();
ConnectOptions options = new ConnectOptions();
options.clientId(clientId);
options.will(
new Message(-1, willTopic, null, message.getBytes(CharsetUtil.UTF_8), MqttQoS.AT_LEAST_ONCE, false));
options.cleanSession(false);
MqttClient client0 = new MqttClient("mqtt://localhost:" + Settings.INSTANCE.mqttPort());
MqttConnectReturnCode ret = client0.connectOptions(options).connect();
Assert.assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, ret);
Assert.assertTrue(client0.isConnected());
Assert.assertTrue(Session.NEXUS.get(clientId).will() != null
&& Session.NEXUS.get(clientId).will().topicName().equals(willTopic));
client0.disconnect(true);
Thread.sleep(100);
Assert.assertNull(Session.NEXUS.get(clientId).will());
}
/**
* If the length of the content is 0 for sure, {@link HttpContentEncoder} should skip encoding.
*/
@Test
public void testEmptyFullContent() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new HttpContentCompressor());
ch.writeInbound(newRequest());
FullHttpResponse res = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER);
ch.writeOutbound(res);
Object o = ch.readOutbound();
assertThat(o, is(instanceOf(FullHttpResponse.class)));
res = (FullHttpResponse) o;
assertThat(res.headers().get(HttpHeaderNames.TRANSFER_ENCODING), is(nullValue()));
// Content encoding shouldn't be modified.
assertThat(res.headers().get(HttpHeaderNames.CONTENT_ENCODING), is(nullValue()));
assertThat(res.content().readableBytes(), is(0));
assertThat(res.content().toString(CharsetUtil.US_ASCII), is(""));
res.release();
assertThat(ch.readOutbound(), is(nullValue()));
}
@Test
public void gzipEncodingSingleMessage() throws Exception {
final String text = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc";
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
bootstrapEnv(data.readableBytes());
try {
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() throws Http2Exception {
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient());
clientHandler.flush(ctxClient());
}
});
awaitServer();
assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name()));
} finally {
data.release();
}
}
@Test
@Ignore
public void verifyConstantRetry() throws Exception {
opFailRequest(Long.parseLong("7FF0", 16), -1);
startRetryVerifyRequest();
try {
String key = "upsert-key";
UpsertRequest upsert = new UpsertRequest(key, Unpooled.copiedBuffer("", CharsetUtil.UTF_8), bucket());
UpsertResponse response = cluster().<UpsertResponse>send(upsert).toBlocking().single();
ReferenceCountUtil.releaseLater(response.content());
} catch (Exception ex) {
//ignore exception
}
checkRetryVerifyRequest(Long.parseLong("7FF0", 16), OP_UPSERT, 25);
opFailRequest(Long.parseLong("7FF0", 16), 0);
}
@Test
public void testFailsOnIncompleteChunkedResponse() {
HttpClientCodec codec = new HttpClientCodec(4096, 8192, 8192, true);
EmbeddedChannel ch = new EmbeddedChannel(codec);
ch.writeOutbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost/"));
ByteBuf buffer = ch.readOutbound();
assertNotNull(buffer);
buffer.release();
assertNull(ch.readInbound());
ch.writeInbound(Unpooled.copiedBuffer(INCOMPLETE_CHUNKED_RESPONSE, CharsetUtil.ISO_8859_1));
assertThat(ch.readInbound(), instanceOf(HttpResponse.class));
((HttpContent) ch.readInbound()).release(); // Chunk 'first'
((HttpContent) ch.readInbound()).release(); // Chunk 'second'
assertNull(ch.readInbound());
try {
ch.finish();
fail();
} catch (CodecException e) {
assertTrue(e instanceof PrematureChannelClosureException);
}
}
@Test
public void testQuotedBoundary() throws Exception {
final String boundary = "dLV9Wyq26L_-JQxk6ferf-RT153LhOO";
final DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,
"http://localhost");
req.setDecoderResult(DecoderResult.SUCCESS);
req.headers().add(HttpHeaders.Names.CONTENT_TYPE, "multipart/form-data; boundary=\"" + boundary + '"');
req.headers().add(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
// Force to use memory-based data.
final DefaultHttpDataFactory inMemoryFactory = new DefaultHttpDataFactory(false);
for (String data : Arrays.asList("", "\r", "\r\r", "\r\r\r")) {
final String body =
"--" + boundary + "\r\n" +
"Content-Disposition: form-data; name=\"file\"; filename=\"tmp-0.txt\"\r\n" +
"Content-Type: image/gif\r\n" +
"\r\n" +
data + "\r\n" +
"--" + boundary + "--\r\n";
req.content().writeBytes(body.getBytes(CharsetUtil.UTF_8));
}
// Create decoder instance to test.
final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(inMemoryFactory, req);
assertFalse(decoder.getBodyHttpDatas().isEmpty());
decoder.destroy();
}
@Test
public void testEncodeErrorPacketWithEndpoint() throws IOException {
// Given
Packet packet = new Packet(PacketType.ERROR);
// packet.setEndpoint("/woot");
// When
String result = PacketEncoder.encodePacket(packet).toString(CharsetUtil.UTF_8);
// Then
// Assert.assertEquals("7::/woot", result);
assertEquals("7::", result);
}
@Test
public void builder_works_as_expected_for_all_fields() {
// given
ChunkedResponseInfo.ChunkedResponseInfoBuilder builder = ResponseInfo.newChunkedResponseBuilder();
int statusCode = 42;
HttpHeaders headers = mock(HttpHeaders.class);
String mimeType = UUID.randomUUID().toString();
Charset encoding = CharsetUtil.US_ASCII;
Set<Cookie> cookies = mock(Set.class);
boolean preventCompressedOutput = Math.random() > 0.5;
// when
ChunkedResponseInfo responseInfo = builder
.withHttpStatusCode(statusCode)
.withHeaders(headers)
.withDesiredContentWriterMimeType(mimeType)
.withDesiredContentWriterEncoding(encoding)
.withCookies(cookies)
.withPreventCompressedOutput(preventCompressedOutput)
.build();
// then
assertThat(responseInfo.getHttpStatusCode(), is(statusCode));
assertThat(responseInfo.getHeaders(), is(headers));
assertThat(responseInfo.getDesiredContentWriterMimeType(), is(mimeType));
assertThat(responseInfo.getDesiredContentWriterEncoding(), is(encoding));
assertThat(responseInfo.getCookies(), is(cookies));
assertThat(responseInfo.isPreventCompressedOutput(), is(preventCompressedOutput));
}
@Override
public @NotNull CompletableFuture<ResponseInfo<CharSequence>> execute(
@NotNull RequestInfo<String> request,
@NotNull Executor longRunningTaskExecutor,
@NotNull ChannelHandlerContext ctx
) {
CharSequence responsePayload = new StringBuilder(UUID.randomUUID().toString());
String responsePayloadHash = getHashForPayload(responsePayload.toString().getBytes(CharsetUtil.UTF_8));
return CompletableFuture.completedFuture(
ResponseInfo.newBuilder(responsePayload)
.withHeaders(new DefaultHttpHeaders().add(RESPONSE_PAYLOAD_HASH_HEADER_KEY, responsePayloadHash))
.build()
);
}
/**
* Test of a GET_PARAMETER request, with body.
*/
@Test
public void testSendGetParameterRequest() {
String expected = "GET_PARAMETER rtsp://172.10.20.30:554 RTSP/1.0\r\n"
+ "session: 2547019973447939919\r\n"
+ "cseq: 3\r\n"
+ "content-length: 31\r\n"
+ "content-type: text/parameters\r\n"
+ "\r\n"
+ "stream_state\r\n"
+ "position\r\n"
+ "scale\r\n";
byte[] content = ("stream_state\r\n"
+ "position\r\n"
+ "scale\r\n").getBytes(CharsetUtil.UTF_8);
FullHttpRequest request = new DefaultFullHttpRequest(
RtspVersions.RTSP_1_0,
RtspMethods.GET_PARAMETER,
"rtsp://172.10.20.30:554");
request.headers().add(RtspHeaderNames.SESSION, "2547019973447939919");
request.headers().add(RtspHeaderNames.CSEQ, "3");
request.headers().add(RtspHeaderNames.CONTENT_LENGTH,
"" + content.length);
request.headers().add(RtspHeaderNames.CONTENT_TYPE, "text/parameters");
request.content().writeBytes(content);
EmbeddedChannel ch = new EmbeddedChannel(new RtspEncoder());
ch.writeOutbound(request);
ByteBuf buf = ch.readOutbound();
String actual = buf.toString(CharsetUtil.UTF_8);
buf.release();
assertEquals(expected, actual);
}
@Test
public void testSplitContent() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new HttpContentCompressor());
ch.writeInbound(newRequest());
ch.writeOutbound(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK));
ch.writeOutbound(new DefaultHttpContent(Unpooled.copiedBuffer("Hell", CharsetUtil.US_ASCII)));
ch.writeOutbound(new DefaultHttpContent(Unpooled.copiedBuffer("o, w", CharsetUtil.US_ASCII)));
ch.writeOutbound(new DefaultLastHttpContent(Unpooled.copiedBuffer("orld", CharsetUtil.US_ASCII)));
assertEncodedResponse(ch);
HttpContent chunk;
chunk = ch.readOutbound();
assertThat(ByteBufUtil.hexDump(chunk.content()), is("1f8b0800000000000000f248cdc901000000ffff"));
chunk.release();
chunk = ch.readOutbound();
assertThat(ByteBufUtil.hexDump(chunk.content()), is("cad7512807000000ffff"));
chunk.release();
chunk = ch.readOutbound();
assertThat(ByteBufUtil.hexDump(chunk.content()), is("ca2fca4901000000ffff"));
chunk.release();
chunk = ch.readOutbound();
assertThat(ByteBufUtil.hexDump(chunk.content()), is("0300c2a99ae70c000000"));
assertThat(chunk, is(instanceOf(HttpContent.class)));
chunk.release();
chunk = ch.readOutbound();
assertThat(chunk.content().isReadable(), is(false));
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
chunk.release();
assertThat(ch.readOutbound(), is(nullValue()));
}