下面列出了com.google.protobuf.Descriptors.DescriptorValidationException#org.apache.nifi.processor.ProcessContext 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@OnScheduled
public void onScheduled(final ProcessContext context) throws ProcessException {
this.receiveBufferSize = context.getProperty(RECEIVE_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
this.originalServerAddressList = context.getProperty(ENDPOINT_LIST).getValue();
this.endOfMessageByte = ((byte) context.getProperty(END_OF_MESSAGE_BYTE).asInteger().intValue());
this.connectionAttemptCount = context.getProperty(CONNECTION_ATTEMPT_COUNT).asInteger();
this.reconnectInterval = context.getProperty(RECONNECT_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
this.clientScheduler = new ScheduledThreadPoolExecutor(originalServerAddressList.split(",").length + 1);
this.clientScheduler.setKeepAliveTime(10, TimeUnit.SECONDS);
this.clientScheduler.allowCoreThreadTimeOut(true);
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamic()) {
this.dynamicAttributes.put(descriptor.getName(), entry.getValue());
}
}
}
@OnScheduled
public void OnScheduled(final ProcessContext context) {
// Configure jackson mapper before spawning onTriggers
final SimpleModule module = new SimpleModule()
.addSerializer(MacAddress.class, new MacAddressToStringSerializer());
mapper.registerModule(module);
mapper.setDateFormat(this.simpleDateFormat);
switch (context.getProperty(TIME_REPRESENTATION).getValue()) {
case LOCAL_TZ:
// set the mapper TZ to local TZ
mapper.setTimeZone(TimeZone.getDefault());
tzId = TimeZone.getDefault().getID();
break;
case UTC:
// set the mapper TZ to local TZ
mapper.setTimeZone(TimeZone.getTimeZone(UTC));
tzId = UTC;
break;
}
}
/**
* Method used to configure the 'sasl.jaas.config' property based on KAFKA-4259<br />
* https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients<br />
* <br />
* It expects something with the following format: <br />
* <br />
* <LoginModuleClass> <ControlFlag> *(<OptionName>=<OptionValue>); <br />
* ControlFlag = required / requisite / sufficient / optional
*
* @param mapToPopulate Map of configuration properties
* @param context Context
*/
private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
// If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab.
// The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null.
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
if (credentialsService != null) {
principal = credentialsService.getPrincipal();
keytab = credentialsService.getKeytab();
}
String serviceName = context.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
+ "useTicketCache=false "
+ "renewTicket=true "
+ "serviceName=\"" + serviceName + "\" "
+ "useKeyTab=true "
+ "keyTab=\"" + keytab + "\" "
+ "principal=\"" + principal + "\";");
}
}
/**
* Register subscriber via native call
*
* @param context the process context
*/
private String subscribe(ProcessContext context) throws URISyntaxException {
String channel = context.getProperty(CHANNEL).getValue();
String query = context.getProperty(QUERY).getValue();
renderedXMLs = new LinkedBlockingQueue<>(context.getProperty(MAX_EVENT_QUEUE_SIZE).asInteger());
provenanceUri = new URI("winlog", name, "/" + channel, query, null).toASCIIString();
evtSubscribeCallback = new EventSubscribeXmlRenderingCallback(getLogger(), s -> {
try {
renderedXMLs.put(s);
} catch (InterruptedException e) {
throw new IllegalStateException("Got interrupted while waiting to add to queue.", e);
}
}, context.getProperty(MAX_BUFFER_SIZE).asInteger(), wEvtApi, kernel32, errorLookup);
subscriptionHandle = wEvtApi.EvtSubscribe(null, null, channel, query, null, null,
evtSubscribeCallback, WEvtApi.EvtSubscribeFlags.SUBSCRIBE_TO_FUTURE | WEvtApi.EvtSubscribeFlags.EVT_SUBSCRIBE_STRICT);
if (!isSubscribed()) {
return UNABLE_TO_SUBSCRIBE + errorLookup.getLastError();
}
return null;
}
/**
* Adapted from HBASEUtils. Their approach seemed ideal for what our intent is here.
* @param columnFamily column family from which to extract the visibility or to execute an expression against
* @param columnQualifier column qualifier from which to extract the visibility or to execute an expression against
* @param flowFile flow file being written
* @param context process context
* @return
*/
public static String produceVisibility(String columnFamily, String columnQualifier, FlowFile flowFile, ProcessContext context) {
if (org.apache.commons.lang3.StringUtils.isNotEmpty(columnFamily)) {
return null;
}
String lookupKey = String.format("visibility.%s%s%s", columnFamily, !org.apache.commons.lang3.StringUtils.isNotEmpty(columnQualifier) ? "." : "", columnQualifier);
String fromAttribute = flowFile.getAttribute(lookupKey);
if (fromAttribute == null && !org.apache.commons.lang3.StringUtils.isBlank(columnQualifier)) {
String lookupKeyFam = String.format("visibility.%s", columnFamily);
fromAttribute = flowFile.getAttribute(lookupKeyFam);
}
if (fromAttribute != null) {
return fromAttribute;
} else {
PropertyValue descriptor = context.getProperty(lookupKey);
if (descriptor == null || !descriptor.isSet()) {
descriptor = context.getProperty(String.format("visibility.%s", columnFamily));
}
String retVal = descriptor != null ? descriptor.evaluateAttributeExpressions(flowFile).getValue() : null;
return retVal;
}
}
@Override
protected Tuple<Map<String, RecordPath>, RecordPath> getFlowFileContext(final FlowFile flowFile, final ProcessContext context) {
final Map<String, RecordPath> recordPaths = new HashMap<>();
for (final PropertyDescriptor prop : context.getProperties().keySet()) {
if (!prop.isDynamic()) {
continue;
}
final String pathText = context.getProperty(prop).evaluateAttributeExpressions(flowFile).getValue();
final RecordPath lookupRecordPath = recordPathCache.getCompiled(pathText);
recordPaths.put(prop.getName(), lookupRecordPath);
}
final RecordPath resultRecordPath;
if (context.getProperty(RESULT_RECORD_PATH).isSet()) {
final String resultPathText = context.getProperty(RESULT_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
resultRecordPath = recordPathCache.getCompiled(resultPathText);
} else {
resultRecordPath = null;
}
return new Tuple<>(recordPaths, resultRecordPath);
}
protected void initializeRegionAndEndpoint(ProcessContext context) {
// if the processor supports REGION, get the configured region.
if (getSupportedPropertyDescriptors().contains(REGION)) {
final String region = context.getProperty(REGION).getValue();
if (region != null) {
this.region = Region.getRegion(Regions.fromName(region));
client.setRegion(this.region);
} else {
this.region = null;
}
}
// if the endpoint override has been configured, set the endpoint.
// (per Amazon docs this should only be configured at client creation)
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).getValue());
if (!urlstr.isEmpty()) {
this.client.setEndpoint(urlstr);
}
}
protected AWSCredentials getCredentials(final ProcessContext context) {
final String accessKey = context.getProperty(ACCESS_KEY).evaluateAttributeExpressions().getValue();
final String secretKey = context.getProperty(SECRET_KEY).evaluateAttributeExpressions().getValue();
final String credentialsFile = context.getProperty(CREDENTIALS_FILE).getValue();
if (credentialsFile != null) {
try {
return new PropertiesCredentials(new File(credentialsFile));
} catch (final IOException ioe) {
throw new ProcessException("Could not read Credentials File", ioe);
}
}
if (accessKey != null && secretKey != null) {
return new BasicAWSCredentials(accessKey, secretKey);
}
return new AnonymousAWSCredentials();
}
/**
* Will construct AMQP message by extracting its body from the incoming
* {@link FlowFile}. AMQP Properties will be extracted from the
* {@link FlowFile} and converted to {@link BasicProperties} to be sent
* along with the message. Upon success the incoming {@link FlowFile} is
* transferred to 'success' {@link Relationship} and upon failure FlowFile is
* penalized and transferred to the 'failure' {@link Relationship}
* <br>
* NOTE: Attributes extracted from {@link FlowFile} are considered
* candidates for AMQP properties if their names are prefixed with
* {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
*
*/
@Override
protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession) throws ProcessException {
FlowFile flowFile = processSession.get();
if (flowFile != null) {
BasicProperties amqpProperties = this.extractAmqpPropertiesFromFlowFile(flowFile);
String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
if (routingKey == null){
throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '"
+ context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile.");
}
String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
byte[] messageContent = this.extractMessage(flowFile, processSession);
try {
this.targetResource.publish(messageContent, amqpProperties, routingKey, exchange);
processSession.transfer(flowFile, REL_SUCCESS);
processSession.getProvenanceReporter().send(flowFile, this.amqpConnection.toString() + "/E:" + exchange + "/RK:" + routingKey);
} catch (Exception e) {
processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
this.getLogger().error("Failed while sending message to AMQP via " + this.targetResource, e);
context.yield();
}
}
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
final ComponentLog logger = getLogger();
final Integer cacheSize = context.getProperty(CACHE_SIZE).asInteger();
final Long cacheTTL = context.getProperty(CACHE_TTL_AFTER_LAST_ACCESS).asTimePeriod(TimeUnit.SECONDS);
if (cacheSize > 0) {
CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder().maximumSize(cacheSize);
if (cacheTTL > 0) {
cacheBuilder = cacheBuilder.expireAfterAccess(cacheTTL, TimeUnit.SECONDS);
}
cache = cacheBuilder.build(
new CacheLoader<String, Templates>() {
@Override
public Templates load(String path) throws TransformerConfigurationException, LookupFailureException {
return newTemplates(context, path);
}
});
} else {
cache = null;
logger.info("Stylesheet cache disabled because cache size is set to 0");
}
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
fileFilterRef.set(createFileFilter(context));
includeFileAttributes = context.getProperty(INCLUDE_FILE_ATTRIBUTES).asBoolean();
final long maxDiskOperationMillis = context.getProperty(MAX_DISK_OPERATION_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final long maxListingMillis = context.getProperty(MAX_LISTING_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final boolean trackPerformance = context.getProperty(TRACK_PERFORMANCE).asBoolean();
if (trackPerformance) {
final int maxEntries = context.getProperty(MAX_TRACKED_FILES).evaluateAttributeExpressions().asInteger();
performanceTracker = new RollingMetricPerformanceTracker(getLogger(), maxDiskOperationMillis, maxEntries);
} else {
performanceTracker = new UntrackedPerformanceTracker(getLogger(), maxDiskOperationMillis);
}
final long millisToKeepStats = TimeUnit.MINUTES.toMillis(15);
final MonitorActiveTasks monitorTask = new MonitorActiveTasks(performanceTracker, getLogger(), maxDiskOperationMillis, maxListingMillis, millisToKeepStats);
monitoringFuture = monitoringThreadPool.scheduleAtFixedRate(monitorTask, 15, 15, TimeUnit.SECONDS);
}
@Override
protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<LumberjackEvent> events) throws IOException {
final EventFactory<LumberjackEvent> eventFactory = new LumberjackEventFactory();
final ChannelHandlerFactory<LumberjackEvent, AsyncChannelDispatcher> handlerFactory = new LumberjackSocketChannelHandlerFactory<>();
final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and the buffer size
final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
}
// if we decide to support SSL then get the context and pass it in here
return new SocketChannelDispatcher<>(eventFactory, handlerFactory, bufferPool, events,
getLogger(), maxConnections, sslContext, charSet);
}
/**
*
*/
protected FlowFile doTransform(ProcessContext context, ProcessSession session, FlowFile flowFile, InvocationContextProperties contextProperties) {
AtomicReference<Map<String, String>> attributeRef = new AtomicReference<Map<String, String>>();
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
attributeRef.set(transform(in, null, contextProperties));
}
});
if (attributeRef.get() != null) {
flowFile = session.putAllAttributes(flowFile, attributeRef.get());
}
return flowFile;
}
private Set<String> findOldFlowFileIds(final ProcessContext ctx) {
final Set<String> old = new HashSet<>();
final long expiryMillis = ctx.getProperty(MAX_UNCONFIRMED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
final long cutoffTime = System.currentTimeMillis() - expiryMillis;
for (final Map.Entry<String, FlowFileEntryTimeWrapper> entry : flowFileMap.entrySet()) {
final FlowFileEntryTimeWrapper wrapper = entry.getValue();
if (wrapper != null && wrapper.getEntryTime() < cutoffTime) {
old.add(entry.getKey());
}
}
return old;
}
protected void createKerberosUserAndOrKuduClient(ProcessContext context) throws LoginException {
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
if (credentialsService != null) {
kerberosUser = createKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab(), context);
kerberosUser.login(); // login creates the kudu client as well
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
kerberosUser = createKerberosPasswordUser(kerberosPrincipal, kerberosPassword, context);
kerberosUser.login(); // login creates the kudu client as well
} else {
createKuduClient(context);
}
}
@Test
public void validateGetErrorMessages() throws Exception {
String groupName = "validateGetErrorMessages";
when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
when(mockLease.continuePolling()).thenReturn(true, false);
when(mockLease.commit()).thenReturn(Boolean.FALSE);
ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() {
@Override
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return mockConsumerPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(any(), any());
verify(mockLease, times(2)).continuePolling();
verify(mockLease, times(1)).poll();
verify(mockLease, times(1)).commit();
verify(mockLease, times(1)).close();
verifyNoMoreInteractions(mockConsumerPool);
verifyNoMoreInteractions(mockLease);
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
final String filterRegex = context.getProperty(DICTIONARY_FILTER).getValue();
this.dictionaryFilterPattern = (filterRegex == null) ? null : Pattern.compile(filterRegex);
final String attributeRegex = context.getProperty(ATTRIBUTE_PATTERN).getValue();
this.attributePattern = (attributeRegex.equals(".*")) ? null : Pattern.compile(attributeRegex);
this.dictionaryTerms = createDictionary(context);
this.fileWatcher = new SynchronousFileWatcher(Paths.get(context.getProperty(DICTIONARY_FILE).evaluateAttributeExpressions().getValue()), new LastModifiedMonitor(), 1000L);
}
private void onTrigger(final ProcessContext context, final ProcessSession session) {
readLock.lock();
try {
Set<Relationship> available = context.getAvailableRelationships();
int iterations = 0;
while (!available.isEmpty()) {
final List<FlowFile> flowFiles = session.get(1000);
if (flowFiles.isEmpty()) {
break;
}
session.transfer(flowFiles, Relationship.ANONYMOUS);
session.commit();
// If there are fewer than 1,000 FlowFiles available to transfer, or if we
// have hit the configured FlowFile cap, we want to stop. This prevents us from
// holding the Timer-Driven Thread for an excessive amount of time.
if (flowFiles.size() < 1000 || ++iterations >= maxIterations) {
break;
}
available = context.getAvailableRelationships();
}
} finally {
readLock.unlock();
}
}
@Test
public void validateGetAllMessages() throws Exception {
String groupName = "validateGetAllMessages";
when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
when(mockLease.commit()).thenReturn(Boolean.TRUE);
ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() {
@Override
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return mockConsumerPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(any(), any());
verify(mockLease, times(3)).continuePolling();
verify(mockLease, times(2)).poll();
verify(mockLease, times(1)).commit();
verify(mockLease, times(1)).close();
verifyNoMoreInteractions(mockConsumerPool);
verifyNoMoreInteractions(mockLease);
}
protected MongoCollection<Document> getCollection(final ProcessContext context, final FlowFile flowFile) {
final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isEmpty(collectionName)) {
throw new ProcessException("Collection name was empty after expression language evaluation.");
}
return getDatabase(context, flowFile).getCollection(collectionName);
}
/**
* Builds {@link PublishingContext} for message(s) to be sent to Kafka.
* {@link PublishingContext} contains all contextual information required by
* {@link KafkaPublisher} to publish to Kafka. Such information contains
* things like topic name, content stream, delimiter, key and last ACKed
* message for cases where provided FlowFile is being retried (failed in the
* past). <br>
* For the clean FlowFile (file that has been sent for the first time),
* PublishingContext will be built form {@link ProcessContext} associated
* with this invocation. <br>
* For the failed FlowFile, {@link PublishingContext} will be built from
* attributes of that FlowFile which by then will already contain required
* information (e.g., topic, key, delimiter etc.). This is required to
* ensure the affinity of the retry in the even where processor
* configuration has changed. However keep in mind that failed FlowFile is
* only considered a failed FlowFile if it is being re-processed by the same
* processor (determined via {@link #FAILED_PROC_ID_ATTR}, see
* {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being sent to
* another PublishKafka processor it is treated as a fresh FlowFile
* regardless if it has #FAILED* attributes set.
*/
private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context,
InputStream contentStream) {
String topicName;
byte[] keyBytes;
byte[] delimiterBytes = null;
int lastAckedMessageIndex = -1;
if (this.isFailedFlowFile(flowFile)) {
lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null
? flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null;
delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
} else {
topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
keyBytes = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8);
delimiterBytes = context.getProperty(MESSAGE_DELIMITER).isSet() ? context.getProperty(MESSAGE_DELIMITER)
.evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
}
PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
publishingContext.setKeyBytes(keyBytes);
publishingContext.setDelimiterBytes(delimiterBytes);
publishingContext.setPartitionId(this.determinePartition(context, flowFile));
return publishingContext;
}
@Override
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
super.onScheduled(context);
final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
}
@OnRemoved
public void onRemoved(final ProcessContext context) {
final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
if (client != null) {
clearState(client);
}
}
public static Connection createConnection(final ProcessContext context, final String clientId) throws JMSException {
Objects.requireNonNull(context);
Objects.requireNonNull(clientId);
final ConnectionFactory connectionFactory = createConnectionFactory(context);
final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
final Connection connection = (username == null && password == null) ? connectionFactory.createConnection() : connectionFactory.createConnection(username, password);
connection.setClientID(clientId);
connection.start();
return connection;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile input = session.get();
if (input == null) {
return;
}
final String deleteQuery = getQuery(context, input);
final String queryAttribute = context.getProperty(QUERY_ATTRIBUTE).isSet()
? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
: null;
GridFSBucket bucket = getBucket(input, context);
try {
Document query = Document.parse(deleteQuery);
MongoCursor cursor = bucket.find(query).iterator();
if (cursor.hasNext()) {
GridFSFile file = (GridFSFile)cursor.next();
bucket.delete(file.getObjectId());
if (!StringUtils.isEmpty(queryAttribute)) {
input = session.putAttribute(input, queryAttribute, deleteQuery);
}
session.transfer(input, REL_SUCCESS);
} else {
getLogger().error(String.format("Query %s did not delete anything in %s", deleteQuery, bucket.getBucketName()));
session.transfer(input, REL_FAILURE);
}
cursor.close();
} catch (Exception ex) {
getLogger().error(String.format("Error deleting using query: %s", deleteQuery), ex);
session.transfer(input, REL_FAILURE);
}
}
@OnScheduled
public void setup(ProcessContext context) {
// If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization
if (!context.getProperty(HIVEQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
+ "providing flowfile(s) containing a SQL select query";
getLogger().error(errorString);
throw new ProcessException(errorString);
}
}
/**
* Add failed flow file attributes
* @param flowFiles all flow files
* @param failedFlowFiles list of failed flow files
* @param session process session
* @param context the process context
* @return failed flow files with updated attributes
*/
protected List<FlowFile> updateFailedFlowFileAttributes(
List<FlowFile> flowFiles,
List<FlowFile> failedFlowFiles, ProcessSession session, ProcessContext context) {
int flowFileCount = flowFiles.size();
int flowFileFailed = failedFlowFiles.size();
List<FlowFile> updatedFailedFlowFiles = new ArrayList<>();
for (int i = 0; i < flowFileFailed; i++) {
FlowFile flowFile = failedFlowFiles.get(i);
Map<String,String> attributes = new HashMap<>();
attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, Integer.toString(i));
attributes.put(IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, Integer.toString(flowFileCount));
attributes.put(IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, Integer.toString(flowFiles.indexOf(flowFile)));
attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, Integer.toString(flowFileFailed));
String key = context.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isEmpty(key)) {
attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY,
IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE);
} else if (flowFile.getSize() == 0) {
attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY,
IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE);
} else {
throw new ProcessException("Unknown reason for failing file: " + flowFile);
}
flowFile = session.putAllAttributes(flowFile, attributes);
updatedFailedFlowFiles.add(flowFile);
}
return updatedFailedFlowFiles;
}
@Override
protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<BeatsEvent> events) throws IOException {
final EventFactory<BeatsEvent> eventFactory = new BeatsEventFactory();
final ChannelHandlerFactory<BeatsEvent, AsyncChannelDispatcher> handlerFactory = new BeatsSocketChannelHandlerFactory<>();
final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
// initialize the buffer pool based on max number of connections and the buffer size
final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(maxConnections, bufferSize);
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null;
SslContextFactory.ClientAuth clientAuth = null;
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
sslContext = sslContextService.createSSLContext(SslContextFactory.ClientAuth.valueOf(clientAuthValue));
clientAuth = SslContextFactory.ClientAuth.valueOf(clientAuthValue);
}
// if we decide to support SSL then get the context and pass it in here
return new SocketChannelDispatcher<>(eventFactory, handlerFactory, bufferPool, events,
getLogger(), maxConnections, sslContext, clientAuth, charSet);
}
/**
* Create client using aws credentials provider. This is the preferred way for creating clients
*/
@Override
protected AmazonKinesisFirehoseClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
getLogger().info("Creating client using aws credentials provider");
return new AmazonKinesisFirehoseClient(credentialsProvider, config);
}
private Publisher.Builder getPublisherBuilder(ProcessContext context) {
final Long batchSize = context.getProperty(BATCH_SIZE).asLong();
return Publisher.newBuilder(getTopicName(context))
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
.setBatchingSettings(BatchingSettings.newBuilder()
.setElementCountThreshold(batchSize)
.setIsEnabled(true)
.build());
}