下面列出了com.codahale.metrics.MetricRegistry#counter ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void verifyDefaultProviderChainIsUsed() throws Exception {
System.setProperty("aws.accessKeyId", "fake");
System.setProperty("aws.secretKey", "fake");
try {
CloudWatchReporterFactory factory = new CloudWatchReporterFactory();
MetricRegistry registry = new MetricRegistry();
Counter counter = registry.counter(MetricRegistry.name(this.getClass(), "test machine=123*"));
counter.inc();
factory.build(registry).report();
// expecting a 403
} finally {
System.clearProperty("aws.accessKeyId");
System.clearProperty("aws.secretKey");
}
}
HelixParticipantMetrics(MetricRegistry metricRegistry, String zkConnectStr,
Map<String, ReplicaState> localPartitionAndState) {
String zkSuffix = zkConnectStr == null ? "" : "-" + zkConnectStr;
this.localPartitionAndState = localPartitionAndState;
EnumSet.complementOf(EnumSet.of(ReplicaState.DROPPED)).forEach(state -> replicaCountByState.put(state, 0));
Gauge<Integer> bootstrapPartitionCount = () -> getReplicaCountInState(ReplicaState.BOOTSTRAP);
metricRegistry.register(MetricRegistry.name(HelixParticipant.class, "bootstrapPartitionCount" + zkSuffix),
bootstrapPartitionCount);
Gauge<Integer> standbyPartitionCount = () -> getReplicaCountInState(ReplicaState.STANDBY);
metricRegistry.register(MetricRegistry.name(HelixParticipant.class, "standbyPartitionCount" + zkSuffix),
standbyPartitionCount);
Gauge<Integer> leaderPartitionCount = () -> getReplicaCountInState(ReplicaState.LEADER);
metricRegistry.register(MetricRegistry.name(HelixParticipant.class, "leaderPartitionCount" + zkSuffix),
leaderPartitionCount);
Gauge<Integer> inactivePartitionCount = () -> getReplicaCountInState(ReplicaState.INACTIVE);
metricRegistry.register(MetricRegistry.name(HelixParticipant.class, "inactivePartitionCount" + zkSuffix),
inactivePartitionCount);
Gauge<Integer> offlinePartitionCount = () -> getReplicaCountInState(ReplicaState.OFFLINE);
metricRegistry.register(MetricRegistry.name(HelixParticipant.class, "offlinePartitionCount" + zkSuffix),
offlinePartitionCount);
Gauge<Integer> errorStatePartitionCount = () -> getReplicaCountInState(ReplicaState.ERROR);
metricRegistry.register(MetricRegistry.name(HelixParticipant.class, "errorStatePartitionCount" + zkSuffix),
errorStatePartitionCount);
partitionDroppedCount =
metricRegistry.counter(MetricRegistry.name(HelixParticipant.class, "partitionDroppedCount" + zkSuffix));
}
/**
* Loads metrics from the file at the directory path
*
* @param metricsRegistry
* @param directoryPath
* @return
* @throws IOException
*/
private MetricRegistry loadMetrics(MetricRegistry metricsRegistry, String directoryPath) {
File metricsFile = new File(directoryPath + "/metrics/metrics_counters.data");
if (metricsFile.exists()) {
try {
BufferedReader reader = new BufferedReader(new FileReader(metricsFile));
String line = null;
while ((line = reader.readLine()) != null) {
String[] input = line.split(":");
Counter counter = metricsRegistry.counter(input[0]);
counter.inc(Integer.parseInt(input[1]));
}
reader.close();
} catch (Exception e) {
logger.error("Unable to deserialize counters : " + e.getMessage());
}
}
return metricsRegistry;
}
/**
* Metric names will be in the following format:
* {@code com.github.ambry.frontend.ContainerMetrics.{accountName}___{containerName}___{operationType}{metric}}
* For example:
* {@code com.github.ambry.frontend.ContainerMetrics.account-a___container-b___GetBlobSuccessCount}
* @param accountName the account name to use for naming metrics.
* @param containerName the container name to use for naming metrics.
* @param operationType the operation type to use for naming metrics.
* @param metricRegistry the {@link MetricRegistry}.
*/
ContainerMetrics(String accountName, String containerName, String operationType, MetricRegistry metricRegistry) {
String metricPrefix = accountName + SEPARATOR + containerName + SEPARATOR + operationType;
roundTripTimeInMs =
metricRegistry.histogram(MetricRegistry.name(ContainerMetrics.class, metricPrefix + "RoundTripTimeInMs"));
// counts by status code type
successCount = metricRegistry.counter(MetricRegistry.name(ContainerMetrics.class, metricPrefix + "SuccessCount"));
redirectionCount =
metricRegistry.counter(MetricRegistry.name(ContainerMetrics.class, metricPrefix + "RedirectionCount"));
clientErrorCount =
metricRegistry.counter(MetricRegistry.name(ContainerMetrics.class, metricPrefix + "ClientErrorCount"));
serverErrorCount =
metricRegistry.counter(MetricRegistry.name(ContainerMetrics.class, metricPrefix + "ServerErrorCount"));
// counts for individual status codes
badRequestCount =
metricRegistry.counter(MetricRegistry.name(ContainerMetrics.class, metricPrefix + "BadRequestCount"));
unauthorizedCount =
metricRegistry.counter(MetricRegistry.name(ContainerMetrics.class, metricPrefix + "UnauthorizedCount"));
forbiddenCount =
metricRegistry.counter(MetricRegistry.name(ContainerMetrics.class, metricPrefix + "ForbiddenCount"));
notFoundCount = metricRegistry.counter(MetricRegistry.name(ContainerMetrics.class, metricPrefix + "NotFoundCount"));
goneCount = metricRegistry.counter(MetricRegistry.name(ContainerMetrics.class, metricPrefix + "GoneCount"));
}
@Override
public Object getMetrics() {
// TODO Auto-generated method stub
MetricRegistry metricRegistry = new MetricRegistry();
Counter counter = metricRegistry.counter("batchInputRecords");
counter.inc(100);
return metricRegistry;
}
public FileImport(SampleRepository repository, MetricRegistry metrics, Path path)
throws FileNotFoundException, IOException {
m_repository = repository;
m_numRows = metrics.counter("num-rows");
m_numSamples = metrics.counter("num-samples");
m_writeTimer = metrics.timer("writes");
InputStream gzipStream = new GZIPInputStream(new FileInputStream(path.toString()));
m_reader = new BufferedReader(new InputStreamReader(gzipStream, "US-ASCII"));
m_lineParser = new LineParser();
}
@VisibleForTesting
public DefaultDataStore(DatabusEventWriterRegistry eventWriterRegistry,TableDAO tableDao,
DataReaderDAO dataReaderDao, DataWriterDAO dataWriterDao,
SlowQueryLog slowQueryLog, ExecutorService compactionExecutor, HistoryStore historyStore,
Optional<URI> stashRootDirectory, CompactionControlSource compactionControlSource,
Condition stashBlackListTableCondition, AuditWriter auditWriter,
MapStore<DataStoreMinSplitSize> minSplitSizeMap, MetricRegistry metricRegistry, Clock clock) {
_eventWriterRegistry = checkNotNull(eventWriterRegistry, "eventWriterRegistry");
_tableDao = checkNotNull(tableDao, "tableDao");
_dataReaderDao = checkNotNull(dataReaderDao, "dataReaderDao");
_dataWriterDao = checkNotNull(dataWriterDao, "dataWriterDao");
_slowQueryLog = checkNotNull(slowQueryLog, "slowQueryLog");
_compactionExecutor = checkNotNull(compactionExecutor, "compactionExecutor");
_historyStore = checkNotNull(historyStore, "historyStore");
_stashRootDirectory = checkNotNull(stashRootDirectory, "stashRootDirectory");
_stashBlackListTableCondition = checkNotNull(stashBlackListTableCondition, "stashBlackListTableCondition");
_auditWriter = checkNotNull(auditWriter, "auditWriter");
_resolveAnnotatedEventTimer = metricRegistry.timer(getMetricName("resolve_event"));
_archiveDeltaSize = metricRegistry.counter(MetricRegistry.name("bv.emodb.sor", "DefaultCompactor", "archivedDeltaSize"));
_discardedCompactions = metricRegistry.meter(MetricRegistry.name("bv.emodb.sor", "DefaultDataStore", "discarded_compactions"));
_compactor = new DistributedCompactor(_archiveDeltaSize, _historyStore.isDeltaHistoryEnabled(), metricRegistry);
_compactionControlSource = checkNotNull(compactionControlSource, "compactionControlSource");
_minSplitSizeMap = checkNotNull(minSplitSizeMap, "minSplitSizeMap");
_clock = checkNotNull(clock, "clock");
}
@Inject
public WriteCloseableDataStore(@ManagedDataStoreDelegate DataStore delegate,
@ManagedTableBackingStoreDelegate TableBackingStore tableBackingStore,
MetricRegistry metricRegistry) {
_delegate = requireNonNull(delegate);
_tableBackingStore = requireNonNull(tableBackingStore);
_writesAccepted = true;
_writerPhaser = new Phaser(1);
_writesRejectedCounter = metricRegistry.counter(MetricRegistry.name("bv.emodb.sor", "WriteCloseableDataStore",
"writesRejected"));
}
public SupervisorMetricsImpl(final String group,
final MetricRegistry registry) {
final String prefix = MetricRegistry.name(group, TYPE) + ".";
containerStartedCounter = registry.counter(prefix + "container_started_counter");
containersExitedCounter = registry.counter(prefix + "containers_exited_counter");
containersRunningCounter = registry.counter(prefix + "containers_running_counter");
containersThrewExceptionCounter = registry.counter(
prefix + "containers_threw_exception_counter");
imageCacheHitCounter = registry.counter(prefix + "image_cache_hit_counter");
supervisorClosedCounter = registry.counter(prefix + "supervisor_closed_counter");
supervisorStartedCounter = registry.counter(prefix + "supervisors_created_counter");
supervisorStoppedCounter = registry.counter(prefix + "supervisor_stopped_counter");
supervisorRunCounter = registry.counter(prefix + "supervisor_run_counter");
dockerTimeoutCounter = registry.counter(prefix + "docker_timeout_counter");
containerStartedMeter = registry.meter(prefix + "container_started_meter");
containersExitedMeter = registry.meter(prefix + "containers_exited_meter");
containersRunningMeter = registry.meter(prefix + "containers_running_meter");
containersThrewExceptionMeter = registry.meter(prefix + "containers_threw_exception_meter");
imageCacheHitMeter = registry.meter(prefix + "image_cache_hit_meter");
supervisorClosedMeter = registry.meter(prefix + "supervisor_closed_meter");
supervisorStartedMeter = registry.meter(prefix + "supervisors_created_meter");
supervisorStoppedMeter = registry.meter(prefix + "supervisor_stopped_meter");
supervisorRunMeter = registry.meter(prefix + "supervisor_run_meter");
dockerTimeoutMeter = registry.meter(prefix + "docker_timeout_meter");
imagePull = new RequestMetrics(group, TYPE, "image_pull", registry);
}
/**
* Creates an instance of RestRequestMetrics for {@code requestType} and attaches all the metrics related to the
* request to the given {@code ownerClass}. The metrics are also registered in the provided {@code metricRegistry}.
* @param ownerClass the {@link Class} that is supposed to own the metrics created by this tracker.
* @param requestType the type of request for which a tracker is being created.
* @param metricRegistry the {@link MetricRegistry} to use to register the created metrics.
*/
public RestRequestMetrics(Class ownerClass, String requestType, MetricRegistry metricRegistry) {
if (ownerClass == null || requestType == null || metricRegistry == null) {
throw new IllegalArgumentException(
"Null arg(s) during instantiation. Owner class - [" + ownerClass + "]. Request type - [" + requestType
+ "]. Metric registry - [" + metricRegistry + "]");
}
nioRequestProcessingTimeInMs =
metricRegistry.histogram(MetricRegistry.name(ownerClass, requestType + NIO_REQUEST_PROCESSING_TIME_SUFFIX));
nioResponseProcessingTimeInMs =
metricRegistry.histogram(MetricRegistry.name(ownerClass, requestType + NIO_RESPONSE_PROCESSING_TIME_SUFFIX));
nioRoundTripTimeInMs =
metricRegistry.histogram(MetricRegistry.name(ownerClass, requestType + NIO_ROUND_TRIP_TIME_SUFFIX));
nioTimeToFirstByteInMs =
metricRegistry.histogram(MetricRegistry.name(ownerClass, requestType + NIO_TIME_TO_FIRST_BYTE_SUFFIX));
scRequestProcessingTimeInMs =
metricRegistry.histogram(MetricRegistry.name(ownerClass, requestType + SC_REQUEST_PROCESSING_TIME_SUFFIX));
scRequestProcessingWaitTimeInMs =
metricRegistry.histogram(MetricRegistry.name(ownerClass, requestType + SC_REQUEST_PROCESSING_WAIT_TIME_SUFFIX));
scResponseProcessingTimeInMs =
metricRegistry.histogram(MetricRegistry.name(ownerClass, requestType + SC_RESPONSE_PROCESSING_TIME_SUFFIX));
scResponseProcessingWaitTimeInMs = metricRegistry.histogram(
MetricRegistry.name(ownerClass, requestType + SC_RESPONSE_PROCESSING_WAIT_TIME_SUFFIX));
scRoundTripTimeInMs =
metricRegistry.histogram(MetricRegistry.name(ownerClass, requestType + SC_ROUND_TRIP_TIME_SUFFIX));
operationRate = metricRegistry.meter(MetricRegistry.name(ownerClass, requestType + OPERATION_RATE_SUFFIX));
operationError = metricRegistry.counter(MetricRegistry.name(ownerClass, requestType + OPERATION_ERROR_SUFFIX));
operationCount = metricRegistry.counter(MetricRegistry.name(ownerClass, requestType + OPERATION_COUNT_SUFFIX));
unsatisfiedRequestCount =
metricRegistry.counter(MetricRegistry.name(ownerClass, requestType + UNSATISFIED_REQUEST_COUNT_SUFFIX));
satisfiedRequestCount =
metricRegistry.counter(MetricRegistry.name(ownerClass, requestType + SATISFIED_REQUEST_COUNT_SUFFIX));
}
@Test
public void testRemoveMissing() {
MetricRegistry metricRegistry = new MetricRegistry();
MetricMetadata metadata = new MetricMetadataImpl();
Counter counter = metricRegistry.counter("counter");
assertFalse(metadata.removeMetric(counter, metricRegistry));
}
@Before
public void before() {
registry = new MetricRegistry();
registry.counter("one:path");
registry.timer("two:path");
emptyRegistry = new MetricRegistry();
}
public InstrumentedConnectionFactoryWrapper(ConnectionFactory wrappedConnectionFactory, MetricRegistry metricRegistry, String bindHost, Integer port) {
_wrappedConnectionFactory = wrappedConnectionFactory;
final String counterName = name(HttpConnectionFactory.class,
bindHost,
Integer.toString(port),
"activeConnections");
_activeConnectionCounter = metricRegistry.counter(counterName);
}
public MetricsHolder(final MetricRegistry metricRegistry) {
this.metricRegistry = metricRegistry;
incomingMessageCounter = metricRegistry.counter(INCOMING_MESSAGE_COUNT.name());
outgoingMessageCounter = metricRegistry.counter(OUTGOING_MESSAGE_COUNT.name());
incomingConnectCounter = metricRegistry.counter(INCOMING_CONNECT_COUNT.name());
incomingPublishCounter = metricRegistry.counter(INCOMING_PUBLISH_COUNT.name());
outgoingPublishCounter = metricRegistry.counter(OUTGOING_PUBLISH_COUNT.name());
droppedMessageCounter = metricRegistry.counter(DROPPED_MESSAGE_COUNT.name());
closedConnectionsCounter = metricRegistry.counter(CONNECTIONS_CLOSED_COUNT.name());
subscriptionCounter = metricRegistry.counter(SUBSCRIPTIONS_CURRENT.name());
}
public ConnectionLimiter(MetricRegistry metrics, ServerLimits limits) {
this.maxConnections = limits.maxConnections();
this.numConnections = new AtomicInteger(0);
this.connections = metrics.counter(name("Active Connections"));
}
@Autowired
public DetectorMappingRepositoryImpl(MetricRegistry metricRegistry) {
this.delayTimer = metricRegistry.timer("es-lookup.time-delay");
this.exceptionCount = metricRegistry.counter("es-lookup.exception");
}
public KafkaFactory(final KafkaLocationManager kafkaLocationManager, final MetricRegistry metricRegistry) {
this.kafkaLocationManager = kafkaLocationManager;
this.useCountMetric = metricRegistry.counter("kafka.producer.use_count");
this.producerTerminations = metricRegistry.counter("kafka.producer.termination_count");
}
/** Simulate compaction occurring between Record.passOneIterator and Record.passTwoIterator.
* Note that this test is for legacy Compactor that restarts itself in case of race conditions between two
* compactor threads.
*/
@Test
public void testRestart()
throws Exception {
final Key key = mock(Key.class);
UUID t1 = TimeUUIDs.newUUID(); // First write
UUID t2 = TimeUUIDs.newUUID(); // Second write
UUID t3 = TimeUUIDs.newUUID(); // Third write
UUID t4 = TimeUUIDs.newUUID(); // First compaction compacts t1 and t2
UUID t5 = TimeUUIDs.newUUID(); // Second compaction compacts t1, t2, t3
UUID t6 = TimeUUIDs.newUUID(); // Fourth write
// Wait 1 ms so that now is guaranteed to be after the last UUID created above
SystemClock.tick();
// Compaction records after the first compaction
Compaction compaction1 = new Compaction(2, t1, t2, "0123456789abcdef", t2, t2);
final List<Map.Entry<DeltaClusteringKey, Compaction>> compactions1 = ImmutableList.of(
Maps.immutableEntry(new DeltaClusteringKey(t4, 1), compaction1));
// Compaction and delta records after the second compaction
Delta delta2 = Deltas.literal(ImmutableMap.of("key", "value"));
Delta delta3 = Deltas.mapBuilder().put("key2", "change").build();
Compaction compaction2 = new Compaction(2, t1, t3, "abcdef0123456789", t3, t3);
final List<Map.Entry<DeltaClusteringKey, Compaction>> compactions2 = ImmutableList.of(
Maps.immutableEntry(new DeltaClusteringKey(t5, 1), compaction2));
final List<Map.Entry<DeltaClusteringKey, Change>> deltas2 = ImmutableList.of(
Maps.immutableEntry(new DeltaClusteringKey(t3, 1), ChangeBuilder.just(t3, delta2)),
Maps.immutableEntry(new DeltaClusteringKey(t5, 1), ChangeBuilder.just(t5, compaction2)),
Maps.immutableEntry(new DeltaClusteringKey(t6, 1), ChangeBuilder.just(t6, delta3)));
// First try will delegate to legacy compactor and fail because compaction record 1 is not present in the 2nd sequence of deltas
Record record1 = mock(Record.class);
when(record1.getKey()).thenReturn(key);
when(record1.passOneIterator()).thenReturn(compactions1.iterator());
when(record1.passTwoIterator()).thenReturn(deltas2.iterator());
// Second try should succeed - while also delegating back to legacy compactor.
Record record2 = mock(Record.class);
when(record2.getKey()).thenReturn(key);
when(record2.passOneIterator()).thenReturn(compactions2.iterator());
when(record2.passTwoIterator()).thenReturn(deltas2.iterator());
//noinspection unchecked
Supplier<Record> requeryFn = mock(Supplier.class);
when(requeryFn.get()).thenReturn(record2);
long now = System.currentTimeMillis();
MetricRegistry metricRegistry = new MetricRegistry();
Counter archiveDeltaSize = metricRegistry.counter(MetricRegistry.name("bv.emodb.sor", "DistributedCompactor", "archivedDeltaSize"));
Expanded expanded =
new DistributedCompactor(archiveDeltaSize, true, metricRegistry)
.expand(record1, now, now, now, MutableIntrinsics.create(key), false, requeryFn);
// Methods that return iterators may not be called more than once.
verify(record1, times(1)).passOneIterator();
verify(record1, times(1)).passTwoIterator();
verify(requeryFn).get();
verify(record2, times(1)).passOneIterator();
verify(record2, times(1)).passTwoIterator();
verifyNoMoreInteractions(record1, record2, requeryFn);
// Verify that expansion resolved to the expected results.
Map<String, String> expectedContent = ImmutableMap.of("key", "value", "key2", "change");
assertEquals(expanded.getResolved().getContent(), expectedContent);
assertEquals(expanded.getResolved().getIntrinsics().getVersion(), 4); // 4 writes: t1, t2, t3, t6
assertEquals(expanded.getResolved().getIntrinsics().getFirstUpdateAtUuid(), t1);
assertEquals(expanded.getResolved().getIntrinsics().getLastUpdateAtUuid(), t6);
assertEquals(expanded.getNumDeletedDeltas(), 2);
assertEquals(expanded.getNumPersistentDeltas(), 2);
assertTrue(expanded.getPendingCompaction() != null);
assertEquals(expanded.getPendingCompaction().getChangeId(), t6);
assertEquals(expanded.getPendingCompaction().getDelta(), Deltas.literal(expectedContent));
assertEquals(expanded.getPendingCompaction().getCompaction().getCount(), 3/*add 1 for cutoff delta*/+1);
assertEquals(expanded.getPendingCompaction().getCompaction().getFirst(), t1);
assertEquals(expanded.getPendingCompaction().getCompaction().getCutoff(), t6);
}
@Test
public void testDisableDeltaHistory() {
final Key key = mock(Key.class);
UUID t1 = TimeUUIDs.newUUID(); // First write
UUID t2 = TimeUUIDs.newUUID(); // Second write
UUID t3 = TimeUUIDs.newUUID(); // Third write
// Wait 1 ms so that now is guaranteed to be after the last UUID created above
SystemClock.tick();
Delta delta2 = Deltas.literal(ImmutableMap.of("key", "value"));
Delta delta3 = Deltas.mapBuilder().put("key2", "change").build();
final List<Map.Entry<DeltaClusteringKey, Compaction>> compactions = Lists.newArrayList();
final List<Map.Entry<DeltaClusteringKey, Change>> deltas2 = ImmutableList.of(
Maps.immutableEntry(new DeltaClusteringKey(t1, 1), ChangeBuilder.just(t1, delta2)),
Maps.immutableEntry(new DeltaClusteringKey(t2, 1), ChangeBuilder.just(t2, delta2)),
Maps.immutableEntry(new DeltaClusteringKey(t3, 1), ChangeBuilder.just(t3, delta3)));
Record record = mock(Record.class);
when(record.getKey()).thenReturn(key);
when(record.passOneIterator()).thenReturn(compactions.iterator());
when(record.passTwoIterator()).thenReturn(deltas2.iterator());
//noinspection unchecked
Supplier<Record> requeryFn = mock(Supplier.class);
when(requeryFn.get()).thenReturn(record);
long now = System.currentTimeMillis();
MetricRegistry metricRegistry = new MetricRegistry();
Counter archiveDeltaSize = metricRegistry.counter(MetricRegistry.name("bv.emodb.sor", "DefaultCompactor", "archivedDeltaSize"));
boolean isDeltaHistoryEnabled = false;
Expanded expanded = new DistributedCompactor(archiveDeltaSize, isDeltaHistoryEnabled, metricRegistry)
.expand(record, now, now, now, MutableIntrinsics.create(key), false, requeryFn);
// Verify that expansion produces a compaction with no delta archives
Map<String, String> expectedContent = ImmutableMap.of("key", "value", "key2", "change");
assertEquals(expanded.getResolved().getContent(), expectedContent);
assertTrue(expanded.getPendingCompaction() != null);
assertTrue(expanded.getPendingCompaction().getDeltasToArchive().isEmpty(), "Delta history is disabled");
// Now verify we do get delta history if enabled
Record record2 = mock(Record.class);
when(record2.getKey()).thenReturn(key);
when(record2.passOneIterator()).thenReturn(compactions.iterator());
when(record2.passTwoIterator()).thenReturn(deltas2.iterator());
isDeltaHistoryEnabled = true;
expanded = new DistributedCompactor(archiveDeltaSize, isDeltaHistoryEnabled, metricRegistry)
.expand(record2, now, now, now, MutableIntrinsics.create(key), false, requeryFn);
expectedContent = ImmutableMap.of("key", "value", "key2", "change");
assertEquals(expanded.getResolved().getContent(), expectedContent);
assertTrue(expanded.getPendingCompaction() != null);
assertEquals(expanded.getPendingCompaction().getDeltasToArchive().size(), 3, "Archive 3 deltas");
}
@Test
public void testJsonForLegacyCompactions() {
// Test that legacy compactions work fine with new tagging
// No lastTags attribute
String legacyCompactionJson = "{\"count\":1," +
"\"first\":\"9e278d70-1e09-11e6-a07b-26a39ee5ccb6\"," +
"\"cutoff\":\"9e278d70-1e09-11e6-a07b-26a39ee5ccb6\"," +
"\"cutoffSignature\":\"c7fb73f63ce47ec422bfccf2aaa37503\"," +
"\"lastMutation\":\"9e278d70-1e09-11e6-a07b-26a39ee5ccb6\"," +
"\"compactedDelta\":\"{\\\"name\\\":\\\"Bob\\\"}\"}";
Compaction legacyCompaction = JsonHelper.fromJson(legacyCompactionJson, Compaction.class);
assertTrue(legacyCompaction.getLastTags().isEmpty(), "Legacy compaction should have empty last tags");
UUID uuid0 = TimeUUIDs.newUUID();
final List<Map.Entry<DeltaClusteringKey, Compaction>> compactions = ImmutableList.of(
Maps.immutableEntry(new DeltaClusteringKey(uuid0, 1), legacyCompaction));
UUID uuid1 = TimeUUIDs.newUUID();
final List<Map.Entry<DeltaClusteringKey, Change>> deltas = ImmutableList.of(
Maps.immutableEntry(new DeltaClusteringKey(uuid0, 1), ChangeBuilder.just(uuid0, legacyCompaction)),
Maps.immutableEntry(new DeltaClusteringKey(uuid1, 1), ChangeBuilder.just(uuid1, Deltas.fromString("{..,\"name\":\"Bob\"}")))); // uuid1 is redundant
final List<Map.Entry<DeltaClusteringKey, Change>> deltas2 = ImmutableList.of(
Maps.immutableEntry(new DeltaClusteringKey(uuid0, 1),ChangeBuilder.just(uuid0, legacyCompaction)),
Maps.immutableEntry(new DeltaClusteringKey(uuid1, 1), ChangeBuilder.just(uuid1,
Deltas.fromString("{..,\"name\":\"Bob\", \"~tags\":[\"tag0\"]}"), ImmutableSet.of("tag0")))); // uuid1 is different
Key key = mock(Key.class);
Record record = mock(Record.class);
when(record.getKey()).thenReturn(key);
when(record.passOneIterator()).thenReturn(compactions.iterator()).thenReturn(compactions.iterator());
when(record.passTwoIterator()).thenReturn(deltas.iterator()).thenReturn(deltas2.iterator());
long now = System.currentTimeMillis();
MetricRegistry metricRegistry = new MetricRegistry();
Counter archiveDeltaSize = metricRegistry.counter(MetricRegistry.name("bv.emodb.sor", "DistributedCompactor", "archivedDeltaSize"));
Expanded expanded =
new DistributedCompactor(archiveDeltaSize, true, metricRegistry)
.expand(record, now, now, now, MutableIntrinsics.create(key), false, mock(Supplier.class));
assertTrue(expanded.getResolved().isChangeDeltaRedundant(uuid1), "Legacy compaction issue");
expanded = new DistributedCompactor(archiveDeltaSize, true, metricRegistry)
.expand(record, now, now, now, MutableIntrinsics.create(key), false, mock(Supplier.class));
assertFalse(expanded.getResolved().isChangeDeltaRedundant(uuid1), "Legacy compaction issue");
}