下面列出了 org.apache.hadoop.io.BoundedByteArrayOutputStream API 类的使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a map output whose bytes are held in a
 * {@link BoundedByteArrayOutputStream} of {@code size} bytes.
 *
 * @param conf configuration stored on this output
 * @param mapId map attempt id, forwarded to the superclass
 * @param merger merge manager stored on this output
 * @param size buffer size in bytes; also forwarded to the superclass as a long
 * @param codec compression codec, or {@code null} when no decompressor is needed
 * @param primaryMapOutput flag forwarded to the superclass constructor
 */
public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
    MergeManagerImpl<K, V> merger,
    int size, CompressionCodec codec,
    boolean primaryMapOutput) {
  super(mapId, (long) size, primaryMapOutput);
  this.conf = conf;
  this.merger = merger;
  this.codec = codec;
  // 'memory' aliases the stream's backing array.
  byteStream = new BoundedByteArrayOutputStream(size);
  memory = byteStream.getBuffer();
  // Only borrow a decompressor from the pool when a codec is configured.
  decompressor = (codec == null) ? null : CodecPool.getDecompressor(codec);
}
/**
 * Creates a map output whose bytes are held in a
 * {@link BoundedByteArrayOutputStream} of {@code size} bytes.
 *
 * @param conf configuration stored on this output
 * @param mapId map attempt id, forwarded to the superclass
 * @param merger merge manager stored on this output
 * @param size buffer size in bytes; also forwarded to the superclass as a long
 * @param codec compression codec, or {@code null} when no decompressor is needed
 * @param primaryMapOutput flag forwarded to the superclass constructor
 */
public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
    MergeManagerImpl<K, V> merger,
    int size, CompressionCodec codec,
    boolean primaryMapOutput) {
  super(mapId, (long) size, primaryMapOutput);
  this.conf = conf;
  this.merger = merger;
  this.codec = codec;
  // 'memory' aliases the stream's backing array.
  byteStream = new BoundedByteArrayOutputStream(size);
  memory = byteStream.getBuffer();
  // Only borrow a decompressor from the pool when a codec is configured.
  decompressor = (codec == null) ? null : CodecPool.getDecompressor(codec);
}
/**
 * Creates a {@code Type.MEMORY} map output backed by a
 * {@link BoundedByteArrayOutputStream} of {@code size} bytes.
 * {@code localFS}, {@code disk}, {@code outputPath} and {@code tmpOutputPath}
 * are set to {@code null}.
 *
 * @param attemptIdentifier attempt that produced this output
 * @param merger merge manager stored on this output
 * @param size buffer size in bytes
 * @param primaryMapOutput stored as-is
 */
MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, int size,
    boolean primaryMapOutput) {
  // Take the next value of the shared ID counter before anything else.
  this.id = ID.incrementAndGet();
  this.attemptIdentifier = attemptIdentifier;
  this.merger = merger;
  this.type = Type.MEMORY;

  this.byteStream = new BoundedByteArrayOutputStream(size);
  this.memory = this.byteStream.getBuffer();
  this.size = size;

  // Unused for an in-memory output.
  this.localFS = null;
  this.disk = null;
  this.outputPath = null;
  this.tmpOutputPath = null;

  this.primaryMapOutput = primaryMapOutput;
}
/**
* Note that we do not allow compression in in-mem stream.
* When spilled over to file, compression gets enabled.
*
* @param conf
* @param fs
* @param taskOutput
* @param keyClass
* @param valueClass
* @param codec
* @param writesCounter
* @param serializedBytesCounter
* @param cacheSize
* @throws IOException
*/
public FileBackedInMemIFileWriter(Configuration conf, FileSystem fs,
TezTaskOutput taskOutput, Class keyClass, Class valueClass,
CompressionCodec codec,
TezCounter writesCounter,
TezCounter serializedBytesCounter,
int cacheSize) throws IOException {
super(conf, new FSDataOutputStream(createBoundedBuffer(cacheSize), null),
keyClass, valueClass, null, writesCounter, serializedBytesCounter);
this.fs = fs;
this.cacheStream = (BoundedByteArrayOutputStream) this.rawOut.getWrappedStream();
this.taskOutput = taskOutput;
this.bufferFull = (cacheStream == null);
this.totalSize = getBaseCacheSize();
this.fileCodec = codec;
}
/**
 * Builds {@code segmentCount} in-memory merge segments. Each segment holds
 * {@code keysPerSegment} random IntWritable/LongWritable records written
 * through an {@link InMemoryWriter} into a 10000-byte buffer.
 *
 * @param segmentCount number of segments to create
 * @param keysPerSegment records written into each segment
 * @return the segments, in creation order
 * @throws IOException if writing or reader construction fails
 */
private List<TezMerger.Segment> createInMemorySegments(int segmentCount, int keysPerSegment)
    throws IOException {
  List<TezMerger.Segment> segments = Lists.newLinkedList();
  Random random = new Random();
  DataInputBuffer keyBuffer = new DataInputBuffer();
  DataInputBuffer valueBuffer = new DataInputBuffer();
  for (int seg = 0; seg < segmentCount; seg++) {
    BoundedByteArrayOutputStream out = new BoundedByteArrayOutputStream(10000);
    InMemoryWriter segmentWriter = new InMemoryWriter(out);
    for (int k = 0; k < keysPerSegment; k++) {
      IntWritable rawKey = new IntWritable(random.nextInt());
      LongWritable rawValue = new LongWritable(random.nextLong());
      populateData(rawKey, rawValue, keyBuffer, valueBuffer);
      segmentWriter.append(keyBuffer, valueBuffer);
    }
    segmentWriter.close();
    // NOTE(review): getLimit() is the stream's capacity bound, not the number
    // of bytes written (size()); the reader is given the whole buffer.
    InMemoryReader segmentReader =
        new InMemoryReader(merger, null, out.getBuffer(), 0, out.getLimit());
    segments.add(new TezMerger.Segment(segmentReader, null));
  }
  return segments;
}
/**
 * Writes {@code getLimit()} bytes into the output's stream. Byte {@code i}
 * receives the low 8 bits of {@code i}.
 *
 * <p>Assumes the stream is empty beforehand; otherwise the writes would run
 * past the stream's limit.
 *
 * @param output map output whose array stream is filled
 * @throws IOException if a write fails
 */
private void fillOutput(InMemoryMapOutput<Text, Text> output) throws IOException {
  final BoundedByteArrayOutputStream target = output.getArrayStream();
  final int limit = target.getLimit();
  int written = 0;
  while (written < limit) {
    target.write(written);
    written++;
  }
}
/**
 * Writes {@code getLimit()} bytes into the output's stream. Byte {@code i}
 * receives the low 8 bits of {@code i}.
 *
 * <p>Assumes the stream is empty beforehand; otherwise the writes would run
 * past the stream's limit.
 *
 * @param output map output whose array stream is filled
 * @throws IOException if a write fails
 */
private void fillOutput(InMemoryMapOutput<Text, Text> output) throws IOException {
  final BoundedByteArrayOutputStream target = output.getArrayStream();
  final int limit = target.getLimit();
  int written = 0;
  while (written < limit) {
    target.write(written);
    written++;
  }
}
@Test
//Test InMemoryWriter
public void testInMemoryWriter() throws IOException {
  BoundedByteArrayOutputStream bout = new BoundedByteArrayOutputStream(1024 * 1024);
  List<KVPair> data = KVDataGen.generateTestData(true, 0);
  // Each row is {rle, repeatKeys}. No variant uses compression (null codec).
  boolean[][] variants = {
      {false, false}, // No RLE, No RepeatKeys
      {false, true},  // No RLE, RepeatKeys
      {true, false},  // RLE, No RepeatKeys
      {true, true},   // RLE, RepeatKeys
  };
  for (int v = 0; v < variants.length; v++) {
    if (v > 0) {
      // Reuse the same buffer for every variant.
      bout.reset();
    }
    InMemoryWriter writer = new InMemoryWriter(bout);
    writeTestFileUsingDataBuffer(writer, variants[v][0], variants[v][1], data, null);
    readUsingInMemoryReader(bout.getBuffer(), data);
  }
}
/**
 * Flips this writer from the in-memory stream over to a file-based stream.
 *
 * <ol>
 * <li>In-memory content format: HEADER + real data + CHECKSUM. The checksum
 * covers the real data only.</li>
 * <li>The current output stream is closed first so that the checksum is
 * written into the in-memory buffer.</li>
 * <li>A file-based raw stream is created with {@code fs.create}. The output
 * path is obtained from {@code taskOutput} only if none has been set yet.</li>
 * <li>A header is written to the file, followed by the real data copied out of
 * the in-memory buffer. The in-memory HEADER and CHECKSUM bytes are not
 * copied.</li>
 * </ol>
 *
 * <p>Afterwards {@code bufferFull} is {@code true} and the in-memory buffer has
 * been reset.
 *
 * @throws IOException if closing the in-memory stream, creating the file, or
 *     writing to it fails
 */
private void resetToFileBasedWriter() throws IOException {
// Close out stream, so that data checksums are written.
// Buf contents = HEADER + real data + CHECKSUM
this.out.close();
// Get the buffer which contains data in memory
BoundedByteArrayOutputStream bout =
(BoundedByteArrayOutputStream) this.rawOut.getWrappedStream();
// Create new file based writer
if (outputPath == null) {
outputPath = taskOutput.getOutputFileForWrite();
}
LOG.info("Switching from mem stream to disk stream. File: " + outputPath);
FSDataOutputStream newRawOut = fs.create(outputPath);
this.rawOut = newRawOut;
this.ownOutputStream = true;
// NOTE(review): presumably rebuilds this.out on top of the new rawOut using
// fileCodec; the write below relies on that. Confirm.
setupOutputStream(fileCodec);
// Write header to file
headerWritten = false;
writeHeader(newRawOut);
// write real data
// Copy only the real data: skip the leading HEADER and drop the trailing
// checksumSize bytes of CHECKSUM.
int sPos = HEADER.length;
int len = (bout.size() - checksumSize - HEADER.length);
this.out.write(bout.getBuffer(), sPos, len);
bufferFull = true;
bout.reset();
}
@Test(timeout = 5000)
//Test InMemoryWriter
public void testInMemoryWriter() throws IOException {
  BoundedByteArrayOutputStream bout = new BoundedByteArrayOutputStream(1024 * 1024);
  List<KVPair> data = KVDataGen.generateTestData(true, 10);
  // Each row is {rle, repeatKeys}. No variant uses compression.
  boolean[][] variants = {
      {false, false}, // No RLE, No RepeatKeys
      {false, true},  // No RLE, RepeatKeys
      {true, false},  // RLE, No RepeatKeys
      {true, true},   // RLE, RepeatKeys
  };
  for (int v = 0; v < variants.length; v++) {
    if (v > 0) {
      // Reuse the same buffer for every variant.
      bout.reset();
    }
    boolean rle = variants[v][0];
    boolean repeatKeys = variants[v][1];
    // RLE is chosen through the writer constructor here.
    InMemoryWriter writer = rle ? new InMemoryWriter(bout, true) : new InMemoryWriter(bout);
    writeTestFileUsingDataBuffer(writer, repeatKeys, data);
    readUsingInMemoryReader(bout.getBuffer(), data);
  }
}
public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
super(null);
this.out =
new DataOutputStream(new IFileOutputStream(arrayStream));
}
/**
 * Returns the stream that backs this output. The stream itself is returned,
 * not a copy, so writes through it change this object's buffer.
 *
 * @return the backing {@link BoundedByteArrayOutputStream}
 */
public BoundedByteArrayOutputStream getArrayStream() {
  return this.byteStream;
}
/**
 * Finalizes the key buffered in {@code currentKeyBufferOS}.
 *
 * <p>The key is first checked against {@code expectedLength} (when that is
 * non-negative). It is then written to {@code blkAppender} as a VInt length
 * followed by the key bytes, and recorded as the index's first key if none is
 * set yet. For sorted files with at least one prior record, the key must not
 * compare below the previous key. Finally the current and last key buffers are
 * swapped, so this key becomes the "last key" for the next comparison.
 *
 * <p>{@code errorCount} is incremented on entry and decremented only when the
 * body completes normally, so a failure leaves it raised. {@code closed} and
 * {@code state = State.END_KEY} are set whether or not an exception occurs.
 * A second call returns immediately.
 *
 * @throws IOException if the key length differs from {@code expectedLength},
 *     if keys are not added in sorted order, or if a write to
 *     {@code blkAppender} throws
 */
@Override
public void close() throws IOException {
if (closed == true) {
return;
}
try {
++errorCount;
byte[] key = currentKeyBufferOS.getBuffer();
int len = currentKeyBufferOS.size();
/**
 * Verify length (only when an expected length has been set).
 */
if (expectedLength >= 0 && expectedLength != len) {
throw new IOException("Incorrect key length: expected="
+ expectedLength + " actual=" + len);
}
// Length prefix followed by the raw key bytes.
Utils.writeVInt(blkAppender, len);
blkAppender.write(key, 0, len);
if (tfileIndex.getFirstKey() == null) {
tfileIndex.setFirstKey(key, 0, len);
}
// Enforce ascending key order for sorted files (skipped for the first record).
if (tfileMeta.isSorted() && tfileMeta.getRecordCount()>0) {
byte[] lastKey = lastKeyBufferOS.getBuffer();
int lastLen = lastKeyBufferOS.size();
if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0,
lastLen) < 0) {
throw new IOException("Keys are not added in sorted order");
}
}
// Swap buffers so this key is the "last key" for the next check.
BoundedByteArrayOutputStream tmp = currentKeyBufferOS;
currentKeyBufferOS = lastKeyBufferOS;
lastKeyBufferOS = tmp;
--errorCount;
} finally {
closed = true;
state = State.END_KEY;
}
}
public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
super(null);
this.out =
new DataOutputStream(new IFileOutputStream(arrayStream));
}
/**
 * Returns the stream that backs this output. The stream itself is returned,
 * not a copy, so writes through it change this object's buffer.
 *
 * @return the backing {@link BoundedByteArrayOutputStream}
 */
public BoundedByteArrayOutputStream getArrayStream() {
  return this.byteStream;
}
/**
 * Finalizes the key buffered in {@code currentKeyBufferOS}.
 *
 * <p>The key is first checked against {@code expectedLength} (when that is
 * non-negative). It is then written to {@code blkAppender} as a VInt length
 * followed by the key bytes, and recorded as the index's first key if none is
 * set yet. For sorted files with at least one prior record, the key must not
 * compare below the previous key. Finally the current and last key buffers are
 * swapped, so this key becomes the "last key" for the next comparison.
 *
 * <p>{@code errorCount} is incremented on entry and decremented only when the
 * body completes normally, so a failure leaves it raised. {@code closed} and
 * {@code state = State.END_KEY} are set whether or not an exception occurs.
 * A second call returns immediately.
 *
 * @throws IOException if the key length differs from {@code expectedLength},
 *     if keys are not added in sorted order, or if a write to
 *     {@code blkAppender} throws
 */
@Override
public void close() throws IOException {
if (closed == true) {
return;
}
try {
++errorCount;
byte[] key = currentKeyBufferOS.getBuffer();
int len = currentKeyBufferOS.size();
/**
 * Verify length (only when an expected length has been set).
 */
if (expectedLength >= 0 && expectedLength != len) {
throw new IOException("Incorrect key length: expected="
+ expectedLength + " actual=" + len);
}
// Length prefix followed by the raw key bytes.
Utils.writeVInt(blkAppender, len);
blkAppender.write(key, 0, len);
if (tfileIndex.getFirstKey() == null) {
tfileIndex.setFirstKey(key, 0, len);
}
// Enforce ascending key order for sorted files (skipped for the first record).
if (tfileMeta.isSorted() && tfileMeta.getRecordCount()>0) {
byte[] lastKey = lastKeyBufferOS.getBuffer();
int lastLen = lastKeyBufferOS.size();
if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0,
lastLen) < 0) {
throw new IOException("Keys are not added in sorted order");
}
}
// Swap buffers so this key is the "last key" for the next check.
BoundedByteArrayOutputStream tmp = currentKeyBufferOS;
currentKeyBufferOS = lastKeyBufferOS;
lastKeyBufferOS = tmp;
--errorCount;
} finally {
closed = true;
state = State.END_KEY;
}
}
/**
 * Finalizes the key buffered in {@code currentKeyBufferOS}.
 *
 * <p>The key is first checked against {@code expectedLength} (when that is
 * non-negative). It is then written to {@code blkAppender} as a VInt length
 * followed by the key bytes, and recorded as the index's first key if none is
 * set yet. For sorted files with at least one prior record, the key must not
 * compare below the previous key. Finally the current and last key buffers are
 * swapped, so this key becomes the "last key" for the next comparison.
 *
 * <p>{@code errorCount} is incremented on entry and decremented only when the
 * body completes normally, so a failure leaves it raised. {@code closed} and
 * {@code state = State.END_KEY} are set whether or not an exception occurs.
 * A second call returns immediately.
 *
 * @throws IOException if the key length differs from {@code expectedLength},
 *     if keys are not added in sorted order, or if a write to
 *     {@code blkAppender} throws
 */
@Override
public void close() throws IOException {
if (closed == true) {
return;
}
try {
++errorCount;
byte[] key = currentKeyBufferOS.getBuffer();
int len = currentKeyBufferOS.size();
/**
 * Verify length (only when an expected length has been set).
 */
if (expectedLength >= 0 && expectedLength != len) {
throw new IOException("Incorrect key length: expected="
+ expectedLength + " actual=" + len);
}
// Length prefix followed by the raw key bytes.
Utils.writeVInt(blkAppender, len);
blkAppender.write(key, 0, len);
if (tfileIndex.getFirstKey() == null) {
tfileIndex.setFirstKey(key, 0, len);
}
// Enforce ascending key order for sorted files (skipped for the first record).
if (tfileMeta.isSorted() && tfileMeta.getRecordCount()>0) {
byte[] lastKey = lastKeyBufferOS.getBuffer();
int lastLen = lastKeyBufferOS.size();
if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0,
lastLen) < 0) {
throw new IOException("Keys are not added in sorted order");
}
}
// Swap buffers so this key is the "last key" for the next check.
BoundedByteArrayOutputStream tmp = currentKeyBufferOS;
currentKeyBufferOS = lastKeyBufferOS;
lastKeyBufferOS = tmp;
--errorCount;
} finally {
closed = true;
state = State.END_KEY;
}
}
/**
 * Creates a fetched input whose data is held in memory, in a
 * {@link BoundedByteArrayOutputStream} of {@code actualSize} bytes.
 *
 * @param actualSize number of bytes to reserve; must fit in a Java array
 *     ({@code 0 <= actualSize <= Integer.MAX_VALUE})
 * @param compressedSize compressed size, forwarded to the superclass
 * @param inputAttemptIdentifier attempt this input belongs to
 * @param callbackHandler callback forwarded to the superclass
 * @throws IllegalArgumentException if {@code actualSize} is negative or larger
 *     than {@code Integer.MAX_VALUE}
 */
public MemoryFetchedInput(long actualSize, long compressedSize,
    InputAttemptIdentifier inputAttemptIdentifier,
    FetchedInputCallback callbackHandler) {
  super(Type.MEMORY, actualSize, compressedSize, inputAttemptIdentifier, callbackHandler);
  // A plain (int) cast silently truncates sizes above Integer.MAX_VALUE and
  // would allocate a buffer of the wrong capacity. Reject such sizes explicitly.
  if (actualSize < 0 || actualSize > Integer.MAX_VALUE) {
    throw new IllegalArgumentException(
        "In-memory fetched input size out of range: " + actualSize);
  }
  this.byteStream = new BoundedByteArrayOutputStream((int) actualSize);
}
public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
super(null, null);
this.out =
new DataOutputStream(new IFileOutputStream(arrayStream));
}
/**
 * Returns the stream that backs this output. The stream itself is returned,
 * not a copy, so writes through it change this object's buffer.
 *
 * @return the backing {@link BoundedByteArrayOutputStream}
 */
public BoundedByteArrayOutputStream getArrayStream() {
  return this.byteStream;
}
/**
 * Creates a writer over {@code arrayStream} with RLE disabled. Equivalent to
 * {@code new InMemoryWriter(arrayStream, false)}.
 *
 * @param arrayStream in-memory destination buffer
 */
public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
  this(arrayStream, /* rle= */ false);
}
public InMemoryWriter(BoundedByteArrayOutputStream arrayStream, boolean rle) {
super(null, null, rle);
this.out = new NonSyncDataOutputStream(new IFileOutputStream(arrayStream));
}
/**
 * Creates between 2 and 9 in-memory segments (the count comes from
 * {@code Math.max(2, rnd.nextInt(10))}). Each segment is filled with the
 * entries from {@code createData()}, serialized through the key/value
 * serializers for {@code keyClass}/{@code valClass}. Every written entry is
 * also recorded in {@code originalData}.
 *
 * @return the created segments, each wrapping an {@code InMemoryReader}
 * @throws IOException if serialization, writing or reader construction fails
 */
@SuppressWarnings("unchecked")
public List<TezMerger.Segment> createInMemStreams() throws IOException {
  int numberOfStreams = Math.max(2, rnd.nextInt(10));
  LOG.info("No of streams : " + numberOfStreams);

  SerializationFactory serializationFactory = new SerializationFactory(conf);
  Serializer keySerializer = serializationFactory.getSerializer(keyClass);
  Serializer valueSerializer = serializationFactory.getSerializer(valClass);

  LocalDirAllocator localDirAllocator =
      new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
  InputContext context = createTezInputContext();
  MergeManager mergeManager = new MergeManager(conf, fs, localDirAllocator,
      context, null, null, null, null, null, 1024 * 1024 * 10, null, false, -1);

  DataOutputBuffer keyBuf = new DataOutputBuffer();
  DataOutputBuffer valBuf = new DataOutputBuffer();
  DataInputBuffer keyIn = new DataInputBuffer();
  DataInputBuffer valIn = new DataInputBuffer();
  keySerializer.open(keyBuf);
  valueSerializer.open(valBuf);

  List<TezMerger.Segment> segments = new LinkedList<TezMerger.Segment>();
  for (int i = 0; i < numberOfStreams; i++) {
    BoundedByteArrayOutputStream bout = new BoundedByteArrayOutputStream(1024 * 1024);
    InMemoryWriter writer = new InMemoryWriter(bout);
    Map<Writable, Writable> data = createData();
    //write data
    for (Map.Entry<Writable, Writable> entry : data.entrySet()) {
      keySerializer.serialize(entry.getKey());
      valueSerializer.serialize(entry.getValue());
      keyIn.reset(keyBuf.getData(), 0, keyBuf.getLength());
      valIn.reset(valBuf.getData(), 0, valBuf.getLength());
      writer.append(keyIn, valIn);
      originalData.put(entry.getKey(), entry.getValue());
      keyBuf.reset();
      valBuf.reset();
      keyIn.reset();
      valIn.reset();
    }
    // Finish the writer BEFORE building a reader over its buffer. Previously the
    // reader was created first and only saw the trailing bytes that close()
    // writes because the byte array happens to be shared.
    writer.close();
    // The full buffer length is kept as the reader length, as before.
    IFile.Reader reader = new InMemoryReader(mergeManager, null, bout.getBuffer(), 0,
        bout.getBuffer().length);
    segments.add(new TezMerger.Segment(reader, null));
    data.clear();
  }
  return segments;
}
/**
 * Creates a TFile writer on top of {@code fsdos}.
 *
 * @param fsdos
 *          output stream for writing. Must be at position 0.
 * @param minBlockSize
 *          minimum compressed block size in bytes. A compression block is not
 *          closed until it reaches this size, except for the last block.
 * @param compressName
 *          name of the compression algorithm. Must be one of the strings
 *          returned by {@link TFile#getSupportedCompressionAlgorithms()}.
 * @param comparator
 *          leave as null or the empty string if the TFile is not sorted.
 *          Otherwise, give the name of the key comparison algorithm. Two kinds
 *          of comparators are supported:
 *          <ul>
 *          <li>Algorithmic comparators: binary comparators that are
 *          language-independent. Currently only "memcmp" is supported.
 *          <li>Language-specific comparators: binary comparators that can only
 *          be constructed in a specific language. For Java, the syntax is
 *          "jclass:" followed by the class name of the RawComparator. Only
 *          RawComparators constructible through a no-argument constructor are
 *          supported. Parameterized RawComparators such as
 *          {@link WritableComparator} or {@link JavaSerializationComparator}
 *          cannot be used directly; write a wrapper subclass whose default
 *          constructor performs the proper initialization.
 *          </ul>
 * @param conf
 *          the configuration object.
 * @throws IOException if the underlying block-compressed writer cannot be created
 */
public Writer(FSDataOutputStream fsdos, int minBlockSize,
    String compressName, String comparator, Configuration conf)
    throws IOException {
  this.sizeMinBlock = minBlockSize;
  this.tfileMeta = new TFileMeta(comparator);
  // The index uses the comparator resolved by the metadata.
  this.tfileIndex = new TFileIndex(this.tfileMeta.getComparator());
  this.writerBCF = new BCFile.Writer(fsdos, compressName, conf);
  // Two key buffers, each bounded by MAX_KEY_SIZE: the current key and the previous one.
  this.currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
  this.lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
  this.conf = conf;
}
/**
 * Creates a TFile writer on top of {@code fsdos}.
 *
 * @param fsdos
 *          output stream for writing. Must be at position 0.
 * @param minBlockSize
 *          minimum compressed block size in bytes. A compression block is not
 *          closed until it reaches this size, except for the last block.
 * @param compressName
 *          name of the compression algorithm. Must be one of the strings
 *          returned by {@link TFile#getSupportedCompressionAlgorithms()}.
 * @param comparator
 *          leave as null or the empty string if the TFile is not sorted.
 *          Otherwise, give the name of the key comparison algorithm. Two kinds
 *          of comparators are supported:
 *          <ul>
 *          <li>Algorithmic comparators: binary comparators that are
 *          language-independent. Currently only "memcmp" is supported.
 *          <li>Language-specific comparators: binary comparators that can only
 *          be constructed in a specific language. For Java, the syntax is
 *          "jclass:" followed by the class name of the RawComparator. Only
 *          RawComparators constructible through a no-argument constructor are
 *          supported. Parameterized RawComparators such as
 *          {@link WritableComparator} or {@link JavaSerializationComparator}
 *          cannot be used directly; write a wrapper subclass whose default
 *          constructor performs the proper initialization.
 *          </ul>
 * @param conf
 *          the configuration object.
 * @throws IOException if the underlying block-compressed writer cannot be created
 */
public Writer(FSDataOutputStream fsdos, int minBlockSize,
    String compressName, String comparator, Configuration conf)
    throws IOException {
  this.sizeMinBlock = minBlockSize;
  this.tfileMeta = new TFileMeta(comparator);
  // The index uses the comparator resolved by the metadata.
  this.tfileIndex = new TFileIndex(this.tfileMeta.getComparator());
  this.writerBCF = new BCFile.Writer(fsdos, compressName, conf);
  // Two key buffers, each bounded by MAX_KEY_SIZE: the current key and the previous one.
  this.currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
  this.lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
  this.conf = conf;
}
/**
 * Creates a DTFile writer on top of {@code fsdos}.
 *
 * @param fsdos
 *          output stream for writing. Must be at position 0.
 * @param minBlockSize
 *          minimum compressed block size in bytes. A compression block is not
 *          closed until it reaches this size, except for the last block.
 * @param compressName
 *          name of the compression algorithm. Must be one of the strings
 *          returned by {@link DTFile#getSupportedCompressionAlgorithms()}.
 * @param comparator
 *          leave as null or the empty string if the file is not sorted.
 *          Otherwise, give the name of the key comparison algorithm. Two kinds
 *          of comparators are supported:
 *          <ul>
 *          <li>Algorithmic comparators: binary comparators that are
 *          language-independent. Currently only "memcmp" is supported.
 *          <li>Language-specific comparators: binary comparators that can only
 *          be constructed in a specific language. For Java, the syntax is
 *          "jclass:" followed by the class name of the RawComparator. Only
 *          RawComparators constructible through a no-argument constructor are
 *          supported. Parameterized RawComparators such as
 *          {@link WritableComparator} or {@link JavaSerializationComparator}
 *          cannot be used directly; write a wrapper subclass whose default
 *          constructor performs the proper initialization.
 *          </ul>
 * @param conf
 *          the configuration object.
 * @throws IOException if the underlying block-compressed writer cannot be created
 */
public Writer(FSDataOutputStream fsdos, int minBlockSize,
    String compressName, String comparator, Configuration conf)
    throws IOException {
  this.sizeMinBlock = minBlockSize;
  this.tfileMeta = new TFileMeta(comparator);
  // The index uses the comparator resolved by the metadata.
  this.tfileIndex = new TFileIndex(this.tfileMeta.getComparator());
  this.writerBCF = new DTBCFile.Writer(fsdos, compressName, conf);
  // Two key buffers, each bounded by MAX_KEY_SIZE: the current key and the previous one.
  this.currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
  this.lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
  this.conf = conf;
}
/**
 * Creates the in-memory stream. If the requested size is smaller than the base
 * cache size, the base cache size is used instead.
 *
 * @param size requested capacity in bytes
 * @return an in-memory stream whose capacity is the larger of {@code size} and
 *     {@code getBaseCacheSize()}
 */
public static BoundedByteArrayOutputStream createBoundedBuffer(int size) {
  final int baseSize = getBaseCacheSize();
  final int capacity = (size < baseSize) ? baseSize : size;
  return new BoundedByteArrayOutputStream(capacity);
}