下面列出了 java.util.concurrent.ConcurrentSkipListSet#first() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * <pre>
 * Trace the build-job state for the earliest (NOT ALL) segment of each streaming cube, and
 * 1. try to promote it into a ready HBase segment if the job's state is SUCCEED
 * 2. try to resume the build job if the job's state is ERROR (bounded by a retry limit)
 * </pre>
 *
 * <p>NOTE(review): despite the {@code @NonSideEffect} annotation, this method resumes jobs,
 * increments {@code retryCnt} and re-adds cubes to the check list — confirm what the
 * annotation is intended to guarantee.
 *
 * @return the build jobs found in SUCCEED state that were promoted during this pass
 * @throws StoreException if the metadata store fails while a job is being checked;
 *         the error is logged with the offending job and rethrown
 */
@NonSideEffect
List<SegmentJobBuildInfo> traceEarliestSegmentBuildJob() {
    // Maximum number of automatic resume attempts before handing a failed job to the admin.
    final int maxRetry = 5;
    List<SegmentJobBuildInfo> successJobs = Lists.newArrayList();
    for (Map.Entry<String, ConcurrentSkipListSet<SegmentJobBuildInfo>> entry :
            segmentBuildJobCheckList.entrySet()) {
        ConcurrentSkipListSet<SegmentJobBuildInfo> buildInfos = entry.getValue();
        if (buildInfos.isEmpty()) {
            logger.trace("Skip {}", entry.getKey());
            continue;
        }
        // Only the earliest segment's job is examined; later segments wait their turn.
        SegmentJobBuildInfo segmentBuildJob = buildInfos.first();
        logger.debug("Check the cube:{} segment:{} build status.", segmentBuildJob.cubeName,
                segmentBuildJob.segmentName);
        try {
            CubingJob cubingJob = (CubingJob) coordinator.getExecutableManager().getJob(segmentBuildJob.jobID);
            if (cubingJob == null) {
                // Metadata for the tracked job is gone; skip it rather than fail the whole scan.
                logger.error("Cannot find metadata of current job.");
                continue;
            }
            ExecutableState jobState = cubingJob.getStatus();
            logger.debug("Current job state {}", jobState);
            if (ExecutableState.SUCCEED.equals(jobState)) {
                // Work on a writable copy so the promotion does not mutate the cached instance.
                CubeManager cubeManager = coordinator.getCubeManager();
                CubeInstance cubeInstance = cubeManager.getCube(segmentBuildJob.cubeName).latestCopyForWrite();
                CubeSegment cubeSegment = cubeInstance.getSegment(segmentBuildJob.segmentName, null);
                logger.info("The cube:{} segment:{} is ready to be promoted.", segmentBuildJob.cubeName,
                        segmentBuildJob.segmentName);
                coordinator.getClusterManager().segmentBuildComplete(cubingJob, cubeInstance, cubeSegment,
                        segmentBuildJob);
                // Re-register the cube so the next earliest segment gets checked on a later pass.
                addToCheckList(cubeInstance.getName());
                successJobs.add(segmentBuildJob);
            } else if (ExecutableState.ERROR.equals(jobState)) {
                if (segmentBuildJob.retryCnt < maxRetry) {
                    logger.info("Job:{} is error, resume the job.", segmentBuildJob);
                    coordinator.getExecutableManager().resumeJob(segmentBuildJob.jobID);
                    segmentBuildJob.retryCnt++;
                } else {
                    // Out of automatic retries — manual intervention is required from here on.
                    logger.warn("Job:{} is error, exceed max retry. Kylin admin could resume it or discard it"
                            + " (to let a new building job be submitted).", segmentBuildJob);
                }
            }
        } catch (StoreException storeEx) {
            logger.error("Error when check streaming segment job build state:" + segmentBuildJob, storeEx);
            throw storeEx;
        }
    }
    return successJobs;
}
/**
 * <pre>
 * Trace the state of the build job for the earliest (NOT ALL) segment of every streaming
 * cube on the check list, and
 * 1. try to promote it into a ready HBase segment when the job has succeeded
 * 2. try to resume the build job when it has errored
 * </pre>
 *
 * @return the build jobs that were found succeeded (and promoted) in this pass
 */
@NonSideEffect
List<SegmentJobBuildInfo> traceEarliestSegmentBuildJob() {
    List<SegmentJobBuildInfo> promoted = Lists.newArrayList();
    for (Map.Entry<String, ConcurrentSkipListSet<SegmentJobBuildInfo>> cubeEntry :
            segmentBuildJobCheckList.entrySet()) {
        ConcurrentSkipListSet<SegmentJobBuildInfo> pending = cubeEntry.getValue();
        if (pending.isEmpty()) {
            logger.trace("Skip {}", cubeEntry.getKey());
            continue;
        }
        // The set is sorted, so first() yields the earliest segment's build job.
        SegmentJobBuildInfo earliest = pending.first();
        logger.debug("Check the cube:{} segment:{} build status.", earliest.cubeName,
                earliest.segmentName);
        try {
            CubingJob job = (CubingJob) coordinator.getExecutableManager().getJob(earliest.jobID);
            if (job == null) {
                logger.error("Cannot find metadata of current job.");
                continue;
            }
            ExecutableState state = job.getStatus();
            logger.debug("Current job state {}", state);
            if (state == ExecutableState.SUCCEED) {
                CubeInstance cube = coordinator.getCubeManager()
                        .getCube(earliest.cubeName)
                        .latestCopyForWrite();
                CubeSegment segment = cube.getSegment(earliest.segmentName, null);
                logger.info("The cube:{} segment:{} is ready to be promoted.", earliest.cubeName,
                        earliest.segmentName);
                coordinator.getClusterManager().segmentBuildComplete(job, cube, segment, earliest);
                addToCheckList(cube.getName());
                promoted.add(earliest);
            } else if (state == ExecutableState.ERROR) {
                if (earliest.retryCnt >= 5) {
                    logger.warn("Job:{} is error, exceed max retry. Kylin admin could resume it or discard it"
                            + "(to let new building job be sumbitted) .", earliest);
                } else {
                    logger.info("Job:{} is error, resume the job.", earliest);
                    coordinator.getExecutableManager().resumeJob(earliest.jobID);
                    earliest.retryCnt++;
                }
            }
        } catch (StoreException storeEx) {
            logger.error("Error when check streaming segment job build state:" + earliest, storeEx);
            throw storeEx;
        }
    }
    return promoted;
}
/**
 * Returns the center (midpoint) of the value range spanned by the given set.
 *
 * <p>Bug fix: the previous implementation returned {@code (last - first) / 2}, which is the
 * half-<em>width</em> of the range, not its center — e.g. for {2.0, 6.0} it yielded 2.0
 * instead of 4.0. The midpoint of [first, last] is {@code first + (last - first) / 2}.
 *
 * @param set a non-empty sorted set; {@code first()}/{@code last()} throw
 *        {@link java.util.NoSuchElementException} when it is empty
 * @return the midpoint between the smallest and largest element of {@code set}
 */
private double getCenterFromSet( final ConcurrentSkipListSet<Double> set ) {
    final double first = set.first();
    final double last = set.last();
    return first + (last - first) / 2;
}