下面列出了怎么用io.netty.channel.unix.DomainSocketAddress的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
protected void doClose() throws Exception {
try {
super.doClose();
} finally {
DomainSocketAddress local = this.local;
if (local != null) {
// Delete the socket file if possible.
File socketFile = new File(local.path());
boolean success = socketFile.delete();
if (!success && logger.isDebugEnabled()) {
logger.debug("Failed to delete a domain socket file: {}", local.path());
}
}
}
}
@Test
public void testPeerCreds() throws IOException {
BsdSocket s1 = BsdSocket.newSocketDomain();
BsdSocket s2 = BsdSocket.newSocketDomain();
try {
DomainSocketAddress dsa = UnixTestUtils.newSocketAddress();
s1.bind(dsa);
s1.listen(1);
assertTrue(s2.connect(dsa));
byte [] addr = new byte[64];
s1.accept(addr);
PeerCredentials pc = s1.getPeerCredentials();
assertNotEquals(pc.uid(), -1);
} finally {
s1.close();
s2.close();
}
}
@Override
protected void doClose() throws Exception {
try {
super.doClose();
} finally {
DomainSocketAddress local = this.local;
if (local != null) {
// Delete the socket file if possible.
File socketFile = new File(local.path());
boolean success = socketFile.delete();
if (!success && logger.isDebugEnabled()) {
logger.debug("Failed to delete a domain socket file: {}", local.path());
}
}
}
}
@Test
public void testPeerCreds() throws IOException {
LinuxSocket s1 = LinuxSocket.newSocketDomain();
LinuxSocket s2 = LinuxSocket.newSocketDomain();
try {
DomainSocketAddress dsa = UnixTestUtils.newSocketAddress();
s1.bind(dsa);
s1.listen(1);
assertTrue(s2.connect(dsa));
byte [] addr = new byte[64];
s1.accept(addr);
PeerCredentials pc = s1.getPeerCredentials();
assertNotEquals(pc.uid(), -1);
} finally {
s1.close();
s2.close();
}
}
/**
* If {@code address} if a ServiceTalk specific address it is unwrapped into a Netty address.
*
* @param address the address to convert.
* @return an address that Netty understands.
*/
public static SocketAddress toNettyAddress(Object address) {
// The order of the instance of checks is important because `DomainSocketAddress` is also of type
// `SocketAddress`, and we want to identify the more specific types before returning the fallback
// `SocketAddress` type.
if (address instanceof io.servicetalk.transport.api.DomainSocketAddress) {
return new DomainSocketAddress(((io.servicetalk.transport.api.DomainSocketAddress) address).getPath());
}
if (address instanceof SocketAddress) {
return (SocketAddress) address;
}
if (address instanceof HostAndPort) {
return toResolvedInetSocketAddress((HostAndPort) address);
}
throw new IllegalArgumentException("Unsupported address: " + address);
}
String toExternalForm() {
StringBuilder sb = new StringBuilder();
SocketAddress address = remoteAddress.get();
if (address instanceof DomainSocketAddress) {
sb.append(((DomainSocketAddress) address).path());
}
else {
sb.append(scheme);
sb.append("://");
sb.append(address != null
? toSocketAddressStringWithoutDefaultPort(address, isSecure())
: "localhost");
sb.append(pathAndQuery);
}
return sb.toString();
}
/**
* Update the provided address with the new host string.
*
* @param address the address supplier
* @param host the new host string
* @return the updated address
*/
public static SocketAddress updateHost(@Nullable Supplier<? extends SocketAddress> address, String host) {
if (address == null) {
return createUnresolved(host, 0);
}
SocketAddress socketAddress = address.get();
if (socketAddress instanceof DomainSocketAddress) {
throw new IllegalArgumentException("Cannot update DomainSocketAddress with host name [" + host + "].");
}
if (!(socketAddress instanceof InetSocketAddress)) {
return createUnresolved(host, 0);
}
InetSocketAddress inet = (InetSocketAddress) socketAddress;
return createUnresolved(host, inet.getPort());
}
/**
* Update the provided address with the new port.
*
* @param address the address supplier
* @param port the new port
* @return the updated address
*/
public static SocketAddress updatePort(@Nullable Supplier<? extends SocketAddress> address, int port) {
if (address == null) {
return createUnresolved(NetUtil.LOCALHOST.getHostAddress(), port);
}
SocketAddress socketAddress = address.get();
if (socketAddress instanceof DomainSocketAddress) {
throw new IllegalArgumentException("Cannot update DomainSocketAddress with post number [" + port + "].");
}
if(!(address.get() instanceof InetSocketAddress)) {
return createUnresolved(NetUtil.LOCALHOST.getHostAddress(), port);
}
InetSocketAddress inet = (InetSocketAddress) address.get();
InetAddress addr = inet.getAddress();
String host = addr == null ? inet.getHostName() : addr.getHostAddress();
return createUnresolved(host, port);
}
@Test(expected = IllegalArgumentException.class)
public void testHttpClientWithDomainSocketsNIOTransport() {
LoopResources loop = LoopResources.create("testHttpClientWithDomainSocketsNIOTransport");
try {
HttpClient.create()
.runOn(loop, false)
.remoteAddress(() -> new DomainSocketAddress("/tmp/test.sock"))
.get()
.uri("/")
.responseContent()
.aggregate()
.block(Duration.ofSeconds(30));
}
finally {
loop.disposeLater()
.block(Duration.ofSeconds(30));
}
}
public KMSEncryptionProvider(final EncryptionConfiguration configuration) {
super();
setChannelInfo();
SslContext sslContext;
try {
sslContext = GrpcSslContexts.forClient()
.trustManager(new ByteArrayInputStream(configuration.getCa().getBytes(UTF_8)))
.build();
} catch (SSLException e) {
throw new RuntimeException(e);
}
blockingStub = KeyManagementServiceGrpc.newBlockingStub(
NettyChannelBuilder.forAddress(new DomainSocketAddress(configuration.getEndpoint()))
.eventLoopGroup(group)
.channelType(channelType)
.keepAliveTime(DEFAULT_KEEPALIVE_TIMEOUT_NANOS, TimeUnit.NANOSECONDS)
.useTransportSecurity()
.sslContext(sslContext)
.overrideAuthority(configuration.getHost())
.build());
}
@Override
protected void doClose() throws Exception {
try {
super.doClose();
} finally {
DomainSocketAddress local = this.local;
if (local != null) {
// Delete the socket file if possible.
File socketFile = new File(local.path());
boolean success = socketFile.delete();
if (!success && logger.isDebugEnabled()) {
logger.debug("Failed to delete a domain socket file: {}", local.path());
}
}
}
}
@Override
public Void call() throws Exception {
OzoneConfiguration ozoneConfiguration = createOzoneConfiguration();
CsiConfig csiConfig = ozoneConfiguration.getObject(CsiConfig.class);
OzoneClient rpcClient = OzoneClientFactory.getRpcClient(ozoneConfiguration);
EpollEventLoopGroup group = new EpollEventLoopGroup();
if (csiConfig.getVolumeOwner().isEmpty()) {
throw new IllegalArgumentException(
"ozone.csi.owner is not set. You should set this configuration "
+ "variable to define which user should own all the created "
+ "buckets.");
}
Server server =
NettyServerBuilder
.forAddress(new DomainSocketAddress(csiConfig.getSocketPath()))
.channelType(EpollServerDomainSocketChannel.class)
.workerEventLoopGroup(group)
.bossEventLoopGroup(group)
.addService(new IdentitiyService())
.addService(new ControllerService(rpcClient,
csiConfig.getDefaultVolumeSize()))
.addService(new NodeService(csiConfig))
.build();
server.start();
server.awaitTermination();
rpcClient.close();
return null;
}
@Test
public void toAddress_uds() throws Exception {
String path = "/tmp/foo";
DomainSocketAddress uds = new DomainSocketAddress(path);
assertEquals(
Address.newBuilder().setUdsAddress(
UdsAddress
.newBuilder()
.setFilename(path))
.build(),
ChannelzProtoUtil.toAddress(uds));
}
@Test
public void socketToProto_unix() throws Exception {
String path = "/some/path";
DomainSocketAddress socketAddress = new DomainSocketAddress(path);
assertEquals(
Address
.newBuilder()
.setType(Type.TYPE_UNIX)
.setAddress("/some/path")
.build(),
BinlogHelper.socketToProto(socketAddress)
);
}
/**
* Parse a {@link SocketAddress} from the given string.
*/
public static SocketAddress parseSocketAddress(String value) {
if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) {
// Unix Domain Socket address.
// Create the underlying file for the Unix Domain Socket.
String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length());
File file = new File(filePath);
if (!file.isAbsolute()) {
throw new IllegalArgumentException("File path must be absolute: " + filePath);
}
try {
if (file.createNewFile()) {
// If this application created the file, delete it when the application exits.
file.deleteOnExit();
}
} catch (IOException ex) {
throw new RuntimeException(ex);
}
// Create the SocketAddress referencing the file.
return new DomainSocketAddress(file);
} else {
// Standard TCP/IP address.
String[] parts = value.split(":", 2);
if (parts.length < 2) {
throw new IllegalArgumentException(
"Address must be a unix:// path or be in the form host:port. Got: " + value);
}
String host = parts[0];
int port = Integer.parseInt(parts[1]);
return new InetSocketAddress(host, port);
}
}
public static DomainSocketAddress newSocketAddress() {
try {
File file;
do {
file = File.createTempFile("NETTY", "UDS");
if (!file.delete()) {
throw new IOException("failed to delete: " + file);
}
} while (file.getAbsolutePath().length() > Socket.UDS_SUN_PATH_SIZE);
return new DomainSocketAddress(file);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (super.doConnect(remoteAddress, localAddress)) {
local = (DomainSocketAddress) localAddress;
remote = (DomainSocketAddress) remoteAddress;
return true;
}
return false;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
socket.bind(localAddress);
socket.listen(config.getBacklog());
local = (DomainSocketAddress) localAddress;
active = true;
}
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (super.doConnect(remoteAddress, localAddress)) {
local = (DomainSocketAddress) localAddress;
remote = (DomainSocketAddress) remoteAddress;
return true;
}
return false;
}
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (super.doConnect(remoteAddress, localAddress)) {
local = (DomainSocketAddress) localAddress;
remote = (DomainSocketAddress) remoteAddress;
return true;
}
return false;
}
/**
* Returns the correct {@link Class} to use with the given {@link EventLoopGroup}.
*
* @param group the {@link EventLoopGroup} for which the class is needed
* @param addressClass The class of the address that the server socket will be bound to.
* @return the class that should be used for bootstrapping
*/
public static Class<? extends ServerChannel> serverChannel(EventLoopGroup group,
Class<? extends SocketAddress> addressClass) {
if (useEpoll(group)) {
return DomainSocketAddress.class.isAssignableFrom(addressClass) ? EpollServerDomainSocketChannel.class :
EpollServerSocketChannel.class;
} else if (useKQueue(group)) {
return DomainSocketAddress.class.isAssignableFrom(addressClass) ? KQueueServerDomainSocketChannel.class :
KQueueServerSocketChannel.class;
} else {
return NioServerSocketChannel.class;
}
}
/**
* Returns the correct {@link Class} to use with the given {@link EventLoopGroup}.
*
* @param group the {@link EventLoopGroup} for which the class is needed
* @param addressClass The class of the address that to connect to.
* @return the class that should be used for bootstrapping
*/
public static Class<? extends Channel> socketChannel(EventLoopGroup group,
Class<? extends SocketAddress> addressClass) {
if (useEpoll(group)) {
return DomainSocketAddress.class.isAssignableFrom(addressClass) ? EpollDomainSocketChannel.class :
EpollSocketChannel.class;
} else if (useKQueue(group)) {
return DomainSocketAddress.class.isAssignableFrom(addressClass) ? KQueueDomainSocketChannel.class :
KQueueSocketChannel.class;
} else {
return NioSocketChannel.class;
}
}
public static HttpBlobStore create(
DomainSocketAddress domainSocketAddress,
URI uri,
int timeoutMillis,
int remoteMaxConnections,
@Nullable final Credentials creds)
throws ConfigurationException, URISyntaxException, SSLException {
if (KQueue.isAvailable()) {
return new HttpBlobStore(
KQueueEventLoopGroup::new,
KQueueDomainSocketChannel.class,
uri,
timeoutMillis,
remoteMaxConnections,
creds,
domainSocketAddress);
} else if (Epoll.isAvailable()) {
return new HttpBlobStore(
EpollEventLoopGroup::new,
EpollDomainSocketChannel.class,
uri,
timeoutMillis,
remoteMaxConnections,
creds,
domainSocketAddress);
} else {
throw new ConfigurationException("Unix domain sockets are unsupported on this platform");
}
}
/**
* Connect a {@link Channel} to the remote peer.
*
* @param config the transport configuration
* @param remoteAddress the {@link SocketAddress} to connect to
* @param resolverGroup the resolver which will resolve the address of the unresolved named address
* @param channelInitializer the {@link ChannelInitializer} that will be used for initializing the channel pipeline
* @return a {@link Mono} of {@link Channel}
*/
public static Mono<Channel> connect(TransportConfig config, SocketAddress remoteAddress,
AddressResolverGroup<?> resolverGroup, ChannelInitializer<Channel> channelInitializer) {
Objects.requireNonNull(config, "config");
Objects.requireNonNull(remoteAddress, "remoteAddress");
Objects.requireNonNull(resolverGroup, "resolverGroup");
Objects.requireNonNull(channelInitializer, "channelInitializer");
return doInitAndRegister(config, channelInitializer, remoteAddress instanceof DomainSocketAddress)
.flatMap(channel -> doResolveAndConnect(channel, config, remoteAddress, resolverGroup));
}
@Test(expected = IllegalArgumentException.class)
public void testUdpServerWithDomainSocketsWithHost() {
UdpServer.create()
.bindAddress(() -> new DomainSocketAddress("/tmp/test.sock"))
.host("localhost")
.bindNow();
}
@Test(expected = IllegalArgumentException.class)
public void testUdpServerWithDomainSocketsWithPort() {
UdpServer.create()
.bindAddress(() -> new DomainSocketAddress("/tmp/test.sock"))
.port(1234)
.bindNow();
}
@Test(expected = IllegalArgumentException.class)
public void testUdpClientWithDomainSocketsWithHost() {
UdpClient.create()
.remoteAddress(() -> new DomainSocketAddress("/tmp/test.sock"))
.host("localhost")
.connectNow();
}
@Test(expected = IllegalArgumentException.class)
public void testUdpClientWithDomainSocketsWithPort() {
UdpClient.create()
.remoteAddress(() -> new DomainSocketAddress("/tmp/test.sock"))
.port(1234)
.connectNow();
}
@Test(expected = ChannelBindException.class)
public void testTcpServerWithDomainSocketsNIOTransport() {
LoopResources loop = LoopResources.create("testTcpServerWithDomainSocketsNIOTransport");
try {
TcpServer.create()
.runOn(loop, false)
.bindAddress(() -> new DomainSocketAddress("/tmp/test.sock"))
.bindNow();
}
finally {
loop.disposeLater()
.block(Duration.ofSeconds(30));
}
}
@Test(expected = IllegalArgumentException.class)
public void testTcpServerWithDomainSocketsWithHost() {
TcpServer.create()
.bindAddress(() -> new DomainSocketAddress("/tmp/test.sock"))
.host("localhost")
.bindNow();
}