下面列出了怎么用org.springframework.messaging.tcp.TcpConnection的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Clean up state associated with the connection and close it.
* Any exception arising from closing the connection are propagated.
*/
public void clearConnection() {
if (logger.isDebugEnabled()) {
logger.debug("Cleaning up connection state for session " + this.sessionId);
}
if (this.isRemoteClientSession) {
StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
}
this.isStompConnected = false;
TcpConnection<byte[]> conn = this.tcpConnection;
this.tcpConnection = null;
if (conn != null) {
if (logger.isDebugEnabled()) {
logger.debug("Closing TCP connection in session " + this.sessionId);
}
conn.close();
}
}
private void sendSystemSubscriptions() {
int i = 0;
for (String destination : getSystemSubscriptions().keySet()) {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
accessor.setSubscriptionId(String.valueOf(i++));
accessor.setDestination(destination);
if (logger.isDebugEnabled()) {
logger.debug("Subscribing to " + destination + " on \"system\" connection.");
}
TcpConnection<byte[]> conn = getTcpConnection();
if (conn != null) {
MessageHeaders headers = accessor.getMessageHeaders();
conn.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)).addCallback(
result -> {},
ex -> {
String error = "Failed to subscribe in \"system\" session.";
handleTcpConnectionFailure(error, ex);
});
}
}
}
@Override
@SuppressWarnings("unchecked")
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
inbound.withConnection(conn -> {
if (logger.isDebugEnabled()) {
logger.debug("Connected to " + conn.address());
}
});
DirectProcessor<Void> completion = DirectProcessor.create();
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));
inbound.withConnection(conn -> conn.addHandler(new StompMessageDecoder<>(codec)));
inbound.receiveObject()
.cast(Message.class)
.publishOn(scheduler, PUBLISH_ON_BUFFER_SIZE)
.subscribe(
this.connectionHandler::handleMessage,
this.connectionHandler::handleFailure,
this.connectionHandler::afterConnectionClosed);
return completion;
}
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void cancelInactivityTasks() throws Exception {
TcpConnection<byte[]> tcpConnection = getTcpConnection();
ScheduledFuture future = mock(ScheduledFuture.class);
given(this.taskScheduler.scheduleWithFixedDelay(any(), eq(1L))).willReturn(future);
tcpConnection.onReadInactivity(mock(Runnable.class), 2L);
tcpConnection.onWriteInactivity(mock(Runnable.class), 2L);
this.webSocketHandlerCaptor.getValue().afterConnectionClosed(this.webSocketSession, CloseStatus.NORMAL);
verify(future, times(2)).cancel(true);
verifyNoMoreInteractions(future);
}
/**
* Clean up state associated with the connection and close it.
* Any exception arising from closing the connection are propagated.
*/
public void clearConnection() {
if (logger.isDebugEnabled()) {
logger.debug("Cleaning up connection state for session " + this.sessionId);
}
if (this.isRemoteClientSession) {
StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
}
this.isStompConnected = false;
TcpConnection<byte[]> conn = this.tcpConnection;
this.tcpConnection = null;
if (conn != null) {
if (logger.isDebugEnabled()) {
logger.debug("Closing TCP connection in session " + this.sessionId);
}
conn.close();
}
}
private void sendSystemSubscriptions() {
int i = 0;
for (String destination : getSystemSubscriptions().keySet()) {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
accessor.setSubscriptionId(String.valueOf(i++));
accessor.setDestination(destination);
if (logger.isDebugEnabled()) {
logger.debug("Subscribing to " + destination + " on \"system\" connection.");
}
TcpConnection<byte[]> conn = getTcpConnection();
if (conn != null) {
MessageHeaders headers = accessor.getMessageHeaders();
conn.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)).addCallback(
result -> {},
ex -> {
String error = "Failed to subscribe in \"system\" session.";
handleTcpConnectionFailure(error, ex);
});
}
}
}
@Override
@SuppressWarnings("unchecked")
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
inbound.withConnection(conn -> {
if (logger.isDebugEnabled()) {
logger.debug("Connected to " + conn.address());
}
});
DirectProcessor<Void> completion = DirectProcessor.create();
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));
inbound.withConnection(conn -> conn.addHandler(new StompMessageDecoder<>(codec)));
inbound.receiveObject()
.cast(Message.class)
.publishOn(scheduler, PUBLISH_ON_BUFFER_SIZE)
.subscribe(
this.connectionHandler::handleMessage,
this.connectionHandler::handleFailure,
this.connectionHandler::afterConnectionClosed);
return completion;
}
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void cancelInactivityTasks() throws Exception {
TcpConnection<byte[]> tcpConnection = getTcpConnection();
ScheduledFuture future = mock(ScheduledFuture.class);
when(this.taskScheduler.scheduleWithFixedDelay(any(), eq(1L))).thenReturn(future);
tcpConnection.onReadInactivity(mock(Runnable.class), 2L);
tcpConnection.onWriteInactivity(mock(Runnable.class), 2L);
this.webSocketHandlerCaptor.getValue().afterConnectionClosed(this.webSocketSession, CloseStatus.NORMAL);
verify(future, times(2)).cancel(true);
verifyNoMoreInteractions(future);
}
/**
* Clean up state associated with the connection and close it.
* Any exception arising from closing the connection are propagated.
*/
public void clearConnection() {
if (logger.isDebugEnabled()) {
logger.debug("Cleaning up connection state for session " + this.sessionId);
}
if (this.isRemoteClientSession) {
StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
}
this.isStompConnected = false;
TcpConnection<byte[]> conn = this.tcpConnection;
this.tcpConnection = null;
if (conn != null) {
if (logger.isDebugEnabled()) {
logger.debug("Closing TCP connection in session " + this.sessionId);
}
conn.close();
}
}
private void sendSystemSubscriptions() {
int i = 0;
for (String destination : getSystemSubscriptions().keySet()) {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
accessor.setSubscriptionId(String.valueOf(i++));
accessor.setDestination(destination);
if (logger.isDebugEnabled()) {
logger.debug("Subscribing to " + destination + " on \"system\" connection.");
}
TcpConnection<byte[]> conn = getTcpConnection();
if (conn != null) {
MessageHeaders headers = accessor.getMessageHeaders();
conn.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)).addCallback(
new ListenableFutureCallback<Void>() {
public void onSuccess(Void result) {
}
public void onFailure(Throwable ex) {
String error = "Failed to subscribe in \"system\" session.";
handleTcpConnectionFailure(error, ex);
}
});
}
}
}
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void cancelInactivityTasks() throws Exception {
TcpConnection<byte[]> tcpConnection = getTcpConnection();
ScheduledFuture future = mock(ScheduledFuture.class);
when(this.taskScheduler.scheduleWithFixedDelay(any(), eq(1L))).thenReturn(future);
tcpConnection.onReadInactivity(mock(Runnable.class), 2L);
tcpConnection.onWriteInactivity(mock(Runnable.class), 2L);
this.webSocketHandlerCaptor.getValue().afterConnectionClosed(this.webSocketSession, CloseStatus.NORMAL);
verify(future, times(2)).cancel(true);
verifyNoMoreInteractions(future);
}
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
if (logger.isDebugEnabled()) {
logger.debug("TCP connection opened in session=" + getSessionId());
}
this.tcpConnection = connection;
connection.onReadInactivity(() -> {
if (this.tcpConnection != null && !this.isStompConnected) {
handleTcpConnectionFailure("No CONNECTED frame received in " +
MAX_TIME_TO_CONNECTED_FRAME + " ms.", null);
}
}, MAX_TIME_TO_CONNECTED_FRAME);
connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
}
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
this.connection = connection;
if (logger.isDebugEnabled()) {
logger.debug("Connection established in session id=" + this.sessionId);
}
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.CONNECT);
accessor.addNativeHeaders(this.connectHeaders);
if (this.connectHeaders.getAcceptVersion() == null) {
accessor.setAcceptVersion("1.1,1.2");
}
Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
execute(message);
}
private void resetConnection() {
TcpConnection<?> conn = this.connection;
this.connection = null;
if (conn != null) {
try {
conn.close();
}
catch (Throwable ex) {
// ignore
}
}
}
@Override
public void run() {
TcpConnection<byte[]> conn = connection;
if (conn != null) {
conn.send(HEARTBEAT).addCallback(
new ListenableFutureCallback<Void>() {
public void onSuccess(@Nullable Void result) {
}
public void onFailure(Throwable ex) {
handleFailure(ex);
}
});
}
}
@Test
public void readInactivityAfterDelayHasElapsed() throws Exception {
TcpConnection<byte[]> tcpConnection = getTcpConnection();
Runnable runnable = mock(Runnable.class);
long delay = 2;
tcpConnection.onReadInactivity(runnable, delay);
testInactivityTaskScheduling(runnable, delay, 10);
}
@Test
public void readInactivityBeforeDelayHasElapsed() throws Exception {
TcpConnection<byte[]> tcpConnection = getTcpConnection();
Runnable runnable = mock(Runnable.class);
long delay = 10000;
tcpConnection.onReadInactivity(runnable, delay);
testInactivityTaskScheduling(runnable, delay, 0);
}
@Test
public void writeInactivityAfterDelayHasElapsed() throws Exception {
TcpConnection<byte[]> tcpConnection = getTcpConnection();
Runnable runnable = mock(Runnable.class);
long delay = 2;
tcpConnection.onWriteInactivity(runnable, delay);
testInactivityTaskScheduling(runnable, delay, 10);
}
@Test
public void writeInactivityBeforeDelayHasElapsed() throws Exception {
TcpConnection<byte[]> tcpConnection = getTcpConnection();
Runnable runnable = mock(Runnable.class);
long delay = 1000;
tcpConnection.onWriteInactivity(runnable, delay);
testInactivityTaskScheduling(runnable, delay, 0);
}
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
if (logger.isDebugEnabled()) {
logger.debug("TCP connection opened in session=" + getSessionId());
}
this.tcpConnection = connection;
connection.onReadInactivity(() -> {
if (this.tcpConnection != null && !this.isStompConnected) {
handleTcpConnectionFailure("No CONNECTED frame received in " +
MAX_TIME_TO_CONNECTED_FRAME + " ms.", null);
}
}, MAX_TIME_TO_CONNECTED_FRAME);
connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
}
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
this.connection = connection;
if (logger.isDebugEnabled()) {
logger.debug("Connection established in session id=" + this.sessionId);
}
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.CONNECT);
accessor.addNativeHeaders(this.connectHeaders);
if (this.connectHeaders.getAcceptVersion() == null) {
accessor.setAcceptVersion("1.1,1.2");
}
Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
execute(message);
}
private void resetConnection() {
TcpConnection<?> conn = this.connection;
this.connection = null;
if (conn != null) {
try {
conn.close();
}
catch (Throwable ex) {
// ignore
}
}
}
@Override
public void run() {
TcpConnection<byte[]> conn = connection;
if (conn != null) {
conn.send(HEARTBEAT).addCallback(
new ListenableFutureCallback<Void>() {
public void onSuccess(@Nullable Void result) {
}
public void onFailure(Throwable ex) {
handleFailure(ex);
}
});
}
}
@Test
public void readInactivityAfterDelayHasElapsed() throws Exception {
TcpConnection<byte[]> tcpConnection = getTcpConnection();
Runnable runnable = mock(Runnable.class);
long delay = 2;
tcpConnection.onReadInactivity(runnable, delay);
testInactivityTaskScheduling(runnable, delay, 10);
}
@Test
public void readInactivityBeforeDelayHasElapsed() throws Exception {
TcpConnection<byte[]> tcpConnection = getTcpConnection();
Runnable runnable = mock(Runnable.class);
long delay = 10000;
tcpConnection.onReadInactivity(runnable, delay);
testInactivityTaskScheduling(runnable, delay, 0);
}
@Test
public void writeInactivityAfterDelayHasElapsed() throws Exception {
TcpConnection<byte[]> tcpConnection = getTcpConnection();
Runnable runnable = mock(Runnable.class);
long delay = 2;
tcpConnection.onWriteInactivity(runnable, delay);
testInactivityTaskScheduling(runnable, delay, 10);
}
@Test
public void writeInactivityBeforeDelayHasElapsed() throws Exception {
TcpConnection<byte[]> tcpConnection = getTcpConnection();
Runnable runnable = mock(Runnable.class);
long delay = 1000;
tcpConnection.onWriteInactivity(runnable, delay);
testInactivityTaskScheduling(runnable, delay, 0);
}
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
if (logger.isDebugEnabled()) {
logger.debug("TCP connection opened in session=" + getSessionId());
}
this.tcpConnection = connection;
connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
}
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
this.connection = connection;
if (logger.isDebugEnabled()) {
logger.debug("Connection established in session id=" + this.sessionId);
}
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.CONNECT);
accessor.addNativeHeaders(this.connectHeaders);
accessor.setAcceptVersion("1.1,1.2");
Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
execute(message);
}
private void resetConnection() {
TcpConnection<?> conn = this.connection;
this.connection = null;
if (conn != null) {
try {
conn.close();
}
catch (Throwable ex) {
// Ignore
}
}
}