下面列出了 org.apache.hadoop.fs.swift.http.SwiftRestClient 这个 API 类的示例代码及用法，也可以点击链接到 GitHub 查看源代码。
/**
 * Get the object's body content, trying data-local endpoints first.
 * <p>
 * Each endpoint returned by {@code getDataLocalEndpoints(path)} is tried in
 * order and the first successful read is returned. A failure on one endpoint
 * is logged at debug level and the next endpoint is tried. If no endpoint
 * yields the data, the object is requested via {@code toObjectPath(path)}.
 *
 * @param path object path
 * @return the body content -this must be closed to terminate the connection
 * @throws IOException IO problems
 * @throws FileNotFoundException path doesn't resolve to an object
 */
public HttpBodyContent getObject(Path path) throws IOException {
  List<String> locations = getDataLocalEndpoints(path);
  for (String url : locations) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Reading " + path + " from location: " + url);
    }
    try {
      return swiftRestClient.getData(new URI(url),
          SwiftRestClient.NEWEST);
    } catch (Exception e) {
      // Deliberate best-effort: it is possible that this endpoint doesn't
      // contain the needed data, so fall through to the next one. The
      // failure is still recorded so that it can be diagnosed.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Failed to read " + path + " from location: " + url, e);
      }
    }
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Reading " + path + " from proxy node");
  }
  return swiftRestClient.getData(toObjectPath(path),
      SwiftRestClient.NEWEST);
}
/**
 * Collect the raw names of the entries listed under the root path "/".
 * <p>
 * The listing bytes returned by {@code listDeepObjectsInDirectory} are parsed
 * as a JSON list of {@code SwiftObjectFileStatus}. For every entry the name is
 * used when it is non-null, otherwise the subdir when that is non-null;
 * entries with neither are skipped.
 *
 * @return the collected names, in listing order
 * @throws Exception on any failure to list or parse
 */
private String[] getRawObjectNames() throws Exception {
  SwiftRestClient client;
  client = SwiftRestClient.getInstance(fs.getUri(), fs.getConf());
  SwiftObjectPath path = SwiftObjectPath.fromPath(fs.getUri(), new Path("/"));
  byte[] bytes = client.listDeepObjectsInDirectory(path, true, true);
  final CollectionType collectionType = JSONUtil.getJsonMapper().
      getTypeFactory().constructCollectionType(List.class,
      SwiftObjectFileStatus.class);
  // Decode with an explicit charset: the no-charset String constructor uses
  // the platform default, which can corrupt non-ASCII object names.
  final String json =
      new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
  final List<SwiftObjectFileStatus> fileStatusList =
      JSONUtil.toObject(json, collectionType);
  final List<String> objects = new ArrayList<String>();
  for (SwiftObjectFileStatus status : fileStatusList) {
    if (status.getName() != null) {
      objects.add(status.getName());
    } else if (status.getSubdir() != null) {
      objects.add(status.getSubdir());
    }
  }
  return objects.toArray(new String[objects.size()]);
}
/**
 * Issue a HEAD request for an object's metadata.
 *
 * @param objectPath path of the object to query
 * @param newest when true, {@code SwiftRestClient.NEWEST} is added to the
 *               request
 * @return the headers returned by the HEAD request
 * @throws IOException on any failure of the request
 */
private Header[] stat(SwiftObjectPath objectPath, boolean newest) throws
    IOException {
  if (!newest) {
    return swiftRestClient.headRequest("getObjectMetadata", objectPath);
  }
  return swiftRestClient.headRequest("getObjectMetadata-newest",
      objectPath, SwiftRestClient.NEWEST);
}
/**
 * Probe for the existence of an object through a HEAD request.
 *
 * @param path swift object path
 * @return true if at least one header came back; false if no headers were
 *         returned or the request raised a FileNotFoundException
 * @throws IOException IO problems other than FileNotFound, which
 *                     is downgraded to a "does not exist" result
 */
public boolean objectExists(SwiftObjectPath path) throws IOException {
  final Header[] metadata;
  try {
    metadata = swiftRestClient.headRequest("objectExists",
        path,
        SwiftRestClient.NEWEST);
  } catch (FileNotFoundException notFound) {
    return false;
  }
  // an empty header set is treated as a missing file
  return metadata.length > 0;
}
/**
 * A positive SWIFT_BLOCKSIZE value set in the configuration is reported
 * back by the client's getBlocksizeKB().
 */
@Test
public void testPositiveBlocksize() throws Exception {
  final int expectedBlocksize = 127;
  final Configuration conf = createCoreConfig();
  conf.set(SWIFT_BLOCKSIZE, String.valueOf(expectedBlocksize));
  final SwiftRestClient client = mkInstance(conf);
  assertEquals(expectedBlocksize, client.getBlocksizeKB());
}
/**
 * Setting DOT_LOCATION_AWARE to "true" makes the client report itself as
 * location aware.
 */
@Test
public void testLocationAwareTruePropagates() throws Exception {
  final Configuration conf = createCoreConfig();
  set(conf, DOT_LOCATION_AWARE, "true");
  final SwiftRestClient client = mkInstance(conf);
  final boolean locationAware = client.isLocationAware();
  assertTrue(locationAware);
}
/**
 * Setting DOT_LOCATION_AWARE to "false" makes the client report itself as
 * not location aware.
 */
@Test
public void testLocationAwareFalsePropagates() throws Exception {
  final Configuration conf = createCoreConfig();
  set(conf, DOT_LOCATION_AWARE, "false");
  final SwiftRestClient client = mkInstance(conf);
  final boolean locationAware = client.isLocationAware();
  assertFalse(locationAware);
}
/**
 * A positive SWIFT_PARTITION_SIZE value set in the configuration is
 * reported back by the client's getPartSizeKB().
 */
@Test
public void testPositivePartsize() throws Exception {
  final int expectedPartSize = 127;
  final Configuration conf = createCoreConfig();
  conf.set(SWIFT_PARTITION_SIZE, String.valueOf(expectedPartSize));
  final SwiftRestClient client = mkInstance(conf);
  assertEquals(expectedPartSize, client.getPartSizeKB());
}
/**
 * Proxy host and port set in the configuration are reported back by the
 * client's getProxyHost() and getProxyPort().
 */
@Test
public void testProxyData() throws Exception {
  final String expectedHost = "web-proxy";
  final int expectedPort = 8088;
  final Configuration conf = createCoreConfig();
  conf.set(SWIFT_PROXY_HOST_PROPERTY, expectedHost);
  conf.set(SWIFT_PROXY_PORT_PROPERTY, String.valueOf(expectedPort));
  final SwiftRestClient client = mkInstance(conf);
  assertEquals(expectedHost, client.getProxyHost());
  assertEquals(expectedPort, client.getProxyPort());
}
/**
 * Build a SwiftObjectPath from "/dir/file1", merge it with ENDPOINT via
 * SwiftRestClient.pathToURI and log the result. Nothing is asserted; the
 * test only fails if one of the calls throws.
 */
@Test(timeout = SWIFT_TEST_TIMEOUT)
public void testConvertToPath() throws Throwable {
  final String initialpath = "/dir/file1";
  final SwiftObjectPath swiftPath =
      SwiftObjectPath.fromPath(new URI(initialpath), new Path(initialpath));
  final URI merged = SwiftRestClient.pathToURI(swiftPath, new URI(ENDPOINT));
  LOG.info("Inital Hadoop Path =" + initialpath);
  LOG.info("Merged URI=" + merged);
}
/**
 * Issue a HEAD request for an object's metadata.
 *
 * @param objectPath path of the object to query
 * @param newest when true, {@code SwiftRestClient.NEWEST} is added to the
 *               request
 * @return the headers returned by the HEAD request
 * @throws IOException on any failure of the request
 */
private Header[] stat(SwiftObjectPath objectPath, boolean newest) throws
    IOException {
  if (!newest) {
    return swiftRestClient.headRequest("getObjectMetadata", objectPath);
  }
  return swiftRestClient.headRequest("getObjectMetadata-newest",
      objectPath, SwiftRestClient.NEWEST);
}
/**
 * Probe for the existence of an object through a HEAD request.
 *
 * @param path swift object path
 * @return true if at least one header came back; false if no headers were
 *         returned or the request raised a FileNotFoundException
 * @throws IOException IO problems other than FileNotFound, which
 *                     is downgraded to a "does not exist" result
 */
public boolean objectExists(SwiftObjectPath path) throws IOException {
  final Header[] metadata;
  try {
    metadata = swiftRestClient.headRequest("objectExists",
        path,
        SwiftRestClient.NEWEST);
  } catch (FileNotFoundException notFound) {
    return false;
  }
  // an empty header set is treated as a missing file
  return metadata.length > 0;
}
/**
 * A positive SWIFT_BLOCKSIZE value set in the configuration is reported
 * back by the client's getBlocksizeKB().
 */
@Test
public void testPositiveBlocksize() throws Exception {
  final int expectedBlocksize = 127;
  final Configuration conf = createCoreConfig();
  conf.set(SWIFT_BLOCKSIZE, String.valueOf(expectedBlocksize));
  final SwiftRestClient client = mkInstance(conf);
  assertEquals(expectedBlocksize, client.getBlocksizeKB());
}
/**
 * Setting DOT_LOCATION_AWARE to "true" makes the client report itself as
 * location aware.
 */
@Test
public void testLocationAwareTruePropagates() throws Exception {
  final Configuration conf = createCoreConfig();
  set(conf, DOT_LOCATION_AWARE, "true");
  final SwiftRestClient client = mkInstance(conf);
  final boolean locationAware = client.isLocationAware();
  assertTrue(locationAware);
}
/**
 * Setting DOT_LOCATION_AWARE to "false" makes the client report itself as
 * not location aware.
 */
@Test
public void testLocationAwareFalsePropagates() throws Exception {
  final Configuration conf = createCoreConfig();
  set(conf, DOT_LOCATION_AWARE, "false");
  final SwiftRestClient client = mkInstance(conf);
  final boolean locationAware = client.isLocationAware();
  assertFalse(locationAware);
}
/**
 * A positive SWIFT_PARTITION_SIZE value set in the configuration is
 * reported back by the client's getPartSizeKB().
 */
@Test
public void testPositivePartsize() throws Exception {
  final int expectedPartSize = 127;
  final Configuration conf = createCoreConfig();
  conf.set(SWIFT_PARTITION_SIZE, String.valueOf(expectedPartSize));
  final SwiftRestClient client = mkInstance(conf);
  assertEquals(expectedPartSize, client.getPartSizeKB());
}
/**
 * Proxy host and port set in the configuration are reported back by the
 * client's getProxyHost() and getProxyPort().
 */
@Test
public void testProxyData() throws Exception {
  final String expectedHost = "web-proxy";
  final int expectedPort = 8088;
  final Configuration conf = createCoreConfig();
  conf.set(SWIFT_PROXY_HOST_PROPERTY, expectedHost);
  conf.set(SWIFT_PROXY_PORT_PROPERTY, String.valueOf(expectedPort));
  final SwiftRestClient client = mkInstance(conf);
  assertEquals(expectedHost, client.getProxyHost());
  assertEquals(expectedPort, client.getProxyPort());
}
/**
 * Build a SwiftObjectPath from "/dir/file1", merge it with ENDPOINT via
 * SwiftRestClient.pathToURI and log the result. Nothing is asserted; the
 * test only fails if one of the calls throws.
 */
@Test(timeout = SWIFT_TEST_TIMEOUT)
public void testConvertToPath() throws Throwable {
  final String initialpath = "/dir/file1";
  final SwiftObjectPath swiftPath =
      SwiftObjectPath.fromPath(new URI(initialpath), new Path(initialpath));
  final URI merged = SwiftRestClient.pathToURI(swiftPath, new URI(ENDPOINT));
  LOG.info("Inital Hadoop Path =" + initialpath);
  LOG.info("Merged URI=" + merged);
}
/**
 * Set up the filesystem store: records the filesystem URI, instantiates the
 * DNS-to-switch mapping named in the configuration and creates the REST
 * client binding.
 *
 * @param fsURI URI of the filesystem, which is used to map to the
 *              filesystem-specific options in the configuration file
 * @param configuration configuration
 * @throws IOException on any failure.
 */
public void initialize(URI fsURI, Configuration configuration) throws IOException {
  this.uri = fsURI;
  // ScriptBasedMapping is the fallback when the property is not set
  final Class<? extends DNSToSwitchMapping> mappingClass =
      configuration.getClass("topology.node.switch.mapping.impl",
          ScriptBasedMapping.class,
          DNSToSwitchMapping.class);
  dnsToSwitchMapping = ReflectionUtils.newInstance(mappingClass, configuration);
  this.swiftRestClient = SwiftRestClient.getInstance(fsURI, configuration);
}
/**
 * Issue a HEAD request for an object's metadata.
 *
 * @param objectPath path of the object to query
 * @param newest when true, {@code SwiftRestClient.NEWEST} is added to the
 *               request
 * @return the headers returned by the HEAD request
 * @throws IOException on any failure of the request
 */
private Header[] stat(SwiftObjectPath objectPath, boolean newest) throws
    IOException {
  if (!newest) {
    return swiftRestClient.headRequest("getObjectMetadata", objectPath);
  }
  return swiftRestClient.headRequest("getObjectMetadata-newest",
      objectPath, SwiftRestClient.NEWEST);
}
/**
 * Probe for the existence of an object through a HEAD request.
 *
 * @param path swift object path
 * @return true if at least one header came back; false if no headers were
 *         returned or the request raised a FileNotFoundException
 * @throws IOException IO problems other than FileNotFound, which
 *                     is downgraded to a "does not exist" result
 */
public boolean objectExists(SwiftObjectPath path) throws IOException {
  final Header[] metadata;
  try {
    metadata = swiftRestClient.headRequest("objectExists",
        path,
        SwiftRestClient.NEWEST);
  } catch (FileNotFoundException notFound) {
    return false;
  }
  // an empty header set is treated as a missing file
  return metadata.length > 0;
}
/**
 * A positive SWIFT_BLOCKSIZE value set in the configuration is reported
 * back by the client's getBlocksizeKB().
 */
@Test
public void testPositiveBlocksize() throws Exception {
  final int expectedBlocksize = 127;
  final Configuration conf = createCoreConfig();
  conf.set(SWIFT_BLOCKSIZE, String.valueOf(expectedBlocksize));
  final SwiftRestClient client = mkInstance(conf);
  assertEquals(expectedBlocksize, client.getBlocksizeKB());
}
/**
 * Setting DOT_LOCATION_AWARE to "true" makes the client report itself as
 * location aware.
 */
@Test
public void testLocationAwareTruePropagates() throws Exception {
  final Configuration conf = createCoreConfig();
  set(conf, DOT_LOCATION_AWARE, "true");
  final SwiftRestClient client = mkInstance(conf);
  final boolean locationAware = client.isLocationAware();
  assertTrue(locationAware);
}
/**
 * Setting DOT_LOCATION_AWARE to "false" makes the client report itself as
 * not location aware.
 */
@Test
public void testLocationAwareFalsePropagates() throws Exception {
  final Configuration conf = createCoreConfig();
  set(conf, DOT_LOCATION_AWARE, "false");
  final SwiftRestClient client = mkInstance(conf);
  final boolean locationAware = client.isLocationAware();
  assertFalse(locationAware);
}
/**
 * A positive SWIFT_PARTITION_SIZE value set in the configuration is
 * reported back by the client's getPartSizeKB().
 */
@Test
public void testPositivePartsize() throws Exception {
  final int expectedPartSize = 127;
  final Configuration conf = createCoreConfig();
  conf.set(SWIFT_PARTITION_SIZE, String.valueOf(expectedPartSize));
  final SwiftRestClient client = mkInstance(conf);
  assertEquals(expectedPartSize, client.getPartSizeKB());
}
/**
 * Proxy host and port set in the configuration are reported back by the
 * client's getProxyHost() and getProxyPort().
 */
@Test
public void testProxyData() throws Exception {
  final String expectedHost = "web-proxy";
  final int expectedPort = 8088;
  final Configuration conf = createCoreConfig();
  conf.set(SWIFT_PROXY_HOST_PROPERTY, expectedHost);
  conf.set(SWIFT_PROXY_PORT_PROPERTY, String.valueOf(expectedPort));
  final SwiftRestClient client = mkInstance(conf);
  assertEquals(expectedHost, client.getProxyHost());
  assertEquals(expectedPort, client.getProxyPort());
}
/**
 * Rename a directory that exists only implicitly: a zero-byte object is
 * uploaded at /test/olddir/file without creating /test/olddir itself, then
 * /test/olddir is renamed to /test/newdir and the new directory and the
 * file beneath it are checked.
 */
@Test(timeout = SWIFT_TEST_TIMEOUT)
public void testRenamePseudoDir() throws Throwable {
  assumeRenameSupported();
  // upload the file directly; no directory marker object is created
  final SwiftRestClient restClient =
      SwiftRestClient.getInstance(fs.getUri(), fs.getConf());
  final SwiftObjectPath filePath =
      SwiftObjectPath.fromPath(fs.getUri(), new Path("/test/olddir/file"));
  final byte[] noData = new byte[0];
  restClient.upload(filePath, new ByteArrayInputStream(noData), 0);
  rename(path("/test/olddir"), path("/test/newdir"), true, false, true);
  SwiftTestUtils.assertIsDirectory(fs, path("/test/newdir"));
  assertIsFile(path("/test/newdir/file"));
}
/**
 * Build a SwiftObjectPath from "/dir/file1", merge it with ENDPOINT via
 * SwiftRestClient.pathToURI and log the result. Nothing is asserted; the
 * test only fails if one of the calls throws.
 */
@Test(timeout = SWIFT_TEST_TIMEOUT)
public void testConvertToPath() throws Throwable {
  final String initialpath = "/dir/file1";
  final SwiftObjectPath swiftPath =
      SwiftObjectPath.fromPath(new URI(initialpath), new Path(initialpath));
  final URI merged = SwiftRestClient.pathToURI(swiftPath, new URI(ENDPOINT));
  LOG.info("Inital Hadoop Path =" + initialpath);
  LOG.info("Merged URI=" + merged);
}
/**
 * Creating a client with SWIFT_PARTITION_SIZE set to "-1" is expected to
 * fail with a SwiftConfigurationException.
 */
@Test(expected = org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException.class)
public void testNegativePartsize() throws Exception {
  final Configuration configuration = createCoreConfig();
  configuration.set(SWIFT_PARTITION_SIZE, "-1");
  // the returned client is irrelevant: the call itself must throw
  mkInstance(configuration);
}
/**
 * Create a REST client bound to the fixed URI "swift://container.openstack/".
 *
 * @param configuration configuration to build the client from
 * @return the client returned by SwiftRestClient.getInstance
 * @throws IOException declared failure of client creation
 * @throws URISyntaxException if the fixed URI cannot be parsed
 */
private SwiftRestClient mkInstance(Configuration configuration) throws
    IOException,
    URISyntaxException {
  return SwiftRestClient.getInstance(
      new URI("swift://container.openstack/"), configuration);
}