下面列出了io.netty.channel.Channel#isWritable ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Helper method to create, write and flush the keepalive message.
*/
private void createAndWriteKeepAlive(final ChannelHandlerContext ctx) {
final CouchbaseRequest keepAlive = createKeepAliveRequest();
if (keepAlive != null) {
Subscriber<CouchbaseResponse> subscriber = new KeepAliveResponseAction(ctx);
keepAlive.subscriber(subscriber);
keepAlive
.observable()
.timeout(env().keepAliveTimeout(), TimeUnit.MILLISECONDS)
.subscribe(subscriber);
onKeepAliveFired(ctx, keepAlive);
Channel channel = ctx.channel();
if (channel.isActive() && channel.isWritable()) {
ctx.pipeline().writeAndFlush(keepAlive);
}
}
}
private static void scheduleToSendData() {
Runnable runnable = () ->{
ConsoleUtil.clearConsole(true);
Channel channel = modbusClient.getChannel();
if(channel==null||(!channel.isActive())||!channel.isOpen()||!channel.isWritable())
return;
ChannelSender sender = ChannelSenderFactory.getInstance().get(channel);
//short unitIdentifier=1;
//ChannelSender sender2 =new ChannelSender(channel, unitIdentifier);
int startAddress=0;
int quantityOfCoils=4;
try {
sender.readCoilsAsync(startAddress, quantityOfCoils);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
};
int sleep=1000;
ScheduledUtil.getInstance().scheduleAtFixedRate(runnable, sleep * 5);
}
private static void scheduleToSendData() {
Runnable runnable = () -> {
ConsoleUtil.clearConsole(true);
Collection<Channel> channels = modbusServer.getChannels();
for (Channel channel : channels) {
if (channel == null || (!channel.isActive()) || !channel.isOpen() || !channel.isWritable())
return;
ChannelSender sender = ChannelSenderFactory.getInstance().get(channel);
// short unitIdentifier=1;
// ChannelSender sender2 =new ChannelSender(channel, unitIdentifier);
int startAddress = 0;
int quantityOfCoils = 4;
try {
sender.readCoilsAsync(startAddress, quantityOfCoils);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
int sleep = 1000;
ScheduledUtil.getInstance().scheduleAtFixedRate(runnable, sleep * 5);
}
public static String channelInfoForLogging(Channel ch)
{
if (ch == null) {
return "null";
}
String channelInfo = ch.toString()
+ ", active=" + ch.isActive()
+ ", open=" + ch.isOpen()
+ ", registered=" + ch.isRegistered()
+ ", writable=" + ch.isWritable()
+ ", id=" + ch.id();
CurrentPassport passport = CurrentPassport.fromChannel(ch);
return "Channel: " + channelInfo + ", Passport: " + String.valueOf(passport);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
ChannelConfig config = ch.config();
// 高水位线: ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
// 低水位线: ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
if (!ch.isWritable()) {
// 当前channel的缓冲区(OutboundBuffer)大小超过了WRITE_BUFFER_HIGH_WATER_MARK
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, high water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferHighWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(false);
} else {
// 曾经高于高水位线的OutboundBuffer现在已经低于WRITE_BUFFER_LOW_WATER_MARK了
if (logger.isWarnEnabled()) {
logger.warn("{} is writable(rehabilitate), low water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferLowWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(true);
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
ChannelConfig config = ch.config();
// 高水位线: ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
// 低水位线: ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
if (!ch.isWritable()) {
// 当前channel的缓冲区(OutboundBuffer)大小超过了WRITE_BUFFER_HIGH_WATER_MARK
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, high water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferHighWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(false);
} else {
// 曾经高于高水位线的OutboundBuffer现在已经低于WRITE_BUFFER_LOW_WATER_MARK了
if (logger.isWarnEnabled()) {
logger.warn("{} is writable(rehabilitate), low water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferLowWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(true);
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
ChannelConfig config = ch.config();
// 高水位线: ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
// 低水位线: ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
if (!ch.isWritable()) {
// 当前channel的缓冲区(OutboundBuffer)大小超过了WRITE_BUFFER_HIGH_WATER_MARK
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, high water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferHighWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(false);
} else {
// 曾经高于高水位线的OutboundBuffer现在已经低于WRITE_BUFFER_LOW_WATER_MARK了
if (logger.isWarnEnabled()) {
logger.warn("{} is writable(rehabilitate), low water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferLowWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(true);
}
}
/**
* 实际发送消息方法
*
* @param pushMessage
* @param status
* @param messageInfo
* @param deviceId
* @return
*/
private MessagePushedInfo makeMessageInfoToDevice(ChannelGroup mchannels, MessageInfo messageInfo, DeviceInfo deviceInfo) {
// System.out.println("makeMessageInfoToDevice come in!");
// 获取设备消息发送对象
MessagePushedInfo messagePushedInfo = getMessagePushedInfo(messageInfo, deviceInfo);
if (messagePushedInfo != null) {
// 发送消息
if (deviceInfo != null && deviceInfo.getIsOnline() == DEVICE_ONLINE_YES) {
// 如果设备在线 则添加发送通道
ChannelDeviceInfo channelDeviceInfo = this.getChannelDeviceInfoFromCache(deviceInfo.getDeviceId());
// System.out.println("makeMessageInfoToDevice channelDeviceInfo=" + channelDeviceInfo);
Channel channel = channelDeviceInfo == null ? null : channelDeviceInfo.getChannel();
if (channel != null && channel.isWritable()) {
mchannels.add(channel);
} else {
return null;
}
}
}
return messagePushedInfo;
}
protected void sendRequest() {
if (!busy.compareAndSet(false, true)) {
return;
}
Channel channel = ctx.channel();
try {
boolean flush = false;
while (!requestQueue.isEmpty() && channel.isWritable()) {
long id = requestQueue.poll();
OutRequest<?> outRequest = requestMap.get(id);
if (outRequest != null) {
outRequest.setChannelFuture(channel.write(outRequest.getRequest()));
flush = true;
}
}
if (flush) {
channel.flush();
}
} finally {
busy.set(false);
}
// in case there are new request added
if (channel.isWritable() && !requestQueue.isEmpty()) {
sendRequest();
}
}
public Optional<Channel> requestNode() {
Channel channel = connectionQ.pollFirst();
if (channel != null && channel.isActive() && channel.isOpen() && channel.isWritable()) {
connectionQ.addLast(channel);
return Optional.of(channel);
}
connectionRebuild.set(true);
return Optional.empty();
}
/**
* Sends asynchronously and returns a {@link ChannelFuture}
*
* @param isoMessage A message to send
* @return ChannelFuture which will be notified when message is sent
*/
public ChannelFuture sendAsync(IsoMessage isoMessage) {
Channel channel = getChannel();
if (channel == null) {
throw new IllegalStateException("Channel is not opened");
}
if (!channel.isWritable()) {
throw new IllegalStateException("Channel is not writable");
}
return channel.writeAndFlush(isoMessage);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
if (ch.isWritable()) {
channel.onNext(ch);
}
super.handlerAdded(ctx);
}
protected void sendResponse(ChannelHandlerContext ctx) {
if (!busy.compareAndSet(false, true)) {
return;
}
Channel channel = ctx.channel();
try {
boolean flush = false;
while (channel.isWritable()) {
Response res = responseQueue.poll();
if (res == null) {
break;
}
ctx.write(res, channel.voidPromise());
flush = true;
}
if (flush) {
ctx.flush();
}
} finally {
busy.set(false);
}
// in case there are new response added
if (channel.isWritable() && !responseQueue.isEmpty()) {
sendResponse(ctx);
}
}
private void process(Channel channel, FrequencyWidget frequencyWidget,
Profile profile, DashBoard dashBoard, int deviceId, long now) {
if (channel.isWritable()
&& sameDeviceId(profile, dashBoard, frequencyWidget.getDeviceId(), deviceId)
&& frequencyWidget.isTicked(now)) {
frequencyWidget.writeReadingCommand(channel);
tickedWidgets++;
}
}
public void sendAppSyncs(DashBoard dash, Channel appChannel, int targetId, boolean useNewFormat) {
sendPinStorageSyncs(dash, appChannel, targetId, useNewFormat);
for (Widget widget : dash.widgets) {
if (widget instanceof MobileSyncWidget && appChannel.isWritable()) {
((MobileSyncWidget) widget).sendAppSync(appChannel, dash.id, targetId, useNewFormat);
}
}
}
private void sendPinStorageSyncs(DashBoard dash, Channel appChannel, int targetId, boolean useNewFormat) {
for (Map.Entry<DashPinStorageKey, PinStorageValue> entry : pinsStorage.entrySet()) {
DashPinStorageKey key = entry.getKey();
if ((targetId == ANY_TARGET || targetId == key.deviceId)
&& dash.id == key.dashId
&& appChannel.isWritable()) {
PinStorageValue pinStorageValue = entry.getValue();
pinStorageValue.sendAppSync(appChannel, dash.id, key, useNewFormat);
}
}
}
public DefaultUiConnection(Channel channel) {
super("ui", channel);
this.channel = channel;
this.writable = channel.isWritable();
}
private void blockUntilReady() throws IOException {
boolean waitRequired = false;
long startTime = System.nanoTime();
while (true) {
Channel ch = ctx.channel();
if (ch == null || !ch.isActive()) {
throw new ClosedChannelException();
}
if (ch.isWritable()) {
if (waitRequired) {
long waited = System.nanoTime() - startTime;
DOWNLOAD_SLOW_CLIENT_WAIT.update(waited, TimeUnit.NANOSECONDS);
log.trace("waited for {} ns", waited);
}
return;
}
long remaining = 5000000000L;
if (maxBlockTime > 0) {
long elapsed = System.nanoTime() - startTime;
remaining = maxBlockTime - elapsed;
if (remaining <= 0) {
throw new IOException("timeout");
}
}
synchronized (this) {
try {
waitRequired = true;
this.wait(remaining / 1000000L, (int)(remaining % 1000000L));
} catch (InterruptedException ex) {
throw new IOException("interrupted", ex);
}
}
}
}
public static boolean validateChannel(Channel channel) {
Preconditions.checkNotNull(channel, "channel can not be null");
return channel.isActive() && channel.isOpen() && channel.isWritable();
}
public static void messageReceived(Holder holder, ChannelHandlerContext ctx,
MobileStateHolder state, StringMessage message) {
User user = state.user;
String dashBoardIdString = message.body;
int dashId = Integer.parseInt(dashBoardIdString);
log.debug("Activating dash {} for user {}", dashBoardIdString, user.email);
DashBoard dash = user.profile.getDashByIdOrThrow(dashId);
dash.activate();
user.lastModifiedTs = dash.updatedAt;
SessionDao sessionDao = holder.sessionDao;
Session session = sessionDao.get(state.userKey);
if (session.isHardwareConnected(dashId)) {
for (Device device : dash.devices) {
String pmBody = dash.buildPMMessage(device.id);
if (pmBody == null) {
if (!session.isHardwareConnected(dashId, device.id)) {
log.debug("No device in session.");
if (ctx.channel().isWritable() && !dash.isNotificationsOff) {
ctx.write(deviceNotInNetwork(PIN_MODE_MSG_ID), ctx.voidPromise());
}
}
} else {
if (device.fitsBufferSize(pmBody.length())) {
if (session.sendMessageToHardware(dashId, HARDWARE, PIN_MODE_MSG_ID, pmBody, device.id)) {
log.debug("No device in session.");
if (ctx.channel().isWritable() && !dash.isNotificationsOff) {
ctx.write(deviceNotInNetwork(PIN_MODE_MSG_ID), ctx.voidPromise());
}
}
} else {
ctx.write(deviceNotInNetwork(message.id), ctx.voidPromise());
log.warn("PM message is to large for {}, size : {}", user.email, pmBody.length());
}
}
}
ctx.write(ok(message.id), ctx.voidPromise());
} else {
log.debug("No device in session.");
if (dash.isNotificationsOff) {
ctx.write(ok(message.id), ctx.voidPromise());
} else {
ctx.write(deviceNotInNetwork(message.id), ctx.voidPromise());
}
}
ctx.flush();
for (Channel appChannel : session.appChannels) {
//send activate for shared apps
MobileStateHolder mobileStateHolder = getAppState(appChannel);
if (appChannel != ctx.channel() && mobileStateHolder != null && appChannel.isWritable()) {
appChannel.write(makeUTF8StringMessage(message.command, message.id, message.body));
}
boolean isNewSyncFormat = mobileStateHolder != null && mobileStateHolder.isNewSyncFormat();
user.profile.sendAppSyncs(dash, appChannel, ANY_TARGET, isNewSyncFormat);
appChannel.flush();
}
}