下面列出了io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener#com.google.protobuf.util.Durations 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
static SocketOption toSocketOptionLinger(int lingerSeconds) {
final SocketOptionLinger lingerOpt;
if (lingerSeconds >= 0) {
lingerOpt = SocketOptionLinger
.newBuilder()
.setActive(true)
.setDuration(Durations.fromSeconds(lingerSeconds))
.build();
} else {
lingerOpt = SocketOptionLinger.getDefaultInstance();
}
return SocketOption
.newBuilder()
.setName(SO_LINGER)
.setAdditional(Any.pack(lingerOpt))
.build();
}
private Span fromProto(Model.Span protoSpan) {
Span span = new Span();
span.traceId = asHexString(protoSpan.getTraceId());
span.spanId = asHexString(protoSpan.getSpanId());
span.operationName = protoSpan.getOperationName();
span.serviceName = protoSpan.getProcess().getServiceName();
span.startTimeMicros = Timestamps.toMicros(protoSpan.getStartTime());
span.durationMicros = Durations.toMicros(protoSpan.getDuration());
addTags(span, protoSpan.getTagsList());
addTags(span, protoSpan.getProcess().getTagsList());
if (protoSpan.getReferencesList().size() > 0) {
SpanRef reference = protoSpan.getReferences(0);
if (asHexString(reference.getTraceId()).equals(span.traceId)) {
span.parentId = asHexString(protoSpan.getReferences(0).getSpanId());
}
}
return span;
}
@Test
public void anyInMaps() throws Exception {
TestAny.Builder testAny = TestAny.newBuilder();
testAny.putAnyMap("int32_wrapper", Any.pack(Int32Value.newBuilder().setValue(123).build()));
testAny.putAnyMap("int64_wrapper", Any.pack(Int64Value.newBuilder().setValue(456).build()));
testAny.putAnyMap("timestamp", Any.pack(Timestamps.parse("1969-12-31T23:59:59Z")));
testAny.putAnyMap("duration", Any.pack(Durations.parse("12345.1s")));
testAny.putAnyMap("field_mask", Any.pack(FieldMaskUtil.fromString("foo.bar,baz")));
Value numberValue = Value.newBuilder().setNumberValue(1.125).build();
Struct.Builder struct = Struct.newBuilder();
struct.putFields("number", numberValue);
testAny.putAnyMap("struct", Any.pack(struct.build()));
Value nullValue = Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build();
testAny.putAnyMap(
"list_value",
Any.pack(ListValue.newBuilder().addValues(numberValue).addValues(nullValue).build()));
testAny.putAnyMap("number_value", Any.pack(numberValue));
testAny.putAnyMap("any_value_number", Any.pack(Any.pack(numberValue)));
testAny.putAnyMap("any_value_default", Any.pack(Any.getDefaultInstance()));
testAny.putAnyMap("default", Any.getDefaultInstance());
assertMatchesUpstream(testAny.build(), TestAllTypes.getDefaultInstance());
}
@Override
protected void validateAction(
String operationName,
Action action,
PreconditionFailure.Builder preconditionFailure,
RequestMetadata requestMetadata)
throws InterruptedException, StatusException {
if (action.hasTimeout() && config.hasMaximumActionTimeout()) {
Duration timeout = action.getTimeout();
Duration maximum = config.getMaximumActionTimeout();
if (timeout.getSeconds() > maximum.getSeconds()
|| (timeout.getSeconds() == maximum.getSeconds()
&& timeout.getNanos() > maximum.getNanos())) {
preconditionFailure
.addViolationsBuilder()
.setType(VIOLATION_TYPE_INVALID)
.setSubject(Durations.toString(timeout) + " > " + Durations.toString(maximum))
.setDescription(TIMEOUT_OUT_OF_BOUNDS);
}
}
super.validateAction(operationName, action, preconditionFailure, requestMetadata);
}
@Override
protected void validateAction(
Action action,
@Nullable Command command,
Map<Digest, Directory> directoriesIndex,
Consumer<Digest> onInputDigest,
PreconditionFailure.Builder preconditionFailure) {
if (action.hasTimeout() && hasMaxActionTimeout()) {
Duration timeout = action.getTimeout();
if (timeout.getSeconds() > maxActionTimeout.getSeconds()
|| (timeout.getSeconds() == maxActionTimeout.getSeconds()
&& timeout.getNanos() > maxActionTimeout.getNanos())) {
preconditionFailure
.addViolationsBuilder()
.setType(VIOLATION_TYPE_INVALID)
.setSubject(Durations.toString(timeout) + " > " + Durations.toString(maxActionTimeout))
.setDescription(TIMEOUT_OUT_OF_BOUNDS);
}
}
super.validateAction(action, command, directoriesIndex, onInputDigest, preconditionFailure);
}
private void removeStaleData(Timestamp now) {
// currentSlot != lastUsedSlot means stepping over to a new bucket.
// The data in the new bucket should be thrown away before storing new data.
int currentSlot = getCurrentSlot(now);
if (lastOperationCompleteTime != null && lastUsedSlot >= 0) {
Duration duration = Timestamps.between(lastOperationCompleteTime, now);
// if 1) duration between the new added operation and last added one is longer than period
// or 2) the duration is shorter than period but longer than time range of a single bucket
// and at the same time currentSlot == lastUsedSlot
if ((Durations.toMillis(duration) >= Durations.toMillis(period))
|| (lastUsedSlot == currentSlot
&& Durations.toMillis(duration) > (Durations.toMillis(period) / buckets.length))) {
for (OperationStageDurations bucket : buckets) {
bucket.reset();
}
} else if (lastUsedSlot != currentSlot) {
// currentSlot < lastUsedSlot means wrap around happened
currentSlot = currentSlot < lastUsedSlot ? currentSlot + NumOfSlots : currentSlot;
for (int i = lastUsedSlot + 1; i <= currentSlot; i++) {
buckets[i % buckets.length].reset();
}
}
}
}
void addOperations(OperationStageDurations other) {
this.queuedToMatch = Durations.add(this.queuedToMatch, other.queuedToMatch);
this.matchToInputFetchStart =
Durations.add(this.matchToInputFetchStart, other.matchToInputFetchStart);
this.inputFetchStartToComplete =
Durations.add(this.inputFetchStartToComplete, other.inputFetchStartToComplete);
this.inputFetchCompleteToExecutionStart =
Durations.add(
this.inputFetchCompleteToExecutionStart, other.inputFetchCompleteToExecutionStart);
this.executionStartToComplete =
Durations.add(this.executionStartToComplete, other.executionStartToComplete);
this.executionCompleteToOutputUploadStart =
Durations.add(
this.executionCompleteToOutputUploadStart,
other.executionCompleteToOutputUploadStart);
this.outputUploadStartToComplete =
Durations.add(this.outputUploadStartToComplete, other.outputUploadStartToComplete);
this.operationCount += other.operationCount;
}
private OperationStageDurations computeAverage(int weight) {
OperationStageDurations average = new OperationStageDurations();
if (weight == 0) {
return average;
}
average.queuedToMatch = Durations.fromNanos(Durations.toNanos(this.queuedToMatch) / weight);
average.matchToInputFetchStart =
Durations.fromNanos(Durations.toNanos(this.matchToInputFetchStart) / weight);
average.inputFetchStartToComplete =
Durations.fromNanos(Durations.toNanos(this.inputFetchStartToComplete) / weight);
average.inputFetchCompleteToExecutionStart =
Durations.fromNanos(Durations.toNanos(this.inputFetchCompleteToExecutionStart) / weight);
average.executionStartToComplete =
Durations.fromNanos(Durations.toNanos(this.executionStartToComplete) / weight);
average.executionCompleteToOutputUploadStart =
Durations.fromNanos(
Durations.toNanos(this.executionCompleteToOutputUploadStart) / weight);
average.outputUploadStartToComplete =
Durations.fromNanos(Durations.toNanos(this.outputUploadStartToComplete) / weight);
average.operationCount = this.operationCount;
return average;
}
CPSPublisherTask(StartRequest request, MetricsHandler metricsHandler, int workerCount) {
super(request, metricsHandler, workerCount);
log.warn("constructing CPS publisher");
this.payload = getPayload();
try {
this.publisher =
Publisher.newBuilder(ProjectTopicName.of(request.getProject(), request.getTopic()))
.setBatchingSettings(
BatchingSettings.newBuilder()
.setElementCountThreshold((long) request.getPublisherOptions().getBatchSize())
.setRequestByteThreshold(9500000L)
.setDelayThreshold(
Duration.ofMillis(
Durations.toMillis(request.getPublisherOptions().getBatchDuration())))
.setIsEnabled(true)
.build())
.build();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private KafkaPublisherTask(
LoadtestProto.StartRequest request, MetricsHandler handler, int workerCount) {
super(request, handler, workerCount);
this.topic = request.getTopic();
Properties props = new Properties();
props.putAll(
new ImmutableMap.Builder<>()
.put("max.block.ms", "30000")
.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
.put("acks", "all")
.put("bootstrap.servers", request.getKafkaOptions().getBroker())
.put("buffer.memory", Integer.toString(1000 * 1000 * 1000)) // 1 GB
.put(
"batch.size",
Long.toString(
getBatchSize(
request.getPublisherOptions().getBatchSize(),
request.getPublisherOptions().getMessageSize())))
.put(
"linger.ms",
Long.toString(Durations.toMillis(request.getPublisherOptions().getBatchDuration())))
.build());
this.publisher = new KafkaProducer<>(props);
}
public TestParameters apply(TestParameters source) {
TestParameters.Builder builder = source.toBuilder();
if (messageSize != null) {
builder.setMessageSize(messageSize);
}
if (burnInMinutes != null) {
builder.setBurnInDuration(Durations.fromSeconds(burnInMinutes * 60));
}
if (testMinutes != null) {
builder.setLoadtestDuration(Durations.fromSeconds(testMinutes * 60));
}
if (numCores != null) {
builder.setNumCoresPerWorker(numCores);
}
if (scalingFactor != null) {
builder.setSubscriberCpuScaling(scalingFactor);
}
return builder.build();
}
private void sendLoadReport() {
long interval = reportStopwatch.elapsed(TimeUnit.NANOSECONDS);
reportStopwatch.reset().start();
LoadStatsRequest.Builder requestBuilder = LoadStatsRequest.newBuilder().setNode(node);
for (String name : clusterNames) {
if (loadStatsStoreMap.containsKey(name)) {
Map<String, LoadStatsStore> clusterLoadStatsStores = loadStatsStoreMap.get(name);
for (LoadStatsStore statsStore : clusterLoadStatsStores.values()) {
ClusterStats report =
statsStore.generateLoadReport()
.toBuilder()
.setLoadReportInterval(Durations.fromNanos(interval))
.build();
requestBuilder.addClusterStats(report);
}
}
}
LoadStatsRequest request = requestBuilder.build();
lrsRequestWriter.onNext(request);
logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
scheduleNextLoadReport();
}
static SocketOption toSocketOptionLinger(int lingerSeconds) {
final SocketOptionLinger lingerOpt;
if (lingerSeconds >= 0) {
lingerOpt = SocketOptionLinger
.newBuilder()
.setActive(true)
.setDuration(Durations.fromSeconds(lingerSeconds))
.build();
} else {
lingerOpt = SocketOptionLinger.getDefaultInstance();
}
return SocketOption
.newBuilder()
.setName(SO_LINGER)
.setAdditional(Any.pack(lingerOpt))
.build();
}
private static LoadBalanceResponse buildInitialResponse(long loadReportIntervalMillis) {
return LoadBalanceResponse.newBuilder()
.setInitialResponse(
InitialLoadBalanceResponse.newBuilder()
.setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis)))
.build();
}
static SocketOption toSocketOptionTimeout(String name, int timeoutMillis) {
Preconditions.checkNotNull(name);
return SocketOption
.newBuilder()
.setName(name)
.setAdditional(
Any.pack(
SocketOptionTimeout
.newBuilder()
.setDuration(Durations.fromMillis(timeoutMillis))
.build()))
.build();
}
private static Snapshot createSnapshotWithNotWorkingCluster(boolean ads,
String clusterName,
String endpointAddress,
int endpointPort,
String listenerName,
int listenerPort,
String routeName) {
ConfigSource edsSource = ConfigSource.newBuilder()
.setAds(AggregatedConfigSource.getDefaultInstance())
.build();
Cluster cluster = Cluster.newBuilder()
.setName(clusterName)
.setConnectTimeout(Durations.fromSeconds(RandomUtils.nextInt(5)))
// we are enabling HTTP2 - communication with cluster won't work
.setHttp2ProtocolOptions(Http2ProtocolOptions.newBuilder().build())
.setEdsClusterConfig(Cluster.EdsClusterConfig.newBuilder()
.setEdsConfig(edsSource)
.setServiceName(clusterName))
.setType(Cluster.DiscoveryType.EDS)
.build();
ClusterLoadAssignment endpoint = TestResources.createEndpoint(clusterName, endpointAddress, endpointPort);
Listener listener = TestResources.createListener(ads, listenerName, listenerPort, routeName);
RouteConfiguration route = TestResources.createRoute(routeName, clusterName);
// here we have new version of resources other than CDS.
return Snapshot.create(
ImmutableList.of(cluster),
"1",
ImmutableList.of(endpoint),
"2",
ImmutableList.of(listener),
"2",
ImmutableList.of(route),
"2",
ImmutableList.of(),
"2");
}
/**
* Returns a new test cluster using EDS.
*
* @param clusterName name of the new cluster
*/
public static Cluster createCluster(String clusterName) {
ConfigSource edsSource = ConfigSource.newBuilder()
.setAds(AggregatedConfigSource.getDefaultInstance())
.build();
return Cluster.newBuilder()
.setName(clusterName)
.setConnectTimeout(Durations.fromSeconds(5))
.setEdsClusterConfig(EdsClusterConfig.newBuilder()
.setEdsConfig(edsSource)
.setServiceName(clusterName))
.setType(DiscoveryType.EDS)
.build();
}
/**
* Returns a new test cluster not using EDS.
*
* @param clusterName name of the new cluster
* @param address address to use for the cluster endpoint
* @param port port to use for the cluster endpoint
*/
public static Cluster createCluster(String clusterName, String address, int port) {
return Cluster.newBuilder()
.setName(clusterName)
.setConnectTimeout(Durations.fromSeconds(5))
.setType(DiscoveryType.STRICT_DNS)
.addHosts(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress(address)
.setPortValue(port)
.setProtocolValue(Protocol.TCP_VALUE)))
.build();
}
@Override
public void doMerge(JsonParser parser, int unused, Message.Builder messageBuilder)
throws IOException {
Duration.Builder builder = (Duration.Builder) messageBuilder;
try {
builder.mergeFrom(Durations.parse(ParseSupport.parseString(parser)));
} catch (ParseException e) {
throw new InvalidProtocolBufferException(
"Failed to readValue duration: " + parser.getText());
}
}
public Details getServerStatus() {
RuntimeMXBean rb = ManagementFactory.getRuntimeMXBean();
Duration uptime = Durations.fromMillis(rb.getUptime());
if (!leaderActivator.isLeader()) {
return Details.newBuilder()
.setStatus(NOT_SERVING)
.setLeader(false)
.setActive(false)
.setUptime(uptime)
.build();
}
boolean active = leaderActivator.isActivated();
List<ServiceActivation> serviceActivations = activationLifecycle.getServiceActionTimesMs().stream()
.sorted(Comparator.comparing(Pair::getRight))
.map(pair -> ServiceActivation.newBuilder()
.setName(pair.getLeft())
.setActivationTime(Durations.fromMillis(pair.getRight()))
.build())
.collect(Collectors.toList());
Details.Builder details = Details.newBuilder()
.setStatus(SERVING)
.setCell(cellInfoResolver.getCellName())
.setLeader(true)
.setActive(active)
.setUptime(uptime)
.setElectionTimestamp(Timestamps.fromMillis(leaderActivator.getElectionTimestamp()))
.setActivationTimestamp(Timestamps.fromMillis(leaderActivator.getActivationEndTimestamp()))
.addAllServiceActivationTimes(serviceActivations);
if (active) {
details.setActivationTime(Durations.fromMillis(leaderActivator.getActivationTime()));
}
return details.build();
}
@GET
public ServerStatusRepresentation getServerStatus() {
Details details = healthService.getServerStatus();
if (details.getStatus() != ServingStatus.SERVING) {
return new ServerStatusRepresentation(
false,
false,
DateTimeExt.toTimeUnitString(details.getUptime()),
NOT_APPLICABLE,
NOT_APPLICABLE,
NOT_APPLICABLE,
Collections.emptyList(),
Collections.emptyList()
);
}
List<ServerStatusRepresentation.ServiceActivation> sortedByActivationTime = details.getServiceActivationTimesList().stream()
.sorted(Comparator.comparing(ServiceActivation::getActivationTime, Durations.comparator()))
.map(s -> new ServerStatusRepresentation.ServiceActivation(s.getName(), DateTimeExt.toTimeUnitString(s.getActivationTime())))
.collect(Collectors.toList());
List<String> namesSortedByActivationTimestamp = details.getServiceActivationTimesList().stream()
.map(ServiceActivation::getName)
.collect(Collectors.toList());
return new ServerStatusRepresentation(
details.getLeader(),
details.getActive(),
DateTimeExt.toTimeUnitString(details.getUptime()),
Timestamps.toString(details.getElectionTimestamp()),
Timestamps.toString(details.getActivationTimestamp()),
details.getActive() ? DateTimeExt.toTimeUnitString(details.getActivationTime()) : NOT_APPLICABLE,
sortedByActivationTime,
namesSortedByActivationTimestamp
);
}
@Test
public void singleCell() {
reset(connector);
when(connector.getChannels()).thenReturn(Collections.singletonMap(
new Cell("one", "1"), cellOne.getChannel()
));
when(connector.getChannelForCell(any())).thenReturn(Optional.of(cellOne.getChannel()));
HealthCheckResponse one = HealthCheckResponse.newBuilder()
.setStatus(NOT_SERVING)
.addDetails(ServerStatus.newBuilder()
.setDetails(Details.newBuilder()
.setCell("one")
.setLeader(false)
.setActive(true)
.setUptime(Durations.fromMillis(clock.wallTime()))
.build())
.build())
.build();
cellOne.getServiceRegistry().addService(new CellWithHealthStatus(one));
AssertableSubscriber<HealthCheckResponse> subscriber = service.check(HealthCheckRequest.newBuilder().build()).test();
subscriber.awaitTerminalEvent(10, TimeUnit.SECONDS);
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
HealthCheckResponse response = subscriber.getOnNextEvents().get(0);
assertThat(response.getStatus()).isEqualTo(NOT_SERVING);
assertThat(response.getDetailsCount()).isEqualTo(1);
assertThat(response.getDetails(0).hasDetails()).isTrue();
assertThat(response.getDetails(0).getDetails().getCell()).isEqualTo("one");
assertThat(response.getDetails(0).getDetails().getLeader()).isFalse();
}
private HealthCheckResponse ok(String cellName) {
return HealthCheckResponse.newBuilder()
.setStatus(SERVING)
.addDetails(ServerStatus.newBuilder()
.setDetails(Details.newBuilder()
.setCell(cellName)
.setLeader(true)
.setActive(true)
.setUptime(Durations.fromMillis(clock.wallTime()))
.build())
.build())
.build();
}
private HealthCheckResponse failing(String cellName) {
return HealthCheckResponse.newBuilder()
.setStatus(NOT_SERVING)
.addDetails(ServerStatus.newBuilder()
.setDetails(Details.newBuilder()
.setCell(cellName)
.setLeader(true)
.setActive(false)
.setUptime(Durations.fromMillis(clock.wallTime()))
.build())
.build())
.build();
}
private Duration getWaitTime() {
checkNotNull(periodDeadline);
Deadline waitDeadline = expirationDeadline.minimum(periodDeadline);
long waitMicros = waitDeadline.timeRemaining(MICROSECONDS);
if (waitMicros <= 0) {
return Duration.getDefaultInstance();
}
return Durations.fromMicros(waitMicros);
}
AverageTimeCostOfLastPeriod(int period) {
this.period = Durations.fromSeconds(period);
buckets = new OperationStageDurations[NumOfSlots];
for (int i = 0; i < buckets.length; i++) {
buckets[i] = new OperationStageDurations();
}
nextOperation = new OperationStageDurations();
averageTimeCosts = new OperationStageDurations();
}
@Before
public void setUp() throws Exception {
outstandingOperations = new MemoryInstance.OutstandingOperations();
watchers =
synchronizedSetMultimap(
MultimapBuilder.hashKeys().hashSetValues(/* expectedValuesPerKey=*/ 1).build());
watcherService = newDirectExecutorService();
MemoryInstanceConfig memoryInstanceConfig =
MemoryInstanceConfig.newBuilder()
.setListOperationsDefaultPageSize(1024)
.setListOperationsMaxPageSize(16384)
.setTreeDefaultPageSize(1024)
.setTreeMaxPageSize(16384)
.setOperationPollTimeout(Durations.fromSeconds(10))
.setOperationCompletedDelay(Durations.fromSeconds(10))
.setDefaultActionTimeout(Durations.fromSeconds(600))
.setMaximumActionTimeout(MAXIMUM_ACTION_TIMEOUT)
.setActionCacheConfig(
ActionCacheConfig.newBuilder()
.setDelegateCas(DelegateCASConfig.getDefaultInstance())
.build())
.build();
storage = Maps.newHashMap();
workers = Lists.newArrayList();
requeuers = Maps.newHashMap();
operationTimeoutDelays = Maps.newHashMap();
instance =
new MemoryInstance(
"memory",
DIGEST_UTIL,
memoryInstanceConfig,
casMapDecorator(storage),
watchers,
watcherService,
outstandingOperations,
workers,
requeuers,
operationTimeoutDelays);
}
@Test
public void actionWithExcessiveTimeoutFailsValidation()
throws InterruptedException, InvalidProtocolBufferException {
Duration timeout = Durations.fromSeconds(9000);
Digest actionDigestWithExcessiveTimeout = createAction(Action.newBuilder().setTimeout(timeout));
Watcher watcher = mock(Watcher.class);
IllegalStateException timeoutInvalid = null;
instance.execute(
actionDigestWithExcessiveTimeout,
/* skipCacheLookup=*/ true,
ExecutionPolicy.getDefaultInstance(),
ResultsCachePolicy.getDefaultInstance(),
RequestMetadata.getDefaultInstance(),
watcher);
ArgumentCaptor<Operation> watchCaptor = ArgumentCaptor.forClass(Operation.class);
verify(watcher, times(1)).observe(watchCaptor.capture());
Operation watchOperation = watchCaptor.getValue();
assertThat(watchOperation.getResponse().is(ExecuteResponse.class)).isTrue();
Status status = watchOperation.getResponse().unpack(ExecuteResponse.class).getStatus();
assertThat(status.getCode()).isEqualTo(Code.FAILED_PRECONDITION.getNumber());
assertThat(status.getDetailsCount()).isEqualTo(1);
assertThat(status.getDetails(0).is(PreconditionFailure.class)).isTrue();
PreconditionFailure preconditionFailure =
status.getDetails(0).unpack(PreconditionFailure.class);
assertThat(preconditionFailure.getViolationsList())
.contains(
Violation.newBuilder()
.setType(VIOLATION_TYPE_INVALID)
.setSubject(
Durations.toString(timeout)
+ " > "
+ Durations.toString(MAXIMUM_ACTION_TIMEOUT))
.setDescription(TIMEOUT_OUT_OF_BOUNDS)
.build());
}
@Before
public void setUp() throws Exception {
String uniqueServerName = "in-process server for " + getClass();
memoryInstanceConfig =
MemoryInstanceConfig.newBuilder()
.setListOperationsDefaultPageSize(1024)
.setListOperationsMaxPageSize(16384)
.setTreeDefaultPageSize(1024)
.setTreeMaxPageSize(16384)
.setOperationPollTimeout(Durations.fromSeconds(10))
.setOperationCompletedDelay(Durations.fromSeconds(10))
.setCasConfig(
ContentAddressableStorageConfig.newBuilder()
.setMemory(MemoryCASConfig.newBuilder().setMaxSizeBytes(640 * 1024)))
.setActionCacheConfig(
ActionCacheConfig.newBuilder()
.setDelegateCas(DelegateCASConfig.getDefaultInstance())
.build())
.setDefaultActionTimeout(Durations.fromSeconds(600))
.setMaximumActionTimeout(Durations.fromSeconds(3600))
.build();
BuildFarmServerConfig.Builder configBuilder = BuildFarmServerConfig.newBuilder().setPort(0);
configBuilder
.addInstancesBuilder()
.setName(INSTANCE_NAME)
.setDigestFunction(DigestFunction.Value.SHA256)
.setMemoryInstanceConfig(memoryInstanceConfig);
server =
new BuildFarmServer(
"test",
InProcessServerBuilder.forName(uniqueServerName).directExecutor(),
configBuilder.build());
server.start();
inProcessChannel = InProcessChannelBuilder.forName(uniqueServerName).directExecutor().build();
}
private KafkaSubscriberTask(StartRequest request, MetricsHandler handler, int numWorkers) {
super(request, handler, numWorkers);
this.pollLength = Durations.toMillis(request.getKafkaOptions().getPollDuration());
Properties props = new Properties();
props.putAll(
ImmutableMap.of(
"key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"group.id", "SUBSCRIBER_ID",
"enable.auto.commit", "true",
"session.timeout.ms", "30000"));
props.put("bootstrap.servers", request.getKafkaOptions().getBroker());
subscriber = new KafkaConsumer<>(props);
subscriber.subscribe(Collections.singletonList(request.getTopic()));
}