Listed below are example usages of org.kitesdk.data.DatasetWriter, collected from open-source projects.
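Nearly every snippet below follows the same lifecycle: create or load a Dataset (or narrow it to a View), obtain a DatasetWriter from it, write entities, and close the writer in a finally block. The following minimal sketch illustrates that pattern in isolation; the User class and the dataset URI are placeholders invented for this sketch, not taken from the examples below.

import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;

public class DatasetWriterSketch {
  // Hypothetical Avro-reflect entity; any Avro-compatible class works here.
  public static class User {
    public String username;
    public long creationDate;
  }

  public static void main(String[] args) {
    // Build a descriptor whose Avro schema is inferred from the entity class.
    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
        .schema(User.class)
        .build();
    // The dataset URI is a placeholder; a local file: URI keeps the sketch self-contained.
    Dataset<User> users = Datasets.create(
        "dataset:file:/tmp/data/users", descriptor, User.class);
    DatasetWriter<User> writer = null;
    try {
      writer = users.newWriter();
      User user = new User();
      user.username = "user-0";
      user.creationDate = System.currentTimeMillis();
      writer.write(user);
    } finally {
      // Closing flushes buffered records and releases the underlying files.
      if (writer != null) {
        writer.close();
      }
    }
  }
}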
@Override
public int run(List<String> args) throws Exception {
Preconditions.checkState(!Datasets.exists(uri),
"events dataset already exists");
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schema(StandardEvent.class).build();
View<StandardEvent> events = Datasets.create(uri, descriptor, StandardEvent.class);
DatasetWriter<StandardEvent> writer = events.newWriter();
try {
// keep generating random events until about 36 seconds have elapsed
while (System.currentTimeMillis() - baseTimestamp < 36000) {
writer.write(generateRandomEvent());
}
} finally {
writer.close();
}
System.out.println("Generated " + counter + " events");
return 0;
}
@Override
public void write(E entity) {
Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
"Attempt to write to a writer in state:%s", state);
accessor.keyFor(entity, provided, reusedKey);
DatasetWriter<E> writer = cachedWriters.getIfPresent(reusedKey);
if (writer == null) {
// avoid checking on every write whether the entity belongs in the view by
// only checking when a new writer is created
Preconditions.checkArgument(view.includes(entity),
"View %s does not include entity %s", view, entity);
// get a new key because it is stored in the cache
StorageKey key = StorageKey.copy(reusedKey);
try {
writer = cachedWriters.getUnchecked(key);
} catch (UncheckedExecutionException ex) {
throw new IllegalArgumentException(
"Problem creating view for entity: " + entity, ex.getCause());
}
}
writer.write(entity);
}
@BeforeClass
public static void setup() throws IOException {
fs = LocalFileSystem.getInstance();
testDirectory = new Path(Files.createTempDir().getAbsolutePath());
FileSystemDatasetRepository repo = new FileSystemDatasetRepository(fs.getConf(),
testDirectory);
Dataset<MyRecord> writerDataset = repo.create("ns", "test", new DatasetDescriptor.Builder()
.schema(MyRecord.class)
.build(), MyRecord.class);
DatasetWriter<MyRecord> writer = writerDataset.newWriter();
for (int i = 0; i < totalRecords; i++) {
writer.write(new MyRecord(String.valueOf(i), i));
}
writer.close();
readerDataset = repo.load("ns", "test", GenericRecord.class);
}
@BeforeClass
public static void setup() throws IOException {
fs = LocalFileSystem.getInstance();
testDirectory = new Path(Files.createTempDir().getAbsolutePath());
FileSystemDatasetRepository repo = new FileSystemDatasetRepository(fs.getConf(),
testDirectory);
Dataset<MyRecord> writerDataset = repo.create("ns", "test", new DatasetDescriptor.Builder()
.schema(MyRecord.class)
.build(), MyRecord.class);
DatasetWriter<MyRecord> writer = writerDataset.newWriter();
for (int i = 0; i < totalRecords; i++) {
writer.write(new MyRecord(String.valueOf(i), i));
}
writer.close();
readerDataset = repo.load("ns", "test", TestGenericRecord.class);
}
private static void writeTestRecords(View<TestRecord> view) {
DatasetWriter<TestRecord> writer = null;
try {
writer = view.newWriter();
for (int i = 0; i < 10; i += 1) {
TestRecord record = new TestRecord();
record.id = i;
record.data = "test/-" + i;
writer.write(record);
}
} finally {
if (writer != null) {
writer.close();
}
}
}
public static void writeTestUsers(View<GenericData.Record> view, int count, int start, String... fields) {
DatasetWriter<GenericData.Record> writer = null;
try {
writer = view.newWriter();
for (int i = start; i < count + start; i++) {
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(
    view.getDataset().getDescriptor().getSchema()).set("username", "test-" + i);
for (String field : fields) {
recordBuilder.set(field, field + "-" + i);
}
writer.write(recordBuilder.build());
}
if (writer instanceof Flushable) {
((Flushable) writer).flush();
}
} finally {
if (writer != null) {
writer.close();
}
}
}
private static void writeTestRecords(View<TestRecord> view) {
DatasetWriter<TestRecord> writer = null;
try {
writer = view.newWriter();
for (int i = 0; i < 10; i += 1) {
TestRecord record = new TestRecord();
record.id = i;
record.data = "test-" + i;
writer.write(record);
}
} finally {
if (writer != null) {
writer.close();
}
}
}
@Test
public void testSpecificProjectionLoad() throws IOException {
DatasetWriter<StandardEvent> writer = null;
try {
writer = unbounded.newWriter();
writer.write(sepEvent);
writer.write(octEvent);
writer.write(novEvent);
} finally {
Closeables.close(writer, false);
}
Dataset<SmallEvent> dataset = repo.load(
"ns", unbounded.getDataset().getName(),
SmallEvent.class);
Set<SmallEvent> expected = Sets.newHashSet(toSmallEvent(sepEvent),
toSmallEvent(octEvent), toSmallEvent(novEvent));
assertContentEquals(expected, dataset);
}
@Test
public void testMixedProjection() throws IOException {
Dataset<StandardEvent> original = repo.create("ns", "mixedProjection",
new DatasetDescriptor.Builder()
.schema(StandardEvent.class)
.build(), StandardEvent.class);
DatasetWriter<StandardEvent> writer = null;
try {
writer = original.newWriter();
writer.write(sepEvent);
writer.write(octEvent);
writer.write(novEvent);
} finally {
Closeables.close(writer, false);
}
Dataset<ReflectSmallEvent> dataset = repo.load("ns", original.getName(),
ReflectSmallEvent.class);
Set<ReflectSmallEvent> expected = Sets.newHashSet(
new ReflectSmallEvent(sepEvent), new ReflectSmallEvent(octEvent),
new ReflectSmallEvent(novEvent));
assertContentEquals(expected, dataset);
}
@Override
public int run(String[] args) throws Exception {
// Create a partition strategy that partitions on the identity of favoriteColor
PartitionStrategy partitionStrategy = new PartitionStrategy.Builder()
.identity("favoriteColor", "favorite_color")
.build();
// Create a dataset of users with the Avro schema
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schemaUri("resource:user.avsc")
.partitionStrategy(partitionStrategy)
.build();
Dataset<Record> users = Datasets.create(
"dataset:hdfs:/tmp/data/users", descriptor, Record.class);
// Get a writer for the dataset and write some users to it
DatasetWriter<Record> writer = null;
try {
writer = users.newWriter();
Random rand = new Random();
GenericRecordBuilder builder = new GenericRecordBuilder(descriptor.getSchema());
for (int i = 0; i < 100; i++) {
Record record = builder.set("username", "user-" + i)
.set("creationDate", System.currentTimeMillis())
.set("favoriteColor", colors[rand.nextInt(colors.length)]).build();
writer.write(record);
}
} finally {
if (writer != null) {
writer.close();
}
}
return 0;
}
@Override
public int run(String[] args) throws Exception {
// Create a dataset of users with the Avro schema
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schemaUri("resource:user.avsc")
.build();
Dataset<Record> users = Datasets.create(
"dataset:hdfs:/tmp/data/users", descriptor, Record.class);
// Get a writer for the dataset and write some users to it
DatasetWriter<Record> writer = null;
try {
writer = users.newWriter();
Random rand = new Random();
GenericRecordBuilder builder = new GenericRecordBuilder(descriptor.getSchema());
for (int i = 0; i < 100; i++) {
Record record = builder.set("username", "user-" + i)
.set("creationDate", System.currentTimeMillis())
.set("favoriteColor", colors[rand.nextInt(colors.length)]).build();
writer.write(record);
}
} finally {
if (writer != null) {
writer.close();
}
}
return 0;
}
@Override
public int run(String[] args) throws Exception {
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schemaUri("resource:user.avsc")
.format(Formats.PARQUET)
.build();
Dataset<Record> users = Datasets.create(
"dataset:hdfs:/tmp/data/users", descriptor, Record.class);
// Get a writer for the dataset and write some users to it
DatasetWriter<Record> writer = null;
try {
writer = users.newWriter();
Random rand = new Random();
GenericRecordBuilder builder = new GenericRecordBuilder(descriptor.getSchema());
for (int i = 0; i < 100; i++) {
Record record = builder.set("username", "user-" + i)
.set("creationDate", System.currentTimeMillis())
.set("favoriteColor", colors[rand.nextInt(colors.length)]).build();
writer.write(record);
}
} finally {
if (writer != null) {
writer.close();
}
}
return 0;
}
@Override
public int run(String[] args) throws Exception {
// Create a dataset of products with the Avro schema
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schema(Product.class)
.build();
Dataset<Product> products = Datasets.create(
"dataset:hdfs:/tmp/data/products", descriptor, Product.class);
// Get a writer for the dataset and write some products to it
DatasetWriter<Product> writer = null;
try {
writer = products.newWriter();
int i = 0;
for (String name : names) {
Product product = new Product();
product.setName(name);
product.setId(i++);
writer.write(product);
}
} finally {
if (writer != null) {
writer.close();
}
}
return 0;
}
@Override
public int run(String[] args) throws Exception {
// Create a dataset of users with the Avro schema
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schemaUri("resource:user.avsc")
.build();
Dataset<Record> users = Datasets.create("dataset:hive?dataset=users",
descriptor, Record.class);
// Get a writer for the dataset and write some users to it
DatasetWriter<Record> writer = null;
try {
writer = users.newWriter();
Random rand = new Random();
GenericRecordBuilder builder = new GenericRecordBuilder(descriptor.getSchema());
for (int i = 0; i < 100; i++) {
Record record = builder.set("username", "user-" + i)
.set("creationDate", System.currentTimeMillis())
.set("favoriteColor", colors[rand.nextInt(colors.length)]).build();
writer.write(record);
}
} finally {
if (writer != null) {
writer.close();
}
}
return 0;
}
private DatasetWriter<GenericRecord> newWriter(
final UserGroupInformation login, final URI uri) {
View<GenericRecord> view = KerberosUtil.runPrivileged(login,
new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
@Override
public Dataset<GenericRecord> run() {
return Datasets.load(uri);
}
});
DatasetDescriptor descriptor = view.getDataset().getDescriptor();
String formatName = descriptor.getFormat().getName();
Preconditions.checkArgument(allowedFormats().contains(formatName),
"Unsupported format: " + formatName);
Schema newSchema = descriptor.getSchema();
if (targetSchema == null || !newSchema.equals(targetSchema)) {
this.targetSchema = descriptor.getSchema();
// target dataset schema has changed, invalidate all readers based on it
readers.invalidateAll();
}
this.reuseDatum = !("parquet".equals(formatName));
this.datasetName = view.getDataset().getName();
return view.newWriter();
}
private void writeRecords(Dataset<GenericRecord> dataset, int count) {
DatasetWriter<GenericRecord> writer = dataset.newWriter();
try {
for (int i = 0; i < count; ++i) {
GenericRecord entity = HBaseDatasetRepositoryTest.createGenericEntity(i);
writer.write(entity);
}
} finally {
writer.close();
}
}
private void populateInputDataset() {
DatasetWriter<GenericData.Record> writer = inputDataset.newWriter();
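// seed the input with duplicates: three apples, two bananas, one carrot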
writer.write(newStringRecord("apple"));
writer.write(newStringRecord("banana"));
writer.write(newStringRecord("banana"));
writer.write(newStringRecord("carrot"));
writer.write(newStringRecord("apple"));
writer.write(newStringRecord("apple"));
writer.close();
}
@Override
public void close() {
if (state.equals(ReaderWriterState.OPEN)) {
LOG.debug("Closing all cached writers for view:{}", view);
for (DatasetWriter<E> writer : cachedWriters.asMap().values()) {
LOG.debug("Closing partition writer:{}", writer);
writer.close();
}
state = ReaderWriterState.CLOSED;
}
}
@Override
public void setRollIntervalMillis(long rollIntervalMillis) {
this.rollIntervalMillis = rollIntervalMillis;
if (ReaderWriterState.OPEN == state) {
for (DatasetWriter<E> writer : cachedWriters.asMap().values()) {
if (writer instanceof RollingWriter) {
((RollingWriter) writer).setRollIntervalMillis(rollIntervalMillis);
}
}
}
}
@Override
public void setTargetFileSize(long targetSizeBytes) {
this.targetFileSize = targetSizeBytes;
if (ReaderWriterState.OPEN == state) {
for (DatasetWriter<E> writer : cachedWriters.asMap().values()) {
if (writer instanceof RollingWriter) {
((RollingWriter) writer).setTargetFileSize(targetSizeBytes);
}
}
}
}
@Override
public void tick() {
if (ReaderWriterState.OPEN == state) {
for (DatasetWriter<E> writer : cachedWriters.asMap().values()) {
if (writer instanceof ClockReady) {
((ClockReady) writer).tick();
}
}
}
}
@Override
public void onRemoval(
RemovalNotification<StorageKey, DatasetWriter<E>> notification) {
DatasetWriter<E> writer = notification.getValue();
LOG.debug("Closing writer:{} for partition:{}", writer,
notification.getKey());
writer.close();
}
@Override
public DatasetWriter<E> newWriter() {
checkSchemaForWrite();
AbstractDatasetWriter<E> writer;
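// a partitioned descriptor needs a writer that fans entities out to per-partition writers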
if (dataset.getDescriptor().isPartitioned()) {
writer = PartitionedDatasetWriter.newWriter(this);
} else {
writer = FileSystemWriter.newWriter(
fs, root, -1, -1 /* get from descriptor */, dataset.getDescriptor(), this.getAccessor().getWriteSchema());
}
writer.initialize();
return writer;
}
@Test
@SuppressWarnings("unchecked")
public void testUnboundedDelete() throws Exception {
// NOTE: this is an unrestricted write so all should succeed
DatasetWriter<StandardEvent> writer = null;
try {
writer = unbounded.newWriter();
writer.write(sepEvent);
writer.write(octEvent);
writer.write(novEvent);
} finally {
Closeables.close(writer, false);
}
final Path root = new Path("target/data/ns/test");
final Path y2013 = new Path("target/data/ns/test/year=2013");
final Path sep = new Path("target/data/ns/test/year=2013/month=09");
final Path sep12 = new Path("target/data/ns/test/year=2013/month=09/day=12");
final Path oct = new Path("target/data/ns/test/year=2013/month=10");
final Path oct12 = new Path("target/data/ns/test/year=2013/month=10/day=12");
final Path nov = new Path("target/data/ns/test/year=2013/month=11");
final Path nov11 = new Path("target/data/ns/test/year=2013/month=11/day=11");
assertDirectoriesExist(fs, root, y2013, sep, sep12, oct, oct12, nov, nov11);
Assert.assertTrue("Delete should return true to indicate data was deleted.",
unbounded.deleteAll());
assertDirectoriesDoNotExist(fs, y2013, sep12, sep, oct12, oct, nov11, nov);
assertDirectoriesExist(fs, root);
}
@Test
@SuppressWarnings("unchecked")
public void testUnboundedMoveToTrash() throws Exception {
// NOTE: this is an unrestricted write so all should succeed
DatasetWriter<StandardEvent> writer = null;
try {
writer = unbounded.newWriter();
writer.write(sepEvent);
writer.write(octEvent);
writer.write(novEvent);
} finally {
Closeables.close(writer, false);
}
final Path root = new Path("target/data/ns/test");
final Path y2013 = new Path("target/data/ns/test/year=2013");
final Path sep = new Path("target/data/ns/test/year=2013/month=09");
final Path sep12 = new Path("target/data/ns/test/year=2013/month=09/day=12");
final Path oct = new Path("target/data/ns/test/year=2013/month=10");
final Path oct12 = new Path("target/data/ns/test/year=2013/month=10/day=12");
final Path nov = new Path("target/data/ns/test/year=2013/month=11");
final Path nov11 = new Path("target/data/ns/test/year=2013/month=11/day=11");
assertDirectoriesExist(fs, root, y2013, sep, sep12, oct, oct12, nov, nov11);
Assert.assertTrue("Delete should return true to indicate data was deleted.",
unbounded.moveToTrash());
assertDirectoriesDoNotExist(fs, y2013, sep12, sep, oct12, oct, nov11, nov);
assertDirectoriesExist(fs, root);
}
@Test
public void testRangeWithEmptyDirectory() throws Exception {
long start = new DateTime(2013, 9, 1, 0, 0, DateTimeZone.UTC).getMillis();
long end = new DateTime(2013, 11, 14, 0, 0, DateTimeZone.UTC).getMillis();
final RefinableView<StandardEvent> range = unbounded
.from("timestamp", start).toBefore("timestamp", end);
DatasetWriter<StandardEvent> writer = null;
try {
writer = range.newWriter();
writer.write(sepEvent);
writer.write(novEvent);
} finally {
Closeables.close(writer, false);
}
// All expected datasets and partitions are present
final Path root = new Path("target/data/ns/test");
final Path y2013 = new Path("target/data/ns/test/year=2013");
final Path sep = new Path("target/data/ns/test/year=2013/month=09");
final Path sep12 = new Path("target/data/ns/test/year=2013/month=09/day=12");
final Path nov = new Path("target/data/ns/test/year=2013/month=11");
final Path nov11 = new Path("target/data/ns/test/year=2013/month=11/day=11");
assertDirectoriesExist(fs, root, y2013, sep, sep12, nov, nov11);
// Recreate an empty directory that represents a valid partition for October 12.
final Path oct = new Path("target/data/ns/test/year=2013/month=10");
final Path oct12 = new Path("target/data/ns/test/year=2013/month=10/day=12");
assertTrue("mkdir should return true to indicate FS changed.", fs.mkdirs(oct12));
assertDirectoriesExist(fs, oct, oct12);
// Test reading from September through November. The range includes the empty directory.
assertContentEquals(Sets.newHashSet(sepEvent, novEvent), range);
}
@Test
public void testRefineIdentity() throws Exception {
PartitionStrategy strategy = new PartitionStrategy.Builder()
.identity("user_id")
.build();
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schemaUri("resource:standard_event.avsc")
.partitionStrategy(strategy)
.build();
// Create a separate dataset to avoid conflicts with the above.
Dataset<StandardEvent> identityDataset = repo.create(
"ns", "test_identity", descriptor);
DatasetWriter<StandardEvent> writer = null;
try {
writer = identityDataset.newWriter();
writer.write(sepEvent);
writer.write(octEvent);
writer.write(novEvent);
} finally {
Closeables.close(writer, false);
}
assertContentEquals(Sets.newHashSet(sepEvent, novEvent),
identityDataset.with("user_id", 0L));
}
private static <E> void writeToView(View<E> view, E... entities) {
DatasetWriter<E> writer = null;
try {
writer = view.newWriter();
for (E entity : entities) {
writer.write(entity);
}
} finally {
if (writer != null) {
writer.close();
}
}
}
public void writeUserToView(View<GenericRecord> dataset) {
DatasetWriter<GenericRecord> writer = null;
try {
writer = dataset.newWriter();
writer.write(USER);
} finally {
if (writer != null) {
writer.close();
}
}
}
@Test
public void testGenericProjectionAsSchema() throws IOException {
Dataset<StandardEvent> original = Datasets.load(
unbounded.getUri(), StandardEvent.class);
Schema standardEvent = Schemas.fromAvsc(conf,
URI.create("resource:standard_event.avsc"));
final Schema smallEvent = Schemas.fromAvsc(conf,
URI.create("resource:small_event.avsc"));
DatasetWriter<GenericRecord> writer = null;
try {
writer = original.asSchema(standardEvent).newWriter();
writer.write(toGeneric(sepEvent, standardEvent));
writer.write(toGeneric(octEvent, standardEvent));
writer.write(toGeneric(novEvent, standardEvent));
} finally {
Closeables.close(writer, false);
}
final View<GenericRecord> smallEvents = original.asSchema(smallEvent);
Set<GenericRecord> expected = Sets.newHashSet(
toGeneric(toSmallEvent(sepEvent), smallEvent),
toGeneric(toSmallEvent(octEvent), smallEvent),
toGeneric(toSmallEvent(novEvent), smallEvent));
assertContentEquals(expected, smallEvents);
TestHelpers.assertThrows("Should not be able to write small events",
IncompatibleSchemaException.class, new Runnable() {
@Override
public void run() {
smallEvents.newWriter();
}
});
}