下面列出了java.util.concurrent.atomic.AtomicReference#get() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
static <T> Future<T> initiateAndDie(final Task<T> task) {
final AtomicReference<Future<T>> result = new AtomicReference<Future<T>>();
Runnable r = new Runnable() {
public void run() {
result.set(task.run());
}
};
Thread t = new Thread(r);
t.start();
while (t.isAlive()) {
try {
t.join();
} catch (InterruptedException x) {
}
}
return result.get();
}
/**
* Schedules the given {@link RunnableWithResult} on the JavaFX application
* thread and waits for its execution to finish.
*
* @param runnableWithResult
* @throws Throwable
*/
public synchronized <T> T runAndWait(final RunnableWithResult<T> runnableWithResult) throws Throwable {
final AtomicReference<Throwable> throwableRef = new AtomicReference<>(null);
final AtomicReference<T> resultRef = new AtomicReference<>(null);
final CountDownLatch latch = new CountDownLatch(1);
run(() -> {
try {
resultRef.set(runnableWithResult.run());
} catch (Throwable t) {
throwableRef.set(t);
} finally {
latch.countDown();
}
});
wait(latch);
Throwable throwable = throwableRef.get();
if (throwable != null) {
throw throwable;
}
return resultRef.get();
}
private void checkVirtualSource(String file) throws Exception {
FileObject fo = getTestFile(file);
BaseDocument doc = getDocument(fo);
Source source = Source.create(doc);
final AtomicReference<String> jsCodeRef = new AtomicReference<>();
ParserManager.parse(Collections.singleton(source), new UserTask() {
@Override
public void run(ResultIterator resultIterator) throws Exception {
ResultIterator jsRi = WebUtils.getResultIterator(resultIterator, "text/javascript");
if (jsRi != null) {
jsCodeRef.set(jsRi.getSnapshot().getText().toString());
} else {
//no js embedded code
}
}
});
String jsCode = jsCodeRef.get();
assertDescriptionMatches(fo, jsCode, false, ".virtual", true);
}
protected <T> T doInJDBC(ConnectionCallable<T> callable) {
AtomicReference<T> result = new AtomicReference<>();
Session session = null;
Transaction txn = null;
try {
session = getSessionFactory().openSession();
txn = session.beginTransaction();
session.doWork(connection -> {
result.set(callable.execute(connection));
});
txn.commit();
} catch (RuntimeException e) {
if ( txn != null && txn.isActive() ) txn.rollback();
throw e;
} finally {
if (session != null) {
session.close();
}
}
return result.get();
}
/**
* Removes the given flow rule from the bucket.
*
* @param rule the rule to remove
* @param term the term in which the change occurred
* @param clock the logical clock
* @return the removed flow entry
*/
public FlowEntry remove(FlowEntry rule, long term, LogicalClock clock) {
final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
flowBucket.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
if (rule instanceof DefaultFlowEntry) {
DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
if (stored instanceof DefaultFlowEntry) {
DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
if (toRemove.created() < storedEntry.created()) {
LOGGER.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
// the key is not updated, removedRule remains null
return stored;
}
}
}
removedRule.set(stored);
return null;
});
return flowEntries.isEmpty() ? null : flowEntries;
});
if (removedRule.get() != null) {
recordUpdate(term, clock.getTimestamp());
return removedRule.get();
} else {
return null;
}
}
@Override
protected FlowFile doTransform(ProcessContext context, ProcessSession session, FlowFile flowFile, InvocationContextProperties contextProperties) {
AtomicReference<Map<String, String>> attributeRef = new AtomicReference<Map<String, String>>();
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
attributeRef.set(transform(in, out, contextProperties));
}
});
if (attributeRef.get() != null) {
flowFile = session.putAllAttributes(flowFile, attributeRef.get());
}
return flowFile;
}
/**
* Given a set of already existing slot names within an utterance and another slot name this method takes care of returning a unique slot name not duplicating a name of the existing ones
* @param existingSlots list of existing slot names within a sample utterance
* @param slotName next slot name within the sample utterance
* @return unique name for slotName
*/
public static String resolveToUniqueSlotName(final List<Slot> existingSlots, final String slotName) {
final AtomicReference<String> newSlotName = new AtomicReference<>(slotName);
final AtomicInteger index = new AtomicInteger(0);
final Integer maxIndex = slotNameAppendices.size();
while(existingSlots.stream().anyMatch(s -> s.getName().equals(newSlotName.get())) && index.get() < maxIndex) {
newSlotName.set(slotName + "_" + slotNameAppendices.get(index.getAndIncrement()));
}
return newSlotName.get();
}
/**
* Variant of releaseWaiters that additionally tries to remove any
* nodes no longer waiting for advance due to timeout or
* interrupt. Currently, nodes are removed only if they are at
* head of queue, which suffices to reduce memory footprint in
* most usages.
*
* @return current phase on exit
*/
private int abortWait(int phase) {
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
for (;;) {
Thread t;
QNode q = head.get();
int p = (int)(root.state >>> PHASE_SHIFT);
if (q == null || ((t = q.thread) != null && q.phase == p))
return p;
if (head.compareAndSet(q, q.next) && t != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
}
private void clickListRow(JList<?> list, int row) {
AtomicReference<Rectangle> ref = new AtomicReference<>();
runSwing(() -> ref.set(list.getCellBounds(row, row)));
Rectangle rect = ref.get();
clickMouse(list, 1, rect.x + 5, rect.y + 5, 1, 0);
}
public Set<String> getSetOfFailedOrClosedTradeIdsFromLockedInFunds() throws TradeTxException {
AtomicReference<TradeTxException> tradeTxException = new AtomicReference<>();
Set<String> tradesIdSet = getTradesStreamWithFundsLockedIn()
.filter(Trade::hasFailed)
.map(Trade::getId)
.collect(Collectors.toSet());
tradesIdSet.addAll(failedTradesManager.getTradesStreamWithFundsLockedIn()
.filter(trade -> trade.getDepositTx() != null)
.map(trade -> {
log.warn("We found a failed trade with locked up funds. " +
"That should never happen. trade ID=" + trade.getId());
return trade.getId();
})
.collect(Collectors.toSet()));
tradesIdSet.addAll(closedTradableManager.getTradesStreamWithFundsLockedIn()
.map(trade -> {
Transaction depositTx = trade.getDepositTx();
if (depositTx != null) {
TransactionConfidence confidence = btcWalletService.getConfidenceForTxId(depositTx.getHashAsString());
if (confidence != null && confidence.getConfidenceType() != TransactionConfidence.ConfidenceType.BUILDING) {
tradeTxException.set(new TradeTxException(Res.get("error.closedTradeWithUnconfirmedDepositTx", trade.getShortId())));
} else {
log.warn("We found a closed trade with locked up funds. " +
"That should never happen. trade ID=" + trade.getId());
}
} else {
tradeTxException.set(new TradeTxException(Res.get("error.closedTradeWithNoDepositTx", trade.getShortId())));
}
return trade.getId();
})
.collect(Collectors.toSet()));
if (tradeTxException.get() != null)
throw tradeTxException.get();
return tradesIdSet;
}
public void send() {
if (logger.isInfoEnabled()) {
logger.info("Send statistics to monitor " + getUrl());
}
String timestamp = String.valueOf(System.currentTimeMillis());
for (Map.Entry<Statistics, AtomicReference<long[]>> entry : statisticsMap.entrySet()) {
// 获取已统计数据
Statistics statistics = entry.getKey();
AtomicReference<long[]> reference = entry.getValue();
long[] numbers = reference.get();
long success = numbers[0];
long failure = numbers[1];
long input = numbers[2];
long output = numbers[3];
long elapsed = numbers[4];
long concurrent = numbers[5];
long maxInput = numbers[6];
long maxOutput = numbers[7];
long maxElapsed = numbers[8];
long maxConcurrent = numbers[9];
// 发送汇总信息
URL url = statistics.getUrl()
.addParameters(MonitorService.TIMESTAMP, timestamp,
MonitorService.SUCCESS, String.valueOf(success),
MonitorService.FAILURE, String.valueOf(failure),
MonitorService.INPUT, String.valueOf(input),
MonitorService.OUTPUT, String.valueOf(output),
MonitorService.ELAPSED, String.valueOf(elapsed),
MonitorService.CONCURRENT, String.valueOf(concurrent),
MonitorService.MAX_INPUT, String.valueOf(maxInput),
MonitorService.MAX_OUTPUT, String.valueOf(maxOutput),
MonitorService.MAX_ELAPSED, String.valueOf(maxElapsed),
MonitorService.MAX_CONCURRENT, String.valueOf(maxConcurrent)
);
monitorService.collect(url);
// 减掉已统计数据
long[] current;
long[] update = new long[LENGTH];
do {
current = reference.get();
if (current == null) {
update[0] = 0;
update[1] = 0;
update[2] = 0;
update[3] = 0;
update[4] = 0;
update[5] = 0;
} else {
update[0] = current[0] - success;
update[1] = current[1] - failure;
update[2] = current[2] - input;
update[3] = current[3] - output;
update[4] = current[4] - elapsed;
update[5] = current[5] - concurrent;
}
} while (! reference.compareAndSet(current, update));
}
}
/**
* We first check the connection to the IRC server. If it fails, this test is ignored.
*/
@Test
@RetryOnFailure(times = 1)
public void testWikipediaEditsSource() throws Exception {
if (canConnect(1, TimeUnit.SECONDS)) {
final Time testTimeout = Time.seconds(60);
final WikipediaEditsSource wikipediaEditsSource = new WikipediaEditsSource();
ExecutorService executorService = null;
try {
executorService = Executors.newSingleThreadExecutor();
BlockingQueue<Object> collectedEvents = new ArrayBlockingQueue<>(1);
AtomicReference<Exception> asyncError = new AtomicReference<>();
// Execute the source in a different thread and collect events into the queue.
// We do this in a separate thread in order to not block the main test thread
// indefinitely in case that something bad happens (like not receiving any
// events)
executorService.execute(() -> {
try {
wikipediaEditsSource.run(new CollectingSourceContext<>(collectedEvents));
} catch (Exception e) {
boolean interrupted = e instanceof InterruptedException;
if (!interrupted) {
LOG.warn("Failure in WikipediaEditsSource", e);
}
asyncError.compareAndSet(null, e);
}
});
long deadline = deadlineNanos(testTimeout);
Object event = null;
Exception error = null;
// Check event or error
while (event == null && error == null && System.nanoTime() < deadline) {
event = collectedEvents.poll(1, TimeUnit.SECONDS);
error = asyncError.get();
}
if (error != null) {
// We don't use assertNull, because we want to include the error message
fail("Failure in WikipediaEditsSource: " + error.getMessage());
}
assertNotNull("Did not receive a WikipediaEditEvent within the desired timeout", event);
assertTrue("Received unexpected event " + event, event instanceof WikipediaEditEvent);
} finally {
wikipediaEditsSource.cancel();
if (executorService != null) {
executorService.shutdownNow();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}
}
} else {
LOG.info("Skipping test, because not able to connect to IRC server.");
}
}
/**
* Possibly blocks and waits for phase to advance unless aborted.
* Call only on root phaser.
*
* @param phase current phase
* @param node if non-null, the wait node to track interrupt and timeout;
* if null, denotes noninterruptible wait
* @return current phase
*/
private int internalAwaitAdvance(int phase, QNode node) {
// assert root == this;
releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
if (node == null) { // spinning in noninterruptible mode
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted();
if (interrupted || --spins < 0) { // need node to record intr
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
}
else if (node.isReleasable()) // done or aborted
break;
else if (!queued) { // push onto queue
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
queued = head.compareAndSet(q, node);
}
else {
try {
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
node.wasInterrupted = true;
}
}
}
if (node != null) {
if (node.thread != null)
node.thread = null; // avoid need for unpark()
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
return abortWait(phase); // possibly clean up on abort
}
releaseWaiters(phase);
return p;
}
@Test
public void testCloseFromProduceCallbackOnSenderThread() throws Exception {
String topic = "testCloseFromProduceCallbackOnSenderThread";
createTopic(topic, 1);
Random random = new Random(666);
Properties extra = new Properties();
extra.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 50000000); //~50MB (larger than broker-size setting)
extra.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
extra.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
extra.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
Properties baseProducerConfig = getProducerProperties(extra);
LiKafkaInstrumentedProducerImpl<byte[], byte[]> producer = new LiKafkaInstrumentedProducerImpl<byte[], byte[]>(
baseProducerConfig,
Collections.emptyMap(),
(baseConfig, overrideConfig) -> new LiKafkaProducerImpl<byte[], byte[]>(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)),
() -> "bogus",
10 //dont wait for a mario connection
);
byte[] key = new byte[3000];
byte[] value = new byte[49000000];
random.nextBytes(key);
random.nextBytes(value); //random data is incompressible, making sure our request is large
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, value);
AtomicReference<Throwable> issueRef = new AtomicReference<>();
Thread testThread = new Thread(new Runnable() {
@Override
public void run() {
try {
final Thread ourThread = Thread.currentThread();
Future<RecordMetadata> future = producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
//we expect a RecordTooLargeException. we also expect this to happen
//on the same thread.
if (Thread.currentThread() != ourThread) {
issueRef.compareAndSet(null,
new IllegalStateException("completion did not happen on caller thread by " + Thread.currentThread().getName())
);
}
producer.close(1, TimeUnit.SECONDS);
}
});
RecordMetadata recordMetadata = future.get(1, TimeUnit.MINUTES);
} catch (Throwable anything) {
issueRef.compareAndSet(null, anything);
}
}
}, "testCloseFromProduceCallbackOnSenderThread-thread");
testThread.setDaemon(true);
testThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
issueRef.compareAndSet(null, e);
}
});
testThread.start();
testThread.join(TimeUnit.MINUTES.toMillis(1));
Thread.State state = testThread.getState();
Assert.assertEquals(
state,
Thread.State.TERMINATED,
"thread was expected to finish, instead its " + state
);
Throwable issue = issueRef.get();
Throwable root = Throwables.getRootCause(issue);
Assert.assertTrue(root instanceof RecordTooLargeException, root.getMessage());
}
@Test
public void onErrorIncremental() throws InterruptedException {
AtomicInteger count = new AtomicInteger(0);
AtomicBoolean data = new AtomicBoolean(false);
AtomicReference<Vector<Integer>> result = new AtomicReference<>(Vector.empty());
AtomicBoolean complete = new AtomicBoolean(false);
AtomicReference<Throwable> error = new AtomicReference<Throwable>(null);
Subscription sub =of(1, 2, 3).<Integer>map(i -> {
throw new RuntimeException();
})
.onError(e -> count.incrementAndGet())
.forEach(0,n -> {
result.updateAndGet(v->v.plus(n));
data.set(true);
}, e -> {
error.set(e);
}, () -> {
complete.set(true);
});
assertThat(data.get(), equalTo(false));
assertThat(complete.get(), equalTo(false));
assertThat(error.get(), equalTo(null));
assertThat(result.get(),equalTo(Vector.empty()));
sub.request(1l);
while(error.get()==null){
LockSupport.parkNanos(10l);
}
assertThat(data.get(), equalTo(false));
assertThat(complete.get(), equalTo(false));
assertThat(error.get(), instanceOf(RuntimeException.class));
assertThat(result.get(),equalTo(Vector.empty()));
sub.request(100l);
while(!complete.get()){
LockSupport.parkNanos(10l);
}
assertThat(data.get(), equalTo(false));
assertThat(complete.get(), equalTo(true));
assertThat(error.get(), instanceOf(RuntimeException.class));
assertThat(result.get(),equalTo(Vector.empty()));
assertThat(count.get(),equalTo(3));
}
static final boolean isAllowedToEdit(AtomicReference<Thread> x, AtomicReference<Thread> y) {
return x != null && y != null && (x == y || x.get() == y.get());
}
public static String getClearCommentFileBody(File file, CodeTypeEnum codeTypeEnum) {
VirtualFile vf = LocalFileSystem.getInstance().refreshAndFindFileByIoFile(file);
if (FileDocumentManager.getInstance().isFileModified(vf)) {
try {
ThrowableComputable<Boolean, Throwable> action = new ThrowableComputable<Boolean, Throwable>() {
@Override
public Boolean compute() throws Throwable {
FileDocumentManager.getInstance().saveDocument(FileDocumentManager.getInstance().getDocument(vf));
return true;
}
};
Application application = ApplicationManager.getApplication();
if (application.isDispatchThread()) {
ApplicationManager.getApplication().runWriteAction(action);
} else {
if (application.isReadAccessAllowed()) {
LogUtils.LOG.error("Must not start write action from within read action in the other thread - deadlock is coming");
}
AtomicReference<Boolean> result = new AtomicReference();
AtomicReference<Throwable> exception = new AtomicReference();
TransactionGuard.getInstance().submitTransactionAndWait(() -> {
try {
result.set(WriteAction.compute(action));
} catch (Throwable var4) {
exception.set(var4);
}
});
Throwable t = (Throwable) exception.get();
if (t != null) {
t.addSuppressed(new RuntimeException());
ExceptionUtil.rethrowUnchecked(t);
throw t;
}
}
} catch (Throwable ignore) {
LogUtils.LOG.error("自动保存文件错误", ignore);
}
}
StringBuffer code = new StringBuffer();
try {
String body = VfsUtil.loadText(vf);
if (StringUtils.isNotBlank(body)) {
List<String> codeList = new LinkedList<>();
int codeBegin = -1;
int codeEnd = -1;
int lineCount = 0;
String[] lines = body.split("\r\n|\r|\n");
for (String line : lines) {
if (StringUtils.isNotBlank(line) && trim(line).equals(trim(codeTypeEnum.getComment() + Constant.SUBMIT_REGION_BEGIN))) {
codeBegin = lineCount;
} else if (StringUtils.isNotBlank(line) && trim(line).equals(trim(codeTypeEnum.getComment() + Constant.SUBMIT_REGION_END))) {
codeEnd = lineCount;
}
codeList.add(line);
lineCount++;
}
if (codeBegin >= 0 && codeEnd > 0 && codeBegin < codeEnd) {
for (int i = codeBegin + 1; i < codeEnd; i++) {
code.append(codeList.get(i)).append("\n");
}
} else {
Boolean isCode = Boolean.FALSE;
for (int i = 0; i < codeList.size(); i++) {
String str = codeList.get(i);
if (!isCode) {
if (StringUtils.isNotBlank(str) && !str.startsWith(codeTypeEnum.getComment())) {
isCode = Boolean.TRUE;
code.append(str).append("\n");
} else {
continue;
}
} else {
code.append(str).append("\n");
}
}
}
}
} catch (IOException id) {
}
return code.toString();
}
public void collect(URL url) {
// 读写统计变量
int success = url.getParameter(MonitorService.SUCCESS, 0);
int failure = url.getParameter(MonitorService.FAILURE, 0);
int input = url.getParameter(MonitorService.INPUT, 0);
int output = url.getParameter(MonitorService.OUTPUT, 0);
int elapsed = url.getParameter(MonitorService.ELAPSED, 0);
int concurrent = url.getParameter(MonitorService.CONCURRENT, 0);
// 初始化原子引用
Statistics statistics = new Statistics(url);
AtomicReference<long[]> reference = statisticsMap.get(statistics);
if (reference == null) {
statisticsMap.putIfAbsent(statistics, new AtomicReference<long[]>());
reference = statisticsMap.get(statistics);
}
// CompareAndSet并发加入统计数据
long[] current;
long[] update = new long[LENGTH];
do {
current = reference.get();
if (current == null) {
update[0] = success;
update[1] = failure;
update[2] = input;
update[3] = output;
update[4] = elapsed;
update[5] = concurrent;
update[6] = input;
update[7] = output;
update[8] = elapsed;
update[9] = concurrent;
} else {
update[0] = current[0] + success;
update[1] = current[1] + failure;
update[2] = current[2] + input;
update[3] = current[3] + output;
update[4] = current[4] + elapsed;
update[5] = (current[5] + concurrent) / 2;
update[6] = current[6] > input ? current[6] : input;
update[7] = current[7] > output ? current[7] : output;
update[8] = current[8] > elapsed ? current[8] : elapsed;
update[9] = current[9] > concurrent ? current[9] : concurrent;
}
} while (! reference.compareAndSet(current, update));
}
private void signalStarted(RunningWorker rw, AtomicReference<SubscriptionStateHandler> ref) {
rw.signalStarted();
if (ref.get() != null)
ref.get().start();
}
/** Returns a GCEController using default application credentials. */
public static GCEController newGCEController(
String projectName, Map<ClientParams, Integer> clients, ScheduledExecutorService executor) {
try {
HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
JsonFactory jsonFactory = new JacksonFactory();
GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);
if (credential.createScopedRequired()) {
credential =
credential.createScoped(
Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"));
}
Storage storage =
new Storage.Builder(transport, jsonFactory, credential)
.setApplicationName("Cloud Pub/Sub Loadtest Framework")
.build();
Compute compute =
new Compute.Builder(transport, jsonFactory, credential)
.setApplicationName("Cloud Pub/Sub Loadtest Framework")
.build();
Pubsub pubsub =
new Pubsub.Builder(transport, jsonFactory, credential)
.setApplicationName("Cloud Pub/Sub Loadtest Framework")
.build();
ArrayList<ResourceController> controllers = new ArrayList<>();
ArrayList<ComputeResourceController> computeControllers = new ArrayList<>();
// Using atomic for effectively final not thread safety.
AtomicBoolean hasJavaClient = new AtomicBoolean(false);
AtomicReference<Boolean> hasKafkaClient = new AtomicReference<>(null);
clients.forEach(
(params, count) -> {
hasJavaClient.set(
hasJavaClient.get()
|| (params.getClientType().language == ClientType.Language.JAVA));
if (hasKafkaClient.get() != null) {
if (hasKafkaClient.get() != params.getClientType().isKafka()) {
if (!params.getClientType().isKafka()) {
log.error("Cannot use mixed kafka and gcp client types.");
System.exit(1);
}
}
} else {
hasKafkaClient.set(params.getClientType().isKafka());
}
GCEComputeResourceController computeController =
new GCEComputeResourceController(projectName, params, count, executor, compute);
controllers.add(computeController);
computeControllers.add(computeController);
});
controllers.add(new FirewallResourceController(projectName, executor, compute));
if (hasKafkaClient.get() != null && hasKafkaClient.get()) {
controllers.add(new KafkaResourceController(Client.TOPIC, executor));
}
controllers.add(
new PubsubResourceController(
projectName, Client.TOPIC, ImmutableList.of(Client.SUBSCRIPTION), executor, pubsub));
controllers.add(
new StorageResourceController(
projectName, Client.RESOURCE_DIR, false, hasJavaClient.get(), executor, storage));
return new GCEController(clients, executor, controllers, computeControllers);
} catch (Throwable t) {
log.error("Unable to initialize GCE: ", t);
return null;
}
}