The following are example usages of org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit collected from open-source projects, paired with various org.apache.hadoop.mapreduce.RecordReader implementations (such as org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader).
@SuppressWarnings("unchecked")
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
this.reader = reader;
// Get the schema string from the UDFContext object.
UDFContext udfc = UDFContext.getUDFContext();
Properties p =
udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
String strSchema = p.getProperty(SCHEMA_SIGNATURE);
if (strSchema == null) {
throw new IOException("Could not find schema in UDF context");
}
// Parse the schema from the string stored in the properties object.
schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));
jsonFactory = new JsonFactory();
}
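This is the back-end half of a common handshake: the schema string must already have been stored under SCHEMA_SIGNATURE on the front end. A minimal sketch of that front-end counterpart, assuming the same udfcSignature field and SCHEMA_SIGNATURE constant (the fixed schema string is purely illustrative):
@Override
public void setUDFContextSignature(String signature) {
    // Pig passes the same signature to the front end and the back end;
    // that is what ties this property write to the read in prepareToRead().
    udfcSignature = signature;
}
public ResourceSchema getSchema(String location, Job job) throws IOException {
    // A real loader would derive the schema from the data or constructor args.
    ResourceSchema s =
        new ResourceSchema(Utils.getSchemaFromString("a:chararray, b:int"));
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p =
        udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
    p.setProperty(SCHEMA_SIGNATURE, s.toString());
    return s;
}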
@SuppressWarnings("rawtypes")
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
super.prepareToRead(reader, split);
numRowsSampled = 0;
avgTupleMemSz = 0;
rowNum = 0;
skipInterval = -1;
memToSkipPerSample = 0;
numRowSplTupleReturned = false;
newSample = null;
Configuration conf = split.getConf();
sampleRate = conf.getInt(PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
heapPerc = conf.getFloat(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE,
PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
}
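The fields reset here drive the Poisson sampling used for skewed joins. A sketch of how they typically interact, modeled on Pig's PoissonSampleLoader (updateSkipInterval is an assumed helper name; Tuple.getMemorySize() is the real Pig API):
private void updateSkipInterval(Tuple t) {
    if (skipInterval == -1) {
        // First sampled tuple: budget the reducer heap, then divide it by the
        // sample rate to get the memory each sample is meant to represent.
        long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
        memToSkipPerSample = availRedMem / sampleRate;
    }
    // Keep a running average of tuple size, and skip enough rows between
    // samples to cover memToSkipPerSample bytes.
    avgTupleMemSz =
        ((avgTupleMemSz * numRowsSampled) + t.getMemorySize()) / (numRowsSampled + 1);
    skipInterval = memToSkipPerSample / avgTupleMemSz;
    numRowsSampled++;
}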
private boolean initializeReader() throws IOException,
InterruptedException {
if(curSplitIndex > inpSplits.size() - 1) {
// past the last split, we are done
return false;
}
if(reader != null){
reader.close();
}
InputSplit curSplit = inpSplits.get(curSplitIndex);
TaskAttemptContext tAContext = HadoopShims.createTaskAttemptContext(conf,
new TaskAttemptID());
reader = inputFormat.createRecordReader(curSplit, tAContext);
reader.initialize(curSplit, tAContext);
// Create a dummy PigSplit; other than the actual split, the remaining
// params are not needed here, since we are just reading the input
// completely.
PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1,
new ArrayList<OperatorKey>(), -1);
// Set the conf object so that if the wrappedLoadFunc uses it,
// it won't be null
pigSplit.setConf(conf);
wrappedLoadFunc.prepareToRead(reader, pigSplit);
return true;
}
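A plausible driver for initializeReader(), reading every split to the end before advancing (names taken from the snippet above; the control flow is a sketch modeled on Pig's ReadToEndLoader, not the verbatim source):
public Tuple getNext() throws IOException {
    try {
        // Lazily open the first split.
        if (reader == null && !initializeReader()) {
            return null;
        }
        Tuple t = wrappedLoadFunc.getNext();
        while (t == null) {
            // Current split is exhausted; move to the next one.
            curSplitIndex++;
            if (!initializeReader()) {
                return null; // past the last split
            }
            t = wrappedLoadFunc.getNext();
        }
        return t;
    } catch (InterruptedException e) {
        throw new IOException(e);
    }
}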
@Override
public void attachInputs(Map<String, LogicalInput> inputs,
Configuration conf)
throws ExecException {
this.conf = conf;
LogicalInput logInput = inputs.get(inputKey);
if (logInput == null || !(logInput instanceof MRInput)) {
throw new ExecException("POSimpleTezLoad only accepts MRInputs");
}
input = (MRInput) logInput;
try {
reader = input.getReader();
// Set the split index; MergeCoGroup needs it, and this input is the
// only input of the MergeCoGroup vertex.
if (reader instanceof MRReader) {
int splitIndex = ((PigSplit)((MRReader)reader).getSplit()).getSplitIndex();
PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, splitIndex);
}
} catch (IOException e) {
throw new ExecException(e);
}
}
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
// Save reader to use in getNext()
this.reader = reader;
splitIndex = split.getSplitIndex();
// Get schema from front-end
UDFContext udfc = UDFContext.getUDFContext();
Properties p = udfc.getUDFProperties(this.getClass(), new String[] { udfContextSignature });
String strSchema = p.getProperty(SCHEMA_SIGNATURE);
if (strSchema == null) {
throw new IOException("Could not find schema in UDF context");
}
schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));
requiredFields = (boolean[]) ObjectSerializer.deserialize(p.getProperty(REQUIRED_FIELDS_SIGNATURE));
if (requiredFields != null) {
numRequiredFields = 0;
for (int i = 0; i < requiredFields.length; i++) {
if (requiredFields[i])
numRequiredFields++;
}
}
}
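The REQUIRED_FIELDS_SIGNATURE property consumed here would normally be written by a pushProjection() implementation on the front end. A hedged sketch of that counterpart, assuming the same udfContextSignature field and REQUIRED_FIELDS_SIGNATURE constant:
@Override
public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
        throws FrontendException {
    // Mark the requested columns in a boolean[] indexed by column position.
    int max = -1;
    for (RequiredField f : requiredFieldList.getFields()) {
        max = Math.max(max, f.getIndex());
    }
    boolean[] needed = new boolean[max + 1];
    for (RequiredField f : requiredFieldList.getFields()) {
        needed[f.getIndex()] = true;
    }
    Properties p = UDFContext.getUDFContext()
            .getUDFProperties(this.getClass(), new String[] { udfContextSignature });
    try {
        p.setProperty(REQUIRED_FIELDS_SIGNATURE, ObjectSerializer.serialize(needed));
    } catch (IOException e) {
        throw new FrontendException(e);
    }
    return new RequiredFieldResponse(true);
}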
@Test
public void testGetNext() throws Exception {
setUpJob();
List<Pair<String, MetricFeature>> mocks = new ArrayList<Pair<String, MetricFeature>>();
mocks.add(new Pair<String, MetricFeature>("", metric));
MockRecordReader<String, MetricFeature> mockRecordReader = new MockRecordReader<String, MetricFeature>(mocks);
MetricFeatureLoader loader = new MetricFeatureLoader();
setLocation(loader, new Job(), "mockInst", "mockZk",accumuloMiniClusterDriver.getRootPassword());
loader.prepareToRead(mockRecordReader, new PigSplit());
org.apache.pig.data.Tuple t;
while((t = loader.getNext()) != null) {
assertEquals(metric.getTimestamp(), t.get(0));
assertEquals(TimeUnit.MINUTES.toString(), t.get(1));
assertEquals(metric.getGroup(), t.get(2));
assertEquals(metric.getType(), t.get(3));
assertEquals(metric.getName(), t.get(4));
assertEquals(metric.getVisibility(), t.get(5));
assertEquals(metric.getVector().getSum(), t.get(6));
}
}
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
this.reader = reader;
final String resourceSchemaAsStr = getValueFromUDFContext(this.contextSignature, RESOURCE_SCHEMA_SIGNATURE);
if (resourceSchemaAsStr == null) {
throw new IOException("Could not find schema in UDF context");
}
schema = (ResourceSchema)ObjectSerializer.deserialize(resourceSchemaAsStr);
}
@SuppressWarnings("rawtypes")
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
AvroStorageLog.funcCall("prepareToRead");
this.reader = (PigAvroRecordReader) reader;
}
public void prepareToRead(RecordReader reader, PigSplit split)
{
this.reader = reader;
if (reader instanceof CqlRecordReader) {
nativeProtocolVersion = ((CqlRecordReader) reader).getNativeProtocolVersion();
}
}
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split)
throws IOException {
LOG.debug("LoadFunc.prepareToRead({}, {})", reader, split);
this.reader = reader;
}
@Override
public void prepareToRead(RecordReader reader, PigSplit split) {
in = reader;
if (tagFile || tagPath) {
sourcePath = ((FileSplit)split.getWrappedSplit()).getPath();
}
}
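A sketch of how sourcePath is typically consumed in getNext(), modeled on PigStorage's -tagFile/-tagPath options (applyTag is a hypothetical helper; the real code inlines this when building the tuple):
private void applyTag(List<Object> fields) {
    if (tagFile) {
        // Prepend just the file name as the first field.
        fields.add(0, new DataByteArray(sourcePath.getName()));
    } else if (tagPath) {
        // Prepend the full path instead.
        fields.add(0, new DataByteArray(sourcePath.toString()));
    }
}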
@SuppressWarnings("rawtypes")
@Override
public final void prepareToRead(final RecordReader r, final PigSplit s)
throws IOException {
reader = r;
split = s;
}
private void writeDebugHeader() {
processError("===== Task Information Header =====" );
processError("\nCommand: " + command);
processError("\nStart time: " + new Date(System.currentTimeMillis()));
if (job.getBoolean(MRConfiguration.TASK_IS_MAP, false)) {
MapContext context = (MapContext)PigMapReduce.sJobContext;
PigSplit pigSplit = (PigSplit)context.getInputSplit();
int numPaths = pigSplit.getNumPaths();
processError("\nPigSplit contains " + numPaths + " wrappedSplits.");
StringBuilder sb = new StringBuilder();
for(int i = 0; i < numPaths; i++) {
InputSplit wrappedSplit = pigSplit.getWrappedSplit(i);
if (wrappedSplit instanceof FileSplit) {
FileSplit mapInputFileSplit = (FileSplit)wrappedSplit;
sb.append("\nInput-split: file=");
sb.append(mapInputFileSplit.getPath());
sb.append(" start-offset=");
sb.append(Long.toString(mapInputFileSplit.getStart()));
sb.append(" length=");
sb.append(Long.toString(mapInputFileSplit.getLength()));
processError(sb.toString());
sb.setLength(0);
}
}
}
processError("\n===== * * * =====\n");
}
@Test
public void test1() throws IOException, InterruptedException {
ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
rawSplits.add(new DummyInputSplit(500, new String[] {
"l1", "l2", "l3"
}));
rawSplits.add(new DummyInputSplit(400, new String[] {
"l1", "l2", "l3"
}));
rawSplits.add(new DummyInputSplit(400, new String[] {
"l1", "l4", "l5"
}));
List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
null, true, conf);
Assert.assertEquals(2, result.size());
int index = 0;
for (InputSplit split : result) {
PigSplit pigSplit = (PigSplit) split;
int len = pigSplit.getNumPaths();
if (index == 0) {
Assert.assertEquals(2, len);
checkLocations(pigSplit.getLocations(), new String[] {
"l1", "l2", "l3"
});
Assert.assertEquals(500, pigSplit.getLength(0));
Assert.assertEquals(400, pigSplit.getLength(1));
}
else {
Assert.assertEquals(1, len);
checkLocations(pigSplit.getLocations(), new String[] {
"l1", "l4", "l5"
});
Assert.assertEquals(400, pigSplit.getLength(0));
}
index++;
}
}
@Test
public void test3() throws IOException, InterruptedException {
ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
rawSplits.add(new DummyInputSplit(500, new String[] {
"l1", "l2", "l3"
}));
rawSplits.add(new DummyInputSplit(200, new String[] {
"l1", "l2", "l3"
}));
rawSplits.add(new DummyInputSplit(100, new String[] {
"l1", "l4", "l5"
}));
List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
null, true, conf);
Assert.assertEquals(1, result.size());
for (InputSplit split : result) {
PigSplit pigSplit = (PigSplit) split;
int len = pigSplit.getNumPaths();
Assert.assertEquals(3, len);
checkLocations(pigSplit.getLocations(), new String[] {
"l1", "l2", "l3", "l4", "l5"
});
Assert.assertEquals(500, pigSplit.getLength(0));
Assert.assertEquals(200, pigSplit.getLength(1));
Assert.assertEquals(100, pigSplit.getLength(2));
}
}
@Test
public void test8() throws IOException, InterruptedException {
ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
rawSplits.add(new DummyInputSplit(100, new String[] {
"l1", "l2", "l3"
}));
rawSplits.add(new DummyInputSplit(100, new String[] {
"l1", "l2", "l3"
}));
rawSplits.add(new DummyInputSplit(200, new String[] {
"l1", "l4", "l5"
}));
List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
null, true, conf);
Assert.assertEquals(1, result.size());
int index = 0;
for (InputSplit split : result) {
PigSplit pigSplit = (PigSplit) split;
int len = pigSplit.getNumPaths();
Assert.assertEquals(3, len);
checkLocations(pigSplit.getLocations(), new String[] {
"l1", "l2", "l3", "l4", "l5"
});
Assert.assertEquals(200, pigSplit.getLength(0));
Assert.assertEquals(100, pigSplit.getLength(1));
Assert.assertEquals(100, pigSplit.getLength(2));
index++;
}
}
@Test
public void test9() throws IOException, InterruptedException {
// verify locations in order
ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
rawSplits.add(new DummyInputSplit(100, new String[] {
"l1", "l2", "l3"
}));
rawSplits.add(new DummyInputSplit(200, new String[] {
"l3", "l4", "l5"
}));
rawSplits.add(new DummyInputSplit(400, new String[] {
"l5", "l6", "l1"
}));
List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
null, true, conf);
Assert.assertEquals(1, result.size());
int index = 0;
for (InputSplit split : result) {
PigSplit pigSplit = (PigSplit) split;
int len = pigSplit.getNumPaths();
Assert.assertEquals(3, len);
// only 5 locations are in list: refer to PIG-1648 for more details
checkLocationOrdering(pigSplit.getLocations(), new String[] {
"l5", "l1", "l6", "l3", "l4"
});
Assert.assertEquals(400, pigSplit.getLength(0));
Assert.assertEquals(200, pigSplit.getLength(1));
Assert.assertEquals(100, pigSplit.getLength(2));
index++;
}
}
@SuppressWarnings("rawtypes")
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
AvroStorageLog.funcCall("prepareToRead");
this.reader = (PigAvroRecordReader) reader;
}
@Override
public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split) {
in = reader;
splitIndex = split.getSplitIndex();
if (headerTreatment == Headers.DEFAULT) {
headerTreatment = Headers.READ_INPUT_HEADER;
}
}
@SuppressWarnings("rawtypes")
@Override
public void prepareToRead(RecordReader reader, PigSplit arg1)
throws IOException {
this.reader = (ResourceRecordReader) reader;
// FileSplit fSplit = (FileSplit) arg1.getWrappedSplit();
// System.err.format("Prepare to read(%s) (%d-%d)\n",
// fSplit.getPath().toUri().toASCIIString(),
// fSplit.getStart(),fSplit.getLength());
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
this.reader = reader;
aliasesTupleNames = StringUtils.tokenize(getUDFProperties().getProperty(
InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS));
}
@Test
public void testGetNext() throws Exception {
setUpJob();
List<Pair<String, EntityWritable>> mocks = new ArrayList<Pair<String, EntityWritable>>();
mocks.add(new Pair<String, EntityWritable>("", new EntityWritable(entity)));
MockRecordReader<String, EntityWritable> mockRecordReader = new MockRecordReader<String, EntityWritable>(mocks);
EntityLoader loader = new EntityLoader("q.eq('key','val')");
loader.prepareToRead(mockRecordReader, new PigSplit());
org.apache.pig.data.Tuple t;
int count = 0;
Iterator<org.calrissian.mango.domain.Attribute> attributes = entity.getAttributes().iterator();
while((t = loader.getNext()) != null) {
org.calrissian.mango.domain.Attribute attribute = attributes.next();
count++;
if(count == 1) {
assertEquals(entity.getType(), t.get(0));
assertEquals(entity.getId(), t.get(1));
assertEquals(attribute.getKey(), t.get(2));
assertEquals(loader.registry.getAlias(attribute.getValue()), t.get(3));
assertEquals(loader.registry.encode(attribute.getValue()), t.get(4));
} else if(count == 2) {
assertEquals(entity.getType(), t.get(0));
assertEquals(entity.getId(), t.get(1));
assertEquals(attribute.getKey(), t.get(2));
assertEquals(loader.registry.getAlias(attribute.getValue()), t.get(3));
assertEquals(loader.registry.encode(attribute.getValue()), t.get(4));
}
}
assertEquals(2, count);
}
@Test
public void testGetNext() throws Exception {
setUpJob();
List<Pair<String, EventWritable>> mocks = new ArrayList<Pair<String, EventWritable>>();
mocks.add(new Pair<String, EventWritable>("", new EventWritable(event)));
MockRecordReader<String, EventWritable> mockRecordReader = new MockRecordReader<String, EventWritable>(mocks);
EventLoader loader = new EventLoader("q.eq('key','val')");
loader.prepareToRead(mockRecordReader, new PigSplit());
org.apache.pig.data.Tuple t;
int count = 0;
Iterator<org.calrissian.mango.domain.Attribute> attributes = event.getAttributes().iterator();
while((t = loader.getNext()) != null) {
org.calrissian.mango.domain.Attribute attribute = attributes.next();
count++;
if(count == 1) {
assertEquals(event.getId(), t.get(0));
assertEquals(event.getTimestamp(), t.get(1));
assertEquals(attribute.getKey(), t.get(2));
assertEquals(loader.registry.getAlias(attribute.getValue()), t.get(3));
assertEquals(loader.registry.encode(attribute.getValue()), t.get(4));
} else if(count == 2) {
assertEquals(event.getId(), t.get(0));
assertEquals(event.getTimestamp(), t.get(1));
assertEquals(attribute.getKey(), t.get(2));
assertEquals(loader.registry.getAlias(attribute.getValue()), t.get(3));
assertEquals(loader.registry.encode(attribute.getValue()), t.get(4));
}
}
assertEquals(2, count);
}
@Override
public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader newReader, PigSplit pigSplit) {
// Note that for this Loader, we don't care about the PigSplit.
if (newReader instanceof ApacheHttpdLogfileRecordReader) {
this.reader = (ApacheHttpdLogfileRecordReader) newReader;
} else {
throw new IncorrectRecordReaderException();
}
}
@Override
public void prepareToRead(RecordReader newReader, PigSplit split) {
LOG.info("[{}]: prepareToRead() -> {}", signature, split);
this.reader = (IcebergRecordReader) newReader;
}
@Override
public void prepareToRead(RecordReader reader, PigSplit split) {
LOG.info(format("[%s]: prepareToRead() -> %s", signature, split));
this.reader = (IcebergRecordReader) reader;
}
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
recordReader = (VespaSimpleJsonInputFormat.VespaJsonRecordReader) reader;
}
@Override
public void prepareToRead(final RecordReader reader, final PigSplit split) {
this.reader = reader;
}
public void prepareToRead(RecordReader reader, PigSplit split)
{
this.reader = reader;
}
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
this.reader = (SequenceFileRecordReader) reader;
}
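For completeness, a hedged sketch of the getNext() that usually accompanies a prepareToRead() like the last one: pull the next key/value pair from the wrapped SequenceFileRecordReader and wrap it in a tuple (a real loader would convert the Writables to Pig datatypes first):
@Override
public Tuple getNext() throws IOException {
    try {
        if (!reader.nextKeyValue()) {
            return null; // end of this split
        }
        Object key = reader.getCurrentKey();
        Object value = reader.getCurrentValue();
        // A real loader maps the Writables to Pig types here.
        return TupleFactory.getInstance().newTuple(Arrays.asList(key, value));
    } catch (InterruptedException e) {
        throw new IOException(e);
    }
}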