The following examples show how to use the org.apache.hadoop.util.bloom.BloomFilter class; you can also click through to GitHub to view the full source code.
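Before the project-specific examples below, here is a minimal, self-contained sketch of the core API; the vector size and hash count are illustrative values, not tuned recommendations.

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomFilterBasics {
    public static void main(String[] args) {
        // 100,000-bit vector, 7 hash functions, MurmurHash
        BloomFilter filter = new BloomFilter(100000, 7, Hash.MURMUR_HASH);
        // Keys wrap raw bytes; the filter only ever sees byte arrays
        filter.add(new Key("alice".getBytes()));
        // membershipTest may return false positives, never false negatives
        System.out.println(filter.membershipTest(new Key("alice".getBytes())));   // true
        System.out.println(filter.membershipTest(new Key("mallory".getBytes()))); // almost certainly false
    }
}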
protected void buildAllPhrases(ArrayList<Collection<String>> terms, String zone, RawRecordContainer event, int position, BloomFilter alreadyIndexedTerms,
TaskInputOutputContext<KEYIN,? extends RawRecordContainer,KEYOUT,VALUEOUT> context, ContextWriter<KEYOUT,VALUEOUT> contextWriter,
StatusReporter reporter) throws IOException, InterruptedException {
if (terms.size() < 2) {
// An empty list has no tokens/phrases to emit and phrases of length one
// were already handled
return;
}
StringBuilder sb = new StringBuilder();
for (String term : terms.get(0)) {
if (term.length() <= tokenHelper.getTermLengthMinimum()) {
continue;
}
sb.append(term);
// Move the position pointer back by the phrase length (terms.size() - 1),
// accounting for zero-indexing
completePhrase(sb, terms.subList(1, terms.size()), zone, event, position - (terms.size() - 1), alreadyIndexedTerms, context, contextWriter,
reporter);
sb.setLength(0);
}
}
@Override
public void add(Key key) {
if (key == null) {
throw new NullPointerException("Key can not be null");
}
org.apache.hadoop.util.bloom.BloomFilter bf = getActiveStandardBF();
if (bf == null) {
addRow();
bf = matrix[matrix.length - 1];
currentNbRecord = 0;
}
bf.add(key);
currentNbRecord++;
}
private void init() throws IOException {
filter = new BloomFilter();
String dir = "./" + getFilenameFromPath(bloomFile);
String[] partFiles = new File(dir)
.list(new FilenameFilter() {
@Override
public boolean accept(File current, String name) {
return name.startsWith("part");
}
});
String dcFile = dir + "/" + partFiles[0];
DataInputStream dis = new DataInputStream(new FileInputStream(dcFile));
try {
filter.readFields(dis);
} finally {
dis.close();
}
}
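The init() method above deserializes a filter that an earlier job wrote to a part file. The writing side is the mirror image of readFields(); a minimal sketch, with the class name and path parameter purely illustrative:

import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.util.bloom.BloomFilter;

public final class BloomFilterIo {
    // Counterpart to init(): persist the filter via the Writable contract
    public static void writeFilter(BloomFilter filter, String path) throws IOException {
        DataOutputStream dos = new DataOutputStream(new FileOutputStream(path));
        try {
            filter.write(dos);
        } finally {
            dos.close();
        }
    }
}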
@Override
public Tuple exec(Tuple input) throws IOException {
if (input == null || input.size() == 0) return null;
// Strip off the initial level of bag
DataBag values = (DataBag)input.get(0);
Iterator<Tuple> it = values.iterator();
Tuple t = it.next();
// If the input tuple has only one field, then we'll extract
// that field and serialize it into a key. If it has multiple
// fields, we'll serialize the whole tuple.
byte[] b;
if (t.size() == 1) b = DataType.toBytes(t.get(0));
else b = DataType.toBytes(t, DataType.TUPLE);
Key k = new Key(b);
filter = new BloomFilter(vSize, numHash, hType);
filter.add(k);
return TupleFactory.getInstance().newTuple(bloomOut());
}
private void completePhrase(StringBuilder baseTerm, List<Collection<String>> terms, String zone, RawRecordContainer event, int position,
BloomFilter alreadyIndexedTerms, TaskInputOutputContext<KEYIN,? extends RawRecordContainer,KEYOUT,VALUEOUT> context,
ContextWriter<KEYOUT,VALUEOUT> contextWriter, StatusReporter reporter) throws IOException, InterruptedException {
if (terms.isEmpty()) {
return;
}
for (String term : terms.get(0)) {
if (term == null) {
continue;
}
boolean properLen = term.length() >= tokenHelper.getTermLengthMinimum();
// Add the current term and emit the phrase if the current term isn't empty
if (properLen) {
baseTerm.append(SPACE).append(term);
counters.increment(ContentIndexCounters.PHRASES_PROCESSED_COUNTER, reporter);
processTermAndZone(event, position, new TermAndZone(baseTerm.toString(), zone), alreadyIndexedTerms, context, contextWriter, reporter);
}
// If we have more terms to add to this phrase, recurse
if (terms.size() > 1) {
completePhrase(baseTerm, terms.subList(1, terms.size()), zone, event, position, alreadyIndexedTerms, context, contextWriter, reporter);
}
// Only remove the space and term if we actually added one
if (properLen) {
// Remove the space and the token we appended last
baseTerm.setLength(baseTerm.length() - 1 - term.length());
}
}
}
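Read together, buildAllPhrases() and completePhrase() enumerate every phrase anchored at the first slot of the window: for term lists [[a], [b], [c]] they emit "a b" and "a b c" (length-one phrases are handled elsewhere, per the comment in buildAllPhrases()), and a slot holding several synonyms fans out into one phrase per synonym, so [[a1, a2], [b]] yields "a1 b" and "a2 b". Trimming the StringBuilder after each append is what lets sibling iterations reuse the shared prefix without seeing each other's suffixes.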
@Nonnull
public static BloomFilter newBloomFilter(@Nonnegative final int expectedNumberOfElements,
@Nonnegative final float errorRate) {
// k = ceil(-log_2(false prob.))
int nbHash = Math.max(2, (int) Math.ceil(-(Math.log(errorRate) / LOG2)));
return newBloomFilter(expectedNumberOfElements, errorRate, nbHash);
}
@Nonnull
public static BloomFilter newBloomFilter(@Nonnegative final int expectedNumberOfElements,
@Nonnegative final float errorRate, @Nonnegative final int nbHash) {
// vector size should be `-kn / (ln(1 - c^(1/k)))` bits for
// single key, where `k` is the number of hash functions,
// `n` is the number of keys and `c` is the desired max error rate.
int vectorSize = (int) Math.ceil((-nbHash * expectedNumberOfElements)
/ Math.log(1.d - Math.pow(errorRate, 1.d / nbHash)));
return new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH);
}
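Plugging concrete numbers into the two formulas: for n = 1,000,000 expected keys at a 1% error rate, k = ceil(-log2(0.01)) = 7 hash functions and a vector of roughly 9.6 million bits (about 1.2 MB). A standalone sketch that reproduces the arithmetic:

public class BloomSizingExample {
    public static void main(String[] args) {
        int n = 1000000;  // expected number of keys
        double c = 0.01;  // desired false-positive rate
        int k = Math.max(2, (int) Math.ceil(-(Math.log(c) / Math.log(2))));
        int m = (int) Math.ceil((-k * n) / Math.log(1.d - Math.pow(c, 1.d / k)));
        System.out.println(k); // 7
        System.out.println(m); // ~9,593,000 bits, i.e. about 1.2 MB
    }
}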
/**
* Constructor.
* <p>
* Builds an empty Dynamic Bloom filter.
*
* @param vectorSize The number of bits in the vector.
* @param nbHash The number of hash functions to consider.
* @param hashType type of the hashing function (see {@link org.apache.hadoop.util.hash.Hash}).
* @param nr The threshold for the maximum number of keys to record in a dynamic Bloom filter row.
* @param maxNr The maximum total number of keys to record before the filter stops adding rows and begins reusing existing ones.
*/
public InternalDynamicBloomFilter(int vectorSize, int nbHash, int hashType, int nr, int maxNr) {
super(vectorSize, nbHash, hashType);
this.nr = nr;
this.currentNbRecord = 0;
this.maxNr = maxNr;
matrix = new org.apache.hadoop.util.bloom.BloomFilter[1];
matrix[0] = new org.apache.hadoop.util.bloom.BloomFilter(this.vectorSize, this.nbHash, this.hashType);
}
@Override
public boolean membershipTest(Key key) {
if (key == null) {
// A null key is treated as a match here rather than an error
return true;
}
for (BloomFilter bloomFilter : matrix) {
if (bloomFilter.membershipTest(key)) {
return true;
}
}
return false;
}
@Override
public String toString() {
StringBuilder res = new StringBuilder();
for (BloomFilter bloomFilter : matrix) {
res.append(bloomFilter);
res.append('\n'); // Character.LINE_SEPARATOR is a Unicode category constant, not a newline
}
return res.toString();
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(nr);
out.writeInt(currentNbRecord);
out.writeInt(matrix.length);
for (BloomFilter bloomFilter : matrix) {
bloomFilter.write(out);
}
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
nr = in.readInt();
currentNbRecord = in.readInt();
int len = in.readInt();
matrix = new org.apache.hadoop.util.bloom.BloomFilter[len];
for (int i = 0; i < matrix.length; i++) {
matrix[i] = new org.apache.hadoop.util.bloom.BloomFilter();
matrix[i].readFields(in);
}
}
/**
* Adds a new row to <i>this</i> dynamic Bloom filter.
*/
private void addRow() {
BloomFilter[] tmp = new BloomFilter[matrix.length + 1];
System.arraycopy(matrix, 0, tmp, 0, matrix.length);
tmp[tmp.length - 1] = new BloomFilter(vectorSize, nbHash, hashType);
matrix = tmp;
}
/**
* Returns the active standard Bloom filter in <i>this</i> dynamic Bloom filter.
*
* @return the active standard Bloom filter, or <code>null</code> if a new row must be added before more keys can be recorded.
*/
private BloomFilter getActiveStandardBF() {
if (reachedMax) {
return matrix[curMatrixIndex++ % matrix.length];
}
if (currentNbRecord >= nr && (matrix.length * nr) < maxNr) {
return null;
} else if (currentNbRecord >= nr && (matrix.length * nr) >= maxNr) {
reachedMax = true;
return matrix[0];
}
return matrix[matrix.length - 1];
}
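Taken together, add(), addRow(), and getActiveStandardBF() grow the filter one row at a time: each row absorbs nr keys, and once matrix.length * nr reaches maxNr the filter stops growing and round-robins further keys into existing rows, trading accuracy for bounded memory. A hypothetical usage sketch, assuming the class and the Key/Hash imports are visible to the caller:

// Illustrative parameters: 1024-bit rows, 3 hashes, 100 keys per row, 1,000 keys overall
InternalDynamicBloomFilter dbf =
        new InternalDynamicBloomFilter(1024, 3, Hash.MURMUR_HASH, 100, 1000);
for (int i = 0; i < 250; i++) {
    dbf.add(new Key(("user-" + i).getBytes())); // a new row is added after every 100 keys
}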
public static BloomFilter readFromAvro(InputStream is) throws IOException {
DataFileStream<Object> reader =
new DataFileStream<Object>(
is, new GenericDatumReader<Object>());
if (!reader.hasNext()) {
throw new IOException("Avro stream contains no Bloom filter record");
}
BloomFilter filter = new BloomFilter();
AvroBytesRecord
.fromGenericRecord((GenericRecord) reader.next(), filter);
IOUtils.closeQuietly(is);
IOUtils.closeQuietly(reader);
return filter;
}
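readFromAvro() consumes a single-record Avro container file, so the producing side presumably mirrors it; a sketch, where AvroBytesRecord.toGenericRecord() is an assumed counterpart to the fromGenericRecord() call above:

public static void writeToAvro(BloomFilter filter, OutputStream os) throws IOException {
    DataFileWriter<GenericRecord> writer =
            new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>());
    writer.create(AvroBytesRecord.SCHEMA, os);
    // Assumed helper that packs the filter's Writable bytes into a GenericRecord
    writer.append(AvroBytesRecord.toGenericRecord(filter));
    writer.close();
}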
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));
Configuration conf = super.getConf();
JobConf job = new JobConf(conf);
job.setJarByClass(BloomFilterCreator.class);
job.set(AvroJob.OUTPUT_SCHEMA, AvroBytesRecord.SCHEMA.toString());
job.set(AvroJob.OUTPUT_CODEC, SnappyCodec.class.getName());
job.setInputFormat(KeyValueTextInputFormat.class);
job.setOutputFormat(AvroOutputFormat.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(BloomFilter.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(BloomFilter.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
return JobClient.runJob(job).isSuccessful() ? 0 : 1;
}
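Since run() follows the Tool contract, the job is normally launched through ToolRunner; a standard sketch, assuming BloomFilterCreator extends Configured and implements Tool:

public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new BloomFilterCreator(), args);
    System.exit(exitCode);
}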
@Override
public void map(Text key, Text value,
OutputCollector<NullWritable, BloomFilter> output,
Reporter reporter) throws IOException {
System.out.println("K[" + key + "]");
int age = Integer.valueOf(value.toString());
if (age > 30) {
filter.add(new Key(key.toString().getBytes()));
}
collector = output;
}
@Override
public void reduce(NullWritable key, Iterator<BloomFilter> values,
OutputCollector<AvroWrapper<GenericRecord>,
NullWritable> output,
Reporter reporter) throws IOException {
while (values.hasNext()) {
BloomFilter bf = values.next();
filter.or(bf);
System.out.println(filter);
}
collector = output;
}
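Note that neither map() nor reduce() emits anything directly; each only stashes the OutputCollector. With the old mapred API the accumulated filter is presumably flushed once in close(); a sketch of both sides (toGenericRecord() is again an assumed AvroBytesRecord helper):

// Mapper: emit the locally accumulated filter once, after the last record
@Override
public void close() throws IOException {
    collector.collect(NullWritable.get(), filter);
}

// Reducer: wrap the merged filter for the Avro output format
@Override
public void close() throws IOException {
    collector.collect(
            new AvroWrapper<GenericRecord>(AvroBytesRecord.toGenericRecord(filter)),
            NullWritable.get());
}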
/**
* For testing only, do not use directly.
*/
public void setFilter(DataByteArray dba) throws IOException {
DataInputStream dis = new DataInputStream(new
ByteArrayInputStream(dba.get()));
filter = new BloomFilter();
filter.readFields(dis);
}
protected DataByteArray bloomOr(Tuple input) throws IOException {
filter = new BloomFilter(vSize, numHash, hType);
try {
DataBag values = (DataBag)input.get(0);
for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
Tuple t = it.next();
filter.or(bloomIn((DataByteArray)t.get(0)));
}
} catch (ExecException ee) {
throw new IOException(ee);
}
return bloomOut();
}
protected BloomFilter bloomIn(DataByteArray b) throws IOException {
DataInputStream dis = new DataInputStream(new
ByteArrayInputStream(b.get()));
BloomFilter f = new BloomFilter();
f.readFields(dis);
return f;
}
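bloomIn() rehydrates a filter from a Pig DataByteArray; its counterpart bloomOut(), called from exec() and bloomOr() above but not shown here, presumably serializes the current filter back out along these lines:

// Assumed counterpart to bloomIn(): pack the filter's bytes into a DataByteArray
protected DataByteArray bloomOut() throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    filter.write(new DataOutputStream(baos));
    return new DataByteArray(baos.toByteArray());
}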
/**
* Process a term and zone by writing all applicable keys to the context.
*
* @param event
* @param position
* @param termAndZone
* @param alreadyIndexedTerms
* @param context
* @param contextWriter
* @param reporter
* @throws IOException
* @throws InterruptedException
*/
private void processTermAndZone(RawRecordContainer event, int position, TermAndZone termAndZone, BloomFilter alreadyIndexedTerms,
TaskInputOutputContext<KEYIN,? extends RawRecordContainer,KEYOUT,VALUEOUT> context, ContextWriter<KEYOUT,VALUEOUT> contextWriter,
StatusReporter reporter) throws IOException, InterruptedException {
// Make sure the term length is greater than the minimum allowed length
if (termAndZone.term.length() < tokenHelper.getTermLengthMinimum()) {
log.debug("Ignoring token of length " + termAndZone.term.length() + " because it is too short");
counters.increment(ContentIndexCounters.TOO_SHORT_COUNTER, reporter);
return;
}
// Track all tokens (including synonyms) processed
counters.increment(ContentIndexCounters.ALL_PROCESSED_COUNTER, reporter);
// Normalize the term since it won't be auto-normalized through the eventFields map
NormalizedFieldAndValue normFnV = new NormalizedFieldAndValue(termAndZone.zone, termAndZone.term);
Set<NormalizedContentInterface> ncis = this.ingestHelper.normalize(normFnV);
// nfv = (NormalizedFieldAndValue) this.ingestHelper.normalize(nfv);
for (NormalizedContentInterface nci : ncis) {
if (!(nci instanceof NormalizedFieldAndValue)) {
log.warn("Can't handle a " + nci.getClass() + "; must be a NormalizedFieldAndValue.");
}
NormalizedFieldAndValue nfv = (NormalizedFieldAndValue) nci;
byte[] fieldVisibility = getVisibility(event, nfv);
// Build the event column key/value
createShardEventColumn(event, contextWriter, context, nfv, this.shardId, fieldVisibility);
// Create an index-normalized variant of the term and zone for indexing purposes
TermAndZone indexedTermAndZone = new TermAndZone(nfv.getIndexedFieldValue(), nfv.getIndexedFieldName());
org.apache.hadoop.util.bloom.Key alreadySeen = null;
if ((alreadyIndexedTerms != null)
&& alreadyIndexedTerms.membershipTest(alreadySeen = new org.apache.hadoop.util.bloom.Key(indexedTermAndZone.getToken().getBytes()))) {
if (log.isDebugEnabled()) {
log.debug("Not creating index mutations for " + termAndZone + " as we've already created mutations for it.");
}
counters.increment(ContentIndexCounters.BLOOM_FILTER_EXISTS, reporter);
} else if ((tokenOffsetCache != null) && tokenOffsetCache.containsKey(indexedTermAndZone)) {
if (log.isDebugEnabled()) {
log.debug("Not creating index mutations for " + termAndZone + " as we've already created mutations for it.");
}
counters.increment(ContentIndexCounters.TOKEN_OFFSET_CACHE_EXISTS, reporter);
} else {
// create the index
createShardIndexColumns(event, contextWriter, context, nfv, this.shardId, fieldVisibility);
if (alreadyIndexedTerms != null) {
alreadyIndexedTerms.add(alreadySeen);
counters.increment(ContentIndexCounters.BLOOM_FILTER_ADDED, reporter);
}
}
// Now add the offset to the token offset queue, and if we overflow then output the overflow
if (tokenOffsetCache != null) {
OffsetList<Integer> overflow = tokenOffsetCache.addOffset(indexedTermAndZone, position);
if (overflow != null) {
// no need to normalize as that was already done upon insertion into the token offset cache
NormalizedFieldAndValue overflowNfv = new NormalizedFieldAndValue(overflow.termAndZone.zone, overflow.termAndZone.term);
byte[] overflowFieldVisibility = getVisibility(event, overflowNfv);
// Build the field index key/value
createTermFrequencyIndex(event, contextWriter, context, this.shardId, overflowNfv, overflow.offsets, overflowFieldVisibility,
this.ingestHelper.getDeleteMode());
counters.increment(ContentIndexCounters.TOKENIZER_OFFSET_CACHE_OVERFLOWS, reporter);
counters.incrementValue(ContentIndexCounters.TOKENIZER_OFFSET_CACHE_POSITIONS_OVERFLOWED, overflow.offsets.size(), reporter);
}
} else {
createTermFrequencyIndex(event, contextWriter, context, this.shardId, nfv, Arrays.asList(position), fieldVisibility,
this.ingestHelper.getDeleteMode());
}
}
}
/**
* Process a term and zone by writing all applicable keys to the context.
*
* @param event
* @param position
* @param term
* @param alreadyIndexedTerms
* @param context
* @param contextWriter
* @param fieldName
* @param fieldNameToken
* @param reporter
* @throws IOException
* @throws InterruptedException
*/
protected void processTerm(RawRecordContainer event, int position, String term, BloomFilter alreadyIndexedTerms,
TaskInputOutputContext<KEYIN,? extends RawRecordContainer,KEYOUT,VALUEOUT> context, ContextWriter<KEYOUT,VALUEOUT> contextWriter,
String fieldName, String fieldNameToken, StatusReporter reporter) throws IOException, InterruptedException {
// Track all tokens (including synonyms) processed
if (context != null) {
counters.increment(ContentIndexCounters.ALL_PROCESSED_COUNTER, reporter);
}
// Normalize the term since it won't be auto-normalized through the eventFields map
NormalizedFieldAndValue normFnV = new NormalizedFieldAndValue(fieldNameToken, term);
Set<NormalizedContentInterface> ncis = this.ingestHelper.normalize(normFnV);
for (NormalizedContentInterface nci : ncis) {
if (!(nci instanceof NormalizedFieldAndValue)) {
log.warn("Can't handle a " + nci.getClass() + "; must be a NormalizedFieldAndValue.");
}
NormalizedFieldAndValue nfv = (NormalizedFieldAndValue) nci;
byte[] fieldVisibility = getVisibility(event, nfv);
// Build the event column key/value
createShardEventColumn(event, contextWriter, context, nfv, this.shardId, fieldVisibility);
// Create an index-normalized variant of the term and zone for indexing purposes
TermAndZone indexedTermAndZone = new TermAndZone(nfv.getIndexedFieldValue(), nfv.getIndexedFieldName());
if ((tokenOffsetCache != null) && tokenOffsetCache.containsKey(indexedTermAndZone)) {
if (log.isDebugEnabled()) {
log.debug("Not creating index mutations for " + term + " as we've already created mutations for it.");
}
counters.increment(ContentIndexCounters.TOKEN_OFFSET_CACHE_EXISTS, reporter);
} else {
// create the index
createShardIndexColumns(event, contextWriter, context, nfv, this.shardId, fieldVisibility);
}
// Now add the offset to the token offset queue, and if we overflow then output the overflow
if (tokenOffsetCache != null) {
OffsetList overflow = tokenOffsetCache.addOffset(indexedTermAndZone, position);
if (overflow != null) {
// no need to normalize as that was already done upon insertion into the token offset cache
NormalizedFieldAndValue overflowNfv = new NormalizedFieldAndValue(overflow.termAndZone.zone, overflow.termAndZone.term);
byte[] overflowFieldVisibility = getVisibility(event, overflowNfv);
// Build the field index key/value
createTermFrequencyIndex(event, contextWriter, context, this.shardId, overflowNfv, overflow.offsets, overflowFieldVisibility, false);
counters.increment(ContentIndexCounters.TOKENIZER_OFFSET_CACHE_OVERFLOWS, reporter);
counters.incrementValue(ContentIndexCounters.TOKENIZER_OFFSET_CACHE_POSITIONS_OVERFLOWED, overflow.offsets.size(), reporter);
}
} else {
createTermFrequencyIndex(event, contextWriter, context, this.shardId, nfv, Arrays.asList(position), fieldVisibility, false);
}
}
}
@Nonnull
public static BloomFilter newBloomFilter(@Nonnegative final int expectedNumberOfElements) {
return newBloomFilter(expectedNumberOfElements, DEFAULT_ERROR_RATE);
}
@Override
public void not() {
for (BloomFilter bloomFilter : matrix) {
bloomFilter.not();
}
}
public static BloomFilter fromFile(File f) throws IOException {
return readFromAvro(FileUtils.openInputStream(f));
}
public static BloomFilter fromPath(Configuration config, Path path) throws IOException {
FileSystem hdfs = path.getFileSystem(config);
return readFromAvro(hdfs.open(path));
}
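A typical consumer loads the published filter once, e.g. in a mapper's setup, and probes it to skip expensive work; a sketch with an illustrative HDFS path:

Configuration conf = new Configuration();
BloomFilter bf = fromPath(conf, new Path("/data/bloom/part-00000.avro"));
if (bf.membershipTest(new Key("alice".getBytes()))) {
    // possibly present: proceed with the expensive lookup
}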
@Override
protected void reduce(NullWritable key, Iterable<BloomFilter> values, Context context) throws IOException, InterruptedException {
for (BloomFilter bf : values) {
filter.or(bf);
}
}