下面列出了org.apache.hadoop.mapred.OutputFormat#getRecordWriter ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test(enabled = true)
public void testWriteChunkData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<DurableChunk<?>> sess =
new MneDurableOutputSession<DurableChunk<?>>(m_tacontext, null,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<DurableChunk<?>> mdvalue =
new MneDurableOutputValue<DurableChunk<?>>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<DurableChunk<?>>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<DurableChunk<?>>>();
RecordWriter<NullWritable, MneDurableOutputValue<DurableChunk<?>>> writer =
outputFormat.getRecordWriter(null, m_conf, null, null);
DurableChunk<?> dchunk = null;
Checksum cs = new CRC32();
cs.reset();
for (int i = 0; i < m_reccnt; ++i) {
dchunk = genupdDurableChunk(sess, cs);
Assert.assertNotNull(dchunk);
writer.write(nada, mdvalue.of(dchunk));
}
m_checksum = cs.getValue();
writer.close(null);
sess.close();
}
@Test(enabled = true)
public void testWriteLongData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<Long> sess =
new MneDurableOutputSession<Long>(m_tacontext, null,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<Long> mdvalue =
new MneDurableOutputValue<Long>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<Long>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<Long>>();
RecordWriter<NullWritable, MneDurableOutputValue<Long>> writer =
outputFormat.getRecordWriter(null, m_conf, null, null);
Long val = null;
for (int i = 0; i < m_reccnt; ++i) {
val = m_rand.nextLong();
m_sum += val;
writer.write(nada, mdvalue.of(val));
}
writer.close(null);
sess.close();
}
@Test(enabled = true)
public void testWritePersonData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<Person<Long>> sess =
new MneDurableOutputSession<Person<Long>>(m_tacontext, null,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<Person<Long>> mdvalue =
new MneDurableOutputValue<Person<Long>>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<Person<Long>>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<Person<Long>>>();
RecordWriter<NullWritable, MneDurableOutputValue<Person<Long>>> writer =
outputFormat.getRecordWriter(null, m_conf, null, null);
Person<Long> person = null;
for (int i = 0; i < m_reccnt; ++i) {
person = sess.newDurableObjectRecord();
person.setAge((short) m_rand.nextInt(50));
person.setName(String.format("Name: [%s]", Utils.genRandomString()), true);
m_sumage += person.getAge();
writer.write(nada, mdvalue.of(person));
}
writer.close(null);
sess.close();
}
@Test(enabled = true)
public void testWriteBufferData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<DurableBuffer<?>> sess =
new MneDurableOutputSession<DurableBuffer<?>>(null, m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<DurableBuffer<?>> mdvalue =
new MneDurableOutputValue<DurableBuffer<?>>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<DurableBuffer<?>>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<DurableBuffer<?>>>();
RecordWriter<NullWritable, MneDurableOutputValue<DurableBuffer<?>>> writer =
outputFormat.getRecordWriter(m_fs, m_conf, null, null);
DurableBuffer<?> dbuf = null;
Checksum cs = new CRC32();
cs.reset();
for (int i = 0; i < m_reccnt; ++i) {
dbuf = genupdDurableBuffer(sess, cs);
Assert.assertNotNull(dbuf);
writer.write(nada, mdvalue.of(dbuf));
}
m_checksum = cs.getValue();
writer.close(null);
sess.close();
}
/**
* @param jobConf Job configuration.
* @param taskCtx Task context.
* @param directWrite Direct write flag.
* @param fileName File name.
* @throws IOException In case of IO exception.
*/
HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite,
@Nullable String fileName, TaskAttemptID attempt) throws IOException {
this.jobConf = jobConf;
this.taskCtx = taskCtx;
this.attempt = attempt;
if (directWrite) {
jobConf.set("mapreduce.task.attempt.id", attempt.toString());
OutputFormat outFormat = jobConf.getOutputFormat();
writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
}
else
writer = null;
}
public RecordWriter<K, V> getRecordWriter( FileSystem fs, JobConf job, String name, Progressable progress )
throws IOException
{
String outputFilename = getOutputFilename( job );
OutputFormat<K,V> of = getOutputFormat( job );
return of.getRecordWriter( fs, job, outputFilename, progress );
}