下面列出了 java.util.concurrent.ThreadPoolExecutor.DiscardPolicy 类的 API 用法示例代码,也可以点击链接到 GitHub 查看完整源代码。
/**
 * Builds a cache with auto location switched off.
 *
 * <p>Stores the supplied topic resource name, client config and message client, forces
 * {@code isAutoLocation} to {@code false}, creates the two executors, and logs a warning
 * that this construction path disables auto location.
 *
 * <p>NOTE(review): unlike the four-argument constructor, this one does not assign
 * {@code readWriteLock}, {@code talosClientFactory} or {@code messageClientMap} — confirm
 * that no code path touches them when auto location is off.
 *
 * @param topicTalosResourceName resource name of the topic this cache is created for
 * @param talosClientConfig client configuration, stored as-is
 * @param messageClient message service client, stored as-is
 */
private ScheduleInfoCache(TopicTalosResourceName topicTalosResourceName,
    TalosClientConfig talosClientConfig, MessageService.Iface messageClient) {
  this.topicTalosResourceName = topicTalosResourceName;
  this.messageClient = messageClient;
  this.talosClientConfig = talosClientConfig;
  this.isAutoLocation = false;
  this.clientNum = new AtomicInteger(0);
  GetScheduleInfoScheduleExecutor = Executors.newSingleThreadScheduledExecutor(
      new NamedThreadFactory("talos-ScheduleInfoCache"));
  // Single worker with a queue bounded at one pending task; once both are occupied,
  // DiscardPolicy silently drops any further submission.
  GetScheduleInfoExecutor = new ThreadPoolExecutor(1, 1,
      0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>(1), new DiscardPolicy());
  // The trailing space keeps the two sentences apart in the emitted log line.
  LOG.warn("SimpleProducer or SimpleConsumer was built using improperly constructed function. "
      + "Auto location was forbidden");
}
/**
 * Creates the fetcher: two fixed-size thread pools sharing one discard-on-overflow
 * handler, and an async HTTP client that never follows redirects. The client is started
 * and {@code start()} is invoked before the constructor returns.
 */
public MetricFetcher() {
  final int poolSize = Runtime.getRuntime().availableProcessors() * 2;
  final long idleKeepAliveMs = 0;
  final int backlogCapacity = 2048;
  // When a pool's bounded queue is full, DiscardPolicy drops the new task silently.
  final RejectedExecutionHandler dropOnOverflow = new DiscardPolicy();

  fetchService = new ThreadPoolExecutor(
      poolSize, poolSize,
      idleKeepAliveMs, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue<>(backlogCapacity),
      new NamedThreadFactory("sentinel-dashboard-metrics-fetchService"),
      dropOnOverflow);
  fetchWorker = new ThreadPoolExecutor(
      poolSize, poolSize,
      idleKeepAliveMs, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue<>(backlogCapacity),
      new NamedThreadFactory("sentinel-dashboard-metrics-fetchWorker"),
      dropOnOverflow);

  // I/O reactor: 3000 for both connect and socket timeouts, two I/O threads per processor.
  final int ioThreads = Runtime.getRuntime().availableProcessors() * 2;
  IOReactorConfig reactorConfig = IOReactorConfig.custom()
      .setConnectTimeout(3000)
      .setSoTimeout(3000)
      .setIoThreadCount(ioThreads)
      .build();

  // Redirect strategy that reports every HTTP method as non-redirectable.
  DefaultRedirectStrategy neverRedirect = new DefaultRedirectStrategy() {
    @Override
    protected boolean isRedirectable(final String method) {
      return false;
    }
  };

  httpclient = HttpAsyncClients.custom()
      .setRedirectStrategy(neverRedirect)
      .setMaxConnTotal(4000)
      .setMaxConnPerRoute(1000)
      .setDefaultIOReactorConfig(reactorConfig)
      .build();
  httpclient.start();

  start();
}
/**
 * Sets up the metric fetcher. Builds the {@code fetchService} and {@code fetchWorker}
 * pools (same size, same bounded queue capacity, same rejection handler), builds and
 * starts an async HTTP client with redirects disabled, then calls {@code start()}.
 */
public MetricFetcher() {
  final int threadCount = Runtime.getRuntime().availableProcessors() * 2;
  final long keepAliveMillis = 0;
  final int queueCapacity = 2048;
  // Shared by both pools: tasks rejected because the queue is full are dropped silently.
  final RejectedExecutionHandler discardingHandler = new DiscardPolicy();

  fetchService = new ThreadPoolExecutor(
      threadCount, threadCount,
      keepAliveMillis, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue<>(queueCapacity),
      new NamedThreadFactory("sentinel-dashboard-metrics-fetchService"),
      discardingHandler);
  fetchWorker = new ThreadPoolExecutor(
      threadCount, threadCount,
      keepAliveMillis, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue<>(queueCapacity),
      new NamedThreadFactory("sentinel-dashboard-metrics-fetchWorker"),
      discardingHandler);

  // Reactor settings: connect and socket timeouts of 3000, I/O threads at twice the processor count.
  final int reactorThreads = Runtime.getRuntime().availableProcessors() * 2;
  IOReactorConfig ioReactorSettings = IOReactorConfig.custom()
      .setConnectTimeout(3000)
      .setSoTimeout(3000)
      .setIoThreadCount(reactorThreads)
      .build();

  // Overrides isRedirectable to return false for every method.
  DefaultRedirectStrategy redirectsDisabled = new DefaultRedirectStrategy() {
    @Override
    protected boolean isRedirectable(final String method) {
      return false;
    }
  };

  httpclient = HttpAsyncClients.custom()
      .setRedirectStrategy(redirectsDisabled)
      .setMaxConnTotal(4000)
      .setMaxConnPerRoute(1000)
      .setDefaultIOReactorConfig(ioReactorSettings)
      .build();
  httpclient.start();

  start();
}
private ScheduleInfoCache(TopicTalosResourceName topicTalosResourceName,
TalosClientConfig talosClientConfig, MessageService.Iface messageClient,
TalosClientFactory talosClientFactory) {
this.topicTalosResourceName = topicTalosResourceName;
this.talosClientConfig = talosClientConfig;
this.isAutoLocation = talosClientConfig.isAutoLocation();
this.readWriteLock = new ReentrantReadWriteLock();
this.messageClient = messageClient;
this.talosClientFactory = talosClientFactory;
this.messageClientMap = new HashMap<String, MessageService.Iface>();
this.clientNum = new AtomicInteger(0);
// GetScheduleInfoScheduleExecutor for Schedule get work, cause ScheduledExecutorService
// use DelayedWorkQueue storage its task, which is unbounded. To private OOM, use
// GetScheduleInfoExecutor execute task when transfered, setting Queue size as 2.
GetScheduleInfoScheduleExecutor = Executors.newSingleThreadScheduledExecutor(
new NamedThreadFactory("talos-ScheduleInfoCache"));
GetScheduleInfoExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1), new DiscardPolicy());
LOG.info(this.isAutoLocation ? "Auto location is enabled for request of " + topicTalosResourceName
: "Auto location is forbidden for request of " + topicTalosResourceName);
try {
//get and update scheduleInfoMap
getScheduleInfo(topicTalosResourceName);
} catch (Throwable throwable) {
LOG.error("Exception in GetScheduleInfoTask: ", throwable);
if (Utils.isTopicNotExist(throwable)) {
return;
}
}
}
/**
 * Submits two tasks to a one-thread executor backed by a {@code SynchronousQueue} and a
 * {@code DiscardPolicy}, then asserts that the second task's result never shows up
 * within the 200 ms polling window.
 */
@Test
public void givenDiscardPolicy_WhenSaturated_ThenExecutorDiscardsTheNewTask() throws InterruptedException {
  // Arrange: one worker, a hand-off queue, and the discard rejection policy.
  executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new SynchronousQueue<>(), new DiscardPolicy());
  BlockingQueue<String> results = new LinkedBlockingDeque<>();

  // Act: the first task is meant to keep the worker busy (presumably waitFor blocks for
  // about 100 ms — confirm against the helper); the second is submitted meanwhile.
  executor.execute(() -> waitFor(100));
  executor.execute(() -> results.offer("Result"));

  // Assert: nothing was ever offered, so the timed poll yields null.
  String polled = results.poll(200, MILLISECONDS);
  assertThat(polled).isNull();
}