下面列出了怎么用org.apache.zookeeper.server.NIOServerCnxn的API类实例代码及写法,或者点击链接到github查看源代码。
public static void createAndStartZooKeeper()
throws IOException, ConfigException, InterruptedException {
ServerConfig zkConf = createZooKeeperConf();
zooKeeper = new ZooKeeperServer();
FileTxnSnapLog ftxn = new
FileTxnSnapLog(new File(zkConf.getDataLogDir()),
new File(zkConf.getDataDir()));
zooKeeper.setTxnLogFactory(ftxn);
zooKeeper.setTickTime(zkConf.getTickTime());
zooKeeper.setMinSessionTimeout(zkConf.getMinSessionTimeout());
zooKeeper.setMaxSessionTimeout(zkConf.getMaxSessionTimeout());
cnxnFactory =
new NIOServerCnxn.Factory(zkConf.getClientPortAddress(),
zkConf.getMaxClientCnxns());
cnxnFactory.startup(zooKeeper);
}
public EmbeddedZookeeper(final int port)
throws IOException
{
this.port = port;
zkDataDir = Files.createTempDir();
zkServer = new ZooKeeperServer();
final FileTxnSnapLog ftxn = new FileTxnSnapLog(zkDataDir, zkDataDir);
zkServer.setTxnLogFactory(ftxn);
cnxnFactory = new NIOServerCnxn.Factory(new InetSocketAddress(this.port), 0);
}
/**
* Build a connection with a Chaos Monkey ZookeeperServer
*/
protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) throws IOException
{
return new NIOServerCnxn(zkServer, sock, sk, this);
}
@Override
public void submitRequest(Request si)
{
long remaining = firstError != 0 ? LOCKOUT_DURATION_MS - (System.currentTimeMillis() - firstError) : 0;
if ( si.type != ZooDefs.OpCode.createSession && si.type != ZooDefs.OpCode.sync && si.type != ZooDefs.OpCode.ping
&& firstError != 0 && remaining > 0 )
{
log.debug("Rejected : " + si.toString());
// Still reject request
log.debug("Still not ready for " + remaining + "ms");
((NIOServerCnxn)si.cnxn).close();
return;
}
// Submit the request to the legacy Zookeeper server
log.debug("Applied : " + si.toString());
super.submitRequest(si);
// Raise an error if a lock is created
if ( si.type == ZooDefs.OpCode.create )
{
CreateRequest createRequest = new CreateRequest();
try
{
ByteBuffer duplicate = si.request.duplicate();
duplicate.rewind();
ByteBufferInputStream.byteBuffer2Record(duplicate, createRequest);
if ( createRequest.getPath().startsWith(CHAOS_ZNODE_PREFIX)
&& firstError == 0 )
{
firstError = System.currentTimeMillis();
// The znode has been created, close the connection and don't tell it to client
log.warn("Closing connection right after " + createRequest.getPath() + " creation");
((NIOServerCnxn)si.cnxn).close();
}
}
catch ( Exception e )
{
// Should not happen
((NIOServerCnxn)si.cnxn).close();
}
}
}