下面列出了org.mockito.internal.verification.Times#software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void idleConnectionReaperDoesNotReapActiveConnections() throws InterruptedException {
Duration maxIdleTime = Duration.ofSeconds(2);
try(SdkAsyncHttpClient client = NettyNioAsyncHttpClient.builder()
.connectionMaxIdleTime(maxIdleTime)
.buildWithDefaults(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS)) {
Instant end = Instant.now().plus(maxIdleTime.plusSeconds(1));
// Send requests for longer than the max-idle time, ensuring no connections are closed.
while (Instant.now().isBefore(end)) {
makeRequest(client);
Thread.sleep(100);
verify(TRAFFIC_LISTENER, new Times(0)).closed(any());
}
// Do nothing for longer than the max-idle time, ensuring connections are closed.
Thread.sleep(maxIdleTime.plusSeconds(1).toMillis());
verify(TRAFFIC_LISTENER, new AtLeast(1)).closed(any());
}
}
@Test
public void oldConnectionReaperReapsActiveConnections() throws InterruptedException {
Duration connectionTtl = Duration.ofMillis(200);
try (SdkAsyncHttpClient client = NettyNioAsyncHttpClient.builder()
.connectionTimeToLive(connectionTtl)
.buildWithDefaults(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS)) {
Instant end = Instant.now().plus(Duration.ofSeconds(5));
verify(TRAFFIC_LISTENER, new Times(0)).closed(any());
// Send requests frequently, validating that connections are still being closed.
while (Instant.now().isBefore(end)) {
makeRequest(client);
Thread.sleep(100);
}
verify(TRAFFIC_LISTENER, new AtLeast(20)).closed(any());
}
}
@Setup(Level.Trial)
public void setup() throws Exception {
mockServer = new MockH2Server(false);
mockServer.start();
SslProvider sslProvider = getSslProvider(sslProviderValue);
sdkHttpClient = NettyNioAsyncHttpClient.builder()
.sslProvider(sslProvider)
.buildWithDefaults(trustAllTlsAttributeMapBuilder()
.put(PROTOCOL, Protocol.HTTP2)
.build());
client = ProtocolRestJsonAsyncClient.builder()
.endpointOverride(mockServer.getHttpsUri())
.httpClient(sdkHttpClient)
.build();
// Making sure the request actually succeeds
client.allTypes().join();
}
@Setup(Level.Trial)
public void setup() throws Exception {
mockServer = new MockServer();
mockServer.start();
SslProvider sslProvider = getSslProvider(sslProviderValue);
sdkHttpClient = NettyNioAsyncHttpClient.builder()
.sslProvider(sslProvider)
.buildWithDefaults(trustAllTlsAttributeMapBuilder().build());
client = ProtocolRestJsonAsyncClient.builder()
.endpointOverride(mockServer.getHttpsUri())
.httpClient(sdkHttpClient)
.build();
// Making sure the request actually succeeds
client.allTypes().join();
}
@Bean
public S3AsyncClient s3client(S3ClientConfigurarionProperties s3props, AwsCredentialsProvider credentialsProvider) {
SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
.writeTimeout(Duration.ZERO)
.maxConcurrency(64)
.build();
S3Configuration serviceConfiguration = S3Configuration.builder()
.checksumValidationEnabled(false)
.chunkedEncodingEnabled(true)
.build();
S3AsyncClientBuilder b = S3AsyncClient.builder()
.httpClient(httpClient)
.region(s3props.getRegion())
.credentialsProvider(credentialsProvider)
.serviceConfiguration(serviceConfiguration);
if (s3props.getEndpoint() != null) {
b = b.endpointOverride(s3props.getEndpoint());
}
return b.build();
}
private S3AsyncClientBuilder asyncClientBuilder(String region) {
SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
.maxConcurrency(downloaderProperties.getMaxConcurrency())
.connectionMaxIdleTime(Duration.ofSeconds(5)) // https://github.com/aws/aws-sdk-java-v2/issues/1122
.build();
return S3AsyncClient.builder()
.region(Region.of(region))
.credentialsProvider(awsCredentialsProvider(
downloaderProperties.getAccessKey(), downloaderProperties.getSecretKey()))
.httpClient(httpClient)
.overrideConfiguration(c -> c.addExecutionInterceptor(metricsExecutionInterceptor));
}
@Test
public void subscribeToShard_smallWindow_doesNotTimeOutReads() {
// We want sufficiently large records (relative to the initial window
// size we're choosing) so the client has to send multiple
// WINDOW_UPDATEs to receive them
for (int i = 0; i < 16; ++i) {
putRecord(64 * 1024);
}
KinesisAsyncClient smallWindowAsyncClient = KinesisAsyncClient.builder()
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
.httpClientBuilder(NettyNioAsyncHttpClient.builder()
.http2Configuration(Http2Configuration.builder()
.initialWindowSize(16384)
.build()))
.build();
try {
smallWindowAsyncClient.subscribeToShard(r -> r.consumerARN(consumerArn)
.shardId(shardId)
.startingPosition(s -> s.type(ShardIteratorType.TRIM_HORIZON)),
SubscribeToShardResponseHandler.builder()
.onEventStream(es -> Flowable.fromPublisher(es).forEach(e -> {}))
.onResponse(this::verifyHttpMetadata)
.build())
.join();
} finally {
smallWindowAsyncClient.close();
}
}
@Test
public void connectionCloseAfterResponse_shouldNotReuseConnection() throws Exception {
server = new Server();
server.init();
netty = NettyNioAsyncHttpClient.builder()
.eventLoopGroup(SdkEventLoopGroup.builder().numberOfThreads(2).build())
.protocol(Protocol.HTTP1_1)
.buildWithDefaults(AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, true).build());
sendGetRequest().join();
sendGetRequest().join();
assertThat(server.channels.size()).isEqualTo(2);
}
@Test
public void builderSetter_negativeValue_throws() {
expected.expect(IllegalArgumentException.class);
NettyNioAsyncHttpClient.builder()
.http2Configuration(Http2Configuration.builder()
.initialWindowSize(-1)
.build())
.build();
}
@Test
public void builderSetter_0Value_throws() {
expected.expect(IllegalArgumentException.class);
NettyNioAsyncHttpClient.builder()
.http2Configuration(Http2Configuration.builder()
.initialWindowSize(0)
.build())
.build();
}
@Before
public void setup() throws Exception {
server = new Server();
server.init();
netty = NettyNioAsyncHttpClient.builder()
.eventLoopGroup(SdkEventLoopGroup.builder().numberOfThreads(2).build())
.protocol(Protocol.HTTP1_1)
.buildWithDefaults(AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, true).build());
}
@Before
public void setup() throws Exception {
server = new Server();
server.init();
netty = NettyNioAsyncHttpClient.builder()
.eventLoopGroup(SdkEventLoopGroup.builder().numberOfThreads(3).build())
.protocol(Protocol.HTTP2)
.buildWithDefaults(AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, true).build());
}
@Before
public void setup() throws Exception {
server = new Server();
server.init();
netty = NettyNioAsyncHttpClient.builder()
.readTimeout(Duration.ofMillis(500))
.eventLoopGroup(SdkEventLoopGroup.builder().numberOfThreads(3).build())
.protocol(Protocol.HTTP2)
.buildWithDefaults(AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, true).build());
}
@Before
public void setup() throws Exception {
server = new Server();
server.init();
netty = NettyNioAsyncHttpClient.builder()
.readTimeout(Duration.ofMillis(1000))
.eventLoopGroup(SdkEventLoopGroup.builder().numberOfThreads(3).build())
.http2Configuration(h -> h.healthCheckPingPeriod(Duration.ofMillis(200)))
.protocol(Protocol.HTTP2)
.buildWithDefaults(AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, true).build());
}
/**
* @return an instance of KinesisAsyncClient
*/
public static KinesisAsyncClient kinesisClient() {
return KinesisAsyncClient.builder()
.credentialsProvider(EnvironmentVariableCredentialsProvider.create())
.region(Region.US_EAST_1)
.httpClientBuilder(NettyNioAsyncHttpClient.builder())
.build();
}
@BeforeEach
public void setup() {
streamName = "kinesisstabilitytest" + System.currentTimeMillis();
consumerArns = new ArrayList<>(CONSUMER_COUNT);
shardIds = new ArrayList<>(SHARD_COUNT);
producedData = new ArrayList<>();
asyncClient = KinesisAsyncClient.builder()
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
.httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(MAX_CONCURRENCY))
.build();
asyncClient.createStream(r -> r.streamName(streamName)
.shardCount(SHARD_COUNT))
.join();
waitForStreamToBeActive();
streamARN = asyncClient.describeStream(r -> r.streamName(streamName)).join()
.streamDescription()
.streamARN();
shardIds = asyncClient.listShards(r -> r.streamName(streamName))
.join()
.shards().stream().map(Shard::shardId).collect(Collectors.toList());
waiterExecutorService = Executors.newFixedThreadPool(CONSUMER_COUNT);
producer = Executors.newScheduledThreadPool(1);
registerStreamConsumers();
waitForConsumersToBeActive();
}
@BeforeAll
public static void setup() {
transcribeStreamingClient = TranscribeStreamingAsyncClient.builder()
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
.httpClientBuilder(NettyNioAsyncHttpClient.builder()
.connectionAcquisitionTimeout(Duration.ofSeconds(30))
.maxConcurrency(CONCURRENCY))
.build();
audioFileInputStream = getInputStream();
if (audioFileInputStream == null) {
throw new RuntimeException("fail to get the audio input stream");
}
}
@Bean
@Primary
public KinesisAsyncClient kinesisAsyncClient(final @Value("${test.environment:local}") String testEnvironment,
final AwsCredentialsProvider credentialsProvider,
final RetryPolicy kinesisRetryPolicy) {
// kinesalite does not support cbor at the moment (v1.11.6)
System.setProperty("aws.cborEnabled", "false");
LOG.info("kinesis client for local tests");
final KinesisAsyncClient kinesisClient;
if (testEnvironment.equals("local")) {
kinesisClient = KinesisAsyncClient.builder()
.httpClient(
// Disables HTTP2 because of problems with LocalStack
NettyNioAsyncHttpClient.builder()
.protocol(Protocol.HTTP1_1)
.build())
.endpointOverride(URI.create("http://localhost:4568"))
.region(Region.EU_CENTRAL_1)
.credentialsProvider(credentialsProvider)
.overrideConfiguration(ClientOverrideConfiguration.builder().retryPolicy(kinesisRetryPolicy).build())
.build();
} else {
kinesisClient = KinesisAsyncClient.builder()
.credentialsProvider(credentialsProvider)
.build();
}
createChannelIfNotExists(kinesisClient, KINESIS_INTEGRATION_TEST_CHANNEL, EXPECTED_NUMBER_OF_SHARDS);
return kinesisClient;
}
@Bean
@Primary
public KinesisAsyncClient kinesisAsyncClient(final @Value("${test.environment:local}") String testEnvironment,
final AwsCredentialsProvider credentialsProvider) {
// kinesalite does not support cbor at the moment (v1.11.6)
System.setProperty("aws.cborEnabled", "false");
LOG.info("kinesis client for local tests");
final KinesisAsyncClient kinesisClient;
if (testEnvironment.equals("local")) {
kinesisClient = KinesisAsyncClient.builder()
.httpClient(
// Disables HTTP2 because of problems with LocalStack
NettyNioAsyncHttpClient.builder()
.protocol(Protocol.HTTP1_1)
.build())
.endpointOverride(URI.create("http://localhost:4568"))
.region(Region.EU_CENTRAL_1)
.credentialsProvider(credentialsProvider)
.build();
} else {
kinesisClient = KinesisAsyncClient.builder()
.credentialsProvider(credentialsProvider)
.build();
}
createChannelIfNotExists(kinesisClient, INTEGRATION_TEST_STREAM, 2);
return kinesisClient;
}
/**
* @return The builder for {@link NettyNioAsyncHttpClient}
*/
public NettyNioAsyncHttpClient.Builder getBuilder() {
return builder.proxyConfiguration(proxy.build());
}
public RuntimeValue<SdkAsyncHttpClient.Builder> configureAsync(String clientName,
RuntimeValue<NettyHttpClientConfig> asyncConfigRuntime) {
NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
NettyHttpClientConfig asyncConfig = asyncConfigRuntime.getValue();
validateNettyClientConfig(clientName, asyncConfig);
builder.connectionAcquisitionTimeout(asyncConfig.connectionAcquisitionTimeout);
builder.connectionMaxIdleTime(asyncConfig.connectionMaxIdleTime);
builder.connectionTimeout(asyncConfig.connectionTimeout);
asyncConfig.connectionTimeToLive.ifPresent(builder::connectionTimeToLive);
builder.maxConcurrency(asyncConfig.maxConcurrency);
builder.maxPendingConnectionAcquires(asyncConfig.maxPendingConnectionAcquires);
builder.protocol(asyncConfig.protocol);
builder.readTimeout(asyncConfig.readTimeout);
builder.writeTimeout(asyncConfig.writeTimeout);
asyncConfig.sslProvider.ifPresent(builder::sslProvider);
builder.useIdleConnectionReaper(asyncConfig.useIdleConnectionReaper);
if (asyncConfig.http2.initialWindowSize.isPresent() || asyncConfig.http2.maxStreams.isPresent()) {
Http2Configuration.Builder http2Builder = Http2Configuration.builder();
asyncConfig.http2.initialWindowSize.ifPresent(http2Builder::initialWindowSize);
asyncConfig.http2.maxStreams.ifPresent(http2Builder::maxStreams);
asyncConfig.http2.healthCheckPingPeriod.ifPresent(http2Builder::healthCheckPingPeriod);
builder.http2Configuration(http2Builder.build());
}
if (asyncConfig.proxy.enabled && asyncConfig.proxy.endpoint.isPresent()) {
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.Builder proxyBuilder = software.amazon.awssdk.http.nio.netty.ProxyConfiguration
.builder().scheme(asyncConfig.proxy.endpoint.get().getScheme())
.host(asyncConfig.proxy.endpoint.get().getHost())
.nonProxyHosts(new HashSet<>(asyncConfig.proxy.nonProxyHosts.orElse(Collections.emptyList())));
if (asyncConfig.proxy.endpoint.get().getPort() != -1) {
proxyBuilder.port(asyncConfig.proxy.endpoint.get().getPort());
}
builder.proxyConfiguration(proxyBuilder.build());
}
getTlsKeyManagersProvider(asyncConfig.tlsManagersProvider).ifPresent(builder::tlsKeyManagersProvider);
if (asyncConfig.eventLoop.override) {
SdkEventLoopGroup.Builder eventLoopBuilder = SdkEventLoopGroup.builder();
asyncConfig.eventLoop.numberOfThreads.ifPresent(eventLoopBuilder::numberOfThreads);
if (asyncConfig.eventLoop.threadNamePrefix.isPresent()) {
eventLoopBuilder.threadFactory(
new ThreadFactoryBuilder().threadNamePrefix(asyncConfig.eventLoop.threadNamePrefix.get()).build());
}
builder.eventLoopGroupBuilder(eventLoopBuilder);
}
return new RuntimeValue<>(builder);
}
@Test
public void execute_noExplicitValueSet_sendsDefaultValueInSettings() throws InterruptedException {
ConcurrentLinkedQueue<Http2Frame> receivedFrames = new ConcurrentLinkedQueue<>();
server = new TestH2Server(() -> new StreamHandler(receivedFrames));
server.init();
netty = NettyNioAsyncHttpClient.builder()
.protocol(Protocol.HTTP2)
.build();
AsyncExecuteRequest req = AsyncExecuteRequest.builder()
.requestContentPublisher(new EmptyPublisher())
.request(SdkHttpFullRequest.builder()
.method(SdkHttpMethod.GET)
.protocol("http")
.host("localhost")
.port(server.port())
.build())
.responseHandler(new SdkAsyncHttpResponseHandler() {
@Override
public void onHeaders(SdkHttpResponse headers) {
}
@Override
public void onStream(Publisher<ByteBuffer> stream) {
}
@Override
public void onError(Throwable error) {
}
})
.build();
netty.execute(req).join();
List<Http2Settings> receivedSettings = receivedFrames.stream()
.filter(f -> f instanceof Http2SettingsFrame)
.map(f -> (Http2SettingsFrame) f)
.map(Http2SettingsFrame::settings)
.collect(Collectors.toList());
assertThat(receivedSettings.size()).isGreaterThan(0);
for (Http2Settings s : receivedSettings) {
assertThat(s.initialWindowSize()).isEqualTo(DEFAULT_INIT_WINDOW_SIZE);
}
}
private void expectCorrectWindowSizeValueTest(Integer builderSetterValue, int settingsFrameValue) throws InterruptedException {
ConcurrentLinkedQueue<Http2Frame> receivedFrames = new ConcurrentLinkedQueue<>();
server = new TestH2Server(() -> new StreamHandler(receivedFrames));
server.init();
netty = NettyNioAsyncHttpClient.builder()
.protocol(Protocol.HTTP2)
.http2Configuration(Http2Configuration.builder()
.initialWindowSize(builderSetterValue)
.build())
.build();
AsyncExecuteRequest req = AsyncExecuteRequest.builder()
.requestContentPublisher(new EmptyPublisher())
.request(SdkHttpFullRequest.builder()
.method(SdkHttpMethod.GET)
.protocol("http")
.host("localhost")
.port(server.port())
.build())
.responseHandler(new SdkAsyncHttpResponseHandler() {
@Override
public void onHeaders(SdkHttpResponse headers) {
}
@Override
public void onStream(Publisher<ByteBuffer> stream) {
}
@Override
public void onError(Throwable error) {
}
})
.build();
netty.execute(req).join();
List<Http2Settings> receivedSettings = receivedFrames.stream()
.filter(f -> f instanceof Http2SettingsFrame)
.map(f -> (Http2SettingsFrame) f)
.map(Http2SettingsFrame::settings)
.collect(Collectors.toList());
assertThat(receivedSettings.size()).isGreaterThan(0);
for (Http2Settings s : receivedSettings) {
assertThat(s.initialWindowSize()).isEqualTo(settingsFrameValue);
}
}
private CompletableFuture<Void> makeRequest(Duration healthCheckPingPeriod) {
netty = NettyNioAsyncHttpClient.builder()
.protocol(Protocol.HTTP2)
.http2Configuration(Http2Configuration.builder().healthCheckPingPeriod(healthCheckPingPeriod).build())
.build();
SdkHttpFullRequest request = SdkHttpFullRequest.builder()
.protocol("http")
.host("localhost")
.port(server.port())
.method(SdkHttpMethod.GET)
.build();
AsyncExecuteRequest executeRequest = AsyncExecuteRequest.builder()
.fullDuplex(false)
.request(request)
.requestContentPublisher(new EmptyPublisher())
.responseHandler(new SdkAsyncHttpResponseHandler() {
@Override
public void onHeaders(SdkHttpResponse headers) {
}
@Override
public void onStream(Publisher<ByteBuffer> stream) {
stream.subscribe(new Subscriber<ByteBuffer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(ByteBuffer byteBuffer) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void onError(Throwable error) {
}
})
.build();
return netty.execute(executeRequest);
}
public static KinesisAsyncClientBuilder adjustKinesisClientBuilder(KinesisAsyncClientBuilder builder) {
return builder.httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(Integer.MAX_VALUE)
.http2Configuration(Http2Configuration.builder().initialWindowSize(INITIAL_WINDOW_SIZE_BYTES)
.healthCheckPingPeriod(Duration.ofMillis(HEALTH_CHECK_PING_PERIOD_MILLIS)).build())
.protocol(Protocol.HTTP2));
}