下面列出了 org.eclipse.lsp4j.jsonrpc.MessageConsumer 类的 API 用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Wires a newly accepted socket channel to a fresh {@link GLSPServer} instance.
 * <p>
 * A dedicated injector is created for the connection, the channel is exposed as
 * blocking streams, and a JSON-RPC launcher is started on them. The remote
 * address of the client is logged afterwards; a failure to resolve it is only
 * logged and does not abort the connection.
 *
 * @param socketChannel the channel of the client that has just connected
 */
private void createClientConnection(AsynchronousSocketChannel socketChannel) {
    final Injector clientInjector = Guice.createInjector(getGLSPModule());
    final GsonConfigurator gsonConfigurator = clientInjector.getInstance(GsonConfigurator.class);

    final InputStream fromClient = Channels.newInputStream(socketChannel);
    final OutputStream toClient = Channels.newOutputStream(socketChannel);

    final Consumer<GsonBuilder> gsonSetup = builder -> gsonConfigurator.configureGsonBuilder(builder);
    // Messages are passed through unchanged.
    final Function<MessageConsumer, MessageConsumer> passThrough = Function.identity();

    final GLSPServer server = clientInjector.getInstance(GLSPServer.class);
    final Launcher<GLSPClient> clientLauncher = Launcher.createIoLauncher(server, GLSPClient.class, fromClient,
            toClient, threadPool, passThrough, gsonSetup);
    server.connect(clientLauncher.getRemoteProxy());
    clientLauncher.startListening();

    try {
        final SocketAddress clientAddress = socketChannel.getRemoteAddress();
        log.info("Started language server for client " + clientAddress);
    } catch (IOException ex) {
        log.error("Failed to get the remoteAddress for the new client connection: " + ex.getMessage(), ex);
    }
}
/**
 * Launches {@link XMLLanguageServer} using an asynchronous server-socket channel and makes it accessible through
 * the JSON RPC protocol defined by the LSP.
 * <p>
 * The method binds to all interfaces ({@code 0.0.0.0}) and then accepts clients in an endless loop; for each
 * client a new {@link XMLLanguageServer} is created and started on the client's streams. The method therefore
 * only returns by throwing.
 *
 * @param args standard launch arguments. may contain <code>--port</code> argument to change the default port 5008
 * @throws Exception if the server socket cannot be opened or bound, or if accepting a client fails
 */
public void launch(String[] args) throws Exception {
    // try-with-resources releases the listening socket when opening, binding or accepting fails;
    // previously the channel stayed open (and the port stayed bound) after such a failure.
    try (AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open()) {
        serverSocket.bind(new InetSocketAddress("0.0.0.0", getPort(args)));
        while (true) {
            // Blocks until the next client connects.
            final AsynchronousSocketChannel socketChannel = serverSocket.accept().get();
            final InputStream in = Channels.newInputStream(socketChannel);
            final OutputStream out = Channels.newOutputStream(socketChannel);
            // NOTE(review): one thread pool per connection, never shut down explicitly - confirm this is intended.
            final ExecutorService executorService = Executors.newCachedThreadPool();
            final XMLLanguageServer languageServer = new XMLLanguageServer();
            final Launcher<LanguageClient> launcher = Launcher.createIoLauncher(languageServer,
                    LanguageClient.class, in, out, executorService, consumer -> consumer);
            languageServer.setClient(launcher.getRemoteProxy());
            launcher.startListening();
        }
    }
}
/**
 * Creates the endpoint and additionally obtains a reference to the private
 * {@code receivedRequestMap} field of {@link RemoteEndpoint} via reflection.
 *
 * @param out
 *            a consumer that transmits messages to the remote service
 * @param localEndpoint
 *            the local service implementation
 * @param exceptionHandler
 *            an exception handler that should never return null.
 * @throws RuntimeException
 *             wrapping the reflective failure if the field cannot be read
 */
@SuppressWarnings("unchecked")
public PatchedRemoteEndpoint(MessageConsumer out, Endpoint localEndpoint,
        Function<Throwable, ResponseError> exceptionHandler) {
    super(out, localEndpoint, exceptionHandler);
    this.out = out;
    this.localEndpoint = localEndpoint;
    this.exceptionHandler = exceptionHandler;
    try {
        // The map is private in the superclass, so it is read reflectively.
        Field requestMapField = RemoteEndpoint.class.getDeclaredField("receivedRequestMap");
        requestMapField.setAccessible(true);
        receivedRequestMap = (Map<String, CompletableFuture<?>>) requestMapField.get(this);
    } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Sends a notification through a consumer that always fails with a
 * "Permission denied: connect" socket error and waits for the corresponding
 * WARNING log entry of {@link RemoteEndpoint}.
 */
@Test
public void testExceptionInOutputStream() throws Exception {
    LogMessageAccumulator logMessages = new LogMessageAccumulator();
    try {
        logMessages.registerTo(RemoteEndpoint.class);

        TestEndpoint localEndpoint = new TestEndpoint();
        // Every outgoing message fails as if the connection had been refused.
        MessageConsumer failingConsumer = message -> {
            throw new JsonRpcException(new SocketException("Permission denied: connect"));
        };
        RemoteEndpoint remote = new RemoteEndpoint(failingConsumer, localEndpoint);
        remote.notify("foo", null);

        logMessages.await(Level.WARNING, "Failed to send notification message.");
    } finally {
        logMessages.unregister();
    }
}
/**
 * Sends a notification through a consumer that always fails with a
 * "Socket closed" error and waits for the corresponding INFO log entry of
 * {@link RemoteEndpoint}.
 */
@Test
public void testOutputStreamClosed() throws Exception {
    LogMessageAccumulator logMessages = new LogMessageAccumulator();
    try {
        logMessages.registerTo(RemoteEndpoint.class);

        TestEndpoint localEndpoint = new TestEndpoint();
        // Every outgoing message fails as if the socket had already been closed.
        MessageConsumer closedConsumer = message -> {
            throw new JsonRpcException(new SocketException("Socket closed"));
        };
        RemoteEndpoint remote = new RemoteEndpoint(closedConsumer, localEndpoint);
        remote.notify("foo", null);

        logMessages.await(Level.INFO, "Failed to send notification message.");
    } finally {
        logMessages.unregister();
    }
}
/**
 * Wraps the given consumer so that, while {@code isRunning} reports
 * {@code true}, each message is first passed to {@code handleMessage} and then
 * forwarded. Messages arriving while it reports {@code false} are dropped.
 *
 * @param messageConsumer the consumer to forward messages to
 * @return the wrapping consumer
 */
@Override
public MessageConsumer apply(MessageConsumer messageConsumer) {
    return message -> {
        if (!isRunning.getAsBoolean()) {
            return;
        }
        handleMessage(message);
        messageConsumer.consume(message);
    };
}
/**
 * Wraps the given consumer so that the time of the latest message is recorded
 * in {@code lastActivityTime} before the message is forwarded.
 *
 * @param consumer the consumer to forward messages to
 * @return the wrapping consumer
 */
@Override
public MessageConsumer apply(final MessageConsumer consumer) {
    // Every message passing through refreshes the activity timestamp first.
    final MessageConsumer timestamping = message -> {
        lastActivityTime = System.currentTimeMillis();
        consumer.consume(message);
    };
    return timestamping;
}
/**
 * Launches {@link XMLLanguageServer} on the given streams and makes it
 * accessible through the JSON RPC protocol defined by the LSP.
 * <p>
 * Unless the system property {@code watchParentProcess} is set to
 * {@code "false"}, messages are routed through a {@link ParentProcessWatcher}.
 *
 * @param in  the stream incoming messages are read from
 * @param out the stream outgoing messages are written to
 * @return the future returned by
 *         {@link org.eclipse.lsp4j.jsonrpc.Launcher#startListening()}
 */
public static Future<?> launch(InputStream in, OutputStream out) {
    XMLLanguageServer server = new XMLLanguageServer();

    boolean watcherDisabled = "false".equals(System.getProperty("watchParentProcess"));
    Function<MessageConsumer, MessageConsumer> wrapper;
    if (!watcherDisabled) {
        wrapper = new ParentProcessWatcher(server);
    } else {
        // Pass messages through untouched.
        wrapper = consumer -> consumer;
    }

    Launcher<LanguageClient> launcher = createServerLauncher(server, in, out, Executors.newCachedThreadPool(),
            wrapper);
    server.setClient(launcher.getRemoteProxy());
    return launcher.startListening();
}
/**
 * Builds a {@link PatchedRemoteEndpoint} that writes to {@code output} and
 * registers it as the method provider of the given JSON handler.
 *
 * @param jsonHandler the handler used for the outgoing stream
 * @return the newly created endpoint
 */
@Override
protected RemoteEndpoint createRemoteEndpoint(MessageJsonHandler jsonHandler) {
    MessageConsumer rawOutgoing = new StreamMessageConsumer(output, jsonHandler);
    MessageConsumer wrappedOutgoing = wrapMessageConsumer(rawOutgoing);
    Endpoint localEndpoint = ServiceEndpoints.toEndpoint(localServices);

    // Only hand over an exception handler when one has been configured.
    RemoteEndpoint remoteEndpoint = (exceptionHandler == null)
            ? new PatchedRemoteEndpoint(wrappedOutgoing, localEndpoint)
            : new PatchedRemoteEndpoint(wrappedOutgoing, localEndpoint, exceptionHandler);

    jsonHandler.setMethodProvider(remoteEndpoint);
    return remoteEndpoint;
}
/**
 * Binds a server socket to the given address, waits for a single client to
 * connect and creates a launcher on that client's streams.
 * <p>
 * The listening socket is closed once the client has been accepted (or the
 * attempt has failed); the accepted client channel stays open.
 *
 * @param localService    the object on which RPC methods are looked up
 * @param remoteInterface the interface of the remote side
 * @param socketAddress   the address to listen on
 * @param executorService the executor service handed to the launcher
 * @param wrapper         a function for plugging in additional message consumers
 * @return the launcher for the accepted connection, never {@code null}
 * @throws IOException if the socket cannot be opened or bound, if accepting
 *                     the client fails, or if the waiting thread is interrupted
 */
static <T> Launcher<T> createSocketLauncher(Object localService, Class<T> remoteInterface, SocketAddress socketAddress, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) throws IOException {
    // try-with-resources: the listening socket was previously never closed.
    try (AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open()) {
        serverSocket.bind(socketAddress);
        AsynchronousSocketChannel socketChannel = serverSocket.accept().get();
        return Launcher.createIoLauncher(localService, remoteInterface, Channels.newInputStream(socketChannel),
                Channels.newOutputStream(socketChannel), executorService, wrapper);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can react to it.
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while waiting for a client connection on " + socketAddress, e);
    } catch (ExecutionException e) {
        // Report the failure instead of printing the stack trace and returning null.
        throw new IOException("Failed to accept a client connection on " + socketAddress, e);
    }
}
/**
 * Creates a launcher directly on the supplied streams.
 * <p>
 * The {@code socketAddress} parameter is not used by this overload.
 *
 * @param localService    the object on which RPC methods are looked up
 * @param remoteInterface the interface of the remote side
 * @param socketAddress   ignored
 * @param executorService the executor service handed to the launcher
 * @param wrapper         a function for plugging in additional message consumers
 * @param inputStream     the stream incoming messages are read from
 * @param outputStream    the stream outgoing messages are written to
 * @return the launcher created by {@code Launcher.createIoLauncher}
 */
static <T> Launcher<T> createSocketLauncher(Object localService, Class<T> remoteInterface,
        SocketAddress socketAddress, ExecutorService executorService,
        Function<MessageConsumer, MessageConsumer> wrapper, InputStream inputStream, OutputStream outputStream)
        throws IOException {
    Launcher<T> streamLauncher = Launcher.createIoLauncher(localService, remoteInterface, inputStream,
            outputStream, executorService, wrapper);
    return streamLauncher;
}
/**
 * Decorates the given consumer: {@code lastActivityTime} is set to the current
 * time for each message, after which the message is passed on unchanged.
 *
 * @param consumer the consumer that receives the messages afterwards
 * @return the decorating consumer
 */
@Override
public MessageConsumer apply(final MessageConsumer consumer) {
    // Our own consumer sits in front of the real one to refresh the timestamp.
    final MessageConsumer activityTracker = message -> {
        lastActivityTime = System.currentTimeMillis();
        consumer.consume(message);
    };
    return activityTracker;
}
/**
 * Builds a {@link RemoteEndpoint} that sends its messages through the web
 * socket {@code session} and registers it as the method provider of the given
 * JSON handler.
 *
 * @param jsonHandler the handler used for the outgoing messages
 * @return the newly created endpoint
 */
@Override
protected RemoteEndpoint createRemoteEndpoint(MessageJsonHandler jsonHandler) {
    MessageConsumer socketConsumer = new WebSocketMessageConsumer(session, jsonHandler);
    MessageConsumer wrappedConsumer = wrapMessageConsumer(socketConsumer);
    Endpoint localEndpoint = ServiceEndpoints.toEndpoint(localServices);

    RemoteEndpoint remoteEndpoint;
    if (exceptionHandler != null) {
        remoteEndpoint = new RemoteEndpoint(wrappedConsumer, localEndpoint, exceptionHandler);
    } else {
        remoteEndpoint = new RemoteEndpoint(wrappedConsumer, localEndpoint);
    }

    jsonHandler.setMethodProvider(remoteEndpoint);
    return remoteEndpoint;
}
/**
 * Creates a Launcher that connects a language server to a client over the given streams. The launcher is
 * assembled with a {@code Builder} configured with the server as local service, {@link LanguageClient} as
 * remote interface, the two streams, the executor service and the message wrapper.
 *
 * @param server - the server that receives method calls from the remote client
 * @param in - input stream to listen for incoming messages
 * @param out - output stream to send outgoing messages
 * @param executorService - the executor service used to start threads
 * @param wrapper - a function for plugging in additional message consumers
 * @return the launcher produced by the builder
 */
public static Launcher<LanguageClient> createServerLauncher(LanguageServer server, InputStream in, OutputStream out,
        ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) {
    Launcher<LanguageClient> serverLauncher = new Builder<LanguageClient>()
            .setLocalService(server)
            .setRemoteInterface(LanguageClient.class)
            .setInput(in)
            .setOutput(out)
            .setExecutorService(executorService)
            .wrapMessages(wrapper)
            .create();
    return serverLauncher;
}
/**
 * Creates a Launcher that connects a language client to a server over the given streams. The launcher is
 * assembled with a {@code Builder} configured with the client as local service, {@link LanguageServer} as
 * remote interface, the two streams, the executor service and the message wrapper.
 *
 * @param client - the client that receives method calls from the remote server
 * @param in - input stream to listen for incoming messages
 * @param out - output stream to send outgoing messages
 * @param executorService - the executor service used to start threads
 * @param wrapper - a function for plugging in additional message consumers
 * @return the launcher produced by the builder
 */
public static Launcher<LanguageServer> createClientLauncher(LanguageClient client, InputStream in, OutputStream out,
        ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) {
    Launcher<LanguageServer> clientLauncher = new Builder<LanguageServer>()
            .setLocalService(client)
            .setRemoteInterface(LanguageServer.class)
            .setInput(in)
            .setOutput(out)
            .setExecutorService(executorService)
            .wrapMessages(wrapper)
            .create();
    return clientLauncher;
}
/**
 * Sends a {@code shutdown} request twice through a {@link RemoteEndpoint} and
 * checks the serialized response: first with the initial value of
 * {@code shutdownReturn}, then after {@code shutdownReturn} has been set to a
 * plain object.
 */
@Test
public void testNullResponse() throws InterruptedException, ExecutionException {
    LogMessageAccumulator logMessages = new LogMessageAccumulator();
    try {
        logMessages.registerTo(GenericEndpoint.class);

        Endpoint endpoint = ServiceEndpoints.toEndpoint(this);
        Map<String, JsonRpcMethod> methods = ServiceEndpoints.getSupportedMethods(LanguageServer.class);
        MessageJsonHandler handler = new MessageJsonHandler(methods);

        // Collect everything the endpoint sends out.
        List<Message> msgs = new ArrayList<>();
        MessageConsumer consumer = msgs::add;
        RemoteEndpoint re = new RemoteEndpoint(consumer, endpoint);

        RequestMessage request = new RequestMessage();
        request.setId("1");
        request.setMethod("shutdown");

        re.consume(request);
        String firstResponse = handler.serialize(msgs.get(0));
        Assert.assertEquals("{\"jsonrpc\":\"2.0\",\"id\":\"1\",\"result\":null}", firstResponse);

        msgs.clear();
        shutdownReturn = new Object();
        re.consume(request);
        String secondResponse = handler.serialize(msgs.get(0));
        Assert.assertEquals("{\"jsonrpc\":\"2.0\",\"id\":\"1\",\"result\":{}}", secondResponse);
    } finally {
        logMessages.unregister();
    }
}
/**
 * Builds a {@link DebugRemoteEndpoint} that writes to {@code output} and
 * registers it as the method provider of the given JSON handler.
 *
 * @param jsonHandler the handler used for the outgoing stream
 * @return the newly created endpoint
 */
@Override
protected RemoteEndpoint createRemoteEndpoint(MessageJsonHandler jsonHandler) {
    MessageConsumer streamConsumer = new StreamMessageConsumer(output, jsonHandler);
    MessageConsumer outgoing = wrapMessageConsumer(streamConsumer);
    Endpoint localEndpoint = ServiceEndpoints.toEndpoint(localServices);

    // The three-argument constructor is only used when a handler is configured.
    RemoteEndpoint remoteEndpoint = (exceptionHandler != null)
            ? new DebugRemoteEndpoint(outgoing, localEndpoint, exceptionHandler)
            : new DebugRemoteEndpoint(outgoing, localEndpoint);

    jsonHandler.setMethodProvider(remoteEndpoint);
    return remoteEndpoint;
}
/**
 * Returns a {@code Builder} whose message processor is a
 * {@link CustomConcurrentMessageProcessor} bound to the given store.
 *
 * @param store the context store handed to every processor the builder creates
 * @return the customized builder
 */
static <T> Builder<T> createBuilder(MessageContextStore<T> store) {
    Builder<T> customBuilder = new Builder<T>() {
        @Override
        protected ConcurrentMessageProcessor createMessageProcessor(MessageProducer reader,
                MessageConsumer messageConsumer, T remoteProxy) {
            ConcurrentMessageProcessor processor =
                    new CustomConcurrentMessageProcessor<T>(reader, messageConsumer, remoteProxy, store);
            return processor;
        }
    };
    return customBuilder;
}
/**
 * Stores the collaborators of this handler.
 *
 * @param callback     the consumer kept in {@code callback}
 * @param jsonHandler  the JSON handler kept in {@code jsonHandler}
 * @param issueHandler the issue handler kept in {@code issueHandler}
 */
public WebSocketMessageHandler(
        MessageConsumer callback,
        MessageJsonHandler jsonHandler,
        MessageIssueHandler issueHandler) {
    this.issueHandler = issueHandler;
    this.jsonHandler = jsonHandler;
    this.callback = callback;
}
/**
 * Registers a {@link WebSocketMessageHandler} on the session. The handler
 * receives the wrapped remote endpoint as its message consumer and the
 * unwrapped remote endpoint as its issue handler.
 *
 * @param jsonHandler    the JSON handler passed to the message handler
 * @param remoteEndpoint the endpoint that consumes incoming messages
 */
protected void addMessageHandlers(MessageJsonHandler jsonHandler, RemoteEndpoint remoteEndpoint) {
    MessageConsumer incoming = wrapMessageConsumer(remoteEndpoint);
    WebSocketMessageHandler handler = new WebSocketMessageHandler(incoming, jsonHandler, remoteEndpoint);
    session.addMessageHandler(handler);
}
/**
 * Creates the endpoint by delegating to the two-argument superclass constructor.
 *
 * @param out           the consumer passed to the superclass
 * @param localEndpoint the local endpoint passed to the superclass
 */
public DebugRemoteEndpoint(
        MessageConsumer out,
        Endpoint localEndpoint) {
    super(out, localEndpoint);
}
/**
 * Creates the endpoint by delegating to the three-argument superclass constructor.
 *
 * @param out              the consumer passed to the superclass
 * @param localEndpoint    the local endpoint passed to the superclass
 * @param exceptionHandler the exception handler passed to the superclass
 */
public DebugRemoteEndpoint(
        MessageConsumer out,
        Endpoint localEndpoint,
        Function<Throwable, ResponseError> exceptionHandler) {
    super(out, localEndpoint, exceptionHandler);
}
/**
 * Creates a validator that forwards messages to the given consumer unless an
 * issue is found.
 *
 * @param delegate the consumer stored as forwarding target
 */
public ReflectiveMessageValidator(
        MessageConsumer delegate) {
    this.delegate = delegate;
}
/**
 * Stores the producer and the consumer this processor works with.
 *
 * @param messageProducer the producer kept in {@code messageProducer}
 * @param messageConsumer the consumer kept in {@code messageConsumer}
 */
public ConcurrentMessageProcessor(
        MessageProducer messageProducer,
        MessageConsumer messageConsumer) {
    this.messageConsumer = messageConsumer;
    this.messageProducer = messageProducer;
}
/**
 * Reads the input stream one byte at a time, parses header lines and hands over to
 * {@code handleMessage} once an empty line (two consecutive newlines) has been seen.
 * The loop runs until the end of the stream is reached, {@code handleMessage} returns
 * {@code false}, or {@code keepRunning} is otherwise cleared.
 *
 * @param callback the consumer stored in {@code this.callback} for the duration of the call
 * @throws IllegalStateException if {@code keepRunning} is already set when the method is entered
 * @throws JsonRpcException if reading fails with an {@link IOException} that does not indicate a closed stream
 */
@Override
public void listen(MessageConsumer callback) {
// Guard against a second listen() call while one is still in progress.
if (keepRunning) {
throw new IllegalStateException("This StreamMessageProducer is already running.");
}
this.keepRunning = true;
this.callback = callback;
try {
// headerBuilder: characters of the header line currently being read (null between lines).
StringBuilder headerBuilder = null;
// debugBuilder: all characters read since the last message boundary; only used in the error message below.
StringBuilder debugBuilder = null;
// newLine: true when the previously processed character (ignoring '\r') was '\n'.
boolean newLine = false;
Headers headers = new Headers();
while (keepRunning) {
int c = input.read();
if (c == -1) {
// End of input stream has been reached
keepRunning = false;
} else {
if (debugBuilder == null)
debugBuilder = new StringBuilder();
debugBuilder.append((char) c);
if (c == '\n') {
if (newLine) {
// Two consecutive newlines have been read, which signals the start of the message content
if (headers.contentLength < 0) {
// No usable content length was parsed: report the problem and start over with fresh headers.
fireError(new IllegalStateException("Missing header " + CONTENT_LENGTH_HEADER
+ " in input \"" + debugBuilder + "\""));
} else {
// A false result from handleMessage ends the read loop.
boolean result = handleMessage(input, headers);
if (!result)
keepRunning = false;
// NOTE(review): this value is overwritten by the unconditional "newLine = true" further down.
newLine = false;
}
// Reset the per-message state regardless of the outcome.
headers = new Headers();
debugBuilder = null;
} else if (headerBuilder != null) {
// A single newline ends a header line
parseHeader(headerBuilder.toString(), headers);
headerBuilder = null;
}
newLine = true;
} else if (c != '\r') {
// Add the input to the current header line
if (headerBuilder == null)
headerBuilder = new StringBuilder();
headerBuilder.append((char) c);
newLine = false;
}
// A '\r' character is neither added to the header line nor does it change newLine.
}
} // while (keepRunning)
} catch (IOException exception) {
if (JsonRpcException.indicatesStreamClosed(exception)) {
// Only log the error if we had intended to keep running
if (keepRunning)
fireStreamClosed(exception);
} else
throw new JsonRpcException(exception);
} finally {
// Always return to the idle state so that listen() may be called again.
this.callback = null;
this.keepRunning = false;
}
}
/**
 * Creates the processor, passing producer and consumer on to the superclass
 * and keeping the remote proxy and the context store.
 *
 * @param reader          the producer passed to the superclass
 * @param messageConsumer the consumer passed to the superclass
 * @param remoteProxy     the proxy kept in {@code remoteProxy}
 * @param threadMap       the store kept in {@code threadMap}
 */
public CustomConcurrentMessageProcessor(
        MessageProducer reader,
        MessageConsumer messageConsumer,
        T remoteProxy,
        MessageContextStore<T> threadMap) {
    super(reader, messageConsumer);
    this.threadMap = threadMap;
    this.remoteProxy = remoteProxy;
}
/**
 * Creates a Launcher that connects a language server to a client over the
 * given streams. The launcher is assembled with a {@code Builder} configured
 * with the server as local service, {@link XMLLanguageClientAPI} as remote
 * interface, the two streams, the executor service and the message wrapper.
 *
 * @param server          - the server that receives method calls from the
 *                        remote client
 * @param in              - input stream to listen for incoming messages
 * @param out             - output stream to send outgoing messages
 * @param executorService - the executor service used to start threads
 * @param wrapper         - a function for plugging in additional message
 *                        consumers
 * @return the launcher produced by the builder
 */
private static Launcher<LanguageClient> createServerLauncher(LanguageServer server, InputStream in, OutputStream out,
        ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) {
    Launcher<LanguageClient> xmlLauncher = new Builder<LanguageClient>()
            .setLocalService(server)
            // The remote side is addressed through the XML language client API.
            .setRemoteInterface(XMLLanguageClientAPI.class)
            .setInput(in)
            .setOutput(out)
            .setExecutorService(executorService)
            .wrapMessages(wrapper)
            .create();
    return xmlLauncher;
}
/**
 * Creates a Launcher for a local service object and a remote interface that
 * communicate over the given streams. The launcher is assembled with a
 * {@code Builder} configured with the local service, the remote interface, the
 * two streams, the executor service and the message wrapper.
 *
 * @param localService
 *            - an object on which classes RPC methods are looked up
 * @param remoteInterface
 *            - an interface on which RPC methods are looked up
 * @param in
 *            - inputstream to listen for incoming messages
 * @param out
 *            - outputstream to send outgoing messages
 * @param executorService
 *            - the executor service used to start threads
 * @param wrapper
 *            - a function for plugging in additional message consumers
 * @return the launcher produced by the builder
 */
public static <T> Launcher<T> createIoLauncher(Object localService, Class<T> remoteInterface, InputStream in,
        OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) {
    Launcher<T> ioLauncher = new Builder<T>()
            .setLocalService(localService)
            .setRemoteInterface(remoteInterface)
            .setInput(in)
            .setOutput(out)
            .setExecutorService(executorService)
            .wrapMessages(wrapper)
            .create();
    return ioLauncher;
}
/**
 * Creates a Launcher for a local service object and a remote interface that
 * communicate over the given streams, with an additional hook for Gson
 * configuration. The launcher is assembled with a {@code Builder} configured
 * with the local service, the remote interface, the two streams, the executor
 * service, the message wrapper and the {@code configureGson} function. That
 * function can be used to register additional type adapters in the
 * {@link GsonBuilder} in order to support protocol classes that cannot be
 * handled by Gson's reflective capabilities.
 *
 * @param localService
 *            - an object on which classes RPC methods are looked up
 * @param remoteInterface
 *            - an interface on which RPC methods are looked up
 * @param in
 *            - inputstream to listen for incoming messages
 * @param out
 *            - outputstream to send outgoing messages
 * @param executorService
 *            - the executor service used to start threads
 * @param wrapper
 *            - a function for plugging in additional message consumers
 * @param configureGson
 *            - a function for Gson configuration
 * @return the launcher produced by the builder
 */
public static <T> Launcher<T> createIoLauncher(Object localService, Class<T> remoteInterface, InputStream in,
        OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper,
        Consumer<GsonBuilder> configureGson) {
    Launcher<T> gsonAwareLauncher = new Builder<T>()
            .setLocalService(localService)
            .setRemoteInterface(remoteInterface)
            .setInput(in)
            .setOutput(out)
            .setExecutorService(executorService)
            .wrapMessages(wrapper)
            .configureGson(configureGson)
            .create();
    return gsonAwareLauncher;
}
/**
 * Submits a new {@link ConcurrentMessageProcessor} for the given producer and consumer to the executor service
 * and returns the submitted task's future after passing it through {@code wrapFuture}.
 *
 * @param messageProducer - produces messages, e.g. by reading from an input channel
 * @param messageConsumer - processes messages and potentially forwards them to other consumers
 * @param executorService - the thread is started using this service
 * @return a future that is resolved when the started thread is terminated, e.g. by closing a stream
 * @deprecated Please use the non-static ConcurrentMessageProcessor.beginProcessing() instead.
 */
@Deprecated
public static Future<Void> startProcessing(MessageProducer messageProducer, MessageConsumer messageConsumer,
        ExecutorService executorService) {
    ConcurrentMessageProcessor processor = new ConcurrentMessageProcessor(messageProducer, messageConsumer);
    final Future<?> processingTask = executorService.submit(processor);
    return wrapFuture(processingTask, messageProducer);
}