The following lists example usages of the org.apache.hadoop.mapred.RecordWriter API class and how they are written; you can also follow each example's link to view the full source code on GitHub.
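Before the project examples, a minimal, self-contained sketch of the contract they all implement may help orientation: an old-API OutputFormat whose getRecordWriter opens a task-local file and returns a RecordWriter that serializes each key/value pair. The class name SimpleTextOutputFormat and the tab-separated line format are illustrative assumptions, not code from any of the projects quoted below.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

// Illustrative sketch only: a FileOutputFormat whose RecordWriter writes
// "key<TAB>value" lines to this task attempt's output file.
public class SimpleTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
  @Override
  public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
      String name, Progressable progress) throws IOException {
    // Resolve the task-local output file and open it on the matching FileSystem.
    Path file = FileOutputFormat.getTaskOutputPath(job, name);
    FileSystem fs = file.getFileSystem(job);
    final FSDataOutputStream out = fs.create(file, progress);
    return new RecordWriter<K, V>() {
      public void write(K key, V value) throws IOException {
        out.writeBytes(key + "\t" + value + "\n");
      }
      public void close(Reporter reporter) throws IOException {
        out.close();
      }
    };
  }
}

Most of the real implementations below follow this shape, differing mainly in how the destination (ORC file, Excel document, JDBC statement, index writer) is obtained.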
@Override
public RecordWriter<NullWritable, OrcSerdeRow> getRecordWriter(FileSystem fileSystem,
JobConf conf, String name, Progressable reporter) throws IOException {
ReaderWriterProfiler.setProfilerOptions(conf);
// To be compatible with older file formats like Sequence and RC
// Only works if mapred.work.output.dir is set in the conf
Path workOutputPath = FileOutputFormat.getWorkOutputPath(conf);
Path outputPath = workOutputPath == null ? new Path(name) : new Path(workOutputPath, name);
if (fileSystem == null && workOutputPath != null) {
fileSystem = workOutputPath.getFileSystem(conf);
}
return new OrcRecordWriter(fileSystem, outputPath, conf,
OrcConf.ConfVars.HIVE_ORC_STRIPE_SIZE.defaultLongVal,
OrcConf.ConfVars.HIVE_ORC_COMPRESSION.defaultVal,
OrcConf.ConfVars.HIVE_ORC_COMPRESSION_BLOCK_SIZE.defaultIntVal,
OrcConf.ConfVars.HIVE_ORC_ROW_INDEX_STRIDE.defaultIntVal);
}
@Override
public RecordWriter<NullWritable,SpreadSheetCellDAO> getRecordWriter(FileSystem ignored, JobConf conf, String name, Progressable progress) throws IOException {
// check if mimeType is set. If not, assume new Excel format (.xlsx)
String defaultConf = conf.get(HadoopOfficeWriteConfiguration.CONF_MIMETYPE, ExcelFileOutputFormat.DEFAULT_MIMETYPE);
conf.set(HadoopOfficeWriteConfiguration.CONF_MIMETYPE, defaultConf);
Path file;
if (name != null) {
file = getTaskOutputPath(conf, name);
// add suffix
file = file.suffix(ExcelFileOutputFormat.getSuffix(conf.get(HadoopOfficeWriteConfiguration.CONF_MIMETYPE)));
} else {
file = getOutputPath(conf);
}
try {
return new ExcelRecordWriter<>(
HadoopUtil.getDataOutputStream(conf, file, progress, getCompressOutput(conf),
getOutputCompressorClass(conf, ExcelFileOutputFormat.defaultCompressorClass)),
file.getName(), conf);
} catch (InvalidWriterConfigurationException | OfficeWriterException e) {
LOG.error(e);
}
return null;
}
@Test(enabled = true)
public void testWriteChunkData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<DurableChunk<?>> sess =
new MneDurableOutputSession<DurableChunk<?>>(m_tacontext, null,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<DurableChunk<?>> mdvalue =
new MneDurableOutputValue<DurableChunk<?>>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<DurableChunk<?>>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<DurableChunk<?>>>();
RecordWriter<NullWritable, MneDurableOutputValue<DurableChunk<?>>> writer =
outputFormat.getRecordWriter(null, m_conf, null, null);
DurableChunk<?> dchunk = null;
Checksum cs = new CRC32();
cs.reset();
for (int i = 0; i < m_reccnt; ++i) {
dchunk = genupdDurableChunk(sess, cs);
Assert.assertNotNull(dchunk);
writer.write(nada, mdvalue.of(dchunk));
}
m_checksum = cs.getValue();
writer.close(null);
sess.close();
}
@Test(enabled = true)
public void testWriteLongData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<Long> sess =
new MneDurableOutputSession<Long>(m_tacontext, null,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<Long> mdvalue =
new MneDurableOutputValue<Long>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<Long>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<Long>>();
RecordWriter<NullWritable, MneDurableOutputValue<Long>> writer =
outputFormat.getRecordWriter(null, m_conf, null, null);
Long val = null;
for (int i = 0; i < m_reccnt; ++i) {
val = m_rand.nextLong();
m_sum += val;
writer.write(nada, mdvalue.of(val));
}
writer.close(null);
sess.close();
}
@Test(enabled = true)
public void testWritePersonData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<Person<Long>> sess =
new MneDurableOutputSession<Person<Long>>(m_tacontext, null,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<Person<Long>> mdvalue =
new MneDurableOutputValue<Person<Long>>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<Person<Long>>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<Person<Long>>>();
RecordWriter<NullWritable, MneDurableOutputValue<Person<Long>>> writer =
outputFormat.getRecordWriter(null, m_conf, null, null);
Person<Long> person = null;
for (int i = 0; i < m_reccnt; ++i) {
person = sess.newDurableObjectRecord();
person.setAge((short) m_rand.nextInt(50));
person.setName(String.format("Name: [%s]", Utils.genRandomString()), true);
m_sumage += person.getAge();
writer.write(nada, mdvalue.of(person));
}
writer.close(null);
sess.close();
}
@Test(enabled = true)
public void testWriteBufferData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<DurableBuffer<?>> sess =
new MneDurableOutputSession<DurableBuffer<?>>(null, m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<DurableBuffer<?>> mdvalue =
new MneDurableOutputValue<DurableBuffer<?>>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<DurableBuffer<?>>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<DurableBuffer<?>>>();
RecordWriter<NullWritable, MneDurableOutputValue<DurableBuffer<?>>> writer =
outputFormat.getRecordWriter(m_fs, m_conf, null, null);
DurableBuffer<?> dbuf = null;
Checksum cs = new CRC32();
cs.reset();
for (int i = 0; i < m_reccnt; ++i) {
dbuf = genupdDurableBuffer(sess, cs);
Assert.assertNotNull(dbuf);
writer.write(nada, mdvalue.of(dbuf));
}
m_checksum = cs.getValue();
writer.close(null);
sess.close();
}
public RecordWriter<WritableComparable, Writable> getRecordWriter(
final FileSystem fs, JobConf job,
String name, final Progressable progress) throws IOException {
final Path segmentDumpFile = new Path(FileOutputFormat.getOutputPath(job), name);
// Get the old copy out of the way
if (fs.exists(segmentDumpFile)) fs.delete(segmentDumpFile, true);
final PrintStream printStream = new PrintStream(fs.create(segmentDumpFile));
return new RecordWriter<WritableComparable, Writable>() {
public synchronized void write(WritableComparable key, Writable value) throws IOException {
printStream.println(value);
}
public synchronized void close(Reporter reporter) throws IOException {
printStream.close();
}
};
}
public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
final FileSystem fs, JobConf job,
String name, final Progressable progress) throws IOException {
final Path segmentDumpFile = new Path(FileOutputFormat.getOutputPath(job), name);
// Get the old copy out of the way
if (fs.exists(segmentDumpFile)) fs.delete(segmentDumpFile, true);
final PrintStream printStream = new PrintStream(fs.create(segmentDumpFile));
return new RecordWriter<WritableComparable<?>, Writable>() {
public synchronized void write(WritableComparable<?> key, Writable value) throws IOException {
printStream.println(value);
}
public synchronized void close(Reporter reporter) throws IOException {
printStream.close();
}
};
}
@Override
public RecordWriter<Text, NutchIndexAction> getRecordWriter(
FileSystem ignored, JobConf job, String name, Progressable progress)
throws IOException {
final IndexWriters writers = new IndexWriters(job);
writers.open(job, name);
return new RecordWriter<Text, NutchIndexAction>() {
public void close(Reporter reporter) throws IOException {
writers.close();
}
public void write(Text key, NutchIndexAction indexAction)
throws IOException {
if (indexAction.action == NutchIndexAction.ADD) {
writers.write(indexAction.doc);
} else if (indexAction.action == NutchIndexAction.DELETE) {
writers.delete(key.toString());
}
}
};
}
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
JobConf job, String name, Progressable progress) throws IOException {
DBConfiguration dbConf = new DBConfiguration(job);
String tableName = dbConf.getOutputTableName();
String[] fieldNames = dbConf.getOutputFieldNames();
try {
Connection connection = dbConf.getConnection();
PreparedStatement statement = connection.prepareStatement(constructQuery(tableName, fieldNames));
return new DBRecordWriter(connection, statement);
}
catch (Exception ex) {
throw new IOException(ex.getMessage());
}
}
@Override
public RecordWriter<NullWritable, HDFSTSRecord> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) {
return new RecordWriter<NullWritable, HDFSTSRecord>() {
@Override
public void write(NullWritable key, HDFSTSRecord value) {
throw new RuntimeException("Should not be called");
}
@Override
public void close(Reporter reporter) {
}
};
}
public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress) {
return new RecordWriter<K, V>(){
public void write(K key, V value) { }
public void close(Reporter reporter) { }
};
}
/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
JobConf job, String name, Progressable progress) throws IOException {
org.apache.hadoop.mapreduce.RecordWriter<K, V> w = super.getRecordWriter(
new TaskAttemptContextImpl(job,
TaskAttemptID.forName(job.get(MRJobConfig.TASK_ATTEMPT_ID))));
org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter writer =
(org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter) w;
try {
return new DBRecordWriter(writer.getConnection(), writer.getStatement());
} catch(SQLException se) {
throw new IOException(se);
}
}
@Override
protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
JobConf job,
String name,
Progressable arg3)
throws IOException {
if (theSequenceFileOutputFormat == null) {
theSequenceFileOutputFormat = new SequenceFileOutputFormat<K,V>();
}
return theSequenceFileOutputFormat.getRecordWriter(fs, job, name, arg3);
}
/**
* {@inheritDoc}
*/
@Override
public RecordWriter<Key, VALUE> getRecordWriter(FileSystem fs, JobConf job,
String name, Progressable progress) throws IOException {
// TODO progress
return new MapRedGfxdRecordWriter(job);
}
@Override
public RecordWriter<Object, Object> getRecordWriter(
FileSystem ignored, JobConf job, String name, Progressable progress)
throws IOException {
ClientCache cache = getClientCacheInstance(job);
return new GFRecordWriter(cache, job);
}
@Override
protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
String name, Progressable arg3) throws IOException {
if (theTextOutputFormat == null) {
theTextOutputFormat = new TextOutputFormat<K, V>();
}
return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);
}
@Override
public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress) throws IOException {
if (baseOut == null) {
getBaseOutputFormat(job);
}
return new LazyRecordWriter<K, V>(job, baseOut, name, progress);
}
@Override
public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass,
boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException {
FileSystem fs = finalOutPath.getFileSystem(jc);
HiveExcelCellFileOutputFormat.setOutputPath(jc, finalOutPath);
RecordWriter<?, ?> recordWriter = this.getRecordWriter(fs, jc, null, progress);
return new HivePassThroughRecordWriter(recordWriter);
}
@Override
public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc,
Path finalOutPath, Class<? extends Writable> valueClass, boolean isCompressed, Properties tableProperties,
Progressable progress) throws IOException {
FileSystem fs = finalOutPath.getFileSystem(jc);
HiveExcelRowFileOutputFormat.setOutputPath(jc, finalOutPath);
RecordWriter<?, ?> recordWriter = this.getRecordWriter(fs, jc, null, progress);
return new HivePassThroughRecordWriter(recordWriter);
}
@Override
public RecordWriter<NullWritable, ArrayWritable> getRecordWriter(FileSystem ignored, JobConf conf, String name,
Progressable progress) throws IOException {
// check if mimeType is set. If not, assume new Excel format (.xlsx)
String defaultConf = conf.get(HadoopOfficeWriteConfiguration.CONF_MIMETYPE,
ExcelFileOutputFormat.DEFAULT_MIMETYPE);
conf.set(HadoopOfficeWriteConfiguration.CONF_MIMETYPE, defaultConf);
Path file;
if (name != null) {
file = getTaskOutputPath(conf, name);
// add suffix
file = file.suffix(ExcelFileOutputFormat.getSuffix(conf.get(HadoopOfficeWriteConfiguration.CONF_MIMETYPE)));
} else {
file = getOutputPath(conf);
}
try {
return new ExcelRowRecordWriter<>(
HadoopUtil.getDataOutputStream(conf, file, progress, getCompressOutput(conf),
getOutputCompressorClass(conf, ExcelFileOutputFormat.defaultCompressorClass)),
file.getName(), conf);
} catch (InvalidWriterConfigurationException | OfficeWriterException e) {
LOG.error(e);
}
return null;
}
@Override
public RecordWriter<K, Text> getRecordWriter(FileSystem ignored, JobConf job, String name,
Progressable progress) throws IOException {
String extension = "";
Path file = FileOutputFormat.getTaskOutputPath(job, MANIFEST_FILENAME);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
if (getCompressOutput(job)) {
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
extension = codec.getDefaultExtension();
}
return new ExportManifestRecordWriter<>(fileOut, FileOutputFormat.getOutputPath(job),
extension);
}
@Test
public void close_whenPoolCapacityExceeded_thenAllFilesClosed() throws IOException,
InterruptedException {
// Grab the flush lock; this means that only this test can flush files.
flushLock.lock();
// Enqueue more than the thread pool can handle. This will enqueue
// pool-size tasks and then run the remainder on this current thread
// since we're using the CallerRuns policy.
for (int i = 0; i < ExportFileFlusher.FILE_FLUSHER_POOL_SIZE * 2; i++) {
RecordWriter<Integer, Integer> rw = mock(RecordWriter.class);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
flushLock.lock();
try {
closedFiles.incrementAndGet();
} finally {
flushLock.unlock();
}
return null;
}
}).when(rw).close(Reporter.NULL);
flusher.close(rw, Reporter.NULL);
}
// Half should have been closed, the other half should still be open.
Thread.sleep(250); // synchronize by sleep!
assertEquals(ExportFileFlusher.FILE_FLUSHER_POOL_SIZE, closedFiles.get());
// Release the flush lock, sync the flusher
flushLock.unlock();
flusher.sync();
assertEquals(ExportFileFlusher.FILE_FLUSHER_POOL_SIZE * 2, closedFiles.get());
}
@Test
public void close_whenIOE_thenConsecutiveCloseCallFails() throws IOException,
InterruptedException {
doThrow(new IOException()).when(recordWriter).close(Reporter.NULL);
flusher.close(recordWriter, Reporter.NULL);
expectedException.expect(IOException.class);
verify(recordWriter, timeout(250).times(1)).close(Reporter.NULL);
flusher.close(mock(RecordWriter.class), Reporter.NULL);
}
@Test
public void close_whenRTE_thenConsecutiveCloseCallFails() throws IOException,
InterruptedException {
doThrow(new RuntimeException()).when(recordWriter).close(Reporter.NULL);
flusher.close(recordWriter, Reporter.NULL);
expectedException.expect(RuntimeException.class);
verify(recordWriter, timeout(250).times(1)).close(Reporter.NULL);
flusher.close(mock(RecordWriter.class), Reporter.NULL);
}
@Test
public void testGetRecordWriterWithCompression() throws IOException {
ExportOutputFormat.setCompressOutput(conf, true);
ExportOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
ExportOutputFormat outputFormat = new ExportOutputFormat();
RecordWriter<NullWritable, DynamoDBItemWritable> recordWriter
= outputFormat.getRecordWriter(mockFileSystem, conf, EXPECTED_FILENAME, mockProgressable);
assertNotNull(recordWriter);
String expectedFilePath =
tempDir.getRoot().getAbsolutePath() + Path.SEPARATOR + EXPECTED_FILENAME + ".gz";
assertTrue(new File(expectedFilePath).exists());
}
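Finally, for context on when getRecordWriter is actually invoked: the framework calls it once per task after the job has been configured with an output format. Below is a minimal driver sketch for the old mapred API; it assumes the hypothetical SimpleTextOutputFormat from the sketch at the top of this page, and the input/output paths are placeholders.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class RecordWriterDemoDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(RecordWriterDemoDriver.class);
    job.setJobName("recordwriter-demo");
    // The framework instantiates the output format and calls its
    // getRecordWriter(...) once per task to obtain the writer.
    job.setOutputFormat(SimpleTextOutputFormat.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path("/tmp/demo/in"));
    FileOutputFormat.setOutputPath(job, new Path("/tmp/demo/out"));
    JobClient.runJob(job);
  }
}

With the default identity mapper and reducer, this job simply copies its TextInputFormat records through the custom writer, which is enough to observe getRecordWriter being called per task.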