下面列出了 org.apache.log4j.HTMLLayout#org.apache.log4j.WriterAppender 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Streams log output into the HTTP response until the serving thread is interrupted.
 *
 * <p>A {@code WriterAppender} backed by the response writer is attached to the root
 * logger, the thread then blocks in {@code Thread.sleep}, and the appender is always
 * detached again on the way out.
 *
 * @param req the incoming request (not read by this method)
 * @param resp the response whose writer receives the formatted log events
 * @throws IOException if the response writer cannot be obtained
 */
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
        throws IOException {
    final WriterAppender responseAppender =
            new WriterAppender(new PatternLayout(PATTERN), resp.getWriter());
    responseAppender.setThreshold(Level.TRACE);
    try {
        Logger.getRootLogger().addAppender(responseAppender);
        // Park the request thread; log events are delivered through the appender meanwhile.
        Thread.sleep(Integer.MAX_VALUE);
    } catch (InterruptedException interrupted) {
        // Preserve the interrupt status for whoever owns this thread.
        Thread.currentThread().interrupt();
    } finally {
        Logger.getRootLogger().removeAppender(responseAppender);
    }
}
/**
 * Creates a document while the netty wire logger is set to DEBUG and verifies that the
 * appender attached to that logger captured nothing.
 *
 * @throws Exception if building the client, creating the document or validation fails
 */
@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void createDocumentWithDebugLevel() throws Exception {
    LogManager.getLogger(NETWORK_LOGGING_CATEGORY).setLevel(Level.DEBUG);
    StringWriter consoleWriter = new StringWriter();
    WriterAppender appender = new WriterAppender(new PatternLayout(), consoleWriter);
    LogManager.getLogger(NETWORK_LOGGING_CATEGORY).addAppender(appender);
    try {
        AsyncDocumentClient client = this.clientBuilder().build();
        try {
            Document docDefinition = getDocumentDefinition();
            Observable<ResourceResponse<Document>> createObservable = client
                    .createDocument(getCollectionLink(), docDefinition, null, false);
            ResourceResponseValidator<Document> validator = new ResourceResponseValidator.Builder<Document>()
                    .withId(docDefinition.getId())
                    .build();
            validateSuccess(createObservable, validator);
            assertThat(consoleWriter.toString()).isEmpty();
        } finally {
            safeClose(client);
        }
    } finally {
        // Named log4j loggers are global; detach the capture appender so it does not
        // stay attached (and keep buffering) after this test has finished.
        LogManager.getLogger(NETWORK_LOGGING_CATEGORY).removeAppender(appender);
    }
}
/**
 * Creates a document while the netty wire logger is set to WARN and verifies that the
 * appender attached to that logger captured nothing.
 *
 * @throws Exception if building the client, creating the document or validation fails
 */
@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void createDocumentWithWarningLevel() throws Exception {
    LogManager.getRootLogger().setLevel(Level.INFO);
    LogManager.getLogger(NETWORK_LOGGING_CATEGORY).setLevel(Level.WARN);
    StringWriter consoleWriter = new StringWriter();
    WriterAppender appender = new WriterAppender(new PatternLayout(), consoleWriter);
    Logger networkLogger = Logger.getLogger(NETWORK_LOGGING_CATEGORY);
    networkLogger.addAppender(appender);
    try {
        AsyncDocumentClient client = this.clientBuilder().build();
        try {
            Document docDefinition = getDocumentDefinition();
            Observable<ResourceResponse<Document>> createObservable = client
                    .createDocument(getCollectionLink(), docDefinition, null, false);
            ResourceResponseValidator<Document> validator = new ResourceResponseValidator.Builder<Document>()
                    .withId(docDefinition.getId())
                    .build();
            validateSuccess(createObservable, validator);
            assertThat(consoleWriter.toString()).isEmpty();
        } finally {
            safeClose(client);
        }
    } finally {
        // Named log4j loggers are global; detach the capture appender so it does not
        // stay attached (and keep buffering) after this test has finished.
        networkLogger.removeAppender(appender);
    }
}
/**
 * Creates a document while the Cosmos DB logging category is set to TRACE and verifies
 * that the appender attached to the network logging category captured all four expected
 * log patterns.
 *
 * @throws Exception if building the client, creating the document or validation fails
 */
@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void createDocumentWithTraceLevelAtRoot() throws Exception {
    LogManager.getRootLogger().setLevel(Level.INFO);
    LogManager.getLogger(COSMOS_DB_LOGGING_CATEGORY).setLevel(Level.TRACE);
    StringWriter consoleWriter = new StringWriter();
    WriterAppender appender = new WriterAppender(new PatternLayout(), consoleWriter);
    Logger networkLogger = Logger.getLogger(NETWORK_LOGGING_CATEGORY);
    networkLogger.addAppender(appender);
    try {
        AsyncDocumentClient client = this.clientBuilder().build();
        try {
            Document docDefinition = getDocumentDefinition();
            Observable<ResourceResponse<Document>> createObservable = client
                    .createDocument(getCollectionLink(), docDefinition, null, false);
            ResourceResponseValidator<Document> validator = new ResourceResponseValidator.Builder<Document>()
                    .withId(docDefinition.getId())
                    .build();
            validateSuccess(createObservable, validator);
            assertThat(consoleWriter.toString()).contains(LOG_PATTERN_1);
            assertThat(consoleWriter.toString()).contains(LOG_PATTERN_2);
            assertThat(consoleWriter.toString()).contains(LOG_PATTERN_3);
            assertThat(consoleWriter.toString()).contains(LOG_PATTERN_4);
        } finally {
            safeClose(client);
        }
    } finally {
        // Named log4j loggers are global; detach the capture appender so it does not
        // stay attached (and keep buffering) after this test has finished.
        networkLogger.removeAppender(appender);
    }
}
/**
 * Creates a document while the Cosmos DB logging category is set to DEBUG and verifies
 * that the appender attached to the network logging category captured nothing.
 *
 * @throws Exception if building the client, creating the document or validation fails
 */
@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void createDocumentWithDebugLevelAtRoot() throws Exception {
    LogManager.getRootLogger().setLevel(Level.INFO);
    LogManager.getLogger(COSMOS_DB_LOGGING_CATEGORY).setLevel(Level.DEBUG);
    StringWriter consoleWriter = new StringWriter();
    WriterAppender appender = new WriterAppender(new PatternLayout(), consoleWriter);
    Logger networkLogger = Logger.getLogger(NETWORK_LOGGING_CATEGORY);
    networkLogger.addAppender(appender);
    try {
        AsyncDocumentClient client = this.clientBuilder().build();
        try {
            Document docDefinition = getDocumentDefinition();
            Observable<ResourceResponse<Document>> createObservable = client
                    .createDocument(getCollectionLink(), docDefinition, null, false);
            ResourceResponseValidator<Document> validator = new ResourceResponseValidator.Builder<Document>()
                    .withId(docDefinition.getId())
                    .build();
            validateSuccess(createObservable, validator);
            assertThat(consoleWriter.toString()).isEmpty();
        } finally {
            safeClose(client);
        }
    } finally {
        // Named log4j loggers are global; detach the capture appender so it does not
        // stay attached (and keep buffering) after this test has finished.
        networkLogger.removeAppender(appender);
    }
}
/**
 * Creates a document while the netty wire logger is set to ERROR and verifies that the
 * appender attached to that logger captured nothing.
 *
 * @throws Exception if building the client, creating the document or validation fails
 */
@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void createDocumentWithErrorClient() throws Exception {
    LogManager.getRootLogger().setLevel(Level.INFO);
    LogManager.getLogger(NETWORK_LOGGING_CATEGORY).setLevel(Level.ERROR);
    StringWriter consoleWriter = new StringWriter();
    WriterAppender appender = new WriterAppender(new PatternLayout(), consoleWriter);
    Logger networkLogger = Logger.getLogger(NETWORK_LOGGING_CATEGORY);
    networkLogger.addAppender(appender);
    try {
        AsyncDocumentClient client = this.clientBuilder().build();
        try {
            Document docDefinition = getDocumentDefinition();
            Observable<ResourceResponse<Document>> createObservable = client
                    .createDocument(getCollectionLink(), docDefinition, null, false);
            ResourceResponseValidator<Document> validator = new ResourceResponseValidator.Builder<Document>()
                    .withId(docDefinition.getId())
                    .build();
            validateSuccess(createObservable, validator);
            assertThat(consoleWriter.toString()).isEmpty();
        } finally {
            safeClose(client);
        }
    } finally {
        // Named log4j loggers are global; detach the capture appender so it does not
        // stay attached (and keep buffering) after this test has finished.
        networkLogger.removeAppender(appender);
    }
}
/**
 * Creates a document while the netty wire logger is set to INFO and verifies that the
 * appender attached to that logger captured nothing.
 *
 * @throws Exception if building the client, creating the document or validation fails
 */
@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void createDocumentWithInfoLevel() throws Exception {
    LogManager.getRootLogger().setLevel(Level.INFO);
    LogManager.getLogger(NETWORK_LOGGING_CATEGORY).setLevel(Level.INFO);
    StringWriter consoleWriter = new StringWriter();
    WriterAppender appender = new WriterAppender(new PatternLayout(), consoleWriter);
    Logger networkLogger = Logger.getLogger(NETWORK_LOGGING_CATEGORY);
    networkLogger.addAppender(appender);
    try {
        AsyncDocumentClient client = this.clientBuilder().build();
        try {
            Document docDefinition = getDocumentDefinition();
            Observable<ResourceResponse<Document>> createObservable = client
                    .createDocument(getCollectionLink(), docDefinition, null, false);
            ResourceResponseValidator<Document> validator = new ResourceResponseValidator.Builder<Document>()
                    .withId(docDefinition.getId())
                    .build();
            validateSuccess(createObservable, validator);
            assertThat(consoleWriter.toString()).isEmpty();
        } finally {
            safeClose(client);
        }
    } finally {
        // Named log4j loggers are global; detach the capture appender so it does not
        // stay attached (and keep buffering) after this test has finished.
        networkLogger.removeAppender(appender);
    }
}
/**
 * Redirects the output of the {@code DefaultYarnRunner} logger to {@code out}.
 *
 * <p>Every appender currently attached to the root logger gets a filter that denies
 * events whose logger name contains {@code "DefaultYarnRunner"}; the runner's own logger
 * then receives a message-only ({@code %m%n}) appender writing to {@code out}.
 */
private synchronized void configureLogger() {
    final org.apache.log4j.Logger yarnRunnerLogger = org.apache.log4j.Logger.getLogger(
            "com.github.eirslett.maven.plugins.frontend.lib.DefaultYarnRunner");

    final Enumeration rootAppenders = org.apache.log4j.Logger.getRootLogger().getAllAppenders();
    while (rootAppenders != null && rootAppenders.hasMoreElements()) {
        final Appender rootAppender = (Appender) rootAppenders.nextElement();
        // A fresh filter per appender: log4j filters are linked into each appender's chain.
        rootAppender.addFilter(new Filter() {
            @Override
            public int decide(LoggingEvent loggingEvent) {
                final boolean fromYarnRunner =
                        loggingEvent.getLoggerName().contains("DefaultYarnRunner");
                return fromYarnRunner ? DENY : NEUTRAL;
            }
        });
    }

    final WriterAppender messageOnlyAppender =
            new WriterAppender(new PatternLayout("%m%n"), out);
    yarnRunnerLogger.addAppender(messageOnlyAppender);
}
/**
 * Starts capturing the output of {@code logger} into {@code sw}.
 *
 * <p>The layout is borrowed from the root logger's "stdout" appender, or from its
 * "console" appender when "stdout" is absent; a default {@code PatternLayout} is used
 * when neither exists.
 *
 * @param logger the logger whose events are captured
 */
private LogCapturer(Logger logger) {
    this.logger = logger;

    final Logger root = Logger.getRootLogger();
    Appender reference = root.getAppender("stdout");
    if (reference == null) {
        reference = root.getAppender("console");
    }

    final Layout layout;
    if (reference != null) {
        layout = reference.getLayout();
    } else {
        layout = new PatternLayout();
    }

    this.appender = new WriterAppender(layout, sw);
    logger.addAppender(this.appender);
}
/**
 * Formats {@code event} with this appender's layout and stores the resulting text in
 * {@code logList}.
 *
 * @param event the logging event to render and record
 */
@Override
protected void append(LoggingEvent event) {
    final StringWriter rendered = new StringWriter();
    // Delegate formatting to a throw-away WriterAppender that writes into the buffer.
    new WriterAppender(layout, rendered).append(event);
    logList.add(rendered.toString());
}
/**
 * Creates a document while the netty wire logger is set to TRACE and verifies that the
 * appender attached to that logger captured all four expected log patterns.
 *
 * @throws Exception if building the client, creating the document or validation fails
 */
@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void createDocumentWithTraceLevel() throws Exception {
    LogManager.getRootLogger().setLevel(Level.INFO);
    LogManager.getLogger(NETWORK_LOGGING_CATEGORY).setLevel(Level.TRACE);
    StringWriter consoleWriter = new StringWriter();
    WriterAppender appender = new WriterAppender(new PatternLayout(), consoleWriter);
    Logger networkLogger = Logger.getLogger(NETWORK_LOGGING_CATEGORY);
    networkLogger.addAppender(appender);
    try {
        AsyncDocumentClient client = this.clientBuilder().build();
        try {
            Document docDefinition = getDocumentDefinition();
            Observable<ResourceResponse<Document>> createObservable = client
                    .createDocument(getCollectionLink(), docDefinition, null, false);
            ResourceResponseValidator<Document> validator = new ResourceResponseValidator.Builder<Document>()
                    .withId(docDefinition.getId())
                    .build();
            validateSuccess(createObservable, validator);
            assertThat(consoleWriter.toString()).contains(LOG_PATTERN_1);
            assertThat(consoleWriter.toString()).contains(LOG_PATTERN_2);
            assertThat(consoleWriter.toString()).contains(LOG_PATTERN_3);
            assertThat(consoleWriter.toString()).contains(LOG_PATTERN_4);
        } finally {
            safeClose(client);
        }
    } finally {
        // Named log4j loggers are global; detach the capture appender so it does not
        // stay attached (and keep buffering) after this test has finished.
        networkLogger.removeAppender(appender);
    }
}
/**
 * Creates a document through an HTTP proxy while the netty wire logger is set to TRACE
 * and verifies that the appender attached to that logger captured the first three
 * expected log patterns.
 *
 * @throws Exception if building the client, creating the document or validation fails
 */
@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void createDocumentWithValidHttpProxyWithNettyWireLogging() throws Exception {
    LogManager.getRootLogger().setLevel(Level.INFO);
    LogManager.getLogger(LogLevelTest.NETWORK_LOGGING_CATEGORY).setLevel(Level.TRACE);
    Logger networkLogger = Logger.getLogger(LogLevelTest.NETWORK_LOGGING_CATEGORY);
    StringWriter consoleWriter = new StringWriter();
    WriterAppender appender = new WriterAppender(new PatternLayout(), consoleWriter);
    AsyncDocumentClient clientWithRightProxy = null;
    try {
        networkLogger.addAppender(appender);

        ConnectionPolicy connectionPolicy = new ConnectionPolicy();
        connectionPolicy.setProxy(PROXY_HOST, PROXY_PORT);
        clientWithRightProxy = new Builder().withServiceEndpoint(TestConfigurations.HOST)
                .withMasterKeyOrResourceToken(TestConfigurations.MASTER_KEY)
                .withConnectionPolicy(connectionPolicy)
                .withConsistencyLevel(ConsistencyLevel.Session).build();
        Document docDefinition = getDocumentDefinition();
        Observable<ResourceResponse<Document>> createObservable = clientWithRightProxy
                .createDocument(getCollectionLink(), docDefinition, null, false);
        ResourceResponseValidator<Document> validator = new ResourceResponseValidator.Builder<Document>()
                .withId(docDefinition.getId())
                .build();
        validateSuccess(createObservable, validator);
        assertThat(consoleWriter.toString()).contains(LogLevelTest.LOG_PATTERN_1);
        assertThat(consoleWriter.toString()).contains(LogLevelTest.LOG_PATTERN_2);
        assertThat(consoleWriter.toString()).contains(LogLevelTest.LOG_PATTERN_3);
    } finally {
        // Named log4j loggers are global; detach the capture appender so it does not
        // stay attached (and keep buffering) after this test has finished.
        networkLogger.removeAppender(appender);
        safeClose(clientWithRightProxy);
    }
}
/**
 * Verifies that building a submission context logs a WARN message for each of the admin
 * and user AM command-opts settings that contain {@code -Djava.library.path}.
 *
 * @throws Exception if the runner or the submission context cannot be created
 */
@Test(timeout=20000)
public void testWarnCommandOpts() throws Exception {
    Logger logger = Logger.getLogger(YARNRunner.class);
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    Layout layout = new SimpleLayout();
    Appender appender = new WriterAppender(layout, bout);
    logger.addAppender(appender);
    try {
        JobConf jobConf = new JobConf();
        jobConf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, "-Djava.net.preferIPv4Stack=true -Djava.library.path=foo");
        jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS, "-Xmx1024m -Djava.library.path=bar");
        YARNRunner yarnRunner = new YARNRunner(jobConf);
        // Only the log output matters here; the returned context is intentionally unused.
        @SuppressWarnings("unused")
        ApplicationSubmissionContext submissionContext =
            buildSubmitContext(yarnRunner, jobConf);
        String logMsg = bout.toString();
        assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in " +
            "yarn.app.mapreduce.am.admin-command-opts can cause programs to no " +
            "longer function if hadoop native libraries are used. These values " +
            "should be set as part of the LD_LIBRARY_PATH in the app master JVM " +
            "env using yarn.app.mapreduce.am.admin.user.env config settings."));
        assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in " +
            "yarn.app.mapreduce.am.command-opts can cause programs to no longer " +
            "function if hadoop native libraries are used. These values should " +
            "be set as part of the LD_LIBRARY_PATH in the app master JVM env " +
            "using yarn.app.mapreduce.am.env config settings."));
    } finally {
        // Detach the capture appender so it does not stay on the class logger after the test.
        logger.removeAppender(appender);
    }
}
/**
 * Builds a logger named "test" in a fresh {@code TestLoggerRepository} whose events are
 * formatted by {@code Log4Json} and written to {@code writer}.
 *
 * @param writer destination for the formatted log output
 * @return the configured logger
 */
public Logger createLogger(Writer writer) {
    final Logger testLogger = new TestLoggerRepository().getLogger("test");
    testLogger.addAppender(new WriterAppender(new Log4Json(), writer));
    return testLogger;
}
/**
 * Verifies that building a submission context logs a WARN message for each of the admin
 * and user AM command-opts settings that contain {@code -Djava.library.path}.
 *
 * @throws Exception if the runner or the submission context cannot be created
 */
@Test(timeout=20000)
public void testWarnCommandOpts() throws Exception {
    Logger logger = Logger.getLogger(YARNRunner.class);
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    Layout layout = new SimpleLayout();
    Appender appender = new WriterAppender(layout, bout);
    logger.addAppender(appender);
    try {
        JobConf jobConf = new JobConf();
        jobConf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, "-Djava.net.preferIPv4Stack=true -Djava.library.path=foo");
        jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS, "-Xmx1024m -Djava.library.path=bar");
        YARNRunner yarnRunner = new YARNRunner(jobConf);
        // Only the log output matters here; the returned context is intentionally unused.
        @SuppressWarnings("unused")
        ApplicationSubmissionContext submissionContext =
            buildSubmitContext(yarnRunner, jobConf);
        String logMsg = bout.toString();
        assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in " +
            "yarn.app.mapreduce.am.admin-command-opts can cause programs to no " +
            "longer function if hadoop native libraries are used. These values " +
            "should be set as part of the LD_LIBRARY_PATH in the app master JVM " +
            "env using yarn.app.mapreduce.am.admin.user.env config settings."));
        assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in " +
            "yarn.app.mapreduce.am.command-opts can cause programs to no longer " +
            "function if hadoop native libraries are used. These values should " +
            "be set as part of the LD_LIBRARY_PATH in the app master JVM env " +
            "using yarn.app.mapreduce.am.env config settings."));
    } finally {
        // Detach the capture appender so it does not stay on the class logger after the test.
        logger.removeAppender(appender);
    }
}
/**
 * Creates the "test" logger of a new {@code TestLoggerRepository} and attaches a
 * {@code Log4Json}-formatted appender that writes to {@code writer}.
 *
 * @param writer sink receiving the formatted log output
 * @return the logger with the appender attached
 */
public Logger createLogger(Writer writer) {
    final TestLoggerRepository repository = new TestLoggerRepository();
    final Logger result = repository.getLogger("test");
    final WriterAppender jsonAppender = new WriterAppender(new Log4Json(), writer);
    result.addAppender(jsonAppender);
    return result;
}
/**
 * Attaches an ERROR-threshold appender with a {@code SimpleLayout} to the root logger
 * and returns the stream it writes to.
 *
 * @return the in-memory stream that receives the logged output
 */
private ByteArrayOutputStream setUpLoggerStream() {
    final ByteArrayOutputStream captured = new ByteArrayOutputStream();
    final WriterAppender errorAppender = new WriterAppender(new SimpleLayout(), captured);
    errorAppender.setThreshold(Level.ERROR);
    errorAppender.activateOptions();
    Logger.getRootLogger().addAppender(errorAppender);
    return captured;
}
/**
 * Starts capturing the output of {@code logger} into {@code sw}.
 *
 * <p>The root logger's "stdout" appender supplies the layout, with its "console"
 * appender as second choice; when neither is present a default {@code PatternLayout}
 * is used.
 *
 * @param logger the log4j logger whose events are captured
 */
LogCapturer(org.apache.log4j.Logger logger) {
    this.logger = logger;

    final org.apache.log4j.Logger root = org.apache.log4j.Logger.getRootLogger();
    Appender template = root.getAppender("stdout");
    if (template == null) {
        template = root.getAppender("console");
    }

    final Layout layout;
    if (template == null) {
        layout = new PatternLayout();
    } else {
        layout = template.getLayout();
    }

    this.appender = new WriterAppender(layout, sw);
    this.logger.addAppender(this.appender);
}
/**
 * Wires Pig's log4j output into Kettle logging and opens a per-job log file appender.
 *
 * <p>Failures while creating or registering the file appender are reported on
 * {@code logChannelInterface} and do not propagate; in that case the stored file
 * appender may be {@code null}.
 *
 * @param logChannelInterface Kettle log channel that receives Pig messages and errors
 * @param logLevel Kettle log level, applied to the channel and (converted) to the Pig logger
 * @param name suffix of the log file name ({@code "pdi-" + name})
 * @param logWriter writer on which the file appender is registered
 */
public WriterAppenderManager( LogChannelInterface logChannelInterface, LogLevel logLevel, String name,
                              LogWriter logWriter ) {
  // Set up an appender that will send all pig log messages to Kettle's log
  // via logBasic().
  KettleLoggingPrintWriter kettleWriter = new KettleLoggingPrintWriter( logChannelInterface );
  pigToKettleAppender = new WriterAppender( new Log4jKettleLayout( true ), kettleWriter );

  Logger pigLogger = Logger.getLogger( "org.apache.pig" );
  pigLogger.setLevel( getLog4jLevel( logLevel ) );

  String logFileName = "pdi-" + name; //$NON-NLS-1$
  this.logWriter = logWriter;

  Log4jFileAppender fileAppender = null;
  try {
    fileAppender = LogWriter.createFileAppender( logFileName, true, false );
    logWriter.addAppender( fileAppender );
    logChannelInterface.setLogLevel( logLevel );
    if ( pigLogger != null ) {
      pigLogger.addAppender( pigToKettleAppender );
    }
  } catch ( Exception e ) {
    String message = BaseMessages
      .getString( PKG, "JobEntryPigScriptExecutor.FailedToOpenLogFile", logFileName, e.toString() ); //$NON-NLS-1$
    logChannelInterface.logError( message );
    logChannelInterface.logError( Const.getStackTracker( e ) );
  }
  // Remains null when the file appender could not be created.
  this.appender = fileAppender;
}
/**
 * Creates a log file, builds an appender for it and attaches that appender to
 * {@code logger}. Any failure is logged at ERROR level on the same logger.
 *
 * @param logger the logger that receives the new appender
 */
static void create(Logger logger) {
    try {
        logger.addAppender(createAppender(createLogFile()));
    } catch (Exception failure) {
        logger.log(Level.ERROR, failure.getMessage(), failure);
    }
}
/**
 * Builds an appending, HTML-formatted rolling file appender for {@code logFile}, limited
 * to 3MB per file with up to 3 backup files.
 *
 * @param logFile the file to append to
 * @return the configured rolling appender
 * @throws IOException if the appender cannot open the file
 */
private static WriterAppender createAppender(File logFile)
        throws IOException {
    final RollingFileAppender rolling =
            new RollingFileAppender(new HTMLLayout(), logFile.getAbsolutePath(), true);
    rolling.setMaxFileSize("3MB");
    rolling.setMaxBackupIndex(3);
    return rolling;
}
/**
 * Creates the per-thread purge utility logger and attaches an appender that writes to
 * {@code outputStream} as UTF-8, formatted as HTML or text depending on
 * {@code layoutClass}.
 */
private void init() {
  logName = "PurgeUtilityLog." + getThreadName();
  logger = Logger.getLogger( logName );
  logger.setLevel( logLevel );

  final boolean htmlRequested = layoutClass == PurgeUtilityHTMLLayout.class;
  final IPurgeUtilityLayout chosenLayout;
  if ( htmlRequested ) {
    chosenLayout = new PurgeUtilityHTMLLayout( logLevel );
  } else {
    chosenLayout = new PurgeUtilityTextLayout( logLevel );
  }
  chosenLayout.setTitle( "Purge Utility Log" );

  final Charset utf8 = Charset.forName( "utf-8" );
  final OutputStreamWriter sink = new OutputStreamWriter( outputStream, utf8 );
  writeAppender = new WriterAppender( (Layout) chosenLayout, sink );
  logger.addAppender( writeAppender );
}
/**
 * Runs a direct Sqoop import of {@code tableName} and asserts that the number of rows
 * reported in the captured log output equals the number of rows counted in the table.
 *
 * <p>Log output is captured through a temporary appender on the root logger, which is
 * removed again before this method returns.
 *
 * @param tableName the table to import
 * @param sqoopConf the configuration passed to Sqoop; also supplies the partition list
 *        used when counting the table's rows
 * @param sequenceFile whether to add {@code --as-sequencefile} to the arguments
 * @return the return code of {@code Sqoop.runTool}
 */
protected int runImport(String tableName, Configuration sqoopConf,
        boolean sequenceFile) {
    Logger rootLogger = Logger.getRootLogger();
    StringWriter stringWriter = new StringWriter();
    Layout layout = new PatternLayout("%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n");
    WriterAppender writerAppender = new WriterAppender(layout, stringWriter);
    rootLogger.addAppender(writerAppender);
    try {
        List<String> sqoopArgs = new ArrayList<String>();
        sqoopArgs.add("import");
        sqoopArgs.add("--direct");
        if (sequenceFile) {
            sqoopArgs.add("--as-sequencefile");
        }
        sqoopArgs.add("--connect");
        sqoopArgs.add(OracleUtils.CONNECT_STRING);
        sqoopArgs.add("--username");
        sqoopArgs.add(OracleUtils.ORACLE_USER_NAME);
        sqoopArgs.add("--password");
        sqoopArgs.add(OracleUtils.ORACLE_USER_PASS);
        sqoopArgs.add("--table");
        sqoopArgs.add(tableName);
        sqoopArgs.add("--target-dir");
        sqoopArgs.add(this.sqoopTargetDirectory);
        sqoopArgs.add("--class-name");
        sqoopArgs.add(getSqoopGenClassName());
        sqoopArgs.add("--bindir");
        sqoopArgs.add(this.sqoopGenLibDirectory);
        sqoopArgs.add("--outdir");
        sqoopArgs.add(this.sqoopGenSrcDirectory);
        if (OracleUtils.NUM_MAPPERS != 0) {
            sqoopArgs.add("--num-mappers");
            sqoopArgs.add(Integer.toString(OracleUtils.NUM_MAPPERS));
        }

        int rowsInTable =
            countTable(tableName, OraOopUtilities.splitOracleStringList(sqoopConf
                .get(OraOopConstants.ORAOOP_IMPORT_PARTITION_LIST)));

        int retCode =
            Sqoop.runTool(sqoopArgs.toArray(new String[sqoopArgs.size()]),
                sqoopConf);

        int rowsImported = 0;
        if (retCode == 0) {
            String logString = stringWriter.toString();
            Pattern pattern =
                Pattern.compile(
                    "(INFO mapreduce.ImportJobBase: Retrieved )([0-9]+)( records.)");
            Matcher matcher = pattern.matcher(logString);
            // When several matches exist, the last reported count wins.
            while (matcher.find()) {
                rowsImported = Integer.parseInt(matcher.group(2));
            }
        }
        Assert.assertEquals("Incorrect number of rows imported", rowsInTable,
            rowsImported);
        return retCode;
    } finally {
        // Without this, every call would leave another capturing appender on the root
        // logger, each buffering all subsequent log output in memory.
        rootLogger.removeAppender(writerAppender);
    }
}
/**
 * Runs {@code job.monitorAndPrintJob()} against mocked job statuses (RUNNING, then
 * SUCCEEDED) and checks the captured log output for the uber-mode line, the 100%
 * progress line and, on the line right after it, the completion message. Also checks
 * the map/reduce counts in {@code job.toString()}.
 *
 * @throws Exception if monitoring the job or reading the captured output fails
 */
@Test
public void testJobMonitorAndPrint() throws Exception {
    JobStatus jobStatus_1 = new JobStatus(new JobID("job_000", 1), 1f, 0.1f,
        0.1f, 0f, State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname",
        "tmp-queue", "tmp-jobfile", "tmp-url", true);
    JobStatus jobStatus_2 = new JobStatus(new JobID("job_000", 1), 1f, 1f,
        1f, 1f, State.SUCCEEDED, JobPriority.HIGH, "tmp-user", "tmp-jobname",
        "tmp-queue", "tmp-jobfile", "tmp-url", true);

    doAnswer(
        new Answer<TaskCompletionEvent[]>() {
            @Override
            public TaskCompletionEvent[] answer(InvocationOnMock invocation)
                throws Throwable {
                return new TaskCompletionEvent[0];
            }
        }
    ).when(job).getTaskCompletionEvents(anyInt(), anyInt());

    doReturn(new TaskReport[5]).when(job).getTaskReports(isA(TaskType.class));
    when(clientProtocol.getJobStatus(any(JobID.class))).thenReturn(jobStatus_1, jobStatus_2);

    // setup the logger to capture all logs
    Layout layout =
        Logger.getRootLogger().getAppender("stdout").getLayout();
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    WriterAppender appender = new WriterAppender(layout, os);
    appender.setThreshold(Level.ALL);
    Logger qlogger = Logger.getLogger(Job.class);
    qlogger.addAppender(appender);
    try {
        job.monitorAndPrintJob();
    } finally {
        // Detach even when monitoring throws, so the appender does not leak into other tests.
        qlogger.removeAppender(appender);
    }

    LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
    String line;
    boolean foundHundred = false;
    boolean foundComplete = false;
    boolean foundUber = false;
    String uberModeMatch = "uber mode : true";
    String progressMatch = "map 100% reduce 100%";
    String completionMatch = "completed successfully";
    while ((line = r.readLine()) != null) {
        if (line.contains(uberModeMatch)) {
            foundUber = true;
        }
        foundHundred = line.contains(progressMatch);
        if (foundHundred) {
            break;
        }
    }
    // The completion message is expected on the line right after the 100% progress line.
    // Guard against end of input so a missing line fails an assertion instead of an NPE.
    line = r.readLine();
    foundComplete = line != null && line.contains(completionMatch);
    assertTrue(foundUber);
    assertTrue(foundHundred);
    assertTrue(foundComplete);

    System.out.println("The output of job.toString() is : \n" + job.toString());
    assertTrue(job.toString().contains("Number of maps: 5\n"));
    assertTrue(job.toString().contains("Number of reduces: 5\n"));
}
/**
 * Starts capturing the output of {@code logger} into {@code sw}.
 *
 * <p>The layout is taken from the root logger's "stdout" appender, falling back to its
 * "console" appender and finally to a default {@code PatternLayout}, so construction
 * no longer fails with a NullPointerException when no "stdout" appender is configured.
 *
 * @param logger the logger whose events are captured
 */
private LogCapturer(Logger logger) {
    this.logger = logger;
    org.apache.log4j.Appender defaultAppender = Logger.getRootLogger().getAppender("stdout");
    if (defaultAppender == null) {
        defaultAppender = Logger.getRootLogger().getAppender("console");
    }
    Layout layout = (defaultAppender == null)
        ? new org.apache.log4j.PatternLayout()
        : defaultAppender.getLayout();
    WriterAppender wa = new WriterAppender(layout, sw);
    logger.addAppender(wa);
}
/**
 * Deploys {@code amount} vertices on localhost, runs a key exchange from the first one
 * and asserts that the negotiated key never appears in the captured DEBUG log output.
 * The vertices are undeployed afterwards.
 *
 * <p>A failed or interrupted key exchange is reported as a test failure once the
 * vertices have been undeployed, instead of being printed and ignored.
 *
 * @param amount number of vertices taking part in the negotiation
 * @param context the Vert.x test context used to await deployment and undeployment
 */
private void testNegotiation(int amount, TestContext context) {
    PrimaryVertex pv = new PrimaryVertex();
    GDHVertex[] verticles = new GDHVertex[amount];
    Configuration[] confs = new Configuration[amount];
    Writer writer = new StringWriter();
    for (int i = 0; i < amount; i++) {
        verticles[i] = new GDHVertex();
        confs[i] = new Configuration();
        // All vertices log into the same writer so the whole exchange can be inspected.
        WriterAppender app = new WriterAppender(new PatternLayout(), writer);
        app.setThreshold(Level.DEBUG);
        app.activateOptions();
        confs[i].setAppender(app);
        String port = amount + "08" + i;
        confs[i].setIP("localhost").setPort(port).setLogLevel(Level.DEBUG);
        verticles[i].setConfiguration(confs[i]);
    }
    List<GDHVertex> list = new ArrayList<>(Arrays.asList(verticles));

    Group g = new Group(confs[0], list.stream().map(y -> y.getNode()).collect(Collectors.toList()));
    verticles[0].addGroup(g);

    Async async1 = context.async(amount);
    for (int i = 0; i < amount; i++) {
        pv.run(verticles[i], res -> {
            if (res.succeeded()) {
                async1.countDown();
            } else {
                res.cause().printStackTrace();
            }
        });
    }
    async1.awaitSuccess();

    Throwable exchangeFailure = null;
    try {
        BigInteger key = verticles[0].exchange(g.getGroupId()).get();
        // An empty log trivially cannot contain the key, so one containment check suffices.
        Assert.assertFalse(writer.toString().contains(key.toString()));
    } catch (InterruptedException | ExecutionException e) {
        // Remember the failure; it is rethrown after the vertices have been shut down.
        exchangeFailure = e;
    }

    Async async2 = context.async(amount);
    for (int i = 0; i < amount; i++) {
        pv.kill(verticles[i], res -> {
            if (res.succeeded()) {
                async2.countDown();
            } else {
                res.cause().printStackTrace();
            }
        });
    }
    async2.awaitSuccess();

    if (exchangeFailure != null) {
        if (exchangeFailure instanceof InterruptedException) {
            // Restore the interrupt status before propagating the failure.
            Thread.currentThread().interrupt();
        }
        throw new AssertionError("Key exchange failed", exchangeFailure);
    }
}
/**
 * Deploys {@code amount} vertices on localhost, runs a key exchange from the first one,
 * asserts that every vertex ends up with the same key and checks the captured DEBUG log
 * output against lower bounds derived from {@code Constants.LOG_IN} and
 * {@code Constants.LOG_OUT}. The vertices are undeployed afterwards.
 *
 * <p>A failed or interrupted key exchange is reported as a test failure once the
 * vertices have been undeployed, instead of being printed and ignored.
 *
 * @param amount number of vertices taking part in the negotiation
 * @param context the Vert.x test context used to await deployment and undeployment
 */
private void testNegotiation(int amount, TestContext context) {
    PrimaryVertex pv = new PrimaryVertex();
    GDHVertex[] verticles = new GDHVertex[amount];
    Configuration[] confs = new Configuration[amount];
    Writer writer = new StringWriter();
    for (int i = 0; i < amount; i++) {
        verticles[i] = new GDHVertex();
        confs[i] = new Configuration();
        // All vertices log into the same writer so the whole exchange can be inspected.
        WriterAppender app = new WriterAppender(new PatternLayout(), writer);
        app.setThreshold(Level.DEBUG);
        app.activateOptions();
        confs[i].setAppender(app);
        String port = amount + "08" + i;
        confs[i].setIP("localhost").setPort(port).setLogLevel(Level.DEBUG);
        verticles[i].setConfiguration(confs[i]);
    }
    List<GDHVertex> list = new ArrayList<>(Arrays.asList(verticles));

    Group g = new Group(confs[0], list.stream().map(y -> y.getNode()).collect(Collectors.toList()));
    verticles[0].addGroup(g);

    Async async1 = context.async(amount);
    for (int i = 0; i < amount; i++) {
        pv.run(verticles[i], res -> {
            if (res.succeeded()) {
                async1.countDown();
            } else {
                res.cause().printStackTrace();
            }
        });
    }
    async1.awaitSuccess();

    Throwable exchangeFailure = null;
    try {
        BigInteger key = verticles[0].exchange(g.getGroupId()).get();
        for (int j = 0; j < verticles.length; j++) {
            Assert.assertEquals(verticles[j].getKey(g.getGroupId()).get(), key);
        }
        String write = writer.toString();
        // Length difference after replacing each marker with a 7-character string.
        // NOTE(review): this equals the occurrence count only if the markers are
        // 8 characters long — confirm against Constants.
        int count1 = write.length() - write.replace(Constants.LOG_IN, "0000000").length();
        int count2 = write.length() - write.replace(Constants.LOG_OUT, "0000000").length();
        Assert.assertTrue(count1 >= amount*amount-1);
        Assert.assertTrue(count2 >= amount*amount-1);
    } catch (InterruptedException | ExecutionException e) {
        // Remember the failure; it is rethrown after the vertices have been shut down.
        exchangeFailure = e;
    }

    Async async2 = context.async(amount);
    for (int i = 0; i < amount; i++) {
        pv.kill(verticles[i], res -> {
            if (res.succeeded()) {
                async2.countDown();
            } else {
                res.cause().printStackTrace();
            }
        });
    }
    async2.awaitSuccess();

    if (exchangeFailure != null) {
        if (exchangeFailure instanceof InterruptedException) {
            // Restore the interrupt status before propagating the failure.
            Thread.currentThread().interrupt();
        }
        throw new AssertionError("Key exchange failed", exchangeFailure);
    }
}
/**
 * Runs {@code job.monitorAndPrintJob()} against mocked job statuses (RUNNING, then
 * SUCCEEDED) and checks the captured log output for the uber-mode line, the 100%
 * progress line and, on the line right after it, the completion message. Also checks
 * the map/reduce counts in {@code job.toString()}.
 *
 * @throws Exception if monitoring the job or reading the captured output fails
 */
@Test
public void testJobMonitorAndPrint() throws Exception {
    JobStatus jobStatus_1 = new JobStatus(new JobID("job_000", 1), 1f, 0.1f,
        0.1f, 0f, State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname",
        "tmp-queue", "tmp-jobfile", "tmp-url", true);
    JobStatus jobStatus_2 = new JobStatus(new JobID("job_000", 1), 1f, 1f,
        1f, 1f, State.SUCCEEDED, JobPriority.HIGH, "tmp-user", "tmp-jobname",
        "tmp-queue", "tmp-jobfile", "tmp-url", true);

    doAnswer(
        new Answer<TaskCompletionEvent[]>() {
            @Override
            public TaskCompletionEvent[] answer(InvocationOnMock invocation)
                throws Throwable {
                return new TaskCompletionEvent[0];
            }
        }
    ).when(job).getTaskCompletionEvents(anyInt(), anyInt());

    doReturn(new TaskReport[5]).when(job).getTaskReports(isA(TaskType.class));
    when(clientProtocol.getJobStatus(any(JobID.class))).thenReturn(jobStatus_1, jobStatus_2);

    // setup the logger to capture all logs
    Layout layout =
        Logger.getRootLogger().getAppender("stdout").getLayout();
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    WriterAppender appender = new WriterAppender(layout, os);
    appender.setThreshold(Level.ALL);
    Logger qlogger = Logger.getLogger(Job.class);
    qlogger.addAppender(appender);
    try {
        job.monitorAndPrintJob();
    } finally {
        // Detach even when monitoring throws, so the appender does not leak into other tests.
        qlogger.removeAppender(appender);
    }

    LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
    String line;
    boolean foundHundred = false;
    boolean foundComplete = false;
    boolean foundUber = false;
    String uberModeMatch = "uber mode : true";
    String progressMatch = "map 100% reduce 100%";
    String completionMatch = "completed successfully";
    while ((line = r.readLine()) != null) {
        if (line.contains(uberModeMatch)) {
            foundUber = true;
        }
        foundHundred = line.contains(progressMatch);
        if (foundHundred) {
            break;
        }
    }
    // The completion message is expected on the line right after the 100% progress line.
    // Guard against end of input so a missing line fails an assertion instead of an NPE.
    line = r.readLine();
    foundComplete = line != null && line.contains(completionMatch);
    assertTrue(foundUber);
    assertTrue(foundHundred);
    assertTrue(foundComplete);

    System.out.println("The output of job.toString() is : \n" + job.toString());
    assertTrue(job.toString().contains("Number of maps: 5\n"));
    assertTrue(job.toString().contains("Number of reduces: 5\n"));
}
/**
 * Starts capturing the output of {@code logger} into {@code sw}.
 *
 * <p>The layout is taken from the root logger's "stdout" appender, falling back to its
 * "console" appender and finally to a default {@code PatternLayout}, so construction
 * no longer fails with a NullPointerException when no "stdout" appender is configured.
 *
 * @param logger the logger whose events are captured
 */
private LogCapturer(Logger logger) {
    this.logger = logger;
    org.apache.log4j.Appender defaultAppender = Logger.getRootLogger().getAppender("stdout");
    if (defaultAppender == null) {
        defaultAppender = Logger.getRootLogger().getAppender("console");
    }
    Layout layout = (defaultAppender == null)
        ? new org.apache.log4j.PatternLayout()
        : defaultAppender.getLayout();
    WriterAppender wa = new WriterAppender(layout, sw);
    logger.addAppender(wa);
}
/**
 * Returns the appender held in {@code graphLogAppender}.
 *
 * @return the current value of {@code graphLogAppender}
 */
public WriterAppender getGraphLogAppender() {
    return this.graphLogAppender;
}