下面列出了 io.netty.channel.embedded.EmbeddedChannel#releaseInbound() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Feeds a startup, a parse and a bind message into the protocol handlers, with the
 * parse message carrying an empty parameter-type array, and checks that the query
 * can afterwards be looked up on the session through the portal name.
 */
@Test
public void testBindMessageCanBeReadIfTypeForParamsIsUnknown() throws Exception {
    PostgresWireProtocol protocol = new PostgresWireProtocol(
        sqlOperations,
        sessionContext -> AccessControl.DISABLED,
        new AlwaysOKNullAuthentication(),
        null);
    channel = new EmbeddedChannel(protocol.decoder, protocol.handler);

    // No type hints for the two parameters: the type array is empty.
    int[] noTypeHints = new int[0];
    List<Object> bindParams = Arrays.asList(10, 20);

    ByteBuf messages = Unpooled.buffer();
    ClientMessages.sendStartupMessage(messages, "doc");
    ClientMessages.sendParseMessage(messages, "S1", "select ?, ?", noTypeHints);
    ClientMessages.sendBindMessage(messages, "P1", "S1", bindParams);

    channel.writeInbound(messages);
    channel.releaseInbound();

    // Being able to fetch the query by portal name means the bind went through.
    Session boundSession = sessions.get(0);
    assertThat(boundSession.getQuery("P1"), is("select ?, ?"));
}
/**
 * Sends a startup message followed by a termination message and verifies that
 * {@code close()} is called exactly once on the session returned by the mocked
 * {@code SQLOperations}.
 */
@Test
public void testSessionCloseOnTerminationMessage() throws Exception {
    Session mockedSession = mock(Session.class);
    SQLOperations mockedOperations = mock(SQLOperations.class);
    when(mockedOperations.createSession(any(String.class), any(User.class))).thenReturn(mockedSession);

    PostgresWireProtocol protocol = new PostgresWireProtocol(
        mockedOperations,
        sessionContext -> AccessControl.DISABLED,
        new AlwaysOKNullAuthentication(),
        null);
    channel = new EmbeddedChannel(protocol.decoder, protocol.handler);

    ByteBuf messages = Unpooled.buffer();
    ClientMessages.sendStartupMessage(messages, "doc");
    ClientMessages.sendTermination(messages);

    channel.writeInbound(messages);
    channel.releaseInbound();

    verify(mockedSession, times(1)).close();
}
/**
 * Sends a POST to {@code /_sql} with Basic credentials and an {@code X-Real-Ip}
 * header through a handler built with empty settings, then checks that the
 * handler is not authorized and that the outbound response carries the
 * "no valid auth.host_based.config entry" message.
 */
@Test
public void testNotNoHbaConfig() throws Exception {
    HttpAuthUpstreamHandler authHandler = new HttpAuthUpstreamHandler(Settings.EMPTY, authService);
    EmbeddedChannel embedded = new EmbeddedChannel(authHandler);

    DefaultHttpRequest sqlRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/_sql");
    String authorizationHeader = HttpHeaderNames.AUTHORIZATION.toString();
    // Base64 of "Aladdin:OpenSesame"; the expected error below names user "Aladdin".
    sqlRequest.headers().add(authorizationHeader, "Basic QWxhZGRpbjpPcGVuU2VzYW1l");
    sqlRequest.headers().add("X-Real-Ip", "10.1.0.100");

    embedded.writeInbound(sqlRequest);
    embedded.releaseInbound();

    assertFalse(authHandler.authorized());
    assertUnauthorized(
        embedded.readOutbound(),
        "No valid auth.host_based.config entry found for host \"10.1.0.100\", user \"Aladdin\", protocol \"http\"\n");
}
/**
 * Sends startup, parse and flush messages through a channel whose {@code flush()}
 * is overridden to record that it was called, and asserts that the recording
 * flag is set afterwards.
 */
@Test
public void test_channel_is_flushed_after_receiving_flush_request() throws Exception {
    Session mockedSession = mock(Session.class);
    SQLOperations mockedOperations = mock(SQLOperations.class);
    when(mockedOperations.createSession(any(String.class), any(User.class))).thenReturn(mockedSession);

    PostgresWireProtocol protocol = new PostgresWireProtocol(
        mockedOperations,
        sessionContext -> AccessControl.DISABLED,
        new AlwaysOKNullAuthentication(),
        null);

    // Flipped to true by the overridden flush() below.
    AtomicBoolean flushObserved = new AtomicBoolean(false);
    channel = new EmbeddedChannel(protocol.decoder, protocol.handler) {
        @Override
        public Channel flush() {
            flushObserved.set(true);
            return super.flush();
        }
    };

    ByteBuf messages = Unpooled.buffer();
    ClientMessages.sendStartupMessage(messages, "doc");
    ClientMessages.sendParseMessage(messages, "", "select ?", new int[0]);
    ClientMessages.sendFlush(messages);

    channel.writeInbound(messages);
    channel.releaseInbound();

    assertThat(flushObserved.get(), is(true));
}
/**
 * Sends a startup message and inspects the first two outbound buffers: the first
 * must start with {@code 'R'}, the second with {@code 'S'} followed by the
 * key/value pair {@code crate_version} / current version number.
 */
@Test
public void testCrateServerVersionIsReceivedOnStartup() throws Exception {
    PostgresWireProtocol protocol = new PostgresWireProtocol(
        sqlOperations,
        sessionContext -> AccessControl.DISABLED,
        new AlwaysOKNullAuthentication(),
        null);
    channel = new EmbeddedChannel(protocol.decoder, protocol.handler);

    ByteBuf startup = Unpooled.buffer();
    ClientMessages.sendStartupMessage(startup, "doc");
    channel.writeInbound(startup);
    channel.releaseInbound();

    // First outbound message: Auth OK.
    ByteBuf authResponse = channel.readOutbound();
    try {
        assertThat((char) authResponse.readByte(), is('R'));
    } finally {
        authResponse.release();
    }

    // Second outbound message: ParameterStatus carrying the server version.
    ByteBuf parameterStatus = channel.readOutbound();
    try {
        assertThat((char) parameterStatus.readByte(), is('S'));
        parameterStatus.readInt(); // skip the length field
        String paramName = PostgresWireProtocol.readCString(parameterStatus);
        String paramValue = PostgresWireProtocol.readCString(parameterStatus);
        assertThat(paramName, is("crate_version"));
        assertThat(paramValue, is(Version.CURRENT.externalNumber()));
    } finally {
        parameterStatus.release();
    }
}
/**
 * Calls {@code sendUnauthorized} with a {@code null} body on a handler-less
 * channel and checks that the outbound response has status UNAUTHORIZED and
 * that the channel is no longer open.
 */
@Test
public void testChannelClosedWhenUnauthorized() throws Exception {
    EmbeddedChannel embedded = new EmbeddedChannel();

    HttpAuthUpstreamHandler.sendUnauthorized(embedded, null);
    embedded.releaseInbound();

    HttpResponse response = embedded.readOutbound();
    assertThat(response.status(), is(HttpResponseStatus.UNAUTHORIZED));
    assertThat(embedded.isOpen(), is(false));
}
/**
 * Calls {@code sendUnauthorized} with a {@code null} body and checks that the
 * outbound response has empty content.
 */
@Test
public void testSendUnauthorizedWithoutBody() throws Exception {
    EmbeddedChannel ch = new EmbeddedChannel();
    HttpAuthUpstreamHandler.sendUnauthorized(ch, null);
    ch.releaseInbound();
    DefaultFullHttpResponse resp = ch.readOutbound();
    try {
        assertThat(resp.content(), is(Unpooled.EMPTY_BUFFER));
    } finally {
        // A message taken from readOutbound() is owned by the caller; release it
        // so the reference-counted response is not leaked.
        resp.release();
    }
}
/**
 * Calls {@code sendUnauthorized} with a body that already ends in a newline and
 * checks that the outbound response content is that same text.
 */
@Test
public void testSendUnauthorizedWithBody() throws Exception {
    EmbeddedChannel ch = new EmbeddedChannel();
    HttpAuthUpstreamHandler.sendUnauthorized(ch, "not allowed\n");
    ch.releaseInbound();
    DefaultFullHttpResponse resp = ch.readOutbound();
    try {
        assertThat(resp.content().toString(StandardCharsets.UTF_8), is("not allowed\n"));
    } finally {
        // A message taken from readOutbound() is owned by the caller; release it
        // so the response's content buffer is not leaked.
        resp.release();
    }
}
/**
 * Calls {@code sendUnauthorized} with a body that lacks a trailing newline and
 * checks that the outbound response content ends with one.
 */
@Test
public void testSendUnauthorizedWithBodyNoNewline() throws Exception {
    EmbeddedChannel ch = new EmbeddedChannel();
    HttpAuthUpstreamHandler.sendUnauthorized(ch, "not allowed");
    ch.releaseInbound();
    DefaultFullHttpResponse resp = ch.readOutbound();
    try {
        assertThat(resp.content().toString(StandardCharsets.UTF_8), is("not allowed\n"));
    } finally {
        // A message taken from readOutbound() is owned by the caller; release it
        // so the response's content buffer is not leaked.
        resp.release();
    }
}
/**
 * Passes a plain POST to {@code /_sql} through a handler backed by
 * {@code AlwaysOKNullAuthentication} and expects the handler to report
 * itself as authorized.
 */
@Test
public void testAuthorized() throws Exception {
    HttpAuthUpstreamHandler authHandler =
        new HttpAuthUpstreamHandler(Settings.EMPTY, new AlwaysOKNullAuthentication());
    EmbeddedChannel embedded = new EmbeddedChannel(authHandler);

    DefaultHttpRequest sqlRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/_sql");
    embedded.writeInbound(sqlRequest);
    embedded.releaseInbound();

    assertThat(authHandler.authorized(), is(true));
}
/**
 * Sends a POST to {@code /_sql} without an Authorization header through a
 * handler using {@code authService}, and expects the handler to stay
 * unauthorized with a "trust authentication failed" response for user "crate".
 */
@Test
public void testUnauthorizedUser() throws Exception {
    HttpAuthUpstreamHandler authHandler = new HttpAuthUpstreamHandler(Settings.EMPTY, authService);
    EmbeddedChannel embedded = new EmbeddedChannel(authHandler);

    HttpRequest sqlRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/_sql");
    embedded.writeInbound(sqlRequest);
    embedded.releaseInbound();

    assertFalse(authHandler.authorized());
    assertUnauthorized(embedded.readOutbound(), "trust authentication failed for user \"crate\"\n");
}
/**
 * Uses an {@code AlwaysOKAuthentication} whose user lookup always returns a
 * superuser named "crate", sends a request with Basic credentials, and expects
 * the handler to be authorized.
 */
@Test
public void testUserAuthenticationWithDisabledHBA() throws Exception {
    User superUser = User.of("crate", EnumSet.of(User.Role.SUPERUSER));
    Authentication alwaysOkAuth = new AlwaysOKAuthentication(userName -> superUser);
    HttpAuthUpstreamHandler authHandler = new HttpAuthUpstreamHandler(Settings.EMPTY, alwaysOkAuth);
    EmbeddedChannel embedded = new EmbeddedChannel(authHandler);

    HttpRequest sqlRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/_sql");
    // Base64 of "crate:" (user "crate", empty password).
    sqlRequest.headers().add(HttpHeaderNames.AUTHORIZATION.toString(), "Basic Y3JhdGU6");
    embedded.writeInbound(sqlRequest);
    embedded.releaseInbound();

    assertTrue(authHandler.authorized());
}
/**
 * Uses an {@code AlwaysOKAuthentication} whose user lookup always returns
 * {@code null}, sends a request with Basic credentials for "Aladdin", and
 * expects the handler to stay unauthorized with a "trust authentication failed"
 * response naming that user.
 */
@Test
public void testUnauthorizedUserWithDisabledHBA() throws Exception {
    Authentication unknownUserAuth = new AlwaysOKAuthentication(userName -> null);
    HttpAuthUpstreamHandler authHandler = new HttpAuthUpstreamHandler(Settings.EMPTY, unknownUserAuth);
    EmbeddedChannel embedded = new EmbeddedChannel(authHandler);

    HttpRequest sqlRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/_sql");
    // Base64 of "Aladdin:OpenSesame".
    sqlRequest.headers().add(HttpHeaderNames.AUTHORIZATION.toString(), "Basic QWxhZGRpbjpPcGVuU2VzYW1l");
    embedded.writeInbound(sqlRequest);
    embedded.releaseInbound();

    assertFalse(authHandler.authorized());
    assertUnauthorized(embedded.readOutbound(), "trust authentication failed for user \"Aladdin\"\n");
}
/**
 * Writes a startup message for database {@code "db"} into the given channel as
 * inbound data and then releases the channel's pending inbound messages.
 *
 * @param channel the embedded channel that receives the startup message
 */
private static void sendStartupMessage(EmbeddedChannel channel) {
    ByteBuf startupBuffer = Unpooled.buffer();
    ClientMessages.sendStartupMessage(startupBuffer, "db");

    channel.writeInbound(startupBuffer);
    channel.releaseInbound();
}
/**
 * Builds a {@code Netty4HttpServerTransport} with SSL enabled for HTTP, lets its
 * channel handler initialise an {@link EmbeddedChannel}, and asserts that the
 * first handler in the resulting pipeline is an {@code SslHandler}.
 *
 * <p>Cleanup uses nested {@code finally} blocks so that a failure while stopping
 * or closing the transport does not skip releasing and closing the channel.
 */
@Test
public void testPipelineConfiguration() throws Exception {
    Settings settings = Settings.builder()
        .put(PATH_HOME_SETTING.getKey(), "/tmp")
        .put(SslConfigSettings.SSL_HTTP_ENABLED.getKey(), true)
        .put(SslConfigSettings.SSL_TRUSTSTORE_FILEPATH.getKey(), trustStoreFile.getAbsolutePath())
        .put(SslConfigSettings.SSL_TRUSTSTORE_PASSWORD.getKey(), "truststorePassword")
        .put(SslConfigSettings.SSL_KEYSTORE_FILEPATH.getKey(), keyStoreFile.getAbsolutePath())
        .put(SslConfigSettings.SSL_KEYSTORE_PASSWORD.getKey(), "keystorePassword")
        .put(SslConfigSettings.SSL_KEYSTORE_KEY_PASSWORD.getKey(), "serverKeyPassword")
        .build();

    // Resolver that answers every lookup with the loopback address.
    NetworkService networkService = new NetworkService(Collections.singletonList(new NetworkService.CustomNameResolver() {
        @Override
        public InetAddress[] resolveDefault() {
            return new InetAddress[] { InetAddresses.forString("127.0.0.1") };
        }

        @Override
        public InetAddress[] resolveIfPossible(String value) throws IOException {
            return new InetAddress[] { InetAddresses.forString("127.0.0.1") };
        }
    }));

    PipelineRegistry pipelineRegistry = new PipelineRegistry(settings);
    pipelineRegistry.setSslContextProvider(new SslContextProviderImpl(settings));

    Netty4HttpServerTransport transport =
        new Netty4HttpServerTransport(
            settings,
            networkService,
            BigArrays.NON_RECYCLING_INSTANCE,
            mock(ThreadPool.class),
            NamedXContentRegistry.EMPTY,
            pipelineRegistry,
            mock(NodeClient.class));

    EmbeddedChannel channel = new EmbeddedChannel();
    try {
        transport.start();

        Netty4HttpServerTransport.HttpChannelHandler httpChannelHandler =
            (Netty4HttpServerTransport.HttpChannelHandler) transport.configureServerChannelHandler();
        httpChannelHandler.initChannel(channel);

        assertThat(channel.pipeline().first(), instanceOf(SslHandler.class));
    } finally {
        // Each cleanup step sits in its own finally so that an exception from an
        // earlier step cannot prevent the later ones from running.
        try {
            transport.stop();
        } finally {
            try {
                transport.close();
            } finally {
                channel.releaseInbound();
                channel.close().awaitUninterruptibly();
            }
        }
    }
}