下面列出了 org.apache.commons.lang.mutable.MutableBoolean 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
public SelectionWorker(int start, int stop, final List<Campaign> list, final BidRequest br,
final boolean exchangeIsAdx, MutableBoolean flag, boolean test) {
if (test) {
logger.info("WORKER: {} - {}",start,stop);
}
this.test = test;
this.start = start;
this.stop = stop;
for (int i=start;i<stop;i++) {
this.list.add(list.get(i));
}
this.exchangeIsAdx = exchangeIsAdx;
count = 0;
this.br = br;
this.flag = flag;
}
/**
 * Builds the index query node for the given field sets.
 *
 * One node is created per resolved field (each combining that field with the
 * resolved terms). A single node is returned as-is; several nodes are joined with
 * OR or AND depending on the flag filled in by {@code fieldsAndTerms}.
 *
 * @param termFrequencyFields the term frequency fields
 * @param indexedFields       the indexed fields
 * @param contentFields       the content fields
 * @return the resulting query node
 */
public JexlNode getIndexQuery(Set<String> termFrequencyFields, Set<String> indexedFields, Set<String> contentFields) {
    // The helper reports through this flag whether the fields have to be OR'ed.
    final MutableBoolean oredFields = new MutableBoolean();
    final Set<String>[] fieldsAndTerms = fieldsAndTerms(termFrequencyFields, indexedFields, contentFields, oredFields, true);
    final Set<String> fields = fieldsAndTerms[0];

    final LinkedList<JexlNode> nodes = Lists.newLinkedList();
    if (!fields.isEmpty()) {
        final JexlNode eq = new ASTEQNode(ParserTreeConstants.JJTEQNODE);
        for (String field : fields) {
            nodes.add(JexlNodeFactory.createNodeTreeFromFieldValues(ContainerType.AND_NODE, eq, null, field, fieldsAndTerms[1]));
        }
    }

    // Exactly one field: its node is the whole query, no wrapper required.
    if (fields.size() == 1) {
        return nodes.iterator().next();
    }
    if (oredFields.booleanValue()) {
        return JexlNodeFactory.createOrNode(nodes);
    }
    return JexlNodeFactory.createAndNode(nodes);
}
/**
 * Runs the speculative-execution check on every stage of the earliest schedulable
 * group and triggers scheduling if at least one stage produced a new clone.
 *
 * Stages without a {@code ClonedSchedulingProperty}, or whose cloning is up-front,
 * are skipped.
 */
@Override
public void onSpeculativeExecutionCheck() {
  final MutableBoolean isNewCloneCreated = new MutableBoolean(false);

  BatchSchedulerUtils.selectEarliestSchedulableGroup(sortedScheduleGroups, planStateManager)
    .ifPresent(scheduleGroup ->
      scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
        final Stage stage = planStateManager.getPhysicalPlan().getStageDAG().getVertexById(stageId);

        // Only if the ClonedSchedulingProperty is set...
        stage.getPropertyValue(ClonedSchedulingProperty.class).ifPresent(cloneConf -> {
          // Upfront cloning is already handled.
          // The flag is only ever raised, never reset: a later stage that creates no
          // clone must not hide a clone created for an earlier stage.
          if (!cloneConf.isUpFrontCloning() && doSpeculativeExecution(stage, cloneConf)) {
            isNewCloneCreated.setValue(true);
          }
        });
      }));

  if (isNewCloneCreated.booleanValue()) {
    doSchedule(); // Do schedule the new clone.
  }
}
/**
 * Checks that a runnable which cancels itself from within {@code run()} is no longer
 * referenced by the cron service, i.e. that it gets finalized after the local
 * reference is dropped.
 */
@Test
public void testCancelledRunableGC() throws Exception {
    final MutableBoolean collected = new MutableBoolean();
    Runnable r = new Runnable() {
        @Override
        public void run() {
            // Cancel ourselves on first execution.
            cronService.cancel(this);
        }

        @Override
        public void finalize() throws Throwable {
            // Record that the garbage collector reclaimed this instance.
            collected.setValue(true);
            super.finalize();
        }
    };
    cronService.schedule(r, "0/2 * * * * ?");
    // Drop the only reference held by the test.
    r = null;
    // NOTE(review): presumably long enough for the schedule to fire at least once - confirm the unit of sleep().
    sleep(5);
    for (int i = 0; i < 100; i++) {
        System.gc();
    }
    // Expected value first, actual value second, so a failure message reads correctly.
    assertEquals(true, collected.booleanValue());
}
/**
 * Allocates a slot for the given datanode while holding the manager lock.
 *
 * The per-datanode {@code EndpointShmManager} is looked up in {@code datanodes},
 * created and registered on first use, and the allocation is delegated to it.
 *
 * @param datanode   the datanode to allocate a slot for
 * @param peer       the peer passed on to the endpoint manager
 * @param usedPeer   out parameter passed on to the endpoint manager
 * @param blockId    the block ID passed on to the endpoint manager
 * @param clientName the client name passed on to the endpoint manager
 * @return {@code null} if this manager is closed; otherwise whatever the endpoint
 *         manager returns
 * @throws IOException propagated from the endpoint manager
 */
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      // Build the message only when trace logging is on; we are holding the lock.
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": the DfsClientShmManager is closed.");
      }
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      // First request for this datanode: create and register its manager.
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
/**
 * Allocates a slot for the given datanode while holding the manager lock.
 *
 * The per-datanode {@code EndpointShmManager} is looked up in {@code datanodes},
 * created and registered on first use, and the allocation is delegated to it.
 *
 * @param datanode   the datanode to allocate a slot for
 * @param peer       the peer passed on to the endpoint manager
 * @param usedPeer   out parameter passed on to the endpoint manager
 * @param blockId    the block ID passed on to the endpoint manager
 * @param clientName the client name passed on to the endpoint manager
 * @return {@code null} if this manager is closed; otherwise whatever the endpoint
 *         manager returns
 * @throws IOException propagated from the endpoint manager
 */
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      // Build the message only when trace logging is on; we are holding the lock.
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": the DfsClientShmManager is closed.");
      }
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      // First request for this datanode: create and register its manager.
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
/**
 * Erases the base-table alias from a WHERE expression. When the query has joins,
 * the expression is first reduced to the part the match/lookup visitor stores for
 * the base table.
 *
 * @param exp     the WHERE expression; may be {@code null}
 * @param tholder holder supplying the base table alias
 * @return the processed expression, or {@code null} when {@code exp} is
 *         {@code null}, when joins are present and the expression contains an OR,
 *         or when the visitor stored no expression for the base table
 */
private Expression preprocessWhere(Expression exp, FromHolder tholder){
    // Nothing to process; also avoids dereferencing a null expression in the join branch.
    if(exp == null) {
        return null;
    }
    if(sqlCommandInfoHolder.getJoins()!=null && !sqlCommandInfoHolder.getJoins().isEmpty()) {
        ExpressionHolder partialWhereExpHolder = new ExpressionHolder(null);
        MutableBoolean haveOrExpression = new MutableBoolean(false);
        exp.accept(new WhereVisitorMatchAndLookupPipelineMatchBuilder(tholder.getBaseAliasTable(), partialWhereExpHolder, haveOrExpression));
        if(haveOrExpression.booleanValue()) {
            return null;//with or exp we can't use match first step
        }
        // Continue with only the part collected for the base table (may be null).
        exp = partialWhereExpHolder.getExpression();
    }
    if(exp != null) {
        exp.accept(new ExpVisitorEraseAliasTableBaseBuilder(tholder.getBaseAliasTable()));
    }
    return exp;
}
/**
 * Collects a frame block from the given RDD handle.
 *
 * Note that reading from an RDD might trigger lazy evaluation of pending
 * transformations. The status flag is always set to {@code false} (collect only).
 *
 * @param rdd    the RDD handle to read from
 * @param status out parameter, set to {@code false}
 * @return the collected frame block, never {@code null}
 * @throws IOException if collecting fails with a DMLRuntimeException or yields no block
 */
@Override
protected FrameBlock readBlobFromRDD(RDDObject rdd, MutableBoolean status)
    throws IOException
{
    // return status: collect only
    status.setValue(false);

    final MetaDataFormat format = (MetaDataFormat) _metaData;
    final DataCharacteristics dims = format.getDataCharacteristics();
    final int rlen = (int) dims.getRows();
    final int clen = (int) dims.getCols();

    // without an explicit schema, fall back to all-string columns (at least one)
    final ValueType[] lschema;
    if( _schema != null ) {
        lschema = _schema;
    }
    else {
        lschema = UtilFunctions.nCopies(Math.max(clen, 1), ValueType.STRING);
    }

    final FrameBlock result;
    try {
        // read from the lineage child where allowed, to prevent an unnecessary
        // collect through an rdd checkpoint
        RDDObject source = rdd;
        if( rdd.allowsShortCircuitCollect() ) {
            source = (RDDObject) rdd.getLineageChilds().get(0);
        }
        result = SparkExecutionContext.toFrameBlock(source, lschema, rlen, clen);
    }
    catch(DMLRuntimeException ex) {
        throw new IOException(ex);
    }

    // sanity check of the output
    if( result == null ) {
        throw new IOException("Unable to load frame from rdd.");
    }
    return result;
}
/**
 * Converts the RDD behind the given handle into a tensor block, using the tensor
 * characteristics from this object's metadata. The status flag is always set to
 * {@code false}.
 *
 * @param rdd    the RDD handle to read from
 * @param status out parameter, set to {@code false}
 * @return the tensor block produced by the Spark execution context
 */
@Override
@SuppressWarnings("unchecked")
protected TensorBlock readBlobFromRDD(RDDObject rdd, MutableBoolean status) {
    status.setValue(false);
    final TensorCharacteristics tensorDims = (TensorCharacteristics) _metaData.getDataCharacteristics();
    // TODO correct blocksize;
    // TODO read from RDD
    final JavaPairRDD<TensorIndexes, TensorBlock> blocks =
        (JavaPairRDD<TensorIndexes, TensorBlock>) rdd.getRDD();
    return SparkExecutionContext.toTensorBlock(blocks, tensorDims);
}
/**
 * Polls the cache (every 10 ms, for up to 60000 ms) until the slot of the replica
 * for {@code block} reports the expected anchorable/anchored state.
 *
 * Each poll also asserts that the number of outstanding mmaps matches and that the
 * replica is present in the cache.
 */
private void waitForReplicaAnchorStatus(final ShortCircuitCache cache,
    final ExtendedBlock block, final boolean expectedIsAnchorable,
    final boolean expectedIsAnchored, final int expectedOutstandingMmaps)
    throws Exception {
  final Supplier<Boolean> anchorStatusReached = new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      final MutableBoolean matched = new MutableBoolean(false);
      cache.accept(new CacheVisitor() {
        @Override
        public void visit(int numOutstandingMmaps,
            Map<ExtendedBlockId, ShortCircuitReplica> replicas,
            Map<ExtendedBlockId, InvalidToken> failedLoads,
            Map<Long, ShortCircuitReplica> evictable,
            Map<Long, ShortCircuitReplica> evictableMmapped) {
          Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
          ShortCircuitReplica replica =
              replicas.get(ExtendedBlockId.fromExtendedBlock(block));
          Assert.assertNotNull(replica);
          Slot slot = replica.getSlot();
          // Both properties as expected: done.
          if ((expectedIsAnchorable == slot.isAnchorable()) &&
              (expectedIsAnchored == slot.isAnchored())) {
            matched.setValue(true);
            return;
          }
          // Not there yet: report the current state and let the caller poll again.
          LOG.info("replica " + replica + " has isAnchorable = " +
              slot.isAnchorable() + ", isAnchored = " + slot.isAnchored() +
              ". Waiting for isAnchorable = " + expectedIsAnchorable +
              ", isAnchored = " + expectedIsAnchored);
        }
      });
      return matched.toBoolean();
    }
  };
  GenericTestUtils.waitFor(anchorStatusReached, 10, 60000);
}
/**
 * Fetches a replica, releases it, then keeps asking the cache for the same block
 * until the cache calls the supplied creator again, i.e. until the originally
 * cached replica is no longer served.
 */
@Test(timeout=60000)
public void testExpiry() throws Exception {
  final ShortCircuitCache cache =
      new ShortCircuitCache(2, 1, 1, 10000000, 1, 10000000, 0);
  final TestFileDescriptorPair pair = new TestFileDescriptorPair();

  // Populate the cache with one replica, verify its streams, then release it.
  ShortCircuitReplicaInfo initialInfo = cache.fetchOrCreate(
      new ExtendedBlockId(123, "test_bp1"),
      new SimpleReplicaCreator(123, cache, pair));
  Preconditions.checkNotNull(initialInfo.getReplica());
  Preconditions.checkState(initialInfo.getInvalidTokenException() == null);
  pair.compareWith(initialInfo.getReplica().getDataStream(),
      initialInfo.getReplica().getMetaStream());
  initialInfo.getReplica().unref();

  // Creator that only records that the cache asked for a new replica.
  final MutableBoolean triedToCreate = new MutableBoolean(false);
  final ShortCircuitReplicaCreator recordingCreator = new ShortCircuitReplicaCreator() {
    @Override
    public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
      triedToCreate.setValue(true);
      return null;
    }
  };

  while (true) {
    Thread.sleep(10);
    ShortCircuitReplicaInfo polledInfo = cache.fetchOrCreate(
        new ExtendedBlockId(123, "test_bp1"), recordingCreator);
    // Still served from the cache: give the reference back.
    if ((polledInfo != null) && (polledInfo.getReplica() != null)) {
      polledInfo.getReplica().unref();
    }
    if (!triedToCreate.isFalse()) {
      break;
    }
  }
  cache.close();
}
/**
 * Creates a token for {@code charString}, derives its spacing information from the
 * previous token, applies normalization if a normalized form is known, and appends
 * it to the group.
 *
 * @param group      the group receiving the new token (and a new-line marker if pending)
 * @param start      start offset of the token
 * @param end        end offset of the token
 * @param charString the token text
 * @param isNewLine  in/out flag; when set, a new line is registered at {@code start}
 *                   and the flag is cleared
 * @param lastToken  the previous token, updated with the new token; may be
 *                   {@code null}, in which case the new token is treated as having
 *                   no predecessor
 */
private void addToken(TokenGroup group, int start, int end, String charString, MutableBoolean isNewLine,
        Token lastToken) {
    Token token = new Token(start, end, charString);
    if (isNewLine.booleanValue()) {
        group.addNewLine(start);
        isNewLine.setValue(false);
    }

    // Without a previous token, use end offset 0 (the same value the spacing logic
    // below treats as "no predecessor").
    final int lastEnd = (lastToken != null) ? lastToken.getEnd() : 0;
    token.setPreceedBySpace(start - lastEnd > 0);

    int spaces = 0;
    if (lastToken != null && lastEnd != 0) {
        spaces = lastToken.getSpaceOffset();
        if (start == lastEnd) {
            // Directly adjacent to the previous token.
            spaces++;
        } else {
            spaces -= Math.max(0, start - lastEnd - 1);
        }
    }
    token.setSpaceOffset(spaces);

    // Normalization: single characters and longer strings use separate lookup tables.
    String n;
    if (charString.length() == 1) {
        int c = charString.charAt(0);
        n = normalizedChars.get(c);
    } else {
        n = normalizedStrings.get(charString);
    }
    if (n != null) {
        token.setNormForm(n);
    }

    if (lastToken != null) {
        lastToken.updateByToken(token);
    }
    group.addToken(token);
}
/**
 * Polls the cache (every 10 ms, for up to 60000 ms) until the slot of the replica
 * for {@code block} reports the expected anchorable/anchored state.
 *
 * Each poll also asserts that the number of outstanding mmaps matches and that the
 * replica is present in the cache.
 */
private void waitForReplicaAnchorStatus(final ShortCircuitCache cache,
    final ExtendedBlock block, final boolean expectedIsAnchorable,
    final boolean expectedIsAnchored, final int expectedOutstandingMmaps)
    throws Exception {
  final Supplier<Boolean> anchorStatusReached = new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      final MutableBoolean matched = new MutableBoolean(false);
      cache.accept(new CacheVisitor() {
        @Override
        public void visit(int numOutstandingMmaps,
            Map<ExtendedBlockId, ShortCircuitReplica> replicas,
            Map<ExtendedBlockId, InvalidToken> failedLoads,
            Map<Long, ShortCircuitReplica> evictable,
            Map<Long, ShortCircuitReplica> evictableMmapped) {
          Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
          ShortCircuitReplica replica =
              replicas.get(ExtendedBlockId.fromExtendedBlock(block));
          Assert.assertNotNull(replica);
          Slot slot = replica.getSlot();
          // Both properties as expected: done.
          if ((expectedIsAnchorable == slot.isAnchorable()) &&
              (expectedIsAnchored == slot.isAnchored())) {
            matched.setValue(true);
            return;
          }
          // Not there yet: report the current state and let the caller poll again.
          LOG.info("replica " + replica + " has isAnchorable = " +
              slot.isAnchorable() + ", isAnchored = " + slot.isAnchored() +
              ". Waiting for isAnchorable = " + expectedIsAnchorable +
              ", isAnchored = " + expectedIsAnchored);
        }
      });
      return matched.toBoolean();
    }
  };
  GenericTestUtils.waitFor(anchorStatusReached, 10, 60000);
}
/**
 * Fetches a replica, releases it, then keeps asking the cache for the same block
 * until the cache calls the supplied creator again, i.e. until the originally
 * cached replica is no longer served.
 */
@Test(timeout=60000)
public void testExpiry() throws Exception {
  final ShortCircuitCache cache =
      new ShortCircuitCache(2, 1, 1, 10000000, 1, 10000000, 0);
  final TestFileDescriptorPair pair = new TestFileDescriptorPair();

  // Populate the cache with one replica, verify its streams, then release it.
  ShortCircuitReplicaInfo initialInfo = cache.fetchOrCreate(
      new ExtendedBlockId(123, "test_bp1"),
      new SimpleReplicaCreator(123, cache, pair));
  Preconditions.checkNotNull(initialInfo.getReplica());
  Preconditions.checkState(initialInfo.getInvalidTokenException() == null);
  pair.compareWith(initialInfo.getReplica().getDataStream(),
      initialInfo.getReplica().getMetaStream());
  initialInfo.getReplica().unref();

  // Creator that only records that the cache asked for a new replica.
  final MutableBoolean triedToCreate = new MutableBoolean(false);
  final ShortCircuitReplicaCreator recordingCreator = new ShortCircuitReplicaCreator() {
    @Override
    public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
      triedToCreate.setValue(true);
      return null;
    }
  };

  while (true) {
    Thread.sleep(10);
    ShortCircuitReplicaInfo polledInfo = cache.fetchOrCreate(
        new ExtendedBlockId(123, "test_bp1"), recordingCreator);
    // Still served from the cache: give the reference back.
    if ((polledInfo != null) && (polledInfo.getReplica() != null)) {
      polledInfo.getReplica().unref();
    }
    if (!triedToCreate.isFalse()) {
      break;
    }
  }
  cache.close();
}
/**
* Constructor
*
* @param running A mutable boolean that tells if the watcher is running
* @param poolsToObserve The list of pools to watch
* @param logger The logger to send the result to
* @param sleepTime The amount time between when the observation is made
*/
ControlledThreadPoolExecutorDebuggerWatcher(MutableBoolean running,
ExecutorService[] poolsToObserve,
Logger logger,
int sleepTime) {
this.running = running;
this.poolsToObserve = poolsToObserve;
this.logger = logger;
this.sleepTime = sleepTime;
for (ExecutorService poolToObserve : poolsToObserve) {
lastNumberOfTasks.put(poolToObserve, 0L);
lastTime.put(poolToObserve, System.currentTimeMillis());
}
}
/**
 * Collects a frame block from the given RDD handle.
 *
 * Note that reading from an RDD might trigger lazy evaluation of pending
 * transformations. The status flag is always set to {@code false} (collect only).
 *
 * @param rdd    the RDD handle to read from
 * @param status out parameter, set to {@code false}
 * @return the collected frame block, never {@code null}
 * @throws IOException if collecting fails with a DMLRuntimeException or yields no block
 */
@Override
protected FrameBlock readBlobFromRDD(RDDObject rdd, MutableBoolean status)
    throws IOException
{
    // return status: collect only
    status.setValue(false);

    final MetaDataFormat format = (MetaDataFormat) _metaData;
    final DataCharacteristics dims = format.getDataCharacteristics();
    final int rlen = (int) dims.getRows();
    final int clen = (int) dims.getCols();

    // without an explicit schema, fall back to all-string columns (at least one)
    final ValueType[] lschema;
    if( _schema != null ) {
        lschema = _schema;
    }
    else {
        lschema = UtilFunctions.nCopies(Math.max(clen, 1), ValueType.STRING);
    }

    final FrameBlock result;
    try {
        // read from the lineage child where allowed, to prevent an unnecessary
        // collect through an rdd checkpoint
        RDDObject source = rdd;
        if( rdd.allowsShortCircuitCollect() ) {
            source = (RDDObject) rdd.getLineageChilds().get(0);
        }
        result = SparkExecutionContext.toFrameBlock(source, lschema, rlen, clen);
    }
    catch(DMLRuntimeException ex) {
        throw new IOException(ex);
    }

    // sanity check of the output
    if( result == null ) {
        throw new IOException("Unable to load frame from rdd.");
    }
    return result;
}
/**
 * Converts the RDD behind the given handle into a tensor block, using the tensor
 * characteristics from this object's metadata. The status flag is always set to
 * {@code false}.
 *
 * @param rdd    the RDD handle to read from
 * @param status out parameter, set to {@code false}
 * @return the tensor block produced by the Spark execution context
 */
@Override
@SuppressWarnings("unchecked")
protected TensorBlock readBlobFromRDD(RDDObject rdd, MutableBoolean status) {
    status.setValue(false);
    final TensorCharacteristics tensorDims = (TensorCharacteristics) _metaData.getDataCharacteristics();
    // TODO correct blocksize;
    // TODO read from RDD
    final JavaPairRDD<TensorIndexes, TensorBlock> blocks =
        (JavaPairRDD<TensorIndexes, TensorBlock>) rdd.getRDD();
    return SparkExecutionContext.toTensorBlock(blocks, tensorDims);
}
/**
 * Reads the content of an HTTP entity into a byte array, keeping at most
 * {@code maxContent} bytes.
 *
 * @param entity     the entity to read; may be {@code null}
 * @param maxContent maximum number of bytes to keep, or {@code -1} for no limit
 * @param trimmed    out parameter, set to {@code true} when the content was cut off
 *                   at {@code maxContent}; left untouched otherwise
 * @return an empty array if {@code entity} is {@code null}, {@code null} if the
 *         entity has no content stream, otherwise the (possibly trimmed) content
 * @throws IOException if reading the content stream fails
 */
private static final byte[] toByteArray(final HttpEntity entity,
        int maxContent, MutableBoolean trimmed) throws IOException {
    if (entity == null) {
        return new byte[] {};
    }
    // NOTE(review): the content stream is not closed here; presumably the caller
    // releases the entity/connection - confirm.
    final InputStream instream = entity.getContent();
    if (instream == null) {
        return null;
    }
    final long contentLength = entity.getContentLength();
    Args.check(contentLength <= Integer.MAX_VALUE,
            "HTTP entity too large to be buffered in memory");
    final int reportedLength = (int) contentLength;

    // set default size for buffer: 100 KB
    int bufferInitSize = 102400;
    // Any negative length means "unknown", not only -1; a negative value must never
    // be used as buffer capacity.
    if (reportedLength >= 0) {
        bufferInitSize = reportedLength;
    }
    // avoid init of too large a buffer when we will trim anyway
    if (maxContent != -1 && bufferInitSize > maxContent) {
        bufferInitSize = maxContent;
    }

    final ByteArrayBuffer buffer = new ByteArrayBuffer(bufferInitSize);
    final byte[] tmp = new byte[4096];
    int lengthRead;
    while ((lengthRead = instream.read(tmp)) != -1) {
        // check whether we need to trim
        if (maxContent != -1 && buffer.length() + lengthRead > maxContent) {
            // Keep only the bytes that still fit, then stop reading.
            buffer.append(tmp, 0, maxContent - buffer.length());
            trimmed.setValue(true);
            break;
        }
        buffer.append(tmp, 0, lengthRead);
    }
    return buffer.toByteArray();
}
/**
 * Convenience overload: delegates to the five-argument {@code fieldsAndTerms} with
 * its trailing boolean argument set to {@code false}.
 *
 * @param termFrequencyFields the term frequency fields
 * @param indexedFields       the indexed fields
 * @param contentFields       the content fields
 * @param oredFields          out parameter passed through to the delegate
 * @return the array returned by the delegate
 */
public Set<String>[] fieldsAndTerms(Set<String> termFrequencyFields, Set<String> indexedFields, Set<String> contentFields, MutableBoolean oredFields) {
    final boolean trailingFlag = false;
    final Set<String>[] result =
        fieldsAndTerms(termFrequencyFields, indexedFields, contentFields, oredFields, trailingFlag);
    return result;
}
/**
 * Acquires the data object for reading and returns it.
 *
 * The data is taken from the cache if possible, synchronized with any GPU copies
 * when the accelerator is enabled, and otherwise read from HDFS or from the RDD
 * handle. The method is synchronized on this instance.
 *
 * @return the data object ({@code _data}) after the read has been set up
 * @throws DMLRuntimeException if the object is not available to read, if the GPU
 *         copies are in an inconsistent state, if no file name is set for an HDFS
 *         read, or if reading fails with an IOException
 */
private synchronized T acquireReadIntern() {
if ( !isAvailableToRead() )
throw new DMLRuntimeException("MatrixObject not available to read.");
//get object from cache
if( _data == null )
getCache();
//call acquireHostRead if gpuHandle is set as well as is allocated
if( DMLScript.USE_ACCELERATOR && _gpuObjects != null ) {
//holds the result of the most recent acquireHostRead call
boolean copiedFromGPU = false;
for (Map.Entry<GPUContext, GPUObject> kv : _gpuObjects.entrySet()) {
GPUObject gObj = kv.getValue();
//a dirty GPU object after a previous copy-back is treated as an inconsistent state
if (gObj != null && copiedFromGPU && gObj.isDirty())
throw new DMLRuntimeException("Internal Error : Inconsistent internal state, A copy of this CacheableData was dirty on more than 1 GPU");
else if (gObj != null) {
copiedFromGPU = gObj.acquireHostRead(null);
//retry the cache lookup after the host read
if( _data == null )
getCache();
}
}
}
//read data from HDFS/RDD if required
//(probe data for cache_nowrite / jvm_reuse)
if( _data==null && isEmpty(true) ) {
try {
if( DMLScript.STATISTICS )
CacheStatistics.incrementHDFSHits();
//no RDD handle, or the handle allows a short-circuit read: read the file directly
if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() ) {
//check filename
if( _hdfsFileName == null )
throw new DMLRuntimeException("Cannot read matrix for empty filename.");
//read cacheable data from hdfs
_data = readBlobFromHDFS( _hdfsFileName );
//mark for initial local write despite read operation
_requiresLocalWrite = CACHING_WRITE_CACHE_ON_READ;
}
else {
//read matrix from rdd (incl execute pending rdd operations)
MutableBoolean writeStatus = new MutableBoolean();
_data = readBlobFromRDD( getRDDHandle(), writeStatus );
//mark for initial local write (prevent repeated execution of rdd operations)
//(status true: use the configured default; status false: always require the local write)
_requiresLocalWrite = writeStatus.booleanValue() ?
CACHING_WRITE_CACHE_ON_READ : true;
}
setDirty(false);
}
catch (IOException e) {
throw new DMLRuntimeException("Reading of " + _hdfsFileName + " ("+hashCode()+") failed.", e);
}
_isAcquireFromEmpty = true;
}
else if( _data!=null && DMLScript.STATISTICS ) {
//data was already in memory: count a memory hit
CacheStatistics.incrementMemHits();
}
//cache status maintenance
acquire( false, _data==null );
return _data;
}
/**
 * Reads the data object from the given RDD handle.
 *
 * @param rdd    the RDD handle to read from
 * @param status mutable flag filled in by the implementation; the caller in
 *               acquireReadIntern reads it after the call to decide how to mark
 *               the object for local write
 * @return the data object read from the RDD
 * @throws IOException if the data cannot be read
 */
protected abstract T readBlobFromRDD(RDDObject rdd, MutableBoolean status)
throws IOException;
/**
 * Obtains a matrix block from the given RDD handle, either by collecting the RDD
 * or, for binary block input that does not pass the collect memory budget check,
 * by writing the RDD to HDFS and reading the file.
 *
 * @param rdd         the RDD handle to read from
 * @param writeStatus out parameter; reset to {@code false} and set to {@code true}
 *                    only when the RDD was written to HDFS by this call
 * @return the matrix block, never {@code null}
 * @throws IOException if a DMLRuntimeException occurs or no block could be loaded
 */
@Override
protected MatrixBlock readBlobFromRDD(RDDObject rdd, MutableBoolean writeStatus)
throws IOException
{
//note: the read of a matrix block from an RDD might trigger
//lazy evaluation of pending transformations.
RDDObject lrdd = rdd;
//prepare return status (by default only collect)
writeStatus.setValue(false);
MetaDataFormat iimd = (MetaDataFormat) _metaData;
DataCharacteristics mc = iimd.getDataCharacteristics();
InputInfo ii = iimd.getInputInfo();
MatrixBlock mb = null;
try
{
//prevent unnecessary collect through rdd checkpoint
if( rdd.allowsShortCircuitCollect() ) {
lrdd = (RDDObject)rdd.getLineageChilds().get(0);
}
//obtain matrix block from RDD
int rlen = (int)mc.getRows();
int clen = (int)mc.getCols();
int blen = mc.getBlocksize();
long nnz = mc.getNonZerosBound();
//guarded rdd collect
if( ii == InputInfo.BinaryBlockInputInfo && //guarded collect not for binary cell
!OptimizerUtils.checkSparkCollectMemoryBudget(mc, getPinnedSize()+getBroadcastSize(), true) ) {
//write RDD to hdfs and read to prevent invalid collect mem consumption
//note: lazy, partition-at-a-time collect (toLocalIterator) was significantly slower
if( !HDFSTool.existsFileOnHDFS(_hdfsFileName) ) { //prevent overwrite existing file
long newnnz = SparkExecutionContext.writeRDDtoHDFS(lrdd, _hdfsFileName, iimd.getOutputInfo());
_metaData.getDataCharacteristics().setNonZeros(newnnz);
rdd.setPending(false); //mark rdd as non-pending (for export)
rdd.setHDFSFile(true); //mark rdd as hdfs file (for restore)
writeStatus.setValue(true); //mark for no cache-write on read
//note: the flag hdfsFile is actually not entirely correct because we still hold an rdd
//reference to the input not to an rdd of the hdfs file but the resulting behavior is correct
}
mb = readBlobFromHDFS(_hdfsFileName);
}
else if( ii == InputInfo.BinaryCellInputInfo ) {
//collect matrix block from binary cell RDD
mb = SparkExecutionContext.toMatrixBlock(lrdd, rlen, clen, nnz);
}
else {
//collect matrix block from RDD of any other format, passing the block size
//(presumably binary block - confirm)
mb = SparkExecutionContext.toMatrixBlock(lrdd, rlen, clen, blen, nnz);
}
}
catch(DMLRuntimeException ex) {
throw new IOException(ex);
}
//sanity check correct output
if( mb == null )
throw new IOException("Unable to load matrix from rdd.");
return mb;
}
/**
 * Allocate a new shared memory slot connected to this datanode.
 *
 * Must be called with the EndpointShmManager lock held. The lock is released
 * while a new shared memory segment is requested and re-acquired afterwards.
 *
 * @param peer The peer to use to talk to the DataNode.
 * @param usedPeer (out param) Will be set to true if we used the peer.
 * When a peer is used, it becomes the responsibility of
 * this manager and must not be closed by the caller.
 *
 * @param clientName The client name.
 * @param blockId The block ID to use.
 * @return null if this manager is closed or shared memory
 * segment access is disabled (per the original
 * documentation: if the DataNode does not support
 * shared memory segments, or experienced an error
 * creating the shm). A slot on success.
 * @throws IOException If there was an error communicating over the socket.
 */
Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
String clientName, ExtendedBlockId blockId) throws IOException {
while (true) {
if (closed) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": the DfsClientShmManager has been closed.");
}
return null;
}
if (disabled) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": shared memory segment access is disabled.");
}
return null;
}
// Try to use an existing slot.
Slot slot = allocSlotFromExistingShm(blockId);
if (slot != null) {
return slot;
}
// There are no free slots. If someone is loading more slots, wait
// for that to finish.
if (loading) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": waiting for loading to finish...");
}
finishedLoading.awaitUninterruptibly();
} else {
// Otherwise, load the slot ourselves.
loading = true;
// Release the lock while the new segment is requested.
lock.unlock();
DfsClientShm shm;
try {
shm = requestNewShm(clientName, peer);
// No segment obtained: retry from the top of the loop. The finally
// block below still runs first and re-acquires the lock.
if (shm == null) continue;
// See #{DfsClientShmManager#domainSocketWatcher} for details
// about why we do this before retaking the manager lock.
domainSocketWatcher.add(peer.getDomainSocket(), shm);
// The DomainPeer is now our responsibility, and should not be
// closed by the caller.
usedPeer.setValue(true);
} finally {
// Retake the lock, clear the loading flag and wake up all waiters.
lock.lock();
loading = false;
finishedLoading.signalAll();
}
if (shm.isDisconnected()) {
// If the peer closed immediately after the shared memory segment
// was created, the DomainSocketWatcher callback might already have
// fired and marked the shm as disconnected. In this case, we
// obviously don't want to add the SharedMemorySegment to our list
// of valid not-full segments.
if (LOG.isDebugEnabled()) {
LOG.debug(this + ": the UNIX domain socket associated with " +
"this short-circuit memory closed before we could make " +
"use of the shm.");
}
} else {
// Register the new segment; the next loop iteration allocates from it.
notFull.put(shm.getShmId(), shm);
}
}
}
}
/**
 * Creates three replicas in a cache constructed with a first argument of 2,
 * releases them all, and then checks that the last two are still served from the
 * cache while the first (oldest) one has to be created again.
 */
@Test(timeout=60000)
public void testEviction() throws Exception {
  final ShortCircuitCache cache =
      new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10000, 0);
  final TestFileDescriptorPair[] pairs = new TestFileDescriptorPair[] {
    new TestFileDescriptorPair(),
    new TestFileDescriptorPair(),
    new TestFileDescriptorPair(),
  };
  ShortCircuitReplicaInfo[] replicaInfos = new ShortCircuitReplicaInfo[] {
    null,
    null,
    null
  };
  for (int i = 0; i < pairs.length; i++) {
    replicaInfos[i] = cache.fetchOrCreate(
        new ExtendedBlockId(i, "test_bp1"),
        new SimpleReplicaCreator(i, cache, pairs[i]));
    Preconditions.checkNotNull(replicaInfos[i].getReplica());
    Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
    pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(),
        replicaInfos[i].getReplica().getMetaStream());
  }
  // At this point, we have 3 replicas in use.
  // Let's close them all.
  for (int i = 0; i < pairs.length; i++) {
    replicaInfos[i].getReplica().unref();
  }
  // The last two replicas should still be cached.
  for (int i = 1; i < pairs.length; i++) {
    // Boxed copy of the index for use in the anonymous class; valueOf replaces
    // the deprecated Integer constructor.
    final Integer iVal = Integer.valueOf(i);
    replicaInfos[i] = cache.fetchOrCreate(
        new ExtendedBlockId(i, "test_bp1"),
        new ShortCircuitReplicaCreator() {
          @Override
          public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
            Assert.fail("expected to use existing entry for " + iVal);
            return null;
          }
        });
    Preconditions.checkNotNull(replicaInfos[i].getReplica());
    Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
    pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(),
        replicaInfos[i].getReplica().getMetaStream());
  }
  // The first (oldest) replica should not be cached.
  final MutableBoolean calledCreate = new MutableBoolean(false);
  replicaInfos[0] = cache.fetchOrCreate(
      new ExtendedBlockId(0, "test_bp1"),
      new ShortCircuitReplicaCreator() {
        @Override
        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
          calledCreate.setValue(true);
          return null;
        }
      });
  Preconditions.checkState(replicaInfos[0].getReplica() == null);
  Assert.assertTrue(calledCreate.isTrue());
  // Clean up
  for (int i = 1; i < pairs.length; i++) {
    replicaInfos[i].getReplica().unref();
  }
  for (int i = 0; i < pairs.length; i++) {
    pairs[i].close();
  }
  cache.close();
}
/**
 * Allocate a new shared memory slot connected to this datanode.
 *
 * Must be called with the EndpointShmManager lock held. The lock is released
 * while a new shared memory segment is requested and re-acquired afterwards.
 *
 * @param peer The peer to use to talk to the DataNode.
 * @param usedPeer (out param) Will be set to true if we used the peer.
 * When a peer is used, it becomes the responsibility of
 * this manager and must not be closed by the caller.
 *
 * @param clientName The client name.
 * @param blockId The block ID to use.
 * @return null if this manager is closed or shared memory
 * segment access is disabled (per the original
 * documentation: if the DataNode does not support
 * shared memory segments, or experienced an error
 * creating the shm). A slot on success.
 * @throws IOException If there was an error communicating over the socket.
 */
Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
String clientName, ExtendedBlockId blockId) throws IOException {
while (true) {
if (closed) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": the DfsClientShmManager has been closed.");
}
return null;
}
if (disabled) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": shared memory segment access is disabled.");
}
return null;
}
// Try to use an existing slot.
Slot slot = allocSlotFromExistingShm(blockId);
if (slot != null) {
return slot;
}
// There are no free slots. If someone is loading more slots, wait
// for that to finish.
if (loading) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": waiting for loading to finish...");
}
finishedLoading.awaitUninterruptibly();
} else {
// Otherwise, load the slot ourselves.
loading = true;
// Release the lock while the new segment is requested.
lock.unlock();
DfsClientShm shm;
try {
shm = requestNewShm(clientName, peer);
// No segment obtained: retry from the top of the loop. The finally
// block below still runs first and re-acquires the lock.
if (shm == null) continue;
// See #{DfsClientShmManager#domainSocketWatcher} for details
// about why we do this before retaking the manager lock.
domainSocketWatcher.add(peer.getDomainSocket(), shm);
// The DomainPeer is now our responsibility, and should not be
// closed by the caller.
usedPeer.setValue(true);
} finally {
// Retake the lock, clear the loading flag and wake up all waiters.
lock.lock();
loading = false;
finishedLoading.signalAll();
}
if (shm.isDisconnected()) {
// If the peer closed immediately after the shared memory segment
// was created, the DomainSocketWatcher callback might already have
// fired and marked the shm as disconnected. In this case, we
// obviously don't want to add the SharedMemorySegment to our list
// of valid not-full segments.
if (LOG.isDebugEnabled()) {
LOG.debug(this + ": the UNIX domain socket associated with " +
"this short-circuit memory closed before we could make " +
"use of the shm.");
}
} else {
// Register the new segment; the next loop iteration allocates from it.
notFull.put(shm.getShmId(), shm);
}
}
}
}
/**
 * Creates three replicas in a cache constructed with a first argument of 2,
 * releases them all, and then checks that the last two are still served from the
 * cache while the first (oldest) one has to be created again.
 */
@Test(timeout=60000)
public void testEviction() throws Exception {
  final ShortCircuitCache cache =
      new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10000, 0);
  final TestFileDescriptorPair[] pairs = new TestFileDescriptorPair[] {
    new TestFileDescriptorPair(),
    new TestFileDescriptorPair(),
    new TestFileDescriptorPair(),
  };
  ShortCircuitReplicaInfo[] replicaInfos = new ShortCircuitReplicaInfo[] {
    null,
    null,
    null
  };
  for (int i = 0; i < pairs.length; i++) {
    replicaInfos[i] = cache.fetchOrCreate(
        new ExtendedBlockId(i, "test_bp1"),
        new SimpleReplicaCreator(i, cache, pairs[i]));
    Preconditions.checkNotNull(replicaInfos[i].getReplica());
    Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
    pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(),
        replicaInfos[i].getReplica().getMetaStream());
  }
  // At this point, we have 3 replicas in use.
  // Let's close them all.
  for (int i = 0; i < pairs.length; i++) {
    replicaInfos[i].getReplica().unref();
  }
  // The last two replicas should still be cached.
  for (int i = 1; i < pairs.length; i++) {
    // Boxed copy of the index for use in the anonymous class; valueOf replaces
    // the deprecated Integer constructor.
    final Integer iVal = Integer.valueOf(i);
    replicaInfos[i] = cache.fetchOrCreate(
        new ExtendedBlockId(i, "test_bp1"),
        new ShortCircuitReplicaCreator() {
          @Override
          public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
            Assert.fail("expected to use existing entry for " + iVal);
            return null;
          }
        });
    Preconditions.checkNotNull(replicaInfos[i].getReplica());
    Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
    pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(),
        replicaInfos[i].getReplica().getMetaStream());
  }
  // The first (oldest) replica should not be cached.
  final MutableBoolean calledCreate = new MutableBoolean(false);
  replicaInfos[0] = cache.fetchOrCreate(
      new ExtendedBlockId(0, "test_bp1"),
      new ShortCircuitReplicaCreator() {
        @Override
        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
          calledCreate.setValue(true);
          return null;
        }
      });
  Preconditions.checkState(replicaInfos[0].getReplica() == null);
  Assert.assertTrue(calledCreate.isTrue());
  // Clean up
  for (int i = 1; i < pairs.length; i++) {
    replicaInfos[i].getReplica().unref();
  }
  for (int i = 0; i < pairs.length; i++) {
    pairs[i].close();
  }
  cache.close();
}
/**
 * Translates the joins of a query into aggregation pipeline steps.
 *
 * For every join a lookup step and an unwind step are appended. If the flag
 * reports an OR in the WHERE expression after the last join was processed, a
 * final match-after-join step is appended as well.
 *
 * @param queryConverter  converter used to translate sub-select joins
 * @param tholder         holder describing the FROM part of the query
 * @param ljoins          the joins to translate
 * @param whereExpression the WHERE expression; may be {@code null}
 * @return the generated pipeline steps
 * @throws ParseException if a join is neither inner nor left, or its right item is
 *         neither a table nor a sub-select
 * @throws net.sf.jsqlparser.parser.ParseException declared by the original
 *         signature; not thrown directly by the code visible here
 */
public static List<Document> toPipelineSteps(QueryConverter queryConverter, FromHolder tholder, List<Join> ljoins, Expression whereExpression) throws ParseException, net.sf.jsqlparser.parser.ParseException {
    final List<Document> pipeline = new LinkedList<Document>();
    final MutableBoolean haveOrExpression = new MutableBoolean();

    for (Join join : ljoins) {
        // Reject unsupported joins up front.
        if (!(join.isInner() || join.isLeft())) {
            throw new ParseException("Only inner join and left supported");
        }
        if (!(join.getRightItem() instanceof Table || join.getRightItem() instanceof SubSelect)) {
            throw new ParseException("From join not supported");
        }

        final String joinTableAlias = join.getRightItem().getAlias().getName();
        final String joinTableName = tholder.getSQLHolder(join.getRightItem()).getBaseTableName();

        // Collect the part of the WHERE clause that applies to the joined table.
        final ExpressionHolder whereExpHolder = new ExpressionHolder(null);
        if (whereExpression != null) {
            haveOrExpression.setValue(false);
            whereExpression.accept(new WhereVisitorMatchAndLookupPipelineMatchBuilder(joinTableAlias, whereExpHolder, haveOrExpression));
            if (haveOrExpression.booleanValue() || whereExpHolder.getExpression() == null) {
                whereExpHolder.setExpression(null);
            }
            else {
                whereExpHolder.getExpression().accept(new ExpVisitorEraseAliasTableBaseBuilder(joinTableAlias));
            }
        }

        // A sub-select join contributes its own aggregate steps to the lookup.
        final List<Document> subqueryDocs;
        if (join.getRightItem() instanceof SubSelect) {
            subqueryDocs = queryConverter.fromSQLCommandInfoHolderToAggregateSteps((SQLCommandInfoHolder)tholder.getSQLHolder(join.getRightItem()));
        }
        else {
            subqueryDocs = new LinkedList<>();
        }

        pipeline.add(generateLookupStep(tholder,joinTableName,joinTableAlias,join.getOnExpression(),whereExpHolder.getExpression(),subqueryDocs));
        pipeline.add(generateUnwindStep(tholder,joinTableAlias,join.isLeft()));
    }

    // if there is some "or" we use this step for support this logic and no other match steps
    if (haveOrExpression.booleanValue()) {
        pipeline.add(generateMatchAfterJoin(tholder,whereExpression));
    }
    return pipeline;
}
/**
 * Creates the visitor.
 *
 * @param baseAliasTable   alias of the table this visitor works for
 * @param outputMatch      holder stored as this visitor's output
 * @param haveOrExpression mutable flag stored on this visitor
 */
public WhereVisitorMatchAndLookupPipelineMatchBuilder(String baseAliasTable, ExpressionHolder outputMatch, MutableBoolean haveOrExpression) {
    this.haveOrExpression = haveOrExpression;
    this.outputMatch = outputMatch;
    this.baseAliasTable = baseAliasTable;
}
/**
 * Acquires the data object for reading and returns it.
 *
 * The data is taken from the cache if possible, synchronized with any GPU copies
 * when the accelerator is enabled, and otherwise read from HDFS or from the RDD
 * handle. The method is synchronized on this instance.
 *
 * @return the data object ({@code _data}) after the read has been set up
 * @throws DMLRuntimeException if the object is not available to read, if the GPU
 *         copies are in an inconsistent state, if no file name is set for an HDFS
 *         read, or if reading fails with an IOException
 */
private synchronized T acquireReadIntern() {
if ( !isAvailableToRead() )
throw new DMLRuntimeException("MatrixObject not available to read.");
//get object from cache
if( _data == null )
getCache();
//call acquireHostRead if gpuHandle is set as well as is allocated
if( DMLScript.USE_ACCELERATOR && _gpuObjects != null ) {
//holds the result of the most recent acquireHostRead call
boolean copiedFromGPU = false;
for (Map.Entry<GPUContext, GPUObject> kv : _gpuObjects.entrySet()) {
GPUObject gObj = kv.getValue();
//a dirty GPU object after a previous copy-back is treated as an inconsistent state
if (gObj != null && copiedFromGPU && gObj.isDirty())
throw new DMLRuntimeException("Internal Error : Inconsistent internal state, A copy of this CacheableData was dirty on more than 1 GPU");
else if (gObj != null) {
copiedFromGPU = gObj.acquireHostRead(null);
//retry the cache lookup after the host read
if( _data == null )
getCache();
}
}
}
//read data from HDFS/RDD if required
//(probe data for cache_nowrite / jvm_reuse)
if( _data==null && isEmpty(true) ) {
try {
if( DMLScript.STATISTICS )
CacheStatistics.incrementHDFSHits();
//no RDD handle, or the handle allows a short-circuit read: read the file directly
if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() ) {
//check filename
if( _hdfsFileName == null )
throw new DMLRuntimeException("Cannot read matrix for empty filename.");
//read cacheable data from hdfs
_data = readBlobFromHDFS( _hdfsFileName );
//mark for initial local write despite read operation
_requiresLocalWrite = CACHING_WRITE_CACHE_ON_READ;
}
else {
//read matrix from rdd (incl execute pending rdd operations)
MutableBoolean writeStatus = new MutableBoolean();
_data = readBlobFromRDD( getRDDHandle(), writeStatus );
//mark for initial local write (prevent repeated execution of rdd operations)
//(status true: use the configured default; status false: always require the local write)
_requiresLocalWrite = writeStatus.booleanValue() ?
CACHING_WRITE_CACHE_ON_READ : true;
}
setDirty(false);
}
catch (IOException e) {
throw new DMLRuntimeException("Reading of " + _hdfsFileName + " ("+hashCode()+") failed.", e);
}
_isAcquireFromEmpty = true;
}
else if( _data!=null && DMLScript.STATISTICS ) {
//data was already in memory: count a memory hit
CacheStatistics.incrementMemHits();
}
//cache status maintenance
acquire( false, _data==null );
return _data;
}
/**
 * Reads the data object from the given RDD handle.
 *
 * @param rdd    the RDD handle to read from
 * @param status mutable flag filled in by the implementation; the caller in
 *               acquireReadIntern reads it after the call to decide how to mark
 *               the object for local write
 * @return the data object read from the RDD
 * @throws IOException if the data cannot be read
 */
protected abstract T readBlobFromRDD(RDDObject rdd, MutableBoolean status)
throws IOException;