下面列出了怎么用org.elasticsearch.action.ActionRequest的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Drains every queued request into a new {@link IngestRequest} and lowers this
 * object's byte counter by the size accounted for each drained request.
 *
 * <p>The queue is drained by calling {@code poll()} until it returns {@code null},
 * so a concurrent consumer that empties the queue between two calls can no longer
 * cause a {@code null} element to be handed to {@code add}.
 *
 * @return a bulk request holding all requests that were queued
 */
public IngestRequest takeAll() {
    IngestRequest request = new IngestRequest();
    ActionRequest<?> actionRequest;
    while ((actionRequest = requests.poll()) != null) {
        request.add(actionRequest);
        if (actionRequest instanceof IndexRequest) {
            IndexRequest indexRequest = (IndexRequest) actionRequest;
            long length = indexRequest.source() != null
                    ? indexRequest.source().length() + REQUEST_OVERHEAD
                    : REQUEST_OVERHEAD;
            sizeInBytes.addAndGet(-length);
        } else if (actionRequest instanceof DeleteRequest) {
            // Removing a request must shrink the counter, mirroring the index branch above.
            // NOTE(review): assumes a queued delete was charged REQUEST_OVERHEAD when added - confirm.
            sizeInBytes.addAndGet(-REQUEST_OVERHEAD);
        }
    }
    return request;
}
/**
 * Records a failure response at position {@code idx} when the given request targets {@code index}.
 *
 * @param responses array receiving the failure item
 * @param idx       slot of the request inside the bulk
 * @param request   the index, delete or update request to inspect
 * @param index     index name the failure applies to
 * @param e         the failure cause
 * @return {@code true} if a failure was stored, {@code false} if the request targets another index
 * @throws ElasticsearchException if the request is not an index, delete or update request
 */
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, ActionRequest request, String index, Throwable e) {
    if (request instanceof IndexRequest) {
        IndexRequest indexing = (IndexRequest) request;
        if (!index.equals(indexing.index())) {
            return false;
        }
        BulkItemResponse.Failure failure =
                new BulkItemResponse.Failure(indexing.index(), indexing.type(), indexing.id(), e);
        responses.set(idx, new BulkItemResponse(idx, "index", failure));
        return true;
    }
    if (request instanceof DeleteRequest) {
        DeleteRequest deletion = (DeleteRequest) request;
        if (!index.equals(deletion.index())) {
            return false;
        }
        BulkItemResponse.Failure failure =
                new BulkItemResponse.Failure(deletion.index(), deletion.type(), deletion.id(), e);
        responses.set(idx, new BulkItemResponse(idx, "delete", failure));
        return true;
    }
    if (request instanceof UpdateRequest) {
        UpdateRequest update = (UpdateRequest) request;
        if (!index.equals(update.index())) {
            return false;
        }
        BulkItemResponse.Failure failure =
                new BulkItemResponse.Failure(update.index(), update.type(), update.id(), e);
        responses.set(idx, new BulkItemResponse(idx, "update", failure));
        return true;
    }
    throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName());
}
/**
 * Handles a failed request by queueing a replacement index request.
 *
 * <p>The replacement keeps the id of the failed request, targets this handler's
 * {@code index} and {@code type}, and stores the failed request's source under the
 * {@code "data"} key of a new document.
 *
 * @throws IllegalStateException if the failed action is not an {@link IndexRequest}
 */
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
    if (!(action instanceof IndexRequest)) {
        throw new IllegalStateException("unexpected");
    }
    IndexRequest failedRequest = (IndexRequest) action;
    Map<String, Object> json = new HashMap<>();
    json.put("data", failedRequest.source());
    indexer.add(
        Requests.indexRequest()
            .index(index)
            .type(type)
            .id(failedRequest.id())
            .source(json));
}
/**
 * Decides what to do with a failed request based on the cause chain of {@code throwable}.
 *
 * <ul>
 *   <li>rejected execution: the request is added to the indexer again;</li>
 *   <li>socket timeout: the failure is dropped silently;</li>
 *   <li>I/O error whose message contains "max retry timeout": logged and dropped;</li>
 *   <li>anything else: rethrown.</li>
 * </ul>
 */
@Override
public void onFailure(ActionRequest actionRequest, Throwable throwable, int i, RequestIndexer requestIndexer) throws Throwable {
    if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
        requestIndexer.add(new ActionRequest[]{actionRequest});
        return;
    }
    if (ExceptionUtils.findThrowable(throwable, SocketTimeoutException.class).isPresent()) {
        return;
    }
    Optional<IOException> ioCause = ExceptionUtils.findThrowable(throwable, IOException.class);
    if (ioCause.isPresent()) {
        String message = ioCause.get().getMessage();
        if (message != null && message.contains("max retry timeout")) {
            log.error(message);
            return;
        }
    }
    throw throwable;
}
/**
 * Builds the terms-by-query request described by a filter join node.
 *
 * @param node          node supplying lookup indices, types, path, query, ordering,
 *                      per-shard term limit, terms encoding and optional cardinality
 * @param parentRequest request passed to the {@link TermsByQueryRequest} constructor
 * @return the configured request; expected terms are set only when the node has a cardinality
 */
protected TermsByQueryRequest getTermsByQueryRequest(FilterJoinNode node, ActionRequest parentRequest) {
    TermsByQueryRequest termsRequest = new TermsByQueryRequest(parentRequest, node.getLookupIndices())
            .field(node.getLookupPath())
            .types(node.getLookupTypes())
            .query(node.getLookupQuery())
            .orderBy(node.getOrderBy())
            .maxTermsPerShard(node.getMaxTermsPerShard())
            .termsEncoding(node.getTermsEncoding());
    if (!node.hasCardinality()) {
        return termsRequest;
    }
    termsRequest.expectedTerms(node.getCardinality());
    return termsRequest;
}
/**
 * Re-queues a failed index request as a new document.
 *
 * <p>The new request reuses the failed request's id, is addressed to this handler's
 * {@code index} and {@code type}, and carries the original source under the
 * {@code "data"} key.
 *
 * @throws IllegalStateException if the failed action is anything but an {@link IndexRequest}
 */
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
    boolean isIndexRequest = action instanceof IndexRequest;
    if (!isIndexRequest) {
        throw new IllegalStateException("unexpected");
    }
    IndexRequest original = (IndexRequest) action;
    Map<String, Object> json = new HashMap<>();
    json.put("data", original.source());
    indexer.add(
        Requests.indexRequest()
            .index(index)
            .type(type)
            .id(original.id())
            .source(json));
}
/**
 * Serializes this request: parent state, consistency level, the number of
 * sub-requests, each sub-request prefixed by a one-byte type discriminator
 * (0 = index, 1 = delete, 2 = update), then the refresh flag and the timeout.
 *
 * @param out stream to write to
 * @throws IOException           if writing to the stream fails
 * @throws IllegalStateException if a sub-request is not an index, delete or update
 *                               request; writing it without a discriminator byte
 *                               would produce a stream that cannot be decoded
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    super.writeTo(out);
    out.writeByte(consistencyLevel.id());
    out.writeVInt(requests.size());
    for (ActionRequest request : requests) {
        if (request instanceof IndexRequest) {
            out.writeByte((byte) 0);
        } else if (request instanceof DeleteRequest) {
            out.writeByte((byte) 1);
        } else if (request instanceof UpdateRequest) {
            out.writeByte((byte) 2);
        } else {
            // Fail fast instead of emitting a payload with no type marker in front of it.
            throw new IllegalStateException("Unsupported request type in bulk request: "
                    + (request == null ? "null" : request.getClass().getName()));
        }
        request.writeTo(out);
    }
    out.writeBoolean(refresh);
    timeout.writeTo(out);
}
/**
 * Failure policy for a single request, chosen from the cause chain of {@code throwable}:
 * rejected executions are re-added to the indexer, socket timeouts are ignored,
 * I/O errors mentioning "max retry timeout" are logged and ignored, and every
 * other failure is rethrown.
 */
@Override
public void onFailure(ActionRequest actionRequest, Throwable throwable, int i, RequestIndexer requestIndexer) throws Throwable {
    boolean rejected = ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent();
    if (rejected) {
        requestIndexer.add(new ActionRequest[]{actionRequest});
        return;
    }
    boolean socketTimedOut = ExceptionUtils.findThrowable(throwable, SocketTimeoutException.class).isPresent();
    if (socketTimedOut) {
        return;
    }
    Optional<IOException> ioFailure = ExceptionUtils.findThrowable(throwable, IOException.class);
    if (ioFailure.isPresent()) {
        String detail = ioFailure.get().getMessage();
        if (detail != null && detail.contains("max retry timeout")) {
            log.error(detail);
            return;
        }
    }
    throw throwable;
}
/**
 * Handles a failed request.
 *
 * <p>A rejected execution puts the request back on the indexer. A socket timeout
 * is swallowed. An I/O error whose message contains "max retry timeout" is logged
 * at error level and swallowed. Any other failure propagates to the caller.
 */
@Override
public void onFailure(ActionRequest actionRequest, Throwable throwable, int i, RequestIndexer requestIndexer) throws Throwable {
    if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
        // Retry: hand the same request back to the indexer.
        requestIndexer.add(new ActionRequest[]{actionRequest});
        return;
    }
    if (ExceptionUtils.findThrowable(throwable, SocketTimeoutException.class).isPresent()) {
        return;
    }
    Optional<IOException> io = ExceptionUtils.findThrowable(throwable, IOException.class);
    if (io.isPresent()) {
        String text = io.get().getMessage();
        boolean retryTimeout = text != null && text.contains("max retry timeout");
        if (retryTimeout) {
            log.error(text);
            return;
        }
    }
    throw throwable;
}
/**
 * Executes the request produced by the request builder and sends the REST
 * response through the channel.
 *
 * <p>Join builders are checked first, then search requests, then delete-by-query
 * builders (rejected as unsupported), then get-index requests.
 *
 * @throws UnsupportedOperationException for delete-by-query builders
 * @throws Exception                     if the request type is none of the above
 */
public void execute() throws Exception {
    ActionRequest request = requestBuilder.request();
    // TODO: maybe change to an instanceof check on a multi-request type?
    if (requestBuilder instanceof JoinRequestBuilder) {
        executeJoinRequestAndSendResponse();
        return;
    }
    if (request instanceof SearchRequest) {
        client.search((SearchRequest) request, new RestStatusToXContentListener<SearchResponse>(channel));
        return;
    }
    if (requestBuilder instanceof SqlElasticDeleteByQueryRequestBuilder) {
        throw new UnsupportedOperationException("currently not support delete on elastic 2.0.0");
    }
    if (request instanceof GetIndexRequest) {
        this.requestBuilder.getBuilder().execute(new GetIndexRequestRestListener(channel, (GetIndexRequest) request));
        return;
    }
    throw new Exception(String.format("Unsupported ActionRequest provided: %s", request.getClass().getName()));
}
/**
 * Serializes this request: parent state, index name, timeout, ingest id, shard id
 * and the list of action requests.
 *
 * <p>Each list entry starts with a presence flag. A present entry is followed by a
 * second flag ({@code true} = index request, {@code false} = delete request) and
 * the request itself.
 *
 * @throws ElasticsearchException if an entry is neither an index nor a delete request
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    super.writeTo(out);
    out.writeString(index);
    timeout.writeTo(out);
    out.writeLong(ingestId);
    shardId.writeTo(out);
    out.writeVInt(actionRequests.size());
    for (ActionRequest<?> actionRequest : actionRequests) {
        boolean present = actionRequest != null;
        out.writeBoolean(present);
        if (!present) {
            continue;
        }
        boolean isIndex = actionRequest instanceof IndexRequest;
        if (!isIndex && !(actionRequest instanceof DeleteRequest)) {
            throw new ElasticsearchException("action request not supported: " + actionRequest.getClass().getName());
        }
        out.writeBoolean(isIndex);
        actionRequest.writeTo(out);
    }
}
/**
 * Copy constructor: creates a new get request with the same settings as {@code getRequest}.
 * The original request that caused this one is handed to the superclass constructor, so
 * the new request inherits the headers and context from it.
 *
 * @param getRequest      request whose fields are copied
 * @param originalRequest request that triggered the creation of this copy
 */
public GetRequest(GetRequest getRequest, ActionRequest originalRequest) {
    super(originalRequest);

    // Document coordinates.
    this.index = getRequest.index;
    this.type = getRequest.type;
    this.id = getRequest.id;
    this.routing = getRequest.routing;
    this.preference = getRequest.preference;

    // Versioning.
    this.version = getRequest.version;
    this.versionType = getRequest.versionType;

    // What to fetch and how.
    this.fields = getRequest.fields;
    this.fetchSourceContext = getRequest.fetchSourceContext;
    this.refresh = getRequest.refresh;
    this.realtime = getRequest.realtime;
    this.ignoreErrorsOnGeneratedFields = getRequest.ignoreErrorsOnGeneratedFields;
}
/**
 * Verifies that a response delivered by the filter chain reaches the listener
 * originally passed to {@code filter.apply}.
 */
@SuppressWarnings("unchecked")
@Test
public void testApplyActionRequestMakesPassThroughCallToOnResponse() {
    // given: the chain immediately answers on whatever listener it receives (4th argument)
    Answer<Object> respondImmediately = new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
            ActionListener chainListener = (ActionListener) invocation.getArguments()[3];
            chainListener.onResponse(response);
            return null;
        }
    };
    doAnswer(respondImmediately)
            .when(chain)
            .proceed(any(Task.class), anyString(), any(ActionRequest.class), any(ActionListener.class));

    // when
    filter.apply(task, action, request, listener, chain);

    // then: the original listener is notified
    verify(listener).onResponse(response);
}
/**
 * Creates a visitor over the given tree.
 *
 * @param client        client stored for later use by the visitor
 * @param root          root node of the tree to visit
 * @param cache         filter join cache
 * @param parentRequest request on whose behalf the visitor runs
 */
public FilterJoinVisitor(Client client, RootNode root, FilterJoinCache cache, ActionRequest parentRequest) {
    this.client = client;
    this.root = root;
    this.cache = cache;
    this.parentRequest = parentRequest;
    // Every visitor starts with fresh, empty metadata.
    this.metadata = new CoordinateSearchMetadata();
}
/**
 * Re-adds the failed request to the indexer when the cause chain contains an
 * {@link EsRejectedExecutionException}; every other failure is rethrown.
 */
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
    boolean rejected = ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent();
    if (!rejected) {
        // Not a rejection: propagate unchanged.
        throw failure;
    }
    indexer.add(action);
}
/**
 * Creates an update result that carries a write result; {@code noopResult} is left {@code null}.
 *
 * @param result        outcome produced by the update helper
 * @param actionRequest request associated with the update
 * @param retry         whether the update should be retried
 * @param error         failure, if any
 * @param writeResult   result of the write operation
 */
UpdateResult(UpdateHelper.Result result, ActionRequest actionRequest, boolean retry, Throwable error, WriteResult writeResult) {
    this.noopResult = null;
    this.writeResult = writeResult;
    this.result = result;
    this.actionRequest = actionRequest;
    this.retry = retry;
    this.error = error;
}
/**
 * Instantiates the given HTTP action class through its {@code (Settings)} constructor
 * and registers it under the action's name.
 *
 * <p>If the instance cannot be created reflectively, the error is logged and the
 * action is not registered.
 *
 * @param action     action whose name is used as the registry key
 * @param httpAction HTTP action class to instantiate
 */
@SuppressWarnings("unchecked")
public <Request extends ActionRequest, Response extends ActionResponse> void registerAction(GenericAction<Request, Response> action, Class<? extends HttpAction<Request, Response>> httpAction) {
    final HttpAction<Request, Response> instance;
    try {
        instance = httpAction.getDeclaredConstructor(Settings.class).newInstance(settings);
    } catch (ReflectiveOperationException e) {
        // Common supertype of the four reflective failures this call chain can raise.
        logger.error(e.getMessage(), e);
        return;
    }
    actionMap.put(action.name(), new HttpElasticsearchClient.ActionEntry<>(action, instance));
}
/**
 * Adds several {@link ActionRequest}s to the indexer, dispatching each one to the
 * overload that matches its concrete type.
 *
 * @param actionRequests the requests to add
 * @throws IllegalArgumentException if an element is not an index, delete or update request
 * @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@link UpdateRequest}
 */
@Deprecated
default void add(ActionRequest... actionRequests) {
    for (ActionRequest candidate : actionRequests) {
        if (candidate instanceof IndexRequest) {
            add((IndexRequest) candidate);
            continue;
        }
        if (candidate instanceof DeleteRequest) {
            add((DeleteRequest) candidate);
            continue;
        }
        if (candidate instanceof UpdateRequest) {
            add((UpdateRequest) candidate);
            continue;
        }
        throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests");
    }
}
/**
 * Adds an index or delete request by delegating to the type-specific overload.
 *
 * @param request the request to add
 * @return this ingest request
 * @throws IllegalArgumentException if the request is neither an index nor a delete request
 */
public IngestRequest add(ActionRequest<?> request) {
    if (request instanceof IndexRequest) {
        add((IndexRequest) request);
        return this;
    }
    if (request instanceof DeleteRequest) {
        add((DeleteRequest) request);
        return this;
    }
    throw new IllegalArgumentException("no support for request [" + request + "]");
}
/**
 * Checks that the failure handler is invoked exactly once with a bulk request
 * containing the payload that was added to the batch emitter.
 */
@Test
public void failoverIsExecutedAfterNonSuccessfulRequest() {

    // given
    Builder factoryBuilder = createTestObjectFactoryBuilder();
    ClientObjectFactory<TransportClient, BulkRequest> objectFactory = spy(factoryBuilder.build());

    FailoverPolicy policy = spy(new NoopFailoverPolicy());
    Function failureHandler = spy(objectFactory.createFailureHandler(policy));
    when(objectFactory.createFailureHandler(any())).thenReturn(failureHandler);

    Settings localNodeSettings = Settings.builder().put("node.local", "true").build();
    TransportClient transportClient = spy(TransportClient.builder().settings(localNodeSettings).build());
    transportClient.addTransportAddress(new LocalTransportAddress("1"));
    when(objectFactory.createClient()).thenReturn(transportClient);

    // Batch size 1, so a single added item is emitted straight away.
    BatchEmitter emitter = new BulkProcessorFactory().createInstance(
            1,
            100,
            objectFactory,
            policy);

    String expectedPayload = "test1";
    ActionRequest indexedItem = createTestRequest(expectedPayload);

    // when
    emitter.add(indexedItem);

    // then
    ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
    verify(failureHandler, times(1)).apply(bulkCaptor.capture());
    assertEquals(expectedPayload, new BulkRequestIntrospector().items(bulkCaptor.getValue()).iterator().next());
}
/**
 * Adds a request to this bulk request by delegating to the overload for its concrete type.
 *
 * @param request index, delete or update request to add
 * @param payload optional payload passed along with the request
 * @return this bulk request
 * @throws IllegalArgumentException if the request is of any other type
 */
public BulkRequest add(ActionRequest request, @Nullable Object payload) {
    if (request instanceof IndexRequest) {
        add((IndexRequest) request, payload);
        return this;
    }
    if (request instanceof DeleteRequest) {
        add((DeleteRequest) request, payload);
        return this;
    }
    if (request instanceof UpdateRequest) {
        add((UpdateRequest) request, payload);
        return this;
    }
    throw new IllegalArgumentException("No support for request [" + request + "]");
}
/**
 * Writes the request into an in-memory stream, wraps that stream in a pretty-printing
 * JSON builder and returns the builder's bytes as a string.
 *
 * <p>The bytes are decoded as UTF-8 explicitly, so the result no longer depends on
 * the platform default charset.
 *
 * <p>NOTE(review): {@code request.writeTo} is given the raw stream and no JSON
 * structure is started on the builder, so the content may not be valid JSON -
 * confirm against callers.
 *
 * @param request the request to convert
 * @return the bytes held by the builder, decoded as UTF-8
 * @throws IOException if writing the request or creating the builder fails
 */
public static String convertRequestToJson(ActionRequest request) throws IOException {
    BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
    request.writeTo(bytesStreamOutput);
    XContentBuilder builder = XContentFactory.jsonBuilder(bytesStreamOutput);
    builder.prettyPrint();
    BytesArray bytesArray = builder.bytes().toBytesArray();
    // Fully qualified so that no new import is needed in the file.
    return new String(
            bytesArray.array(),
            bytesArray.arrayOffset(),
            bytesArray.length(),
            java.nio.charset.StandardCharsets.UTF_8);
}
/**
 * Sends a request through {@code consumer} and waits up to {@code requestTimeout}
 * for the response to arrive on a latch. Blocking on the future itself is not
 * allowed in a transport call context. See BaseFuture.blockingAllowed
 *
 * <p>If the waiting thread is interrupted, its interrupt flag is restored before
 * {@link IllegalStateException} is thrown, so callers further up the stack can
 * still observe the interruption.
 *
 * @param request request like index/search/get
 * @param LOG log
 * @param consumer functional interface to operate as a client request like client::get
 * @param <Request> ActionRequest
 * @param <Response> ActionResponse
 * @return the response, or an empty {@code Optional} when no response was recorded
 *         (the failure callback only logs the error)
 * @throws ElasticsearchTimeoutException when we cannot get response within time.
 * @throws IllegalStateException when the waiting thread is interrupted
 */
public <Request extends ActionRequest, Response extends ActionResponse> Optional<Response> timedRequest(
    Request request,
    Logger LOG,
    BiConsumer<Request, ActionListener<Response>> consumer
) {
    try {
        AtomicReference<Response> respReference = new AtomicReference<>();
        final CountDownLatch latch = new CountDownLatch(1);

        consumer
            .accept(
                request,
                new LatchedActionListener<Response>(
                    ActionListener
                        .wrap(
                            response -> { respReference.set(response); },
                            exception -> { LOG.error("Cannot get response for request {}, error: {}", request, exception); }
                        ),
                    latch
                )
            );

        if (!latch.await(requestTimeout.getSeconds(), TimeUnit.SECONDS)) {
            throw new ElasticsearchTimeoutException("Cannot get response within time limit: " + request.toString());
        }
        return Optional.ofNullable(respReference.get());
    } catch (InterruptedException e1) {
        // Catching InterruptedException clears the flag; set it again for the callers.
        Thread.currentThread().interrupt();
        LOG.error(CommonErrorMessages.WAIT_ERR_MSG);
        throw new IllegalStateException(e1);
    }
}
/**
 * Sends a request asynchronously; the outcome is forwarded to {@code listener}
 * through a wrapping listener.
 *
 * @param <Request> ActionRequest
 * @param <Response> ActionResponse
 * @param request request body
 * @param consumer request method, functional interface to operate as a client request like client::get
 * @param listener receives the response or the failure
 */
public <Request extends ActionRequest, Response extends ActionResponse> void asyncRequest(
    Request request,
    BiConsumer<Request, ActionListener<Response>> consumer,
    ActionListener<Response> listener
) {
    ActionListener<Response> forwarding = ActionListener.wrap(listener::onResponse, listener::onFailure);
    consumer.accept(request, forwarding);
}
/**
 * Runs a transport action on the client; the outcome is forwarded to
 * {@code listener} through a wrapping listener.
 *
 * @param <Request> ActionRequest
 * @param <Response> ActionResponse
 * @param action transport action
 * @param request request body
 * @param listener receives the response or the failure
 */
public <Request extends ActionRequest, Response extends ActionResponse> void execute(
    ActionType<Response> action,
    Request request,
    ActionListener<Response> listener
) {
    ActionListener<Response> forwarding = ActionListener.wrap(listener::onResponse, listener::onFailure);
    client.execute(action, request, forwarding);
}
/**
 * Serializes this request: timeout, ingest id, the number of queued requests, then
 * each request prefixed by a one-byte type discriminator (0 = index, 1 = delete).
 *
 * @param out stream to write to
 * @throws IOException           if writing to the stream fails
 * @throws IllegalStateException if a queued request is neither an index nor a delete
 *                               request; writing it without a discriminator byte
 *                               would produce a stream that cannot be decoded
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    timeout.writeTo(out);
    out.writeLong(ingestId);
    out.writeVInt(requests.size());
    for (ActionRequest<?> request : requests) {
        if (request instanceof IndexRequest) {
            out.writeByte((byte) 0);
        } else if (request instanceof DeleteRequest) {
            out.writeByte((byte) 1);
        } else {
            // Fail fast instead of emitting a payload with no type marker in front of it.
            throw new IllegalStateException("Unsupported request type in ingest request: "
                    + (request == null ? "null" : request.getClass().getName()));
        }
        request.writeTo(out);
    }
}
/**
 * Creates a job context holding the requests of one operation together with their
 * listeners and result futures.
 *
 * @param id              context id, passed to the superclass along with the class logger
 * @param operationName   name of the operation
 * @param requests        requests belonging to the operation
 * @param listeners       listeners for the requests
 * @param resultFutures   futures that receive the task results
 * @param transportAction transport action associated with the requests
 * @param projectorChain  optional projector chain, may be {@code null}
 */
public ESJobContext(int id,
                    String operationName,
                    List<? extends ActionRequest> requests,
                    List<? extends ActionListener> listeners,
                    List<SettableFuture<TaskResult>> resultFutures,
                    TransportAction transportAction,
                    @Nullable FlatProjectorChain projectorChain) {
    super(id, LOGGER);
    this.operationName = operationName;
    this.transportAction = transportAction;
    this.projectorChain = projectorChain;
    this.requests = requests;
    this.listeners = listeners;
    this.resultFutures = resultFutures;
}
/**
 * Returns the action handlers contributed by this plugin.
 *
 * @return the clustering and list-algorithms handlers when the plugin is enabled,
 *         otherwise an empty list
 */
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
    if (!pluginEnabled) {
        return Collections.emptyList();
    }
    return Arrays.asList(
        new ActionHandler<>(ClusteringAction.INSTANCE, TransportClusteringAction.class),
        new ActionHandler<>(ListAlgorithmsAction.INSTANCE, ListAlgorithmsAction.TransportListAlgorithmsAction.class));
}
/**
 * Checks that the failure handler is invoked exactly once with a bulk request
 * containing the payload that was added to the batch emitter.
 */
@Test
public void failoverIsExecutedAfterNonSuccessfulRequest() {

    // given
    Builder factoryBuilder = createTestObjectFactoryBuilder();
    ClientObjectFactory<TransportClient, BulkRequest> objectFactory = spy(factoryBuilder.build());

    FailoverPolicy policy = spy(new NoopFailoverPolicy());
    Function failureHandler = spy(objectFactory.createFailureHandler(policy));
    when(objectFactory.createFailureHandler(any())).thenReturn(failureHandler);

    Settings emptySettings = Settings.builder().build();
    TransportClient transportClient = spy(new PreBuiltTransportClient(emptySettings));
    transportClient.addTransportAddress(new LocalTransportAddress("1"));
    when(objectFactory.createClient()).thenReturn(transportClient);

    // Batch size 1, so a single added item is emitted straight away.
    BatchEmitter emitter = new BulkProcessorFactory().createInstance(
            1,
            100,
            objectFactory,
            policy);

    String expectedPayload = "test1";
    ActionRequest indexedItem = createTestRequest(expectedPayload);

    // when
    emitter.add(indexedItem);

    // then
    ArgumentCaptor<BulkRequest> bulkCaptor = ArgumentCaptor.forClass(BulkRequest.class);
    verify(failureHandler, times(1)).apply(bulkCaptor.capture());
    assertEquals(expectedPayload, new BulkRequestIntrospector().items(bulkCaptor.getValue()).iterator().next());
}
/**
 * Stub implementation: no request builder is created for any action.
 *
 * @param action the action a builder is requested for (ignored)
 * @return always {@code null}
 */
@Override
public <
        RequestT extends ActionRequest,
        ResponseT extends ActionResponse,
        RequestBuilderT extends ActionRequestBuilder<RequestT, ResponseT, RequestBuilderT>>
    RequestBuilderT prepareExecute(Action<RequestT, ResponseT, RequestBuilderT> action) {
    return null;
}