下面列出了怎么用com.mongodb.client.gridfs.model.GridFSUploadOptions的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public Future<String> uploadFileWithOptions(String fileName, GridFsUploadOptions options) {
requireNonNull(fileName, "fileName cannot be null");
OpenOptions openOptions = new OpenOptions().setRead(true);
return vertx.fileSystem().open(fileName, openOptions)
.flatMap(file -> {
GridFSReadStreamPublisher publisher = new GridFSReadStreamPublisher(file);
Promise<ObjectId> promise = vertx.promise();
if (options == null) {
bucket.uploadFromPublisher(fileName, publisher).subscribe(new SingleResultSubscriber<>(promise));
} else {
GridFSUploadOptions uploadOptions = new GridFSUploadOptions();
uploadOptions.chunkSizeBytes(options.getChunkSizeBytes());
if (options.getMetadata() != null) {
uploadOptions.metadata(new Document(options.getMetadata().getMap()));
}
bucket.uploadFromPublisher(fileName, publisher, uploadOptions).subscribe(new SingleResultSubscriber<>(promise));
}
return promise.future().map(ObjectId::toHexString);
});
}
@Override
public Observable<ObjectId> uploadFromStream(final String filename, final AsyncInputStream source, final GridFSUploadOptions options) {
return RxObservables.create(Observables.observe(new Block<SingleResultCallback<ObjectId>>() {
@Override
public void apply(final SingleResultCallback<ObjectId> callback) {
wrapped.uploadFromStream(filename, toCallbackAsyncInputStream(source), options, callback);
}
}), observableAdapter);
}
@Override
public Observable<Success> uploadFromStream(final BsonValue id, final String filename, final AsyncInputStream source,
final GridFSUploadOptions options) {
return RxObservables.create(Observables.observe(new Block<SingleResultCallback<Success>>() {
@Override
public void apply(final SingleResultCallback<Success> callback) {
wrapped.uploadFromStream(id, filename, toCallbackAsyncInputStream(source), options, voidToSuccessCallback(callback));
}
}), observableAdapter);
}
@Override
public Publisher<ObjectId> uploadFromStream(final String filename,
final com.mongodb.reactivestreams.client.gridfs.AsyncInputStream source,
final GridFSUploadOptions options) {
return new ObservableToPublisher<ObjectId>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<ObjectId>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<ObjectId> callback) {
wrapped.uploadFromStream(filename, toCallbackAsyncInputStream(source), options, callback);
}
}));
}
@Override
public Publisher<Success> uploadFromStream(final BsonValue id, final String filename,
final com.mongodb.reactivestreams.client.gridfs.AsyncInputStream source,
final GridFSUploadOptions options) {
return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<Success>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<Success> callback) {
wrapped.uploadFromStream(id, filename, toCallbackAsyncInputStream(source), options,
voidToSuccessCallback(callback));
}
}));
}
@Override
public Publisher<ObjectId> uploadFromStream(final ClientSession clientSession, final String filename,
final com.mongodb.reactivestreams.client.gridfs.AsyncInputStream source,
final GridFSUploadOptions options) {
return new ObservableToPublisher<ObjectId>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<ObjectId>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<ObjectId> callback) {
wrapped.uploadFromStream(clientSession.getWrapped(), filename, toCallbackAsyncInputStream(source), options,
callback);
}
}));
}
@Override
public Publisher<Success> uploadFromStream(final ClientSession clientSession, final BsonValue id, final String filename,
final com.mongodb.reactivestreams.client.gridfs.AsyncInputStream source,
final GridFSUploadOptions options) {
return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<Success>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<Success> callback) {
wrapped.uploadFromStream(clientSession.getWrapped(), id, filename, toCallbackAsyncInputStream(source), options,
voidToSuccessCallback(callback));
}
}));
}
@Override
public void storeAttachment(AttachmentId attachmentId, InputStream dataInputStream)
throws IOException {
GridFSUploadOptions options = new GridFSUploadOptions().chunkSizeBytes(1024)
.metadata(new Document("type", "attachment").append("id", attachmentId.serialise()));
attachmentGrid.uploadFromStream(attachmentId.serialise(), dataInputStream, options);
}
@Override
public void storeMetadata(AttachmentId attachmentId, AttachmentMetadata metaData)
throws IOException {
AttachmentMetadataProtoImpl proto = new AttachmentMetadataProtoImpl(metaData);
byte[] bytes = proto.getPB().toByteArray();
GridFSUploadOptions options = new GridFSUploadOptions().chunkSizeBytes(255)
.metadata(new Document("type", "metadata").append("id", attachmentId.serialise()));
metadataGrid.uploadFromStream(attachmentId.serialise(), new ByteArrayInputStream(bytes),
options);
}
@Override
public void storeThumbnail(AttachmentId attachmentId, InputStream dataInputStream)
throws IOException {
GridFSUploadOptions options = new GridFSUploadOptions()
.chunkSizeBytes(1024)
.metadata(new Document("type", "thumbnail").append("id", attachmentId.serialise()));
thumbnailGrid.uploadFromStream(attachmentId.serialise(), dataInputStream,
options);
}
@Override
public Future<String> uploadByFileNameWithOptions(ReadStream<Buffer> stream, String fileName, GridFsUploadOptions options) {
GridFSUploadOptions uploadOptions = new GridFSUploadOptions();
uploadOptions.chunkSizeBytes(options.getChunkSizeBytes());
if (options.getMetadata() != null) uploadOptions.metadata(new Document(options.getMetadata().getMap()));
GridFSReadStreamPublisher publisher = new GridFSReadStreamPublisher(stream);
Promise<ObjectId> promise = vertx.promise();
bucket.uploadFromPublisher(fileName, publisher, uploadOptions).subscribe(new SingleResultSubscriber<>(promise));
return promise.future().map(ObjectId::toHexString);
}
private GridFSUploadOptions getGridFSUploadOptions(String uniqueId, String fileName, boolean compress, long timestamp, Map<String, String> metadataMap) {
Document metadata = new Document();
if (metadataMap != null) {
for (String key : metadataMap.keySet()) {
metadata.put(key, metadataMap.get(key));
}
}
metadata.put(TIMESTAMP, timestamp);
metadata.put(COMPRESSED_FLAG, compress);
metadata.put(DOCUMENT_UNIQUE_ID_KEY, uniqueId);
metadata.put(FILE_UNIQUE_ID_KEY, getGridFsId(uniqueId, fileName));
return new GridFSUploadOptions().chunkSizeBytes(1024).metadata(metadata);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile input = session.get();
if (input == null) {
return;
}
GridFSBucket bucket = getBucket(input, context);
if (!canUploadFile(context, input, bucket.getBucketName())) {
getLogger().error("Cannot upload the file because of the uniqueness policy configured.");
session.transfer(input, REL_DUPLICATE);
return;
}
final int chunkSize = context.getProperty(CHUNK_SIZE).evaluateAttributeExpressions(input).asDataSize(DataUnit.B).intValue();
try (InputStream fileInput = session.read(input)) {
String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue();
GridFSUploadOptions options = new GridFSUploadOptions()
.chunkSizeBytes(chunkSize)
.metadata(getMetadata(input, context));
ObjectId id = bucket.uploadFromStream(fileName, fileInput, options);
fileInput.close();
if (id != null) {
input = session.putAttribute(input, ID_ATTRIBUTE, id.toString());
session.transfer(input, REL_SUCCESS);
session.getProvenanceReporter().send(input, getTransitUri(id, input, context));
} else {
getLogger().error("ID was null, assuming failure.");
session.transfer(input, REL_FAILURE);
}
} catch (Exception ex) {
getLogger().error("Failed to upload file", ex);
session.transfer(input, REL_FAILURE);
}
}
public ObjectId writeTestFile(String fileName, String content, String bucketName, Map<String, Object> attrs) {
GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
GridFSUploadOptions options = new GridFSUploadOptions().metadata(new Document(attrs));
ByteArrayInputStream input = new ByteArrayInputStream(content.getBytes());
ObjectId retVal = bucket.uploadFromStream(fileName, input, options);
return retVal;
}
@Override
public void uploadFile(FileUpload file) {
bucket.uploadFromStream(file.getName(), file.getContent(), new GridFSUploadOptions().metadata(new Document().append("contentType", file.getContentType())));
}
@Override
public GridFSUploadStream openUploadStream(final String filename, final GridFSUploadOptions options) {
return new GridFSUploadStreamImpl(wrapped.openUploadStream(filename, options), observableAdapter);
}
@Override
public GridFSUploadStream openUploadStream(final BsonValue id, final String filename, final GridFSUploadOptions options) {
return new GridFSUploadStreamImpl(wrapped.openUploadStream(id, filename, options), observableAdapter);
}
@Override
public com.mongodb.reactivestreams.client.gridfs.GridFSUploadStream
openUploadStream(final String filename) {
return openUploadStream(filename, new GridFSUploadOptions());
}
@Override
public com.mongodb.reactivestreams.client.gridfs.GridFSUploadStream
openUploadStream(final String filename, final GridFSUploadOptions options) {
return new GridFSUploadStreamImpl(wrapped.openUploadStream(filename, options));
}
@Override
public com.mongodb.reactivestreams.client.gridfs.GridFSUploadStream
openUploadStream(final BsonValue id, final String filename) {
return openUploadStream(id, filename, new GridFSUploadOptions());
}
@Override
public com.mongodb.reactivestreams.client.gridfs.GridFSUploadStream
openUploadStream(final BsonValue id, final String filename, final GridFSUploadOptions options) {
return new GridFSUploadStreamImpl(wrapped.openUploadStream(id, filename, options));
}
@Override
public com.mongodb.reactivestreams.client.gridfs.GridFSUploadStream
openUploadStream(final ClientSession clientSession, final String filename) {
return openUploadStream(clientSession, filename, new GridFSUploadOptions());
}
@Override
public com.mongodb.reactivestreams.client.gridfs.GridFSUploadStream
openUploadStream(final ClientSession clientSession, final String filename, final GridFSUploadOptions options) {
return new GridFSUploadStreamImpl(wrapped.openUploadStream(clientSession.getWrapped(), filename, options));
}
@Override
public com.mongodb.reactivestreams.client.gridfs.GridFSUploadStream
openUploadStream(final ClientSession clientSession, final BsonValue id, final String filename) {
return openUploadStream(clientSession, id, filename, new GridFSUploadOptions());
}
@Override
public com.mongodb.reactivestreams.client.gridfs.GridFSUploadStream
openUploadStream(final ClientSession clientSession, final BsonValue id, final String filename, final GridFSUploadOptions options) {
return new GridFSUploadStreamImpl(wrapped.openUploadStream(clientSession.getWrapped(), id, filename, options));
}
@Override
public Publisher<ObjectId> uploadFromStream(final String filename,
final com.mongodb.reactivestreams.client.gridfs.AsyncInputStream source) {
return uploadFromStream(filename, source, new GridFSUploadOptions());
}
@Override
public Publisher<Success> uploadFromStream(final BsonValue id, final String filename,
final com.mongodb.reactivestreams.client.gridfs.AsyncInputStream source) {
return uploadFromStream(id, filename, source, new GridFSUploadOptions());
}
@Override
public Publisher<ObjectId> uploadFromStream(final ClientSession clientSession, final String filename,
final com.mongodb.reactivestreams.client.gridfs.AsyncInputStream source) {
return uploadFromStream(clientSession, filename, source, new GridFSUploadOptions());
}
@Override
public Publisher<Success> uploadFromStream(final ClientSession clientSession, final BsonValue id, final String filename,
final com.mongodb.reactivestreams.client.gridfs.AsyncInputStream source) {
return uploadFromStream(clientSession, id, filename, source, new GridFSUploadOptions());
}
@Override
public GridFSUploadPublisher<ObjectId> uploadFromPublisher(final String filename, final Publisher<ByteBuffer> source) {
return uploadFromPublisher(filename, source, new GridFSUploadOptions());
}