下面列出了java.util.zip.Checksum#reset ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test(enabled = true)
public void testWriteChunkData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<DurableChunk<?>> sess =
new MneDurableOutputSession<DurableChunk<?>>(m_tacontext, null,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<DurableChunk<?>> mdvalue =
new MneDurableOutputValue<DurableChunk<?>>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<DurableChunk<?>>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<DurableChunk<?>>>();
RecordWriter<NullWritable, MneDurableOutputValue<DurableChunk<?>>> writer =
outputFormat.getRecordWriter(null, m_conf, null, null);
DurableChunk<?> dchunk = null;
Checksum cs = new CRC32();
cs.reset();
for (int i = 0; i < m_reccnt; ++i) {
dchunk = genupdDurableChunk(sess, cs);
Assert.assertNotNull(dchunk);
writer.write(nada, mdvalue.of(dchunk));
}
m_checksum = cs.getValue();
writer.close(null);
sess.close();
}
@Test(enabled = true)
public void testWriteBufferData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<DurableBuffer<?>> sess =
new MneDurableOutputSession<DurableBuffer<?>>(m_tacontext, null,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<DurableBuffer<?>> mdvalue =
new MneDurableOutputValue<DurableBuffer<?>>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<DurableBuffer<?>>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<DurableBuffer<?>>>();
RecordWriter<NullWritable, MneDurableOutputValue<DurableBuffer<?>>> writer =
outputFormat.getRecordWriter(m_tacontext);
DurableBuffer<?> dbuf = null;
Checksum cs = new CRC32();
cs.reset();
for (int i = 0; i < m_reccnt; ++i) {
dbuf = genupdDurableBuffer(sess, cs);
Assert.assertNotNull(dbuf);
writer.write(nada, mdvalue.of(dbuf));
}
m_checksum = cs.getValue();
writer.close(m_tacontext);
sess.close();
}
@Test(enabled = true)
public void testWriteChunkData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<DurableChunk<?>> sess =
new MneDurableOutputSession<DurableChunk<?>>(m_tacontext, null,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<DurableChunk<?>> mdvalue =
new MneDurableOutputValue<DurableChunk<?>>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<DurableChunk<?>>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<DurableChunk<?>>>();
RecordWriter<NullWritable, MneDurableOutputValue<DurableChunk<?>>> writer =
outputFormat.getRecordWriter(m_tacontext);
DurableChunk<?> dchunk = null;
Checksum cs = new CRC32();
cs.reset();
for (int i = 0; i < m_reccnt; ++i) {
dchunk = genupdDurableChunk(sess, cs);
Assert.assertNotNull(dchunk);
writer.write(nada, mdvalue.of(dchunk));
}
m_checksum = cs.getValue();
writer.close(m_tacontext);
sess.close();
}
private static void createIndexFile(File indexFile, Configuration conf)
throws IOException {
if (indexFile.exists()) {
System.out.println("Deleting existing file");
indexFile.delete();
}
indexFile.createNewFile();
FSDataOutputStream output = FileSystem.getLocal(conf).getRaw().append(
new Path(indexFile.getAbsolutePath()));
Checksum crc = new PureJavaCrc32();
crc.reset();
CheckedOutputStream chk = new CheckedOutputStream(output, crc);
String msg = "Writing new index file. This file will be used only " +
"for the testing.";
chk.write(Arrays.copyOf(msg.getBytes(),
MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH));
output.writeLong(chk.getChecksum().getValue());
output.close();
}
@Test(dependsOnMethods = {"testGenChunkBuffers"})
public void testCheckChunkBuffers() {
Checksum cs = new CRC32();
cs.reset();
NonVolatileMemAllocator act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService("pmalloc"),
1L, "./pmchunkbuffertest.dat", false);
act.setChunkReclaimer(new Reclaim<Long>() {
@Override
public boolean reclaim(Long mres, Long sz) {
System.out.println(String.format("Reclaim Memory Chunk: %X Size: %s", System.identityHashCode(mres),
null == sz ? "NULL" : sz.toString()));
return false;
}
});
DurableChunk<NonVolatileMemAllocator> mch;
mch = act.retrieveChunk(act.getHandler(m_keyid));
Assert.assertNotNull(mch);
long bufcnt = mch.getSize() / m_bufsize;
ChunkBuffer ckbuf;
byte[] buf;
for (long idx = 0; idx < bufcnt; ++idx) {
ckbuf = mch.getChunkBuffer(idx * m_bufsize, m_bufsize);
Assert.assertNotNull(ckbuf);
buf = new byte[m_bufsize];
ckbuf.get().clear();
ckbuf.get().get(buf);
cs.update(buf, 0, buf.length);
}
act.close();
Assert.assertEquals(m_checksum, cs.getValue());
Assert.assertEquals(m_count, bufcnt);
System.out.println(String.format("The checksum of chunk buffers are %d, Total count is %d", m_checksum, m_count));
}
private static void testByteArrayOffset(Checksum checksum, long expected) {
byte[] unaligned_bytes_123456789 = new byte[BYTES_123456789.length + 64];
for (int i = 0; i < unaligned_bytes_123456789.length - BYTES_123456789.length; i++) {
checksum.reset();
System.arraycopy(BYTES_123456789, 0, unaligned_bytes_123456789, i, BYTES_123456789.length);
checksum.update(unaligned_bytes_123456789, i, BYTES_123456789.length);
checkChecksumOffset(checksum, expected, i);
}
}
private static void testLittleEndianDirectByteBufferOffset(Checksum checksum, long expected) {
byte[] unaligned_bytes_123456789 = new byte[BYTES_123456789.length + 64];
for (int i = 0; i < unaligned_bytes_123456789.length - BYTES_123456789.length; i++) {
checksum.reset();
ByteBuffer bb = ByteBuffer.allocateDirect(unaligned_bytes_123456789.length);
bb.order(ByteOrder.LITTLE_ENDIAN);
System.arraycopy(BYTES_123456789, 0, unaligned_bytes_123456789, i, BYTES_123456789.length);
bb.put(unaligned_bytes_123456789);
bb.position(i);
bb.limit(i + BYTES_123456789.length);
checksum.update(bb);
checkChecksumOffset(checksum, expected, i);
}
}
private static void testReadonlyByteBufferOffset(Checksum checksum, long expected) {
byte[] unaligned_bytes_123456789 = new byte[BYTES_123456789.length + 64];
for (int i = 0; i < unaligned_bytes_123456789.length - BYTES_123456789.length; i++) {
checksum.reset();
System.arraycopy(BYTES_123456789, 0, unaligned_bytes_123456789, i, BYTES_123456789.length);
ByteBuffer bb = ByteBuffer.wrap(unaligned_bytes_123456789).asReadOnlyBuffer();
bb.position(i);
bb.limit(i + BYTES_123456789.length);
checksum.update(bb);
checkChecksumOffset(checksum, expected, i);
}
}
@Override
public Boolean invoke(File f, VirtualChannel channel) throws IOException, InterruptedException {
PrintStream logger = listener.getLogger();
try (ZipFile zip = new ZipFile(f)) {
logger.print("Checking ");
logger.print(zip.size());
logger.print(" zipped entries in ");
logger.println(f.getAbsolutePath());
Checksum checksum = new CRC32();
byte[] buffer = new byte[4096];
Enumeration<? extends ZipEntry> entries = zip.entries();
while (entries.hasMoreElements()) {
checksum.reset();
ZipEntry entry = entries.nextElement();
if (!entry.isDirectory()) {
try (InputStream inputStream = zip.getInputStream(entry)) {
int length;
while ((length = IOUtils.read(inputStream, buffer)) > 0) {
checksum.update(buffer, 0, length);
}
if (checksum.getValue() != entry.getCrc()) {
listener.error("Checksum error in : " + f.getAbsolutePath() + ":" + entry.getName());
return false;
}
}
}
}
return true;
} catch (ZipException e) {
listener.error("Error validating zip file: " + e.getMessage());
return false;
} finally {
logger.flush();
}
}
public SpillRecord(Path indexFileName, JobConf job, Checksum crc,
String expectedIndexOwner)
throws IOException {
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
final FSDataInputStream in =
SecureIOUtils.openFSDataInputStream(new File(indexFileName.toUri()
.getRawPath()), expectedIndexOwner, null);
try {
final long length = rfs.getFileStatus(indexFileName).getLen();
final int partitions = (int) length / MAP_OUTPUT_INDEX_RECORD_LENGTH;
final int size = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
buf = ByteBuffer.allocate(size);
if (crc != null) {
crc.reset();
CheckedInputStream chk = new CheckedInputStream(in, crc);
IOUtils.readFully(chk, buf.array(), 0, size);
if (chk.getChecksum().getValue() != in.readLong()) {
throw new ChecksumException("Checksum error reading spill index: " +
indexFileName, -1);
}
} else {
IOUtils.readFully(in, buf.array(), 0, size);
}
entries = buf.asLongBuffer();
} finally {
in.close();
}
}
public static void outputRecords(OutputStream out,
boolean useAscii,
Unsigned16 firstRecordNumber,
Unsigned16 recordsToGenerate,
Unsigned16 checksum
) throws IOException {
byte[] row = new byte[100];
Unsigned16 recordNumber = new Unsigned16(firstRecordNumber);
Unsigned16 lastRecordNumber = new Unsigned16(firstRecordNumber);
Checksum crc = new PureJavaCrc32();
Unsigned16 tmp = new Unsigned16();
lastRecordNumber.add(recordsToGenerate);
Unsigned16 ONE = new Unsigned16(1);
Unsigned16 rand = Random16.skipAhead(firstRecordNumber);
while (!recordNumber.equals(lastRecordNumber)) {
Random16.nextRand(rand);
if (useAscii) {
generateAsciiRecord(row, rand, recordNumber);
} else {
generateRecord(row, rand, recordNumber);
}
if (checksum != null) {
crc.reset();
crc.update(row, 0, row.length);
tmp.set(crc.getValue());
checksum.add(tmp);
}
recordNumber.add(ONE);
out.write(row);
}
}
public static void outputRecords(OutputStream out,
boolean useAscii,
Unsigned16 firstRecordNumber,
Unsigned16 recordsToGenerate,
Unsigned16 checksum
) throws IOException {
byte[] row = new byte[100];
Unsigned16 recordNumber = new Unsigned16(firstRecordNumber);
Unsigned16 lastRecordNumber = new Unsigned16(firstRecordNumber);
Checksum crc = new PureJavaCrc32();
Unsigned16 tmp = new Unsigned16();
lastRecordNumber.add(recordsToGenerate);
Unsigned16 ONE = new Unsigned16(1);
Unsigned16 rand = Random16.skipAhead(firstRecordNumber);
while (!recordNumber.equals(lastRecordNumber)) {
Random16.nextRand(rand);
if (useAscii) {
generateAsciiRecord(row, rand, recordNumber);
} else {
generateRecord(row, rand, recordNumber);
}
if (checksum != null) {
crc.reset();
crc.update(row, 0, row.length);
tmp.set(crc.getValue());
checksum.add(tmp);
}
recordNumber.add(ONE);
out.write(row);
}
}
public SpillRecord(Path indexFileName, JobConf job, Checksum crc,
String expectedIndexOwner)
throws IOException {
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
final FSDataInputStream in =
SecureIOUtils.openFSDataInputStream(new File(indexFileName.toUri()
.getRawPath()), expectedIndexOwner, null);
try {
final long length = rfs.getFileStatus(indexFileName).getLen();
final int partitions = (int) length / MAP_OUTPUT_INDEX_RECORD_LENGTH;
final int size = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
buf = ByteBuffer.allocate(size);
if (crc != null) {
crc.reset();
CheckedInputStream chk = new CheckedInputStream(in, crc);
IOUtils.readFully(chk, buf.array(), 0, size);
if (chk.getChecksum().getValue() != in.readLong()) {
throw new ChecksumException("Checksum error reading spill index: " +
indexFileName, -1);
}
} else {
IOUtils.readFully(in, buf.array(), 0, size);
}
entries = buf.asLongBuffer();
} finally {
in.close();
}
}
public static void outputRecords(OutputStream out,
boolean useAscii,
Unsigned16 firstRecordNumber,
Unsigned16 recordsToGenerate,
Unsigned16 checksum
) throws IOException {
byte[] row = new byte[100];
Unsigned16 recordNumber = new Unsigned16(firstRecordNumber);
Unsigned16 lastRecordNumber = new Unsigned16(firstRecordNumber);
Checksum crc = new PureJavaCrc32();
Unsigned16 tmp = new Unsigned16();
lastRecordNumber.add(recordsToGenerate);
Unsigned16 ONE = new Unsigned16(1);
Unsigned16 rand = Random16.skipAhead(firstRecordNumber);
while (!recordNumber.equals(lastRecordNumber)) {
Random16.nextRand(rand);
if (useAscii) {
generateAsciiRecord(row, rand, recordNumber);
} else {
generateRecord(row, rand, recordNumber);
}
if (checksum != null) {
crc.reset();
crc.update(row, 0, row.length);
tmp.set(crc.getValue());
checksum.add(tmp);
}
recordNumber.add(ONE);
out.write(row);
}
}
public void testRandom() {
Checksum c1 = new CRC32();
Checksum c2 = new BufferedChecksum(new CRC32());
int iterations = atLeast(10000);
for (int i = 0; i < iterations; i++) {
switch(random().nextInt(4)) {
case 0:
// update(byte[], int, int)
int length = random().nextInt(1024);
byte bytes[] = new byte[length];
random().nextBytes(bytes);
c1.update(bytes, 0, bytes.length);
c2.update(bytes, 0, bytes.length);
break;
case 1:
// update(int)
int b = random().nextInt(256);
c1.update(b);
c2.update(b);
break;
case 2:
// reset()
c1.reset();
c2.reset();
break;
case 3:
// getValue()
assertEquals(c1.getValue(), c2.getValue());
break;
}
}
assertEquals(c1.getValue(), c2.getValue());
}
private static void testByteArray(Checksum checksum, long expected) {
checksum.reset();
checksum.update(BYTES_123456789);
checkChecksum(checksum, expected);
}
@Test(enabled = true, dependsOnMethods = { "testWriteChunkData" })
public void testBatchReadChunkDataUsingInputSession() throws Exception {
List<String> partfns = new ArrayList<String>();
long reccnt = 0L;
long tsize = 0L;
Checksum cs = new CRC32();
cs.reset();
File folder = new File(m_workdir.toString());
File[] listfiles = folder.listFiles();
for (int idx = 0; idx < listfiles.length; ++idx) {
if (listfiles[idx].isFile()
&& listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
&& listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
partfns.add(listfiles[idx].getName());
}
}
Collections.sort(partfns); // keep the order for checksum
List<Path> paths = new ArrayList<Path>();
for (String fns : partfns) {
paths.add(new Path(m_workdir, fns));
System.out.println(String.format("[Batch Mode] Added : %s", fns));
}
MneDurableInputSession<DurableChunk<?>> m_session =
new MneDurableInputSession<DurableChunk<?>>(m_tacontext, null,
paths.toArray(new Path[0]), MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
SessionIterator<DurableChunk<?>, ?, Void, Void> m_iter = m_session.iterator();
DurableChunk<?> val = null;
while (m_iter.hasNext()) {
val = m_iter.next();
byte b;
for (int j = 0; j < val.getSize(); ++j) {
b = unsafe.getByte(val.get() + j);
cs.update(b);
}
tsize += val.getSize();
++reccnt;
}
AssertJUnit.assertEquals(m_reccnt, reccnt);
AssertJUnit.assertEquals(m_totalsize, tsize);
AssertJUnit.assertEquals(m_checksum, cs.getValue());
System.out.println(String.format("The checksum of chunk is %d [Batch Mode]", m_checksum));
}
@Test(enabled = true, dependsOnMethods = {"testWriteChunkData"})
public void testReadChunkData() throws Exception {
List<String> partfns = new ArrayList<String>();
long reccnt = 0L;
long tsize = 0L;
Checksum cs = new CRC32();
cs.reset();
File folder = new File(m_workdir.toString());
File[] listfiles = folder.listFiles();
for (int idx = 0; idx < listfiles.length; ++idx) {
if (listfiles[idx].isFile()
&& listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
&& listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
partfns.add(listfiles[idx].getName());
}
}
Collections.sort(partfns); // keep the order for checksum
for (int idx = 0; idx < partfns.size(); ++idx) {
System.out.println(String.format("Verifying : %s", partfns.get(idx)));
FileSplit split = new FileSplit(
new Path(m_workdir, partfns.get(idx)), 0, 0L, new String[0]);
InputFormat<NullWritable, MneDurableInputValue<DurableChunk<?>>> inputFormat =
new MneInputFormat<MneDurableInputValue<DurableChunk<?>>, DurableChunk<?>>();
RecordReader<NullWritable, MneDurableInputValue<DurableChunk<?>>> reader =
inputFormat.getRecordReader(split, m_conf, null);
MneDurableInputValue<DurableChunk<?>> dchkval = null;
NullWritable dchkkey = reader.createKey();
while (true) {
dchkval = reader.createValue();
if (reader.next(dchkkey, dchkval)) {
byte b;
for (int j = 0; j < dchkval.getValue().getSize(); ++j) {
b = unsafe.getByte(dchkval.getValue().get() + j);
cs.update(b);
}
tsize += dchkval.getValue().getSize();
++reccnt;
} else {
break;
}
}
reader.close();
}
AssertJUnit.assertEquals(m_reccnt, reccnt);
AssertJUnit.assertEquals(m_totalsize, tsize);
AssertJUnit.assertEquals(m_checksum, cs.getValue());
System.out.println(String.format("The checksum of chunk is %d", m_checksum));
}
@Test(enabled = true, dependsOnMethods = { "testWriteChunkData" })
public void testReadChunkData() throws Exception {
List<String> partfns = new ArrayList<String>();
long reccnt = 0L;
long tsize = 0L;
Checksum cs = new CRC32();
cs.reset();
File folder = new File(m_workdir.toString());
File[] listfiles = folder.listFiles();
for (int idx = 0; idx < listfiles.length; ++idx) {
if (listfiles[idx].isFile()
&& listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
&& listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
partfns.add(listfiles[idx].getName());
}
}
Collections.sort(partfns); // keep the order for checksum
for (int idx = 0; idx < partfns.size(); ++idx) {
System.out.println(String.format("Verifying : %s", partfns.get(idx)));
FileSplit split = new FileSplit(
new Path(m_workdir, partfns.get(idx)), 0, 0L, new String[0]);
InputFormat<NullWritable, MneDurableInputValue<DurableChunk<?>>> inputFormat =
new MneInputFormat<MneDurableInputValue<DurableChunk<?>>, DurableChunk<?>>();
RecordReader<NullWritable, MneDurableInputValue<DurableChunk<?>>> reader =
inputFormat.createRecordReader(split, m_tacontext);
MneDurableInputValue<DurableChunk<?>> dchkval = null;
while (reader.nextKeyValue()) {
dchkval = reader.getCurrentValue();
byte b;
for (int j = 0; j < dchkval.getValue().getSize(); ++j) {
b = unsafe.getByte(dchkval.getValue().get() + j);
cs.update(b);
}
tsize += dchkval.getValue().getSize();
++reccnt;
}
reader.close();
}
AssertJUnit.assertEquals(m_reccnt, reccnt);
AssertJUnit.assertEquals(m_totalsize, tsize);
AssertJUnit.assertEquals(m_checksum, cs.getValue());
System.out.println(String.format("The checksum of chunk is %d", m_checksum));
}
@Test(enabled = true, dependsOnMethods = {"testWriteChunkData"})
public void testBatchReadChunkDataUsingInputSession() throws Exception {
List<String> partfns = new ArrayList<String>();
long reccnt = 0L;
long tsize = 0L;
Checksum cs = new CRC32();
cs.reset();
File folder = new File(m_workdir.toString());
File[] listfiles = folder.listFiles();
for (int idx = 0; idx < listfiles.length; ++idx) {
if (listfiles[idx].isFile()
&& listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
&& listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
partfns.add(listfiles[idx].getName());
}
}
Collections.sort(partfns); // keep the order for checksum
List<Path> paths = new ArrayList<Path>();
for (String fns : partfns) {
paths.add(new Path(m_workdir, fns));
System.out.println(String.format("[Batch Mode] Added : %s", fns));
}
MneDurableInputSession<DurableChunk<?>> m_session =
new MneDurableInputSession<DurableChunk<?>>(m_tacontext, null,
paths.toArray(new Path[0]), MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
SessionIterator<DurableChunk<?>, ?, Void, Void> m_iter = m_session.iterator();
DurableChunk<?> val = null;
while (m_iter.hasNext()) {
val = m_iter.next();
byte b;
for (int j = 0; j < val.getSize(); ++j) {
b = unsafe.getByte(val.get() + j);
cs.update(b);
}
tsize += val.getSize();
++reccnt;
}
AssertJUnit.assertEquals(m_reccnt, reccnt);
AssertJUnit.assertEquals(m_totalsize, tsize);
AssertJUnit.assertEquals(m_checksum, cs.getValue());
System.out.println(String.format("The checksum of chunk is %d [Batch Mode]", m_checksum));
}