类org.springframework.messaging.tcp.TcpConnection源码实例Demo

下面列出了怎么用org.springframework.messaging.tcp.TcpConnection的API类实例代码及写法,或者点击链接到github查看源代码。

/**
 * Clean up state associated with the connection and close it.
 * Any exception arising from closing the connection are propagated.
 */
public void clearConnection() {
	if (logger.isDebugEnabled()) {
		logger.debug("Cleaning up connection state for session " + this.sessionId);
	}

	if (this.isRemoteClientSession) {
		StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
	}

	this.isStompConnected = false;

	TcpConnection<byte[]> conn = this.tcpConnection;
	this.tcpConnection = null;
	if (conn != null) {
		if (logger.isDebugEnabled()) {
			logger.debug("Closing TCP connection in session " + this.sessionId);
		}
		conn.close();
	}
}
 
private void sendSystemSubscriptions() {
	int i = 0;
	for (String destination : getSystemSubscriptions().keySet()) {
		StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
		accessor.setSubscriptionId(String.valueOf(i++));
		accessor.setDestination(destination);
		if (logger.isDebugEnabled()) {
			logger.debug("Subscribing to " + destination + " on \"system\" connection.");
		}
		TcpConnection<byte[]> conn = getTcpConnection();
		if (conn != null) {
			MessageHeaders headers = accessor.getMessageHeaders();
			conn.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)).addCallback(
					result -> {},
					ex -> {
						String error = "Failed to subscribe in \"system\" session.";
						handleTcpConnectionFailure(error, ex);
					});
		}
	}
}
 
@Override
@SuppressWarnings("unchecked")
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
	inbound.withConnection(conn -> {
		if (logger.isDebugEnabled()) {
			logger.debug("Connected to " + conn.address());
		}
	});
	DirectProcessor<Void> completion = DirectProcessor.create();
	TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound,  codec, completion);
	scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));

	inbound.withConnection(conn -> conn.addHandler(new StompMessageDecoder<>(codec)));

	inbound.receiveObject()
			.cast(Message.class)
			.publishOn(scheduler, PUBLISH_ON_BUFFER_SIZE)
			.subscribe(
					this.connectionHandler::handleMessage,
					this.connectionHandler::handleFailure,
					this.connectionHandler::afterConnectionClosed);

	return completion;
}
 
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void cancelInactivityTasks() throws Exception {
	TcpConnection<byte[]> tcpConnection = getTcpConnection();

	ScheduledFuture future = mock(ScheduledFuture.class);
	given(this.taskScheduler.scheduleWithFixedDelay(any(), eq(1L))).willReturn(future);

	tcpConnection.onReadInactivity(mock(Runnable.class), 2L);
	tcpConnection.onWriteInactivity(mock(Runnable.class), 2L);

	this.webSocketHandlerCaptor.getValue().afterConnectionClosed(this.webSocketSession, CloseStatus.NORMAL);

	verify(future, times(2)).cancel(true);
	verifyNoMoreInteractions(future);
}
 
/**
 * Clean up state associated with the connection and close it.
 * Any exception arising from closing the connection are propagated.
 */
public void clearConnection() {
	if (logger.isDebugEnabled()) {
		logger.debug("Cleaning up connection state for session " + this.sessionId);
	}

	if (this.isRemoteClientSession) {
		StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
	}

	this.isStompConnected = false;

	TcpConnection<byte[]> conn = this.tcpConnection;
	this.tcpConnection = null;
	if (conn != null) {
		if (logger.isDebugEnabled()) {
			logger.debug("Closing TCP connection in session " + this.sessionId);
		}
		conn.close();
	}
}
 
private void sendSystemSubscriptions() {
	int i = 0;
	for (String destination : getSystemSubscriptions().keySet()) {
		StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
		accessor.setSubscriptionId(String.valueOf(i++));
		accessor.setDestination(destination);
		if (logger.isDebugEnabled()) {
			logger.debug("Subscribing to " + destination + " on \"system\" connection.");
		}
		TcpConnection<byte[]> conn = getTcpConnection();
		if (conn != null) {
			MessageHeaders headers = accessor.getMessageHeaders();
			conn.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)).addCallback(
					result -> {},
					ex -> {
						String error = "Failed to subscribe in \"system\" session.";
						handleTcpConnectionFailure(error, ex);
					});
		}
	}
}
 
@Override
@SuppressWarnings("unchecked")
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
	inbound.withConnection(conn -> {
		if (logger.isDebugEnabled()) {
			logger.debug("Connected to " + conn.address());
		}
	});
	DirectProcessor<Void> completion = DirectProcessor.create();
	TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound,  codec, completion);
	scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));

	inbound.withConnection(conn -> conn.addHandler(new StompMessageDecoder<>(codec)));

	inbound.receiveObject()
			.cast(Message.class)
			.publishOn(scheduler, PUBLISH_ON_BUFFER_SIZE)
			.subscribe(
					this.connectionHandler::handleMessage,
					this.connectionHandler::handleFailure,
					this.connectionHandler::afterConnectionClosed);

	return completion;
}
 
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void cancelInactivityTasks() throws Exception {
	TcpConnection<byte[]> tcpConnection = getTcpConnection();

	ScheduledFuture future = mock(ScheduledFuture.class);
	when(this.taskScheduler.scheduleWithFixedDelay(any(), eq(1L))).thenReturn(future);

	tcpConnection.onReadInactivity(mock(Runnable.class), 2L);
	tcpConnection.onWriteInactivity(mock(Runnable.class), 2L);

	this.webSocketHandlerCaptor.getValue().afterConnectionClosed(this.webSocketSession, CloseStatus.NORMAL);

	verify(future, times(2)).cancel(true);
	verifyNoMoreInteractions(future);
}
 
/**
 * Clean up state associated with the connection and close it.
 * Any exception arising from closing the connection are propagated.
 */
public void clearConnection() {
	if (logger.isDebugEnabled()) {
		logger.debug("Cleaning up connection state for session " + this.sessionId);
	}

	if (this.isRemoteClientSession) {
		StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
	}

	this.isStompConnected = false;

	TcpConnection<byte[]> conn = this.tcpConnection;
	this.tcpConnection = null;
	if (conn != null) {
		if (logger.isDebugEnabled()) {
			logger.debug("Closing TCP connection in session " + this.sessionId);
		}
		conn.close();
	}
}
 
private void sendSystemSubscriptions() {
	int i = 0;
	for (String destination : getSystemSubscriptions().keySet()) {
		StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
		accessor.setSubscriptionId(String.valueOf(i++));
		accessor.setDestination(destination);
		if (logger.isDebugEnabled()) {
			logger.debug("Subscribing to " + destination + " on \"system\" connection.");
		}
		TcpConnection<byte[]> conn = getTcpConnection();
		if (conn != null) {
			MessageHeaders headers = accessor.getMessageHeaders();
			conn.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)).addCallback(
					new ListenableFutureCallback<Void>() {
						public void onSuccess(Void result) {
						}
						public void onFailure(Throwable ex) {
							String error = "Failed to subscribe in \"system\" session.";
							handleTcpConnectionFailure(error, ex);
						}
					});
		}
	}
}
 
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void cancelInactivityTasks() throws Exception {
	TcpConnection<byte[]> tcpConnection = getTcpConnection();

	ScheduledFuture future = mock(ScheduledFuture.class);
	when(this.taskScheduler.scheduleWithFixedDelay(any(), eq(1L))).thenReturn(future);

	tcpConnection.onReadInactivity(mock(Runnable.class), 2L);
	tcpConnection.onWriteInactivity(mock(Runnable.class), 2L);

	this.webSocketHandlerCaptor.getValue().afterConnectionClosed(this.webSocketSession, CloseStatus.NORMAL);

	verify(future, times(2)).cancel(true);
	verifyNoMoreInteractions(future);
}
 
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
	if (logger.isDebugEnabled()) {
		logger.debug("TCP connection opened in session=" + getSessionId());
	}
	this.tcpConnection = connection;
	connection.onReadInactivity(() -> {
		if (this.tcpConnection != null && !this.isStompConnected) {
			handleTcpConnectionFailure("No CONNECTED frame received in " +
					MAX_TIME_TO_CONNECTED_FRAME + " ms.", null);
		}
	}, MAX_TIME_TO_CONNECTED_FRAME);
	connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
}
 
源代码13 项目: spring-analysis-note   文件: DefaultStompSession.java
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
	this.connection = connection;
	if (logger.isDebugEnabled()) {
		logger.debug("Connection established in session id=" + this.sessionId);
	}
	StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.CONNECT);
	accessor.addNativeHeaders(this.connectHeaders);
	if (this.connectHeaders.getAcceptVersion() == null) {
		accessor.setAcceptVersion("1.1,1.2");
	}
	Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
	execute(message);
}
 
源代码14 项目: spring-analysis-note   文件: DefaultStompSession.java
private void resetConnection() {
	TcpConnection<?> conn = this.connection;
	this.connection = null;
	if (conn != null) {
		try {
			conn.close();
		}
		catch (Throwable ex) {
			// ignore
		}
	}
}
 
源代码15 项目: spring-analysis-note   文件: DefaultStompSession.java
@Override
public void run() {
	TcpConnection<byte[]> conn = connection;
	if (conn != null) {
		conn.send(HEARTBEAT).addCallback(
				new ListenableFutureCallback<Void>() {
					public void onSuccess(@Nullable Void result) {
					}
					public void onFailure(Throwable ex) {
						handleFailure(ex);
					}
				});
	}
}
 
@Test
public void readInactivityAfterDelayHasElapsed() throws Exception {
	TcpConnection<byte[]> tcpConnection = getTcpConnection();
	Runnable runnable = mock(Runnable.class);
	long delay = 2;
	tcpConnection.onReadInactivity(runnable, delay);
	testInactivityTaskScheduling(runnable, delay, 10);
}
 
@Test
public void readInactivityBeforeDelayHasElapsed() throws Exception {
	TcpConnection<byte[]> tcpConnection = getTcpConnection();
	Runnable runnable = mock(Runnable.class);
	long delay = 10000;
	tcpConnection.onReadInactivity(runnable, delay);
	testInactivityTaskScheduling(runnable, delay, 0);
}
 
@Test
public void writeInactivityAfterDelayHasElapsed() throws Exception {
	TcpConnection<byte[]> tcpConnection = getTcpConnection();
	Runnable runnable = mock(Runnable.class);
	long delay = 2;
	tcpConnection.onWriteInactivity(runnable, delay);
	testInactivityTaskScheduling(runnable, delay, 10);
}
 
@Test
public void writeInactivityBeforeDelayHasElapsed() throws Exception {
	TcpConnection<byte[]> tcpConnection = getTcpConnection();
	Runnable runnable = mock(Runnable.class);
	long delay = 1000;
	tcpConnection.onWriteInactivity(runnable, delay);
	testInactivityTaskScheduling(runnable, delay, 0);
}
 
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
	if (logger.isDebugEnabled()) {
		logger.debug("TCP connection opened in session=" + getSessionId());
	}
	this.tcpConnection = connection;
	connection.onReadInactivity(() -> {
		if (this.tcpConnection != null && !this.isStompConnected) {
			handleTcpConnectionFailure("No CONNECTED frame received in " +
					MAX_TIME_TO_CONNECTED_FRAME + " ms.", null);
		}
	}, MAX_TIME_TO_CONNECTED_FRAME);
	connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
}
 
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
	this.connection = connection;
	if (logger.isDebugEnabled()) {
		logger.debug("Connection established in session id=" + this.sessionId);
	}
	StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.CONNECT);
	accessor.addNativeHeaders(this.connectHeaders);
	if (this.connectHeaders.getAcceptVersion() == null) {
		accessor.setAcceptVersion("1.1,1.2");
	}
	Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
	execute(message);
}
 
private void resetConnection() {
	TcpConnection<?> conn = this.connection;
	this.connection = null;
	if (conn != null) {
		try {
			conn.close();
		}
		catch (Throwable ex) {
			// ignore
		}
	}
}
 
@Override
public void run() {
	TcpConnection<byte[]> conn = connection;
	if (conn != null) {
		conn.send(HEARTBEAT).addCallback(
				new ListenableFutureCallback<Void>() {
					public void onSuccess(@Nullable Void result) {
					}
					public void onFailure(Throwable ex) {
						handleFailure(ex);
					}
				});
	}
}
 
@Test
public void readInactivityAfterDelayHasElapsed() throws Exception {
	TcpConnection<byte[]> tcpConnection = getTcpConnection();
	Runnable runnable = mock(Runnable.class);
	long delay = 2;
	tcpConnection.onReadInactivity(runnable, delay);
	testInactivityTaskScheduling(runnable, delay, 10);
}
 
@Test
public void readInactivityBeforeDelayHasElapsed() throws Exception {
	TcpConnection<byte[]> tcpConnection = getTcpConnection();
	Runnable runnable = mock(Runnable.class);
	long delay = 10000;
	tcpConnection.onReadInactivity(runnable, delay);
	testInactivityTaskScheduling(runnable, delay, 0);
}
 
@Test
public void writeInactivityAfterDelayHasElapsed() throws Exception {
	TcpConnection<byte[]> tcpConnection = getTcpConnection();
	Runnable runnable = mock(Runnable.class);
	long delay = 2;
	tcpConnection.onWriteInactivity(runnable, delay);
	testInactivityTaskScheduling(runnable, delay, 10);
}
 
@Test
public void writeInactivityBeforeDelayHasElapsed() throws Exception {
	TcpConnection<byte[]> tcpConnection = getTcpConnection();
	Runnable runnable = mock(Runnable.class);
	long delay = 1000;
	tcpConnection.onWriteInactivity(runnable, delay);
	testInactivityTaskScheduling(runnable, delay, 0);
}
 
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
	if (logger.isDebugEnabled()) {
		logger.debug("TCP connection opened in session=" + getSessionId());
	}
	this.tcpConnection = connection;
	connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
}
 
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
	this.connection = connection;
	if (logger.isDebugEnabled()) {
		logger.debug("Connection established in session id=" + this.sessionId);
	}
	StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.CONNECT);
	accessor.addNativeHeaders(this.connectHeaders);
	accessor.setAcceptVersion("1.1,1.2");
	Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
	execute(message);
}
 
private void resetConnection() {
	TcpConnection<?> conn = this.connection;
	this.connection = null;
	if (conn != null) {
		try {
			conn.close();
		}
		catch (Throwable ex) {
			// Ignore
		}
	}
}
 
 类方法
 同包方法