下面列出了org.apache.hadoop.fs.ReadOption#org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static CacheDirectiveInfo readCacheDirectiveInfo(Stanza st)
throws InvalidXmlException {
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder();
builder.setId(Long.parseLong(st.getValue("ID")));
String path = st.getValueOrNull("PATH");
if (path != null) {
builder.setPath(new Path(path));
}
String replicationString = st.getValueOrNull("REPLICATION");
if (replicationString != null) {
builder.setReplication(Short.parseShort(replicationString));
}
String pool = st.getValueOrNull("POOL");
if (pool != null) {
builder.setPool(pool);
}
String expiryTime = st.getValueOrNull("EXPIRATION");
if (expiryTime != null) {
builder.setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
Long.parseLong(expiryTime)));
}
return builder.build();
}
/**
* Load cache directives from the fsimage
*/
private void loadDirectives(DataInput in) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
int numDirectives = in.readInt();
prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numDirectives; i++) {
CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
// Get pool reference by looking it up in the map
final String poolName = info.getPool();
CacheDirective directive =
new CacheDirective(info.getId(), info.getPath().toUri().getPath(),
info.getReplication(), info.getExpiration().getAbsoluteMillis());
addCacheDirective(poolName, directive);
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
public static void writeCacheDirectiveInfo(DataOutputStream out,
CacheDirectiveInfo directive) throws IOException {
writeLong(directive.getId(), out);
int flags =
((directive.getPath() != null) ? 0x1 : 0) |
((directive.getReplication() != null) ? 0x2 : 0) |
((directive.getPool() != null) ? 0x4 : 0) |
((directive.getExpiration() != null) ? 0x8 : 0);
out.writeInt(flags);
if (directive.getPath() != null) {
writeString(directive.getPath().toUri().getPath(), out);
}
if (directive.getReplication() != null) {
writeShort(directive.getReplication(), out);
}
if (directive.getPool() != null) {
writeString(directive.getPool(), out);
}
if (directive.getExpiration() != null) {
writeLong(directive.getExpiration().getMillis(), out);
}
}
public static CacheDirectiveInfo readCacheDirectiveInfo(DataInput in)
throws IOException {
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder();
builder.setId(readLong(in));
int flags = in.readInt();
if ((flags & 0x1) != 0) {
builder.setPath(new Path(readString(in)));
}
if ((flags & 0x2) != 0) {
builder.setReplication(readShort(in));
}
if ((flags & 0x4) != 0) {
builder.setPool(readString(in));
}
if ((flags & 0x8) != 0) {
builder.setExpiration(
CacheDirectiveInfo.Expiration.newAbsolute(readLong(in)));
}
if ((flags & ~0xF) != 0) {
throw new IOException("unknown flags set in " +
"ModifyCacheDirectiveInfoOp: " + flags);
}
return builder.build();
}
public static CacheDirectiveInfo readCacheDirectiveInfo(Stanza st)
throws InvalidXmlException {
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder();
builder.setId(Long.parseLong(st.getValue("ID")));
String path = st.getValueOrNull("PATH");
if (path != null) {
builder.setPath(new Path(path));
}
String replicationString = st.getValueOrNull("REPLICATION");
if (replicationString != null) {
builder.setReplication(Short.parseShort(replicationString));
}
String pool = st.getValueOrNull("POOL");
if (pool != null) {
builder.setPool(pool);
}
String expiryTime = st.getValueOrNull("EXPIRATION");
if (expiryTime != null) {
builder.setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
Long.parseLong(expiryTime)));
}
return builder.build();
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().
setPool(directive.getPool()).
setPath(directive.getPath()).
build());
if (!iter.hasNext()) {
return true;
}
Thread.sleep(1000);
}
return false;
}
@Override
public BatchedEntries<CacheDirectiveEntry>
listCacheDirectives(long prevId,
CacheDirectiveInfo filter) throws IOException {
if (filter == null) {
filter = new CacheDirectiveInfo.Builder().build();
}
try {
return new BatchedCacheEntries(
rpcProxy.listCacheDirectives(null,
ListCacheDirectivesRequestProto.newBuilder().
setPrevId(prevId).
setFilter(PBHelper.convert(filter)).
build()));
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override // ClientProtocol
public void modifyCacheDirective(
CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
checkNNStartup();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
boolean success = false;
try {
namesystem.modifyCacheDirective(directive, flags, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
/**
* Factory method that makes a new CacheDirectiveInfo by applying fields in a
* CacheDirectiveInfo to an existing CacheDirective.
*
* @param info with some or all fields set.
* @param defaults directive providing default values for unset fields in
* info.
*
* @return new CacheDirectiveInfo of the info applied to the defaults.
*/
private static CacheDirectiveInfo createFromInfoAndDefaults(
CacheDirectiveInfo info, CacheDirective defaults) {
// Initialize the builder with the default values
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder(defaults.toInfo());
// Replace default with new value if present
if (info.getPath() != null) {
builder.setPath(info.getPath());
}
if (info.getReplication() != null) {
builder.setReplication(info.getReplication());
}
if (info.getPool() != null) {
builder.setPool(info.getPool());
}
if (info.getExpiration() != null) {
builder.setExpiration(info.getExpiration());
}
return builder.build();
}
public static CacheDirectiveInfoProto convert
(CacheDirectiveInfo info) {
CacheDirectiveInfoProto.Builder builder =
CacheDirectiveInfoProto.newBuilder();
if (info.getId() != null) {
builder.setId(info.getId());
}
if (info.getPath() != null) {
builder.setPath(info.getPath().toUri().getPath());
}
if (info.getReplication() != null) {
builder.setReplication(info.getReplication());
}
if (info.getPool() != null) {
builder.setPool(info.getPool());
}
if (info.getExpiration() != null) {
builder.setExpiration(convert(info.getExpiration()));
}
return builder.build();
}
public static CacheDirectiveInfo convert
(CacheDirectiveInfoProto proto) {
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder();
if (proto.hasId()) {
builder.setId(proto.getId());
}
if (proto.hasPath()) {
builder.setPath(new Path(proto.getPath()));
}
if (proto.hasReplication()) {
builder.setReplication(Shorts.checkedCast(
proto.getReplication()));
}
if (proto.hasPool()) {
builder.setPool(proto.getPool());
}
if (proto.hasExpiration()) {
builder.setExpiration(convert(proto.getExpiration()));
}
return builder.build();
}
@Override
public ListCacheDirectivesResponseProto listCacheDirectives(
RpcController controller, ListCacheDirectivesRequestProto request)
throws ServiceException {
try {
CacheDirectiveInfo filter =
PBHelper.convert(request.getFilter());
BatchedEntries<CacheDirectiveEntry> entries =
server.listCacheDirectives(request.getPrevId(), filter);
ListCacheDirectivesResponseProto.Builder builder =
ListCacheDirectivesResponseProto.newBuilder();
builder.setHasMore(entries.hasMore());
for (int i=0, n=entries.size(); i<n; i++) {
builder.addElements(PBHelper.convert(entries.get(i)));
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public BatchedEntries<CacheDirectiveEntry>
listCacheDirectives(long prevId,
CacheDirectiveInfo filter) throws IOException {
if (filter == null) {
filter = new CacheDirectiveInfo.Builder().build();
}
try {
return new BatchedCacheEntries(
rpcProxy.listCacheDirectives(null,
ListCacheDirectivesRequestProto.newBuilder().
setPrevId(prevId).
setFilter(PBHelper.convert(filter)).
build()));
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().
setPool(directive.getPool()).
setPath(directive.getPath()).
build());
if (iter.hasNext()) {
return true;
}
Thread.sleep(1000);
}
return false;
}
static CacheDirectiveInfo addCacheDirective(
FSNamesystem fsn, CacheManager cacheManager,
CacheDirectiveInfo directive, EnumSet<CacheFlag> flags,
boolean logRetryCache)
throws IOException {
final FSPermissionChecker pc = getFsPermissionChecker(fsn);
if (directive.getId() != null) {
throw new IOException("addDirective: you cannot specify an ID " +
"for this operation.");
}
CacheDirectiveInfo effectiveDirective =
cacheManager.addDirective(directive, pc, flags);
fsn.getEditLog().logAddCacheDirectiveInfo(effectiveDirective,
logRetryCache);
return effectiveDirective;
}
/**
* Add a list of cache directives, list cache directives,
* switch active NN, and list cache directives again.
*/
@Test (timeout=60000)
public void testListCacheDirectives() throws Exception {
final int poolCount = 7;
HashSet<String> poolNames = new HashSet<String>(poolCount);
Path path = new Path("/p");
for (int i=0; i<poolCount; i++) {
String poolName = "testListCacheDirectives-" + i;
CacheDirectiveInfo directiveInfo =
new CacheDirectiveInfo.Builder().setPool(poolName).setPath(path).build();
dfs.addCachePool(new CachePoolInfo(poolName));
dfs.addCacheDirective(directiveInfo, EnumSet.of(CacheFlag.FORCE));
poolNames.add(poolName);
}
listCacheDirectives(poolNames, 0);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.waitActive(1);
listCacheDirectives(poolNames, 1);
}
@Test(timeout=60000)
public void testExceedsCapacity() throws Exception {
// Create a giant file
final Path fileName = new Path("/exceeds");
final long fileLen = CACHE_CAPACITY * (NUM_DATANODES*2);
int numCachedReplicas = (int) ((CACHE_CAPACITY*NUM_DATANODES)/BLOCK_SIZE);
DFSTestUtil.createFile(dfs, fileName, fileLen, (short) NUM_DATANODES,
0xFADED);
dfs.addCachePool(new CachePoolInfo("pool"));
dfs.addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool")
.setPath(fileName).setReplication((short) 1).build());
waitForCachedBlocks(namenode, -1, numCachedReplicas,
"testExceeds:1");
checkPendingCachedEmpty(cluster);
Thread.sleep(1000);
checkPendingCachedEmpty(cluster);
// Try creating a file with giant-sized blocks that exceed cache capacity
dfs.delete(fileName, false);
DFSTestUtil.createFile(dfs, fileName, 4096, fileLen, CACHE_CAPACITY * 2,
(short) 1, 0xFADED);
checkPendingCachedEmpty(cluster);
Thread.sleep(1000);
checkPendingCachedEmpty(cluster);
}
@Override
public ListCacheDirectivesResponseProto listCacheDirectives(
RpcController controller, ListCacheDirectivesRequestProto request)
throws ServiceException {
try {
CacheDirectiveInfo filter =
PBHelper.convert(request.getFilter());
BatchedEntries<CacheDirectiveEntry> entries =
server.listCacheDirectives(request.getPrevId(), filter);
ListCacheDirectivesResponseProto.Builder builder =
ListCacheDirectivesResponseProto.newBuilder();
builder.setHasMore(entries.hasMore());
for (int i=0, n=entries.size(); i<n; i++) {
builder.addElements(PBHelper.convert(entries.get(i)));
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
List<CacheDirectiveEntry> getAllCacheDirectives(UpstreamManager.Upstream upstream) throws IOException {
CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder().build();
List<CacheDirectiveEntry> directives = new ArrayList<>();
long prevId = -1;
while (true) {
BatchedRemoteIterator.BatchedEntries<CacheDirectiveEntry> it =
upstream.protocol.listCacheDirectives(prevId, filter);
if (it.size() == 0) {
break;
}
for (int i = 0; i < it.size(); i++) {
CacheDirectiveEntry entry = it.get(i);
prevId = entry.getInfo().getId();
directives.add(entry);
}
}
return directives;
}
public static void writeCacheDirectiveInfo(ContentHandler contentHandler,
CacheDirectiveInfo directive) throws SAXException {
XMLUtils.addSaxString(contentHandler, "ID",
Long.toString(directive.getId()));
if (directive.getPath() != null) {
XMLUtils.addSaxString(contentHandler, "PATH",
directive.getPath().toUri().getPath());
}
if (directive.getReplication() != null) {
XMLUtils.addSaxString(contentHandler, "REPLICATION",
Short.toString(directive.getReplication()));
}
if (directive.getPool() != null) {
XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
}
if (directive.getExpiration() != null) {
XMLUtils.addSaxString(contentHandler, "EXPIRATION",
"" + directive.getExpiration().getMillis());
}
}
@Override // ClientProtocol
public long addCacheDirective(
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
checkNNStartup();
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
(retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (Long) cacheEntry.getPayload();
}
boolean success = false;
long ret = 0;
try {
ret = namesystem.addCacheDirective(path, flags, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success, ret);
}
return ret;
}
/**
* Add a new CacheDirective.
*
* @param info Information about a directive to add.
* @param flags {@link CacheFlag}s to use for this operation.
* @return the ID of the directive that was created.
* @throws IOException if the directive could not be added
*/
public long addCacheDirective(
CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
Preconditions.checkNotNull(info.getPath());
Path path = new Path(getPathName(fixRelativePart(info.getPath()))).
makeQualified(getUri(), getWorkingDirectory());
return dfs.addCacheDirective(
new CacheDirectiveInfo.Builder(info).
setPath(path).
build(),
flags);
}
/**
* Log a CacheDirectiveInfo returned from
* {@link CacheManager#addDirective(CacheDirectiveInfo, FSPermissionChecker)}
*/
void logAddCacheDirectiveInfo(CacheDirectiveInfo directive,
boolean toLogRpcIds) {
AddCacheDirectiveInfoOp op =
AddCacheDirectiveInfoOp.getInstance(cache.get())
.setDirective(directive);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Modify a CacheDirective.
*
* @param info Information about the directive to modify. You must set the ID
* to indicate which CacheDirective you want to modify.
* @param flags {@link CacheFlag}s to use for this operation.
* @throws IOException if the directive could not be modified
*/
public void modifyCacheDirective(
CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
if (info.getPath() != null) {
info = new CacheDirectiveInfo.Builder(info).
setPath(new Path(getPathName(fixRelativePart(info.getPath()))).
makeQualified(getUri(), getWorkingDirectory())).build();
}
dfs.modifyCacheDirective(info, flags);
}
/**
* List cache directives. Incrementally fetches results from the server.
*
* @param filter Filter parameters to use when listing the directives, null to
* list all directives visible to us.
* @return A RemoteIterator which returns CacheDirectiveInfo objects.
*/
public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
CacheDirectiveInfo filter) throws IOException {
if (filter == null) {
filter = new CacheDirectiveInfo.Builder().build();
}
if (filter.getPath() != null) {
filter = new CacheDirectiveInfo.Builder(filter).
setPath(new Path(getPathName(fixRelativePart(filter.getPath())))).
build();
}
final RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(filter);
return new RemoteIterator<CacheDirectiveEntry>() {
@Override
public boolean hasNext() throws IOException {
return iter.hasNext();
}
@Override
public CacheDirectiveEntry next() throws IOException {
// Although the paths we get back from the NameNode should always be
// absolute, we call makeQualified to add the scheme and authority of
// this DistributedFilesystem.
CacheDirectiveEntry desc = iter.next();
CacheDirectiveInfo info = desc.getInfo();
Path p = info.getPath().makeQualified(getUri(), getWorkingDirectory());
return new CacheDirectiveEntry(
new CacheDirectiveInfo.Builder(info).setPath(p).build(),
desc.getStats());
}
};
}
public AddCacheDirectiveInfoOp setDirective(
CacheDirectiveInfo directive) {
this.directive = directive;
assert(directive.getId() != null);
assert(directive.getPath() != null);
assert(directive.getReplication() != null);
assert(directive.getPool() != null);
assert(directive.getExpiration() != null);
return this;
}
public static CacheDirectiveInfoExpirationProto convert(
CacheDirectiveInfo.Expiration expiration) {
return CacheDirectiveInfoExpirationProto.newBuilder()
.setIsRelative(expiration.isRelative())
.setMillis(expiration.getMillis())
.build();
}
@Override // ClientProtocol
public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId,
CacheDirectiveInfo filter) throws IOException {
checkNNStartup();
if (filter == null) {
filter = new CacheDirectiveInfo.Builder().build();
}
return namesystem.listCacheDirectives(prevId, filter);
}
private static String validatePoolName(CacheDirectiveInfo directive)
throws InvalidRequestException {
String pool = directive.getPool();
if (pool == null) {
throw new InvalidRequestException("No pool specified.");
}
if (pool.isEmpty()) {
throw new InvalidRequestException("Invalid empty pool name.");
}
return pool;
}
private static short validateReplication(CacheDirectiveInfo directive,
short defaultValue) throws InvalidRequestException {
short repl = (directive.getReplication() != null)
? directive.getReplication() : defaultValue;
if (repl <= 0) {
throw new InvalidRequestException("Invalid replication factor " + repl
+ " <= 0");
}
return repl;
}