下面列出了org.springframework.transaction.annotation.Isolation#SERIALIZABLE 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> addCProxy(String groupName, String clusterName, String host) throws Exception {
ConsumeGroup group = consumeGroupService.findByGroupName(groupName);
if (group == null) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "group not found");
}
Cluster cluster = clusterService.findByClusterName(clusterName);
if (cluster == null) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "cluster not found");
}
if (!validNodeExist(host, cluster, NodeType.CONSUMER_PROXY)) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "host not found");
}
List<ConsumeSubscription> subscriptionList = findByNotNullGroupClusterTopicId(group.getId(), cluster.getId(), null);
if (CollectionUtils.isEmpty(subscriptionList)) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "subscription not found");
}
addCProxy(host, subscriptionList);
return ConsoleBaseResponse.success();
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE)
public void disableVacationMode() {
long userId = CustomUser.getCurrentUserId();
User user = userRepository.getOne(userId);
if (!canDisableVacationMode(user)) {
// A hacking attempt, the button should be disabled.
logger.warn("Disabling vacation mode failed, requirements not met: userId={}", userId);
throw new CannotDisableVacationModeException();
}
logger.info("Disabling vacation mode: userId={}", userId);
Date now = Date.from(Instant.ofEpochSecond(Instant.now().getEpochSecond()));
// Bodies must be updated before vacation until is set, otherwise resources will be calculated incorrectly.
updateActivitiesAndBodies(user, now);
user.setVacationUntil(null);
user.setForcedVacation(false);
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE)
public void enableVacationMode() {
long userId = CustomUser.getCurrentUserId();
User user = userRepository.getOne(userId);
if (!canEnableVacationMode(user)) {
// A hacking attempt, the button should be disabled.
logger.warn("Enabling vacation mode failed, requirements not met: userId={}", userId);
throw new CannotEnableVacationModeException();
}
Date now = Date.from(Instant.ofEpochSecond(Instant.now().getEpochSecond()));
Date until = Date.from(now.toInstant().plus(2, ChronoUnit.DAYS));
logger.info("Enabling vacation mode: userId={} until='{}'", userId, until);
enableVacationMode(user, now, until);
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> pushCProxyByCluster(String clusterName) throws Exception {
Cluster cluster = clusterService.findByClusterName(clusterName);
if (cluster == null) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "集群不存在");
}
List<Node> list = nodeService.findByClusterIdNodeType(cluster.getId(), NodeType.CONSUMER_PROXY);
if (CollectionUtils.isEmpty(list)) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "集群暂无CProxy");
}
for (Node node : list) {
updateCProxyConfig(node.getId());
}
return ConsoleBaseResponse.success();
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> pushPProxyByCluster(String clusterName) throws Exception {
Cluster cluster = clusterService.findByClusterName(clusterName);
if (cluster == null) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "集群不存在");
}
List<Node> list = nodeService.findByClusterIdNodeType(cluster.getId(), NodeType.PRODUCER_PROXY);
if (CollectionUtils.isEmpty(list)) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "集群暂无PProxy");
}
for (Node node : list) {
updatePProxyConfig(node.getId());
}
return ConsoleBaseResponse.success();
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> pushCproxyConfig(String host) throws Exception {
List<Node> nodeList = nodeService.findByHostNodeType(host, NodeType.CONSUMER_PROXY);
if (CollectionUtils.isEmpty(nodeList)) {
nodeList = nodeService.findByHostNodeType(HostUtils.getIp(host), NodeType.CONSUMER_PROXY);
}
if (CollectionUtils.isEmpty(nodeList)) {
LOGGER.warn("[ZK_V4] not found cproxy node, host={}", host);
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "host不存在");
}
updateCProxyConfig(nodeList.get(0).getId());
return ConsoleBaseResponse.success();
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> pushPproxyConfig(String host) throws Exception {
List<Node> nodeList = nodeService.findByHostNodeType(host, NodeType.PRODUCER_PROXY);
if (CollectionUtils.isEmpty(nodeList)) {
nodeList = nodeService.findByHostNodeType(HostUtils.getIp(host), NodeType.PRODUCER_PROXY);
}
if (CollectionUtils.isEmpty(nodeList)) {
LOGGER.warn("[ZK_V4] not found pproxy node, host={}", host);
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "host不存在");
}
updatePProxyConfig(nodeList.get(0).getId());
return ConsoleBaseResponse.success();
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> pushTopicByCluster(String clusterName) throws Exception {
Cluster cluster = clusterService.findByClusterName(clusterName);
if (cluster == null) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "集群不存在");
}
List<Long> list = topicConfService.findTopicByClusterIdWithDeleted(cluster.getId());
if (CollectionUtils.isEmpty(list)) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "集群不存在topic");
}
Set<Long> clusters = Sets.newHashSet();
for (Long topicId : list) {
List<TopicConf> confList = topicConfService.findByTopicId(topicId);
clusters.addAll(confList.stream().map(TopicConf::getClusterId).collect(Collectors.toSet()));
updateTopicConfig(topicId, null);
}
updatePProxyConfigByClusterId("pushTopicByCluster", clusters);
return ConsoleBaseResponse.success();
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> removeCProxy(String groupName, String host) throws Exception {
ConsumeGroup group = consumeGroupService.findByGroupName(groupName);
if (group == null) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "groupName not found");
}
List<ConsumeSubscription> subList = findByGroupId(group.getId());
if (CollectionUtils.isEmpty(subList)) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "subscription not found");
}
removeCProxy(host, subList);
return ConsoleBaseResponse.success();
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> pushTopicConfig(String topicName) throws Exception {
Topic topic = topicService.findByTopicName(topicName);
if (topic == null) {
LOGGER.warn("[ZK_V4] not found topic, topic={}", topicName);
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "topic不存在");
}
return pushTopicConfig(topic);
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> create(MqServerBo bo) throws Exception {
if (findByName(bo.getName()) != null) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "cluster name existed");
}
if(bo.getType() == MqServerType.ROCKETMQ.getIndex()) {
if(!bo.getAddr().contains(";") && bo.getAddr().split(":").length > 2) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "RMQ cluster address must be separated by semicolon");
}
} else {
if(!bo.getAddr().contains(",") && bo.getAddr().split(":").length > 2) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "Kafka cluster address must be separated by comma");
}
}
MqServer mqServer = new MqServer();
BeanUtils.copyProperties(bo, mqServer);
mqServer.setIsDelete(IsDelete.NO.getIndex());
mqServer.setCreateTime(new Date());
mqServerMapper.insertSelective(mqServer);
Map<String, Long> map = Maps.newHashMap();
map.put("id", mqServer.getId());
return ConsoleBaseResponse.success(map);
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> deleteByGroupId(String user, Long groupId) throws Exception {
List<ConsumeSubscription> list = findByGroupId(groupId);
if (CollectionUtils.isEmpty(list)) {
return ConsoleBaseResponse.success();
}
deleteByIds(user, list.stream().map(ConsumeSubscription::getId).collect(Collectors.toList()));
Set<Long> clusterIdSet = list.stream().map(ConsumeSubscription::getClusterId).collect(Collectors.toSet());
pushV4ZkInfo(groupId, clusterIdSet);
return ConsoleBaseResponse.success();
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> removeCProxy(String host) throws Exception {
List<ConsumeSubscription> subList = findAll();
removeCProxy(host, subList);
return ConsoleBaseResponse.success();
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> changeState(String user, Long groupId, Integer state) throws Exception {
if (IsEnable.getByIndex(state.byteValue()) == null) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "only 0 or 1");
}
if (!validUserExist(user, groupId)) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, user + "not authorized");
}
return consumeSubscriptionService.changeState(groupId, state);
}
@Override
@Transactional(isolation=Isolation.SERIALIZABLE)
public void setNodeSlotMode(Node node, SlotMode mode, int cores, int ram) {
if (nodeDao.hasProcs(node, true)) {
throw new PlowWriteException("You cannot change slot configuration with running procs.");
}
nodeDao.setSlotMode(node, mode, cores, ram);
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> create(MqServerBo bo) throws Exception {
if (findByName(bo.getName()) != null) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "cluster name existed");
}
if(bo.getType() == MqServerType.ROCKETMQ.getIndex()) {
if(!bo.getAddr().contains(";") && bo.getAddr().split(":").length > 2) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "RMQ cluster address must be separated by semicolon");
}
} else {
if(!bo.getAddr().contains(",") && bo.getAddr().split(":").length > 2) {
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "Kafka cluster address must be separated by comma");
}
}
MqServer mqServer = new MqServer();
BeanUtils.copyProperties(bo, mqServer);
mqServer.setIsDelete(IsDelete.NO.getIndex());
mqServer.setCreateTime(new Date());
mqServerMapper.insertSelective(mqServer);
Map<String, Long> map = Maps.newHashMap();
map.put("id", mqServer.getId());
return ConsoleBaseResponse.success(map);
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> pushGroupConfig(String groupName) throws Exception {
ConsumeGroup group = consumeGroupService.findByGroupName(groupName);
if (group == null) {
LOGGER.warn("[ZK_V4] not found group, group={}", groupName);
return ConsoleBaseResponse.error(ConsoleBaseResponse.Status.INVALID_PARAM, "group不存在");
}
return pushGroupConfig(group);
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE)
public void sendProbes(long bodyId, CoordinatesDto targetCoordinates, int numProbes) {
Map<UnitKindDto, Integer> units = Collections.singletonMap(UnitKindDto.ESPIONAGE_PROBE, numProbes);
int holdTime = 0;
int factor = 10;
ResourcesDto resources = new ResourcesDto(0.0, 0.0, 0.0);
SendFleetParamsDto params = new SendFleetParamsDto(bodyId, units, MissionDto.ESPIONAGE, holdTime, targetCoordinates,
factor, resources, null);
send(params);
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public ConsoleBaseResponse<?> removePProxy(String host) throws Exception {
List<TopicConf> confList = topicConfService.findAll();
removePProxy(host, confList);
return ConsoleBaseResponse.success();
}
@Override
@Transactional(isolation = Isolation.SERIALIZABLE, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public void updateCProxyConfig(Long nodeId) throws Exception {
Node node = nodeService.findById(nodeId);
if (node == null) {
LOGGER.warn("[ZK_V4_CProxy] node not found, nodeId={}", nodeId);
throw new ZkConfigException(String.format("[CProxy] node not found, nodeId=%s", nodeId));
}
if (node.getNodeType() != NodeType.CONSUMER_PROXY.getIndex()) {
LOGGER.warn("[ZK_V4_CProxy] node isn't cproxy node, nodeId={}", nodeId);
throw new ZkConfigException(String.format("[CProxy] node isn't cproxy node, nodeId=%s", nodeId));
}
Cluster cluster = clusterService.findById(node.getClusterId());
if (cluster == null || cluster.getIsDelete() == IsDelete.YES.getIndex()) {
LOGGER.warn(String.format("[ZK_V4_CProxy] not found cluster, nodeId=%s, clusterId=%s", nodeId, node.getClusterId()));
throw new ZkConfigException(String.format("[CProxy] not found cluster, nodeId=%s, clusterId=%s", nodeId, node.getClusterId()));
}
List<ClusterMqserverRelation> relationList = mqserverRelationService.findByClusterId(cluster.getId(), ClusterMqServerRelationType.C_PROXY);
if (CollectionUtils.isEmpty(relationList)) {
LOGGER.warn(String.format("[ZK_V4_CProxy] not found ClusterMqserverRelation, nodeId=%s, clusterId=%s", nodeId, node.getClusterId()));
throw new ZkConfigException(String.format("[CProxy] not found ClusterMqserverRelation, nodeId=%s, clusterId=%s", nodeId, node.getClusterId()));
}
String host = HostUtils.getIpPortFromHost(node.getHost(), DEFAULT_CPROXY_PORT);
CProxyConfig zkCProxyConfig = zkService.getCProxy(host);
if (node.getIsDelete() == IsDelete.YES.getIndex()) {
LOGGER.info("[ZK_V4_CProxy] node is deleted, delete it from zk, nodeId={}, clusterId={}, host={}", nodeId, node.getClusterId(), node.getHost());
if (zkCProxyConfig != null) {
updateBrokerConfigByMqserverId(relationList);
zkService.deleteCProxy(host);
}
return;
}
CProxyConfig cProxyConfig = buildCProxyConfig(host, cluster, relationList);
zkService.createOrUpdateCProxy(cProxyConfig);
if (zkCProxyConfig == null) {
updateBrokerConfigByMqserverId(relationList);
}
LOGGER.debug("[ZK_V4_CProxy] update cproxy success, nodeId={}, clusterId={}, host={}, cProxyConfig={}", nodeId, node.getClusterId(), node.getHost(), cProxyConfig);
LOGGER.info("[ZK_V4_CProxy] update cproxy success, nodeId={}, clusterId={}, host={}", nodeId, node.getClusterId(), node.getHost());
}