The following are example usages of org.kitesdk.data.Datasets. Click a link to view the full source on GitHub, or leave a comment on the right.
@Test
public void testAbsoluteTrailingSlash() {
  // A trailing slash on the repository and dataset URIs must not affect resolution.
  String repoUri = "repo:hdfs://" + hdfsAuth + "/tmp/data/";
  DatasetRepository repo = DatasetRepositories.repositoryFor(repoUri);
  repo.delete("ns", "test");
  repo.create("ns", "test", descriptor);

  String datasetUri = "dataset:hdfs://" + hdfsAuth + "/tmp/data/ns/test/";
  Dataset<Object> dataset =
      Datasets.<Object, Dataset<Object>>load(datasetUri, Object.class);

  Assert.assertNotNull("Should load dataset", dataset);
  Assert.assertTrue(dataset instanceof FileSystemDataset);
  Assert.assertEquals("Locations should match",
      URI.create("hdfs://" + hdfsAuth + "/tmp/data/ns/test"),
      dataset.getDescriptor().getLocation());
  Assert.assertEquals("Descriptors should match",
      repo.load("ns", "test").getDescriptor(), dataset.getDescriptor());
  Assert.assertEquals("Should report correct namespace",
      "ns", dataset.getNamespace());
  Assert.assertEquals("Should report correct name",
      "test", dataset.getName());
  repo.delete("ns", "test");
}
@Test
public void testAbsoluteRoot() {
  // Repository rooted at the top of the HDFS filesystem.
  DatasetRepository repo = DatasetRepositories
      .repositoryFor("repo:hdfs://" + hdfsAuth + "/");
  repo.delete("ns", "test");
  repo.create("ns", "test", descriptor);

  String datasetUri = "dataset:hdfs://" + hdfsAuth + "/ns/test";
  Dataset<Object> dataset =
      Datasets.<Object, Dataset<Object>>load(datasetUri, Object.class);

  Assert.assertNotNull("Should load dataset", dataset);
  Assert.assertTrue(dataset instanceof FileSystemDataset);
  Assert.assertEquals("Locations should match",
      URI.create("hdfs://" + hdfsAuth + "/ns/test"),
      dataset.getDescriptor().getLocation());
  Assert.assertEquals("Descriptors should match",
      repo.load("ns", "test").getDescriptor(), dataset.getDescriptor());
  Assert.assertEquals("Should report correct namespace",
      "ns", dataset.getNamespace());
  Assert.assertEquals("Should report correct name",
      "test", dataset.getName());
  repo.delete("ns", "test");
}
@Test
public void testDeleteWithDefaultNamespace() throws IOException {
  // First delete by URI should succeed; repeating it should report nothing left.
  Assert.assertTrue("Delete should succeed if the location matches",
      Datasets.delete("dataset:hive:/tmp/datasets/test"));
  Assert.assertFalse("Delete should return false if there is no dataset",
      Datasets.delete("dataset:hive:/tmp/datasets/test"));
  // recreate the default.test dataset, but with a different storage location
  DatasetDescriptor doNotDelete = new DatasetDescriptor.Builder(descriptor)
      .location(URI.create("file:/tmp/datasets/default/test"))
      .build();
  metastore.createTable(HiveUtils.tableForDescriptor(
      "default", "test", doNotDelete, true));
  // The URI path no longer matches the table's storage location, so delete
  // must not touch the recreated table.
  Assert.assertFalse("Delete should not find a dataset to delete",
      Datasets.delete("dataset:hive:/tmp/datasets/test"));
  Assert.assertTrue("Delete should not change the dataset",
      metastore.exists("default", "test"));
}
@Before
public void createTestDatasets() {
  // Remove any leftovers from a previous run before recreating the fixtures.
  Datasets.delete("dataset:file:/tmp/datasets/unpartitioned");
  Datasets.delete("dataset:file:/tmp/datasets/partitioned");

  DatasetDescriptor flat = new DatasetDescriptor.Builder()
      .schema(TestRecord.class)
      .build();
  unpartitioned = Datasets.create("dataset:file:/tmp/datasets/unpartitioned",
      flat, TestRecord.class);

  // Same schema, but hash-partitioned on "id" into 4 buckets.
  DatasetDescriptor hashed = new DatasetDescriptor.Builder(flat)
      .partitionStrategy(new PartitionStrategy.Builder()
          .hash("id", 4)
          .build())
      .build();
  partitioned = Datasets.create("dataset:file:/tmp/datasets/partitioned",
      hashed, TestRecord.class);

  writeTestRecords(unpartitioned);
  writeTestRecords(partitioned);
}
@Test
public void testURIStringEquality() {
  // Round-trip several randomized view URIs through load() and back to text.
  for (int iteration = 0; iteration < 10; iteration++) {
    String first = UUID.randomUUID().toString();
    String second = UUID.randomUUID().toString();
    String originalUri =
        "view:file:/tmp/test_name?color=" + first + "," + second;

    View<GenericRecord> view = Datasets.load(originalUri);
    String afterUri = view.getUri().toString();

    // Dump both URIs before asserting so a failure is easy to diagnose.
    if (!originalUri.equals(afterUri)) {
      System.out.println("Iteration: " + iteration);
      System.out.println("Start: " + originalUri);
      System.out.println("End : " + afterUri);
    }
    Assert.assertEquals(originalUri, afterUri);
  }
}
@Test
public void testExternalRelative() {
  DatasetRepository repo = DatasetRepositories.repositoryFor("repo:hive:data");
  repo.delete("ns", "test");
  repo.create("ns", "test", descriptor);

  Dataset<GenericRecord> dataset = Datasets.load("dataset:hive:data/ns/test");
  Assert.assertNotNull("Should load dataset", dataset);
  Assert.assertTrue(dataset instanceof FileSystemDataset);

  // A relative repo path is resolved against the DFS working directory.
  Path workingDir = getDFS().makeQualified(new Path("."));
  Assert.assertEquals("Locations should match",
      new Path(workingDir, "data/ns/test").toUri(),
      dataset.getDescriptor().getLocation());
  Assert.assertEquals("Descriptors should match",
      repo.load("ns", "test").getDescriptor(), dataset.getDescriptor());
  repo.delete("ns", "test");
}
@Test
public void testLoadChangedRelativePathURI() {
  // this used to be a relative external URI, but is now a managed URI
  String uri = "dataset:hive:ns/ds";
  DatasetRepository repo = DatasetRepositories
      .repositoryFor("repo:hive:/tmp/data");
  Dataset<GenericRecord> expected = repo.create(
      "ns", "ds", DESCRIPTOR, GenericRecord.class);
  // Loading by the old-style relative URI should find the managed dataset.
  Dataset<GenericRecord> actual = Datasets.load(uri);
  Assert.assertEquals("Should load existing dataset ns.ds",
      expected, actual);
  Assert.assertEquals("URI should use apparent namespace",
      "dataset:hive:ns/ds", actual.getUri().toString());
  Assert.assertTrue(Datasets.delete(uri));
}
@Test
public void testCreateChangedAbsolutePathURIWithDescriptorLocation() {
  String uri = "dataset:hive:/ns/ds";
  DatasetDescriptor located = new DatasetDescriptor.Builder(DESCRIPTOR)
      .location("file:/tmp/data/ns/ds")
      .build();
  Datasets.create(uri, located);

  Table table = metastore.getTable("ns", "ds");
  Assert.assertNotNull("Table should be found under ns.ds", table);
  // An explicit storage location in the descriptor yields an external table.
  Assert.assertTrue("Should create an external table",
      HiveAbstractMetadataProvider.isExternal(table));
  Assert.assertTrue(Datasets.delete(uri));
}
@Override
public int run(String[] args) throws Exception {
  // Load the users dataset
  Dataset<Record> users = Datasets.load(
      "dataset:hive?dataset=users", Record.class);

  // Get a reader for the dataset and read all the users.
  // Fix: the loop previously iterated over a second users.newReader() call,
  // leaking an unclosed reader; iterate over the reader that gets closed.
  DatasetReader<Record> reader = null;
  try {
    reader = users.newReader();
    for (GenericRecord user : reader) {
      System.out.println(user);
    }
  } finally {
    if (reader != null) {
      reader.close();
    }
  }
  return 0;
}
@Override
public int run(String[] args) throws Exception {
  // Load the users dataset
  Dataset<Record> users = Datasets.load(
      "dataset:hdfs:/tmp/data/users", Record.class);

  // Print every user whose favorite color is green, releasing the reader
  // even if iteration fails part way through.
  DatasetReader<Record> userReader = null;
  try {
    userReader = users.with("favoriteColor", "green").newReader();
    for (GenericRecord user : userReader) {
      System.out.println(user);
    }
  } finally {
    if (userReader != null) {
      userReader.close();
    }
  }
  return 0;
}
@Test
public void testCreateChangedRelativePathURIWithDescriptorLocation() {
  String uri = "dataset:hive:ns/ds";
  DatasetDescriptor located = new DatasetDescriptor.Builder(DESCRIPTOR)
      .location("file:/tmp/data/ns/ds")
      .build();
  Datasets.create(uri, located);

  Table table = metastore.getTable("ns", "ds");
  Assert.assertNotNull("Table should be found under ns.ds", table);
  // An explicit storage location in the descriptor yields an external table.
  Assert.assertTrue("Should create an external table",
      HiveAbstractMetadataProvider.isExternal(table));
  Assert.assertTrue(Datasets.delete(uri));
}
@Test
public void testExternalRelative() {
  // Repository rooted at a path relative to the DFS working directory.
  DatasetRepository repo = DatasetRepositories
      .repositoryFor("repo:hive:data?" + hdfsQueryArgs);
  repo.delete("ns", "test");
  repo.create("ns", "test", descriptor);
  Dataset<Object> ds = Datasets
      .<Object, Dataset<Object>>load("dataset:hive:data/ns/test?" + hdfsQueryArgs, Object.class);
  Assert.assertNotNull("Should load dataset", ds);
  Assert.assertTrue(ds instanceof FileSystemDataset);
  // The relative path should resolve against the DFS working directory.
  Path cwd = getDFS().makeQualified(new Path("."));
  Assert.assertEquals("Locations should match",
      new Path(cwd, "data/ns/test").toUri(), ds.getDescriptor().getLocation());
  Assert.assertEquals("Descriptors should match",
      repo.load("ns", "test").getDescriptor(), ds.getDescriptor());
  repo.delete("ns", "test");
}
@Override
public int run(String[] args) throws Exception {
  // Load the events dataset
  Dataset<GenericRecord> events = Datasets.load("dataset:hive:/tmp/data/default/events");

  // Print every event, making sure the reader is closed afterwards.
  DatasetReader<GenericRecord> eventReader = events.newReader();
  try {
    for (GenericRecord event : eventReader) {
      System.out.println(event);
    }
  } finally {
    eventReader.close();
  }
  return 0;
}
@Test
public void testAbsolute() {
  DatasetRepository repo =
      DatasetRepositories.repositoryFor("repo:file:/tmp/data");
  repo.delete("ns", "test");
  repo.create("ns", "test", descriptor);

  // Load by the dataset's absolute file: URI rather than through the repo.
  Dataset<Record> dataset = Datasets.<Record, Dataset<Record>>
      load("dataset:file:/tmp/data/ns/test", Record.class);

  Assert.assertNotNull("Should load dataset", dataset);
  Assert.assertTrue(dataset instanceof FileSystemDataset);
  Assert.assertEquals("Locations should match",
      URI.create("file:/tmp/data/ns/test"),
      dataset.getDescriptor().getLocation());
  Assert.assertEquals("Descriptors should match",
      repo.load("ns", "test").getDescriptor(), dataset.getDescriptor());
  Assert.assertEquals("Should report correct namespace",
      "ns", dataset.getNamespace());
  Assert.assertEquals("Should report correct name",
      "test", dataset.getName());
  repo.delete("ns", "test");
}
@Override
public int run(String[] args) throws Exception {
  // Load the events dataset
  Dataset<GenericRecord> events = Datasets.load("dataset:hive:/tmp/data/default/events");
  // Get a reader for the dataset and read all the events
  DatasetReader<GenericRecord> reader = events.newReader();
  try {
    for (GenericRecord event : reader) {
      System.out.println(event);
    }
  } finally {
    // Always release the reader, even if iteration throws.
    reader.close();
  }
  return 0;
}
@Test
public void testUnpartitionedDataset() throws Exception {
  // Create a dataset buried several directories below the scan root.
  File folder = temp.newFolder("a/b/c/d/e/dataset_name");
  Path root = new Path(temp.getRoot().toURI());
  FileSystem fs = LocalFileSystem.getInstance();
  URI datasetUri = URI.create("dataset:file:" + folder.getAbsolutePath());
  DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
      .schema(USER_SCHEMA)
      .build();
  Dataset<GenericRecord> dataset = Datasets.create(datasetUri, descriptor);
  // write two so that the descriptor uses the directory rather than a file
  writeUserToView(dataset);
  writeUserToView(dataset);
  // Scanning from the root should discover exactly this one dataset.
  DatasetDescriptor expected = dataset.getDescriptor();
  DatasetDescriptor actual = Iterables.getOnlyElement(
      FileSystemUtil.findPotentialDatasets(fs, root));
  Assert.assertEquals("Should succeed and find an equivalent descriptor",
      expected, actual);
}
@Test
public void testExternal() {
  DatasetRepository repo = DatasetRepositories
      .repositoryFor("repo:hive:/tmp/data?" + hdfsQueryArgs);
  repo.delete("ns", "test");
  repo.create("ns", "test", descriptor);
  Dataset<Object> ds = Datasets
      .<Object, Dataset<Object>>load("dataset:hive:/tmp/data/ns/test?" + hdfsQueryArgs, Object.class);
  Assert.assertNotNull("Should load dataset", ds);
  Assert.assertTrue(ds instanceof FileSystemDataset);
  // The data should live at the repo path on HDFS.
  Assert.assertEquals("Locations should match",
      URI.create("hdfs://" + hdfsAuth + "/tmp/data/ns/test"),
      ds.getDescriptor().getLocation());
  Assert.assertEquals("Descriptors should match",
      repo.load("ns", "test").getDescriptor(), ds.getDescriptor());
  Assert.assertEquals("Should report correct namespace",
      "ns", ds.getNamespace());
  Assert.assertEquals("Should report correct name",
      "test", ds.getName());
  repo.delete("ns", "test");
}
@Test
public void testLoadChangedAbsolutePathURICompatibility() {
  // this used to be a relative external URI, but is now a managed URI
  String uri = "dataset:hive:/data/ds";
  DatasetRepository repo = DatasetRepositories
      .repositoryFor("repo:hive:/tmp/data");
  DatasetDescriptor withLocation = new DatasetDescriptor.Builder(DESCRIPTOR)
      .location("file:/tmp/data/ds") // old location
      .build();
  Dataset<GenericRecord> expected = repo.create(
      "default", "ds", withLocation, GenericRecord.class);
  // The absolute-path URI should still resolve to the default.ds dataset.
  Dataset<GenericRecord> actual = Datasets.load(uri);
  Assert.assertEquals("Should load existing dataset default.ds",
      expected, actual);
  Assert.assertEquals("URI should use apparent namespace",
      "dataset:hive:data/ds", actual.getUri().toString());
  Assert.assertTrue(Datasets.delete(uri));
}
@Test
public void testManaged() {
  DatasetRepository repo = DatasetRepositories
      .repositoryFor("repo:hive?" + hdfsQueryArgs);
  repo.delete("ns", "test");
  repo.create("ns", "test", descriptor);

  // Managed datasets are addressed by query parameters rather than a path.
  String uri = "dataset:hive?dataset=test&namespace=ns&" + hdfsQueryArgs;
  Dataset<Object> dataset =
      Datasets.<Object, Dataset<Object>>load(uri, Object.class);

  Assert.assertNotNull("Should load dataset", dataset);
  Assert.assertTrue(dataset instanceof FileSystemDataset);
  Assert.assertEquals("Descriptors should match",
      repo.load("ns", "test").getDescriptor(), dataset.getDescriptor());
  Assert.assertEquals("Should report correct namespace",
      "ns", dataset.getNamespace());
  Assert.assertEquals("Should report correct name",
      "test", dataset.getName());
  repo.delete("ns", "test");
}
@Test
public void testLocalImport() throws Exception {
  String sample = temp.newFile("sample.sequence").toString();
  writeSequenceFile(getFS(), new Path(sample)); // local sequence file
  // Crunch will use a MemPipeline, so use the custom copying InputFormat
  command.inFormatClass = CopyingInputFormat.class.getName();
  command.targets = Lists.newArrayList(sample, datasetUri);
  command.noCompaction = true; // no need to run reducers
  int rc = command.run();
  Assert.assertEquals("Should return success", 0, rc);
  verify(console).info("Added {} records to \"{}\"", 3L, datasetUri);
  verifyNoMoreInteractions(console);
  // All records from the sequence file should land in the target dataset.
  Set<Measurement> datasetContent = materialize(
      Datasets.load(datasetUri, Measurement.class));
  Assert.assertEquals(Sets.newHashSet(measurements), datasetContent);
}
@Test
public void testLocalImportWithTransform() throws Exception {
  String sample = temp.newFile("sample.sequence").toString();
  writeSequenceFile(getFS(), new Path(sample)); // local sequence file
  // Crunch will use a MemPipeline, so use the custom copying InputFormat
  command.inFormatClass = CopyingInputFormat.class.getName();
  command.targets = Lists.newArrayList(sample, datasetUri);
  command.noCompaction = true; // no need to run reducers
  command.transform = TransformMeasurement.class.getName();
  int rc = command.run();
  Assert.assertEquals("Should return success", 0, rc);
  verify(console).info("Added {} records to \"{}\"", 3L, datasetUri);
  verifyNoMoreInteractions(console);
  // The stored records should be the inputs with the transform applied.
  Set<Measurement> datasetContent = materialize(
      Datasets.load(datasetUri, Measurement.class));
  Set<Measurement> expected = Sets.newHashSet(Iterables.transform(
      measurements, new TransformMeasurement()));
  Assert.assertEquals(expected, datasetContent);
}
@Test
public void testMRImportWithTransform() throws Exception {
  // Qualify the sample path against the distributed FS so MR tasks can read it.
  Path sample = new Path(temp.newFile("sample.sequence").toString())
      .makeQualified(getDFS().getUri(), new Path("/"));
  writeSequenceFile(getDFS(), sample); // HDFS sequence file
  // Reusing records is okay when running in MR
  command.inFormatClass = SequenceFileInputFormat.class.getName();
  command.targets = Lists.newArrayList(sample.toString(), datasetUri);
  command.noCompaction = true; // no need to run reducers
  command.transform = TransformMeasurement.class.getName();
  int rc = command.run();
  Assert.assertEquals("Should return success", 0, rc);
  verify(console).info("Added {} records to \"{}\"", 3L, datasetUri);
  verifyNoMoreInteractions(console);
  // The stored records should be the inputs with the transform applied.
  Set<Measurement> datasetContent = materialize(
      Datasets.load(datasetUri, Measurement.class));
  Set<Measurement> expected = Sets.newHashSet(Iterables.transform(
      measurements, new TransformMeasurement()));
  Assert.assertEquals(expected, datasetContent);
}
@Test
public void testCreateFromExisting() throws Exception {
  command.datasets = Lists.newArrayList(existingDataURI);
  command.run();
  verify(console).debug(contains("Created"), eq(existingDataURI));

  // load the new dataset and verify it
  Dataset<GenericRecord> users = Datasets.load(existingDataURI);
  DatasetDescriptor created = users.getDescriptor();
  Assert.assertEquals("Schema should match",
      USER_SCHEMA, created.getSchema());
  Assert.assertFalse("Should not be partitioned",
      created.isPartitioned());
  Assert.assertEquals("Should be Parquet",
      Formats.PARQUET, created.getFormat());
}
@Test
public void testExternal() {
  DatasetRepository repo =
      DatasetRepositories.repositoryFor("repo:hive:/tmp/data");
  repo.delete("ns", "test");
  repo.create("ns", "test", descriptor);

  Dataset<GenericRecord> dataset = Datasets.load("dataset:hive:/tmp/data/ns/test");
  Assert.assertNotNull("Should load dataset", dataset);
  Assert.assertTrue(dataset instanceof FileSystemDataset);
  Assert.assertEquals("Locations should match",
      URI.create("hdfs://" + hdfsAuth + "/tmp/data/ns/test"),
      dataset.getDescriptor().getLocation());
  Assert.assertEquals("Descriptors should match",
      repo.load("ns", "test").getDescriptor(), dataset.getDescriptor());
  repo.delete("ns", "test");
}
@Test
public void testUpdateWithUpdatedURI() {
  // Add a property via Datasets.update and confirm it reaches the metastore.
  Dataset<GenericRecord> updated = Datasets.update(
      "dataset:hive:/tmp/datasets/default/test",
      new DatasetDescriptor.Builder(descriptor)
          .property("added.property", "true")
          .build());
  Assert.assertNotNull("Update should succeed", updated);

  DatasetDescriptor stored =
      HiveUtils.descriptorForTable(conf, metastore.getTable("default", "test"));
  Assert.assertEquals("Should update default.test descriptor",
      stored, updated.getDescriptor());
  // Fix: assertEquals(message, expected, actual) — the expected literal
  // "true" was previously passed as the actual value.
  Assert.assertEquals("Added property should be present",
      "true", stored.getProperty("added.property"));
}
@Test
public void testBasicStoreToHive() throws IOException {
  String datasetUri = "dataset:hive:ns/test";
  Dataset<Record> dataset = Datasets.create(datasetUri, descriptor, Record.class);

  TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
  runner.assertNotValid();
  runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
  runner.assertValid();

  List<Record> users = Lists.newArrayList(
      user("a", "[email protected]"),
      user("b", "[email protected]"),
      user("c", "[email protected]")
  );
  runner.enqueue(streamFor(users));
  runner.run();

  runner.assertAllFlowFilesTransferred("success", 1);

  // Fix: the reader was previously never closed; read under try/finally so
  // the underlying resources are released even if materializing fails.
  List<Record> stored;
  DatasetReader<Record> reader = dataset.newReader();
  try {
    stored = Lists.newArrayList((Iterable<Record>) reader);
  } finally {
    reader.close();
  }
  Assert.assertEquals("Records should match", users, stored);

  Datasets.delete(datasetUri);
}
@Before
public void createDataset() throws Exception {
  // Build a fresh record dataset under a new temp folder for each test.
  DatasetDescriptor userDescriptor = new DatasetDescriptor.Builder()
      .schema(TestUtil.USER_SCHEMA)
      .build();
  this.datasetUri = "dataset:file:" + temp.newFolder("ns", "temp").toString();
  this.dataset = Datasets.create(datasetUri, userDescriptor, Record.class);
}
@Test
public void testCreateChangedAbsolutePathURI() {
  // this used to be a relative external URI, but is now a managed URI
  String uri = "dataset:hive:/ns/ds";
  Datasets.create(uri, DESCRIPTOR);

  // With no explicit location, creation should produce a managed table.
  Table hiveTable = metastore.getTable("ns", "ds");
  Assert.assertNotNull("Table should be found under ns.ds", hiveTable);
  Assert.assertTrue("Should create a managed table: " + hiveTable.getSd().getLocation(),
      HiveAbstractMetadataProvider.isManaged(hiveTable));
  Assert.assertTrue(Datasets.delete(uri));
}
@Test
public void testFindsHDFS() throws Exception {
  // set the default configuration that the loader will use
  Configuration existing = DefaultConfiguration.get();
  DefaultConfiguration.set(getConfiguration());
  try {
    FileSystemDataset<GenericRecord> dataset =
        Datasets.load("dataset:hdfs:/tmp/datasets/ns/strings");
    Assert.assertNotNull("Dataset should be found", dataset);
    Assert.assertEquals("Dataset should be located in HDFS",
        "hdfs", dataset.getFileSystem().getUri().getScheme());
  } finally {
    // Fix: restore the original config in finally so a failed assertion
    // cannot leak the replaced DefaultConfiguration into other tests.
    DefaultConfiguration.set(existing);
  }
}
@Test
public void testMissingDataset() {
  // Loading a URI that points at nothing should throw, not return null.
  TestHelpers.assertThrows("Should not find dataset: no such dataset",
      DatasetNotFoundException.class, new Runnable() {
        @Override
        public void run() {
          Datasets.load("dataset:hive:/tmp/data/ns/nosuchdataset");
        }
      });
}