下面列出了怎么用org.apache.commons.lang3.tuple.MutablePair的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void process(HashMap<String, Number> pieNumbers)
{
@SuppressWarnings("unchecked")
HashMap<String, Object>[] result = (HashMap<String, Object>[])Array.newInstance(HashMap.class, pieNumbers.size());
int j = 0;
for (Entry<String, Number> e : pieNumbers.entrySet()) {
result[j] = new HashMap<>();
result[j].put("label", e.getKey());
result[j++].put("value", e.getValue());
}
if (operator.isWebSocketConnected) {
HashMap<String, Object> schemaObj = new HashMap<>();
schemaObj.put("type", "piechart");
schemaObj.put("n", operator.nInPie);
operator.wsoo.input.process(new MutablePair<String, Object>(operator.getFullTopic(operator.pieChartTopic, schemaObj), result));
} else {
operator.coo.input.process(pieNumbers);
}
}
/**
* Given a call graph and a CHA it creates a map of external calls and their call type. This map
* indicates the source methods by their unique within artifact id existing in the cha, target
* methods by their {@link FastenURI}, and a map that indicates the call type.
* @param cg {@link ComputedCallGraph}
* @param cha A Map of {@link ObjectType} and {@link RevisionCallGraph.Type}
* @return A map that each each entry of it is a {@link Pair} of source method's id, and target
* method's {@link FastenURI} as key and a map that shows call types as value. call types
* map's key is the name of JVM call type and the value is number of invocation by this call
* type for this specific edge.
*/
private Map<Pair<Integer, FastenURI>, Map<String, String>> getExternalCalls(
final ComputedCallGraph cg,
final Map<ObjectType, OPALType> cha) {
List<UnresolvedMethodCall> v = new ArrayList<>();
final var externlCalls = cg.unresolvedMethodCalls();
final Map<Pair<Integer, FastenURI>, Map<String, String>> result = new HashMap<>();
for (final var externalCall : JavaConverters.asJavaIterable(externlCalls)) {
final var call = new MutablePair<>(
cha.get(externalCall.caller().declaringClassFile().thisType()).getMethods()
.get(externalCall.caller()),
getTargetURI(externalCall));
final var typeOfCall =
externalCall.caller().instructionsOption().get()[externalCall.pc()].mnemonic();
putCall(result, call, typeOfCall);
}
return result;
}
/**
* Creates {@link Graph} for the given JSONObject.
* @param graph JSONObject of a graph including its internal calls and external calls.
*/
public Graph(final JSONObject graph) {
final var internalCalls = graph.getJSONArray("internalCalls");
this.internalCalls = new ArrayList<>();
final int numberOfArcs = internalCalls.length();
for (int i = 0; i < numberOfArcs; i++) {
final var pair = internalCalls.getJSONArray(i);
this.internalCalls.add(Arrays.asList((Integer) pair.get(0), (Integer) pair.get(1)));
}
final var externalCalls = graph.getJSONArray("externalCalls");
this.externalCalls = new HashMap<>();
final int numberOfExternalArcs = externalCalls.length();
for (int i = 0; i < numberOfExternalArcs; i++) {
final var call = externalCalls.getJSONArray(i);
final var callTypeJson = call.getJSONObject(2);
final Map<String, String> callType = new HashMap<>();
for (final var type : callTypeJson.keySet()) {
final String number = callTypeJson.getString(type);
callType.put(type, number);
}
this.externalCalls.put(new MutablePair<>(Integer.parseInt(call.getString(0)),
FastenURI.create(call.getString(1))), callType);
}
}
/**
* Issue requests to AM RM Client again if previous container requests expired and were not allocated by Yarn
* @param amRmClient
* @param requestedResources
* @param loopCounter
* @param resourceRequestor
* @param containerRequests
* @param removedContainerRequests
*/
public void reissueContainerRequests(AMRMClient<ContainerRequest> amRmClient, Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests)
{
if (!requestedResources.isEmpty()) {
for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) {
/*
* Create container requests again if pending requests were not allocated by Yarn till timeout.
*/
if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) {
StreamingContainerAgent.ContainerStartRequest csr = entry.getKey();
LOG.debug("Request for container {} timed out. Re-requesting container", csr.container);
removedContainerRequests.add(entry.getValue().getRight());
ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false);
entry.getValue().setLeft(loopCounter);
entry.getValue().setRight(cr);
containerRequests.add(cr);
}
}
}
}
/**
* List list.
*
* @return the list
*/
public List<Deque<MutablePair<DeltaType, KubernetesObject>>> list() {
lock.readLock().lock();
List<Deque<MutablePair<DeltaType, KubernetesObject>>> objects = new ArrayList<>();
try {
// TODO: make a generic deep copy utility
for (Map.Entry<String, Deque<MutablePair<DeltaType, KubernetesObject>>> entry :
items.entrySet()) {
Deque<MutablePair<DeltaType, KubernetesObject>> copiedDeltas =
new LinkedList<>(entry.getValue());
objects.add(copiedDeltas);
}
} finally {
lock.readLock().unlock();
}
return objects;
}
public static Pair<int[],byte[]> getNumTandemRepeatUnits(final byte[] refBases, final byte[] altBases, final byte[] remainingRefContext) {
/* we can't exactly apply same logic as in basesAreRepeated() to compute tandem unit and number of repeated units.
Consider case where ref =ATATAT and we have an insertion of ATAT. Natural description is (AT)3 -> (AT)2.
*/
byte[] longB;
// find first repeat unit based on either ref or alt, whichever is longer
if (altBases.length > refBases.length)
longB = altBases;
else
longB = refBases;
// see if non-null allele (either ref or alt, whichever is longer) can be decomposed into several identical tandem units
// for example, -*,CACA needs to first be decomposed into (CA)2
final int repeatUnitLength = findRepeatedSubstring(longB);
final byte[] repeatUnit = Arrays.copyOf(longB, repeatUnitLength);
final int[] repetitionCount = new int[2];
// look for repetitions forward on the ref bases (i.e. starting at beginning of ref bases)
int repetitionsInRef = findNumberOfRepetitions(repeatUnit, refBases, true);
repetitionCount[0] = findNumberOfRepetitions(repeatUnit, ArrayUtils.addAll(refBases, remainingRefContext), true)-repetitionsInRef;
repetitionCount[1] = findNumberOfRepetitions(repeatUnit, ArrayUtils.addAll(altBases, remainingRefContext), true)-repetitionsInRef;
return new MutablePair<>(repetitionCount, repeatUnit);
}
/**
* Return the longest suffix of bases shared among all provided vertices
*
* For example, if the vertices have sequences AC, CC, and ATC, this would return
* a single C. However, for ACC and TCC this would return CC. And for AC and TG this
* would return null;
*
* @param middleVertices a non-empty set of vertices
* @return
*/
@VisibleForTesting
static Pair<SeqVertex, SeqVertex> commonPrefixAndSuffixOfVertices(final Collection<SeqVertex> middleVertices) {
final List<byte[]> kmers = new ArrayList<>(middleVertices.size());
int min = Integer.MAX_VALUE;
for ( final SeqVertex v : middleVertices ) {
kmers.add(v.getSequence());
min = Math.min(min, v.getSequence().length);
}
final int prefixLen = GraphUtils.commonMaximumPrefixLength(kmers);
final int suffixLen = GraphUtils.commonMaximumSuffixLength(kmers, min - prefixLen);
final byte[] kmer = kmers.get(0);
final byte[] prefix = Arrays.copyOfRange(kmer, 0, prefixLen);
final byte[] suffix = Arrays.copyOfRange(kmer, kmer.length - suffixLen, kmer.length);
return new MutablePair<>(new SeqVertex(prefix), new SeqVertex(suffix));
}
@Test
public void testDeltaFIFOResync() {
V1Pod foo1 = new V1Pod().metadata(new V1ObjectMeta().name("foo1").namespace("default"));
Cache cache = new Cache();
DeltaFIFO deltaFIFO = new DeltaFIFO(Caches::deletionHandlingMetaNamespaceKeyFunc, cache);
// sync after add
cache.add(foo1);
deltaFIFO.resync();
Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>> deltas =
deltaFIFO.getItems().get(Caches.deletionHandlingMetaNamespaceKeyFunc(foo1));
assertEquals(1, deltas.size());
assertEquals(foo1, deltas.peekLast().getRight());
assertEquals(DeltaFIFO.DeltaType.Sync, deltas.peekLast().getLeft());
}
@Override
protected Serializable onApplyInternal(final SchemaTO modelObject) {
modelObject.getLabels().clear();
modelObject.getLabels().putAll(translations.getObject().stream().
filter(Objects::nonNull).
filter(translation -> translation.getKey() != null).
filter(translation -> translation.getValue() != null).
collect(Collectors.toMap(MutablePair::getKey, MutablePair::getValue)));
if (getOriginalItem() == null || StringUtils.isBlank(getOriginalItem().getKey())) {
SchemaRestClient.create(schemaType, modelObject);
} else {
SchemaRestClient.update(schemaType, modelObject);
}
return null;
}
/**
* Implement InputOperator Interface.
*/
@Override
public void emitTuples()
{
if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
return;
}
int count = consumer.getQueueSize();
if (maxTuplesPerWindow > 0) {
count = Math.min(count, maxTuplesPerWindow - emitCount);
}
for (int i = 0; i < count; i++) {
Pair<String, Record> data = consumer.pollRecord();
String shardId = data.getFirst();
String recordId = data.getSecond().getSequenceNumber();
emitTuple(data);
MutablePair<String, Integer> shardOffsetAndCount = currentWindowRecoveryState.get(shardId);
if (shardOffsetAndCount == null) {
currentWindowRecoveryState.put(shardId, new MutablePair<String, Integer>(recordId, 1));
} else {
shardOffsetAndCount.setRight(shardOffsetAndCount.right + 1);
}
shardPosition.put(shardId, recordId);
}
emitCount += count;
}
public static MutablePair<Long, Long> getProcNetDevStats() throws Exception {
ProcessBuilder ps = new ProcessBuilder("cat", "/proc/net/dev");
Process pr = ps.start();
pr.waitFor();
BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream()));
String line;
int counter = 0;
long receivedBytes = 0;
long outBytes = 0;
while ((line = in.readLine()) != null) {
System.out.println(counter + ": " + line);
if (line.contains("eth0")) {
String[] strs = line.split(" ");
receivedBytes = Long.parseLong(strs[3]);
outBytes = Long.parseLong(strs[41]);
System.out.println(" inBytes = " + receivedBytes + " outBytes = " + outBytes);
}
counter++;
}
in.close();
MutablePair<Long, Long> result = new MutablePair<>(receivedBytes, outBytes);
return result;
}
/**
* Find the 0-based index within a read base array corresponding to a given 1-based position in the reference, along with the cigar operator of
* the element containing that base. If the reference coordinate occurs within a deletion, the first index after the deletion is returned.
* Note that this treats soft-clipped bases as if they align with the reference, which is useful for hard-clipping reads with soft clips.
*
* @param alignmentStart The soft start of the read on the reference
* @param cigar The read's cigar
* @param refCoord The target reference coordinate
* @return If the reference coordinate occurs before the read start or after the read end {@code CLIPPING_GOAL_NOT_REACHED};
* if the reference coordinate falls within an alignment block of the read's cigar, the corresponding read coordinate;
* if the reference coordinate falls within a deletion, the first read coordinate after the deletion. Note: if the last cigar element is
* a deletion (which isn't meaningful), it returns {@code CLIPPING_GOAL_NOT_REACHED}.
*/
public static Pair<Integer, CigarOperator> getReadIndexForReferenceCoordinate(final int alignmentStart, final Cigar cigar, final int refCoord) {
if (refCoord < alignmentStart) {
return new MutablePair<>(READ_INDEX_NOT_FOUND, null);
}
int firstReadPosOfElement = 0; //inclusive
int firstRefPosOfElement = alignmentStart; //inclusive
int lastReadPosOfElement = 0; //exclusive
int lastRefPosOfElement = alignmentStart; //exclusive
// advance forward through all the cigar elements until we bracket the reference coordinate
for (final CigarElement element : cigar) {
final CigarOperator operator = element.getOperator();
firstReadPosOfElement = lastReadPosOfElement;
firstRefPosOfElement = lastRefPosOfElement;
lastReadPosOfElement += operator.consumesReadBases() ? element.getLength() : 0;
lastRefPosOfElement += operator.consumesReferenceBases() || operator == CigarOperator.S ? element.getLength() : 0;
if (firstRefPosOfElement <= refCoord && refCoord < lastRefPosOfElement) { // refCoord falls within this cigar element
final int readPosAtRefCoord = firstReadPosOfElement + (operator.consumesReadBases() ? ( refCoord - firstRefPosOfElement) : 0);
return Pair.of(readPosAtRefCoord, operator);
}
}
return new MutablePair<>(READ_INDEX_NOT_FOUND, null);
}
@Override
public void process(TimeSeriesData[] tuple)
{
@SuppressWarnings({"unchecked", "rawtypes"})
HashMap<String, Number>[] timeseriesMapData = new HashMap[tuple.length];
int i = 0;
for (TimeSeriesData data : tuple) {
HashMap<String, Number> timeseriesMap = Maps.newHashMapWithExpectedSize(2);
timeseriesMap.put("timestamp", data.time);
timeseriesMap.put("value", data.data);
timeseriesMapData[i++] = timeseriesMap;
}
if (operator.isWebSocketConnected) {
HashMap<String, Object> schemaObj = new HashMap<>();
schemaObj.put("type", "timeseries");
schemaObj.put("minValue", operator.timeSeriesMin);
schemaObj.put("maxValue", operator.timeSeriesMax);
operator.wsoo.input.process(new MutablePair<String, Object>(operator.getFullTopic( operator.timeSeriesTopic, schemaObj), timeseriesMapData));
} else {
operator.coo.input.process(tuple);
}
}
@Override
public void trim()
{
for (Iterator<Entry<String, MutablePair<String, CommentsNodeImpl>>> iterator = this.dataMap.entrySet().iterator(); iterator.hasNext(); )
{
Entry<String, MutablePair<String, CommentsNodeImpl>> entry = iterator.next();
MutablePair<String, CommentsNodeImpl> value = entry.getValue();
CommentsNodeImpl right = value.getRight();
if (right != null)
{
right.trim();
}
if (((right == null) || right.dataMap.isEmpty()) && (value.getLeft() == null))
{
iterator.remove();
continue;
}
if (right == null)
{
continue;
}
right.trim();
}
}
@Override
@Nullable
public String getComment(String path)
{
MutablePair<String, CommentsNodeImpl> nodePair = this.dataMap.get(path);
if (nodePair != null)
{
String comment = nodePair.getLeft();
if (comment != null)
{
return comment;
}
}
MutablePair<String, CommentsNodeImpl> anyNodePair = this.dataMap.get(ANY);
if (anyNodePair != null)
{
return anyNodePair.getKey();
}
return null;
}
@Override
public CommentsNodeImpl getNode(String path)
{
MutablePair<String, CommentsNodeImpl> nodePair = this.dataMap.get(path);
CommentsNodeImpl node = (nodePair == null) ? null : nodePair.getRight();
if (node == null)
{
MutablePair<String, CommentsNodeImpl> anyNodePair = this.dataMap.get(ANY);
node = (anyNodePair == null) ? null : anyNodePair.getRight();
if (node == null)
{
CommentsNodeImpl commentsNode = new CommentsNodeImpl(this);
if (nodePair != null)
{
nodePair.setRight(commentsNode);
}
else
{
this.dataMap.put(path, new MutablePair<>(null, commentsNode));
}
return commentsNode;
}
return node;
}
return node;
}
private LSPServerStatusWidget(LanguageServerWrapper wrapper) {
this.wrapper = wrapper;
this.ext = wrapper.getServerDefinition().ext;
this.project = wrapper.getProject();
this.projectName = project.getName();
this.icons = GUIUtils.getIconProviderFor(wrapper.getServerDefinition()).getStatusIcons();
for (Timeouts t : Timeouts.values()) {
timeouts.put(t, new MutablePair<>(0, 0));
}
}
@Test
public void shouldCopyConstructorWorkProperly() {
int numberOfObjectives = 2;
int numberOfConstraints = 1;
DoubleSolution doubleSolution =
new DefaultDoubleSolution(
Arrays.asList(new MutablePair<>(3.0, 5.0), new MutablePair<>(1.0, 3.0)),
numberOfObjectives,
numberOfConstraints);
IntegerSolution integerSolution =
new DefaultIntegerSolution(
Arrays.asList(new MutablePair<>(2, 10)), numberOfObjectives, numberOfConstraints);
CompositeSolution solution =
new CompositeSolution(Arrays.asList(doubleSolution, integerSolution));
CompositeSolution newSolution = new CompositeSolution(solution) ;
assertEquals(solution.getNumberOfVariables(), newSolution.getNumberOfVariables());
assertEquals(solution.getNumberOfObjectives(), newSolution.getNumberOfObjectives());
assertEquals(solution.getNumberOfConstraints(), newSolution.getNumberOfConstraints());
assertEquals(solution.getVariable(0).getNumberOfVariables(), newSolution.getVariable(0).getNumberOfVariables());
assertEquals(solution.getVariable(1).getNumberOfVariables(), newSolution.getVariable(1).getNumberOfVariables());
assertNotSame(solution.getVariable(0), newSolution.getVariable(0));
assertNotSame(solution.getVariable(1), newSolution.getVariable(1));
assertEquals(solution.getVariable(0).getVariables(), newSolution.getVariable(0).getVariables());
assertEquals(solution.getVariable(1).getVariables(), newSolution.getVariable(1).getVariables());
}
private void recreateContainerRequest(Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List<ContainerRequest> removedContainerRequests)
{
for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) {
if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) {
StreamingContainerAgent.ContainerStartRequest csr = entry.getKey();
removedContainerRequests.add(entry.getValue().getRight());
ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false);
if (cr.getNodes() != null && !cr.getNodes().isEmpty()) {
addHostSpecificRequest(csr, cr);
} else {
otherContainerRequests.put(cr, csr);
}
}
}
}
@Override
public void addContainerRequest(Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, List<ContainerRequest> containerRequests, StreamingContainerAgent.ContainerStartRequest csr, ContainerRequest cr)
{
if (cr.getNodes() != null && !cr.getNodes().isEmpty()) {
// Put it in a Map to check if multiple requests can be combined
addHostSpecificRequest(csr, cr);
} else {
LOG.info("No node specific request ", cr);
otherContainerRequests.put(cr, csr);
}
}
@Override
public void process(AbstractFileSplitter.FileMetadata fileMetadata)
{
blockInfo.clear();
long[] blocks = fileMetadata.getBlockIds();
String relativePath = fileMetadata.getRelativePath();
for (int i = 0; i < blocks.length; i++) {
blockInfo.put(blocks[i], new MutablePair<>(i + 1, relativePath));
}
}
@Override
public void process(Object tuple)
{
if (operator.isWebSocketConnected) {
HashMap<String, Object> schemaObj = new HashMap<String, Object>();
schemaObj.put("type", "simple");
operator.wsoo.input.process(new MutablePair<String, Object>(operator.getFullTopic(operator.simpleTopic, schemaObj), tuple.toString()));
} else {
operator.coo.input.process(tuple);
}
}
private static boolean resolve(Map<Pair<Integer, FastenURI>, Map<String, String>> result,
Map.Entry<Pair<Integer, FastenURI>, Map<String, String>> arc,
List<FastenURI> methods,
String product) {
for (final var method : methods) {
if (method.getEntity().contains(getSignature(arc.getKey().getValue().getEntity()))) {
result.put(new MutablePair<>(arc.getKey().getLeft(),
new FastenJavaURI("//" + product + method)), arc.getValue());
return true;
}
}
return false;
}
/**
* Performs a federated binary aggregation (currently only MV and VM is supported).
*
* @param mo1 the first matrix object
* @param mo2 the other matrix object
* @param out output matrix object
*/
private static void federatedAggregateBinary(MatrixObject mo1, MatrixObject mo2, MatrixObject out) {
boolean distributeCols = false;
// if distributeCols = true we distribute cols of mo2 and do a MV multiplications, otherwise we
// distribute rows of mo1 and do VM multiplications
if (mo1.isFederated() && mo2.isFederated()) {
// both are federated -> distribute smaller matrix
// TODO do more in depth checks like: how many federated workers, how big is the actual data we send and so on
// maybe once we track number of non zeros we could use that to get a better estimation of how much data
// will be requested?
distributeCols = mo2.getNumColumns() * mo2.getNumRows() < mo1.getNumColumns() * mo1.getNumRows();
}
else if (mo2.isFederated() && !mo1.isFederated()) {
// Distribute mo1 which is not federated
distributeCols = true;
}
// TODO performance if both matrices are federated
Map<FederatedRange, FederatedData> mapping = distributeCols ? mo1.getFedMapping() : mo2.getFedMapping();
MatrixBlock matrixBlock = distributeCols ? mo2.acquireRead() : mo1.acquireRead();
ExecutorService pool = CommonThreadPool.get(mapping.size());
ArrayList<Pair<FederatedRange, MatrixBlock>> results = new ArrayList<>();
ArrayList<FederatedMMTask> tasks = new ArrayList<>();
for (Map.Entry<FederatedRange, FederatedData> fedMap : mapping.entrySet()) {
// this resultPair will contain both position of partial result and the partial result itself of the operations
MutablePair<FederatedRange, MatrixBlock> resultPair = new MutablePair<>();
// they all get references to the real block, the task slices out the needed part and does the
// multiplication, therefore they can share the object since we use it immutably
tasks.add(new FederatedMMTask(fedMap.getKey(), fedMap.getValue(), resultPair, matrixBlock, distributeCols));
results.add(resultPair);
}
CommonThreadPool.invokeAndShutdown(pool, tasks);
(distributeCols?mo2:mo1).release();
// combine results
if (mo1.getNumRows() > Integer.MAX_VALUE || mo2.getNumColumns() > Integer.MAX_VALUE) {
throw new DMLRuntimeException("Federated matrix is too large for federated distribution");
}
out.acquireModify(combinePartialMMResults(results, (int) mo1.getNumRows(), (int) mo2.getNumColumns()));
out.release();
}
public FederatedMMTask(FederatedRange range, FederatedData fedData,
MutablePair<FederatedRange, MatrixBlock> result, MatrixBlock otherMatrix, boolean distributeCols)
{
_range = range;
_data = fedData;
_result = result;
_otherMatrix = otherMatrix;
_distributeCols = distributeCols;
}
/**
* Determine the appropriate start and stop offsets in the reads for the bases given the cigar string
* @param read
* @return
*/
private final Pair<Integer,Integer> calculateQueryRange(final GATKRead read) {
int queryStart = -1, queryStop = -1;
int readI = 0;
// iterate over the cigar elements to determine the start and stop of the read bases for the BAQ calculation
for ( CigarElement elt : read.getCigarElements() ) {
switch (elt.getOperator()) {
case N: return null; // cannot handle these
case H : case P : case D: break; // ignore pads, hard clips, and deletions
case I : case S: case M: case EQ: case X:
int prev = readI;
readI += elt.getLength();
if ( elt.getOperator() != CigarOperator.S) {
if ( queryStart == -1 ) {
queryStart = prev;
}
queryStop = readI;
}
// in the else case we aren't including soft clipped bases, so we don't update
// queryStart or queryStop
break;
default: throw new GATKException("BUG: Unexpected CIGAR element " + elt + " in read " + read.getName());
}
}
if ( queryStop == queryStart ) {
// this read is completely clipped away, and yet is present in the file for some reason
// usually they are flagged as non-PF, but it's possible to push them through the BAM
//System.err.printf("WARNING -- read is completely clipped away: " + read.format());
return null;
}
return new MutablePair<>(queryStart, queryStop);
}
public static MutablePair<Text, Boolean> transportOneRecord(
Record record, char fieldDelimiter, List<Configuration> columnsConfiguration, TaskPluginCollector taskPluginCollector) {
MutablePair<List<Object>, Boolean> transportResultList = transportOneRecord(record,columnsConfiguration,taskPluginCollector);
//保存<转换后的数据,是否是脏数据>
MutablePair<Text, Boolean> transportResult = new MutablePair<Text, Boolean>();
transportResult.setRight(false);
if(null != transportResultList){
Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter));
transportResult.setRight(transportResultList.getRight());
transportResult.setLeft(recordResult);
}
return transportResult;
}
public AbstractKinesisInputOperator()
{
/*
* Application may override the windowDataManger behaviour but default
* would be NoopWindowDataManager.
*/
windowDataManager = new WindowDataManager.NoopWindowDataManager();
currentWindowRecoveryState = new HashMap<String, MutablePair<String, Integer>>();
}
private static Pair<String, String> getApiGroup(String name) {
MutablePair<String, String> parts = new MutablePair<>();
for (Map.Entry<String, String> entry : apiGroups.entrySet()) {
if (name.startsWith(entry.getKey())) {
parts.left = entry.getValue();
parts.right = name.substring(entry.getKey().length());
break;
}
}
if (parts.left == null) parts.right = name;
return parts;
}
public Controller(
Class<ApiType> apiTypeClass,
DeltaFIFO queue,
ListerWatcher<ApiType, ApiListType> listerWatcher,
Consumer<Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>> processFunc,
Supplier<Boolean> resyncFunc,
long fullResyncPeriod) {
this.queue = queue;
this.listerWatcher = listerWatcher;
this.apiTypeClass = apiTypeClass;
this.processFunc = processFunc;
this.resyncFunc = resyncFunc;
this.fullResyncPeriod = fullResyncPeriod;
// starts one daemon thread for reflector
this.reflectExecutor =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("controller-reflector-" + apiTypeClass.getName() + "-%d")
.build());
// starts one daemon thread for resync
this.resyncExecutor =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("controller-resync-" + apiTypeClass.getName() + "-%d")
.build());
}