下面列出了io.grpc.testing.GrpcCleanupRule#io.grpc.inprocess.InProcessServerBuilder 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Before
public void setUp() throws Exception {
GreeterImplBase greeterImplBase =
new GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
responseObserver.onNext(HelloReply.getDefaultInstance());
responseObserver.onCompleted();
}
};
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
// Create a server, add service, start, and register for automatic graceful shutdown.
grpcCleanup.register(InProcessServerBuilder.forName(serverName).directExecutor()
.addService(ServerInterceptors.intercept(greeterImplBase, new HeaderServerInterceptor()))
.build().start());
// Create a client channel and register for automatic graceful shutdown.
channel =
grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
}
/**
* Before the test has started, create the server and channel.
*/
@Override
public void beforeEach(ExtensionContext context) throws Exception {
serverName = UUID.randomUUID().toString();
serviceRegistry = new MutableHandlerRegistry();
InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName(serverName)
.fallbackHandlerRegistry(serviceRegistry);
if (useDirectExecutor) {
serverBuilder.directExecutor();
}
server = serverBuilder.build().start();
InProcessChannelBuilder channelBuilder = InProcessChannelBuilder.forName(serverName);
if (useDirectExecutor) {
channelBuilder.directExecutor();
}
channel = channelBuilder.build();
}
@Before
public void setUp() throws Exception {
String serverName = InProcessServerBuilder.generateName();
grpcCleanup.register(InProcessServerBuilder
.forName(serverName)
.directExecutor()
.addService(cacheService)
.build()
.start());
channel = grpcCleanup.register(InProcessChannelBuilder
.forName(serverName)
.directExecutor()
.build());
when(managedChannelFactory.getInstance()).thenReturn(channel);
}
/**
* Starts a live instance of a mock implementation of the broker server.
*/
static void startServer(FakeBrokerImpl fakeServer, GrpcCleanupRule grpcCleanup) {
String serverName = InProcessServerBuilder.generateName();
try {
grpcCleanup.register(InProcessServerBuilder.forName(serverName).directExecutor()
.addService(ServerInterceptors.intercept(fakeServer, new AuthorizationHeaderServerInterceptor()))
.build().start());
} catch (IOException e) {
throw new RuntimeException(e);
}
ManagedChannel channel = grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
BrokerGrpc.BrokerBlockingStub stub = BrokerGrpc.newBlockingStub(channel);
mockStatic(GrpcUtils.class);
when(GrpcUtils.newManagedChannel(BROKER_HOST, 1234, false, null)).thenReturn(channel);
when(GrpcUtils.newStub(channel)).thenReturn(stub);
}
@Before
public void setUp() throws Exception {
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
// Create a server, add service, start, and register for automatic graceful shutdown.
grpcCleanup.register(InProcessServerBuilder
.forName(serverName).directExecutor().addService(serviceImpl).build().start());
// Create a client channel and register for automatic graceful shutdown.
ManagedChannel channel = grpcCleanup.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
// Create a client using the in-process channel;
client = new GoalStateProvisionerClient(channel);
}
@Before
public void setup() throws Exception {
this.grpcRule = new GrpcCleanupRule();
// setup fake serving service
String serverName = InProcessServerBuilder.generateName();
this.grpcRule.register(
InProcessServerBuilder.forName(serverName)
.directExecutor()
.addService(this.servingMock)
.build()
.start());
// setup test feast client target
ManagedChannel channel =
this.grpcRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
this.client = new FeastClient(channel);
}
/**
* To test the server, make calls with a real stub using the in-process channel, and verify
* behaviors or state changes from the client side.
*/
@Test
public void greeterImpl_replyMessage() throws Exception {
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
// Create a server, add service, start, and register for automatic graceful shutdown.
grpcCleanup.register(InProcessServerBuilder
.forName(serverName).directExecutor().addService(new GreeterImpl()).build().start());
GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(
// Create a client channel and register for automatic graceful shutdown.
grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()));
HelloReply reply =
blockingStub.sayHello(HelloRequest.newBuilder().setName( "test name").build());
assertEquals("Hello test name", reply.getMessage());
}
@Before
public void setUp() throws Exception {
// Generate a unique in-process server name
String serverName = InProcessServerBuilder.generateName();
// Use a mutable service registry for later registering the service impl for each test case.
grpcCleanup.register(
InProcessServerBuilder.forName(serverName)
.fallbackHandlerRegistry(serviceRegistry)
.directExecutor()
.build()
.start());
client =
new LogdClient(
InProcessChannelBuilder.forName(serverName).directExecutor(),
new TestStreamObserverFactory(testHelper));
}
@Before
public void setUp() throws IOException {
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
// Create a server, add service, start, and register for automatic graceful shutdown.
grpcCleanup.register(InProcessServerBuilder.forName(serverName).directExecutor()
.addService(ServerInterceptors.intercept(
new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder()
.setMessage("AuthClientTest user=" + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
},
mockServerInterceptor))
.build().start());
CallCredentials credentials = new JwtCredential("test-client");
ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
client = new AuthClient(credentials, channel);
}
@Setup
public void setup() throws IOException {
System.out.println("---------- SETUP ONCE -------------");
ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(Runtime.getRuntime()
.availableProcessors());
reactiveServer =
InProcessServerBuilder.forName("benchmark-reactiveServer")
.scheduledExecutorService(scheduledExecutorService)
.addService(new BenchmarkRxServerServiceImpl(100000))
.build()
.start();
reactiveChannel = InProcessChannelBuilder.forName("benchmark-reactiveServer")
.build();
reactiveClient = RxBenchmarkServiceGrpc.newRxStub(reactiveChannel);
}
@Before
public void setUp() throws Exception {
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
// Create a server, add service, start, and register for automatic graceful shutdown.
grpcCleanup.register(InProcessServerBuilder
.forName(serverName).directExecutor().addService(serviceImpl).build().start());
// Create a client channel and register for automatic graceful shutdown.
ManagedChannel channel = grpcCleanup.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
// Create a HelloWorldClient using the in-process channel;
client = new HelloWorldClient(channel);
}
@Setup
public void setup() throws IOException {
System.out.println("---------- SETUP ONCE -------------");
ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(Runtime.getRuntime()
.availableProcessors());
gRpcServer =
InProcessServerBuilder.forName("benchmark-gRpcServer")
.scheduledExecutorService(scheduledExecutorService)
.addService(new com.salesforce.reactivegrpc.jmh.BenchmarkGRpcServerServiceImpl(100000))
.build()
.start();
gRpcChannel = InProcessChannelBuilder.forName("benchmark-gRpcServer")
.build();
gRpcClient = BenchmarkServiceGrpc.newStub(gRpcChannel);
}
public TestRemoteExecutionClients(List<BindableService> services) throws IOException {
eventBus = new DefaultBuckEventBus(new DefaultClock(), new BuildId("dontcare"));
String serverName = "uniquish-" + new Random().nextLong();
InProcessServerBuilder serverBuilder =
InProcessServerBuilder.forName(serverName).directExecutor();
for (BindableService service : services) {
serverBuilder.addService(service);
}
server = serverBuilder.build().start();
ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
clients =
new GrpcRemoteExecutionClients(
"buck",
channel,
channel,
100,
MetadataProviderFactory.emptyMetadataProvider(),
eventBus,
FakeBuckConfig.builder()
.build()
.getView(RemoteExecutionConfig.class)
.getStrategyConfig());
}
@Test
public void testNoRetryOnOk() throws IOException {
String uniqueName = InProcessServerBuilder.generateName();
ExecutionImpl service = new ExecutionImpl(Status.OK, 0);
InProcessServerBuilder.forName(uniqueName).addService(service).build().start();
CallCounter beforeRetry = new CallCounter();
ManagedChannel channel =
InProcessChannelBuilder.forName(uniqueName)
.intercept(
new RetryClientInterceptor(
RetryPolicy.builder().setMaxRetries(2).setBeforeRetry(beforeRetry).build()))
.build();
ExecutionBlockingStub stub = ExecutionGrpc.newBlockingStub(channel);
stub.execute(ExecuteRequest.newBuilder().build()).forEachRemaining(resp -> {});
Assert.assertEquals(1, service.calls);
Assert.assertEquals(0, beforeRetry.calls);
}
private void createServer(CommandDispatcher dispatcher) throws Exception {
serverDirectory = fileSystem.getPath("/bazel_server_directory");
serverDirectory.createDirectoryAndParents();
FileSystemUtils.writeContentAsLatin1(serverDirectory.getChild("server.pid.txt"), "12345");
serverImpl =
new GrpcServerImpl(
dispatcher,
new JavaClock(),
/* port= */ -1,
REQUEST_COOKIE,
"response-cookie",
serverDirectory,
1000,
false,
false);
String uniqueName = InProcessServerBuilder.generateName();
server =
InProcessServerBuilder.forName(uniqueName)
.directExecutor()
.addService(serverImpl)
.build()
.start();
channel = InProcessChannelBuilder.forName(uniqueName).directExecutor().build();
}
@Before
public void setUp() throws Exception {
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
// Create a server, add service, start, and register for automatic graceful shutdown.
grpcCleanup.register(InProcessServerBuilder.forName(serverName).
fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start());
// Create a client channel and register for automatic graceful shutdown.
ManagedChannel channel = grpcCleanup.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
// Create a TccEventServiceStub using the in-process channel;
service = new GrpcTccClientMessageSender(serviceConfig, channel, address, handler, null);
}
@Bean
public GrpcServerFactory factory() {
return new SimpleGrpcServerFactory() {
@Override
public Server buildServerForServices(int port, Collection<BindableService> services) {
System.out.println("Building an IN-PROC service for " + services.size() + " services");
ServerBuilder builder = InProcessServerBuilder.forName(SERVER_NAME);
services.forEach(builder::addService);
return builder.build();
}
@Override
public List<Class<? extends Annotation>> forAnnotations() {
return ImmutableList.of(InProcessGrpcService.class);
}
};
}
@Before
public void setup() throws IOException {
final MockJobManagerService mockJobManagerService = new MockJobManagerService();
testServer = InProcessServerBuilder
.forName("testServer")
.directExecutor()
.addService(mockJobManagerService)
.build()
.start();
final ManagedChannel channel = InProcessChannelBuilder
.forName("testServer")
.directExecutor()
.usePlaintext(true)
.build();
final JobManagementServiceStub jobManagementServiceStub = JobManagementServiceGrpc.newStub(channel);
final JobManagementServiceFutureStub jobManagementServiceFutureStub = JobManagementServiceGrpc.newFutureStub(channel);
titusClient = new TitusClientImpl(jobManagementServiceStub, jobManagementServiceFutureStub, new DefaultRegistry());
}
@BeforeClass
public static void setUp() throws IOException {
final String name = InProcessServerBuilder.generateName();
server = InProcessServerBuilder.forName(name)
.directExecutor()
.addService(new JobsServiceAdapter(p(LocalJobsService.class)))
.addService(new Chronicle(p(LocalJobsService.class)))
.build();
server.start();
channel = InProcessChannelBuilder.forName(name)
.directExecutor()
.build();
asyncStub = JobsServiceGrpc.newStub(channel);
chronicleStub = ChronicleGrpc.newBlockingStub(channel);
}
@Test
public void healthServing() throws Exception {
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
HealthCheckAggregator hca = mock(HealthCheckAggregator.class);
CompletableFuture<HealthCheckStatus> hcsf = mock(CompletableFuture.class);
HealthCheckStatus hcs = mock(HealthCheckStatus.class);
when(hcs.isHealthy()).thenReturn(true);
when(hcsf.get()).thenReturn(hcs);
when(hca.check()).thenReturn(hcsf);
HealthServiceImpl healthyService = new HealthServiceImpl(hca);
addService(serverName, healthyService);
HealthGrpc.HealthBlockingStub blockingStub = HealthGrpc.newBlockingStub(
// Create a client channel and register for automatic graceful shutdown.
grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()));
HealthCheckResponse reply = blockingStub.check(HealthCheckRequest.newBuilder().build());
assertEquals(HealthCheckResponse.ServingStatus.SERVING, reply.getStatus());
}
@Before
public final void setUp() throws Exception {
// Use a mutable service registry for later registering the service impl for each test case.
fakeServer =
InProcessServerBuilder.forName(fakeServerName)
.fallbackHandlerRegistry(serviceRegistry)
.directExecutor()
.build()
.start();
logStream = Mockito.mock(AsynchronousFileOutputStream.class);
clock = new ManualClock();
interceptor = new LoggingInterceptor(logStream, clock);
loggedChannel =
ClientInterceptors.intercept(
InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(), interceptor);
}
@Test
public void healthException() throws Exception {
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
HealthCheckAggregator hca = mock(HealthCheckAggregator.class);
CompletableFuture<HealthCheckStatus> hcsf = mock(CompletableFuture.class);
when(hcsf.get()).thenThrow(InterruptedException.class);
when(hca.check()).thenReturn(hcsf);
HealthServiceImpl healthyService = new HealthServiceImpl(hca);
addService(serverName, healthyService);
HealthGrpc.HealthBlockingStub blockingStub = HealthGrpc.newBlockingStub(
// Create a client channel and register for automatic graceful shutdown.
grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()));
thrown.expect(StatusRuntimeException.class);
thrown.expect(hasProperty("status", is(Status.INTERNAL)));
blockingStub.check(HealthCheckRequest.newBuilder().build());
}
private RegistryGrpc.RegistryBlockingStub getSchemaRegistryStub(MetaStore metaStore)
throws IOException {
String serverName = InProcessServerBuilder.generateName();
grpcCleanup.register(
InProcessServerBuilder.forName(serverName)
.directExecutor()
.addService(new RegistryService(metaStore))
.build()
.start());
return RegistryGrpc.newBlockingStub(
grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()));
}
/**
* The in-process strategy starts up a grpc remote execution service in process and connects to it
* directly.
*/
public static RemoteExecutionClients createInProcess(
BuckEventBus buckEventBus, RemoteExecutionStrategyConfig strategyConfig) throws IOException {
NamedTemporaryDirectory workDir = new NamedTemporaryDirectory("__remote__");
GrpcRemoteExecutionServiceServer remoteExecution =
new GrpcRemoteExecutionServiceServer(
new LocalContentAddressedStorage(
workDir.getPath().resolve("__cache__"),
GrpcRemoteExecutionClients.PROTOCOL,
buckEventBus),
workDir.getPath().resolve("__work__"));
InProcessServerBuilder builder = InProcessServerBuilder.forName("unique");
remoteExecution.getServices().forEach(builder::addService);
Server server = builder.build().start();
ManagedChannel channel = InProcessChannelBuilder.forName("unique").build();
return new GrpcRemoteExecutionClients(
"in-process",
channel,
channel,
CAS_DEADLINE_S,
MetadataProviderFactory.emptyMetadataProvider(),
buckEventBus,
strategyConfig) {
@Override
public void close() throws IOException {
try (Closer closer = Closer.create()) {
closer.register(server::shutdown);
closer.register(workDir);
closer.register(super::close);
}
try {
server.awaitTermination();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
}
@Override
public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
try {
grpcCleanupRule.register(
InProcessServerBuilder.forName(target)
.addService(rlsServerImpl)
.directExecutor()
.build()
.start());
} catch (IOException e) {
throw new RuntimeException("cannot create server: " + target, e);
}
final InProcessChannelBuilder builder =
InProcessChannelBuilder.forName(target).directExecutor();
class CleaningChannelBuilder extends ForwardingChannelBuilder<CleaningChannelBuilder> {
@Override
protected ManagedChannelBuilder<?> delegate() {
return builder;
}
@Override
public ManagedChannel build() {
return grpcCleanupRule.register(super.build());
}
}
return new CleaningChannelBuilder();
}
@Test
public void clientHeaderDeliveredToServer() throws Exception {
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
// Create a server, add service, start, and register for automatic graceful shutdown.
grpcCleanup.register(InProcessServerBuilder.forName(serverName).directExecutor()
.addService(ServerInterceptors.intercept(new GreeterImplBase() {}, mockServerInterceptor))
.build().start());
// Create a client channel and register for automatic graceful shutdown.
ManagedChannel channel = grpcCleanup.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(
ClientInterceptors.intercept(channel, new HeaderClientInterceptor()));
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
try {
blockingStub.sayHello(HelloRequest.getDefaultInstance());
fail();
} catch (StatusRuntimeException expected) {
// expected because the method is not implemented at server side
}
verify(mockServerInterceptor).interceptCall(
Matchers.<ServerCall<HelloRequest, HelloReply>>any(),
metadataCaptor.capture(),
Matchers.<ServerCallHandler<HelloRequest, HelloReply>>any());
assertEquals(
"customRequestValue",
metadataCaptor.getValue().get(HeaderClientInterceptor.CUSTOM_HEADER_KEY));
}
@Test
public void testDeadlinePropagation() throws Exception {
final AtomicInteger recursionDepthRemaining = new AtomicInteger(3);
final SettableFuture<Deadline> finalDeadline = SettableFuture.create();
class DeadlineSaver extends TestServiceGrpc.TestServiceImplBase {
@Override
public void unaryCall(final SimpleRequest request,
final StreamObserver<SimpleResponse> responseObserver) {
Context.currentContextExecutor(otherWork).execute(new Runnable() {
@Override
public void run() {
try {
if (recursionDepthRemaining.decrementAndGet() == 0) {
finalDeadline.set(Context.current().getDeadline());
responseObserver.onNext(SimpleResponse.getDefaultInstance());
} else {
responseObserver.onNext(blockingStub.unaryCall(request));
}
responseObserver.onCompleted();
} catch (Exception ex) {
responseObserver.onError(ex);
}
}
});
}
}
server = InProcessServerBuilder.forName("channel").executor(otherWork)
.addService(new DeadlineSaver())
.build().start();
Deadline initialDeadline = Deadline.after(1, TimeUnit.MINUTES);
blockingStub.withDeadline(initialDeadline).unaryCall(SimpleRequest.getDefaultInstance());
assertNotSame(initialDeadline, finalDeadline);
// Since deadline is re-calculated at each hop, some variance is acceptable and expected.
assertAbout(deadline())
.that(finalDeadline.get()).isWithin(1, TimeUnit.SECONDS).of(initialDeadline);
}
@Before
public void setUp() throws Exception {
SimpleServiceGrpc.SimpleServiceImplBase simpleServiceImpl =
new SimpleServiceGrpc.SimpleServiceImplBase() {
@Override
public void unaryRpc(
SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
for (Map.Entry<String, Double> entry : applicationMetrics.entrySet()) {
CallMetricRecorder.getCurrent().recordCallMetric(entry.getKey(), entry.getValue());
}
SimpleResponse response =
SimpleResponse.newBuilder().setResponseMessage("Simple response").build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
};
ServerInterceptor metricReportingServerInterceptor = new OrcaMetricReportingServerInterceptor();
String serverName = InProcessServerBuilder.generateName();
grpcCleanupRule.register(
InProcessServerBuilder
.forName(serverName)
.directExecutor()
.addService(
ServerInterceptors.intercept(simpleServiceImpl, metricReportingServerInterceptor))
.build().start());
ManagedChannel baseChannel =
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).build());
channelToUse =
ClientInterceptors.intercept(
baseChannel, new TrailersCapturingClientInterceptor(trailersCapture));
}
@Before
public void setUp() throws Exception {
reflectionService = ProtoReflectionService.newInstance();
server =
InProcessServerBuilder.forName("proto-reflection-test")
.directExecutor()
.addService(reflectionService)
.addService(new ReflectableServiceGrpc.ReflectableServiceImplBase() {})
.fallbackHandlerRegistry(handlerRegistry)
.build()
.start();
channel = InProcessChannelBuilder.forName("proto-reflection-test").directExecutor().build();
stub = ServerReflectionGrpc.newStub(channel);
}
@Test
public void blockingServerStreamingCall2_interruptedWaitsForOnClose() throws Exception {
Integer req = 2;
class NoopServerStreamingMethod implements ServerStreamingMethod<Integer, Integer> {
ServerCallStreamObserver<Integer> observer;
@Override public void invoke(Integer request, StreamObserver<Integer> responseObserver) {
observer = (ServerCallStreamObserver<Integer>) responseObserver;
}
}
NoopServerStreamingMethod methodImpl = new NoopServerStreamingMethod();
server = InProcessServerBuilder.forName("noop").directExecutor()
.addService(ServerServiceDefinition.builder("some")
.addMethod(SERVER_STREAMING_METHOD, ServerCalls.asyncServerStreamingCall(methodImpl))
.build())
.build().start();
InterruptInterceptor interceptor = new InterruptInterceptor();
channel = InProcessChannelBuilder.forName("noop")
.directExecutor()
.intercept(interceptor)
.build();
Iterator<Integer> iter = ClientCalls.blockingServerStreamingCall(
channel, SERVER_STREAMING_METHOD, CallOptions.DEFAULT, req);
try {
iter.next();
fail();
} catch (StatusRuntimeException ex) {
assertTrue(Thread.interrupted());
assertTrue("interrupted", ex.getCause() instanceof InterruptedException);
}
assertTrue("onCloseCalled", interceptor.onCloseCalled);
assertTrue("context not cancelled", methodImpl.observer.isCancelled());
}