下面列出了 org.apache.commons.lang3.tuple.Pair#getKey() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Runs the initialized authenticators against the request, one after another, until one of
 * them reports that the chain must not continue.
 *
 * <p>Only responses flagged as mandatory update the overall outcome. Every response that is
 * not authenticated is collected and handed to {@code getError} to derive the error to report.
 *
 * @param messageContext The message to be authenticated
 * @return {@code true} if the authentication is successful (never returns {@code false})
 * @throws APISecurityException If an authentication failure or some other error occurs
 */
protected boolean isAuthenticate(MessageContext messageContext) throws APISecurityException {
    boolean mandatoryOutcome = false;
    final List<AuthenticationResponse> failedResponses = new ArrayList<>();
    for (Authenticator current : authenticators) {
        final AuthenticationResponse response = current.authenticate(messageContext);
        final boolean mandatory = response.isMandatoryAuthentication();
        final boolean passed = response.isAuthenticated();
        if (mandatory) {
            // Optional authenticators never change the overall status
            mandatoryOutcome = passed;
        }
        if (!passed) {
            failedResponses.add(response);
        }
        if (!response.isContinueToNextAuthenticator()) {
            break;
        }
    }
    if (mandatoryOutcome) {
        return true;
    }
    final Pair<Integer, String> error = getError(failedResponses);
    throw new APISecurityException(error.getKey(), error.getValue());
}
/**
 * Checks that {@code dataMigrator.fixStuckTickets} moves the ticket of a cancelled
 * reservation from PENDING to RELEASED.
 */
@Test
public void testFixStuckTickets() {
    TicketCategoryModification defaultCategory = new TicketCategoryModification(null, "default", AVAILABLE_SEATS,
        new DateTimeModification(LocalDate.now(), LocalTime.now()),
        new DateTimeModification(LocalDate.now(), LocalTime.now()),
        DESCRIPTION, BigDecimal.TEN, false, "", false, null, null, null, null, null, 0, null, null, AlfioMetadata.empty());
    Pair<Event, String> eventAndUser = initEvent(Collections.singletonList(defaultCategory));
    Event event = eventAndUser.getKey();

    // reserve a single ticket in the first (only) category
    TicketReservationModification singleTicket = new TicketReservationModification();
    singleTicket.setAmount(1);
    singleTicket.setTicketCategoryId(eventManager.loadTicketCategories(event).get(0).getId());
    TicketReservationWithOptionalCodeModification request =
        new TicketReservationWithOptionalCodeModification(singleTicket, Optional.empty());
    Date tomorrow = DateUtils.addDays(new Date(), 1);
    String reservationId = ticketReservationManager.createTicketReservation(
        event, Collections.singletonList(request), Collections.emptyList(), tomorrow, Optional.empty(), Locale.ENGLISH, false);

    //simulate the effect of a reservation cancellation after #392, as described in #391
    ticketReservationRepository.updateReservationStatus(reservationId, TicketReservation.TicketReservationStatus.CANCELLED.name());

    List<Ticket> reservedTickets = ticketRepository.findTicketsInReservation(reservationId);
    assertEquals(1, reservedTickets.size());
    String uuid = reservedTickets.get(0).getUuid();
    assertTrue(reservedTickets.stream().allMatch(ticket -> ticket.getStatus() == Ticket.TicketStatus.PENDING));

    dataMigrator.fixStuckTickets(event.getId());
    assertSame(Ticket.TicketStatus.RELEASED, ticketRepository.findByUUID(uuid).getStatus());
}
/**
 * Suspends the user identified by {@code key} when the first element returned by
 * {@code userDAO.enforcePolicies} is true.
 *
 * @return the suspension result paired with the second element returned by
 * {@code enforcePolicies}, or {@code null} when the user was not suspended
 */
@Override
public Pair<UserWorkflowResult<String>, Boolean> internalSuspend(
        final String key, final String updater, final String context) {
    final User user = userDAO.authFind(key);
    final Pair<Boolean, Boolean> enforce = userDAO.enforcePolicies(user);
    if (!enforce.getKey()) {
        // nothing to enforce: callers receive null
        return null;
    }

    LOG.debug("User {} {} is over the max failed logins", user.getKey(), user.getUsername());

    // reduce failed logins number to avoid multiple request
    user.setFailedLogins(user.getFailedLogins() - 1);

    // set suspended flag
    user.setSuspended(Boolean.TRUE);

    return Pair.of(doSuspend(user, updater, context), enforce.getValue());
}
/**
 * Checks that {@code dataMigrator.fillReservationsLanguage} leaves a reservation created
 * with {@link Locale#ENGLISH} with user language "en". The event is deleted afterwards.
 */
@Test
public void testUpdateTicketReservation() {
    TicketCategoryModification defaultCategory = new TicketCategoryModification(null, "default", AVAILABLE_SEATS,
        new DateTimeModification(LocalDate.now(), LocalTime.now()),
        new DateTimeModification(LocalDate.now(), LocalTime.now()),
        DESCRIPTION, BigDecimal.TEN, false, "", false, null, null, null, null, null, 0, null, null, AlfioMetadata.empty());
    Pair<Event, String> eventAndUser = initEvent(Collections.singletonList(defaultCategory));
    Event event = eventAndUser.getKey();
    try {
        TicketReservationModification singleTicket = new TicketReservationModification();
        singleTicket.setAmount(1);
        singleTicket.setTicketCategoryId(eventManager.loadTicketCategories(event).get(0).getId());
        TicketReservationWithOptionalCodeModification request =
            new TicketReservationWithOptionalCodeModification(singleTicket, Optional.empty());
        Date tomorrow = DateUtils.addDays(new Date(), 1);
        String reservationId = ticketReservationManager.createTicketReservation(
            event, Collections.singletonList(request), Collections.emptyList(), tomorrow, Optional.empty(), Locale.ENGLISH, false);

        dataMigrator.fillReservationsLanguage();

        TicketReservation stored = ticketReservationManager.findById(reservationId).get();
        assertEquals("en", stored.getUserLanguage());
    } finally {
        // always remove the event created for this test
        eventManager.deleteEvent(event.getId(), eventAndUser.getValue());
    }
}
/**
 * Checks that inserting two group members with the same value is rejected with the
 * "value.duplicate" error code and the offending value as description.
 */
@Test
public void testDuplicates() {
    TicketCategoryModification defaultCategory = new TicketCategoryModification(null, "default", 10,
        new DateTimeModification(LocalDate.now().plusDays(1), LocalTime.now()),
        new DateTimeModification(LocalDate.now().plusDays(2), LocalTime.now()),
        DESCRIPTION, BigDecimal.TEN, false, "", false, null, null, null, null, null, 0, null, null, AlfioMetadata.empty());
    Pair<Event, String> eventAndUser = initEvent(
        Collections.singletonList(defaultCategory), organizationRepository, userManager, eventManager, eventRepository);
    Event event = eventAndUser.getKey();

    Group group = groupManager.createNew("test", "This is a test", event.getOrganizationId());
    assertNotNull(group);

    LinkedGroupModification linkRequest = new LinkedGroupModification(
        null, group.getId(), event.getId(), null, LinkedGroup.Type.ONCE_PER_VALUE, LinkedGroup.MatchType.FULL, null);
    LinkedGroup link = groupManager.createLink(group.getId(), event.getId(), linkRequest);
    assertNotNull(link);

    // the same value twice: the insertion must fail
    Result<Integer> insertion = groupManager.insertMembers(group.getId(), Arrays.asList(
        new GroupMemberModification(null,"[email protected]", "description"),
        new GroupMemberModification(null,"[email protected]", "description")));
    assertFalse(insertion.isSuccess());
    assertEquals("value.duplicate", insertion.getFirstErrorOrNull().getCode());
    assertEquals("[email protected]", insertion.getFirstErrorOrNull().getDescription());
}
/**
 * Sorts {@code input} in place (via {@code sortInPlace}) and then merges consecutive entries
 * whose index names are equal into a single pair holding all of their values.
 *
 * <p>A new group starts only when the current entry has a non-null index and the previous
 * entry's index is either null or has a different name.
 *
 * @param input the (index, value) pairs to combine; reordered by this call
 * @return one pair per group, carrying the index of the group's last entry and its values
 */
public static <T> List<Pair<Index, List<T>>> combineByIndex(final List<Pair<Index, T>> input) {
    final List<Pair<Index, List<T>>> combined = new ArrayList<>();
    sortInPlace(input);
    List<T> currentGroup = new ArrayList<>();
    Pair<Index, T> previous = null;
    for (final Pair<Index, T> entry : input) {
        final Index currentIndex = entry.getKey();
        if ((previous != null) && (currentIndex != null)) {
            final Index previousIndex = previous.getKey();
            final boolean indexChanged =
                (previousIndex == null) || !previousIndex.getName().equals(currentIndex.getName());
            if (indexChanged) {
                // close the group collected so far and start a fresh one
                combined.add(Pair.of(previous.getLeft(), currentGroup));
                currentGroup = new ArrayList<>();
            }
        }
        currentGroup.add(entry.getValue());
        previous = entry;
    }
    if (previous != null) {
        // flush the trailing group
        combined.add(Pair.of(previous.getLeft(), currentGroup));
    }
    return combined;
}
/**
 * Single-block version of blockFinder. Can safely be called directly
 * for quick block check.
 *
 * <p>Does nothing while XRay is inactive or the block store is empty. A removed block is
 * dropped from the render list; an added block is appended to it only when the store has
 * an entry for its registry name and that entry is flagged as drawing.
 *
 * @param pos the BlockPos to check
 * @param state the current state of the block
 * @param add true if the block was added to world, false if it was removed
 */
public static void checkBlock(BlockPos pos, BlockState state, boolean add )
{
    if ( !Controller.isXRayActive() || Controller.getBlockStore().getStore().isEmpty() ) {
        return; // just pass
    }

    // If we're removing then remove :D
    if ( !add ) {
        Render.syncRenderList.remove( new RenderBlockProps(pos,0) );
        return;
    }

    ResourceLocation registryName = state.getBlock().getRegistryName();
    if ( registryName == null ) {
        return;
    }

    Pair<BlockData, UUID> storeEntry = Controller.getBlockStore().getStoreByReference(registryName.toString());
    if ( storeEntry == null ) {
        return;
    }
    BlockData blockData = storeEntry.getKey();
    if ( blockData == null || !blockData.isDrawing() ) {
        return;
    }

    // the block was added to the world, let's add it to the drawing buffer
    Render.syncRenderList.add( new RenderBlockProps(pos, blockData.getColor()) );
}
/**
 * Common utility to configure the columns of a GATKReportTable.
 *
 * <p>For the row identified by {@code primaryKey}, first writes the table name into the
 * column named after the table, then writes each stratification state into the column
 * named after its stratifier.
 *
 * @param table the report table to fill
 * @param primaryKey the key of the row to write to
 * @param stratsAndStates the stratifiers paired with the state to record for each
 * @throws GATKException if a stratifier name or a state is null
 */
private static void setStratificationColumns(final GATKReportTable table,
                                             final String primaryKey,
                                             final List<Pair<VariantStratifier, Object>> stratsAndStates) {
    final String tableName = table.getTableName();
    table.set(primaryKey, tableName, table.getTableName());

    for ( final Pair<VariantStratifier, Object> entry : stratsAndStates ) {
        final String column = entry.getKey().getName();
        final Object state = entry.getValue();
        final boolean incomplete = column == null || state == null;
        if ( incomplete ) {
            throw new GATKException("Unexpected null variant stratifier state at " + table + " key = " + primaryKey);
        }
        table.set(primaryKey, column, state);
    }
}
/**
 * Registers a heat frame cooling recipe after validating its arguments.
 *
 * @param input either an ItemStack or a Pair whose key is a String and whose value is an Integer
 * @param output the recipe result
 * @throws NullPointerException if {@code input} or {@code output} is null
 * @throws IllegalArgumentException if {@code input} has an unsupported type or shape
 */
@Override
public void registerHeatFrameCoolRecipe(Object input, ItemStack output){
    if (input == null) {
        throw new NullPointerException("Input can't be null!");
    }
    if (input instanceof Pair) {
        // raw Pair on purpose: the element types are only known after the checks below
        Pair candidate = (Pair) input;
        if (!(candidate.getKey() instanceof String)) {
            throw new IllegalArgumentException("Pair key needs to be a String (ore dict entry)");
        }
        if (!(candidate.getValue() instanceof Integer)) {
            throw new IllegalArgumentException("Value key needs to be an Integer (amount)");
        }
    } else if (!(input instanceof ItemStack)) {
        throw new IllegalArgumentException("Input needs to be of type ItemStack or (Apache's) Pair<String, Integer>. Violating object: " + input);
    }
    if (output == null) {
        throw new NullPointerException("Output can't be null!");
    }
    heatFrameCoolingRecipes.add(new ImmutablePair(input, output));
}
/**
 * Classifies the co-occurrences of the given CAS and updates them with the prediction.
 *
 * <p>The CAS is converted to SVM sentences via {@code JsreTrainAnnotator.getSvmSentences},
 * the sentences are mapped into the feature space and passed to {@code predict}. A
 * co-occurrence predicted with label 0 is removed from the indexes; any other label sets
 * its confidence to the predicted probability and marks it as having an interaction.
 *
 * @param jCas the CAS to process
 * @throws AnalysisEngineProcessException declared by the overridden method
 * @throws IllegalArgumentException if the number of predicted labels differs from the
 * number of co-occurrences
 */
@Override
public void process(JCas jCas) throws AnalysisEngineProcessException {

    // convert to SVM sentences
    Pair<List<Cooccurrence>, List<SentenceExample>> toSvm = JsreTrainAnnotator
            .getSvmSentences(jCas, brClass);
    ExampleSet inputSet = new SentenceSetCopy();
    for (SentenceExample se : toSvm.getRight()) {
        inputSet.add(se.s, se.classz, se.id);
    }

    // embed the input data into a feature space
    ExampleSet featureSpaceSet = mapping.map(inputSet, index);

    // predict
    Pair<List<Integer>, List<Double>> predicted = predict(featureSpaceSet,
            model);
    List<Integer> predictedLabels = predicted.getLeft();
    List<Double> predictedProbs = predicted.getRight();

    // remove if not predicted
    List<Cooccurrence> coocs = toSvm.getKey();
    // snapshot of the co-occurrences, taken before any of them is removed from the indexes
    Cooccurrence[] array = coocs.toArray(new Cooccurrence[coocs.size()]);
    Preconditions.checkArgument(predictedLabels.size() == coocs.size(),
            "pmid" + getHeaderDocId(jCas)
                    + " should have same # of elems, but was: coocs="
                    + coocs.size() + " and predictedLabels="
                    + predictedLabels.size());

    for (int i = 0; i < predictedLabels.size(); i++) {

        if (predictedLabels.get(i) == 0) {// 0=>no rel
            array[i].removeFromIndexes();
            // TODO array[i].setHasInteraction(false);
        } else {
            // Double#floatValue directly: no need for the deprecated Double constructor
            array[i].setConfidence(predictedProbs.get(i).floatValue());
            array[i].setHasInteraction(true);
        }
    }
}
/**
 * Creates an admin reservation for AVAILABLE_SEATS attendees in a category that does not
 * exist yet on an event with a single one-seat category, then confirms it, checking the
 * ticket counts and the special price statuses before and after the confirmation.
 */
@Test
public void testReserveFromNewCategory() throws Exception {
    TicketCategoryModification existingCategory = new TicketCategoryModification(null, "default", 1,
        new DateTimeModification(LocalDate.now(), LocalTime.now()),
        new DateTimeModification(LocalDate.now(), LocalTime.now()),
        DESCRIPTION, BigDecimal.TEN, false, "", true, null, null, null, null, null, 0, null, null, AlfioMetadata.empty());
    Pair<Event, String> eventAndUser = initEvent(
        Collections.singletonList(existingCategory), organizationRepository, userManager, eventManager, eventRepository);
    Event event = eventAndUser.getKey();
    String username = eventAndUser.getValue();

    // reservation request for a brand new category
    DateTimeModification expiration = DateTimeModification.fromZonedDateTime(ZonedDateTime.now().plusDays(1));
    CustomerData customer = new CustomerData("Integration", "Test", "[email protected]", "Billing Address", "reference", "en", "1234", "CH", null);
    Category newCategory = new Category(null, "name", new BigDecimal("100.00"));
    int attendees = AVAILABLE_SEATS;
    List<TicketsInfo> ticketsInfo = Collections.singletonList(
        new TicketsInfo(newCategory, generateAttendees(attendees), true, false));
    AdminReservationModification request =
        new AdminReservationModification(expiration, customer, ticketsInfo, "en", false, false, null, null);

    Result<Pair<TicketReservation, List<Ticket>>> creation =
        adminReservationManager.createReservation(request, event.getShortName(), username);
    assertTrue(creation.isSuccess());

    Pair<TicketReservation, List<Ticket>> data = creation.getData();
    List<Ticket> tickets = data.getRight();
    assertTrue(tickets.size() == attendees);
    TicketReservation reservation = data.getLeft();
    assertNotNull(reservation);

    int categoryId = tickets.get(0).getCategoryId();
    eventManager.getSingleEvent(event.getShortName(), username);
    assertEquals(attendees + 1, eventRepository.countExistingTickets(event.getId()).intValue());
    assertEquals(attendees, ticketRepository.findPendingTicketsInCategories(Collections.singletonList(categoryId)).size());
    TicketCategory createdCategory = ticketCategoryRepository.getByIdAndActive(categoryId, event.getId());
    assertEquals(createdCategory.getMaxTickets(), attendees);

    // before the confirmation every special price must be PENDING
    ticketCategoryRepository.findAllTicketCategories(event.getId()).forEach(
        cat -> assertTrue(specialPriceRepository.findAllByCategoryId(cat.getId()).stream()
            .allMatch(price -> price.getStatus() == SpecialPrice.Status.PENDING)));

    adminReservationManager.confirmReservation(event.getShortName(), reservation.getId(), username, EMPTY);

    // after the confirmation every special price must be TAKEN
    ticketCategoryRepository.findAllTicketCategories(event.getId()).forEach(
        cat -> assertTrue(specialPriceRepository.findAllByCategoryId(cat.getId()).stream()
            .allMatch(price -> price.getStatus() == SpecialPrice.Status.TAKEN)));
    assertFalse(ticketRepository.findAllReservationsConfirmedButNotAssignedForUpdate(event.getId()).contains(reservation.getId()));
}
/**
 * The key of the pair returned by {@code loader()} must equal the resource's own name.
 */
@Test
void loader_should_return_the_same_name_as_specified() {
    final FilterResource resource = FilterResource.absolute( "content.filter" );

    final Pair<String, FilterLoader> nameAndLoader = resource.loader();

    assertThat( nameAndLoader.getKey() ).isEqualTo( resource.getName() );
}
/**
 * Creates an entity via {@code createEntity} and sends its right element to the
 * "perfmon_metrics" topic.
 *
 * @param base value passed to {@code createEntity}
 * @param proceduer the Kafka producer used to send the record
 * @param stream passed through to {@code createEntity}
 * @param hostIndex passed through to {@code createEntity}
 * @return the key of the pair returned by {@code createEntity}
 */
private static long send_metric(long base, KafkaProducer<String, String> proceduer, String stream, int hostIndex) {
    final Pair<Long, String> entity = createEntity(base, stream, hostIndex);
    final long nextBase = entity.getKey();
    final ProducerRecord<String, String> metricRecord =
        new ProducerRecord<String, String>("perfmon_metrics", entity.getRight());
    proceduer.send(metricRecord);
    return nextBase;
}
/**
 * Ensures the minimal configuration and creates the event and user shared by the tests:
 * one "default" category whose two dates are yesterday and tomorrow.
 */
@Before
public void ensureConfiguration() {
    IntegrationTestUtil.ensureMinimalConfiguration(configurationRepository);

    TicketCategoryModification defaultCategory = new TicketCategoryModification(null, "default", AVAILABLE_SEATS,
        new DateTimeModification(LocalDate.now().minusDays(1), LocalTime.now()),
        new DateTimeModification(LocalDate.now().plusDays(1), LocalTime.now()),
        DESCRIPTION, BigDecimal.TEN, false, "", false, null, null, null, null, null, 0, null, null, AlfioMetadata.empty());
    Pair<Event, String> created = initEvent(
        Collections.singletonList(defaultCategory), organizationRepository, userManager, eventManager, eventRepository);

    event = created.getKey();
    // the user field holds the returned name with the "_owner" suffix appended
    user = created.getValue() + "_owner";
}
/**
 * Builds a checker from the given success and failure pairs.
 *
 * <p>The left and right elements of each argument are copied into this instance's own
 * pairs, and the attribute names are stored as a copy of the given array.
 *
 * @param successSolutionPair the pair whose elements are copied into the success pair
 * @param failureSolutionPair the pair whose elements are copied into the failure pair
 * @param eeAttributeNameList the attribute names to store
 */
public ElementCheckerImpl(Pair<TestSolution, String> successSolutionPair,
        Pair<TestSolution, String> failureSolutionPair,
        String... eeAttributeNameList) {
    // copy element by element into the pairs already held by this instance
    this.successSolutionPair.left = successSolutionPair.getKey();
    this.successSolutionPair.right = successSolutionPair.getValue();

    this.failureSolutionPair.left = failureSolutionPair.getKey();
    this.failureSolutionPair.right = failureSolutionPair.getValue();

    // defensive copy so later changes to the caller's array are not visible here
    this.eeAttributeNames = eeAttributeNameList.clone();
}
/**
 * Checks that a waiting queue subscription is refused once
 * STOP_WAITING_QUEUE_SUBSCRIPTIONS is set to "true" at event level.
 */
@Test
public void testSubscribeDenied() {
    List<TicketCategoryModification> categories = getPreSalesTicketCategoryModifications(false, AVAILABLE_SEATS, true, 10);
    Pair<Event, String> eventAndUser = initEvent(categories, organizationRepository, userManager, eventManager, eventRepository);
    Event event = eventAndUser.getKey();

    TicketCategory firstCategory = eventManager.loadTicketCategories(event)
        .stream()
        .filter(candidate -> candidate.getName().equals("defaultFirst"))
        .findFirst()
        .orElseThrow(IllegalStateException::new);
    configurationManager.saveCategoryConfiguration(
        firstCategory.getId(),
        event.getId(),
        Collections.singletonList(new ConfigurationModification(null, ConfigurationKeys.MAX_AMOUNT_OF_TICKETS_BY_RESERVATION.getValue(), "1")),
        eventAndUser.getRight()+"_owner");

    configurationManager.saveSystemConfiguration(ConfigurationKeys.ENABLE_PRE_REGISTRATION, "true");
    configurationManager.saveSystemConfiguration(ConfigurationKeys.ENABLE_WAITING_QUEUE, "true");
    configurationRepository.insertEventLevel(event.getOrganizationId(), event.getId(), ConfigurationKeys.STOP_WAITING_QUEUE_SUBSCRIPTIONS.name(), "true", "");

    //subscription should now be denied
    boolean subscribed = waitingQueueManager.subscribe(event, customerJohnDoe(event), "[email protected]", null, Locale.ENGLISH);
    assertFalse(subscribed);
}
/**
 * Verifies that the given instance cannot reach the internet by running a {@code curl}
 * command on it over SSH.
 *
 * @param instanceIP address of the instance to connect to
 * @throws TestFailException if the command exits with status 0 (the instance does have
 * outbound connectivity), or if the SSH session or command execution fails
 */
private void checkNoOutboundInternetTraffic(String instanceIP) {
    String checkInternetCommand = "curl --max-time 30 cloudera.com";
    try (SSHClient sshClient = createSshClient(instanceIP)) {
        Pair<Integer, String> cmdOut = execute(sshClient, checkInternetCommand);
        Log.log(LOGGER, format("Command exit status [%s] and result [%s].", cmdOut.getKey(), cmdOut.getValue()));
        if (cmdOut.getKey() == 0) {
            throw new TestFailException("Instance [" + instanceIP + "] has internet coonection but shouldn't have!");
        }
    } catch (TestFailException e) {
        // The connectivity verdict raised above must reach the caller unchanged; without this
        // clause the generic handler below would re-report it as an SSH failure.
        throw e;
    } catch (Exception e) {
        // pass the exception as the last argument so the root cause ends up in the log
        LOGGER.error("SSH fail on [{}] while executing command [{}]", instanceIP, checkInternetCommand, e);
        throw new TestFailException(" SSH fail on [" + instanceIP + "] while executing command [" + checkInternetCommand + "].");
    }
}
/**
* Makes the Job Update Tasks required to reconcile the current ingestion jobs with the given
* source to store mapping.
*
* <p>For every mapping a job is obtained from the grouping strategy and one task is emitted:
*
* <ul>
*   <li>deployed but not running: a status update task; the job is not recorded as active and
*       no feature sets are allocated to it in this pass;
*   <li>deployed, running and requiring an upgrade: the job is cloned under a new id with the
*       new stores and a create task is emitted for the clone;
*   <li>deployed and running otherwise: a status update task;
*   <li>not deployed: the job gets a new id and a create task is emitted.
* </ul>
*
* <p>Jobs returned by {@code getExtraJobs} are only terminated when no required job was found
* not running and no upgrade was started in this pass.
*
* @param sourceToStores an iterable of source to stores pairs for which ingestion jobs have to
* be maintained for ingestion to work correctly.
* @return list of job update tasks required to reconcile the current ingestion jobs to the state
* that is defined by sourceToStores.
*/
List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStores) {
List<JobTask> jobTasks = new LinkedList<>();
// Ensure a running job for each source to store mapping
List<Job> activeJobs = new LinkedList<>();
// Cleared as soon as a required job is found not running or an upgrade is started
boolean isSafeToStopJobs = true;
for (Pair<Source, Set<Store>> mapping : sourceToStores) {
Source source = mapping.getKey();
Set<Store> stores = mapping.getValue();
// Union of the feature sets of all stores in this mapping
Set<FeatureSet> featureSets =
stores.stream()
.flatMap(s -> getFeatureSetsForStore(s).stream())
.collect(Collectors.toSet());
Job job = groupingStrategy.getOrCreateJob(source, stores);
if (job.isDeployed()) {
if (!job.isRunning()) {
jobTasks.add(UpdateJobStatusTask.builder().setJob(job).setJobManager(jobManager).build());
// Mark that it is not safe to stop jobs without disrupting ingestion
isSafeToStopJobs = false;
// Skips feature set allocation and the active job record below
continue;
}
if (jobRequiresUpgrade(job, stores) && job.isRunning()) {
// Since we want to upgrade job without downtime
// it would make sense to spawn clone of current job
// and terminate old version on the next Poll.
// Both jobs should be in the same consumer group and not conflict with each other
job = job.clone();
job.setId(groupingStrategy.createJobId(job));
job.setStores(stores);
isSafeToStopJobs = false;
jobTasks.add(CreateJobTask.builder().setJob(job).setJobManager(jobManager).build());
} else {
jobTasks.add(UpdateJobStatusTask.builder().setJob(job).setJobManager(jobManager).build());
}
} else {
// Job is not deployed yet: assign it an id and schedule its creation
job.setId(groupingStrategy.createJobId(job));
jobTasks.add(CreateJobTask.builder().setJob(job).setJobManager(jobManager).build());
}
allocateFeatureSets(job, featureSets);
// Record the job as required to safeguard it from getting stopped
activeJobs.add(job);
}
// Stop extra jobs that are not required to maintain ingestion when safe
if (isSafeToStopJobs) {
getExtraJobs(activeJobs)
.forEach(
extraJob -> {
jobTasks.add(
TerminateJobTask.builder().setJob(extraJob).setJobManager(jobManager).build());
});
}
return jobTasks;
}
/**
* Retrieves the content of a single url, applies the given renderers to it and returns the
* response with the rendered content set as its entity. Automatically follows redirects, up to
* MAX_REDIRECTS of them.
*
* <p>The retrieved content and response are stored as an attribute of the incoming request,
* keyed by the target url, and reused when the same url is rendered again for that request.
*
* @param pageUrl
* Address of the page containing the template
* @param incomingRequest
* originating request object
* @param renderers
* the renderers to use in order to transform the output
* @return The resulting response
* @throws IOException
* If an IOException occurs while processing the request
* @throws HttpErrorPage
* If an Exception occurs while retrieving the template
*/
public CloseableHttpResponse render(String pageUrl, IncomingRequest incomingRequest, Renderer... renderers)
throws IOException, HttpErrorPage {
DriverRequest driverRequest = new DriverRequest(incomingRequest, this, pageUrl);
// Replace ESI variables in URL
// TODO: should be performed in the ESI extension
String resultingPageUrl = VariablesResolver.replaceAllVariables(pageUrl, driverRequest);
String targetUrl = ResourceUtils.getHttpUrlWithQueryString(resultingPageUrl, driverRequest, false);
String currentValue;
CloseableHttpResponse response;
// Retrieve URL
// Get from cache to prevent multiple request to the same url if
// multiple fragments are used.
String cacheKey = CACHE_RESPONSE_PREFIX + targetUrl;
Pair<String, CloseableHttpResponse> cachedValue = incomingRequest.getAttribute(cacheKey);
// content and response were not in cache
if (cachedValue == null) {
OutgoingRequest outgoingRequest = requestExecutor.createOutgoingRequest(driverRequest, targetUrl, false);
headerManager.copyHeaders(driverRequest, outgoingRequest);
response = requestExecutor.execute(outgoingRequest);
// Remaining number of redirects that may still be followed
int redirects = MAX_REDIRECTS;
try {
while (redirects > 0
&& this.redirectStrategy.isRedirected(outgoingRequest, response, outgoingRequest.getContext())) {
// Must consume the entity
EntityUtils.consumeQuietly(response.getEntity());
redirects--;
// Perform new request
outgoingRequest =
this.requestExecutor.createOutgoingRequest(
driverRequest,
this.redirectStrategy.getLocationURI(outgoingRequest, response,
outgoingRequest.getContext()).toString(), false);
this.headerManager.copyHeaders(driverRequest, outgoingRequest);
response = requestExecutor.execute(outgoingRequest);
}
} catch (ProtocolException e) {
// A redirect that cannot be interpreted is reported as a bad gateway
throw new HttpErrorPage(HttpStatus.SC_BAD_GATEWAY, "Invalid response from server", e);
}
response = this.headerManager.copyHeaders(outgoingRequest, incomingRequest, response);
currentValue = HttpResponseUtils.toString(response, this.eventManager);
// Cache
cachedValue = new ImmutablePair<>(currentValue, response);
incomingRequest.setAttribute(cacheKey, cachedValue);
}
// From here on always work with the cached content and response
currentValue = cachedValue.getKey();
response = cachedValue.getValue();
logAction("render", pageUrl, renderers);
// Apply renderers
currentValue = performRendering(pageUrl, driverRequest, response, currentValue, renderers);
// NOTE(review): this sets the entity on the same response object that is held in the cache,
// so a later call for the same url receives a response whose entity was already replaced
// (the cached content string itself is unchanged) - confirm this is intended
response.setEntity(new StringEntity(currentValue, HttpResponseUtils.getContentType(response)));
return response;
}
/**
* This method handles the onResourceCompleted callback from the RM. Based on the exit code of the
* resource status, it decides whether the container that exited is marked as complete or failure.
*
* <p>When no running processor is found for the container, the notification is counted as
* redundant and ignored; the resource is released if its exit code is not SUCCESS. Otherwise the
* processor is removed from the running processors, the exit code is handled, and a processor
* stop event is recorded if a diagnostics manager is defined.
*
* @param resourceStatus status of the resource that completed
*/
public void onResourceCompleted(SamzaResourceStatus resourceStatus) {
String containerId = resourceStatus.getContainerId();
Pair<String, String> runningProcessorIdHostname = getRunningProcessor(containerId);
String processorId = runningProcessorIdHostname.getKey();
String hostName = runningProcessorIdHostname.getValue();
if (processorId == null) {
LOG.info("No running Processor ID found for Container ID: {} with Status: {}. Ignoring redundant notification.", containerId, resourceStatus.toString());
state.redundantNotifications.incrementAndGet();
if (resourceStatus.getExitCode() != SamzaResourceStatus.SUCCESS) {
// the requested container failed before assigning the request to it.
// Remove from the buffer if it is there
containerAllocator.releaseResource(containerId);
}
return;
}
// The processor is no longer running, whatever the exit code turns out to be
state.runningProcessors.remove(processorId);
int exitStatus = resourceStatus.getExitCode();
switch (exitStatus) {
case SamzaResourceStatus.SUCCESS:
LOG.info("Container ID: {} for Processor ID: {} completed successfully.", containerId, processorId);
state.completedProcessors.incrementAndGet();
state.finishedProcessors.incrementAndGet();
processorFailures.remove(processorId);
// The job succeeds once the completed count equals the processor count
if (state.completedProcessors.get() == state.processorCount.get()) {
LOG.info("Setting job status to SUCCEEDED since all containers have been marked as completed.");
state.status = SamzaApplicationState.SamzaAppStatus.SUCCEEDED;
}
break;
case SamzaResourceStatus.DISK_FAIL:
case SamzaResourceStatus.ABORTED:
case SamzaResourceStatus.PREEMPTED:
// NOTE(review): the concatenated message below has no space between "RM." and "Requesting"
LOG.info("Container ID: {} for Processor ID: {} was released with an exit code: {}. This means that " +
"the container was killed by YARN, either due to being released by the application master " +
"or being 'lost' due to node failures etc. or due to preemption by the RM." +
"Requesting a new container for the processor.",
containerId, processorId, exitStatus);
state.releasedContainers.incrementAndGet();
// If this container was assigned some partitions (a processorId), then
// clean up, and request a new container for the processor. This only
// should happen if the container was 'lost' due to node failure, not
// if the AM released the container.
state.neededProcessors.incrementAndGet();
state.jobHealthy.set(false);
// handle container stop due to node fail
handleContainerStop(processorId, resourceStatus.getContainerId(), ResourceRequestState.ANY_HOST, exitStatus, Duration.ZERO);
break;
default:
// Every other exit code is delegated to the unknown-status handler
onResourceCompletedWithUnknownStatus(resourceStatus, containerId, processorId, exitStatus);
}
if (diagnosticsManager.isDefined()) {
diagnosticsManager.get().addProcessorStopEvent(processorId, resourceStatus.getContainerId(), hostName, exitStatus);
}
}