下面列出了 org.testng.collections.Maps 类的 API 用法示例代码,也可以点击链接到 GitHub 查看源代码。
/**
 * Checks {@code MemberMetadata.matches} against several protocol maps: the map the member
 * was created with is expected to match, while maps that differ in metadata bytes, in
 * protocol name, or in the number of protocols are expected not to match.
 */
@Test
public void testMatchesSupportedProtocols() {
    // The member is created with a single "range" protocol carrying empty metadata.
    Map<String, byte[]> supported = Maps.newHashMap();
    supported.put("range", new byte[0]);
    MemberMetadata member = newMember(supported);
    assertTrue(member.matches(supported));

    // Same protocol name, different metadata bytes.
    Map<String, byte[]> differentMetadata = new HashMap<>();
    differentMetadata.put("range", new byte[] { 0 });
    assertFalse(member.matches(differentMetadata));

    // Different protocol name, same (empty) metadata.
    Map<String, byte[]> differentName = new HashMap<>();
    differentName.put("roundrobin", new byte[0]);
    assertFalse(member.matches(differentName));

    // Superset: the original protocol plus an additional one.
    Map<String, byte[]> additionalProtocol = new HashMap<>();
    additionalProtocol.put("range", new byte[0]);
    additionalProtocol.put("roundrobin", new byte[0]);
    assertFalse(member.matches(additionalProtocol));
}
/**
 * Builds a document through a {@code SolrIndexer} and verifies the resulting field map, its
 * JSON rendering, and the URLs used for adding a document and for committing.
 */
@Test
void createDocTest()
{
    BaseIndexer solrIndexer = new SolrIndexer("", TEST1);
    List<String> locales = Lists.newArrayList();
    locales.add(LOCALE);
    Map<String, Object> actualDoc =
            solrIndexer.createDoc(ID, ENGLISH_TITLE, SPANISH_TITLE, ALT_TITLE, locales);

    // Expected content: id, doc type, the English title, and a set of localized titles.
    Set<Object> spanishTitles = Sets.newHashSet();
    spanishTitles.add(SPANISH_TITLE);
    Map<String, Object> wantedDoc = Maps.newHashMap();
    wantedDoc.put(Properties.idField.get(), ID + "_" + TEST1);
    wantedDoc.put(Properties.docTypeFieldName.get(), TEST1);
    wantedDoc.put(Properties.titleFields.get().get(0) + "_en", ENGLISH_TITLE);
    wantedDoc.put(Properties.titleFields.get().get(0) + "_es", spanishTitles);
    Assert.assertEquals(actualDoc, wantedDoc);

    // The Solr JSON payload is expected to be wrapped in a JSON array.
    StringBuilder json = solrIndexer.getJsonStringOfDoc(new ObjectMapper().valueToTree(actualDoc));
    Assert.assertEquals(json.toString(), "[{\"query_testing_type\":\"test1\",\"title_en\":\"title en\",\"id\":\"123_test1\",\"title_es\":[\"title es\"]}]");

    String addUrl = solrIndexer.getUrlForAddingDoc(actualDoc);
    Assert.assertEquals(addUrl, "http://localhost:8983/solr/qtest/update");

    String commitUrl = solrIndexer.getUrlForCommitting();
    Assert.assertEquals(commitUrl, "http://localhost:8983/solr/qtest/update?commit=true");
}
/**
 * Groups the given method instances by the objects they run on.
 *
 * <p>Every object returned by {@code getInstances()} of a method instance is used as a key,
 * and the method instance is appended to the list kept for that key. A method instance that
 * reports several objects therefore appears in several groups.
 *
 * @param methodInstances the method instances to group
 * @return a new list containing one list of method instances per distinct instance object,
 *     in the iteration order of the map created by {@code Maps.newHashMap()}
 */
private List<List<IMethodInstance>> createInstances(List<IMethodInstance> methodInstances) {
    Map<Object, List<IMethodInstance>> groupsByInstance = Maps.newHashMap();
    for (IMethodInstance imi : methodInstances) {
        for (Object instance : imi.getInstances()) {
            // Lazily create the group the first time an instance object is seen.
            List<IMethodInstance> group = groupsByInstance.get(instance);
            if (group == null) {
                group = Lists.newArrayList();
                groupsByInstance.put(instance, group);
            }
            group.add(imi);
        }
    }
    return new ArrayList<List<IMethodInstance>>(groupsByInstance.values());
}
/**
 * Verifies {@code KinesisSink.defaultCredentialProvider}: a parameter JSON built from an
 * empty map is expected to be rejected with {@link IllegalArgumentException}, and a JSON
 * carrying both the access key and the secret key is expected to yield a provider that
 * exposes exactly those values.
 */
@Test
public void testDefaultCredentialProvider() throws Exception {
    KinesisSink sink = new KinesisSink();
    Gson gson = new Gson();
    Map<String, String> params = Maps.newHashMap();

    // Without any keys the call must not succeed.
    String paramJson = gson.toJson(params);
    try {
        sink.defaultCredentialProvider(paramJson);
        Assert.fail("accessKey and SecretKey validation not applied");
    } catch (IllegalArgumentException ie) {
        // Ok..
    }

    // With both keys present the provider must hand them back unchanged.
    final String accessKey = "ak";
    final String secretKey = "sk";
    params.put(KinesisSink.ACCESS_KEY_NAME, accessKey);
    params.put(KinesisSink.SECRET_KEY_NAME, secretKey);
    paramJson = gson.toJson(params);

    AWSCredentialsProvider provider =
            sink.defaultCredentialProvider(paramJson).getCredentialProvider();
    Assert.assertNotNull(provider);
    Assert.assertEquals(provider.getCredentials().getAWSAccessKeyId(), accessKey);
    Assert.assertEquals(provider.getCredentials().getAWSSecretKey(), secretKey);
    sink.close();
}
/**
 * Creates a {@code RecordImpl} for the given payload, optionally attaching an encryption
 * context.
 *
 * <p>When {@code isEncryption} is {@code true}, the context is populated with the algorithm,
 * batch size, LZ4 compression type, uncompressed size, the encryption parameter, and two
 * keys built from the first two entries of {@code keyNames} / {@code keyValues} paired with
 * {@code metadata1} and {@code metadata2}. Otherwise the record carries an empty optional.
 *
 * @param data the record payload
 * @param algo the encryption algorithm name stored in the context
 * @param keyNames names of the encryption keys; indices 0 and 1 are read when encrypting
 * @param keyValues values of the encryption keys; indices 0 and 1 are read when encrypting
 * @param param the encryption parameter stored in the context
 * @param metadata1 metadata attached to the first key
 * @param metadata2 metadata attached to the second key
 * @param batchSize batch size stored in the context
 * @param compressionMsgSize uncompressed message size stored in the context
 * @param properties properties passed to the record
 * @param isEncryption whether an encryption context should be attached
 * @return the newly created record
 */
private Record<byte[]> createRecord(byte[] data, String algo, String[] keyNames, byte[][] keyValues, byte[] param,
        Map<String, String> metadata1, Map<String, String> metadata2, int batchSize, int compressionMsgSize,
        Map<String, String> properties, boolean isEncryption) {
    EncryptionContext encryptionCtx = null;
    if (isEncryption) {
        encryptionCtx = new EncryptionContext();
        encryptionCtx.setAlgorithm(algo);
        encryptionCtx.setBatchSize(Optional.of(batchSize));
        encryptionCtx.setCompressionType(CompressionType.LZ4);
        encryptionCtx.setUncompressedMessageSize(compressionMsgSize);

        EncryptionKey firstKey = new EncryptionKey();
        firstKey.setKeyValue(keyValues[0]);
        firstKey.setMetadata(metadata1);

        EncryptionKey secondKey = new EncryptionKey();
        secondKey.setKeyValue(keyValues[1]);
        secondKey.setMetadata(metadata2);

        Map<String, EncryptionKey> keysByName = Maps.newHashMap();
        keysByName.put(keyNames[0], firstKey);
        keysByName.put(keyNames[1], secondKey);

        encryptionCtx.setKeys(keysByName);
        encryptionCtx.setParam(param);
    }
    return new RecordImpl(data, properties, Optional.ofNullable(encryptionCtx));
}
/**
 * Exercises a namespace whose schema validation enforcement is explicitly set to
 * {@code false} while the topic has a STRING schema registered: a producer created without
 * an explicit schema sends raw bytes, and afterwards the schema info reported for the topic
 * is compared with the expected STRING schema.
 */
@Test
public void testDisableSchemaValidationEnforcedHasSchema() throws Exception {
    String namespace = "schema-validation-enforced/default-has-schema";
    String topicName = "persistent://schema-validation-enforced/default-has-schema/test";
    admin.namespaces().createNamespace(namespace);

    assertFalse(admin.namespaces().getSchemaValidationEnforced(namespace));
    admin.namespaces().setSchemaValidationEnforced(namespace, false);

    // No schema has been created yet; if the lookup throws, it must be a 404.
    try {
        admin.schemas().getSchemaInfo(topicName);
    } catch (PulsarAdminException.NotFoundException e) {
        assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
    }

    Map<String, String> properties = Maps.newHashMap();
    SchemaInfo schemaInfo = new SchemaInfo();
    schemaInfo.setType(SchemaType.STRING);
    schemaInfo.setProperties(properties);
    schemaInfo.setName("test");
    schemaInfo.setSchema("".getBytes());

    PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
    admin.schemas().createSchema(topicName, postSchemaPayload);

    // newProducer() without a schema argument produces byte[] messages, so the producer
    // is declared with its type argument instead of as a raw type.
    try (Producer<byte[]> p = pulsarClient.newProducer().topic(topicName).create()) {
        p.send("test schemaValidationEnforced".getBytes());
    }
    assertEquals(admin.schemas().getSchemaInfo(topicName), schemaInfo);
}
/**
 * Exercises a namespace with schema validation enforcement switched on and a STRING schema
 * registered for the topic: a producer using {@code Schema.STRING} sends a message, and the
 * name and type of the schema reported for the topic are compared with the expected ones.
 */
@Test
public void testEnableSchemaValidationEnforcedHasSchemaMatch() throws Exception {
    String namespace = "schema-validation-enforced/enable-has-schema-match";
    String topicName = "persistent://schema-validation-enforced/enable-has-schema-match/test";
    admin.namespaces().createNamespace("schema-validation-enforced/enable-has-schema-match");

    // Enforcement is expected to be off for a freshly created namespace.
    assertFalse(admin.namespaces().getSchemaValidationEnforced(namespace));

    // No schema has been created yet; if the lookup throws, it must be a 404.
    try {
        admin.schemas().getSchemaInfo(topicName);
    } catch (PulsarAdminException.NotFoundException e) {
        assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
    }

    admin.namespaces().setSchemaValidationEnforced(namespace,true);

    Map<String, String> schemaProps = Maps.newHashMap();
    SchemaInfo expected = new SchemaInfo();
    expected.setName("test");
    expected.setType(SchemaType.STRING);
    expected.setSchema("".getBytes());
    expected.setProperties(schemaProps);

    PostSchemaPayload payload = new PostSchemaPayload("STRING", "", schemaProps);
    admin.schemas().createSchema(topicName, payload);

    try (Producer<String> stringProducer =
            pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
        stringProducer.send("test schemaValidationEnforced");
    }

    assertEquals(admin.schemas().getSchemaInfo(topicName).getName(), expected.getName());
    assertEquals(admin.schemas().getSchemaInfo(topicName).getType(), expected.getType());
}
/**
 * Reads {@code numKeys} messages from a topic through a consumer created with
 * {@code readCompacted(true)} and collects them into a map.
 *
 * <p>Each message key is parsed as an integer and mapped to the message value decoded as
 * UTF-8; a later message with the same key overwrites an earlier one. The consumer is
 * closed before the method returns.
 *
 * @param client the client used to create the consumer
 * @param topic the topic to read from
 * @param subscription the subscription name to use
 * @param numKeys the number of messages to receive
 * @return the received values indexed by their integer key
 * @throws PulsarClientException if subscribing, receiving, or closing fails
 */
private static Map<Integer, String> consumeCompactedTopic(PulsarClient client,
                                                          String topic,
                                                          String subscription,
                                                          int numKeys) throws PulsarClientException {
    Map<Integer, String> valuesByKey = Maps.newHashMap();
    try (Consumer<byte[]> compactedConsumer = client.newConsumer()
            .readCompacted(true)
            .topic(topic)
            .subscriptionName(subscription)
            .subscribe()) {
        int received = 0;
        while (received < numKeys) {
            Message<byte[]> message = compactedConsumer.receive();
            int key = Integer.parseInt(message.getKey());
            String value = new String(message.getValue(), UTF_8);
            valuesByKey.put(key, value);
            received++;
        }
    }
    return valuesByKey;
}
/**
 * Prepares the spout fixture: runs the base setup, builds the spout configuration, opens a
 * {@code PulsarSpout} against a mocked {@code TopologyContext}, and finally creates a
 * producer on the test topic.
 */
@Override
protected void setup() throws Exception {
    super.internalSetup();
    super.producerBaseSetup();

    serviceUrl = pulsar.getWebServiceAddress();

    // Spout configuration shared by the tests in this class.
    pulsarSpoutConf = new PulsarSpoutConfiguration();
    pulsarSpoutConf.setServiceUrl(serviceUrl);
    pulsarSpoutConf.setTopic(topic);
    pulsarSpoutConf.setSubscriptionName(subscriptionName);
    pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper);
    pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
    pulsarSpoutConf.setMaxFailedRetries(2);
    pulsarSpoutConf.setSharedConsumerEnabled(true);
    pulsarSpoutConf.setMetricsTimeIntervalInSecs(60);
    pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared);

    spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
    mockCollector = new MockSpoutOutputCollector();
    SpoutOutputCollector outputCollector = new SpoutOutputCollector(mockCollector);

    // The topology context is mocked; only component id and task id are stubbed.
    TopologyContext topologyContext = mock(TopologyContext.class);
    when(topologyContext.getThisComponentId()).thenReturn("test-spout-" + methodName);
    when(topologyContext.getThisTaskId()).thenReturn(0);

    spout.open(Maps.newHashMap(), topologyContext, outputCollector);

    producer = pulsarClient.newProducer().topic(topic).create();
}
/**
 * Opens a second spout built from the shared configuration and asserts that the number of
 * consumers on the subscription stays at one before opening, after opening, and after
 * closing the second spout.
 */
@Test
public void testSharedConsumer() throws Exception {
    TopicStats stats = admin.topics().getStats(topic);
    assertEquals(stats.subscriptions.get(subscriptionName).consumers.size(), 1);

    PulsarSpout secondSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
    MockSpoutOutputCollector secondMockCollector = new MockSpoutOutputCollector();
    SpoutOutputCollector secondCollector = new SpoutOutputCollector(secondMockCollector);

    // Same component id as the spout under test, but a different task id.
    TopologyContext secondContext = mock(TopologyContext.class);
    when(secondContext.getThisComponentId()).thenReturn("test-spout-" + methodName);
    when(secondContext.getThisTaskId()).thenReturn(1);

    secondSpout.open(Maps.newHashMap(), secondContext, secondCollector);
    stats = admin.topics().getStats(topic);
    assertEquals(stats.subscriptions.get(subscriptionName).consumers.size(), 1);

    secondSpout.close();
    stats = admin.topics().getStats(topic);
    assertEquals(stats.subscriptions.get(subscriptionName).consumers.size(), 1);
}
/**
 * Disables consumer sharing on the configuration, opens a second spout, and asserts that
 * the subscription reports two consumers while the second spout is open and one consumer
 * again after it is closed.
 */
@Test
public void testNoSharedConsumer() throws Exception {
    TopicStats stats = admin.topics().getStats(topic);
    assertEquals(stats.subscriptions.get(subscriptionName).consumers.size(), 1);

    // From here on, spouts built from this configuration do not share a consumer.
    pulsarSpoutConf.setSharedConsumerEnabled(false);

    PulsarSpout secondSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
    MockSpoutOutputCollector secondMockCollector = new MockSpoutOutputCollector();
    SpoutOutputCollector secondCollector = new SpoutOutputCollector(secondMockCollector);

    TopologyContext secondContext = mock(TopologyContext.class);
    when(secondContext.getThisComponentId()).thenReturn("test-spout-" + methodName);
    when(secondContext.getThisTaskId()).thenReturn(1);

    secondSpout.open(Maps.newHashMap(), secondContext, secondCollector);
    stats = admin.topics().getStats(topic);
    assertEquals(stats.subscriptions.get(subscriptionName).consumers.size(), 2);

    secondSpout.close();
    stats = admin.topics().getStats(topic);
    assertEquals(stats.subscriptions.get(subscriptionName).consumers.size(), 1);
}
/**
 * Opens a spout configured with the topic {@code "persistent://invalidTopic"} and expects
 * {@code open} to throw {@link IllegalStateException}.
 */
@Test
public void testFailedConsumer() {
    PulsarSpoutConfiguration invalidTopicConf = new PulsarSpoutConfiguration();
    invalidTopicConf.setServiceUrl(serviceUrl);
    invalidTopicConf.setTopic("persistent://invalidTopic");
    invalidTopicConf.setSubscriptionName(subscriptionName);
    invalidTopicConf.setMessageToValuesMapper(messageToValuesMapper);
    invalidTopicConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
    invalidTopicConf.setMaxFailedRetries(2);
    invalidTopicConf.setSharedConsumerEnabled(false);
    invalidTopicConf.setMetricsTimeIntervalInSecs(60);
    invalidTopicConf.setSubscriptionType(SubscriptionType.Shared);

    PulsarSpout failingSpout = new PulsarSpout(invalidTopicConf, PulsarClient.builder());
    MockSpoutOutputCollector localMockCollector = new MockSpoutOutputCollector();
    SpoutOutputCollector outputCollector = new SpoutOutputCollector(localMockCollector);

    TopologyContext topologyContext = mock(TopologyContext.class);
    when(topologyContext.getThisComponentId()).thenReturn("new-test" + methodName);
    when(topologyContext.getThisTaskId()).thenReturn(0);

    try {
        failingSpout.open(Maps.newHashMap(), topologyContext, outputCollector);
        fail("should have failed as consumer creation failed");
    } catch (IllegalStateException e) {
        // Ok
    }
}
/**
 * Prepares the bolt fixture: runs the base setup, builds the bolt configuration, prepares a
 * {@code PulsarBolt} against a mocked {@code TopologyContext}, and finally subscribes a
 * consumer to the test topic.
 */
@Override
protected void setup() throws Exception {
    super.internalSetup();
    super.producerBaseSetup();

    serviceUrl = pulsar.getWebServiceAddress();

    // Bolt configuration shared by the tests in this class.
    pulsarBoltConf = new PulsarBoltConfiguration();
    pulsarBoltConf.setServiceUrl(serviceUrl);
    pulsarBoltConf.setTopic(topic);
    pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper);
    pulsarBoltConf.setMetricsTimeIntervalInSecs(60);

    bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
    mockCollector = new MockOutputCollector();
    OutputCollector outputCollector = new OutputCollector(mockCollector);

    // The topology context is mocked; only component id and task id are stubbed.
    TopologyContext topologyContext = mock(TopologyContext.class);
    when(topologyContext.getThisComponentId()).thenReturn("test-bolt-" + methodName);
    when(topologyContext.getThisTaskId()).thenReturn(0);

    bolt.prepare(Maps.newHashMap(), topologyContext, outputCollector);

    consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName).subscribe();
}
/**
 * Prepares a second bolt built from the shared configuration and asserts that the number of
 * publishers on the topic stays at one before preparing, after preparing, and after closing
 * the second bolt.
 */
@Test
public void testSharedProducer() throws Exception {
    TopicStats stats = admin.topics().getStats(topic);
    Assert.assertEquals(stats.publishers.size(), 1);

    PulsarBolt secondBolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
    MockOutputCollector secondMockCollector = new MockOutputCollector();
    OutputCollector secondCollector = new OutputCollector(secondMockCollector);

    // Same component id as the bolt under test, but a different task id.
    TopologyContext secondContext = mock(TopologyContext.class);
    when(secondContext.getThisComponentId()).thenReturn("test-bolt-" + methodName);
    when(secondContext.getThisTaskId()).thenReturn(1);

    secondBolt.prepare(Maps.newHashMap(), secondContext, secondCollector);
    stats = admin.topics().getStats(topic);
    Assert.assertEquals(stats.publishers.size(), 1);

    secondBolt.close();
    stats = admin.topics().getStats(topic);
    Assert.assertEquals(stats.publishers.size(), 1);
}
/**
 * Prepares a bolt configured with the topic {@code "persistent://invalid"} and expects
 * {@code prepare} to throw {@link IllegalStateException}.
 */
@Test
public void testFailedProducer() {
    PulsarBoltConfiguration invalidTopicConf = new PulsarBoltConfiguration();
    invalidTopicConf.setServiceUrl(serviceUrl);
    invalidTopicConf.setTopic("persistent://invalid");
    invalidTopicConf.setTupleToMessageMapper(tupleToMessageMapper);
    invalidTopicConf.setMetricsTimeIntervalInSecs(60);

    PulsarBolt failingBolt = new PulsarBolt(invalidTopicConf, PulsarClient.builder());
    MockOutputCollector localMockCollector = new MockOutputCollector();
    OutputCollector outputCollector = new OutputCollector(localMockCollector);

    TopologyContext topologyContext = mock(TopologyContext.class);
    when(topologyContext.getThisComponentId()).thenReturn("new" + methodName);
    when(topologyContext.getThisTaskId()).thenReturn(0);

    try {
        failingBolt.prepare(Maps.newHashMap(), topologyContext, outputCollector);
        fail("should have failed as producer creation failed");
    } catch (IllegalStateException ie) {
        // Ok.
    }
}
/**
 * Checks {@code Util.defaultToEmptyCollectionsOnNullValue} for lists, sets, maps, object
 * arrays, {@link Properties}, generic collections and strings: a {@code null} value is
 * expected to come back as an empty container (or {@code null} for {@code String}), and a
 * non-null value is expected to keep its content.
 */
@Test
public void defaultToEmptyCollectionsOnNullValueTest() throws Throwable {
    // List
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(List.class, null).isEmpty());
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(List.class, Lists.newArrayList("1")).contains("1"));

    // Set
    Set<Object> singleElementSet = Sets.newHashSet();
    singleElementSet.add(1);
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(Set.class, singleElementSet).contains(1));
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(Set.class, null).isEmpty());

    // Map
    Map<Object, Object> singleEntryMap = Maps.newHashMap();
    singleEntryMap.put(1, 2);
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(Map.class, singleEntryMap).containsKey(1));
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(Map.class, singleEntryMap).containsValue(2));
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(Map.class, null).isEmpty());

    // Object[]
    Object[] singleElementArray = singleElementSet.toArray(new Object[singleElementSet.size()]);
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(Object[].class, null).length == 0);
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(Object[].class, singleElementArray).length == 1);

    // Properties
    Properties singleEntryProps = new Properties();
    singleEntryProps.put("1", "2");
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(Properties.class, singleEntryProps).containsKey("1"));
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(Properties.class, singleEntryProps).containsValue("2"));
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(Properties.class, null).isEmpty());

    // Collection
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(Collection.class, null).isEmpty());
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(Collection.class, singleElementSet).contains(1));

    // Non-collection type: null stays null, a value is passed through.
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(String.class, null) == null);
    assertTrue(Util.defaultToEmptyCollectionsOnNullValue(String.class, "123").contentEquals("123"));
}
/**
 * Builds a document through an {@code ElasticsearchIndexer} and verifies the resulting field
 * map, its JSON rendering, and the URLs used for adding a document and for committing.
 */
@Test
void createDocTest()
{
    BaseIndexer esIndexer = new ElasticsearchIndexer("", TEST1);
    List<String> locales = Lists.newArrayList();
    locales.add(LOCALE);
    Map<String, Object> actualDoc =
            esIndexer.createDoc(ID, ENGLISH_TITLE, SPANISH_TITLE, ALT_TITLE, locales);

    // Expected content: id, doc type, the English title, and a set of localized titles.
    Set<Object> spanishTitles = Sets.newHashSet();
    spanishTitles.add(SPANISH_TITLE);
    Map<String, Object> wantedDoc = Maps.newHashMap();
    wantedDoc.put(Properties.idField.get(), ID + "_" + TEST1);
    wantedDoc.put(Properties.docTypeFieldName.get(), TEST1);
    wantedDoc.put(Properties.titleFields.get().get(0) + "_en", ENGLISH_TITLE);
    wantedDoc.put(Properties.titleFields.get().get(0) + "_es", spanishTitles);
    Assert.assertEquals(actualDoc, wantedDoc);

    // The Elasticsearch JSON payload is expected to be a single JSON object.
    StringBuilder json = esIndexer.getJsonStringOfDoc(new ObjectMapper().valueToTree(actualDoc));
    Assert.assertEquals(json.toString(), "{\"query_testing_type\":\"test1\",\"title_en\":\"title en\",\"id\":\"123_test1\",\"title_es\":[\"title es\"]}");

    String addUrl = esIndexer.getUrlForAddingDoc(actualDoc);
    Assert.assertEquals(addUrl, "http://localhost:8983/solr/qtest/test_doc/123_test1");

    String commitUrl = esIndexer.getUrlForCommitting();
    Assert.assertEquals(commitUrl, "http://localhost:8983/solr/qtest/_flush");
}
/**
 * Feeds {@code Queries} from a mocked {@code GoogleDataExtractor} that returns two titles
 * sharing the same query, and verifies that the query maps to the ids of both titles, each
 * suffixed with the dataset id.
 */
@Test
void createDocTest()
{
    // First title row.
    TitleWithQueries firstTitle = new TitleWithQueries(DATASET_ID);
    firstTitle.setValue(TitleWithQueries.ID, ID1);
    firstTitle.setValue(TitleWithQueries.TITLE_EN, ENGLISH_TITLE);
    firstTitle.setValue(TitleWithQueries.TITLE_LOCALE, SPANISH_TITLE);
    firstTitle.setValue(TitleWithQueries.Q_ + "regular", Q1);

    // Second title row: different id, same titles and query.
    TitleWithQueries secondTitle = new TitleWithQueries(DATASET_ID);
    secondTitle.setValue(TitleWithQueries.ID, ID2);
    secondTitle.setValue(TitleWithQueries.TITLE_EN, ENGLISH_TITLE);
    secondTitle.setValue(TitleWithQueries.TITLE_LOCALE, SPANISH_TITLE);
    secondTitle.setValue(TitleWithQueries.Q_ + "regular", Q1);

    Map<Integer, TitleWithQueries> titlesByRow = Maps.newHashMap();
    titlesByRow.put(1, firstTitle);
    titlesByRow.put(2, secondTitle);

    Map<String, Map<Integer, TitleWithQueries>> titlesByDataset = Maps.newHashMap();
    titlesByDataset.put(DATASET_ID, titlesByRow);

    GoogleDataExtractor extractor = Mockito.mock(GoogleDataExtractor.class);
    Mockito.when(extractor.getTitlesWithQueriesPerDataset()).thenReturn(titlesByDataset);

    Queries queries = new Queries(DATASET_ID, TEST1, extractor);
    queries.populateFromGoogleSpreadsheets();
    Map<String, Set<String>> actualQueryToIds = queries.getQueryToIdMap();

    Set<String> expectedIds = Sets.newHashSet();
    expectedIds.add(ID1+"_"+DATASET_ID);
    expectedIds.add(ID2+"_"+DATASET_ID);
    Map<String, Set<String>> wantedQueryToIds = Maps.newHashMap();
    wantedQueryToIds.put(Q1, expectedIds);

    Assert.assertEquals(actualQueryToIds, wantedQueryToIds);
}
/**
 * Builds a map of group names, each mapped to itself, from the given groups.
 *
 * <p>When {@code m_metaGroups} is non-empty, {@code collectGroups} is invoked first with the
 * given groups and then repeatedly with whatever it reported as unfinished, until a pass
 * leaves no unfinished groups.
 *
 * @param groups the group names to start from
 * @return the map populated with the given groups and anything {@code collectGroups} added
 */
private Map<String, String> createGroups(String[] groups) {
    Map<String, String> collected = Maps.newHashMap();

    // Every group that was passed in maps to itself.
    for (String name : groups) {
        collected.put(name, name);
    }

    // Expand meta groups pass by pass: each pass processes the groups the previous pass
    // left unfinished, starting with the groups passed in.
    if (m_metaGroups.size() > 0) {
        String[] pending = groups;
        List<String> unfinished;
        do {
            unfinished = Lists.newArrayList();
            collectGroups(pending, unfinished, collected);
            pending = unfinished.toArray(new String[unfinished.size()]);
        } while (unfinished.size() > 0);
    }

    return collected;
}
/**
 * Chains test instances together: every method of an instance is made to depend on every
 * method of the instance that precedes it in the iteration order of the sorted multimap.
 *
 * @param methods the methods to group by {@code getInstance()}
 * @return a multimap from each method to the methods of the preceding instance; methods of
 *     the first instance have no entries
 */
private ListMultiMap<ITestNGMethod, ITestNGMethod> createInstanceDependencies(ITestNGMethod[] methods) {
    ListMultiMap<Object, ITestNGMethod> methodsByInstance = Maps.newSortedListMultiMap();
    for (ITestNGMethod method : methods) {
        methodsByInstance.put(method.getInstance(), method);
    }

    ListMultiMap<ITestNGMethod, ITestNGMethod> dependencies = Maps.newListMultiMap();
    Object priorInstance = null;
    for (Map.Entry<Object, List<ITestNGMethod>> entry : methodsByInstance.entrySet()) {
        Object instance = entry.getKey();
        if (priorInstance != null) {
            List<ITestNGMethod> priorMethods = methodsByInstance.get(priorInstance);
            List<ITestNGMethod> instanceMethods = methodsByInstance.get(instance);
            // Each method of this instance depends on all methods of the prior instance.
            for (ITestNGMethod dependent : instanceMethods) {
                for (ITestNGMethod dependency : priorMethods) {
                    dependencies.put(dependent, dependency);
                }
            }
        }
        priorInstance = instance;
    }
    return dependencies;
}
/**
 * Returns the excluded methods with duplicates collapsed.
 *
 * @return the key set of a map into which every element of {@code m_excludedMethods} was
 *     inserted as its own key
 */
@Override
public Collection<ITestNGMethod> getExcludedMethods() {
    // A map keyed by the method itself is used purely to drop duplicates.
    Map<ITestNGMethod, ITestNGMethod> distinctExcluded = Maps.newHashMap();
    for (ITestNGMethod excluded : m_excludedMethods) {
        distinctExcluded.put(excluded, excluded);
    }
    return distinctExcluded.keySet();
}
/**
 * Round-trips a small map through Gson into an {@code Item} and prints the item's JSON
 * representation to standard output.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Map<Object, Object> fields = Maps.newHashMap();
    fields.put("name", "a");
    fields.put("uname", "a");
    fields.put("id", 1);

    Gson gson = new Gson();
    String fieldsJson = gson.toJson(fields);
    Item item = gson.fromJson(fieldsJson, Item.class);
    System.out.println(gson.toJson(item));
}
/**
 * Data provider that supplies a single row containing one map with the name of the test
 * method (key {@code "method"}) and the name of the test context (key {@code "contex"}).
 *
 * @param method the test method the data is provided for
 * @param contex the current test context
 * @return one row holding the populated map
 */
@DataProvider(name="dp-with-testngmethod-contex")
public Object[][] dataProviderForBDD(ITestNGMethod method, ITestContext contex){
    Map<Object, Object> row = Maps.newHashMap();
    row.put("contex", contex.getName());
    row.put("method", method.getMethodName());
    Object[] onlyRow = { row };
    return new Object[][]{ onlyRow };
}
/**
 * Data provider that supplies five rows, each holding one map with the keys {@code "name"},
 * {@code "uname"} and {@code "id"}. Names run from {@code "a"} to {@code "e"} and ids from
 * 1 to 5.
 *
 * @return five single-element rows, in name order
 */
@DataProvider(name="dp-for-filter")
public static Object[][] dataProviderToTestFilter(){
    String[] names = { "a", "b", "c", "d", "e" };
    Object[][] rows = new Object[names.length][];
    for (int i = 0; i < names.length; i++) {
        Map<Object, Object> record = Maps.newHashMap();
        record.put("name", names[i]);
        record.put("uname", names[i]);
        // Ids are 1-based and follow the position of the name.
        record.put("id", i + 1);
        rows[i] = new Object[]{ record };
    }
    return rows;
}
/**
 * Verifies {@code KinesisSink.createCredentialProvider} in two modes: with a {@code null}
 * plugin class name and a JSON carrying access and secret keys, and with
 * {@code AwsCredentialProviderPluginImpl} as the plugin, whose access key, secret key and
 * session token are expected to be exposed by the resulting provider.
 */
@Test
public void testCredentialProvider() throws Exception {
    KinesisSink sink = new KinesisSink();

    final String accessKey = "ak";
    final String secretKey = "sk";
    Map<String, String> params = Maps.newHashMap();
    params.put(KinesisSink.ACCESS_KEY_NAME, accessKey);
    params.put(KinesisSink.SECRET_KEY_NAME, secretKey);
    String paramJson = new Gson().toJson(params);

    // No plugin class name: the keys come from the JSON parameter.
    AWSCredentialsProvider provider =
            sink.createCredentialProvider(null, paramJson).getCredentialProvider();
    Assert.assertEquals(provider.getCredentials().getAWSAccessKeyId(), accessKey);
    Assert.assertEquals(provider.getCredentials().getAWSSecretKey(), secretKey);

    // Explicit plugin: the credentials come from the plugin implementation.
    String pluginClassName = AwsCredentialProviderPluginImpl.class.getName();
    provider = sink.createCredentialProvider(pluginClassName, "{}").getCredentialProvider();
    Assert.assertNotNull(provider);
    Assert.assertEquals(provider.getCredentials().getAWSAccessKeyId(),
            AwsCredentialProviderPluginImpl.accessKey);
    Assert.assertEquals(provider.getCredentials().getAWSSecretKey(),
            AwsCredentialProviderPluginImpl.secretKey);
    Assert.assertEquals(((BasicSessionCredentials) provider.getCredentials()).getSessionToken(),
            AwsCredentialProviderPluginImpl.sessionToken);

    sink.close();
}
/**
 * Builds a spied {@code PulsarAdmin} that talks to {@code brokerUrlTls}, authenticates via
 * {@code AuthenticationTls} with the superuser client certificate and key, and trusts the
 * broker trust certificate. The result is stored in {@code admin}.
 */
protected final void createAdminClient() throws Exception {
    Map<String, String> tlsAuthParams = Maps.newHashMap();
    tlsAuthParams.put("tlsCertFile", TLS_SUPERUSER_CLIENT_CERT_FILE_PATH);
    tlsAuthParams.put("tlsKeyFile", TLS_SUPERUSER_CLIENT_KEY_FILE_PATH);

    PulsarAdmin realAdmin = PulsarAdmin.builder()
            .serviceHttpUrl(brokerUrlTls.toString())
            .tlsTrustCertsFilePath(TLS_BROKER_TRUST_CERT_FILE_PATH)
            .allowTlsInsecureConnection(true)
            .authentication(AuthenticationTls.class.getName(), tlsAuthParams)
            .build();
    admin = spy(realAdmin);
}
/**
 * Creates a TLS-enabled client for the given service URL, authenticating with the client
 * certificate and key and trusting the proxy trust certificate. Stats collection is
 * configured with an interval of 0 seconds.
 *
 * @param proxyServiceUrl the service URL the client connects to
 * @return the newly built client
 * @throws PulsarClientException if the client cannot be built
 */
@SuppressWarnings("deprecation")
private PulsarClient createPulsarClient(String proxyServiceUrl) throws PulsarClientException {
    Map<String, String> tlsAuthParams = Maps.newHashMap();
    tlsAuthParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
    tlsAuthParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);

    Authentication tlsAuth = new AuthenticationTls();
    tlsAuth.configure(tlsAuthParams);

    return PulsarClient.builder()
            .serviceUrl(proxyServiceUrl)
            .statsInterval(0, TimeUnit.SECONDS)
            .tlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH)
            .allowTlsInsecureConnection(true)
            .authentication(tlsAuth)
            .enableTls(true)
            .build();
}
/**
 * Builds a spied {@code PulsarAdmin} that talks to {@code brokerUrlTls}, authenticates via
 * {@code AuthenticationTls} with the superuser client certificate and key, and trusts the
 * proxy trust certificate. The result is stored in {@code admin}.
 */
protected final void createAdminClient() throws Exception {
    Map<String, String> tlsAuthParams = Maps.newHashMap();
    tlsAuthParams.put("tlsCertFile", TLS_SUPERUSER_CLIENT_CERT_FILE_PATH);
    tlsAuthParams.put("tlsKeyFile", TLS_SUPERUSER_CLIENT_KEY_FILE_PATH);

    PulsarAdmin realAdmin = PulsarAdmin.builder()
            .serviceHttpUrl(brokerUrlTls.toString())
            .tlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH)
            .allowTlsInsecureConnection(true)
            .authentication(AuthenticationTls.class.getName(), tlsAuthParams)
            .build();
    admin = spy(realAdmin);
}
/**
 * Finishes configuring the supplied builder into a TLS-enabled client for the given service
 * URL, authenticating with the client certificate and key, trusting the proxy trust
 * certificate, and using an operation timeout of 1000 milliseconds.
 *
 * @param proxyServiceUrl the service URL the client connects to
 * @param clientBuilder the builder to configure and build from
 * @return the newly built client
 * @throws PulsarClientException if the client cannot be built
 */
@SuppressWarnings("deprecation")
private PulsarClient createPulsarClient(String proxyServiceUrl, ClientBuilder clientBuilder)
        throws PulsarClientException {
    Map<String, String> tlsAuthParams = Maps.newHashMap();
    tlsAuthParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
    tlsAuthParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);

    Authentication tlsAuth = new AuthenticationTls();
    tlsAuth.configure(tlsAuthParams);

    return clientBuilder
            .serviceUrl(proxyServiceUrl)
            .statsInterval(0, TimeUnit.SECONDS)
            .tlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH)
            .allowTlsInsecureConnection(true)
            .authentication(tlsAuth)
            .enableTls(true)
            .operationTimeout(1000, TimeUnit.MILLISECONDS)
            .build();
}
/**
 * Publishes {@code numMessages} {@code Foo} messages to the input topic, each tagged with
 * the message property {@code ACTION=INSERT}, and returns what was sent.
 *
 * <p>The returned map preserves insertion order. It contains one entry per message key
 * ({@code "key-<i>"}) whose value is the schema-encoded object turned into a string with the
 * platform default charset, plus an {@code "ACTION" -> "INSERT"} entry that is added once at
 * least one message has been produced.
 *
 * @param inputTopicName the topic to publish to
 * @param numMessages the number of messages to publish
 * @param schema the schema used both by the producer and to encode the returned values
 * @return the keys and encoded values that were sent, in send order
 * @throws Exception if creating the client or producer, or sending a message, fails
 */
protected Map<String, String> produceSchemaInsertMessagesToInputTopic(String inputTopicName,
                                                                      int numMessages,
                                                                      Schema<Foo> schema) throws Exception {
    @Cleanup
    PulsarClient localClient = PulsarClient.builder()
            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
            .build();

    @Cleanup
    Producer<Foo> fooProducer = localClient.newProducer(schema)
            .topic(inputTopicName)
            .create();

    LinkedHashMap<String, String> sent = new LinkedHashMap<>();
    for (int index = 0; index < numMessages; index++) {
        JdbcPostgresSinkTester.Foo payload = new JdbcPostgresSinkTester.Foo();
        payload.setField1("field1_insert_" + index);
        payload.setField2("field2_insert_" + index);
        payload.setField3(index);

        String messageKey = "key-" + index;
        String encoded = new String(schema.encode(payload));

        Map<String, String> messageProps = Maps.newHashMap();
        messageProps.put("ACTION", "INSERT");

        // Record the message first, then the action marker, matching the send order.
        sent.put(messageKey, encoded);
        sent.put("ACTION", "INSERT");

        fooProducer.newMessage()
                .properties(messageProps)
                .key(messageKey)
                .value(payload)
                .send();
    }
    return sent;
}