The following examples show how to use the org.apache.hadoop.io.ObjectWritable API class, or you can follow the links to view the full source code on GitHub.
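Before the project-specific examples, here is a minimal sketch of the basic ObjectWritable contract: wrap a value, serialize it, and read it back. The buffer classes and the Text payload are illustrative choices, not taken from any of the examples below.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;

public class ObjectWritableRoundTrip {
  public static void main(String[] args) throws Exception {
    // Wrap a value; ObjectWritable serializes the declared class name alongside the instance.
    final ObjectWritable out = new ObjectWritable(new Text("hello"));
    final DataOutputBuffer buffer = new DataOutputBuffer();
    out.write(buffer);

    // Reading back needs a Configuration so the recorded class name can be resolved.
    final ObjectWritable in = new ObjectWritable();
    in.setConf(new Configuration());
    final DataInputBuffer dataIn = new DataInputBuffer();
    dataIn.reset(buffer.getData(), buffer.getLength());
    in.readFields(dataIn);

    System.out.println((Text) in.get()); // prints "hello"
  }
}
The instance methods delegate to the static helpers ObjectWritable.writeObject(...) and ObjectWritable.readObject(...), which the MapReduce examples below rely on indirectly whenever an ObjectWritable crosses the shuffle.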
@Override
protected void mapNativeValue(
final GeoWaveInputKey key,
final Object value,
final org.apache.hadoop.mapreduce.Mapper<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, ObjectWritable>.Context context)
throws IOException, InterruptedException {
@SuppressWarnings("unchecked")
final AnalyticItemWrapper<T> wrapper = itemWrapperFactory.create((T) value);
outputKey.setInternalAdapterId(key.getInternalAdapterId());
outputKey.setDataId(
new ByteArray(
StringUtils.stringToBinary(nestedGroupCentroidAssigner.getGroupForLevel(wrapper))));
outputKey.setGeoWaveKey(key.getGeoWaveKey());
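// currentValue: presumably the raw ObjectWritable for this input, cached by the base mapper before deserialization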
context.write(outputKey, currentValue);
}
@Override
protected void setup(
final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveOutputKey, Object>.Context context)
throws IOException, InterruptedException {
super.setup(context);
internalAdapterStore = GeoWaveOutputFormat.getJobContextInternalAdapterStore(context);
final ScopedJobConfiguration config =
new ScopedJobConfiguration(
context.getConfiguration(),
InputToOutputKeyReducer.class,
LOGGER);
outputKey =
new GeoWaveOutputKey(
"na",
new String[] {config.getString(OutputParameters.Output.INDEX_ID, "na")});
}
@Override
protected void reduceNativeValues(
final GeoWaveInputKey key,
final Iterable<Object> values,
final ReduceContext<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, Object> context)
throws IOException, InterruptedException {
final Iterator<Object> valIt = values.iterator();
if (valIt.hasNext()) {
key.setInternalAdapterId( // TODO this is a bit of a hack, but the
// adapter is seemingly completely
// transient and never actually
// persisted - it seems unlikely that
// the value for internal adapter ID
// even matters, but if it does this is
// the best effort
InternalAdapterStoreImpl.getLazyInitialAdapterId(outputAdapter.getTypeName()));
final SimpleFeature feature = getSimpleFeature(key, valIt.next());
context.write(key, feature);
}
}
@Override
protected void reduceNativeValues(
final GeoWaveInputKey key,
final Iterable<Object> values,
final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveOutputKey, T>.Context context)
throws IOException, InterruptedException {
final String groupID = KeyManager.getGroupAsString(key.getDataId().getBytes());
for (final Object value : values) {
final AnalyticItemWrapper<T> sampleItem = itemWrapperFactory.create((T) value);
Integer outputCount = outputCounts.get(groupID);
outputCount = outputCount == null ? Integer.valueOf(0) : outputCount;
// outputCount is never null past this point, so only the max-count check remains
if (outputCount < maxCount) {
final AnalyticItemWrapper<T> centroid = createCentroid(groupID, sampleItem);
if (centroid != null) {
context.write(
new GeoWaveOutputKey(sampleDataTypeName, indexNames),
centroid.getWrappedItem());
outputCount++;
outputCounts.put(groupID, outputCount);
}
}
}
}
public void testSimpleArray(String typeName, DataType elementDataType, Object[] values, Object[] expected) throws SerDeException {
NiFiRecordSerDe serDe = createSerDe("listc",
"array<" + typeName + ">"
);
RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
new RecordField("listc", RecordFieldType.ARRAY.getArrayDataType(elementDataType))
));
Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, new HashMap<String, Object>() {{
put("listc", values);
}})));
List<Object> fields = (List<Object>)deserialized;
assertEquals(1, fields.size());
List<Object> nested = (List<Object>) fields.get(0);
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], nested.get(i));
}
}
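The helper above is not itself annotated with @Test; callers supply the Hive element type plus the input and expected arrays. A hypothetical invocation (the type and values here are illustrative, not taken from the original suite):
@Test
public void testIntArray() throws SerDeException {
    testSimpleArray("int", RecordFieldType.INT.getDataType(),
            new Object[] {1, 2, 3}, new Object[] {1, 2, 3});
}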
@Override
protected void configure(final Job job) throws Exception {
job.setJobName("GeoWave Dedupe (" + dataStoreOptions.getGeoWaveNamespace() + ")");
job.setMapperClass(GeoWaveDedupeMapper.class);
job.setCombinerClass(GeoWaveDedupeCombiner.class);
job.setReducerClass(getReducer());
job.setMapOutputKeyClass(GeoWaveInputKey.class);
job.setMapOutputValueClass(ObjectWritable.class);
job.setOutputKeyClass(GeoWaveInputKey.class);
job.setOutputValueClass(ObjectWritable.class);
job.setInputFormatClass(GeoWaveInputFormat.class);
job.setOutputFormatClass(getOutputFormatClass());
job.setNumReduceTasks(getNumReduceTasks());
job.setSpeculativeExecution(false);
try (final FileSystem fs = FileSystem.get(job.getConfiguration())) {
final Path outputPath = getHdfsOutputPath();
fs.delete(outputPath, true);
FileOutputFormat.setOutputPath(job, outputPath);
}
}
/**
* Convert values to ObjectWritable
*/
public void map(Text key, Writable value,
OutputCollector<Text, ObjectWritable> output, Reporter reporter)
throws IOException {
ObjectWritable objWrite = new ObjectWritable();
objWrite.set(value);
output.collect(key, objWrite);
}
/**
* Runs the inverter job. The inverter job flips outlinks to inlinks to be
* passed into the analysis job.
*
* The inverter job takes a link loops database if it exists. It is an
* optional component of link analysis due to its extreme computational and
* space requirements, but it can be very useful in weeding out and eliminating
* link farms and other spam pages.
*
* @param nodeDb The node database to use.
* @param outlinkDb The outlink database to use.
* @param loopDb The loop database to use if it exists.
* @param output The output directory.
*
* @throws IOException If an error occurs while running the inverter job.
*/
private void runInverter(Path nodeDb, Path outlinkDb, Path loopDb, Path output)
throws IOException {
// configure the inverter
JobConf inverter = new NutchJob(getConf());
inverter.setJobName("LinkAnalysis Inverter");
FileInputFormat.addInputPath(inverter, nodeDb);
FileInputFormat.addInputPath(inverter, outlinkDb);
// add the loop database if it exists (i.e., is not null)
if (loopDb != null) {
FileInputFormat.addInputPath(inverter, loopDb);
}
FileOutputFormat.setOutputPath(inverter, output);
inverter.setInputFormat(SequenceFileInputFormat.class);
inverter.setMapperClass(Inverter.class);
inverter.setReducerClass(Inverter.class);
inverter.setMapOutputKeyClass(Text.class);
inverter.setMapOutputValueClass(ObjectWritable.class);
inverter.setOutputKeyClass(Text.class);
inverter.setOutputValueClass(LinkDatum.class);
inverter.setOutputFormat(SequenceFileOutputFormat.class);
inverter.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
// run the inverter job
LOG.info("Starting inverter job");
try {
JobClient.runJob(inverter);
}
catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
LOG.info("Finished inverter job.");
}
/**
* Convert values to ObjectWritable
*/
public void map(Text key, Writable value,
OutputCollector<Text, ObjectWritable> output, Reporter reporter)
throws IOException {
ObjectWritable objWrite = new ObjectWritable();
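// presumably cloned to guard against Hadoop reusing the incoming Writable instance across map() calls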
objWrite.set(WritableUtils.clone(value, conf));
output.collect(key, objWrite);
}
/**
* Takes any node that has inlinks and sets up a route for all of its
* outlinks. These routes will then be followed to a maximum depth inside of
* the Looper job.
*/
public void reduce(Text key, Iterator<ObjectWritable> values,
OutputCollector<Text, Route> output, Reporter reporter)
throws IOException {
String url = key.toString();
Node node = null;
List<LinkDatum> outlinkList = new ArrayList<LinkDatum>();
// collect all outlinks and assign node
while (values.hasNext()) {
ObjectWritable objWrite = values.next();
Object obj = objWrite.get();
if (obj instanceof LinkDatum) {
outlinkList.add((LinkDatum)obj);
}
else if (obj instanceof Node) {
node = (Node)obj;
}
}
// has to have inlinks otherwise cycle not possible
if (node != null) {
int numInlinks = node.getNumInlinks();
if (numInlinks > 0) {
// initialize and collect a route for every outlink
for (LinkDatum datum : outlinkList) {
String outlinkUrl = datum.getUrl();
Route route = new Route();
route.setFound(false);
route.setLookingFor(url);
route.setOutlinkUrl(outlinkUrl);
output.collect(new Text(outlinkUrl), route);
}
}
}
}
@Override
protected void reduceNativeValues(
final GeoWaveInputKey key,
final Iterable<Object> values,
final ReduceContext<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, Object> context)
throws IOException, InterruptedException {
final GridCoverage mergedCoverage = helper.getMergedCoverage(key, values);
if (mergedCoverage != null) {
context.write(key, mergedCoverage);
}
}
@Override
protected void reduceNativeValues(
final GeoWaveInputKey key,
final Iterable<Object> values,
final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveOutputKey, GridCoverage>.Context context)
throws IOException, InterruptedException {
final GridCoverage mergedCoverage = helper.getMergedCoverage(key, values);
if (mergedCoverage != null) {
context.write(helper.getGeoWaveOutputKey(), mergedCoverage);
}
}
@Override
protected void setup(
final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveOutputKey, GridCoverage>.Context context)
throws IOException, InterruptedException {
super.setup(context);
helper = new RasterTileResizeHelper(context);
}
public ObjectWritable toWritable(final short adapterId, final Object entry) {
  if (entry instanceof Writable) {
    // already a Writable: wrap it directly (the ObjectWritable instance is reused across calls)
    objectWritable.set(entry);
  } else {
    // otherwise convert the native entry with the adapter's serializer first
    objectWritable.set(getHadoopWritableSerializerForAdapter(adapterId).toWritable(entry));
  }
  return objectWritable;
}
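A sketch of how such a tool might be called from a mapper; the emitEntry helper and its parameter types are assumptions for illustration, not part of the original class. Because toWritable() reuses a single ObjectWritable field, each returned value should be consumed before the next call:
// Hypothetical caller inside the same class: wrap a native entry for the shuffle.
protected void emitEntry(
    final short adapterId,
    final Object entry,
    final GeoWaveInputKey key,
    final Mapper<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, ObjectWritable>.Context context)
    throws IOException, InterruptedException {
  // safe: context.write serializes the value immediately, before the reused
  // ObjectWritable is mutated by a subsequent toWritable call
  context.write(key, toWritable(adapterId, entry));
}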
@Test
public void testStructMap() throws SerDeException{
NiFiRecordSerDe serDe = createSerDe(
"mapc",
"map<string,struct<id:int,balance:decimal(18,2)>>"
);
RecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("id", RecordFieldType.INT.getDataType()),
new RecordField("balance", RecordFieldType.DOUBLE.getDataType())
));
RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
new RecordField("mapc", RecordFieldType.MAP.getMapDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema)))
));
HashMap<String, Object> input = new HashMap<String, Object>() {{
put("mapc", new HashMap<String, Object>() {{
put("current", new MapRecord(recordSchema, new HashMap<String, Object>() {{
put("id", 1);
put("balance", 56.9);
}}));
put("savings", new MapRecord(recordSchema, new HashMap<String, Object>() {{
put("id", 2);
put("balance", 104.65);
}}));
}});
}};
Object expected = Collections.singletonList(
new HashMap<String, Object>() {{
put("current", Arrays.asList(1, HiveDecimal.create(56.9)));
put("savings", Arrays.asList(2, HiveDecimal.create(104.65)));
}}
);
Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, input)));
assertEquals(expected, deserialized);
}
@Override
protected void map(
final KEYIN key,
final VALUEIN value,
final Mapper<KEYIN, VALUEIN, GeoWaveInputKey, ObjectWritable>.Context context)
throws IOException, InterruptedException {
mapWritableValue(key, value, context);
}
@Override
public Object deserialize(Writable writable) throws SerDeException {
  // the incoming Writable is an ObjectWritable wrapping a NiFi Record
  ObjectWritable t = (ObjectWritable) writable;
  Record record = (Record) t.get();
  List<Object> result = deserialize(record, schema);
  stats.setRowCount(stats.getRowCount() + 1);
  return result;
}
@Test
public void testArrays() throws SerDeException {
NiFiRecordSerDe serDe = createSerDe(
"binaryc,binaryc2",
"binary:binary"
);
RecordSchema schema = new SimpleRecordSchema(
Arrays.asList(
new RecordField("binaryc", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())),
new RecordField("binaryc2", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))
)
);
HashMap<String, Object> input = new HashMap<String, Object>() {{
put("binaryc", new byte[]{1, 2});
put("binaryc2", "Hello");
}};
Object[] expected = new Object[]{
new byte[]{1, 2},
"Hello".getBytes(StandardCharsets.UTF_8)
};
Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, input)));
assertArrayEquals(expected, ((List<?>) deserialized).toArray());
}
public Object encode(Record record) throws SerializationError {
  try {
    // despite the name, "encoding" here means wrapping the NiFi Record in an
    // ObjectWritable and letting the SerDe produce Hive's native row object
    ObjectWritable blob = new ObjectWritable(record);
    return serde.deserialize(blob);
  } catch (SerDeException e) {
    throw new SerializationError("Unable to convert Record into Object", e);
  }
}
@Override
protected void reduceNativeValues(
final GeoWaveInputKey key,
final Iterable<Object> values,
final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveOutputKey, Object>.Context context)
throws IOException, InterruptedException {
outputKey.setTypeName(internalAdapterStore.getTypeName(key.getInternalAdapterId()));
for (final Object value : values) {
context.write(outputKey, value);
}
}
@Override
public void configure(final Job job) throws Exception {
job.setMapperClass(GroupAssignmentMapReduce.GroupAssignmentMapper.class);
job.setMapOutputKeyClass(GeoWaveInputKey.class);
job.setMapOutputValueClass(ObjectWritable.class);
job.setReducerClass(Reducer.class);
job.setOutputKeyClass(GeoWaveInputKey.class);
job.setOutputValueClass(ObjectWritable.class);
}
@Override
public void configure(final Job job) throws Exception {
job.setMapperClass(ConvexHullMapReduce.ConvexHullMap.class);
job.setMapOutputKeyClass(GeoWaveInputKey.class);
job.setMapOutputValueClass(ObjectWritable.class);
job.setReducerClass(ConvexHullMapReduce.ConvexHullReducer.class);
job.setReduceSpeculativeExecution(false);
job.setOutputKeyClass(GeoWaveOutputKey.class);
job.setOutputValueClass(Object.class);
}
@Override
public void configure(final Job job) throws Exception {
job.setMapperClass(Mapper.class);
job.setReducerClass(InputToOutputKeyReducer.class);
job.setMapOutputKeyClass(GeoWaveInputKey.class);
job.setMapOutputValueClass(ObjectWritable.class);
job.setOutputKeyClass(GeoWaveOutputKey.class);
job.setOutputValueClass(Object.class);
job.setSpeculativeExecution(false);
job.setJobName("GeoWave Input to Output");
job.setReduceSpeculativeExecution(false);
}
@Override
protected void reduce(
final GeoWaveInputKey key,
final Iterable<ObjectWritable> values,
final Reducer<GeoWaveInputKey, ObjectWritable, KEYOUT, VALUEOUT>.Context context)
throws IOException, InterruptedException {
reduceWritableValues(key, values, context);
}