下面列出了 com.mongodb.client.gridfs.model.GridFSFile API 类的实例代码及用法示例,也可以点击链接跳转到 GitHub 查看完整源代码。
/**
 * Retrieves an artifact from the store by its SHA1 hash.
 *
 * @param tenant
 *            the tenant the artifact belongs to.
 * @param sha1Hash
 *            the sha1-hash of the file to lookup.
 *
 * @return The DbArtifact object or {@code null} if no file exists.
 */
@Override
public AbstractDbArtifact getArtifactBySha1(final String tenant, final String sha1Hash) {
    try {
        final Query tenantQuery = new Query().addCriteria(
                Criteria.where(FILENAME).is(sha1Hash).and(TENANT_QUERY).is(sanitizeTenant(tenant)));
        GridFSFile match = gridFs.findOne(tenantQuery);
        if (match == null) {
            // fallback pre-multi-tenancy: legacy files were stored without a tenant field
            final Query legacyQuery = new Query().addCriteria(
                    Criteria.where(FILENAME).is(sha1Hash).and(TENANT_QUERY).exists(false));
            match = gridFs.findOne(legacyQuery);
        }
        return createGridFsArtifact(match);
    } catch (final MongoClientException e) {
        throw new ArtifactStoreException(e.getMessage(), e);
    }
}
@SuppressWarnings("squid:S2589")
// False positive: file.getMetadata() can return null
private static final String getContentType(final GridFSFile file) {
    // Prefer the content type stored in the custom metadata document.
    final Document metadata = file.getMetadata();
    if (metadata != null) {
        final String fromMetadata = metadata.getString(CONTENT_TYPE);
        if (fromMetadata != null) {
            return fromMetadata;
        }
    }
    // Fall back to the top-level GridFS content type field.
    try {
        return file.getContentType();
    } catch (final MongoGridFSException e) {
        throw new ArtifactStoreException("Could not determine content type for file " + file.getId(), e);
    }
}
/**
 * Loads the stored metadata for an attachment, or returns {@code null} when
 * no metadata file exists for the given id. Mongo failures surface as
 * {@link IOException}.
 */
@Override
public AttachmentMetadata getMetadata(AttachmentId attachmentId) throws IOException {
    final GridFSFile metadataFile = getMetadataGridFSFile(attachmentId);
    if (metadataFile == null) {
        return null;
    }
    try (GridFSDownloadStream stream = metadataGrid.openDownloadStream(metadataFile.getObjectId())) {
        if (stream == null) {
            return null;
        }
        // Metadata is persisted as a serialized protobuf message.
        AttachmentProto.AttachmentMetadata protoMetadata = AttachmentProto.AttachmentMetadata.parseFrom(stream);
        return new AttachmentMetadataProtoImpl(protoMetadata);
    } catch (MongoException e) {
        throw new IOException(e);
    }
}
/**
 * Wraps a GridFS file in an {@link AttachmentData} view; returns
 * {@code null} when no file was supplied.
 */
private AttachmentData fileToAttachmentData(final GridFSFile attachmentFile) {
    if (attachmentFile == null) {
        return null;
    }
    return new AttachmentData() {
        @Override
        public InputStream getInputStream() throws IOException {
            // Content is streamed lazily, straight from GridFS.
            return attachmentGrid.openDownloadStream(attachmentFile.getObjectId());
        }

        @Override
        public long getSize() {
            return attachmentFile.getLength();
        }
    };
}
/**
 * Opens a stream over an associated document's content, transparently
 * inflating it when the upload was zlib-compressed.
 *
 * @param uniqueId the owning document's unique id.
 * @param fileName the associated file's name.
 * @return the content stream, or {@code null} if no such file exists.
 */
@Override
public InputStream getAssociatedDocumentStream(String uniqueId, String fileName) {
    GridFSBucket gridFS = createGridFSConnection();
    GridFSFile file = gridFS.find(new Document(ASSOCIATED_METADATA + "." + FILE_UNIQUE_ID_KEY, getGridFsId(uniqueId, fileName))).first();
    if (file == null) {
        return null;
    }
    InputStream is = gridFS.openDownloadStream(file.getObjectId());
    Document metadata = file.getMetadata();
    // getMetadata() can return null for files stored without metadata;
    // the original dereferenced it unconditionally (NPE risk) and also
    // contained a stray empty statement.
    if (metadata != null && metadata.containsKey(COMPRESSED_FLAG)) {
        boolean compressed = (boolean) metadata.remove(COMPRESSED_FLAG);
        if (compressed) {
            // Wrap for transparent zlib decompression.
            is = new InflaterInputStream(is);
        }
    }
    return is;
}
/**
 * Checks whether a file in the given bucket carries exactly the supplied
 * metadata attributes (same number of entries, same key/value pairs).
 *
 * @param name the GridFS filename to look up.
 * @param bucketName the bucket to search.
 * @param attrs the expected metadata key/value pairs.
 * @return {@code true} iff a matching file exists and its metadata equals {@code attrs}.
 */
public boolean fileHasProperties(String name, String bucketName, Map<String, String> attrs) {
    GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
    // try-with-resources + typed cursor: the original used a raw MongoCursor
    // and leaked it if anything between iterator() and close() threw.
    try (MongoCursor<GridFSFile> it = bucket
            .find(Document.parse(String.format("{ \"filename\": \"%s\" }", name))).iterator()) {
        if (!it.hasNext()) {
            return false;
        }
        Document metadata = it.next().getMetadata();
        if (metadata == null || metadata.size() != attrs.size()) {
            return false;
        }
        for (Map.Entry<String, Object> entry : metadata.entrySet()) {
            Object expected = attrs.get(entry.getKey());
            if (expected == null || !entry.getValue().equals(expected)) {
                return false;
            }
        }
        return true;
    }
}
/**
 * Converts a {@link GridFSFile} into a {@link MongoFile}, eagerly loading
 * the file content into memory via the GridFs template.
 *
 * @param gridFSDBFile the GridFS file to convert.
 * @return the populated {@link MongoFile}.
 */
private MongoFile gfs2Mg(GridFSFile gridFSDBFile) {
    try {
        MongoFile mongoFile = new MongoFile();
        Document metaData = gridFSDBFile.getMetadata();
        // getMetadata() can return null; the original dereferenced it unconditionally.
        if (metaData != null) {
            mongoFile.setContentType(String.valueOf(metaData.get("contentType")));
            mongoFile.setFilename(String.valueOf(metaData.get("filename")));
        }
        mongoFile.setCreateTime(gridFSDBFile.getUploadDate().getTime());
        mongoFile.setGridId(gridFSDBFile.getFilename());
        GridFsResource gridFsResource = gridFsTemplate.getResource(gridFSDBFile.getFilename());
        mongoFile.setContent(org.springframework.util.StreamUtils.copyToByteArray(gridFsResource.getInputStream()));
        return mongoFile;
    } catch (IOException e) {
        // Preserve the cause; the original discarded the original exception.
        throw new RuntimeException("文件读取异常!", e);
    }
}
/** @return the stored file's length in bytes, or {@code 0} when no file matches the location. */
public long contentLength() throws IOException {
    final GridFSFile file = gridfs.findOne(query(whereFilename().is(location)));
    return file == null ? 0L : file.getLength();
}
/** @return the upload timestamp in epoch millis, or {@code -1} when no file matches the location. */
public long lastModified() throws IOException {
    final GridFSFile file = gridfs.findOne(query(whereFilename().is(location)));
    return file == null ? -1L : file.getUploadDate().getTime();
}
/** @return the GridFS id of the file at this location, or {@code null} when absent. */
public Object getId() {
    final GridFSFile file = gridfs.findOne(query(whereFilename().is(location)));
    return file == null ? null : file.getId();
}
/** @return the content type of the file at this location, or {@code null} when absent. */
public String getContentType() {
    final GridFSFile file = gridfs.findOne(query(whereFilename().is(location)));
    return file == null ? null : file.getContentType();
}
/** Opens a stream over the file's content, or returns {@code null} when no file matches the location. */
public InputStream getInputStream() throws IOException, IllegalStateException {
    final GridFSFile file = gridfs.findOne(query(whereFilename().is(location)));
    return file == null ? null : gridfs.getResource(location).getInputStream();
}
/** Deletes the given GridFS file by id; a {@code null} file is a no-op. */
private void deleteArtifact(final GridFSFile file) {
    if (file == null) {
        return;
    }
    try {
        gridFs.delete(new Query().addCriteria(Criteria.where(ID).is(file.getId())));
    } catch (final MongoClientException e) {
        throw new ArtifactStoreException(e.getMessage(), e);
    }
}
/**
 * Stores an artifact, deduplicating on its SHA1 hash: when an artifact with
 * the same hash already exists for the tenant, the existing file is reused.
 *
 * @param tenant the tenant to store under.
 * @param base16Hashes the artifact's hashes (SHA1 is used as the filename).
 * @param contentType the artifact's content type.
 * @param tempFile name of the temp file holding the uploaded content.
 * @return the stored (or pre-existing) artifact.
 * @throws IOException if reading the temp file fails.
 */
@Override
protected AbstractDbArtifact store(final String tenant, final DbArtifactHash base16Hashes, final String contentType,
        final String tempFile) throws IOException {
    final GridFSFile result = gridFs.findOne(new Query().addCriteria(
            Criteria.where(FILENAME).is(base16Hashes.getSha1()).and(TENANT_QUERY).is(sanitizeTenant(tenant))));
    if (result != null) {
        // Deduplication hit: same content already stored for this tenant.
        return createGridFsArtifact(result, contentType, base16Hashes);
    }
    try {
        final GridFSFile temp = loadTempFile(tempFile);
        final Document metadata = new Document();
        metadata.put(SHA1, base16Hashes.getSha1());
        metadata.put(TENANT, tenant);
        metadata.put(FILENAME, base16Hashes.getSha1());
        metadata.put(CONTENT_TYPE, contentType);
        final GridFsResource resource = gridFs.getResource(temp);
        // try-with-resources: the original never closed the temp file's stream.
        final ObjectId id;
        try (InputStream in = resource.getInputStream()) {
            id = gridFs.store(in, base16Hashes.getSha1(), contentType, metadata);
        }
        final GridFSFile file = gridFs.findOne(new Query().addCriteria(Criteria.where(ID).is(id)));
        return createGridFsArtifact(file, contentType, base16Hashes);
    } catch (final MongoClientException e) {
        throw new ArtifactStoreException(e.getMessage(), e);
    }
}
/**
 * Maps a single {@link GridFSFile} to {@link GridFsArtifact}.
 *
 * @param file
 *            the {@link GridFSFile} object, may be {@code null}.
 * @param contentType
 *            the content type of the artifact
 * @param hashes
 *            the {@link DbArtifactHash} object of the artifact
 * @return a mapped artifact from the given file, or {@code null} for a {@code null} file
 */
private GridFsArtifact createGridFsArtifact(final GridFSFile file, final String contentType,
        final DbArtifactHash hashes) {
    if (file == null) {
        return null;
    }
    final String artifactId = file.getId().toString();
    final long size = file.getLength();
    // Content is opened lazily via the supplier; failures surface as store exceptions.
    return new GridFsArtifact(artifactId, hashes, size, contentType, () -> {
        try {
            return gridFs.getResource(file).getInputStream();
        } catch (final IllegalStateException | IOException e) {
            throw new ArtifactStoreException(e.getMessage(), e);
        }
    });
}
/** @return whether an artifact with the given sha1 hash exists for the given tenant. */
@Override
public boolean existsByTenantAndSha1(final String tenant, final String sha1Hash) {
    final Query query = new Query().addCriteria(
            Criteria.where(FILENAME).is(sha1Hash).and(TENANT_QUERY).is(sanitizeTenant(tenant)));
    return gridFs.findOne(query) != null;
}
/** Emits the first {@link GridFSFile} produced by the wrapped iterable. */
@Override
public Observable<GridFSFile> first() {
    // Lambda replaces the anonymous Block implementation.
    final Block<SingleResultCallback<GridFSFile>> block = callback -> wrapped.first(callback);
    return RxObservables.create(Observables.observe(block), observableAdapter);
}
/** Emits the {@link GridFSFile} backing the wrapped download stream. */
@Override
public Observable<GridFSFile> getGridFSFile() {
    // Lambda replaces the anonymous Block implementation.
    final Block<SingleResultCallback<GridFSFile>> block = callback -> wrapped.getGridFSFile(callback);
    return RxObservables.create(Observables.observe(block), observableAdapter);
}
/** Publishes the {@link GridFSFile} backing the wrapped download stream. */
@Override
public Publisher<GridFSFile> getGridFSFile() {
    // Lambda replaces the anonymous Block implementation.
    final Block<com.mongodb.async.SingleResultCallback<GridFSFile>> block =
            callback -> wrapped.getGridFSFile(callback);
    return new ObservableToPublisher<GridFSFile>(com.mongodb.async.client.Observables.observe(block));
}
/** Publishes the first {@link GridFSFile} produced by the wrapped iterable. */
@Override
public Publisher<GridFSFile> first() {
    // Lambda replaces the anonymous Block implementation.
    final Block<com.mongodb.async.SingleResultCallback<GridFSFile>> block =
            callback -> wrapped.first(callback);
    return new ObservableToPublisher<GridFSFile>(com.mongodb.async.client.Observables.observe(block));
}
/**
 * Deletes an attachment together with its thumbnail and metadata companion
 * files; each part is removed only when it exists.
 */
@Override
public void deleteAttachment(AttachmentId attachmentId) {
    final GridFSFile attachmentFile = getAttachmentGridFSFile(attachmentId);
    if (attachmentFile != null) {
        attachmentGrid.delete(attachmentFile.getObjectId());
    }
    final GridFSFile thumbnailFile = getThumnailGridFSFile(attachmentId);
    if (thumbnailFile != null) {
        thumbnailGrid.delete(thumbnailFile.getObjectId());
    }
    final GridFSFile metadataFile = getMetadataGridFSFile(attachmentId);
    if (metadataFile != null) {
        metadataGrid.delete(metadataFile.getObjectId());
    }
}
@Test
public void shouldStoreSimpleFile() throws IOException {
    // store file
    try (InputStream in = new BufferedInputStream(new ClassPathResource("./example-file.txt").getInputStream())) {
        gridFsOperations.store(in, "example-file.txt");
    }

    // get file or resource by filename
    final GridFSFile stored = gridFsOperations.findOne(query(whereFilename().is("example-file.txt")));
    assertThat(stored.getFilename()).isEqualTo("example-file.txt");
}
@Test
public void shouldStoreFileWithMetadata() throws IOException {
    // store file with metaData
    final Customer customerMetaData = new Customer("Hardy", "Lang");
    try (InputStream in = new BufferedInputStream(new ClassPathResource("./example-file.txt").getInputStream())) {
        gridFsOperations.store(in, "example-file.txt", customerMetaData);
    }

    // search by metaData
    final GridFSFile stored = gridFsOperations.findOne(query(whereMetaData("firstName").is("Hardy")));
    assertThat(stored.getFilename()).isEqualTo("example-file.txt");
}
/**
 * Loads every associated document registered for {@code uniqueId}; returns an
 * empty list when {@code fetchType} is NONE.
 */
@Override
public List<AssociatedDocument> getAssociatedDocuments(String uniqueId, FetchType fetchType) throws Exception {
    final GridFSBucket gridFS = createGridFSConnection();
    final List<AssociatedDocument> assocDocs = new ArrayList<>();
    if (FetchType.NONE.equals(fetchType)) {
        return assocDocs;
    }
    final GridFSFindIterable files =
            gridFS.find(new Document(ASSOCIATED_METADATA + "." + DOCUMENT_UNIQUE_ID_KEY, uniqueId));
    for (final GridFSFile file : files) {
        assocDocs.add(loadGridFSToAssociatedDocument(gridFS, file, fetchType));
    }
    return assocDocs;
}
/**
 * Loads a single associated document by unique id and file name, or returns
 * {@code null} when it is absent or {@code fetchType} is NONE.
 */
@Override
public AssociatedDocument getAssociatedDocument(String uniqueId, String fileName, FetchType fetchType) throws Exception {
    final GridFSBucket gridFS = createGridFSConnection();
    if (FetchType.NONE.equals(fetchType)) {
        return null;
    }
    final GridFSFile file = gridFS
            .find(new Document(ASSOCIATED_METADATA + "." + FILE_UNIQUE_ID_KEY, getGridFsId(uniqueId, fileName)))
            .first();
    return file == null ? null : loadGridFSToAssociatedDocument(gridFS, file, fetchType);
}
/**
 * Builds an {@link AssociatedDocument} from a GridFS file: well-known
 * metadata keys are consumed first, any remaining keys are copied verbatim,
 * and for {@link FetchType#FULL} the content is downloaded (and inflated
 * when it was stored compressed).
 *
 * NOTE(review): assumes the file's metadata document is present and contains
 * TIMESTAMP and DOCUMENT_UNIQUE_ID_KEY — confirm against the upload path.
 */
private AssociatedDocument loadGridFSToAssociatedDocument(GridFSBucket gridFS, GridFSFile file, FetchType fetchType) throws IOException {
    AssociatedDocument.Builder aBuilder = AssociatedDocument.newBuilder();
    aBuilder.setFilename(file.getFilename());
    Document metadata = file.getMetadata();
    // Consume the well-known keys so the copy loop below only sees user metadata.
    boolean compressed = false;
    if (metadata.containsKey(COMPRESSED_FLAG)) {
        compressed = (boolean) metadata.remove(COMPRESSED_FLAG);
    }
    long timestamp = (long) metadata.remove(TIMESTAMP);
    aBuilder.setCompressed(compressed);
    aBuilder.setTimestamp(timestamp);
    aBuilder.setDocumentUniqueId((String) metadata.remove(DOCUMENT_UNIQUE_ID_KEY));
    for (String field : metadata.keySet()) {
        aBuilder.addMetadata(Metadata.newBuilder().setKey(field).setValue((String) metadata.get(field)));
    }
    if (FetchType.FULL.equals(fetchType)) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        gridFS.downloadToStream(file.getObjectId(), byteArrayOutputStream);
        // toByteArray() never returns null, so the original null check was dead code.
        byte[] bytes = byteArrayOutputStream.toByteArray();
        if (compressed) {
            bytes = CommonCompression.uncompressZlib(bytes);
        }
        aBuilder.setDocument(ByteString.copyFrom(bytes));
    }
    aBuilder.setIndexName(indexName);
    return aBuilder.build();
}
/** Collects the filenames of every document associated with {@code uniqueId}. */
@Override
public List<String> getAssociatedFilenames(String uniqueId) throws Exception {
    final GridFSBucket gridFS = createGridFSConnection();
    final ArrayList<String> fileNames = new ArrayList<>();
    final Document filter = new Document(ASSOCIATED_METADATA + "." + DOCUMENT_UNIQUE_ID_KEY, uniqueId);
    for (final com.mongodb.client.gridfs.model.GridFSFile gridFSFile : gridFS.find(filter)) {
        fileNames.add(gridFSFile.getFilename());
    }
    return fileNames;
}
/** Deletes every GridFS file registered under the given uniqueId/fileName pair. */
@Override
public void deleteAssociatedDocument(String uniqueId, String fileName) {
    final GridFSBucket gridFS = createGridFSConnection();
    final Document filter = new Document(ASSOCIATED_METADATA + "." + FILE_UNIQUE_ID_KEY, getGridFsId(uniqueId, fileName));
    for (final com.mongodb.client.gridfs.model.GridFSFile gridFSFile : gridFS.find(filter)) {
        gridFS.delete(gridFSFile.getObjectId());
    }
}
/**
 * Deletes the first GridFS file matching the configured query and routes the
 * incoming FlowFile to success; routes to failure when nothing matched or an
 * error occurred.
 */
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    FlowFile input = session.get();
    if (input == null) {
        return;
    }
    final String deleteQuery = getQuery(context, input);
    final String queryAttribute = context.getProperty(QUERY_ATTRIBUTE).isSet()
            ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
            : null;
    GridFSBucket bucket = getBucket(input, context);
    try {
        Document query = Document.parse(deleteQuery);
        // try-with-resources + typed cursor: the original used a raw MongoCursor
        // and leaked it when an exception was thrown before the explicit close().
        try (MongoCursor<GridFSFile> cursor = bucket.find(query).iterator()) {
            if (cursor.hasNext()) {
                GridFSFile file = cursor.next();
                bucket.delete(file.getObjectId());
                if (!StringUtils.isEmpty(queryAttribute)) {
                    input = session.putAttribute(input, queryAttribute, deleteQuery);
                }
                session.transfer(input, REL_SUCCESS);
            } else {
                getLogger().error(String.format("Query %s did not delete anything in %s", deleteQuery, bucket.getBucketName()));
                session.transfer(input, REL_FAILURE);
            }
        }
    } catch (Exception ex) {
        getLogger().error(String.format("Error deleting using query: %s", deleteQuery), ex);
        session.transfer(input, REL_FAILURE);
    }
}
/**
 * Materializes one GridFS file into a FlowFile: the content is streamed into
 * the FlowFile body, the GridFS metadata and (optionally) the query become
 * attributes, and the result is routed to REL_SUCCESS.
 */
private void handleFile(GridFSBucket bucket, ProcessSession session, ProcessContext context, FlowFile parent, GridFSFile input, String query) {
    final Map<String, String> attrs = new HashMap<>();
    final Document metadata = input.getMetadata();
    attrs.put(METADATA_ATTRIBUTE, metadata == null ? "{}" : metadata.toJson());
    if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
        final String key = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(parent).getValue();
        attrs.put(key, query);
    }
    attrs.put(CoreAttributes.FILENAME.key(), input.getFilename());

    FlowFile output = parent == null ? session.create() : session.create(parent);
    output = session.write(output, out -> bucket.downloadToStream(input.getObjectId(), out));
    output = session.putAllAttributes(output, attrs);
    session.transfer(output, REL_SUCCESS);
    session.getProvenanceReporter().receive(output, getTransitUri(input.getObjectId(), output, context));
}