The following examples show how to use the org.apache.hadoop.io.Text API class, collected from a range of open-source projects; you can also follow each example's link to view the source on GitHub.
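As a warm-up before the project examples, here is a minimal sketch of the core Text calls they all rely on (standard org.apache.hadoop.io.Text API; the class name and values are ours for illustration). The classic pitfall to keep in mind: getBytes() returns the backing array, which may be longer than the valid content, so always pair it with getLength().

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.io.Text;

public class TextBasics {
  public static void main(String[] args) {
    Text t = new Text("hadoop");
    t.set("hadoop text");  // Text is mutable; reuse the same object

    // getBytes() exposes the internal buffer; only the first getLength()
    // bytes are valid, so pass the length explicitly when converting.
    String s = new String(t.getBytes(), 0, t.getLength(), StandardCharsets.UTF_8);

    int cp = t.charAt(0);      // Unicode code point as an int, not a char
    int pos = t.find("text");  // byte offset of the substring, or -1
    System.out.println(s + " " + cp + " " + pos);
  }
}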
/**
 * Split a line into key and value.
 * @param line a byte array containing the UTF-8 bytes of the line
 * @param length the number of valid bytes in <code>line</code>
 * @param key the key of the record (output)
 * @param val the value of the record (output)
 * @throws IOException
 */
void splitKeyVal(byte[] line, int length, Text key, Text val)
    throws IOException {
  int numKeyFields = getNumOfKeyFields();
  byte[] separator = getFieldSeparator();
  // Need to find numKeyFields separators
  int pos = UTF8ByteArrayUtils.findBytes(line, 0, length, separator);
  for (int k = 1; k < numKeyFields && pos != -1; k++) {
    pos = UTF8ByteArrayUtils.findBytes(line, pos + separator.length,
        length, separator);
  }
  try {
    if (pos == -1) {
      key.set(line, 0, length);
      val.set("");
    } else {
      StreamKeyValUtil.splitKeyVal(line, 0, length, key, val, pos,
          separator.length);
    }
  } catch (CharacterCodingException e) {
    LOG.warn(StringUtils.stringifyException(e));
  }
}
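The helper above leans on Text.set(byte[], int, int), which copies a byte range into a reusable Text, so a line can be split without creating intermediate Strings. A hedged sketch of that core move, with a hand-picked tab position standing in for what findBytes returns:

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.io.Text;

public class KeyValSplitSketch {
  public static void main(String[] args) {
    // Hypothetical tab-separated line; pos plays the role of the offset
    // that UTF8ByteArrayUtils.findBytes locates above.
    byte[] line = "k1\tv1".getBytes(StandardCharsets.UTF_8);
    int pos = 2;  // byte offset of '\t'
    Text key = new Text();
    Text val = new Text();
    key.set(line, 0, pos);                          // bytes before the separator
    val.set(line, pos + 1, line.length - pos - 1);  // bytes after it
    System.out.println(key + " / " + val);          // k1 / v1
  }
}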
public WritableSortable(int j) throws IOException {
  seed = r.nextLong();
  r.setSeed(seed);
  Text t = new Text();
  StringBuilder sb = new StringBuilder();
  indices = new int[j];
  offsets = new int[j];
  check = new String[j];
  DataOutputBuffer dob = new DataOutputBuffer();
  for (int i = 0; i < j; ++i) {
    indices[i] = i;
    offsets[i] = dob.getLength();
    genRandom(t, r.nextInt(15) + 1, sb);
    t.write(dob);
    check[i] = t.toString();
  }
  eob = dob.getLength();
  bytes = dob.getData();
  comparator = WritableComparator.get(Text.class);
}
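The comparator grabbed on the last line is worth a note: WritableComparator.get(Text.class) returns Text's registered raw comparator, which orders records directly on their serialized bytes (vint length prefix plus UTF-8) without deserializing, which is exactly how the buffer built above gets sorted. A minimal sketch:

import java.io.IOException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;

public class RawCompareSketch {
  public static void main(String[] args) throws IOException {
    DataOutputBuffer a = new DataOutputBuffer();
    DataOutputBuffer b = new DataOutputBuffer();
    new Text("apple").write(a);
    new Text("banana").write(b);
    WritableComparator cmp = WritableComparator.get(Text.class);
    // Compares the serialized forms without instantiating either Text.
    int c = cmp.compare(a.getData(), 0, a.getLength(),
        b.getData(), 0, b.getLength());
    System.out.println(c < 0);  // true: "apple" sorts before "banana"
  }
}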
@Override
public void write(DataOutput out) throws IOException {
  Text.writeString(out, queueName);
  WritableUtils.writeEnum(out, queueState);
  if (schedulingInfo != null) {
    Text.writeString(out, schedulingInfo);
  } else {
    Text.writeString(out, "N/A");
  }
  out.writeInt(stats.length);
  for (JobStatus stat : stats) {
    stat.write(out);
  }
  out.writeInt(children.size());
  for (QueueInfo childQueueInfo : children) {
    childQueueInfo.write(out);
  }
}
@Override
public void map(LongWritable key,
    Text value,
    Context context) throws IOException, InterruptedException {
  UserLog userLog = UserLog.fromText(value);
  Tuple outputKey = new Tuple();
  outputKey.setString(KeyFields.USER, userLog.getName());
  outputKey.setInt(KeyFields.DATASET, USER_LOGS);
  Tuple outputValue = new Tuple();
  outputValue.setInt(ValueFields.DATASET, USER_LOGS);
  outputValue.setString(ValueFields.DATA, value.toString());
  context.write(outputKey, outputValue);
}
public void reduce(LongWritable key, Iterator<Text> values,
    OutputCollector<LongWritable, Text> output,
    Reporter reporter) throws IOException {
  while (values.hasNext()) {
    Text value = values.next();
    writeFlag(conf, "reduce." + name + ".value." + value);
    key.set(10);
    output.collect(key, value);
    if (byValue) {
      assertEquals(10, key.get());
    } else {
      assertNotSame(10, key.get());
    }
    key.set(11);
  }
}
public void map(Object key, Object value,
    OutputCollector output, Reporter reporter) throws IOException {
  if (this.reporter == null) {
    this.reporter = reporter;
  }
  addLongValue("totalCount", 1);
  TaggedMapOutput aRecord = generateTaggedMapOutput(value);
  if (aRecord == null) {
    addLongValue("discardedCount", 1);
    return;
  }
  Text groupKey = generateGroupKey(aRecord);
  if (groupKey == null) {
    addLongValue("nullGroupKeyCount", 1);
    return;
  }
  output.collect(groupKey, aRecord);
  addLongValue("collectedCount", 1);
}
@Override
public String call(Text v1) throws Exception {
  // parse input line
  String line = v1.toString();
  String[] cols = IOUtilFunctions.split(line, _delim);
  // determine number of non-zeros of row (w/o string parsing)
  int lnnz = IOUtilFunctions.countNnz(cols);
  // update counters
  _aNnz.add(lnnz);
  return line;
}
@Override // IOMapperBase
void collectStats(OutputCollector<Text, Text> output,
    String name,
    long execTime,
    Long objSize) throws IOException {
  long totalSize = objSize.longValue();
  float ioRateMbSec = (float) totalSize * 1000 / (execTime * MEGA);
  LOG.info("Number of bytes processed = " + totalSize);
  LOG.info("Exec time = " + execTime);
  LOG.info("IO rate = " + ioRateMbSec);
  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
      new Text(String.valueOf(1)));
  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
      new Text(String.valueOf(totalSize)));
  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
      new Text(String.valueOf(execTime)));
  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
      new Text(String.valueOf(ioRateMbSec * 1000)));
  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
      new Text(String.valueOf(ioRateMbSec * ioRateMbSec * 1000)));
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
    CancelDelegationTokenRequest request) throws IOException {
  if (!isAllowedDelegationTokenOp()) {
    throw new IOException(
        "Delegation Token can be cancelled only with kerberos authentication");
  }
  org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken();
  Token<MRDelegationTokenIdentifier> token =
      new Token<MRDelegationTokenIdentifier>(
          protoToken.getIdentifier().array(), protoToken.getPassword().array(),
          new Text(protoToken.getKind()), new Text(protoToken.getService()));
  String user = UserGroupInformation.getCurrentUser().getUserName();
  jhsDTSecretManager.cancelToken(token, user);
  return Records.newRecord(CancelDelegationTokenResponse.class);
}
private SequenceFile.Writer getSequenceFileWriter(Configuration configuration,
    FileSystem fs, long timeInMillis, byte versionNumber) throws IOException {
  String snapshotDir = configuration.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
  Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
  SequenceFile.Metadata metadata = new SequenceFile.Metadata();
  if (versionNumber > 1) {
    metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
        new Text(Byte.toString(versionNumber)));
  }
  switch (versionNumber) {
    case 1:
    case 2:
      return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
          co.cask.tephra.persist.TransactionEdit.class,
          SequenceFile.CompressionType.NONE, null, null, metadata);
    default:
      return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
          TransactionEdit.class, SequenceFile.CompressionType.NONE,
          null, null, metadata);
  }
}
public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
  // Clear our lists
  listA.clear();
  listB.clear();
  // Iterate through all our values, binning each record based on what
  // it was tagged with. Make sure to remove the tag!
  while (values.iterator().hasNext()) {
    tmp = values.iterator().next();
    if (tmp.charAt(0) == 'A') {
      listA.add(new Text(tmp.toString().substring(1)));
    } else if (tmp.charAt(0) == 'B') {  // fixed: was charAt('0'), a typo
      listB.add(new Text(tmp.toString().substring(1)));
    }
  }
  // Execute our join logic now that the lists are filled
  executeJoinLogic(context);
}
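For context, this reduce-side join expects each value to arrive prefixed with a one-character source tag. A hypothetical mapper that produces that tagging might look like the sketch below (the field delimiter and join-key position are assumptions for illustration; a twin mapper for the second table would emit 'B' records):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical: tags every record from table A with a leading 'A' so the
// reducer above can bin it into listA after stripping the tag.
public class TableAJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
  private final Text outKey = new Text();
  private final Text outValue = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] fields = value.toString().split("\t");
    outKey.set(fields[0]);      // assumed join key in column 0
    outValue.set("A" + value);  // the tag the reducer strips off
    context.write(outKey, outValue);
  }
}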
protected void setUp() throws ProtocolException, IOException {
  // prepare a temp file with expectedText as its content
  // This system property is defined in ./src/plugin/build-plugin.xml
  String path = System.getProperty("test.data");
  if (path != null) {
    File tempDir = new File(path);
    if (!tempDir.exists()) {
      tempDir.mkdir();
    }
    tempFile = File.createTempFile("nutch.test.plugin.ExtParser.", ".txt", tempDir);
  } else {
    // otherwise in java.io.tmpdir
    tempFile = File.createTempFile("nutch.test.plugin.ExtParser.", ".txt");
  }
  urlString = tempFile.toURL().toString();
  FileOutputStream fos = new FileOutputStream(tempFile);
  fos.write(expectedText.getBytes());
  fos.close();
  // get nutch content
  Protocol protocol = new ProtocolFactory(NutchConfiguration.create()).getProtocol(urlString);
  content = protocol.getProtocolOutput(new Text(urlString), new CrawlDatum()).getContent();
  protocol = null;
}
@Test
public void partial1ModeStringKeysExplicitParams() throws Exception {
  ObjectInspector[] inspectors = new ObjectInspector[] { stringInspector,
      doubleInspector, intInspector, floatInspector };
  GenericUDAFParameterInfo info =
      new SimpleGenericUDAFParameterInfo(inspectors, false, false, false);
  try (GenericUDAFEvaluator eval = new DataToDoubleSummarySketchUDAF().getEvaluator(info)) {
    ObjectInspector resultInspector = eval.init(Mode.PARTIAL1, inspectors);
    checkIntermediateResultInspector(resultInspector);
    @SuppressWarnings("unchecked")
    State<DoubleSummary> state = (State<DoubleSummary>) eval.getNewAggregationBuffer();
    eval.iterate(state, new Object[] { new Text("a"), new DoubleWritable(1),
        new IntWritable(32), new FloatWritable(0.99f) });
    eval.iterate(state, new Object[] { new Text("b"), new DoubleWritable(1),
        new IntWritable(32), new FloatWritable(0.99f) });
    Object result = eval.terminatePartial(state);
    Assert.assertNotNull(result);
    Assert.assertTrue(result instanceof List);
    List<?> r = (List<?>) result;
    Assert.assertEquals(r.size(), 2);
    Assert.assertEquals(((IntWritable) r.get(0)).get(), 32);
    Sketch<DoubleSummary> resultSketch = Sketches.heapifySketch(
        BytesWritableHelper.wrapAsMemory((BytesWritable) r.get(1)),
        new DoubleSummaryDeserializer());
    // because of sampling probability < 1
    Assert.assertTrue(resultSketch.isEstimationMode());
    Assert.assertEquals(resultSketch.getEstimate(), 2.0, 0.05);
  }
}
private MockFileSystem createFileSystemForServiceName(final String service)
    throws IOException {
  MockFileSystem mockFs = new MockFileSystem();
  when(mockFs.getCanonicalServiceName()).thenReturn(service);
  when(mockFs.getDelegationToken(any(String.class))).thenAnswer(
      new Answer<Token<?>>() {
        int unique = 0;
        @Override
        public Token<?> answer(InvocationOnMock invocation) throws Throwable {
          Token<?> token = new Token<TokenIdentifier>();
          token.setService(new Text(service));
          // use a unique value so when we restore from token storage, we can
          // tell if it's really the same token
          token.setKind(new Text("token" + unique++));
          return token;
        }
      });
  return mockFs;
}
@Override
protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  String[] arr = value.toString().split(",");
  if (arr.length == 2) {
    put = new Put(Bytes.toBytes(arr[0]));
    put.add(Bytes.toBytes("blog"), Bytes.toBytes("url"),
        Bytes.toBytes(arr[1]));
    // Disable the write-ahead log before submitting the Put, not after:
    // setting it once the Put is submitted has no effect, and doing it
    // outside this block could NPE on a malformed first line.
    if (!wal) {
      put.setWriteToWAL(false);
    }
    htable.put(put);
    if ((++count % 100) == 0) {
      context.setStatus("Mapper has inserted records=" + count);
      context.progress();
      LOG.info("Mapper has inserted records=" + count);
    }
  }
}
protected Object translateWritableToPigDataType(Writable w, byte dataType) {
  switch (dataType) {
    case DataType.CHARARRAY: return ((Text) w).toString();
    case DataType.BYTEARRAY:
      BytesWritable bw = (BytesWritable) w;
      // Make a copy
      return new DataByteArray(bw.getBytes(), 0, bw.getLength());
    case DataType.BOOLEAN: return ((BooleanWritable) w).get();
    case DataType.INTEGER: return ((IntWritable) w).get();
    case DataType.LONG: return ((LongWritable) w).get();
    case DataType.FLOAT: return ((FloatWritable) w).get();
    case DataType.DOUBLE: return ((DoubleWritable) w).get();
    case DataType.BYTE: return ((ByteWritable) w).get();
    case DataType.DATETIME: return ((DateTimeWritable) w).get();
  }
  return null;
}
@Override
protected void processInputStream(InputStream stream, final FlowFile flowFile,
    final Writer writer) throws IOException {
  try (final ZipInputStream zipIn = new ZipInputStream(new BufferedInputStream(stream))) {
    ZipEntry zipEntry;
    while ((zipEntry = zipIn.getNextEntry()) != null) {
      if (zipEntry.isDirectory()) {
        continue;
      }
      final File file = new File(zipEntry.getName());
      final String key = file.getName();
      long fileSize = zipEntry.getSize();
      final InputStreamWritable inStreamWritable = new InputStreamWritable(zipIn, (int) fileSize);
      writer.append(new Text(key), inStreamWritable);
      logger.debug("Appending FlowFile {} to Sequence File", new Object[]{key});
    }
  }
}
@Override
public int getPartition(Text key, NullWritable value, int numReduceTasks) {
  // The first byte of the key is the dictionary column index, starting from 0.
  int colIndex = key.getBytes()[0];
  int colReduceNum = reduceNumArr[colIndex];
  int colReduceNumOffset = 0;
  for (int i = 0; i < colIndex; i++) {
    colReduceNumOffset += reduceNumArr[i];
  }
  // Pick the reducer: reduce num = (value.hash % colReduceNum) + colReduceNumOffset
  byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
  int hashCode = new Text(keyBytes).hashCode() & 0x7FFFFFFF;
  return hashCode % colReduceNum + colReduceNumOffset;
}
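This works because Text.hashCode() is computed from the valid UTF-8 bytes, so two Text objects with the same content hash identically regardless of how their backing buffers were filled; masking with 0x7FFFFFFF just clears the sign bit so the modulo stays non-negative. A quick sketch (the partition count is a hypothetical value):

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.io.Text;

public class TextHashSketch {
  public static void main(String[] args) {
    Text a = new Text("dict-value");
    Text b = new Text();
    byte[] bytes = "dict-value".getBytes(StandardCharsets.UTF_8);
    b.set(bytes, 0, bytes.length);
    // Equal content => equal hash, independent of buffer provenance.
    System.out.println(a.hashCode() == b.hashCode());  // true
    int partitions = 7;  // hypothetical reducer count for one column
    System.out.println((a.hashCode() & 0x7FFFFFFF) % partitions);
  }
}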
@VisibleForTesting
void addHistoryToken(Credentials ts) throws IOException, InterruptedException {
  /* check if we have a hsproxy; if not, no need */
  MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
  if (UserGroupInformation.isSecurityEnabled() && (hsProxy != null)) {
    /*
     * Note that getDelegationToken was called. Again, this is a hack for oozie
     * to make sure we add history server delegation tokens to the credentials.
     */
    RMDelegationTokenSelector tokenSelector = new RMDelegationTokenSelector();
    Text service = resMgrDelegate.getRMDelegationTokenService();
    if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) {
      Text hsService = SecurityUtil.buildTokenService(hsProxy.getConnectAddress());
      if (ts.getToken(hsService) == null) {
        ts.addToken(hsService, getDelegationTokenFromHS(hsProxy));
      }
    }
  }
}
public static Range getLiteralRange(String fieldName, String normalizedQueryTerm) {
  if (null == fieldName) {
    return new Range(new Text(normalizedQueryTerm));
  }
  Key startKey = new Key(normalizedQueryTerm, fieldName, "");
  return new Range(startKey, false, startKey.followingKey(PartialKey.ROW_COLFAM), false);
}
@Override
protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  // writes each character of the line with a UUID
  String line = value.toString();
  for (int i = 0; i < line.length(); i += 1) {
    Group group = GROUP_FACTORY.newGroup();
    group.add(0, Binary.fromString(UUID.randomUUID().toString()));
    group.add(1, Binary.fromString(line.substring(i, i + 1)));
    context.write(null, group);
  }
}
public synchronized void readFields(DataInput in) throws IOException {
  this.jobid = new JobID();
  this.jobid.readFields(in);
  this.setupProgress = in.readFloat();
  this.mapProgress = in.readFloat();
  this.reduceProgress = in.readFloat();
  this.cleanupProgress = in.readFloat();
  this.runState = WritableUtils.readEnum(in, State.class);
  this.startTime = in.readLong();
  this.user = StringInterner.weakIntern(Text.readString(in));
  this.priority = WritableUtils.readEnum(in, JobPriority.class);
  this.schedulingInfo = StringInterner.weakIntern(Text.readString(in));
  this.finishTime = in.readLong();
  this.isRetired = in.readBoolean();
  this.historyFile = StringInterner.weakIntern(Text.readString(in));
  this.jobName = StringInterner.weakIntern(Text.readString(in));
  this.trackingUrl = StringInterner.weakIntern(Text.readString(in));
  this.jobFile = StringInterner.weakIntern(Text.readString(in));
  this.isUber = in.readBoolean();
  // De-serialize the job's ACLs
  int numACLs = in.readInt();
  for (int i = 0; i < numACLs; i++) {
    JobACL aclType = WritableUtils.readEnum(in, JobACL.class);
    AccessControlList acl = new AccessControlList(" ");
    acl.readFields(in);
    this.jobACLs.put(aclType, acl);
  }
}
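Several of the string fields above go through Text.readString, which pairs with Text.writeString: the string is written as a vint byte length followed by its UTF-8 bytes, so the two must always be used together. A minimal round-trip sketch over an in-memory buffer (the sample string is ours):

import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;

public class TextStringRoundTrip {
  public static void main(String[] args) throws IOException {
    DataOutputBuffer out = new DataOutputBuffer();
    Text.writeString(out, "history-file.jhist");  // vint length + UTF-8 bytes

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    System.out.println(Text.readString(in));      // "history-file.jhist"
  }
}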
@Override
public void initialize(PipeMapRed pipeMapRed) throws IOException {
  super.initialize(pipeMapRed);
  clientIn = pipeMapRed.getClientInput();
  conf = pipeMapRed.getConfiguration();
  lineReader = new LineReader((InputStream) clientIn, conf);
  key = new Text();
  line = new Text();
}
public void reduce(MySortKey key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
  for (Text value : values) {
    System.out.println(value.toString() + "," + key.toString());
    context.write(value, key);
  }
}
private void registerOffsets(List<Text> tableNames,
    TreeMap<Text, Integer> maxPartitionsByTable) {
  HashMap<Text, Integer> offsetsByCategoryName = new HashMap<>();
  // start the offsets from the back
  int previousOffsetStart = this.reduceTasks;
  for (Text tableName : tableNames) {
    previousOffsetStart = registerOffsetForTable(maxPartitionsByTable,
        offsetsByCategoryName, previousOffsetStart, tableName);
  }
}
public boolean doRun(Config upcolConfig) throws Exception {
  JobConf jobConf = new JobConf(getConf(), UpdateColumnJob.class);
  jobConf.setKeepFailedTaskFiles(false);
  jobConf.setNumReduceTasks(0);
  String jobName = String.format("indexr-upcol-%s-%s-%s",
      upcolConfig.table,
      LocalDateTime.now().format(timeFormatter),
      RandomStringUtils.randomAlphabetic(5));
  jobConf.setJobName(jobName);
  jobConf.set(CONFKEY, JsonUtil.toJson(upcolConfig));
  Path workDir = new Path(jobConf.getWorkingDirectory(), jobName);
  jobConf.setWorkingDirectory(workDir);
  Job job = Job.getInstance(jobConf);
  job.setInputFormatClass(SegmentInputFormat.class);
  job.setMapperClass(UpColSegmentMapper.class);
  job.setJarByClass(UpdateColumnJob.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setMapSpeculativeExecution(false);
  job.setOutputFormatClass(UpColSegmentOutputFormat.class);
  job.submit();
  boolean ok = job.waitForCompletion(true);
  if (!ok) {
    TaskReport[] reports = job.getTaskReports(TaskType.MAP);
    if (reports != null) {
      for (TaskReport report : reports) {
        log.error("Error in task [%s] : %s", report.getTaskId(),
            Arrays.toString(report.getDiagnostics()));
      }
    }
  }
  return ok;
}
private static JavaRDD<String[]> getSequenceFormatHiveInput(JavaSparkContext sc,
    String inputPath) {
  return sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
      .map(new Function<Text, String[]>() {
        @Override
        public String[] call(Text text) throws Exception {
          String s = Bytes.toString(text.getBytes(), 0, text.getLength());
          return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER, -1);
        }
      });
}
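The eager copy through Bytes.toString here is deliberate: Hadoop record readers reuse the same Text instance across records, so holding a reference without copying can make every element look like the last one read. A small illustration of the hazard (values are ours):

import org.apache.hadoop.io.Text;

public class CopyBeforeReuse {
  public static void main(String[] args) {
    Text reused = new Text("row-1");
    String safe = reused.toString();  // copies the content out
    Text unsafe = reused;             // aliases the mutable object
    reused.set("row-2");              // the reader moves on to the next record
    System.out.println(safe);         // still "row-1"
    System.out.println(unsafe);       // now "row-2"
  }
}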
public void addPropertyDeleteToMutation(Mutation m, Property property) {
  Preconditions.checkNotNull(m, "mutation cannot be null");
  Preconditions.checkNotNull(property, "property cannot be null");
  Text columnQualifier = KeyHelper.getColumnQualifierFromPropertyColumnQualifier(
      property, getNameSubstitutionStrategy());
  ColumnVisibility columnVisibility = visibilityToAccumuloVisibility(property.getVisibility());
  m.putDelete(AccumuloElement.CF_PROPERTY, columnQualifier, columnVisibility,
      currentTimeMillis());
  for (Metadata.Entry metadataEntry : property.getMetadata().entrySet()) {
    Text metadataEntryColumnQualifier =
        getPropertyMetadataColumnQualifierText(property, metadataEntry.getKey());
    ColumnVisibility metadataEntryVisibility =
        visibilityToAccumuloVisibility(metadataEntry.getVisibility());
    addPropertyMetadataItemDeleteToMutation(m, metadataEntryColumnQualifier,
        metadataEntryVisibility);
  }
}
@Override
public void convert(LongWritable k1, Text v1) {
  String str = v1.toString();
  // handle support for matrix market format
  if (str.startsWith("%")) {
    if (str.startsWith("%%"))
      toIgnore = true;
    hasValue = false;
    return;
  }
  else if (toIgnore) {
    toIgnore = false;
    hasValue = false;
    return;
  }
  // reset the tokenizer
  st.reset(str);
  // convert text to matrix cell
  indexes.setIndexes(st.nextLong(), st.nextLong());
  if (indexes.getRowIndex() == 0 || indexes.getColumnIndex() == 0) {
    hasValue = false;
    return;
  }
  value.setValue(st.nextDouble());
  hasValue = true;
}