下面列出了 javax.ws.rs.container.Suspended 注解的用法示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Answers {@code GET /async-hello} asynchronously: a task is submitted to {@code executor}
 * that sleeps for 100 ms and then resumes the suspended response with {@code "Hello!"}.
 *
 * @param asyncResponse the suspended response that the background task completes
 */
@Path("/async-hello")
@GET
@Produces({ MediaType.APPLICATION_JSON })
public void asyncSayHello(@Suspended final AsyncResponse asyncResponse) {
    executor.submit(new Runnable() {
        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the interrupt flag; set it again so the
                // code that owns this thread can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            // The response is resumed even when the delay was cut short.
            asyncResponse.resume("Hello!");
        }
    });
}
/**
 * Looks up the query addressed by the path parameters and passes it, together with the
 * remaining request parameters and the suspended response, to {@code asyncQueryResults}.
 *
 * @param queryId path parameter identifying the query
 * @param slug path parameter forwarded to {@code getQuery}
 * @param token path parameter forwarded to {@code getQuery} and {@code asyncQueryResults}
 * @param maxWait value of the {@code maxWait} query parameter
 * @param targetResultSize value of the {@code targetResultSize} query parameter
 * @param uriInfo request URI information
 * @param asyncResponse the suspended response handed to {@code asyncQueryResults}
 */
@ResourceSecurity(PUBLIC)
@GET
@Path("{queryId}/{slug}/{token}")
@Produces(MediaType.APPLICATION_JSON)
public void getQueryResults(
        @PathParam("queryId") QueryId queryId,
        @PathParam("slug") String slug,
        @PathParam("token") long token,
        @QueryParam("maxWait") Duration maxWait,
        @QueryParam("targetResultSize") DataSize targetResultSize,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    // The lookup is evaluated first, exactly as when it was held in a local variable.
    asyncQueryResults(
            getQuery(queryId, slug, token),
            token,
            maxWait,
            targetResultSize,
            uriInfo,
            asyncResponse);
}
/**
 * Proxies a GET to {@code uri} after checking that {@code hash} is the HMAC of that URI.
 *
 * @param uri value of the {@code uri} query parameter: the target of the proxied request
 * @param hash value of the {@code hmac} query parameter: hex form of the expected HMAC of {@code uri}
 * @param servletRequest the incoming servlet request, forwarded to {@code performRequest}
 * @param uriInfo request URI information, used when building the response
 * @param asyncResponse the suspended response handed to {@code performRequest}
 */
@GET
@Path("/v1/proxy")
@Produces(APPLICATION_JSON)
public void getNext(
        @QueryParam("uri") String uri,
        @QueryParam("hmac") String hash,
        @Context HttpServletRequest servletRequest,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    // A missing parameter or a value that is not well-formed hex can never match the expected
    // HMAC. Reject it like any other mismatch instead of letting a NullPointerException or
    // IllegalArgumentException escape from the comparison.
    boolean hmacValid = false;
    if (uri != null && hash != null) {
        try {
            hmacValid = hmac.hashString(uri, UTF_8).equals(HashCode.fromString(hash));
        }
        catch (IllegalArgumentException ignored) {
            // HashCode.fromString rejects input that is not a valid hex string
        }
    }
    if (!hmacValid) {
        throw badRequest(FORBIDDEN, "Failed to validate HMAC of URI");
    }

    Request.Builder request = prepareGet().setUri(URI.create(uri));
    performRequest(servletRequest, asyncResponse, request, response -> buildResponse(uriInfo, response));
}
/**
 * Proxies a DELETE to {@code uri} after checking that {@code hash} is the HMAC of that URI.
 * The proxied response is turned into a "no content" response carrying its headers.
 *
 * @param uri value of the {@code uri} query parameter: the target of the proxied request
 * @param hash value of the {@code hmac} query parameter: hex form of the expected HMAC of {@code uri}
 * @param servletRequest the incoming servlet request, forwarded to {@code performRequest}
 * @param asyncResponse the suspended response handed to {@code performRequest}
 */
@DELETE
@Path("/v1/proxy")
@Produces(APPLICATION_JSON)
public void cancelQuery(
        @QueryParam("uri") String uri,
        @QueryParam("hmac") String hash,
        @Context HttpServletRequest servletRequest,
        @Suspended AsyncResponse asyncResponse)
{
    // A missing parameter or a value that is not well-formed hex can never match the expected
    // HMAC. Reject it like any other mismatch instead of letting a NullPointerException or
    // IllegalArgumentException escape from the comparison.
    boolean hmacValid = false;
    if (uri != null && hash != null) {
        try {
            hmacValid = hmac.hashString(uri, UTF_8).equals(HashCode.fromString(hash));
        }
        catch (IllegalArgumentException ignored) {
            // HashCode.fromString rejects input that is not a valid hex string
        }
    }
    if (!hmacValid) {
        throw badRequest(FORBIDDEN, "Failed to validate HMAC of URI");
    }

    Request.Builder request = prepareDelete().setUri(URI.create(uri));
    performRequest(servletRequest, asyncResponse, request, response -> responseWithHeaders(noContent(), response));
}
/**
 * Asynchronous variant of {@code createQuery}: runs it and resumes the suspended response with
 * the outcome. Any {@link Throwable} raised along the way is passed to
 * {@code asyncResponse.resume} instead of being propagated to the caller.
 *
 * @param queryLogicName value of the {@code logicName} path parameter
 * @param queryParameters the request parameters, forwarded to {@code createQuery}
 * @param asyncResponse the suspended response to complete
 */
@POST
@Produces({"application/xml", "text/xml", "application/json", "text/yaml", "text/x-yaml", "application/x-yaml", "application/x-protobuf",
        "application/x-protostuff"})
@Path("/{logicName}/async/create")
@GZIP
@GenerateQuerySessionId(cookieBasePath = "/DataWave/Query/")
@EnrichQueryMetrics(methodType = MethodType.CREATE)
@Interceptors({RequiredInterceptor.class, ResponseInterceptor.class})
@Asynchronous
@Timed(name = "dw.query.createQueryAsync", absolute = true)
public void createQueryAsync(
        @Required("logicName") @PathParam("logicName") String queryLogicName,
        MultivaluedMap<String,String> queryParameters,
        @Suspended AsyncResponse asyncResponse) {
    try {
        final GenericResponse<String> created = createQuery(queryLogicName, queryParameters);
        asyncResponse.resume(created);
    } catch (Throwable failure) {
        // Hand the failure to the suspended response rather than rethrowing it
        asyncResponse.resume(failure);
    }
}
/**
 * Asynchronous variant of {@code createQueryAndNext}: runs it and resumes the suspended
 * response with the outcome. Any {@link Throwable} raised along the way is passed to
 * {@code asyncResponse.resume} instead of being propagated to the caller.
 *
 * @param logicName value of the {@code logicName} path parameter
 * @param queryParameters the request parameters, forwarded to {@code createQueryAndNext}
 * @param asyncResponse the suspended response to complete
 */
@POST
@Produces({"application/xml", "text/xml", "application/json", "text/yaml", "text/x-yaml", "application/x-yaml", "application/x-protobuf",
        "application/x-protostuff"})
@Path("/{logicName}/async/createAndNext")
@GZIP
@GenerateQuerySessionId(cookieBasePath = "/DataWave/Query/")
@EnrichQueryMetrics(methodType = MethodType.CREATE_AND_NEXT)
@Interceptors({ResponseInterceptor.class, RequiredInterceptor.class})
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
@Asynchronous
@Timed(name = "dw.query.createAndNextAsync", absolute = true)
public void createQueryAndNextAsync(
        @Required("logicName") @PathParam("logicName") String logicName,
        MultivaluedMap<String,String> queryParameters,
        @Suspended AsyncResponse asyncResponse) {
    try {
        final BaseQueryResponse firstPage = createQueryAndNext(logicName, queryParameters);
        asyncResponse.resume(firstPage);
    } catch (Throwable failure) {
        // Hand the failure to the suspended response rather than rethrowing it
        asyncResponse.resume(failure);
    }
}
/**
 * Asynchronous variant of {@code next}: calls it for the given id and resumes the suspended
 * response with the outcome. Any {@link Throwable} raised along the way is passed to
 * {@code asyncResponse.resume} instead of being propagated to the caller.
 *
 * @param id value of the {@code id} path parameter
 * @param asyncResponse the suspended response to complete
 */
@GET
@Path("/{id}/async/next")
@Produces({"application/xml", "text/xml", "application/json", "text/yaml", "text/x-yaml", "application/x-yaml", "application/x-protobuf",
        "application/x-protostuff"})
@GZIP
@EnrichQueryMetrics(methodType = MethodType.NEXT)
@Interceptors({ResponseInterceptor.class, RequiredInterceptor.class})
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
@Asynchronous
@Timed(name = "dw.query.nextAsync", absolute = true)
public void nextAsync(
        @Required("id") @PathParam("id") String id,
        @Suspended AsyncResponse asyncResponse) {
    try {
        final BaseQueryResponse page = next(id);
        asyncResponse.resume(page);
    } catch (Throwable failure) {
        // Hand the failure to the suspended response rather than rethrowing it
        asyncResponse.resume(failure);
    }
}
/**
 * Asynchronous variant of {@code execute}: runs it and resumes the suspended response with the
 * resulting {@code StreamingOutput}. Any {@link Throwable} raised along the way is passed to
 * {@code asyncResponse.resume} instead of being propagated to the caller.
 *
 * @param logicName value of the {@code logicName} path parameter
 * @param queryParameters the request parameters, forwarded to {@code execute}
 * @param httpHeaders the request headers, forwarded to {@code execute}
 * @param asyncResponse the suspended response to complete
 */
@POST
@Produces("*/*")
@Path("/{logicName}/async/execute")
@GZIP
@Interceptors({ResponseInterceptor.class, RequiredInterceptor.class})
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
@Asynchronous
@Timed(name = "dw.query.executeQueryAsync", absolute = true)
public void executeAsync(
        @PathParam("logicName") String logicName,
        MultivaluedMap<String,String> queryParameters,
        @Context HttpHeaders httpHeaders,
        @Suspended AsyncResponse asyncResponse) {
    try {
        final StreamingOutput stream = execute(logicName, queryParameters, httpHeaders);
        asyncResponse.resume(stream);
    } catch (Throwable failure) {
        // Hand the failure to the suspended response rather than rethrowing it
        asyncResponse.resume(failure);
    }
}
/**
 * Asynchronous variant of {@code load}: generates a name base from a random UUID with the
 * dashes removed, records the query id in {@code CreateQuerySessionIDFilter.QUERY_ID}, then
 * calls {@code load} and resumes the suspended response with the outcome. Any
 * {@link Throwable} raised by {@code load} or the resume is passed to
 * {@code asyncResponse.resume} instead of being propagated.
 *
 * @param queryId value of the {@code queryId} query parameter
 * @param alias value of the {@code alias} query parameter
 * @param asyncResponse the suspended response to complete
 */
@GET
@Produces({"application/xml", "text/xml", "application/json", "text/yaml", "text/x-yaml", "application/x-yaml"})
@javax.ws.rs.Path("/async/load")
@GenerateQuerySessionId(cookieBasePath = "/DataWave/CachedResults/")
@Interceptors({RequiredInterceptor.class, ResponseInterceptor.class})
@Asynchronous
public void loadAsync(
        @QueryParam("queryId") @Required("queryId") String queryId,
        @QueryParam("alias") String alias,
        @Suspended AsyncResponse asyncResponse) {
    // "-" has no special meaning as a regex, so a literal replace yields the same string
    final String dashlessUuid = UUID.randomUUID().toString().replace("-", "");
    CreateQuerySessionIDFilter.QUERY_ID.set(queryId);
    try {
        final GenericResponse<String> loaded = load(queryId, alias, dashlessUuid);
        asyncResponse.resume(loaded);
    } catch (Throwable failure) {
        // Hand the failure to the suspended response rather than rethrowing it
        asyncResponse.resume(failure);
    }
}
/**
 * Long-poll style GET for a single order.
 *
 * <p>If the order is already in the store the suspended response is resumed with it straight
 * away. If it is absent, or the store throws {@code InvalidStateStoreException}, the response
 * is registered in {@code outstandingRequests} under the order id, wrapped in a
 * {@code FilteredResponse} whose predicate accepts everything.
 * NOTE(review): presumably another component resumes those parked responses once the value
 * arrives, and {@code setTimeout} bounds the wait; neither is visible in this method.
 *
 * @param id the key of the value to retrieve
 * @param timeout the timeout for the long-poll, passed to {@code setTimeout}
 * @param asyncResponse the suspended response, resumed immediately when the order is present
 *                      and parked otherwise
 */
@GET
@ManagedAsync
@Path("/orders/{id}")
@Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN})
public void getWithTimeout(@PathParam("id") final String id,
                           @QueryParam("timeout") @DefaultValue(CALL_TIMEOUT) Long timeout,
                           @Suspended final AsyncResponse asyncResponse) {
    setTimeout(timeout, asyncResponse);
    log.info("running GET on this node");
    try {
        final Order found = ordersStore().get(id);
        if (found != null) {
            asyncResponse.resume(found);
        } else {
            // Nothing stored under this id yet: park the response
            log.info("Delaying get as order not present for id " + id);
            outstandingRequests.put(id, new FilteredResponse<>(asyncResponse, (key, value) -> true));
        }
    } catch (InvalidStateStoreException storeNotReady) {
        // The store cannot be queried right now: park the response as well
        log.info("Delaying request for " + id + " because state store is not ready.");
        outstandingRequests.put(id, new FilteredResponse<>(asyncResponse, (key, value) -> true));
    }
}
/**
 * Creates a single resource on the node named in the path by delegating to the
 * four-argument {@code createResource} overload, which takes a JSON array of resources.
 *
 * <p>The request body is parsed as one {@code JsonGenTypes.ResourceCreate}, its
 * {@code resource.node_name} is overwritten with the {@code nodeName} path parameter, and the
 * object is re-serialized inside a one-element list. An {@link IOException} from parsing or
 * serializing is passed to {@code ApiCallRcRestUtils.handleJsonParseException}.
 *
 * @param request the current request
 * @param asyncResponse the suspended response
 * @param rscName value of the {@code rscName} path parameter
 * @param nodeName value of the {@code nodeName} path parameter
 * @param jsonData the request body: JSON for a single resource
 */
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("{nodeName}")
public void createResource(
    @Context Request request,
    @Suspended final AsyncResponse asyncResponse,
    @PathParam("rscName") String rscName,
    @PathParam("nodeName") String nodeName,
    String jsonData
)
{
    try
    {
        JsonGenTypes.ResourceCreate singleRsc =
            objectMapper.readValue(jsonData, JsonGenTypes.ResourceCreate.class);
        // The node name from the URL overrides whatever the body carried
        singleRsc.resource.node_name = nodeName;

        // Wrap the single resource in a list before forwarding it to the multi-resource creator
        ArrayList<JsonGenTypes.ResourceCreate> wrapped = new ArrayList<>();
        wrapped.add(singleRsc);
        String wrappedJson = objectMapper.writeValueAsString(wrapped);

        createResource(request, asyncResponse, rscName, wrappedJson);
    }
    catch (IOException ioExc)
    {
        ApiCallRcRestUtils.handleJsonParseException(ioExc, asyncResponse);
    }
}
/**
 * Answers {@code GET /async-hello} asynchronously: a task is submitted to {@code executor}
 * that sleeps for 100 ms and then resumes the suspended response with {@code "Hello!"}.
 *
 * @param asyncResponse the suspended response that the background task completes
 */
@Path("/async-hello")
@GET
@Produces({ MediaType.APPLICATION_JSON })
public void asyncSayHello(@Suspended final AsyncResponse asyncResponse) {
    executor.submit(new Runnable() {
        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the interrupt flag; set it again so the
                // code that owns this thread can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            // The response is resumed even when the delay was cut short.
            asyncResponse.resume("Hello!");
        }
    });
}
/**
 * Calls {@code resourceToggleDisk} for the given node and resource with the diskless pool
 * name, a {@code null} fourth argument and the boolean flag set to {@code true}, attaches a
 * request context for {@code ApiConsts.API_TOGGLE_DISK}, and lets {@code requestHelper}
 * complete the suspended response from the resulting flux.
 *
 * @param request the current request, used to build the subscriber context
 * @param asyncResponse the suspended response
 * @param nodeName value of the {@code nodeName} path parameter
 * @param rscName value of the {@code rscName} path parameter
 * @param disklessPool value of the {@code disklessPool} path parameter
 */
@PUT
@Path("{nodeName}/toggle-disk/diskless/{disklessPool}")
public void toggleDiskDiskless(
    @Context Request request,
    @Suspended final AsyncResponse asyncResponse,
    @PathParam("nodeName") String nodeName,
    @PathParam("rscName") String rscName,
    @PathParam("disklessPool") String disklessPool
)
{
    Flux<ApiCallRc> toggleFlux = ctrlRscToggleDiskApiCallHandler.resourceToggleDisk(
        nodeName,
        rscName,
        disklessPool,
        null,
        true
    );
    Flux<ApiCallRc> contextualFlux = toggleFlux.subscriberContext(
        requestHelper.createContext(ApiConsts.API_TOGGLE_DISK, request)
    );

    requestHelper.doFlux(asyncResponse, ApiCallRcRestUtils.mapToMonoResponse(contextualFlux));
}
/**
 * Calls {@code resourceToggleDisk} for the given node and resource with the storage pool name,
 * the {@code fromNode} path parameter as fourth argument and the boolean flag set to
 * {@code false}, attaches a request context for {@code ApiConsts.API_TOGGLE_DISK}, and lets
 * {@code requestHelper} complete the suspended response from the resulting flux.
 *
 * @param request the current request, used to build the subscriber context
 * @param asyncResponse the suspended response
 * @param nodeName value of the {@code nodeName} path parameter
 * @param fromNode value of the {@code fromNode} path parameter
 * @param rscName value of the {@code rscName} path parameter
 * @param storagePool value of the {@code storagePool} path parameter
 */
@PUT
@Path("{nodeName}/migrate-disk/{fromNode}/{storagePool}")
public void migrateDisk(
    @Context Request request,
    @Suspended final AsyncResponse asyncResponse,
    @PathParam("nodeName") String nodeName,
    @PathParam("fromNode") String fromNode,
    @PathParam("rscName") String rscName,
    @PathParam("storagePool") String storagePool
)
{
    Flux<ApiCallRc> migrateFlux = ctrlRscToggleDiskApiCallHandler.resourceToggleDisk(
        nodeName,
        rscName,
        storagePool,
        fromNode,
        false
    );
    Flux<ApiCallRc> contextualFlux = migrateFlux.subscriberContext(
        requestHelper.createContext(ApiConsts.API_TOGGLE_DISK, request)
    );

    requestHelper.doFlux(asyncResponse, ApiCallRcRestUtils.mapToMonoResponse(contextualFlux));
}
/**
 * Lists storage pools, filtered by node names, storage pool names and property filters.
 *
 * <p>A {@code null} node or storage pool list is replaced by an empty list. The listing itself
 * runs inside {@code RequestHelper.safeAsyncResponse}, with a request context for
 * {@code ApiConsts.API_LST_STOR_POOL} attached to the flux; {@code limit} and {@code offset}
 * are forwarded to {@code storPoolListToResponse}.
 *
 * @param request the current request, used to build the subscriber context
 * @param asyncResponse the suspended response
 * @param nodes values of the {@code nodes} query parameter
 * @param storagePools values of the {@code storage_pools} query parameter
 * @param propFilters values of the {@code props} query parameter, passed through unchanged
 * @param limit value of the {@code limit} query parameter, {@code 0} when absent
 * @param offset value of the {@code offset} query parameter, {@code 0} when absent
 */
@GET
@Path("storage-pools")
public void viewStoragePools(
    @Context Request request,
    @Suspended AsyncResponse asyncResponse,
    @QueryParam("nodes") List<String> nodes,
    @QueryParam("storage_pools") List<String> storagePools,
    @QueryParam("props") List<String> propFilters,
    @DefaultValue("0") @QueryParam("limit") int limit,
    @DefaultValue("0") @QueryParam("offset") int offset
)
{
    final List<String> nodesFilter;
    if (nodes == null)
    {
        nodesFilter = Collections.emptyList();
    }
    else
    {
        nodesFilter = nodes;
    }

    final List<String> storagePoolsFilter;
    if (storagePools == null)
    {
        storagePoolsFilter = Collections.emptyList();
    }
    else
    {
        storagePoolsFilter = storagePools;
    }

    RequestHelper.safeAsyncResponse(asyncResponse, () ->
    {
        Flux<List<StorPoolApi>> poolsFlux = ctrlStorPoolListApiCallHandler
            .listStorPools(nodesFilter, storagePoolsFilter, propFilters)
            .subscriberContext(requestHelper.createContext(ApiConsts.API_LST_STOR_POOL, request));

        requestHelper.doFlux(asyncResponse, storPoolListToResponse(poolsFlux, limit, offset));
    });
}
/**
 * Calls {@code ctrlDrbdProxyDisableApiCallHandler.disableProxy} with a {@code null} first
 * argument, the two node names and the resource name, attaches a request context for
 * {@code ApiConsts.API_DISABLE_DRBD_PROXY}, and lets {@code requestHelper} complete the
 * suspended response from the resulting flux.
 *
 * @param request the current request, used to build the subscriber context
 * @param asyncResponse the suspended response
 * @param rscName value of the {@code rscName} path parameter
 * @param nodeA value of the {@code nodeA} path parameter
 * @param nodeB value of the {@code nodeB} path parameter
 */
@POST
@Path("disable/{nodeA}/{nodeB}")
public void disableProxy(
    @Context Request request,
    @Suspended AsyncResponse asyncResponse,
    @PathParam("rscName") String rscName,
    @PathParam("nodeA") String nodeA,
    @PathParam("nodeB") String nodeB
)
{
    Flux<ApiCallRc> disableFlux = ctrlDrbdProxyDisableApiCallHandler.disableProxy(
        null,
        nodeA,
        nodeB,
        rscName
    );
    Flux<ApiCallRc> contextualFlux = disableFlux.subscriberContext(
        requestHelper.createContext(ApiConsts.API_DISABLE_DRBD_PROXY, request)
    );

    requestHelper.doFlux(asyncResponse, ApiCallRcRestUtils.mapToMonoResponse(contextualFlux));
}
/**
 * Modifies the storage pool definition named in the path.
 *
 * <p>The request body is parsed as a {@code JsonGenTypes.StoragePoolDefinitionModify}. Its
 * {@code override_props} are passed on as they are, while {@code delete_props} and
 * {@code delete_namespaces} are copied into new {@link HashSet}s. The resulting flux gets a
 * request context for {@code ApiConsts.API_MOD_STOR_POOL_DFN} and is mapped to a response
 * with {@code Response.Status.OK}.
 *
 * @param request the current request, used to build the subscriber context
 * @param asyncResponse the suspended response
 * @param storagePoolName value of the {@code storagePool} path parameter
 * @param jsonData the request body
 * @throws IOException if the request body cannot be read as a modify request
 */
@PUT
@Path("{storagePool}")
public void modifyStoragePoolDefinition(
    @Context Request request,
    @Suspended final AsyncResponse asyncResponse,
    @PathParam("storagePool") String storagePoolName,
    String jsonData
)
    throws IOException
{
    JsonGenTypes.StoragePoolDefinitionModify modifyData = objectMapper.readValue(
        jsonData,
        JsonGenTypes.StoragePoolDefinitionModify.class
    );

    Flux<ApiCallRc> modifyFlux = ctrlApiCallHandler.modifyStorPoolDfn(
        null,
        storagePoolName,
        modifyData.override_props,
        new HashSet<>(modifyData.delete_props),
        new HashSet<>(modifyData.delete_namespaces)
    );
    Flux<ApiCallRc> contextualFlux = modifyFlux.subscriberContext(
        requestHelper.createContext(ApiConsts.API_MOD_STOR_POOL_DFN, request)
    );

    requestHelper.doFlux(
        asyncResponse,
        ApiCallRcRestUtils.mapToMonoResponse(contextualFlux, Response.Status.OK)
    );
}
/**
 * Handles {@code POST /schemas/{schemaid}/versions} by delegating to the same-named method
 * on {@code service}, passing all arguments through unchanged.
 *
 * @param response the suspended response, handed to the service
 * @param schemaid value of the {@code schemaid} path parameter
 * @param schema the request body describing the new schema version
 * @param verify value of the {@code verify} query parameter, {@code false} when absent
 * @throws ArtifactNotFoundException declared by this endpoint; NOTE(review): presumably raised
 *         by the service when the schema id is unknown - confirm against the service
 */
@POST
@Path("/schemas/{schemaid}/versions")
@Consumes({"application/json"})
@Produces({"application/json"})
public void apiSchemasSchemaidVersionsPost(
@Suspended AsyncResponse response,
@PathParam("schemaid") @NotNull String schemaid,
@NotNull @Valid NewSchemaVersion schema,
@DefaultValue("false") @QueryParam("verify") boolean verify
)
throws ArtifactNotFoundException {
service.apiSchemasSchemaidVersionsPost(response, schemaid, schema, verify);
}
/**
 * Long-poll endpoint for a queued query: waits for the query to be dispatched, bounded by a
 * wait time, then fetches its results and completes the suspended response with them.
 *
 * @param queryId path parameter identifying the query
 * @param slug path parameter forwarded to {@code getQuery}
 * @param token path parameter forwarded to {@code getQuery} and {@code getQueryResults}
 * @param maxWait value of the {@code maxWait} query parameter; combined with
 *                {@code MAX_WAIT_TIME} through {@code WAIT_ORDERING.min}
 * @param uriInfo request URI information, used when fetching the results
 * @param asyncResponse the suspended response bound to the result future
 */
@ResourceSecurity(PUBLIC)
@GET
@Path("queued/{queryId}/{slug}/{token}")
@Produces(APPLICATION_JSON)
public void getStatus(
        @PathParam("queryId") QueryId queryId,
        @PathParam("slug") String slug,
        @PathParam("token") long token,
        @QueryParam("maxWait") Duration maxWait,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    Query query = getQuery(queryId, slug, token);

    // Step 1: wait for the query to be dispatched, up to the wait timeout
    ListenableFuture<?> dispatchedOrTimedOut = addTimeout(
            query.waitForDispatched(),
            () -> null,
            WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait),
            timeoutExecutor);

    // Step 2: once that completes, fetch the next result on the response executor
    ListenableFuture<QueryResults> resultsFuture = Futures.transform(
            dispatchedOrTimedOut,
            unused -> query.getQueryResults(token, uriInfo),
            responseExecutor);

    // Step 3: wrap the results in an OK response
    ListenableFuture<Response> httpResponse = Futures.transform(
            resultsFuture,
            results -> Response.ok(results).build(),
            directExecutor());

    bindAsyncResponse(asyncResponse, httpResponse, responseExecutor);
}
/**
 * Returns the info of a task, optionally as a long poll.
 *
 * <p>When either header is missing the current task info is returned immediately. Otherwise
 * the task info future for the caller's {@code currentState} is wrapped with a timeout of a
 * randomized wait time, falling back to the current task info, and the suspended response is
 * bound to it with a hard timeout of that wait time plus {@code ADDITIONAL_WAIT_TIME}. In both
 * paths the info is summarized when {@code shouldSummarize(uriInfo)} says so.
 *
 * @param taskId path parameter identifying the task; must not be {@code null}
 * @param currentState value of the {@code PRESTO_CURRENT_STATE} header, may be {@code null}
 * @param maxWait value of the {@code PRESTO_MAX_WAIT} header, may be {@code null}
 * @param uriInfo request URI information, consulted by {@code shouldSummarize}
 * @param asyncResponse the suspended response
 */
@ResourceSecurity(INTERNAL_ONLY)
@GET
@Path("{taskId}")
@Produces(MediaType.APPLICATION_JSON)
public void getTaskInfo(
        @PathParam("taskId") final TaskId taskId,
        @HeaderParam(PRESTO_CURRENT_STATE) TaskState currentState,
        @HeaderParam(PRESTO_MAX_WAIT) Duration maxWait,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    requireNonNull(taskId, "taskId is null");

    // Without both headers there is nothing to wait for: answer right away
    if (currentState == null || maxWait == null) {
        TaskInfo current = taskManager.getTaskInfo(taskId);
        if (shouldSummarize(uriInfo)) {
            current = current.summarize();
        }
        asyncResponse.resume(current);
        return;
    }

    Duration pollWait = randomizeWaitTime(maxWait);
    ListenableFuture<TaskInfo> pendingInfo = addTimeout(
            taskManager.getTaskInfo(taskId, currentState),
            () -> taskManager.getTaskInfo(taskId),
            pollWait,
            timeoutExecutor);

    if (shouldSummarize(uriInfo)) {
        pendingInfo = Futures.transform(pendingInfo, TaskInfo::summarize, directExecutor());
    }

    // For hard timeout, add an additional time to max wait for thread scheduling contention and GC
    Duration hardTimeout = new Duration(pollWait.toMillis() + ADDITIONAL_WAIT_TIME.toMillis(), MILLISECONDS);
    bindAsyncResponse(asyncResponse, pendingInfo, responseExecutor)
            .withTimeout(hardTimeout);
}
/**
 * Returns the status of a task, optionally as a long poll.
 *
 * <p>When either header is missing the current task status is returned immediately. Otherwise
 * the task status future for the caller's {@code currentState} is wrapped with a timeout of a
 * randomized wait time, falling back to the current task status, and the suspended response is
 * bound to it with a hard timeout of that wait time plus {@code ADDITIONAL_WAIT_TIME}.
 *
 * @param taskId path parameter identifying the task; must not be {@code null}
 * @param currentState value of the {@code PRESTO_CURRENT_STATE} header, may be {@code null}
 * @param maxWait value of the {@code PRESTO_MAX_WAIT} header, may be {@code null}
 * @param uriInfo request URI information; not read by this method
 * @param asyncResponse the suspended response
 */
@ResourceSecurity(INTERNAL_ONLY)
@GET
@Path("{taskId}/status")
@Produces(MediaType.APPLICATION_JSON)
public void getTaskStatus(
        @PathParam("taskId") TaskId taskId,
        @HeaderParam(PRESTO_CURRENT_STATE) TaskState currentState,
        @HeaderParam(PRESTO_MAX_WAIT) Duration maxWait,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    requireNonNull(taskId, "taskId is null");

    // Without both headers there is nothing to wait for: answer right away
    if (currentState == null || maxWait == null) {
        TaskStatus current = taskManager.getTaskStatus(taskId);
        asyncResponse.resume(current);
        return;
    }

    Duration pollWait = randomizeWaitTime(maxWait);

    // TODO: With current implementation, a newly completed driver group won't trigger immediate HTTP response,
    // leading to a slight delay of approx 1 second, which is not a major issue for any query that are heavy weight enough
    // to justify group-by-group execution. In order to fix this, REST endpoint /v1/{task}/status will need change.
    ListenableFuture<TaskStatus> pendingStatus = addTimeout(
            taskManager.getTaskStatus(taskId, currentState),
            () -> taskManager.getTaskStatus(taskId),
            pollWait,
            timeoutExecutor);

    // For hard timeout, add an additional time to max wait for thread scheduling contention and GC
    Duration hardTimeout = new Duration(pollWait.toMillis() + ADDITIONAL_WAIT_TIME.toMillis(), MILLISECONDS);
    bindAsyncResponse(asyncResponse, pendingStatus, responseExecutor)
            .withTimeout(hardTimeout);
}
/**
 * Proxies {@code GET /v1/info}: builds a GET request for {@code /v1/info} on
 * {@code remoteUri} and passes it to {@code performRequest}. The proxied response is turned
 * into an OK response carrying its body, decorated by {@code responseWithHeaders}.
 *
 * @param servletRequest the incoming servlet request, forwarded to {@code performRequest}
 * @param asyncResponse the suspended response handed to {@code performRequest}
 */
@GET
@Path("/v1/info")
@Produces(APPLICATION_JSON)
public void getInfo(
        @Context HttpServletRequest servletRequest,
        @Suspended AsyncResponse asyncResponse)
{
    Request.Builder infoRequest = prepareGet()
            .setUri(uriBuilderFrom(remoteUri).replacePath("/v1/info").build());

    performRequest(
            servletRequest,
            asyncResponse,
            infoRequest,
            remote -> responseWithHeaders(Response.ok(remote.getBody()), remote));
}
/**
 * Proxies {@code POST /v1/statement}: builds a POST request for {@code /v1/statement} on
 * {@code remoteUri} with the statement text as a UTF-8 body and passes it to
 * {@code performRequest}. The proxied response is converted by {@code buildResponse}.
 *
 * @param statement the request body: the statement text to forward
 * @param servletRequest the incoming servlet request, forwarded to {@code performRequest}
 * @param uriInfo request URI information, used when building the response
 * @param asyncResponse the suspended response handed to {@code performRequest}
 */
@POST
@Path("/v1/statement")
@Produces(APPLICATION_JSON)
public void postStatement(
        String statement,
        @Context HttpServletRequest servletRequest,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    Request.Builder statementRequest = preparePost()
            .setUri(uriBuilderFrom(remoteUri).replacePath("/v1/statement").build())
            .setBodyGenerator(createStaticBodyGenerator(statement, UTF_8));

    performRequest(
            servletRequest,
            asyncResponse,
            statementRequest,
            remote -> buildResponse(uriInfo, remote));
}
/**
 * Handles {@code GET /sqs-delayed} by delegating to {@code scheduleJob} with
 * {@code JobScheduleType.SQS_DELAYED}, passing the name and delay through unchanged.
 *
 * @param name value of the {@code name} query parameter, if present
 * @param delaySeconds value of the {@code delaySeconds} query parameter, if present
 * @param asyncResponse the suspended response, handed to {@code scheduleJob}
 */
@GET
@Path("/sqs-delayed")
@Timed
@ManagedAsync
public void scheduleSqsDelayed(
@QueryParam("name") Optional<String> name,
@QueryParam("delaySeconds") Optional<Integer> delaySeconds,
@Suspended final AsyncResponse asyncResponse) {
scheduleJob(name, delaySeconds, JobScheduleType.SQS_DELAYED, asyncResponse);
}
/**
 * Handles {@code GET /immediate} by delegating to {@code scheduleJob} with
 * {@code JobScheduleType.IMMEDIATE} and a fixed delay of zero.
 *
 * @param name value of the {@code name} query parameter, if present
 * @param delaySeconds value of the {@code delaySeconds} query parameter; accepted but not
 *                     used, a delay of {@code 0} is always passed on
 * @param asyncResponse the suspended response, handed to {@code scheduleJob}
 */
@GET
@Path("/immediate")
@Timed
@ManagedAsync
public void scheduleImmediate(
@QueryParam("name") Optional<String> name,
@QueryParam("delaySeconds") Optional<Integer> delaySeconds,
@Suspended final AsyncResponse asyncResponse) {
// The caller-supplied delay is deliberately replaced by zero here
scheduleJob(name, Optional.of(0), JobScheduleType.IMMEDIATE, asyncResponse);
}
/**
 * Handles {@code GET /scheduled} by delegating to {@code scheduleJob} with
 * {@code JobScheduleType.SCHEDULED}, passing the name and delay through unchanged.
 *
 * @param name value of the {@code name} query parameter, if present
 * @param delaySeconds value of the {@code delaySeconds} query parameter, if present
 * @param asyncResponse the suspended response, handed to {@code scheduleJob}
 */
@GET
@Path("/scheduled")
@Timed
@ManagedAsync
public void scheduleScheduled(
@QueryParam("name") Optional<String> name,
@QueryParam("delaySeconds") Optional<Integer> delaySeconds,
@Suspended final AsyncResponse asyncResponse) {
scheduleJob(name, delaySeconds, JobScheduleType.SCHEDULED, asyncResponse);
}
/**
 * Sets a one-minute timeout on the suspended response, then starts a 10 ms timer on the
 * execution context's executor whose completion resumes the response with {@code "DONE"}.
 *
 * @param ar the suspended response
 */
@Produces(TEXT_PLAIN)
@Path("/suspended/timeout-resume")
@GET
public void getAsyncResponseTimeoutResume(@Suspended final AsyncResponse ar) {
ar.setTimeout(1, MINUTES);
// The timer is far shorter than the timeout set above
ctx.executionContext().executor().timer(10, MILLISECONDS)
.afterOnComplete(() -> ar.resume("DONE"))
.subscribe();
}
/**
 * Sets a one-minute timeout on the suspended response and then replaces it with a
 * one-nanosecond timeout. The method never resumes the response itself and installs no
 * timeout handler.
 *
 * @param ar the suspended response
 */
@Produces(TEXT_PLAIN)
@Path("/suspended/timeout-expire")
@GET
public void getAsyncResponseTimeoutExpire(@Suspended final AsyncResponse ar) {
// Set timeout twice to ensure users can update it at will
ar.setTimeout(1, MINUTES);
ar.setTimeout(1, NANOSECONDS);
}
/**
 * Installs a timeout handler that resumes the response with a {@code GATEWAY_TIMEOUT} status,
 * then sets a one-nanosecond timeout. The method never resumes the response itself.
 *
 * @param ar the suspended response
 */
@Produces(TEXT_PLAIN)
@Path("/suspended/timeout-expire-handled")
@GET
public void getAsyncResponseTimeoutExpireHandled(@Suspended final AsyncResponse ar) {
    // Handler first, timeout second: the handler is in place before the timeout is armed
    ar.setTimeoutHandler(timedOut -> timedOut.resume(status(GATEWAY_TIMEOUT).build()));
    ar.setTimeout(1, NANOSECONDS);
}
/**
 * Resumes the suspended response with {@code "DONE"} and only afterwards sets a one-minute
 * timeout on it.
 *
 * @param ar the suspended response
 */
@Produces(TEXT_PLAIN)
@Path("/suspended/resume-timeout")
@GET
public void getAsyncResponseResumeTimeout(@Suspended final AsyncResponse ar) {
ar.resume("DONE");
// NOTE(review): setting the timeout after resuming looks intentional, presumably to
// exercise that call order - keep the two statements in this sequence
ar.setTimeout(1, MINUTES);
}