下面列出了com.google.inject.multibindings.ProvidesIntoSet#io.prestosql.spi.connector.ConnectorTransactionHandle 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Creates a table named {@code name} in the {@code test} schema with a single {@code id}
 * column of type {@code BIGINT}, then looks the table up again and returns its table id.
 * <p>
 * Creation and lookup each run in their own {@code READ_COMMITTED} transaction, and each
 * transaction is committed before the next step starts.
 *
 * @param name name of the table to create
 * @return the table id reported by the resulting {@link RaptorTableHandle}
 */
private long createTable(String name)
{
    // First transaction: create the table.
    ConnectorTransactionHandle createTransaction = connector.beginTransaction(READ_COMMITTED, false);
    ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(
            new SchemaTableName("test", name),
            ImmutableList.of(new ColumnMetadata("id", BIGINT)));
    connector.getMetadata(createTransaction).createTable(SESSION, tableMetadata, false);
    connector.commit(createTransaction);

    // Second transaction: resolve the handle of the table that was just created.
    ConnectorTransactionHandle lookupTransaction = connector.beginTransaction(READ_COMMITTED, false);
    ConnectorTableHandle createdTable = getTableHandle(connector.getMetadata(lookupTransaction), name);
    connector.commit(lookupTransaction);

    RaptorTableHandle raptorTable = (RaptorTableHandle) createdTable;
    return raptorTable.getTableId();
}
/**
 * Creates a {@link CassandraPageSink} that writes to the table described by {@code tableHandle}.
 *
 * @param transactionHandle not used by this implementation
 * @param session not used by this implementation
 * @param tableHandle insert handle; must be a non-null {@link CassandraInsertTableHandle}
 * @return a page sink configured from the handle's schema, table, column names/types and UUID flag
 * @throws NullPointerException if {@code tableHandle} is null
 * @throws IllegalArgumentException if {@code tableHandle} is not a {@link CassandraInsertTableHandle}
 */
@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle tableHandle)
{
    requireNonNull(tableHandle, "tableHandle is null");
    // The message names the type that is actually checked; every argument already is a ConnectorInsertTableHandle.
    checkArgument(tableHandle instanceof CassandraInsertTableHandle, "tableHandle is not an instance of CassandraInsertTableHandle");
    CassandraInsertTableHandle handle = (CassandraInsertTableHandle) tableHandle;

    return new CassandraPageSink(
            cassandraSession,
            cassandraSession.getProtocolVersion(),
            handle.getSchemaName(),
            handle.getTableName(),
            handle.getColumnNames(),
            handle.getColumnTypes(),
            handle.isGenerateUuid());
}
/**
 * Builds an in-memory table with one row per (catalog, property) pair and returns a cursor over it.
 * <p>
 * Catalogs are visited in sorted name order and, within a catalog, properties are visited in
 * sorted key order. Each row holds the catalog name, the property name, the default value
 * rendered as a string (empty string when there is no default), the SQL type rendered as a
 * string, and the property description.
 *
 * @param transactionHandle must be a {@link GlobalSystemTransactionHandle}; supplies the transaction id
 * @param session not used by this implementation
 * @param constraint not used by this implementation
 */
@Override
public final RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
    TransactionId transactionId = ((GlobalSystemTransactionHandle) transactionHandle).getTransactionId();

    InMemoryRecordSet.Builder rows = InMemoryRecordSet.builder(tableMetadata);
    Map<CatalogName, Map<String, PropertyMetadata<?>>> propertiesByCatalog = propertySupplier.get();

    // TreeMap copies give a deterministic, sorted iteration order.
    Map<String, CatalogName> sortedCatalogs = new TreeMap<>(transactionManager.getCatalogNames(transactionId));
    for (Entry<String, CatalogName> catalogEntry : sortedCatalogs.entrySet()) {
        String catalog = catalogEntry.getKey();
        Map<String, PropertyMetadata<?>> sortedProperties = new TreeMap<>(propertiesByCatalog.getOrDefault(catalogEntry.getValue(), ImmutableMap.of()));
        sortedProperties.values().forEach(property -> rows.addRow(
                catalog,
                property.getName(),
                firstNonNull(property.getDefaultValue(), "").toString(),
                property.getSqlType().toString(),
                property.getDescription()));
    }

    return rows.build().cursor();
}
/**
 * Creates a {@link MongoPageSource} for the given table and columns.
 *
 * @param table must be a {@link MongoTableHandle}
 * @param columns non-null list whose elements must all be {@link MongoColumnHandle}s
 * @throws NullPointerException if {@code columns} is null
 * @throws ClassCastException if {@code table} or any column is of an unexpected type
 */
@Override
public ConnectorPageSource createPageSource(
        ConnectorTransactionHandle transaction,
        ConnectorSession session,
        ConnectorSplit split,
        ConnectorTableHandle table,
        List<ColumnHandle> columns,
        TupleDomain<ColumnHandle> dynamicFilter)
{
    MongoTableHandle mongoTable = (MongoTableHandle) table;

    ImmutableList.Builder<MongoColumnHandle> mongoColumns = ImmutableList.builder();
    requireNonNull(columns, "columns is null")
            .forEach(column -> mongoColumns.add((MongoColumnHandle) column));

    return new MongoPageSource(mongoSession, mongoTable, mongoColumns.build());
}
/**
 * Produces one {@link AtopSplit} per worker node and per day for the last
 * {@code maxHistoryDays} days (including today), keeping only the days whose time range
 * overlaps both the start-time and the end-time constraint of the table handle.
 *
 * @param table must be an {@link AtopTableHandle}
 * @return a {@link FixedSplitSource} over the selected splits
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableHandle table, SplitSchedulingStrategy splitSchedulingStrategy)
{
    AtopTableHandle atopTable = (AtopTableHandle) table;

    List<ConnectorSplit> selected = new ArrayList<>();
    ZonedDateTime now = ZonedDateTime.now(timeZone);
    for (Node worker : nodeManager.getWorkerNodes()) {
        // Walk day by day, always positioned at the beginning of the day, until "now" is reached.
        for (ZonedDateTime day = now.minusDays(maxHistoryDays - 1).withHour(0).withMinute(0).withSecond(0).withNano(0);
                day.isBefore(now);
                day = day.plusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0)) {
            ZonedDateTime lastSecondOfDay = day.withHour(23).withMinute(59).withSecond(59).withNano(0);
            // The domain is expressed in epoch milliseconds, inclusive on both ends.
            Domain dayDomain = Domain.create(
                    ValueSet.ofRanges(Range.range(TIMESTAMP_WITH_TIME_ZONE, 1000 * day.toEpochSecond(), true, 1000 * lastSecondOfDay.toEpochSecond(), true)),
                    false);
            if (!atopTable.getStartTimeConstraint().overlaps(dayDomain) || !atopTable.getEndTimeConstraint().overlaps(dayDomain)) {
                continue;
            }
            selected.add(new AtopSplit(worker.getHostAndPort(), day.toEpochSecond(), day.getZone()));
        }
    }
    return new FixedSplitSource(selected);
}
/**
 * Returns a page source provider whose {@code createPageSource} always yields a new
 * {@link EmptyPageSource}, regardless of its arguments.
 */
@Override
public ConnectorPageSourceProvider getPageSourceProvider()
{
    ConnectorPageSourceProvider emptyProvider = new ConnectorPageSourceProvider()
    {
        @Override
        public ConnectorPageSource createPageSource(
                ConnectorTransactionHandle transaction,
                ConnectorSession session,
                ConnectorSplit split,
                ConnectorTableHandle table,
                List<ColumnHandle> columns,
                TupleDomain<ColumnHandle> dynamicFilter)
        {
            // All arguments are ignored.
            return new EmptyPageSource();
        }
    };
    return emptyProvider;
}
/**
 * Creates one {@link JmxSplit} per node whose identifier is compatible with the node filter
 * stored in the table handle.
 *
 * @param table must be a {@link JmxTableHandle}
 * @return a {@link FixedSplitSource} over the matching nodes
 * @throws IllegalStateException if the table handle has no column named {@code NODE_COLUMN_NAME}
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle table, SplitSchedulingStrategy splitSchedulingStrategy)
{
    JmxTableHandle jmxTable = (JmxTableHandle) table;

    //TODO is there a better way to get the node column?
    Optional<JmxColumnHandle> nodeColumn = jmxTable.getColumnHandles().stream()
            .filter(column -> column.getColumnName().equals(NODE_COLUMN_NAME))
            .findFirst();
    checkState(nodeColumn.isPresent(), "Failed to find %s column", NODE_COLUMN_NAME);

    TupleDomain<ColumnHandle> nodeFilter = jmxTable.getNodeFilter();
    List<ConnectorSplit> matchingSplits = nodeManager.getAllNodes().stream()
            .filter(candidate -> {
                // Express "node column = this node's identifier" as a tuple domain and test it against the filter.
                NullableValue nodeId = NullableValue.of(createUnboundedVarcharType(), utf8Slice(candidate.getNodeIdentifier()));
                TupleDomain<ColumnHandle> nodeDomain = fromFixedValues(ImmutableMap.of(nodeColumn.get(), nodeId));
                return nodeFilter.overlaps(nodeDomain);
            })
            .map(candidate -> new JmxSplit(ImmutableList.of(candidate.getHostAndPort())))
            .collect(toList());

    return new FixedSplitSource(matchingSplits);
}
/**
 * Builds an {@link InMemoryRecordSet} for the requested columns of a JMX table.
 * <p>
 * For live tables the rows come from {@code getLiveRows}; otherwise they are collected from
 * {@code jmxHistoricalData} for every object name of the table. If a {@link JMException} is
 * thrown while gathering rows, the record set is returned empty.
 *
 * @param table must be a {@link JmxTableHandle}
 * @param columns non-null, non-empty list of requested columns
 * @throws NullPointerException if {@code columns} is null
 * @throws IllegalArgumentException if {@code columns} is empty
 */
@Override
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns)
{
    JmxTableHandle jmxTable = (JmxTableHandle) table;

    requireNonNull(columns, "columns is null");
    checkArgument(!columns.isEmpty(), "must provide at least one column");

    List<List<Object>> result;
    try {
        if (!jmxTable.isLiveData()) {
            List<Integer> columnPositions = calculateSelectedColumns(jmxTable.getColumnHandles(), getColumnNames(columns));
            result = jmxTable.getObjectNames().stream()
                    .flatMap(name -> jmxHistoricalData.getRows(name, columnPositions).stream())
                    .collect(toImmutableList());
        }
        else {
            result = getLiveRows(jmxTable, columns);
        }
    }
    catch (JMException e) {
        // The exception is not propagated; the caller simply sees no rows.
        result = ImmutableList.of();
    }

    return new InMemoryRecordSet(getColumnTypes(columns), result);
}
/**
 * Creates an {@link AtopPageSource} for the given split, resolving every requested column
 * handle to its {@code AtopColumn} and engine type.
 *
 * @param split must be an {@link AtopSplit} whose date is at the beginning of a day
 * @param table must be an {@link AtopTableHandle}
 * @param columns column handles; each must be an {@link AtopColumnHandle}
 * @throws IllegalArgumentException if the split date is not at the beginning of the day
 */
@Override
public ConnectorPageSource createPageSource(
        ConnectorTransactionHandle transaction,
        ConnectorSession session,
        ConnectorSplit split,
        ConnectorTableHandle table,
        List<ColumnHandle> columns,
        TupleDomain<ColumnHandle> dynamicFilter)
{
    AtopTableHandle atopTable = (AtopTableHandle) table;
    AtopSplit atopSplit = (AtopSplit) split;

    ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
    ImmutableList.Builder<AtopColumn> resolvedColumns = ImmutableList.builder();
    for (ColumnHandle handle : columns) {
        AtopColumnHandle atopHandle = (AtopColumnHandle) handle;
        AtopColumn resolved = atopTable.getTable().getColumn(atopHandle.getName());
        resolvedColumns.add(resolved);
        columnTypes.add(typeManager.getType(resolved.getType()));
    }

    ZonedDateTime date = atopSplit.getDate();
    ZonedDateTime startOfDay = date.withHour(0).withMinute(0).withSecond(0).withNano(0);
    checkArgument(date.equals(startOfDay), "Expected date to be at beginning of day");

    return new AtopPageSource(
            readerPermits,
            atopFactory,
            session,
            utf8Slice(atopSplit.getHost().getHostText()),
            atopTable.getTable(),
            date,
            resolvedColumns.build(),
            columnTypes.build());
}
/**
 * Creates a {@link JdbcRecordSet} for the given split, table and requested columns.
 *
 * @param split must be a {@link JdbcSplit}
 * @param table must be a {@link JdbcTableHandle}
 * @param columns requested columns; each must be a {@link JdbcColumnHandle}
 */
@Override
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns)
{
    JdbcSplit jdbcSplit = (JdbcSplit) split;
    JdbcTableHandle jdbcTable = (JdbcTableHandle) table;

    // In the current API, the columns (and order) needed by the engine are provided via an argument to this method. Make sure that
    // any columns that were recorded in the table handle match the requested set.
    // If no columns are recorded, it means that applyProjection never got called (e.g., in the case all columns are being used) and all
    // table columns should be returned. TODO: this is something that should be addressed once the getRecordSet API is revamped
    if (jdbcTable.getColumns().isPresent()) {
        verify(columns.equals(jdbcTable.getColumns().get()));
    }

    ImmutableList.Builder<JdbcColumnHandle> jdbcColumns = ImmutableList.builder();
    columns.forEach(column -> jdbcColumns.add((JdbcColumnHandle) column));

    return new JdbcRecordSet(jdbcClient, session, jdbcSplit, jdbcTable, jdbcColumns.build());
}
/**
 * Creates a {@link RaptorPageSink} configured from the given insert handle.
 * <p>
 * Column, sort-column and bucket-column handles are converted with {@code toColumnIds};
 * the remaining settings are passed through from the handle unchanged.
 *
 * @param tableHandle must be a {@link RaptorInsertTableHandle}
 */
@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle tableHandle)
{
    RaptorInsertTableHandle insertHandle = (RaptorInsertTableHandle) tableHandle;

    return new RaptorPageSink(
            pageSorter,
            storageManager,
            temporalFunction,
            insertHandle.getTransactionId(),
            toColumnIds(insertHandle.getColumnHandles()),
            insertHandle.getColumnTypes(),
            toColumnIds(insertHandle.getSortColumnHandles()),
            insertHandle.getSortOrders(),
            insertHandle.getBucketCount(),
            toColumnIds(insertHandle.getBucketColumnHandles()),
            insertHandle.getTemporalColumnHandle(),
            maxBufferSize);
}
/**
 * Creates a minimal {@link Connector} for tests.
 * <p>
 * Every call to {@code beginTransaction} returns a fresh anonymous transaction handle, and
 * {@code getMetadata} returns a {@link SystemTablesMetadata} backed by an empty set of system tables.
 */
private static Connector createTestSessionConnector()
{
    Connector testConnector = new Connector()
    {
        @Override
        public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
        {
            // Both arguments are ignored.
            return new ConnectorTransactionHandle() {};
        }

        @Override
        public ConnectorMetadata getMetadata(ConnectorTransactionHandle transaction)
        {
            return new SystemTablesMetadata(new StaticSystemTablesProvider(ImmutableSet.of()));
        }
    };
    return testConnector;
}
/**
 * Computes the BigQuery splits for a table.
 * <p>
 * The parallelism is the configured {@code parallelism} value when present, otherwise the
 * number of required worker nodes. When {@code emptyProjectionIsRequired} holds for the
 * projected columns, the splits come from {@code createEmptyProjection}; otherwise from
 * {@code readFromBigQuery}.
 *
 * @param table must be a {@link BigQueryTableHandle}
 * @return a {@link FixedSplitSource} over the computed splits
 */
@Override
public ConnectorSplitSource getSplits(
        ConnectorTransactionHandle transaction,
        ConnectorSession session,
        ConnectorTableHandle table,
        SplitSchedulingStrategy splitSchedulingStrategy)
{
    log.debug("getSplits(transaction=%s, session=%s, table=%s, splitSchedulingStrategy=%s)", transaction, session, table, splitSchedulingStrategy);

    BigQueryTableHandle handle = (BigQueryTableHandle) table;
    TableId tableId = handle.getTableId();
    int actualParallelism = parallelism.orElse(nodeManager.getRequiredWorkerNodes().size());
    TupleDomain<ColumnHandle> constraint = handle.getConstraint();
    Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(constraint);

    List<BigQuerySplit> splits;
    if (emptyProjectionIsRequired(handle.getProjectedColumns())) {
        splits = createEmptyProjection(tableId, actualParallelism, filter);
    }
    else {
        splits = readFromBigQuery(tableId, handle.getProjectedColumns(), actualParallelism, filter);
    }
    return new FixedSplitSource(splits);
}
/**
 * Asks the connector to produce a table handle whose partitioning is compatible with
 * {@code partitioningHandle}.
 * <p>
 * Connectors that use legacy table layouts get a new layout handle; for all others the
 * table handle must carry no layout and a new connector table handle is produced instead.
 *
 * @throws IllegalArgumentException if the partitioning handle has no connector id, or its
 *         connector id differs from the catalog of {@code tableHandle}
 */
@Override
public TableHandle makeCompatiblePartitioning(Session session, TableHandle tableHandle, PartitioningHandle partitioningHandle)
{
    checkArgument(partitioningHandle.getConnectorId().isPresent(), "Expect partitioning handle from connector, got system partitioning handle");
    CatalogName catalogName = partitioningHandle.getConnectorId().get();
    checkArgument(catalogName.equals(tableHandle.getCatalogName()), "ConnectorId of tableHandle and partitioningHandle does not match");

    CatalogMetadata catalogMetadata = getCatalogMetadata(session, catalogName);
    ConnectorMetadata connectorMetadata = catalogMetadata.getMetadataFor(catalogName);
    ConnectorTransactionHandle transaction = catalogMetadata.getTransactionHandleFor(catalogName);

    if (!connectorMetadata.usesLegacyTableLayouts()) {
        verify(tableHandle.getLayout().isEmpty(), "layout should not be present");
        ConnectorTableHandle compatibleHandle = connectorMetadata.makeCompatiblePartitioning(
                session.toConnectorSession(catalogName),
                tableHandle.getConnectorHandle(),
                partitioningHandle.getConnectorHandle());
        return new TableHandle(catalogName, compatibleHandle, transaction, Optional.empty());
    }

    // Legacy path: the partitioning is adjusted on the layout handle, the connector handle is kept.
    ConnectorTableLayoutHandle compatibleLayout = connectorMetadata.makeCompatiblePartitioning(
            session.toConnectorSession(catalogName),
            tableHandle.getLayout().get(),
            partitioningHandle.getConnectorHandle());
    return new TableHandle(catalogName, tableHandle.getConnectorHandle(), transaction, Optional.of(compatibleLayout));
}
/**
 * Builds an in-memory table with one row per entry of the Iceberg table history and returns
 * a cursor over it.
 * <p>
 * Each row holds the entry timestamp packed with the session time zone, the snapshot id,
 * the parent id of the snapshot ({@code null} when the snapshot cannot be found), and whether
 * the snapshot id is among the current ancestors of the table.
 *
 * @param transactionHandle not used by this implementation
 * @param constraint not used by this implementation
 */
@Override
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
    InMemoryRecordSet.Builder rows = InMemoryRecordSet.builder(COLUMNS);

    Set<Long> currentAncestors = ImmutableSet.copyOf(SnapshotUtil.currentAncestors(icebergTable));
    TimeZoneKey sessionZone = session.getTimeZoneKey();

    for (HistoryEntry entry : icebergTable.history()) {
        long id = entry.snapshotId();
        Snapshot snapshot = icebergTable.snapshot(id);
        rows.addRow(
                packDateTimeWithZone(entry.timestampMillis(), sessionZone),
                id,
                snapshot == null ? null : snapshot.parentId(),
                currentAncestors.contains(id));
    }

    return rows.build().cursor();
}
/**
 * Verifies that the {@code prod.test_table} definition is exposed with the expected schema
 * and table name, stream name, message data format and column counts.
 */
@Test
public void testTableDefinition()
{
    KinesisMetadata metadata = (KinesisMetadata) connector.getMetadata(new ConnectorTransactionHandle() {});
    // Check the metadata before it is dereferenced, otherwise the assertion can never fail.
    assertNotNull(metadata);

    SchemaTableName tblName = new SchemaTableName("prod", "test_table");
    KinesisTableHandle tableHandle = metadata.getTableHandle(SESSION, tblName);
    // Fail with a clear assertion instead of a NullPointerException when the table is not found.
    assertNotNull(tableHandle);

    SchemaTableName tableSchemaName = tableHandle.toSchemaTableName();
    assertEquals(tableSchemaName.getSchemaName(), "prod");
    assertEquals(tableSchemaName.getTableName(), "test_table");
    assertEquals(tableHandle.getStreamName(), "test_kinesis_stream");
    assertEquals(tableHandle.getMessageDataFormat(), "json");

    Map<String, ColumnHandle> columnHandles = metadata.getColumnHandles(SESSION, tableHandle);
    assertEquals(columnHandles.size(), 14);
    assertEquals(columnHandles.values().stream().filter(x -> ((KinesisColumnHandle) x).isInternal()).count(), 10);
}
/**
 * Creates a {@link LocalFileRecordSet} for the given split, table and requested columns.
 *
 * @param split must be a {@link LocalFileSplit}
 * @param table must be a {@link LocalFileTableHandle}
 * @param columns requested columns; each must be a {@link LocalFileColumnHandle}
 */
@Override
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns)
{
    LocalFileSplit fileSplit = (LocalFileSplit) split;
    LocalFileTableHandle fileTable = (LocalFileTableHandle) table;

    ImmutableList.Builder<LocalFileColumnHandle> fileColumns = ImmutableList.builder();
    columns.forEach(column -> fileColumns.add((LocalFileColumnHandle) column));

    return new LocalFileRecordSet(localFileTables, fileSplit, fileTable, fileColumns.build());
}
/**
 * Creates a handle for the given transaction.
 * <p>
 * The connector transaction handle is not created here; {@code transactionHandleFunction} is
 * wrapped in a memoizing supplier and applied to {@code transactionId} when that supplier is
 * first asked for a value.
 *
 * @throws NullPointerException if either argument is null
 */
SystemTransactionHandle(
        TransactionId transactionId,
        Function<TransactionId, ConnectorTransactionHandle> transactionHandleFunction)
{
    // Validate both arguments first (same order as the assignments below), then store.
    requireNonNull(transactionId, "transactionId is null");
    requireNonNull(transactionHandleFunction, "transactionHandleFunction is null");

    this.transactionId = transactionId;
    this.connectorTransactionHandle = Suppliers.memoize(() -> {
        return transactionHandleFunction.apply(transactionId);
    });
}
/**
 * Asks the connector that owns {@code tableHandle} to resolve an index for the given columns
 * and tuple domain.
 *
 * @return the connector's result wrapped in a {@link ResolvedIndex} together with the catalog
 *         name and transaction handle, or an empty optional when the connector returns none
 */
@Override
public Optional<ResolvedIndex> resolveIndex(Session session, TableHandle tableHandle, Set<ColumnHandle> indexableColumns, Set<ColumnHandle> outputColumns, TupleDomain<ColumnHandle> tupleDomain)
{
    CatalogName catalogName = tableHandle.getCatalogName();
    CatalogMetadata catalogMetadata = getCatalogMetadata(session, catalogName);
    ConnectorMetadata connectorMetadata = catalogMetadata.getMetadataFor(catalogName);
    ConnectorTransactionHandle transaction = catalogMetadata.getTransactionHandleFor(catalogName);

    Optional<ConnectorResolvedIndex> connectorIndex = connectorMetadata.resolveIndex(
            session.toConnectorSession(catalogName),
            tableHandle.getConnectorHandle(),
            indexableColumns,
            outputColumns,
            tupleDomain);

    if (!connectorIndex.isPresent()) {
        return Optional.empty();
    }
    return Optional.of(new ResolvedIndex(tableHandle.getCatalogName(), transaction, connectorIndex.get()));
}
/**
 * Forwards to the delegate's {@code getSplitBucketFunction} inside a
 * {@link ThreadContextClassLoader} scope created for {@code classLoader}.
 */
@Override
public ToIntFunction<ConnectorSplit> getSplitBucketFunction(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle)
{
    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
        ToIntFunction<ConnectorSplit> bucketFunction = delegate.getSplitBucketFunction(transactionHandle, session, partitioningHandle);
        return bucketFunction;
    }
}
/**
 * Creates the delegate's page sink inside a {@link ThreadContextClassLoader} scope created
 * for {@code classLoader}, and wraps the result in a {@link ClassLoaderSafeConnectorPageSink}
 * bound to the same class loader.
 */
@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle)
{
    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
        ConnectorPageSink delegateSink = delegate.createPageSink(transactionHandle, session, insertTableHandle);
        return new ClassLoaderSafeConnectorPageSink(delegateSink, classLoader);
    }
}
/**
 * Returns a {@link FixedSplitSource} over the splits that {@code clientSession} builds for
 * the given table.
 *
 * @param table must be a {@link KuduTableHandle}
 */
@Override
public ConnectorSplitSource getSplits(
        ConnectorTransactionHandle transaction,
        ConnectorSession session,
        ConnectorTableHandle table,
        SplitSchedulingStrategy splitSchedulingStrategy)
{
    KuduTableHandle kuduTable = (KuduTableHandle) table;
    return new FixedSplitSource(clientSession.buildKuduSplits(kuduTable));
}
/**
 * Forwards to the delegate's layout-based {@code getSplits} inside a
 * {@link ThreadContextClassLoader} scope created for {@code classLoader}.
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, SplitSchedulingStrategy splitSchedulingStrategy)
{
    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
        ConnectorSplitSource delegateSplits = delegate.getSplits(transactionHandle, session, layout, splitSchedulingStrategy);
        return delegateSplits;
    }
}
/**
 * Returns a bucket function that rejects every split.
 * <p>
 * Obtaining the function succeeds; the {@link PrestoException} with code
 * {@code NOT_SUPPORTED} is thrown only when the returned function is applied to a split.
 */
@Override
public ToIntFunction<ConnectorSplit> getSplitBucketFunction(
        ConnectorTransactionHandle transactionHandle,
        ConnectorSession session,
        ConnectorPartitioningHandle partitioningHandle)
{
    return value -> {
        // Message grammar fixed: "does not support" rather than "does not supported".
        throw new PrestoException(NOT_SUPPORTED, "Black hole connector does not support distributed reads");
    };
}
/**
 * Creates an insert table handle from its three JSON properties.
 *
 * @throws NullPointerException if any argument is null
 */
@JsonCreator
public InsertTableHandle(
        @JsonProperty("catalogName") CatalogName catalogName,
        @JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle,
        @JsonProperty("connectorHandle") ConnectorInsertTableHandle connectorHandle)
{
    // Validate in declaration order, then store.
    requireNonNull(catalogName, "catalogName is null");
    requireNonNull(transactionHandle, "transactionHandle is null");
    requireNonNull(connectorHandle, "connectorHandle is null");

    this.catalogName = catalogName;
    this.transactionHandle = transactionHandle;
    this.connectorHandle = connectorHandle;
}
/**
 * Returns the bucket-to-node map created by {@code ConnectorBucketNodeMap.createBucketNodeMap(1)}.
 * None of the arguments are consulted.
 */
@Override
public ConnectorBucketNodeMap getBucketNodeMap(
        ConnectorTransactionHandle transactionHandle,
        ConnectorSession session,
        ConnectorPartitioningHandle partitioningHandle)
{
    ConnectorBucketNodeMap bucketNodeMap = ConnectorBucketNodeMap.createBucketNodeMap(1);
    return bucketNodeMap;
}
/**
 * Resolves the TPCH table named by the table handle and delegates to the overloaded
 * {@code getRecordSet}, passing the requested columns, the handle's scale factor and
 * constraint, and the split's part number and total part count.
 *
 * @param split must be a {@link TpchSplit}
 * @param table must be a {@link TpchTableHandle}
 */
@Override
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns)
{
    TpchSplit part = (TpchSplit) split;
    TpchTableHandle handle = (TpchTableHandle) table;

    return getRecordSet(
            TpchTable.getTable(handle.getTableName()),
            columns,
            handle.getScaleFactor(),
            part.getPartNumber(),
            part.getTotalParts(),
            handle.getConstraint());
}
/**
 * Removes the metadata registered for {@code transaction} from the transaction manager and
 * rolls it back inside a {@link ThreadContextClassLoader} scope created for {@code classLoader}.
 *
 * @throws IllegalArgumentException if no metadata is registered for {@code transaction}
 */
@Override
public void rollback(ConnectorTransactionHandle transaction)
{
    TransactionalMetadata removed = transactionManager.remove(transaction);
    boolean transactionKnown = removed != null;
    checkArgument(transactionKnown, "no such transaction: %s", transaction);

    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
        removed.rollback();
    }
}
/**
 * Returns a {@link FixedSplitSource} with one {@link LocalFileSplit} per node reported by
 * {@code nodeManager.getAllNodes()}, each addressed by that node's host and port.
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableHandle table, SplitSchedulingStrategy splitSchedulingStrategy)
{
    return new FixedSplitSource(
            nodeManager.getAllNodes().stream()
                    .map(member -> new LocalFileSplit(member.getHostAndPort()))
                    .collect(Collectors.toList()));
}
/**
 * Picks the page source implementation for an Elasticsearch split:
 * <ul>
 * <li>a {@link PassthroughQueryPageSource} when the table type equals {@code QUERY};</li>
 * <li>a {@link CountQueryPageSource} when no columns are requested;</li>
 * <li>otherwise a {@link ScanQueryPageSource} over the requested columns.</li>
 * </ul>
 *
 * @param split non-null; must be an {@link ElasticsearchSplit}
 * @param table non-null; must be an {@link ElasticsearchTableHandle}
 * @param columns requested columns; each must be an {@link ElasticsearchColumnHandle}
 * @throws NullPointerException if {@code split} or {@code table} is null
 */
@Override
public ConnectorPageSource createPageSource(
        ConnectorTransactionHandle transaction,
        ConnectorSession session,
        ConnectorSplit split,
        ConnectorTableHandle table,
        List<ColumnHandle> columns,
        TupleDomain<ColumnHandle> dynamicFilter)
{
    requireNonNull(split, "split is null");
    requireNonNull(table, "table is null");

    ElasticsearchTableHandle tableHandle = (ElasticsearchTableHandle) table;
    ElasticsearchSplit shardSplit = (ElasticsearchSplit) split;

    if (tableHandle.getType().equals(QUERY)) {
        return new PassthroughQueryPageSource(client, tableHandle);
    }
    else if (columns.isEmpty()) {
        return new CountQueryPageSource(client, session, tableHandle, shardSplit);
    }
    else {
        List<ElasticsearchColumnHandle> scanColumns = columns.stream()
                .map(ElasticsearchColumnHandle.class::cast)
                .collect(toImmutableList());
        return new ScanQueryPageSource(client, session, tableHandle, shardSplit, scanColumns);
    }
}