下面列出了org.apache.log4j.Logger#addAppender ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testAppendInClose() throws Exception {
final ContainerLogAppender claAppender = new ContainerLogAppender();
claAppender.setName("testCLA");
claAppender.setLayout(new PatternLayout("%-5p [%t]: %m%n"));
claAppender.setContainerLogDir("target/testAppendInClose/logDir");
claAppender.setContainerLogFile("syslog");
claAppender.setTotalLogFileSize(1000);
claAppender.activateOptions();
final Logger claLog = Logger.getLogger("testAppendInClose-catergory");
claLog.setAdditivity(false);
claLog.addAppender(claAppender);
claLog.info(new Object() {
public String toString() {
claLog.info("message1");
return "return message1";
}
});
claAppender.close();
}
/**
* Setup log4j logging
*
* @param logFile log file
*/
public static void setupLogging(File logFile) {
Logger root = Logger.getRootLogger();
ConsoleAppender console = new ConsoleAppender(new PatternLayout("[%t] %m%n"));
console.setThreshold(Level.INFO);
root.addAppender(console);
if (logFile != null) {
try {
FileAppender file = new FileAppender(new PatternLayout("%d{dd-MMM-yy HH:mm:ss.SSS} [%t] %-5p %c{2}> %m%n"), logFile.getPath(), false);
file.setThreshold(Level.DEBUG);
root.addAppender(file);
} catch (IOException e) {
sLog.fatal("Unable to configure logging, reason: " + e.getMessage(), e);
}
}
}
private static void configureLog4j() {
// Log4j configuration for selection process (if enabled)
if (PAResourceManagerProperties.RM_SELECTION_LOGS_LOCATION.isSet()) {
String logsLocation = PAResourceManagerProperties.getAbsolutePath(PAResourceManagerProperties.RM_SELECTION_LOGS_LOCATION.getValueAsString());
boolean cleanStart = PAResourceManagerProperties.RM_DB_HIBERNATE_DROPDB.getValueAsBoolean();
if (cleanStart) {
// removing selection logs directory
logger.info("Removing logs " + logsLocation);
FileUtils.removeDir(new File(logsLocation));
}
Logger selectionLogger = Logger.getLogger(SelectionManager.class.getPackage().getName());
MultipleFileAppender appender = new MultipleFileAppender();
if (PAResourceManagerProperties.RM_SELECTION_LOGS_MAX_SIZE.isSet()) {
appender.setMaxFileSize(PAResourceManagerProperties.RM_SELECTION_LOGS_MAX_SIZE.getValueAsString());
}
appender.setFilesLocation(logsLocation);
selectionLogger.addAppender(appender);
}
}
@Test
void testNoExceptionWithJavaFile() throws Exception {
final TestLogAppender appender = new TestLogAppender();
final Logger logger = Logger.getRootLogger();
logger.addAppender(appender);
File f = new File("src/test/resources/workspace/camel.java");
try (FileInputStream fis = new FileInputStream(f)) {
CamelLanguageServer camelLanguageServer = initializeLanguageServer(fis, ".java");
CompletableFuture<List<Either<SymbolInformation,DocumentSymbol>>> documentSymbolFor = getDocumentSymbolFor(camelLanguageServer);
List<Either<SymbolInformation, DocumentSymbol>> symbolsInformation = documentSymbolFor.get();
assertThat(symbolsInformation).isEmpty();
for (LoggingEvent loggingEvent : appender.getLog()) {
if (loggingEvent.getMessage() != null) {
assertThat((String)loggingEvent.getMessage()).doesNotContain(DocumentSymbolProcessor.CANNOT_DETERMINE_DOCUMENT_SYMBOLS);
}
}
}
}
public Logger createLogger(Writer writer) {
TestLoggerRepository repo = new TestLoggerRepository();
Logger logger = repo.getLogger("test");
Log4Json layout = new Log4Json();
WriterAppender appender = new WriterAppender(layout, writer);
logger.addAppender(appender);
return logger;
}
/**
* Tests append method under normal conditions.
*/
public void testAppend() {
SyslogAppender appender = new SyslogAppender();
appender.setName("foo");
appender.setThreshold(Level.INFO);
appender.setSyslogHost("localhost");
appender.setFacility("user");
appender.setLayout(new PatternLayout("%m%n"));
VectorErrorHandler errorHandler = new VectorErrorHandler();
appender.setErrorHandler(errorHandler);
appender.activateOptions();
//
// wrap SyslogAppender with an Async since appender may
// hang if syslogd is not accepting network messages
//
AsyncAppender asyncAppender = new AsyncAppender();
asyncAppender.addAppender(appender);
asyncAppender.activateOptions();
Logger logger = Logger.getRootLogger();
logger.addAppender(asyncAppender);
Exception e =
new Exception("Expected exception from SyslogAppenderTest.testAppend");
logger.info(
"Expected message from log4j unit test SyslogAppenderTest.testAppend.", e);
assertEquals(0, errorHandler.size());
}
public static void initializeDefaultConsoleLogger(Level logLevel) {
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
root.addAppender(appender);
root.setLevel(logLevel);
}
@Before
public void setup() {
final Logger root = Logger.getRootLogger();
root.addAppender(mockAppender);
root.setLevel(Level.WARN);
}
@Test(timeout=20000)
public void testWarnCommandOpts() throws Exception {
Logger logger = Logger.getLogger(YARNRunner.class);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
Layout layout = new SimpleLayout();
Appender appender = new WriterAppender(layout, bout);
logger.addAppender(appender);
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, "-Djava.net.preferIPv4Stack=true -Djava.library.path=foo");
jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS, "-Xmx1024m -Djava.library.path=bar");
YARNRunner yarnRunner = new YARNRunner(jobConf);
@SuppressWarnings("unused")
ApplicationSubmissionContext submissionContext =
buildSubmitContext(yarnRunner, jobConf);
String logMsg = bout.toString();
assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in " +
"yarn.app.mapreduce.am.admin-command-opts can cause programs to no " +
"longer function if hadoop native libraries are used. These values " +
"should be set as part of the LD_LIBRARY_PATH in the app master JVM " +
"env using yarn.app.mapreduce.am.admin.user.env config settings."));
assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in " +
"yarn.app.mapreduce.am.command-opts can cause programs to no longer " +
"function if hadoop native libraries are used. These values should " +
"be set as part of the LD_LIBRARY_PATH in the app master JVM env " +
"using yarn.app.mapreduce.am.env config settings."));
}
private static void addConsoleOutput(Logger logger) {
BasicConfigurator.configure();
ConsoleAppender appender = new ConsoleAppender(new PatternLayout());
logger.addAppender(appender);
appender.setTarget(ConsoleAppender.SYSTEM_OUT);
appender.activateOptions();
}
/**
* Since the legacy join compare logic is non-deterministic, it is not safe to build any expected test results on the
* generated SQL. The fallback is to validate that the legacy code path is traversed when the "legacy_join_order"
* boolean set to true in the model. To do this, the test verifies that logging output is as expected.
*
* @throws PentahoMetadataException
*/
@Test
public void testLegacyJoinOrderLogic() throws PentahoMetadataException {
Logger logger = Logger.getLogger( SQLJoin.class.getName() );
ByteArrayOutputStream out = new ByteArrayOutputStream();
Appender appender = new WriterAppender( new SimpleLayout(), out );
logger.addAppender( appender );
try {
RelationshipType[] typesToTest = new RelationshipType[] { RelationshipType._0_N, RelationshipType._1_1 };
for ( RelationshipType firstRel : typesToTest ) {
for ( RelationshipType secondRel : typesToTest ) {
final LogicalModel model = new LogicalModel();
model.setId( "model_01" );
Category mainCat = new Category();
mainCat.setId( "cat_01" );
model.getCategories().add( mainCat );
LogicalTable[] tables = getTablesWithRelationships( firstRel, secondRel, mainCat, model );
DatabaseMeta databaseMeta = new DatabaseMeta( "", "ORACLE", "Native", "", "", "", "", "" );
Query myTest = new Query( null, model );
myTest.getSelections().add( new Selection( null, tables[ 0 ].getLogicalColumns().get( 0 ), null ) );
myTest.getSelections().add( new Selection( null, tables[ 1 ].getLogicalColumns().get( 0 ), null ) );
myTest.getSelections().add( new Selection( null, tables[ 2 ].getLogicalColumns().get( 0 ), null ) );
SqlGenerator generator = new SqlGenerator();
// first verify the legacy logic is not used if the property is not set
generator.generateSql( myTest, "en_US", null, databaseMeta );
Assert.assertTrue( "Did not expect to use the legacy SQLJoin.compareTo() logic.", !out.toString().contains(
"Using legacy SQLJoin compare." ) );
// set the property and make sure the legacy logic is used
model.setProperty( "legacy_join_order", true );
generator.generateSql( myTest, "en_US", null, databaseMeta );
Assert.assertTrue( "Should have used legacy SQLJoin.compareTo() logic.", out.toString().contains(
"Using legacy SQLJoin compare." ) );
out.reset(); // clear out accumulated logs for next run
}
}
} finally {
logger.removeAppender( appender );
}
}
@Test
public void testCollisionLogging() throws Exception {
Logger logger = Logger.getLogger("fluo.tx.collisions");
StringWriter writer = new StringWriter();
WriterAppender appender = new WriterAppender(new PatternLayout("%p, %m%n"), writer);
Level level = logger.getLevel();
boolean additivity = logger.getAdditivity();
try {
logger.setLevel(Level.TRACE);
logger.setAdditivity(false);
logger.addAppender(appender);
try (LoaderExecutor le = client.newLoaderExecutor()) {
for (int i = 0; i < 20; i++) {
le.execute(new SimpleBinaryLoader());
le.execute(new TriggerLoader(i));
}
}
miniFluo.waitForObservers();
} finally {
logger.removeAppender(appender);
logger.setAdditivity(additivity);
logger.setLevel(level);
}
String logMsgs = writer.toString();
logMsgs = logMsgs.replace('\n', ' ');
Assert.assertFalse(logMsgs.contains("TriggerLoader"));
String pattern;
pattern = ".*txid: (\\d+) class: org.apache.fluo.integration.log.LogIT\\$SimpleBinaryLoader";
pattern += ".*txid: \\1 collisions: \\Q[r1\\x0d=[a \\x00\\x09 ]]\\E.*";
Assert.assertTrue(logMsgs.matches(pattern));
pattern = ".*txid: (\\d+) trigger: \\d+ stat count \\d+";
pattern += ".*txid: \\1 class: org.apache.fluo.integration.log.LogIT\\$TestObserver";
pattern += ".*txid: \\1 collisions: \\Q[all=[stat count ]]\\E.*";
Assert.assertTrue(logMsgs.matches(pattern));
}
@Test(description = "cacheLFUTestCase4") // 1 primary key & LFU & update func
public void cacheLFUTestCase4() throws InterruptedException, SQLException {
final TestAppenderToValidateLogsForCachingTests appender = new TestAppenderToValidateLogsForCachingTests();
final Logger logger = Logger.getRootLogger();
logger.setLevel(Level.DEBUG);
logger.addAppender(appender);
SiddhiManager siddhiManager = new SiddhiManager();
String streams = "" +
"define stream StockStream (symbol string, price float, volume long); " +
"define stream UpdateStockStream (symbol string, price float, volume long); " +
"@Store(type=\"testStoreForCacheMiss\", @Cache(size=\"2\", cache.policy=\"LFU\"))\n" +
"@PrimaryKey(\'symbol\') " +
"define table StockTable (symbol string, price float, volume long); ";
String query = "" +
"@info(name = 'query1') " +
"from StockStream " +
"insert into StockTable ;" +
"" +
"@info(name = 'query2') " +
"from UpdateStockStream\n" +
"select symbol, price, volume\n" +
"update StockTable\n" +
"on (StockTable.symbol == symbol);";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);
siddhiAppRuntime.addCallback("query2", new QueryCallback() {
@Override
public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timestamp, inEvents, removeEvents);
if (inEvents != null) {
for (Event event : inEvents) {
inEventCount++;
switch (inEventCount) {
case 1:
Assert.assertEquals(event.getData(), new Object[]{"WSO2", 66.5f, 3L});
break;
}
}
eventArrived = true;
}
}
});
InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream");
InputHandler updateStockStream = siddhiAppRuntime.getInputHandler("UpdateStockStream");
siddhiAppRuntime.start();
stockStream.send(new Object[]{"WSO2", 55.6f, 1L});
Thread.sleep(10);
stockStream.send(new Object[]{"APPLE", 75.6f, 2L});
Thread.sleep(10);
updateStockStream.send(new Object[]{"WSO2", 66.5f, 3L});
Thread.sleep(10);
stockStream.send(new Object[]{"CISCO", 86.6f, 5L});
Event[] events = siddhiAppRuntime.query("" +
"from StockTable " +
"on symbol == \"APPLE\" ");
EventPrinter.print(events);
AssertJUnit.assertEquals(1, events.length);
final List<LoggingEvent> log = appender.getLog();
List<String> logMessages = new ArrayList<>();
for (LoggingEvent logEvent : log) {
String message = String.valueOf(logEvent.getMessage());
if (message.contains(":")) {
message = message.split(": ")[1];
}
logMessages.add(message);
}
Assert.assertEquals(logMessages.
contains("store table size is smaller than max cache. Sending results from cache"), false);
Assert.assertEquals(logMessages.contains("store table size is bigger than cache."), true);
Assert.assertEquals(Collections.frequency(logMessages, "store table size is bigger than cache."), 1);
Assert.assertEquals(logMessages.contains("cache constraints satisfied. Checking cache"), true);
Assert.assertEquals(Collections.frequency(logMessages, "cache constraints satisfied. Checking cache"), 1);
Assert.assertEquals(logMessages.contains("cache hit. Sending results from cache"), false);
Assert.assertEquals(logMessages.contains("cache miss. Loading from store"), true);
Assert.assertEquals(Collections.frequency(logMessages, "cache miss. Loading from store"), 1);
Assert.assertEquals(logMessages.contains("store also miss. sending null"), false);
Assert.assertEquals(logMessages.contains("sending results from cache after loading from store"), true);
Assert.assertEquals(Collections.frequency(logMessages, "sending results from cache after loading from store"),
1);
Assert.assertEquals(logMessages.contains("sending results from store"), false);
siddhiAppRuntime.shutdown();
}
private void testImageChecksum(boolean compress) throws Exception {
MiniDFSCluster cluster = null;
if (compress) {
config.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY, true);
}
try {
LOG.info("\n===========================================\n" +
"Starting empty cluster");
cluster = new MiniDFSCluster.Builder(config)
.numDataNodes(0)
.format(true)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
fs.mkdirs(new Path("/test"));
LOG.info("Shutting down cluster #1");
cluster.shutdown();
cluster = null;
// Corrupt the md5 files in all the namedirs
corruptFSImageMD5(true);
// Attach our own log appender so we can verify output
final LogVerificationAppender appender = new LogVerificationAppender();
final Logger logger = Logger.getRootLogger();
logger.addAppender(appender);
// Try to start a new cluster
LOG.info("\n===========================================\n" +
"Starting same cluster after simulated crash");
try {
cluster = new MiniDFSCluster.Builder(config)
.numDataNodes(0)
.format(false)
.build();
fail("Should not have successfully started with corrupt image");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"Failed to load an FSImage file!", ioe);
int md5failures = appender.countExceptionsWithMessage(
" is corrupt with MD5 checksum of ");
// Two namedirs, so should have seen two failures
assertEquals(2, md5failures);
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test(description = "cacheLFUTestCase2") // 1 primary key & LFU & cointains api (in)
public void cacheLFUTestCase2() throws InterruptedException, SQLException {
final TestAppenderToValidateLogsForCachingTests appender = new TestAppenderToValidateLogsForCachingTests();
final Logger logger = Logger.getRootLogger();
logger.setLevel(Level.DEBUG);
logger.addAppender(appender);
SiddhiManager siddhiManager = new SiddhiManager();
String streams = "" +
"define stream StockStream (symbol string, price float, volume long); " +
"define stream DeleteStockStream (symbol string, price float, volume long); " +
"define stream CheckInStockStream (symbol string); " +
"@Store(type=\"testStoreForCacheMiss\", @Cache(size=\"2\", cache.policy=\"LFU\"))\n" +
"@PrimaryKey(\'symbol\') " +
"define table StockTable (symbol string, price float, volume long); ";
String query = "" +
"@info(name = 'query1') " +
"from StockStream " +
"insert into StockTable ;" +
"" +
"@info(name = 'query2') " +
"from DeleteStockStream " +
"delete StockTable " +
" on StockTable.symbol == symbol AND StockTable.price == price AND StockTable.volume == volume;" +
"" +
"@info(name = 'query3') " +
"from CheckInStockStream[StockTable.symbol == symbol in StockTable]\n" +
"insert into OutputStream ;";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);
siddhiAppRuntime.addCallback("query3", new QueryCallback() {
@Override
public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timestamp, inEvents, removeEvents);
if (inEvents != null) {
for (Event event : inEvents) {
inEventCount++;
switch (inEventCount) {
case 1:
Assert.assertEquals(event.getData(), new Object[]{"WSO2"});
break;
}
}
eventArrived = true;
}
}
});
InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream");
InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream");
InputHandler checkInStockStream = siddhiAppRuntime.getInputHandler("CheckInStockStream");
siddhiAppRuntime.start();
stockStream.send(new Object[]{"WSO2", 55.6f, 1L});
Thread.sleep(10);
stockStream.send(new Object[]{"APPLE", 75.6f, 4L});
Thread.sleep(10);
checkInStockStream.send(new Object[]{"WSO2"});
siddhiAppRuntime.shutdown();
}
@Test(description = "cacheLRUTestCase1", dependsOnMethods = {"cacheLRUTestCase0"})
// using query api and 1 primary key & LRu
public void cacheLRUTestCase1() throws InterruptedException, SQLException {
final TestAppenderToValidateLogsForCachingTests appender = new TestAppenderToValidateLogsForCachingTests();
final Logger logger = Logger.getRootLogger();
logger.setLevel(Level.DEBUG);
logger.addAppender(appender);
SiddhiManager siddhiManager = new SiddhiManager();
String streams = "" +
"define stream StockStream (symbol string, price float, volume long); " +
"define stream DeleteStockStream (symbol string, price float, volume long); " +
"@Store(type=\"testStoreForCacheMiss\", @Cache(size=\"2\", cache.policy=\"LRU\"))\n" +
"@PrimaryKey(\'symbol\') " +
"define table StockTable (symbol string, price float, volume long); ";
String query = "" +
"@info(name = 'query1') " +
"from StockStream " +
"insert into StockTable ;" +
"" +
"@info(name = 'query2') " +
"from DeleteStockStream " +
"delete StockTable " +
" on StockTable.symbol == symbol AND StockTable.price == price AND StockTable.volume == volume;";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);
InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream");
InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream");
siddhiAppRuntime.start();
deleteStockStream.send(new Object[]{"WSO2", 55.6f, 1L});
deleteStockStream.send(new Object[]{"IBM", 75.6f, 2L});
stockStream.send(new Object[]{"CISCO", 75.6f, 3L});
Thread.sleep(10);
stockStream.send(new Object[]{"APPLE", 75.6f, 4L});
Thread.sleep(1000);
Event[] events = siddhiAppRuntime.query("" +
"from StockTable " +
"on symbol == \"WSO2\" ");
EventPrinter.print(events);
AssertJUnit.assertEquals(1, events.length);
final List<LoggingEvent> log = appender.getLog();
List<String> logMessages = new ArrayList<>();
for (LoggingEvent logEvent : log) {
String message = String.valueOf(logEvent.getMessage());
if (message.contains(":")) {
message = message.split(": ")[1];
}
logMessages.add(message);
}
Assert.assertEquals(logMessages.
contains("store table size is smaller than max cache. Sending results from cache"), false);
Assert.assertEquals(logMessages.contains("store table size is bigger than cache."), true);
Assert.assertEquals(Collections.frequency(logMessages, "store table size is bigger than cache."), 1);
Assert.assertEquals(logMessages.contains("cache constraints satisfied. Checking cache"), true);
Assert.assertEquals(Collections.frequency(logMessages, "cache constraints satisfied. Checking cache"), 1);
Assert.assertEquals(logMessages.contains("cache hit. Sending results from cache"), false);
Assert.assertEquals(logMessages.contains("cache miss. Loading from store"), true);
Assert.assertEquals(Collections.frequency(logMessages, "cache miss. Loading from store"), 1);
Assert.assertEquals(logMessages.contains("store also miss. sending null"), false);
Assert.assertEquals(logMessages.contains("sending results from cache after loading from store"), true);
Assert.assertEquals(Collections.frequency(logMessages, "sending results from cache after loading from store"),
1);
Assert.assertEquals(logMessages.contains("sending results from store"), false);
siddhiAppRuntime.shutdown();
}
@Test(description = "cacheLRUTestCase3", dependsOnMethods = {"cacheLRUTestCase2"})
// 2 primary keys & LRu & cointains api (in)
public void cacheLRUTestCase3() throws InterruptedException, SQLException {
final TestAppenderToValidateLogsForCachingTests appender = new TestAppenderToValidateLogsForCachingTests();
final Logger logger = Logger.getRootLogger();
logger.setLevel(Level.DEBUG);
logger.addAppender(appender);
SiddhiManager siddhiManager = new SiddhiManager();
String streams = "" +
"define stream StockStream (symbol string, price float, volume long); " +
"define stream DeleteStockStream (symbol string, price float, volume long); " +
"define stream CheckInStockStream (symbol string, price float); " +
"@Store(type=\"testStoreForCacheMiss\", @Cache(size=\"2\", cache.policy=\"LRU\"))\n" +
"@PrimaryKey(\'symbol\', \'price\') " +
"define table StockTable (symbol string, price float, volume long); ";
String query = "" +
"@info(name = 'query1') " +
"from StockStream " +
"insert into StockTable ;" +
"" +
"@info(name = 'query2') " +
"from DeleteStockStream " +
"delete StockTable " +
" on StockTable.symbol == symbol AND StockTable.price == price AND StockTable.volume == volume;" +
"" +
"@info(name = 'query3') " +
"from CheckInStockStream[(StockTable.symbol == symbol AND StockTable.price == price) in StockTable]\n" +
"insert into OutputStream ;";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);
siddhiAppRuntime.addCallback("query3", new QueryCallback() {
@Override
public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timestamp, inEvents, removeEvents);
if (inEvents != null) {
for (Event event : inEvents) {
inEventCount++;
switch (inEventCount) {
case 1:
Assert.assertEquals(event.getData(), new Object[]{"WSO2", 55.6f});
break;
}
}
eventArrived = true;
}
}
});
InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream");
InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream");
InputHandler checkInStockStream = siddhiAppRuntime.getInputHandler("CheckInStockStream");
siddhiAppRuntime.start();
stockStream.send(new Object[]{"WSO2", 55.6f, 1L});
Thread.sleep(10);
stockStream.send(new Object[]{"APPLE", 75.6f, 4L});
Thread.sleep(10);
checkInStockStream.send(new Object[]{"WSO2", 55.6f});
Thread.sleep(10);
stockStream.send(new Object[]{"CISCO", 86.6f, 5L});
Thread.sleep(10);
Event[] events = siddhiAppRuntime.query("" +
"from StockTable " +
"on symbol == \"APPLE\" AND price == 75.6f ");
EventPrinter.print(events);
AssertJUnit.assertEquals(1, events.length);
final List<LoggingEvent> log = appender.getLog();
List<String> logMessages = new ArrayList<>();
for (LoggingEvent logEvent : log) {
String message = String.valueOf(logEvent.getMessage());
if (message.contains(":")) {
message = message.split(": ")[1];
}
logMessages.add(message);
}
Assert.assertEquals(logMessages.
contains("store table size is smaller than max cache. Sending results from cache"), false);
Assert.assertEquals(logMessages.contains("store table size is bigger than cache."), true);
Assert.assertEquals(Collections.frequency(logMessages, "store table size is bigger than cache."), 1);
Assert.assertEquals(logMessages.contains("cache constraints satisfied. Checking cache"), true);
Assert.assertEquals(Collections.frequency(logMessages, "cache constraints satisfied. Checking cache"), 1);
Assert.assertEquals(logMessages.contains("cache hit. Sending results from cache"), false);
Assert.assertEquals(logMessages.contains("cache miss. Loading from store"), true);
Assert.assertEquals(Collections.frequency(logMessages, "cache miss. Loading from store"), 1);
Assert.assertEquals(logMessages.contains("store also miss. sending null"), false);
Assert.assertEquals(logMessages.contains("sending results from cache after loading from store"), true);
Assert.assertEquals(Collections.frequency(logMessages, "sending results from cache after loading from store"),
1);
Assert.assertEquals(logMessages.contains("sending results from store"), false);
siddhiAppRuntime.shutdown();
}
@Test(description = "cacheLRUTestCase0") // using query api and 2 primary keys & LRu
public void cacheLRUTestCase0() throws InterruptedException, SQLException {
final TestAppenderToValidateLogsForCachingTests appender = new TestAppenderToValidateLogsForCachingTests();
final Logger logger = Logger.getRootLogger();
logger.setLevel(Level.DEBUG);
logger.addAppender(appender);
SiddhiManager siddhiManager = new SiddhiManager();
String streams = "" +
"define stream StockStream (symbol string, price float, volume long); " +
"define stream DeleteStockStream (symbol string, price float, volume long); " +
"@Store(type=\"testStoreForCacheMiss\", @Cache(size=\"2\", cache.policy=\"LRU\"))\n" +
"@PrimaryKey(\'symbol\', \'price\') " +
"define table StockTable (symbol string, price float, volume long); ";
String query = "" +
"@info(name = 'query1') " +
"from StockStream " +
"insert into StockTable ;" +
"" +
"@info(name = 'query2') " +
"from DeleteStockStream " +
"delete StockTable " +
" on StockTable.symbol == symbol AND StockTable.price == price AND StockTable.volume == volume;";
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);
InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream");
InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream");
siddhiAppRuntime.start();
deleteStockStream.send(new Object[]{"WSO2", 55.6f, 1L});
deleteStockStream.send(new Object[]{"IBM", 75.6f, 2L});
stockStream.send(new Object[]{"CISCO", 75.6f, 3L});
Thread.sleep(10);
stockStream.send(new Object[]{"APPLE", 75.6f, 4L});
Thread.sleep(1000);
Event[] events = siddhiAppRuntime.query("" +
"from StockTable " +
"on symbol == \"WSO2\" AND price == 55.6f ");
EventPrinter.print(events);
AssertJUnit.assertEquals(1, events.length);
final List<LoggingEvent> log = appender.getLog();
List<String> logMessages = new ArrayList<>();
for (LoggingEvent logEvent : log) {
String message = String.valueOf(logEvent.getMessage());
if (message.contains(":")) {
message = message.split(": ")[1];
}
logMessages.add(message);
}
Assert.assertEquals(logMessages.
contains("store table size is smaller than max cache. Sending results from cache"), false);
Assert.assertEquals(logMessages.contains("store table size is bigger than cache."), true);
Assert.assertEquals(Collections.frequency(logMessages, "store table size is bigger than cache."), 1);
Assert.assertEquals(logMessages.contains("cache constraints satisfied. Checking cache"), true);
Assert.assertEquals(Collections.frequency(logMessages, "cache constraints satisfied. Checking cache"), 1);
Assert.assertEquals(logMessages.contains("cache hit. Sending results from cache"), false);
Assert.assertEquals(logMessages.contains("cache miss. Loading from store"), true);
Assert.assertEquals(Collections.frequency(logMessages, "cache miss. Loading from store"), 1);
Assert.assertEquals(logMessages.contains("store also miss. sending null"), false);
Assert.assertEquals(logMessages.contains("sending results from cache after loading from store"), true);
Assert.assertEquals(Collections.frequency(logMessages, "sending results from cache after loading from store"),
1);
Assert.assertEquals(logMessages.contains("sending results from store"), false);
siddhiAppRuntime.shutdown();
}
/**
* Testing {@link RemoteLoggingConfigurator#needsApplying()} functionality ( must return true, after appender is removed form log4j )
* and
* */
@Test
public void testApplyPositiveRootLogger() {
ActiveDbAppender appender = new ActiveDbAppender();
appender.setHost("test");
appender.setDatabase("test");
appender.setUser("test");
appender.setPassword("test");
Logger log = Logger.getRootLogger();
log.addAppender(appender);
//construct the configurator - an appender is present
RemoteLoggingConfigurator remoteLoggingConfig = new RemoteLoggingConfigurator(null, -1);
//remove the appender, so the configurator will need to apply it
log.removeAppender(appender);
// check if needs to be applied - this sets the internal flags
// so the next "apply" method will work as expected
assertTrue(remoteLoggingConfig.needsApplying());
boolean hasDbCheckError = false;
try {
//apply the appender
//this statement will fail, due to missing PostgreSQL or MSSQL server at localhost
remoteLoggingConfig.apply();
} catch (Exception e) {
if (!e.getCause()
.getMessage()
.contains("Neither MSSQL, nor PostgreSQL server at 'test:1433' has database with name 'test'.")) {
// an exception was caught, but its cause is not the expected one
// re-throw the exception
throw e;
} else {
// expected exception was caught
hasDbCheckError = true;
}
}
assertTrue(hasDbCheckError);
}
@Test
public void testJobMonitorAndPrint() throws Exception {
JobStatus jobStatus_1 = new JobStatus(new JobID("job_000", 1), 1f, 0.1f,
0.1f, 0f, State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname",
"tmp-queue", "tmp-jobfile", "tmp-url", true);
JobStatus jobStatus_2 = new JobStatus(new JobID("job_000", 1), 1f, 1f,
1f, 1f, State.SUCCEEDED, JobPriority.HIGH, "tmp-user", "tmp-jobname",
"tmp-queue", "tmp-jobfile", "tmp-url", true);
doAnswer(
new Answer<TaskCompletionEvent[]>() {
@Override
public TaskCompletionEvent[] answer(InvocationOnMock invocation)
throws Throwable {
return new TaskCompletionEvent[0];
}
}
).when(job).getTaskCompletionEvents(anyInt(), anyInt());
doReturn(new TaskReport[5]).when(job).getTaskReports(isA(TaskType.class));
when(clientProtocol.getJobStatus(any(JobID.class))).thenReturn(jobStatus_1, jobStatus_2);
// setup the logger to capture all logs
Layout layout =
Logger.getRootLogger().getAppender("stdout").getLayout();
ByteArrayOutputStream os = new ByteArrayOutputStream();
WriterAppender appender = new WriterAppender(layout, os);
appender.setThreshold(Level.ALL);
Logger qlogger = Logger.getLogger(Job.class);
qlogger.addAppender(appender);
job.monitorAndPrintJob();
qlogger.removeAppender(appender);
LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
String line;
boolean foundHundred = false;
boolean foundComplete = false;
boolean foundUber = false;
String uberModeMatch = "uber mode : true";
String progressMatch = "map 100% reduce 100%";
String completionMatch = "completed successfully";
while ((line = r.readLine()) != null) {
if (line.contains(uberModeMatch)) {
foundUber = true;
}
foundHundred = line.contains(progressMatch);
if (foundHundred)
break;
}
line = r.readLine();
foundComplete = line.contains(completionMatch);
assertTrue(foundUber);
assertTrue(foundHundred);
assertTrue(foundComplete);
System.out.println("The output of job.toString() is : \n" + job.toString());
assertTrue(job.toString().contains("Number of maps: 5\n"));
assertTrue(job.toString().contains("Number of reduces: 5\n"));
}