下面列出了 io.grpc.internal.testing.StreamRecorder 类的 API 用法示例代码；也可以点击链接到 GitHub 查看完整源代码。
/**
 * Opens a client-streaming call and cancels it before any request message is sent, then checks
 * that the client sees no responses and a {@code CANCELLED} status.
 */
@Test
public void cancelAfterBegin() throws Exception {
  StreamRecorder<StreamingInputCallResponse> recorder = StreamRecorder.create();
  StreamObserver<StreamingInputCallRequest> requestStream =
      asyncStub.streamingInputCall(recorder);

  // Reporting an error on the request stream is what triggers the cancellation checked below.
  requestStream.onError(new RuntimeException());
  recorder.awaitCompletion();

  assertEquals(Arrays.<StreamingInputCallResponse>asList(), recorder.getValues());
  Status observedStatus = Status.fromThrowable(recorder.getError());
  assertEquals(Status.Code.CANCELLED, observedStatus.getCode());

  if (!metricsExpected()) {
    return;
  }

  MetricsRecord startRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
  checkStartTags(startRecord, "grpc.testing.TestService/StreamingInputCall");
  // The final status is recorded by CensusStreamTracerModule in the interceptor, so it is always
  // available. Tracer stats need the stream to have been created, which this test cannot
  // guarantee, so they are deliberately not checked.
  MetricsRecord endRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
  checkEndTags(
      endRecord,
      "grpc.testing.TestService/StreamingInputCall",
      Status.CANCELLED.getCode());
  // Server-side metrics are skipped: the status the server ends up with is undetermined.
}
/**
 * Starts a full-duplex call, waits for the first response, then cancels from the client side and
 * verifies that the call ends with {@code CANCELLED} after exactly one response was delivered.
 */
@Test
public void cancelAfterFirstResponse() throws Exception {
  final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
      .addResponseParameters(ResponseParameters.newBuilder()
          .setSize(31415))
      .setPayload(Payload.newBuilder()
          .setBody(ByteString.copyFrom(new byte[27182])))
      .build();
  final StreamingOutputCallResponse goldenResponse = StreamingOutputCallResponse.newBuilder()
      .setPayload(Payload.newBuilder()
          .setBody(ByteString.copyFrom(new byte[31415])))
      .build();
  StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create();
  StreamObserver<StreamingOutputCallRequest> requestObserver
      = asyncStub.fullDuplexCall(responseObserver);
  requestObserver.onNext(request);
  assertResponse(goldenResponse, responseObserver.firstValue().get());
  // Reporting an error on the request stream is what triggers the cancellation checked below.
  requestObserver.onError(new RuntimeException());
  // Assert the await result so that a timeout fails right here instead of surfacing later as a
  // confusing failure on a missing error.
  assertTrue(responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
  assertEquals(1, responseObserver.getValues().size());
  assertEquals(Status.Code.CANCELLED,
      Status.fromThrowable(responseObserver.getError()).getCode());
  assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.CANCELLED);
}
/**
 * Asks the reflection service for the file containing a nested symbol and expects a single
 * response carrying the depth-three test proto descriptor.
 */
@Test
public void fileContainingNestedSymbol() throws Exception {
  ServerReflectionRequest symbolRequest = ServerReflectionRequest.newBuilder()
      .setHost(TEST_HOST)
      .setFileContainingSymbol("grpc.reflection.testing.NestedTypeOuter.Middle.Inner")
      .build();

  FileDescriptorResponse expectedDescriptors = FileDescriptorResponse.newBuilder()
      .addFileDescriptorProto(
          ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString())
      .build();
  ServerReflectionResponse expectedResponse = ServerReflectionResponse.newBuilder()
      .setValidHost(TEST_HOST)
      .setOriginalRequest(symbolRequest)
      .setFileDescriptorResponse(expectedDescriptors)
      .build();

  StreamRecorder<ServerReflectionResponse> recorder = StreamRecorder.create();
  StreamObserver<ServerReflectionRequest> requestStream = stub.serverReflectionInfo(recorder);
  requestStream.onNext(symbolRequest);
  requestStream.onCompleted();

  ServerReflectionResponse actualResponse = recorder.firstValue().get();
  assertEquals(expectedResponse, actualResponse);
}
/**
 * Requests all extension numbers of {@code ThirdLevelType} and expects exactly 100 and 101,
 * compared as a set so that ordering is irrelevant.
 */
@Test
public void allExtensionNumbersOfType() throws Exception {
  ServerReflectionRequest extensionRequest = ServerReflectionRequest.newBuilder()
      .setHost(TEST_HOST)
      .setAllExtensionNumbersOfType("grpc.reflection.testing.ThirdLevelType")
      .build();
  Set<Integer> expectedNumbers = new HashSet<Integer>(Arrays.asList(100, 101));

  StreamRecorder<ServerReflectionResponse> recorder = StreamRecorder.create();
  StreamObserver<ServerReflectionRequest> requestStream = stub.serverReflectionInfo(recorder);
  requestStream.onNext(extensionRequest);
  requestStream.onCompleted();

  ServerReflectionResponse firstResponse = recorder.firstValue().get();
  Set<Integer> actualNumbers =
      new HashSet<Integer>(
          firstResponse.getAllExtensionNumbersResponse().getExtensionNumberList());
  assertEquals(expectedNumbers, actualNumbers);
}
/**
 * With the authorization provider stubbed to report a failed access check, applying a feature
 * set must throw {@code AccessDeniedException}.
 */
@Test
void cantApplyFeatureSetIfNotProjectMember() throws InvalidProtocolBufferException {
  // Install a mocked security context that hands out a mocked authentication.
  Authentication authentication = mock(Authentication.class);
  SecurityContext securityContext = mock(SecurityContext.class);
  SecurityContextHolder.setContext(securityContext);
  when(securityContext.getAuthentication()).thenReturn(authentication);

  // Every access check is stubbed to fail.
  doReturn(AuthorizationResult.failed(null))
      .when(authProvider)
      .checkAccess(anyString(), any(Authentication.class));

  StreamRecorder<ApplyFeatureSetResponse> recorder = StreamRecorder.create();

  // Build a request that carries only the spec of the dummy feature set.
  final String projectName = "project1";
  FeatureSetProto.FeatureSet dummyFeatureSet =
      newDummyFeatureSet("f2", 1, projectName).toProto();
  FeatureSetProto.FeatureSetSpec specOnly = dummyFeatureSet.getSpec().toBuilder().build();
  ApplyFeatureSetRequest applyRequest = ApplyFeatureSetRequest.newBuilder()
      .setFeatureSet(FeatureSetProto.FeatureSet.newBuilder().setSpec(specOnly).build())
      .build();

  assertThrows(
      AccessDeniedException.class,
      () -> coreService.applyFeatureSet(applyRequest, recorder));
}
/**
 * With the authorization provider stubbed to report success, applying a feature set must not
 * throw. The test makes no further assertion on the recorded response.
 */
@Test
void canApplyFeatureSetIfProjectMember() throws InvalidProtocolBufferException {
  // Install a mocked security context that hands out a mocked authentication.
  Authentication authentication = mock(Authentication.class);
  SecurityContext securityContext = mock(SecurityContext.class);
  SecurityContextHolder.setContext(securityContext);
  when(securityContext.getAuthentication()).thenReturn(authentication);

  // Every access check is stubbed to succeed.
  doReturn(AuthorizationResult.success())
      .when(authProvider)
      .checkAccess(anyString(), any(Authentication.class));

  StreamRecorder<ApplyFeatureSetResponse> recorder = StreamRecorder.create();

  // Build a request that carries only the spec of the dummy feature set.
  final String projectName = "project1";
  FeatureSetProto.FeatureSet dummyFeatureSet =
      newDummyFeatureSet("f2", 1, projectName).toProto();
  FeatureSetProto.FeatureSetSpec specOnly = dummyFeatureSet.getSpec().toBuilder().build();
  ApplyFeatureSetRequest applyRequest = ApplyFeatureSetRequest.newBuilder()
      .setFeatureSet(FeatureSetProto.FeatureSet.newBuilder().setSpec(specOnly).build())
      .build();

  coreService.applyFeatureSet(applyRequest, recorder);
}
/**
 * Requests all extension numbers of {@code ThirdLevelType} and expects exactly 100 and 101,
 * compared as a set so that ordering is irrelevant.
 */
@Test
public void allExtensionNumbersOfType() throws Exception {
  ServerReflectionRequest extensionRequest = ServerReflectionRequest.newBuilder()
      .setHost(TEST_HOST)
      .setAllExtensionNumbersOfType("grpc.reflection.testing.ThirdLevelType")
      .build();
  Set<Integer> expectedNumbers = new HashSet<>(Arrays.asList(100, 101));

  StreamRecorder<ServerReflectionResponse> recorder = StreamRecorder.create();
  StreamObserver<ServerReflectionRequest> requestStream = stub.serverReflectionInfo(recorder);
  requestStream.onNext(extensionRequest);
  requestStream.onCompleted();

  ServerReflectionResponse firstResponse = recorder.firstValue().get();
  Set<Integer> actualNumbers =
      new HashSet<>(firstResponse.getAllExtensionNumbersResponse().getExtensionNumberList());
  assertEquals(expectedNumbers, actualNumbers);
}
/**
 * Points the service under test at a temporary license file, initializes it, and checks that
 * {@code getAccount} answers with one account whose password policy carries the expected
 * maximum workload password lifetime.
 *
 * @throws IOException if the temporary license file cannot be created, written or removed
 */
@Test
public void testGetAccountIncludesPasswordPolicy() throws IOException {
  // Use a proper ".txt" suffix; without the dot the generated name ends in "...txt".
  Path licenseFilePath = Files.createTempFile("license", ".txt");
  // Enter the try block right after creating the file so it is removed even when the setup
  // below (writing the license, injecting the path) fails.
  try {
    Files.writeString(licenseFilePath, VALID_LICENSE);
    ReflectionTestUtils.setField(underTest, "cmLicenseFilePath", licenseFilePath.toString());
    underTest.init();
    GetAccountRequest req = GetAccountRequest.getDefaultInstance();
    StreamRecorder<GetAccountResponse> observer = StreamRecorder.create();
    underTest.getAccount(req, observer);
    assertThat(observer.getValues().size()).isEqualTo(1);
    GetAccountResponse res = observer.getValues().get(0);
    assertThat(res.hasAccount()).isTrue();
    Account account = res.getAccount();
    assertThat(account.hasPasswordPolicy()).isTrue();
    WorkloadPasswordPolicy passwordPolicy = account.getPasswordPolicy();
    assertThat(passwordPolicy.getWorkloadPasswordMaxLifetime())
        .isEqualTo(MockUserManagementService.PASSWORD_LIFETIME);
  } finally {
    // deleteIfExists avoids masking the real test failure if the file is already gone.
    Files.deleteIfExists(licenseFilePath);
  }
}
/**
 * Calls the {@code unimplemented} method through an ad-hoc blocking stub and through the async,
 * blocking and future stubs, expecting {@code UNIMPLEMENTED} from each of them.
 */
@Test
@DirtiesContext
public void testFailingCall() {
  log.info("--- Starting tests with failing call ---");
  final Empty emptyRequest = Empty.getDefaultInstance();

  // Blocking stub created directly on the channel.
  assertThrowsStatus(UNIMPLEMENTED,
      () -> TestServiceGrpc.newBlockingStub(this.channel).unimplemented(emptyRequest));

  // Async stub: the failure is observed through the recorder's first-value future.
  final StreamRecorder<SomeType> asyncResponses = StreamRecorder.create();
  this.testServiceStub.unimplemented(emptyRequest, asyncResponses);
  assertFutureThrowsStatus(UNIMPLEMENTED, asyncResponses.firstValue(), 5, TimeUnit.SECONDS);

  // Blocking stub field.
  assertThrowsStatus(UNIMPLEMENTED,
      () -> this.testServiceBlockingStub.unimplemented(emptyRequest));

  // Future stub field.
  assertFutureThrowsStatus(UNIMPLEMENTED,
      this.testServiceFutureStub.unimplemented(emptyRequest),
      5, TimeUnit.SECONDS);
  log.info("--- Test completed ---");
}
/**
 * Calls the {@code normal} method through an ad-hoc blocking stub and through the async,
 * blocking and future stubs, expecting {@code UNAVAILABLE} from each of them.
 */
@Test
@DirtiesContext
public void testSuccessfulCallWithBrokenSetup() {
  log.info("--- Starting tests with successful call with broken setup ---");
  final Empty emptyRequest = Empty.getDefaultInstance();

  // Blocking stub created directly on the channel.
  assertThrowsStatus(UNAVAILABLE,
      () -> TestServiceGrpc.newBlockingStub(this.channel).normal(emptyRequest));

  // Async stub: the failure is observed through the recorder's first-value future.
  final StreamRecorder<SomeType> asyncResponses = StreamRecorder.create();
  this.testServiceStub.normal(emptyRequest, asyncResponses);
  assertFutureThrowsStatus(UNAVAILABLE, asyncResponses.firstValue(), 5, TimeUnit.SECONDS);

  // Blocking stub field.
  assertThrowsStatus(UNAVAILABLE, () -> this.testServiceBlockingStub.normal(emptyRequest));

  // Future stub field.
  assertFutureThrowsStatus(UNAVAILABLE,
      this.testServiceFutureStub.normal(emptyRequest),
      5, TimeUnit.SECONDS);
  log.info("--- Test completed ---");
}
/**
 * Requests 1000 one-byte responses on a full-duplex call, aborts the call from the client after
 * the first response arrives, and then checks that a follow-up unary call still succeeds.
 */
@Test
public void receivedDataForFinishedStream() throws Exception {
  Messages.ResponseParameters.Builder oneByteResponse =
      Messages.ResponseParameters.newBuilder().setSize(1);
  Messages.StreamingOutputCallRequest.Builder manyResponses =
      Messages.StreamingOutputCallRequest.newBuilder();
  int remaining = 1000;
  while (remaining > 0) {
    manyResponses.addResponseParameters(oneByteResponse);
    remaining--;
  }

  StreamRecorder<Messages.StreamingOutputCallResponse> responses = StreamRecorder.create();
  StreamObserver<Messages.StreamingOutputCallRequest> requestObserver =
      asyncStub.fullDuplexCall(responses);
  Messages.StreamingOutputCallRequest bigRequest = manyResponses.build();
  requestObserver.onNext(bigRequest);

  // Block until the first response is in, then abort the call from the client side.
  responses.firstValue().get();
  requestObserver.onError(new Exception("failed"));
  responses.awaitCompletion();

  // A unary call issued after the aborted stream must still succeed.
  assertEquals(EMPTY, blockingStub.emptyCall(EMPTY));
}
/**
 * Starts a full-duplex call, waits for the first response, then cancels from the client side and
 * verifies that the call ends with {@code CANCELLED} after exactly one response was delivered.
 */
@Test
public void cancelAfterFirstResponse() throws Exception {
  final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
      .addResponseParameters(ResponseParameters.newBuilder()
          .setSize(31415))
      .setPayload(Payload.newBuilder()
          .setBody(ByteString.copyFrom(new byte[27182])))
      .build();
  final StreamingOutputCallResponse goldenResponse = StreamingOutputCallResponse.newBuilder()
      .setPayload(Payload.newBuilder()
          .setBody(ByteString.copyFrom(new byte[31415])))
      .build();
  StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create();
  StreamObserver<StreamingOutputCallRequest> requestObserver
      = asyncStub.fullDuplexCall(responseObserver);
  requestObserver.onNext(request);
  assertResponse(goldenResponse, responseObserver.firstValue().get());
  // Reporting an error on the request stream is what triggers the cancellation checked below.
  requestObserver.onError(new RuntimeException());
  // Assert the await result so that a timeout fails right here instead of surfacing later as a
  // confusing failure on a missing error.
  assertTrue(responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
  assertEquals(1, responseObserver.getValues().size());
  assertEquals(Status.Code.CANCELLED,
      Status.fromThrowable(responseObserver.getError()).getCode());
  assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.CANCELLED);
}
/**
 * Checks that the account returned by {@code getAccount} lists a fixed set of entitlement names.
 */
@Test
void getAccountTestIncludesFixedEntitlements() {
  ReflectionTestUtils.setField(underTest, "cbLicense", VALID_LICENSE);
  underTest.initializeWorkloadPasswordPolicy();

  StreamRecorder<GetAccountResponse> recorder = StreamRecorder.create();
  underTest.getAccount(GetAccountRequest.getDefaultInstance(), recorder);

  // Exactly one response is expected, and it must carry an account.
  assertThat(recorder.getValues().size()).isEqualTo(1);
  GetAccountResponse response = recorder.getValues().get(0);
  assertThat(response.hasAccount()).isTrue();

  List<String> entitlementNames = response.getAccount().getEntitlementsList()
      .stream()
      .map(Entitlement::getEntitlementName)
      .collect(Collectors.toList());
  assertThat(entitlementNames).contains(
      "CDP_AZURE",
      "CDP_AUTOMATIC_USERSYNC_POLLER",
      "CLOUDERA_INTERNAL_ACCOUNT",
      "DATAHUB_AZURE_AUTOSCALING",
      "DATAHUB_AWS_AUTOSCALING",
      "LOCAL_DEV");
}
/**
 * Calls the {@code unimplemented} method through an ad-hoc blocking stub and through the async,
 * blocking and future stubs, expecting {@code UNIMPLEMENTED} from each of them.
 */
@Test
@DirtiesContext
public void testFailingCall() {
  log.info("--- Starting tests with failing call ---");
  final Empty emptyRequest = Empty.getDefaultInstance();

  // Blocking stub created directly on the channel.
  assertThrowsStatus(UNIMPLEMENTED,
      () -> TestServiceGrpc.newBlockingStub(this.channel).unimplemented(emptyRequest));

  // Async stub: the failure is observed through the recorder's first-value future.
  final StreamRecorder<SomeType> asyncResponses = StreamRecorder.create();
  this.testServiceStub.unimplemented(emptyRequest, asyncResponses);
  assertFutureThrowsStatus(UNIMPLEMENTED, asyncResponses.firstValue(), 5, TimeUnit.SECONDS);

  // Blocking stub field.
  assertThrowsStatus(UNIMPLEMENTED,
      () -> this.testServiceBlockingStub.unimplemented(emptyRequest));

  // Future stub field.
  assertFutureThrowsStatus(UNIMPLEMENTED,
      this.testServiceFutureStub.unimplemented(emptyRequest),
      5, TimeUnit.SECONDS);
  log.info("--- Test completed ---");
}
/**
 * Calls the {@code unimplemented} method through an ad-hoc blocking stub and through the async,
 * blocking and future stubs, expecting {@code UNAVAILABLE} from each of them.
 */
@Test
@DirtiesContext
public void testFailingCallWithBrokenSetup() {
  log.info("--- Starting tests with failing call with broken setup ---");
  final Empty emptyRequest = Empty.getDefaultInstance();

  // Blocking stub created directly on the channel.
  assertThrowsStatus(UNAVAILABLE,
      () -> TestServiceGrpc.newBlockingStub(this.channel).unimplemented(emptyRequest));

  // Async stub: the failure is observed through the recorder's first-value future.
  final StreamRecorder<SomeType> asyncResponses = StreamRecorder.create();
  this.testServiceStub.unimplemented(emptyRequest, asyncResponses);
  assertFutureThrowsStatus(UNAVAILABLE, asyncResponses.firstValue(), 5, TimeUnit.SECONDS);

  // Blocking stub field.
  assertThrowsStatus(UNAVAILABLE,
      () -> this.testServiceBlockingStub.unimplemented(emptyRequest));

  // Future stub field.
  assertFutureThrowsStatus(UNAVAILABLE,
      this.testServiceFutureStub.unimplemented(emptyRequest),
      5, TimeUnit.SECONDS);
  log.info("--- Test completed ---");
}
/**
 * Sets a boolean field on the service under test and checks whether the given entitlement is
 * present in, or absent from, the account returned by {@code getAccount}.
 *
 * @param testCaseName display name of the parameterized case
 * @param conditionFieldName name of the field on the service that is set reflectively
 * @param condition value assigned to that field
 * @param entitlementName entitlement whose presence is checked
 * @param entitlementPresentExpected whether the entitlement is expected to be listed
 */
@ParameterizedTest(name = "{0}")
@MethodSource("conditionalEntitlementDataProvider")
void getAccountTestIncludesConditionalEntitlement(String testCaseName, String conditionFieldName, boolean condition, String entitlementName,
    boolean entitlementPresentExpected) {
  ReflectionTestUtils.setField(underTest, "cbLicense", VALID_LICENSE);
  underTest.initializeWorkloadPasswordPolicy();
  ReflectionTestUtils.setField(underTest, conditionFieldName, condition);

  StreamRecorder<GetAccountResponse> recorder = StreamRecorder.create();
  underTest.getAccount(GetAccountRequest.getDefaultInstance(), recorder);

  // Exactly one response is expected, and it must carry an account.
  assertThat(recorder.getValues().size()).isEqualTo(1);
  GetAccountResponse response = recorder.getValues().get(0);
  assertThat(response.hasAccount()).isTrue();

  List<String> entitlementNames = response.getAccount().getEntitlementsList()
      .stream()
      .map(Entitlement::getEntitlementName)
      .collect(Collectors.toList());

  if (!entitlementPresentExpected) {
    assertThat(entitlementNames).doesNotContain(entitlementName);
  } else {
    assertThat(entitlementNames).contains(entitlementName);
  }
}
/**
 * Streams four requests with payloads of 27182, 8, 1828 and 45904 bytes and expects a single
 * response reporting their aggregated size (74922), with no error recorded.
 */
@Test
public void clientStreaming() throws Exception {
  final StreamingInputCallRequest first = StreamingInputCallRequest.newBuilder()
      .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182])))
      .build();
  final StreamingInputCallRequest second = StreamingInputCallRequest.newBuilder()
      .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[8])))
      .build();
  final StreamingInputCallRequest third = StreamingInputCallRequest.newBuilder()
      .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[1828])))
      .build();
  final StreamingInputCallRequest fourth = StreamingInputCallRequest.newBuilder()
      .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904])))
      .build();
  final List<StreamingInputCallRequest> requests = Arrays.asList(first, second, third, fourth);

  final StreamingInputCallResponse expectedResponse = StreamingInputCallResponse.newBuilder()
      .setAggregatedPayloadSize(74922)
      .build();

  StreamRecorder<StreamingInputCallResponse> recorder = StreamRecorder.create();
  StreamObserver<StreamingInputCallRequest> requestStream =
      asyncStub.streamingInputCall(recorder);
  for (int index = 0; index < requests.size(); index++) {
    requestStream.onNext(requests.get(index));
  }
  requestStream.onCompleted();

  assertEquals(expectedResponse, recorder.firstValue().get());
  recorder.awaitCompletion();
  assertThat(recorder.getValues()).hasSize(1);

  // Surface any recorded error as the cause of the test failure.
  Throwable recordedError = recorder.getError();
  if (recordedError != null) {
    throw new AssertionError(recordedError);
  }
}
/**
 * Opens a full-duplex call, closes the request stream without sending anything, and verifies
 * that the call completes successfully within the operation timeout with no responses.
 */
@Test
public void emptyStream() throws Exception {
  StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create();
  StreamObserver<StreamingOutputCallRequest> requestObserver
      = asyncStub.fullDuplexCall(responseObserver);
  requestObserver.onCompleted();
  // Previously nothing was asserted, so the test passed even if the call hung or failed.
  assertTrue(responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
  assertSuccess(responseObserver);
  assertEquals(0, responseObserver.getValues().size());
}
/**
 * Sends ten identical full-duplex requests, each asking for responses of 50, 100, 150 and 200
 * bytes, and verifies the number and sizes of the responses as well as the stats trace.
 */
@Test
public void fullDuplexCallShouldSucceed() throws Exception {
  // One request asks for one response per configured size.
  List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
  StreamingOutputCallRequest.Builder requestBuilder = StreamingOutputCallRequest.newBuilder();
  for (int sizeIndex = 0; sizeIndex < responseSizes.size(); sizeIndex++) {
    Integer size = responseSizes.get(sizeIndex);
    requestBuilder.addResponseParameters(
        ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
  }
  final StreamingOutputCallRequest request = requestBuilder.build();

  StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
  StreamObserver<StreamingOutputCallRequest> requestStream =
      asyncStub.fullDuplexCall(recorder);

  // Send the same request repeatedly, remembering what was sent for the stats check.
  final int numRequests = 10;
  List<StreamingOutputCallRequest> sentRequests = new ArrayList<>(numRequests);
  for (int sent = 0; sent < numRequests; sent++) {
    sentRequests.add(request);
    requestStream.onNext(request);
  }
  requestStream.onCompleted();
  recorder.awaitCompletion();
  assertSuccess(recorder);

  // Responses are expected to cycle through the requested sizes in order.
  assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
  int position = 0;
  while (position < recorder.getValues().size()) {
    StreamingOutputCallResponse response = recorder.getValues().get(position);
    int actualSize = response.getPayload().getBody().size();
    int expectedSize = responseSizes.get(position % responseSizes.size());
    assertEquals("comparison failed at index " + position, expectedSize, actualSize);
    position++;
  }

  assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.OK, sentRequests,
      recorder.getValues());
}
/**
 * Sends ten identical half-duplex requests, each asking for responses of 50, 100, 150 and 200
 * bytes, and verifies the number and sizes of the responses.
 */
@Test
public void halfDuplexCallShouldSucceed() throws Exception {
  // One request asks for one response per configured size.
  List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
  StreamingOutputCallRequest.Builder requestBuilder = StreamingOutputCallRequest.newBuilder();
  for (int sizeIndex = 0; sizeIndex < responseSizes.size(); sizeIndex++) {
    Integer size = responseSizes.get(sizeIndex);
    requestBuilder.addResponseParameters(
        ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
  }
  final StreamingOutputCallRequest request = requestBuilder.build();

  StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
  StreamObserver<StreamingOutputCallRequest> requestStream = asyncStub.halfDuplexCall(recorder);

  final int numRequests = 10;
  for (int sent = 0; sent < numRequests; sent++) {
    requestStream.onNext(request);
  }
  requestStream.onCompleted();
  recorder.awaitCompletion();
  assertSuccess(recorder);

  // Responses are expected to cycle through the requested sizes in order.
  assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
  int position = 0;
  while (position < recorder.getValues().size()) {
    StreamingOutputCallResponse response = recorder.getValues().get(position);
    int actualSize = response.getPayload().getBody().size();
    int expectedSize = responseSizes.get(position % responseSizes.size());
    assertEquals("comparison failed at index " + position, expectedSize, actualSize);
    position++;
  }
}
/**
 * Sends ten identical half-duplex requests, each asking for responses of 50, 100, 150 and 200
 * bytes, and verifies the number and sizes of the responses.
 */
@Test
public void halfDuplexCallShouldSucceed() throws Exception {
  // One request asks for one response per configured size.
  List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
  StreamingOutputCallRequest.Builder requestBuilder = StreamingOutputCallRequest.newBuilder();
  for (int sizeIndex = 0; sizeIndex < responseSizes.size(); sizeIndex++) {
    Integer size = responseSizes.get(sizeIndex);
    requestBuilder.addResponseParameters(
        ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
  }
  final StreamingOutputCallRequest request = requestBuilder.build();

  StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
  StreamObserver<StreamingOutputCallRequest> requestStream = asyncStub.halfDuplexCall(recorder);

  final int numRequests = 10;
  for (int sent = 0; sent < numRequests; sent++) {
    requestStream.onNext(request);
  }
  requestStream.onCompleted();
  recorder.awaitCompletion();
  assertSuccess(recorder);

  // Responses are expected to cycle through the requested sizes in order.
  assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
  int position = 0;
  while (position < recorder.getValues().size()) {
    StreamingOutputCallResponse response = recorder.getValues().get(position);
    int actualSize = response.getPayload().getBody().size();
    int expectedSize = responseSizes.get(position % responseSizes.size());
    assertEquals("comparison failed at index " + position, expectedSize, actualSize);
    position++;
  }
}
/**
 * Starts a fullDuplexCall with a 1 ms deadline and a request that asks for no responses, then
 * verifies that the call ends with {@code DEADLINE_EXCEEDED} and delivers no values.
 */
@SuppressWarnings("MissingFail")
@Test
public void timeoutOnSleepingServer() throws Exception {
  TestServiceGrpc.TestServiceStub deadlineStub =
      asyncStub.withDeadlineAfter(1, TimeUnit.MILLISECONDS);
  StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
  StreamObserver<StreamingOutputCallRequest> requestStream =
      deadlineStub.fullDuplexCall(recorder);

  StreamingOutputCallRequest payloadOnlyRequest = StreamingOutputCallRequest.newBuilder()
      .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182])))
      .build();
  try {
    requestStream.onNext(payloadOnlyRequest);
  } catch (IllegalStateException expected) {
    // The deadline may already have terminated the stream, in which case sending is rejected.
  }

  assertTrue(recorder.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
  assertEquals(0, recorder.getValues().size());
  Status observedStatus = Status.fromThrowable(recorder.getError());
  assertEquals(Status.DEADLINE_EXCEEDED.getCode(), observedStatus.getCode());

  if (!metricsExpected()) {
    return;
  }

  // The final status is recorded by CensusStreamTracerModule in the interceptor, so it is always
  // available. Tracer stats need the stream to have been created, which this test cannot
  // guarantee, so they are not checked.
  MetricsRecord startRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
  checkStartTags(startRecord, "grpc.testing.TestService/FullDuplexCall");
  MetricsRecord endRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
  checkEndTags(
      endRecord,
      "grpc.testing.TestService/FullDuplexCall",
      Status.DEADLINE_EXCEEDED.getCode());
}
/**
 * Asks the reflection service for the file containing a service method symbol and expects the
 * four test proto descriptors to come back, in any order.
 */
@Test
public void fileContainingSymbol() throws Exception {
  ServerReflectionRequest symbolRequest = ServerReflectionRequest.newBuilder()
      .setHost(TEST_HOST)
      .setFileContainingSymbol("grpc.reflection.testing.ReflectableService.Method")
      .build();

  List<ByteString> expectedFiles = Arrays.asList(
      ReflectionTestProto.getDescriptor().toProto().toByteString(),
      ReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString(),
      ReflectionTestDepthTwoAlternateProto.getDescriptor().toProto().toByteString(),
      ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString());

  StreamRecorder<ServerReflectionResponse> recorder = StreamRecorder.create();
  StreamObserver<ServerReflectionRequest> requestStream = stub.serverReflectionInfo(recorder);
  requestStream.onNext(symbolRequest);
  requestStream.onCompleted();

  ServerReflectionResponse firstResponse = recorder.firstValue().get();
  List<ByteString> actualFiles =
      firstResponse.getFileDescriptorResponse().getFileDescriptorProtoList();

  // Compare the sizes first, then the contents as sets, so that ordering does not matter.
  assertEquals(expectedFiles.size(), actualFiles.size());
  assertEquals(new HashSet<ByteString>(expectedFiles), new HashSet<ByteString>(actualFiles));
}
/**
 * Asks the reflection service for the file containing extension 100 of {@code ThirdLevelType}
 * and expects the four test proto descriptors to come back, in any order.
 */
@Test
public void fileContainingExtension() throws Exception {
  ExtensionRequest extension = ExtensionRequest.newBuilder()
      .setContainingType("grpc.reflection.testing.ThirdLevelType")
      .setExtensionNumber(100)
      .build();
  ServerReflectionRequest extensionRequest = ServerReflectionRequest.newBuilder()
      .setHost(TEST_HOST)
      .setFileContainingExtension(extension)
      .build();

  List<ByteString> expectedFiles = Arrays.asList(
      ReflectionTestProto.getDescriptor().toProto().toByteString(),
      ReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString(),
      ReflectionTestDepthTwoAlternateProto.getDescriptor().toProto().toByteString(),
      ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString());

  StreamRecorder<ServerReflectionResponse> recorder = StreamRecorder.create();
  StreamObserver<ServerReflectionRequest> requestStream = stub.serverReflectionInfo(recorder);
  requestStream.onNext(extensionRequest);
  requestStream.onCompleted();

  ServerReflectionResponse firstResponse = recorder.firstValue().get();
  List<ByteString> actualFiles =
      firstResponse.getFileDescriptorResponse().getFileDescriptorProtoList();

  // Compare the sizes first, then the contents as sets, so that ordering does not matter.
  assertEquals(expectedFiles.size(), actualFiles.size());
  assertEquals(new HashSet<ByteString>(expectedFiles), new HashSet<ByteString>(actualFiles));
}
/**
 * Asks the reflection service for the file containing extension 101 of {@code ThirdLevelType}
 * and expects a response with the depth-two and depth-three descriptors, in that order.
 */
@Test
public void fileContainingNestedExtension() throws Exception {
  ExtensionRequest extension = ExtensionRequest.newBuilder()
      .setContainingType("grpc.reflection.testing.ThirdLevelType")
      .setExtensionNumber(101)
      .build();
  ServerReflectionRequest extensionRequest = ServerReflectionRequest.newBuilder()
      .setHost(TEST_HOST)
      .setFileContainingExtension(extension)
      .build();

  FileDescriptorResponse expectedDescriptors = FileDescriptorResponse.newBuilder()
      .addFileDescriptorProto(
          ReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString())
      .addFileDescriptorProto(
          ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString())
      .build();
  ServerReflectionResponse expectedResponse = ServerReflectionResponse.newBuilder()
      .setValidHost(TEST_HOST)
      .setOriginalRequest(extensionRequest)
      .setFileDescriptorResponse(expectedDescriptors)
      .build();

  StreamRecorder<ServerReflectionResponse> recorder = StreamRecorder.create();
  StreamObserver<ServerReflectionRequest> requestStream = stub.serverReflectionInfo(recorder);
  requestStream.onNext(extensionRequest);
  requestStream.onCompleted();

  ServerReflectionResponse actualResponse = recorder.firstValue().get();
  assertEquals(expectedResponse, actualResponse);
}
/**
 * Points the service under test at a temporary SSH public key file, initializes the actor
 * workload credentials, and checks the credentials returned for a randomly generated user CRN:
 * a password hash, two Kerberos keys, an expiration date later than the time captured before
 * the call, and the sample SSH public key.
 *
 * @throws IOException if the temporary key file cannot be created, written or removed
 */
@Test
public void testGetWorkloadCredentials() throws IOException {
  Path sshPublicKeyFilePath = Files.createTempFile("key", ".pub");
  // Enter the try block right after creating the file so it is removed even when the setup
  // below (writing the key, injecting the path) fails.
  try {
    Files.writeString(sshPublicKeyFilePath, SAMPLE_SSH_PUBLIC_KEY);
    ReflectionTestUtils.setField(underTest, "sshPublicKeyFilePath", sshPublicKeyFilePath.toString());
    underTest.initializeActorWorkloadCredentials();
    long currentTime = System.currentTimeMillis();
    GetActorWorkloadCredentialsRequest req = GetActorWorkloadCredentialsRequest.newBuilder()
        .setActorCrn(Crn.builder()
            .setPartition(Crn.Partition.CDP)
            .setAccountId(UUID.randomUUID().toString())
            .setService(Crn.Service.IAM)
            .setResourceType(Crn.ResourceType.USER)
            .setResource(UUID.randomUUID().toString())
            .build().toString())
        .build();
    StreamRecorder<GetActorWorkloadCredentialsResponse> observer = StreamRecorder.create();
    underTest.getActorWorkloadCredentials(req, observer);
    assertThat(observer.getValues().size()).isEqualTo(1);
    GetActorWorkloadCredentialsResponse res = observer.getValues().get(0);
    assertThat(res).isNotNull();
    assertThat(res.getPasswordHash()).isNotNull();
    assertThat(res.getKerberosKeysList()).isNotNull();
    assertThat(res.getKerberosKeysList().size()).isEqualTo(2);
    // Compare the values directly so a failure reports both numbers instead of "expected true".
    assertThat(res.getPasswordHashExpirationDate()).isGreaterThan(currentTime);
    assertThat(res.getSshPublicKeyCount()).isEqualTo(1);
    assertThat(res.getSshPublicKey(0).getPublicKey()).isEqualTo(SAMPLE_SSH_PUBLIC_KEY);
  } finally {
    // deleteIfExists avoids masking the real test failure if the file is already gone.
    Files.deleteIfExists(sshPublicKeyFilePath);
  }
}
/**
 * Sends ten identical full-duplex requests, each asking for responses of 50, 100, 150 and 200
 * bytes, and verifies the number and sizes of the responses as well as the stats trace.
 */
@Test
public void fullDuplexCallShouldSucceed() throws Exception {
  // One request asks for one response per configured size.
  List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
  StreamingOutputCallRequest.Builder requestBuilder = StreamingOutputCallRequest.newBuilder();
  for (int sizeIndex = 0; sizeIndex < responseSizes.size(); sizeIndex++) {
    Integer size = responseSizes.get(sizeIndex);
    requestBuilder.addResponseParameters(
        ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
  }
  final StreamingOutputCallRequest request = requestBuilder.build();

  StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
  StreamObserver<StreamingOutputCallRequest> requestStream =
      asyncStub.fullDuplexCall(recorder);

  // Send the same request repeatedly, remembering what was sent for the stats check.
  final int numRequests = 10;
  List<StreamingOutputCallRequest> sentRequests = new ArrayList<>(numRequests);
  for (int sent = 0; sent < numRequests; sent++) {
    sentRequests.add(request);
    requestStream.onNext(request);
  }
  requestStream.onCompleted();
  recorder.awaitCompletion();
  assertSuccess(recorder);

  // Responses are expected to cycle through the requested sizes in order.
  assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
  int position = 0;
  while (position < recorder.getValues().size()) {
    StreamingOutputCallResponse response = recorder.getValues().get(position);
    int actualSize = response.getPayload().getBody().size();
    int expectedSize = responseSizes.get(position % responseSizes.size());
    assertEquals("comparison failed at index " + position, expectedSize, actualSize);
    position++;
  }

  assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.OK, sentRequests,
      recorder.getValues());
}
/**
 * Opens a {@code secureDrain} client-streaming call, sends the message "explode" via
 * {@code sendAndComplete}, and asserts that the call fails with the given status code within
 * 15 seconds.
 *
 * @param serviceStub the async stub used to start the call
 * @param expectedCode the status code the call is expected to fail with
 */
protected void assertClientStreamingCallFailure(final TestServiceStub serviceStub, final Code expectedCode) {
  final StreamRecorder<Empty> drainResult = StreamRecorder.create();
  final StreamObserver<SomeType> requestStream = serviceStub.secureDrain(drainResult);

  // "explode" is meant to make the server throw if the message gets through (asserts security).
  sendAndComplete(requestStream, "explode");

  assertFutureThrowsStatus(expectedCode, drainResult, 15, SECONDS);
}
/**
 * Exercises server per-message compression on a streaming response: the first response is
 * requested compressed, the second uncompressed. The API does not expose whether a response was
 * actually compressed, so against a compliant server this only drives the code path for
 * receiving a compressed response and checks the payloads.
 */
public void serverCompressedStreaming() throws Exception {
  ResponseParameters.Builder compressedParams = ResponseParameters.newBuilder()
      .setCompressed(BoolValue.newBuilder().setValue(true))
      .setSize(31415);
  ResponseParameters.Builder uncompressedParams = ResponseParameters.newBuilder()
      .setCompressed(BoolValue.newBuilder().setValue(false))
      .setSize(92653);
  final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
      .addResponseParameters(compressedParams)
      .addResponseParameters(uncompressedParams)
      .build();

  // Expected payloads: zero-filled bodies of the two requested sizes, in request order.
  final StreamingOutputCallResponse firstExpected = StreamingOutputCallResponse.newBuilder()
      .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[31415])))
      .build();
  final StreamingOutputCallResponse secondExpected = StreamingOutputCallResponse.newBuilder()
      .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[92653])))
      .build();
  final List<StreamingOutputCallResponse> goldenResponses =
      Arrays.asList(firstExpected, secondExpected);

  StreamRecorder<StreamingOutputCallResponse> responses = StreamRecorder.create();
  asyncStub.streamingOutputCall(request, responses);
  responses.awaitCompletion();

  assertSuccess(responses);
  assertResponses(goldenResponses, responses.getValues());
}
/**
 * Invokes the same unary method through the async, blocking and future stubs and asserts that
 * each of them yields a response whose version is "1.2.3".
 *
 * @param serviceStub the async stub
 * @param serviceCall invokes the method on the async stub with a request and a recorder
 * @param blockingStub the blocking stub
 * @param blockingcall invokes the method on the blocking stub and returns the response
 * @param futureStub the future stub
 * @param futureCall invokes the method on the future stub and returns the response future
 */
protected void assertUnarySuccessfulMethod(final TestServiceStub serviceStub,
    final TriConsumer<TestServiceStub, Empty, StreamRecorder<SomeType>> serviceCall,
    final TestServiceBlockingStub blockingStub,
    final BiFunction<TestServiceBlockingStub, Empty, SomeType> blockingcall,
    final TestServiceFutureStub futureStub,
    final BiFunction<TestServiceFutureStub, Empty, ListenableFuture<SomeType>> futureCall) {
  final String expectedVersion = "1.2.3";

  // Async stub: the response is read from the recorder.
  final StreamRecorder<SomeType> asyncResponses = StreamRecorder.create();
  serviceCall.accept(serviceStub, EMPTY, asyncResponses);
  assertFutureFirstEquals(expectedVersion, asyncResponses, SomeType::getVersion, 5, SECONDS);

  // Blocking stub: the response is returned directly.
  final SomeType blockingResponse = blockingcall.apply(blockingStub, EMPTY);
  assertEquals(expectedVersion, blockingResponse.getVersion());

  // Future stub: the response is read from the returned future.
  final ListenableFuture<SomeType> futureResponse = futureCall.apply(futureStub, EMPTY);
  assertFutureEquals(expectedVersion, futureResponse, SomeType::getVersion, 5, SECONDS);
}