The following lists example code and usage patterns for the org.apache.hadoop.io.MapWritable API class; you can also follow the links to view the source code on GitHub.
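Before the collected examples, here is a minimal, self-contained sketch of the core MapWritable API: Writable keys and values, lookup by key, and round-tripping through write()/readFields(). The class name and the map entries in this sketch are illustrative only and are not taken from any of the projects below.
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
public class MapWritableBasics {
    public static void main(String[] args) throws Exception {
        // MapWritable is a Map<Writable, Writable>; keys and values must themselves be Writable.
        MapWritable map = new MapWritable();
        map.put(new Text("clicks"), new IntWritable(42));
        map.put(new Text("host"), new Text("node-1"));
        // Serialize through the Writable interface ...
        DataOutputBuffer out = new DataOutputBuffer();
        map.write(out);
        // ... and read it back into a fresh instance.
        MapWritable copy = new MapWritable();
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        copy.readFields(in);
        // Values come back as Writable and are cast to the concrete type, as in the examples below.
        int clicks = ((IntWritable) copy.get(new Text("clicks"))).get(); // 42
        System.out.println(clicks);
    }
}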
@Override
public void collect(Configuration conf, BaseDimension key, BaseStatsValueWritable value, PreparedStatement pstmt, IDimensionConverter converter) throws SQLException, IOException {
StatsUserDimension statsUser = (StatsUserDimension) key;
MapWritableValue mapWritableValue = (MapWritableValue) value;
MapWritable map = mapWritableValue.getValue();
// hourly_active_user
int i = 0;
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getPlatform()));
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getDate()));
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getKpi())); // by KPI
// set the value for each hour
for (i++; i < 28; i++) {
int v = ((IntWritable)map.get(new IntWritable(i - 4))).get();
pstmt.setInt(i, v);
pstmt.setInt(i + 25, v);
}
pstmt.setString(i, conf.get(GlobalConstants.RUNNING_DATE_PARAMS));
pstmt.addBatch();
}
@Override
protected void reduce(StatsUserDimension key, Iterable<TimeOutputValue> values, Context context)
throws IOException, InterruptedException {
this.unique.clear();
// count the distinct uuids
for (TimeOutputValue value : values) {
this.unique.add(value.getId());
}
MapWritable map = new MapWritable();
map.put(new IntWritable(-1), new IntWritable(this.unique.size()));
// set the KPI name
String kpiName = key.getStatsCommon().getKpi().getKpiName();
if (KpiType.NEW_INSTALL_USER.name.equals(kpiName)) {
// new install users for the stats_user table
outputValue.setKpi(KpiType.NEW_INSTALL_USER);
} else if (KpiType.BROWSER_NEW_INSTALL_USER.name.equals(kpiName)) {
// new install users for the stats_device_browser table
outputValue.setKpi(KpiType.BROWSER_NEW_INSTALL_USER);
}
outputValue.setValue(map);
context.write(key, outputValue);
}
@Override
protected void reduce(StatsUserDimension key, Iterable<TimeOutputValue> values, Context context) throws IOException, InterruptedException {
this.unique.clear();
// count the distinct uuids
for (TimeOutputValue value : values) {
this.unique.add(value.getId()); // uid, the user ID
}
MapWritable map = new MapWritable(); // behaves like a java.util.HashMap
map.put(new IntWritable(-1), new IntWritable(this.unique.size()));
outputValue.setValue(map);
// set the KPI name
String kpiName = key.getStatsCommon().getKpi().getKpiName();
if (KpiType.NEW_INSTALL_USER.name.equals(kpiName)) {
// new install users for the stats_user table
outputValue.setKpi(KpiType.NEW_INSTALL_USER);
} else if (KpiType.BROWSER_NEW_INSTALL_USER.name.equals(kpiName)) {
// new install users for the stats_device_browser table
outputValue.setKpi(KpiType.BROWSER_NEW_INSTALL_USER);
}
context.write(key, outputValue);
}
@Override
public void collect(Configuration conf, BaseDimension key, BaseStatsValueWritable value, PreparedStatement pstmt, IDimensionConverter converter) throws SQLException, IOException {
StatsUserDimension statsUser = (StatsUserDimension) key;
MapWritableValue mapWritableValue = (MapWritableValue) value;
MapWritable map = mapWritableValue.getValue();
// hourly_active_user
int i = 0;
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getPlatform()));
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getDate()));
pstmt.setInt(++i, converter.getDimensionIdByValue(statsUser.getStatsCommon().getKpi())); // by KPI
// set the value for each hour
for (i++; i < 28; i++) {
int v = ((IntWritable)map.get(new IntWritable(i - 4))).get();
pstmt.setInt(i, v);
pstmt.setInt(i + 25, v);
}
pstmt.setString(i, conf.get(GlobalConstants.RUNNING_DATE_PARAMES));
pstmt.addBatch();
}
/**
* {@inheritDoc}
*/
@Override
protected void handleReportForDutyResponse(final MapWritable c)
throws IOException {
super.handleReportForDutyResponse(c);
initializeTHLog();
String n = Thread.currentThread().getName();
UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
public void uncaughtException(final Thread t, final Throwable e) {
abort("Set stop flag in " + t.getName(), e);
LOG.fatal("Set stop flag in " + t.getName(), e);
}
};
setDaemonThreadRunning(this.cleanOldTransactionsThread, n
+ ".oldTransactionCleaner", handler);
setDaemonThreadRunning(this.transactionLeases,
"Transactional leases");
}
@Override
protected void reduce(Text key, Iterable<MapWritable> values,
Reducer<Text, MapWritable, NullWritable, HDFSTSRecord>.Context context)
throws IOException, InterruptedException {
long sensor1_value_sum = 0;
long sensor2_value_sum = 0;
double sensor3_value_sum = 0;
long num = 0;
for (MapWritable value : values) {
num++;
sensor1_value_sum += ((LongWritable) value.get(new Text(Constant.SENSOR_1))).get();
sensor2_value_sum += ((LongWritable) value.get(new Text(Constant.SENSOR_2))).get();
sensor3_value_sum += ((DoubleWritable) value.get(new Text(Constant.SENSOR_3))).get();
}
HDFSTSRecord tsRecord = new HDFSTSRecord(1L, key.toString());
if (num != 0) {
DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, sensor1_value_sum / num);
DataPoint dPoint2 = new LongDataPoint(Constant.SENSOR_2, sensor2_value_sum / num);
DataPoint dPoint3 = new DoubleDataPoint(Constant.SENSOR_3, sensor3_value_sum / num);
tsRecord.addTuple(dPoint1);
tsRecord.addTuple(dPoint2);
tsRecord.addTuple(dPoint3);
}
context.write(NullWritable.get(), tsRecord);
}
@Test
public void generateEventHiveRecordLimited() throws Exception {
Map<Writable, Writable> map = new MapWritable();
map.put(new Text("one"), new IntWritable(1));
map.put(new Text("two"), new IntWritable(2));
map.put(new Text("three"), new IntWritable(3));
HiveType tuple = new HiveType(map, TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(
TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.stringTypeInfo, TypeInfoFactory.intTypeInfo)));
SerializationEventConverter eventConverter = new SerializationEventConverter();
SerializationFailure iaeFailure = new SerializationFailure(new IllegalArgumentException("garbage"), tuple, new ArrayList<String>());
String rawEvent = eventConverter.getRawEvent(iaeFailure);
assertThat(rawEvent, startsWith("HiveType{object=org.apache.hadoop.io.MapWritable@"));
String timestamp = eventConverter.getTimestamp(iaeFailure);
assertTrue(StringUtils.hasText(timestamp));
assertTrue(DateUtils.parseDate(timestamp).getTime().getTime() > 1L);
String exceptionType = eventConverter.renderExceptionType(iaeFailure);
assertEquals("illegal_argument_exception", exceptionType);
String exceptionMessage = eventConverter.renderExceptionMessage(iaeFailure);
assertEquals("garbage", exceptionMessage);
String eventMessage = eventConverter.renderEventMessage(iaeFailure);
assertEquals("Could not construct bulk entry from record", eventMessage);
}
@Test
public void generateEventWritable() throws Exception {
MapWritable document = new MapWritable();
document.put(new Text("field"), new Text("value"));
SerializationEventConverter eventConverter = new SerializationEventConverter();
SerializationFailure iaeFailure = new SerializationFailure(new IllegalArgumentException("garbage"), document, new ArrayList<String>());
String rawEvent = eventConverter.getRawEvent(iaeFailure);
assertThat(rawEvent, Matchers.startsWith("org.apache.hadoop.io.MapWritable@"));
String timestamp = eventConverter.getTimestamp(iaeFailure);
assertTrue(StringUtils.hasText(timestamp));
assertTrue(DateUtils.parseDate(timestamp).getTime().getTime() > 1L);
String exceptionType = eventConverter.renderExceptionType(iaeFailure);
assertEquals("illegal_argument_exception", exceptionType);
String exceptionMessage = eventConverter.renderExceptionMessage(iaeFailure);
assertEquals("garbage", exceptionMessage);
String eventMessage = eventConverter.renderEventMessage(iaeFailure);
assertEquals("Could not construct bulk entry from record", eventMessage);
}
/**
* Pulls the correct selector from the MapWritable data element given the queryType
* <p>
* Pulls first element of array if element is an array type
*/
public static String getSelectorByQueryType(MapWritable dataMap, QuerySchema qSchema, DataSchema dSchema)
{
String selector;
String fieldName = qSchema.getSelectorName();
if (dSchema.isArrayElement(fieldName))
{
if (dataMap.get(dSchema.getTextName(fieldName)) instanceof WritableArrayWritable)
{
String[] selectorArray = ((WritableArrayWritable) dataMap.get(dSchema.getTextName(fieldName))).toStrings();
selector = selectorArray[0];
}
else
{
String[] elementArray = ((ArrayWritable) dataMap.get(dSchema.getTextName(fieldName))).toStrings();
selector = elementArray[0];
}
}
else
{
selector = dataMap.get(dSchema.getTextName(fieldName)).toString();
}
return selector;
}
@Override
public Boolean call(MapWritable dataElement) throws Exception
{
accum.incNumRecordsReceived(1);
// Perform the filter
boolean passFilter = ((DataFilter) filter).filterDataElement(dataElement, dSchema);
if (passFilter)
{
accum.incNumRecordsAfterFilter(1);
}
else
// false, then we filter out the record
{
accum.incNumRecordsFiltered(1);
}
return passFilter;
}
private JobConf createReadJobConf() throws IOException {
JobConf conf = HdpBootstrap.hadoopConfig();
conf.setInputFormat(EsInputFormat.class);
conf.setOutputFormat(PrintStreamOutputFormat.class);
conf.setOutputKeyClass(Text.class);
boolean type = random.nextBoolean();
Class<?> mapType = (type ? MapWritable.class : LinkedMapWritable.class);
conf.setOutputValueClass(MapWritable.class);
HadoopCfgUtils.setGenericOptions(conf);
conf.setNumReduceTasks(0);
conf.set(ConfigurationOptions.ES_READ_METADATA, String.valueOf(random.nextBoolean()));
conf.set(ConfigurationOptions.ES_READ_METADATA_VERSION, String.valueOf(true));
conf.set(ConfigurationOptions.ES_OUTPUT_JSON, "true");
FileInputFormat.setInputPaths(conf, new Path(MRSuite.testData.gibberishDat(conf)));
return conf;
}
private JobConf createJobConf() throws IOException {
JobConf conf = HdpBootstrap.hadoopConfig();
conf.setInputFormat(EsInputFormat.class);
conf.setOutputFormat(PrintStreamOutputFormat.class);
conf.setOutputKeyClass(Text.class);
boolean type = random.nextBoolean();
Class<?> mapType = (type ? MapWritable.class : LinkedMapWritable.class);
conf.setOutputValueClass(mapType);
HadoopCfgUtils.setGenericOptions(conf);
conf.set(ConfigurationOptions.ES_QUERY, query);
conf.setNumReduceTasks(0);
conf.set(ConfigurationOptions.ES_READ_METADATA, String.valueOf(readMetadata));
conf.set(ConfigurationOptions.ES_READ_METADATA_VERSION, String.valueOf(true));
conf.set(ConfigurationOptions.ES_OUTPUT_JSON, String.valueOf(readAsJson));
new QueryTestParams(tempFolder).provisionQueries(conf);
FileInputFormat.setInputPaths(conf, new Path(MRSuite.testData.sampleArtistsDatUri()));
HdpBootstrap.addProperties(conf, TestSettings.TESTING_PROPS, false);
return conf;
}
private void populateMap(SortedMap<ByteBuffer, IColumn> cvalue, MapWritable value)
{
for (Map.Entry<ByteBuffer, IColumn> e : cvalue.entrySet())
{
ByteBuffer k = e.getKey();
IColumn v = e.getValue();
if (!v.isLive()) {
continue;
}
BytesWritable newKey = convertByteBuffer(k);
BytesWritable newValue = convertByteBuffer(v.value());
value.put(newKey, newValue);
}
}
@Override
public Object deserialize(Writable w) throws SerDeException {
if (!(w instanceof MapWritable)) {
throw new SerDeException(getClass().getName() + ": expects MapWritable not "+w.getClass().getName());
}
MapWritable columnMap = (MapWritable) w;
cachedCassandraRow.init(columnMap, cassandraColumnNames, cassandraColumnNamesBytes);
return cachedCassandraRow;
}
@Test
public void testMapFieldExtractorNested() throws Exception {
ConstantFieldExtractor cfe = new MapWritableFieldExtractor();
Map<Writable, Writable> m = new MapWritable();
MapWritable nested = new MapWritable();
nested.put(new Text("bar"), new Text("found"));
m.put(new Text("foo"), nested);
assertEquals(new Text("found"), extract(cfe, "foo.bar", m));
}
@Override
protected void map(NullWritable key, MapWritable value,
Mapper<NullWritable, MapWritable, Text, MapWritable>.Context context)
throws IOException, InterruptedException {
Text deltaObjectId = (Text) value.get(new Text("device_id"));
long timestamp = ((LongWritable) value.get(new Text("time_stamp"))).get();
if (timestamp % 100000 == 0) {
context.write(deltaObjectId, new MapWritable(value));
}
}
public KafkaKey(KafkaKey other) {
this.partition = other.partition;
this.beginOffset = other.beginOffset;
this.offset = other.offset;
this.checksum = other.checksum;
this.topic = other.topic;
this.time = other.time;
this.server = other.server;
this.service = other.service;
this.partitionMap = new MapWritable(other.partitionMap);
}
@Override
public boolean next(BytesWritable key, MapWritable value) throws IOException {
if (!nextKeyValue()) {
return false;
}
key.set(getCurrentKey());
value.clear();
value.putAll(getCurrentValue());
return true;
}
@Test
public void testGetRecordReader() {
try {
RecordReader<NullWritable, MapWritable> recordReader = inputFormat.getRecordReader(inputSplit, job, null);
assertTrue(recordReader instanceof TSFHiveRecordReader);
} catch (IOException e) {
e.printStackTrace();
fail();
}
}
/**
* Converts a Map of Strings into a Writable and writes it.
*
* @param map
* @param output
* @throws IOException
*/
public static void writeMap(Map<String,String> map, DataOutput output) throws IOException {
MapWritable mw = new MapWritable();
for (Map.Entry<String,String> entry : map.entrySet()) {
mw.put(new Text(entry.getKey()), new Text(entry.getValue()));
}
mw.write(output);
}
@Override
protected void map(LongWritable key, MapWritable value,
Context context)
throws
IOException, InterruptedException {
for (java.util.Map.Entry<Writable, Writable> entry : value
.entrySet()) {
context.write((Text) entry.getKey(), (Text) entry.getValue());
}
}
public DiscoveredThing(String term, String field, String type, String date, String columnVisibility, long count, MapWritable countsByColumnVisibility) {
this.term = term;
this.field = field;
this.type = type;
this.date = date;
this.columnVisibility = columnVisibility;
this.count = new VLongWritable(count);
this.countsByColumnVisibility = countsByColumnVisibility;
}
@Override
public void map(Object key, MapWritable value, Context context)
throws IOException, InterruptedException {
for (Map.Entry<Writable, Writable> entry : value.entrySet()) {
word.set(entry.getValue().toString());
context.write(word, ONE);
}
}
@Test
public void testMapWritableFieldExtractorTopLevel() throws Exception {
ConstantFieldExtractor cfe = new MapWritableFieldExtractor();
Map<Writable, Writable> m = new MapWritable();
m.put(new Text("key"), new Text("value"));
assertEquals(new Text("value"), extract(cfe, "key", m));
}
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
// Instantiate a copy of the user's class to hold and parse the record.
String recordClassName = conf.get(
ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
if (null == recordClassName) {
throw new IOException("Export table class name ("
+ ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+ ") is not set!");
}
try {
Class cls = Class.forName(recordClassName, true,
Thread.currentThread().getContextClassLoader());
recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
if (null == recordImpl) {
throw new IOException("Could not instantiate object of type "
+ recordClassName);
}
columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
MapWritable.class);
}
@Test
public void testAvroWithAllColumnsSpecified() throws Exception {
SqoopOptions opts = new SqoopOptions();
opts.setExportDir("myexportdir");
String[] columns = { "Age", "Name", "Gender" };
opts.setColumns(columns);
JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
Job job = new Job();
jdbcExportJob.configureInputFormat(job, null, null, null);
assertEquals(asSetOfText("Age", "Name", "Gender"), DefaultStringifier.load(job.getConfiguration(), AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
@Test
public void testAvroWithOneColumnSpecified() throws Exception {
SqoopOptions opts = new SqoopOptions();
opts.setExportDir("myexportdir");
String[] columns = { "Gender" };
opts.setColumns(columns);
JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
Job job = new Job();
jdbcExportJob.configureInputFormat(job, null, null, null);
assertEquals(asSetOfText("Gender"), DefaultStringifier.load(job.getConfiguration(), AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
@Test
public void testAvroWithSomeColumnsSpecified() throws Exception {
SqoopOptions opts = new SqoopOptions();
opts.setExportDir("myexportdir");
String[] columns = { "Age", "Name" };
opts.setColumns(columns);
JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
Job job = new Job();
jdbcExportJob.configureInputFormat(job, null, null, null);
assertEquals(asSetOfText("Age", "Name"), DefaultStringifier.load(job.getConfiguration(), AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
@Test
public void testAvroWithMoreColumnsSpecified() throws Exception {
SqoopOptions opts = new SqoopOptions();
opts.setExportDir("myexportdir");
String[] columns = { "Age", "Name", "Gender", "Address" };
opts.setColumns(columns);
JdbcExportJob jdbcExportJob = stubJdbcExportJob(opts, FileType.AVRO_DATA_FILE);
Job job = new Job();
jdbcExportJob.configureInputFormat(job, null, null, null);
assertEquals(asSetOfText("Age", "Name", "Gender"), DefaultStringifier.load(job.getConfiguration(), AvroExportMapper.AVRO_COLUMN_TYPES_MAP, MapWritable.class).keySet());
}
public void writeMap(MapWritable mw) throws IOException {
out.writeMapHeader(mw.size());
for (Map.Entry<Writable, Writable> entry : mw.entrySet()) {
write(entry.getKey());
write(entry.getValue());
}
}