Below are code examples of org.apache.hadoop.mapreduce.OutputFormat#getRecordWriter(), drawn from several open-source projects.
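All of the snippets share one call pattern: obtain a TaskAttemptContext, instantiate the OutputFormat, call getRecordWriter(context), write key/value pairs, and close the writer with the same context. The minimal sketch below shows that pattern in isolation. It is not taken from any of the projects quoted here: the class name GetRecordWriterSketch, the choice of TextOutputFormat, and the /tmp scratch path are all assumptions made for the sketch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;

public class GetRecordWriterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    // arbitrary scratch path chosen for this sketch
    FileOutputFormat.setOutputPath(job, new Path("/tmp/grw-demo"));
    job.setOutputFormatClass(TextOutputFormat.class);
    // instantiate the OutputFormat the way the framework does,
    // via ReflectionUtils, so the Configuration is injected
    @SuppressWarnings("unchecked")
    OutputFormat<NullWritable, Text> outputFormat =
        (OutputFormat<NullWritable, Text>) ReflectionUtils.newInstance(
            job.getOutputFormatClass(), job.getConfiguration());
    // outside a real task, a TaskAttemptContext can be built from the
    // job configuration and a synthetic attempt id
    TaskAttemptContext context = new TaskAttemptContextImpl(
        job.getConfiguration(), new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0));
    RecordWriter<NullWritable, Text> writer = outputFormat.getRecordWriter(context);
    try {
      writer.write(NullWritable.get(), new Text("hello"));
    } finally {
      writer.close(context); // flush and release the writer's resources
    }
  }
}

The first example applies the same pattern inside a MapRunner constructor: the mapper and the OutputFormat are both instantiated reflectively from the outer task context, and the new writer is wrapped into a per-thread mapper sub-context.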
MapRunner() throws IOException, InterruptedException,
ClassNotFoundException {
// instantiate the real mapper that does the work
mapper = ReflectionUtils.newInstance(mapClass,
outer.getConfiguration());
@SuppressWarnings("unchecked")
OutputFormat<K2, V2> outputFormat = (OutputFormat<K2, V2>)
ReflectionUtils.newInstance(outer.getOutputFormatClass(),
outer.getConfiguration());
try {
// outputFormat is not initialized; we rely on everything it
// needs being obtainable from the AssignmentManager singleton.
writer = outputFormat.getRecordWriter(outer);
subcontext = (Context)ReflectionUtil.createMapperContext(
mapper, outer.getConfiguration(), outer.getTaskAttemptID(),
new SubMapRecordReader(), writer,
outer.getOutputCommitter(), new SubMapStatusReporter(),
outer.getInputSplit());
} catch (Exception e) {
throw new IOException("Error creating mapper context", e);
}
}
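The next four examples are TestNG tests that drive Apache Mnemonic's MneOutputFormat. Each one opens a MneDurableOutputSession, obtains a RecordWriter from getRecordWriter(m_tacontext), writes m_reccnt values (durable buffers, durable chunks, Person records, and plain longs, respectively) under a NullWritable key, and closes the writer before closing the session.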
@Test(enabled = true)
public void testWriteBufferData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<DurableBuffer<?>> sess =
new MneDurableOutputSession<DurableBuffer<?>>(m_tacontext, null,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<DurableBuffer<?>> mdvalue =
new MneDurableOutputValue<DurableBuffer<?>>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<DurableBuffer<?>>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<DurableBuffer<?>>>();
RecordWriter<NullWritable, MneDurableOutputValue<DurableBuffer<?>>> writer =
outputFormat.getRecordWriter(m_tacontext);
DurableBuffer<?> dbuf = null;
Checksum cs = new CRC32();
cs.reset();
for (int i = 0; i < m_reccnt; ++i) {
dbuf = genupdDurableBuffer(sess, cs);
Assert.assertNotNull(dbuf);
writer.write(nada, mdvalue.of(dbuf));
}
m_checksum = cs.getValue();
writer.close(m_tacontext);
sess.close();
}
@Test(enabled = true)
public void testWriteChunkData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<DurableChunk<?>> sess =
new MneDurableOutputSession<DurableChunk<?>>(m_tacontext, null,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<DurableChunk<?>> mdvalue =
new MneDurableOutputValue<DurableChunk<?>>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<DurableChunk<?>>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<DurableChunk<?>>>();
RecordWriter<NullWritable, MneDurableOutputValue<DurableChunk<?>>> writer =
outputFormat.getRecordWriter(m_tacontext);
DurableChunk<?> dchunk = null;
Checksum cs = new CRC32();
cs.reset();
for (int i = 0; i < m_reccnt; ++i) {
dchunk = genupdDurableChunk(sess, cs);
Assert.assertNotNull(dchunk);
writer.write(nada, mdvalue.of(dchunk));
}
m_checksum = cs.getValue();
writer.close(m_tacontext);
sess.close();
}
@Test(enabled = true)
public void testWritePersonData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<Person<Long>> sess =
new MneDurableOutputSession<Person<Long>>(m_tacontext, null,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<Person<Long>> mdvalue =
new MneDurableOutputValue<Person<Long>>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<Person<Long>>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<Person<Long>>>();
RecordWriter<NullWritable, MneDurableOutputValue<Person<Long>>> writer =
outputFormat.getRecordWriter(m_tacontext);
Person<Long> person = null;
for (int i = 0; i < m_reccnt; ++i) {
person = sess.newDurableObjectRecord();
person.setAge((short) m_rand.nextInt(50));
person.setName(String.format("Name: [%s]", Utils.genRandomString()), true);
m_sumage += person.getAge();
writer.write(nada, mdvalue.of(person));
}
writer.close(m_tacontext);
sess.close();
}
@Test(enabled = true)
public void testWriteLongData() throws Exception {
NullWritable nada = NullWritable.get();
MneDurableOutputSession<Long> sess =
new MneDurableOutputSession<Long>(m_tacontext, null,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
MneDurableOutputValue<Long> mdvalue =
new MneDurableOutputValue<Long>(sess);
OutputFormat<NullWritable, MneDurableOutputValue<Long>> outputFormat =
new MneOutputFormat<MneDurableOutputValue<Long>>();
RecordWriter<NullWritable, MneDurableOutputValue<Long>> writer =
outputFormat.getRecordWriter(m_tacontext);
Long val = null;
for (int i = 0; i < m_reccnt; ++i) {
val = m_rand.nextLong();
m_sum += val;
writer.write(nada, mdvalue.of(val));
}
writer.close(m_tacontext);
sess.close();
}
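In the Pig runtime method that follows, the OutputFormat comes from the StoreFuncInterface itself: PigOutputFormat.setLocation() first merges the store location into the context's Configuration, and the writer returned by getRecordWriter(context) is then handed to the store function via prepareToWrite().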
@Override
public StoreFuncInterface createStoreFunc(POStore store)
throws IOException {
StoreFuncInterface storeFunc = store.getStoreFunc();
// call the setStoreLocation on the storeFunc giving it the
// Job. Typically this will result in the OutputFormat of the
// storeFunc storing the output location in the Configuration
// in the Job. The PigOutFormat.setLocation() method will merge
// this modified Configuration into the configuration of the
// Context we have
PigOutputFormat.setLocation(context, store);
OutputFormat<?,?> outputFormat = storeFunc.getOutputFormat();
// create a new record writer
try {
writer = outputFormat.getRecordWriter(context);
} catch (InterruptedException e) {
throw new IOException(e);
}
storeFunc.prepareToWrite(writer);
return storeFunc;
}
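The next helper shows a defensive variant: getRecordWriter() declares both IOException and InterruptedException, and callers that cannot propagate checked exceptions commonly rethrow them as an unchecked IllegalStateException, as done here.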
private RecordWriter<KeyT, ValueT> initRecordWriter(
OutputFormat<KeyT, ValueT> outputFormatObj, TaskAttemptContext taskAttemptContext)
throws IllegalStateException {
try {
LOG.info(
"Creating new RecordWriter for task {} of Job with id {}.",
taskAttemptContext.getTaskAttemptID().getTaskID().getId(),
taskAttemptContext.getJobID().getJtIdentifier());
return outputFormatObj.getRecordWriter(taskAttemptContext);
} catch (InterruptedException | IOException e) {
throw new IllegalStateException("Unable to create RecordWriter object: ", e);
}
}
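The following test exercises SequenceFileAsBinaryOutputFormat end to end. It serializes random IntWritable/DoubleWritable pairs into raw BytesWritable records, writes them through the RecordWriter, commits the task and job via the OutputCommitter, then re-reads the output with SequenceFileInputFormat, regenerating the same random sequence from the saved seed to verify every record.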
public void testBinary() throws IOException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
Path outdir = new Path(System.getProperty("test.build.data", "/tmp"),
"outseq");
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
FileOutputFormat.setOutputPath(job, outdir);
SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
IntWritable.class );
SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
DoubleWritable.class );
SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
BytesWritable bkey = new BytesWritable();
BytesWritable bval = new BytesWritable();
TaskAttemptContext context =
MapReduceTestUtil.createDummyMapTaskAttemptContext(job.getConfiguration());
OutputFormat<BytesWritable, BytesWritable> outputFormat =
new SequenceFileAsBinaryOutputFormat();
OutputCommitter committer = outputFormat.getOutputCommitter(context);
committer.setupJob(job);
RecordWriter<BytesWritable, BytesWritable> writer =
outputFormat.getRecordWriter(context);
IntWritable iwritable = new IntWritable();
DoubleWritable dwritable = new DoubleWritable();
DataOutputBuffer outbuf = new DataOutputBuffer();
LOG.info("Creating data by SequenceFileAsBinaryOutputFormat");
try {
for (int i = 0; i < RECORDS; ++i) {
iwritable = new IntWritable(r.nextInt());
iwritable.write(outbuf);
bkey.set(outbuf.getData(), 0, outbuf.getLength());
outbuf.reset();
dwritable = new DoubleWritable(r.nextDouble());
dwritable.write(outbuf);
bval.set(outbuf.getData(), 0, outbuf.getLength());
outbuf.reset();
writer.write(bkey, bval);
}
} finally {
writer.close(context);
}
committer.commitTask(context);
committer.commitJob(job);
InputFormat<IntWritable, DoubleWritable> iformat =
new SequenceFileInputFormat<IntWritable, DoubleWritable>();
int count = 0;
r.setSeed(seed);
SequenceFileInputFormat.setInputPaths(job, outdir);
LOG.info("Reading data by SequenceFileInputFormat");
for (InputSplit split : iformat.getSplits(job)) {
RecordReader<IntWritable, DoubleWritable> reader =
iformat.createRecordReader(split, context);
MapContext<IntWritable, DoubleWritable, BytesWritable, BytesWritable>
mcontext = new MapContextImpl<IntWritable, DoubleWritable,
BytesWritable, BytesWritable>(job.getConfiguration(),
context.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(),
split);
reader.initialize(split, mcontext);
try {
int sourceInt;
double sourceDouble;
while (reader.nextKeyValue()) {
sourceInt = r.nextInt();
sourceDouble = r.nextDouble();
iwritable = reader.getCurrentKey();
dwritable = reader.getCurrentValue();
assertEquals(
"Keys don't match: " + "*" + iwritable.get() + ":" +
sourceInt + "*",
sourceInt, iwritable.get());
assertTrue(
"Vals don't match: " + "*" + dwritable.get() + ":" +
sourceDouble + "*",
Double.compare(dwritable.get(), sourceDouble) == 0 );
++count;
}
} finally {
reader.close();
}
}
assertEquals("Some records not found", RECORDS, count);
}
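The validator below uses getRecordWriter() for a round trip: when an OutputFormat class is supplied, every VertexWritable read from the input splits is immediately written back through the writer, and the freshly written part file is then re-split and re-validated with the same method.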
private static void validateFileSplits(final List<FileSplit> fileSplits, final Configuration configuration,
final Class<? extends InputFormat<NullWritable, VertexWritable>> inputFormatClass,
final Optional<Class<? extends OutputFormat<NullWritable, VertexWritable>>> outFormatClass) throws Exception {
final InputFormat inputFormat = ReflectionUtils.newInstance(inputFormatClass, configuration);
final TaskAttemptContext job = new TaskAttemptContextImpl(configuration, new TaskAttemptID(UUID.randomUUID().toString(), 0, TaskType.MAP, 0, 0));
int vertexCount = 0;
int outEdgeCount = 0;
int inEdgeCount = 0;
final OutputFormat<NullWritable, VertexWritable> outputFormat = outFormatClass.isPresent() ? ReflectionUtils.newInstance(outFormatClass.get(), configuration) : null;
final RecordWriter<NullWritable, VertexWritable> writer = null == outputFormat ? null : outputFormat.getRecordWriter(job);
boolean foundKeyValue = false;
for (final FileSplit split : fileSplits) {
logger.info("\treading file split {} ({}...{}, {} bytes)", split.getPath().getName(), split.getStart(), split.getStart() + split.getLength(), split.getLength());
final RecordReader reader = inputFormat.createRecordReader(split, job);
float lastProgress = -1f;
while (reader.nextKeyValue()) {
//System.out.println("" + reader.getProgress() + "> " + reader.getCurrentKey() + ": " + reader.getCurrentValue());
final float progress = reader.getProgress();
assertTrue(progress >= lastProgress);
assertEquals(NullWritable.class, reader.getCurrentKey().getClass());
final VertexWritable vertexWritable = (VertexWritable) reader.getCurrentValue();
if (null != writer) writer.write(NullWritable.get(), vertexWritable);
vertexCount++;
outEdgeCount = outEdgeCount + (int) IteratorUtils.count(vertexWritable.get().edges(Direction.OUT));
inEdgeCount = inEdgeCount + (int) IteratorUtils.count(vertexWritable.get().edges(Direction.IN));
//
final Vertex vertex = vertexWritable.get();
assertEquals(Integer.class, vertex.id().getClass());
if (vertex.value("name").equals("SUGAR MAGNOLIA")) {
foundKeyValue = true;
assertEquals(92, IteratorUtils.count(vertex.edges(Direction.OUT)));
assertEquals(77, IteratorUtils.count(vertex.edges(Direction.IN)));
}
lastProgress = progress;
}
}
assertEquals(8049, outEdgeCount);
assertEquals(8049, inEdgeCount);
assertEquals(outEdgeCount, inEdgeCount);
assertEquals(808, vertexCount);
assertTrue(foundKeyValue);
if (null != writer) {
writer.close(new TaskAttemptContextImpl(configuration, job.getTaskAttemptID()));
for (int i = 1; i < 10; i++) {
final File outputDirectory = new File(new URL(configuration.get("mapreduce.output.fileoutputformat.outputdir")).toURI());
final List<FileSplit> splits = generateFileSplits(new File(outputDirectory.getAbsoluteFile() + "/_temporary/0/_temporary/" + job.getTaskAttemptID().getTaskID().toString().replace("task", "attempt") + "_0" + "/part-m-00000"), i);
validateFileSplits(splits, configuration, inputFormatClass, Optional.empty());
}
}
}
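The last example stores Pig tuples as Parquet: ParquetStorer supplies the OutputFormat, checkOutputSpecs() validates the job context, getRecordWriter() produces the writer that prepareToWrite() binds to the storer, and the run is finalized with explicit commitTask() and commitJob() calls.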
public static void write(String out) throws IOException, ParserException,
InterruptedException, ExecException {
{
StringBuilder schemaString = new StringBuilder("a0: chararray");
for (int i = 1; i < COLUMN_COUNT; i++) {
schemaString.append(", a" + i + ": chararray");
}
String location = out;
String schema = schemaString.toString();
StoreFuncInterface storer = new ParquetStorer();
Job job = new Job(conf);
storer.setStoreFuncUDFContextSignature("sig");
String absPath = storer.relToAbsPathForStoreLocation(location, new Path(new File(".").getAbsoluteFile().toURI()));
storer.setStoreLocation(absPath, job);
storer.checkSchema(new ResourceSchema(Utils.getSchemaFromString(schema)));
@SuppressWarnings("unchecked") // that's how the base class is defined
OutputFormat<Void, Tuple> outputFormat = storer.getOutputFormat();
// it's ContextUtil.getConfiguration(job) and not just conf !
JobContext jobContext = ContextUtil.newJobContext(ContextUtil.getConfiguration(job), new JobID("jt", jobid++));
outputFormat.checkOutputSpecs(jobContext);
if (schema != null) {
ResourceSchema resourceSchema = new ResourceSchema(Utils.getSchemaFromString(schema));
storer.checkSchema(resourceSchema);
if (storer instanceof StoreMetadata) {
((StoreMetadata)storer).storeSchema(resourceSchema, absPath, job);
}
}
TaskAttemptContext taskAttemptContext = ContextUtil.newTaskAttemptContext(ContextUtil.getConfiguration(job), new TaskAttemptID("jt", jobid, true, 1, 0));
RecordWriter<Void, Tuple> recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
storer.prepareToWrite(recordWriter);
for (int i = 0; i < ROW_COUNT; i++) {
Tuple tuple = TupleFactory.getInstance().newTuple(COLUMN_COUNT);
for (int j = 0; j < COLUMN_COUNT; j++) {
tuple.set(j, "a" + i + "_" + j);
}
storer.putNext(tuple);
}
recordWriter.close(taskAttemptContext);
OutputCommitter outputCommitter = outputFormat.getOutputCommitter(taskAttemptContext);
outputCommitter.commitTask(taskAttemptContext);
outputCommitter.commitJob(jobContext);
}
}