下面列出了怎么用java.util.concurrent.CopyOnWriteArraySet的API类实例代码及写法,或者点击链接到github查看源代码。
@SuppressWarnings("unchecked")
protected static <T> Set<T> createSimilarSet(Set<T> orig) {
if (orig instanceof SortedSet) {
Comparator comparator = ((SortedSet) orig).comparator();
if (orig instanceof ConcurrentSkipListSet) {
return new ConcurrentSkipListSet<T>(comparator);
} else {
return new TreeSet<T>(comparator);
}
} else {
if (orig instanceof CopyOnWriteArraySet) {
return new CopyOnWriteArraySet<T>();
} else {
// Do not use HashSet
return new LinkedHashSet<T>();
}
}
}
@Test
public void shouldGenerateUniqueIdsInMultithreadedEnvironment()
throws BrokenBarrierException, InterruptedException {
AwsXRayIdsGenerator generator = new AwsXRayIdsGenerator();
Set<TraceId> traceIds = new CopyOnWriteArraySet<>();
Set<SpanId> spanIds = new CopyOnWriteArraySet<>();
int threads = 8;
int generations = 128;
CyclicBarrier barrier = new CyclicBarrier(threads + 1);
Executor executor = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++) {
executor.execute(new GenerateRunner(generations, generator, barrier, traceIds, spanIds));
}
barrier.await();
barrier.await();
assertThat(traceIds).hasSize(threads * generations);
assertThat(spanIds).hasSize(threads * generations);
}
/**
* Adds a feature to the CacheListener attribute of the CacheWatchRepairable object
* <p>
* @param cacheName The feature to be added to the CacheListener attribute
* @param obj The feature to be added to the CacheListener attribute
* @throws IOException
*/
@Override
public <K, V> void addCacheListener( String cacheName, ICacheListener<K, V> obj )
throws IOException
{
// Record the added cache listener locally, regardless of whether the
// remote add-listener operation succeeds or fails.
Set<ICacheListener<?, ?>> listenerSet = cacheMap.computeIfAbsent(cacheName, key -> {
return new CopyOnWriteArraySet<>();
});
listenerSet.add( obj );
log.info( "Adding listener to cache watch. ICacheListener = {0} | "
+ "ICacheObserver = {1} | cacheName = {2}", obj, cacheWatch,
cacheName );
cacheWatch.addCacheListener( cacheName, obj );
}
/**
* Unregisters all subscribers on the given listener object.
*/
void unregister(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> listenerMethodsForType = entry.getValue();
CopyOnWriteArraySet<Subscriber> currentSubscribers = subscribers.get(eventType);
if (currentSubscribers == null
|| !currentSubscribers.removeAll(listenerMethodsForType)) {
// if removeAll returns true, all we really know is that at least one subscriber was
// removed... however, barring something very strange we can assume that if at least one
// subscriber was removed, all subscribers on listener for that event type were... after
// all, the definition of subscribers on a particular class is totally static
throw new IllegalArgumentException("missing event subscriber for an annotated method. Is " + listener
+ " registered?");
}
// don't try to remove the set if it's empty; that can't be done safely without a lock
// anyway, if the set is empty it'll just be wrapping an array of length 0
}
}
/**
* Post.
*
* @param event the event
*/
@Override
public void post(final Event event) {
if (!isEnable()) {
return;
}
CopyOnWriteArraySet<EventSubscriber> subscribers = eventSubscriberMap.get(event.getClass());
if (null != subscribers && !subscribers.isEmpty()) {
for (final EventSubscriber subscriber : subscribers) {
if (subscriber.isSync()) {
handleEvent(subscriber, event);
} else { // 异步
executor.execute(new Runnable() {
@Override
public void run() {
handleEvent(subscriber, event);
}
});
}
}
}
}
@Override
@SuppressWarnings("unchecked")
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
CopyOnWriteArraySet<SubscriptionInfo> commandSubscriptions = (CopyOnWriteArraySet)
session.getAttributes().get(CommandHandlers.SUBSCRIPTION_SET_NAME);
commandService.sendUnsubscribeRequest(commandSubscriptions.stream()
.map(SubscriptionInfo::getSubscriptionId).collect(Collectors.toSet()));
CopyOnWriteArraySet<SubscriptionInfo> notificationSubscriptions = (CopyOnWriteArraySet)
session.getAttributes().get(NotificationHandlers.SUBSCRIPTION_SET_NAME);
notificationService.unsubscribe(notificationSubscriptions.stream()
.map(SubscriptionInfo::getSubscriptionId).collect(Collectors.toSet()));
sessionMonitor.removeSession(session.getId());
if(session.isOpen()) {
session.close();
}
logger.info("Websocket Connection Closed: session id {}, close status is {} ", session.getId(), status);
}
@Test
public void copySetTest() {
Teacher teacher = newTeacher();
Set<Teacher> teachers = newHashSet(teacher, teacher);
Set<TeacherEntity> targets = copier.copySet(teachers);
for (TeacherEntity target : targets) {
// 名称不同
assertThat(target.getUsername()).isNull();
assertThat(target.getAge()).isEqualTo(teacher.getAge());
assertThat(target.getSex()).isEqualTo(teacher.getSex());
assertThat(target.getHeight()).isEqualTo(teacher.getHeight());
// 类型不同
assertThat(target.getWeight()).isNull();
assertThat(target.getHandsome()).isEqualTo(teacher.getHandsome());
assertThat(target.getHouse()).isEqualTo(teacher.getHouse());
// 类型不同
assertThat(target.getLover()).isNull();
// 这里解释一下为什么相等,因为两个字段名称相同且类型都为 list
assertThat(target.getStudents()).isEqualTo(teacher.getStudents());
}
// 简单测试一下
copier.copySet(teachers, LinkedHashSet::new);
copier.copySet(teachers, CopyOnWriteArraySet::new);
copier.copySet(teachers, ConcurrentSkipListSet::new);
}
/**
* Constructor
* @param cm The connection manager
* @param mc The managed connection
* @param credential The credential
* @param mcp The ManagedConnectionPool
* @param flushStrategy The FlushStrategy
*/
public AbstractConnectionListener(ConnectionManager cm, ManagedConnection mc, Credential credential,
ManagedConnectionPool mcp, FlushStrategy flushStrategy)
{
this.cm = cm;
this.mc = mc;
this.mcp = mcp;
this.flushStrategy = flushStrategy;
this.credential = credential;
this.state = new AtomicInteger(FREE);
this.connectionHandles = new CopyOnWriteArraySet<Object>();
if (cm.getConnectionManagerConfiguration().isTracking() != null &&
cm.getConnectionManagerConfiguration().isTracking().booleanValue())
this.connectionTraces = new HashMap<Object, Exception>();
long timestamp = System.currentTimeMillis();
this.validated = timestamp;
this.fromPool = timestamp;
this.toPool = timestamp;
mc.addConnectionEventListener(this);
}
/**
* Unregisters all subscribers on the given listener object.
*/
void unregister(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> listenerMethodsForType = entry.getValue();
CopyOnWriteArraySet<Subscriber> currentSubscribers = subscribers.get(eventType);
if (currentSubscribers == null
|| !currentSubscribers.removeAll(listenerMethodsForType)) {
// if removeAll returns true, all we really know is that at least one subscriber was
// removed... however, barring something very strange we can assume that if at least one
// subscriber was removed, all subscribers on listener for that event type were... after
// all, the definition of subscribers on a particular class is totally static
throw new IllegalArgumentException("missing event subscriber for an annotated method. Is " + listener
+ " registered?");
}
// don't try to remove the set if it's empty; that can't be done safely without a lock
// anyway, if the set is empty it'll just be wrapping an array of length 0
}
}
public void addAsyncEventQueueId(String asyncEventQueueId) {
if(this.asyncEventQueueIds == null){
this.asyncEventQueueIds = new CopyOnWriteArraySet<String>();
this.asyncEventQueueIds.add(asyncEventQueueId);
} else{
synchronized (this.asyncEventQueueIds) { // TODO: revisit this
// synchronization : added as per
// above code
if (this.asyncEventQueueIds.contains(asyncEventQueueId)) {
throw new IllegalArgumentException(
LocalizedStrings.AttributesFactory_ASYNC_EVENT_QUEUE_ID_0_IS_ALREADY_ADDED
.toLocalizedString(asyncEventQueueId));
}
this.asyncEventQueueIds.add(asyncEventQueueId);
}
}
setHasAsyncEventListeners(true);
}
@Before
public void resetClientConfiguration() {
dnsResolutionCounter = new AtomicInteger(0);
requestedHosts = new CopyOnWriteArraySet<String>();
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.withMaxErrorRetry(0);
clientConfiguration.withDnsResolver(new DnsResolver() {
DnsResolver system = new SystemDefaultDnsResolver();
@Override
public InetAddress[] resolve(String host) throws UnknownHostException {
dnsResolutionCounter.incrementAndGet();
requestedHosts.add(host);
return system.resolve(host);
}
});
testedClient = new AmazonHttpClient(clientConfiguration);
}
public <T> void registerSubscriber(@NonNull Object observer, @NonNull CustomSubscriber<T> subscriber) {
ObjectHelper.requireNonNull(observer, "Observer to register must not be null.");
ObjectHelper.requireNonNull(subscriber, "Subscriber to register must not be null.");
SUBSCRIBERS.putIfAbsent(observer.getClass(), new CopyOnWriteArraySet<CustomSubscriber<?>>());
Set<CustomSubscriber<?>> subscribers = SUBSCRIBERS.get(observer.getClass());
if (subscribers.contains(subscriber))
throw new IllegalArgumentException("Subscriber has already been registered.");
else
subscribers.add(subscriber);
Observable<T> observable = bus.ofType(subscriber.getEventClass())
.observeOn(subscriber.getScheduler() == null ?
AndroidSchedulers.mainThread() : subscriber.getScheduler());
Class<?> observerClass = observer.getClass();
OBSERVERS.putIfAbsent(observerClass, new CompositeDisposable());
CompositeDisposable composite = OBSERVERS.get(observerClass);
composite.add(((subscriber.getFilter() == null) ? observable :
observable.filter(subscriber.getFilter()))
.subscribe(subscriber));
}
@Bean
public LiveManSetting getLiveManSetting() throws Exception {
LiveManSetting liveManSetting;
if (settingFile.exists()) {
liveManSetting = readSetting();
if (liveManSetting.getServers() == null) {
liveManSetting.setServers(new CopyOnWriteArraySet<>());
}
if (liveManSetting.getExternalAppSecretDOS() == null) {
liveManSetting.setExternalAppSecretDOS(new CopyOnWriteArraySet<>());
}
} else {
liveManSetting = new LiveManSetting();
liveManSetting.setAccounts(new CopyOnWriteArraySet<>());
liveManSetting.setChannels(new CopyOnWriteArraySet<>());
liveManSetting.setServers(new CopyOnWriteArraySet<>());
liveManSetting.setExternalAppSecretDOS(new CopyOnWriteArraySet<>());
liveManSetting.setBannedKeywords(new String[0]);
liveManSetting.setBannedYoutubeChannel(new String[0]);
liveManSetting.setTempPath("liveManTemp");
saveSetting(liveManSetting);
}
return liveManSetting;
}
/**
* iterator() returns an iterator containing the elements of the
* set in insertion order
*/
public void testIterator() {
Collection empty = new CopyOnWriteArraySet();
assertFalse(empty.iterator().hasNext());
try {
empty.iterator().next();
shouldThrow();
} catch (NoSuchElementException success) {}
Integer[] elements = new Integer[SIZE];
for (int i = 0; i < SIZE; i++)
elements[i] = i;
Collections.shuffle(Arrays.asList(elements));
Collection<Integer> full = populatedSet(elements);
Iterator it = full.iterator();
for (int j = 0; j < SIZE; j++) {
assertTrue(it.hasNext());
assertEquals(elements[j], it.next());
}
assertIteratorExhausted(it);
}
@Override
public void enableAutoTrackFragment(Class<?> fragment) {
try {
if (fragment == null) {
return;
}
if (mAutoTrackFragments == null) {
mAutoTrackFragments = new CopyOnWriteArraySet<>();
}
String canonicalName = fragment.getCanonicalName();
if (!TextUtils.isEmpty(canonicalName)) {
mAutoTrackFragments.add(canonicalName.hashCode());
}
} catch (Exception ex) {
SALog.printStackTrace(ex);
}
}
@Override
public Set<String> keys() {
tx.validateForQuery(this);
if (properties == null) {
return Collections.emptySet();
} else {
return new CopyOnWriteArraySet<String>(Arrays.asList(properties.getPropertyKeys()));
}
}
/**
* Unregister all event handler methods for a subscriber.
* By the way this method also removes all subscribers that was already garbage collected.
*
* @param subscriber a @Nullable object whose event handlers methods should be unregistered.
* Pass null to remove old, already garbage collected objects.
*/
public void unregister(@Nullable Object subscriber) {
List<EventHandler> eventHandlersToRemove = new ArrayList<>();
//NOTE: mRegisteredEventHandlersByEventType is a concurrent map, reads are permitted without synchronisation.
//this method does not modify the mRegisteredEventHandlersByEventType, it modify only its values, sets of eventHandlers.
for (CopyOnWriteArraySet<EventHandler> eventHandlers : mRegisteredEventHandlersByEventType.values()) {
for (EventHandler eventHandler : eventHandlers) {
Object eventHandlerSubscriber = eventHandler.mSubscriber.get();
//Note: if the eventHandlerSubscriber is null, it means that object was GCed,
//so it should be unregistered too.
if (eventHandlerSubscriber == null || eventHandlerSubscriber == subscriber) {
//Note: the eventHandlers is a CopyOnWriteArraySet, it is much better performance-wise
//to remove all handlers in one steep, at the end of loop.
eventHandlersToRemove.add(eventHandler);
}
}
eventHandlers.removeAll(eventHandlersToRemove);
eventHandlersToRemove.clear();
}
//remove related default thread handler
for (IdentityWeakReferenceKey<Object> key : mSubscribersDefaultThreads.keySet()) {
Object ref = key.get();
//Note: if the ref is null, it means that object was GCed,
//so it should be removed too.
if (ref == null || ref == subscriber) {
mSubscribersDefaultThreads.remove(key);
}
}
}
public Config() {
parent = null;
retryableExceptions = new CopyOnWriteArraySet<Class<? extends Exception>>();
recoverableExceptions = new CopyOnWriteArraySet<Class<? extends Exception>>();
for (Class<Exception> e : RECURRING_EXCEPTIONS) {
retryableExceptions.add(e);
recoverableExceptions.add(e);
}
}
/**
* toArray() returns an Object array containing all elements from
* the set in insertion order
*/
public void testToArray() {
Object[] a = new CopyOnWriteArraySet().toArray();
assertTrue(Arrays.equals(new Object[0], a));
assertSame(Object[].class, a.getClass());
Integer[] elements = new Integer[SIZE];
for (int i = 0; i < SIZE; i++)
elements[i] = i;
shuffle(elements);
Collection<Integer> full = populatedSet(elements);
assertTrue(Arrays.equals(elements, full.toArray()));
assertSame(Object[].class, full.toArray().getClass());
}
@Before
public void setUp() {
commitQueue = new LinkedBlockingQueue<>();
clock = new Clock();
CopyOnWriteArraySet<Map<TopicPartition, OffsetAndMetadata>> ackSet = new CopyOnWriteArraySet<>();
acknowledgements = Collections.synchronizedSet(ackSet);
acknowledgeTimeoutMs = 2000;
recordsUtil = new RecordsUtil();
offsetState = new OffsetState(acknowledgements, acknowledgeTimeoutMs, 1000);
offsetAcknowledger = new OffsetAcknowledger(acknowledgements);
workerState = new WorkerState();
committer = new OffsetCommitWorker("committer", new QueueConfig(200), kafkaConsumer, offsetState, commitQueue, workerState, clock);
}
public void subscribe(String path, HelixPropertyListener listener) {
synchronized (_listener) {
Set<HelixPropertyListener> listeners = _listener.get(path);
if (listeners == null) {
listeners = new CopyOnWriteArraySet<HelixPropertyListener>();
_listener.put(path, listeners);
}
listeners.add(listener);
}
}
static CopyOnWriteArraySet populatedSet(Integer[] elements) {
CopyOnWriteArraySet<Integer> a = new CopyOnWriteArraySet<>();
assertTrue(a.isEmpty());
for (int i = 0; i < elements.length; i++)
a.add(elements[i]);
assertFalse(a.isEmpty());
assertEquals(elements.length, a.size());
return a;
}
protected AbstractMediaRouteController() {
mContext = ContextUtils.getApplicationContext();
assert (getContext() != null);
mHandler = new Handler();
mMediaRouteSelector = buildMediaRouteSelector();
MediaRouter mediaRouter;
try {
// Pre-MR1 versions of JB do not have the complete MediaRouter APIs,
// so getting the MediaRouter instance will throw an exception.
mediaRouter = MediaRouter.getInstance(getContext());
} catch (NoSuchMethodError e) {
Log.e(TAG, "Can't get an instance of MediaRouter, casting is not supported."
+ " Are you still on JB (JVP15S)?");
mediaRouter = null;
}
mMediaRouter = mediaRouter;
mAvailableRouteListeners = new HashSet<MediaStateListener>();
// TODO(aberent): I am unclear why this is accessed from multiple threads, but
// if I make it a HashSet then it gets ConcurrentModificationExceptions on some
// types of disconnect. Investigate and fix.
mUiListeners = new CopyOnWriteArraySet<UiListener>();
mDeviceDiscoveryCallback = new DeviceDiscoveryCallback();
mDeviceSelectionCallback = new DeviceSelectionCallback();
}
/**
* Creates a new roster item.
*
* @param user the user.
* @param name the user's name.
*/
public Item(String user, String name) {
this.user = user.toLowerCase();
this.name = name;
itemType = null;
itemStatus = null;
groupNames = new CopyOnWriteArraySet<String>();
}
/**
* Creates a {@code CopyOnWriteArraySet} instance containing the given elements.
*
* @param elements the elements that the set should contain, in order
* @return a new {@code CopyOnWriteArraySet} containing those elements
* @since 12.0
*/
@GwtIncompatible // CopyOnWriteArraySet
public static <E> CopyOnWriteArraySet<E> newCopyOnWriteArraySet(Iterable<? extends E> elements) {
// We copy elements to an ArrayList first, rather than incurring the
// quadratic cost of adding them to the COWAS directly.
Collection<? extends E> elementsCollection =
(elements instanceof Collection)
? Collections2.cast(elements)
: Lists.newArrayList(elements);
return new CopyOnWriteArraySet<E>(elementsCollection);
}
static CopyOnWriteArraySet<Integer> populatedSet(int n) {
CopyOnWriteArraySet<Integer> a = new CopyOnWriteArraySet<>();
assertTrue(a.isEmpty());
for (int i = 0; i < n; i++)
a.add(i);
assertEquals(n == 0, a.isEmpty());
assertEquals(n, a.size());
return a;
}
@Override
public UserEntity registerUser(UserEntity user) throws Exception {
if (!checkPasswordLength(user.getPassword())) {
throw new Exception("Invalid Password!");
} else if (userRepository.findByUsername(user.getUsername().toLowerCase()).isPresent()) {
throw new Exception("Username Already Exists!");
}
user.setPassword(passwordEncoder.encode(user.getPassword()));
Set<GrantedAuthority> authorities = new CopyOnWriteArraySet<>();
authorities.add(new SimpleGrantedAuthority("ROLE_USER"));
user.setAuthorities(authorities);
return userRepository.save(user);
}
public Set<String> findSubscribedTopicByClientId(final String clientId) {
Set<String> result = new HashSet<>();
Iterator<Map.Entry<String, CopyOnWriteArraySet<String>>> it = clientIdMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, CopyOnWriteArraySet<String>> entry = it.next();
String topic = entry.getKey();
if (entry.getValue().contains(clientId)) {
result.add(topic);
}
}
return result;
}
/**
* get/init state attribute STATE_EXPANDED_COLLECTIONS
* @param session The tool session to get the object from or create it in.
* @return An {@link #STATE_EXPANDED_COLLECTIONS} but never <code>null</code>.
*/
private static Set<String> getExpandedCollections(ToolSession session) {
Set<String> current = (Set<String>) session.getAttribute(STATE_EXPANDED_COLLECTIONS);
if(current == null)
{
current = new CopyOnWriteArraySet<String>();
session.setAttribute(STATE_EXPANDED_COLLECTIONS, current);
}
return current;
}
/**
* Constructs a {@link DownloadManager}.
*
* @param constructorHelper A {@link DownloaderConstructorHelper} to create {@link Downloader}s
* for downloading data.
* @param maxSimultaneousDownloads The maximum number of simultaneous download tasks.
* @param minRetryCount The minimum number of times a task must be retried before failing.
* @param actionFile The file in which active actions are saved.
* @param deserializers Used to deserialize {@link DownloadAction}s. If empty, {@link
* DownloadAction#getDefaultDeserializers()} is used instead.
*/
public DownloadManager(
DownloaderConstructorHelper constructorHelper,
int maxSimultaneousDownloads,
int minRetryCount,
File actionFile,
Deserializer... deserializers) {
this.downloaderConstructorHelper = constructorHelper;
this.maxActiveDownloadTasks = maxSimultaneousDownloads;
this.minRetryCount = minRetryCount;
this.actionFile = new ActionFile(actionFile);
this.deserializers =
deserializers.length > 0 ? deserializers : DownloadAction.getDefaultDeserializers();
this.downloadsStopped = true;
tasks = new ArrayList<>();
activeDownloadTasks = new ArrayList<>();
Looper looper = Looper.myLooper();
if (looper == null) {
looper = Looper.getMainLooper();
}
handler = new Handler(looper);
fileIOThread = new HandlerThread("DownloadManager file i/o");
fileIOThread.start();
fileIOHandler = new Handler(fileIOThread.getLooper());
listeners = new CopyOnWriteArraySet<>();
loadActions();
logd("Created");
}