下面列出了org.apache.hadoop.mapred.JobConfigurable#org.apache.hadoop.hive.serde2.Deserializer 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Instantiates and initializes the deserializer named by the given table schema.
 *
 * <p>For {@code LazySimpleSerDe}, a value stored under the misspelled key
 * {@code "colelction.delim"} is copied to {@code COLLECTION_DELIM} when the latter is
 * absent. Note that this writes into the {@code schema} argument itself.
 *
 * @param configuration configuration handed to the deserializer during initialization
 * @param schema table properties; may be modified as described above
 * @return the created and initialized deserializer
 */
public static Deserializer getDeserializer(Configuration configuration, Properties schema)
{
    String serdeClassName = getDeserializerClassName(schema);

    // Hive 1.x and 2.x spell the collection delimiter key "colelction.delim", while
    // Hive 3.x uses "collection.delim"; see https://issues.apache.org/jira/browse/HIVE-16922
    boolean isLazySimpleSerDe = serdeClassName.equals("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
    if (isLazySimpleSerDe
            && schema.containsKey("colelction.delim")
            && !schema.containsKey(COLLECTION_DELIM)) {
        schema.put(COLLECTION_DELIM, schema.getProperty("colelction.delim"));
    }

    Deserializer instance = createDeserializer(getDeserializerClass(serdeClassName));
    initializeDeserializer(configuration, instance, schema);
    return instance;
}
/**
 * Resolves the {@code FieldSchema}s for a {@code HiveRegistrationUnit}.
 *
 * <p>Columns set on the unit take precedence. When there are none, the unit's deserializer
 * (if one can be obtained) is asked for its fields. If the deserializer is missing, or asking
 * it fails with a {@code SerDeException} or {@code MetaException} (which is only logged),
 * an empty list is returned.
 */
private static List<FieldSchema> getFieldSchemas(HiveRegistrationUnit unit) {
  List<Column> unitColumns = unit.getColumns();
  if (unitColumns != null && !unitColumns.isEmpty()) {
    return getFieldSchemas(unitColumns);
  }

  Deserializer serde = getDeserializer(unit);
  if (serde == null) {
    return new ArrayList<>();
  }

  try {
    return MetaStoreUtils.getFieldsFromDeserializer(unit.getTableName(), serde);
  } catch (SerDeException | MetaException e) {
    LOG.warn("Encountered exception while getting fields from deserializer.", e);
    return new ArrayList<>();
  }
}
/**
 * Returns the deserializer's object inspector, which must be of category {@code STRUCT}.
 *
 * @param deserializer the deserializer whose inspector is requested
 * @return the inspector cast to {@code StructObjectInspector}
 * @throws RuntimeException wrapping any {@code SerDeException} raised while obtaining the inspector
 */
public static StructObjectInspector getTableObjectInspector(Deserializer deserializer)
{
    try {
        ObjectInspector tableInspector = deserializer.getObjectInspector();
        Category category = tableInspector.getCategory();
        // A non-STRUCT inspector is rejected by checkArgument before the cast is attempted
        checkArgument(category == Category.STRUCT, "expected STRUCT: %s", category);
        return (StructObjectInspector) tableInspector;
    }
    catch (SerDeException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Instantiates the given deserializer class through its public no-argument constructor.
 *
 * @param clazz the deserializer class to instantiate
 * @return a new instance of {@code clazz}
 * @throws RuntimeException wrapping any reflective failure, with the class name in the message
 */
private static Deserializer createDeserializer(Class<? extends Deserializer> clazz)
{
    try {
        Deserializer instance = clazz.getConstructor().newInstance();
        return instance;
    }
    catch (ReflectiveOperationException e) {
        String message = "error creating deserializer: " + clazz.getName();
        throw new RuntimeException(message, e);
    }
}
/**
 * Initializes the deserializer with a copy of the configuration and the table schema,
 * then validates it.
 *
 * @param configuration source configuration; a copy of it is what the deserializer receives
 * @param deserializer the deserializer to initialize
 * @param schema table properties passed to {@code initialize}
 * @throws RuntimeException wrapping any {@code SerDeException} or {@code RuntimeException}
 *         raised while copying, initializing or validating
 */
private static void initializeDeserializer(Configuration configuration, Deserializer deserializer, Properties schema)
{
    try {
        // Some SerDes (e.g. Avro) modify the configuration they are given, so hand over a copy
        Configuration privateCopy = copy(configuration);
        deserializer.initialize(privateCopy, schema);
        validate(deserializer);
    }
    catch (SerDeException | RuntimeException e) {
        String serdeName = deserializer.getClass().getName();
        throw new RuntimeException("error initializing deserializer: " + serdeName, e);
    }
}
/**
 * Creates a deserializer by invoking the public no-argument constructor of {@code clazz}.
 *
 * @param clazz the deserializer class to instantiate
 * @return the newly created deserializer
 * @throws RuntimeException if reflection fails; the original exception is kept as the cause
 */
private static Deserializer createDeserializer(final Class<? extends Deserializer> clazz) {
  final Deserializer created;
  try {
    created = clazz.getConstructor().newInstance();
  } catch (ReflectiveOperationException e) {
    throw new RuntimeException("error creating deserializer: " + clazz.getName(), e);
  }
  return created;
}
/**
 * Decomposes the predicate for pushdown unless pushdown has been switched off in the job
 * configuration.
 *
 * <p>The flag {@code DynamoDBConstants.DYNAMODB_FILTER_PUSHDOWN} is read with a default of
 * {@code true}. The {@code deserializer} argument is not used.
 *
 * @return the result of {@code DynamoDBFilterPushdown.pushPredicate}, or {@code null} when the
 *         flag is {@code false}
 */
@Override
public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer,
    ExprNodeDesc predicate) {
  boolean pushdownEnabled = jobConf.getBoolean(DynamoDBConstants.DYNAMODB_FILTER_PUSHDOWN, true);
  if (!pushdownEnabled) {
    return null;
  }

  DynamoDBFilterPushdown pushdown = new DynamoDBFilterPushdown();
  return pushdown.pushPredicate(HiveDynamoDBUtil.extractHiveTypeMapping(jobConf), predicate);
}
/**
 * Placeholder for predicate pushdown: none of the arguments are inspected and a freshly
 * constructed {@code DecomposedPredicate} is returned as-is.
 */
@Override
public DecomposedPredicate decomposePredicate(
    JobConf jobConf,
    Deserializer deserializer,
    ExprNodeDesc predicate) {
  // TODO: Implement push down to Kudu here.
  return new DecomposedPredicate();
}
/**
 * Obtains the table's field schemas by reflectively calling the static methods
 * {@code getDeserializer(Configuration, Table)} and
 * {@code getFieldsFromDeserializer(String, Deserializer)} on the class returned by
 * {@code getHiveMetaStoreUtilsClass()}.
 *
 * <p>{@code skipConfError} is not used by this implementation.
 *
 * @throws CatalogException wrapping any exception raised during lookup or invocation
 */
@Override
public List<FieldSchema> getFieldsFromDeserializer(Configuration conf, Table table, boolean skipConfError) {
    try {
        Method deserializerLookup = getHiveMetaStoreUtilsClass()
                .getMethod("getDeserializer", Configuration.class, Table.class);
        Deserializer tableSerde = (Deserializer) deserializerLookup.invoke(null, conf, table);

        Method fieldsLookup = getHiveMetaStoreUtilsClass()
                .getMethod("getFieldsFromDeserializer", String.class, Deserializer.class);
        @SuppressWarnings("unchecked")
        List<FieldSchema> fields =
                (List<FieldSchema>) fieldsLookup.invoke(null, table.getTableName(), tableSerde);
        return fields;
    } catch (Exception e) {
        throw new CatalogException("Failed to get table schema from deserializer", e);
    }
}
/**
 * Obtains the table's field schemas by reflectively calling the static methods
 * {@code getDeserializer(Configuration, Table, boolean)} and
 * {@code getFieldsFromDeserializer(String, Deserializer)} on the class returned by
 * {@code getHiveMetaStoreUtilsClass()}.
 *
 * @param skipConfError forwarded as the third argument of the reflective
 *        {@code getDeserializer} call
 * @throws CatalogException wrapping any exception raised during lookup or invocation
 */
@Override
public List<FieldSchema> getFieldsFromDeserializer(Configuration conf, Table table, boolean skipConfError) {
    try {
        Method deserializerLookup = getHiveMetaStoreUtilsClass()
                .getMethod("getDeserializer", Configuration.class, Table.class, boolean.class);
        Deserializer tableSerde =
                (Deserializer) deserializerLookup.invoke(null, conf, table, skipConfError);

        Method fieldsLookup = getHiveMetaStoreUtilsClass()
                .getMethod("getFieldsFromDeserializer", String.class, Deserializer.class);
        @SuppressWarnings("unchecked")
        List<FieldSchema> fields =
                (List<FieldSchema>) fieldsLookup.invoke(null, table.getTableName(), tableSerde);
        return fields;
    } catch (Exception e) {
        throw new CatalogException("Failed to get table schema from deserializer", e);
    }
}
/**
 * Performs one-time setup of the SerDe and the per-field conversion state; returns
 * immediately when {@code initialized} is already set.
 *
 * <p>The SerDe class named by {@code serDeInfo} is instantiated reflectively and must
 * implement both {@code Serializer} and {@code Deserializer}. Inspectors, Hive conversions
 * and data format converters are then built for the first {@code formatFields} entries of
 * {@code allTypes}, where {@code formatFields} is the number of columns minus the number of
 * partition columns.
 *
 * @throws Exception if the SerDe cannot be loaded, instantiated or initialized
 */
private void checkInitialize() throws Exception {
    if (initialized) {
        return;
    }

    JobConf jobConf = confWrapper.conf();
    String serdeClassName = serDeInfo.getSerializationLib();
    Object serdeInstance = Class.forName(serdeClassName).newInstance();
    boolean isFullSerDe = serdeInstance instanceof Serializer && serdeInstance instanceof Deserializer;
    Preconditions.checkArgument(isFullSerDe,
            "Expect a SerDe lib implementing both Serializer and Deserializer, but actually got "
                    + serdeInstance.getClass().getName());

    this.recordSerDe = (Serializer) serdeInstance;
    ReflectionUtils.setConf(recordSerDe, jobConf);
    // TODO: support partition properties, for now assume they're same as table properties
    SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);

    this.formatFields = allColumns.length - partitionColumns.length;
    this.hiveConversions = new HiveObjectConversion[formatFields];
    this.converters = new DataFormatConverter[formatFields];

    List<ObjectInspector> fieldInspectors = new ArrayList<>(hiveConversions.length);
    for (int fieldIndex = 0; fieldIndex < formatFields; fieldIndex++) {
        DataType fieldType = allTypes[fieldIndex];
        ObjectInspector fieldInspector = HiveInspectors.getObjectInspector(fieldType);
        fieldInspectors.add(fieldInspector);
        hiveConversions[fieldIndex] =
                HiveInspectors.getConversion(fieldInspector, fieldType.getLogicalType(), hiveShim);
        converters[fieldIndex] = DataFormatConverters.getConverterForDataType(fieldType);
    }

    // The struct inspector covers only the leading formatFields column names
    this.formatInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList(allColumns).subList(0, formatFields),
            fieldInspectors);

    this.initialized = true;
}
/**
 * Delegates predicate decomposition to {@code predicateHandler} unless
 * {@code AccumuloSerde.NO_ITERATOR_PUSHDOWN} has any value in the configuration, in which
 * case the skip is logged and {@code null} is returned. The {@code deserializer} argument is
 * not used.
 */
@Override
public DecomposedPredicate decomposePredicate(JobConf conf,
                                              Deserializer deserializer,
                                              ExprNodeDesc desc) {
    boolean pushdownDisabled = conf.get(AccumuloSerde.NO_ITERATOR_PUSHDOWN) != null;
    if (pushdownDisabled) {
        log.info("Set to ignore iterator. skipping predicate handler");
        return null;
    }
    return predicateHandler.decompose(conf, desc);
}
/**
 * Fails if the deserializer is an {@code AbstractSerDe} that reports configuration errors.
 * Deserializers of any other type are accepted without checks.
 *
 * @throws RuntimeException listing the configuration errors when there are any
 */
private static void validate(Deserializer deserializer)
{
    if (!(deserializer instanceof AbstractSerDe)) {
        return;
    }

    AbstractSerDe serDe = (AbstractSerDe) deserializer;
    if (serDe.getConfigurationErrors().isEmpty()) {
        return;
    }
    throw new RuntimeException("There are configuration errors: " + serDe.getConfigurationErrors());
}
/**
 * Prepares this instance for writing the output of one task.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Instantiates the SerDe named in the partition's storage descriptor (it must implement
 *       both {@code Serializer} and {@code Deserializer}) and initializes it with the table
 *       properties.</li>
 *   <li>Builds a task attempt id from {@code taskNumber}, publishes it and the task partition
 *       in {@code jobConf}, and creates the task attempt context.</li>
 *   <li>Creates the static writer when the write is not dynamically partitioned; otherwise
 *       computes {@code dynamicPartitionOffset}.</li>
 *   <li>Builds object inspectors and Hive conversions for the non-partition columns and the
 *       row-level struct inspector.</li>
 * </ol>
 *
 * @param taskNumber number of this task; zero-padded to at least six digits in the attempt id
 * @param numTasks not used by this method
 * @throws IOException declared by the signature
 * @throws FlinkRuntimeException if the SerDe cannot be loaded, instantiated or initialized
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    try {
        StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
        Object serdeLib = Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
        Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
                "Expect a SerDe lib implementing both Serializer and Deserializer, but actually got " + serdeLib.getClass().getName());
        recordSerDe = (Serializer) serdeLib;
        ReflectionUtils.setConf(recordSerDe, jobConf);
        // TODO: support partition properties, for now assume they're same as table properties
        SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
        outputClass = recordSerDe.getSerializedClass();
    } catch (IllegalAccessException | SerDeException | InstantiationException | ClassNotFoundException e) {
        throw new FlinkRuntimeException("Error initializing Hive serializer", e);
    }

    // Left-pad the task number with zeros to at least six characters. This is done by hand
    // rather than via a computed "%<width>s" format string: with a six-digit task number that
    // string would be "%0s", which String.format rejects, and with more digits the negative
    // width would be read as a left-justify flag and add a spurious zero.
    String taskNumberText = Integer.toString(taskNumber);
    StringBuilder paddedTaskNumber = new StringBuilder();
    for (int width = taskNumberText.length(); width < 6; width++) {
        paddedTaskNumber.append('0');
    }
    paddedTaskNumber.append(taskNumberText);

    TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" + paddedTaskNumber + "_0");

    this.jobConf.set("mapred.task.id", taskAttemptID.toString());
    this.jobConf.setInt("mapred.task.partition", taskNumber);
    // for hadoop 2.2
    this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
    this.jobConf.setInt("mapreduce.task.partition", taskNumber);

    this.context = new TaskAttemptContextImpl(this.jobConf, taskAttemptID);

    if (!isDynamicPartition) {
        // A single writer targeting the partition's storage location
        staticWriter = writerForLocation(hiveTablePartition.getStorageDescriptor().getLocation());
    } else {
        dynamicPartitionOffset = fieldNames.length - partitionColumns.size() + hiveTablePartition.getPartitionSpec().size();
    }

    // Only the leading non-partition columns get inspectors and conversions
    numNonPartitionColumns = isPartitioned ? fieldNames.length - partitionColumns.size() : fieldNames.length;
    hiveConversions = new HiveObjectConversion[numNonPartitionColumns];
    List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
    for (int i = 0; i < numNonPartitionColumns; i++) {
        ObjectInspector objectInspector = HiveInspectors.getObjectInspector(fieldTypes[i]);
        objectInspectors.add(objectInspector);
        hiveConversions[i] = HiveInspectors.getConversion(objectInspector, fieldTypes[i].getLogicalType());
    }

    if (!isPartitioned) {
        rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList(fieldNames),
                objectInspectors);
    } else {
        // Partition column names are dropped from the row inspector
        rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList(fieldNames).subList(0, fieldNames.length - partitionColumns.size()),
                objectInspectors);
        defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
                HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
    }
}
/**
 * Unimplemented stub: none of the arguments are inspected and {@code null} is always returned.
 *
 * @see DBConfiguration#INPUT_CONDITIONS_PROPERTY
 */
@Override
public DecomposedPredicate decomposePredicate(
        JobConf jobConf,
        Deserializer deserializer,
        ExprNodeDesc predicate) {
    // TODO: not implemented yet
    return null;
}
/**
 * Unimplemented stub: the arguments are ignored and {@code null} is always returned.
 */
@Override
public DecomposedPredicate decomposePredicate(
        JobConf jobConf,
        Deserializer deserializer,
        ExprNodeDesc predicate) {
    // TODO: not implemented yet
    return null;
}
/**
 * Instantiates the SerDe class with the given name via {@code Utilities.createAnyInstance}
 * and returns it as a {@code Deserializer}.
 *
 * @param serdeClassName the name of the serde class
 * @return the created instance, cast to {@code Deserializer}
 * @throws Exception if an error occurs during the creation of the SerDe instance
 */
public static Deserializer createDeserializer(String serdeClassName) throws Exception {
  return (Deserializer) Utilities.createAnyInstance(serdeClassName);
}