下面列出了 java.util.concurrent.BlockingQueue#add() 的实例代码;您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Registers a {@code BroadcastVoidService} implementation that records every message it is
 * handed, calls it through a {@code BroadcastVoidServiceAsync} proxy, and checks that the
 * message reached the server-side implementation.
 */
@Test
public void supportsAsyncBroadcastedServicesWithVoidReturnType() throws Exception {
    // Everything the server-side implementation receives ends up here.
    final BlockingQueue<String> messagesSeenByServer = new LinkedBlockingQueue<>();
    AstrixRemotingDriver driver = new AstrixRemotingDriver();
    BroadcastVoidService serverSide = new BroadcastVoidService() {
        @Override
        public void hello(String message) {
            messagesSeenByServer.add(message);
        }
    };
    driver.registerServer(BroadcastVoidService.class, serverSide);

    BroadcastVoidServiceAsync asyncClient =
            driver.createRemotingProxy(BroadcastVoidServiceAsync.class, BroadcastVoidService.class);
    asyncClient.hello("kalle").subscribe();

    // Zero timeout: the message is expected to be present already when subscribe() returns.
    assertEquals("kalle", messagesSeenByServer.poll(0, TimeUnit.SECONDS));
}
/**
 * If the session.commit() fails, we should not remove the unprocessed message.
 *
 * <p>The processor is handed a session proxy whose {@code commit} always throws; every other
 * call is forwarded to a real session.
 */
@Test
public void testMessageNotConsumedOnCommitFail() throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
    testRunner.run(1, false);
    ConsumeMQTT consumer = (ConsumeMQTT) testRunner.getProcessor();

    MQTTQueueMessage queuedMessage = mock(MQTTQueueMessage.class);
    when(queuedMessage.getPayload()).thenReturn(new byte[0]);
    when(queuedMessage.getTopic()).thenReturn("testTopic");

    BlockingQueue<MQTTQueueMessage> pending = getMqttQueue(consumer);
    pending.add(queuedMessage);

    try {
        ProcessSession realSession = testRunner.getProcessSessionFactory().createSession();
        // Delegate everything to the real session except commit(), which always fails.
        ProcessSession commitFailingSession = (ProcessSession) Proxy.newProxyInstance(
                getClass().getClassLoader(),
                new Class[] { ProcessSession.class },
                (proxy, method, args) -> {
                    if (!"commit".equals(method.getName())) {
                        return method.invoke(realSession, args);
                    }
                    throw new RuntimeException();
                });
        transferQueue(consumer, commitFailingSession);
        fail("Expected runtime exception");
    } catch (InvocationTargetException e) {
        assertTrue("Expected generic runtime exception, not " + e, e.getCause() instanceof RuntimeException);
    }

    assertTrue("Expected mqttQueue to contain uncommitted message.", pending.contains(queuedMessage));
}
/**
 * Enqueues every job in {@code jobSet} whose id is not already listed as processed or in
 * progress for the given resource type.
 *
 * <p>The set of already-known job ids is the result of
 * {@code zkStateManager.readProcessedJobs(type)} extended with {@code extractJobList(type)}.
 * Each job not in that list is recorded through {@code addIntoProcessingList} and then added
 * to {@code queue}. The whole operation runs under the write lock.
 *
 * @param jobSet candidate jobs
 * @param queue  queue receiving the jobs that were not known yet
 * @param type   resource type whose processing state is consulted
 */
public void addIntoProcessingQueueAndList(Set<JobContext> jobSet, BlockingQueue<JobContext> queue, ResourceType type) {
    // Acquire the lock BEFORE entering try: if acquisition itself fails, the finally block
    // must not attempt to unlock a lock this thread never held.
    readWriteLock.writeLock().lock();
    try {
        LOG.info("Write lock acquired");
        List<String> processingList = zkStateManager.readProcessedJobs(type);
        processingList.addAll(extractJobList(type));
        for (JobContext context : jobSet) {
            String jobId = context.jobId;
            if (!processingList.contains(jobId)) {
                addIntoProcessingList(type, context);
                queue.add(context);
            }
        }
    } finally {
        try {
            readWriteLock.writeLock().unlock();
            LOG.info("Write lock released");
        } catch (Throwable t) {
            // Best effort: a failure to unlock is logged, never propagated to the caller.
            LOG.error("Fail to release Write lock", t);
        }
    }
}
/**
 * Builds a bounded queue of capacity {@code numMessages} and fills it completely with
 * datums wrapping the integers {@code 0 .. numMessages - 1}, in ascending order.
 *
 * @return the pre-filled queue
 */
private BlockingQueue<StreamsDatum> constructQueue() {
    final BlockingQueue<StreamsDatum> prefilled = new ArrayBlockingQueue<>(numMessages);
    int next = 0;
    while (next < numMessages) {
        prefilled.add(new StreamsDatum(next));
        next++;
    }
    return prefilled;
}
/**
 * Appends the request to the queue belonging to its endpoint and makes sure that queue is
 * registered in {@code requestQueueMap}.
 *
 * <p>NOTE(review): the request is added before {@code putIfAbsent}; whether two concurrent
 * callers can end up with different queues for the same endpoint depends on
 * {@code getMessageQueueForEndpoint} — confirm.
 *
 * @param entity request to enqueue
 */
@Override
public void add(QueuedRequest entity) {
    LOG.debug("Add entity {}", entity);
    final String targetEndpoint = entity.getEndpoint();
    final BlockingQueue<QueuedRequest> endpointQueue = getMessageQueueForEndpoint(targetEndpoint);
    endpointQueue.add(entity);
    requestQueueMap.putIfAbsent(targetEndpoint, endpointQueue);
}
/**
 * Exercises {@code BrokerFastFailure#cleanExpiredRequestInQueue} in three situations: an
 * empty queue, a queue holding a plain {@code Runnable}, and a queue holding two
 * {@code FutureTaskExt}-wrapped {@code RequestTask}s created roughly 100 ms apart.
 */
@Test
public void testCleanExpiredRequestInQueue() throws Exception {
BrokerFastFailure brokerFastFailure = new BrokerFastFailure(null);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
// Cleaning an empty queue must leave it empty.
brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
assertThat(queue.size()).isZero();
// A plain Runnable (not a FutureTaskExt) is expected to survive the cleanup.
Runnable runnable = new Runnable() {
@Override
public void run() {
}
};
queue.add(runnable);
assertThat(queue.size()).isEqualTo(1);
brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
assertThat(queue.size()).isEqualTo(1);
queue.clear();
// With expired request: the first task is created 100 ms before the second one.
// NOTE(review): presumably the second argument is a maximum waiting time in milliseconds
// measured from RequestTask creation — confirm against BrokerFastFailure.
RequestTask expiredRequest = new RequestTask(runnable, null, null);
queue.add(new FutureTaskExt<>(expiredRequest, null));
TimeUnit.MILLISECONDS.sleep(100);
RequestTask requestTask = new RequestTask(runnable, null, null);
queue.add(new FutureTaskExt<>(requestTask, null));
assertThat(queue.size()).isEqualTo(2);
brokerFastFailure.cleanExpiredRequestInQueue(queue, 100);
// Only the older task is expected to be dropped; the newer one must now be the head.
assertThat(queue.size()).isEqualTo(1);
assertThat(((FutureTaskExt) queue.peek()).getRunnable()).isEqualTo(requestTask);
}
/**
 * Creates a bounded queue holding {@code count} freshly allocated byte arrays of
 * {@code bufferSize} bytes each, marking {@code meter} once per allocated buffer.
 *
 * @param bufferSize length of every buffer
 * @param count      number of buffers, also the capacity of the returned queue
 * @param meter      meter marked once before each buffer is allocated
 * @return the filled queue
 */
private static BlockingQueue<byte[]> setupBuffers(int bufferSize, int count, Meter meter) {
    final BlockingQueue<byte[]> pool = new ArrayBlockingQueue<byte[]>(count);
    int allocated = 0;
    while (allocated < count) {
        meter.mark();
        pool.add(new byte[bufferSize]);
        allocated++;
    }
    return pool;
}
/**
 * Records {@code srcAttempt} as a known input for the partition range described by
 * {@code partition} and {@code partitionCount}, creating the per-range queue on first use.
 *
 * @param partition      first argument of the {@code PartitionRange} key
 * @param partitionCount second argument of the {@code PartitionRange} key
 * @param srcAttempt     input attempt to remember
 */
public synchronized void addKnownInput(int partition, int partitionCount,
        InputAttemptIdentifier srcAttempt) {
    final PartitionRange rangeKey = new PartitionRange(partition, partitionCount);
    BlockingQueue<InputAttemptIdentifier> knownInputs = partitionToInputs.get(rangeKey);
    if (knownInputs != null) {
        knownInputs.add(srcAttempt);
        return;
    }
    // First input for this range: register a new queue before adding to it.
    knownInputs = new LinkedBlockingQueue<InputAttemptIdentifier>();
    partitionToInputs.put(rangeKey, knownInputs);
    knownInputs.add(srcAttempt);
}
/**
 * Creates a queue containing {@code inside} distinct no-op {@code Connection} stubs.
 *
 * @param inside number of stub connections to put in the queue
 * @return an unbounded queue holding the stubs
 */
private BlockingQueue<Connection> queued(int inside) {
    final BlockingQueue<Connection> connections = new LinkedBlockingDeque<>();
    int remaining = inside;
    while (remaining-- > 0) {
        // Each entry is its own instance; both operations deliberately do nothing.
        connections.add(new Connection() {
            @Override
            public void send(Request request, Response.CompleteListener listener) {}

            @Override
            public void close() {}
        });
    }
    return connections;
}
/**
* Tests {@link com.sonymobile.tools.gerrit.gerritevents.GerritHandler#addListener(GerritEventListener)}.
* With 100000 listeners added concurrently by a pool of 100 threads.
*
* @throws Exception if so.
*/
@Test
public void testAddListenerManyAtTheSameTime() throws Exception {
final int nrOfListeners = 100000;
// The queue is filled up front and later handed to the executor as its work queue,
// so every Runnable in it becomes a task that registers one listener.
BlockingQueue<Runnable> listeners = new LinkedBlockingQueue<Runnable>(nrOfListeners);
System.out.print("Creating Listeners");
for (int i = 0; i < nrOfListeners; i++) {
listeners.add(new Runnable() {
GerritEventListener listener = new ListenerMock();
@Override
public void run() {
handler.addListener(listener);
}
});
// Progress dot once every 1000 created tasks.
if (i % 1000 == 0) {
System.out.print(".");
}
}
System.out.println(".Done!");
// Core and maximum pool size are both 100; the pre-filled queue is the work queue.
ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 1, TimeUnit.MINUTES, listeners);
// Nothing is submitted through execute(), so the workers must be started explicitly
// for them to begin draining the pre-filled queue.
executor.prestartAllCoreThreads();
// Orderly shutdown: tasks already in the queue are still executed.
executor.shutdown();
// Report progress once per second until all queued tasks have completed.
do {
System.out.printf("Waiting for listeners to be added...Running#: %5d Left#: %5d Count#: %5d\n",
executor.getActiveCount(), listeners.size(), handler.getEventListenersCount());
} while (!executor.awaitTermination(1, TimeUnit.SECONDS));
System.out.printf(" Listeners are added...Running#: %5d Left#: %5d Count#: %5d\n",
executor.getActiveCount(), listeners.size(), handler.getEventListenersCount());
assertEquals(nrOfListeners, handler.getEventListenersCount());
}
/**
 * Runs an {@code IntegerConsumerSpy} against a queue holding a single element and checks
 * that the queue is drained and that the spy saw both the commit-dirty and the
 * update-commit-time callbacks.
 */
@Test
public void runOnQueueRemovesItemFromQueuCallsCommitDirty() {
    final BlockingQueue<Integer> pendingItems = new LinkedBlockingQueue<>(5);
    pendingItems.add(1);

    final IntegerConsumerSpy consumerSpy = new IntegerConsumerSpy(pendingItems);
    consumerSpy.run();

    assertTrue(pendingItems.isEmpty());
    assertTrue(consumerSpy.isCommitDirtyCalled());
    assertTrue(consumerSpy.isUpdateCommitTimeCalled());
}
/**
 * Creates an unbounded queue holding {@code numDatums} datums that wrap the integers
 * {@code 0 .. numDatums - 1}, in ascending order.
 *
 * @param numDatums number of datums to generate
 * @return the filled queue
 */
private BlockingQueue<StreamsDatum> createInputQueue(int numDatums) {
    final BlockingQueue<StreamsDatum> input = new LinkedBlockingQueue<>();
    int nextId = 0;
    while (nextId < numDatums) {
        input.add(new StreamsDatum(nextId));
        nextId++;
    }
    return input;
}
/**
 * Checks that the notifications emitted by a relation service, while the same relation is
 * created and removed repeatedly, carry contiguous sequence numbers.
 *
 * @param args ignored
 * @throws Exception if two consecutive notifications do not have consecutive sequence
 *         numbers, if fewer notifications than expected were received, or if a JMX call fails
 */
public static void main(String[] args) throws Exception {
    final MBeanServer server = MBeanServerFactory.newMBeanServer();
    final ObjectName serviceName = new ObjectName("a:type=relationService");
    final RelationServiceMBean service =
            JMX.newMBeanProxy(server, serviceName, RelationServiceMBean.class);
    // The relation service is instantiated by the MBean server through its
    // single-boolean constructor, with the argument set to TRUE.
    server.createMBean(
            "javax.management.relation.RelationService",
            serviceName,
            new Object[] {Boolean.TRUE},
            new String[] {"boolean"});

    // Every notification from the relation service is collected in arrival order.
    final BlockingQueue<Notification> received = new ArrayBlockingQueue<Notification>(100);
    final NotificationListener collector = new NotificationListener() {
        @Override
        public void handleNotification(Notification notification, Object handback) {
            received.add(notification);
        }
    };
    server.addNotificationListener(serviceName, collector, null, null);

    final RoleInfo[] roleInfos = {
        new RoleInfo("left", "javax.management.timer.TimerMBean"),
        new RoleInfo("right", "javax.management.timer.Timer"),
    };
    service.createRelationType("typeName", roleInfos);

    final ObjectName firstTimer = new ObjectName("a:type=timer,number=1");
    final ObjectName secondTimer = new ObjectName("a:type=timer,number=2");
    server.createMBean("javax.management.timer.Timer", firstTimer);
    server.createMBean("javax.management.timer.Timer", secondTimer);

    final Role left = new Role("left", Arrays.asList(firstTimer));
    final Role right = new Role("right", Arrays.asList(secondTimer));
    final RoleList roles = new RoleList(Arrays.asList(left, right));

    // Each round is expected to yield two notifications: one creation, one removal.
    final int NREPEAT = 10;
    int round = 0;
    while (round < NREPEAT) {
        service.createRelation("relationName", "typeName", roles);
        service.removeRelation("relationName");
        round++;
    }

    // The first notification fixes the starting point; each later one must be exactly +1.
    long expectedSeqNo = received.remove().getSequenceNumber() + 1;
    for (int pending = NREPEAT * 2 - 1; pending > 0; pending--) {
        final long actualSeqNo = received.remove().getSequenceNumber();
        if (actualSeqNo != expectedSeqNo) {
            throw new Exception(
                    "TEST FAILED: expected seqNo " + expectedSeqNo + "; got " + actualSeqNo);
        }
        expectedSeqNo++;
    }
    System.out.println("TEST PASSED: got " + (NREPEAT * 2) + " notifications " +
            "with contiguous sequence numbers");
}
/**
 * Checks that the notifications emitted by a relation service, while the same relation is
 * created and removed repeatedly, carry contiguous sequence numbers.
 *
 * @param args ignored
 * @throws Exception if two consecutive notifications do not have consecutive sequence
 *         numbers, if fewer notifications than expected were received, or if a JMX call fails
 */
public static void main(String[] args) throws Exception {
    final MBeanServer server = MBeanServerFactory.newMBeanServer();
    final ObjectName serviceName = new ObjectName("a:type=relationService");
    final RelationServiceMBean service =
            JMX.newMBeanProxy(server, serviceName, RelationServiceMBean.class);
    // The relation service is instantiated by the MBean server through its
    // single-boolean constructor, with the argument set to TRUE.
    server.createMBean(
            "javax.management.relation.RelationService",
            serviceName,
            new Object[] {Boolean.TRUE},
            new String[] {"boolean"});

    // Every notification from the relation service is collected in arrival order.
    final BlockingQueue<Notification> received = new ArrayBlockingQueue<Notification>(100);
    final NotificationListener collector = new NotificationListener() {
        @Override
        public void handleNotification(Notification notification, Object handback) {
            received.add(notification);
        }
    };
    server.addNotificationListener(serviceName, collector, null, null);

    final RoleInfo[] roleInfos = {
        new RoleInfo("left", "javax.management.timer.TimerMBean"),
        new RoleInfo("right", "javax.management.timer.Timer"),
    };
    service.createRelationType("typeName", roleInfos);

    final ObjectName firstTimer = new ObjectName("a:type=timer,number=1");
    final ObjectName secondTimer = new ObjectName("a:type=timer,number=2");
    server.createMBean("javax.management.timer.Timer", firstTimer);
    server.createMBean("javax.management.timer.Timer", secondTimer);

    final Role left = new Role("left", Arrays.asList(firstTimer));
    final Role right = new Role("right", Arrays.asList(secondTimer));
    final RoleList roles = new RoleList(Arrays.asList(left, right));

    // Each round is expected to yield two notifications: one creation, one removal.
    final int NREPEAT = 10;
    int round = 0;
    while (round < NREPEAT) {
        service.createRelation("relationName", "typeName", roles);
        service.removeRelation("relationName");
        round++;
    }

    // The first notification fixes the starting point; each later one must be exactly +1.
    long expectedSeqNo = received.remove().getSequenceNumber() + 1;
    for (int pending = NREPEAT * 2 - 1; pending > 0; pending--) {
        final long actualSeqNo = received.remove().getSequenceNumber();
        if (actualSeqNo != expectedSeqNo) {
            throw new Exception(
                    "TEST FAILED: expected seqNo " + expectedSeqNo + "; got " + actualSeqNo);
        }
        expectedSeqNo++;
    }
    System.out.println("TEST PASSED: got " + (NREPEAT * 2) + " notifications " +
            "with contiguous sequence numbers");
}
/**
 * Disabled scenario test for a {@code PathChildrenCache} that misses a delete.
 *
 * <p>client1 (which backs the cache) connects to a single server of a three-node cluster.
 * That server is killed, client2 deletes and re-creates {@code /test/node} with different
 * data while client1 is disconnected, and the server is restarted. The cache is then
 * expected to report a reconnect followed by {@code CHILD_UPDATED}.
 */
@Test(enabled = false) // this test is very flakey - it needs to be re-written at some point
public void testMissedDelete() throws Exception
{
Timing timing = new Timing();
PathChildrenCache cache = null;
CuratorFramework client1 = null;
CuratorFramework client2 = null;
TestingCluster cluster = createAndStartCluster(3);
try
{
// client 1 only connects to 1 server
InstanceSpec client1Instance = cluster.getInstances().iterator().next();
client1 = CuratorFrameworkFactory.newClient(client1Instance.getConnectString(), 1000, 1000, new RetryOneTime(1));
cache = new PathChildrenCache(client1, "/test", true);
// Event types are recorded in arrival order so the test can assert on the sequence.
final BlockingQueue<PathChildrenCacheEvent.Type> events = Queues.newLinkedBlockingQueue();
PathChildrenCacheListener listener = new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
events.add(event.getType());
}
};
cache.getListenable().addListener(listener);
// client 2 uses the connect string of the whole cluster
client2 = CuratorFrameworkFactory.newClient(cluster.getConnectString(), 1000, 1000, new RetryOneTime(1));
client1.start();
client2.start();
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED);
Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.INITIALIZED);
client2.create().creatingParentsIfNeeded().forPath("/test/node", "first".getBytes());
Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
// Take away the only server client1 talks to.
cluster.killServer(client1Instance);
Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED);
Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_LOST);
// While client1 is disconnected, replace the node through client2.
client2.delete().forPath("/test/node");
client2.create().forPath("/test/node", "second".getBytes());
cluster.restartServer(client1Instance);
Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED);
Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED); // "/test/node" is different - should register as updated
}
finally
{
CloseableUtils.closeQuietly(client1);
CloseableUtils.closeQuietly(client2);
CloseableUtils.closeQuietly(cache);
CloseableUtils.closeQuietly(cluster);
}
}
/**
 * Checks that the notifications emitted by a relation service, while the same relation is
 * created and removed repeatedly, carry contiguous sequence numbers.
 *
 * @param args ignored
 * @throws Exception if two consecutive notifications do not have consecutive sequence
 *         numbers, if fewer notifications than expected were received, or if a JMX call fails
 */
public static void main(String[] args) throws Exception {
    final MBeanServer server = MBeanServerFactory.newMBeanServer();
    final ObjectName serviceName = new ObjectName("a:type=relationService");
    final RelationServiceMBean service =
            JMX.newMBeanProxy(server, serviceName, RelationServiceMBean.class);
    // The relation service is instantiated by the MBean server through its
    // single-boolean constructor, with the argument set to TRUE.
    server.createMBean(
            "javax.management.relation.RelationService",
            serviceName,
            new Object[] {Boolean.TRUE},
            new String[] {"boolean"});

    // Every notification from the relation service is collected in arrival order.
    final BlockingQueue<Notification> received = new ArrayBlockingQueue<Notification>(100);
    final NotificationListener collector = new NotificationListener() {
        @Override
        public void handleNotification(Notification notification, Object handback) {
            received.add(notification);
        }
    };
    server.addNotificationListener(serviceName, collector, null, null);

    final RoleInfo[] roleInfos = {
        new RoleInfo("left", "javax.management.timer.TimerMBean"),
        new RoleInfo("right", "javax.management.timer.Timer"),
    };
    service.createRelationType("typeName", roleInfos);

    final ObjectName firstTimer = new ObjectName("a:type=timer,number=1");
    final ObjectName secondTimer = new ObjectName("a:type=timer,number=2");
    server.createMBean("javax.management.timer.Timer", firstTimer);
    server.createMBean("javax.management.timer.Timer", secondTimer);

    final Role left = new Role("left", Arrays.asList(firstTimer));
    final Role right = new Role("right", Arrays.asList(secondTimer));
    final RoleList roles = new RoleList(Arrays.asList(left, right));

    // Each round is expected to yield two notifications: one creation, one removal.
    final int NREPEAT = 10;
    int round = 0;
    while (round < NREPEAT) {
        service.createRelation("relationName", "typeName", roles);
        service.removeRelation("relationName");
        round++;
    }

    // The first notification fixes the starting point; each later one must be exactly +1.
    long expectedSeqNo = received.remove().getSequenceNumber() + 1;
    for (int pending = NREPEAT * 2 - 1; pending > 0; pending--) {
        final long actualSeqNo = received.remove().getSequenceNumber();
        if (actualSeqNo != expectedSeqNo) {
            throw new Exception(
                    "TEST FAILED: expected seqNo " + expectedSeqNo + "; got " + actualSeqNo);
        }
        expectedSeqNo++;
    }
    System.out.println("TEST PASSED: got " + (NREPEAT * 2) + " notifications " +
            "with contiguous sequence numbers");
}
/**
 * This test emits data and watermark by emulating an unbounded source readable.
 *
 * <p>A source vertex backed by a readable that is given one queued watermark is connected to
 * an operator vertex; after the task has run, the watermark collected by the operator's
 * transform and the output recorded for the outgoing edge are compared with the expected
 * values.
 *
 * @throws Exception exception on the way.
 */
@Test()
public void testUnboundedSourceVertexDataFetching() throws Exception {
    final IRVertex source = new TestUnboundedSourceVertex();

    // The readable is handed a queue containing a single watermark value.
    final Long expectedWatermark = 1234567L;
    final BlockingQueue<Long> pendingWatermarks = new LinkedBlockingQueue<>();
    pendingWatermarks.add(expectedWatermark);
    final Readable sourceReadable = new TestUnboundedSourceReadable(pendingWatermarks, 1);

    final Map<String, Readable> readablesByVertexId = new HashMap<>();
    readablesByVertexId.put(source.getId(), sourceReadable);

    // The transform is given this list so the test can inspect the watermarks afterwards.
    final List<Watermark> observedWatermarks = new LinkedList<>();
    final Transform watermarkCollector = new StreamTransformNoWatermarkEmit(observedWatermarks);
    final OperatorVertex operator = new OperatorVertex(watermarkCollector);

    final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> dagBuilder =
            new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>();
    final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = dagBuilder
            .addVertex(source)
            .addVertex(operator)
            .connectVertices(createEdge(source, operator, "edge1"))
            .buildWithoutSourceSinkCheck();

    final StageEdge outgoingEdge = mockStageEdgeFrom(operator);
    final Task task = new Task(
            "testSourceVertexDataFetching",
            generateTaskId(),
            TASK_EXECUTION_PROPERTY_MAP,
            new byte[0],
            Collections.emptyList(),
            Collections.singletonList(outgoingEdge),
            readablesByVertexId);

    // Execute the task.
    final TaskExecutor executor = getTaskExecutor(task, taskDag);
    executor.execute();

    // Check whether the watermark is emitted
    assertEquals(Arrays.asList(new Watermark(expectedWatermark)), observedWatermarks);

    // Check the output.
    assertEquals(elements, runtimeEdgeToOutputData.get(outgoingEdge.getId()));
}
/**
 * Starts a single build through the coordinator, waits until it reports {@code BUILDING},
 * completes the captured BPM task with a {@code CANCELLED} result, and verifies that exactly
 * one build record with submit, start and end times and status {@code CANCELLED} is stored.
 *
 * @throws Exception if the coordinator, the status wait or the BPM notification fails
 */
@Test(timeout = 5_000)
public void buildSingleProjectTestCase() throws Exception {
    // given
    DatastoreMock datastoreMock = new DatastoreMock();
    TestProjectConfigurationBuilder configurationBuilder = new TestProjectConfigurationBuilder(datastoreMock);
    DatastoreAdapter datastoreAdapter = new DatastoreAdapter(datastoreMock);
    SystemConfig systemConfig = createConfiguration();
    BuildQueue queue = new BuildQueue(systemConfig);

    // Status events published by the coordinator are captured for waitForStatus(...).
    BlockingQueue<BuildStatusChangedEvent> receivedStatuses = new ArrayBlockingQueue<>(5);
    Consumer<BuildStatusChangedEvent> onStatusUpdate = receivedStatuses::add;
    EventListener buildStatusChangedEventNotifier = new EventListener(onStatusUpdate);

    // BPM tasks created by the scheduler are captured so the test can complete them itself.
    BlockingQueue<BpmTask> task = new ArrayBlockingQueue<>(5);
    Consumer<BpmTask> onBpmTaskCreated = task::add;
    BuildSchedulerFactory buildSchedulerFactory = new BuildSchedulerFactory(onBpmTaskCreated);

    BuildCoordinator coordinator = new DefaultBuildCoordinator(
            datastoreAdapter,
            buildStatusChangedEventNotifier,
            null,
            buildSchedulerFactory,
            queue,
            systemConfig,
            groupBuildMapper,
            buildMapper);
    coordinator.start();
    queue.initSemaphore();

    coordinator.build(
            configurationBuilder.buildConfigurationToCancel(1, "c1-bpm"),
            MockUser.newTestUser(1),
            new BuildOptions());
    waitForStatus(receivedStatuses, BuildStatus.BUILDING);

    BpmTask bpmTask = task.poll(1, TimeUnit.SECONDS);
    // poll(...) returns null on timeout; fail with a clear message instead of an NPE below.
    Assert.assertNotNull("No BPM task was created within 1 second", bpmTask);
    BuildResultRest result = new BuildResultRest();
    result.setCompletionStatus(CompletionStatus.CANCELLED);

    // when
    bpmTask.notify(BUILD_COMPLETE, result);
    waitForStatus(receivedStatuses, BuildStatus.CANCELLED);

    // expect
    List<BuildRecord> buildRecords = datastoreMock.getBuildRecords();
    Assert.assertEquals("Too many build records in datastore: " + buildRecords, 1, buildRecords.size());
    BuildRecord buildRecord = buildRecords.get(0);
    Assert.assertNotNull(buildRecord.getSubmitTime());
    Assert.assertNotNull(buildRecord.getStartTime());
    Assert.assertNotNull(buildRecord.getEndTime());
    Assert.assertEquals(BuildStatus.CANCELLED, buildRecord.getStatus());
}
/**
 * Loads the account stored for {@code uuid} together with its ignore list.
 *
 * <p>When no account row exists, or when a {@link SQLException} occurs (which is logged), a
 * default {@code Account} for the uuid is returned wrapped in
 * {@code new AccountInfo(account, true, true)}. A successfully loaded account is wrapped in
 * {@code new AccountInfo(account, true, false)}.
 *
 * @param uuid id of the account to load
 * @return the loaded account information, never {@code null}
 */
@Override
public AccountInfo load(UUID uuid) {
    try {
        final byte[] key = getBytesFromUUID(uuid);

        // loadAccount
        loadAccount.setBytes(1, key);
        try (ResultSet accountRow = loadAccount.executeQuery()) {
            loadAccount.clearParameters();
            if (!accountRow.next()) {
                return new AccountInfo(new Account(uuid), true, true);
            }

            // getIgnores
            getIgnores.setBytes(1, key);
            try (ResultSet ignoreRows = getIgnores.executeQuery()) {
                getIgnores.clearParameters();

                final BlockingQueue<UUID> ignoredIds = new LinkedBlockingQueue<>();
                while (ignoreRows.next()) {
                    final byte[] rawIgnoredId = ignoreRows.getBytes(tableIgnoresColumnIgnores);
                    ignoredIds.add(getUUIDFromBytes(rawIgnoredId));
                }

                // Columns are read in constructor-argument order while both result sets are open.
                final Account loaded = new Account(
                        uuid,
                        ChannelType.valueOf(accountRow.getString(tableAccountsColumnChannelType)),
                        accountRow.getBoolean(tableAccountsColumnVanished),
                        accountRow.getBoolean(tableAccountsColumnMessenger),
                        accountRow.getBoolean(tableAccountsColumnSocialSpy),
                        accountRow.getBoolean(tableAccountsColumnLocalSpy),
                        ignoredIds,
                        accountRow.getTimestamp(tableAccountsColumnMutedUntil),
                        Optional.ofNullable(accountRow.getString(tableAccountsColumnStoredPrefix)),
                        Optional.ofNullable(accountRow.getString(tableAccountsColumnStoredSuffix)));
                return new AccountInfo(loaded, true, false);
            }
        }
    } catch (SQLException e) {
        LoggerHelper.error("Could not load user " + uuid + " from database!", e);
        return new AccountInfo(new Account(uuid), true, true);
    }
}
/**
 * Checks that the notifications emitted by a relation service, while the same relation is
 * created and removed repeatedly, carry contiguous sequence numbers.
 *
 * @param args ignored
 * @throws Exception if two consecutive notifications do not have consecutive sequence
 *         numbers, if fewer notifications than expected were received, or if a JMX call fails
 */
public static void main(String[] args) throws Exception {
    final MBeanServer server = MBeanServerFactory.newMBeanServer();
    final ObjectName serviceName = new ObjectName("a:type=relationService");
    final RelationServiceMBean service =
            JMX.newMBeanProxy(server, serviceName, RelationServiceMBean.class);
    // The relation service is instantiated by the MBean server through its
    // single-boolean constructor, with the argument set to TRUE.
    server.createMBean(
            "javax.management.relation.RelationService",
            serviceName,
            new Object[] {Boolean.TRUE},
            new String[] {"boolean"});

    // Every notification from the relation service is collected in arrival order.
    final BlockingQueue<Notification> received = new ArrayBlockingQueue<Notification>(100);
    final NotificationListener collector = new NotificationListener() {
        @Override
        public void handleNotification(Notification notification, Object handback) {
            received.add(notification);
        }
    };
    server.addNotificationListener(serviceName, collector, null, null);

    final RoleInfo[] roleInfos = {
        new RoleInfo("left", "javax.management.timer.TimerMBean"),
        new RoleInfo("right", "javax.management.timer.Timer"),
    };
    service.createRelationType("typeName", roleInfos);

    final ObjectName firstTimer = new ObjectName("a:type=timer,number=1");
    final ObjectName secondTimer = new ObjectName("a:type=timer,number=2");
    server.createMBean("javax.management.timer.Timer", firstTimer);
    server.createMBean("javax.management.timer.Timer", secondTimer);

    final Role left = new Role("left", Arrays.asList(firstTimer));
    final Role right = new Role("right", Arrays.asList(secondTimer));
    final RoleList roles = new RoleList(Arrays.asList(left, right));

    // Each round is expected to yield two notifications: one creation, one removal.
    final int NREPEAT = 10;
    int round = 0;
    while (round < NREPEAT) {
        service.createRelation("relationName", "typeName", roles);
        service.removeRelation("relationName");
        round++;
    }

    // The first notification fixes the starting point; each later one must be exactly +1.
    long expectedSeqNo = received.remove().getSequenceNumber() + 1;
    for (int pending = NREPEAT * 2 - 1; pending > 0; pending--) {
        final long actualSeqNo = received.remove().getSequenceNumber();
        if (actualSeqNo != expectedSeqNo) {
            throw new Exception(
                    "TEST FAILED: expected seqNo " + expectedSeqNo + "; got " + actualSeqNo);
        }
        expectedSeqNo++;
    }
    System.out.println("TEST PASSED: got " + (NREPEAT * 2) + " notifications " +
            "with contiguous sequence numbers");
}