下面列出了com.google.common.util.concurrent.MoreExecutors#listeningDecorator ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Counts yield and q30 of fastqs in the fastqsPerSample multimap, using 1 thread per file.
* The yield and q30 of the Undetermined sample will count towards the total yield and q30 of the flowcell.
*
* @param fastqsPerSample multimap of sampleName and fastqs to process
* @param threadCount number of maximum threads
* @return FastqTracker with yield and q30 stats for the fastqs processed.
*/
@NotNull
static FastqTracker processFastqs(@NotNull final Multimap<String, File> fastqsPerSample, final int threadCount)
throws InterruptedException {
LOGGER.info("Using {} threads. Processing {} fastQ files.", threadCount, fastqsPerSample.size());
final FastqTrackerWrapper tracker = new FastqTrackerWrapper();
final ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount));
for (final String sampleName : fastqsPerSample.keySet()) {
final Collection<File> fastqs = fastqsPerSample.get(sampleName);
for (final File fastq : fastqs) {
final String laneName = getLaneName(fastq);
final ListenableFuture<FastqData> futureResult = threadPool.submit(() -> processFile(fastq));
addCallback(futureResult, (data) -> tracker.addDataFromSampleFile(sampleName, laneName, data),
(error) -> LOGGER.error("Failed to process file: {}", fastq.getName(), error));
}
}
threadPool.shutdown();
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
return tracker.tracker();
}
@Test
public void testSValue() throws Exception {
// Check that we never generate an S value that is larger than half the curve order. This avoids a malleability
// issue that can allow someone to change a transaction [hash] without invalidating the signature.
final int ITERATIONS = 10;
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(ITERATIONS));
List<ListenableFuture<ECKey.ECDSASignature>> sigFutures = Lists.newArrayList();
final ECKey key = new ECKey();
for (byte i = 0; i < ITERATIONS; i++) {
final byte[] hash = HashUtil.sha3(new byte[]{i});
sigFutures.add(executor.submit(new Callable<ECKey.ECDSASignature>() {
@Override
public ECKey.ECDSASignature call() throws Exception {
return key.doSign(hash);
}
}));
}
List<ECKey.ECDSASignature> sigs = Futures.allAsList(sigFutures).get();
for (ECKey.ECDSASignature signature : sigs) {
assertTrue(signature.s.compareTo(ECKey.HALF_CURVE_ORDER) <= 0);
}
final ECKey.ECDSASignature duplicate = new ECKey.ECDSASignature(sigs.get(0).r, sigs.get(0).s);
assertEquals(sigs.get(0), duplicate);
assertEquals(sigs.get(0).hashCode(), duplicate.hashCode());
}
protected Loader(boolean lazy) {
children = new ArrayList<>();
executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
entryCache = new HashMap<>();
childHash = 31;
this.lazy = lazy;
}
@Test(timeout = 5000)
public void testSignalFatalAndThrow() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_SIGNAL_FATAL_AND_THROW);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
TestProcessor.signal();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false, TaskFailureType.FATAL);
TestProcessor.awaitCompletion();
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskFailedEvent(
FAILURE_START_STRING,
IOException.class.getName() + ": " + IOException.class.getSimpleName(), TaskFailureType.FATAL);
assertTrue(TestProcessor.wasAborted());
} finally {
executor.shutdownNow();
}
}
public static SyncEventWriter initialize(TasmoViewModel tasmoViewModel,
WrittenEventProvider writtenEventProvider,
TasmoStorageProvider tasmoStorageProvider,
CallbackStream<List<BookkeepingEvent>> bookkeepingStream,
TasmoBlacklist tasmoBlacklist,
TasmoSyncWriteConfig config) throws Exception {
ConcurrencyStore concurrencyStore = new HBaseBackedConcurrencyStore(tasmoStorageProvider.concurrencyStorage());
EventValueStore eventValueStore = new EventValueStore(concurrencyStore, tasmoStorageProvider.eventStorage());
ReferenceStore referenceStore = new ReferenceStore(concurrencyStore, tasmoStorageProvider.multiLinksStorage(),
tasmoStorageProvider.multiBackLinksStorage());
EventPersistor eventPersistor = new SyncWriteEventPersistor(writtenEventProvider,
new WrittenInstanceHelper(),
concurrencyStore,
eventValueStore,
referenceStore);
ThreadFactory syncEventWritorThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("sync-event-writer-%d")
.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Thread " + t.getName() + " threw uncaught exception", e);
}
})
.build();
ExecutorService syncEventWritorThreads = Executors.newFixedThreadPool(config.getNumberOfSyncEventWritorThreads(), syncEventWritorThreadFactory);
ModifierStore modifierStore = new ModifierStore(tasmoStorageProvider.modifierStorage());
return new SyncEventWriter(MoreExecutors.listeningDecorator(syncEventWritorThreads),
tasmoViewModel,
eventPersistor,
modifierStore,
bookkeepingStream,
tasmoBlacklist);
}
@Provides @Singleton @CachingSubscriptionDAOExecutorService
ListeningExecutorService provideCachingSubscriptionDAOExecutorService(LifeCycleRegistry lifeCycleRegistry) {
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder().setNameFormat("subscription-cache-%d").build()));
lifeCycleRegistry.manage(new ExecutorServiceManager(service, Duration.seconds(1), "subscription-cache"));
return service;
}
@Override
public synchronized void start ()
{
if ( this.executor != null )
{
// double start
return;
}
this.executor = MoreExecutors.listeningDecorator ( Executors.newSingleThreadScheduledExecutor ( new NamedThreadFactory ( this.threadName, false, true ) ) );
}
@BeforeMethod(alwaysRun=true)
@Override
public void setUp() throws Exception {
super.setUp();
executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
creationConcurrencyMonitor = new ConcurrencyMonitor();
deletionConcurrencyMonitor = new ConcurrencyMonitor();
}
@NotNull
@Override
@Persistence
public ListeningExecutorService get() {
if (executorService == null) {
final ThreadFactory threadFactory = ThreadFactoryUtil.create("persistence-executor");
final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(threadFactory);
this.executorService = MoreExecutors.listeningDecorator(singleThreadExecutor);
}
return executorService;
}
/**
* Constructor.
*
* @param registry registry for spectator
* @param config Program configuration
*/
@Autowired
public ThreadServiceManager(final Registry registry, final Config config) {
final ExecutorService executorService = newFixedThreadPool(
config.getServiceMaxNumberOfThreads(),
"metacat-service-pool-%d",
1000
);
this.executor = MoreExecutors.listeningDecorator(executorService);
RegistryUtil.registerThreadPool(registry, "metacat-service-pool", (ThreadPoolExecutor) executorService);
}
@Test
public void testEventQueue() throws Exception {
// initialize the queue
ClusterEventBlockingQueue queue = new ClusterEventBlockingQueue();
// add an event
ClusterEvent event1 = new ClusterEvent(ClusterEventType.IdealStateChange);
queue.put(event1);
Assert.assertEquals(queue.size(), 1);
// add an event with a different name
ClusterEvent event2 = new ClusterEvent(ClusterEventType.ConfigChange);
queue.put(event2);
Assert.assertEquals(queue.size(), 2);
// add an event with the same type as event1 (should not change queue size)
ClusterEvent newEvent1 = new ClusterEvent(ClusterEventType.IdealStateChange);
newEvent1.addAttribute("attr", 1);
queue.put(newEvent1);
Assert.assertEquals(queue.size(), 2);
// test peek
ClusterEvent peeked = queue.peek();
Assert.assertEquals(peeked.getEventType(), ClusterEventType.IdealStateChange);
Assert.assertEquals((int) peeked.getAttribute("attr"), 1);
Assert.assertEquals(queue.size(), 2);
// test take the head
ListeningExecutorService service =
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ClusterEvent takenEvent1 = safeTake(queue, service);
Assert.assertEquals(takenEvent1.getEventType(), ClusterEventType.IdealStateChange);
Assert.assertEquals((int) takenEvent1.getAttribute("attr"), 1);
Assert.assertEquals(queue.size(), 1);
// test take the tail
ClusterEvent takenEvent2 = safeTake(queue, service);
Assert.assertEquals(takenEvent2.getEventType(), ClusterEventType.ConfigChange);
Assert.assertEquals(queue.size(), 0);
}
private BuildResult runInParallel(
Project project,
BlazeContext context,
Function<List<TargetExpression>, BuildResult> invocation) {
// new executor for each sync, so we get an up-to-date experiment value. This is fine, because
// it's just a view of the single application pool executor. Doesn't need to be shutdown for the
// same reason
ListeningExecutorService executor =
MoreExecutors.listeningDecorator(
AppExecutorUtil.createBoundedApplicationPoolExecutor(
"RemoteBlazeExecutor", remoteConcurrentSyncs.getValue()));
ListenableFuture<List<BuildResult>> future =
Futures.allAsList(
shardedTargets.stream()
.map(s -> executor.submit(() -> invocation.apply(s)))
.collect(toImmutableList()));
String buildSystem = Blaze.buildSystemName(project);
List<BuildResult> results =
FutureUtil.waitForFuture(context, future)
.onError(String.format("%s build failed", buildSystem))
.run()
.result();
if (results == null) {
return BuildResult.FATAL_ERROR;
}
return results.stream().reduce(BuildResult::combine).orElse(BuildResult.FATAL_ERROR);
}
public static TasmoServiceHandle<ReadMaterializerViewFields> initialize(TasmoReadMaterializerConfig config,
TasmoViewModel tasmoViewModel,
WrittenEventProvider writtenEventProvider,
TasmoStorageProvider tasmoStorageProvider) throws Exception {
ConcurrencyStore concurrencyStore = new HBaseBackedConcurrencyStore(tasmoStorageProvider.concurrencyStorage());
ReferenceStore referenceStore = new ReferenceStore(concurrencyStore,
tasmoStorageProvider.multiLinksStorage(),
tasmoStorageProvider.multiBackLinksStorage());
// TODO add config option to switch between batching and serial.
ReferenceTraverser referenceTraverser = new SerialReferenceTraverser(referenceStore);
EventValueStore eventValueStore = new EventValueStore(concurrencyStore, tasmoStorageProvider.eventStorage());
FieldValueReader fieldValueReader = new EventValueStoreFieldValueReader(eventValueStore);
ThreadFactory eventProcessorThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("view-read-materialization-processor-%d")
.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Thread " + t.getName() + " threw uncaught exception", e);
}
})
.build();
ExecutorService processorThreads = Executors.newFixedThreadPool(config.getNumberOfViewRequestProcessorThreads(), eventProcessorThreadFactory);
final ListeningExecutorService listeningDecorator = MoreExecutors.listeningDecorator(processorThreads);
final ReadMaterializerViewFields readMaterializer = new ReadMaterializerViewFields(referenceTraverser,
fieldValueReader, concurrencyStore, tasmoViewModel, listeningDecorator);
return new TasmoServiceHandle<ReadMaterializerViewFields>() {
@Override
public ReadMaterializerViewFields getService() {
return readMaterializer;
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
listeningDecorator.shutdown();
}
};
}
public DataProviderServiceImpl() {
this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
}
@Provides @Singleton @Sync(defer=false)
ListeningExecutorService immediateListeningExecutor(@Sync(defer=false) ExecutorService executor) {
return MoreExecutors.listeningDecorator(executor);
}
public CacheLoaderAsync(Cache<K, V> cache, DataStoreReader<K,V> dataStoreReader, ExecutorService executorService) {
super(cache, dataStoreReader);
this.executorService = MoreExecutors.listeningDecorator(executorService);
}
public BuildExecutors() {
int nProcessors = Runtime.getRuntime().availableProcessors();
int nThreads = Math.max(2, Math.min(4, nProcessors));
sharedService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(nThreads, new ThreadFactoryBuilder().setNameFormat("ParallelGenerator-%d").build()));
}
private ContentUploadServiceImpl(Builder builder) {
super(builder);
executorService = MoreExecutors.listeningDecorator(builder.executorService);
mediaUploader = builder.mediaUploader;
}
public static TasmoEventIngress initialize(
OrderIdProvider threadTimestamp,
TasmoViewModel tasmoViewModel,
WrittenEventProvider writtenEventProvider,
TasmoStorageProvider tasmoStorageProvider,
CommitChange commitChange,
ViewChangeNotificationProcessor viewChangeNotificationProcessor,
ViewNotificationListener allViewNotificationsListener,
CallbackStream<List<BookkeepingEvent>> bookkeepingStream,
final Optional<WrittenEventProcessorDecorator> writtenEventProcessorDecorator,
TasmoBlacklist tasmoBlacklist,
TasmoServiceConfig config) throws Exception {
ConcurrencyStore concurrencyStore = new HBaseBackedConcurrencyStore(tasmoStorageProvider.concurrencyStorage());
EventValueStore eventValueStore = new EventValueStore(concurrencyStore, tasmoStorageProvider.eventStorage());
ReferenceStore referenceStore = new ReferenceStore(concurrencyStore, tasmoStorageProvider.multiLinksStorage(),
tasmoStorageProvider.multiBackLinksStorage());
WrittenEventProcessorDecorator bookKeepingEventProcessor = new WrittenEventProcessorDecorator() {
@Override
public WrittenEventProcessor decorateWrittenEventProcessor(WrittenEventProcessor writtenEventProcessor) {
EventBookKeeper eventBookKeeper = new EventBookKeeper(writtenEventProcessor);
if (writtenEventProcessorDecorator.isPresent()) {
return writtenEventProcessorDecorator.get().decorateWrittenEventProcessor(eventBookKeeper);
} else {
return eventBookKeeper;
}
}
};
ReferenceTraverser referenceTraverser = new SerialReferenceTraverser(referenceStore);
TasmoEventTraversal eventTraverser = new TasmoEventTraverser(bookKeepingEventProcessor,
new OrderIdProviderImpl(new ConstantWriterIdProvider(1)));
WrittenInstanceHelper writtenInstanceHelper = new WrittenInstanceHelper();
WriteFanoutEventPersistor eventPersistor = new WriteFanoutEventPersistor(writtenEventProvider,
writtenInstanceHelper, concurrencyStore, eventValueStore, referenceStore);
final TasmoProcessingStats processingStats = new TasmoProcessingStats();
StatCollectingFieldValueReader fieldValueReader = new StatCollectingFieldValueReader(processingStats,
new EventValueStoreFieldValueReader(eventValueStore));
commitChange = new ConcurrencyAndExistenceCommitChange(concurrencyStore, commitChange);
TasmoEventProcessor tasmoEventProcessor = new TasmoEventProcessor(tasmoViewModel,
eventPersistor,
writtenEventProvider,
eventTraverser,
viewChangeNotificationProcessor,
allViewNotificationsListener,
concurrencyStore,
referenceStore,
fieldValueReader,
referenceTraverser,
commitChange,
processingStats);
ThreadFactory eventProcessorThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("event-processor-%d")
.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Thread " + t.getName() + " threw uncaught exception", e);
}
})
.build();
ExecutorService eventProcessorThreads = Executors.newFixedThreadPool(config.getNumberOfEventProcessorThreads(), eventProcessorThreadFactory);
TasmoWriteMaterializer materializer = new TasmoWriteMaterializer(bookkeepingStream,
tasmoEventProcessor,
MoreExecutors.listeningDecorator(eventProcessorThreads), tasmoBlacklist);
TasmoEventIngress tasmoEventIngress = new TasmoEventIngress(materializer);
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
processingStats.logStats();
} catch (Exception x) {
LOG.error("Issue with logging stats. ", x);
}
}
}, 60, 60, TimeUnit.SECONDS);
return tasmoEventIngress;
}
public static void newTaskPool(int workers) {
if (workers > 0)
ioWorker = MoreExecutors.listeningDecorator(IoWorker.newExecutors(workers));
}