下面列出了 io.netty.util.internal.ConcurrentSet 这个 API 类的使用示例代码及写法,也可以点击链接到 GitHub 查看完整源代码。
/**
 * Resets the local election state and wires this object into the land context.
 *
 * <p>The term starts at {@code "0"}, no vote is recorded, and the status is
 * {@code ServerStatus.Follower}. This object is registered as both a voted listener and
 * a status listener, one {@code NodeData} is created per server id reported by the land
 * context, and the supporter-vote set is replaced with a fresh empty one.
 *
 * @throws URISyntaxException declared by the original signature; presumably raised while
 *         constructing a {@code NodeData} — confirm against that constructor
 */
@Init
public void init() throws URISyntaxException {
    // Initial election state.
    this.currentTerm = "0";
    this.votedFor = null;
    this.status = ServerStatus.Follower;

    // Subscribe to vote and status notifications.
    this.landContext.addVotedListener(this);
    this.landContext.addStatusListener(this);

    // Add one node entry for every known server id.
    final ArrayList<NodeData> nodes = new ArrayList<NodeData>();
    this.allNodes = nodes;
    for (final String id : this.landContext.getServerIDs()) {
        nodes.add(new NodeData(id, this.landContext));
    }

    // Start with no supporters.
    this.supporterVote = new ConcurrentSet<String>();
}
/**
 * Registers a newly opened WebSocket session under the port of its request URI.
 *
 * @param session the session that has just connected; it is cast to
 *                {@code WebSocketSession} to read the request URI
 */
@OnWebSocketConnect
public void onConnect(Session session) {
    Integer port = ((WebSocketSession) session).getRequestURI().getPort();
    // A separate "get == null, then put" lets two concurrent connects on the same port
    // each install their own set, so one session is silently lost. computeIfAbsent
    // performs lookup-or-create in one call (atomic when the map is a ConcurrentHashMap;
    // the map's declaration is not visible here — confirm).
    MinicapServerManager.portSessionMapping
            .computeIfAbsent(port, unused -> new ConcurrentSet<>())
            .add(session);
    System.out.println("New session opened");
}
/**
 * Adds a reference to an employee object.
 *
 * <p>The employee is added to the set kept for its department (the set is created on
 * first use) and is then saved on the device item identified by the group and device ids.
 *
 * @param groupId  device group ID
 * @param deviceId device ID
 * @param item     employee object
 */
public void addEmployeeItemsRef(int groupId, int deviceId, Employee item) {
    // Single lookup-or-create instead of containsKey + put + get: avoids the window in
    // which two callers both see the department as missing and one overwrites the
    // other's freshly created set.
    employeeItemSetByDepartment
            .computeIfAbsent(item.getDepartment(), unused -> new ConcurrentSet<>())
            .add(item);
    obtainDeviceItem(groupId, deviceId).saveEmployee(item);
}
/**
 * Repeatedly asks the controller for the status of transaction {@code key} on stream
 * {@code s}, and adds the key to {@code uncommitted} if no check reported
 * {@code Transaction.Status.COMMITTED} by the time polling stops.
 *
 * @param controller  controller queried via {@code checkTransactionStatus}
 * @param s           stream the transaction belongs to
 * @param key         transaction id
 * @param uncommitted set that receives {@code key} when the transaction was not seen committed
 * @return a future that completes once polling has finished and {@code uncommitted} was updated
 */
private CompletableFuture<Void> waitTillCommitted(Controller controller, Stream s, UUID key, ConcurrentSet<UUID> uncommitted) {
    final AtomicBoolean sawCommitted = new AtomicBoolean(false);
    final AtomicInteger attempts = new AtomicInteger(0);
    // Keep polling while the transaction has not been seen committed and the attempt
    // counter (values 0 through 4) is still below 5. Each status check is scheduled
    // through Futures.delayedFuture with a delay argument of 5000 — presumably
    // milliseconds; confirm against Futures.delayedFuture.
    return Futures.loop(
            () -> !sawCommitted.get() && attempts.getAndIncrement() < 5,
            () -> Futures.delayedFuture(
                    () -> controller.checkTransactionStatus(s, key)
                            .thenAccept(status -> sawCommitted.set(status.equals(Transaction.Status.COMMITTED))),
                    5000, executor),
            executor)
            .thenAccept(ignored -> {
                // Polling is over: anything not observed as committed is reported.
                if (sawCommitted.get()) {
                    return;
                }
                uncommitted.add(key);
            });
}
/**
 * Submits {@code coreSize} tasks to an executor created with
 * {@code createAllowCoreTimeout}, records the thread each task ran on, waits for twice
 * the keep-alive time, and then asserts that exactly {@code coreSize} distinct threads
 * were used and that none of them is still alive.
 */
@Test
public void testThreadCount() throws InterruptedException, IOException, TimeoutException {
    int coreSize = 2;
    int keeperAliveTimeSeconds = 1;
    Set<Thread> threadSet = new ConcurrentSet<>();
    ExecutorService executorService = DefaultExecutorFactory.createAllowCoreTimeout(
            getTestName(), coreSize, keeperAliveTimeSeconds).createExecutorService();
    for (int i = 0; i < coreSize; i++) {
        executorService.execute(() -> {
            logger.info("running");
            threadSet.add(Thread.currentThread());
        });
    }
    // Wait for twice the keep-alive time so idle core threads have had time to exit.
    sleep(keeperAliveTimeSeconds * 2000);
    // The original message had one placeholder for two arguments, so the thread set
    // itself was never printed.
    logger.info("size: {}, threads: {}", threadSet.size(), threadSet);
    Assert.assertEquals(coreSize, threadSet.size());
    for (Thread thread : threadSet) {
        Assert.assertFalse(thread.isAlive());
    }
}
/**
 * Adds the channel to the server-channel set when it is a {@code ServerChannel},
 * otherwise to the non-server set. When the channel was newly added, the {@code remover}
 * listener is attached to its close future.
 *
 * @param channel the channel to add
 * @return {@code true} if the channel was not already present in its set
 */
@Override
public boolean add(Channel channel) {
    final ConcurrentSet<Channel> target;
    if (channel instanceof ServerChannel) {
        target = serverChannels;
    } else {
        target = nonServerChannels;
    }

    if (!target.add(channel)) {
        // Already tracked: no second listener is registered.
        return false;
    }
    channel.closeFuture().addListener(remover);
    return true;
}
/**
 * Handles an "update recommendations" request: extracts the user id from the request
 * path, reads each content buffer as a movie id, and adds it to that user's
 * recommendation set.
 *
 * @param request  incoming request; its path supplies the user id and its content the movie id
 * @param response response that is set to {@code BAD_REQUEST} when no user id can be
 *                 extracted, otherwise to {@code OK}, and then closed
 * @return the observable returned by {@code response.close()}
 */
private Observable<Void> handleUpdateRecommendationsForUser(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
    System.out.println("HTTP request -> update recommendations for user: " + request.getPath());
    final String userId = userIdFromPath(request.getPath());
    if (userId == null) {
        response.setStatus(HttpResponseStatus.BAD_REQUEST);
        return response.close();
    }
    return request.getContent().flatMap(new Func1<ByteBuf, Observable<Void>>() {
        @Override
        public Observable<Void> call(ByteBuf byteBuf) {
            String movieId = byteBuf.toString(Charset.defaultCharset());
            System.out.println(format(" updating: {user=%s, movie=%s}", userId, movieId));
            // Inside this anonymous class "this" is the Func1 instance created for the
            // current request, so synchronizing on it never excludes other requests.
            // Lock on the shared map instead so the lookup-or-create is serialized.
            // NOTE(review): any other code that mutates userRecommendations must use
            // the same lock (or the map must be a concurrent map) — confirm.
            synchronized (userRecommendations) {
                Set<String> recommendations = userRecommendations.get(userId);
                if (recommendations == null) {
                    recommendations = new ConcurrentSet<String>();
                    userRecommendations.put(userId, recommendations);
                }
                recommendations.add(movieId);
            }
            response.setStatus(HttpResponseStatus.OK);
            return response.close();
        }
    });
}
/**
 * Starts a thread that forwards image frames for the given server's port to every open
 * WebSocket session registered for that port, and records the thread in
 * {@code portSendImgThreadMapping}.
 *
 * <p>Each loop pass polls the port's image queue with a timeout. When the poll times
 * out, a copy of the last frame received is sent again (an empty array before any frame
 * has arrived). Sessions that are no longer open are dropped from the port's session set.
 *
 * @param server server whose port selects the image queue and the session set
 */
public void sendImage(MinicapJettyServer server) {
    Integer port = server.getPort();
    BlockingQueue<byte[]> imgdataQueue = portQueueMapping.get(port);
    Thread sendImgThread =
            new Thread() {
                @Override
                public void run() {
                    // Last frame taken from the queue; re-sent when a poll times out.
                    byte[] buffer = {};
                    while (!isInterrupted()) {
                        try {
                            byte[] candidate = {};
                            if (imgdataQueue != null) {
                                byte[] currentImg =
                                        imgdataQueue.poll(IMG_POLL_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                                if (currentImg == null) {
                                    candidate = buffer.clone();
                                } else {
                                    candidate = currentImg;
                                    buffer = candidate.clone();
                                }
                            } else {
                                // NOTE(review): imgdataQueue is read once before the thread
                                // starts; if it was null then, this branch repeats until the
                                // thread is interrupted. Looks like the queue should be
                                // re-read from portQueueMapping here — confirm.
                                Thread.sleep(WAIT_FOR_IMG_QUEUE.toMillis());
                                continue;
                            }
                            // not ready
                            if (port == null) {
                                return;
                            }
                            // Send the new img to all open WebSocket sessions
                            ConcurrentSet<Session> sessions = portSessionMapping.get(port);
                            if (sessions == null) {
                                continue;
                            }
                            for (Session session : sessions) {
                                if (!session.isOpen()) {
                                    // Remove from the set already in hand. Looking the
                                    // mapping up again could return null if the entry was
                                    // removed meanwhile, and the resulting exception would
                                    // end this thread through the catch below.
                                    sessions.remove(session);
                                } else {
                                    session.getRemote().sendBytes(ByteBuffer.wrap(candidate));
                                }
                            }
                        } catch (Throwable e) {
                            // Any failure (an interrupt during poll/sleep, or an error while
                            // sending) sets the interrupt flag so the loop condition ends
                            // the thread.
                            interrupt();
                            logger.info("No data from minicap.");
                        }
                    }
                    logger.info(String.format("Thread id(%s) killed.", this.getId()));
                }
            };
    sendImgThread.start();
    portSendImgThreadMapping.put(port, sendImgThread);
}
/**
 * Disables every event by replacing the enabled-action set with a new, empty
 * {@code ConcurrentSet}. The previous set object is left untouched.
 */
public void disableAllEvents() {
    enabledActions = new ConcurrentSet<>();
}
/**
 * Context-refresh callback: installs a new, empty running-job set and a new cached
 * thread pool.
 *
 * <p>NOTE(review): any executor already held in {@code executorService} is replaced
 * without being shut down here — confirm whether this callback can run more than once.
 */
@Override
public void onContextRefreshed() {
    this.runningJobs = new ConcurrentSet<>();
    this.executorService = Executors.newCachedThreadPool();
}
/**
 * Test fixture setup: gives each test a new, empty set of rapid node runners.
 */
@Before
public void prepare() {
    this.rapidNodeRunners = new ConcurrentSet<>();
}
/**
 * Creates a handler that keeps references to the given collaborators.
 *
 * @param activeChannels set stored in {@code this.activeChannels}; declared as a raw
 *                       {@code ConcurrentSet}, so its element type is not visible here
 * @param server         server stored in {@code this.server}
 * @param postProcessors entries stored in {@code this.postProcessors}
 */
public HttpServerHandler(ConcurrentSet activeChannels, HttpServer server, List<PostProcessorEntry> postProcessors) {
    // Plain field capture, in parameter order; no copies are made.
    this.activeChannels = activeChannels;
    this.server = server;
    this.postProcessors = postProcessors;
}
/**
 * Creates the handler by passing all three arguments unchanged to the superclass
 * constructor.
 *
 * @param activeChannels raw {@code ConcurrentSet} forwarded to the superclass
 * @param httpServer     server forwarded to the superclass
 * @param postProcessors post-processor entries forwarded to the superclass
 */
public HaProxyBackendServerHandler(ConcurrentSet activeChannels, HttpServer httpServer, List<PostProcessorEntry> postProcessors) {
super(activeChannels, httpServer, postProcessors);
}
/**
 * Returns the set of enabled modlog actions, creating and storing an empty
 * {@code ConcurrentSet} first if none has been set yet.
 *
 * <p>NOTE(review): earlier documentation described this as a map from channel ids to
 * actions and said the channel is never unset (-1). The return type is a {@code Set},
 * so presumably each {@code ModlogAction} carries its own channel — confirm.
 *
 * @return the enabled actions; never {@code null} when called from a single thread
 */
public Set<ModlogAction> getEnabledActions() {
    if (enabledActions != null) {
        return enabledActions;
    }
    // First access: create the backing set lazily.
    enabledActions = new ConcurrentSet<>();
    return enabledActions;
}