下面列出了 org.apache.avro.ipc.Responder 类的 API 用法示例代码；也可以点击链接到 GitHub 查看源代码。
/**
 * Starts a Netty-backed Avro server bound to the loopback address.
 *
 * <p>The server answers {@code AvroSourceProtocol} calls through a new
 * {@code OKAvroHandler}. After the server is started the method pauses for one
 * second before returning. If that pause is interrupted, the interruption is
 * logged, the thread's interrupt flag is restored, and the server is still
 * returned.
 *
 * @param port TCP port to bind on 127.0.0.1
 * @return the server after {@code start()} has been called on it
 */
public static Server startTestFlumeServer(int port) {
    final Responder okResponder =
        new SpecificResponder(AvroSourceProtocol.class, new OKAvroHandler());
    final InetSocketAddress loopback = new InetSocketAddress("127.0.0.1", port);

    final Server testServer = new NettyServer(okResponder, loopback);
    testServer.start();
    LOG.info("Server started on test flume server hostname: localhost, port: " + port);

    // Pause before handing the server back to the caller.
    try {
        Thread.sleep(1000L);
    } catch (InterruptedException interrupted) {
        LOG.error("Thread interrupted. Exception follows.", interrupted);
        // Keep the interrupt visible to whoever called us.
        Thread.currentThread().interrupt();
    }
    return testServer;
}
/**
 * Attaches an {@code SWServerRPCPlugin} to a newly constructed responder.
 *
 * <p>The plugin is given a prefix of the form {@code <namespace>.<name>.},
 * built from the responder's local protocol.
 *
 * @param enhancedInstance the constructed object; it is cast to {@code Responder}
 * @param objects constructor arguments; not used here
 */
@Override
public void onConstruct(EnhancedInstance enhancedInstance, Object[] objects) {
    final Responder target = (Responder) enhancedInstance;
    final Protocol localProtocol = target.getLocal();

    // Same text as namespace + "." + name + "." (a null part renders as "null").
    final StringBuilder prefixBuilder = new StringBuilder();
    prefixBuilder.append(localProtocol.getNamespace()).append('.');
    prefixBuilder.append(localProtocol.getName()).append('.');

    target.addRPCPlugin(new SWServerRPCPlugin(prefixBuilder.toString()));
}
/**
 * Starts this Avro source.
 *
 * <p>Builds a {@code NettyServer} on {@code bindAddress:port} that dispatches
 * {@code AvroSourceProtocol} calls to this object, starts the server, the
 * source counter and the superclass, and then schedules a task that copies the
 * server's active-connection count into the source counter. The task runs
 * immediately and then with a 60-second delay between runs.
 */
@Override
public void start() {
    logger.info("Starting {}...", this);

    final Responder avroResponder = new SpecificResponder(AvroSourceProtocol.class, this);
    final NioServerSocketChannelFactory channelFactory = initSocketChannelFactory();
    final ChannelPipelineFactory channelPipelines = initChannelPipelineFactory();
    final InetSocketAddress listenAddress = new InetSocketAddress(bindAddress, port);
    server = new NettyServer(avroResponder, listenAddress, channelFactory, channelPipelines, null);

    connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();

    server.start();
    sourceCounter.start();
    super.start();

    // The connection count is only exposed on the concrete NettyServer type.
    final NettyServer runningServer = (NettyServer) server;
    final Runnable publishConnectionCount = new Runnable() {
        @Override
        public void run() {
            sourceCounter.setOpenConnectionCount(
                Long.valueOf(runningServer.getNumActiveConnections()));
        }
    };
    connectionCountUpdater.scheduleWithFixedDelay(publishConnectionCount, 0, 60, TimeUnit.SECONDS);

    logger.info("Avro source {} started.", getName());
}
/**
 * Prepares the fixture for each test.
 *
 * <p>Creates a sample header map and body, starts a Netty Avro server on a
 * free port that delivers events to a new {@code EventCollector}, and fills
 * {@code properties} with a memory channel plus two avro sinks (both pointing
 * at that server) behind a {@code load_balance} processor. Finally creates an
 * {@code EmbeddedAgent} with a unique name.
 *
 * @throws Exception if finding a port or waiting for the server fails
 */
@Before
public void setUp() throws Exception {
    // Sample event payload.
    headers = Maps.newHashMap();
    headers.put("key1", "value1");
    body = "body".getBytes(Charsets.UTF_8);

    // Collector server that both sinks will deliver to.
    final int collectorPort = findFreePort();
    eventCollector = new EventCollector();
    nettyServer = new NettyServer(
        new SpecificResponder(AvroSourceProtocol.class, eventCollector),
        new InetSocketAddress(HOSTNAME, collectorPort));
    nettyServer.start();
    // give the server a second to start
    Thread.sleep(1000L);

    // Agent configuration: one memory channel, two avro sinks, load balanced.
    final String collectorPortText = String.valueOf(collectorPort);
    properties = Maps.newHashMap();
    properties.put("channel.type", "memory");
    properties.put("channel.capacity", "200");
    properties.put("sinks", "sink1 sink2");
    properties.put("processor.type", "load_balance");

    properties.put("sink1.type", "avro");
    properties.put("sink1.hostname", HOSTNAME);
    properties.put("sink1.port", collectorPortText);

    properties.put("sink2.type", "avro");
    properties.put("sink2.hostname", HOSTNAME);
    properties.put("sink2.port", collectorPortText);

    agent = new EmbeddedAgent("test-" + serialNumber.incrementAndGet());
}
public EventCollector(final int port) {
final Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
nettyServer = new NettyServer(responder, new InetSocketAddress(HOSTNAME, port));
nettyServer.start();
}
/**
 * Builds the collector and brings up its Netty Avro server right away.
 *
 * <p>The server listens on {@code HOSTNAME} at the given port and routes
 * {@code AvroSourceProtocol} requests to this instance. The instance is
 * published to the responder while still inside the constructor.
 *
 * @param port port the server listens on
 */
public EventCollector(final int port) {
    final Responder selfResponder = new SpecificResponder(AvroSourceProtocol.class, this);
    final InetSocketAddress listenAddress = new InetSocketAddress(HOSTNAME, port);

    nettyServer = new NettyServer(selfResponder, listenAddress);
    nettyServer.start();
}
/**
 * Creates a collector, announces the listening port on standard output, and
 * starts a Netty Avro server that delivers {@code AvroSourceProtocol} calls
 * to this instance.
 *
 * <p>The message is printed before the server is created, and {@code this} is
 * passed to the responder before the constructor returns.
 *
 * @param port port to bind on {@code HOSTNAME}
 */
public EventCollector(final int port) {
    final Responder collectorResponder = new SpecificResponder(AvroSourceProtocol.class, this);
    System.out.println("Collector listening on port " + port);

    final InetSocketAddress endpoint = new InetSocketAddress(HOSTNAME, port);
    nettyServer = new NettyServer(collectorResponder, endpoint);
    nettyServer.start();
}
/**
 * Constructs the collector and starts its embedded Netty Avro server.
 *
 * <p>Requests for {@code AvroSourceProtocol} arriving on {@code HOSTNAME} at
 * the given port are handled by this instance. Both the hand-off of
 * {@code this} and the server start happen inside the constructor.
 *
 * @param port port for the embedded server
 */
public EventCollector(final int port) {
    final Responder handler = new SpecificResponder(AvroSourceProtocol.class, this);
    nettyServer = new NettyServer(
        handler,
        new InetSocketAddress(HOSTNAME, port));
    nettyServer.start();
}