下面列出了org.junit.jupiter.api.extension.ParameterResolutionException#org.elasticsearch.client.RestClient 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void setup() {
RestClientBuilder restClientBuilder =
RestClient.builder(new HttpHost(properties.getHost(), properties.getPort(), properties.getScheme()));
if (StringUtils.isNotEmpty(properties.getUsername()) && StringUtils.isNotEmpty(properties.getPassword())) {
UsernamePasswordCredentials credentials =
new UsernamePasswordCredentials(properties.getUsername(), properties.getPassword());
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, credentials);
restClientBuilder.setHttpClientConfigCallback(
httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
} else {
if (StringUtils.isNotEmpty(properties.getUsername()) || StringUtils.isNotEmpty(properties.getPassword())) {
log.warn("Both username and password must be configured to setup ES authentication.");
}
}
client = new RestHighLevelClient(restClientBuilder);
}
/**
* Create an index in Elasticsearch. If necessary, this function should check whether a new index
* is required.
*
* @return true if a new index has been created, false otherwise
*/
public boolean createIndex() {
RestClient client = esrResource.getClient();
try {
Response r = client.performRequest("HEAD", "/" + index);
if (r.getStatusLine().getStatusCode() != 200) {
client.performRequest("PUT", "/" + index);
return true;
}
} catch (IOException ioe) {
getMonitor().error("Unable to create index", ioe);
}
return false;
}
private static RestClient connect(Map<String, Integer> coordinates,
Map<String, String> userConfig) {
Objects.requireNonNull(coordinates, "coordinates");
Preconditions.checkArgument(! coordinates.isEmpty(), "no ES coordinates specified");
final Set<HttpHost> set = new LinkedHashSet<>();
for (Map.Entry<String, Integer> entry : coordinates.entrySet()) {
set.add(new HttpHost(entry.getKey(), entry.getValue()));
}
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(userConfig.getOrDefault("esUser", "none"),
userConfig.getOrDefault("esPass", "none")));
return RestClient.builder(set.toArray(new HttpHost[0]))
.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider))
.setMaxRetryTimeoutMillis(300000).build();
}
@Override public Schema create(SchemaPlus parentSchema, String name,
Map<String, Object> operand) {
final Map map = (Map) operand;
final ObjectMapper mapper = new ObjectMapper();
mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
try {
final Map<String, Integer> coordinates =
mapper.readValue((String) map.get("coordinates"),
new TypeReference<Map<String, Integer>>() { });
final RestClient client = connect(coordinates);
final Map<String, String> userConfig =
mapper.readValue((String) map.get("userConfig"),
new TypeReference<Map<String, String>>() { });
final String index = (String) map.get("index");
Preconditions.checkArgument(index != null, "index is missing in configuration");
return new ElasticsearchSchema(client, new ObjectMapper(), index);
} catch (IOException e) {
throw new RuntimeException("Cannot parse values from json", e);
}
}
/**
* Creates an ElasticsearchTable.
* @param client low-level ES rest client
* @param mapper Jackson API
* @param indexName elastic search index
* @param typeName elastic searh index type
*/
ElasticsearchTable(RestClient client, ObjectMapper mapper, String indexName, String typeName) {
super(Object[].class);
this.restClient = Objects.requireNonNull(client, "client");
try {
this.version = detectVersion(client, mapper);
} catch (IOException e) {
final String message = String.format(Locale.ROOT, "Couldn't detect ES version "
+ "for %s/%s", indexName, typeName);
throw new UncheckedIOException(message, e);
}
this.indexName = Objects.requireNonNull(indexName, "indexName");
this.typeName = Objects.requireNonNull(typeName, "typeName");
this.mapper = Objects.requireNonNull(mapper, "mapper");
}
private void verifyAnomaly(
String datasetName,
int intervalMinutes,
int trainTestSplit,
int shingleSize,
double minPrecision,
double minRecall,
double maxError
) throws Exception {
RestClient client = client();
String dataFileName = String.format("data/%s.data", datasetName);
String labelFileName = String.format("data/%s.label", datasetName);
List<JsonObject> data = getData(dataFileName);
List<Entry<Instant, Instant>> anomalies = getAnomalyWindows(labelFileName);
indexTrainData(datasetName, data, trainTestSplit, client);
String detectorId = createDetector(datasetName, intervalMinutes, client);
startDetector(detectorId, data, trainTestSplit, shingleSize, intervalMinutes, client);
indexTestData(data, datasetName, trainTestSplit, client);
double[] testResults = getTestResults(detectorId, data, trainTestSplit, intervalMinutes, anomalies, client);
verifyTestResults(testResults, anomalies, minPrecision, minRecall, maxError);
}
/**
* Update settings in Elasticsearch
* @param client Elasticsearch client
* @param index Index name
* @param settings Settings if any, null if no update settings
* @throws Exception if the elasticsearch API call is failing
*/
private static void updateIndexWithSettingsInElasticsearch(RestClient client, String index, String settings) throws Exception {
logger.trace("updateIndex([{}])", index);
assert client != null;
assert index != null;
if (settings != null) {
logger.trace("Found update settings for index [{}]: [{}]", index, settings);
logger.debug("updating settings for index [{}]", index);
Request request = new Request("PUT", "/" + index + "/_settings");
request.setJsonEntity(settings);
client.performRequest(request);
}
logger.trace("/updateIndex([{}])", index);
}
@Before
public void setUp() throws Exception {
/**
* 如果es集群安装了x-pack插件则以此种方式连接集群
* 1. java客户端的方式是以tcp协议在9300端口上进行通信
* 2. http客户端的方式是以http协议在9200端口上进行通信
*/
Settings settings = Settings.builder(). put("xpack.security.user", "elastic:changeme").build();
client = new PreBuiltXPackTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"),9200));
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("elastic", "changeme"));
restClient = RestClient.builder(new HttpHost("localhost",9200,"http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
}).build();
}
/**
* Registers an instance of the high level client for Elasticsearch.
*
* @return An instance of Elasticsearch's high level rest client
*/
@Bean
public RestHighLevelClient restHighLevelClient() {
RestClientBuilder builder = RestClient.builder(new HttpHost(configProps.getHost(), configProps.getPort(), configProps.getScheme()));
RestClientBuilder.HttpClientConfigCallback httpClientConfigCallback = httpAsyncClientBuilder -> {
httpAsyncClientBuilder
.setMaxConnTotal(configProps.getRestClientMaxConnectionsTotal())
.setMaxConnPerRoute(configProps.getRestClientMaxConnectionsPerRoute());
if (null != configProps.getUser() && !configProps.getUser().isEmpty()) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(configProps.getUser(), configProps.getPassword()));
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
return httpAsyncClientBuilder;
};
builder.setHttpClientConfigCallback(httpClientConfigCallback);
builder.setMaxRetryTimeoutMillis(configProps.getRestClientMaxRetryTimeoutMillis());
//return new RestHighLevelClient(builder.build());
return new RestHighLevelClient(builder);
}
/**
* Create an alias if needed
* @param client Client to use
* @param alias Alias name
* @param index Index name
* @throws Exception When alias can not be set
*/
public static void createAlias(RestClient client, String alias, String index) throws Exception {
logger.trace("createAlias({},{})", alias, index);
assert client != null;
assert alias != null;
assert index != null;
Request request = new Request("POST", "/_aliases/");
request.setJsonEntity("{\"actions\":[{\"add\":{\"index\":\"" + index +"\",\"alias\":\"" + alias +"\"}}]}");
Response response = client.performRequest(request);
if (response.getStatusLine().getStatusCode() != 200) {
logger.warn("Could not create alias [{}] on index [{}]", alias, index);
throw new Exception("Could not create alias ["+alias+"] on index ["+index+"].");
}
logger.trace("/createAlias({},{})", alias, index);
}
@Test
public void canIndexPolicy() {
PolicyDocument policyDocument = new PolicyDocument(
"111-111",
LocalDate.of(2019, 1, 1),
LocalDate.of(2019, 12, 31),
"John Smith",
"SAFE_HOUSE",
BigDecimal.valueOf(1000),
"m.smith"
);
PolicyElasticRepository repository = new PolicyElasticRepository(
new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", el.getHttpPort(), "http"))),
new JsonConverter(objectMapper())
);
repository.save(policyDocument);
PolicyDocument saved = repository.findByNumber("111-111");
assertNotNull(saved);
}
@Bean
@ConditionalOnMissingBean
public RestHighLevelClient restHighLevelClient() {
List<String> clusterNodes = elasticsearchProperties.getClusterNodes();
clusterNodes.forEach(node -> {
try {
String[] parts = StringUtils.split(node, ":");
Assert.notNull(parts, "Must defined");
Assert.state(parts.length == 2, "Must be defined as 'host:port'");
httpHosts.add(new HttpHost(parts[0], Integer.parseInt(parts[1]), elasticsearchProperties.getSchema()));
} catch (Exception e) {
throw new IllegalStateException("Invalid ES nodes " + "property '" + node + "'", e);
}
});
RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]));
return getRestHighLevelClient(builder, elasticsearchProperties);
}
public static void main(String[] args) throws Exception{
client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("39.98.247.199", 9200, "http")));
// openIndex();
// createIndex();
// index(null);
anylyze();
// createIndex();
// index(null);
// search(null);
}
@Start
void start() {
String[] hosts = Plugin.getConfig(HTTP_HOST, "localhost:9200").split(" ");
HttpHost[] httpHosts = new HttpHost[hosts.length];
for (int i = 0; i < hosts.length; ++i) {
httpHosts[i] = HttpHost.create(hosts[i]);
}
client = RestClient.builder(httpHosts).setRequestConfigCallback(new CustomRequesetConfig()).build();
LOG.info("{} start succ", ElasticRestClientImpl.class.getName());
}
/**
* A constructor for the client builder.
* @param endpoint is the cluster's endpoint and is injected into the builder.
*/
public Builder(String endpoint)
{
this.endpoint = endpoint;
this.clientBuilder = RestClient.builder(HttpHost.create(this.endpoint));
this.signer = new AWS4Signer();
this.domainSplitter = Splitter.on(".");
}
/**
* Executes a match query for given field/value and returns the count of results.
*
* @param connectionConfiguration Specifies the index and type
* @param restClient To use to execute the call
* @param field The field to query
* @param value The value to match
* @return The count of documents in the search result
* @throws IOException On error communicating with Elasticsearch
*/
static int countByMatch(
ConnectionConfiguration connectionConfiguration,
RestClient restClient,
String field,
String value)
throws IOException {
String requestBody =
"{\n"
+ " \"query\" : {\"match\": {\n"
+ " \""
+ field
+ "\": \""
+ value
+ "\"\n"
+ " }}\n"
+ "}\n";
String endPoint =
String.format(
"/%s/%s/_search",
connectionConfiguration.getIndex(), connectionConfiguration.getType());
HttpEntity httpEntity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
Request request = new Request("GET", endPoint);
request.addParameters(Collections.emptyMap());
request.setEntity(httpEntity);
Response response = restClient.performRequest(request);
JsonNode searchResult = parseResponse(response.getEntity());
if (getBackendVersion(connectionConfiguration) >= 7) {
return searchResult.path("hits").path("total").path("value").asInt();
} else {
return searchResult.path("hits").path("total").asInt();
}
}
/**
* LowLevelRestConfig
*
* @param
* @return org.elasticsearch.client.RestClient
* @author wliduo[[email protected]]
* @date 2019/8/12 18:56
*/
@Bean
public RestClient restClient() {
// 如果有多个从节点可以持续在内部new多个HttpHost,参数1是IP,参数2是端口,参数3是通信协议
RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(hostname, port, "http"));
// 设置Header编码
Header[] defaultHeaders = {new BasicHeader("content-type", "application/json")};
clientBuilder.setDefaultHeaders(defaultHeaders);
// 添加其他配置,这些配置都是可选的,详情配置可看https://blog.csdn.net/jacksonary/article/details/82729556
return clientBuilder.build();
}
/** Inserts the given number of test documents into Elasticsearch. */
static void insertTestDocuments(
ConnectionConfiguration connectionConfiguration, long numDocs, RestClient restClient)
throws IOException {
List<String> data =
ElasticsearchIOTestUtils.createDocuments(
numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
StringBuilder bulkRequest = new StringBuilder();
int i = 0;
for (String document : data) {
bulkRequest.append(
String.format(
"{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }%n%s%n",
connectionConfiguration.getIndex(),
connectionConfiguration.getType(),
i++,
document));
}
String endPoint =
String.format(
"/%s/%s/_bulk", connectionConfiguration.getIndex(), connectionConfiguration.getType());
HttpEntity requestBody =
new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
Request request = new Request("POST", endPoint);
request.addParameters(Collections.singletonMap("refresh", "wait_for"));
request.setEntity(requestBody);
Response response = restClient.performRequest(request);
ElasticsearchIO.checkForErrors(
response.getEntity(), ElasticsearchIO.getBackendVersion(connectionConfiguration), false);
}
public ElasticsearchRequestSubmitter setup(SearchResponse response) throws IOException {
// mocks
RestHighLevelClient highLevelClient = mock(RestHighLevelClient.class);
ElasticsearchClient client = new ElasticsearchClient(mock(RestClient.class), highLevelClient);
// the client should return the given search response
when(highLevelClient.search(any())).thenReturn(response);
return new ElasticsearchRequestSubmitter(client);
}
@Override
protected void configure() {
ElasticSearchConfiguration esConfiguration = new SystemPropertiesElasticSearchConfiguration();
bind(ElasticSearchConfiguration.class).to(SystemPropertiesElasticSearchConfiguration.class);
bind(Client.class).toProvider(ElasticSearchTransportClientProvider.class).in(Singleton.class);
bind(RestClient.class).toProvider(ElasticSearchRestClientProvider.class).in(Singleton.class);
install(new ElasticSearchV5Module(esConfiguration));
}
protected RestHighLevelClient createClient(
final List<HttpHost> pairsList) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, KeyManagementException {
RestClientBuilder builder;
if (StringUtil.isNotEmpty(user) && StringUtil.isNotEmpty(password)) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
if (StringUtil.isEmpty(trustStorePath)) {
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
.setHttpClientConfigCallback(
httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(
credentialsProvider));
} else {
KeyStore truststore = KeyStore.getInstance("jks");
try (InputStream is = Files.newInputStream(Paths.get(trustStorePath))) {
truststore.load(is, trustStorePass.toCharArray());
}
SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
final SSLContext sslContext = sslBuilder.build();
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
.setHttpClientConfigCallback(
httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(
credentialsProvider)
.setSSLContext(sslContext));
}
} else {
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
}
return new RestHighLevelClient(builder);
}
/**
* Connect to ES.
*/
public void connect(String keyStorePath, String keyStorePassword, String trustorePath, String trustorePassword) throws SSLException {
HttpHost httpHost = new HttpHost(this.es_host, this.es_native_port, "https");
SSLContext sslContext = initializeSSLContext(keyStorePath, keyStorePassword, trustorePath, trustorePassword);
this.client = new RestHighLevelClient(RestClient.builder(httpHost).setFailureListener(this).setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setSSLContext(sslContext)));
}
private static RestHighLevelClient createClient() throws IOException {
// Instantiate the client.
LOGGER.info("instantiating the ES client");
final HttpHost httpHost = new HttpHost(HOST_NAME, MavenHardcodedConstants.ES_PORT);
final RestClientBuilder clientBuilder =
RestClient.builder(httpHost);
final RestHighLevelClient client = new RestHighLevelClient(clientBuilder);
// Verify the connection.
LOGGER.info("verifying the ES connection");
final ClusterHealthResponse healthResponse = client
.cluster()
.health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
Assertions
.assertThat(healthResponse.getStatus())
.isNotEqualTo(ClusterHealthStatus.RED);
// Delete the index.
LOGGER.info("deleting the ES index");
final DeleteIndexRequest deleteRequest =
new DeleteIndexRequest(MavenHardcodedConstants.ES_INDEX_NAME);
try {
final AcknowledgedResponse deleteResponse = client
.indices()
.delete(deleteRequest, RequestOptions.DEFAULT);
Assertions
.assertThat(deleteResponse.isAcknowledged())
.isTrue();
} catch (ElasticsearchStatusException error) {
Assertions.assertThat(error)
.satisfies(ignored -> Assertions
.assertThat(error.status())
.isEqualTo(RestStatus.NOT_FOUND));
}
return client;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("httpDecoder", new HttpRequestDecoder());
pipeline.addLast("httpAggregator", new HttpObjectAggregator(conf.getClientMaxBodySize()));
pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
pipeline.addLast("httpMyResponseHandler", new HttpSearchResponseHandler());
pipeline.addLast("httpSearchDecoder", new SearchQueryDecoder());
RestClient restClient = applicationContext.getBean("elasticRestClient", RestClient.class);
pipeline.addLast("httpSearchHandler", new HttpSearchHandler(this.executor, restClient));
}
private void indexTestData(List<JsonObject> data, String datasetName, int trainTestSplit, RestClient client) throws Exception {
data.stream().skip(trainTestSplit).forEach(r -> {
try {
Request req = new Request("POST", String.format("/%s/_doc/", datasetName));
req.setJsonEntity(r.toString());
client.performRequest(req);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
Thread.sleep(1_000);
}
public static RestHighLevelClient createESClient(final PropertiesProvider propertiesProvider) {
System.setProperty("es.set.netty.runtime.available.processors", "false");
String indexAddress = propertiesProvider.get(INDEX_ADDRESS_PROP).orElse(DEFAULT_ADDRESS);
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(create(indexAddress)).setRequestConfigCallback(
requestConfigBuilder -> requestConfigBuilder
.setConnectTimeout(5000)
.setSocketTimeout(60000)).
setMaxRetryTimeoutMillis(50000)); // listener t/o cf https://github.com/ICIJ/datashare/issues/462
String clusterName = propertiesProvider.get(CLUSTER_PROP).orElse(ES_CLUSTER_NAME);
return client;
}
private Map<String, Object> getDetectionResult(String detectorId, Instant begin, Instant end, RestClient client) {
try {
Request request = new Request("POST", String.format("/_opendistro/_anomaly_detection/detectors/%s/_run", detectorId));
request
.setJsonEntity(
String.format(Locale.ROOT, "{ \"period_start\": %d, \"period_end\": %d }", begin.toEpochMilli(), end.toEpochMilli())
);
return entityAsMap(client.performRequest(request));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
RestClientBuilder restClientBuilder = (RestClientBuilder) (allArguments[0]);
RestClient restClient = restClientBuilder.build();
RestClientEnhanceInfo restClientEnhanceInfo = new RestClientEnhanceInfo();
List<Node> nodeList = restClient.getNodes();
for (Node node : nodeList) {
restClientEnhanceInfo.addHttpHost(node.getHost());
}
objInst.setSkyWalkingDynamicField(restClientEnhanceInfo);
}
public RestElasticClient(RestClient client, RestClient proxyClient, boolean enableRunAs) {
this.client = client;
this.proxyClient = proxyClient;
this.mapper = new ObjectMapper();
this.mapper.setDateFormat(DATE_FORMAT);
this.enableRunAs = enableRunAs;
}
@BeforeClass
public static void setUp() throws Exception {
ElasticsearchBaseIT.setUp();
// Create index and add data
HttpHost host = new HttpHost("127.0.0.1", esHttpPort);
restClient = RestClient.builder(host).build();
HttpEntity entity = new StringEntity(
"{\"mappings\":{\"tweet\":{\"properties\":{\"message\":{\"type\":\"text\"},\"timestamp\":{\"type\":\"date\"}}}}}"
);
restClient.performRequest(
"PUT",
"/twitter",
new HashMap<>(),
entity
);
final int numTestDocs = 100;
for (int i = 1; i <= numTestDocs; i++) {
restClient.performRequest("PUT", "/twitter/tweet/" + i, new HashMap<>(), makeTweet());
}
await().atMost(5, SECONDS).until(() -> dataIsAvailable(restClient, numTestDocs));
LOG.info("Finished setup");
}