下面列出了怎么用org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException的API类实例代码及写法,或者点击链接到github查看源代码。
public static ZooKeeperStateServer create(final NiFiProperties properties) throws IOException, ConfigException {
final File propsFile = properties.getEmbeddedZooKeeperPropertiesFile();
if (propsFile == null) {
return null;
}
if (!propsFile.exists() || !propsFile.canRead()) {
throw new IOException("Cannot create Embedded ZooKeeper Server because the Properties File " + propsFile.getAbsolutePath()
+ " referenced in nifi.properties does not exist or cannot be read");
}
final Properties zkProperties = new Properties();
try (final InputStream fis = new FileInputStream(propsFile);
final InputStream bis = new BufferedInputStream(fis)) {
zkProperties.load(bis);
}
return new ZooKeeperStateServer(zkProperties);
}
protected void initializeAndRun(String[] args) throws ConfigException,
IOException {
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
log.warn("Unable to register log4j JMX control", e);
}
ServerConfig config = new ServerConfig();
if (args.length == 1) {
config.parse(args[0]);
} else {
config.parse(args);
}
runFromConfig(config);
}
private static ServerConfig createZooKeeperConf()
throws IOException, ConfigException {
// create conf file
File zkConfDir = new File(TEST_DIR);
zkConfDir.mkdirs();
File zkConfFile = new File(ZK_CONF_FILE);
zkConfFile.delete();
zkConfFile.createNewFile();
Properties zkConfProps = new Properties();
zkConfProps.setProperty("tickTime", "2000");
zkConfProps.setProperty("dataDir", ZK_DATA_DIR);
zkConfProps.setProperty("clientPort", new Integer(zkClientPort).toString());
zkConfProps.setProperty("maxClientCnxns", "30");
zkConfProps.store(new FileOutputStream(zkConfFile), "");
// create config object
ServerConfig zkConf = new ServerConfig();
zkConf.parse(ZK_CONF_FILE);
return zkConf;
}
public static void createAndStartZooKeeper()
throws IOException, ConfigException, InterruptedException {
ServerConfig zkConf = createZooKeeperConf();
zooKeeper = new ZooKeeperServer();
FileTxnSnapLog ftxn = new
FileTxnSnapLog(new File(zkConf.getDataLogDir()),
new File(zkConf.getDataDir()));
zooKeeper.setTxnLogFactory(ftxn);
zooKeeper.setTickTime(zkConf.getTickTime());
zooKeeper.setMinSessionTimeout(zkConf.getMinSessionTimeout());
zooKeeper.setMaxSessionTimeout(zkConf.getMaxSessionTimeout());
cnxnFactory =
new NIOServerCnxn.Factory(zkConf.getClientPortAddress(),
zkConf.getMaxClientCnxns());
cnxnFactory.startup(zooKeeper);
}
public static ZooKeeperStateServer create(final NiFiProperties properties) throws IOException, ConfigException {
final File propsFile = properties.getEmbeddedZooKeeperPropertiesFile();
if (propsFile == null) {
return null;
}
if (!propsFile.exists() || !propsFile.canRead()) {
throw new IOException("Cannot create Embedded ZooKeeper Server because the Properties File " + propsFile.getAbsolutePath()
+ " referenced in nifi.properties does not exist or cannot be read");
}
final Properties zkProperties = new Properties();
try (final InputStream fis = new FileInputStream(propsFile);
final InputStream bis = new BufferedInputStream(fis)) {
zkProperties.load(bis);
}
return new ZooKeeperStateServer(zkProperties);
}
ZookeeperLaucher create(Properties zkProperties) throws ConfigException, IOException {
QuorumPeerConfig zkConfig = new QuorumPeerConfig();
zkConfig.parseProperties(zkProperties);
DatadirCleanupManager purgeMgr =
new DatadirCleanupManager(zkConfig.getDataDir(), zkConfig.getDataLogDir(),
zkConfig.getSnapRetainCount(), zkConfig.getPurgeInterval());
purgeMgr.start();
if (zkConfig.getServers().size() > 0) {
return new QuorumPeerMainExt(zkConfig);
} else {
System.out
.println("Either no config or no quorum defined in config, running in standalone mode");
// there is only server in the quorum -- run as standalone
return new ZooKeeperServerMainExt(zkConfig);
}
}
@Override
protected IteratorBuildingVisitor createIteratorBuildingVisitor(final Range documentRange, boolean isQueryFullySatisfied, boolean sortedUIDs)
throws MalformedURLException, ConfigException, InstantiationException, IllegalAccessException {
IteratorBuildingVisitor v = createIteratorBuildingVisitor(AncestorIndexBuildingVisitor.class, documentRange, isQueryFullySatisfied, sortedUIDs)
.setIteratorBuilder(AncestorIndexIteratorBuilder.class);
return ((AncestorIndexBuildingVisitor) v).setEquality(equality);
}
public Iterator<Entry<Key,Document>> getDocumentIterator(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException,
ConfigException, InstantiationException, IllegalAccessException {
createAndSeekIndexIterator(range, columnFamilies, inclusive);
// Take the document Keys and transform it into Entry<Key,Document>, removing Attributes for this Document
// which do not fall within the expected time range
return this.fieldIndexResults.iterator();
}
@Override
protected IteratorBuildingVisitor createIteratorBuildingVisitor(final Range documentRange, boolean isQueryFullySatisfied, boolean sortedUIDs)
throws MalformedURLException, ConfigException, IllegalAccessException, InstantiationException {
return super.createIteratorBuildingVisitor(documentRange, isQueryFullySatisfied, sortedUIDs).setIteratorBuilder(CardinalityIteratorBuilder.class)
.setFieldsToAggregate(configuration.getFacetedFields());
}
public ZookeeperQueryLock(String zookeeperConfig, long clientCleanupInterval, String queryId) throws ConfigException {
this.queryId = queryId;
this.clientCleanupInterval = clientCleanupInterval;
URI zookeeperConfigFile = null;
try {
zookeeperConfigFile = new Path(zookeeperConfig).toUri();
if (new File(zookeeperConfigFile).exists()) {
QuorumPeerConfig zooConfig = new QuorumPeerConfig();
zooConfig.parse(zookeeperConfigFile.getPath());
StringBuilder builder = new StringBuilder();
for (QuorumServer server : zooConfig.getServers().values()) {
if (builder.length() > 0) {
builder.append(',');
}
builder.append(server.addr.getHostName()).append(':').append(zooConfig.getClientPortAddress().getPort());
}
if (builder.length() == 0) {
builder.append(zooConfig.getClientPortAddress().getHostName()).append(':').append(zooConfig.getClientPortAddress().getPort());
}
zookeeperConfig = builder.toString();
}
} catch (IllegalArgumentException iae) {
// ok, try as is
}
this.zookeeperConfig = zookeeperConfig;
}
public void run() {
try {
QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
ServerConfig configuration = new ServerConfig();
quorumPeerConfig.parseProperties(properties);
configuration.readFrom(quorumPeerConfig);
this.runFromConfig(configuration);
} catch (IOException | ConfigException e) {
throw new ZookeeperException("Unable to start embedded zookeeper server: {}", e.getMessage());
}
}
/**
* 通过官方的ZooKeeperServerMain启动类启动单机模式
* @param zkConfig 配置对象
* @throws ConfigException 配置异常
* @throws IOException IO异常
*/
public void startStandalone(ZkConfig zkConfig) throws ConfigException, IOException {
Properties zkProp = zkConfig.toProp();
QuorumPeerConfig config = new QuorumPeerConfig();
config.parseProperties(zkProp);
ServerConfig serverConfig = new ServerConfig();
serverConfig.readFrom(config);
ZooKeeperServerMain zkServer = new ZooKeeperServerMain();
zkServer.runFromConfig(serverConfig);
}
/**
* 通过官方的QuorumPeerMain启动类启动真集群模式
* 会执行quorumPeer.join();
* 需要在不同的服务器上执行
* @param zkConfig 配置对象
* @throws ConfigException 配置异常
* @throws IOException IO异常
*/
public void startCluster(ZkConfig zkConfig) throws ConfigException, IOException {
Properties zkProp = zkConfig.toProp();
QuorumPeerConfig config = new QuorumPeerConfig();
config.parseProperties(zkProp);
QuorumPeerMain main = new QuorumPeerMain();
main.runFromConfig(config);
}
public static ZooServer runZookeeperServer(String zooConfFile) {
zkSystemProps();
ServerConfig config = new ServerConfig();
try {
config.parse(zooConfFile);
} catch (ConfigException e) {
FmtLog.error(LOG, "Error in Zookeeper configuration file '%s': %s", zooConfFile, e.getMessage());
throw new IllegalArgumentException(e);
}
ZooServer zksm = new ZooServer(config);
zksm.setupFromConfig();
return zksm;
}
public MiniAvatarCluster(Configuration conf,
int numDataNodes,
boolean format,
String[] racks,
String[] hosts)
throws IOException, ConfigException, InterruptedException {
this(conf, numDataNodes, format, racks, hosts, 1, false);
}
/**
* Test that file data becomes available before file is closed.
*/
@Test
public void testFileCreationSimulated()
throws IOException, ConfigException, InterruptedException {
simulatedStorage = true;
testFileCreation();
simulatedStorage = false;
}
@Override
protected IteratorBuildingVisitor createIteratorBuildingVisitor(final Range documentRange, boolean isQueryFullySatisfied, boolean sortedUIDs)
throws MalformedURLException, ConfigException, InstantiationException, IllegalAccessException {
return createIteratorBuildingVisitor(TLDIndexBuildingVisitor.class, documentRange, isQueryFullySatisfied, sortedUIDs).setIteratorBuilder(
TLDIndexIteratorBuilder.class);
}
protected void createAndSeekIndexIterator(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException, ConfigException,
IllegalAccessException, InstantiationException {
boolean isQueryFullySatisfiedInitialState = true;
String hitListOptionString = documentOptions.get("hit.list");
if (hitListOptionString != null) {
boolean hitListOption = Boolean.parseBoolean(hitListOptionString);
if (hitListOption) {
isQueryFullySatisfiedInitialState = false; // if hit list is on, don't attempt satisfiability
// don't even make a SatisfactionVisitor.....
}
}
Collection<String> unindexedTypes = Lists.newArrayList();
Set<String> keys = fetchDataTypeKeys(this.documentOptions.get(NON_INDEXED_DATATYPES));
String compressedOptionString = this.documentOptions.get(QUERY_MAPPING_COMPRESS);
if (!org.apache.commons.lang3.StringUtils.isBlank(compressedOptionString)) {
boolean compressedOption = Boolean.parseBoolean(compressedOptionString);
if (compressedOption) {
for (String key : keys) {
unindexedTypes.add(decompressOption(key, QueryOptions.UTF8));
}
}
} else {
unindexedTypes.addAll(keys);
}
if (isQueryFullySatisfiedInitialState) {
SatisfactionVisitor satisfactionVisitor = this.createSatisfiabilityVisitor(true); // we'll charge in with optimism
satisfactionVisitor.setUnindexedFields(unindexedTypes);
// visit() and get the root which is the root of a tree of Boolean Logic Iterator<Key>'s
this.script.jjtAccept(satisfactionVisitor, null);
isQueryFullySatisfiedInitialState = satisfactionVisitor.isQueryFullySatisfied();
}
IteratorBuildingVisitor visitor = createIteratorBuildingVisitor(null, isQueryFullySatisfiedInitialState, sortedUIDs);
visitor.setUnindexedFields(unindexedTypes);
// visit() and get the root which is the root of a tree of Boolean Logic Iterator<Key>'s
script.jjtAccept(visitor, null);
NestedIterator<Key> root = visitor.root();
if (null == root) {
throw new UnindexedException("Could not instantiate iterators over field index for " + this.getQuery());
} else {
this.fieldIndexResults = new AccumuloFieldIndexIterable(root);
}
// Otherwise, we have to use the field index
// Seek() the boolean logic stuff
this.fieldIndexResults.seek(range, columnFamilies, inclusive);
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public Iterator<Entry<Key,Document>> getDocumentIterator(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException,
ConfigException, InstantiationException, IllegalAccessException {
// Otherwise, we have to use the field index
// Seek() the boolean logic stuff
createAndSeekIndexIterator(range, columnFamilies, inclusive);
Function<Entry<Key,Document>,Entry<DocumentData,Document>> keyToDoc = null;
// TODO consider using the new EventDataQueryExpressionFilter
EventDataQueryFieldFilter projection = null;
Iterator<Entry<Key,Document>> documents = null;
if (!configuration.getFacetedFields().isEmpty()) {
projection = new EventDataQueryFieldFilter();
projection.initializeWhitelist(configuration.getFacetedFields());
}
if (!configuration.hasFieldLimits() || projection != null) {
keyToDoc = new KeyToDocumentData(source.deepCopy(myEnvironment), super.equality, projection, this.includeHierarchyFields,
this.includeHierarchyFields);
}
AccumuloTreeIterable<Key,DocumentData> doc = null;
if (null != keyToDoc) {
doc = new AccumuloTreeIterable<>(fieldIndexResults.tree, keyToDoc);
} else {
if (log.isTraceEnabled()) {
log.trace("Skipping document lookup, because we don't need it");
}
doc = new AccumuloTreeIterable<>(fieldIndexResults.tree, new Function<Entry<Key,Document>,Entry<DocumentData,Document>>() {
@Override
@Nullable
public Entry<DocumentData,Document> apply(@Nullable Entry<Key,Document> input) {
Set<Key> docKeys = Sets.newHashSet();
List<Entry<Key,Value>> attrs = Lists.newArrayList();
return Maps.immutableEntry(new DocumentData(input.getKey(), docKeys, attrs), input.getValue());
}
});
}
doc.seek(range, columnFamilies, inclusive);
TypeMetadata typeMetadata = this.getTypeMetadata();
documents = Iterators.transform(doc.iterator(), new Aggregation(this.getTimeFilter(), typeMetadata, compositeMetadata, this.isIncludeGroupingContext(),
this.includeRecordId, false, null));
switch (configuration.getType()) {
case SHARD_COUNT:
case DAY_COUNT:
SortedKeyValueIterator<Key,Value> sourceDeepCopy = source.deepCopy(myEnvironment);
documents = getEvaluation(sourceDeepCopy, documents, compositeMetadata, typeMetadata, columnFamilies, inclusive);
// Take the document Keys and transform it into Entry<Key,Document>, removing Attributes for this Document
// which do not fall within the expected time range
documents = Iterators.transform(documents, new DocumentCountCardinality(configuration.getType(), !merge));
default:
break;
}
return documents;
}
public QueryLock getQueryLock() throws MalformedURLException, ConfigException {
return new QueryLock.Builder().forQueryId(getQueryId()).forFSCache(getFileSystemCache())
.forIvaratorDirs(ivaratorCacheDirConfigs.stream().map(IvaratorCacheDirConfig::getBasePathURI).collect(Collectors.joining(",")))
.forZookeeper(getZookeeperConfig(), HdfsBackedControl.CANCELLED_CHECK_INTERVAL * 2).build();
}
protected Iterator<Entry<Key,Document>> getEvaluation(NestedQueryIterator<Key> documentSource, SortedKeyValueIterator<Key,Value> sourceDeepCopy,
Iterator<Entry<Key,Document>> documents, CompositeMetadata compositeMetadata, TypeMetadata typeMetadataForEval,
Collection<ByteSequence> columnFamilies, boolean inclusive) {
// Filter the Documents by testing them against the JEXL query
if (!this.disableEvaluation) {
JexlEvaluation jexlEvaluationFunction = getJexlEvaluation(documentSource);
Collection<String> variables = null;
if (null != documentSource && null != documentSource.getQuery()) {
variables = VariableNameVisitor.parseQuery(jexlEvaluationFunction.parse(documentSource.getQuery()));
} else {
variables = VariableNameVisitor.parseQuery(jexlEvaluationFunction.parse(query));
}
final Iterator<Tuple2<Key,Document>> tupleItr = Iterators.transform(documents, new EntryToTuple<>());
// get the function we use for the tf functionality. Note we are
// getting an additional source deep copy for this function
final Iterator<Tuple3<Key,Document,Map<String,Object>>> itrWithContext;
if (this.isTermFrequenciesRequired()) {
Function<Tuple2<Key,Document>,Tuple3<Key,Document,Map<String,Object>>> tfFunction;
tfFunction = TFFactory.getFunction(getScript(documentSource), getContentExpansionFields(), getTermFrequencyFields(), this.getTypeMetadata(),
super.equality, getEvaluationFilter(), sourceDeepCopy.deepCopy(myEnvironment));
itrWithContext = TraceIterators.transform(tupleItr, tfFunction, "Term Frequency Lookup");
} else {
itrWithContext = Iterators.transform(tupleItr, new EmptyContext<>());
}
try {
IteratorBuildingVisitor iteratorBuildingVisitor = createIteratorBuildingVisitor(getDocumentRange(documentSource), false, this.sortedUIDs);
iteratorBuildingVisitor.setExceededOrEvaluationCache(exceededOrEvaluationCache);
Multimap<String,JexlNode> delayedNonEventFieldMap = DelayedNonEventSubTreeVisitor.getDelayedNonEventFieldMap(iteratorBuildingVisitor, script,
getNonEventFields());
IndexOnlyContextCreatorBuilder contextCreatorBuilder = new IndexOnlyContextCreatorBuilder().setSource(sourceDeepCopy)
.setRange(getDocumentRange(documentSource)).setTypeMetadata(typeMetadataForEval).setCompositeMetadata(compositeMetadata)
.setOptions(this).setVariables(variables).setIteratorBuildingVisitor(iteratorBuildingVisitor)
.setDelayedNonEventFieldMap(delayedNonEventFieldMap).setEquality(equality).setColumnFamilies(columnFamilies)
.setInclusive(inclusive).setComparatorFactory(this);
final IndexOnlyContextCreator contextCreator = contextCreatorBuilder.build();
if (exceededOrEvaluationCache != null) {
contextCreator.addAdditionalEntries(exceededOrEvaluationCache);
}
final Iterator<Tuple3<Key,Document,DatawaveJexlContext>> itrWithDatawaveJexlContext = Iterators.transform(itrWithContext, contextCreator);
Iterator<Tuple3<Key,Document,DatawaveJexlContext>> matchedDocuments = statelessFilter(itrWithDatawaveJexlContext, jexlEvaluationFunction);
if (log.isTraceEnabled()) {
log.trace("arithmetic:" + arithmetic + " range:" + getDocumentRange(documentSource) + ", thread:" + Thread.currentThread());
}
return Iterators.transform(matchedDocuments, new TupleToEntry<>());
} catch (InstantiationException | MalformedURLException | IllegalAccessException | ConfigException e) {
throw new IllegalStateException("Could not perform delayed index only evaluation", e);
}
} else if (log.isTraceEnabled()) {
log.trace("Evaluation is disabled, not instantiating Jexl evaluation logic");
}
return documents;
}
protected NestedIterator<Key> getOrSetKeySource(final Range documentRange, ASTJexlScript rangeScript) throws IOException, ConfigException,
IllegalAccessException, InstantiationException {
NestedIterator<Key> sourceIter = null;
// If we're doing field index or a non-fulltable (aka a normal
// query)
if (!this.isFullTableScanOnly()) {
boolean isQueryFullySatisfiedInitialState = batchedQueries <= 0;
String hitListOptionString = documentOptions.get(QueryOptions.HIT_LIST);
if (hitListOptionString != null) {
boolean hitListOption = Boolean.parseBoolean(hitListOptionString);
if (hitListOption) {
isQueryFullySatisfiedInitialState = false; // if hit
// list is
// on, don't
// attempt
// satisfiability
// don't even make a SatisfactionVisitor.....
}
}
if (isQueryFullySatisfiedInitialState) {
SatisfactionVisitor satisfactionVisitor = this.createSatisfiabilityVisitor(true); // we'll
// charge
// in
// with
// optimism
// visit() and get the root which is the root of a tree of
// Boolean Logic Iterator<Key>'s
rangeScript.jjtAccept(satisfactionVisitor, null);
isQueryFullySatisfiedInitialState = satisfactionVisitor.isQueryFullySatisfied();
}
IteratorBuildingVisitor visitor = createIteratorBuildingVisitor(documentRange, isQueryFullySatisfiedInitialState, this.sortedUIDs);
// visit() and get the root which is the root of a tree of
// Boolean Logic Iterator<Key>'s
rangeScript.jjtAccept(visitor, null);
sourceIter = visitor.root();
if (visitor.isQueryFullySatisfied()) {
this.fieldIndexSatisfiesQuery = true;
}
// Print out the boolean logic tree of iterators
debugBooleanLogicIterators(sourceIter);
if (sourceIter != null) {
sourceIter = new SeekableNestedIterator(sourceIter, this.myEnvironment);
}
}
// resort to a full table scan otherwise
if (sourceIter == null) {
sourceIter = getEventDataNestedIterator(source);
}
return sourceIter;
}
protected IteratorBuildingVisitor createIteratorBuildingVisitor(final Range documentRange, boolean isQueryFullySatisfied, boolean sortedUIDs)
throws ConfigException, MalformedURLException, InstantiationException, IllegalAccessException {
return createIteratorBuildingVisitor(IteratorBuildingVisitor.class, documentRange, isQueryFullySatisfied, sortedUIDs);
}
protected IteratorBuildingVisitor createIteratorBuildingVisitor(Class<? extends IteratorBuildingVisitor> c, final Range documentRange,
boolean isQueryFullySatisfied, boolean sortedUIDs) throws MalformedURLException, ConfigException, IllegalAccessException,
InstantiationException {
if (log.isTraceEnabled()) {
log.trace(documentRange);
}
// determine the list of indexed fields
Set<String> indexedFields = this.getIndexedFields();
indexedFields.removeAll(this.getNonIndexedDataTypeMap().keySet());
// @formatter:off
return c.newInstance()
.setSource(this, this.myEnvironment)
.setTimeFilter(this.getTimeFilter())
.setTypeMetadata(this.getTypeMetadata())
.setFieldsToAggregate(this.getNonEventFields())
.setAttrFilter(this.getEvaluationFilter())
.setDatatypeFilter(this.getFieldIndexKeyDataTypeFilter())
.setFiAggregator(this.fiAggregator)
.setHdfsFileSystem(this.getFileSystemCache())
.setQueryLock(this.getQueryLock())
.setIvaratorCacheDirConfigs(this.getIvaratorCacheDirConfigs())
.setQueryId(this.getQueryId())
.setScanId(this.getScanId())
.setIvaratorCacheSubDirPrefix(this.getHdfsCacheSubDirPrefix())
.setHdfsFileCompressionCodec(this.getHdfsFileCompressionCodec())
.setIvaratorCacheBufferSize(this.getIvaratorCacheBufferSize())
.setIvaratorCacheScanPersistThreshold(this.getIvaratorCacheScanPersistThreshold())
.setIvaratorCacheScanTimeout(this.getIvaratorCacheScanTimeout())
.setMaxRangeSplit(this.getMaxIndexRangeSplit())
.setIvaratorMaxOpenFiles(this.getIvaratorMaxOpenFiles())
.setIvaratorNumRetries(this.getIvaratorNumRetries())
.setIvaratorPersistOptions(this.getIvaratorPersistOptions())
.setUnsortedIvaratorSource(this.sourceForDeepCopies)
.setIvaratorSourcePool(createIvaratorSourcePool(this.maxIvaratorSources))
.setMaxIvaratorResults(this.getMaxIvaratorResults())
.setIncludes(indexedFields)
.setTermFrequencyFields(this.getTermFrequencyFields())
.setIsQueryFullySatisfied(isQueryFullySatisfied)
.setSortedUIDs(sortedUIDs)
.limit(documentRange)
.disableIndexOnly(disableFiEval)
.limit(this.sourceLimit)
.setCollectTimingDetails(this.collectTimingDetails)
.setQuerySpanCollector(this.querySpanCollector)
.setIndexOnlyFields(this.getAllIndexOnlyFields())
.setAllowTermFrequencyLookup(this.allowTermFrequencyLookup)
.setCompositeMetadata(compositeMetadata)
.setExceededOrEvaluationCache(exceededOrEvaluationCache);
// @formatter:on
// TODO: .setStatsPort(this.statsdHostAndPort);
}
public QueryLock build() throws ConfigException, MalformedURLException {
List<QueryLock> locks = new ArrayList<>();
if (!isEmpty(zookeeperConfig)) {
if (queryId == null) {
throw new IllegalArgumentException("Cannot create a query lock without a query id");
}
locks.add(new ZookeeperQueryLock(zookeeperConfig, cleanupInterval, queryId));
}
if (!isEmpty(fstURIs) || (!isEmpty(ivaratorURIs) && ivaratorURIs.contains("hdfs://"))) {
if (!isEmpty(hdfsSiteConfigs) || fsCache != null) {
if (queryId == null) {
throw new IllegalArgumentException("Cannot create a query lock without a query id");
}
if (!isEmpty(fstURIs)) {
if (fsCache == null) {
locks.add(new HdfsQueryLock(hdfsSiteConfigs, fstURIs, queryId));
} else {
locks.add(new HdfsQueryLock(fsCache, fstURIs, queryId));
}
}
if (!isEmpty(ivaratorURIs) && ivaratorURIs.contains("hdfs://")) {
if (fsCache == null) {
locks.add(new HdfsQueryLock(hdfsSiteConfigs, ivaratorURIs, queryId));
} else {
locks.add(new HdfsQueryLock(fsCache, ivaratorURIs, queryId));
}
}
} else {
throw new IllegalArgumentException("Configured hdfs URIs but missing hdfs configuration");
}
}
if (locks.size() == 1) {
return locks.get(0);
} else if (locks.size() > 1) {
return new CombinedQueryLock(locks);
} else {
return null;
}
}
private ZooKeeperStateServer(final Properties zkProperties) throws IOException, ConfigException {
quorumPeerConfig = new QuorumPeerConfig();
quorumPeerConfig.parseProperties(zkProperties);
}
@Override
public void initializeAndRun(String[] args) throws ConfigException, IOException {
super.initializeAndRun(args);
}
/**
* start multiple NNs and single DN and verifies per BP registrations and
* handshakes.
*
* @throws IOException
*/
@Test
public void test2NNRegistration()
throws IOException, ConfigException, InterruptedException{
MiniAvatarCluster cluster = new MiniAvatarCluster(conf, 1, true, null, null, 2, true);
try {
NameNodeInfo nn1 = cluster.getNameNode(0);
NameNodeInfo nn2 = cluster.getNameNode(1);
AvatarNode nn1zero = nn1.avatars.get(0).avatar;
AvatarNode nn1one = nn1.avatars.get(1).avatar;
AvatarNode nn2zero = nn2.avatars.get(0).avatar;
AvatarNode nn2one = nn2.avatars.get(1).avatar;
assertNotNull("cannot create nn1 avatar 0", nn1zero);
assertNotNull("cannot create nn1 avatar 1", nn1one);
assertNotNull("cannot create nn2 avatar 0", nn2zero);
assertNotNull("cannot create nn2 avatar 1", nn2one);
int ns1zero = nn1zero.getNamespaceID();
int ns1one = nn1one.getNamespaceID();
assertEquals("namespace ids for namenode 1 should be the same",
ns1zero, ns1one);
int ns2zero = nn2zero.getNamespaceID();
int ns2one = nn2one.getNamespaceID();
assertEquals("namespace ids for namenode 2 should be the same",
ns2zero, ns2one);
int lv1zero = nn1zero.getFSImage().getLayoutVersion();
int lv1one = nn1one.getFSImage().getLayoutVersion();
int lv2zero = nn2zero.getFSImage().getLayoutVersion();
int lv2one = nn2one.getFSImage().getLayoutVersion();
assertNotSame("namespace ids should be different", ns1zero, ns2zero);
LOG.info("nn1zero: lv=" + lv1zero + ";uri=" + nn1zero.getNameNodeAddress());
LOG.info("nn1one: lv=" + lv1one + ";uri=" + nn1one.getNameNodeAddress());
LOG.info("nn2zero: lv=" + lv2zero + ";uri=" + nn2zero.getNameNodeAddress());
LOG.info("nn2one: lv=" + lv2one + ";uri=" + nn2one.getNameNodeAddress());
// check number of volumes in fsdataset
AvatarDataNode dn = cluster.getDataNodes().get(0);
for (NamespaceService nsos : dn.getAllNamespaceServices()) {
LOG.info("reg: nsid =" + nsos.getNamespaceId() + "; name="
+ nsos.getNsRegistration().name + "; sid="
+ nsos.getNsRegistration().storageID + "; nna="
+ nsos.getNNSocketAddress());
}
NamespaceService nsos1 = dn.getAllNamespaceServices()[0];
NamespaceService nsos2 = dn.getAllNamespaceServices()[1];
// The order of bpos is not guaranteed, so fix the order
if (nsos1.getNNSocketAddress().equals(nn2zero.getNameNodeDNAddress())) {
NamespaceService tmp = nsos1;
nsos1 = nsos2;
nsos2 = tmp;
}
assertEquals("wrong nn address", nsos1.getNNSocketAddress(),
nn1zero.getNameNodeDNAddress());
assertEquals("wrong nn address", nsos2.getNNSocketAddress(),
nn2zero.getNameNodeDNAddress());
assertEquals("wrong nsid", nsos1.getNamespaceId(), ns1zero);
assertEquals("wrong nsid", nsos2.getNamespaceId(), ns2zero);
} finally {
cluster.shutDown();
}
}
/**
* starts single nn and single dn and verifies registration and handshake
*
* @throws IOException
*/
@Test
public void testFedSingleNN()
throws IOException, ConfigException, InterruptedException {
MiniAvatarCluster cluster = new MiniAvatarCluster(conf, 1, true, null, null, 1, true);
try {
NameNodeInfo nn1 = cluster.getNameNode(0);
AvatarNode nn1zero = nn1.avatars.get(0).avatar;
AvatarNode nn1one = nn1.avatars.get(1).avatar;
assertNotNull("cannot create nn1 zero", nn1zero);
assertNotNull("cannot create nn1 one", nn1one);
int nsid1zero = nn1zero.getNamespaceID();
int nsid1one = nn1one.getNamespaceID();
assertEquals("namespace ids for namenode 1 should be the same",
nsid1zero, nsid1one);
int lv1zero = nn1zero.getFSImage().getLayoutVersion();
int lv1one = nn1one.getFSImage().getLayoutVersion();
LOG.info("nn1: lv=" + lv1zero + ";nsid=" + nsid1zero + ";uri="
+ nn1zero.getNameNodeAddress());
LOG.info("nn1: lv=" + lv1one + ";nsid=" + nsid1one + ";uri="
+ nn1one.getNameNodeAddress());
// check number of vlumes in fsdataset
AvatarDataNode dn = cluster.getDataNodes().get(0);
for (NamespaceService nsos : dn.getAllNamespaceServices()) {
LOG.info("reg: nsid =" + nsos.getNamespaceId() + "; name="
+ nsos.getNsRegistration().name + "; sid="
+ nsos.getNsRegistration().storageID + "; nna="
+ nsos.getNNSocketAddress());
}
// try block report
NamespaceService nsos1 = dn.getAllNamespaceServices()[0];
nsos1.scheduleBlockReport(0);
assertEquals("wrong nn address", nsos1.getNNSocketAddress(),
nn1zero.getNameNodeDNAddress());
assertEquals("wrong nsid", nsos1.getNamespaceId(), nsid1zero);
cluster.shutDown();
// Ensure all the BPOfferService threads are shutdown
assertEquals(0, dn.getAllNamespaceServices().length);
cluster = null;
} finally {
if (cluster != null) {
cluster.shutDown();
}
}
}
/**
* Test that file data becomes available before file is closed.
*/
@Test
public void testFileCreation()
throws IOException, ConfigException, InterruptedException {
Configuration conf = new Configuration();
if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
}
MiniAvatarCluster cluster = new MiniAvatarCluster(conf, 1, true, null, null, 3, true);
try {
for (int i = 0; i < cluster.getNumNameNodes(); i++) {
FileSystem fs = cluster.getFileSystem(i);
//
// check that / exists
//
Path path = new Path("/");
System.out.println("Path : \"" + path.toString() + "\"");
System.out.println(fs.getFileStatus(path).isDir());
assertTrue("/ should be a directory",
fs.getFileStatus(path).isDir() == true);
//
// Create a directory inside /, then try to overwrite it
//
Path dir1 = new Path("/test_dir");
fs.mkdirs(dir1);
System.out.println("createFile: Creating " + dir1.getName() +
" for overwrite of existing directory.");
try {
fs.create(dir1, true); // Create path, overwrite=true
fs.close();
assertTrue("Did not prevent directory from being overwritten.", false);
} catch (IOException ie) {
if (!ie.getMessage().contains("already exists as a directory."))
throw ie;
}
// create a new file in home directory. Do not close it.
//
Path file1 = new Path("filestatus.dat");
FSDataOutputStream stm = TestFileCreation.createFile(fs, file1, 1);
// verify that file exists in FS namespace
assertTrue(file1 + " should be a file",
fs.getFileStatus(file1).isDir() == false);
System.out.println("Path : \"" + file1 + "\"");
// write to file
TestFileCreation.writeFile(stm);
// Make sure a client can read it before it is closed.
checkFile(fs, file1, 1);
// verify that file size has changed
long len = fs.getFileStatus(file1).getLen();
assertTrue(file1 + " should be of size " + (numBlocks * blockSize) +
" but found to be of size " + len,
len == numBlocks * blockSize);
stm.close();
// verify that file size has changed to the full size
len = fs.getFileStatus(file1).getLen();
assertTrue(file1 + " should be of size " + fileSize +
" but found to be of size " + len,
len == fileSize);
// Check storage usage
// can't check capacities for real storage since the OS file system may be changing under us.
if (simulatedStorage) {
AvatarDataNode dn = cluster.getDataNodes().get(0);
int namespaceId = cluster.getNameNode(i).avatars.get(0).avatar.getNamespaceID();
assertEquals(fileSize, dn.getFSDataset().getNSUsed(namespaceId));
//Because all namespaces share the same simulated dataset
assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize*(i+1),
dn.getFSDataset().getRemaining());
}
}
} finally {
cluster.shutDown();
}
}