Listed below are example usages of org.apache.pig.impl.util.ObjectSerializer, collected from Apache Pig and related projects (the HBase, ORC, Iceberg, Phoenix, and Parquet integrations).
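For context, ObjectSerializer converts any Serializable object to a plain String and back (serialize() returns a String, deserialize() returns an Object to be cast), which is what makes it convenient for stashing objects in a Hadoop Configuration, as virtually every snippet below does. A minimal round-trip sketch; the boolean[] payload and the "pig.sortOrder" key mirror the comparator examples that follow:

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.pig.impl.util.ObjectSerializer;

public class ObjectSerializerRoundTrip {
    public static void main(String[] args) throws IOException {
        boolean[] sortOrder = new boolean[] { true, false };

        // serialize() encodes the object as a String, so it can be stored
        // directly as a Configuration / JobConf property.
        Configuration conf = new Configuration();
        conf.set("pig.sortOrder", ObjectSerializer.serialize(sortOrder));

        // deserialize() returns Object; callers cast to the expected type,
        // exactly as the setConf() implementations below do.
        boolean[] roundTripped =
                (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
        System.out.println(Arrays.toString(roundTripped)); // prints [true, false]
    }
}
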
@Override
public void setConf(Configuration conf) {
    try {
        mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
        mSecondaryAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.secondarySortOrder"));
        mIsSecondarySort = true;
    } catch (IOException ioe) {
        mLog.error("Unable to deserialize sort order object: " + ioe.getMessage());
        throw new RuntimeException(ioe);
    }
    if (mAsc == null) {
        mAsc = new boolean[1];
        mAsc[0] = true;
    }
    if (mSecondaryAsc == null) {
        mIsSecondarySort = false;
    }
    // If there's only one entry in mAsc, it means it's for the whole
    // tuple. So we can't be looking for each column.
    mWholeTuple = (mAsc.length == 1);
    mFact = TupleFactory.getInstance();
    mSedes = InterSedesFactory.getInterSedesInstance();
}

@Override
public void setConf(Configuration conf) {
    try {
        mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
    } catch (IOException ioe) {
        mLog.error("Unable to deserialize pig.sortOrder " + ioe.getMessage());
        throw new RuntimeException(ioe);
    }
    if (mAsc == null) {
        mAsc = new boolean[1];
        mAsc[0] = true;
    }
    // If there's only one entry in mAsc, it means it's for the whole
    // tuple. So we can't be looking for each column.
    mWholeTuple = (mAsc.length == 1);
    mFact = TupleFactory.getInstance();
}

@Override
public RequiredFieldResponse pushProjection(
        RequiredFieldList requiredFieldList) throws FrontendException {
    if (requiredFieldList == null)
        return null;
    if (requiredFieldList.getFields() != null) {
        int schemaSize = ((StructTypeInfo) typeInfo).getAllStructFieldTypeInfos().size();
        mRequiredColumns = new boolean[schemaSize];
        for (RequiredField rf : requiredFieldList.getFields()) {
            if (rf.getIndex() != -1)
                mRequiredColumns[rf.getIndex()] = true;
        }
        Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
        try {
            p.setProperty(signature + RequiredColumnsSuffix, ObjectSerializer.serialize(mRequiredColumns));
        } catch (Exception e) {
            throw new RuntimeException("Cannot serialize mRequiredColumns", e);
        }
    }
    return new RequiredFieldResponse(true);
}

@Override
@SuppressWarnings("unchecked")
public void setConf(Configuration conf) {
    super.setConf(conf);
    try {
        this.mapStores = (List<POStore>) ObjectSerializer.deserialize(conf
                .get(JobControlCompiler.PIG_MAP_STORES));
        this.reduceStores = (List<POStore>) ObjectSerializer.deserialize(conf
                .get(JobControlCompiler.PIG_REDUCE_STORES));
        this.loads = (ArrayList<FileSpec>) ObjectSerializer.deserialize(conf
                .get("pig.inputs"));
        this.disableCounter = conf.getBoolean("pig.disable.counter", false);
    } catch (IOException e) {
        LOG.warn("Failed to deserialize the store list", e);
    }
}

private void initRightLoader(int[] splitsToBeRead) throws IOException {
    PigContext pc = (PigContext) ObjectSerializer
            .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.pigContext"));
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    // Hadoop security needs this property to be set
    if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        conf.set(MRConfiguration.JOB_CREDENTIALS_BINARY,
                System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
    }
    // create a ReadToEndLoader that will read the given splits in order
    loader = new ReadToEndLoader((LoadFunc) PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec),
            conf, inpLocation, splitsToBeRead);
}

private static POStore configureStorer(JobConf jobConf,
        PhysicalOperator physicalOperator) throws IOException {
    ArrayList<POStore> storeLocations = Lists.newArrayList();
    POStore poStore = (POStore) physicalOperator;
    storeLocations.add(poStore);
    StoreFuncInterface sFunc = poStore.getStoreFunc();
    sFunc.setStoreLocation(poStore.getSFile().getFileName(),
            new org.apache.hadoop.mapreduce.Job(jobConf));
    poStore.setInputs(null);
    poStore.setParentPlan(null);
    jobConf.set(JobControlCompiler.PIG_MAP_STORES,
            ObjectSerializer.serialize(Lists.newArrayList()));
    jobConf.set(JobControlCompiler.PIG_REDUCE_STORES,
            ObjectSerializer.serialize(storeLocations));
    return poStore;
}

private void addCombiner(PhysicalPlan combinePlan, TezOperator pkgTezOp,
        Configuration conf) throws IOException {
    POPackage combPack = (POPackage) combinePlan.getRoots().get(0);
    POLocalRearrange combRearrange = (POLocalRearrange) combinePlan
            .getLeaves().get(0);
    setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, pkgTezOp);
    LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
            combinePlan, pkgTezOp, combPack);
    lrDiscoverer.visit();
    combinePlan.remove(combPack);
    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS,
            MRCombiner.class.getName());
    conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
            PigCombiner.Combine.class.getName());
    conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
    conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
    conf.set("udf.import.list",
            ObjectSerializer.serialize(PigContext.getPackageImportList()));
    conf.set("pig.combinePlan", ObjectSerializer.serialize(combinePlan));
    conf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
    conf.set("pig.map.keytype", ObjectSerializer
            .serialize(new byte[] { combRearrange.getKeyType() }));
}

/**
 * Pass the loader signature to the LoadFunc and to the InputFormat through
 * the conf.
 * @param loadFunc the LoadFunc to set the signature on
 * @param inputIndex the index of the input corresponding to the LoadFunc
 * @param conf the Configuration object into which the signature should be
 * set
 * @throws IOException on failure
 */
@SuppressWarnings("unchecked")
static void passLoadSignature(LoadFunc loadFunc, int inputIndex,
        Configuration conf) throws IOException {
    List<String> inpSignatureLists =
            (ArrayList<String>) ObjectSerializer.deserialize(
                    conf.get("pig.inpSignatures"));
    // signature can be null for intermediate jobs where it will not
    // be required to be passed down
    if (inpSignatureLists.get(inputIndex) != null) {
        loadFunc.setUDFContextSignature(inpSignatureLists.get(inputIndex));
        conf.set("pig.loader.signature", inpSignatureLists.get(inputIndex));
    }
    MapRedUtil.setupUDFContext(conf);
}

@Override
public void setStoreLocation(String location, Job job) throws IOException {
    if (location.startsWith("hbase://")) {
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location.substring(8));
    } else {
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location);
    }
    String serializedSchema = getUDFProperties().getProperty(contextSignature + "_schema");
    if (serializedSchema != null) {
        schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
    }
    m_conf = initializeLocalJobConfig(job);
    // Not setting a udf property and getting the hbase delegation token
    // only once like in setLocation as setStoreLocation gets different Job
    // objects for each call and the last Job passed is the one that is
    // launched. So we end up getting multiple hbase delegation tokens.
    addHBaseDelegationToken(m_conf, job);
}

private void testPredicatePushdownLocal(String filterStmt, int expectedRows) throws IOException {
    PigServer pigServer_disabledRule = new PigServer(ExecType.LOCAL);
    // Test with PredicatePushdownOptimizer disabled.
    HashSet<String> disabledOptimizerRules = new HashSet<String>();
    disabledOptimizerRules.add("PredicatePushdownOptimizer");
    pigServer_disabledRule.getPigContext().getProperties().setProperty(PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
            ObjectSerializer.serialize(disabledOptimizerRules));
    pigServer_disabledRule.registerQuery("B = load '" + INPUT + "' using OrcStorage();");
    pigServer_disabledRule.registerQuery("C = filter B by " + filterStmt + ";");
    // Test with PredicatePushdownOptimizer enabled.
    pigServer.registerQuery("D = load '" + INPUT + "' using OrcStorage();");
    pigServer.registerQuery("E = filter D by " + filterStmt + ";");
    // Verify that the results are the same
    Util.checkQueryOutputs(pigServer_disabledRule.openIterator("C"), pigServer.openIterator("E"), expectedRows);
}

@Test
public void testSortOrder() throws IOException {
    // prototype < t but we use inverse sort order
    list.set(2, (Double) list.get(2) + 0.1);
    NullableTuple t = new NullableTuple(tf.newTuple(list));
    JobConf jobConf = new JobConf();
    jobConf.set("pig.sortOrder", ObjectSerializer.serialize(new boolean[] { false }));
    comparator.setConf(jobConf);
    int res = compareHelper(prototype, t, comparator);
    assertEquals(-1 * Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
    assertTrue(res > 0);
    jobConf.set("pig.sortOrder", ObjectSerializer.serialize(
            new boolean[] { true, true, false, true, true, true, true, true, true }));
    comparator.setConf(jobConf);
    res = compareHelper(prototype, t, comparator);
    assertEquals(-1 * Math.signum(prototype.compareTo(t)), Math.signum(res), 0);
    assertTrue(res > 0);
}

@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
    // Save reader to use in getNext()
    this.reader = reader;
    splitIndex = split.getSplitIndex();
    // Get schema from front-end
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p = udfc.getUDFProperties(this.getClass(), new String[] { udfContextSignature });
    String strSchema = p.getProperty(SCHEMA_SIGNATURE);
    if (strSchema == null) {
        throw new IOException("Could not find schema in UDF context");
    }
    schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));
    requiredFields = (boolean[]) ObjectSerializer.deserialize(p.getProperty(REQUIRED_FIELDS_SIGNATURE));
    if (requiredFields != null) {
        numRequiredFields = 0;
        for (int i = 0; i < requiredFields.length; i++) {
            if (requiredFields[i])
                numRequiredFields++;
        }
    }
}

@Override
public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
    if (requiredFieldList == null)
        return null;
    if (fields != null && requiredFieldList.getFields() != null) {
        requiredFields = new boolean[fields.length];
        for (RequiredField f : requiredFieldList.getFields()) {
            requiredFields[f.getIndex()] = true;
        }
        UDFContext udfc = UDFContext.getUDFContext();
        Properties p = udfc.getUDFProperties(this.getClass(), new String[] { udfContextSignature });
        try {
            p.setProperty(REQUIRED_FIELDS_SIGNATURE, ObjectSerializer.serialize(requiredFields));
        } catch (Exception e) {
            throw new RuntimeException("Cannot serialize requiredFields for pushProjection", e);
        }
    }
    return new RequiredFieldResponse(true);
}

@Override
@SuppressWarnings("unchecked")
public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (splits != null) {
        LOG.info("Returning cached splits: {}", splits.size());
        return splits;
    }
    splits = Lists.newArrayList();
    TableScan scan = table.newScan();
    // Apply filters
    Expression filterExpression =
            (Expression) ObjectSerializer.deserialize(context.getConfiguration().get(scope(ICEBERG_FILTER_EXPRESSION)));
    LOG.info("[{}]: iceberg filter expressions: {}", signature, filterExpression);
    if (filterExpression != null) {
        LOG.info("Filter Expression: {}", filterExpression);
        scan = scan.filter(filterExpression);
    }
    // Wrap in splits
    try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
        tasks.forEach(scanTask -> splits.add(new IcebergSplit(scanTask)));
    }
    return splits;
}

@Override
@SuppressWarnings("unchecked")
public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (splits != null) {
        LOG.info("Returning cached splits: " + splits.size());
        return splits;
    }
    splits = Lists.newArrayList();
    TableScan scan = table.newScan();
    // Apply filters
    Expression filterExpression = (Expression) ObjectSerializer.deserialize(context.getConfiguration().get(ICEBERG_FILTER_EXPRESSION));
    if (filterExpression != null) {
        LOG.info("Filter Expression: " + filterExpression);
        scan = scan.filter(filterExpression);
    }
    // Wrap in splits
    try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
        tasks.forEach((scanTask) -> splits.add(new IcebergSplit(scanTask)));
    }
    return splits;
}

@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
    this.reader = reader;
    final String resourceSchemaAsStr = getValueFromUDFContext(this.contextSignature, RESOURCE_SCHEMA_SIGNATURE);
    if (resourceSchemaAsStr == null) {
        throw new IOException("Could not find schema in UDF context");
    }
    schema = (ResourceSchema) ObjectSerializer.deserialize(resourceSchemaAsStr);
}

@Override
public ResourceSchema getSchema(String location, Job job) throws IOException {
    if (schema != null) {
        return schema;
    }
    final Configuration configuration = job.getConfiguration();
    this.initializePhoenixPigConfiguration(location, configuration);
    this.schema = PhoenixPigSchemaUtil.getResourceSchema(this.config);
    if (LOG.isDebugEnabled()) {
        LOG.debug(String.format("Resource Schema generated for location [%s] is [%s]", location, schema.toString()));
    }
    this.storeInUDFContext(this.contextSignature, RESOURCE_SCHEMA_SIGNATURE, ObjectSerializer.serialize(schema));
    return schema;
}

/**
 * @param configuration configuration for the current job
 * @return List of required fields from pushProjection
 */
static RequiredFieldList getRequiredFields(Configuration configuration) {
    String requiredFieldString = configuration.get(PARQUET_PIG_REQUIRED_FIELDS);
    if (requiredFieldString == null) {
        return null;
    }
    try {
        return (RequiredFieldList) ObjectSerializer.deserialize(requiredFieldString);
    } catch (IOException e) {
        throw new RuntimeException("Failed to deserialize pushProjection", e);
    }
}

public static RequiredFieldList deserializeRequiredFieldList(String requiredFieldString) {
    if (requiredFieldString == null) {
        return null;
    }
    try {
        return (RequiredFieldList) ObjectSerializer.deserialize(requiredFieldString);
    } catch (IOException e) {
        throw new RuntimeException("Failed to deserialize pushProjection", e);
    }
}

static String serializeRequiredFieldList(RequiredFieldList requiredFieldList) {
    try {
        return ObjectSerializer.serialize(requiredFieldList);
    } catch (IOException e) {
        throw new RuntimeException("Failed to serialize required fields.", e);
    }
}

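Taken together, the two helpers above form a symmetric round trip: serializeRequiredFieldList() produces a String safe to store in a Hadoop Configuration, and getRequiredFields() restores the RequiredFieldList on the other side. A minimal, hedged sketch of the wiring (it assumes the PARQUET_PIG_REQUIRED_FIELDS constant read by getRequiredFields above, and the standard org.apache.pig.LoadPushDown.RequiredField setters):

// Build a one-field projection list, the way a pushProjection() call would.
RequiredFieldList fieldList = new RequiredFieldList();
RequiredField field = new RequiredField();
field.setIndex(0);   // keep only the first column
fieldList.add(field);

// Push it through a Configuration and read it back.
Configuration conf = new Configuration();
conf.set(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(fieldList));
RequiredFieldList restored = getRequiredFields(conf);   // same one-field projection
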
@Override
public void prepare() {
    String s;
    try {
        s = ObjectSerializer.serialize(schema);
    } catch (IOException e) {
        throw new RuntimeException("Unable to serialize schema: " + schema, e);
    }
    add("private static Schema schema = staticSchemaGen(\"" + s + "\");");
}

protected static Schema staticSchemaGen(String s) {
    try {
        if (s.equals("")) {
            Log.warn("No Schema present in SchemaTuple generated class");
            return new Schema();
        }
        return (Schema) ObjectSerializer.deserialize(s);
    } catch (IOException e) {
        throw new RuntimeException("Unable to deserialize serialized Schema: " + s, e);
    }
}

@Override
public void checkSchema(ResourceSchema rs) throws IOException {
    ResourceFieldSchema fs = new ResourceFieldSchema();
    fs.setType(DataType.TUPLE);
    fs.setSchema(rs);
    typeInfo = OrcUtils.getTypeInfo(fs);
    Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
    p.setProperty(signature + SchemaSignatureSuffix, ObjectSerializer.serialize(typeInfo));
}

@Override
public void setLocation(String location, Job job) throws IOException {
    Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
    if (!UDFContext.getUDFContext().isFrontend()) {
        typeInfo = (TypeInfo) ObjectSerializer.deserialize(p.getProperty(signature + SchemaSignatureSuffix));
    } else if (typeInfo == null) {
        typeInfo = getTypeInfo(location, job);
    }
    if (typeInfo != null && oi == null) {
        oi = OrcStruct.createObjectInspector(typeInfo);
    }
    if (!UDFContext.getUDFContext().isFrontend()) {
        if (p.getProperty(signature + RequiredColumnsSuffix) != null) {
            mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(p
                    .getProperty(signature + RequiredColumnsSuffix));
            job.getConfiguration().setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
            job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
                    getReqiredColumnIdString(mRequiredColumns));
            if (p.getProperty(signature + SearchArgsSuffix) != null) {
                // Bug in setSearchArgument which always expects READ_COLUMN_NAMES_CONF_STR to be set
                job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
                        getReqiredColumnNamesString(getSchema(location, job), mRequiredColumns));
            }
        } else if (p.getProperty(signature + SearchArgsSuffix) != null) {
            // Bug in setSearchArgument which always expects READ_COLUMN_NAMES_CONF_STR to be set
            job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
                    getReqiredColumnNamesString(getSchema(location, job)));
        }
        if (p.getProperty(signature + SearchArgsSuffix) != null) {
            job.getConfiguration().set(SARG_PUSHDOWN, p.getProperty(signature + SearchArgsSuffix));
        }
    }
    FileInputFormat.setInputPaths(job, location);
}

private TypeInfo getTypeInfo(String location, Job job) throws IOException {
    Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
    TypeInfo typeInfo = (TypeInfo) ObjectSerializer.deserialize(p.getProperty(signature + SchemaSignatureSuffix));
    if (typeInfo == null) {
        typeInfo = getTypeInfoFromLocation(location, job);
    }
    if (typeInfo != null) {
        p.setProperty(signature + SchemaSignatureSuffix, ObjectSerializer.serialize(typeInfo));
    }
    return typeInfo;
}

@Override
@SuppressWarnings("unchecked")
public void setConf(Configuration conf) {
    super.setConf(conf);
    try {
        // TODO: We should replace PIG_REDUCE_STORES with something else in
        // tez. For now, we keep it since it's used in PigOutputFormat.
        this.stores = (List<POStore>) ObjectSerializer.deserialize(
                conf.get(JobControlCompiler.PIG_REDUCE_STORES));
        this.loads = (List<FileSpec>) ObjectSerializer.deserialize(
                conf.get("pig.inputs"));
    } catch (IOException e) {
        LOG.warn("Failed to deserialize the store list", e);
    }
}

/**
 * Stolen from JobControlCompiler. TODO: refactor it to share this.
 *
 * @param physicalPlan
 * @param poLoad
 * @param jobConf
 * @return
 * @throws java.io.IOException
 */
private static JobConf configureLoader(PhysicalPlan physicalPlan,
        POLoad poLoad, JobConf jobConf) throws IOException {
    Job job = new Job(jobConf);
    LoadFunc loadFunc = poLoad.getLoadFunc();
    loadFunc.setLocation(poLoad.getLFile().getFileName(), job);
    // stolen from JobControlCompiler
    ArrayList<FileSpec> pigInputs = new ArrayList<FileSpec>();
    // Store the input filespecs
    pigInputs.add(poLoad.getLFile());
    ArrayList<List<OperatorKey>> inpTargets = Lists.newArrayList();
    ArrayList<String> inpSignatures = Lists.newArrayList();
    ArrayList<Long> inpLimits = Lists.newArrayList();
    // Store the target operators for tuples read
    // from this input
    List<PhysicalOperator> loadSuccessors = physicalPlan
            .getSuccessors(poLoad);
    List<OperatorKey> loadSuccessorsKeys = Lists.newArrayList();
    if (loadSuccessors != null) {
        for (PhysicalOperator loadSuccessor : loadSuccessors) {
            loadSuccessorsKeys.add(loadSuccessor.getOperatorKey());
        }
    }
    inpTargets.add(loadSuccessorsKeys);
    inpSignatures.add(poLoad.getSignature());
    inpLimits.add(poLoad.getLimit());
    jobConf.set("pig.inputs", ObjectSerializer.serialize(pigInputs));
    jobConf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
    jobConf.set("pig.inpSignatures",
            ObjectSerializer.serialize(inpSignatures));
    jobConf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
    return jobConf;
}

void initializeJobConf() {
    if (this.jobConf == null) {
        this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
        PigMapReduce.sJobConfInternal.set(jobConf);
        try {
            MapRedUtil.setupUDFContext(jobConf);
            PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
            SchemaTupleBackend.initialize(jobConf, pc);
        } catch (IOException ioe) {
            String msg = "Problem while configuring UDFContext from ForEachConverter.";
            throw new RuntimeException(msg, ioe);
        }
    }
}

public static JobConf newJobConf(PigContext pigContext) throws IOException {
    JobConf jobConf = new JobConf(
            ConfigurationUtil.toConfiguration(pigContext.getProperties()));
    jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
    UDFContext.getUDFContext().serialize(jobConf);
    jobConf.set("udf.import.list",
            ObjectSerializer.serialize(PigContext.getPackageImportList()));
    return jobConf;
}

/**
 * @param inputIndex
 * @param conf
 * @return
 * @throws IOException
 */
@SuppressWarnings("unchecked")
private static LoadFunc getLoadFunc(int inputIndex, Configuration conf) throws IOException {
    ArrayList<FileSpec> inputs =
            (ArrayList<FileSpec>) ObjectSerializer.deserialize(
                    conf.get(PIG_INPUTS));
    FuncSpec loadFuncSpec = inputs.get(inputIndex).getFuncSpec();
    return (LoadFunc) PigContext.instantiateFuncFromSpec(loadFuncSpec);
}