下面列出了 org.apache.hadoop.io.Text#toString() 的实例代码，也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Deserializes this record from {@code dataInput}.
 *
 * <p>Reads, in order: one {@code Text} holding {@code wifiProb}, then three 8-byte longs for
 * {@code hour}, {@code newCustomer} and {@code oldCustomer}; the last two are narrowed to
 * {@code int}. NOTE(review): presumably mirrors the field order of the matching write() —
 * confirm against it.
 *
 * @param dataInput stream positioned at the start of a serialized record
 * @throws IOException if the underlying stream fails or ends early
 */
public void readFields(DataInput dataInput) throws IOException {
    Text probeText = new Text();
    probeText.readFields(dataInput);
    wifiProb = probeText.toString();

    // LongWritable#readFields simply calls readLong(), so read the longs directly.
    hour = dataInput.readLong();
    newCustomer = (int) dataInput.readLong();
    oldCustomer = (int) dataInput.readLong();
}
/**
 * Reads every JSON Lines record of one input split into {@code dest}, starting at row
 * {@code currentRow}.
 *
 * <p>Each record value is parsed as one {@code JSONObject}. For every (JSON path, column) pair
 * in {@code schemaMap}, the string found at that path is converted to the column's value type
 * from {@code schema} and stored at (row, column). The record reader is always closed
 * silently, even on failure.
 *
 * @return the index of the row after the last row written
 * @throws IOException if reading the split fails
 * @throws JSONException if a record is not valid JSON or a path lookup fails
 */
protected static int readJSONLFrameFromInputSplit(InputSplit split, InputFormat<LongWritable, Text> inputFormat,
        JobConf jobConf, Types.ValueType[] schema, Map<String, Integer> schemaMap, FrameBlock dest, int currentRow)
        throws IOException, JSONException
{
    RecordReader<LongWritable, Text> recordReader = inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
    LongWritable offset = new LongWritable();
    Text line = new Text();
    int nextRow = currentRow;
    try {
        while (recordReader.next(offset, line)) {
            // The whole line is materialized as a single JSONObject; very large objects are costly.
            JSONObject record = new JSONObject(line.toString());
            for (Map.Entry<String, Integer> mapping : schemaMap.entrySet()) {
                String cellText = getStringFromJSONPath(record, mapping.getKey());
                int column = mapping.getValue();
                Object cellValue = UtilFunctions.stringToObject(schema[column], cellText);
                dest.set(nextRow, column, cellValue);
            }
            nextRow++;
        }
    }
    finally {
        IOUtilFunctions.closeSilently(recordReader);
    }
    return nextRow;
}
/**
 * Parses well-known text into an Esri shape, accepting polygons only.
 *
 * <p>The spatial reference of the parsed geometry is cleared before conversion. A non-polygon
 * geometry is logged as an invalid type, and any parse or conversion failure is logged as
 * invalid text. Both cases yield {@code null}.
 *
 * @param wkwrap WKT text; a null argument throws NullPointerException before parsing
 * @return the shape bytes, or {@code null} when the input is not a valid polygon
 */
public BytesWritable evaluate(Text wkwrap) throws UDFArgumentException {
    final String wkt = wkwrap.toString();
    try {
        OGCGeometry geometry = OGCGeometry.fromText(wkt);
        geometry.setSpatialReference(null);
        if (!geometry.geometryType().equals("Polygon")) {
            LogUtils.Log_InvalidType(LOG, GeometryUtils.OGCType.ST_POLYGON, GeometryUtils.OGCType.UNKNOWN);
            return null;
        }
        return GeometryUtils.geometryToEsriShapeBytesWritable(geometry);
    } catch (Exception e) { // IllegalArgumentException, GeometryException
        LogUtils.Log_InvalidText(LOG, wkt);
        return null;
    }
}
/**
 * Rebuilds the name/generation maps from the newest file under {@code _path}, then hands the
 * older files to {@code cleanupOldFiles}.
 *
 * <p>"Newest" is the greatest entry under {@code FileStatus}'s natural ordering. Each record
 * of that sequence file is a (name, generation) pair. It is stored in
 * {@code _namesToGenerations}, and the name is added to the concurrent set kept per
 * generation in {@code _generationsToNames}. Does nothing when the directory is empty.
 *
 * @throws IOException if listing, reading or cleanup fails
 */
private void loadGenerations() throws IOException {
    FileSystem fileSystem = _path.getFileSystem(_configuration);
    FileStatus[] listStatus = fileSystem.listStatus(_path);
    SortedSet<FileStatus> existing = new TreeSet<FileStatus>(Arrays.asList(listStatus));
    if (existing.isEmpty()) {
        return;
    }
    FileStatus last = existing.last();
    Reader reader = new SequenceFile.Reader(fileSystem, last.getPath(), _configuration);
    try {
        Text key = new Text();
        LongWritable value = new LongWritable();
        while (reader.next(key, value)) {
            String name = key.toString();
            long gen = value.get();
            _namesToGenerations.put(name, gen);
            Set<String> names = _generationsToNames.get(gen);
            if (names == null) {
                names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
                _generationsToNames.put(gen, names);
            }
            names.add(name);
        }
    } finally {
        // Close even when a record fails to read; previously an exception here leaked the reader.
        reader.close();
    }
    existing.remove(last);
    cleanupOldFiles(fileSystem, existing);
}
/**
 * PeopleRank map step.
 *
 * <p>Input: key is a user id and value is the users it follows (e.g. key "a", value "d c b").
 * On the first run (config {@code runCount}, default 1) the value carries no rank yet, so a
 * rank of 1.0 is prepended. Later runs already include the rank in the value.
 *
 * <p>Output: the user's own record ("userid rank followees") is re-emitted. Then, if the user
 * follows anyone, each followee receives (followee id, rank / number of followees). For
 * example, if a follows b, c and d, each of them gets 1.0/3.
 */
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    String pid = key.toString();
    int runCount = context.getConfiguration().getInt("runCount", 1);
    String record = value.toString();
    // Parsed format: "rank u1 u2 ..."; seed the rank with 1.0 on the first run.
    People people = People.fromMR(runCount == 1 ? "1.0" + People.fieldSeparator + record : record);

    context.write(new Text(pid), new Text(people.toString()));

    if (!people.containsAttentionPeoples()) {
        return;
    }
    int followeeCount = people.getAttentionPeoples().length;
    double share = people.getPeopleRank() / followeeCount;
    for (int idx = 0; idx < followeeCount; idx++) {
        // Output: "followee id  <- share of the follower's rank"
        context.write(new Text(people.getAttentionPeoples()[idx]), new Text(String.valueOf(share)));
    }
}
/**
 * Inverts one "user:friend1,friend2,..." line (e.g. "A:B,C,D,F,E,O") into
 * (friend, user) pairs such as (B, A), (C, A).
 *
 * <p>The user id is everything before the first ':', so ids longer than one character work.
 * The old code assumed a single-character id at position 0 and a separator at position 1.
 * Lines without a ':' (including empty lines, which used to throw
 * StringIndexOutOfBoundsException) are skipped, as are empty friend entries.
 */
public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
    String line = value.toString();
    int colon = line.indexOf(':');
    if (colon < 0) {
        return;
    }
    Text user = new Text(line.substring(0, colon));
    String[] friends = line.substring(colon + 1).split(",");
    for (String friend : friends) {
        if (!friend.isEmpty()) {
            context.write(new Text(friend), user);
        }
    }
}
/**
 * Emits (match, 1) for every match of {@code pattern} in the line, where the emitted text is
 * capture group {@code group} of the match.
 */
public void map(K key, Text value,
        OutputCollector<Text, LongWritable> output,
        Reporter reporter)
        throws IOException {
    for (Matcher m = pattern.matcher(value.toString()); m.find(); ) {
        output.collect(new Text(m.group(group)), new LongWritable(1));
    }
}
/**
 * Word-count map step: emits (token, one) for each whitespace-delimited token of the line.
 * The {@code word} and {@code one} fields are reused for every emitted pair.
 */
public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> output,
        Reporter reporter) throws IOException {
    for (StringTokenizer tokens = new StringTokenizer(value.toString()); tokens.hasMoreTokens(); ) {
        word.set(tokens.nextToken());
        output.collect(word, one);
    }
}
/**
 * Emits the sales amount (column index 4) of every tab-separated row that has exactly six
 * columns, under the single constant key "JustADummyKey".
 *
 * @throws NumberFormatException if column 4 of a six-column row is not a number
 */
public final void map(final LongWritable key, final Text value, final Context context)
        throws IOException, InterruptedException {
    final String[] fields = value.toString().trim().split("\t");
    if (fields.length != 6) {
        return;
    }
    final double sales = Double.parseDouble(fields[4]);
    // Every row shares one constant key.
    word.set("JustADummyKey");
    context.write(word, new DoubleWritable(sales));
}
/**
 * Merges "term:count" values for {@code key}, summing the counts per term, then emits
 * "term:total" for every term whose total is at least {@code CUTOFF}. Terms are emitted in the
 * iteration order of the map returned by {@code sortByValue}.
 *
 * @throws NumberFormatException if a count is not an integer
 * @throws StringIndexOutOfBoundsException if a value contains no ':'
 */
@Override
public void reduce(Text key, Iterable<Text> values,
        Context context) throws IOException, InterruptedException {
    Map<String, IntWritable> vectorTerms = new HashMap<String, IntWritable>();
    for (Text value : values) {
        String termVector = value.toString();
        // Split on the last ':' so terms that themselves contain ':' stay intact.
        int index = termVector.lastIndexOf(':');
        String word = termVector.substring(0, index);
        int freq = Integer.parseInt(termVector.substring(index + 1));
        IntWritable previous = vectorTerms.get(word);
        if (previous != null) {
            freq += previous.get();
        }
        vectorTerms.put(word, new IntWritable(freq));
    }
    Map<String, IntWritable> vectorTermsSorted = sortByValue(vectorTerms);
    for (Map.Entry<String, IntWritable> entry : vectorTermsSorted.entrySet()) {
        if (entry.getValue().get() >= CUTOFF) {
            context.write(key, new Text(entry.getKey() + ":" + entry.getValue()));
        }
    }
}
/**
 * Token-count map step: splits the line on whitespace (StringTokenizer's default delimiters)
 * and emits a fresh (token, 1) pair for each token.
 */
public void map(K key, Text value,
        OutputCollector<Text, LongWritable> output,
        Reporter reporter)
        throws IOException {
    StringTokenizer tokenizer = new StringTokenizer(value.toString());
    while (tokenizer.hasMoreTokens()) {
        String token = tokenizer.nextToken();
        output.collect(new Text(token), new LongWritable(1));
    }
}
/**
 * Routes each (statistic, task type) key to a fixed slot.
 *
 * <p>The slots are:
 * <ul>
 *   <li>0/1: all-slot time, MAP/REDUCE</li>
 *   <li>2/3: submit-pending time</li>
 *   <li>4/5: launched-pending time</li>
 *   <li>6/7: failed-slot time</li>
 *   <li>8: anything else, including a known statistic with a task type other than MAP or
 *       REDUCE</li>
 * </ul>
 *
 * <p>The slot is reduced modulo {@code numPartitions} so the result is always a legal
 * partition index. Previously {@code numPartitions} was ignored, and with fewer than 9
 * reducers the framework received out-of-range partitions. With 9 or more reducers the result
 * is unchanged.
 */
public int getPartition(Text key, Text value, int numPartitions) {
    IntervalKey intKey = new IntervalKey(key.toString());
    int base = -1;
    if (intKey.statName.equals(StatSeries.STAT_ALL_SLOT_TIME.toString())) {
        base = 0;
    } else if (intKey.statName.equals(StatSeries.STAT_SUBMIT_PENDING_SLOT_TIME.toString())) {
        base = 2;
    } else if (intKey.statName.equals(StatSeries.STAT_LAUNCHED_PENDING_SLOT_TIME.toString())) {
        base = 4;
    } else if (intKey.statName.equals(StatSeries.STAT_FAILED_SLOT_TIME.toString())) {
        base = 6;
    }
    int slot = 8; // catch-all slot
    if (base >= 0) {
        if (intKey.taskType.equals("MAP")) {
            slot = base;
        } else if (intKey.taskType.equals("REDUCE")) {
            slot = base + 1;
        }
    }
    return numPartitions > 0 ? slot % numPartitions : slot;
}
/**
 * Word-count map step (new API): writes (token, one) for every whitespace-delimited token,
 * reusing the {@code word} and {@code one} fields for each write.
 */
public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
    String line = value.toString();
    for (StringTokenizer parts = new StringTokenizer(line); parts.hasMoreTokens(); ) {
        word.set(parts.nextToken());
        context.write(word, one);
    }
}
/**
 * Runs the time, length, type and title helpers over {@code doc} and returns the same
 * document instance.
 *
 * <p>NOTE(review): the helpers presumably add or replace fields on {@code doc}; confirm in
 * their definitions. {@code inlinks} is not used here.
 */
public NutchDocument filter(NutchDocument doc, Parse parse, Text url,
        CrawlDatum datum, Inlinks inlinks) throws IndexingException {
    final String urlString = url.toString();
    addTime(doc, parse.getData(), urlString, datum);
    addLength(doc, parse.getData(), urlString);
    addType(doc, parse.getData(), urlString, datum);
    resetTitle(doc, parse.getData(), urlString);
    return doc;
}
/**
 * Convenience constructor taking the key as a Hadoop {@code Text}; delegates to the
 * (String, Object) constructor with {@code key.toString()}.
 *
 * @param key   key text; a null key throws NullPointerException
 * @param value value passed through unchanged
 */
OperationOutput(Text key, Object value) {
this(key.toString(), value);
}
/**
 * Runs one level of Louvain community detection on a graph partition and sends the coarsened
 * graph to the reducer.
 *
 * <p>Input: the key is the partition file name (e.g. "4elt_0.part"). The number between the
 * first '_' and the following '.' is this partition's rank. The value holds the partition's
 * graph bytes (described as Metis format in the original notes).
 *
 * <p>Output: a single record keyed "one" whose value is a Java-serialized
 * {@code GraphMessage}. When {@code display_level == -1}, the partition is also written to
 * {@code outpath/out_<level>_<rank>.txt}.
 *
 * @throws IOException if writing fails, or wrapping (with cause) any other failure of the
 *     community computation
 * @throws InterruptedException if the framework interrupts the write
 */
@Override
protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
    // example: 4elt_0.part
    String fileName = key.toString();
    String[] nameParts = fileName.split("_");
    String[] dotParts = nameParts[1].split("\\.");
    // BytesWritable's backing array may be longer than its logical length; read only valid bytes.
    InputStream inputStream = new ByteArrayInputStream(value.getBytes(), 0, value.getLength());
    int rank = Integer.parseInt(dotParts[0]);
    if (verbose) {
        System.out.println("Begin");
    }
    try {
        Community c = new Community(inputStream, -1, nb_pass, precision);
        Graph g = null;
        int level = 0;
        if (verbose) {
            System.out.print("" + rank + ":" + "level " + level);
            System.out.print(" start computation");
            System.out.println(" network size: "
                    + c.getG().getNb_nodes() + " nodes, "
                    + c.getG().getNb_links() + " links, "
                    + c.getG().getTotal_weight() + " weight.");
        }
        c.one_level();
        // g would hold the previous level's graph, which does not exist yet in this single-level
        // mapper; the old code dereferenced the null g here when display_level == 1.
        if (++level == display_level && g != null) {
            g.display();
        }
        if (display_level == -1) {
            String filepath = outpath + File.separator + "out_" + level + "_" + rank + ".txt";
            c.display_partition(filepath);
        }
        g = c.partition2graph_binary();
        if (verbose) {
            System.out.println(" network size: "
                    + c.getG().getNb_nodes() + " nodes, "
                    + c.getG().getNb_links() + " links, "
                    + c.getG().getTotal_weight() + " weight.");
        }
        GraphMessage msg = createGraphMessage(g, c, rank);
        // Send to reducer
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oo = new ObjectOutputStream(bos);
        try {
            oo.writeObject(msg);
        } finally {
            oo.close(); // flushes any buffered serialization data into bos
        }
        context.write(new Text("one"), new BytesWritable(bos.toByteArray()));
    } catch (IOException e) {
        throw e;
    } catch (InterruptedException e) {
        throw e;
    } catch (Exception e) {
        // Keep the original failure as the cause instead of flattening it into a message string.
        throw new IOException("Community detection failed for " + fileName, e);
    }
}
/**
 * Returns the vertex value's string form via {@code Text#toString()}.
 *
 * @param vertexValue the vertex value; a null value throws NullPointerException
 * @return the decoded text of the vertex value
 */
@Override
public String getVertexValueAsString(Text vertexValue) {
return vertexValue.toString();
}
/**
 * Returns the next row of the underlying reader as a Pig tuple. Returns {@code null} when
 * there is no reader, the input is exhausted, or the read is interrupted.
 *
 * <p>On the first call, the required-column mask is loaded from the UDF properties when
 * {@code signature} is set. Each record value is read as text. If it contains a tab, only the
 * second tab-separated field is deserialized.
 *
 * <p>When {@code clean} is set, each row is cleansed, and rows left with exactly two entries
 * are skipped. Per the inline note, those two entries are the key and the deletion timestamp.
 *
 * @throws IOException propagated from the reader, serializer or UDF-property lookup
 */
@Override
public Tuple getNext() throws IOException {
    if (!mRequiredColumnsInitialized) {
        if (signature != null) {
            mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(getUdfProperty(REQUIRED_COLUMNS));
        }
        mRequiredColumnsInitialized = true;
    }
    if (reader == null) {
        return null;
    }
    if (serializer == null) {
        serializer = new AegisthusSerializer();
    }
    try {
        while (reader.nextKeyValue()) {
            Text value = (Text) reader.getCurrentValue();
            String s = value.toString();
            if (s.contains("\t")) {
                s = s.split("\t")[1];
            }
            Map<String, Object> map = serializer.deserialize(s);
            if (clean) {
                cleanse(map);
                // When cleaning, an empty row is ignored: it has only the key and the deleted ts.
                // TODO: only remove row if it is empty and is deleted.
                if (map.size() == 2) {
                    continue;
                }
            }
            return tuple(map);
        }
    } catch (InterruptedException e) {
        // As before, interruption ends the scan, but the interrupt status is now restored
        // instead of being silently discarded, so callers can still observe it.
        Thread.currentThread().interrupt();
    }
    return null;
}
/**
 * Computes the distortion of one centroid group and writes it as a {@code DistortionEntry}.
 *
 * <p>The expectation is the sum of {@code getValue()} over all values divided by the sum of
 * their {@code getCount()}. The dimensionality is 2 plus the number of extra dimensions of
 * the group's first centroid. Distortion = (expectation / dims)^(-dims / 2).
 *
 * <p>Nothing is written when the group has no centroids or the total count is not positive.
 * The k recorded is {@code expectedK} when set; otherwise it is the number of centroids.
 */
@Override
public void reduce(
        final Text key,
        final Iterable<CountofDoubleWritable> values,
        final Reducer<Text, CountofDoubleWritable, GeoWaveOutputKey, DistortionEntry>.Context context)
        throws IOException, InterruptedException {
    final String groupId = key.toString();
    final List<AnalyticItemWrapper<Object>> centroids = centroidManager.getCentroidsForGroup(groupId);
    if (centroids.size() == 0) {
        return;
    }
    // A group may hold fewer items than the requested k, so default to the actual centroid count.
    final Integer kCount = (expectedK == null) ? centroids.size() : expectedK;
    final double dims = 2 + centroids.get(0).getExtraDimensions().length;

    double total = 0.0;
    double pointCount = 0;
    for (final CountofDoubleWritable partial : values) {
        total += partial.getValue();
        pointCount += partial.getCount();
    }
    if (!(pointCount > 0)) {
        return;
    }
    final double meanExpectation = total / pointCount;
    final Double distortion = Math.pow(meanExpectation / dims, -(dims / 2));
    final DistortionEntry distortionEntry = new DistortionEntry(groupId, batchId, kCount, distortion);
    final GeoWaveOutputKey outputKey = new GeoWaveOutputKey(
            DistortionDataAdapter.ADAPTER_TYPE_NAME,
            DistortionGroupManagement.DISTORTIONS_INDEX_ARRAY);
    context.write(outputKey, distortionEntry);
}
/**
 * Compares two serialized {@code Text} log lines column by column, in the order given by
 * {@code sortSpec}. Columns are found by splitting on {@code columnSeparator}, which is a
 * regex. Falls back to the inherited comparison when no {@code sortSpec} is set.
 *
 * <p>Each column is compared on its UTF-8 bytes, using the real byte length. The old code
 * used the platform charset together with {@code String.length()}, which truncated or
 * misaligned the comparison for non-ASCII text. A line with too few columns sorts as if the
 * missing column were empty; previously this threw ArrayIndexOutOfBoundsException.
 *
 * @return negative, zero or positive as the first line sorts before, equal to or after the
 *     second; 0 (after logging) if a line cannot be decoded
 */
public int compare(byte[] b1, int s1, int l1,
        byte[] b2, int s2, int l2) {
    if (sortSpec == null) {
        return super.compare(b1, s1, l1, b2, s2, l2);
    }
    try {
        Text logline1 = new Text();
        logline1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
        String[] logColumns1 = logline1.toString().split(columnSeparator);
        Text logline2 = new Text();
        logline2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
        String[] logColumns2 = logline2.toString().split(columnSeparator);

        // Compare column-wise according to sortSpec
        for (int i = 0; i < sortSpec.length; ++i) {
            int column = Integer.valueOf(sortSpec[i]).intValue();
            String c1 = column < logColumns1.length ? logColumns1[column] : "";
            String c2 = column < logColumns2.length ? logColumns2[column] : "";
            byte[] bytes1 = c1.getBytes("UTF-8");
            byte[] bytes2 = c2.getBytes("UTF-8");
            int comparison = super.compareBytes(bytes1, 0, bytes1.length, bytes2, 0, bytes2.length);
            if (comparison != 0) {
                return comparison;
            }
        }
    } catch (IOException ioe) {
        LOG.fatal("Caught " + ioe);
        return 0;
    }
    return 0;
}