下面列出了怎么用org.apache.zookeeper.txn.TxnHeader的API类实例代码及写法,或者点击链接到github查看源代码。
public void add(InputStream stream) throws Exception
{
ZooKeeperLogParser logParser = new ZooKeeperLogParser(stream);
if ( logParser.isValid() )
{
logParser.parse
(
new LogEntryReceiver()
{
@Override
public void receiveEntry(TxnHeader header, Record record) throws Exception
{
indexRecord(header, record, count, from, to);
}
}
);
}
}
private Document makeDocument(TxnHeader header, EntryTypes type, AtomicInteger count, AtomicLong from, AtomicLong to)
{
count.incrementAndGet();
if ( header.getTime() < from.get() )
{
from.set(header.getTime());
}
if ( header.getTime() > to.get() )
{
to.set(header.getTime());
}
NumericField dateField = new NumericField(FieldNames.DATE, Field.Store.YES, true);
dateField.setLongValue(header.getTime());
Document document = new Document();
document.add(new Field(FieldNames.TYPE, Integer.toString(type.getId()), Field.Store.YES, Field.Index.NOT_ANALYZED));
document.add(dateField);
return document;
}
private static String formatTransaction(TxnHeader header, Record txn) {
StringBuilder sb = new StringBuilder();
sb.append("time").append(fieldDelim).append(header.getTime());
sb.append(fieldSep).append("session").append(fieldDelim).append("0x")
.append(Long.toHexString(header.getClientId()));
sb.append(fieldSep).append("cxid").append(fieldDelim).append("0x")
.append(Long.toHexString(header.getCxid()));
sb.append(fieldSep).append("zxid").append(fieldDelim).append("0x")
.append(Long.toHexString(header.getZxid()));
sb.append(fieldSep).append("type").append(fieldDelim).append(op2String(header.getType()));
if (txn != null) {
try {
byte[] data = null;
for (PropertyDescriptor pd : Introspector.getBeanInfo(txn.getClass())
.getPropertyDescriptors()) {
if (pd.getName().equalsIgnoreCase("data")) {
data = (byte[]) pd.getReadMethod().invoke(txn);
continue;
}
if (pd.getReadMethod() != null && !"class".equals(pd.getName())) {
sb.append(fieldSep).append(pd.getDisplayName()).append(fieldDelim)
.append(pd.getReadMethod().invoke(txn).toString().replaceAll("[\\s]+", ""));
}
}
if (data != null) {
sb.append(fieldSep).append("data").append(fieldDelim)
.append(new String(data).replaceAll("[\\s]+", ""));
}
} catch (Exception e) {
LOG.error("Error while retrieving bean property values for " + txn.getClass(), e);
}
}
return sb.toString();
}
/**
* 读取多行日志
* @param total 读取的行数
* @return
* @throws IOException
*/
public String getLastLog(int total) throws IOException {
StringBuilder sb=new StringBuilder(1024);
FileInputStream fis = new FileInputStream(this.logFile);
BinaryInputArchive logStream = BinaryInputArchive.getArchive(fis);
FileHeader fhdr = new FileHeader();
fhdr.deserialize(logStream, "fileheader");
if (fhdr.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
return "Invalid magic number for " + logFile;
}
sb.append("ZooKeeper Transactional Log File with dbid "
+ fhdr.getDbid() + " txnlog format version "
+ fhdr.getVersion()+"\r\n");
int count=0;
while (count<total) {
long crcValue;
byte[] bytes;
try {
crcValue = logStream.readLong("crcvalue");
bytes = logStream.readBuffer("txnEntry");
} catch (EOFException e) {
sb.append("EOF reached after " + count + " txns.\r\n");
break;
}
if (bytes.length == 0) {
// Since we preallocate, we define EOF to be an
// empty transaction
sb.append("EOF reached after " + count + " txns.\r\n");
break;
}
Checksum crc = new Adler32();
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue()) {
throw new IOException("CRC doesn't match " + crcValue +
" vs " + crc.getValue());
}
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(bytes, hdr);
sb.append(DateFormat.getDateTimeInstance(DateFormat.SHORT,
DateFormat.LONG).format(new Date(hdr.getTime()))
+ " session 0x"
+ Long.toHexString(hdr.getClientId())
+ " cxid 0x"
+ Long.toHexString(hdr.getCxid())
+ " zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " " + ZooLog.op2String(hdr.getType()) + " " + txn+"\r\n");
if (logStream.readByte("EOR") != 'B') {
sb.append("Last transaction was partial.");
}
count++;
}
return sb.toString();
}
/**
* 读取多行日志
* @param total 读取的行数
* @return
* @throws IOException
*/
public String getLastLog(int total) throws IOException {
StringBuilder sb=new StringBuilder(1024);
FileInputStream fis = new FileInputStream(this.logFile);
BinaryInputArchive logStream = BinaryInputArchive.getArchive(fis);
FileHeader fhdr = new FileHeader();
fhdr.deserialize(logStream, "fileheader");
if (fhdr.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
return "Invalid magic number for " + logFile;
}
sb.append("ZooKeeper Transactional Log File with dbid "
+ fhdr.getDbid() + " txnlog format version "
+ fhdr.getVersion()+"\r\n");
int count=0;
while (count<total) {
long crcValue;
byte[] bytes;
try {
crcValue = logStream.readLong("crcvalue");
bytes = logStream.readBuffer("txnEntry");
} catch (EOFException e) {
sb.append("EOF reached after " + count + " txns.\r\n");
break;
}
if (bytes.length == 0) {
// Since we preallocate, we define EOF to be an
// empty transaction
sb.append("EOF reached after " + count + " txns.\r\n");
break;
}
Checksum crc = new Adler32();
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue()) {
throw new IOException("CRC doesn't match " + crcValue +
" vs " + crc.getValue());
}
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(bytes, hdr);
sb.append(DateFormat.getDateTimeInstance(DateFormat.SHORT,
DateFormat.LONG).format(new Date(hdr.getTime()))
+ " session 0x"
+ Long.toHexString(hdr.getClientId())
+ " cxid 0x"
+ Long.toHexString(hdr.getCxid())
+ " zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " " + ZooLog.op2String(hdr.getType()) + " " + txn+"\r\n");
if (logStream.readByte("EOR") != 'B') {
sb.append("Last transaction was partial.");
}
count++;
}
return sb.toString();
}
@Override
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
return zkdb.processTxn(hdr, txn);
}
public void parse(LogEntryReceiver receiver) throws Exception
{
if ( !validHeader )
{
throw new Exception("Invalid magic number for");
}
while ( true )
{
long crcValue;
byte[] bytes;
try
{
crcValue = logStream.readLong("crcvalue");
bytes = logStream.readBuffer("txnEntry");
}
catch ( EOFException e )
{
break;
}
if ( bytes.length == 0 )
{
// Since we preallocate, we define EOF to be an
// empty transaction
break;
}
Checksum crc = new Adler32();
crc.update(bytes, 0, bytes.length);
if ( crcValue != crc.getValue() )
{
throw new IOException("CRC doesn't match " + crcValue + " vs " + crc.getValue());
}
InputArchive iab = BinaryInputArchive.getArchive(new ByteArrayInputStream(bytes));
TxnHeader hdr = new TxnHeader();
Record record = useOldDeserializeMethod ? (Record)deserializeTxnMethod.invoke(null, iab, hdr) : (Record)deserializeTxnMethod.invoke(null, bytes, hdr);
if ( logStream.readByte("EOR") != 'B' )
{
break; // partial transaction
}
receiver.receiveEntry(hdr, record);
}
}
private static void readTransactionLog(String logfilepath) throws FileNotFoundException,
IOException, EOFException {
FileInputStream fis = new FileInputStream(logfilepath);
BinaryInputArchive logStream = BinaryInputArchive.getArchive(fis);
FileHeader fhdr = new FileHeader();
fhdr.deserialize(logStream, "fileheader");
if (fhdr.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
System.err.println("Invalid magic number for " + logfilepath);
System.exit(2);
}
if (bw != null) {
bw.write("ZooKeeper Transactional Log File with dbid " + fhdr.getDbid()
+ " txnlog format version " + fhdr.getVersion());
bw.newLine();
} else {
System.out.println("ZooKeeper Transactional Log File with dbid " + fhdr.getDbid()
+ " txnlog format version " + fhdr.getVersion());
}
int count = 0;
while (true) {
long crcValue;
byte[] bytes;
try {
crcValue = logStream.readLong("crcvalue");
bytes = logStream.readBuffer("txnEntry");
} catch (EOFException e) {
if (bw != null) {
bw.write("EOF reached after " + count + " txns.");
bw.newLine();
} else {
System.out.println("EOF reached after " + count + " txns.");
}
break;
}
if (bytes.length == 0) {
// Since we preallocate, we define EOF to be an
// empty transaction
if (bw != null) {
bw.write("EOF reached after " + count + " txns.");
bw.newLine();
} else {
System.out.println("EOF reached after " + count + " txns.");
}
return;
}
Checksum crc = new Adler32();
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue()) {
throw new IOException("CRC doesn't match " + crcValue + " vs " + crc.getValue());
}
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(bytes, hdr);
if (bw != null) {
bw.write(formatTransaction(hdr, txn));
bw.newLine();
} else {
System.out.println(formatTransaction(hdr, txn));
}
if (logStream.readByte("EOR") != 'B') {
LOG.error("Last transaction was partial.");
throw new EOFException("Last transaction was partial.");
}
count++;
}
}
public void receiveEntry(TxnHeader header, Record record) throws Exception;