下面列出了javax.ws.rs.core.Response.Status#PRECONDITION_FAILED 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Maps a list of issues to an HTTP response status.
 *
 * <p>The status is derived from the first issue whose code and code value are both non-null;
 * issues without a usable code are skipped.
 *
 * @param issues the issues to inspect; may be null
 * @return the status for the first issue carrying a code, or
 *         Response.Status.INTERNAL_SERVER_ERROR when the list is null or no issue carries a code
 */
public static Response.Status issueListToStatus(List<Issue> issues) {
    if (issues == null) {
        return Status.INTERNAL_SERVER_ERROR;
    }
    for (Issue current : issues) {
        IssueType type = current.getCode();
        if (type == null || type.getValue() == null) {
            // No usable code on this issue; look at the next one.
            continue;
        }
        IssueType.ValueSet kind = type.getValueAsEnumConstant();

        // CONFLICT is an HTTP 412 (Precondition failed) when the failed-precondition
        // extension is present; otherwise it is mapped like any other issue type below.
        boolean failedPrecondition = kind == IssueType.ValueSet.CONFLICT
                && FHIRUtil.getExtensionStringValue(type, EXTENSION_URL_HTTP_FAILED_PRECONDITION) != null;
        if (failedPrecondition) {
            return Status.PRECONDITION_FAILED;
        }

        // NOT_SUPPORTED with an "interaction" detail extension is reported as a bad request.
        boolean unsupportedInteraction = kind == IssueType.ValueSet.NOT_SUPPORTED
                && "interaction".equals(FHIRUtil.getExtensionStringValue(type, EXTENSION_URL_NOT_SUPPORTED_DETAIL));
        if (unsupportedInteraction) {
            return Status.BAD_REQUEST;
        }

        return issueTypeToResponseCode(kind);
    }
    return Status.INTERNAL_SERVER_ERROR;
}
/**
 * Verifies that the broker may perform read-write operations on policies.
 *
 * <p>The check fails when the read-only flag node exists in the global zookeeper cache, or when the
 * global zookeeper client is not in the CONNECTED state.
 *
 * @throws RestException
 *             with FORBIDDEN if policies are read-only, with PRECONDITION_FAILED if the broker is not
 *             connected to the global zookeeper, or wrapping the failure if the flag lookup fails
 */
public void validatePoliciesReadOnlyAccess() {
    boolean readOnlyFlagPresent;
    try {
        readOnlyFlagPresent = globalZkCache().exists(POLICIES_READONLY_FLAG_PATH);
    } catch (Exception lookupFailure) {
        log.warn("Unable to fetch contents of [{}] from global zookeeper", POLICIES_READONLY_FLAG_PATH,
                lookupFailure);
        throw new RestException(lookupFailure);
    }

    if (readOnlyFlagPresent) {
        log.debug("Policies are read-only. Broker cannot do read-write operations");
        throw new RestException(Status.FORBIDDEN, "Broker is forbidden to do read-write operations");
    }

    // Writes additionally require a live connection to the global zookeeper.
    if (globalZkCache().getZooKeeper().getState() != States.CONNECTED) {
        log.debug("Broker is not connected to the global zookeeper");
        throw new RestException(Status.PRECONDITION_FAILED,
                "Broker needs to be connected to global zookeeper before making a read-write operation");
    }

    // Nothing else to do; just record that the check passed.
    log.debug("Broker is allowed to make read-write operations");
}
/**
 * Deletes the current topic after validating that a write operation is allowed on it.
 *
 * @param authoritative forwarded to the write-operation validation
 * @throws RestException
 *             with PRECONDITION_FAILED if the topic is busy, NOT_FOUND if its metadata is missing,
 *             or wrapping the underlying failure otherwise
 */
protected void internalDeleteTopic(boolean authoritative) {
    validateWriteOperationOnTopic(authoritative);
    try {
        pulsar().getBrokerService().deleteTopic(topicName.toString(), false).get();
        log.info("[{}] Successfully removed topic {}", clientAppId(), topicName);
    } catch (Exception e) {
        // Failures from the future arrive wrapped and carry a cause; exceptions raised directly
        // (e.g. an interruption while waiting) have none, so fall back to the exception itself
        // instead of logging and wrapping null.
        Throwable t = e.getCause() != null ? e.getCause() : e;
        log.error("[{}] Failed to delete topic {}", clientAppId(), topicName, t);
        if (t instanceof TopicBusyException) {
            throw new RestException(Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions");
        } else if (t instanceof MetadataNotFoundException) {
            throw new RestException(Status.NOT_FOUND, "Topic not found");
        } else {
            throw new RestException(t);
        }
    }
}
/**
 * Validates the offload policies supplied for the current namespace.
 *
 * <p>Checks are applied in order: the policies must be non-null, the driver must be supported,
 * and the bucket must be valid.
 *
 * @param offloadPolicies the policies to validate
 * @throws RestException with PRECONDITION_FAILED on the first failing check
 */
private void validateOffloadPolicies(OffloadPolicies offloadPolicies) {
    final String logPrefix = "[{}] Failed to update offload configuration for namespace {}: ";

    if (offloadPolicies == null) {
        log.warn(logPrefix + "offloadPolicies is null", clientAppId(), namespaceName);
        throw new RestException(Status.PRECONDITION_FAILED,
                "The offloadPolicies must be specified for namespace offload.");
    }

    boolean driverOk = offloadPolicies.driverSupported();
    if (!driverOk) {
        log.warn(logPrefix + "driver is not supported, support value: {}",
                clientAppId(), namespaceName, OffloadPolicies.getSupportedDriverNames());
        throw new RestException(Status.PRECONDITION_FAILED,
                "The driver is not supported, support value: " + OffloadPolicies.getSupportedDriverNames());
    }

    boolean bucketOk = offloadPolicies.bucketValid();
    if (!bucketOk) {
        log.warn(logPrefix + "bucket must be specified", clientAppId(), namespaceName);
        throw new RestException(Status.PRECONDITION_FAILED,
                "The bucket must be specified for namespace offload.");
    }
}
/**
 * Validates the allowed clusters of a tenant.
 *
 * <p>The tenant info must be present, its allowed-cluster set must be non-empty and free of blank
 * names, and every allowed cluster must either be in the available cluster list or be the global
 * cluster constant.
 *
 * @param info the tenant info to validate
 * @throws RestException
 *             with PRECONDITION_FAILED if the clusters are empty/blank or do not exist, or wrapping
 *             the failure if the available clusters cannot be read
 */
private void validateClusters(TenantInfo info) {
    // An empty set, or any blank entry, is rejected.
    boolean clustersMissing = info == null;
    if (!clustersMissing) {
        Set<String> requested = info.getAllowedClusters();
        clustersMissing = requested.isEmpty() || requested.stream().anyMatch(c -> StringUtils.isBlank(c));
    }
    if (clustersMissing) {
        log.warn("[{}] Failed to validate due to clusters are empty", clientAppId());
        throw new RestException(Status.PRECONDITION_FAILED, "Clusters can not be empty");
    }

    List<String> unknownClusters;
    try {
        Set<String> known = clustersListCache().get();
        unknownClusters = info.getAllowedClusters().stream()
                .filter(name -> !known.contains(name) && !Constants.GLOBAL_CLUSTER.equals(name))
                .collect(Collectors.toList());
    } catch (Exception lookupFailure) {
        log.error("[{}] Failed to get available clusters", clientAppId(), lookupFailure);
        throw new RestException(lookupFailure);
    }

    if (!unknownClusters.isEmpty()) {
        log.warn("[{}] Failed to validate due to clusters {} do not exist", clientAppId(), unknownClusters);
        throw new RestException(Status.PRECONDITION_FAILED, "Clusters do not exist");
    }
}
/**
 * Checks whether this broker owns the service unit of the given topic. If it does not, the request is
 * redirected (temporary redirect) to the broker whose web service URL was resolved for the topic.
 *
 * @param topicName
 *            the topic whose ownership is validated
 * @param authoritative
 *            passed through to the web service URL lookup
 * @throws RestException
 *             with PRECONDITION_FAILED if no owner URL can be resolved or the lookup raises an
 *             illegal argument/state error, with INTERNAL_SERVER_ERROR on timeout, or wrapping any
 *             other failure
 * @throws WebApplicationException
 *             carrying the redirect response when another broker should serve the request
 */
protected void validateTopicOwnership(TopicName topicName, boolean authoritative) {
    NamespaceService namespaceService = pulsar().getNamespaceService();
    try {
        // Per the original note: this lookup tries to acquire the whole namespace ownership.
        Optional<URL> ownerUrl = namespaceService.getWebServiceUrl(topicName, authoritative, isRequestHttps(),
                false);

        // A URL is required to continue.
        boolean urlResolved = ownerUrl != null && ownerUrl.isPresent();
        if (!urlResolved) {
            log.info("Unable to get web service url");
            throw new RestException(Status.PRECONDITION_FAILED, "Failed to find ownership for topic:" + topicName);
        }

        if (!namespaceService.isServiceUnitOwned(topicName)) {
            boolean newAuthoritative = isLeaderBroker(pulsar());
            // Keep the current request URI but point it at the owner's host and port.
            URL target = ownerUrl.get();
            URI redirect = UriBuilder.fromUri(uri.getRequestUri())
                    .host(target.getHost())
                    .port(target.getPort())
                    .replaceQueryParam("authoritative", newAuthoritative)
                    .build();
            log.debug("Redirecting the rest call to {}", redirect);
            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
        }
    } catch (TimeoutException timeout) {
        String msg = String.format("Finding owner for topic %s timed out", topicName);
        log.error(msg, timeout);
        throw new RestException(Status.INTERNAL_SERVER_ERROR, msg);
    } catch (IllegalArgumentException invalidArgument) {
        // namespace format is not valid
        log.debug("Failed to find owner for topic: {}", topicName, invalidArgument);
        throw new RestException(Status.PRECONDITION_FAILED, "Can't find owner for topic " + topicName);
    } catch (IllegalStateException invalidState) {
        log.debug("Failed to find owner for topic: {}", topicName, invalidState);
        throw new RestException(Status.PRECONDITION_FAILED, "Can't find owner for topic " + topicName);
    } catch (WebApplicationException passThrough) {
        // Redirects and already-built REST errors are propagated untouched.
        throw passThrough;
    } catch (Exception other) {
        log.debug("Failed to find owner for topic: {}", topicName, other);
        throw new RestException(other);
    }
}
/**
 * Returns the per-topic stats collected by the proxy.
 *
 * <p>Stats are only served when the configured proxy log level is present and at least 2.
 *
 * @return the topic stats reported by the proxy service
 * @throws RestException with PRECONDITION_FAILED when the proxy log level is absent or below 2
 */
@GET
@Path("/topics")
@ApiOperation(value = "Proxy topic stats api", response = Map.class, responseContainer = "Map")
@ApiResponses(value = { @ApiResponse(code = 412, message = "Proxy logging should be >= 2 to capture topic stats"),
        @ApiResponse(code = 503, message = "Proxy service is not initialized") })
public Map<String, TopicStats> topics() {
    Optional<Integer> logLevel = proxyService().getConfiguration().getProxyLogLevel();
    // Level 2 itself is sufficient; only an absent level or one below 2 is rejected.
    if (!logLevel.isPresent() || logLevel.get() < 2) {
        throw new RestException(Status.PRECONDITION_FAILED, "Proxy doesn't have logging level 2");
    }
    return proxyService().getTopicStats();
}
/**
 * Parses the property/cluster/namespace triple and stores the result as the current namespace name.
 *
 * @param property first component passed to the namespace name factory
 * @param cluster second component passed to the namespace name factory
 * @param namespace third component passed to the namespace name factory
 * @throws RestException with PRECONDITION_FAILED when the name is rejected as an illegal argument
 */
@Deprecated
protected void validateNamespaceName(String property, String cluster, String namespace) {
    final NamespaceName parsed;
    try {
        parsed = NamespaceName.get(property, cluster, namespace);
    } catch (IllegalArgumentException invalidName) {
        log.warn("[{}] Failed to create namespace with invalid name {}", clientAppId(), namespace, invalidName);
        throw new RestException(Status.PRECONDITION_FAILED, "Namespace name is not valid");
    }
    // Only reached when parsing succeeded, so the field is never set to a rejected name.
    this.namespaceName = parsed;
}
/**
 * Decodes the topic, then parses and stores the namespace name and topic name for this request.
 *
 * @param property first component passed to the namespace name factory
 * @param namespace second component passed to the namespace name factory
 * @param encodedTopic the encoded topic name; it is decoded before being parsed
 * @throws RestException with PRECONDITION_FAILED when either name is rejected as an illegal argument
 */
protected void validateTopicName(String property, String namespace, String encodedTopic) {
    String topic = Codec.decode(encodedTopic);
    try {
        this.namespaceName = NamespaceName.get(property, namespace);
        // topicName is assigned exactly once here; a second identical lookup after the try block
        // was redundant and has been dropped, matching the four-argument overload.
        this.topicName = TopicName.get(domain(), namespaceName, topic);
    } catch (IllegalArgumentException e) {
        log.warn("[{}] Failed to validate topic name {}://{}/{}/{}", clientAppId(), domain(), property, namespace,
                topic, e);
        throw new RestException(Status.PRECONDITION_FAILED, "Topic name is not valid");
    }
}
/** Verifies that a RestException built from a status and a message reports that message. */
@Test
public void testRestExceptionMessage() {
    final String expectedMessage = "my-message";
    RestException restException = new RestException(Status.PRECONDITION_FAILED, expectedMessage);
    assertEquals(restException.getMessage(), expectedMessage);
}
/**
 * Decodes the topic, then parses and stores the namespace name (including the cluster component)
 * and the topic name for this request.
 *
 * @param property first component passed to the namespace name factory
 * @param cluster second component passed to the namespace name factory
 * @param namespace third component passed to the namespace name factory
 * @param encodedTopic the encoded topic name; it is decoded before being parsed
 * @throws RestException with PRECONDITION_FAILED when either name is rejected as an illegal argument
 */
@Deprecated
protected void validateTopicName(String property, String cluster, String namespace, String encodedTopic) {
    final String decodedTopic = Codec.decode(encodedTopic);
    try {
        this.namespaceName = NamespaceName.get(property, cluster, namespace);
        this.topicName = TopicName.get(domain(), namespaceName, decodedTopic);
    } catch (IllegalArgumentException invalidName) {
        log.warn("[{}] Failed to validate topic name {}://{}/{}/{}/{}", clientAppId(), domain(), property,
                cluster, namespace, decodedTopic, invalidName);
        throw new RestException(Status.PRECONDITION_FAILED, "Topic name is not valid");
    }
}
/**
 * Verifies that the given cluster has an entry in the clusters cache.
 *
 * @param cluster the cluster name to look up
 * @throws RestException
 *             with PRECONDITION_FAILED when the cluster is not present, or wrapping the failure
 *             when the cache lookup itself fails
 */
protected void validateClusterExists(String cluster) {
    try {
        if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
            throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
        }
    } catch (RestException re) {
        // Propagate the 412 raised above as-is rather than re-wrapping it in the generic handler.
        throw re;
    } catch (Exception e) {
        throw new RestException(e);
    }
}
/**
 * Validate update of number of partition for partitioned topic.
 * If there's already a non partition topic with the same name that contains the partition suffix
 * "-partition-" followed by a numeric value X, and X lies between the old partition count and the
 * new partition count (both inclusive), the update is rejected, since that non partition topic would
 * otherwise collide with a partition of the updated topic.
 *
 * @param topicName
 *            local name of the partitioned topic being updated
 * @param numberOfPartition
 *            the requested new number of partitions
 * @throws RestException
 *             with PRECONDITION_FAILED when a conflicting non partition topic exists
 */
private void validatePartitionTopicUpdate(String topicName, int numberOfPartition) {
    List<String> existingTopicList = internalGetList();
    TopicName partitionTopicName = TopicName.get(domain(), namespaceName, topicName);
    PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(partitionTopicName, false, false);
    int oldPartition = metadata.partitions;
    String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX;
    for (String existingTopicName : existingTopicList) {
        if (!existingTopicName.contains(prefix)) {
            continue;
        }
        long suffix;
        try {
            suffix = Long.parseLong(existingTopicName.substring(
                    existingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
                            + TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
        } catch (NumberFormatException ignored) {
            // Do nothing, if value after partition suffix is not pure numeric value,
            // as it can't conflict with internal created partitioned topic's name.
            continue;
        }
        // Skip partition of partitioned topic by making sure the numeric suffix greater than old partition number.
        if (suffix >= oldPartition && suffix <= (long) numberOfPartition) {
            log.warn("[{}] Already have non partition topic {} which contains partition "
                    + "suffix '-partition-' and end with numeric value smaller than the new number of partition. "
                    + "Update of partitioned topic {} could cause conflict.",
                    clientAppId(), existingTopicName, topicName);
            // Message fixed: separating space before the topic name, duplicated phrase removed.
            throw new RestException(Status.PRECONDITION_FAILED,
                    "Already have non partition topic " + existingTopicName
                            + " which contains partition suffix '-partition-' "
                            + "and end with numeric value smaller than the new "
                            + "number of partition. Update of partitioned topic " + topicName
                            + " could cause conflict.");
        }
    }
}
/**
 * Validate non partition topic name,
 * Validation will fail and throw RestException if
 * 1) Topic name contains partition suffix "-partition-" and the remaining part following the
 * suffix is a numeric value greater than or equal to the number of partitions of an existing
 * partitioned topic with the same name (the part before suffix "-partition-").
 * 2) Topic name contains partition suffix "-partition-" and the remaining part following the
 * suffix is a numeric value but there isn't a partitioned topic with the same name.
 *
 * @param topicName
 *            the local topic name to validate
 * @throws RestException
 *             with PRECONDITION_FAILED in either of the two cases above
 */
private void validateNonPartitionTopicName(String topicName) {
    if (topicName.contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) {
        try {
            // First check if what's after suffix "-partition-" is number or not, if not number then can create.
            int partitionIndex = topicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX);
            long suffix = Long.parseLong(topicName.substring(partitionIndex
                    + TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
            TopicName partitionTopicName = TopicName.get(domain(), namespaceName,
                    topicName.substring(0, partitionIndex));
            PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(partitionTopicName, false, false);
            // Partition topic index is 0 to (number of partition - 1)
            if (metadata.partitions > 0 && suffix >= (long) metadata.partitions) {
                // The rejected case is a suffix at or beyond the partition count, so the message
                // says "greater than or equal to" (it previously claimed "smaller then").
                log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
                        + " a number greater than or equal to the number of partitions of partitioned topic {}.",
                        clientAppId(), topicName, partitionTopicName.getLocalName());
                throw new RestException(Status.PRECONDITION_FAILED,
                        "Can't create topic " + topicName + " with \"-partition-\" followed by"
                                + " a number greater than or equal to the number of partitions of partitioned topic "
                                + partitionTopicName.getLocalName());
            } else if (metadata.partitions == 0) {
                log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
                        + " numeric value if there isn't a partitioned topic {} created.",
                        clientAppId(), topicName, partitionTopicName.getLocalName());
                throw new RestException(Status.PRECONDITION_FAILED,
                        "Can't create topic " + topicName + " with \"-partition-\" followed by"
                                + " numeric value if there isn't a partitioned topic "
                                + partitionTopicName.getLocalName() + " created.");
            }
            // If there is a partitioned topic with the same name and numeric suffix is smaller than the
            // number of partition for that partitioned topic, validation will pass.
        } catch (NumberFormatException ignored) {
            // Do nothing, if value after partition suffix is not pure numeric value,
            // as it can't conflict if user want to create partitioned topic with same
            // topic name prefix in the future.
        }
    }
}
/**
 * Returns the replication clusters stored in the policies of the current namespace.
 *
 * @return the {@code replication_clusters} field of the namespace policies
 * @throws RestException with PRECONDITION_FAILED when the namespace is not global
 */
protected Set<String> internalGetNamespaceReplicationClusters() {
    validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.READ);

    if (namespaceName.isGlobal()) {
        Policies namespacePolicies = getNamespacePolicies(namespaceName);
        return namespacePolicies.replication_clusters;
    }

    throw new RestException(Status.PRECONDITION_FAILED,
            "Cannot get the replication clusters for a non-global namespace");
}
/**
 * Deletes the named subscription from every topic of the given namespace bundle and waits for all
 * deletions to complete.
 *
 * @param nsName the namespace owning the bundle
 * @param bundleRange the bundle range, appended to the namespace name to identify the bundle
 * @param subscription the subscription to delete
 * @throws RestException
 *             with PRECONDITION_FAILED when the subscription name starts with the replicator prefix
 *             or the subscription is busy, or wrapping the underlying failure otherwise
 */
private void unsubscribe(NamespaceName nsName, String bundleRange, String subscription) {
    try {
        List<Topic> topicList = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
                nsName.toString() + "/" + bundleRange);
        List<CompletableFuture<Void>> futures = Lists.newArrayList();
        if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
            throw new RestException(Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor");
        } else {
            for (Topic topic : topicList) {
                Subscription sub = topic.getSubscription(subscription);
                if (sub != null) {
                    futures.add(sub.delete());
                }
            }
        }
        FutureUtil.waitForAll(futures).get();
    } catch (RestException re) {
        throw re;
    } catch (Exception e) {
        log.error("[{}] Failed to unsubscribe {} for namespace {}/{}", clientAppId(), subscription,
                nsName.toString(), bundleRange, e);
        // Failures from the futures arrive wrapped and carry a cause; exceptions raised directly
        // have none, so fall back to the exception itself instead of wrapping null.
        Throwable cause = e.getCause() != null ? e.getCause() : e;
        if (cause instanceof SubscriptionBusyException) {
            throw new RestException(Status.PRECONDITION_FAILED, "Subscription has active connected consumers");
        }
        throw new RestException(cause);
    }
}
/**
 * Sets the proxy log level on the proxy service.
 *
 * @param logLevel the new level; must be within 0 to 2 inclusive
 * @throws RestException with PRECONDITION_FAILED when the level is outside that range
 */
@POST
@Path("/logging/{logLevel}")
@ApiOperation(hidden = true, value = "Change proxy logging level dynamically", notes = "It only changes log-level in memory, change it config file to persist the change")
@ApiResponses(value = { @ApiResponse(code = 412, message = "Proxy log level can be [0-2]"), })
public void updateProxyLogLevel(@PathParam("logLevel") int logLevel) {
    final boolean withinRange = logLevel >= 0 && logLevel <= 2;
    if (!withinRange) {
        throw new RestException(Status.PRECONDITION_FAILED, "Proxy log level can be only [0-2]");
    }
    proxyService().setProxyLogLevel(logLevel);
}
/**
 * Maps an issue type to the HTTP status used to report it.
 *
 * <p>Issue types without a dedicated mapping fall through to INTERNAL_SERVER_ERROR.
 *
 * @param value the issue type to map
 * @return the HTTP status associated with the issue type
 */
private static Status issueTypeToResponseCode(IssueType.ValueSet value) {
    switch (value) {
    // 2xx
    case INFORMATIONAL:
        return Status.OK;

    // 400
    case PROCESSING:
    case BUSINESS_RULE: // Consider HTTP 422?
    case CODE_INVALID: // Consider HTTP 422?
    case EXTENSION: // Consider HTTP 422?
    case INVALID: // Consider HTTP 422?
    case INVARIANT: // Consider HTTP 422?
    case REQUIRED: // Consider HTTP 422?
    case STRUCTURE: // Consider HTTP 422?
    case VALUE: // Consider HTTP 422?
    case TOO_COSTLY: // Consider HTTP 403?
    case DUPLICATE: // Consider HTTP 409?
        return Status.BAD_REQUEST;

    // 401
    case EXPIRED:
    case LOGIN:
    case UNKNOWN:
        return Status.UNAUTHORIZED;

    // 403
    case FORBIDDEN:
    case SUPPRESSED:
    case SECURITY:
    case THROTTLED: // Consider HTTP 429?
        return Status.FORBIDDEN;

    // 404
    case NOT_FOUND:
    case NOT_SUPPORTED:
        return Status.NOT_FOUND;

    // 409
    case CONFLICT:
        return Status.CONFLICT;

    // 410
    case DELETED:
        return Status.GONE;

    // 412
    case MULTIPLE_MATCHES:
        return Status.PRECONDITION_FAILED;

    // 413
    case TOO_LONG:
        return Status.REQUEST_ENTITY_TOO_LARGE;

    // 500: listed explicitly for documentation; anything unmapped lands here as well.
    case EXCEPTION:
    case LOCK_ERROR:
    case NO_STORE:
    case TIMEOUT:
    case TRANSIENT:
    case INCOMPLETE:
    default:
        return Status.INTERNAL_SERVER_ERROR;
    }
}
/**
 * Checks whether this broker owns the given namespace bundle. If it does not, the request is
 * redirected (temporary redirect) to the broker whose web service URL was resolved for the bundle.
 *
 * @param bundle the namespace bundle whose ownership is validated
 * @param authoritative passed through to the web service URL lookup
 * @param readOnly passed through to the web service URL lookup
 * @throws RestException
 *             with PRECONDITION_FAILED if no owner URL can be resolved or the lookup raises an
 *             illegal argument/state or null pointer error, with INTERNAL_SERVER_ERROR on timeout
 * @throws WebApplicationException
 *             carrying the redirect response when another broker should serve the request
 * @throws Exception
 *             any other failure raised by the lookup is propagated unchanged
 */
public void validateBundleOwnership(NamespaceBundle bundle, boolean authoritative, boolean readOnly)
        throws Exception {
    NamespaceService namespaceService = pulsar().getNamespaceService();
    try {
        // Call getWebServiceUrl() to acquire or redirect the request
        // Get web service URL of owning broker.
        // 1: If namespace is assigned to this broker, continue
        // 2: If namespace is assigned to another broker, redirect to the webservice URL of another broker
        // authoritative flag is ignored
        // 3: If namespace is unassigned and readOnly is true, return 412
        // 4: If namespace is unassigned and readOnly is false:
        // - If authoritative is false and this broker is not leader, forward to leader
        // - If authoritative is false and this broker is leader, determine owner and forward w/ authoritative=true
        // - If authoritative is true, own the namespace and continue
        Optional<URL> ownerUrl = namespaceService.getWebServiceUrl(bundle, authoritative, isRequestHttps(),
                readOnly);

        // A URL is required to continue.
        boolean urlResolved = ownerUrl != null && ownerUrl.isPresent();
        if (!urlResolved) {
            log.warn("Unable to get web service url");
            throw new RestException(Status.PRECONDITION_FAILED,
                    "Failed to find ownership for ServiceUnit:" + bundle.toString());
        }

        if (!namespaceService.isServiceUnitOwned(bundle)) {
            boolean newAuthoritative = this.isLeaderBroker();
            // Keep the current request URI but point it at the owner's host and port.
            URL target = ownerUrl.get();
            URI redirect = UriBuilder.fromUri(uri.getRequestUri())
                    .host(target.getHost())
                    .port(target.getPort())
                    .replaceQueryParam("authoritative", newAuthoritative)
                    .build();
            log.debug("{} is not a service unit owned", bundle);
            log.debug("Redirecting the rest call to {}", redirect);
            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
        }
    } catch (TimeoutException timeout) {
        String msg = String.format("Finding owner for ServiceUnit %s timed out", bundle);
        log.error(msg, timeout);
        throw new RestException(Status.INTERNAL_SERVER_ERROR, msg);
    } catch (IllegalArgumentException invalidArgument) {
        // namespace format is not valid
        log.debug("Failed to find owner for ServiceUnit {}", bundle, invalidArgument);
        throw new RestException(Status.PRECONDITION_FAILED,
                "ServiceUnit format is not expected. ServiceUnit " + bundle);
    } catch (IllegalStateException invalidState) {
        log.debug("Failed to find owner for ServiceUnit {}", bundle, invalidState);
        throw new RestException(Status.PRECONDITION_FAILED, "ServiceUnit bundle is actived. ServiceUnit " + bundle);
    } catch (NullPointerException missing) {
        log.warn("Unable to get web service url");
        throw new RestException(Status.PRECONDITION_FAILED, "Failed to find ownership for ServiceUnit:" + bundle);
    } catch (WebApplicationException passThrough) {
        // Redirects and already-built REST errors are propagated untouched.
        throw passThrough;
    }
}
/**
 * Stores an auto-topic-creation override in the policies of the current namespace and completes
 * the given async response with the outcome.
 *
 * <p>Validation failures are thrown synchronously; the outcome of the policy read and write-back
 * is delivered through {@code asyncResponse}.
 *
 * @param asyncResponse
 *            resumed with a no-content response on success, or with an error (NOT_FOUND, CONFLICT,
 *            a KeeperException, or a wrapped failure) otherwise
 * @param autoTopicCreationOverride
 *            the override to store on the namespace policies
 * @throws RestException
 *             with PRECONDITION_FAILED when the override is invalid, or with NOT_ACCEPTABLE when a
 *             positive partition limit is configured and the override's default partition count
 *             exceeds it
 */
protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTopicCreationOverride autoTopicCreationOverride) {
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid configuration for autoTopicCreationOverride");
}
// The partition limit is only enforced when it is configured as a positive value.
if (maxPartitions > 0 && autoTopicCreationOverride.defaultNumPartitions > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions);
}
// Force to read the data s.t. the watch to the cache content is setup.
policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
policies -> {
if (policies.isPresent()) {
Entry<Policies, Stat> policiesNode = policies.get();
policiesNode.getKey().autoTopicCreationOverride = autoTopicCreationOverride;
try {
// Write back the new policies into zookeeper
// The version read together with the policies is passed along with the write; a
// BADVERSION result code is reported below as a concurrent modification.
globalZk().setData(path(POLICIES, namespaceName.toString()),
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
(rc, path1, ctx, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
// Success: invalidate the cached policies entry and answer with no content.
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
String autoOverride = autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled";
log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
asyncResponse.resume(Response.noContent().build());
} else {
String errorMsg = String.format(
"[%s] Failed to modify autoTopicCreation status for namespace %s",
clientAppId(), namespaceName);
// Translate the result code: NONODE -> NOT_FOUND, BADVERSION -> CONFLICT,
// anything else -> the corresponding KeeperException.
if (rc == KeeperException.Code.NONODE.intValue()) {
log.warn("{} : does not exist", errorMsg);
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
} else if (rc == KeeperException.Code.BADVERSION.intValue()) {
log.warn("{} : concurrent modification", errorMsg);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
}
}
}, null);
return null;
} catch (Exception e) {
// Failures while serializing or issuing the write are reported through the async response.
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
}
} else {
// No policies entry was found for the namespace.
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
return null;
}
}
).exceptionally(e -> {
// Exceptional completion of the read/apply stage is also reported through the async response.
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return null;
});
}