下面列出了 com.datastax.driver.core.BoundStatement#setInt() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Adds up column 0 (as double) and column 1 (as long) across {@code rows} and writes the two
 * totals as a single row through the overall insert statement of the summary table.
 *
 * <p>Bound positions: 0 agent rollup id, 1 transaction type, 2 {@code query.to()} as a
 * timestamp, 3 summed duration, 4 summed count, 5 general TTL.
 *
 * @param rollup supplies the rollup level, agent rollup id and adjusted TTL
 * @param query supplies the transaction type and the {@code to} time
 * @param rows rows to aggregate; an empty iterable yields totals of zero
 * @return the future returned by {@code session.writeAsync}
 */
private ListenableFuture<?> rollupOverallSummaryFromRows(RollupParams rollup,
        AggregateQuery query, Iterable<Row> rows) throws Exception {
    double durationNanosSum = 0;
    long transactionCountSum = 0;
    for (Row summaryRow : rows) {
        durationNanosSum += summaryRow.getDouble(0);
        transactionCountSum += summaryRow.getLong(1);
    }

    BoundStatement insert = getInsertOverallPS(summaryTable, rollup.rollupLevel()).bind();
    insert.setString(0, rollup.agentRollupId());
    insert.setString(1, query.transactionType());
    insert.setTimestamp(2, new Date(query.to()));
    insert.setDouble(3, durationNanosSum);
    insert.setLong(4, transactionCountSum);
    insert.setInt(5, rollup.adjustedTTL().generalTTL());
    return session.writeAsync(insert);
}
/**
 * Asynchronously writes one time-series entry for the given entity.
 *
 * <p>Positions 0-4 hold the entity type name, entity id, key, partition timestamp and entry
 * timestamp; {@code addValue} binds the value starting at position 5; position 6 holds the
 * TTL when one applies.
 *
 * @param entityId entity the entry belongs to
 * @param tsKvEntry entry to store
 * @param ttl time-to-live; a value of zero or less means the entry is written without a TTL
 * @return a future that completes with {@code null} once the write has been executed
 */
@Override
public ListenableFuture<Void> save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
    // Statement selection and TTL binding share one condition, so a TTL statement is never
    // left with its TTL parameter unbound (e.g. for a negative ttl).
    boolean withTtl = ttl > 0;
    long partition = toPartitionTs(tsKvEntry.getTs());
    DataType type = tsKvEntry.getDataType();
    BoundStatement stmt = (withTtl ? getSaveTtlStmt(type) : getSaveStmt(type)).bind();
    stmt.setString(0, entityId.getEntityType().name())
            .setUUID(1, entityId.getId())
            .setString(2, tsKvEntry.getKey())
            .setLong(3, partition)
            .setLong(4, tsKvEntry.getTs());
    addValue(tsKvEntry, stmt, 5);
    if (withTtl) {
        // NOTE(review): narrowing cast; values above Integer.MAX_VALUE would wrap - confirm
        // callers never pass such a ttl
        stmt.setInt(6, (int) ttl);
    }
    return getFuture(executeAsyncWrite(stmt), rs -> null);
}
/**
 * Binds one {@code EnrichedCustomerService} tuple to the update statement.
 *
 * <p>Position 0 receives the pre-incremented {@code id} field, so every call advances that
 * counter by one. Positions 1-9 receive, in order: imsi, totalDuration, wait, zipCode, the
 * issue type's enum name, satisfied, operatorCode, deviceBrand and deviceModel.
 *
 * @param updateCommand prepared statement to bind against
 * @param tuple source of the values
 * @return the bound statement
 */
@Override
protected Statement setStatementParameters(PreparedStatement updateCommand, EnrichedCustomerService tuple)
    throws DriverException
{
    // each positional setter returns the statement, so the binds can be chained
    return new BoundStatement(updateCommand)
        .setLong(0, ++id)
        .setString(1, tuple.imsi)
        .setInt(2, tuple.totalDuration)
        .setInt(3, tuple.wait)
        .setString(4, tuple.zipCode)
        .setString(5, tuple.issueType.name())
        .setBool(6, tuple.satisfied)
        .setString(7, tuple.operatorCode)
        .setString(8, tuple.deviceBrand)
        .setString(9, tuple.deviceModel);
}
/**
 * Binds one {@code CustomerService} tuple to the update statement.
 *
 * <p>Position 0 receives the pre-incremented {@code id} field, so every call advances that
 * counter by one. Positions 1-6 receive, in order: imsi, totalDuration, wait, zipCode, the
 * issue type's enum name and satisfied.
 *
 * @param updateCommand prepared statement to bind against
 * @param tuple source of the values
 * @return the bound statement
 */
@Override
protected Statement setStatementParameters(PreparedStatement updateCommand, CustomerService tuple)
    throws DriverException
{
    // each positional setter returns the statement, so the binds can be chained
    return new BoundStatement(updateCommand)
        .setLong(0, ++id)
        .setString(1, tuple.imsi)
        .setInt(2, tuple.totalDuration)
        .setInt(3, tuple.wait)
        .setString(4, tuple.zipCode)
        .setString(5, tuple.issueType.name())
        .setBool(6, tuple.satisfied);
}
/**
 * Binds the error-point columns of a trace header onto {@code boundStatement}.
 *
 * <p>The leading columns are bound by {@code bind(...)}, which returns the next free position.
 * From there this method binds, in order: duration nanos, error message, headline, user
 * (empty string stored as null), attributes (null when the list is empty, otherwise the
 * serialized bytes) and the TTL.
 *
 * @param boundStatement statement to populate
 * @param header trace header supplying the values
 * @param adjustedTTL TTL bound as the last parameter
 * @param overall forwarded to {@code bind(...)}
 * @throws IOException declared by the original signature
 */
private static void bindErrorPoint(BoundStatement boundStatement, String agentRollupId,
        String agentId, String traceId, Trace.Header header, int adjustedTTL, boolean overall)
        throws IOException {
    int base = bind(boundStatement, agentRollupId, agentId, traceId, header, overall, false);
    boundStatement.setLong(base, header.getDurationNanos());
    boundStatement.setString(base + 1, header.getError().getMessage());
    boundStatement.setString(base + 2, header.getHeadline());
    boundStatement.setString(base + 3, Strings.emptyToNull(header.getUser()));
    List<Trace.Attribute> attrs = header.getAttributeList();
    if (!attrs.isEmpty()) {
        boundStatement.setBytes(base + 4, Messages.toByteBuffer(attrs));
    } else {
        boundStatement.setToNull(base + 4);
    }
    boundStatement.setInt(base + 5, adjustedTTL);
}
/**
 * Runs the {@code readLastFromRollup0} statement and maps each returned row to a
 * {@code SyntheticResultRollup0}.
 *
 * <p>Row columns are read by position: 0 capture time (must be non-null), 1 total duration
 * nanos, 2 a blob whose mere presence marks the result as an error.
 *
 * @param agentRollupId bound at position 0
 * @param syntheticMonitorId bound at position 1
 * @param x bound at position 2 as an int (presumably a row limit - confirm against the
 *        statement text)
 * @return one result per row, in the order the rows are returned
 */
@Override
public List<SyntheticResultRollup0> readLastFromRollup0(String agentRollupId,
        String syntheticMonitorId, int x) throws Exception {
    BoundStatement query = readLastFromRollup0.bind();
    query.setString(0, agentRollupId);
    query.setString(1, syntheticMonitorId);
    query.setInt(2, x);

    ResultSet rows = session.read(query);
    List<SyntheticResultRollup0> rollups = new ArrayList<>();
    for (Row row : rows) {
        long captureTime = checkNotNull(row.getTimestamp(0)).getTime();
        double totalDurationNanos = row.getDouble(1);
        boolean hasError = row.getBytes(2) != null;
        rollups.add(ImmutableSyntheticResultRollup0.builder()
                .captureTime(captureTime)
                .totalDurationNanos(totalDurationNanos)
                .error(hasError)
                .build());
    }
    return rollups;
}
/**
 * Issues one asynchronous select per partition produced by the interval generator between
 * the floored {@code start} and floored {@code end}, and returns an iterator over the
 * combined results.
 *
 * <p>Every statement is bound with the same context, resource, start and end; only the
 * partition value differs. Each statement uses the read consistency configured for the
 * context.
 *
 * @param context context whose shard size and read consistency are used
 * @param resource resource to select
 * @param start lower bound bound as "start"
 * @param end upper bound bound as "end"
 * @return a {@code ConcurrentResultWrapper} around the pending result futures
 */
private Iterator<com.datastax.driver.core.Row> cassandraSelect(Context context, Resource resource,
        Timestamp start, Timestamp end) {
    List<Future<ResultSet>> pending = Lists.newArrayList();

    Duration shard = m_contextConfigurations.getResourceShard(context);
    Timestamp firstPartition = start.stepFloor(shard);
    Timestamp lastPartition = end.stepFloor(shard);

    for (Timestamp partition : new IntervalGenerator(firstPartition, lastPartition, shard)) {
        BoundStatement select = m_selectStatement.bind();
        select.setString(SchemaConstants.F_CONTEXT, context.getId());
        select.setInt(SchemaConstants.F_PARTITION, (int) partition.asSeconds());
        select.setString(SchemaConstants.F_RESOURCE, resource.getId());
        select.setTimestamp("start", start.asDate());
        select.setTimestamp("end", end.asDate());

        // Use the context specific consistency level
        select.setConsistencyLevel(m_contextConfigurations.getReadConsistency(context));

        pending.add(m_session.executeAsync(select));
    }

    return new ConcurrentResultWrapper(pending);
}
/**
 * Writes one synthetic monitor row keyed by the capture time rolled up to a one-day
 * interval, unless the rate limiter declines the key.
 *
 * @param agentRollupId bound at position 0
 * @param captureTime capture time in millis; rolled up via {@code CaptureTimes.getRollup}
 *        before being bound at position 1
 * @param syntheticMonitorId bound at position 2
 * @param syntheticMonitorDisplay bound at position 3
 * @return a single-element list holding the write future, or an empty list when
 *         {@code rateLimiter.tryAcquire} returns false
 */
List<Future<?>> insert(String agentRollupId, long captureTime, String syntheticMonitorId,
        String syntheticMonitorDisplay) throws Exception {
    long dailyCaptureTime = CaptureTimes.getRollup(captureTime, DAYS.toMillis(1));
    SyntheticMonitorKey key = ImmutableSyntheticMonitorKey.builder()
            .agentRollupId(agentRollupId)
            .captureTime(dailyCaptureTime)
            .syntheticMonitorId(syntheticMonitorId)
            .syntheticMonitorDisplay(syntheticMonitorDisplay)
            .build();
    if (rateLimiter.tryAcquire(key)) {
        BoundStatement insert = insertPS.bind();
        insert.setString(0, agentRollupId);
        insert.setTimestamp(1, new Date(dailyCaptureTime));
        insert.setString(2, syntheticMonitorId);
        insert.setString(3, syntheticMonitorDisplay);
        // the TTL is derived from the max rollup TTL and the rolled-up capture time
        int maxRollupTTL = configRepository.getCentralStorageConfig().getMaxRollupTTL();
        insert.setInt(4, Common.getAdjustedTTL(maxRollupTTL, dailyCaptureTime, clock));
        return ImmutableList.of(session.writeAsync(insert));
    }
    return ImmutableList.of();
}
/**
 * Binds an {@code ActionDefinition} to the {@code upsert} statement by column name: place id,
 * sequence id, created, modified, name, description, tags and the action bytes wrapped in a
 * {@code ByteBuffer}.
 *
 * @param ad definition supplying all bound values
 * @param ts not read by this method
 * @return the bound statement, not yet executed
 */
protected Statement prepareUpsert(ActionDefinition ad, Date ts) {
    // note this method does not update lastExecuted
    return upsert.bind()
        .setUUID(Column.PLACE_ID.columnName(), ad.getPlaceId())
        .setInt(Column.ID.columnName(), ad.getSequenceId())
        .setTimestamp(Column.CREATED.columnName(), ad.getCreated())
        .setTimestamp(Column.MODIFIED.columnName(), ad.getModified())
        .setString(Column.NAME.columnName(), ad.getName())
        .setString(Column.DESCRIPTION.columnName(), ad.getDescription())
        .setSet(Column.TAGS.columnName(), ad.getTags())
        .setBytes(ActionColumn.ACTION.columnName(), ByteBuffer.wrap(ad.getAction()));
}
/**
 * Writes one row per query into the query table at the given rollup level.
 *
 * <p>When {@code transactionName} is null the overall insert statement is used and the
 * transaction name column is skipped; otherwise the transaction insert statement is used and
 * the name is bound right after the transaction type. Total rows is bound as null when the
 * query reports none.
 *
 * @param queries queries to persist
 * @param transactionName transaction name, or null for the overall variant
 * @param captureTime capture time in millis, bound as a timestamp
 * @param adjustedTTL supplies the query TTL bound as the last parameter
 * @return a future combining all individual write futures
 */
private ListenableFuture<?> insertQueries(List<MutableQuery> queries, int rollupLevel,
        String agentRollupId, String transactionType, @Nullable String transactionName,
        long captureTime, TTL adjustedTTL) throws Exception {
    List<ListenableFuture<?>> writes = new ArrayList<>();
    for (MutableQuery query : queries) {
        BoundStatement insert = transactionName == null
                ? getInsertOverallPS(queryTable, rollupLevel).bind()
                : getInsertTransactionPS(queryTable, rollupLevel).bind();
        String sha1 = query.getFullTextSha1();
        int col = 0;
        insert.setString(col++, agentRollupId);
        insert.setString(col++, transactionType);
        if (transactionName != null) {
            insert.setString(col++, transactionName);
        }
        insert.setTimestamp(col++, new Date(captureTime));
        insert.setString(col++, query.getType());
        insert.setString(col++, query.getTruncatedText());
        // full_query_text_sha1 cannot be null since it is used in clustering key
        insert.setString(col++, Strings.nullToEmpty(sha1));
        insert.setDouble(col++, query.getTotalDurationNanos());
        insert.setLong(col++, query.getExecutionCount());
        if (!query.hasTotalRows()) {
            insert.setToNull(col++);
        } else {
            insert.setLong(col++, query.getTotalRows());
        }
        insert.setInt(col, adjustedTTL.queryTTL());
        writes.add(session.writeAsync(insert));
    }
    return Futures.allAsList(writes);
}
/**
 * Stores the JSON-serialized {@code variables} for the rule identified by {@code id} and
 * fails if the update was not applied.
 *
 * @param id primary id is bound as the place id, secondary id as the rule id
 * @param variables serialized with {@code JSON.toJson}
 * @param modified bound to the modified column (presumably the expected value of a
 *        conditional update - confirm against the statement text)
 * @throws IllegalStateException when the result set reports the update was not applied
 */
@Override
public void updateVariables(CompositeId<UUID, Integer> id, Map<String, Object> variables, Date modified) {
   BoundStatement update = updateVariables.bind();
   update.setUUID(Column.PLACE_ID.columnName(), id.getPrimaryId());
   update.setInt(Column.ID.columnName(), id.getSecondaryId());
   update.setString(RuleColumn.VARIABLES.columnName(), JSON.toJson(variables));
   update.setTimestamp(Column.MODIFIED.columnName(), modified);

   boolean applied = session.execute(update).wasApplied();
   if(applied) {
      return;
   }
   throw new IllegalStateException(String.format("Unable to update rule variables. Rule [%s] has been modified since read",id));
}
/**
 * Second half of the {@code trace_attribute_name} rewrite: recreates the table and copies
 * every row back from {@code trace_attribute_name_temp} with the trace TTL, then drops the
 * temp table.
 *
 * <p>Does nothing when the temp table does not exist. Writes are issued asynchronously, with
 * {@code waitForSome} called after each one and all outstanding writes awaited before the
 * temp table is dropped.
 */
private void updateTraceAttributeNamePartitionKeyPart2() throws Exception {
    boolean tempTablePresent = tableExists("trace_attribute_name_temp");
    if (!tempTablePresent) {
        // previously failed mid-upgrade prior to updating schema version
        return;
    }
    dropTableIfExists("trace_attribute_name");
    session.createTableWithLCS("create table if not exists trace_attribute_name (agent_rollup"
            + " varchar, transaction_type varchar, trace_attribute_name varchar, primary key"
            + " (agent_rollup, transaction_type, trace_attribute_name))");
    PreparedStatement copyPS = session.prepare("insert into trace_attribute_name"
            + " (agent_rollup, transaction_type, trace_attribute_name) values (?, ?, ?) using"
            + " ttl ?");
    int traceTTL = getCentralStorageConfig(session).getTraceTTL();
    ResultSet tempRows = session.read("select agent_rollup, transaction_type,"
            + " trace_attribute_name from trace_attribute_name_temp");
    Queue<ListenableFuture<?>> pending = new ArrayDeque<>();
    for (Row tempRow : tempRows) {
        // the three key columns are copied verbatim; only the TTL is new
        BoundStatement copy = copyPS.bind()
                .setString(0, tempRow.getString(0))
                .setString(1, tempRow.getString(1))
                .setString(2, tempRow.getString(2))
                .setInt(3, traceTTL);
        pending.add(session.writeAsync(copy));
        waitForSome(pending);
    }
    MoreFutures.waitForAll(pending);
    dropTableIfExists("trace_attribute_name_temp");
}
/**
 * Persists a scheduled command for a place and returns a {@code ScheduledCommand} describing
 * it.
 *
 * <p>The expiration time is the current system time plus {@code validForMs}, or plus
 * {@code defaultExpirationTimeMs} when {@code validForMs} is empty. It is set on the returned
 * command only; it is not one of the values bound to the statement here.
 *
 * @param placeId place the command belongs to
 * @param schedulerAddress address whose representation is stored as the scheduler
 * @param scheduledTime time the command is scheduled for
 * @param validForMs optional validity window in milliseconds
 * @return the command populated with offset, place, scheduled time, scheduler and expiration
 */
@Override
public ScheduledCommand schedule(
      UUID placeId,
      Address schedulerAddress,
      Date scheduledTime,
      OptionalLong validForMs
) {
   // computed first so the expiration is relative to the moment schedule() was called
   Date expiration = new Date(System.currentTimeMillis() + validForMs.orElse(defaultExpirationTimeMs));
   PartitionOffset partitionOffset = getPartitionOffsetFor(placeId, scheduledTime);

   BoundStatement upsert = upsertCommand.bind()
      .setInt(ScheduledEventTable.Columns.PARTITION_ID, partitionOffset.getPartition().getId())
      .setTimestamp(ScheduledEventTable.Columns.TIME_BUCKET, partitionOffset.getOffset())
      .setTimestamp(ScheduledEventTable.Columns.SCHEDULED_TIME, scheduledTime)
      .setUUID(ScheduledEventTable.Columns.PLACE_ID, placeId)
      .setString(ScheduledEventTable.Columns.SCHEDULER, schedulerAddress.getRepresentation());
   session.execute(upsert);

   ScheduledCommand scheduled = new ScheduledCommand();
   scheduled.setOffset(partitionOffset);
   scheduled.setPlaceId(placeId);
   scheduled.setScheduledTime(scheduledTime);
   scheduled.setSchedulerAddress(schedulerAddress);
   scheduled.setExpirationTime(expiration);
   return scheduled;
}
/**
 * Second half of the {@code trace_attribute_name} rewrite: recreates the table and copies
 * rows back from {@code trace_attribute_name_temp}, replacing each v0.9 agent rollup id with
 * the id found in the agent rollup lookup, then drops the temp table.
 *
 * <p>Does nothing when the temp table does not exist. Rows whose agent rollup id has no entry
 * in the lookup are skipped. Each row is written synchronously with the trace TTL.
 */
private void rewriteTraceAttributeNameTablePart2() throws Exception {
    boolean tempTablePresent = tableExists("trace_attribute_name_temp");
    if (!tempTablePresent) {
        // previously failed mid-upgrade prior to updating schema version
        return;
    }
    dropTableIfExists("trace_attribute_name");
    Map<String, V09AgentRollup> rollupsById = getV09AgentRollupsFromAgentRollupTable();
    session.createTableWithLCS("create table if not exists trace_attribute_name (agent_rollup"
            + " varchar, transaction_type varchar, trace_attribute_name varchar, primary key"
            + " ((agent_rollup, transaction_type), trace_attribute_name))");
    PreparedStatement copyPS = session.prepare("insert into trace_attribute_name"
            + " (agent_rollup, transaction_type, trace_attribute_name) values (?, ?, ?) using"
            + " ttl ?");
    int traceTTL = getCentralStorageConfig(session).getTraceTTL();
    ResultSet tempRows = session.read("select agent_rollup, transaction_type,"
            + " trace_attribute_name from trace_attribute_name_temp");
    for (Row tempRow : tempRows) {
        V09AgentRollup v09AgentRollup = rollupsById.get(tempRow.getString(0));
        if (v09AgentRollup == null) {
            // v09AgentRollupId was manually deleted (via the UI) from the agent_rollup
            // table in which case its parent is no longer known and best to ignore
            continue;
        }
        BoundStatement copy = copyPS.bind()
                .setString(0, v09AgentRollup.agentRollupId())
                .setString(1, tempRow.getString(1))
                .setString(2, tempRow.getString(2))
                .setInt(3, traceTTL);
        session.write(copy);
    }
    dropTableIfExists("trace_attribute_name_temp");
}
/**
 * Applies default mutation styles to the execution context, builds the bound statement for
 * it, and then applies the TTL and consistency overrides.
 *
 * <ul>
 *   <li>Null handling: nulls are set when the style is {@code UNDEFINED} or
 *       {@code SET_NULL_COLUMNS}; any other value, including null, disables it.</li>
 *   <li>Collection style: null or {@code UNDEFINED} becomes
 *       {@code ADD_TO_EXISTING_COLLECTION} on the tuple.</li>
 *   <li>List placement: null or {@code UNDEFINED} becomes {@code APPEND_TO_EXISTING_LIST}
 *       on the tuple.</li>
 *   <li>TTL: bound when the tuple overrides it or the connection has one set; the tuple's
 *       value wins over the connection default.</li>
 *   <li>Consistency: an overriding level is applied as the serial consistency level when it
 *       is serial, otherwise as the regular consistency level.</li>
 * </ul>
 *
 * @param tuple execution context; its collection and list styles may be modified
 * @return the statement ready for execution
 */
private BoundStatement setDefaultsAndPrepareBoundStatement(UpsertExecutionContext tuple)
{
  UpsertExecutionContext.NullHandlingMutationStyle nullStyle = tuple.getNullHandlingMutationStyle();
  // UNDEFINED defaults to SET_NULL_COLUMNS, so either value means "set nulls"
  boolean setNulls = nullStyle == UpsertExecutionContext.NullHandlingMutationStyle.UNDEFINED
      || nullStyle == UpsertExecutionContext.NullHandlingMutationStyle.SET_NULL_COLUMNS;

  UpsertExecutionContext.CollectionMutationStyle collectionStyle = tuple.getCollectionMutationStyle();
  if (collectionStyle == null
      || collectionStyle == UpsertExecutionContext.CollectionMutationStyle.UNDEFINED) {
    tuple.setCollectionMutationStyle(UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
  }

  UpsertExecutionContext.ListPlacementStyle listStyle = tuple.getListPlacementStyle();
  if (listStyle == null || listStyle == UpsertExecutionContext.ListPlacementStyle.UNDEFINED) {
    tuple.setListPlacementStyle(UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST);
  }

  PreparedStatement preparedStatement = resolvePreparedStatementForCurrentExecutionContext(tuple);
  BoundStatement bound = processPayloadForExecution(preparedStatement, tuple, setNulls);

  if (tuple.isTtlOverridden() || connectionStateManager.isTTLSet()) {
    int effectiveTtl = connectionStateManager.getDefaultTtlInSecs();
    if (tuple.isTtlOverridden()) {
      effectiveTtl = tuple.getOverridingTTL();
    }
    bound.setInt(CassandraPreparedStatementGenerator.TTL_PARAM_NAME, effectiveTtl);
  }

  if (tuple.isOverridingConsistencyLevelSet()) {
    ConsistencyLevel override = tuple.getOverridingConsistencyLevel();
    if (!override.isSerial()) {
      bound.setConsistencyLevel(override);
    } else {
      bound.setSerialConsistencyLevel(override);
    }
  }

  LOG.debug("Executing statement " + preparedStatement.getQueryString());
  return bound;
}
/**
 * Writes the rollup-level-0 summary row for one transaction name and, when the aggregate has
 * errors, the matching error summary row.
 *
 * <p>Both rows bind, in order: agent rollup id, transaction type, capture time, transaction
 * name, a metric, the transaction count and the general TTL. The metric is the total duration
 * (double) for the summary row and the error count (long) for the error summary row.
 *
 * @param captureTime capture time in millis, bound as a timestamp
 * @param aggregate source of duration, transaction count and error count
 * @param adjustedTTL supplies the general TTL
 * @return the write futures: one entry, or two when the error count is above zero
 */
private List<Future<?>> storeTransactionNameSummary(String agentRollupId,
        String transactionType, String transactionName, long captureTime, Aggregate aggregate,
        TTL adjustedTTL) throws Exception {
    final int rollupLevel = 0;
    List<Future<?>> writes = new ArrayList<>();

    BoundStatement summary = getInsertTransactionPS(summaryTable, rollupLevel).bind();
    summary.setString(0, agentRollupId);
    summary.setString(1, transactionType);
    summary.setTimestamp(2, new Date(captureTime));
    summary.setString(3, transactionName);
    summary.setDouble(4, aggregate.getTotalDurationNanos());
    summary.setLong(5, aggregate.getTransactionCount());
    summary.setInt(6, adjustedTTL.generalTTL());
    writes.add(session.writeAsync(summary));

    if (aggregate.getErrorCount() <= 0) {
        return writes;
    }

    BoundStatement errorSummary = getInsertTransactionPS(errorSummaryTable, rollupLevel).bind();
    errorSummary.setString(0, agentRollupId);
    errorSummary.setString(1, transactionType);
    errorSummary.setTimestamp(2, new Date(captureTime));
    errorSummary.setString(3, transactionName);
    errorSummary.setLong(4, aggregate.getErrorCount());
    errorSummary.setLong(5, aggregate.getTransactionCount());
    errorSummary.setInt(6, adjustedTTL.generalTTL());
    writes.add(session.writeAsync(errorSummary));
    return writes;
}
/**
 * Splits {@code active_agent_rollup_<rollupLevel>} into
 * {@code active_top_level_rollup_<rollupLevel>} and {@code active_child_rollup_<rollupLevel>},
 * then drops the source table.
 *
 * <p>Only source rows with a capture time newer than "now minus the rollup expiration" are
 * read. An agent id without {@code "::"} is a top-level id with no child. Otherwise the part
 * up to and including the first {@code "::"} is the top-level id and the remainder is the
 * child agent id. Every row produces a top-level insert; rows with a child also produce a
 * child insert. Both use a TTL adjusted for the row's capture time.
 *
 * @param rollupLevel rollup level, used as the table name suffix and to look up the
 *        expiration hours
 */
private void splitActiveAgentRollupTables(int rollupLevel) throws Exception {
    // the message has two placeholders, so rollupLevel is passed once per placeholder;
    // with a single argument the second "{}" would be logged literally
    logger.info("populating active_top_level_rollup_{} and active_child_rollup_{} tables - this"
            + " could take several minutes on large data sets...", rollupLevel, rollupLevel);
    dropTableIfExists("active_top_level_rollup_" + rollupLevel);
    dropTableIfExists("active_child_rollup_" + rollupLevel);
    Integer expirationHours =
            getCentralStorageConfig(session).rollupExpirationHours().get(rollupLevel);
    session.createTableWithTWCS("create table if not exists active_top_level_rollup_"
            + rollupLevel + " (one int, capture_time timestamp, top_level_id varchar, primary"
            + " key (one, capture_time, top_level_id))", expirationHours);
    session.createTableWithTWCS("create table if not exists active_child_rollup_" + rollupLevel
            + " (top_level_id varchar, capture_time timestamp, child_agent_id varchar, primary"
            + " key (top_level_id, capture_time, child_agent_id))", expirationHours);
    PreparedStatement insertTopLevelPS = session.prepare("insert into active_top_level_rollup_"
            + rollupLevel + " (one, capture_time, top_level_id) values (1, ?, ?) using ttl ?");
    PreparedStatement insertChildPS = session.prepare("insert into active_child_rollup_"
            + rollupLevel + " (top_level_id, capture_time, child_agent_id) values (?, ?, ?)"
            + " using ttl ?");
    // saturating cast keeps a very large expiration from wrapping around to a negative TTL
    int ttl = Ints.saturatedCast(HOURS.toSeconds(expirationHours));
    PreparedStatement readPS = session.prepare("select capture_time, agent_id from"
            + " active_agent_rollup_" + rollupLevel + " where one = 1 and capture_time > ?");
    BoundStatement boundStatement = readPS.bind();
    boundStatement.setTimestamp(0,
            new Date(clock.currentTimeMillis() - HOURS.toMillis(expirationHours)));
    ResultSet results = session.read(boundStatement);
    Queue<ListenableFuture<?>> futures = new ArrayDeque<>();
    for (Row row : results) {
        Date captureDate = checkNotNull(row.getTimestamp(0));
        String agentId = checkNotNull(row.getString(1));
        int index = agentId.indexOf("::");
        String topLevelId;
        String childAgentId;
        if (index == -1) {
            topLevelId = agentId;
            childAgentId = null;
        } else {
            // the top-level id keeps its trailing "::" separator
            topLevelId = agentId.substring(0, index + 2);
            childAgentId = agentId.substring(index + 2);
        }
        int adjustedTTL = Common.getAdjustedTTL(ttl, captureDate.getTime(), clock);
        boundStatement = insertTopLevelPS.bind();
        boundStatement.setTimestamp(0, captureDate);
        boundStatement.setString(1, topLevelId);
        boundStatement.setInt(2, adjustedTTL);
        futures.add(session.writeAsync(boundStatement));
        if (childAgentId != null) {
            boundStatement = insertChildPS.bind();
            boundStatement.setString(0, topLevelId);
            boundStatement.setTimestamp(1, captureDate);
            boundStatement.setString(2, childAgentId);
            boundStatement.setInt(3, adjustedTTL);
            futures.add(session.writeAsync(boundStatement));
        }
        waitForSome(futures);
    }
    MoreFutures.waitForAll(futures);
    dropTableIfExists("active_agent_rollup_" + rollupLevel);
    logger.info("populating active_top_level_rollup_{} and active_child_rollup_{} tables"
            + " - complete", rollupLevel, rollupLevel);
}
/**
 * Recreates the trace_tt_slow_count_partial and trace_tt_slow_point_partial tables and fills
 * them from the rows of trace_tt_slow_point whose "partial" column is true.
 *
 * Each partial source row produces one asynchronous insert into each target table, both with
 * a TTL adjusted for the row's capture time. The source table is only read here; it is not
 * modified or dropped by this method.
 */
private void populateTraceTtSlowCountAndPointPartialPart1() throws Exception {
logger.info("populating trace_tt_slow_count_partial and trace_tt_slow_point_partial tables"
+ " - this could take several minutes on large data sets...");
CentralStorageConfig storageConfig = getCentralStorageConfig(session);
// start from a clean slate so that a previously interrupted run can be repeated
dropTableIfExists("trace_tt_slow_count_partial");
dropTableIfExists("trace_tt_slow_point_partial");
// NOTE(review): the meaning of the trailing (false, true) arguments is not visible from
// here - confirm against createTableWithTWCS
session.createTableWithTWCS("create table if not exists trace_tt_slow_count_partial"
+ " (agent_rollup varchar, transaction_type varchar, capture_time timestamp,"
+ " agent_id varchar, trace_id varchar, primary key ((agent_rollup,"
+ " transaction_type), capture_time, agent_id, trace_id))",
storageConfig.traceExpirationHours(), false, true);
session.createTableWithTWCS("create table if not exists trace_tt_slow_point_partial"
+ " (agent_rollup varchar, transaction_type varchar, capture_time timestamp,"
+ " agent_id varchar, trace_id varchar, duration_nanos bigint, error boolean,"
+ " headline varchar, user varchar, attributes blob, primary key ((agent_rollup,"
+ " transaction_type), capture_time, agent_id, trace_id))",
storageConfig.traceExpirationHours(), false, true);
PreparedStatement insertCountPartialPS = session.prepare("insert into"
+ " trace_tt_slow_count_partial (agent_rollup, transaction_type, capture_time,"
+ " agent_id, trace_id) values (?, ?, ?, ?, ?) using ttl ?");
PreparedStatement insertPointPartialPS = session.prepare("insert into"
+ " trace_tt_slow_point_partial (agent_rollup, transaction_type, capture_time,"
+ " agent_id, trace_id, duration_nanos, error, headline, user, attributes) values"
+ " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using ttl ?");
// NOTE(review): the config is fetched a second time here; storageConfig from above could
// presumably be reused - confirm
int ttl = getCentralStorageConfig(session).getTraceTTL();
ResultSet results = session.read("select agent_rollup, transaction_type, capture_time,"
+ " agent_id, trace_id, duration_nanos, error, headline, user, attributes, partial"
+ " from trace_tt_slow_point");
Queue<ListenableFuture<?>> futures = new ArrayDeque<>();
Stopwatch stopwatch = Stopwatch.createStarted();
int rowCount = 0;
for (Row row : results) {
if (!row.getBool(10)) { // partial
// unfortunately cannot use "where partial = true allow filtering" in the query
// above as that leads to ReadTimeoutException
continue;
}
// the select column order matches the insert parameter order, so the same index i is
// used for both the source row and the bound statement
BoundStatement boundStatement = insertCountPartialPS.bind();
int i = 0;
copyString(row, boundStatement, i++); // agent_rollup
copyString(row, boundStatement, i++); // transaction_type
// capture_time is read here (without advancing i) only to compute the adjusted TTL
Date captureDate = checkNotNull(row.getTimestamp(i));
int adjustedTTL = Common.getAdjustedTTL(ttl, captureDate.getTime(), clock);
copyTimestamp(row, boundStatement, i++); // capture_time
copyString(row, boundStatement, i++); // agent_id
copyString(row, boundStatement, i++); // trace_id
boundStatement.setInt(i++, adjustedTTL);
futures.add(session.writeAsync(boundStatement));
boundStatement = insertPointPartialPS.bind();
i = 0;
copyString(row, boundStatement, i++); // agent_rollup
copyString(row, boundStatement, i++); // transaction_type
copyTimestamp(row, boundStatement, i++); // capture_time
copyString(row, boundStatement, i++); // agent_id
copyString(row, boundStatement, i++); // trace_id
copyLong(row, boundStatement, i++); // duration_nanos
copyBool(row, boundStatement, i++); // error
copyString(row, boundStatement, i++); // headline
copyString(row, boundStatement, i++); // user
copyBytes(row, boundStatement, i++); // attributes
boundStatement.setInt(i++, adjustedTTL);
futures.add(session.writeAsync(boundStatement));
rowCount++;
// progress is logged once more than 60 seconds have passed since the last log line
if (stopwatch.elapsed(SECONDS) > 60) {
logger.info("processed {} records", rowCount);
stopwatch.reset().start();
}
waitForSome(futures);
}
// all outstanding writes are awaited before completion is logged
MoreFutures.waitForAll(futures);
logger.info("populating trace_tt_slow_count_partial and trace_tt_slow_point_partial tables"
+ " - complete");
}
/**
 * Recreates the trace_tn_slow_count_partial and trace_tn_slow_point_partial tables and fills
 * them from the rows of trace_tn_slow_point whose "partial" column is true.
 *
 * Each partial source row produces one asynchronous insert into each target table, both with
 * a TTL adjusted for the row's capture time. The source table is only read here; it is not
 * modified or dropped by this method.
 */
private void populateTraceTnSlowCountAndPointPartialPart1() throws Exception {
logger.info("populating trace_tn_slow_count_partial and trace_tn_slow_point_partial tables"
+ " - this could take several minutes on large data sets...");
CentralStorageConfig storageConfig = getCentralStorageConfig(session);
// start from a clean slate so that a previously interrupted run can be repeated
dropTableIfExists("trace_tn_slow_count_partial");
dropTableIfExists("trace_tn_slow_point_partial");
// NOTE(review): the meaning of the trailing (false, true) arguments is not visible from
// here - confirm against createTableWithTWCS
session.createTableWithTWCS("create table if not exists trace_tn_slow_count_partial"
+ " (agent_rollup varchar, transaction_type varchar, transaction_name varchar,"
+ " capture_time timestamp, agent_id varchar, trace_id varchar, primary key"
+ " ((agent_rollup, transaction_type, transaction_name), capture_time, agent_id,"
+ " trace_id))", storageConfig.traceExpirationHours(), false, true);
session.createTableWithTWCS("create table if not exists trace_tn_slow_point_partial"
+ " (agent_rollup varchar, transaction_type varchar, transaction_name varchar,"
+ " capture_time timestamp, agent_id varchar, trace_id varchar, duration_nanos"
+ " bigint, error boolean, headline varchar, user varchar, attributes blob, primary"
+ " key ((agent_rollup, transaction_type, transaction_name), capture_time,"
+ " agent_id, trace_id))", storageConfig.traceExpirationHours(), false, true);
PreparedStatement insertCountPartialPS = session.prepare("insert into"
+ " trace_tn_slow_count_partial (agent_rollup, transaction_type, transaction_name,"
+ " capture_time, agent_id, trace_id) values (?, ?, ?, ?, ?, ?) using ttl ?");
PreparedStatement insertPointPartialPS = session.prepare("insert into"
+ " trace_tn_slow_point_partial (agent_rollup, transaction_type, transaction_name,"
+ " capture_time, agent_id, trace_id, duration_nanos, error, headline, user,"
+ " attributes) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using ttl ?");
// NOTE(review): the config is fetched a second time here; storageConfig from above could
// presumably be reused - confirm
int ttl = getCentralStorageConfig(session).getTraceTTL();
ResultSet results = session.read("select agent_rollup, transaction_type,"
+ " transaction_name, capture_time, agent_id, trace_id, duration_nanos, error,"
+ " headline, user, attributes, partial from trace_tn_slow_point");
Queue<ListenableFuture<?>> futures = new ArrayDeque<>();
Stopwatch stopwatch = Stopwatch.createStarted();
int rowCount = 0;
for (Row row : results) {
if (!row.getBool(11)) { // partial
// unfortunately cannot use "where partial = true allow filtering" in the query
// above as that leads to ReadTimeoutException
continue;
}
// the select column order matches the insert parameter order, so the same index i is
// used for both the source row and the bound statement
BoundStatement boundStatement = insertCountPartialPS.bind();
int i = 0;
copyString(row, boundStatement, i++); // agent_rollup
copyString(row, boundStatement, i++); // transaction_type
copyString(row, boundStatement, i++); // transaction_name
// capture_time is read here (without advancing i) only to compute the adjusted TTL
Date captureDate = checkNotNull(row.getTimestamp(i));
int adjustedTTL = Common.getAdjustedTTL(ttl, captureDate.getTime(), clock);
copyTimestamp(row, boundStatement, i++); // capture_time
copyString(row, boundStatement, i++); // agent_id
copyString(row, boundStatement, i++); // trace_id
boundStatement.setInt(i++, adjustedTTL);
futures.add(session.writeAsync(boundStatement));
boundStatement = insertPointPartialPS.bind();
i = 0;
copyString(row, boundStatement, i++); // agent_rollup
copyString(row, boundStatement, i++); // transaction_type
copyString(row, boundStatement, i++); // transaction_name
copyTimestamp(row, boundStatement, i++); // capture_time
copyString(row, boundStatement, i++); // agent_id
copyString(row, boundStatement, i++); // trace_id
copyLong(row, boundStatement, i++); // duration_nanos
copyBool(row, boundStatement, i++); // error
copyString(row, boundStatement, i++); // headline
copyString(row, boundStatement, i++); // user
copyBytes(row, boundStatement, i++); // attributes
boundStatement.setInt(i++, adjustedTTL);
futures.add(session.writeAsync(boundStatement));
rowCount++;
// progress is logged once more than 60 seconds have passed since the last log line
if (stopwatch.elapsed(SECONDS) > 60) {
logger.info("processed {} records", rowCount);
stopwatch.reset().start();
}
waitForSome(futures);
}
// all outstanding writes are awaited before completion is logged
MoreFutures.waitForAll(futures);
logger.info("populating trace_tn_slow_count_partial and trace_tn_slow_point_partial tables"
+ " - complete");
}
/**
 * Builds the insert statement for {@code value} with an explicit TTL and write timestamp.
 *
 * <p>The value's columns are bound by {@code insertSetter}; the TTL is bound at position
 * {@code numberOfColumns} and the timestamp at the position after it.
 *
 * @param value object to map onto the statement
 * @param ttl bound as an int
 * @param timestamp bound as a long
 * @return the bound statement, not yet executed
 */
public BoundStatement saveQuery(T value, int ttl, long timestamp) {
    BoundStatement statement = insertQueryWithTtlAndTimestamp.bind();
    insertSetter.mapTo(value, statement);
    // the two trailing parameters follow directly after the mapped columns
    final int ttlIndex = numberOfColumns;
    final int timestampIndex = numberOfColumns + 1;
    statement.setInt(ttlIndex, ttl);
    statement.setLong(timestampIndex, timestamp);
    return statement;
}