下面列出了org.apache.log4j.Logger#removeAppender ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testKafkaReporter() {
TestAppender testAppender = new TestAppender();
Logger logger = LogManager.getLogger(LoggingPusher.class.getName());
logger.addAppender(testAppender);
KeyValuePusher<String, String> loggingPusher =
new LoggingPusher<String, String>("broker", "topic", Optional.absent());
loggingPusher.pushMessages(ImmutableList.of("message1", "message2"));
loggingPusher.pushKeyValueMessages(ImmutableList.of(org.apache.commons.lang3.tuple.Pair.of("key", "message3")));
Assert.assertEquals(testAppender.events.size(), 3);
Assert.assertEquals(testAppender.events.get(0).getRenderedMessage(), "Pushing to broker:topic: message1");
Assert.assertEquals(testAppender.events.get(1).getRenderedMessage(), "Pushing to broker:topic: message2");
Assert.assertEquals(testAppender.events.get(2).getRenderedMessage(), "Pushing to broker:topic: key - message3");
logger.removeAppender(testAppender);
}
@Test
public void testRevertPositiveRootLogger() {
ActiveDbAppender appender = new ActiveDbAppender();
appender.setHost("test");
appender.setDatabase("test");
appender.setUser("test");
appender.setPassword("test");
Logger log = Logger.getRootLogger();
log.addAppender(appender);
//construct the configurator - an appender is present
RemoteLoggingConfigurator remoteLoggingConfig = new RemoteLoggingConfigurator(null, -1);
//remove the appender, so the configurator will need to apply it
log.removeAppender(appender);
//apply the appender
remoteLoggingConfig.apply();
remoteLoggingConfig.revert();
assertFalse(log.getAllAppenders().hasMoreElements());
}
@Test
public void testRevertPositive() {
ActiveDbAppender appender = new ActiveDbAppender();
appender.setHost("test");
appender.setDatabase("test");
appender.setUser("test");
appender.setPassword("test");
Logger log = Logger.getLogger(loggerName);
log.addAppender(appender);
//construct the configurator - an appender is present
RemoteLoggingConfigurator remoteLoggingConfig = new RemoteLoggingConfigurator(null, -1);
//remove the appender, so the configurator will need to apply it
log.removeAppender(appender);
//apply the appender
remoteLoggingConfig.apply();
remoteLoggingConfig.revert();
assertFalse(log.getAllAppenders().hasMoreElements());
}
@Test
public void testCustomizedTaskFrameworkFailureInTaskCreation() throws Exception {
TestAppender testAppender = new TestAppender();
Logger logger = LogManager.getLogger(GobblinMultiTaskAttempt.class.getName() + "-noattempt");
logger.addAppender(testAppender);
Properties jobProperties =
GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties");
jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, CustomizedTaskTestSource.class.getName());
// To demonstrate failure caught in task creation in test setting, disabled retry in task creation.
jobProperties.setProperty(RETRY_TIMES, "1");
jobProperties.setProperty(RETRY_TYPE, RetryerFactory.RetryType.FIXED_ATTEMPT.name());
try {
GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
} catch (Throwable t){
// Expect to get exception, do nothing
}
Assert.assertTrue(testAppender.events.stream().anyMatch(e -> e.getRenderedMessage().contains("Could not create task for workunit")));
logger.removeAppender(testAppender);
}
@Test(dependsOnMethods = "testKafkaSingleTopicSource")
public void testTransportCreationDisabledProperty() throws InterruptedException {
receivedEventNameList = new ArrayList<>(2);
receivedValueList = new ArrayList<>(2);
SiddhiManager siddhiManager = new SiddhiManager();
Logger logger = Logger.getLogger(Source.class);
UnitTestAppender appender = new UnitTestAppender();
logger.addAppender(appender);
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(
"@App:name('TestExecutionPlan') @App:transportChannelCreationEnabled('false')" +
"define stream BarStream (symbol string, price float, volume long); " +
"@info(name = 'query1') " +
"@source(type='kafka', topic.list='single_topic', group.id='test_single_topic', " +
"threading.option='single.thread', bootstrap.servers='localhost:9092'," +
"@map(type='xml'))" +
"Define stream FooStream (symbol string, price float, volume long);" +
"from FooStream select symbol, price, volume insert into BarStream;");
siddhiAppRuntime.start();
Thread.sleep(5000);
if (appender.getMessages() != null) {
AssertJUnit.assertTrue(appender.getMessages().contains("Error on 'TestExecutionPlan'. Topic(s) " +
"single_topic creation failed. User has disabled topic creation by setting " +
"transportChannelCreationEnabled property to false. Hence Siddhi App deployment will be aborted"));
}
logger.removeAppender(appender);
}
@After
public void teardownForEachTest() {
final Logger rootLogger = Logger.getRootLogger();
if (name.getMethodName().equals("shouldPingChannelIfClientDies")||
name.getMethodName().equals("shouldCloseChannelIfClientDoesntRespond")) {
final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
webSocketClientHandlerLogger.setLevel(previousLogLevel);
}
rootLogger.removeAppender(recordingAppender);
}
@After
@Override
public void tearDown() throws Exception {
super.tearDown();
final Logger rootLogger = Logger.getRootLogger();
rootLogger.removeAppender(recordingAppender);
kdcServer.close();
}
@Test(dependsOnMethods = {"incrementalStreamProcessorTest15"})
public void incrementalStreamProcessorTest16() throws InterruptedException {
LOG.info("incrementalStreamProcessorTest16");
SiddhiManager siddhiManager = new SiddhiManager();
String stockStream =
"define stream stockStream (symbol string, price float, lastClosingPrice float, volume long , " +
"quantity int, timestamp string);";
String query = "" +
"define aggregation stockAggregation " +
"from stockStream " +
"select symbol, avg(price) as avgPrice, sum(price) as totalPrice, " +
"(price * quantity) as lastTradeValue " +
"group by symbol " +
"aggregate by timestamp every sec...year ; ";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(stockStream + query);
InputHandler stockStreamInputHandler = siddhiAppRuntime.getInputHandler("stockStream");
siddhiAppRuntime.start();
Logger logger = Logger.getLogger(StreamJunction.class);
UnitTestAppender appender = new UnitTestAppender();
logger.addAppender(appender);
try {
// Thursday, June 1, 2017 4:05:50 AM
stockStreamInputHandler.send(new Object[]{"WSO2", 50f, 60f, 90L, 6, "June 1, 2017 4:05:50 AM"});
AssertJUnit.assertTrue(appender.getMessages().contains("'June 1, 2017 4:05:50 AM' doesn't match the " +
"supported formats <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> (for GMT time zone) or <yyyy>-<MM>-<dd> " +
"<HH>:<mm>:<ss> <Z> (for non GMT time zone). The ISO 8601 UTC offset must be provided for <Z> " +
"(ex. +05:30, -11:00). Exception on class 'io.siddhi.core.executor.incremental." +
"IncrementalUnixTimeFunctionExecutor'. Hence, dropping event"));
} catch (Exception e) {
Assert.fail("Unexpected exception occurred when testing.", e);
} finally {
logger.removeAppender(appender);
siddhiAppRuntime.shutdown();
}
}
@Test
public void testAppend() throws InterruptedException {
Logger logger = Logger.getLogger(getClass());
// use the Apache syslog appender to write to syslog source
SyslogAppender appender = new SyslogAppender(null,
"localhost:"+TEST_SYSLOG_PORT, SyslogAppender.LOG_FTP);
logger.addAppender(appender);
Event e = null;
Event e2 = null;
source.start();
// write to syslog
logger.info("test flume syslog");
logger.info("");
Transaction txn = channel.getTransaction();
try {
txn.begin();
e = channel.take();
e2 = channel.take();
txn.commit();
} finally {
txn.close();
}
source.stop();
logger.removeAppender(appender);
Assert.assertNotNull(e);
Assert.assertEquals(String.valueOf(SyslogAppender.LOG_FTP / 8),
e.getHeaders().get(SyslogUtils.SYSLOG_FACILITY));
Assert.assertArrayEquals(e.getBody(), "test flume syslog".getBytes());
Assert.assertNotNull(e2);
Assert.assertEquals(String.valueOf(SyslogAppender.LOG_FTP / 8),
e2.getHeaders().get(SyslogUtils.SYSLOG_FACILITY));
Assert.assertArrayEquals(e2.getBody(), "".getBytes());
}
@After
public void teardownForEachTest() {
final Logger rootLogger = Logger.getRootLogger();
if (name.getMethodName().equals("shouldCloseConnectionDeadDueToUnRecoverableError")) {
final org.apache.log4j.Logger connectionLogger = org.apache.log4j.Logger.getLogger(Connection.class);
connectionLogger.setLevel(previousLogLevel);
}
rootLogger.removeAppender(recordingAppender);
}
@Test
public void testAllLogging() throws Exception {
Logger logger = Logger.getLogger("fluo.tx");
StringWriter writer = new StringWriter();
WriterAppender appender =
new WriterAppender(new PatternLayout("%d{ISO8601} [%-8c{2}] %-5p: %m%n"), writer);
Level level = logger.getLevel();
boolean additivity = logger.getAdditivity();
try {
logger.setLevel(Level.TRACE);
logger.setAdditivity(false);
logger.addAppender(appender);
try (LoaderExecutor le = client.newLoaderExecutor()) {
le.execute(new SimpleLoader());
}
try (LoaderExecutor le = client.newLoaderExecutor()) {
le.execute(new TriggerLoader(0));
}
miniFluo.waitForObservers();
try (Snapshot snap = client.newSnapshot()) {
Assert.assertTrue(Integer.parseInt(snap.gets("all", STAT_COUNT)) >= 1);
Assert.assertEquals("1", snap.gets("r1", new Column("a", "b")));
}
String logMsgs = writer.toString();
logMsgs = logMsgs.replace('\n', ' ');
String pattern;
// simple loader should cause this pattern in logs
pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
pattern += ".*txid: \\1 class: org.apache.fluo.integration.log.LogIT\\$SimpleLoader";
pattern += ".*txid: \\1 get\\(r1, a b \\) -> null";
pattern += ".*txid: \\1 set\\(r1, a b , 1\\)";
pattern += ".*txid: \\1 commit\\(\\) -> SUCCESSFUL commitTs: \\d+";
pattern += ".*";
Assert.assertTrue(logMsgs.matches(pattern));
waitForClose(writer, pattern);
// trigger loader should cause this pattern in logs
pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
pattern += ".*txid: \\1 class: org.apache.fluo.integration.log.LogIT\\$TriggerLoader";
pattern += ".*txid: \\1 set\\(0, stat count , 1\\)";
pattern += ".*txid: \\1 setWeakNotification\\(0, stat count \\)";
pattern += ".*txid: \\1 commit\\(\\) -> SUCCESSFUL commitTs: \\d+";
pattern += ".*";
Assert.assertTrue(logMsgs.matches(pattern));
waitForClose(writer, pattern);
// observer should cause this pattern in logs
pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
pattern += ".*txid: \\1 trigger: 0 stat count \\d+";
pattern += ".*txid: \\1 class: org.apache.fluo.integration.log.LogIT\\$TestObserver";
pattern += ".*txid: \\1 get\\(0, stat count \\) -> 1";
pattern += ".*txid: \\1 get\\(all, stat count \\) -> null";
pattern += ".*txid: \\1 set\\(all, stat count , 1\\)";
pattern += ".*txid: \\1 commit\\(\\) -> SUCCESSFUL commitTs: \\d+";
pattern += ".*";
Assert.assertTrue(logMsgs.matches(pattern));
waitForClose(writer, pattern);
// two gets done by snapshot should cause this pattern
pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
pattern += ".*txid: \\1 get\\(all, stat count \\) -> 1";
pattern += ".*txid: \\1 get\\(r1, a b \\) -> 1";
pattern += ".*txid: \\1 close\\(\\).*";
Assert.assertTrue(logMsgs.matches(pattern));
} finally {
logger.removeAppender(appender);
logger.setAdditivity(additivity);
logger.setLevel(level);
}
}
@Test
public void testSummaryLogging() throws Exception {
Logger logger = Logger.getLogger("fluo.tx.summary");
StringWriter writer = new StringWriter();
WriterAppender appender = new WriterAppender(new PatternLayout("%p, %m%n"), writer);
Level level = logger.getLevel();
boolean additivity = logger.getAdditivity();
try {
logger.setLevel(Level.TRACE);
logger.setAdditivity(false);
logger.addAppender(appender);
try (LoaderExecutor le = client.newLoaderExecutor()) {
for (int i = 0; i < 20; i++) {
le.execute(new SimpleLoader());
le.execute(new TriggerLoader(i));
}
}
miniFluo.waitForObservers();
} finally {
logger.removeAppender(appender);
logger.setAdditivity(additivity);
logger.setLevel(level);
}
String logMsgs = writer.toString();
logMsgs = logMsgs.replace('\n', ' ');
Assert.assertTrue(logMsgs.matches(".*txid: \\d+ thread : \\d+ "
+ "time: \\d+ \\(\\d+ \\d+\\) #ret: 0 #set: 1 #collisions: 0 waitTime: \\d+ "
+ "committed: true class: TriggerLoader.*"));
Assert.assertTrue(logMsgs.matches(".*txid: \\d+ thread : \\d+ "
+ "time: \\d+ \\(\\d+ \\d+\\) #ret: 1 #set: 1 #collisions: 0 waitTime: \\d+ "
+ "committed: true class: SimpleLoader.*"));
Assert.assertTrue(logMsgs.matches(".*txid: \\d+ thread : \\d+ "
+ "time: \\d+ \\(\\d+ \\d+\\) #ret: 1 #set: 1 #collisions: 1 waitTime: \\d+ "
+ "committed: false class: SimpleLoader.*"));
Assert.assertTrue(logMsgs.matches(".*txid: \\d+ thread : \\d+ "
+ "time: \\d+ \\(\\d+ \\d+\\) #ret: 2 #set: 1 #collisions: 0 waitTime: \\d+ "
+ "committed: true class: TestObserver.*"));
Assert.assertTrue(logMsgs.matches(".*txid: \\d+ thread : \\d+ "
+ "time: \\d+ \\(\\d+ \\d+\\) #ret: 2 #set: 1 #collisions: 1 waitTime: \\d+ "
+ "committed: false class: TestObserver.*"));
}
@Test(dependsOnMethods = "faultStreamTest2")
public void faultStreamTest3() throws InterruptedException {
log.info("faultStreamTest3-Tests fault handling when it's set to stream. " +
"No errors would be logged since exceptions are being gracefully handled.");
SiddhiManager siddhiManager = new SiddhiManager();
siddhiManager.setExtension("custom:fault", FaultFunctionExtension.class);
String siddhiApp = "" +
"@OnError(action='stream')" +
"define stream cseEventStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[custom:fault() > volume] " +
"select symbol, price, symbol as sym1 " +
"insert into outputStream ;";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
siddhiAppRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timeStamp, inEvents, removeEvents);
count.addAndGet(inEvents.length);
eventArrived = true;
}
});
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
siddhiAppRuntime.start();
Logger logger = Logger.getLogger(StreamJunction.class);
UnitTestAppender appender = new UnitTestAppender();
logger.addAppender(appender);
try {
inputHandler.send(new Object[]{"IBM", 0f, 100L});
AssertJUnit.assertTrue(appender.getMessages() == null);
} catch (Exception e) {
Assert.fail("Unexpected exception occurred when testing.", e);
} finally {
logger.removeAppender(appender);
siddhiAppRuntime.shutdown();
}
AssertJUnit.assertEquals(0, count.get());
AssertJUnit.assertFalse(eventArrived);
}
@Test
public void faultStreamTest1() throws InterruptedException {
log.info("faultStreamTest1-Tests logging by default when fault handling is not configured explicitly.");
SiddhiManager siddhiManager = new SiddhiManager();
siddhiManager.setExtension("custom:fault", FaultFunctionExtension.class);
String siddhiApp = "" +
"define stream cseEventStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[custom:fault() > volume] " +
"select symbol, price , symbol as sym1 " +
"insert into outputStream ;";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
siddhiAppRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timeStamp, inEvents, removeEvents);
count.addAndGet(inEvents.length);
eventArrived = true;
}
});
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
siddhiAppRuntime.start();
Logger logger = Logger.getLogger(StreamJunction.class);
UnitTestAppender appender = new UnitTestAppender();
logger.addAppender(appender);
try {
inputHandler.send(new Object[]{"IBM", 0f, 100L});
AssertJUnit.assertTrue(appender.getMessages().contains("Error when running faultAdd(). " +
"Exception on class 'io.siddhi.core.stream.FaultFunctionExtension'"));
} catch (Exception e) {
Assert.fail("Unexpected exception occurred when testing.", e);
} finally {
logger.removeAppender(appender);
siddhiAppRuntime.shutdown();
}
AssertJUnit.assertEquals(0, count.get());
AssertJUnit.assertFalse(eventArrived);
}
@Test
public void testApplyPositive() {
ActiveDbAppender appender = new ActiveDbAppender();
appender.setHost("test");
appender.setDatabase("test");
appender.setUser("test");
appender.setPassword("test");
Logger log = Logger.getLogger(loggerName);
log.addAppender(appender);
//construct the configurator - an appender is present
RemoteLoggingConfigurator remoteLoggingConfig = new RemoteLoggingConfigurator(null, -1);
//remove the appender, so the configurator will need to apply it
log.removeAppender(appender);
// check if needs to be applied - this sets the internal flags
// so the next "apply" method will work as expected
assertTrue(remoteLoggingConfig.needsApplying());
boolean hasDbCheckError = false;
try {
//apply the appender
//this statement will fail, due to missing PostgreSQL or MSSQL server at localhost
remoteLoggingConfig.apply();
} catch (Exception e) {
if (!e.getCause()
.getMessage()
.contains("Neither MSSQL, nor PostgreSQL server at 'test:1433' has database with name 'test'.")) {
// an exception was caught, but its cause is not the expected one
// re-throw the exception
throw e;
} else {
// expected exception was caught
hasDbCheckError = true;
}
}
assertTrue(hasDbCheckError);
}
@Test(dependsOnMethods = "faultStreamTest3")
public void faultStreamTest4() throws InterruptedException {
log.info("faultStreamTest4-Tests fault handling when it's set to stream. " +
"Events would be available in the corresponding fault stream");
SiddhiManager siddhiManager = new SiddhiManager();
siddhiManager.setExtension("custom:fault", FaultFunctionExtension.class);
String siddhiApp = "" +
"@OnError(action='stream')" +
"define stream cseEventStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[custom:fault() > volume] " +
"select symbol, price, symbol as sym1 " +
"insert into outputStream ;" +
"" +
"from !cseEventStream " +
"select * " +
"insert into faultStream";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
siddhiAppRuntime.addCallback("faultStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
Assert.assertTrue(events[0].getData(3) != null);
count.addAndGet(events.length);
eventArrived = true;
}
});
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
siddhiAppRuntime.start();
Logger logger = Logger.getLogger(StreamJunction.class);
UnitTestAppender appender = new UnitTestAppender();
logger.addAppender(appender);
try {
inputHandler.send(new Object[]{"IBM", 0f, 100L});
AssertJUnit.assertTrue(appender.getMessages() == null);
} catch (Exception e) {
Assert.fail("Unexpected exception occurred when testing.", e);
} finally {
logger.removeAppender(appender);
siddhiAppRuntime.shutdown();
}
AssertJUnit.assertEquals(1, count.get());
AssertJUnit.assertTrue(eventArrived);
}
@Test
public void testJobMonitorAndPrint() throws Exception {
JobStatus jobStatus_1 = new JobStatus(new JobID("job_000", 1), 1f, 0.1f,
0.1f, 0f, State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname",
"tmp-queue", "tmp-jobfile", "tmp-url", true);
JobStatus jobStatus_2 = new JobStatus(new JobID("job_000", 1), 1f, 1f,
1f, 1f, State.SUCCEEDED, JobPriority.HIGH, "tmp-user", "tmp-jobname",
"tmp-queue", "tmp-jobfile", "tmp-url", true);
doAnswer(
new Answer<TaskCompletionEvent[]>() {
@Override
public TaskCompletionEvent[] answer(InvocationOnMock invocation)
throws Throwable {
return new TaskCompletionEvent[0];
}
}
).when(job).getTaskCompletionEvents(anyInt(), anyInt());
doReturn(new TaskReport[5]).when(job).getTaskReports(isA(TaskType.class));
when(clientProtocol.getJobStatus(any(JobID.class))).thenReturn(jobStatus_1, jobStatus_2);
// setup the logger to capture all logs
Layout layout =
Logger.getRootLogger().getAppender("stdout").getLayout();
ByteArrayOutputStream os = new ByteArrayOutputStream();
WriterAppender appender = new WriterAppender(layout, os);
appender.setThreshold(Level.ALL);
Logger qlogger = Logger.getLogger(Job.class);
qlogger.addAppender(appender);
job.monitorAndPrintJob();
qlogger.removeAppender(appender);
LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
String line;
boolean foundHundred = false;
boolean foundComplete = false;
boolean foundUber = false;
String uberModeMatch = "uber mode : true";
String progressMatch = "map 100% reduce 100%";
String completionMatch = "completed successfully";
while ((line = r.readLine()) != null) {
if (line.contains(uberModeMatch)) {
foundUber = true;
}
foundHundred = line.contains(progressMatch);
if (foundHundred)
break;
}
line = r.readLine();
foundComplete = line.contains(completionMatch);
assertTrue(foundUber);
assertTrue(foundHundred);
assertTrue(foundComplete);
System.out.println("The output of job.toString() is : \n" + job.toString());
assertTrue(job.toString().contains("Number of maps: 5\n"));
assertTrue(job.toString().contains("Number of reduces: 5\n"));
}
@Test
public void testCollisionLogging() throws Exception {
Logger logger = Logger.getLogger("fluo.tx.collisions");
StringWriter writer = new StringWriter();
WriterAppender appender = new WriterAppender(new PatternLayout("%p, %m%n"), writer);
Level level = logger.getLevel();
boolean additivity = logger.getAdditivity();
try {
logger.setLevel(Level.TRACE);
logger.setAdditivity(false);
logger.addAppender(appender);
try (LoaderExecutor le = client.newLoaderExecutor()) {
for (int i = 0; i < 20; i++) {
le.execute(new SimpleBinaryLoader());
le.execute(new TriggerLoader(i));
}
}
miniFluo.waitForObservers();
} finally {
logger.removeAppender(appender);
logger.setAdditivity(additivity);
logger.setLevel(level);
}
String logMsgs = writer.toString();
logMsgs = logMsgs.replace('\n', ' ');
Assert.assertFalse(logMsgs.contains("TriggerLoader"));
String pattern;
pattern = ".*txid: (\\d+) class: org.apache.fluo.integration.log.LogIT\\$SimpleBinaryLoader";
pattern += ".*txid: \\1 collisions: \\Q[r1\\x0d=[a \\x00\\x09 ]]\\E.*";
Assert.assertTrue(logMsgs.matches(pattern));
pattern = ".*txid: (\\d+) trigger: \\d+ stat count \\d+";
pattern += ".*txid: \\1 class: org.apache.fluo.integration.log.LogIT\\$TestObserver";
pattern += ".*txid: \\1 collisions: \\Q[all=[stat count ]]\\E.*";
Assert.assertTrue(logMsgs.matches(pattern));
}
@Test(dependsOnMethods = "faultStreamTest11")
public void faultStreamTest12() throws InterruptedException {
log.info("faultStreamTest12-Tests fault handling for async when it's set to log. " +
"Events would be logged and dropped during publishing failure at Sink");
SiddhiManager siddhiManager = new SiddhiManager();
String siddhiApp = "" +
"@OnError(action='stream')" +
"define stream cseEventStream (symbol string, price float, volume long);" +
"\n" +
"@sink(type='testAsyncInMemory', topic='{{symbol}}', on.error='log', @map(type='passThrough')) " +
"define stream outputStream (symbol string, price float, sym1 string);" +
"\n" +
"@info(name = 'query1') " +
"from cseEventStream " +
"select symbol, price , symbol as sym1 " +
"insert into outputStream ;" +
"";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
Assert.assertTrue(events[0].getData(0) != null);
}
});
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
siddhiAppRuntime.start();
Logger logger = Logger.getLogger(Sink.class);
UnitTestAppender appender = new UnitTestAppender();
logger.addAppender(appender);
try {
inputHandler.send(new Object[]{"IBM", 0f, 100L});
Thread.sleep(6000);
AssertJUnit.assertTrue(appender.getMessages().contains("Dropping event at Sink 'testAsyncInMemory' at"));
} catch (Exception e) {
Assert.fail("Unexpected exception occurred when testing.", e);
} finally {
logger.removeAppender(appender);
siddhiAppRuntime.shutdown();
}
}
/**
*
* Attaches a {@link TestAppender} to a {@link Logger} and ensures that it is the only
* test appender attached to the logger.
*
* @param logger The logger which will be monitored by the test
* @param testAppender The test appender to attach to {@code logger}
*/
public static void safeAddAppender(Logger logger, TestAppender testAppender) {
logger.removeAppender(APPENDER_NAME);
logger.addAppender(testAppender);
}