下面列出了怎么用org.springframework.util.concurrent.SettableListenableFuture的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
final SettableListenableFuture<WebSocketSession> future = new SettableListenableFuture<>();
WebSocketClientSockJsSession session = new WebSocketClientSockJsSession(request, handler, future);
handler = new ClientSockJsWebSocketHandler(session);
request.addTimeoutTask(session.getTimeoutTask());
URI url = request.getTransportUrl();
WebSocketHttpHeaders headers = new WebSocketHttpHeaders(request.getHandshakeHeaders());
if (logger.isDebugEnabled()) {
logger.debug("Starting WebSocket session on " + url);
}
this.webSocketClient.doHandshake(handler, headers, url).addCallback(
new ListenableFutureCallback<WebSocketSession>() {
@Override
public void onSuccess(@Nullable WebSocketSession webSocketSession) {
// WebSocket session ready, SockJS Session not yet
}
@Override
public void onFailure(Throwable ex) {
future.setException(ex);
}
});
return future;
}
@Override
public final ListenableFuture<WebSocketSession> doHandshake(
WebSocketHandler handler, @Nullable WebSocketHttpHeaders headers, URI url) {
Assert.notNull(handler, "WebSocketHandler is required");
Assert.notNull(url, "URL is required");
String scheme = url.getScheme();
if (!supportedProtocols.contains(scheme)) {
throw new IllegalArgumentException("Invalid scheme: '" + scheme + "'");
}
SettableListenableFuture<WebSocketSession> connectFuture = new SettableListenableFuture<>();
try {
SockJsUrlInfo sockJsUrlInfo = new SockJsUrlInfo(url);
ServerInfo serverInfo = getServerInfo(sockJsUrlInfo, getHttpRequestHeaders(headers));
createRequest(sockJsUrlInfo, headers, serverInfo).connect(handler, connectFuture);
}
catch (Throwable exception) {
if (logger.isErrorEnabled()) {
logger.error("Initial SockJS \"Info\" request to server failed, url=" + url, exception);
}
connectFuture.setException(exception);
}
return connectFuture;
}
@Override
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
SettableListenableFuture<WebSocketSession> connectFuture = new SettableListenableFuture<>();
XhrClientSockJsSession session = new XhrClientSockJsSession(request, handler, this, connectFuture);
request.addTimeoutTask(session.getTimeoutTask());
URI receiveUrl = request.getTransportUrl();
if (logger.isDebugEnabled()) {
logger.debug("Starting XHR " +
(isXhrStreamingDisabled() ? "Polling" : "Streaming") + "session url=" + receiveUrl);
}
HttpHeaders handshakeHeaders = new HttpHeaders();
handshakeHeaders.putAll(request.getHandshakeHeaders());
connectInternal(request, handler, receiveUrl, handshakeHeaders, session, connectFuture);
return connectFuture;
}
@Override
public ListenableFuture<Void> send(Message<byte[]> message) {
updateLastWriteTime();
SettableListenableFuture<Void> future = new SettableListenableFuture<>();
try {
WebSocketSession session = this.session;
Assert.state(session != null, "No WebSocketSession available");
session.sendMessage(this.codec.encode(message, session.getClass()));
future.set(null);
}
catch (Throwable ex) {
future.setException(ex);
}
finally {
updateLastWriteTime();
}
return future;
}
@Override
protected ListenableFuture<ClientHttpResponse> executeInternal(final HttpHeaders headers) throws IOException {
final SettableListenableFuture<ClientHttpResponse> responseFuture = new SettableListenableFuture<>();
ChannelFutureListener connectionListener = future -> {
if (future.isSuccess()) {
Channel channel = future.channel();
channel.pipeline().addLast(new RequestExecuteHandler(responseFuture));
FullHttpRequest nettyRequest = createFullHttpRequest(headers);
channel.writeAndFlush(nettyRequest);
}
else {
responseFuture.setException(future.cause());
}
};
this.bootstrap.connect(this.uri.getHost(), getPort(this.uri)).addListener(connectionListener);
return responseFuture;
}
@Test
public void sendWithExecutionException() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
IllegalStateException exception = new IllegalStateException("simulated exception");
SettableListenableFuture<Void> future = new SettableListenableFuture<>();
future.setException(exception);
when(this.connection.send(any())).thenReturn(future);
this.expected.expect(MessageDeliveryException.class);
this.expected.expectCause(Matchers.sameInstance(exception));
this.session.send("/topic/foo", "sample payload".getBytes(StandardCharsets.UTF_8));
verifyNoMoreInteractions(this.connection);
}
protected ListenableFuture<ClientHttpResponse> executeInternal(final HttpHeaders headers) {
final SettableListenableFuture<ClientHttpResponse> responseFuture = new SettableListenableFuture<>();
ChannelFutureListener connectionListener = future -> {
if (future.isSuccess()) {
Channel channel = future.channel();
channel.pipeline().addLast(new NettyResponseHandler(responseFuture));
FullHttpRequest nettyRequest = createFullHttpRequest(headers);
channel.writeAndFlush(nettyRequest);
}
else {
responseFuture.setException(future.cause());
}
};
this.bootstrap.connect(this.uri.getHost(), getPort(this.uri)).addListener(connectionListener);
return responseFuture;
}
@Override
public ListenableFuture<Void> send(Message<byte[]> message) {
updateLastWriteTime();
SettableListenableFuture<Void> future = new SettableListenableFuture<Void>();
try {
this.session.sendMessage(this.codec.encode(message, this.session.getClass()));
future.set(null);
}
catch (Throwable ex) {
future.setException(ex);
}
finally {
updateLastWriteTime();
}
return future;
}
@Override
public final ListenableFuture<WebSocketSession> doHandshake(
WebSocketHandler handler, @Nullable WebSocketHttpHeaders headers, URI url) {
Assert.notNull(handler, "WebSocketHandler is required");
Assert.notNull(url, "URL is required");
String scheme = url.getScheme();
if (!supportedProtocols.contains(scheme)) {
throw new IllegalArgumentException("Invalid scheme: '" + scheme + "'");
}
SettableListenableFuture<WebSocketSession> connectFuture = new SettableListenableFuture<>();
try {
SockJsUrlInfo sockJsUrlInfo = new SockJsUrlInfo(url);
ServerInfo serverInfo = getServerInfo(sockJsUrlInfo, getHttpRequestHeaders(headers));
createRequest(sockJsUrlInfo, headers, serverInfo).connect(handler, connectFuture);
}
catch (Throwable exception) {
if (logger.isErrorEnabled()) {
logger.error("Initial SockJS \"Info\" request to server failed, url=" + url, exception);
}
connectFuture.setException(exception);
}
return connectFuture;
}
@Override
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
SettableListenableFuture<WebSocketSession> connectFuture = new SettableListenableFuture<>();
XhrClientSockJsSession session = new XhrClientSockJsSession(request, handler, this, connectFuture);
request.addTimeoutTask(session.getTimeoutTask());
URI receiveUrl = request.getTransportUrl();
if (logger.isDebugEnabled()) {
logger.debug("Starting XHR " +
(isXhrStreamingDisabled() ? "Polling" : "Streaming") + "session url=" + receiveUrl);
}
HttpHeaders handshakeHeaders = new HttpHeaders();
handshakeHeaders.putAll(request.getHandshakeHeaders());
connectInternal(request, handler, receiveUrl, handshakeHeaders, session, connectFuture);
return connectFuture;
}
@Override
protected ListenableFuture<ClientHttpResponse> executeInternal(final HttpHeaders headers) throws IOException {
final SettableListenableFuture<ClientHttpResponse> responseFuture = new SettableListenableFuture<>();
ChannelFutureListener connectionListener = future -> {
if (future.isSuccess()) {
Channel channel = future.channel();
channel.pipeline().addLast(new RequestExecuteHandler(responseFuture));
FullHttpRequest nettyRequest = createFullHttpRequest(headers);
channel.writeAndFlush(nettyRequest);
}
else {
responseFuture.setException(future.cause());
}
};
this.bootstrap.connect(this.uri.getHost(), getPort(this.uri)).addListener(connectionListener);
return responseFuture;
}
@Override
protected ListenableFuture<ClientHttpResponse> executeInternal(final HttpHeaders headers) throws IOException {
final SettableListenableFuture<ClientHttpResponse> responseFuture =
new SettableListenableFuture<ClientHttpResponse>();
ChannelFutureListener connectionListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
Channel channel = future.channel();
channel.pipeline().addLast(new RequestExecuteHandler(responseFuture));
FullHttpRequest nettyRequest = createFullHttpRequest(headers);
channel.writeAndFlush(nettyRequest);
}
else {
responseFuture.setException(future.cause());
}
}
};
this.bootstrap.connect(this.uri.getHost(), getPort(this.uri)).addListener(connectionListener);
return responseFuture;
}
/**
* Pulls messages asynchronously, on demand, using the pull request in argument.
*
* @param pullRequest pull request containing the subscription name
* @return the ListenableFuture for the asynchronous execution, returning
* the list of {@link AcknowledgeablePubsubMessage} containing the ack ID, subscription
* and acknowledger
*/
private ListenableFuture<List<AcknowledgeablePubsubMessage>> pullAsync(PullRequest pullRequest) {
Assert.notNull(pullRequest, "The pull request can't be null.");
ApiFuture<PullResponse> pullFuture = this.subscriberStub.pullCallable().futureCall(pullRequest);
final SettableListenableFuture<List<AcknowledgeablePubsubMessage>> settableFuture = new SettableListenableFuture<>();
ApiFutures.addCallback(pullFuture, new ApiFutureCallback<PullResponse>() {
@Override
public void onFailure(Throwable throwable) {
settableFuture.setException(throwable);
}
@Override
public void onSuccess(PullResponse pullResponse) {
List<AcknowledgeablePubsubMessage> result = toAcknowledgeablePubsubMessageList(
pullResponse.getReceivedMessagesList(), pullRequest.getSubscription());
settableFuture.set(result);
}
}, asyncPullExecutor);
return settableFuture;
}
@Override
public final ListenableFuture<WebSocketSession> doHandshake(
WebSocketHandler handler, WebSocketHttpHeaders headers, URI url) {
Assert.notNull(handler, "WebSocketHandler is required");
Assert.notNull(url, "URL is required");
String scheme = url.getScheme();
if (!supportedProtocols.contains(scheme)) {
throw new IllegalArgumentException("Invalid scheme: '" + scheme + "'");
}
SettableListenableFuture<WebSocketSession> connectFuture = new SettableListenableFuture<WebSocketSession>();
try {
SockJsUrlInfo sockJsUrlInfo = new SockJsUrlInfo(url);
ServerInfo serverInfo = getServerInfo(sockJsUrlInfo, getHttpRequestHeaders(headers));
createRequest(sockJsUrlInfo, headers, serverInfo).connect(handler, connectFuture);
}
catch (Throwable exception) {
if (logger.isErrorEnabled()) {
logger.error("Initial SockJS \"Info\" request to server failed, url=" + url, exception);
}
connectFuture.setException(exception);
}
return connectFuture;
}
private SettableListenableFuture<Job> createJobFuture(Job pendingJob) {
// Prepare the polling task for the ListenableFuture result returned to end-user
SettableListenableFuture<Job> result = new SettableListenableFuture<>();
ScheduledFuture<?> scheduledFuture = taskScheduler.scheduleAtFixedRate(() -> {
Job job = pendingJob.reload();
if (State.DONE.equals(job.getStatus().getState())) {
if (job.getStatus().getError() != null) {
result.setException(
new BigQueryException(job.getStatus().getError().getMessage()));
}
else {
result.set(job);
}
}
}, this.jobPollInterval);
result.addCallback(
response -> scheduledFuture.cancel(true),
response -> {
pendingJob.cancel();
scheduledFuture.cancel(true);
});
return result;
}
@Test
public void sendWithExecutionException() throws Exception {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
IllegalStateException exception = new IllegalStateException("simulated exception");
SettableListenableFuture<Void> future = new SettableListenableFuture<>();
future.setException(exception);
when(this.connection.send(any())).thenReturn(future);
this.expected.expect(MessageDeliveryException.class);
this.expected.expectCause(Matchers.sameInstance(exception));
this.session.send("/topic/foo", "sample payload".getBytes(UTF_8));
verifyNoMoreInteractions(this.connection);
}
@Override
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
final SettableListenableFuture<WebSocketSession> future = new SettableListenableFuture<WebSocketSession>();
WebSocketClientSockJsSession session = new WebSocketClientSockJsSession(request, handler, future);
handler = new ClientSockJsWebSocketHandler(session);
request.addTimeoutTask(session.getTimeoutTask());
URI url = request.getTransportUrl();
WebSocketHttpHeaders headers = new WebSocketHttpHeaders(request.getHandshakeHeaders());
if (logger.isDebugEnabled()) {
logger.debug("Starting WebSocket session url=" + url);
}
this.webSocketClient.doHandshake(handler, headers, url).addCallback(
new ListenableFutureCallback<WebSocketSession>() {
@Override
public void onSuccess(WebSocketSession webSocketSession) {
// WebSocket session ready, SockJS Session not yet
}
@Override
public void onFailure(Throwable ex) {
future.setException(ex);
}
});
return future;
}
@Override
public ListenableFuture<Void> shutdown() {
if (this.stopping) {
SettableListenableFuture<Void> future = new SettableListenableFuture<>();
future.set(null);
return future;
}
this.stopping = true;
Mono<Void> result;
if (this.channelGroup != null) {
result = FutureMono.from(this.channelGroup.close());
if (this.loopResources != null) {
result = result.onErrorResume(ex -> Mono.empty()).then(this.loopResources.disposeLater());
}
if (this.poolResources != null) {
result = result.onErrorResume(ex -> Mono.empty()).then(this.poolResources.disposeLater());
}
result = result.onErrorResume(ex -> Mono.empty()).then(stopScheduler());
}
else {
result = stopScheduler();
}
return new MonoToListenableFutureAdapter<>(result);
}
@Test
public void sendWithExecutionException() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
IllegalStateException exception = new IllegalStateException("simulated exception");
SettableListenableFuture<Void> future = new SettableListenableFuture<>();
future.setException(exception);
given(this.connection.send(any())).willReturn(future);
assertThatExceptionOfType(MessageDeliveryException.class).isThrownBy(() ->
this.session.send("/topic/foo", "sample payload".getBytes(StandardCharsets.UTF_8)))
.withCause(exception);
}
protected AbstractClientSockJsSession(TransportRequest request, WebSocketHandler handler,
SettableListenableFuture<WebSocketSession> connectFuture) {
Assert.notNull(request, "'request' is required");
Assert.notNull(handler, "'handler' is required");
Assert.notNull(connectFuture, "'connectFuture' is required");
this.request = request;
this.webSocketHandler = handler;
this.connectFuture = connectFuture;
}
@Override
protected void connectInternal(TransportRequest request, WebSocketHandler handler, URI receiveUrl,
HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture) {
executeReceiveRequest(request, receiveUrl, handshakeHeaders, session, connectFuture);
}
private void executeReceiveRequest(final TransportRequest transportRequest,
final URI url, final HttpHeaders headers, final XhrClientSockJsSession session,
final SettableListenableFuture<WebSocketSession> connectFuture) {
if (logger.isTraceEnabled()) {
logger.trace("Starting XHR receive request for " + url);
}
ClientCallback<ClientConnection> clientCallback = new ClientCallback<ClientConnection>() {
@Override
public void completed(ClientConnection connection) {
ClientRequest request = new ClientRequest().setMethod(Methods.POST).setPath(url.getPath());
HttpString headerName = HttpString.tryFromString(HttpHeaders.HOST);
request.getRequestHeaders().add(headerName, url.getHost());
addHttpHeaders(request, headers);
HttpHeaders httpHeaders = transportRequest.getHttpRequestHeaders();
connection.sendRequest(request, createReceiveCallback(transportRequest,
url, httpHeaders, session, connectFuture));
}
@Override
public void failed(IOException ex) {
throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
}
};
this.httpClient.connect(clientCallback, url, this.worker, this.bufferPool, this.optionMap);
}
public SockJsResponseListener(TransportRequest request, ClientConnection connection, URI url,
HttpHeaders headers, XhrClientSockJsSession sockJsSession,
SettableListenableFuture<WebSocketSession> connectFuture) {
this.request = request;
this.connection = connection;
this.url = url;
this.headers = headers;
this.session = sockJsSession;
this.connectFuture = connectFuture;
}
public XhrClientSockJsSession(TransportRequest request, WebSocketHandler handler,
XhrTransport transport, SettableListenableFuture<WebSocketSession> connectFuture) {
super(request, handler, connectFuture);
Assert.notNull(transport, "XhrTransport is required");
this.transport = transport;
this.headers = request.getHttpRequestHeaders();
this.sendHeaders = new HttpHeaders();
this.sendHeaders.putAll(this.headers);
this.sendHeaders.setContentType(MediaType.APPLICATION_JSON);
this.sendUrl = request.getSockJsUrlInfo().getTransportUrl(TransportType.XHR_SEND);
}
@Override
protected void connectInternal(TransportRequest transportRequest, WebSocketHandler handler,
URI url, HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture) {
HttpHeaders httpHeaders = transportRequest.getHttpRequestHeaders();
SockJsResponseListener listener = new SockJsResponseListener(url, httpHeaders, session, connectFuture);
executeReceiveRequest(url, handshakeHeaders, listener);
}
@Override
protected void connectInternal(TransportRequest transportRequest, WebSocketHandler handler,
URI url, HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture) {
HttpHeaders httpHeaders = transportRequest.getHttpRequestHeaders();
SockJsResponseListener listener = new SockJsResponseListener(url, httpHeaders, session, connectFuture);
executeReceiveRequest(url, handshakeHeaders, listener);
}
public SockJsResponseListener(URI url, HttpHeaders headers, XhrClientSockJsSession sockJsSession,
SettableListenableFuture<WebSocketSession> connectFuture) {
this.transportUrl = url;
this.receiveHeaders = headers;
this.connectFuture = connectFuture;
this.sockJsSession = sockJsSession;
}
@Override
protected void connectInternal(TransportRequest request, WebSocketHandler handler, URI receiveUrl,
HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
SettableListenableFuture<WebSocketSession> connectFuture) {
this.actualHandshakeHeaders = handshakeHeaders;
this.actualSession = session;
}
@SuppressWarnings("unchecked")
@Before
public void setup() throws Exception {
this.connectCallback = mock(ListenableFutureCallback.class);
this.connectFuture = new SettableListenableFuture<>();
this.connectFuture.addCallback(this.connectCallback);
this.webSocketTransport = new TestTransport("WebSocketTestTransport");
this.xhrTransport = new TestTransport("XhrTestTransport");
}
@Before
public void setup() throws Exception {
SockJsUrlInfo urlInfo = new SockJsUrlInfo(new URI("https://example.com"));
Transport transport = mock(Transport.class);
TransportRequest request = new DefaultTransportRequest(urlInfo, null, null, transport, TransportType.XHR, CODEC);
this.handler = mock(WebSocketHandler.class);
this.connectFuture = new SettableListenableFuture<>();
this.session = new TestClientSockJsSession(request, this.handler, this.connectFuture);
}