The following examples show how to use the com.mongodb.client.gridfs.GridFSBucket API class; you can also follow the links to view the full source code on GitHub.
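The snippets below come from different projects and assume an already-configured bucket, typically obtained through a helper such as createGridFSConnection(). As a minimal sketch of that setup with the synchronous driver (the connection string, database name, and bucket name here are placeholders, not values from the original sources):

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoDatabase;
    import com.mongodb.client.gridfs.GridFSBucket;
    import com.mongodb.client.gridfs.GridFSBuckets;

    // Connect and pick the database that will back the bucket.
    MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
    MongoDatabase db = mongoClient.getDatabase("exampleDb");

    // A named bucket; GridFSBuckets.create(db) would use the default "fs" bucket.
    GridFSBucket gridFS = GridFSBuckets.create(db, "associatedFiles");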
// Looks up an associated file by its computed GridFS id and returns its content
// as a stream, wrapping it in an InflaterInputStream when the metadata marks it
// as compressed.
@Override
public InputStream getAssociatedDocumentStream(String uniqueId, String fileName) {
    GridFSBucket gridFS = createGridFSConnection();
    GridFSFile file = gridFS.find(new Document(ASSOCIATED_METADATA + "." + FILE_UNIQUE_ID_KEY, getGridFsId(uniqueId, fileName))).first();
    if (file == null) {
        return null;
    }

    InputStream is = gridFS.openDownloadStream(file.getObjectId());

    Document metadata = file.getMetadata();
    if (metadata.containsKey(COMPRESSED_FLAG)) {
        boolean compressed = (boolean) metadata.remove(COMPRESSED_FLAG);
        if (compressed) {
            is = new InflaterInputStream(is);
        }
    }

    return is;
}
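A caller would typically drain the returned stream into memory or onto disk. A sketch of hypothetical caller code (store is a placeholder for whatever object exposes the method above; InputStream.transferTo requires Java 9+):

    // Hypothetical caller: copy the associated file's content into a byte array.
    try (InputStream in = store.getAssociatedDocumentStream("doc-1", "report.pdf")) {
        if (in != null) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            in.transferTo(buffer);
            byte[] content = buffer.toByteArray();
        }
    }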
// Returns true when a file with the given name exists in the bucket and its
// metadata matches the expected attributes exactly (same size, same values).
public boolean fileHasProperties(String name, String bucketName, Map<String, String> attrs) {
    GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
    MongoCursor<GridFSFile> it = bucket.find(Document.parse(String.format("{ \"filename\": \"%s\" }", name))).iterator();
    boolean retVal = false;
    if (it.hasNext()) {
        GridFSFile file = it.next();
        Document metadata = file.getMetadata();
        if (metadata != null && metadata.size() == attrs.size()) {
            retVal = true;
            for (Map.Entry<String, Object> entry : metadata.entrySet()) {
                Object val = attrs.get(entry.getKey());
                if (val == null || !entry.getValue().equals(val)) {
                    retVal = false;
                    break;
                }
            }
        }
    }
    it.close();
    return retVal;
}
// Drops the whole GridFS bucket, then clears the companion raw collection.
@Override
public void deleteAllDocuments() {
    GridFSBucket gridFS = createGridFSConnection();
    gridFS.drop();

    MongoDatabase db = mongoClient.getDatabase(database);
    MongoCollection<Document> coll = db.getCollection(rawCollectionName);
    coll.deleteMany(new Document());
}
// Fetches every associated document stored for the given unique id, honoring
// the requested fetch type (NONE skips the lookup entirely).
@Override
public List<AssociatedDocument> getAssociatedDocuments(String uniqueId, FetchType fetchType) throws Exception {
    GridFSBucket gridFS = createGridFSConnection();
    List<AssociatedDocument> assocDocs = new ArrayList<>();
    if (!FetchType.NONE.equals(fetchType)) {
        GridFSFindIterable files = gridFS.find(new Document(ASSOCIATED_METADATA + "." + DOCUMENT_UNIQUE_ID_KEY, uniqueId));
        for (GridFSFile file : files) {
            AssociatedDocument ad = loadGridFSToAssociatedDocument(gridFS, file, fetchType);
            assocDocs.add(ad);
        }
    }
    return assocDocs;
}
// Fetches a single associated document addressed by unique id and file name,
// or returns null when it does not exist or fetching was disabled.
@Override
public AssociatedDocument getAssociatedDocument(String uniqueId, String fileName, FetchType fetchType) throws Exception {
    GridFSBucket gridFS = createGridFSConnection();
    if (!FetchType.NONE.equals(fetchType)) {
        GridFSFile file = gridFS.find(new Document(ASSOCIATED_METADATA + "." + FILE_UNIQUE_ID_KEY, getGridFsId(uniqueId, fileName))).first();
        if (null != file) {
            return loadGridFSToAssociatedDocument(gridFS, file, fetchType);
        }
    }
    return null;
}
// Converts a GridFSFile plus its metadata into an AssociatedDocument,
// downloading and, if necessary, uncompressing the content for FetchType.FULL.
private AssociatedDocument loadGridFSToAssociatedDocument(GridFSBucket gridFS, GridFSFile file, FetchType fetchType) throws IOException {
    AssociatedDocument.Builder aBuilder = AssociatedDocument.newBuilder();
    aBuilder.setFilename(file.getFilename());

    Document metadata = file.getMetadata();
    boolean compressed = false;
    if (metadata.containsKey(COMPRESSED_FLAG)) {
        compressed = (boolean) metadata.remove(COMPRESSED_FLAG);
    }
    long timestamp = (long) metadata.remove(TIMESTAMP);
    aBuilder.setCompressed(compressed);
    aBuilder.setTimestamp(timestamp);
    aBuilder.setDocumentUniqueId((String) metadata.remove(DOCUMENT_UNIQUE_ID_KEY));

    // Whatever keys remain after the bookkeeping fields are removed are
    // user-visible metadata entries.
    for (String field : metadata.keySet()) {
        aBuilder.addMetadata(Metadata.newBuilder().setKey(field).setValue((String) metadata.get(field)));
    }

    if (FetchType.FULL.equals(fetchType)) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        gridFS.downloadToStream(file.getObjectId(), byteArrayOutputStream);
        byte[] bytes = byteArrayOutputStream.toByteArray();
        if (compressed) {
            bytes = CommonCompression.uncompressZlib(bytes);
        }
        aBuilder.setDocument(ByteString.copyFrom(bytes));
    }

    aBuilder.setIndexName(indexName);
    return aBuilder.build();
}
// Lists the file names of all documents associated with the given unique id.
@Override
public List<String> getAssociatedFilenames(String uniqueId) throws Exception {
    GridFSBucket gridFS = createGridFSConnection();
    ArrayList<String> fileNames = new ArrayList<>();
    gridFS.find(new Document(ASSOCIATED_METADATA + "." + DOCUMENT_UNIQUE_ID_KEY, uniqueId))
            .forEach((Consumer<com.mongodb.client.gridfs.model.GridFSFile>) gridFSFile -> fileNames.add(gridFSFile.getFilename()));
    return fileNames;
}
// Deletes the single associated file addressed by unique id and file name.
@Override
public void deleteAssociatedDocument(String uniqueId, String fileName) {
    GridFSBucket gridFS = createGridFSConnection();
    gridFS.find(new Document(ASSOCIATED_METADATA + "." + FILE_UNIQUE_ID_KEY, getGridFsId(uniqueId, fileName)))
            .forEach((Block<com.mongodb.client.gridfs.model.GridFSFile>) gridFSFile -> gridFS.delete(gridFSFile.getObjectId()));
}
// Resolves the GridFS bucket for a flow file, falling back to the driver's
// default bucket name when none is configured.
protected GridFSBucket getBucket(FlowFile input, ProcessContext context) {
    final String name = getBucketName(input, context);
    if (StringUtils.isEmpty(name)) {
        return GridFSBuckets.create(getDatabase(input, context));
    } else {
        return GridFSBuckets.create(getDatabase(input, context), name);
    }
}
// NiFi processor trigger: parses the configured delete query, removes the
// first matching file from the bucket, and routes the flow file accordingly.
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    FlowFile input = session.get();
    if (input == null) {
        return;
    }

    final String deleteQuery = getQuery(context, input);
    final String queryAttribute = context.getProperty(QUERY_ATTRIBUTE).isSet()
            ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
            : null;
    GridFSBucket bucket = getBucket(input, context);

    try {
        Document query = Document.parse(deleteQuery);
        MongoCursor<GridFSFile> cursor = bucket.find(query).iterator();
        if (cursor.hasNext()) {
            GridFSFile file = cursor.next();
            bucket.delete(file.getObjectId());

            if (!StringUtils.isEmpty(queryAttribute)) {
                input = session.putAttribute(input, queryAttribute, deleteQuery);
            }
            session.transfer(input, REL_SUCCESS);
        } else {
            getLogger().error(String.format("Query %s did not delete anything in %s", deleteQuery, bucket.getBucketName()));
            session.transfer(input, REL_FAILURE);
        }
        cursor.close();
    } catch (Exception ex) {
        getLogger().error(String.format("Error deleting using query: %s", deleteQuery), ex);
        session.transfer(input, REL_FAILURE);
    }
}
// Copies one GridFS file into a new flow file, carrying the file's metadata,
// the originating query, and the file name along as attributes.
private void handleFile(GridFSBucket bucket, ProcessSession session, ProcessContext context, FlowFile parent, GridFSFile input, String query) {
    Map<String, String> attrs = new HashMap<>();
    attrs.put(METADATA_ATTRIBUTE, input.getMetadata() != null ? input.getMetadata().toJson() : "{}");
    if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
        String key = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(parent).getValue();
        attrs.put(key, query);
    }
    attrs.put(CoreAttributes.FILENAME.key(), input.getFilename());

    FlowFile output = parent != null ? session.create(parent) : session.create();
    output = session.write(output, out -> bucket.downloadToStream(input.getObjectId(), out));
    output = session.putAllAttributes(output, attrs);
    session.transfer(output, REL_SUCCESS);
    session.getProvenanceReporter().receive(output, getTransitUri(input.getObjectId(), output, context));
}
// NiFi processor trigger: streams the flow file's content into GridFS with
// the configured chunk size and metadata, enforcing the uniqueness policy.
// The try-with-resources closes the input stream, so no explicit close is needed.
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    FlowFile input = session.get();
    if (input == null) {
        return;
    }

    GridFSBucket bucket = getBucket(input, context);
    if (!canUploadFile(context, input, bucket.getBucketName())) {
        getLogger().error("Cannot upload the file because of the uniqueness policy configured.");
        session.transfer(input, REL_DUPLICATE);
        return;
    }

    final int chunkSize = context.getProperty(CHUNK_SIZE).evaluateAttributeExpressions(input).asDataSize(DataUnit.B).intValue();
    try (InputStream fileInput = session.read(input)) {
        String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue();
        GridFSUploadOptions options = new GridFSUploadOptions()
                .chunkSizeBytes(chunkSize)
                .metadata(getMetadata(input, context));
        ObjectId id = bucket.uploadFromStream(fileName, fileInput, options);
        if (id != null) {
            input = session.putAttribute(input, ID_ATTRIBUTE, id.toString());
            session.transfer(input, REL_SUCCESS);
            session.getProvenanceReporter().send(input, getTransitUri(id, input, context));
        } else {
            getLogger().error("ID was null, assuming failure.");
            session.transfer(input, REL_FAILURE);
        }
    } catch (Exception ex) {
        getLogger().error("Failed to upload file", ex);
        session.transfer(input, REL_FAILURE);
    }
}
// Returns true when the bucket contains at least one file with the given name.
public boolean fileExists(String name, String bucketName) {
    GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
    MongoCursor<GridFSFile> it = bucket.find(Document.parse(String.format("{ \"filename\": \"%s\" }", name))).iterator();
    boolean retVal = it.hasNext();
    it.close();
    return retVal;
}
// Test helper: uploads string content as a GridFS file with the given
// metadata attributes and returns the new file's ObjectId.
public ObjectId writeTestFile(String fileName, String content, String bucketName, Map<String, Object> attrs) {
    GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
    GridFSUploadOptions options = new GridFSUploadOptions().metadata(new Document(attrs));
    ByteArrayInputStream input = new ByteArrayInputStream(content.getBytes());
    return bucket.uploadFromStream(fileName, input, options);
}
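These test helpers compose naturally into a write-then-verify round trip. A sketch, assuming they sit on one test fixture with client and DB already configured (the bucket name and metadata values here are placeholders):

    // Upload a small file with one metadata attribute, then verify it.
    Map<String, Object> attrs = new HashMap<>();
    attrs.put("owner", "tests");

    ObjectId id = writeTestFile("hello.txt", "hello gridfs", "test_bucket", attrs);

    assert fileExists("hello.txt", "test_bucket");
    assert fileHasProperties("hello.txt", "test_bucket",
            Collections.singletonMap("owner", "tests"));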
// Opens the GridFS bucket that backs all associated-file operations.
private GridFSBucket createGridFSConnection() {
    MongoDatabase db = mongoClient.getDatabase(database);
    return GridFSBuckets.create(db, ASSOCIATED_FILES);
}
// Streams all matching associated files to the output as a hand-built JSON
// document, emitting one entry per file plus any remaining custom metadata.
public void getAssociatedDocuments(OutputStream outputstream, Document filter) throws IOException {
    Charset charset = StandardCharsets.UTF_8;
    GridFSBucket gridFS = createGridFSConnection();
    GridFSFindIterable gridFSFiles = gridFS.find(filter);
    outputstream.write("{\n".getBytes(charset));
    outputstream.write(" \"associatedDocs\": [\n".getBytes(charset));

    boolean first = true;
    for (GridFSFile gridFSFile : gridFSFiles) {
        if (first) {
            first = false;
        } else {
            outputstream.write(",\n".getBytes(charset));
        }

        Document metadata = gridFSFile.getMetadata();

        String uniqueId = metadata.getString(DOCUMENT_UNIQUE_ID_KEY);
        String uniqueIdKeyValue = " { \"uniqueId\": \"" + uniqueId + "\", ";
        outputstream.write(uniqueIdKeyValue.getBytes(charset));

        String filename = gridFSFile.getFilename();
        String filenameKeyValue = "\"filename\": \"" + filename + "\", ";
        outputstream.write(filenameKeyValue.getBytes(charset));

        Date uploadDate = gridFSFile.getUploadDate();
        String uploadDateKeyValue = "\"uploadDate\": {\"$date\":" + uploadDate.getTime() + "}";
        outputstream.write(uploadDateKeyValue.getBytes(charset));

        // Internal bookkeeping keys are stripped before the remaining
        // metadata is serialized under "meta".
        metadata.remove(TIMESTAMP);
        metadata.remove(COMPRESSED_FLAG);
        metadata.remove(DOCUMENT_UNIQUE_ID_KEY);
        metadata.remove(FILE_UNIQUE_ID_KEY);
        if (!metadata.isEmpty()) {
            String metaJson = metadata.toJson();
            String metaString = ", \"meta\": " + metaJson;
            outputstream.write(metaString.getBytes(charset));
        }

        outputstream.write(" }".getBytes(charset));
    }

    outputstream.write("\n ]\n}".getBytes(charset));
}
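For a single matching file, the emitted JSON looks roughly like this (illustrative values; the "meta" object only appears when custom metadata keys remain after the bookkeeping fields are removed):

    {
     "associatedDocs": [
      { "uniqueId": "doc-1", "filename": "report.pdf", "uploadDate": {"$date":1577836800000}, "meta": { "owner": "tests" } }
     ]
    }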
// Deletes every associated file stored under the given unique id.
@Override
public void deleteAssociatedDocuments(String uniqueId) {
    GridFSBucket gridFS = createGridFSConnection();
    gridFS.find(new Document(ASSOCIATED_METADATA + "." + DOCUMENT_UNIQUE_ID_KEY, uniqueId))
            .forEach((Block<com.mongodb.client.gridfs.model.GridFSFile>) gridFSFile -> gridFS.delete(gridFSFile.getObjectId()));
}