下面列出了com.google.protobuf.Descriptors.DescriptorValidationException#org.apache.nifi.processor.exception.ProcessException 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Obtains a Hive connection from the underlying data source.
 * <p>
 * When a {@code ugi} is present the connection is requested inside
 * {@code ugi.doAs(...)}; otherwise it is requested directly from the data source.
 *
 * @return a connection obtained from {@code dataSource}
 * @throws ProcessException wrapping any {@link SQLException}, {@link IOException} or
 *         {@link InterruptedException} raised while obtaining the connection
 */
@Override
public Connection getConnection() throws ProcessException {
    try {
        if (ugi != null) {
            return ugi.doAs(new PrivilegedExceptionAction<Connection>() {
                @Override
                public Connection run() throws Exception {
                    return dataSource.getConnection();
                }
            });
        } else {
            getLogger().info("Simple Authentication");
            return dataSource.getConnection();
        }
    } catch (SQLException | IOException | InterruptedException e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt status so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
        getLogger().error("Error getting Hive connection", e);
        throw new ProcessException(e);
    }
}
/**
 * Drives the wrapped source. A pollable source is delegated to the parent implementation;
 * an event-driven source gets a fresh runner and channel whenever the supplied session
 * factory differs from the one seen on the previous invocation.
 */
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
    if (source instanceof PollableSource) {
        super.onTrigger(context, sessionFactory);
        return;
    }
    if (!(source instanceof EventDrivenSource)) {
        return;
    }

    final ProcessSessionFactory previousFactory = sessionFactoryRef.getAndSet(sessionFactory);
    if (previousFactory == sessionFactory) {
        // Same factory as before: the existing runner keeps working, nothing to rebuild.
        return;
    }

    if (runnerRef.get() != null) {
        // Tear down the existing runner, then re-store the new factory reference.
        stopped();
        sessionFactoryRef.set(sessionFactory);
    }

    runnerRef.set(new EventDrivenSourceRunner());
    eventDrivenSourceChannelRef.set(new NifiSessionFactoryChannel(sessionFactoryRef.get(), SUCCESS));
    eventDrivenSourceChannelRef.get().start();
    source.setChannelProcessor(new ChannelProcessor(
            new NifiChannelSelector(eventDrivenSourceChannelRef.get())));
    runnerRef.get().setSource(source);
    runnerRef.get().start();
}
/**
 * Pulls up to 50 FlowFiles, refreshes the dictionary when the file watcher reports a change,
 * and routes each FlowFile to {@code REL_MATCHED} or {@code REL_UNMATCHED} depending on the
 * configured matching criteria.
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    final List<FlowFile> batch = session.get(50);
    if (batch.isEmpty()) {
        return;
    }

    final ComponentLog log = getLogger();
    try {
        if (fileWatcher.checkAndReset()) {
            this.dictionaryTerms = createDictionary(context);
        }
    } catch (final IOException ioe) {
        // A failed reload is only logged; processing continues with the current dictionary.
        log.error("Unable to reload dictionary due to {}", ioe);
    }

    final boolean requireAll = context.getProperty(MATCHING_CRITERIA).getValue().equals(MATCH_CRITERIA_ALL);
    for (final FlowFile candidate : batch) {
        final boolean matched;
        if (requireAll) {
            matched = allMatch(candidate, attributePattern, dictionaryTerms);
        } else {
            matched = anyMatch(candidate, attributePattern, dictionaryTerms);
        }

        final Relationship destination;
        if (matched) {
            destination = REL_MATCHED;
        } else {
            destination = REL_UNMATCHED;
        }

        session.getProvenanceReporter().route(candidate, destination);
        session.transfer(candidate, destination);
        log.info("Transferred {} to {}", new Object[]{candidate, destination});
    }
}
/**
 * Takes one {@link FlowFile} from the {@link ProcessSession} (if any) and hands it to
 * {@code doRendezvousWithKafka}.
 * <br>
 * A resulting {@link FlowFile} that is not marked as failed gets a provenance SEND event and is
 * transferred to {@link #REL_SUCCESS}.
 * <br>
 * A resulting {@link FlowFile} that is marked as failed is penalized and transferred to
 * {@link #REL_FAILURE}.
 *
 * @return {@code true} when a FlowFile was available and handled, {@code false} otherwise
 */
@Override
protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return false;
    }

    flowFile = this.doRendezvousWithKafka(flowFile, context, session);
    if (this.isFailedFlowFile(flowFile)) {
        session.transfer(session.penalize(flowFile), REL_FAILURE);
    } else {
        session.getProvenanceReporter().send(flowFile,
                context.getProperty(SEED_BROKERS).getValue() + "/"
                        + context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue());
        session.transfer(flowFile, REL_SUCCESS);
    }
    return true;
}
/**
 * Transforms a single FlowFile via {@code doTransform} and routes it to {@code REL_SUCCESS};
 * any exception routes the FlowFile to {@code REL_FAILURE}. Yields when no FlowFile is queued.
 */
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        context.yield();
        return;
    }

    try {
        final InvocationContextProperties invocationProperties = new InvocationContextProperties(context, flowFile);
        flowFile = this.doTransform(context, session, flowFile, invocationProperties);
        session.transfer(flowFile, REL_SUCCESS);
    } catch (Exception e) {
        this.getLogger().error("Failed FlowFile processing, routing to failure. Issue: " + e.getMessage(), e);
        session.transfer(flowFile, REL_FAILURE);
    }
}
/**
 * Cleanup performed when the processor is stopped.
 * <p>
 * Closes the event subscription if one is active, then drains any events still held in the
 * internal queue using the stored session factory. If events remain but no session factory is
 * available, a {@link ProcessException} is thrown and the queue is left in place.
 *
 * @throws ProcessException if queued events exist but there is no stored ProcessSessionFactory
 */
@OnStopped
public void stop() {
    if (isSubscribed()) {
        wEvtApi.EvtClose(subscriptionHandle);
    }
    subscriptionHandle = null;
    evtSubscribeCallback = null;

    // renderedXMLs is set to null at the end of this method, so guard against a repeated stop.
    if (renderedXMLs != null && !renderedXMLs.isEmpty()) {
        if (sessionFactory != null) {
            getLogger().info("Finishing processing leftover events");
            ProcessSession session = sessionFactory.createSession();
            processQueue(session);
        } else {
            throw new ProcessException("Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the internal queue. Removing the processor now will " +
                    "clear the queue but will result in DATA LOSS. This is normally due to starting the processor, receiving events and stopping before the onTrigger happens. The messages " +
                    "in the internal queue cannot finish processing until the processor is triggered to run.");
        }
    }
    sessionFactory = null;
    provenanceUri = null;
    renderedXMLs = null;
}
/**
 * Consumes from JMS using a pooled consumer, creating a new one when the pool is empty.
 * The consumer is returned to the pool afterwards unless it has been closed.
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    final ComponentLog log = getLogger();

    WrappedMessageConsumer consumer = consumerQueue.poll();
    if (consumer == null) {
        // Pool is empty: try to open a fresh consumer, and back off if the server is unreachable.
        try {
            consumer = JmsFactory.createQueueMessageConsumer(context);
        } catch (JMSException jmsFailure) {
            log.error("Failed to connect to JMS Server due to {}", jmsFailure);
            context.yield();
            return;
        }
    }

    try {
        super.consume(context, session, consumer);
    } finally {
        final boolean reusable = !consumer.isClosed();
        if (reusable) {
            consumerQueue.offer(consumer);
        }
    }
}
/**
 * Resolves the AWS credentials to use, in this order of precedence: a credentials file,
 * an access key / secret key pair, and finally anonymous credentials.
 *
 * @param context the process context supplying the property values
 * @return the resolved credentials
 * @throws ProcessException if the configured credentials file cannot be read
 */
protected AWSCredentials getCredentials(final ProcessContext context) {
    final String accessKey = context.getProperty(ACCESS_KEY).evaluateAttributeExpressions().getValue();
    final String secretKey = context.getProperty(SECRET_KEY).evaluateAttributeExpressions().getValue();
    final String credentialsFile = context.getProperty(CREDENTIALS_FILE).getValue();

    if (credentialsFile == null) {
        final boolean keyPairMissing = accessKey == null || secretKey == null;
        if (keyPairMissing) {
            return new AnonymousAWSCredentials();
        }
        return new BasicAWSCredentials(accessKey, secretKey);
    }

    try {
        return new PropertiesCredentials(new File(credentialsFile));
    } catch (final IOException ioe) {
        throw new ProcessException("Could not read Credentials File", ioe);
    }
}
/**
 * Publishes the content of the incoming {@link FlowFile} as an AMQP message.
 * AMQP properties are extracted from the {@link FlowFile} and sent along with
 * the message as {@link BasicProperties}. On success the {@link FlowFile} is
 * transferred to the 'success' {@link Relationship}; on failure it is
 * penalized, transferred to the 'failure' {@link Relationship} and the
 * processor yields.
 * <br>
 * NOTE: Attributes extracted from {@link FlowFile} are considered
 * candidates for AMQP properties if their names are prefixed with
 * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
 *
 * @throws IllegalArgumentException if the routing key evaluates to {@code null}
 */
@Override
protected void rendezvousWithAmqp(ProcessContext context, ProcessSession processSession) throws ProcessException {
    final FlowFile flowFile = processSession.get();
    if (flowFile == null) {
        return;
    }

    final BasicProperties messageProperties = this.extractAmqpPropertiesFromFlowFile(flowFile);
    final String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
    if (routingKey == null) {
        throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '"
                + context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile.");
    }
    final String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
    final byte[] payload = this.extractMessage(flowFile, processSession);

    try {
        this.targetResource.publish(payload, messageProperties, routingKey, exchange);
        processSession.transfer(flowFile, REL_SUCCESS);
        processSession.getProvenanceReporter().send(flowFile, this.amqpConnection.toString() + "/E:" + exchange + "/RK:" + routingKey);
    } catch (Exception publishFailure) {
        processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
        this.getLogger().error("Failed while sending message to AMQP via " + this.targetResource, publishFailure);
        context.yield();
    }
}
/**
 * Verifies that a {@link MalformedURLException} raised while building the transport client
 * surfaces from client creation as a {@link ProcessException}.
 */
@Test(expected = ProcessException.class)
public void testCreateElasticsearch5ClientWithException() throws ProcessException {
    // Processor whose transport-client factory always fails.
    final FetchElasticsearch5TestProcessor failingProcessor = new FetchElasticsearch5TestProcessor(true) {
        @Override
        protected Client getTransportClient(Settings.Builder settingsBuilder, String xPackPath,
                                            String username, String password,
                                            List<InetSocketAddress> esHosts, ComponentLog log)
                throws MalformedURLException {
            throw new MalformedURLException();
        }
    };

    final MockProcessContext processContext = new MockProcessContext(failingProcessor);
    failingProcessor.initialize(new MockProcessorInitializationContext(failingProcessor, processContext));
    failingProcessor.callCreateElasticsearchClient(processContext);
}
/**
 * Handles one FlowFile, dispatching to the micro-batch path when a micro-batch time is
 * configured and to the non-batched path otherwise. Any exception routes the FlowFile to
 * {@code REL_FAILURE}.
 */
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    final FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    try {
        final Long now = System.currentTimeMillis();
        if (microBatchTime != null) {
            microBatch(context, session, flowFile, now);
        } else {
            noMicroBatch(context, session, flowFile, now);
        }
    } catch (Exception failure) {
        getLogger().error("Ran into an error while processing {}.", new Object[] { flowFile}, failure);
        session.transfer(flowFile, REL_FAILURE);
    }
}
/**
 * Sends the given bytes using a client taken from the sender queue; the client is put back
 * on the queue once the send attempt finishes.
 *
 * @param buffer the message payload
 * @throws ProcessException if no client is available or the send fails
 */
protected void sendMessage(final byte[] buffer) throws ProcessException {
    final EventHubClient client = senderQueue.poll();
    if (client == null) {
        throw new ProcessException("No EventHubClients are configured for sending");
    }

    try {
        client.sendSync(new EventData(buffer));
    } catch (final ServiceBusException sbe) {
        throw new ProcessException("Caught exception trying to send message to eventbus", sbe);
    } finally {
        // Always hand the client back, whether or not the send succeeded.
        senderQueue.offer(client);
    }
}
/**
 * Prepares the processor for execution: reads the batch size, (re)connects the Riemann client
 * when none is connected, and collects the user-defined (dynamic) properties.
 *
 * @param context the process context supplying the property values
 * @throws ProcessException if the Riemann client cannot connect or the transport is unsupported
 */
@OnScheduled
public void onScheduled(ProcessContext context) throws ProcessException {
    if (batchSize == -1) {
        batchSize = context.getProperty(BATCH_SIZE).asInteger();
    }
    if (riemannClient == null || !riemannClient.isConnected()) {
        transport = Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
        String host = context.getProperty(RIEMANN_HOST).getValue().trim();
        int port = context.getProperty(RIEMANN_PORT).asInteger();
        writeTimeout = context.getProperty(TIMEOUT).asLong();
        RiemannClient client = null;
        try {
            switch (transport) {
                case TCP:
                    client = RiemannClient.tcp(host, port);
                    break;
                case UDP:
                    client = RiemannClient.udp(host, port);
                    break;
                default:
                    // Fail with a clear message rather than a NullPointerException on connect().
                    throw new ProcessException("Unsupported Riemann transport protocol: " + transport);
            }
            client.connect();
            riemannClient = client;
        } catch (IOException e) {
            if (client != null) {
                client.close();
            }
            context.yield();
            // Keep the original IOException as the cause so the stack trace is not lost.
            throw new ProcessException(String.format("Unable to connect to Riemann [%s:%d] (%s)\n%s", host, port, transport, e.getMessage()), e);
        }
    }
    if (customAttributes.size() == 0) {
        for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
            // only custom defined properties
            if (!getSupportedPropertyDescriptors().contains(property.getKey())) {
                customAttributes.add(property.getKey());
            }
        }
    }
}
/**
 * Replaces the HTTP client with a mock whose calls return a canned Elasticsearch "get document"
 * response, built from the {@code documentExists}, {@code statusCode} and {@code statusMessage}
 * values at the time the call is made.
 */
@Override
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
    client = mock(OkHttpClient.class);
    when(client.newCall(any(Request.class))).thenAnswer(invocation -> {
        final Request incomingRequest = (Request) invocation.getArguments()[0];

        final String foundFragment;
        if (documentExists) {
            foundFragment = "\"found\":true,\"_source\":{\"gender\":\"female\",\"name\":{\"title\":\"Ms\",\"first\":\"Joan\",\"last\":\"Smith\"}}";
        } else {
            foundFragment = "\"found\": false";
        }
        final String json = "{\"_index\":\"randomuser.me\",\"_type\":\"user\",\"_id\":\"0\",\"_version\":2,"
                + foundFragment
                + "}";

        final Response cannedResponse = new Response.Builder()
                .request(incomingRequest)
                .protocol(Protocol.HTTP_1_1)
                .code(statusCode)
                .message(statusMessage)
                .body(ResponseBody.create(MediaType.parse("application/json"), json))
                .build();

        final Call mockedCall = mock(Call.class);
        when(mockedCall.execute()).thenReturn(cannedResponse);
        return mockedCall;
    });
}
/**
 * Returns a real connection from the wrapped service for the first
 * {@code allowedBeforeFailure} calls; afterwards returns a mocked connection whose
 * {@code prepareStatement} throws a {@link SQLException}.
 *
 * @return a real or failure-injecting connection
 * @throws ProcessException wrapping any exception raised while obtaining the connection
 */
@Override
public Connection getConnection() throws ProcessException {
    try {
        if (++successful > allowedBeforeFailure) {
            final Connection conn = Mockito.mock(Connection.class);
            Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException"));
            return conn;
        } else {
            return service.getConnection();
        }
    } catch (final Exception e) {
        // Chain the cause instead of printing it, so the stack trace travels with the exception.
        throw new ProcessException("getConnection failed: " + e, e);
    }
}
/**
 * An INSERT with quoted table identifier enabled must quote only the table name in the
 * generated statement and expose the three column values as sql.args attributes.
 */
@Test
public void testInsertQuotedTableIdentifier() throws InitializationException, ProcessException, SQLException, IOException {
    final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);

    // Back the processor with a database under the temporary folder.
    final File tempDir = folder.getRoot();
    final File dbDir = new File(tempDir, "db");
    final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
    runner.addControllerService("dbcp", service);
    runner.enableControllerService(service);

    try (final Connection conn = service.getConnection();
         final Statement stmt = conn.createStatement()) {
        stmt.executeUpdate(createPersons);
    }

    runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
    runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
    runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
    runner.setProperty(ConvertJSONToSQL.QUOTED_TABLE_IDENTIFIER, "true");
    runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
    runner.run();

    runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
    runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
    runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);

    final MockFlowFile sqlFlowFile = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
    sqlFlowFile.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
    sqlFlowFile.assertAttributeEquals("sql.args.1.value", "1");
    sqlFlowFile.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
    sqlFlowFile.assertAttributeEquals("sql.args.2.value", "Mark");
    sqlFlowFile.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
    sqlFlowFile.assertAttributeEquals("sql.args.3.value", "48");
    sqlFlowFile.assertContentEquals("INSERT INTO \"PERSONS\" (ID, NAME, CODE) VALUES (?, ?, ?)");
}
/**
 * Evaluates a condition's expression against the given FlowFile and stateful attributes.
 *
 * @return the boolean result of the expression
 * @throws ProcessException if evaluation fails; the original exception is kept as the cause
 */
private boolean evaluateCondition(final ProcessContext context, final Condition condition, final FlowFile flowfile, final Map<String, String> statefulAttributes) {
    final boolean outcome;
    try {
        outcome = getPropertyValue(condition.getExpression(), context)
                .evaluateAttributeExpressions(flowfile, null, null, statefulAttributes)
                .asBoolean();
    } catch (final ProcessException evaluationFailure) {
        final String message = String.format("Unable to evaluate condition '%s': %s.", condition.getExpression(), evaluationFailure);
        throw new ProcessException(message, evaluationFailure);
    }
    return outcome;
}
/**
 * Derives a key from the password with scrypt and returns a cipher initialized with it.
 *
 * @param encryptionMethod the encryption method; must be compatible with strong KDFs
 * @param password         the password to derive the key from; must not be empty
 * @param salt             the salt, formatted for scrypt via {@code formatSaltForScrypt}
 * @param iv               the initialization vector passed to the keyed cipher provider
 * @param keyLength        the desired key length; validated against the cipher
 * @param encryptMode      {@code true} for encryption, {@code false} for decryption
 * @return the initialized cipher
 * @throws IllegalArgumentException if any argument fails validation
 * @throws ProcessException         if the scrypt hash does not contain the derived-key component
 * @throws Exception                if key derivation or cipher initialization fails
 */
protected Cipher getInitializedCipher(EncryptionMethod encryptionMethod, String password, byte[] salt, byte[] iv, int keyLength, boolean encryptMode) throws Exception {
    if (encryptionMethod == null) {
        throw new IllegalArgumentException("The encryption method must be specified");
    }
    if (!encryptionMethod.isCompatibleWithStrongKDFs()) {
        throw new IllegalArgumentException(encryptionMethod.name() + " is not compatible with Scrypt");
    }
    if (StringUtils.isEmpty(password)) {
        throw new IllegalArgumentException("Encryption with an empty password is not supported");
    }

    String algorithm = encryptionMethod.getAlgorithm();
    final String cipherName = CipherUtility.parseCipherFromAlgorithm(algorithm);
    if (!CipherUtility.isValidKeyLength(keyLength, cipherName)) {
        throw new IllegalArgumentException(String.valueOf(keyLength) + " is not a valid key length for " + cipherName);
    }

    String scryptSalt = formatSaltForScrypt(salt);
    List<Integer> params = new ArrayList<>(3);
    byte[] rawSalt = new byte[Scrypt.getDefaultSaltLength()];
    parseSalt(scryptSalt, rawSalt, params);

    String hash = Scrypt.scrypt(password, rawSalt, params.get(0), params.get(1), params.get(2), keyLength);

    // Split out the derived key from the hash and form a key object
    final String[] hashComponents = hash.split("\\$");
    final int HASH_INDEX = 4;
    // Element HASH_INDEX is read below, so the array needs at least HASH_INDEX + 1 entries.
    if (hashComponents.length <= HASH_INDEX) {
        throw new ProcessException("There was an error generating a scrypt hash -- the resulting hash was not properly formatted");
    }
    byte[] keyBytes = Base64.decodeBase64(hashComponents[HASH_INDEX]);
    SecretKey tempKey = new SecretKeySpec(keyBytes, algorithm);

    KeyedCipherProvider keyedCipherProvider = new AESKeyedCipherProvider();
    return keyedCipherProvider.getCipher(encryptionMethod, tempKey, iv, encryptMode);
}
/**
 * An UPDATE whose update key names a column that is not matched ("extra") must still succeed
 * when unmatched columns are ignored, using only the remaining key columns in the WHERE clause.
 */
@Test
public void testUpdateWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException {
    final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);

    // Back the processor with a database under the temporary folder.
    final File tempDir = folder.getRoot();
    final File dbDir = new File(tempDir, "db");
    final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
    runner.addControllerService("dbcp", service);
    runner.enableControllerService(service);

    try (final Connection conn = service.getConnection();
         final Statement stmt = conn.createStatement()) {
        stmt.executeUpdate(createPersons);
    }

    runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
    runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
    runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
    runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra");
    runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore Unmatched Columns");
    runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
    runner.run();

    runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
    runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
    runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);

    final MockFlowFile sqlFlowFile = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
    sqlFlowFile.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
    sqlFlowFile.assertAttributeEquals("sql.args.1.value", "1");
    sqlFlowFile.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
    sqlFlowFile.assertAttributeEquals("sql.args.2.value", "Mark");
    sqlFlowFile.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
    sqlFlowFile.assertAttributeEquals("sql.args.3.value", "48");
    sqlFlowFile.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
}
/**
 * Starts the SMTP server on first invocation (or after a failed start) and then yields.
 * A start failure is logged and leaves {@code smtp} unset.
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
    final boolean serverMissing = smtp == null;
    if (serverMissing) {
        try {
            final SMTPServer startedServer = prepareServer(context, sessionFactory);
            startedServer.start();
            smtp = startedServer;
        } catch (final Exception startFailure) { // broad catch: subethasmtp's exception handling is awkward
            smtp = null;
            getLogger().error("Unable to start SMTP server due to " + startFailure.getMessage(), startFailure);
        }
    }
    // Nothing else to do here: threading is managed by the SMTP server sessions.
    context.yield();
}
/**
 * Encrypts the input stream to the output stream using password-based PGP encryption.
 *
 * @param in  the plaintext input
 * @param out the destination for the encrypted output
 * @throws IOException      declared by the callback contract
 * @throws ProcessException if encryption fails; the original exception is kept as the cause
 */
@Override
public void process(InputStream in, OutputStream out) throws IOException {
    try {
        PGPKeyEncryptionMethodGenerator encryptionMethodGenerator = new JcePBEKeyEncryptionMethodGenerator(password).setProvider(provider);
        org.apache.nifi.processors.standard.util.PGPUtil.encrypt(in, out, algorithm, provider, PGPEncryptedData.AES_128, filename, encryptionMethodGenerator);
    } catch (Exception e) {
        // Chain the cause so the underlying failure and its stack trace are not lost.
        throw new ProcessException(e.getMessage(), e);
    }
}
/**
 * After an append callback on a child FlowFile throws an IOException (surfaced as a
 * ProcessException), removing the child and committing must leave no existing claims in the
 * content repository.
 */
@Test
public void testAppendToChildThrowsIOExceptionThenRemove() throws IOException {
    final FlowFileRecord queuedRecord = new StandardFlowFileRecord.Builder()
            .id(1000L)
            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
            .entryDate(System.currentTimeMillis())
            .build();
    flowFileQueue.put(queuedRecord);

    final FlowFile parent = session.get();
    assertNotNull(parent);

    FlowFile child = session.create(parent);
    child = session.append(child, out -> out.write("hello".getBytes()));

    // Force an IOException. This will decrement our claim count for the resource claim.
    try {
        child = session.append(child, out -> {
            throw new IOException();
        });
        Assert.fail("append() callback threw IOException but it was not wrapped in ProcessException");
    } catch (final ProcessException expected) {
        // expected
    }

    session.remove(child);
    session.transfer(parent);
    session.commit();

    assertEquals(0, contentRepo.getExistingClaims().size());
}
/**
 * Handles a FlowFile handed to the processor by the framework. The FlowFile content is sent
 * over a TCP connection using an acquired ChannelSender. On a successful send the FlowFile is
 * routed to the success relationship and the session is committed; on any exception the
 * failure handler is invoked. When no FlowFile is available, idle senders are pruned and the
 * processor yields.
 *
 * @param context
 *            - the current process context.
 *
 * @param sessionFactory
 *            - a factory object to obtain a process session.
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
    final ProcessSession session = sessionFactory.createSession();
    final FlowFile flowFile = session.get();
    if (flowFile == null) {
        pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
        context.yield();
        return;
    }

    final ChannelSender sender = acquireSender(context, session, flowFile);
    if (sender == null) {
        return;
    }

    try {
        final String delimiter = getOutgoingMessageDelimiter(context, flowFile);
        ByteArrayOutputStream payload = readContent(session, flowFile);
        if (delimiter != null) {
            final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
            payload = appendDelimiter(payload, delimiter, charset);
        }

        final StopWatch timer = new StopWatch(true);
        sender.send(payload.toByteArray());
        session.getProvenanceReporter().send(flowFile, transitUri, timer.getElapsed(TimeUnit.MILLISECONDS));
        session.transfer(flowFile, REL_SUCCESS);
        session.commit();
    } catch (Exception sendFailure) {
        onFailure(context, session, flowFile);
        getLogger().error("Exception while handling a process session, transferring {} to failure.", new Object[] { flowFile }, sendFailure);
    } finally {
        // A per-FlowFile connection is closed; otherwise the sender goes back to the pool for reuse.
        if (isConnectionPerFlowFile(context)) {
            sender.close();
        } else {
            relinquishSender(sender);
        }
    }
}
/**
 * Removes the given FlowFile from this session.
 * <p>
 * Any entry in {@code penalized} with the same id is dropped first. The id is then looked up in
 * {@code beingProcessed}; when found it is removed from there, recorded in
 * {@code removedFlowFiles}, and its {@code currentVersions} entry is discarded.
 *
 * @param flowFile the FlowFile to remove
 * @throws ProcessException if the FlowFile's id is not present in {@code beingProcessed}
 */
@Override
public void remove(final FlowFile flowFile) {
validateState(flowFile);
// Drop the matching penalized entry, if any; at most one entry is handled because of the break.
final Iterator<MockFlowFile> penalizedItr = penalized.iterator();
while (penalizedItr.hasNext()) {
final MockFlowFile ff = penalizedItr.next();
if (Objects.equals(ff.getId(), flowFile.getId())) {
penalizedItr.remove();
// NOTE(review): the entry was already removed through the iterator; this second removal looks redundant — confirm against the collection type.
penalized.remove(ff);
break;
}
}
// Locate the FlowFile among those being processed and retire it.
final Iterator<Long> processedItr = beingProcessed.iterator();
while (processedItr.hasNext()) {
final Long ffId = processedItr.next();
if (ffId != null && ffId.equals(flowFile.getId())) {
processedItr.remove();
beingProcessed.remove(ffId);
removedFlowFiles.add(flowFile.getId());
currentVersions.remove(ffId);
return;
}
}
// Reaching this point means the id was never found in beingProcessed.
throw new ProcessException(flowFile + " not found in queue");
}
/**
 * Triggers the given worker once, keeping the schedule state's active-thread count in step.
 * <p>
 * The thread count is incremented up front; if that pushes it past the worker's maximum
 * concurrent tasks (when that maximum is positive) the increment is undone and nothing runs.
 * Otherwise the worker's {@code onTrigger} is invoked. A {@link ProcessException} is logged;
 * any other {@link Throwable} is logged and followed by a sleep of the administrative yield
 * duration. Finally, {@code @OnStopped} methods are invoked when the worker is no longer
 * scheduled, this is the only active thread and the schedule state requires it, and the
 * thread count is decremented.
 *
 * @param worker         the component to trigger
 * @param scheduleState  the scheduling state tracking active threads for the worker
 * @param processContext the context passed to the worker
 * @param sessionFactory the session factory passed to the worker
 */
private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) {
final int newThreadCount = scheduleState.incrementActiveThreadCount();
if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
// its possible that the worker queue could give us a worker node that is eligible to run based
// on the number of threads but another thread has already incremented the thread count, result in
// reaching the maximum number of threads. we won't know this until we atomically increment the thread count
// on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
// result in using more than the maximum number of defined threads
scheduleState.decrementActiveThreadCount();
return;
}
try {
// Run the worker inside the scope opened by NarCloseable.withComponentNarLoader for this component.
try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(worker.getClass(), worker.getIdentifier())) {
worker.onTrigger(processContext, sessionFactory);
} catch (final ProcessException pe) {
logger.error("{} failed to process session due to {}", worker, pe.toString());
} catch (final Throwable t) {
logger.error("{} failed to process session due to {}", worker, t.toString());
logger.error("", t);
logger.warn("{} Administratively Pausing for {} due to processing failure: {}", worker, getAdministrativeYieldDuration(), t.toString());
logger.warn("", t);
try {
Thread.sleep(FormatUtils.getTimeDuration(adminYieldDuration, TimeUnit.MILLISECONDS));
} catch (final InterruptedException e) {
// NOTE(review): the interrupt is swallowed here without restoring the thread's interrupt status — confirm this is intended.
}
}
} finally {
// This thread is still counted as active here, hence the comparison against 1.
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(worker.getClass(), worker.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker, processContext);
}
}
scheduleState.decrementActiveThreadCount();
}
}
/**
 * Opens a connection to the embedded Derby database at {@code dbLocation}, creating the
 * database if it does not exist.
 *
 * @return a new Derby connection
 * @throws ProcessException wrapping any exception raised while loading the driver or connecting
 */
@Override
public Connection getConnection() throws ProcessException {
    try {
        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
        final Connection conn = DriverManager.getConnection("jdbc:derby:" + dbLocation + ";create=true");
        return conn;
    } catch (final Exception e) {
        // Chain the cause instead of printing it, so the stack trace travels with the exception.
        throw new ProcessException("getConnection failed: " + e, e);
    }
}
/**
 * Runs the callback over the FlowFile's current content and returns a new version of the
 * FlowFile holding whatever the callback wrote.
 *
 * @param flowFile the FlowFile to rewrite; must be a {@link MockFlowFile}
 * @param callback the stream callback that reads the old content and writes the new content
 * @return the new FlowFile version, also registered in {@code currentVersions}
 * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
 * @throws ProcessException if the callback throws an {@link IOException}
 */
@Override
public MockFlowFile write(final FlowFile flowFile, final StreamCallback callback) {
    validateState(flowFile);
    if (callback == null || flowFile == null) {
        throw new IllegalArgumentException("argument cannot be null");
    }
    if (!(flowFile instanceof MockFlowFile)) {
        throw new IllegalArgumentException("Cannot export a flow file that I did not create");
    }

    final MockFlowFile current = (MockFlowFile) flowFile;
    final ByteArrayInputStream existingContent = new ByteArrayInputStream(current.getData());
    final ByteArrayOutputStream rewrittenContent = new ByteArrayOutputStream();

    // The FlowFile is tracked in recursionSet for exactly the duration of the callback.
    recursionSet.add(flowFile);
    try {
        callback.process(existingContent, rewrittenContent);
    } catch (final IOException ioe) {
        throw new ProcessException(ioe.toString(), ioe);
    } finally {
        recursionSet.remove(flowFile);
    }

    final MockFlowFile updated = new MockFlowFile(current.getId(), flowFile);
    currentVersions.put(updated.getId(), updated);
    updated.setData(rewrittenContent.toByteArray());
    return updated;
}
/**
 * Deliberately slow trigger: sleeps for 50 ms and then increments {@code counter} if the
 * processor is still scheduled. If the sleep is interrupted the counter is left untouched.
 */
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    try {
        // be slow
        Thread.sleep(50);
        // make sure we are still scheduled
        if (isScheduled()) {
            // increment counter
            ++counter;
        }
    } catch (InterruptedException e) {
        // Restore the interrupt status rather than silently discarding the interruption.
        Thread.currentThread().interrupt();
    }
}
/**
 * An UPDATE with no explicit update key must produce a statement whose WHERE clause uses ID,
 * with the non-key columns in the SET clause.
 */
@Test
public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException {
    final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);

    // Back the processor with a database under the temporary folder.
    final File tempDir = folder.getRoot();
    final File dbDir = new File(tempDir, "db");
    final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
    runner.addControllerService("dbcp", service);
    runner.enableControllerService(service);

    try (final Connection conn = service.getConnection();
         final Statement stmt = conn.createStatement()) {
        stmt.executeUpdate(createPersons);
    }

    runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
    runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
    runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
    runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
    runner.run();

    runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
    runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
    runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);

    final MockFlowFile sqlFlowFile = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
    sqlFlowFile.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
    sqlFlowFile.assertAttributeEquals("sql.args.1.value", "Mark");
    sqlFlowFile.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.INTEGER));
    sqlFlowFile.assertAttributeEquals("sql.args.2.value", "48");
    sqlFlowFile.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
    sqlFlowFile.assertAttributeEquals("sql.args.3.value", "1");
    sqlFlowFile.assertContentEquals("UPDATE PERSONS SET NAME = ?, CODE = ? WHERE ID = ?");
}
/**
 * An INSERT with quoted identifiers enabled must quote the column names (but not the table
 * name) in the generated statement.
 */
@Test
public void testInsertQuotedIdentifiers() throws InitializationException, ProcessException, SQLException, IOException {
    final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);

    // Back the processor with a database under the temporary folder.
    final File tempDir = folder.getRoot();
    final File dbDir = new File(tempDir, "db");
    final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
    runner.addControllerService("dbcp", service);
    runner.enableControllerService(service);

    try (final Connection conn = service.getConnection();
         final Statement stmt = conn.createStatement()) {
        stmt.executeUpdate(createPersons);
    }

    runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
    runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
    runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
    runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true");
    runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
    runner.run();

    runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
    runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
    runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);

    final MockFlowFile sqlFlowFile = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
    sqlFlowFile.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
    sqlFlowFile.assertAttributeEquals("sql.args.1.value", "1");
    sqlFlowFile.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
    sqlFlowFile.assertAttributeEquals("sql.args.2.value", "Mark");
    sqlFlowFile.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
    sqlFlowFile.assertAttributeEquals("sql.args.3.value", "48");
    sqlFlowFile.assertContentEquals("INSERT INTO PERSONS (\"ID\", \"NAME\", \"CODE\") VALUES (?, ?, ?)");
}