下面列出了怎么用org.apache.zookeeper.server.persistence.FileTxnSnapLog的API类实例代码及写法,或者点击链接到github查看源代码。
public static void createAndStartZooKeeper()
throws IOException, ConfigException, InterruptedException {
ServerConfig zkConf = createZooKeeperConf();
zooKeeper = new ZooKeeperServer();
FileTxnSnapLog ftxn = new
FileTxnSnapLog(new File(zkConf.getDataLogDir()),
new File(zkConf.getDataDir()));
zooKeeper.setTxnLogFactory(ftxn);
zooKeeper.setTickTime(zkConf.getTickTime());
zooKeeper.setMinSessionTimeout(zkConf.getMinSessionTimeout());
zooKeeper.setMaxSessionTimeout(zkConf.getMaxSessionTimeout());
cnxnFactory =
new NIOServerCnxn.Factory(zkConf.getClientPortAddress(),
zkConf.getMaxClientCnxns());
cnxnFactory.startup(zooKeeper);
}
/**
* Starts zookeeper up on an ephemeral port.
*/
public void startNetwork() throws IOException, InterruptedException {
zooKeeperServer =
new ZooKeeperServer(
new FileTxnSnapLog(dataDir, snapDir),
new BasicDataTreeBuilder()) {
// TODO(John Sirois): Introduce a builder to configure the in-process server if and when
// some folks need JMX for in-process tests.
@Override protected void registerJMX() {
// noop
}
};
connectionFactory = new NIOServerCnxnFactory();
connectionFactory.configure(
new InetSocketAddress(port),
60 /* Semi-arbitrary, max 60 connections is the default used by NIOServerCnxnFactory */);
connectionFactory.startup(zooKeeperServer);
port = zooKeeperServer.getClientPort();
}
public void afterPropertiesSet() throws Exception {
if (purge) {
deleteFilesInDir(getDataLogDir());
deleteFilesInDir(getDataDir());
}
FileTxnSnapLog ftxn = new FileTxnSnapLog(getDataLogDir(), getDataDir());
zooKeeperServer.setTxnLogFactory(ftxn);
zooKeeperServer.setTickTime(getTickTime());
zooKeeperServer.setMinSessionTimeout(getMinSessionTimeout());
zooKeeperServer.setMaxSessionTimeout(getMaxSessionTimeout());
connectionFactory = new NIOServerCnxnFactory() {
@Override
protected void configureSaslLogin() throws IOException {
// do nothing
}
};
connectionFactory.configure(getClientPortAddress(), getMaxClientConnections());
connectionFactory.startup(zooKeeperServer);
}
public EmbeddedZookeeper(int port, Path baseDir) throws Exception {
this.port = port;
zookeeperBaseDir = baseDir;
zkServer = new ZooKeeperServer();
File dataDir = zookeeperBaseDir.resolve("log").toFile();
File snapDir = zookeeperBaseDir.resolve("data").toFile();
FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, snapDir);
zkServer.setTxnLogFactory(ftxn);
zkServer.setTickTime(1000);
connectionFactory = new NIOServerCnxnFactory() {
@Override
protected void configureSaslLogin() throws IOException {
// do nothing
}
};
connectionFactory.configure(new InetSocketAddress("localhost", port), 0);
}
public ZookeeperServer(File root) throws IOException, InterruptedException {
zkServer = new ZooKeeperServer();
File dataDir = new File(root, "log");
File snapDir = new File(root, "data");
FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, snapDir);
zkServer.setTxnLogFactory(ftxn);
zkServer.setTickTime(1000);
connectionFactory = new NIOServerCnxnFactory();
connectionFactory.configure(new InetSocketAddress("localhost", SocketUtils.findAvailableTcpPort()), 0);
connectionFactory.startup(zkServer);
}
private void startDistributed() throws IOException {
logger.info("Starting Embedded ZooKeeper Peer");
try {
transactionLog = new FileTxnSnapLog(new File(quorumPeerConfig.getDataLogDir()), new File(quorumPeerConfig.getDataDir()));
connectionFactory = ServerCnxnFactory.createFactory();
connectionFactory.configure(quorumPeerConfig.getClientPortAddress(), quorumPeerConfig.getMaxClientCnxns());
quorumPeer = new QuorumPeer();
quorumPeer.setClientPortAddress(quorumPeerConfig.getClientPortAddress());
quorumPeer.setTxnFactory(new FileTxnSnapLog(new File(quorumPeerConfig.getDataLogDir()), new File(quorumPeerConfig.getDataDir())));
quorumPeer.setQuorumPeers(quorumPeerConfig.getServers());
quorumPeer.setElectionType(quorumPeerConfig.getElectionAlg());
quorumPeer.setMyid(quorumPeerConfig.getServerId());
quorumPeer.setTickTime(quorumPeerConfig.getTickTime());
quorumPeer.setMinSessionTimeout(quorumPeerConfig.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(quorumPeerConfig.getMaxSessionTimeout());
quorumPeer.setInitLimit(quorumPeerConfig.getInitLimit());
quorumPeer.setSyncLimit(quorumPeerConfig.getSyncLimit());
quorumPeer.setQuorumVerifier(quorumPeerConfig.getQuorumVerifier());
quorumPeer.setCnxnFactory(connectionFactory);
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(quorumPeerConfig.getPeerType());
quorumPeer.setSyncEnabled(quorumPeerConfig.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(quorumPeerConfig.getQuorumListenOnAllIPs());
quorumPeer.start();
} catch (final IOException ioe) {
throw new IOException("Failed to start embedded ZooKeeper Peer", ioe);
} catch (final Exception e) {
throw new RuntimeException("Failed to start embedded ZooKeeper Peer", e);
}
}
/**
* Startup: start ZK. It is only after this that
* the binding information is valid.
* @throws Exception
*/
@Override
protected void serviceStart() throws Exception {
setupSecurity();
ZooKeeperServer zkServer = new ZooKeeperServer();
FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir);
zkServer.setTxnLogFactory(ftxn);
zkServer.setTickTime(tickTime);
LOG.info("Starting Local Zookeeper service");
factory = ServerCnxnFactory.createFactory();
factory.configure(getAddress(port), -1);
factory.startup(zkServer);
String connectString = getConnectionString();
LOG.info("In memory ZK started at {}\n", connectString);
if (LOG.isDebugEnabled()) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
zkServer.dumpConf(pw);
pw.flush();
LOG.debug(sw.toString());
}
binding = new BindingInformation();
binding.ensembleProvider = new FixedEnsembleProvider(connectString);
binding.description =
getName() + " reachable at \"" + connectString + "\"";
addDiagnostics(binding.description);
// finally: set the binding information in the config
getConfig().set(KEY_REGISTRY_ZK_QUORUM, connectString);
}
private QuorumPeer createPeer(ServerCnxnFactory cnxnFactory, QuorumPeerConfig config) throws IOException {
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
QuorumPeer quorumPeer = new QuorumPeer();
quorumPeer.setClientPortAddress(config.getClientPortAddress());
quorumPeer.setTxnFactory(new FileTxnSnapLog(
new File(config.getDataLogDir()),
new File(config.getDataDir())));
quorumPeer.setQuorumPeers(config.getServers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
quorumPeer.start();
return quorumPeer;
}
@Override
protected void startUp() throws Exception {
ZooKeeperServer zkServer = new ZooKeeperServer();
FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir);
zkServer.setTxnLogFactory(ftxn);
zkServer.setTickTime(tickTime);
factory = ServerCnxnFactory.createFactory();
factory.configure(getAddress(port), -1);
factory.startup(zkServer);
LOG.info("In memory ZK started: " + getConnectionStr());
}
/**
* Startup: start ZK. It is only after this that
* the binding information is valid.
* @throws Exception
*/
@Override
protected void serviceStart() throws Exception {
setupSecurity();
ZooKeeperServer zkServer = new ZooKeeperServer();
FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir);
zkServer.setTxnLogFactory(ftxn);
zkServer.setTickTime(tickTime);
LOG.info("Starting Local Zookeeper service");
factory = ServerCnxnFactory.createFactory();
factory.configure(getAddress(port), -1);
factory.startup(zkServer);
String connectString = getConnectionString();
LOG.info("In memory ZK started at {}\n", connectString);
if (LOG.isDebugEnabled()) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
zkServer.dumpConf(pw);
pw.flush();
LOG.debug(sw.toString());
}
binding = new BindingInformation();
binding.ensembleProvider = new FixedEnsembleProvider(connectString);
binding.description =
getName() + " reachable at \"" + connectString + "\"";
addDiagnostics(binding.description);
// finally: set the binding information in the config
getConfig().set(KEY_REGISTRY_ZK_QUORUM, connectString);
}
/**
* Run from a ServerConfig.
* @param config ServerConfig to use.
* @throws IOException If there is a low-level I/O error.
*/
public void runFromConfig(ServerConfig config) throws IOException {
ObjectReleaseTracker.track(this);
log.info("Starting server");
try {
// ZooKeeper maintains a static collection of AuthenticationProviders, so
// we make sure the SASL provider is loaded so that it can be used in
// subsequent tests.
System.setProperty("zookeeper.authProvider.1",
"org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
// Note that this thread isn't going to be doing anything else,
// so rather than spawning another thread, we will just call
// run() in this thread.
// create a file logger url from the command line args
FileTxnSnapLog ftxn = new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir());
zooKeeperServer = new ZooKeeperServer(ftxn, config.getTickTime(),
config.getMinSessionTimeout(), config.getMaxSessionTimeout(),
new TestZKDatabase(ftxn, limiter));
cnxnFactory = new TestServerCnxnFactory(limiter);
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
cnxnFactory.startup(zooKeeperServer);
cnxnFactory.join();
if (violationReportAction != LimitViolationAction.IGNORE) {
String limitViolations = limiter.reportLimitViolations();
if (!limitViolations.isEmpty()) {
log.warn("Watch limit violations: {}", limitViolations);
if (violationReportAction == LimitViolationAction.FAIL) {
throw new AssertionError("Parallel watch limits violated");
}
}
}
} catch (InterruptedException e) {
// warn, but generally this is ok
log.warn("Server interrupted", e);
}
}
public void setupFromConfig() {
try {
txnLog = new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir());
}
catch (IOException e) {
e.printStackTrace();
}
zkServer = new ZooKeeperServer(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), null);
}
public EmbeddedZookeeper(int port) throws IOException {
zkDataDir = Files.createTempDir();
zkServer = new ZooKeeperServer();
FileTxnSnapLog ftxn = new FileTxnSnapLog(zkDataDir, zkDataDir);
zkServer.setTxnLogFactory(ftxn);
cnxnFactory = new NIOServerCnxnFactory();
cnxnFactory.configure(new InetSocketAddress(port), 0);
}
public EmbeddedZookeeper(final int port)
throws IOException
{
this.port = port;
zkDataDir = Files.createTempDir();
zkServer = new ZooKeeperServer();
final FileTxnSnapLog ftxn = new FileTxnSnapLog(zkDataDir, zkDataDir);
zkServer.setTxnLogFactory(ftxn);
cnxnFactory = new NIOServerCnxn.Factory(new InetSocketAddress(this.port), 0);
}
public TestZooKeeperServer(FileTxnSnapLog txnLog, ServerConfig config)
{
this.txnLog = txnLog;
this.setTxnLogFactory(txnLog);
this.setMinSessionTimeout(config.getMinSessionTimeout());
this.setMaxSessionTimeout(config.getMaxSessionTimeout());
}
private void startDistributed() throws IOException {
logger.info("Starting Embedded ZooKeeper Peer");
try {
transactionLog = new FileTxnSnapLog(quorumPeerConfig.getDataLogDir(), quorumPeerConfig.getDataDir());
connectionFactory = ServerCnxnFactory.createFactory();
connectionFactory.configure(quorumPeerConfig.getClientPortAddress(), quorumPeerConfig.getMaxClientCnxns());
quorumPeer = new QuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(quorumPeerConfig.getDataLogDir(), quorumPeerConfig.getDataDir()));
quorumPeer.setElectionType(quorumPeerConfig.getElectionAlg());
quorumPeer.setMyid(quorumPeerConfig.getServerId());
quorumPeer.setTickTime(quorumPeerConfig.getTickTime());
quorumPeer.setMinSessionTimeout(quorumPeerConfig.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(quorumPeerConfig.getMaxSessionTimeout());
quorumPeer.setInitLimit(quorumPeerConfig.getInitLimit());
quorumPeer.setSyncLimit(quorumPeerConfig.getSyncLimit());
quorumPeer.setQuorumVerifier(quorumPeerConfig.getQuorumVerifier(), false);
quorumPeer.setCnxnFactory(connectionFactory);
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(quorumPeerConfig.getPeerType());
quorumPeer.setSyncEnabled(quorumPeerConfig.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(quorumPeerConfig.getQuorumListenOnAllIPs());
quorumPeer.start();
} catch (final IOException ioe) {
throw new IOException("Failed to start embedded ZooKeeper Peer", ioe);
} catch (final Exception e) {
throw new RuntimeException("Failed to start embedded ZooKeeper Peer", e);
}
}
public void connect() throws IOException, InterruptedException {
zkServer = new ZooKeeperServer(new FileTxnSnapLog(dataDir, logDir), new ZooKeeperServer.BasicDataTreeBuilder());
connectionFactory = new NIOServerCnxnFactory();
connectionFactory.configure(new InetSocketAddress(port), 10);
connectionFactory.startup(zkServer);
port = zkServer.getClientPort();
}
public SpliceZoo(QuorumPeerConfig config, int number) throws IOException {
this.config = config;
try {
if (QuorumPeer.class.getMethod("testingQuorumPeer", null) != null)
this.peer = (QuorumPeer) QuorumPeer.class.getMethod("testingQuorumPeer", null).invoke(null,null);
else
this.peer = QuorumPeer.class.newInstance();
} catch (Exception e) {
throw new RuntimeException("Quorum Peer Signature Issue for Unit Tests");
}
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),config.getMaxClientCnxns());
peer.setClientPortAddress(config.getClientPortAddress());
peer.setTxnFactory(new FileTxnSnapLog(new File(config.getDataLogDir()),
new File(config.getDataDir())));
peer.setQuorumPeers(config.getServers());
peer.setElectionType(config.getElectionAlg());
peer.setMyid(config.getServerId());
peer.setTickTime(config.getTickTime());
peer.setMinSessionTimeout(config.getMinSessionTimeout());
peer.setMaxSessionTimeout(config.getMaxSessionTimeout());
peer.setInitLimit(config.getInitLimit());
peer.setSyncLimit(config.getSyncLimit());
peer.setQuorumVerifier(config.getQuorumVerifier());
peer.setCnxnFactory(cnxnFactory);
peer.setZKDatabase(new ZKDatabase(peer.getTxnFactory()));
peer.setLearnerType(config.getPeerType());
peer.setMyid(number);
}
public TestZKDatabase(FileTxnSnapLog snapLog, WatchLimiter limiter) {
super(snapLog);
this.limiter = limiter;
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException
{
super(txnLogFactory, treeBuilder);
// TODO Auto-generated constructor stub
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder) throws IOException
{
super(txnLogFactory, tickTime, treeBuilder);
// TODO Auto-generated constructor stub
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
{
super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
// TODO Auto-generated constructor stub
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException
{
super(txnLogFactory, treeBuilder);
// TODO Auto-generated constructor stub
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder)
throws IOException
{
super(txnLogFactory, tickTime, treeBuilder);
// TODO Auto-generated constructor stub
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout,
int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
{
super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
// TODO Auto-generated constructor stub
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException
{
super(txnLogFactory, treeBuilder);
// TODO Auto-generated constructor stub
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder)
throws IOException
{
super(txnLogFactory, tickTime, treeBuilder);
// TODO Auto-generated constructor stub
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout,
int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
{
super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
// TODO Auto-generated constructor stub
}
/**
* Starts Zookeeper.
*
* @throws IOException if an error occurs during initialization
* @throws InterruptedException if an error occurs during initialization
*/
public synchronized void start() throws IOException, InterruptedException {
log.info("Starting Zookeeper on port {}", port);
dataDir = Files.createTempDirectory(LocalZKServer.class.getSimpleName());
dataDir.toFile().deleteOnExit();
QuorumPeerConfig quorumConfig = new QuorumPeerConfig();
try {
quorumConfig.parseProperties(ConfigUtils.keyValueToProperties(
"dataDir", dataDir.toAbsolutePath(),
"clientPort", port
));
} catch (QuorumPeerConfig.ConfigException e) {
throw new IllegalArgumentException(e);
}
purgeManager =
new DatadirCleanupManager(quorumConfig.getDataDir(),
quorumConfig.getDataLogDir(),
quorumConfig.getSnapRetainCount(),
quorumConfig.getPurgeInterval());
purgeManager.start();
ServerConfig serverConfig = new ServerConfig();
serverConfig.readFrom(quorumConfig);
zkServer = new ZooKeeperServer();
zkServer.setTickTime(serverConfig.getTickTime());
zkServer.setMinSessionTimeout(serverConfig.getMinSessionTimeout());
zkServer.setMaxSessionTimeout(serverConfig.getMaxSessionTimeout());
// These two ServerConfig methods returned String in 3.4.x and File in 3.5.x
transactionLog = new FileTxnSnapLog(new File(serverConfig.getDataLogDir().toString()),
new File(serverConfig.getDataDir().toString()));
zkServer.setTxnLogFactory(transactionLog);
connectionFactory = ServerCnxnFactory.createFactory();
connectionFactory.configure(serverConfig.getClientPortAddress(), serverConfig.getMaxClientCnxns());
connectionFactory.startup(zkServer);
}