下面列出了怎么用org.apache.zookeeper.server.NIOServerCnxnFactory的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Constructs an embedded Zookeeper instance.
*
* @param connectString Zookeeper connection string.
*
* @throws IOException if an error occurs during Zookeeper initialization.
*/
public EmbeddedZookeeper(String connectString) throws IOException {
this.snapshotDir = KafkaTestUtils.getTempDir();
this.logDir = KafkaTestUtils.getTempDir();
this.factory = new NIOServerCnxnFactory();
String hostname = connectString.split(":")[0];
int port = Integer.parseInt(connectString.split(":")[1]);
int maxClientConnections = 1024;
factory.configure(new InetSocketAddress(hostname, port), maxClientConnections);
try {
int tickTime = 500;
factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime));
} catch (InterruptedException e) {
throw new IOException(e);
}
}
public CCEmbeddedZookeeper() {
int tickTime = 500;
try {
File snapshotDir = CCKafkaTestUtils.newTempDir();
File logDir = CCKafkaTestUtils.newTempDir();
_zk = new ZooKeeperServer(snapshotDir, logDir, tickTime);
_cnxnFactory = new NIOServerCnxnFactory();
InetAddress localHost = InetAddress.getLocalHost();
_hostAddress = localHost.getHostAddress();
InetSocketAddress bindAddress = new InetSocketAddress(localHost, 0);
_cnxnFactory.configure(bindAddress, 0);
_cnxnFactory.startup(_zk);
_port = _zk.getClientPort();
} catch (Exception e) {
throw new IllegalStateException(e);
}
//sanity check
if (_zk.getClientPort() != _port) {
throw new IllegalStateException();
}
}
@BeforeClass
public static void setupZooKeeper() throws Exception {
LOG.info("Starting ZK server");
zkTmpDir = File.createTempFile("zookeeper", "test");
zkTmpDir.delete();
zkTmpDir.mkdir();
try {
zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZooKeeperDefaultPort);
serverFactory = new NIOServerCnxnFactory();
serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), 10);
serverFactory.startup(zks);
} catch (Exception e) {
LOG.error("Exception while instantiating ZooKeeper", e);
}
boolean b = LocalBookKeeper.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT);
LOG.debug("ZooKeeper server up: " + b);
}
@BeforeClass
public static void setupZooKeeper() throws Exception {
// create a ZooKeeper server(dataDir, dataLogDir, port)
LOG.info("Starting ZK server");
ZkTmpDir = File.createTempFile("zookeeper", "test");
ZkTmpDir.delete();
ZkTmpDir.mkdir();
try {
zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
serverFactory = new NIOServerCnxnFactory();
serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), 10);
serverFactory.startup(zks);
} catch (Exception e) {
LOG.error("Exception while instantiating ZooKeeper", e);
}
boolean b = LocalBookKeeper.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT);
LOG.debug("ZooKeeper server up: " + b);
}
/**
* @throws IOException
*/
public void shutdown() throws IOException {
if (!started) {
return;
}
// shut down all the zk servers
for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
NIOServerCnxnFactory standaloneServerFactory =
standaloneServerFactoryList.get(i);
int clientPort = clientPortList.get(i);
standaloneServerFactory.shutdown();
if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for shutdown of standalone server");
}
}
// clear everything
started = false;
activeZKServerIndex = 0;
standaloneServerFactoryList.clear();
clientPortList.clear();
zooKeeperServers.clear();
logger.info("Shutdown MiniZK cluster with all ZK servers");
}
/**
* Kill one back up ZK servers
*
* @throws IOException
* @throws InterruptedException
*/
public void killOneBackupZooKeeperServer() throws IOException,
InterruptedException {
if (!started || activeZKServerIndex < 0 ||
standaloneServerFactoryList.size() <= 1) {
return;
}
int backupZKServerIndex = activeZKServerIndex + 1;
// Shutdown the current active one
NIOServerCnxnFactory standaloneServerFactory =
standaloneServerFactoryList.get(backupZKServerIndex);
int clientPort = clientPortList.get(backupZKServerIndex);
standaloneServerFactory.shutdown();
if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for shutdown of standalone server");
}
// remove this backup zk server
standaloneServerFactoryList.remove(backupZKServerIndex);
clientPortList.remove(backupZKServerIndex);
zooKeeperServers.remove(backupZKServerIndex);
logger.info("Kill one backup ZK servers in the cluster " +
"on client port: " + clientPort);
}
public EmbeddedZookeeper() {
try {
snapshotDir = KafkaTestUtils.newTempDir();
logDir = KafkaTestUtils.newTempDir();
tickTime = 500;
zk = new ZooKeeperServer(snapshotDir, logDir, tickTime);
registerShutdownHandler(zk);
cnxnFactory = new NIOServerCnxnFactory();
InetAddress localHost = InetAddress.getLocalHost();
hostAddress = localHost.getHostAddress();
InetSocketAddress bindAddress = new InetSocketAddress(localHost, port);
cnxnFactory.configure(bindAddress, 0);
cnxnFactory.startup(zk);
port = zk.getClientPort();
} catch (Exception e) {
throw new IllegalStateException(e);
}
//sanity check
if (zk.getClientPort() != port) {
throw new IllegalStateException();
}
}
@BeforeClass
public static void setupZooKeeper() throws Exception {
LOG.info("Starting ZK server");
zkTmpDir = File.createTempFile("zookeeper", "test");
zkTmpDir.delete();
zkTmpDir.mkdir();
try {
zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZooKeeperDefaultPort);
serverFactory = new NIOServerCnxnFactory();
serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), 10);
serverFactory.startup(zks);
} catch (Exception e) {
LOG.error("Exception while instantiating ZooKeeper", e);
}
boolean b = LocalBookKeeper.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT);
LOG.debug("ZooKeeper server up: " + b);
}
@BeforeClass
public static void setupZooKeeper() throws Exception {
// create a ZooKeeper server(dataDir, dataLogDir, port)
LOG.info("Starting ZK server");
ZkTmpDir = File.createTempFile("zookeeper", "test");
ZkTmpDir.delete();
ZkTmpDir.mkdir();
try {
zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
serverFactory = new NIOServerCnxnFactory();
serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), 10);
serverFactory.startup(zks);
} catch (Exception e) {
LOG.error("Exception while instantiating ZooKeeper", e);
}
boolean b = LocalBookKeeper.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT);
LOG.debug("ZooKeeper server up: " + b);
}
@Override
public Statement apply(Statement s, Description d) {
return new StatementAdapter(s) {
@Override
protected void before() throws Throwable {
if (!applied.getAndSet(true)) {
UncaughtExceptionHandler p = Thread.getDefaultUncaughtExceptionHandler();
try {
// Try to initialize a zookeeper class that reinitializes default exception handler.
Class<?> cl = NIOServerCnxnFactory.class;
// Make sure static initializers have been called.
Class.forName(cl.getName(), true, cl.getClassLoader());
} finally {
if (p == Thread.getDefaultUncaughtExceptionHandler()) {
// throw new RuntimeException("Zookeeper no longer resets default thread handler.");
}
Thread.setDefaultUncaughtExceptionHandler(p);
}
}
}
};
}
private ZooKeeperTestServer(int port) throws IOException {
zooKeeperDir = getTempDir();
delete(zooKeeperDir);
if (!zooKeeperDir.mkdir()) {
throw new IllegalStateException("Failed to create directory " + zooKeeperDir);
}
zooKeeperDir.deleteOnExit();
server = new ZooKeeperServer(zooKeeperDir, zooKeeperDir, (int)tickTime.toMillis());
final int maxcc = 10000; // max number of connections from the same client
factory = new NIOServerCnxnFactory();
factory.configure(new InetSocketAddress(port), maxcc); // Use any port
try{
factory.startup(server);
} catch (InterruptedException e) {
throw (RuntimeException) new IllegalStateException("Interrupted during test startup: ").initCause(e);
}
}
public void start() throws IOException {
try {
// Allow all commands on ZK control port
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
// disable the admin server as to not have any port conflicts
System.setProperty("zookeeper.admin.enableServer", "false");
zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZooKeeperServer.DEFAULT_TICK_TIME);
zks.setMaxSessionTimeout(20000);
serverFactory = new NIOServerCnxnFactory();
serverFactory.configure(new InetSocketAddress(zkPort), 1000);
serverFactory.startup(zks);
} catch (Exception e) {
log.error("Exception while instantiating ZooKeeper", e);
}
this.zkPort = serverFactory.getLocalPort();
this.hostPort = "127.0.0.1:" + zkPort;
LocalBookkeeperEnsemble.waitForServerUp(hostPort, 30000);
log.info("ZooKeeper started at {}", hostPort);
}
@Override
protected void before() throws Throwable {
snapshotDir = tempDir(perTest("zk-snapshot"));
logDir = tempDir(perTest("zk-log"));
log.info("Setting up ZK Server with snapshotDir:{}, logDir:{}", snapshotDir, logDir);
int tickTime = 500;
try {
zooKeeperServer = new ZooKeeperServer(snapshotDir, logDir, tickTime);
cnxnFactory = new NIOServerCnxnFactory();
cnxnFactory.configure(new InetSocketAddress("127.0.0.1", port), 0);
cnxnFactory.startup(zooKeeperServer);
} catch (IOException e) {
e.printStackTrace();
}
}
public void startZookeeper(final int clusterId)
{
try {
//before start, clean the zookeeper files if it exists
FileUtils.deleteQuietly(new File(baseDir, zkBaseDir));
int clientPort = TEST_ZOOKEEPER_PORT[clusterId];
int numConnections = 10;
int tickTime = 2000;
File dir = new File(baseDir, zkdir[clusterId]);
TestZookeeperServer kserver = new TestZookeeperServer(dir, dir, tickTime);
zkFactory[clusterId] = new NIOServerCnxnFactory();
zkFactory[clusterId].configure(new InetSocketAddress(clientPort), numConnections);
zkFactory[clusterId].startup(kserver); // start the zookeeper server.
Thread.sleep(2000);
kserver.startup();
} catch (Exception ex) {
logger.debug(ex.getLocalizedMessage());
}
}
public static void startZookeeper(final int clusterId)
{
try {
int numConnections = 100;
int tickTime = 2000;
File dir = new File(baseDir, zkdir[clusterId]);
zkServer[clusterId] = new TestZookeeperServer(dir, dir, tickTime);
zkFactory[clusterId] = new NIOServerCnxnFactory();
zkFactory[clusterId].configure(new InetSocketAddress(TEST_ZOOKEEPER_PORT[clusterId]), numConnections);
zkFactory[clusterId].startup(zkServer[clusterId]); // start the zookeeper server.
Thread.sleep(2000);
//kserver.startup();
} catch (Exception ex) {
logger.error(ex.getLocalizedMessage());
}
}
public static void startZookeeper(final int clusterId)
{
try {
int numConnections = 100;
int tickTime = 2000;
File dir = new File(baseDir, zkdir[clusterId]);
zkServer[clusterId] = new TestZookeeperServer(dir, dir, tickTime);
zkFactory[clusterId] = new NIOServerCnxnFactory();
zkFactory[clusterId].configure(new InetSocketAddress(TEST_ZOOKEEPER_PORT[clusterId]), numConnections);
zkFactory[clusterId].startup(zkServer[clusterId]); // start the zookeeper server.
Thread.sleep(2000);
//kserver.startup();
} catch (Exception ex) {
logger.error(ex.getLocalizedMessage());
}
}
public void startup() throws IOException{
if (this.port == -1) {
this.port = TestUtils.getAvailablePort();
}
this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port), 1024);
this.snapshotDir = TestUtils.constructTempDir("embeeded-zk/snapshot");
this.logDir = TestUtils.constructTempDir("embeeded-zk/log");
final ZooKeeperServer zooKeeperServer = new ZooKeeperServer(snapshotDir, logDir, tickTime);
try {
factory.startup(zooKeeperServer);
} catch (InterruptedException e) {
throw new IOException(e);
}
assertEquals("standalone", zooKeeperServer.getState());
assertEquals(this.port, zooKeeperServer.getClientPort());
}
/**
* Kill one back up ZK servers.
*
* @throws IOException if waiting for the shutdown of a server fails
*/
public void killOneBackupZooKeeperServer() throws IOException, InterruptedException {
if (!started || activeZKServerIndex < 0 || standaloneServerFactoryList.size() <= 1) {
return ;
}
int backupZKServerIndex = activeZKServerIndex+1;
// Shutdown the current active one
NIOServerCnxnFactory standaloneServerFactory =
standaloneServerFactoryList.get(backupZKServerIndex);
int clientPort = clientPortList.get(backupZKServerIndex);
standaloneServerFactory.shutdown();
if (!waitForServerDown(clientPort, connectionTimeout)) {
throw new IOException("Waiting for shutdown of standalone server");
}
zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
// remove this backup zk server
standaloneServerFactoryList.remove(backupZKServerIndex);
clientPortList.remove(backupZKServerIndex);
zooKeeperServers.remove(backupZKServerIndex);
LOG.info("Kill one backup ZK servers in the cluster on client port: {}", clientPort);
}
@Override
public void before() throws IOException {
snapshotDir = new File("target/test-classes/zk-snapshot");
snapshotDir.mkdirs();
logDir = new File("target/test-classes/zk-log");
logDir.mkdirs();
try {
zooKeeperServer = new ZooKeeperServer(snapshotDir, logDir, tickTime);
cnxnFactory = new NIOServerCnxnFactory();
cnxnFactory.configure(new InetSocketAddress("localhost", port), 1024);
cnxnFactory.startup(zooKeeperServer);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
/**
* Starts zookeeper up on an ephemeral port.
*/
public void startNetwork() throws IOException, InterruptedException {
zooKeeperServer =
new ZooKeeperServer(
new FileTxnSnapLog(dataDir, snapDir),
new BasicDataTreeBuilder()) {
// TODO(John Sirois): Introduce a builder to configure the in-process server if and when
// some folks need JMX for in-process tests.
@Override protected void registerJMX() {
// noop
}
};
connectionFactory = new NIOServerCnxnFactory();
connectionFactory.configure(
new InetSocketAddress(port),
60 /* Semi-arbitrary, max 60 connections is the default used by NIOServerCnxnFactory */);
connectionFactory.startup(zooKeeperServer);
port = zooKeeperServer.getClientPort();
}
public void afterPropertiesSet() throws Exception {
if (purge) {
deleteFilesInDir(getDataLogDir());
deleteFilesInDir(getDataDir());
}
FileTxnSnapLog ftxn = new FileTxnSnapLog(getDataLogDir(), getDataDir());
zooKeeperServer.setTxnLogFactory(ftxn);
zooKeeperServer.setTickTime(getTickTime());
zooKeeperServer.setMinSessionTimeout(getMinSessionTimeout());
zooKeeperServer.setMaxSessionTimeout(getMaxSessionTimeout());
connectionFactory = new NIOServerCnxnFactory() {
@Override
protected void configureSaslLogin() throws IOException {
// do nothing
}
};
connectionFactory.configure(getClientPortAddress(), getMaxClientConnections());
connectionFactory.startup(zooKeeperServer);
}
public EmbeddedZookeeper(int port, Path baseDir) throws Exception {
this.port = port;
zookeeperBaseDir = baseDir;
zkServer = new ZooKeeperServer();
File dataDir = zookeeperBaseDir.resolve("log").toFile();
File snapDir = zookeeperBaseDir.resolve("data").toFile();
FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, snapDir);
zkServer.setTxnLogFactory(ftxn);
zkServer.setTickTime(1000);
connectionFactory = new NIOServerCnxnFactory() {
@Override
protected void configureSaslLogin() throws IOException {
// do nothing
}
};
connectionFactory.configure(new InetSocketAddress("localhost", port), 0);
}
public ZookeeperServer(File root) throws IOException, InterruptedException {
zkServer = new ZooKeeperServer();
File dataDir = new File(root, "log");
File snapDir = new File(root, "data");
FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, snapDir);
zkServer.setTxnLogFactory(ftxn);
zkServer.setTickTime(1000);
connectionFactory = new NIOServerCnxnFactory();
connectionFactory.configure(new InetSocketAddress("localhost", SocketUtils.findAvailableTcpPort()), 0);
connectionFactory.startup(zkServer);
}
@Override
public void run() {
Utility util = Utility.getInstance();
AppConfigReader reader = AppConfigReader.getInstance();
String zkDir = reader.getProperty("zk.dir", "/tmp/zk");
int tickTime = util.str2int(reader.getProperty("zk.tick", "2000"));
if (tickTime < 1000) {
log.info("zk.tick is too small. Reset to 1000 ms");
tickTime = 1000;
}
File baseDir = new File(zkDir);
if (baseDir.exists()) {
// this guarantees that a standalone zookeeper will start with a clean state
util.cleanupDir(baseDir);
log.info("Clean up transient Zookeeper working directory at {}", baseDir);
}
File snapshotDir = new File(baseDir, "snapshots");
File logDir = new File(baseDir, "log");
try {
this.factory = NIOServerCnxnFactory.createFactory(2181, 512);
factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime));
} catch (IOException | InterruptedException e) {
log.error("Unable to start Zookeeper - {}", e.getMessage());
System.exit(-1);
}
}
EmbeddedZooKeeper() throws IOException, InterruptedException {
this.tmpDir = Files.createTempDirectory(null).toFile();
this.factory = new NIOServerCnxnFactory();
this.zookeeper = new ZooKeeperServer(new File(tmpDir, "data"), new File(tmpDir, "log"), TICK_TIME);
InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 0);
factory.configure(addr, 0);
factory.startup(zookeeper);
this.port = zookeeper.getClientPort();
}
public MiniZooKeeperCluster() {
this.started = false;
// this.configuration = configuration;
activeZKServerIndex = -1;
zooKeeperServers = new ArrayList<ZooKeeperServer>();
clientPortList = new ArrayList<Integer>();
standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
}
/**
* @return clientPort return clientPort if there is another ZK backup can run
* when killing the current active; return -1, if there is no backups.
* @throws IOException
* @throws InterruptedException
*/
public int killCurrentActiveZooKeeperServer() throws IOException,
InterruptedException {
if (!started || activeZKServerIndex < 0) {
return -1;
}
// Shutdown the current active one
NIOServerCnxnFactory standaloneServerFactory =
standaloneServerFactoryList.get(activeZKServerIndex);
int clientPort = clientPortList.get(activeZKServerIndex);
standaloneServerFactory.shutdown();
if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for shutdown of standalone server");
}
// remove the current active zk server
standaloneServerFactoryList.remove(activeZKServerIndex);
clientPortList.remove(activeZKServerIndex);
zooKeeperServers.remove(activeZKServerIndex);
logger.info("Kill the current active ZK servers in the cluster " +
"on client port: " + clientPort);
if (standaloneServerFactoryList.size() == 0) {
// there is no backup servers;
return -1;
}
clientPort = clientPortList.get(activeZKServerIndex);
logger.info("Activate a backup zk server in the cluster " +
"on client port: " + clientPort);
// return the next back zk server's port
return clientPort;
}
public ZooKeeperServer start() throws IOException, InterruptedException {
Objects.requireNonNull(workDir, "The localBaseFsLocation must be set before starting cluster.");
setupTestEnv();
stop();
File dir = new File(workDir, "zookeeper").getAbsoluteFile();
recreateDir(dir, clean);
int tickTimeToUse;
if (this.tickTime > 0) {
tickTimeToUse = this.tickTime;
} else {
tickTimeToUse = TICK_TIME;
}
this.zooKeeperServer = new ZooKeeperServer(dir, dir, tickTimeToUse);
standaloneServerFactory = new NIOServerCnxnFactory();
// NOTE: Changed from the original, where InetSocketAddress was
// originally created to bind to the wildcard IP, we now configure it.
LOG.info("Zookeeper force binding to: " + this.bindIP);
standaloneServerFactory.configure(new InetSocketAddress(bindIP, clientPort), 1000);
// Start up this ZK server
standaloneServerFactory.startup(zooKeeperServer);
String serverHostname;
if (bindIP.equals("0.0.0.0")) {
serverHostname = "localhost";
} else {
serverHostname = bindIP;
}
if (!waitForServerUp(serverHostname, clientPort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for startup of standalone server");
}
started = true;
LOG.info("Zookeeper Minicluster service started on client port: " + clientPort);
return zooKeeperServer;
}
public void startup() throws IOException {
if (this.port == -1) {
this.port = TestUtils.getAvailablePort();
}
this.factory =
NIOServerCnxnFactory.createFactory(new InetSocketAddress("127.0.0.1", port), 1024);
this.snapshotDir = TestUtils.constructTempDir("embedded-zk/snapshot");
this.logDir = TestUtils.constructTempDir("embedded-zk/log");
try {
factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime));
} catch (InterruptedException e) {
throw new IOException(e);
}
}
public void start() throws IOException {
try {
zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZooKeeperServer.DEFAULT_TICK_TIME);
zks.setMaxSessionTimeout(20000);
serverFactory = new NIOServerCnxnFactory();
serverFactory.configure(new InetSocketAddress(zkPort), 1000);
serverFactory.startup(zks);
} catch (Exception e) {
log.error("Exception while instantiating ZooKeeper", e);
}
LocalBookkeeperEnsemble.waitForServerUp(hostPort, 30000);
log.info("ZooKeeper started at {}", hostPort);
}