下面列出了怎么用org.elasticsearch.action.get.GetRequestBuilder的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Test override that supplies a mocked {@link TransportClient} instead of a real connection.
 *
 * <p>Every {@code prepareGet(index, type, id)} call on the returned client yields the same spied
 * {@link GetRequestBuilder}. Its {@code execute()} is stubbed to throw {@code exceptionToThrow}
 * when that field is set, and otherwise to return a {@code MockGetRequestBuilderExecutor} built
 * from {@code documentExists} and the first entry of {@code esHosts}.
 *
 * @param settingsBuilder not used by this override
 * @param xPackPath not used by this override
 * @param username not used by this override
 * @param password not used by this override
 * @param esHosts host list; only element 0 is read, and only when no exception is configured
 * @param log not used by this override
 * @return the mocked client
 * @throws MalformedURLException declared by the overridden signature; not thrown here
 */
@Override
protected Client getTransportClient(Settings.Builder settingsBuilder, String xPackPath,
                                    String username, String password,
                                    List<InetSocketAddress> esHosts, ComponentLog log)
        throws MalformedURLException {
    final TransportClient client = mock(TransportClient.class);
    final GetRequestBuilder spiedBuilder = spy(new GetRequestBuilder(client, GetAction.INSTANCE));

    if (exceptionToThrow == null) {
        // Happy path: execute() hands back the canned executor.
        doReturn(new MockGetRequestBuilderExecutor(documentExists, esHosts.get(0)))
                .when(spiedBuilder).execute();
    } else {
        // Failure path: execute() raises the configured exception.
        doThrow(exceptionToThrow).when(spiedBuilder).execute();
    }

    when(client.prepareGet(anyString(), anyString(), anyString())).thenReturn(spiedBuilder);
    return client;
}
/**
 * Asynchronously fetches a single document and reports the outcome to {@code resultHandler}.
 *
 * <p>The request is built with {@code client.prepareGet}, decorated from {@code options} via
 * {@code populateGetRequestBuilder}, and executed with a listener. A successful response is
 * converted with {@code mapToUpdateResponse} and delivered as a succeeded future; a failure is
 * forwarded to {@code handleFailure}.
 *
 * @param index index to read from
 * @param type document type
 * @param id document id
 * @param options optional request settings; passed through to {@code populateGetRequestBuilder}
 * @param resultHandler receives the mapped response or the failure
 */
@Override
public void get(String index, String type, String id, GetOptions options, Handler<AsyncResult<com.hubrick.vertx.elasticsearch.model.GetResponse>> resultHandler) {
    final GetRequestBuilder request = client.prepareGet(index, type, id);
    populateGetRequestBuilder(request, options);

    final ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
        @Override
        public void onResponse(GetResponse esResponse) {
            resultHandler.handle(Future.succeededFuture(mapToUpdateResponse(esResponse)));
        }

        @Override
        public void onFailure(Exception cause) {
            handleFailure(resultHandler, cause);
        }
    };

    request.execute(listener);
}
/**
 * Copies the settings present on {@code options} onto {@code builder}.
 *
 * <p>Nullable options are applied only when non-null. The field list and the source
 * include/exclude lists are applied only when non-empty. Does nothing when {@code options}
 * is {@code null}.
 *
 * @param builder the get request builder to configure
 * @param options the caller-supplied options, may be {@code null}
 */
private void populateGetRequestBuilder(GetRequestBuilder builder, GetOptions options) {
    if (options == null) {
        return;
    }

    if (options.getRouting() != null) {
        builder.setRouting(options.getRouting());
    }
    if (options.getParent() != null) {
        builder.setParent(options.getParent());
    }
    if (options.getRefresh() != null) {
        builder.setRefresh(options.getRefresh());
    }
    if (options.getVersion() != null) {
        builder.setVersion(options.getVersion());
    }
    if (options.getVersionType() != null) {
        builder.setVersionType(options.getVersionType());
    }
    if (options.getPreference() != null) {
        builder.setPreference(options.getPreference());
    }

    if (!options.getFields().isEmpty()) {
        final String[] storedFields = options.getFields().toArray(new String[options.getFields().size()]);
        builder.setStoredFields(storedFields);
    }

    if (options.getFetchSource() != null) {
        builder.setFetchSource(options.getFetchSource());
    }

    // Applied after the boolean flag above, so an include/exclude filter is set last.
    final boolean noSourceFilter =
            options.getFetchSourceIncludes().isEmpty() && options.getFetchSourceExcludes().isEmpty();
    if (!noSourceFilter) {
        final String[] sourceIncludes =
                options.getFetchSourceIncludes().toArray(new String[options.getFetchSourceIncludes().size()]);
        final String[] sourceExcludes =
                options.getFetchSourceExcludes().toArray(new String[options.getFetchSourceExcludes().size()]);
        builder.setFetchSource(sourceIncludes, sourceExcludes);
    }

    if (options.getRealtime() != null) {
        builder.setRealtime(options.getRealtime());
    }
}
/**
 * Returns a mocked {@link TransportClient} for tests; no real cluster connection is opened.
 *
 * <p>The mock answers any {@code prepareGet(String, String, String)} call with one spied
 * {@link GetRequestBuilder}. When {@code exceptionToThrow} is non-null the builder's
 * {@code execute()} throws it; otherwise {@code execute()} returns a
 * {@code MockGetRequestBuilderExecutor} created from {@code documentExists} and
 * {@code esHosts.get(0)}.
 *
 * @param settingsBuilder ignored
 * @param xPackPath ignored
 * @param username ignored
 * @param password ignored
 * @param esHosts configured hosts; the first one is passed to the mock executor
 * @param log ignored
 * @return the mocked transport client
 * @throws MalformedURLException part of the overridden signature; never raised by this body
 */
@Override
protected Client getTransportClient(Settings.Builder settingsBuilder, String xPackPath,
                                    String username, String password,
                                    List<InetSocketAddress> esHosts, ComponentLog log)
        throws MalformedURLException {
    final TransportClient stubClient = mock(TransportClient.class);
    final GetRequestBuilder stubBuilder = spy(new GetRequestBuilder(stubClient, GetAction.INSTANCE));

    final boolean shouldFail = exceptionToThrow != null;
    if (shouldFail) {
        doThrow(exceptionToThrow).when(stubBuilder).execute();
    } else {
        doReturn(new MockGetRequestBuilderExecutor(documentExists, esHosts.get(0)))
                .when(stubBuilder).execute();
    }

    when(stubClient.prepareGet(anyString(), anyString(), anyString())).thenReturn(stubBuilder);
    return stubClient;
}
/**
 * Fetches a document by its ID.
 *
 * @param index  the index (analogous to a database)
 * @param type   the type (analogous to a table)
 * @param id     the document ID
 * @param fields comma-separated list of fields to return (all fields when empty)
 * @return the document source as returned by {@code GetResponse.getSource()}
 */
public static Map<String, Object> searchDataById(String index, String type, String id, String fields) {
    final GetRequestBuilder request = client.prepareGet(index, type, id);

    // Restrict the returned source only when the caller named specific fields.
    final boolean restrictFields = StringUtils.isNotEmpty(fields);
    if (restrictFields) {
        final String[] includes = fields.split(",");
        request.setFetchSource(includes, null);
    }

    return request.execute().actionGet().getSource();
}
/**
 * Loads one document and deserializes its source into {@code clazz}.
 *
 * <p>The document is addressed by the index, type and first id of {@code esBasicInfo}.
 *
 * @param esBasicInfo carries the index, type and ids; only {@code getIds()[0]} is used
 * @param clazz target type for {@code mapper.readValue}
 * @param <T> the deserialized type
 * @return the deserialized document, or {@code null} when the response has no source string
 * @throws IOException if {@code mapper.readValue} fails
 */
public <T> T query(ESBasicInfo esBasicInfo, Class<T> clazz) throws IOException {
    final GetResponse found = esClient
            .prepareGet(esBasicInfo.getIndex(), esBasicInfo.getType(), esBasicInfo.getIds()[0])
            .execute()
            .actionGet();

    final String json = found.getSourceAsString();
    if (json == null) {
        return null;
    }
    return mapper.readValue(json, clazz);
}
/**
 * Retrieves a single document by the first id in {@code esBasicInfo} and maps it to
 * {@code clazz}.
 *
 * @param esBasicInfo supplies index, type and the id array (element 0 is used)
 * @param clazz class to deserialize the document source into
 * @param <T> result type
 * @return the mapped object, or {@code null} if the response's source string is {@code null}
 * @throws IOException when the source cannot be read into {@code clazz}
 */
public <T> T query(ESBasicInfo esBasicInfo, Class<T> clazz) throws IOException {
    final GetRequestBuilder lookup =
            esClient.prepareGet(esBasicInfo.getIndex(), esBasicInfo.getType(), esBasicInfo.getIds()[0]);
    final String sourceJson = lookup.execute().actionGet().getSourceAsString();

    T mapped = null;
    if (sourceJson != null) {
        mapped = mapper.readValue(sourceJson, clazz);
    }
    return mapped;
}
/**
 * Test override that returns a mocked {@link TransportClient}.
 *
 * <p>Any {@code prepareGet(String, String, String)} call on the mock returns one spied
 * {@link GetRequestBuilder}. Its {@code execute()} throws {@code exceptionToThrow} when that
 * field is non-null, and otherwise returns a {@code MockGetRequestBuilderExecutor} built from
 * {@code documentExists}.
 *
 * @param settingsBuilder ignored
 * @param shieldUrl ignored
 * @param username ignored
 * @param password ignored
 * @return the mocked client
 * @throws MalformedURLException declared by the overridden signature; not thrown here
 */
@Override
protected TransportClient getTransportClient(Settings.Builder settingsBuilder, String shieldUrl,
                                             String username, String password)
        throws MalformedURLException {
    final TransportClient client = mock(TransportClient.class);
    final GetRequestBuilder spiedBuilder = spy(new GetRequestBuilder(client, GetAction.INSTANCE));

    if (exceptionToThrow == null) {
        doReturn(new MockGetRequestBuilderExecutor(documentExists)).when(spiedBuilder).execute();
    } else {
        doThrow(exceptionToThrow).when(spiedBuilder).execute();
    }

    when(client.prepareGet(anyString(), anyString(), anyString())).thenReturn(spiedBuilder);
    return client;
}
/**
 * Retrieves one entity model by its type.
 *
 * <p>If the first lookup fails because the index does not exist, the index is created through
 * {@code SetupAction.createIndex} and the same lookup is issued once more.
 *
 * @param entityType The entity type, used as the document id.
 * @param client     The client that will communicate with Elasticsearch.
 * @return The response from Elasticsearch.
 * @throws ForbiddenException if creating the missing index is rejected with an
 *                            {@code ElasticsearchSecurityException}
 */
public static GetResponse getEntityModel(String entityType, NodeClient client) throws ForbiddenException {
    final GetRequestBuilder lookup = client.prepareGet(INDEX_NAME, "doc", entityType);

    try {
        return lookup.get();
    } catch (IndexNotFoundException missingIndex) {
        // Not an error yet: the index is created below and the lookup retried.
    }

    try {
        SetupAction.createIndex(client);
    } catch (ElasticsearchSecurityException denied) {
        throw new ForbiddenException("The .zentity-models index does not exist and you do not have the 'create_index' privilege. An authorized user must create the index by submitting: POST _zentity/_setup");
    }
    return lookup.get();
}
/**
 * Retrieves a document by ID.
 *
 * @param index  the index (comparable to a database)
 * @param type   the type (comparable to a table)
 * @param id     the document ID
 * @param fields fields to include, comma separated (all fields when empty)
 * @return the source map from the get response
 */
public static Map<String, Object> searchDataById(String index, String type, String id, String fields) {
    final GetRequestBuilder lookup = client.prepareGet(index, type, id);

    final boolean fetchEverything = StringUtils.isEmpty(fields);
    if (!fetchEverything) {
        final String[] wantedFields = fields.split(",");
        lookup.setFetchSource(wantedFields, null);
    }

    final GetResponse found = lookup.execute().actionGet();
    return found.getSource();
}
/**
 * Loads the service definition stored under {@code id}.
 *
 * <p>Issues a get against the service-def index with the default type and maps the response:
 * an existing document becomes {@code fromNullable(fromGetResponse(...))}, a missing one
 * becomes {@code absent()}. Request and response are logged at debug level.
 *
 * @param id document id of the service definition
 * @return an observable emitting the optional service definition
 */
@Override
public Observable<Optional<PersistentServiceDef>> call(final String id) {
    final Elasticsearch elasticSearch = vertxContext.verticle().elasticsearch();

    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(format("Request {%s,%s,%s}", elasticSearch.defaultType(), elasticSearch.serviceDefTypeIndex(), id));
    }

    final GetRequestBuilder getById =
            elasticSearch.get().prepareGet(elasticSearch.serviceDefTypeIndex(), elasticSearch.defaultType(), id);

    return elasticSearch.execute(vertxContext, getById, elasticSearch.getDefaultGetTimeout())
            .map(maybeResponse -> {
                final GetResponse found = maybeResponse.get();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(format("Response {%s,%s,%s} = %s", elasticSearch.defaultType(), elasticSearch.serviceDefTypeIndex(), id, Jsonify.toString(found)));
                }
                if (!found.isExists()) {
                    return absent();
                }
                return fromNullable(fromGetResponse(found));
            });
}
/**
 * Loads the account stored under {@code accountId}.
 *
 * <p>Issues a get against the account index with the default type. An existing document is
 * converted with {@code fromGetResponse} and wrapped in {@code of(...)}; a missing one yields
 * {@code absent()}. Request and response are logged at debug level.
 *
 * @param accountId document id of the account
 * @return an observable emitting the optional account
 */
@Override
public Observable<Optional<PersistentAccount>> call(final String accountId) {
    final Elasticsearch elasticSearch = vertxContext.verticle().elasticsearch();
    if (LOGGER.isDebugEnabled()) {
        // One placeholder per argument. The previous pattern had only two, so the account id
        // never appeared in the log (assuming format is String.format, which ignores extras).
        LOGGER.debug(format("Get Request {%s,%s,%s}", elasticSearch.defaultType(), elasticSearch.accountIndex(), accountId));
    }
    GetRequestBuilder request =
            elasticSearch.get()
                    .prepareGet(
                            elasticSearch.accountIndex(),
                            elasticSearch.defaultType(),
                            accountId);
    return elasticSearch.execute(vertxContext, request, elasticSearch.getDefaultGetTimeout())
            .map(oGetResponse -> {
                GetResponse getResponse = oGetResponse.get();
                if (LOGGER.isDebugEnabled()) {
                    // Four placeholders for four arguments. With three, the account id was
                    // printed after '=' and the response JSON was dropped.
                    LOGGER.debug(format("Get Response {%s,%s,%s} = %s", getResponse.getType(), elasticSearch.accountIndex(), accountId, Jsonify.toString(getResponse)));
                }
                if (getResponse.isExists()) {
                    return of(fromGetResponse(getResponse));
                } else {
                    return absent();
                }
            });
}
/**
 * Loads the master key stored under {@code id}.
 *
 * <p>Builds a get against the master-key index with the default type, logs the request and the
 * response at debug level, and maps an existing document to {@code of(fromGetResponse(...))}
 * or a missing one to {@code absent()}.
 *
 * @param id document id of the master key
 * @return an observable emitting the optional master key
 */
@Override
public Observable<Optional<PersistentMasterKey>> call(String id) {
    final Elasticsearch elasticSearch = vertxContext.verticle().elasticsearch();

    final GetRequestBuilder getById =
            elasticSearch.get().prepareGet(elasticSearch.masterKeyTypeIndex(), elasticSearch.defaultType(), id);

    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(format("Search Request {%s,%s} = %s", elasticSearch.defaultType(), elasticSearch.masterKeyTypeIndex(), Jsonify.toString(getById)));
    }

    return elasticSearch.execute(vertxContext, getById, elasticSearch.getDefaultGetTimeout())
            .map(maybeResponse -> {
                final GetResponse found = maybeResponse.get();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(format("Get Response {%s,%s,%s} = %s", elasticSearch.defaultType(), elasticSearch.masterKeyTypeIndex(), id, Jsonify.toString(found)));
                }
                if (!found.isExists()) {
                    return absent();
                }
                return of(fromGetResponse(found));
            });
}
/**
 * Looks up the container key whose id is {@code input.value1()}.
 *
 * <p>The emitted holder carries the two input values unchanged in slots 0 and 1. Slot 2 holds
 * {@code of(fromGetResponse(container, response))} when the document exists and
 * {@code absent()} otherwise.
 *
 * @param input holder with the container in slot 0 and the container-key id in slot 1
 * @return an observable emitting the three-slot holder
 */
@Override
public Observable<Holder3<PersistentContainer, String, Optional<PersistentContainerKey>>> call(final Holder2<PersistentContainer, String> input) {
    final Elasticsearch elasticSearch = vertxContext.verticle().elasticsearch();
    final String id = input.value1();

    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(format("Get Request {%s,%s,%s}", elasticSearch.defaultType(), elasticSearch.containerKeyIndex(), id));
    }

    final GetRequestBuilder getById =
            elasticSearch.get().prepareGet(elasticSearch.containerKeyIndex(), elasticSearch.defaultType(), id);

    return elasticSearch.execute(vertxContext, getById, elasticSearch.getDefaultGetTimeout())
            .map(maybeResponse -> {
                final Holder3<PersistentContainer, String, Optional<PersistentContainerKey>> result = new Holder3<>();
                result.value0(input.value0());
                result.value1(input.value1());

                final GetResponse found = maybeResponse.get();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(format("Get Response {%s,%s,%s} = %s", elasticSearch.defaultType(), elasticSearch.containerKeyIndex(), id, Jsonify.toString(found)));
                }

                if (!found.isExists()) {
                    result.value2 = absent();
                } else {
                    result.value2 = of(fromGetResponse(input.value0(), found));
                }
                return result;
            });
}
/**
 * Loads the object stored under {@code objectId} in the object index of
 * {@code persistentContainer}.
 *
 * <p>An existing document is mapped to {@code of(fromGetResponse(persistentContainer, ...))};
 * a missing one to {@code absent()}. Request and response are logged at debug level.
 *
 * @param objectId document id of the object
 * @return an observable emitting the optional object
 */
@Override
public Observable<Optional<PersistentObject>> call(String objectId) {
    final Elasticsearch elasticSearch = vertxContext.verticle().elasticsearch();
    final String objectIndex = elasticSearch.objectIndex(persistentContainer.getName());

    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(format("Get Request {%s,%s,%s}", elasticSearch.defaultType(), objectIndex, objectId));
    }

    final GetRequestBuilder getById =
            elasticSearch.get().prepareGet(objectIndex, elasticSearch.defaultType(), objectId);

    return elasticSearch.execute(vertxContext, getById, elasticSearch.getDefaultGetTimeout())
            .map(maybeResponse -> {
                final GetResponse found = maybeResponse.get();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(format("Get Response {%s,%s,%s} = %s", elasticSearch.defaultType(), objectIndex, objectId, Jsonify.toString(found)));
                }
                if (!found.isExists()) {
                    return absent();
                }
                return of(fromGetResponse(persistentContainer, found));
            });
}
/**
 * Loads the container stored under {@code containerId}.
 *
 * <p>Issues a get against the container index with the default type. An existing document is
 * mapped to {@code of(fromGetResponse(account, ...))}; a missing one to {@code absent()}.
 * Request and response are logged at debug level.
 *
 * @param containerId document id of the container
 * @return an observable emitting the optional container
 */
@Override
public Observable<Optional<PersistentContainer>> call(String containerId) {
    final Elasticsearch elasticSearch = vertxContext.verticle().elasticsearch();

    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(format("Get Request {%s,%s,%s}", elasticSearch.defaultType(), elasticSearch.containerIndex(), containerId));
    }

    final GetRequestBuilder getById =
            elasticSearch.get().prepareGet(elasticSearch.containerIndex(), elasticSearch.defaultType(), containerId);

    return elasticSearch.execute(vertxContext, getById, elasticSearch.getDefaultGetTimeout())
            .map(maybeResponse -> {
                final GetResponse found = maybeResponse.get();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(format("Get Response {%s,%s,%s} = %s", elasticSearch.defaultType(), elasticSearch.containerIndex(), containerId, Jsonify.toString(found)));
                }
                if (!found.isExists()) {
                    return absent();
                }
                return of(fromGetResponse(account, found));
            });
}
/**
 * Checks whether a document is present.
 *
 * <p>The lookup runs inside a {@link Callable} handed to {@code execute}. It issues a get for
 * {@code index/type/id} and returns the response's {@code isExists()} flag, tracing before and
 * after.
 *
 * @param index index to look in
 * @param type document type
 * @param id document id
 * @return whatever {@code execute} returns for the existence check
 */
public boolean documentExists(final String index, final String type, final String id) {
    final Callable<Boolean> existenceCheck = new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
            LOGGER.trace("Checking for existence of document: '{}/{}/{}'", index, type, id);
            final GetResponse found = client.prepareGet()
                    .setIndex(index)
                    .setType(type)
                    .setId(id)
                    .get();
            final boolean exists = found.isExists();
            LOGGER.trace("Document '{}/{}/{}' exists? {}", index, type, id, exists);
            return exists;
        }
    };
    return execute(existenceCheck);
}
/**
 * Loads the stored history for a document from Elasticsearch.
 *
 * <p>Returns {@code null} when the document does not exist or has an empty source, so that the
 * parent implementation can create a new history. Otherwise the source is deserialized into an
 * {@code ESHistory}; its events seed the returned history, or an empty deque is used when
 * deserialization yields {@code null}.
 *
 * @param documentId id of the document whose history is requested
 * @return the existing history, or {@code null} if nothing is stored
 * @throws BaleenException wrapping any {@link IOException} raised while reading the source
 */
@Override
protected ElasticsearchDocumentHistory loadExistingDocumentHistory(String documentId)
        throws BaleenException {
    try {
        final GetResponse stored = new GetRequestBuilder(elasticsearch.getClient(), GetAction.INSTANCE)
                .setId(documentId)
                .setIndex(esIndex)
                .setType(esType)
                .get();

        final boolean hasData = stored.isExists() && !stored.isSourceEmpty();
        if (!hasData) {
            // If we don't have any data, then let parent implementation create a new history
            return null;
        }

        final ESHistory persisted = mapper.readValue(stored.getSourceAsBytes(), ESHistory.class);
        if (persisted != null) {
            return new ElasticsearchDocumentHistory(
                    this, documentId, new LinkedBlockingDeque<HistoryEvent>(persisted.getEvents()));
        }
        return new ElasticsearchDocumentHistory(
                this, documentId, new LinkedBlockingDeque<HistoryEvent>(Collections.emptyList()));
    } catch (IOException ioFailure) {
        throw new BaleenException(ioFailure);
    }
}
/**
 * Resolves the document described by the datum's JSON payload and replaces the payload with
 * the document source fetched from Elasticsearch.
 *
 * <p>The datum's document is parsed as a JSON object and converted to a metadata map, from
 * which index, type and id are derived. An empty list is returned when the payload cannot be
 * parsed, when the datum has no metadata, or when the get response is null, reports a missing
 * document, or has an empty source. If the response carries a {@code _timestamp} field, it is
 * applied as the datum's timestamp.
 *
 * @param entry datum whose document is a JSON string describing what to fetch
 * @return a list holding the updated datum, or an empty list
 */
@Override
public List<StreamsDatum> process(StreamsDatum entry) {
    final List<StreamsDatum> result = new ArrayList<>();

    ObjectNode metadataNode;
    try {
        metadataNode = mapper.readValue((String) entry.getDocument(), ObjectNode.class);
    } catch (IOException ex) {
        return result;
    }

    final Map<String, Object> metadata = ElasticsearchMetadataUtil.asMap(metadataNode);
    if (entry.getMetadata() == null) {
        return result;
    }

    final String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
    final String type = ElasticsearchMetadataUtil.getType(metadata, config);
    final String id = ElasticsearchMetadataUtil.getId(metadata);

    final GetRequestBuilder lookup = elasticsearchClientManager.client().prepareGet(index, type, id);
    lookup.setFields("*", "_timestamp");
    lookup.setFetchSource(true);
    final GetResponse found = lookup.get();

    if (found == null || !found.isExists() || found.isSourceEmpty()) {
        return result;
    }

    entry.setDocument(found.getSource());
    if (found.getField("_timestamp") != null) {
        final Long epochMillis = (Long) found.getField("_timestamp").getValue();
        entry.setTimestamp(new DateTime(epochMillis.longValue()));
    }

    result.add(entry);
    return result;
}
/**
 * Fetches the Elasticsearch document identified by the datum's metadata and stores its source
 * as the datum's document.
 *
 * <p>An empty list is returned when the datum or its metadata is null, or when the get
 * response is null, reports a missing document, or has an empty source. If the response
 * carries a {@code _timestamp} field, it is applied as the datum's timestamp.
 *
 * @param entry datum whose metadata identifies the document to fetch
 * @return a list holding the updated datum, or an empty list
 */
@Override
public List<StreamsDatum> process(StreamsDatum entry) {
    final List<StreamsDatum> result = new ArrayList<>();
    if (entry == null || entry.getMetadata() == null) {
        return result;
    }

    final Map<String, Object> metadata = entry.getMetadata();
    final String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
    final String type = ElasticsearchMetadataUtil.getType(metadata, config);
    final String id = ElasticsearchMetadataUtil.getId(entry);

    final GetRequestBuilder lookup = elasticsearchClientManager.client().prepareGet(index, type, id);
    lookup.setFields("*", "_timestamp");
    lookup.setFetchSource(true);

    final GetResponse found = lookup.get();
    final boolean unusable = found == null || !found.isExists() || found.isSourceEmpty();
    if (unusable) {
        return result;
    }

    entry.setDocument(found.getSource());
    if (found.getField("_timestamp") != null) {
        final long epochMillis = ((Long) found.getField("_timestamp").getValue()).longValue();
        final DateTime timestamp = new DateTime(epochMillis);
        entry.setTimestamp(timestamp);
    }

    result.add(entry);
    return result;
}
/**
 * Supplies a mocked {@link TransportClient} so tests never open a real connection.
 *
 * <p>The mock returns one spied {@link GetRequestBuilder} for any
 * {@code prepareGet(String, String, String)} call. If {@code exceptionToThrow} is set, the
 * builder's {@code execute()} throws it; otherwise it returns a
 * {@code MockGetRequestBuilderExecutor} constructed from {@code documentExists}.
 *
 * @param settingsBuilder ignored
 * @param shieldUrl ignored
 * @param username ignored
 * @param password ignored
 * @return the mocked transport client
 * @throws MalformedURLException part of the overridden signature; never raised by this body
 */
@Override
protected TransportClient getTransportClient(Settings.Builder settingsBuilder, String shieldUrl,
                                             String username, String password)
        throws MalformedURLException {
    final TransportClient stubClient = mock(TransportClient.class);
    final GetRequestBuilder stubBuilder = spy(new GetRequestBuilder(stubClient, GetAction.INSTANCE));

    final boolean shouldFail = exceptionToThrow != null;
    if (shouldFail) {
        doThrow(exceptionToThrow).when(stubBuilder).execute();
    } else {
        doReturn(new MockGetRequestBuilderExecutor(documentExists)).when(stubBuilder).execute();
    }

    when(stubClient.prepareGet(anyString(), anyString(), anyString())).thenReturn(stubBuilder);
    return stubClient;
}
/**
 * Runs a get request for one document and lets {@code searchResult} build the return value.
 *
 * <p>The request is executed only when {@code condition.build} returns {@code true}; otherwise
 * the response stays {@code null}. The elapsed wall-clock time, measured from before the
 * request builder is created, is passed to {@code searchResult.build} together with the
 * builder and the optional response.
 *
 * @param index index to read from
 * @param type document type
 * @param id document id
 * @param condition configures the request builder and decides whether to execute it
 * @param searchResult converts builder, execution time and optional response into the result
 * @param <T> result type
 * @return whatever {@code searchResult.build} produces
 */
protected <T> T get(final String index, final String type, final String id, final SearchCondition<GetRequestBuilder> condition,
        final SearchResult<T, GetRequestBuilder, GetResponse> searchResult) {
    final long startedAt = System.currentTimeMillis();
    final GetRequestBuilder requestBuilder = client.prepareGet(index, type, id);

    final GetResponse response;
    if (condition.build(requestBuilder)) {
        response = requestBuilder.execute().actionGet(ComponentUtil.getFessConfig().getIndexSearchTimeout());
    } else {
        response = null;
    }

    final long execTime = System.currentTimeMillis() - startedAt;
    return searchResult.build(requestBuilder, execTime, OptionalEntity.ofNullable(response, () -> {}));
}
/**
 * Verifies that the fields _routing, _ttl and _timestamp can be imported. The ttl value
 * is always from now to the end date, no matter if a time stamp value is set.
 * Invalidated objects will not be imported (when actual time is above ttl time stamp).
 *
 * <p>The test imports the {@code import_4} fixture directory into index {@code test}, checks
 * the per-node import summary (2 successes, 0 failures, 1 invalidated), and then reads
 * documents 402 and 403 back to compare their stored meta fields.
 */
@Test
public void testFields() {
// Reset the cluster and create index "test" with a stored _timestamp on type "d".
esSetup.execute(deleteAll(), createIndex("test").withSettings(
fromClassPath("essetup/settings/test_a.json")).withMapping("d",
"{\"d\": {\"_timestamp\": {\"enabled\": true, \"store\": \"yes\"}}}"));
// Expected remaining ttl: fixed end date (epoch millis) minus the current time.
long now = new Date().getTime();
long ttl = 1867329687097L - now;
String path = getClass().getResource("/importdata/import_4").getPath();
ImportResponse response = executeImportRequest("{\"directory\": \"" + path + "\"}");
List<Map<String, Object>> imports = getImports(response);
assertEquals(1, imports.size());
Map<String, Object> nodeInfo = imports.get(0);
assertNotNull(nodeInfo.get("node_id"));
assertTrue(Long.valueOf(nodeInfo.get("took").toString()) > 0);
assertTrue(nodeInfo.get("imported_files").toString().matches(
"\\[\\{file_name=(.*)/importdata/import_4/import_4.json, successes=2, failures=0, invalidated=1}]"));
// The same builder is reused for both lookups below; only id and field list are changed.
GetRequestBuilder rb = new GetRequestBuilder(esSetup.client(), "test");
GetResponse res = rb.setType("d").setId("402").setFields("_ttl", "_timestamp", "_routing").execute().actionGet();
assertEquals("the_routing", res.getField("_routing").getValue());
// Allow up to 10 seconds between computing "ttl" above and the stored value.
assertTrue(ttl - Long.valueOf(res.getField("_ttl").getValue().toString()) < 10000);
assertEquals(1367329785380L, res.getField("_timestamp").getValue());
res = rb.setType("d").setId("403").setFields("_ttl", "_timestamp").execute().actionGet();
assertTrue(ttl - Long.valueOf(res.getField("_ttl").getValue().toString()) < 10000);
// Document 403's stored _timestamp is expected to be within 10 seconds of "now".
assertTrue(now - Long.valueOf(res.getField("_timestamp").getValue().toString()) < 10000);
// Document 404 is the one expected to have been skipped as invalidated.
assertFalse(existsWithField("404", "name", "404"));
}
/**
 * Checks that a search-into request can rewrite nested source fields into a new nested
 * object {@code x} while redirecting the documents to index {@code newindex}.
 *
 * <p>After the request runs, document {@code a/1} must exist in {@code newindex} with exactly
 * the rewritten source.
 */
@Test
public void testNestedObjectsRewriting() {
    prepareNested();

    final SearchIntoRequest copyRequest = new SearchIntoRequest("test");
    copyRequest.source("{\"fields\": [\"_id\", [\"x.city\", \"_source.city\"], [\"x.surname\", \"_source.name.surname\"], [\"x.name\", \"_source.name.name\"], [\"_index\", \"'newindex'\"]]}");
    SearchIntoResponse copyResult = esSetup.client().execute(SearchIntoAction.INSTANCE, copyRequest).actionGet();

    final GetResponse copied = new GetRequestBuilder(esSetup.client(), "newindex")
            .setType("a")
            .setId("1")
            .execute()
            .actionGet();

    assertTrue(copied.isExists());
    assertEquals("{\"x\":{\"name\":\"Doe\",\"surname\":\"John\",\"city\":\"Dornbirn\"}}", copied.getSourceAsString());
}
/**
 * Fetches one Elasticsearch document per incoming flow file and routes the flow file by
 * outcome.
 *
 * <p>Index, document id, type and charset are evaluated from processor properties against the
 * flow file. When the document exists, its source string is written as the flow file content
 * and the flow file goes to {@code REL_SUCCESS}. When it does not exist, the flow file is
 * penalized and sent to {@code REL_NOT_FOUND}. The four exception types in the multi-catch
 * route to {@code REL_RETRY}; any other exception routes to {@code REL_FAILURE}. Both error
 * paths yield the context.
 *
 * @param context supplies the processor properties
 * @param session supplies the flow file and receives attribute, content and transfer calls
 * @throws ProcessException declared by the overridden signature
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
// Nothing queued for this invocation.
if (flowFile == null) {
return;
}
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
final ComponentLog logger = getLogger();
try {
logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
// Start of the interval reported to provenance on success.
final long startNanos = System.nanoTime();
GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId);
// Attach the Authorization header only when a token has been configured.
if (authToken != null) {
getRequestBuilder.putHeader("Authorization", authToken);
}
final GetResponse getResponse = getRequestBuilder.execute().actionGet();
if (getResponse == null || !getResponse.isExists()) {
logger.warn("Failed to read {}/{}/{} from Elasticsearch: Document not found",
new Object[]{index, docType, docId});
// We couldn't find the document, so penalize it and send it to "not found"
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_NOT_FOUND);
} else {
// Record where the content came from as flow file attributes.
flowFile = session.putAttribute(flowFile, "filename", docId);
flowFile = session.putAttribute(flowFile, "es.index", index);
flowFile = session.putAttribute(flowFile, "es.type", docType);
// Replace the content with the document source, encoded in the configured charset.
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(getResponse.getSourceAsString().getBytes(charset));
}
});
logger.debug("Elasticsearch document " + docId + " fetched, routing to success");
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
// Provenance URI is the HOSTS property value followed by /index/type/id.
final String uri = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + index + "/" + docType + "/" + docId;
session.getProvenanceReporter().fetch(flowFile, uri, millis);
session.transfer(flowFile, REL_SUCCESS);
}
} catch (NoNodeAvailableException
| ElasticsearchTimeoutException
| ReceiveTimeoutTransportException
| NodeClosedException exceptionToRetry) {
// These exception types are routed to retry instead of failure.
logger.error("Failed to read into Elasticsearch due to {}, this may indicate an error in configuration "
+ "(hosts, username/password, etc.). Routing to retry",
new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
session.transfer(flowFile, REL_RETRY);
context.yield();
} catch (Exception e) {
// Any other exception sends the flow file to failure.
logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
/**
 * Fetches one Elasticsearch document per incoming flow file and routes the flow file by
 * outcome.
 *
 * <p>Before taking a flow file, {@code super.setup(context)} is called under a lock on
 * {@code esClient} if {@code esClient.get()} is null. When the document exists, its source
 * string is written as the flow file content, the MIME type attribute is set to
 * {@code application/json}, and the flow file goes to {@code REL_SUCCESS}. When it does not
 * exist, the flow file is penalized and sent to {@code REL_NOT_FOUND}. The four exception types
 * in the multi-catch route to {@code REL_RETRY}; any other exception routes to
 * {@code REL_FAILURE}. Both error paths yield the context.
 *
 * @param context supplies the processor properties
 * @param session supplies the flow file and receives attribute, content and transfer calls
 * @throws ProcessException declared by the overridden signature
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// Run setup if no client is currently held; the check is done under a lock on esClient.
synchronized (esClient) {
if(esClient.get() == null) {
super.setup(context);
}
}
FlowFile flowFile = session.get();
// Nothing queued for this invocation.
if (flowFile == null) {
return;
}
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
final ComponentLog logger = getLogger();
try {
logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId);
final GetResponse getResponse = getRequestBuilder.execute().actionGet();
if (getResponse == null || !getResponse.isExists()) {
logger.warn("Failed to read {}/{}/{} from Elasticsearch: Document not found",
new Object[]{index, docType, docId});
// We couldn't find the document, so penalize it and send it to "not found"
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_NOT_FOUND);
} else {
// Record where the content came from as flow file attributes, set in one call.
flowFile = session.putAllAttributes(flowFile, new HashMap<String, String>() {{
put("filename", docId);
put("es.index", index);
put("es.type", docType);
}});
// Replace the content with the document source, encoded in the configured charset.
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(getResponse.getSourceAsString().getBytes(charset));
}
});
logger.debug("Elasticsearch document " + docId + " fetched, routing to success");
// The document is JSON, so update the MIME type of the flow file
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
// Provenance records the address of the node that answered the request.
session.getProvenanceReporter().fetch(flowFile, getResponse.remoteAddress().getAddress());
session.transfer(flowFile, REL_SUCCESS);
}
} catch (NoNodeAvailableException
| ElasticsearchTimeoutException
| ReceiveTimeoutTransportException
| NodeClosedException exceptionToRetry) {
// These exception types are routed to retry instead of failure.
logger.error("Failed to read into Elasticsearch due to {}, this may indicate an error in configuration "
+ "(hosts, username/password, etc.), or this issue may be transient. Routing to retry",
new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
session.transfer(flowFile, REL_RETRY);
context.yield();
} catch (Exception e) {
// Any other exception sends the flow file to failure.
logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
/**
 * Delegates to the wrapped client's no-argument {@code prepareGet()}.
 *
 * @return the builder created by the wrapped client
 */
@Override
public GetRequestBuilder prepareGet() {
    final GetRequestBuilder delegateBuilder = wrapped.prepareGet();
    return delegateBuilder;
}
/**
 * Delegates to the wrapped client's {@code prepareGet(index, type, id)}.
 *
 * @param index index to read from
 * @param type document type
 * @param id document id
 * @return the builder created by the wrapped client
 */
@Override
public GetRequestBuilder prepareGet(String index, String type, String id) {
    final GetRequestBuilder delegateBuilder = wrapped.prepareGet(index, type, id);
    return delegateBuilder;
}
/**
 * Fetches one Elasticsearch document per incoming flow file and routes the flow file by
 * outcome.
 *
 * <p>Index, document id, type and charset are evaluated from processor properties against the
 * flow file. When the document exists, its source string is written as the flow file content
 * and the flow file goes to {@code REL_SUCCESS}. When it does not exist, this is logged at
 * debug level and the flow file is penalized and sent to {@code REL_NOT_FOUND}. The four
 * exception types in the multi-catch route to {@code REL_RETRY}; any other exception routes
 * to {@code REL_FAILURE}. Both error paths yield the context.
 *
 * @param context supplies the processor properties
 * @param session supplies the flow file and receives attribute, content and transfer calls
 * @throws ProcessException declared by the overridden signature
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
// Nothing queued for this invocation.
if (flowFile == null) {
return;
}
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
final ComponentLog logger = getLogger();
try {
logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
// Start of the interval reported to provenance on success.
final long startNanos = System.nanoTime();
GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId);
// Attach the Authorization header only when a token has been configured.
if (authToken != null) {
getRequestBuilder.putHeader("Authorization", authToken);
}
final GetResponse getResponse = getRequestBuilder.execute().actionGet();
if (getResponse == null || !getResponse.isExists()) {
logger.debug("Failed to read {}/{}/{} from Elasticsearch: Document not found",
new Object[]{index, docType, docId});
// We couldn't find the document, so penalize it and send it to "not found"
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_NOT_FOUND);
} else {
// Record where the content came from as flow file attributes.
flowFile = session.putAttribute(flowFile, "filename", docId);
flowFile = session.putAttribute(flowFile, "es.index", index);
flowFile = session.putAttribute(flowFile, "es.type", docType);
// Replace the content with the document source, encoded in the configured charset.
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(getResponse.getSourceAsString().getBytes(charset));
}
});
logger.debug("Elasticsearch document " + docId + " fetched, routing to success");
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
// Provenance URI is the HOSTS property value followed by /index/type/id.
final String uri = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + index + "/" + docType + "/" + docId;
session.getProvenanceReporter().fetch(flowFile, uri, millis);
session.transfer(flowFile, REL_SUCCESS);
}
} catch (NoNodeAvailableException
| ElasticsearchTimeoutException
| ReceiveTimeoutTransportException
| NodeClosedException exceptionToRetry) {
// These exception types are routed to retry instead of failure.
logger.error("Failed to read into Elasticsearch due to {}, this may indicate an error in configuration "
+ "(hosts, username/password, etc.). Routing to retry",
new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
session.transfer(flowFile, REL_RETRY);
context.yield();
} catch (Exception e) {
// Any other exception sends the flow file to failure.
logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
/**
 * Fetches one Elasticsearch document per incoming flow file and routes the flow file by
 * outcome.
 *
 * <p>Before taking a flow file, {@code super.setup(context)} is called under a lock on
 * {@code esClient} if {@code esClient.get()} is null. When the document exists, its source
 * string is written as the flow file content, the MIME type attribute is set to
 * {@code application/json}, and the flow file goes to {@code REL_SUCCESS}. When it does not
 * exist, this is logged at debug level and the flow file is penalized and sent to
 * {@code REL_NOT_FOUND}. The four exception types in the multi-catch route to
 * {@code REL_RETRY}; any other exception routes to {@code REL_FAILURE}. Both error paths yield
 * the context.
 *
 * @param context supplies the processor properties
 * @param session supplies the flow file and receives attribute, content and transfer calls
 * @throws ProcessException declared by the overridden signature
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// Run setup if no client is currently held; the check is done under a lock on esClient.
synchronized (esClient) {
if(esClient.get() == null) {
super.setup(context);
}
}
FlowFile flowFile = session.get();
// Nothing queued for this invocation.
if (flowFile == null) {
return;
}
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
final ComponentLog logger = getLogger();
try {
logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId);
final GetResponse getResponse = getRequestBuilder.execute().actionGet();
if (getResponse == null || !getResponse.isExists()) {
logger.debug("Failed to read {}/{}/{} from Elasticsearch: Document not found",
new Object[]{index, docType, docId});
// We couldn't find the document, so penalize it and send it to "not found"
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_NOT_FOUND);
} else {
// Record where the content came from as flow file attributes, set in one call.
flowFile = session.putAllAttributes(flowFile, new HashMap<String, String>() {{
put("filename", docId);
put("es.index", index);
put("es.type", docType);
}});
// Replace the content with the document source, encoded in the configured charset.
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(getResponse.getSourceAsString().getBytes(charset));
}
});
logger.debug("Elasticsearch document " + docId + " fetched, routing to success");
// The document is JSON, so update the MIME type of the flow file
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
// Provenance records the address of the node that answered the request.
session.getProvenanceReporter().fetch(flowFile, getResponse.remoteAddress().getAddress());
session.transfer(flowFile, REL_SUCCESS);
}
} catch (NoNodeAvailableException
| ElasticsearchTimeoutException
| ReceiveTimeoutTransportException
| NodeClosedException exceptionToRetry) {
// These exception types are routed to retry instead of failure.
logger.error("Failed to read into Elasticsearch due to {}, this may indicate an error in configuration "
+ "(hosts, username/password, etc.), or this issue may be transient. Routing to retry",
new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
session.transfer(flowFile, REL_RETRY);
context.yield();
} catch (Exception e) {
// Any other exception sends the flow file to failure.
logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}