下面列出了javax.naming.SizeLimitExceededException#org.zeromq.ZMQException 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Binds the ROUTER socket to {@code tcp://bindHost:bindPort} and starts a
 * background thread that answers incoming frames.
 *
 * <p>For every received frame the thread either forwards it with
 * {@code sendMore} (when the frame equals the bytes of {@code "testClient"})
 * or wraps it in a {@link Message} and sends the message bytes back.
 */
public void start() {
    routerSocket = context.createSocket(ZMQ.ROUTER);
    routerSocket.bind(String.format("tcp://%s:%d", bindHost, bindPort));
    // Loop-invariant: encode the marker frame once instead of on every receive.
    final byte[] testClientFrame = "testClient".getBytes();
    serverThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                while (true) {
                    byte[] packet = routerSocket.recv();
                    if (packet == null) {
                        // recv() signals a failed receive by returning null; there is
                        // nothing to echo, and passing null on to Message would only
                        // fail later with a less obvious error. Stop serving instead.
                        break;
                    }
                    if (Arrays.equals(packet, testClientFrame)) {
                        // Forward the marker frame as the first part of the reply;
                        // the following frame completes the multipart message.
                        routerSocket.sendMore(packet);
                        continue;
                    }
                    Message message = new Message(packet);
                    routerSocket.send(message.getBytes(), 0);
                }
            } catch (ZMQException ex) {
                // ignore ZMQException, it may be interrupted.
            } catch (IOException ex) {
                // Pass the exception itself so the stack trace is not lost.
                logger.error(ex.getMessage(), ex);
            }
        }
    });
    serverThread.start();
}
/**
 * Compresses {@code request}, sends it through {@code requester} and waits
 * for the reply.
 *
 * <p>The send/receive exchange runs while holding {@code sendRequestMonitor}.
 * If a receive comes back empty with {@code EAGAIN}, the receive is retried up
 * to {@code timeout / INTERNAL_TIMEOUT} times; if all retries come back empty
 * the request is re-sent once and one more receive is attempted.
 *
 * @param request the request text to compress and send
 * @return the decompressed response converted back to bytes (platform default
 *         charset, via {@code String.getBytes()}), or {@code null} when the
 *         {@code connectionTimeout} flag is set, the send fails, or no
 *         response arrives
 * @throws SizeLimitExceededException if the compressed request is
 *         {@code MAX_REQUEST_SIZE} bytes or larger
 * @throws ZMQException if sending fails with any error other than
 *         {@code ZError.EFSM}
 */
private byte[] getServerRPCResponse(String request) throws SizeLimitExceededException {
    byte[] finalRequest = this.dataCompressor.compressStringToBytes(request);
    if (finalRequest.length >= MAX_REQUEST_SIZE) {
        throw new SizeLimitExceededException(MessageFormat.format("Size of request is too large (limit is {0} bytes)", MAX_REQUEST_SIZE));
    }
    byte[] serverResponse;
    synchronized (sendRequestMonitor) {
        if (connectionTimeout.get()) {
            return null;
        }
        boolean success;
        try {
            success = requester.send(finalRequest);
        } catch (ZMQException e) {
            if (e.getErrorCode() != ZError.EFSM) {
                throw e;
            }
            // EFSM: the socket rejected the send in its current state
            // (presumably a REQ socket still waiting for a reply — confirm);
            // resend() is expected to recover and retransmit.
            success = resend(finalRequest);
        }
        if (!success) {
            LOG.error("Error sending request");
            return null;
        }
        serverResponse = requester.recv(0);
        if (serverResponse == null) {
            if (requester.base().errno() == ZError.EAGAIN) {
                int retries = timeout / INTERNAL_TIMEOUT;
                while (serverResponse == null && retries > 0) {
                    if (connectionTimeout.get()) {
                        return null;
                    }
                    retries--;
                    serverResponse = requester.recv(0);
                }
                // Re-send only when the retries were exhausted WITHOUT a reply.
                // Checking retries alone would discard a response that arrived
                // on the very last retry and replace it with a second exchange.
                if (serverResponse == null && retries == 0 && resend(finalRequest)) {
                    serverResponse = requester.recv(0);
                }
            } else {
                // The send succeeded; it is the receive that failed here.
                LOG.error("Error receiving response");
            }
        }
    }
    return serverResponse == null
            ? null
            : dataCompressor.decompressBytesToString(serverResponse).getBytes();
}
/**
 * Creates the ZeroMQ context and the Heartbeat, Shell, Control, IOPub and
 * Stdin sockets from the {@code ip}, {@code transport} and {@code *_port}
 * entries of {@code connectionData}, then registers the four receiving
 * sockets with a poller.
 *
 * @return {@code true} when all sockets were set up and registered;
 *         {@code false} when a {@link ZMQException} occurred, in which case
 *         {@code closeSockets()} has been called and no poller is created
 */
boolean createSockets() {
    ctx = new ZContext();
    String ip = (String) connectionData.get("ip");
    String transport = (String) connectionData.get("transport");
    final String addressFormat = "%s://%s:%s";
    try {
        // http://jupyter-client.readthedocs.org/en/latest/messaging.html#heartbeat-for-kernels
        //
        // Clients send ping messages on a REQ socket, which are echoed right back from the
        // kernel's REP socket. These are simple bytestrings, not full JSON messages.
        Heartbeat = ctx.createSocket(ZMQ.REP);
        Heartbeat.bind(String.format(addressFormat,
                transport, ip, connectionData.get("hb_port")));

        // http://jupyter-client.readthedocs.org/en/latest/messaging.html#introduction
        // Shell: this single ROUTER socket allows multiple incoming connections from frontends,
        // and this is the socket where requests for code execution, object information,
        // prompts, etc. are made to the kernel by any frontend.
        String shellAddress = String.format(addressFormat,
                transport, ip, connectionData.get("shell_port"));
        Shell = ctx.createSocket(ZMQ.ROUTER);
        Shell.bind(shellAddress);

        // Control: this channel is identical to Shell, but operates on a separate socket, to
        // allow important messages to avoid queueing behind execution requests (e.g. shutdown
        // or abort).
        Control = ctx.createSocket(ZMQ.ROUTER);
        Control.bind(String.format(addressFormat,
                transport, ip, connectionData.get("control_port")));

        // IOPub: this socket is the 'broadcast channel' where the kernel publishes all side
        // effects (stdout, stderr, etc.) as well as the requests coming from any client over
        // the shell socket and its own requests on the stdin socket.
        IOPub = ctx.createSocket(ZMQ.PUB);
        IOPub.bind(String.format(addressFormat,
                transport, ip, connectionData.get("iopub_port")));

        // NOTE(review): every other socket above binds, but Stdin connects. The Jupyter
        // messaging docs describe the kernel side as the ROUTER end of stdin — confirm
        // whether connect() is intentional here before changing it.
        Stdin = ctx.createSocket(ZMQ.ROUTER);
        Stdin.connect(String.format(addressFormat,
                transport, ip, connectionData.get("stdin_port")));
    } catch (ZMQException e) {
        Logger.getLogger(Session.class.getName())
                .log(Level.SEVERE, "Failed to create kernel sockets", e);
        closeSockets();
        // Do not fall through: registering closed (or never-assigned) sockets with a
        // poller and reporting success would hide the failure from the caller.
        return false;
    }
    // IOPub is a PUB socket and is only written to, so it is not polled for input.
    sockets = new ZMQ.Poller(4);
    sockets.register(Control, ZMQ.Poller.POLLIN);
    sockets.register(Heartbeat, ZMQ.Poller.POLLIN);
    sockets.register(Shell, ZMQ.Poller.POLLIN);
    sockets.register(Stdin, ZMQ.Poller.POLLIN);
    return true;
}