java.util.concurrent.LinkedBlockingQueue#put() 源码实例 Demo

下面列出了 java.util.concurrent.LinkedBlockingQueue#put() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: openjdk-jdk9   文件: LinkedBlockingQueueTest.java
/**
 * drainTo on a full queue removes its elements, which lets a producer
 * blocked in put complete.
 */
public void testDrainToWithActivePut() throws InterruptedException {
    final LinkedBlockingQueue queue = populatedQueue(SIZE);
    final CheckedRunnable producer = new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            queue.put(new Integer(SIZE + 1));
        }
    };
    Thread putter = new Thread(producer);
    putter.start();

    ArrayList drained = new ArrayList();
    queue.drainTo(drained);
    // The extra element may or may not have been put before the drain ran.
    assertTrue(drained.size() >= SIZE);
    int index = 0;
    while (index < SIZE) {
        assertEquals(drained.get(index), new Integer(index));
        index++;
    }
    putter.join();
    // Whatever was not drained must still be sitting in the queue.
    assertTrue(queue.size() + drained.size() >= SIZE);
}
 
源代码2 项目: reladomo   文件: MultiThreadedBatchProcessor.java
/**
 * Copies the accumulated results into a freshly constructed list, puts that
 * list on the queue, then logs and counts the number of queued items.
 *
 * @param accumulator results to copy into the queued list
 * @param listQueue   queue that receives the copied list; {@code put} may block
 * @param shardId     optional source identifier, only used in the log message
 * @throws RuntimeException if the thread is interrupted while waiting to
 *                          enqueue; the interrupt flag is restored first
 */
protected void queueResultsWithoutDeepFetch(List<T> accumulator, LinkedBlockingQueue<TL> listQueue, Object shardId)
{
    TL list = (TL) finderInstance.constructEmptyList();
    list.addAll(accumulator);
    try
    {
        listQueue.put(list); // must not touch list after queuing, as another thread may be manipulating it.
        String msg = "";
        if (shardId != null)
        {
            msg = " for source " + shardId;
        }
        LOGGER.info("queued " + accumulator.size() + msg);
        totalQueued.addAndGet(accumulator.size());
    }
    catch (InterruptedException e)
    {
        // Catching InterruptedException clears the flag; restore it so callers
        // further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException("Unexpected exception", e);
    }
}
 
源代码3 项目: j2objc   文件: LinkedBlockingQueueTest.java
/**
 * Draining a full queue via drainTo unblocks a thread waiting in put.
 */
public void testDrainToWithActivePut() throws InterruptedException {
    final LinkedBlockingQueue fullQueue = populatedQueue(SIZE);
    Thread blockedPutter = new Thread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            Integer extra = new Integer(SIZE + 1);
            fullQueue.put(extra);
        }
    });
    blockedPutter.start();

    ArrayList sink = new ArrayList();
    fullQueue.drainTo(sink);
    assertTrue(sink.size() >= SIZE);
    // The first SIZE drained elements are the original contents, in order.
    for (int pos = 0; pos != SIZE; pos++) {
        Integer expected = new Integer(pos);
        assertEquals(sink.get(pos), expected);
    }
    blockedPutter.join();
    int total = fullQueue.size() + sink.size();
    assertTrue(total >= SIZE);
}
 
源代码4 项目: julongchain   文件: SmartContractSupportService.java
/**
 * Puts the message on the queue registered for the given transaction id.
 * Does nothing when the id is empty or no queue is registered for it.
 *
 * @param message message to enqueue
 * @param txId    transaction id used to look up the target queue
 */
private void handleReceiveCompleteOrErrorMessage(SmartContractMessage message, String txId) {
    if (StringUtils.isEmpty(txId)) {
        return;
    }
    try {
        LinkedBlockingQueue queue = txIdAndQueueMap.get(txId);
        if (queue == null) {
            return;
        }
        queue.put(message);
    } catch (InterruptedException e) {
        logger.error(e.getMessage(), e);
        // The catch cleared the interrupt flag; set it again so the owning
        // thread can still react to the interruption.
        Thread.currentThread().interrupt();
    }
}
 
源代码5 项目: openjdk-jdk9   文件: LinkedBlockingQueueTest.java
/**
 * Every element that put accepts is subsequently reported by contains,
 * and a queue filled to capacity has no remaining capacity.
 */
public void testPut() throws InterruptedException {
    LinkedBlockingQueue queue = new LinkedBlockingQueue(SIZE);
    int next = 0;
    while (next < SIZE) {
        Integer element = new Integer(next);
        queue.put(element);
        assertTrue(queue.contains(element));
        next++;
    }
    assertEquals(0, queue.remainingCapacity());
}
 
源代码6 项目: reladomo   文件: InactivateForArchivingLoader.java
/**
 * Puts the object on the queue, retrying until it succeeds even if the
 * thread is interrupted while waiting.
 *
 * An interruption does not abort the put, but it is remembered and the
 * thread's interrupt flag is set again before returning, so the caller can
 * still detect it.
 *
 * @param queue target queue
 * @param o     element to enqueue
 */
private void putWithoutException(LinkedBlockingQueue queue, Object o)
{
    boolean interrupted = false;
    try
    {
        while (true)
        {
            try
            {
                queue.put(o);
                return;
            }
            catch (InterruptedException e)
            {
                // Keep retrying, but remember that an interrupt arrived.
                interrupted = true;
            }
        }
    }
    finally
    {
        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }
    }
}
 
源代码7 项目: java-slack-sdk   文件: AsyncRateLimitQueue.java
/**
 * Acquires a wait time from the rate limiter for the given team/method,
 * wraps the supplier in a {@code Message} scheduled at "now + wait time",
 * and puts it on the active queue for the method.
 *
 * @param messageId       id assigned to the enqueued message
 * @param teamId          team the call is made for
 * @param methodName      API method name; chat.postMessage uses its own limiter call
 * @param params          request parameters; "channel" is read for chat.postMessage
 * @param methodsSupplier supplier carried by the enqueued message
 * @throws InterruptedException if interrupted while putting onto the queue
 */
public <T extends SlackApiResponse> void enqueue(
        String messageId,
        String teamId,
        String methodName,
        Map<String, String> params,
        AsyncExecutionSupplier<T> methodsSupplier) throws InterruptedException {

    final boolean chatPostMessage = methodName.equals(Methods.CHAT_POST_MESSAGE);
    final AsyncMethodsRateLimiter.WaitTime pacing = chatPostMessage
            ? rateLimiter.acquireWaitTimeForChatPostMessage(teamId, params.get("channel"))
            : rateLimiter.acquireWaitTime(teamId, methodName);

    final LinkedBlockingQueue<Message> targetQueue = getOrCreateActiveQueue(methodName);
    final long runAtEpochMillis = System.currentTimeMillis() + pacing.getMillisToWait();
    final Message queued = new Message(messageId, runAtEpochMillis, pacing, methodsSupplier);
    targetQueue.put(queued);

    if (!log.isDebugEnabled()) {
        return;
    }
    log.debug("A new message has been enqueued (id: {}, pace: {}, wait time: {})",
            queued.getId(),
            queued.getWaitTime().getPace(),
            queued.getWaitTime().getMillisToWait()
    );
}
 
源代码8 项目: SeimiCrawler   文件: DefaultLocalQueue.java
/**
 * Puts the request on the queue belonging to its crawler.
 *
 * @param req request to enqueue
 * @return {@code true} if the request was enqueued, {@code false} if the
 *         thread was interrupted while waiting (the interrupt flag is restored)
 */
@Override
public boolean push(Request req) {
    try {
        LinkedBlockingQueue<Request> queue = getQueue(req.getCrawlerName());
        queue.put(req);
        return true;
    } catch (InterruptedException e) {
        logger.error(e.getMessage(),e);
        // Re-assert the interrupt that the catch cleared, so the calling
        // thread can still notice it.
        Thread.currentThread().interrupt();
    }
    return false;
}
 
源代码9 项目: hypergraphdb   文件: TaskActivity.java
/**
 * Maps the new conversation state to a state of interest and, if a queue is
 * registered for that state, puts the activity on it.
 *
 * @param newState new conversation state
 * @param activity activity whose state changed; this is what gets enqueued
 */
public void stateChanged(Object newState, AbstractActivity<?> activity)
{
    System.out.println("TaskActivity: conversation state changed to "
                       + newState + " while in " + getState());

    StateType interestedState = conversationQueueMaping.get(newState);

    LinkedBlockingQueue<AbstractActivity<?>> queue = activityQueues.get(interestedState);

    if (queue != null)
    {
        try
        {
            System.out.println("queueing message for " + interestedState);
            queue.put(activity);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
            // The activity was not enqueued. Restore the interrupt flag so the
            // interruption is not silently lost.
            Thread.currentThread().interrupt();
        }
    }
    else
    {
        System.out.println("can not find queue for " + interestedState);
    }
}
 
源代码10 项目: j2objc   文件: LinkedBlockingQueueTest.java
/**
 * Each element handed to put is contained in the queue afterwards; once
 * SIZE elements are in, the remaining capacity is zero.
 */
public void testPut() throws InterruptedException {
    final LinkedBlockingQueue bounded = new LinkedBlockingQueue(SIZE);
    for (int value = 0; value != SIZE; value++) {
        Integer boxed = new Integer(value);
        bounded.put(boxed);
        boolean present = bounded.contains(boxed);
        assertTrue(present);
    }
    int free = bounded.remainingCapacity();
    assertEquals(0, free);
}
 
源代码11 项目: openhab1-addons   文件: NikobusCommandParserTest.java
/**
 * Feeds the given strings (as bytes) to a NikobusCommandReceiver through its
 * buffer queue, runs the receiver on its own thread for one second, and
 * checks that the commands it reported match the expected ones.
 *
 * @param values   raw input chunks, queued in order
 * @param commands expected commands, compared by command string and repeats
 * @throws InterruptedException if interrupted while queueing or sleeping
 */
private void testSplit(String[] values, NikobusCommand[] commands) throws InterruptedException {

    NikobusCommandReceiver parser = new NikobusCommandReceiver(null);

    // Written by the parser thread and read by this test thread, so the
    // list must be thread-safe.
    final List<NikobusCommand> receivedCmds = java.util.Collections
            .synchronizedList(new ArrayList<NikobusCommand>());

    LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();
    parser.setBufferQueue(queue);
    parser.register(new NikobusCommandListener() {

        @Override
        public void processNikobusCommand(NikobusCommand command, NikobusBinding binding) {
            receivedCmds.add(new NikobusCommand(command.getCommand(), command.getRepeats()));
        }

        @Override
        public String getName() {
            return "dummy";
        }
    });

    for (String s : values) {
        byte[] bb = s.getBytes();
        queue.put(bb);
    }
    Thread t = new Thread(parser);
    t.start();
    Thread.sleep(1000);
    t.interrupt();

    // Check the count first so a missing command is reported as an assertion
    // failure rather than an IndexOutOfBoundsException from get(i).
    assertEquals(commands.length, receivedCmds.size());

    for (int i = 0; i < commands.length; i++) {
        NikobusCommand c = receivedCmds.get(i);
        assertEquals(commands[i].getCommand(), c.getCommand());
        assertEquals(commands[i].getRepeats(), c.getRepeats());
    }
}
 
源代码12 项目: tez   文件: AsyncDispatcherConcurrent.java
/**
 * Enqueues an event for asynchronous dispatch.
 *
 * Events are dropped when the dispatcher is stopped or is blocking new
 * events. If a dispatcher is registered for the event's type, the event is
 * forwarded to it; otherwise it is put on one of the internal queues, chosen
 * from the event's serializing hash.
 *
 * @param event event to dispatch
 * @throws YarnRuntimeException if interrupted while waiting to enqueue; the
 *                              thread's interrupt flag is restored first
 */
public void handle(TezAbstractEvent event) {
  if (stopped) {
    return;
  }
  if (blockNewEvents) {
    return;
  }
  drained = false;
  
  // offload to specific dispatcher if one exists
  Class<? extends Enum> type = event.getType().getDeclaringClass();
  AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(type);
  if (registeredDispatcher != null) {
    registeredDispatcher.getEventHandler().handle(event);
    return;
  }
  
  // Java's % takes the sign of the dividend, so a negative hash would give a
  // negative (invalid) index; shift it back into [0, numThreads).
  int index = 0;
  if (numThreads > 1) {
    index = event.getSerializingHash() % numThreads;
    if (index < 0) {
      index += numThreads;
    }
  }

  // no registered dispatcher. use internal dispatcher.
  LinkedBlockingQueue<Event> queue = eventQueues.get(index);
  /* all this method does is enqueue all the events onto the queue */
  int qSize = queue.size();
  if (qSize !=0 && qSize %1000 == 0) {
    LOG.info("Size of event-queue is " + qSize);
  }
  int remCapacity = queue.remainingCapacity();
  if (remCapacity < 1000) {
    LOG.warn("Very low remaining capacity in the event-queue: "
        + remCapacity);
  }
  try {
    queue.put(event);
  } catch (InterruptedException e) {
    if (!stopped) {
      LOG.warn("AsyncDispatcher thread interrupted", e);
    }
    // The checked exception is converted to an unchecked one, so keep the
    // interrupt status visible to the calling thread.
    Thread.currentThread().interrupt();
    throw new YarnRuntimeException(e);
  }
}
 
源代码13 项目: splicer   文件: HttpWorker.java
/**
 * Runs the query against a TSD and returns the response body as one string.
 *
 * A cached result is returned directly when present. Otherwise a TSD address
 * is taken from the pool of the best region host (or of a fallback host from
 * select() when that host has no pool), the query is POSTed as JSON, and the
 * result is cached when the query is cacheable. The TSD address is always put
 * back into its pool.
 *
 * @return the response body, a cached result, or an error string when no
 *         suitable host/pool is found
 * @throws Exception on I/O failure, interruption, or a non-200 response
 */
@Override
public String call() throws Exception
{
	LOG.debug("Start time={}, End time={}", Const.tsFormat(query.startTime()),
			Const.tsFormat(query.endTime()));

	final String metricName = query.getQueries().get(0).getMetric();

	final String cached = JedisClient.get().get(this.query.toString());
	if (cached != null) {
		LOG.debug("Cache hit for start=" + query.startTime()
				+ ", end=" + query.endTime() + ", metric=" + metricName);
		return cached;
	}

	final String hostname = checker.getBestRegionHost(metricName,
			query.startTime() / 1000, query.endTime() / 1000);
	LOG.debug("Found region server hostname={} for metric={}", hostname, metricName);

	if (hostname == null) {
		LOG.error("Could not find region server for metric={}", metricName);
		return "{'error': 'Could not find region server for metric=" + metricName + "'}";
	}

	LinkedBlockingQueue<String> pool = TSDMap.get(hostname);
	if (pool == null) {
		// randomly select a host (basic load balancing)
		final String fallbackHost = select();
		pool = TSDMap.get(fallbackHost);
		if (pool == null) {
			LOG.error("We are not running TSDs on regionserver={}. Fallback failed. Returning error", hostname);
			return "{'error': 'Fallback to hostname=" + hostname + " failed.'}";
		}
		LOG.info("Falling back to " + fallbackHost + " for queries");
	}

	// Blocks until a TSD address is available in the pool.
	final String server = pool.take();
	final String uri = "http://" + server + "/api/query/qexp/";

	final CloseableHttpClient client = HttpClientBuilder.create().build();
	try {
		final HttpPost request = new HttpPost(uri);
		final StringEntity payload = new StringEntity(JSON.serializeToString(query));
		payload.setContentType("application/json");
		request.setEntity(payload);
		LOG.debug("Sending request to: {} for query {} ", uri, query);

		final HttpResponse response = client.execute(request);

		final int status = response.getStatusLine().getStatusCode();
		if (status != 200) {
			throw new RuntimeException("Failed : HTTP error code : "
					+ status);
		}

		final List<String> bodyLines = IOUtils.readLines(response.getEntity().getContent());
		final String result = StringUtils.join(bodyLines, "");
		LOG.debug("Result={}", result);
		if (isCacheable(query)) {
			JedisClient.get().put(this.query.toString(), result);
		}
		return result;
	} finally {
		IOUtils.closeQuietly(client);

		// Hand the TSD address back so other workers can use it.
		pool.put(server);
		LOG.debug("Returned {} into the available queue", server);
	}
}
 
源代码14 项目: splicer   文件: SuggestHttpWorker.java
/**
 * Sends the suggest query to a TSD on a randomly chosen host and returns the
 * response body as a single string.
 *
 * The TSD address is taken from the host's pool in HttpWorker.TSDMap and is
 * always put back once the request finishes.
 *
 * @return the response body, or an error string when the chosen host has no pool
 * @throws Exception on I/O failure, interruption, or a non-200 response
 */
@Override
public String call() throws Exception {
	//TODO: have it implement its own RegionChecker to get hbase locality looking for metric names
	//lets have it just pick a random host
	final String hostname = getRandomHost();
	final LinkedBlockingQueue<String> pool = HttpWorker.TSDMap.get(hostname);

	if (pool == null) {
		LOG.error("We are not running TSDs on regionserver={}. Choosing a random host failed", hostname);
		return "{'error': 'Choice of hostname=" + hostname + " failed.'}";
	}

	// Blocks until a TSD address is available in the pool.
	final String server = pool.take();
	final String uri = "http://" + server + "/api/suggest?" + suggestQuery;

	final CloseableHttpClient client = HttpClientBuilder.create().build();
	try {
		final HttpGet request = new HttpGet(uri);

		LOG.info("Sending query=" + uri + " to TSD running on host=" + hostname);

		final HttpResponse response = client.execute(request);

		final int status = response.getStatusLine().getStatusCode();
		if (status != 200) {
			throw new RuntimeException("Failed : HTTP error code : "
					+ status);
		}

		final List<String> bodyLines = IOUtils.readLines(response.getEntity().getContent());
		final String result = StringUtils.join(bodyLines, "");
		LOG.info("Result={}", result);

		return result;
	} finally {
		IOUtils.closeQuietly(client);

		// Hand the TSD address back so other workers can use it.
		pool.put(server);
		LOG.info("Returned {} into the available queue", server);
	}
}
 
源代码15 项目: pravega   文件: RequestSweeperTest.java
/**
 * Starts a manual scale whose event write never completes, checks that the
 * host index still holds a ScaleOpEvent for the host, then runs the request
 * sweeper for that host and checks that the event is re-posted and that both
 * the entity and the host are removed from the index.
 */
@Test(timeout = 30000)
public void testRequestSweeper() throws ExecutionException, InterruptedException {
    final AbstractMap.SimpleEntry<Double, Double> lowerRange = new AbstractMap.SimpleEntry<>(0.5, 0.75);
    final AbstractMap.SimpleEntry<Double, Double> upperRange = new AbstractMap.SimpleEntry<>(0.75, 1.0);
    final List<Long> segmentsToSeal = Collections.singletonList(1L);

    // Futures returned by the stubbed writeEvent, one per invocation, in order.
    final CompletableFuture<Void> firstWrite = new CompletableFuture<>();
    final CompletableFuture<Void> secondWrite = new CompletableFuture<>();
    final LinkedBlockingQueue<CompletableFuture<Void>> pendingWrites = new LinkedBlockingQueue<>();
    pendingWrites.put(firstWrite);
    pendingWrites.put(secondWrite);

    // Futures completed by the stub to signal that writeEvent was invoked.
    final CompletableFuture<Void> firstInvoked = new CompletableFuture<>();
    final CompletableFuture<Void> secondInvoked = new CompletableFuture<>();
    final LinkedBlockingQueue<CompletableFuture<Void>> invocationSignals = new LinkedBlockingQueue<>();
    invocationSignals.put(firstInvoked);
    invocationSignals.put(secondInvoked);

    doAnswer(invocation -> {
        invocationSignals.take().complete(null);
        return pendingWrites.take();
    }).when(requestEventWriter).writeEvent(any(), any());

    streamMetadataTasks.manualScale(SCOPE, stream1, segmentsToSeal, Arrays.asList(lowerRange, upperRange),
            System.currentTimeMillis(), null);

    firstInvoked.join();
    // The first write future is never completed, so manual scale does not
    // finish and the index entry is not removed; verify the entry is present.
    final HostIndex index = getHostIndex();
    List<String> pendingEntities = index.getEntities(HOSTNAME).join();
    assertEquals(1, pendingEntities.size());
    final byte[] serialized = index.getEntityData(HOSTNAME, pendingEntities.get(0)).join();
    final ControllerEventSerializer eventSerializer = new ControllerEventSerializer();
    final ControllerEvent recovered = eventSerializer.fromByteBuffer(ByteBuffer.wrap(serialized));
    assertTrue(recovered instanceof ScaleOpEvent);

    final RequestSweeper sweeper = new RequestSweeper(streamStore, executor, streamMetadataTasks);
    final CompletableFuture<Void> failover = sweeper.handleFailedProcess(HOSTNAME);

    // The sweeper must post the event again, which completes the second signal.
    secondInvoked.join();
    // Let the second write finish as well.
    secondWrite.complete(null);

    // Wait for failover to complete.
    failover.join();

    // The entity must be gone.
    pendingEntities = index.getEntities(HOSTNAME).join();
    assertTrue(pendingEntities.isEmpty());

    // The host must be gone.
    final Set<String> remainingHosts = index.getHosts().join();
    assertTrue(remainingHosts.isEmpty());
}
 
源代码16 项目: pravega   文件: ZKStreamMetadataStoreTest.java
/**
 * Exercises paged stream listing (page size 2) in one scope: listing with an
 * empty token, continuing with the returned token, listing again after more
 * streams are created, and listing from the start after streams are deleted.
 * The scope is a spy whose getNextStreamPosition is stubbed to return preset
 * values.
 */
@Test
public void listStreamsInScopes() throws Exception {
    // list stream in scope
    String scope = "scopeList";
    ZKStreamMetadataStore zkStore = spy((ZKStreamMetadataStore) store);
    
    store.createScope(scope).get();

    // Preset values handed out, in order, by the stubbed getNextStreamPosition below.
    LinkedBlockingQueue<Integer> nextPositionList = new LinkedBlockingQueue<>();
    nextPositionList.put(0);
    nextPositionList.put(2);
    nextPositionList.put(10000);
    nextPositionList.put((int) Math.pow(10, 8));
    nextPositionList.put((int) Math.pow(10, 9));
    ZKScope myScope = spy((ZKScope) zkStore.getScope(scope));
    // poll() takes the next preset value from the queue on each call.
    doAnswer(x -> CompletableFuture.completedFuture(nextPositionList.poll())).when(myScope).getNextStreamPosition();
    // Make the spied store return the spied scope for this scope name.
    doAnswer(x -> myScope).when(zkStore).getScope(scope);
    
    String stream1 = "stream1";
    String stream2 = "stream2";
    String stream3 = "stream3";
    String stream4 = "stream4";
    String stream5 = "stream5";

    // add three streams and then list them. We should get 2 + 1 + 0
    zkStore.createStream(scope, stream1, configuration1, System.currentTimeMillis(), null, executor).get();
    zkStore.setState(scope, stream1, State.ACTIVE, null, executor).get();
    zkStore.createStream(scope, stream2, configuration2, System.currentTimeMillis(), null, executor).get();
    zkStore.setState(scope, stream2, State.ACTIVE, null, executor).get();
    zkStore.createStream(scope, stream3, configuration2, System.currentTimeMillis(), null, executor).get();
    zkStore.setState(scope, stream3, State.ACTIVE, null, executor).get();
    
    // first page: two streams and a non-empty continuation token
    Pair<List<String>, String> streamInScope = store.listStream(scope, "", 2, executor).get();
    assertEquals("List streams in scope", 2, streamInScope.getKey().size());
    assertTrue(streamInScope.getKey().contains(stream1));
    assertTrue(streamInScope.getKey().contains(stream2));
    assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));

    // second page: the remaining stream
    streamInScope = store.listStream(scope, streamInScope.getValue(), 2, executor).get();
    assertEquals("List streams in scope", 1, streamInScope.getKey().size());
    assertTrue(streamInScope.getKey().contains(stream3));
    assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));

    // third page: nothing left, but a token is still returned
    streamInScope = store.listStream(scope, streamInScope.getValue(), 2, executor).get();
    assertEquals("List streams in scope", 0, streamInScope.getKey().size());
    assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));

    // add 4th stream
    zkStore.createStream(scope, stream4, configuration2, System.currentTimeMillis(), null, executor).get();
    zkStore.setState(scope, stream4, State.ACTIVE, null, executor).get();

    // list on previous token we should get 1 entry
    streamInScope = store.listStream(scope, streamInScope.getValue(), 2, executor).get();
    assertEquals("List streams in scope", 1, streamInScope.getKey().size());
    assertTrue(streamInScope.getKey().contains(stream4));
    assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));
 
    // add 5th stream
    zkStore.createStream(scope, stream5, configuration2, System.currentTimeMillis(), null, executor).get();
    zkStore.setState(scope, stream5, State.ACTIVE, null, executor).get();

    // delete stream 1
    store.deleteStream(scope, stream1, null, executor).join();

    // start listing with empty/default continuation token
    streamInScope = store.listStream(scope, "", 2, executor).get();
    assertEquals("List streams in scope", 2, streamInScope.getKey().size());
    assertTrue(streamInScope.getKey().contains(stream2));
    assertTrue(streamInScope.getKey().contains(stream3));
    assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));

    streamInScope = store.listStream(scope, streamInScope.getValue(), 2, executor).get();
    assertEquals("List streams in scope", 2, streamInScope.getKey().size());
    assertTrue(streamInScope.getKey().contains(stream4));
    assertTrue(streamInScope.getKey().contains(stream5));
    assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));

    // delete stream 3
    store.deleteStream(scope, stream3, null, executor).join();
    
    // start listing with empty/default continuation token
    streamInScope = store.listStream(scope, "", 2, executor).get();
    assertEquals("List streams in scope", 2, streamInScope.getKey().size());
    assertTrue(streamInScope.getKey().contains(stream2));
    assertTrue(streamInScope.getKey().contains(stream4));
    assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));

    streamInScope = store.listStream(scope, streamInScope.getValue(), 2, executor).get();
    assertEquals("List streams in scope", 1, streamInScope.getKey().size());
    assertTrue(streamInScope.getKey().contains(stream5));
    assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));
}
 
源代码17 项目: pulsar   文件: ProxyIsAHttpProxyTest.java
/**
 * Checks that the proxy streams response content through as it arrives: each
 * string put on the data queue must show up, byte for byte, in the client's
 * responses before the request completes, and the request completes
 * successfully after "DONE" is queued.
 *
 * @throws Exception if any server/client fails to start or stop, or an
 *                   assertion helper throws
 */
@Test
public void testStreaming() throws Exception {
    LinkedBlockingQueue<String> dataQueue = new LinkedBlockingQueue<>();
    Server streamingServer = new Server(0);
    streamingServer.setHandler(newStreamingHandler(dataQueue));
    streamingServer.start();

    Properties props = new Properties();
    props.setProperty("httpOutputBufferSize", "1");
    props.setProperty("httpReverseProxy.foobar.path", "/stream");
    props.setProperty("httpReverseProxy.foobar.proxyTo", streamingServer.getURI().toString());
    props.setProperty("servicePort", "0");
    props.setProperty("webServicePort", "0");

    ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
    AuthenticationService authService = new AuthenticationService(
            PulsarConfigurationLoader.convertFrom(proxyConfig));

    WebServer webServer = new WebServer(proxyConfig, authService);
    ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
                                             new BrokerDiscoveryProvider(proxyConfig, mockZooKeeperClientFactory));
    webServer.start();

    HttpClient httpClient = new HttpClient();
    httpClient.start();
    try {
        LinkedBlockingQueue<Byte> responses = new LinkedBlockingQueue<>();
        CompletableFuture<Result> promise = new CompletableFuture<>();
        httpClient.newRequest(webServer.getServiceUri()).path("/stream")
            .onResponseContent((response, content) -> {
                    while (content.hasRemaining()) {
                        try {
                            responses.put(content.get());
                        } catch (Exception e) {
                            log.error("Error reading response", e);
                            promise.completeExceptionally(e);
                        }
                    }
                })
            .send((result) -> {
                    log.info("Response complete");
                    promise.complete(result);
                });

        dataQueue.put("Some data");
        assertEventuallyTrue(() -> responses.size() == "Some data".length());
        Assert.assertEquals("Some data", drainToString(responses));
        Assert.assertFalse(promise.isDone());

        dataQueue.put("More data");
        assertEventuallyTrue(() -> responses.size() == "More data".length());
        Assert.assertEquals("More data", drainToString(responses));
        Assert.assertFalse(promise.isDone());

        dataQueue.put("DONE");
        assertEventuallyTrue(() -> promise.isDone());
        Assert.assertTrue(promise.get().isSucceeded());
    } finally {
        // Nest the stops so that a failure in one stop() does not leave the
        // remaining components running.
        try {
            webServer.stop();
        } finally {
            try {
                httpClient.stop();
            } finally {
                streamingServer.stop();
            }
        }
    }
}
 
源代码18 项目: gameserver   文件: MinaMessageQueueTest.java
/**
 * Replaces the MinaMessageQueue singleton's internal queue with a mock and
 * verifies that sessionWrite puts a non-null message on it.
 *
 * The original internal queue is restored in a finally block so a failing
 * assertion does not leave the mock installed in the singleton.
 */
@SuppressWarnings("unchecked")
@Test
public void testLocalSessionWrite() throws Exception {
	SessionRawMessage rawLocalMessage = createSessionRawMessage(40, 20000);
	
	MinaMessageQueue messageQueue = (MinaMessageQueue)MinaMessageQueue.getInstance();
	//Replace MinaMessageQueue's message queue.
	TestUtil.setPrivateFieldValue("localMessageServerId", messageQueue, "localhost:10000");
	ConcurrentHashMap<String, MessageClient> messageClientMap = 
			(ConcurrentHashMap<String, MessageClient>)
			TestUtil.getPrivateFieldValue("messageClientMap", messageQueue);
	
	LinkedBlockingQueue internalMessageQueue = (LinkedBlockingQueue)createMock(LinkedBlockingQueue.class);
	// Record phase: any number of put() calls is allowed, each with a non-null argument.
	internalMessageQueue.put(anyObject());
	expectLastCall().andAnswer(new IAnswer<Object>() {
		@Override
		public Object answer() throws Throwable {
			Object message = getCurrentArguments()[0];
			assertNotNull(message);
			return null;
		}
	}).anyTimes();
	Object oldMessageQueue = TestUtil.getPrivateFieldValue("messageQueue", messageQueue);
	TestUtil.setPrivateFieldValue("messageQueue", messageQueue, internalMessageQueue);
	try {
		IoSession localSession = createNiceMock(IoSession.class);
		expect(localSession.getRemoteAddress()).andReturn(new InetSocketAddress("localhost", 20000)).anyTimes();
		
		MessageClient messageClient = createMock(MessageClient.class);
		messageClientMap.put("localhost:10001", messageClient);
		
		ArrayList<String> serverList = new ArrayList<String>();
		serverList.add("localhost:10002");
		serverList.add("localhost:10001");
		serverList.add("localhost:10000");
		messageQueue.setUpMessageClient(serverList);
		assertEquals(2, messageClientMap.keySet().size());
		
		replay(messageClient);
		replay(localSession);
		replay(internalMessageQueue);
		
		messageQueue.sessionWrite(rawLocalMessage.getSessionkey(), rawLocalMessage);
		
		verify(messageClient);
		verify(localSession);
		verify(internalMessageQueue);
	} finally {
		// Always put the real queue back into the singleton.
		TestUtil.setPrivateFieldValue("messageQueue", messageQueue, oldMessageQueue);
	}
}
 
源代码19 项目: gameserver   文件: MinaMessageQueueTest.java
/**
 * Replaces the MinaMessageQueue singleton's internal queue with a nice mock,
 * sets up message clients from a server list, calls sessionWrite and verifies
 * the mocks.
 *
 * The original internal queue is restored in a finally block so a failing
 * assertion does not leave the mock installed in the singleton.
 */
@SuppressWarnings("unchecked")
@Test
public void testRemoteSessionWrite() throws Exception {
	SessionRawMessage rawLocalMessage = createSessionRawMessage(40, 20000);
	SessionRawMessage rawRemoteMessage = createSessionRawMessage(40, 20001);
	
	MinaMessageQueue messageQueue = (MinaMessageQueue)MinaMessageQueue.getInstance();
	TestUtil.setPrivateFieldValue("localMessageServerId", messageQueue, "localhost:10000");
	ConcurrentHashMap<String, MessageClient> messageClientMap = 
			(ConcurrentHashMap<String, MessageClient>)
			TestUtil.getPrivateFieldValue("messageClientMap", messageQueue);
	
	LinkedBlockingQueue internalMessageQueue = (LinkedBlockingQueue)createNiceMock(LinkedBlockingQueue.class);
	// Record phase: allow any number of put() calls.
	internalMessageQueue.put(anyObject());
	expectLastCall().anyTimes();
	Object oldMessageQueue = TestUtil.getPrivateFieldValue("messageQueue", messageQueue);
	TestUtil.setPrivateFieldValue("messageQueue", messageQueue, internalMessageQueue);
	try {
		IoSession localSession = createNiceMock(IoSession.class);
		expect(localSession.getRemoteAddress()).andReturn(new InetSocketAddress("localhost", 20001)).anyTimes();
		
		MessageClient messageClient = createMock(MessageClient.class);
		messageClientMap.put("localhost:10001", messageClient);
		
		ArrayList<String> serverList = new ArrayList<String>();
		serverList.add("localhost:10002");
		serverList.add("localhost:10001");
		serverList.add("localhost:10000");
		messageQueue.setUpMessageClient(serverList);
		assertEquals(2, messageClientMap.keySet().size());
		
		replay(messageClient);
		replay(localSession);
		replay(internalMessageQueue);
		
		messageQueue.sessionWrite(rawLocalMessage.getSessionkey(), rawLocalMessage);
		
		verify(messageClient);
		verify(localSession);
		verify(internalMessageQueue);
	} finally {
		// Always put the real queue back into the singleton.
		TestUtil.setPrivateFieldValue("messageQueue", messageQueue, oldMessageQueue);
	}
}
 
源代码20 项目: gameserver   文件: MinaMessageQueueTest.java
/**
 * With an empty message-client map and a session manager mock that reports a
 * fake machine id, sessionWrite is expected to add exactly one message client
 * keyed by that machine id.
 *
 * The singleton's original internal queue is restored in a finally block so a
 * failing assertion does not leave the mock installed.
 */
@SuppressWarnings("unchecked")
@Test
public void testTryToCreateMessageClient() throws Exception {
	SessionRawMessage rawLocalMessage = createSessionRawMessage(40, 20000);
	SessionRawMessage rawRemoteMessage = createSessionRawMessage(40, 20001);
	
	MinaMessageQueue messageQueue = (MinaMessageQueue)MinaMessageQueue.getInstance();
	TestUtil.setPrivateFieldValue("localMessageServerId", messageQueue, "localhost:10000");
	ConcurrentHashMap<String, MessageClient> messageClientMap = 
			(ConcurrentHashMap<String, MessageClient>)
			TestUtil.getPrivateFieldValue("messageClientMap", messageQueue);
	messageClientMap.clear();
	
	LinkedBlockingQueue internalMessageQueue = (LinkedBlockingQueue)createNiceMock(LinkedBlockingQueue.class);
	// Record phase: allow any number of put() calls.
	internalMessageQueue.put(anyObject());
	expectLastCall().anyTimes();
	Object oldMessageQueue = TestUtil.getPrivateFieldValue("messageQueue", messageQueue);
	TestUtil.setPrivateFieldValue("messageQueue", messageQueue, internalMessageQueue);
	try {
		GameContext gameContext = GameContext.getTestInstance();
		SessionManager sessionManager = createNiceMock(SessionManager.class);
		//Create the fake machine id
		String fakeMachineId = "www.baidu.com:80";
		expect(sessionManager.findSessionMachineId(anyObject(SessionKey.class))).andReturn(fakeMachineId);
		TestUtil.setPrivateFieldValue("sessionManager", gameContext, sessionManager);
		
		MessageClient messageClient = createMock(MessageClient.class);
		
		assertEquals(0, messageClientMap.keySet().size());
		
		replay(messageClient);
		replay(internalMessageQueue);
		replay(sessionManager);
		
		messageQueue.sessionWrite(rawLocalMessage.getSessionkey(), rawLocalMessage);
		
		verify(messageClient);
		verify(internalMessageQueue);
		verify(sessionManager);
		
		assertEquals(1, messageClientMap.keySet().size());
		assertTrue(messageClientMap.get(fakeMachineId) != null );
	} finally {
		// Always put the real queue back into the singleton.
		TestUtil.setPrivateFieldValue("messageQueue", messageQueue, oldMessageQueue);
	}
}