下面列出了 org.apache.hadoop.io.Text#write() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Serializes {@code j} {@link Text} records produced by {@code genRandom}
 * back to back into a single buffer, remembering where each record starts
 * and what its string form is.
 *
 * @param j number of records to generate
 * @throws IOException if writing a record into the buffer fails
 */
public WritableSortable(int j) throws IOException {
  // Draw a seed from the generator, keep it, and re-seed the generator with it.
  seed = r.nextLong();
  r.setSeed(seed);

  indices = new int[j];
  offsets = new int[j];
  check = new String[j];

  final Text record = new Text();
  final StringBuilder scratch = new StringBuilder();
  final DataOutputBuffer buffer = new DataOutputBuffer();

  int slot = 0;
  while (slot < j) {
    indices[slot] = slot;
    // The record begins at the buffer length observed before it is written.
    offsets[slot] = buffer.getLength();
    // Second argument is a value in [1, 15].
    genRandom(record, r.nextInt(15) + 1, scratch);
    record.write(buffer);
    check[slot] = record.toString();
    slot++;
  }

  eob = buffer.getLength();
  bytes = buffer.getData();
  comparator = WritableComparator.get(Text.class);
}
/**
 * Writes this object's fields to {@code out} in this order: start, length,
 * the forest id bytes wrapped in a {@link Text}, the first host name (only
 * when one is present), the last-split flag, the replica count, and for each
 * replica its forest and host name.
 *
 * @param out destination of the serialized form
 * @throws IOException if writing to {@code out} fails
 */
@Override
public void write(DataOutput out) throws IOException {
  out.writeLong(start);
  out.writeLong(length);
  new Text(forestId.toByteArray()).write(out);

  // NOTE(review): the host name is emitted without any presence marker, so
  // the reading side presumably always expects one - confirm against readFields.
  final boolean hasHost = hostName != null && hostName.length > 0;
  if (hasHost) {
    Text.writeString(out, hostName[0]);
  }

  out.writeBoolean(isLastSplit);

  // A missing replica list is serialized as a count of zero.
  int replicaSize;
  if (replicas == null) {
    replicaSize = 0;
  } else {
    replicaSize = replicas.size();
  }
  out.writeInt(replicaSize);

  int pos = 0;
  while (pos < replicaSize) {
    Text.writeString(out, replicas.get(pos).getForest());
    Text.writeString(out, replicas.get(pos).getHostName());
    pos++;
  }
}
/**
 * Generates {@code j} {@link Text} values through {@code genRandom}, writes
 * them consecutively into one {@link DataOutputBuffer}, and stores the start
 * offset and string form of each value alongside an identity index array.
 *
 * @param j how many values to generate
 * @throws IOException if a value cannot be written to the buffer
 */
public WritableSortable(int j) throws IOException {
  // Remember the seed the generator is re-seeded with.
  final long freshSeed = r.nextLong();
  seed = freshSeed;
  r.setSeed(freshSeed);

  final Text current = new Text();
  final StringBuilder workArea = new StringBuilder();
  indices = new int[j];
  offsets = new int[j];
  check = new String[j];

  final DataOutputBuffer serialized = new DataOutputBuffer();
  for (int n = 0; n < j; n++) {
    indices[n] = n;
    // Offset of value n is the number of bytes written so far.
    offsets[n] = serialized.getLength();
    final int arg = r.nextInt(15) + 1; // always between 1 and 15
    genRandom(current, arg, workArea);
    current.write(serialized);
    check[n] = current.toString();
  }

  eob = serialized.getLength();
  bytes = serialized.getData();
  comparator = WritableComparator.get(Text.class);
}
/**
 * For every entry of {@code textsToSerialize}: writes it with Hadoop's
 * {@link Text}, reads the bytes back through
 * {@code TextSerializer.readTextAsString}, and asserts that the two strings
 * are equal.
 *
 * @throws IOException if serializing or deserializing fails
 */
@Test
public void testDeserialize() throws IOException {
  for (String original : textsToSerialize) {
    // Produce the bytes with Hadoop's own serializer.
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(sink);
    Text hadoopText = new Text();
    hadoopText.set(original);
    hadoopText.write(dataOut);
    dataOut.close();

    // Feed those bytes to the deserializer under test.
    byte[] encoded = sink.toByteArray();
    DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(encoded));
    String roundTripped = TextSerializer.readTextAsString(dataIn);

    Assert.assertEquals(roundTripped, original);
  }
}
/**
 * Serializes {@code j} {@link Text} records produced by {@code genRandom}
 * back to back into a single buffer, remembering where each record starts
 * and what its string form is.
 *
 * @param j number of records to generate
 * @throws IOException if writing a record into the buffer fails
 */
public WritableSortable(int j) throws IOException {
  // Draw a seed from the generator, keep it, and re-seed the generator with it.
  seed = r.nextLong();
  r.setSeed(seed);

  indices = new int[j];
  offsets = new int[j];
  check = new String[j];

  final Text record = new Text();
  final StringBuffer scratch = new StringBuffer();
  final DataOutputBuffer buffer = new DataOutputBuffer();

  int slot = 0;
  while (slot < j) {
    indices[slot] = slot;
    // The record begins at the buffer length observed before it is written.
    offsets[slot] = buffer.getLength();
    // Second argument is a value in [1, 15].
    genRandom(record, r.nextInt(15) + 1, scratch);
    record.write(buffer);
    check[slot] = record.toString();
    slot++;
  }

  eob = buffer.getLength();
  bytes = buffer.getData();
  comparator = WritableComparator.get(Text.class);
}
/**
 * Generates {@code j} {@link Text} values through {@code genRandom}, writes
 * them consecutively into one {@link DataOutputBuffer}, and stores the start
 * offset and string form of each value alongside an identity index array.
 *
 * @param j how many values to generate
 * @throws IOException if a value cannot be written to the buffer
 */
public WritableSortable(int j) throws IOException {
  // Remember the seed the generator is re-seeded with.
  final long freshSeed = r.nextLong();
  seed = freshSeed;
  r.setSeed(freshSeed);

  final Text current = new Text();
  final StringBuffer workArea = new StringBuffer();
  indices = new int[j];
  offsets = new int[j];
  check = new String[j];

  final DataOutputBuffer serialized = new DataOutputBuffer();
  for (int n = 0; n < j; n++) {
    indices[n] = n;
    // Offset of value n is the number of bytes written so far.
    offsets[n] = serialized.getLength();
    final int arg = r.nextInt(15) + 1; // always between 1 and 15
    genRandom(current, arg, workArea);
    current.write(serialized);
    check[n] = current.toString();
  }

  eob = serialized.getLength();
  bytes = serialized.getData();
  comparator = WritableComparator.get(Text.class);
}
/**
 * Writes {@code int1} through an {@link IntWritable}, followed by
 * {@code circle} through a {@link Text}.
 *
 * @param paramDataOutput destination of the serialized form
 * @throws IOException if writing to {@code paramDataOutput} fails
 */
@Override
public void write(DataOutput paramDataOutput) throws IOException {
  // Integer field first, then the text field.
  new IntWritable(this.int1).write(paramDataOutput);
  new Text(this.circle).write(paramDataOutput);
}
/**
 * Writes {@code wifiProb} as a {@link Text} (an empty string stands in for
 * {@code null}), then {@code hour}, {@code newCustomer} and
 * {@code oldCustomer}, each through a {@link LongWritable}.
 *
 * @param dataOutput destination of the serialized form
 * @throws IOException if writing to {@code dataOutput} fails
 */
public void write(DataOutput dataOutput) throws IOException {
  // A null probe id is serialized as the empty string.
  Text probe;
  if (wifiProb == null) {
    probe = new Text("");
  } else {
    probe = new Text(wifiProb);
  }
  probe.write(dataOutput);

  new LongWritable(hour).write(dataOutput);
  new LongWritable(newCustomer).write(dataOutput);
  new LongWritable(oldCustomer).write(dataOutput);
}
/**
 * Writes {@code wifiProb} as a {@link Text} (an empty string stands in for
 * {@code null}), then eight counters through {@link IntWritable} and three
 * rates through {@link DoubleWritable}, in the order listed in the body.
 *
 * @param dataOutput destination of the serialized form
 * @throws IOException if writing to {@code dataOutput} fails
 */
public void write(DataOutput dataOutput) throws IOException {
  // A null probe id is serialized as the empty string.
  Text probe;
  if (wifiProb == null) {
    probe = new Text("");
  } else {
    probe = new Text(wifiProb);
  }
  probe.write(dataOutput);

  // Integer counters, in wire order.
  new IntWritable(inNoOutWifi).write(dataOutput);
  new IntWritable(inNoOutStore).write(dataOutput);
  new IntWritable(outNoInWifi).write(dataOutput);
  new IntWritable(outNoInStore).write(dataOutput);
  new IntWritable(inAndOutWifi).write(dataOutput);
  new IntWritable(inAndOutStore).write(dataOutput);
  new IntWritable(stayInWifi).write(dataOutput);
  new IntWritable(stayInStore).write(dataOutput);

  // Floating-point rates, in wire order.
  new DoubleWritable(jumpRate).write(dataOutput);
  new DoubleWritable(deepVisit).write(dataOutput);
  new DoubleWritable(inStoreRate).write(dataOutput);
}
/**
 * Serializes {@code value} by calling its {@code write} method on an
 * {@link ObjectOutputStream} layered over an in-memory buffer, and returns
 * the buffered bytes wrapped in a {@link ByteBuffer}.
 *
 * @param value the text to serialize
 * @return a buffer wrapping everything the object stream produced
 * @throws RuntimeException wrapping any {@link IOException} raised while writing
 */
@Override
public ByteBuffer serialize(Text value) {
  final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  try {
    final ObjectOutputStream objectOut = new ObjectOutputStream(buffer);
    value.write(objectOut);
    // Closing the object stream before the bytes are read back out.
    objectOut.close();
    buffer.close();
  } catch (IOException ioe) {
    throw new RuntimeException(ioe);
  }
  final byte[] payload = buffer.toByteArray();
  return ByteBuffer.wrap(payload);
}
/**
 * Writes this object to {@code out}: a presence byte for {@code graphUri}
 * (0 when absent, 1 followed by the URI as {@link Text} when present), the
 * {@code type} byte, the {@code value} when it is a {@link Text},
 * {@code MarkLogicNode} or {@link BytesWritable}, then the permission count
 * as one byte followed by each permission's role and capability as
 * {@link Text}.
 *
 * @param out destination of the serialized form
 * @throws IOException if writing to {@code out} fails
 */
@Override
public void write(DataOutput out) throws IOException {
  // Graph URI: presence flag, then the URI itself when set.
  if (graphUri != null) {
    out.writeByte(1);
    new Text(graphUri).write(out);
  } else {
    out.writeByte(0);
  }

  out.writeByte(type);

  // Only these three value types are written; any other value (or null)
  // contributes no bytes.
  if (value instanceof Text) {
    ((Text) value).write(out);
  } else if (value instanceof MarkLogicNode) {
    ((MarkLogicNode) value).write(out);
  } else if (value instanceof BytesWritable) {
    ((BytesWritable) value).write(out);
  }

  // Permissions: a null array is serialized as a count of zero.
  // NOTE(review): writeByte keeps only the low eight bits of the count, so
  // this presumably assumes a small number of permissions - confirm.
  final int permissionCount = (permissions == null) ? 0 : permissions.length;
  out.writeByte(permissionCount);
  for (int p = 0; p < permissionCount; p++) {
    Text roleText = new Text(permissions[p].getRole());
    Text capabilityText = new Text(permissions[p].getCapability().toString());
    roleText.write(out);
    capabilityText.write(out);
  }
}
/**
 * Writes the number of entries in {@code values} as an int, then every
 * {@link Text} entry in iteration order.
 *
 * @param dataOutput destination of the serialized form
 * @throws IOException if writing to {@code dataOutput} fails
 */
@Override
public void write(DataOutput dataOutput) throws IOException {
  // Count prefix first, so the entries that follow can be delimited.
  final int count = this.values.size();
  dataOutput.writeInt(count);
  for (Text entry : this.values) {
    entry.write(dataOutput);
  }
}
/**
 * Writes {@code jobId} and {@code taskId} as {@link Text}, then
 * {@code startTime}, {@code endTime} and {@code duration} as longs, and
 * finally delegates to the superclass's {@code write}.
 *
 * @param out destination of the serialized form
 * @throws IOException if writing to {@code out} fails
 */
@Override
public void write(DataOutput out) throws IOException {
  // Identifiers.
  new Text(this.jobId).write(out);
  new Text(this.taskId).write(out);

  // Timing fields.
  out.writeLong(this.startTime);
  out.writeLong(this.endTime);
  out.writeLong(this.duration);

  // The superclass's part comes last.
  super.write(out);
}
/**
 * Serializes {@code keys[i]} as a {@link Text} and returns a fresh
 * {@link DataInputBuffer} positioned over the serialized bytes.
 *
 * @return an input buffer over the serialized form of the current key
 * @throws IOException if serializing the key fails
 */
@Override
public DataInputBuffer getKey() throws IOException {
  // Serialize the current key into a scratch output buffer.
  DataOutputBuffer serialized = new DataOutputBuffer();
  new Text(keys[i]).write(serialized);

  // Expose exactly the written bytes (getLength, not the backing array size).
  DataInputBuffer key = new DataInputBuffer();
  key.reset(serialized.getData(), serialized.getLength());
  return key;
}
/**
 * Writes every field of this object to {@code out} using the matching Hadoop
 * writable type. For each kind of data the array elements are written first
 * (with no length prefix), followed by the single scalar of that kind.
 *
 * @param out destination of the serialized form
 * @throws IOException if writing to {@code out} fails
 */
@Override
public void write(DataOutput out) throws IOException {
  // 0. timestamp
  new Text(tms).write(out);

  // 1. ints: num[], int1, int2
  final IntWritable intBox = new IntWritable();
  for (int element : num) {
    intBox.set(element);
    intBox.write(out);
  }
  intBox.set(int1);
  intBox.write(out);
  intBox.set(int2);
  intBox.write(out);

  // 2. strings: strings[], st1
  final Text textBox = new Text();
  for (int k = 0; k < strings.length; k++) {
    textBox.set(strings[k]);
    textBox.write(out);
  }
  textBox.set(st1);
  textBox.write(out);

  // 3. doubles: dubs[], db
  final DoubleWritable doubleBox = new DoubleWritable();
  for (double element : dubs) {
    doubleBox.set(element);
    doubleBox.write(out);
  }
  doubleBox.set(db);
  doubleBox.write(out);

  // 4. floats: fts[], ft
  final FloatWritable floatBox = new FloatWritable();
  for (float element : fts) {
    floatBox.set(element);
    floatBox.write(out);
  }
  floatBox.set(ft);
  floatBox.write(out);

  // 5. longs: lngs[], lng
  final LongWritable longBox = new LongWritable();
  for (long element : lngs) {
    longBox.set(element);
    longBox.write(out);
  }
  longBox.set(lng);
  longBox.write(out);

  // 6. booleans: bools[], bool
  final BooleanWritable booleanBox = new BooleanWritable();
  for (boolean element : bools) {
    booleanBox.set(element);
    booleanBox.write(out);
  }
  booleanBox.set(bool);
  booleanBox.write(out);

  // 7. shorts: shrts[], shrt
  final ShortWritable shortBox = new ShortWritable();
  for (short element : shrts) {
    shortBox.set(element);
    shortBox.write(out);
  }
  shortBox.set(shrt);
  shortBox.write(out);

  // 8. bytes: size the writable to the payload, then copy the payload in.
  final int byteCount = bts.length;
  final BytesWritable bytesBox = new BytesWritable();
  bytesBox.setCapacity(byteCount);
  bytesBox.setSize(byteCount);
  bytesBox.set(bts, 0, byteCount);
  bytesBox.write(out);
}
/**
 * Samples keys from the job's input splits and writes the partition keys
 * derived from them to {@code partFile}.
 *
 * <p>The number of sampled splits is the smaller of the configured
 * {@code NUM_PARTITIONS} (default 10) and the number of input splits. Up to
 * {@code SAMPLE_SIZE} (default 100,000) keys in total are read, divided
 * evenly between the sampled splits, each split being read by its own daemon
 * thread. The keys are collected by a {@code TextSampler}; the result of its
 * {@code createPartitions} call for the job's reduce-task count is what gets
 * written.
 *
 * <p>The output stream is closed on every path, including when a sampler
 * thread failed. If the calling thread is interrupted while waiting for the
 * samplers, its interrupt status is restored and the
 * {@link InterruptedException} is propagated instead of computing partitions
 * from samplers that are still running.
 *
 * @param job the job to sample
 * @param partFile where to write the output file to
 * @throws Throwable if something goes wrong, including any throwable
 *     recorded by the sampler thread group and interruption of the wait
 */
public static void writePartitionFile(final JobContext job,
    Path partFile) throws Throwable {
  long t1 = System.currentTimeMillis();
  Configuration conf = job.getConfiguration();
  final TeraInputFormat inFormat = new TeraInputFormat();
  final TextSampler sampler = new TextSampler();
  int partitions = job.getNumReduceTasks();
  long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
  final List<InputSplit> splits = inFormat.getSplits(job);
  long t2 = System.currentTimeMillis();
  System.out.println("Computing input splits took " + (t2 - t1) + "ms");
  int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size());
  System.out.println("Sampling " + samples + " splits of " + splits.size());
  final long recordsPerSample = sampleSize / samples;
  final int sampleStep = splits.size() / samples;
  Thread[] samplerReader = new Thread[samples];
  SamplerThreadGroup threadGroup = new SamplerThreadGroup("Sampler Reader Thread Group");
  // take N samples from different parts of the input
  for (int i = 0; i < samples; ++i) {
    final int idx = i;
    samplerReader[i] =
      new Thread(threadGroup, "Sampler Reader " + idx) {
        {
          setDaemon(true);
        }
        @Override
        public void run() {
          long records = 0;
          try {
            TaskAttemptContext context = new TaskAttemptContextImpl(
                job.getConfiguration(), new TaskAttemptID());
            RecordReader<Text, Text> reader =
                inFormat.createRecordReader(splits.get(sampleStep * idx),
                    context);
            reader.initialize(splits.get(sampleStep * idx), context);
            while (reader.nextKeyValue()) {
              // Copy the key: the sampler keeps it beyond this iteration.
              sampler.addKey(new Text(reader.getCurrentKey()));
              records += 1;
              if (recordsPerSample <= records) {
                break;
              }
            }
          } catch (IOException ie) {
            System.err.println("Got an exception while reading splits " +
                StringUtils.stringifyException(ie));
            throw new RuntimeException(ie);
          } catch (InterruptedException e) {
            // Stop sampling this split, but keep the interrupt observable.
            Thread.currentThread().interrupt();
          }
        }
      };
    samplerReader[i].start();
  }
  FileSystem outFs = partFile.getFileSystem(conf);
  DataOutputStream writer = outFs.create(partFile, true, 64*1024, (short) 10,
      outFs.getDefaultBlockSize(partFile));
  boolean written = false;
  try {
    for (int i = 0; i < samples; i++) {
      try {
        samplerReader[i].join();
      } catch (InterruptedException e) {
        // Partitions must not be built while samplers are still adding keys.
        Thread.currentThread().interrupt();
        throw e;
      }
      if (threadGroup.getThrowable() != null) {
        throw threadGroup.getThrowable();
      }
    }
    for (Text split : sampler.createPartitions(partitions)) {
      split.write(writer);
    }
    written = true;
  } finally {
    try {
      writer.close();
    } catch (IOException closeFailure) {
      // Report a close failure only when nothing else went wrong; otherwise
      // let the original failure propagate unmasked.
      if (written) {
        throw closeFailure;
      }
    }
  }
  long t3 = System.currentTimeMillis();
  System.out.println("Computing parititions took " + (t3 - t2) + "ms");
}
/**
 * Samples records from the job's input splits and writes the partition keys
 * derived from them to {@code partFile}.
 *
 * <p>Input is read through a {@code PravegaInputFormat}. The number of
 * sampled splits is the smaller of the configured
 * {@code TeraSortConfigKeys.NUM_PARTITIONS} value and the number of input
 * splits. Up to the configured {@code TeraSortConfigKeys.SAMPLE_SIZE}
 * records in total are read, divided evenly between the sampled splits, each
 * split being read by its own daemon thread. The sample key is the first 10
 * characters of each record value's string form. The keys are collected by a
 * {@code TextSampler}; the result of its {@code createPartitions} call for
 * the job's reduce-task count is what gets written.
 *
 * <p>The output stream is closed on every path, including when a sampler
 * thread failed. If the calling thread is interrupted while waiting for the
 * samplers, its interrupt status is restored and the
 * {@link InterruptedException} is propagated instead of computing partitions
 * from samplers that are still running.
 *
 * @param job the job to sample
 * @param partFile where to write the output file to
 * @throws Throwable if something goes wrong, including any throwable
 *     recorded by the sampler thread group and interruption of the wait
 */
public static void writePartitionFile(final JobContext job,
    Path partFile) throws Throwable {
  long t1 = System.currentTimeMillis();
  Configuration conf = job.getConfiguration();
  //Instead of reading from hdfs, now the input is from Pravega stream
  final PravegaInputFormat inFormat = new PravegaInputFormat();
  final TextSampler sampler = new TextSampler();
  int partitions = job.getNumReduceTasks();
  long sampleSize =
      conf.getLong(TeraSortConfigKeys.SAMPLE_SIZE.key(),
          TeraSortConfigKeys.DEFAULT_SAMPLE_SIZE);
  final List<InputSplit> splits = inFormat.getSplits(job);
  long t2 = System.currentTimeMillis();
  System.out.println("Computing input splits took " + (t2 - t1) + "ms");
  int samples =
      Math.min(conf.getInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
              TeraSortConfigKeys.DEFAULT_NUM_PARTITIONS),
          splits.size());
  System.out.println("Sampling " + samples + " splits of " + splits.size());
  final long recordsPerSample = sampleSize / samples;
  final int sampleStep = splits.size() / samples;
  Thread[] samplerReader = new Thread[samples];
  SamplerThreadGroup threadGroup = new SamplerThreadGroup("Sampler Reader Thread Group");
  // take N samples from different parts of the input
  for (int i = 0; i < samples; ++i) {
    final int idx = i;
    samplerReader[i] =
      new Thread(threadGroup, "Sampler Reader " + idx) {
        {
          setDaemon(true);
        }
        @Override
        public void run() {
          long records = 0;
          try {
            TaskAttemptContext context = new TaskAttemptContextImpl(
                job.getConfiguration(), new TaskAttemptID());
            RecordReader<Text, Text> reader =
                inFormat.createRecordReader(splits.get(sampleStep * idx),
                    context);
            reader.initialize(splits.get(sampleStep * idx), context);
            while (reader.nextKeyValue()) {
              // The sample key is the first 10 characters of the value.
              // NOTE(review): substring(0, 10) assumes every value has at
              // least 10 characters - confirm against the stream's records.
              sampler.addKey(new Text(reader.getCurrentValue().toString().substring(0, 10)));
              records += 1;
              if (recordsPerSample <= records) {
                break;
              }
            }
          } catch (IOException ie) {
            System.err.println("Got an exception while reading splits " +
                StringUtils.stringifyException(ie));
            throw new RuntimeException(ie);
          } catch (InterruptedException e) {
            // Stop sampling this split, but keep the interrupt observable.
            Thread.currentThread().interrupt();
          }
        }
      };
    samplerReader[i].start();
  }
  FileSystem outFs = partFile.getFileSystem(conf);
  DataOutputStream writer = outFs.create(partFile, true, 64*1024, (short) 10,
      outFs.getDefaultBlockSize(partFile));
  boolean written = false;
  try {
    for (int i = 0; i < samples; i++) {
      try {
        samplerReader[i].join();
      } catch (InterruptedException e) {
        // Partitions must not be built while samplers are still adding keys.
        Thread.currentThread().interrupt();
        throw e;
      }
      if (threadGroup.getThrowable() != null) {
        throw threadGroup.getThrowable();
      }
    }
    for (Text split : sampler.createPartitions(partitions)) {
      split.write(writer);
    }
    written = true;
  } finally {
    try {
      writer.close();
    } catch (IOException closeFailure) {
      // Report a close failure only when nothing else went wrong; otherwise
      // let the original failure propagate unmasked.
      if (written) {
        throw closeFailure;
      }
    }
  }
  long t3 = System.currentTimeMillis();
  System.out.println("Computing parititions took " + (t3 - t2) + "ms");
}
/**
 * Samples keys from the job's input splits and writes the partition keys
 * derived from them to {@code partFile}.
 *
 * <p>The number of sampled splits is the smaller of the configured
 * {@code NUM_PARTITIONS} (default 10) and the number of input splits. Up to
 * {@code SAMPLE_SIZE} (default 100,000) keys in total are read, divided
 * evenly between the sampled splits, each split being read by its own daemon
 * thread. The keys are collected by a {@code TextSampler}; the result of its
 * {@code createPartitions} call for the job's reduce-task count is what gets
 * written.
 *
 * <p>The output stream is closed on every path, including when a sampler
 * thread failed. If the calling thread is interrupted while waiting for the
 * samplers, its interrupt status is restored and the
 * {@link InterruptedException} is propagated instead of computing partitions
 * from samplers that are still running.
 *
 * @param job the job to sample
 * @param partFile where to write the output file to
 * @throws Throwable if something goes wrong, including any throwable
 *     recorded by the sampler thread group and interruption of the wait
 */
public static void writePartitionFile(final JobContext job,
    Path partFile) throws Throwable {
  long t1 = System.currentTimeMillis();
  Configuration conf = job.getConfiguration();
  final TeraInputFormat inFormat = new TeraInputFormat();
  final TextSampler sampler = new TextSampler();
  int partitions = job.getNumReduceTasks();
  long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
  final List<InputSplit> splits = inFormat.getSplits(job);
  long t2 = System.currentTimeMillis();
  System.out.println("Computing input splits took " + (t2 - t1) + "ms");
  int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size());
  System.out.println("Sampling " + samples + " splits of " + splits.size());
  final long recordsPerSample = sampleSize / samples;
  final int sampleStep = splits.size() / samples;
  Thread[] samplerReader = new Thread[samples];
  SamplerThreadGroup threadGroup = new SamplerThreadGroup("Sampler Reader Thread Group");
  // take N samples from different parts of the input
  for (int i = 0; i < samples; ++i) {
    final int idx = i;
    samplerReader[i] =
      new Thread(threadGroup, "Sampler Reader " + idx) {
        {
          setDaemon(true);
        }
        @Override
        public void run() {
          long records = 0;
          try {
            TaskAttemptContext context = new TaskAttemptContextImpl(
                job.getConfiguration(), new TaskAttemptID());
            RecordReader<Text, Text> reader =
                inFormat.createRecordReader(splits.get(sampleStep * idx),
                    context);
            reader.initialize(splits.get(sampleStep * idx), context);
            while (reader.nextKeyValue()) {
              // Copy the key: the sampler keeps it beyond this iteration.
              sampler.addKey(new Text(reader.getCurrentKey()));
              records += 1;
              if (recordsPerSample <= records) {
                break;
              }
            }
          } catch (IOException ie) {
            System.err.println("Got an exception while reading splits " +
                StringUtils.stringifyException(ie));
            throw new RuntimeException(ie);
          } catch (InterruptedException e) {
            // Stop sampling this split, but keep the interrupt observable.
            Thread.currentThread().interrupt();
          }
        }
      };
    samplerReader[i].start();
  }
  FileSystem outFs = partFile.getFileSystem(conf);
  DataOutputStream writer = outFs.create(partFile, true, 64*1024, (short) 10,
      outFs.getDefaultBlockSize(partFile));
  boolean written = false;
  try {
    for (int i = 0; i < samples; i++) {
      try {
        samplerReader[i].join();
      } catch (InterruptedException e) {
        // Partitions must not be built while samplers are still adding keys.
        Thread.currentThread().interrupt();
        throw e;
      }
      if (threadGroup.getThrowable() != null) {
        throw threadGroup.getThrowable();
      }
    }
    for (Text split : sampler.createPartitions(partitions)) {
      split.write(writer);
    }
    written = true;
  } finally {
    try {
      writer.close();
    } catch (IOException closeFailure) {
      // Report a close failure only when nothing else went wrong; otherwise
      // let the original failure propagate unmasked.
      if (written) {
        throw closeFailure;
      }
    }
  }
  long t3 = System.currentTimeMillis();
  System.out.println("Computing parititions took " + (t3 - t2) + "ms");
}
/**
 * Samples keys from the job's input splits and writes the partition keys
 * derived from them to {@code partFile}.
 *
 * <p>The number of sampled splits is the smaller of the configured
 * {@code NUM_PARTITIONS} (default 10) and the number of input splits. Up to
 * {@code SAMPLE_SIZE} (default 100,000) keys in total are read, divided
 * evenly between the sampled splits, each split being read by its own daemon
 * thread. The keys are collected by a {@code TextSampler}; the result of its
 * {@code createPartitions} call for the job's reduce-task count is what gets
 * written.
 *
 * <p>The output stream is closed on every path, including when a sampler
 * thread failed. If the calling thread is interrupted while waiting for the
 * samplers, its interrupt status is restored and the
 * {@link InterruptedException} is propagated instead of computing partitions
 * from samplers that are still running.
 *
 * @param job the job to sample
 * @param partFile where to write the output file to
 * @throws Throwable if something goes wrong, including any throwable
 *     recorded by the sampler thread group and interruption of the wait
 */
public static void writePartitionFile(final JobContext job,
    Path partFile) throws Throwable {
  long t1 = System.currentTimeMillis();
  Configuration conf = job.getConfiguration();
  final TeraInputFormat inFormat = new TeraInputFormat();
  final TextSampler sampler = new TextSampler();
  int partitions = job.getNumReduceTasks();
  long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
  final List<InputSplit> splits = inFormat.getSplits(job);
  long t2 = System.currentTimeMillis();
  System.out.println("Computing input splits took " + (t2 - t1) + "ms");
  int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size());
  System.out.println("Sampling " + samples + " splits of " + splits.size());
  final long recordsPerSample = sampleSize / samples;
  final int sampleStep = splits.size() / samples;
  Thread[] samplerReader = new Thread[samples];
  SamplerThreadGroup threadGroup = new SamplerThreadGroup("Sampler Reader Thread Group");
  // take N samples from different parts of the input
  for (int i = 0; i < samples; ++i) {
    final int idx = i;
    samplerReader[i] =
      new Thread(threadGroup, "Sampler Reader " + idx) {
        {
          setDaemon(true);
        }
        @Override
        public void run() {
          long records = 0;
          try {
            TaskAttemptContext context = new TaskAttemptContextImpl(
                job.getConfiguration(), new TaskAttemptID());
            RecordReader<Text, Text> reader =
                inFormat.createRecordReader(splits.get(sampleStep * idx),
                    context);
            reader.initialize(splits.get(sampleStep * idx), context);
            while (reader.nextKeyValue()) {
              // Copy the key: the sampler keeps it beyond this iteration.
              sampler.addKey(new Text(reader.getCurrentKey()));
              records += 1;
              if (recordsPerSample <= records) {
                break;
              }
            }
          } catch (IOException ie) {
            System.err.println("Got an exception while reading splits " +
                StringUtils.stringifyException(ie));
            throw new RuntimeException(ie);
          } catch (InterruptedException e) {
            // Stop sampling this split, but keep the interrupt observable.
            Thread.currentThread().interrupt();
          }
        }
      };
    samplerReader[i].start();
  }
  FileSystem outFs = partFile.getFileSystem(conf);
  DataOutputStream writer = outFs.create(partFile, true, 64*1024, (short) 10,
      outFs.getDefaultBlockSize(partFile));
  boolean written = false;
  try {
    for (int i = 0; i < samples; i++) {
      try {
        samplerReader[i].join();
      } catch (InterruptedException e) {
        // Partitions must not be built while samplers are still adding keys.
        Thread.currentThread().interrupt();
        throw e;
      }
      if (threadGroup.getThrowable() != null) {
        throw threadGroup.getThrowable();
      }
    }
    for (Text split : sampler.createPartitions(partitions)) {
      split.write(writer);
    }
    written = true;
  } finally {
    try {
      writer.close();
    } catch (IOException closeFailure) {
      // Report a close failure only when nothing else went wrong; otherwise
      // let the original failure propagate unmasked.
      if (written) {
        throw closeFailure;
      }
    }
  }
  long t3 = System.currentTimeMillis();
  System.out.println("Computing parititions took " + (t3 - t2) + "ms");
}