下面列出了 org.apache.hadoop.io.SequenceFile#Metadata() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Opens an uncompressed {@link SequenceFile.Writer} for a new transaction log file.
 *
 * <p>The file is created under the directory configured by
 * {@code TxConstants.Manager.CFG_TX_SNAPSHOT_DIR} and is named
 * {@code LOG_FILE_PREFIX + timeInMillis}. Keys are {@code LongWritable}. For versions greater
 * than 1 the version number is recorded in the file metadata under
 * {@code TxConstants.TransactionLog.VERSION_KEY}; version 1 files carry empty metadata.
 *
 * @param configuration configuration used to look up the snapshot directory and to create the writer
 * @param fs file system on which the log file is created
 * @param timeInMillis timestamp appended to the log file name
 * @param versionNumber log format version; 1 and 2 use
 *        {@code co.cask.tephra.persist.TransactionEdit} as value class, every other value uses
 *        the {@code TransactionEdit} class imported by this file
 * @return a writer positioned at the start of the new log file
 * @throws IOException if the writer cannot be created
 */
private SequenceFile.Writer getSequenceFileWriter(Configuration configuration, FileSystem fs,
                                                  long timeInMillis, byte versionNumber) throws IOException {
  Path logFile = new Path(configuration.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR),
                          LOG_FILE_PREFIX + timeInMillis);

  SequenceFile.Metadata fileMetadata = new SequenceFile.Metadata();
  if (versionNumber > 1) {
    Text versionKey = new Text(TxConstants.TransactionLog.VERSION_KEY);
    Text versionValue = new Text(Byte.toString(versionNumber));
    fileMetadata.set(versionKey, versionValue);
  }

  // Only the value class differs between the format versions; everything else is shared.
  boolean usesCaskEditClass = versionNumber == 1 || versionNumber == 2;
  Class<?> editClass = usesCaskEditClass
      ? co.cask.tephra.persist.TransactionEdit.class
      : TransactionEdit.class;

  return SequenceFile.createWriter(fs, configuration, logFile, LongWritable.class, editClass,
                                   SequenceFile.CompressionType.NONE, null, null, fileMetadata);
}
/**
 * Writes {@code numRecords} Text/Text pairs ({@code "key<i>" -> "value<i>"}) into an
 * uncompressed sequence file and returns a mapping of expected keys -> records.
 *
 * <p>Each returned record holds the written key under field {@code "key"} and the written
 * value under field {@code "value"}.
 *
 * @param file local file that receives the sequence file content
 * @param numRecords number of key/value pairs to write
 * @return map from key string to the record expected for that key
 * @throws IOException if the file cannot be opened or a record cannot be appended
 */
private HashMap<String, Record> createTextSequenceFile(File file, int numRecords) throws IOException {
  HashMap<String, Record> map = new HashMap<String, Record>();
  SequenceFile.Metadata metadata = new SequenceFile.Metadata(getMetadataForSequenceFile());
  FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(file), null);
  SequenceFile.Writer writer = null;
  try {
    writer = SequenceFile.createWriter(new Configuration(), out, Text.class, Text.class,
        SequenceFile.CompressionType.NONE, null, metadata);
    for (int i = 0; i < numRecords; ++i) {
      Text key = new Text("key" + i);
      Text value = new Text("value" + i);
      writer.append(key, value);
      Record record = new Record();
      record.put("key", key);
      record.put("value", value);
      map.put(key.toString(), record);
    }
  } finally {
    // Close the writer first so buffered records are flushed into the stream.
    Closeables.closeQuietly(writer);
    // A writer created on top of a caller-supplied stream does not own that stream and only
    // flushes it on close, so the stream (and the underlying file handle) is released here.
    Closeables.closeQuietly(out);
  }
  return map;
}
/**
 * Writes {@code numRecords} pairs of {@code Text} key ({@code "key<i>"}) and
 * {@code ParseTextMyWritableBuilder.MyWritable} value (constructed from {@code "value"} and
 * {@code i}) into an uncompressed sequence file and returns a mapping of expected
 * keys -> records.
 *
 * <p>Each returned record holds the written key under field {@code "key"} and the written
 * value under field {@code "value"}.
 *
 * @param file local file that receives the sequence file content
 * @param numRecords number of key/value pairs to write
 * @return map from key string to the record expected for that key
 * @throws IOException if the file cannot be opened or a record cannot be appended
 */
private HashMap<String, Record> createMyWritableSequenceFile(File file, int numRecords) throws IOException {
  HashMap<String, Record> map = new HashMap<String, Record>();
  SequenceFile.Metadata metadata = new SequenceFile.Metadata(getMetadataForSequenceFile());
  FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(file), null);
  SequenceFile.Writer writer = null;
  try {
    writer = SequenceFile.createWriter(new Configuration(), out, Text.class, ParseTextMyWritableBuilder.MyWritable.class,
        SequenceFile.CompressionType.NONE, null, metadata);
    for (int i = 0; i < numRecords; ++i) {
      Text key = new Text("key" + i);
      ParseTextMyWritableBuilder.MyWritable value = new ParseTextMyWritableBuilder.MyWritable("value", i);
      writer.append(key, value);
      Record record = new Record();
      record.put("key", key);
      record.put("value", value);
      map.put(key.toString(), record);
    }
  } finally {
    // Close the writer first so buffered records are flushed into the stream.
    Closeables.closeQuietly(writer);
    // A writer created on top of a caller-supplied stream does not own that stream and only
    // flushes it on close, so the stream (and the underlying file handle) is released here.
    Closeables.closeQuietly(out);
  }
  return map;
}
LogWriter(FileSystem fs, Configuration hConf, Path logPath) throws IOException {
// TODO: retry a few times to ride over transient failures?
SequenceFile.Metadata metadata = new SequenceFile.Metadata();
metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION)));
this.internalWriter = SequenceFile.createWriter(fs, hConf, logPath, LongWritable.class, TransactionEdit.class,
SequenceFile.CompressionType.NONE, null, null, metadata);
LOG.debug("Created a new TransactionLog writer for " + logPath);
}
/**
 * Reads the record's attachment stream as an RCFile and hands the content to the row-wise or
 * column-wise reader, depending on {@code readMode}.
 *
 * <p>A copy of the input record, stripped of its attachments and tagged with
 * {@code OUTPUT_MEDIA_TYPE}, serves as template for the emitted records. When
 * {@code includeMetaData} is set and the file has metadata, it is stored in the template
 * under {@code RC_FILE_META_DATA}.
 *
 * @param record the input record carrying the attachment
 * @param in stream with the RCFile content
 * @return the result of {@code readRowWise} or {@code readColumnWise}
 * @throws MorphlineRuntimeException if an {@code IOException} occurs while reading
 * @throws IllegalStateException if {@code readMode} is neither {@code row} nor {@code column}
 * @throws IOException if closing the reader fails
 */
@Override
protected boolean doProcess(Record record, InputStream in) throws IOException {
  final Path path = getAttachmentPath(record);
  final SingleStreamFileSystem streamFs = new SingleStreamFileSystem(in, path);
  RCFile.Reader rcReader = null;
  try {
    rcReader = new RCFile.Reader(streamFs, path, conf);

    Record prototype = record.copy();
    removeAttachments(prototype);
    prototype.put(Fields.ATTACHMENT_MIME_TYPE, OUTPUT_MEDIA_TYPE);

    // Metadata is only fetched from the reader when the caller asked for it.
    SequenceFile.Metadata fileMetadata = includeMetaData ? rcReader.getMetadata() : null;
    if (fileMetadata != null) {
      prototype.put(RC_FILE_META_DATA, fileMetadata);
    }

    switch (readMode) {
      case row:
        return readRowWise(rcReader, prototype);
      case column:
        return readColumnWise(rcReader, prototype);
      default:
        throw new IllegalStateException();
    }
  } catch (IOException e) {
    String message = "IOException while processing attachment " + path.getName();
    throw new MorphlineRuntimeException(message, e);
  } finally {
    if (rcReader != null) {
      rcReader.close();
    }
  }
}
/**
 * Writes an RCFile named {@code fileName} under {@code testDirectory} on {@code dfs}.
 *
 * <p>The file has {@code numRecords} rows of {@code maxColumns} columns. Every cell holds the
 * serialized {@code Text} {@code "ROW-NUM:<row>, COLUMN-NUM:<column>"}; when
 * {@code addNullValue} is set, the last column of the last row holds a serialized
 * {@code NullWritable} instead. The file metadata comes from {@code getMetadataForRCFile()}.
 *
 * @param fileName name of the file to create inside the test directory
 * @param numRecords number of rows to write
 * @param maxColumns number of columns per row
 * @param addNullValue whether the last cell of the last row is written as null
 * @throws IOException if the file cannot be created, written or closed
 */
private void createRCFile(final String fileName, final int numRecords,
    final int maxColumns, boolean addNullValue) throws IOException {
  // Write the RCFile
  SequenceFile.Metadata metadata = getMetadataForRCFile();
  Configuration conf = new Configuration();
  conf.set(RCFile.COLUMN_NUMBER_CONF_STR, String.valueOf(maxColumns));
  Path inputFile = dfs.makeQualified(new Path(testDirectory, fileName));
  RCFile.Writer rcFileWriter = new RCFile.Writer(dfs, conf, inputFile, null,
      metadata, null);
  try {
    for (int row = 0; row < numRecords; row++) {
      BytesRefArrayWritable dataWrite = new BytesRefArrayWritable(maxColumns);
      dataWrite.resetValid(maxColumns);
      for (int column = 0; column < maxColumns; column++) {
        Writable sampleText = new Text(
            "ROW-NUM:" + row + ", COLUMN-NUM:" + column);
        // Set the last column of the last row as null
        if (addNullValue && column == maxColumns - 1 && row == numRecords - 1) {
          sampleText = NullWritable.get();
        }
        ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput();
        sampleText.write(dataOutput);
        dataWrite.set(column, new BytesRefWritable(dataOutput.toByteArray()));
      }
      rcFileWriter.append(dataWrite);
    }
  } finally {
    // Release the writer even when serializing or appending a row fails.
    rcFileWriter.close();
  }
}
/**
 * Writes {@code src} as {@code blen x blen} blocks into a sequence file keyed by 1-based
 * {@code MatrixIndexes}.
 *
 * <p>If the whole matrix fits into one block and {@code rl} is 0, {@code src} itself is
 * written under index (1,1). Otherwise the block rows {@code rl/blen} (inclusive) up to
 * {@code ceil(ru/blen)} (exclusive) are sliced out of {@code src} and appended one block
 * at a time, across all block columns.
 *
 * @param path target sequence file
 * @param job job configuration, also used to look up the IO buffer size
 * @param fs file system on which the file is created
 * @param src matrix to write
 * @param blen block size (rows and columns per block)
 * @param rl lower row bound that selects the first block row
 * @param ru upper row bound that selects the last block row
 * @throws IOException if the writer cannot be created, a block cannot be appended, or the
 *         bound check fails
 */
@SuppressWarnings("deprecation")
protected final void writeBinaryBlockMatrixToSequenceFile( Path path, JobConf job, FileSystem fs, MatrixBlock src, int blen, int rl, int ru )
  throws IOException
{
  final boolean sparse = src.isInSparseFormat();
  final int rlen = src.getNumRows();
  final int clen = src.getNumColumns();

  // 1) create sequence file writer, with right replication factor
  // (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication())
  // The long constructor is a copy of SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class),
  // except for replication; it is used only if a replication factor was specified.
  final SequenceFile.Writer writer = ( _replication > 0 )
    ? new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class,
        job.getInt(HDFSTool.IO_FILE_BUFFER_SIZE, 4096),
        (short)_replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata())
    : new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class);

  try
  {
    // 2) bound check for src block
    if( src.getNumRows() > rlen || src.getNumColumns() > clen ) {
      throw new IOException("Matrix block [1:"+src.getNumRows()+",1:"+src.getNumColumns()+"] " +
        "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
    }

    // 3) reblock and write
    final MatrixIndexes key = new MatrixIndexes();

    // fast path: the matrix is a single block, write it directly
    if( rlen <= blen && clen <= blen && rl == 0 ) {
      key.setIndexes(1, 1);
      writer.append(key, src);
      return;
    }

    // general case: initialize blocks for reuse (at most 4 different blocks required)
    final MatrixBlock[] reuse = createMatrixBlocksForReuse(rlen, clen, blen, sparse, src.getNonZeros());

    // create and write subblocks of matrix
    for( int bi = rl/blen; bi < (int)Math.ceil(ru/(double)blen); bi++ ) {
      for( int bj = 0; bj < (int)Math.ceil(src.getNumColumns()/(double)blen); bj++ ) {
        final int rowStart = bi*blen;
        final int colStart = bj*blen;

        // boundary blocks are truncated to the remaining rows/columns
        int rowCount = blen;
        if( rowStart + blen >= src.getNumRows() ) {
          rowCount = src.getNumRows() - rowStart;
        }
        int colCount = blen;
        if( colStart + blen >= src.getNumColumns() ) {
          colCount = src.getNumColumns() - colStart;
        }

        // get reuse matrix block and copy the submatrix into it
        MatrixBlock target = getMatrixBlockForReuse(reuse, rowCount, colCount, blen);
        src.slice( rowStart, rowStart+rowCount-1,
          colStart, colStart+colCount-1, target );

        // append block to sequence file
        key.setIndexes(bi+1, bj+1);
        writer.append(key, target);

        // reset block for later reuse
        target.reset();
      }
    }
  }
  finally {
    IOUtilFunctions.closeSilently(writer);
  }
}
/**
 * Writes {@code src} as {@code blen x blen} blocks into a sequence file keyed by 1-based
 * {@code MatrixIndexes}, slicing data out of {@code src} only for blocks on the diagonal.
 *
 * <p>If {@code rlen} and {@code clen} both fit into one block, {@code src} itself is written
 * under index (1,1). Otherwise every block position is written: diagonal positions get the
 * corresponding slice of {@code src}, all other positions get a shared scratch block that is
 * reset to the dimensions of that position.
 *
 * @param path target sequence file
 * @param job job configuration, also used to look up the IO buffer size
 * @param fs file system on which the file is created
 * @param src matrix to write
 * @param rlen number of rows of the overall matrix
 * @param clen number of columns of the overall matrix
 * @param blen block size (rows and columns per block)
 * @throws IOException if the writer cannot be created, a block cannot be appended, or
 *         {@code src} exceeds {@code rlen} x {@code clen}
 * @throws DMLRuntimeException declared by the signature; presumably raised by the block helpers
 */
@SuppressWarnings("deprecation")
protected final void writeDiagBinaryBlockMatrixToHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock src, long rlen, long clen, int blen )
  throws IOException, DMLRuntimeException
{
  final boolean sparse = src.isInSparseFormat();

  // 1) create sequence file writer, with right replication factor
  // (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication())
  // The long constructor is a copy of SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class),
  // except for replication; it is used only if a replication factor was specified.
  final SequenceFile.Writer writer = ( _replication > 0 )
    ? new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class,
        job.getInt(HDFSTool.IO_FILE_BUFFER_SIZE, 4096),
        (short)_replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata())
    : new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class);

  try
  {
    // 2) bound check for src block
    if( src.getNumRows() > rlen || src.getNumColumns() > clen ) {
      throw new IOException("Matrix block [1:"+src.getNumRows()+",1:"+src.getNumColumns()+"] " +
        "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
    }

    // 3) reblock and write
    final MatrixIndexes key = new MatrixIndexes();

    // fast path: the matrix is a single block, write it directly
    if( rlen <= blen && clen <= blen ) {
      key.setIndexes(1, 1);
      writer.append(key, src);
      return;
    }

    // general case: initialize blocks for reuse (at most 4 different blocks required)
    final MatrixBlock[] reuse = createMatrixBlocksForReuse(rlen, clen, blen, sparse, src.getNonZeros());
    final MatrixBlock scratch = new MatrixBlock();

    // create and write subblocks of matrix
    for( int bi = 0; bi < (int)Math.ceil(src.getNumRows()/(double)blen); bi++ ) {
      for( int bj = 0; bj < (int)Math.ceil(src.getNumColumns()/(double)blen); bj++ ) {
        // boundary blocks are truncated to the remaining rows/columns
        int rowCount = blen;
        if( bi*blen + blen >= src.getNumRows() ) {
          rowCount = src.getNumRows() - bi*blen;
        }
        int colCount = blen;
        if( bj*blen + blen >= src.getNumColumns() ) {
          colCount = src.getNumColumns() - bj*blen;
        }

        final boolean onDiagonal = ( bi == bj );
        MatrixBlock target;
        if( onDiagonal ) {
          // block on diagonal: get reuse matrix block and copy the submatrix into it
          final int rowStart = bi*blen;
          final int colStart = bj*blen;
          target = getMatrixBlockForReuse(reuse, rowCount, colCount, blen);
          src.slice( rowStart, rowStart+rowCount-1,
            colStart, colStart+colCount-1, target );
        }
        else {
          // empty block (not on diagonal)
          target = scratch;
          target.reset(rowCount, colCount);
        }

        // append block to sequence file
        key.setIndexes(bi+1, bj+1);
        writer.append(key, target);

        // reset block for later reuse (only done for off-diagonal blocks)
        if( !onDiagonal ) {
          target.reset();
        }
      }
    }
  }
  finally {
    IOUtilFunctions.closeSilently(writer);
  }
}
/**
 * Writes {@code src} as {@code blen x blen} blocks into a sequence file keyed by 1-based
 * {@code MatrixIndexes}.
 *
 * <p>If the whole matrix fits into one block and {@code rl} is 0, {@code src} itself is
 * written under index (1,1). Otherwise the block rows {@code rl/blen} (inclusive) up to
 * {@code ceil(ru/blen)} (exclusive) are sliced out of {@code src} and appended one block
 * at a time, across all block columns.
 *
 * @param path target sequence file
 * @param job job configuration, also used to look up the IO buffer size
 * @param fs file system on which the file is created
 * @param src matrix to write
 * @param blen block size (rows and columns per block)
 * @param rl lower row bound that selects the first block row
 * @param ru upper row bound that selects the last block row
 * @throws IOException if the writer cannot be created, a block cannot be appended, or the
 *         bound check fails
 */
@SuppressWarnings("deprecation")
protected final void writeBinaryBlockMatrixToSequenceFile( Path path, JobConf job, FileSystem fs, MatrixBlock src, int blen, int rl, int ru )
  throws IOException
{
  final boolean sparse = src.isInSparseFormat();
  final int rlen = src.getNumRows();
  final int clen = src.getNumColumns();

  // 1) create sequence file writer, with right replication factor
  // (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication())
  // The long constructor is a copy of SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class),
  // except for replication; it is used only if a replication factor was specified.
  final SequenceFile.Writer writer = ( _replication > 0 )
    ? new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class,
        job.getInt(HDFSTool.IO_FILE_BUFFER_SIZE, 4096),
        (short)_replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata())
    : new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class);

  try
  {
    // 2) bound check for src block
    if( src.getNumRows() > rlen || src.getNumColumns() > clen ) {
      throw new IOException("Matrix block [1:"+src.getNumRows()+",1:"+src.getNumColumns()+"] " +
        "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
    }

    // 3) reblock and write
    final MatrixIndexes key = new MatrixIndexes();

    // fast path: the matrix is a single block, write it directly
    if( rlen <= blen && clen <= blen && rl == 0 ) {
      key.setIndexes(1, 1);
      writer.append(key, src);
      return;
    }

    // general case: initialize blocks for reuse (at most 4 different blocks required)
    final MatrixBlock[] reuse = createMatrixBlocksForReuse(rlen, clen, blen, sparse, src.getNonZeros());

    // create and write subblocks of matrix
    for( int bi = rl/blen; bi < (int)Math.ceil(ru/(double)blen); bi++ ) {
      for( int bj = 0; bj < (int)Math.ceil(src.getNumColumns()/(double)blen); bj++ ) {
        final int rowStart = bi*blen;
        final int colStart = bj*blen;

        // boundary blocks are truncated to the remaining rows/columns
        int rowCount = blen;
        if( rowStart + blen >= src.getNumRows() ) {
          rowCount = src.getNumRows() - rowStart;
        }
        int colCount = blen;
        if( colStart + blen >= src.getNumColumns() ) {
          colCount = src.getNumColumns() - colStart;
        }

        // get reuse matrix block and copy the submatrix into it
        MatrixBlock target = getMatrixBlockForReuse(reuse, rowCount, colCount, blen);
        src.slice( rowStart, rowStart+rowCount-1,
          colStart, colStart+colCount-1, target );

        // append block to sequence file
        key.setIndexes(bi+1, bj+1);
        writer.append(key, target);

        // reset block for later reuse
        target.reset();
      }
    }
  }
  finally {
    IOUtilFunctions.closeSilently(writer);
  }
}
/**
 * Writes {@code src} as {@code blen x blen} blocks into a sequence file keyed by 1-based
 * {@code MatrixIndexes}, slicing data out of {@code src} only for blocks on the diagonal.
 *
 * <p>If {@code rlen} and {@code clen} both fit into one block, {@code src} itself is written
 * under index (1,1). Otherwise every block position is written: diagonal positions get the
 * corresponding slice of {@code src}, all other positions get a shared scratch block that is
 * reset to the dimensions of that position.
 *
 * @param path target sequence file
 * @param job job configuration, also used to look up the IO buffer size
 * @param fs file system on which the file is created
 * @param src matrix to write
 * @param rlen number of rows of the overall matrix
 * @param clen number of columns of the overall matrix
 * @param blen block size (rows and columns per block)
 * @throws IOException if the writer cannot be created, a block cannot be appended, or
 *         {@code src} exceeds {@code rlen} x {@code clen}
 * @throws DMLRuntimeException declared by the signature; presumably raised by the block helpers
 */
@SuppressWarnings("deprecation")
protected final void writeDiagBinaryBlockMatrixToHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock src, long rlen, long clen, int blen )
  throws IOException, DMLRuntimeException
{
  final boolean sparse = src.isInSparseFormat();

  // 1) create sequence file writer, with right replication factor
  // (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication())
  // The long constructor is a copy of SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class),
  // except for replication; it is used only if a replication factor was specified.
  final SequenceFile.Writer writer = ( _replication > 0 )
    ? new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class,
        job.getInt(HDFSTool.IO_FILE_BUFFER_SIZE, 4096),
        (short)_replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata())
    : new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class);

  try
  {
    // 2) bound check for src block
    if( src.getNumRows() > rlen || src.getNumColumns() > clen ) {
      throw new IOException("Matrix block [1:"+src.getNumRows()+",1:"+src.getNumColumns()+"] " +
        "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
    }

    // 3) reblock and write
    final MatrixIndexes key = new MatrixIndexes();

    // fast path: the matrix is a single block, write it directly
    if( rlen <= blen && clen <= blen ) {
      key.setIndexes(1, 1);
      writer.append(key, src);
      return;
    }

    // general case: initialize blocks for reuse (at most 4 different blocks required)
    final MatrixBlock[] reuse = createMatrixBlocksForReuse(rlen, clen, blen, sparse, src.getNonZeros());
    final MatrixBlock scratch = new MatrixBlock();

    // create and write subblocks of matrix
    for( int bi = 0; bi < (int)Math.ceil(src.getNumRows()/(double)blen); bi++ ) {
      for( int bj = 0; bj < (int)Math.ceil(src.getNumColumns()/(double)blen); bj++ ) {
        // boundary blocks are truncated to the remaining rows/columns
        int rowCount = blen;
        if( bi*blen + blen >= src.getNumRows() ) {
          rowCount = src.getNumRows() - bi*blen;
        }
        int colCount = blen;
        if( bj*blen + blen >= src.getNumColumns() ) {
          colCount = src.getNumColumns() - bj*blen;
        }

        final boolean onDiagonal = ( bi == bj );
        MatrixBlock target;
        if( onDiagonal ) {
          // block on diagonal: get reuse matrix block and copy the submatrix into it
          final int rowStart = bi*blen;
          final int colStart = bj*blen;
          target = getMatrixBlockForReuse(reuse, rowCount, colCount, blen);
          src.slice( rowStart, rowStart+rowCount-1,
            colStart, colStart+colCount-1, target );
        }
        else {
          // empty block (not on diagonal)
          target = scratch;
          target.reset(rowCount, colCount);
        }

        // append block to sequence file
        key.setIndexes(bi+1, bj+1);
        writer.append(key, target);

        // reset block for later reuse (only done for off-diagonal blocks)
        if( !onDiagonal ) {
          target.reset();
        }
      }
    }
  }
  finally {
    IOUtilFunctions.closeSilently(writer);
  }
}
/**
 * Reads the given stream as a sequence file and emits one output record per key/value pair
 * to the child command.
 *
 * <p>Each output record is a copy of the input record without attachments, extended with the
 * key under {@code keyField}, the value under {@code valueField} and
 * {@code OUTPUT_MEDIA_TYPE} as attachment MIME type. When {@code includeMetaData} is set and
 * the file has metadata, it is added under {@code SEQUENCE_FILE_META_DATA}.
 *
 * @param inputRecord record used as template for the emitted records
 * @param in stream with the sequence file content
 * @return {@code false} as soon as the child command rejects a record, {@code true} once all
 *         pairs have been passed on
 * @throws IOException if the sequence file cannot be opened or read
 */
@Override
protected boolean doProcess(Record inputRecord, final InputStream in) throws IOException {
  SequenceFile.Reader seqReader = null;
  try {
    FSDataInputStream seekableIn = new FSDataInputStream(new ForwardOnlySeekable(in));
    seqReader = new SequenceFile.Reader(conf, SequenceFile.Reader.stream(seekableIn));

    SequenceFile.Metadata fileMetadata = null;
    if (includeMetaData) {
      fileMetadata = seqReader.getMetadata();
    }

    Class<?> keyType = seqReader.getKeyClass();
    Class<?> valueType = seqReader.getValueClass();
    Record prototype = inputRecord.copy();
    removeAttachments(prototype);

    for (;;) {
      // Fresh holders per pair, because the emitted record keeps references to them.
      Writable key = (Writable) ReflectionUtils.newInstance(keyType, conf);
      Writable val = (Writable) ReflectionUtils.newInstance(valueType, conf);

      boolean hasNext;
      try {
        hasNext = seqReader.next(key, val);
      } catch (EOFException ex) {
        // SequenceFile.Reader will throw an EOFException after reading
        // all the data, if it doesn't know the length. Since we are
        // passing in an InputStream, we hit this case;
        LOG.trace("Received expected EOFException", ex);
        hasNext = false;
      }
      if (!hasNext) {
        break;
      }

      incrementNumRecords();
      Record outputRecord = prototype.copy();
      outputRecord.put(keyField, key);
      outputRecord.put(valueField, val);
      outputRecord.put(Fields.ATTACHMENT_MIME_TYPE, OUTPUT_MEDIA_TYPE);
      if (includeMetaData && fileMetadata != null) {
        outputRecord.put(SEQUENCE_FILE_META_DATA, fileMetadata);
      }

      // pass record to next command in chain:
      boolean accepted = getChild().process(outputRecord);
      if (!accepted) {
        return false;
      }
    }
    return true;
  } finally {
    Closeables.closeQuietly(seqReader);
  }
}
/**
 * Builds the metadata attached to the RCFile: the two {@code Text} values
 * {@code "metaField"} and {@code "metaValue"} passed to {@code RCFile.createMetadata}.
 *
 * @return the metadata created by {@code RCFile.createMetadata}
 */
private SequenceFile.Metadata getMetadataForRCFile() {
  Text fieldName = new Text("metaField");
  Text fieldValue = new Text("metaValue");
  return RCFile.createMetadata(fieldName, fieldValue);
}