下面列出了javax.annotation.ParametersAreNonnullByDefault#com.datastax.driver.core.ResultSet 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public boolean hasEnoughData(long expected) {
ResultSet rs = session.execute("select count(*) from test_data");
if (rs == null) {
return false;
}
List<Row> all = rs.all();
if (all == null || all.size() == 0) {
return false;
}
long count = all.get(0).getLong("count");
return count == expected;
}
public static <T> PagedResults<T> page(Session session, BoundStatement statement, int limit, Function<Row, T> transformer, Function<Row, String> token) {
List<T> results = new ArrayList<>(limit);
statement.setFetchSize(limit + 1);
ResultSet rs = session.execute( statement );
Row row = rs.one();
while(row != null && results.size() < limit) {
try {
T result = transformer.apply(row);
results.add(result);
}
catch(Exception e) {
log.warn("Unable to deserialize row {}", row, e);
}
row = rs.one();
}
if(row == null) {
return PagedResults.newPage(results);
}
else {
return PagedResults.newPage(results, token.apply(row));
}
}
private Response executeSelectQuery(
String keyspaceName,
String tableName,
List<String> ids,
Builder selectBuilder,
String primaryKeyColumnName) {
Response response;
Select selectQuery = selectBuilder.from(keyspaceName, tableName);
Where selectWhere = selectQuery.where();
Clause clause = null;
if (StringUtils.isBlank(primaryKeyColumnName)) {
clause = QueryBuilder.in(JsonKey.ID, ids.toArray(new Object[ids.size()]));
} else {
clause = QueryBuilder.in(primaryKeyColumnName, ids.toArray(new Object[ids.size()]));
}
selectWhere.and(clause);
ResultSet results = connectionManager.getSession(keyspaceName).execute(selectQuery);
response = CassandraUtil.createResponse(results);
return response;
}
@Test
public void handle_PutWithConditionButNoMutationApplied_ShouldThrowProperExecutionException()
throws ExecutionException {
// Arrange
put = preparePutWithClusteringKey();
put.withCondition(new PutIfNotExists());
spy = prepareSpiedInsertStatementHandler();
ResultSet results = mock(ResultSet.class);
Row row = mock(Row.class);
when(results.one()).thenReturn(row);
when(row.getBool(0)).thenReturn(false);
doReturn(results).when(spy).execute(bound, put);
// Act Assert
assertThatThrownBy(
() -> {
spy.handle(put);
})
.isInstanceOf(NoMutationException.class);
}
/**
* Returns a map with column name as key and column date type as value.
*
* The value might be as simple as "Boolean" or more complex like
* - "Set|Boolean"
* - "List|String"
* - "Map|String|Integer"
* these are cases when the data type is a container of primitive data types.
*
* @param tableName
* @return
* @throws DbException
*/
public Map<String, String> getColumnInfo(
String tableName ) throws DbException {
connect();
ResultSet results = session.execute("SELECT * FROM " + this.dbName + "." + tableName + " LIMIT 1");
Map<String, String> columnInfo = new HashMap<String, String>();
for (Definition columnDefinition : results.getColumnDefinitions()) {
DataType dataType = columnDefinition.getType();
String dataTypeName = dataType.getName().name();
if ("Set".equalsIgnoreCase(dataTypeName)) {
dataTypeName = dataTypeName + "|" + dataType.getTypeArguments().get(0);
} else if ("List".equalsIgnoreCase(dataTypeName)) {
dataTypeName = dataTypeName + "|" + dataType.getTypeArguments().get(0);
} else if ("Map".equalsIgnoreCase(dataTypeName)) {
dataTypeName = dataTypeName + "|" + dataType.getTypeArguments().get(0) + "|"
+ dataType.getTypeArguments().get(1);
}
columnInfo.put(columnDefinition.getName(), dataTypeName);
}
return columnInfo;
}
private Iterator<com.datastax.driver.core.Row> cassandraSelect(Context context, Resource resource,
Timestamp start, Timestamp end) {
List<Future<ResultSet>> futures = Lists.newArrayList();
Duration resourceShard = m_contextConfigurations.getResourceShard(context);
Timestamp lower = start.stepFloor(resourceShard);
Timestamp upper = end.stepFloor(resourceShard);
for (Timestamp partition : new IntervalGenerator(lower, upper, resourceShard)) {
BoundStatement bindStatement = m_selectStatement.bind();
bindStatement.setString(SchemaConstants.F_CONTEXT, context.getId());
bindStatement.setInt(SchemaConstants.F_PARTITION, (int) partition.asSeconds());
bindStatement.setString(SchemaConstants.F_RESOURCE, resource.getId());
bindStatement.setTimestamp("start", start.asDate());
bindStatement.setTimestamp("end", end.asDate());
// Use the context specific consistency level
bindStatement.setConsistencyLevel(m_contextConfigurations.getReadConsistency(context));
futures.add(m_session.executeAsync(bindStatement));
}
return new ConcurrentResultWrapper(futures);
}
@Test
public void testCassandraPojoAtLeastOnceSink() throws Exception {
session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, builder);
try {
sink.open(new Configuration());
for (int x = 0; x < 20; x++) {
sink.send(new Pojo(UUID.randomUUID().toString(), x, 0));
}
} finally {
sink.close();
}
ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
Assert.assertEquals(20, rs.all().size());
}
@Test
void mimeMessageShouldBeStoredWhenStoringMailPartsHasFailed(CassandraCluster cassandra) throws Exception {
MailImpl mail = MailImpl.builder()
.name("mymail")
.sender("[email protected]")
.addRecipient("[email protected]")
.addRecipient("[email protected]")
.mimeMessage(MimeMessageBuilder
.mimeMessageBuilder()
.setSubject("test")
.setText("this is the content")
.build())
.build();
assertThatThrownBy(() -> cassandraMailRepository.store(mail))
.isInstanceOf(RuntimeException.class)
.hasMessage("Expected failure while storing mail parts");
ResultSet resultSet = cassandra.getConf().execute(select()
.from(BlobTables.DefaultBucketBlobTable.TABLE_NAME));
assertThat(resultSet.all()).hasSize(2);
}
@Test
public void testMapMapWithUDT() throws Exception {
testInSession(new Callback() {
@Override
public void call(Session session) throws Exception {
final DatastaxMapper<DbObjectsWithMapUDT> mapper = DatastaxMapperFactory.newInstance().mapTo(DbObjectsWithMapUDT.class);
ResultSet rs = session.execute("select id, l from dbobjects_mapudt");
final Iterator<DbObjectsWithMapUDT> iterator = mapper.iterator(rs);
DbObjectsWithMapUDT next = iterator.next();
assertEquals(1, next.getId());
DbObjectsWithMapUDT.MyType myType = next.getL().get(2);
assertEquals("t1", myType.str);
assertEquals(12, myType.l);
assertFalse(iterator.hasNext());
}
});
}
private void rewriteOpenIncidentTablePart1() throws Exception {
if (!tableExists("open_incident")) {
// must be upgrading all the way from a glowroot version prior to open_incident
return;
}
dropTableIfExists("open_incident_temp");
session.updateSchemaWithRetry("create table if not exists open_incident_temp (one int,"
+ " agent_rollup_id varchar, condition blob, severity varchar, notification blob,"
+ " open_time timestamp, primary key (one, agent_rollup_id, condition, severity))");
PreparedStatement insertTempPS = session.prepare("insert into open_incident_temp (one,"
+ " agent_rollup_id, condition, severity, notification, open_time) values"
+ " (1, ?, ?, ?, ?, ?)");
ResultSet results = session.read("select agent_rollup_id, condition, severity,"
+ " notification, open_time from open_incident where one = 1");
for (Row row : results) {
BoundStatement boundStatement = insertTempPS.bind();
boundStatement.setString(0, row.getString(0));
boundStatement.setBytes(1, row.getBytes(1));
boundStatement.setString(2, row.getString(2));
boundStatement.setBytes(3, row.getBytes(3));
boundStatement.setTimestamp(4, row.getTimestamp(4));
session.write(boundStatement);
}
}
private void LoadDbToTimeSeriesMap() {
ResultSet rs = session
.execute("select column1 from "+MetaConstants.META_KEY_SPACE+"."+MetaConstants.META_COLUMN_FAMILY+ " where key='com.test.entity.type.paas.timeseries';");
List<Row> rows = rs.all();
for (Row row : rows) {
String key = row.getString(0).split("\\.")[0];
String table = row.getString(0).split("\\.")[1];
List<String> currval = null;
currval = dbToTimeseriesMap.get(key);
if (currval == null) {
currval = new ArrayList<String>();
}
currval.add(table);
dbToTimeseriesMap.put(key, currval);
}
}
@Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testReleaseOnSuccess() throws Exception {
final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
.setMaxConcurrentRequests(1)
.build();
try (TestCassandraSink testCassandraSink = createOpenedTestCassandraSink(config)) {
Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
testCassandraSink.enqueueCompletableFuture(completableFuture);
testCassandraSink.invoke("N/A");
Assert.assertEquals(0, testCassandraSink.getAvailablePermits());
Assert.assertEquals(1, testCassandraSink.getAcquiredPermits());
completableFuture.complete(null);
Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
}
}
@Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testReleaseOnFailure() throws Exception {
final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
.setMaxConcurrentRequests(1)
.build();
final CassandraFailureHandler failureHandler = ignored -> {};
try (TestCassandraSink testCassandraSink = createOpenedTestCassandraSink(config, failureHandler)) {
Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
testCassandraSink.enqueueCompletableFuture(completableFuture);
testCassandraSink.invoke("N/A");
Assert.assertEquals(0, testCassandraSink.getAvailablePermits());
Assert.assertEquals(1, testCassandraSink.getAcquiredPermits());
completableFuture.completeExceptionally(new RuntimeException());
Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
}
}
@Override
public Stream<ModelEntity> streamDeviceModelByPlaceId(UUID placeId, boolean includeTombstoned) {
if(placeId == null) {
return Stream.empty();
}
try (Context timerContext = streamDeviceModelByPlaceIdTimer.time())
{
Statement associationQuery = findIdsByPlace.bind(placeId);
Function<Row, UUID> entityIdTransform =
row -> row.getUUID("devid");
Function<ResultSet, ModelEntity> entityTransform =
resultSet -> toModel(resultSet.one(), includeTombstoned);
return listByAssociation(associationQuery, entityIdTransform, entityTransform, asyncTimeoutMs).stream();
}
}
private void updateRoles() throws Exception {
PreparedStatement insertPS =
session.prepare("insert into role (name, permissions) values (?, ?)");
ResultSet results = session.read("select name, permissions from role");
for (Row row : results) {
String name = row.getString(0);
Set<String> permissions = row.getSet(1, String.class);
Set<String> upgradedPermissions = upgradePermissions(permissions);
if (upgradedPermissions == null) {
continue;
}
BoundStatement boundStatement = insertPS.bind();
boundStatement.setString(0, name);
boundStatement.setSet(1, upgradedPermissions, String.class);
session.write(boundStatement);
}
}
@Override
protected void verifyResultsDataDiscardingUponRestore(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
ArrayList<Integer> list = new ArrayList<>();
for (int x = 1; x <= 20; x++) {
list.add(x);
}
for (int x = 41; x <= 60; x++) {
list.add(x);
}
for (com.datastax.driver.core.Row s : result) {
list.remove(new Integer(s.getInt("counter")));
}
Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
}
/**
* The actual health check
*
* @return
*/
private boolean check()
{
try
{
if (session != null && session.getLocalCql() != null)
{
ResultSet rs = session.getLocalCql().execute("SELECT release_version FROM system.local");
boolean result = (rs.one() != null);
logger.debug("HealthCheck status: {}", result);
return result;
}
}
catch (NoHostAvailableException nha)
{
logger.trace("NoHostAvailableException in HealthCheck - Cassandra Down");
}
catch (Exception e)
{
logger.error("Failed to reach Cassandra.", e);
}
return false;
}
@Override
protected Session doReadSession(Serializable sessionId) {
UUID id = toUuid(sessionId);
PreparedStatement ps = prepareReadStatement();
BoundStatement bs = new BoundStatement(ps);
bs.bind(id);
ResultSet results = cassandraSession.execute(bs);
for(Row row : results) {
UUID rowId = row.getUUID("id");
if (id.equals(rowId)) {
ByteBuffer buffer = row.getBytes("serialized_value");
if (buffer != null) { //could be null if a tombstone due to TTL removal
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return serializer.deserialize(bytes);
}
}
}
return null;
}
@Test
void mimeMessageShouldBeStoredWhenStoringKeysHasFailed(CassandraCluster cassandra) throws Exception {
MailImpl mail = MailImpl.builder()
.name("mymail")
.sender("[email protected]")
.addRecipient("[email protected]")
.addRecipient("[email protected]")
.mimeMessage(MimeMessageBuilder
.mimeMessageBuilder()
.setSubject("test")
.setText("this is the content")
.build())
.build();
assertThatThrownBy(() -> cassandraMailRepository.store(mail))
.isInstanceOf(RuntimeException.class)
.hasMessage("Expected failure while storing keys");
ResultSet resultSet = cassandra.getConf().execute(select()
.from(BlobTables.DefaultBucketBlobTable.TABLE_NAME));
assertThat(resultSet.all()).hasSize(2);
}
@Test
public void testMapUDT() throws Exception {
testInSession(new Callback() {
@Override
public void call(Session session) throws Exception {
final DatastaxMapper<DbObjectsWithUDT> mapper = DatastaxMapperFactory.newInstance().mapTo(DbObjectsWithUDT.class);
ResultSet rs = session.execute("select id, t from dbobjects_udt");
final Iterator<DbObjectsWithUDT> iterator = mapper.iterator(rs);
DbObjectsWithUDT next = iterator.next();
assertEquals(1, next.getId());
assertEquals("t1", next.getT().str);
assertEquals(12l, next.getT().l);
assertFalse(iterator.hasNext());
}
});
}
private void rewriteV09AgentRollupPart2() throws Exception {
if (!tableExists("v09_agent_rollup_temp")) {
// previously failed mid-upgrade prior to updating schema version
return;
}
dropTableIfExists("v09_agent_rollup");
session.createTableWithLCS("create table if not exists v09_agent_rollup (one int,"
+ " v09_agent_id varchar, v09_agent_rollup_id varchar, primary key (one,"
+ " v09_agent_id))");
PreparedStatement insertPS = session.prepare("insert into v09_agent_rollup (one,"
+ " v09_agent_id, v09_agent_rollup_id) values (1, ?, ?)");
ResultSet results = session.read("select v09_agent_id, v09_agent_rollup_id from"
+ " v09_agent_rollup_temp where one = 1");
for (Row row : results) {
BoundStatement boundStatement = insertPS.bind();
boundStatement.setString(0, row.getString(0));
boundStatement.setString(1, row.getString(1));
session.write(boundStatement);
}
dropTableIfExists("v09_agent_rollup_temp");
}
public @Nullable Profile readMainThreadProfileUsingPS(String agentId, String traceId,
PreparedStatement readPS) throws Exception {
BoundStatement boundStatement = readPS.bind();
boundStatement.setString(0, agentId);
boundStatement.setString(1, traceId);
ResultSet results = session.read(boundStatement);
Row row = results.one();
if (row == null) {
return null;
}
return Profile.parseFrom(checkNotNull(row.getBytes(0)));
}
@Override
public Response updateRecordWithTTL(
String keyspaceName,
String tableName,
Map<String, Object> request,
Map<String, Object> compositeKey,
int ttl) {
long startTime = System.currentTimeMillis();
Session session = connectionManager.getSession(keyspaceName);
Update update = QueryBuilder.update(keyspaceName, tableName);
Assignments assignments = update.with();
Update.Where where = update.where();
request
.entrySet()
.stream()
.forEach(
x -> {
assignments.and(QueryBuilder.set(x.getKey(), x.getValue()));
});
compositeKey
.entrySet()
.stream()
.forEach(
x -> {
where.and(eq(x.getKey(), x.getValue()));
});
update.using(QueryBuilder.ttl(ttl));
ProjectLogger.log(
"CassandraOperationImpl:updateRecordWithTTL: query = " + update.getQueryString(),
LoggerEnum.INFO.name());
ResultSet results = session.execute(update);
Response response = CassandraUtil.createResponse(results);
logQueryElapseTime("updateRecordWithTTL", startTime);
return response;
}
public Map<String, JsonObject> runQuery(String key, String col) {
// TODO Auto-generated method stub
ResultSet rs = session
.execute("select column1, value from "+MetaConstants.META_KEY_SPACE+"."+MetaConstants.META_COLUMN_FAMILY+ " where key='"+key+"' and column1='"+col+"';");
List<Row> rows = rs.all();
Map<String,JsonObject> storageMap = new HashMap<String,JsonObject>();
for (Row row : rows) {
String field = row.getString(0);
JsonObject val = new JsonObject(row.getString(1));
storageMap.put(field, val);
}
return storageMap;
}
@Override
public Stream<Map<UUID,UUID>> streamPlaceAndAccountByPartitionId(int partitionId) {
try(Context ctxt = streamPlaceAndAccountByPartitionIdTimer.time()) {
BoundStatement bs = streamPlaceAndAccountByPartitionId.bind(partitionId);
ResultSet rs = session.execute(bs);
return stream(rs, (row) -> {
return ImmutableMap.of(row.getUUID(BaseEntityColumns.ID), row.getUUID(PlaceEntityColumns.ACCOUNT_ID));
});
}
}
private void rewriteEnvironmentTablePart2() throws Exception {
if (!tableExists("environment_temp")) {
// previously failed mid-upgrade prior to updating schema version
return;
}
dropTableIfExists("environment");
session.createTableWithLCS("create table if not exists environment (agent_id varchar,"
+ " environment blob, primary key (agent_id))");
PreparedStatement insertPS = session
.prepare("insert into environment (agent_id, environment) values (?, ?)");
Map<String, V09AgentRollup> v09AgentRollups = getV09AgentRollupsFromAgentRollupTable();
ResultSet results = session.read("select agent_id, environment from environment_temp");
for (Row row : results) {
String v09AgentRollupId = row.getString(0);
V09AgentRollup v09AgentRollup = v09AgentRollups.get(v09AgentRollupId);
if (v09AgentRollup == null) {
// v09AgentRollupId was manually deleted (via the UI) from the agent_rollup
// table in which case its parent is no longer known and best to ignore
continue;
}
BoundStatement boundStatement = insertPS.bind();
boundStatement.setString(0, v09AgentRollup.agentRollupId());
boundStatement.setBytes(1, row.getBytes(1));
session.write(boundStatement);
}
dropTableIfExists("environment_temp");
}
Set<String> getGaugeNames(String agentRollupId, long from, long to) throws Exception {
long rolledUpFrom = CaptureTimes.getRollup(from, DAYS.toMillis(1));
long rolledUpTo = CaptureTimes.getRollup(to, DAYS.toMillis(1));
BoundStatement boundStatement = readPS.bind();
boundStatement.setString(0, agentRollupId);
boundStatement.setTimestamp(1, new Date(rolledUpFrom));
boundStatement.setTimestamp(2, new Date(rolledUpTo));
ResultSet results = session.read(boundStatement);
Set<String> gaugeNames = new HashSet<>();
for (Row row : results) {
gaugeNames.add(checkNotNull(row.getString(0)));
}
return gaugeNames;
}
@Test
public void testForListPrependAndExplicitNullForSomeColumns() throws Exception
{
User aUser = new User();
String userId = "user" + System.currentTimeMillis();
aUser.setUserid(userId);
FullName fullName = new FullName("first24" + System.currentTimeMillis(), "last" + System.currentTimeMillis());
aUser.setUsername(fullName);
List<Integer> topScores = new ArrayList<>();
topScores.add(1);
topScores.add(2);
aUser.setTopScores(topScores);
UpsertExecutionContext<User> originalEntry = new UpsertExecutionContext<>();
originalEntry.setPayload(aUser);
UpsertExecutionContext<User> subsequentUpdateForTopScores = new UpsertExecutionContext<>();
subsequentUpdateForTopScores.setListPlacementStyle(
UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST);
subsequentUpdateForTopScores.setCollectionMutationStyle(
UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
subsequentUpdateForTopScores.setNullHandlingMutationStyle(
UpsertExecutionContext.NullHandlingMutationStyle.SET_NULL_COLUMNS);
User oldUser = new User();
oldUser.setUserid(userId);
List<Integer> topScoresAppended = new ArrayList<>();
topScoresAppended.add(3);
oldUser.setTopScores(topScoresAppended);
subsequentUpdateForTopScores.setPayload(oldUser);
userUpsertOperator.beginWindow(6);
userUpsertOperator.input.process(originalEntry);
userUpsertOperator.input.process(subsequentUpdateForTopScores);
userUpsertOperator.endWindow();
ResultSet results = userUpsertOperator.session.execute(
"SELECT * FROM unittests.users WHERE userid = '" + userId + "'");
List<Row> rows = results.all();
Row userRow = rows.get(0);
FullName name = userRow.get("username", FullName.class);
assertEquals(null, name);
}
/**
* Gets the list of token ranges that a table occupies on a give Cassandra node.
*
* <p>NB: This method is compatible with Cassandra 2.1.5 and greater.
*/
private static List<TokenRange> getTokenRanges(Cluster cluster, String keyspace, String table) {
try (Session session = cluster.newSession()) {
ResultSet resultSet =
session.execute(
"SELECT range_start, range_end, partitions_count, mean_partition_size FROM "
+ "system.size_estimates WHERE keyspace_name = ? AND table_name = ?",
keyspace,
table);
ArrayList<TokenRange> tokenRanges = new ArrayList<>();
for (Row row : resultSet) {
TokenRange tokenRange =
new TokenRange(
row.getLong("partitions_count"),
row.getLong("mean_partition_size"),
new BigInteger(row.getString("range_start")),
new BigInteger(row.getString("range_end")));
tokenRanges.add(tokenRange);
}
// The table may not contain the estimates yet
// or have partitions_count and mean_partition_size fields = 0
// if the data was just inserted and the amount of data in the table was small.
// This is very common situation during tests,
// when we insert a few rows and immediately query them.
// However, for tiny data sets the lack of size estimates is not a problem at all,
// because we don't want to split tiny data anyways.
// Therefore, we're not issuing a warning if the result set was empty
// or mean_partition_size and partitions_count = 0.
return tokenRanges;
}
}
@Override
public boolean hasAuxThreadProfile(String agentRollupId, AggregateQuery query)
throws Exception {
BoundStatement boundStatement = query.transactionName() == null
? existsAuxThreadProfileOverallPS.get(query.rollupLevel()).bind()
: existsAuxThreadProfileTransactionPS.get(query.rollupLevel()).bind();
bindQuery(boundStatement, agentRollupId, query);
ResultSet results = session.read(boundStatement);
return results.one() != null;
}