下面列出了org.junit.jupiter.api.extension.ExtensionConfigurationException#com.datastax.driver.core.Cluster 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@BeforeClass
public static void setup()
{
try {
cluster = Cluster.builder().addContactPoint(NODE).build();
session = cluster.connect(KEYSPACE);
String createMetaTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "."
+ CassandraTransactionalStore.DEFAULT_META_TABLE + " ( " + CassandraTransactionalStore.DEFAULT_APP_ID_COL
+ " TEXT, " + CassandraTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT, "
+ CassandraTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT, " + "PRIMARY KEY ("
+ CassandraTransactionalStore.DEFAULT_APP_ID_COL + ", " + CassandraTransactionalStore.DEFAULT_OPERATOR_ID_COL
+ ") " + ");";
session.execute(createMetaTable);
String createTable = "CREATE TABLE IF NOT EXISTS "
+ KEYSPACE
+ "."
+ TABLE_NAME
+ " (id uuid PRIMARY KEY,age int,lastname text,test boolean,floatvalue float,doubleValue double,set1 set<int>,list1 list<int>,map1 map<text,int>,last_visited timestamp);";
session.execute(createTable);
} catch (Throwable e) {
DTThrowable.rethrow(e);
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Message> source = env.fromCollection(messages);
CassandraSink.addSink(source)
.setClusterBuilder(new ClusterBuilder() {
@Override
protected Cluster buildCluster(Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
})
.setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
.build();
env.execute("Cassandra Sink example");
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(1000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));
CassandraSink<Tuple2<String, Integer>> sink = CassandraSink.addSink(env.addSource(new MySource()))
.setQuery("INSERT INTO zhisheng.values (id, counter) values (?, ?);")
.enableWriteAheadLog()
.setClusterBuilder(new ClusterBuilder() {
private static final long serialVersionUID = 2793938419775311824L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
})
.build();
sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello");
env.execute();
}
@BeforeClass
public static void beforeClass() throws Exception {
cassandraPort = NetworkTestHelper.getAvailableLocalPort();
cassandraNativePort = NetworkTestHelper.getAvailableLocalPort();
replacePortsInConfFile();
// Start the Embedded Cassandra Service
cassandra.start();
final SocketOptions socketOptions = new SocketOptions();
// Setting this to 0 disables read timeouts.
socketOptions.setReadTimeoutMillis(0);
// This defaults to 5 s. Increase to a minute.
socketOptions.setConnectTimeoutMillis(60 * 1000);
cluster =
Cluster.builder()
.addContactPoint(CASSANDRA_HOST)
.withClusterName("beam")
.withSocketOptions(socketOptions)
.withPort(cassandraNativePort)
.build();
session = cluster.connect();
createCassandraData();
}
@BeforeClass()
public static void setup() throws ConfigurationException, IOException
{
cassandra = new EmbeddedCassandraService();
cassandra.start();
cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build();
session = cluster.connect();
session.execute("drop keyspace if exists junit;");
session.execute("create keyspace junit WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
session.execute("CREATE TABLE junit.noncounter (\n" +
" id int PRIMARY KEY,\n" +
" val text\n" +
");");
session.execute("CREATE TABLE junit.counter (\n" +
" id int PRIMARY KEY,\n" +
" val counter,\n" +
");");
noncounter = session.prepare("insert into junit.noncounter(id, val)values(?,?)");
counter = session.prepare("update junit.counter set val = val + ? where id = ?");
}
@Test
public void testClusterHintsPollerWhenNodeDown() throws UnknownHostException {
ClusterHintsPoller clusterHintsPoller = new ClusterHintsPoller();
Session mockSession = mock(Session.class);
Cluster mockCluster = mock(Cluster.class);
Metadata mockMetadata = mock(Metadata.class);
when(mockCluster.getMetadata()).thenReturn(mockMetadata);
when(mockCluster.getClusterName()).thenReturn("test-cluster");
Host node1 = mock(Host.class);
when(node1.getAddress()).thenReturn(InetAddress.getByName("127.0.0.1"));
Host node2 = mock(Host.class);
when(node2.getAddress()).thenReturn(InetAddress.getByName("127.0.0.2"));
Host node3 = mock(Host.class);
when(node3.getAddress()).thenReturn(InetAddress.getByName("127.0.0.3"));
when(mockSession.getCluster()).thenReturn(mockCluster);
// The first node queried is down
when(mockSession.execute(any(Statement.class))).thenThrow(new NoHostAvailableException(ImmutableMap.<InetSocketAddress, Throwable>of()));
when(mockMetadata.getAllHosts()).thenReturn(ImmutableSet.of(node1, node2, node3));
HintsPollerResult actualResult = clusterHintsPoller.getOldestHintsInfo(mockSession);
// Make sure HintsPollerResult fails
assertFalse(actualResult.areAllHostsPolling(), "Result should show hosts failing");
assertEquals(actualResult.getHostFailure(), ImmutableSet.of(InetAddress.getByName("127.0.0.1")), "Node 1 should return with host failure");
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Message> source = env.fromCollection(messages);
CassandraSink.addSink(source)
.setClusterBuilder(new ClusterBuilder() {
@Override
protected Cluster buildCluster(Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
})
.setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
.build();
env.execute("Cassandra Sink example");
}
private Session createSession(Profile profile) {
QueryOptions options = new QueryOptions();
options.setConsistencyLevel(profile.getConsistencyLevel());
Cluster.Builder builder = Cluster.builder();
builder.addContactPoints(profile.getNodes().toArray(new String[0]));
builder.withPort(profile.getPort());
builder.withQueryOptions(options);
if(!StringUtils.isBlank(profile.getUsername()) && !StringUtils.isBlank(profile.getPassword())) {
builder.withCredentials(profile.getUsername(), profile.getPassword());
}
Cluster cluster = builder.build();
Session session = cluster.connect(profile.getKeyspace());
return session;
}
public static Cluster loadTablesFromRemote(String host, int port, String cfidOverrides) throws IOException {
Map<String, UUID> cfs = parseOverrides(cfidOverrides);
Cluster.Builder builder = Cluster.builder().addContactPoints(host).withPort(port);
Cluster cluster = builder.build();
Metadata metadata = cluster.getMetadata();
IPartitioner partitioner = FBUtilities.newPartitioner(metadata.getPartitioner());
if (DatabaseDescriptor.getPartitioner() == null)
DatabaseDescriptor.setPartitionerUnsafe(partitioner);
for (com.datastax.driver.core.KeyspaceMetadata ksm : metadata.getKeyspaces()) {
if (!ksm.getName().equals("system")) {
for (TableMetadata tm : ksm.getTables()) {
String name = ksm.getName()+"."+tm.getName();
try {
CassandraUtils.tableFromCQL(
new ByteArrayInputStream(tm.asCQLQuery().getBytes()),
cfs.get(name) != null ? cfs.get(name) : tm.getId());
} catch(SyntaxException e) {
// ignore tables that we cant parse (probably dse)
logger.debug("Ignoring table " + name + " due to syntax exception " + e.getMessage());
}
}
}
}
return cluster;
}
@Test
public void stop() {
final CassandraSink sink = new CassandraSink();
final Channel channel = mock(Channel.class);
final Session session = mock(Session.class);
final Cluster cluster = mock(Cluster.class);
final Context ctx = new Context();
ctx.put("tables", "keyspace.table");
sink.configure(ctx);
sink.setChannel(channel);
sink.session = session;
sink.cluster = cluster;
sink.stop();
verify(session).isClosed();
verify(session).close();
verifyNoMoreInteractions(session);
verify(cluster).isClosed();
verify(cluster).close();
verifyNoMoreInteractions(cluster);
}
/**
* Uses a Cluster.Builder to create a Cassandra cluster reference using the given parameters
*
* @param contactPoints The contact points (hostname:port list of Cassandra nodes)
* @param sslContext The SSL context (used for secure connections)
* @param username The username for connection authentication
* @param password The password for connection authentication
* @param compressionType Enable compression at transport-level requests and responses.
* @return A reference to the Cluster object associated with the given Cassandra configuration
*/
protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password, String compressionType) {
Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints);
if (sslContext != null) {
JdkSSLOptions sslOptions = JdkSSLOptions.builder()
.withSSLContext(sslContext)
.build();
builder = builder.withSSL(sslOptions);
if(ProtocolOptions.Compression.SNAPPY.equals(compressionType)) {
builder = builder.withCompression(ProtocolOptions.Compression.SNAPPY);
} else if(ProtocolOptions.Compression.LZ4.equals(compressionType)) {
builder = builder.withCompression(ProtocolOptions.Compression.LZ4);
}
}
if (username != null && password != null) {
builder = builder.withCredentials(username, password);
}
return builder.build();
}
@Inject
public SchemaManager(@Named("cassandra.keyspace") String keyspace, @Named("cassandra.host") String host, @Named("cassandra.port") int port,
@Named("cassandra.username") String username, @Named("cassandra.password") String password, @Named("cassandra.ssl") boolean ssl) {
m_keyspace = keyspace;
Builder builder = Cluster.builder()
.withPort(port)
.addContactPoints(host.split(","));
if (username != null && password != null) {
LOG.info("Using username: {} and password: XXXXXXXX", username);
builder.withCredentials(username, password);
}
if (ssl) {
LOG.info("Using SSL.");
builder.withSSL();
}
m_cluster= builder.build();
m_session = m_cluster.connect();
}
@Override
public synchronized Session getSession() {
String id = this.getHost()+":"+this.cqlPort;
if (!cassandraSession.containsKey(id)){
Cluster cluster = Cluster.builder()
.withPort(this.cqlPort)
.addContactPoint(this.getHost())
.withCredentials(this.username, this.password)
.withProtocolVersion(PROTOCOL_VERSION)
.build();
session = cluster.connect(quote(this.catalog));
cassandraSession.put(id,session);
}
return cassandraSession.get(id);
}
private void setup()
throws IOException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException,
CertificateException, UnrecoverableKeyException {
// Connect to Cassandra
Cluster.Builder clusterBuilder = Cluster.builder()
.addContactPoint(host)
.withPort(port)
.withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()));
if (null != username)
clusterBuilder = clusterBuilder.withCredentials(username, password);
if (null != truststorePath)
clusterBuilder = clusterBuilder.withSSL(createSSLOptions());
cluster = clusterBuilder.build();
if (null == cluster) {
throw new IOException("Could not create cluster");
}
session = cluster.connect();
}
@BeforeClass
public static void startCassandraEmbedded() throws InterruptedException, TTransportException, ConfigurationException, IOException {
EmbeddedCassandraServerHelper.startEmbeddedCassandra();
final Cluster cluster = Cluster.builder().addContactPoints("127.0.0.1").withPort(9142).build();
final Session session = cluster.connect();
session.execute(KEYSPACE_CREATION_QUERY);
session.execute(KEYSPACE_ACTIVATE_QUERY);
Thread.sleep(5000);
}
public static void initSchema() throws IOException {
try (Cluster cluster = buildCluster(); Session tmpSession = cluster.connect()) {
await().with().pollInterval(3, SECONDS).atMost(2, MINUTES).until(() -> {
try {
tmpSession.execute("DROP KEYSPACE IF EXISTS reaper_db");
return true;
} catch (RuntimeException ex) {
return false;
}
});
tmpSession.execute(
"CREATE KEYSPACE reaper_db WITH replication = {" + BasicSteps.buildNetworkTopologyStrategyString(cluster)
+ "}");
}
}
@After
public void after() {
try (Cluster cluster = cassandra.getCluster(); Session session = cluster.newSession()) {
session.execute("TRUNCATE jaeger_v1_dc1.traces");
session.execute(String.format("TRUNCATE jaeger_v1_dc1.%s", dependenciesTable(session)));
}
}
/**
* execute shCQLcoomand
* @param Session session
* @param shCQLcommand command shCQL
* @return List string with result
*/
@Override
public Table execute(Session session, String shCQLcommand) {
SHCQLOperation shCqlOperation = new SHCQLOperation(shCQLcommand);
Cluster cluster = session.getCluster();
DescribeExecutor executor = DescribeExecutorFactory.select(shCqlOperation.identifier());
executor.optionalParam(shCqlOperation.optionalValue());
return executor.execute(cluster.getMetadata());
}
public AsyncFuture<Connection> construct() {
AsyncFuture<Session> session = async.call(() -> {
final QueryOptions queryOptions = new QueryOptions()
.setFetchSize(fetchSize)
.setConsistencyLevel(consistencyLevel);
final SocketOptions socketOptions = new SocketOptions()
.setReadTimeoutMillis((int) readTimeout.toMilliseconds());
final Cluster.Builder cluster = Cluster.builder()
.addContactPointsWithPorts(seeds)
.withRetryPolicy(retryPolicy)
.withQueryOptions(queryOptions)
.withSocketOptions(socketOptions)
.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()))
.withoutJMXReporting();
authentication.accept(cluster);
poolingOptions.apply(cluster);
return cluster.build().connect();
});
if (configure) {
session = session.lazyTransform(s -> schema.configure(s).directTransform(i -> s));
}
return session.lazyTransform(
s -> schema.instance(s).directTransform(schema -> new Connection(s, schema)));
}
@Override
public void run() {
Cluster cluster = null;
try {
// Send TASK_RUNNING
sendStatus(driver, Protos.TaskState.TASK_RUNNING,
"Started taking schema backup");
cluster = Cluster.builder().addContactPoint(daemon.getProbe().getEndpoint()).build();
final List<String> keyspaces = StorageUtil.filterSystemKeyspaces(daemon.getNonSystemKeySpaces());
if (keyspaces.size() > 0) {
StringBuilder sb = new StringBuilder();
for (String keyspace : keyspaces) {
LOGGER.info("Taking schema backup for keyspace: {}", keyspace);
KeyspaceMetadata ksm = cluster.getMetadata().getKeyspace(keyspace);
sb.append(ksm.exportAsString()).append(System.getProperty("line.separator"));
}
backupStorageDriver.uploadSchema(context, sb.toString());
}
// Send TASK_FINISHED
sendStatus(driver, Protos.TaskState.TASK_FINISHED,
"Finished taking schema backup for keyspaces: " + keyspaces);
} catch (Throwable t){
LOGGER.error("Schema backup failed. Reason: ", t);
sendStatus(driver, Protos.TaskState.TASK_FAILED, t.getMessage());
} finally {
if (cluster != null)
cluster.close();
}
}
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<CustomCassandraAnnotatedPojo> customCassandraAnnotatedPojos = IntStream.range(0, 20)
.mapToObj(x -> new CustomCassandraAnnotatedPojo(UUID.randomUUID().toString(), x, 0))
.collect(Collectors.toList());
DataSet<CustomCassandraAnnotatedPojo> dataSet = env.fromCollection(customCassandraAnnotatedPojos);
ClusterBuilder clusterBuilder = new ClusterBuilder() {
private static final long serialVersionUID = -1754532803757154795L;
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoints("127.0.0.1").build();
}
};
dataSet.output(new CassandraPojoOutputFormat<>(clusterBuilder, CustomCassandraAnnotatedPojo.class, () -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)}));
env.execute("Write");
/*
* This is for the purpose of showing an example of creating a DataSet using CassandraPojoInputFormat.
*/
DataSet<CustomCassandraAnnotatedPojo> inputDS = env
.createInput(new CassandraPojoInputFormat<>(
SELECT_QUERY,
clusterBuilder,
CustomCassandraAnnotatedPojo.class,
() -> new Mapper.Option[]{Mapper.Option.consistencyLevel(ConsistencyLevel.ANY)}
));
inputDS.print();
}
public CqlSession(final String nodes, final int port, final String keyspace, final SocketOptions socketOptions,
final RetryPolicy retryPolicy, final QueryOptions queryOptions,
final LoadBalancingPolicy loadBalancingPolicy, final int maxConnectionsPerHost,
final MetricFactory metricFactory) {
// this is temp. to reuse current hosts properties:
final Iterable<String> nodesIter = Splitter.on(",").split(nodes);
final String[] nodesArr = Iterables.toArray(
StreamSupport.stream(nodesIter.spliterator(), false).map(input -> {
if (input == null) return null;
final int idx = input.lastIndexOf(":");
return input.substring(0, idx);
}).collect(Collectors.toList()), String.class);
/*PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnectionsPerHost);
poolingOptions.setMaxConnectionsPerHost(HostDistance.REMOTE, maxConnectionsPerHost);*/
final Cluster cluster = Cluster.builder().
withPort(port).
withSocketOptions(socketOptions).
withQueryOptions(queryOptions).
withLoadBalancingPolicy(loadBalancingPolicy).
// withPoolingOptions(poolingOptions).
addContactPoints(nodesArr).build();
//cluster.init();
this.session = cluster.connect(keyspace);
this.retryPolicy = Preconditions.checkNotNull(retryPolicy);
this.metricFactory = Preconditions.checkNotNull(metricFactory);
}
public CassandraTweetRepository(String cassandraHost, int cassandraPort, String keyspaceName) {
m_cassandraHost = cassandraHost;
m_cassandraPort = cassandraPort;
m_keyspaceName = keyspaceName;
LOG.info("Connecting to {}:{}...", cassandraHost, cassandraPort);
cluster = Cluster.builder().withPort(m_cassandraPort).addContactPoint(m_cassandraHost).build();
session = cluster.connect(m_keyspaceName);
LOG.info("Connected.");
}
@Bean
public Cluster cluster() throws Exception {
dataSet = new ClassPathCQLDataSet(CASSANDRA_INIT_SCRIPT, TEST_KEYSPACE);
cluster = new Cluster.Builder().addContactPoints(LOCALHOST).withPort(9142).build();
init();
return cluster;
}
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT))
.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
.withoutJMXReporting()
.withoutMetrics().build();
}
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<CustomCassandraAnnotatedPojo> customCassandraAnnotatedPojos = IntStream.range(0, 20)
.mapToObj(x -> new CustomCassandraAnnotatedPojo(UUID.randomUUID().toString(), x, 0))
.collect(Collectors.toList());
DataSet<CustomCassandraAnnotatedPojo> dataSet = env.fromCollection(customCassandraAnnotatedPojos);
ClusterBuilder clusterBuilder = new ClusterBuilder() {
private static final long serialVersionUID = -1754532803757154795L;
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoints("127.0.0.1").build();
}
};
dataSet.output(new CassandraPojoOutputFormat<>(clusterBuilder, CustomCassandraAnnotatedPojo.class, () -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)}));
env.execute("zhisheng");
/*
* This is for the purpose of showing an example of creating a DataSet using CassandraPojoInputFormat.
*/
DataSet<CustomCassandraAnnotatedPojo> inputDS = env
.createInput(new CassandraPojoInputFormat<>(
SELECT_QUERY,
clusterBuilder,
CustomCassandraAnnotatedPojo.class,
() -> new Mapper.Option[]{Mapper.Option.consistencyLevel(ConsistencyLevel.ANY)}
));
inputDS.print();
}
@Inject
public SessionWithInitializedTablesFactory(KeyspaceConfiguration keyspaceConfiguration,
Cluster cluster,
CassandraModule module) {
this.module = module;
this.session = createSession(cluster, keyspaceConfiguration.getKeyspace());
}
private static void createTable(String keyspaceName, String tableName) {
try (Cluster cluster = buildCluster(); Session tmpSession = cluster.connect()) {
VersionNumber lowestNodeVersion = getCassandraVersion(tmpSession);
String createTableStmt
= "CREATE TABLE "
+ (VersionNumber.parse("2.0").compareTo(lowestNodeVersion) <= 0 ? "IF NOT EXISTS " : "")
+ keyspaceName
+ "."
+ tableName
+ "(id int PRIMARY KEY, value text)";
if (tableName.endsWith("twcs")) {
if (((VersionNumber.parse("3.0.8").compareTo(lowestNodeVersion) <= 0
&& VersionNumber.parse("3.0.99").compareTo(lowestNodeVersion) >= 0)
|| VersionNumber.parse("3.8").compareTo(lowestNodeVersion) <= 0)) {
// TWCS is available by default
createTableStmt
+= " WITH compaction = {'class':'TimeWindowCompactionStrategy',"
+ "'compaction_window_size': '1', "
+ "'compaction_window_unit': 'MINUTES'}";
} else if (VersionNumber.parse("2.0.11").compareTo(lowestNodeVersion) <= 0) {
createTableStmt += " WITH compaction = {'class':'DateTieredCompactionStrategy'}";
}
}
try {
if (null == tmpSession.getCluster().getMetadata().getKeyspace(keyspaceName).getTable(tableName)) {
tmpSession.execute(createTableStmt);
}
} catch (AlreadyExistsException ignore) { }
for (int i = 0; i < 100; i++) {
tmpSession.execute(
"INSERT INTO " + keyspaceName + "." + tableName + "(id, value) VALUES(" + i + ",'" + i + "')");
}
}
}
static Cluster getCluster(InetSocketAddress contactPoint) {
return Cluster.builder()
.withoutJMXReporting()
.addContactPointsWithPorts(contactPoint)
.withRetryPolicy(ZipkinRetryPolicy.INSTANCE)
.withPoolingOptions(new PoolingOptions().setMaxConnectionsPerHost(HostDistance.LOCAL, 1))
.build();
}
private static Cluster newCluster(String host, int port) {
Cluster.Builder builder = Cluster.builder();
builder.addContactPoint(host);
builder.withPort(port);
builder.withoutMetrics();
return builder.build();
}