下面列出了java.util.concurrent.LinkedBlockingQueue#put ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* drainTo empties full queue, unblocking a waiting put.
*/
public void testDrainToWithActivePut() throws InterruptedException {
final LinkedBlockingQueue q = populatedQueue(SIZE);
Thread t = new Thread(new CheckedRunnable() {
public void realRun() throws InterruptedException {
q.put(new Integer(SIZE + 1));
}});
t.start();
ArrayList l = new ArrayList();
q.drainTo(l);
assertTrue(l.size() >= SIZE);
for (int i = 0; i < SIZE; ++i)
assertEquals(l.get(i), new Integer(i));
t.join();
assertTrue(q.size() + l.size() >= SIZE);
}
protected void queueResultsWithoutDeepFetch(List<T> accumulator, LinkedBlockingQueue<TL> listQueue, Object shardId)
{
TL list = (TL) finderInstance.constructEmptyList();
list.addAll(accumulator);
try
{
listQueue.put(list); // must not touch tradeList after queuing, as another thread may be manipulating it.
String msg = "";
if (shardId != null)
{
msg = " for source " + shardId;
}
LOGGER.info("queued " + accumulator.size() + msg);
totalQueued.addAndGet(accumulator.size());
}
catch (InterruptedException e)
{
throw new RuntimeException("Unexpected exception", e);
}
}
/**
* drainTo empties full queue, unblocking a waiting put.
*/
public void testDrainToWithActivePut() throws InterruptedException {
final LinkedBlockingQueue q = populatedQueue(SIZE);
Thread t = new Thread(new CheckedRunnable() {
public void realRun() throws InterruptedException {
q.put(new Integer(SIZE + 1));
}});
t.start();
ArrayList l = new ArrayList();
q.drainTo(l);
assertTrue(l.size() >= SIZE);
for (int i = 0; i < SIZE; ++i)
assertEquals(l.get(i), new Integer(i));
t.join();
assertTrue(q.size() + l.size() >= SIZE);
}
private void handleReceiveCompleteOrErrorMessage(SmartContractMessage message, String txId) {
if (StringUtils.isEmpty(txId)) {
return;
}
try {
LinkedBlockingQueue queue = txIdAndQueueMap.get(txId);
if (queue == null) {
return;
}
queue.put(message);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
/**
* all elements successfully put are contained
*/
public void testPut() throws InterruptedException {
LinkedBlockingQueue q = new LinkedBlockingQueue(SIZE);
for (int i = 0; i < SIZE; ++i) {
Integer x = new Integer(i);
q.put(x);
assertTrue(q.contains(x));
}
assertEquals(0, q.remainingCapacity());
}
private void putWithoutException(LinkedBlockingQueue queue, Object o)
{
while(true)
{
try
{
queue.put(o);
break;
}
catch (InterruptedException e)
{
//ignore - try again
}
}
}
public <T extends SlackApiResponse> void enqueue(
String messageId,
String teamId,
String methodName,
Map<String, String> params,
AsyncExecutionSupplier<T> methodsSupplier) throws InterruptedException {
AsyncMethodsRateLimiter.WaitTime waitTime;
if (methodName.equals(Methods.CHAT_POST_MESSAGE)) {
waitTime = rateLimiter.acquireWaitTimeForChatPostMessage(teamId, params.get("channel"));
} else {
waitTime = rateLimiter.acquireWaitTime(teamId, methodName);
}
LinkedBlockingQueue<Message> activeQueue = getOrCreateActiveQueue(methodName);
long epochMillisToRun = System.currentTimeMillis() + waitTime.getMillisToWait();
Message message = new Message(messageId, epochMillisToRun, waitTime, methodsSupplier);
activeQueue.put(message);
if (log.isDebugEnabled()) {
log.debug("A new message has been enqueued (id: {}, pace: {}, wait time: {})",
message.getId(),
message.getWaitTime().getPace(),
message.getWaitTime().getMillisToWait()
);
}
}
@Override
public boolean push(Request req) {
try {
LinkedBlockingQueue<Request> queue = getQueue(req.getCrawlerName());
queue.put(req);
return true;
} catch (InterruptedException e) {
logger.error(e.getMessage(),e);
}
return false;
}
public void stateChanged(Object newState, AbstractActivity<?> activity)
{
System.out.println("TaskActivity: conversation state changed to "
+ newState + " while in " + getState());
StateType interestedState = conversationQueueMaping.get(newState);
LinkedBlockingQueue<AbstractActivity<?>> queue = activityQueues.get(interestedState);
if (queue != null)
{
try
{
System.out.println("queueing message for " + interestedState);
queue.put(activity);
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
else
{
System.out.println("can not find queue for " + interestedState);
}
}
/**
* all elements successfully put are contained
*/
public void testPut() throws InterruptedException {
LinkedBlockingQueue q = new LinkedBlockingQueue(SIZE);
for (int i = 0; i < SIZE; ++i) {
Integer x = new Integer(i);
q.put(x);
assertTrue(q.contains(x));
}
assertEquals(0, q.remainingCapacity());
}
private void testSplit(String[] values, NikobusCommand[] commands) throws InterruptedException {
NikobusCommandReceiver parser = new NikobusCommandReceiver(null);
final List<NikobusCommand> receivedCmds = new ArrayList<NikobusCommand>();
LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();
parser.setBufferQueue(queue);
parser.register(new NikobusCommandListener() {
@Override
public void processNikobusCommand(NikobusCommand command, NikobusBinding binding) {
receivedCmds.add(new NikobusCommand(command.getCommand(), command.getRepeats()));
}
@Override
public String getName() {
return "dummy";
}
});
for (String s : values) {
byte[] bb = s.getBytes();
queue.put(bb);
}
Thread t = new Thread(parser);
t.start();
Thread.sleep(1000);
t.interrupt();
for (int i = 0; i < commands.length; i++) {
NikobusCommand c = receivedCmds.get(i);
assertEquals(commands[i].getCommand(), c.getCommand());
assertEquals(commands[i].getRepeats(), c.getRepeats());
}
assertEquals(commands.length, receivedCmds.size());
}
public void handle(TezAbstractEvent event) {
if (stopped) {
return;
}
if (blockNewEvents) {
return;
}
drained = false;
// offload to specific dispatcher if one exists
Class<? extends Enum> type = event.getType().getDeclaringClass();
AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(type);
if (registeredDispatcher != null) {
registeredDispatcher.getEventHandler().handle(event);
return;
}
int index = numThreads > 1 ? event.getSerializingHash() % numThreads : 0;
// no registered dispatcher. use internal dispatcher.
LinkedBlockingQueue<Event> queue = eventQueues.get(index);
/* all this method does is enqueue all the events onto the queue */
int qSize = queue.size();
if (qSize !=0 && qSize %1000 == 0) {
LOG.info("Size of event-queue is " + qSize);
}
int remCapacity = queue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue: "
+ remCapacity);
}
try {
queue.put(event);
} catch (InterruptedException e) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", e);
}
throw new YarnRuntimeException(e);
}
}
@Override
public String call() throws Exception
{
LOG.debug("Start time={}, End time={}", Const.tsFormat(query.startTime()),
Const.tsFormat(query.endTime()));
String metricName = query.getQueries().get(0).getMetric();
String cacheResult = JedisClient.get().get(this.query.toString());
if (cacheResult != null) {
LOG.debug("Cache hit for start=" + query.startTime()
+ ", end=" + query.endTime() + ", metric=" + metricName);
return cacheResult;
}
String hostname = checker.getBestRegionHost(metricName,
query.startTime() / 1000, query.endTime() / 1000);
LOG.debug("Found region server hostname={} for metric={}", hostname, metricName);
LinkedBlockingQueue<String> TSDs;
if (hostname == null) {
LOG.error("Could not find region server for metric={}", metricName);
return "{'error': 'Could not find region server for metric=" + metricName + "'}";
}
TSDs = TSDMap.get(hostname);
if (TSDs == null) {
String host = select(); // randomly select a host (basic load balancing)
TSDs = TSDMap.get(host);
if (TSDs == null) {
LOG.error("We are not running TSDs on regionserver={}. Fallback failed. Returning error", hostname);
return "{'error': 'Fallback to hostname=" + hostname + " failed.'}";
} else {
LOG.info("Falling back to " + host + " for queries");
}
}
String server = TSDs.take();
String uri = "http://" + server + "/api/query/qexp/";
CloseableHttpClient postman = HttpClientBuilder.create().build();
try {
HttpPost postRequest = new HttpPost(uri);
StringEntity input = new StringEntity(JSON.serializeToString(query));
input.setContentType("application/json");
postRequest.setEntity(input);
LOG.debug("Sending request to: {} for query {} ", uri, query);
HttpResponse response = postman.execute(postRequest);
if (response.getStatusLine().getStatusCode() != 200) {
throw new RuntimeException("Failed : HTTP error code : "
+ response.getStatusLine().getStatusCode());
}
List<String> dl = IOUtils.readLines(response.getEntity().getContent());
String result = StringUtils.join(dl, "");
LOG.debug("Result={}", result);
if (isCacheable(query)) {
JedisClient.get().put(this.query.toString(), result);
}
return result;
} finally {
IOUtils.closeQuietly(postman);
TSDs.put(server);
LOG.debug("Returned {} into the available queue", server);
}
}
@Override
public String call() throws Exception {
LinkedBlockingQueue<String> TSDs;
//TODO: have it implement its own RegionChecker to get hbase locality looking for metric names
//lets have it just pick a random host
String hostname = getRandomHost();
TSDs = HttpWorker.TSDMap.get(hostname);
if (TSDs == null) {
LOG.error("We are not running TSDs on regionserver={}. Choosing a random host failed", hostname);
return "{'error': 'Choice of hostname=" + hostname + " failed.'}";
}
String server = TSDs.take();
String uri = "http://" + server + "/api/suggest?" + suggestQuery;
CloseableHttpClient postman = HttpClientBuilder.create().build();
try {
HttpGet getRequest = new HttpGet(uri);
LOG.info("Sending query=" + uri + " to TSD running on host=" + hostname);
HttpResponse response = postman.execute(getRequest);
if (response.getStatusLine().getStatusCode() != 200) {
throw new RuntimeException("Failed : HTTP error code : "
+ response.getStatusLine().getStatusCode());
}
List<String> dl = IOUtils.readLines(response.getEntity().getContent());
String result = StringUtils.join(dl, "");
LOG.info("Result={}", result);
return result;
} finally {
IOUtils.closeQuietly(postman);
TSDs.put(server);
LOG.info("Returned {} into the available queue", server);
}
}
@Test(timeout = 30000)
public void testRequestSweeper() throws ExecutionException, InterruptedException {
AbstractMap.SimpleEntry<Double, Double> segment1 = new AbstractMap.SimpleEntry<>(0.5, 0.75);
AbstractMap.SimpleEntry<Double, Double> segment2 = new AbstractMap.SimpleEntry<>(0.75, 1.0);
List<Long> sealedSegments = Collections.singletonList(1L);
CompletableFuture<Void> wait1 = new CompletableFuture<>();
CompletableFuture<Void> wait2 = new CompletableFuture<>();
LinkedBlockingQueue<CompletableFuture<Void>> waitQueue = new LinkedBlockingQueue<>();
waitQueue.put(wait1);
waitQueue.put(wait2);
CompletableFuture<Void> signal1 = new CompletableFuture<>();
CompletableFuture<Void> signal2 = new CompletableFuture<>();
LinkedBlockingQueue<CompletableFuture<Void>> signalQueue = new LinkedBlockingQueue<>();
signalQueue.put(signal1);
signalQueue.put(signal2);
doAnswer(x -> {
signalQueue.take().complete(null);
return waitQueue.take();
}).when(requestEventWriter).writeEvent(any(), any());
streamMetadataTasks.manualScale(SCOPE, stream1, sealedSegments, Arrays.asList(segment1, segment2),
System.currentTimeMillis(), null);
signal1.join();
// since we dont complete writeEventFuture, manual scale will not complete and index is not removed
// verify that index has the entry.
HostIndex hostIndex = getHostIndex();
List<String> entities = hostIndex.getEntities(HOSTNAME).join();
assertEquals(1, entities.size());
byte[] data = hostIndex.getEntityData(HOSTNAME, entities.get(0)).join();
ControllerEventSerializer serializer = new ControllerEventSerializer();
ControllerEvent event = serializer.fromByteBuffer(ByteBuffer.wrap(data));
assertTrue(event instanceof ScaleOpEvent);
RequestSweeper requestSweeper = new RequestSweeper(streamStore, executor, streamMetadataTasks);
CompletableFuture<Void> failoverFuture = requestSweeper.handleFailedProcess(HOSTNAME);
// verify that the event is posted.. signal 2 future should be completed.
signal2.join();
// let wait2 be complete as well.
wait2.complete(null);
// wait for failover to complete
failoverFuture.join();
// verify that entity is removed.
entities = hostIndex.getEntities(HOSTNAME).join();
assertTrue(entities.isEmpty());
// verify that the host is removed.
Set<String> hosts = hostIndex.getHosts().join();
assertTrue(hosts.isEmpty());
}
@Test
public void listStreamsInScopes() throws Exception {
// list stream in scope
String scope = "scopeList";
ZKStreamMetadataStore zkStore = spy((ZKStreamMetadataStore) store);
store.createScope(scope).get();
LinkedBlockingQueue<Integer> nextPositionList = new LinkedBlockingQueue<>();
nextPositionList.put(0);
nextPositionList.put(2);
nextPositionList.put(10000);
nextPositionList.put((int) Math.pow(10, 8));
nextPositionList.put((int) Math.pow(10, 9));
ZKScope myScope = spy((ZKScope) zkStore.getScope(scope));
doAnswer(x -> CompletableFuture.completedFuture(nextPositionList.poll())).when(myScope).getNextStreamPosition();
doAnswer(x -> myScope).when(zkStore).getScope(scope);
String stream1 = "stream1";
String stream2 = "stream2";
String stream3 = "stream3";
String stream4 = "stream4";
String stream5 = "stream5";
// add three streams and then list them. We should get 2 + 1 + 0
zkStore.createStream(scope, stream1, configuration1, System.currentTimeMillis(), null, executor).get();
zkStore.setState(scope, stream1, State.ACTIVE, null, executor).get();
zkStore.createStream(scope, stream2, configuration2, System.currentTimeMillis(), null, executor).get();
zkStore.setState(scope, stream2, State.ACTIVE, null, executor).get();
zkStore.createStream(scope, stream3, configuration2, System.currentTimeMillis(), null, executor).get();
zkStore.setState(scope, stream3, State.ACTIVE, null, executor).get();
Pair<List<String>, String> streamInScope = store.listStream(scope, "", 2, executor).get();
assertEquals("List streams in scope", 2, streamInScope.getKey().size());
assertTrue(streamInScope.getKey().contains(stream1));
assertTrue(streamInScope.getKey().contains(stream2));
assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));
streamInScope = store.listStream(scope, streamInScope.getValue(), 2, executor).get();
assertEquals("List streams in scope", 1, streamInScope.getKey().size());
assertTrue(streamInScope.getKey().contains(stream3));
assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));
streamInScope = store.listStream(scope, streamInScope.getValue(), 2, executor).get();
assertEquals("List streams in scope", 0, streamInScope.getKey().size());
assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));
// add 4th stream
zkStore.createStream(scope, stream4, configuration2, System.currentTimeMillis(), null, executor).get();
zkStore.setState(scope, stream4, State.ACTIVE, null, executor).get();
// list on previous token we should get 1 entry
streamInScope = store.listStream(scope, streamInScope.getValue(), 2, executor).get();
assertEquals("List streams in scope", 1, streamInScope.getKey().size());
assertTrue(streamInScope.getKey().contains(stream4));
assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));
// add 5th stream
zkStore.createStream(scope, stream5, configuration2, System.currentTimeMillis(), null, executor).get();
zkStore.setState(scope, stream5, State.ACTIVE, null, executor).get();
// delete stream 1
store.deleteStream(scope, stream1, null, executor).join();
// start listing with empty/default continuation token
streamInScope = store.listStream(scope, "", 2, executor).get();
assertEquals("List streams in scope", 2, streamInScope.getKey().size());
assertTrue(streamInScope.getKey().contains(stream2));
assertTrue(streamInScope.getKey().contains(stream3));
assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));
streamInScope = store.listStream(scope, streamInScope.getValue(), 2, executor).get();
assertEquals("List streams in scope", 2, streamInScope.getKey().size());
assertTrue(streamInScope.getKey().contains(stream4));
assertTrue(streamInScope.getKey().contains(stream5));
assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));
// delete stream 3
store.deleteStream(scope, stream3, null, executor).join();
// start listing with empty/default continuation token
streamInScope = store.listStream(scope, "", 2, executor).get();
assertEquals("List streams in scope", 2, streamInScope.getKey().size());
assertTrue(streamInScope.getKey().contains(stream2));
assertTrue(streamInScope.getKey().contains(stream4));
assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));
streamInScope = store.listStream(scope, streamInScope.getValue(), 2, executor).get();
assertEquals("List streams in scope", 1, streamInScope.getKey().size());
assertTrue(streamInScope.getKey().contains(stream5));
assertFalse(Strings.isNullOrEmpty(streamInScope.getValue()));
}
@Test
public void testStreaming() throws Exception {
LinkedBlockingQueue<String> dataQueue = new LinkedBlockingQueue<>();
Server streamingServer = new Server(0);
streamingServer.setHandler(newStreamingHandler(dataQueue));
streamingServer.start();
Properties props = new Properties();
props.setProperty("httpOutputBufferSize", "1");
props.setProperty("httpReverseProxy.foobar.path", "/stream");
props.setProperty("httpReverseProxy.foobar.proxyTo", streamingServer.getURI().toString());
props.setProperty("servicePort", "0");
props.setProperty("webServicePort", "0");
ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class);
AuthenticationService authService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
WebServer webServer = new WebServer(proxyConfig, authService);
ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null,
new BrokerDiscoveryProvider(proxyConfig, mockZooKeeperClientFactory));
webServer.start();
HttpClient httpClient = new HttpClient();
httpClient.start();
try {
LinkedBlockingQueue<Byte> responses = new LinkedBlockingQueue<>();
CompletableFuture<Result> promise = new CompletableFuture<>();
httpClient.newRequest(webServer.getServiceUri()).path("/stream")
.onResponseContent((response, content) -> {
while (content.hasRemaining()) {
try {
responses.put(content.get());
} catch (Exception e) {
log.error("Error reading response", e);
promise.completeExceptionally(e);
}
}
})
.send((result) -> {
log.info("Response complete");
promise.complete(result);
});
dataQueue.put("Some data");
assertEventuallyTrue(() -> responses.size() == "Some data".length());
Assert.assertEquals("Some data", drainToString(responses));
Assert.assertFalse(promise.isDone());
dataQueue.put("More data");
assertEventuallyTrue(() -> responses.size() == "More data".length());
Assert.assertEquals("More data", drainToString(responses));
Assert.assertFalse(promise.isDone());
dataQueue.put("DONE");
assertEventuallyTrue(() -> promise.isDone());
Assert.assertTrue(promise.get().isSucceeded());
} finally {
webServer.stop();
httpClient.stop();
streamingServer.stop();
}
}
@Test
public void testLocalSessionWrite() throws Exception {
SessionRawMessage rawLocalMessage = createSessionRawMessage(40, 20000);
MinaMessageQueue messageQueue = (MinaMessageQueue)MinaMessageQueue.getInstance();
//Replace MinaMessageQueue's message queue.
TestUtil.setPrivateFieldValue("localMessageServerId", messageQueue, "localhost:10000");
ConcurrentHashMap<String, MessageClient> messageClientMap =
(ConcurrentHashMap<String, MessageClient>)
TestUtil.getPrivateFieldValue("messageClientMap", messageQueue);
LinkedBlockingQueue internalMessageQueue = (LinkedBlockingQueue)createMock(LinkedBlockingQueue.class);
internalMessageQueue.put(anyObject());
expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
Object message = getCurrentArguments()[0];
// System.out.println("====="+message);
assertNotNull(message);
return null;
}
}).anyTimes();
Object oldMessageQueue = TestUtil.getPrivateFieldValue("messageQueue", messageQueue);
TestUtil.setPrivateFieldValue("messageQueue", messageQueue, internalMessageQueue);
IoSession localSession = createNiceMock(IoSession.class);
expect(localSession.getRemoteAddress()).andReturn(new InetSocketAddress("localhost", 20000)).anyTimes();
MessageClient messageClient = createMock(MessageClient.class);
messageClientMap.put("localhost:10001", messageClient);
ArrayList<String> serverList = new ArrayList<String>();
serverList.add("localhost:10002");
serverList.add("localhost:10001");
serverList.add("localhost:10000");
messageQueue.setUpMessageClient(serverList);
assertEquals(2, messageClientMap.keySet().size());
replay(messageClient);
replay(localSession);
replay(internalMessageQueue);
messageQueue.sessionWrite(rawLocalMessage.getSessionkey(), rawLocalMessage);
verify(messageClient);
verify(localSession);
verify(internalMessageQueue);
TestUtil.setPrivateFieldValue("messageQueue", messageQueue, oldMessageQueue);
}
@SuppressWarnings("unchecked")
@Test
public void testRemoteSessionWrite() throws Exception {
SessionRawMessage rawLocalMessage = createSessionRawMessage(40, 20000);
SessionRawMessage rawRemoteMessage = createSessionRawMessage(40, 20001);
MinaMessageQueue messageQueue = (MinaMessageQueue)MinaMessageQueue.getInstance();
TestUtil.setPrivateFieldValue("localMessageServerId", messageQueue, "localhost:10000");
ConcurrentHashMap<String, MessageClient> messageClientMap =
(ConcurrentHashMap<String, MessageClient>)
TestUtil.getPrivateFieldValue("messageClientMap", messageQueue);
LinkedBlockingQueue internalMessageQueue = (LinkedBlockingQueue)createNiceMock(LinkedBlockingQueue.class);
internalMessageQueue.put(anyObject());
expectLastCall().anyTimes();
Object oldMessageQueue = TestUtil.getPrivateFieldValue("messageQueue", messageQueue);
TestUtil.setPrivateFieldValue("messageQueue", messageQueue, internalMessageQueue);
IoSession localSession = createNiceMock(IoSession.class);
expect(localSession.getRemoteAddress()).andReturn(new InetSocketAddress("localhost", 20001)).anyTimes();
MessageClient messageClient = createMock(MessageClient.class);
messageClientMap.put("localhost:10001", messageClient);
ArrayList<String> serverList = new ArrayList<String>();
serverList.add("localhost:10002");
serverList.add("localhost:10001");
serverList.add("localhost:10000");
messageQueue.setUpMessageClient(serverList);
assertEquals(2, messageClientMap.keySet().size());
replay(messageClient);
replay(localSession);
replay(internalMessageQueue);
messageQueue.sessionWrite(rawLocalMessage.getSessionkey(), rawLocalMessage);
verify(messageClient);
verify(localSession);
verify(internalMessageQueue);
TestUtil.setPrivateFieldValue("messageQueue", messageQueue, oldMessageQueue);
}
@SuppressWarnings("unchecked")
@Test
public void testTryToCreateMessageClient() throws Exception {
SessionRawMessage rawLocalMessage = createSessionRawMessage(40, 20000);
SessionRawMessage rawRemoteMessage = createSessionRawMessage(40, 20001);
MinaMessageQueue messageQueue = (MinaMessageQueue)MinaMessageQueue.getInstance();
TestUtil.setPrivateFieldValue("localMessageServerId", messageQueue, "localhost:10000");
ConcurrentHashMap<String, MessageClient> messageClientMap =
(ConcurrentHashMap<String, MessageClient>)
TestUtil.getPrivateFieldValue("messageClientMap", messageQueue);
messageClientMap.clear();
LinkedBlockingQueue internalMessageQueue = (LinkedBlockingQueue)createNiceMock(LinkedBlockingQueue.class);
internalMessageQueue.put(anyObject());
expectLastCall().anyTimes();
Object oldMessageQueue = TestUtil.getPrivateFieldValue("messageQueue", messageQueue);
TestUtil.setPrivateFieldValue("messageQueue", messageQueue, internalMessageQueue);
GameContext gameContext = GameContext.getTestInstance();
SessionManager sessionManager = createNiceMock(SessionManager.class);
//Create the fake machine id
String fakeMachineId = "www.baidu.com:80";
expect(sessionManager.findSessionMachineId(anyObject(SessionKey.class))).andReturn(fakeMachineId);
TestUtil.setPrivateFieldValue("sessionManager", gameContext, sessionManager);
MessageClient messageClient = createMock(MessageClient.class);
assertEquals(0, messageClientMap.keySet().size());
replay(messageClient);
replay(internalMessageQueue);
replay(sessionManager);
messageQueue.sessionWrite(rawLocalMessage.getSessionkey(), rawLocalMessage);
verify(messageClient);
verify(internalMessageQueue);
verify(sessionManager);
assertEquals(1, messageClientMap.keySet().size());
assertTrue(messageClientMap.get(fakeMachineId) != null );
TestUtil.setPrivateFieldValue("messageQueue", messageQueue, oldMessageQueue);
}