下面列出了 java.sql.Statement#unwrap() 的实例代码;也可以点击链接到 GitHub 查看源代码,或在右侧发表评论。
@Ignore
@Test
public void giganticLoadDataInfileTest() throws SQLException, IOException {
    // Streams ~300 MB of generated rows through LOAD DATA LOCAL INFILE and then
    // verifies that the row count in the target table matches the stream's count.
    try (VeryLongAutoGeneratedInputStream in = new VeryLongAutoGeneratedInputStream(300000000)) {
        // Statement is now part of try-with-resources: it was previously never closed.
        try (Connection connection = setConnection("&allowLocalInfile=true");
             Statement statement = connection.createStatement()) {
            // Unwrap to the driver-native type so the INFILE source can be an in-memory stream.
            MariaDbStatement mariaDbStatement = statement.unwrap(MariaDbStatement.class);
            mariaDbStatement.setLocalInfileInputStream(in);
            String sql =
                "LOAD DATA LOCAL INFILE 'dummyFileName'"
                    + " INTO TABLE gigantic_load_data_infile "
                    + " FIELDS TERMINATED BY '\\t' ENCLOSED BY ''"
                    + " ESCAPED BY '\\\\' LINES TERMINATED BY '\\n'";
            assertTrue(statement.execute(sql));
            ResultSet rs = statement.executeQuery("select count(*) from gigantic_load_data_infile");
            assertTrue(rs.next());
            assertEquals(in.numberOfRows, rs.getInt(1));
        }
    }
}
/**
 * Compiles, optimizes, and initializes the query plan for the SELECT statement
 * stored in the given configuration.
 *
 * @param context the job context; must not be null
 * @param configuration the configuration holding the connection info and SELECT statement
 * @return the initialized {@link QueryPlan} with parallel scans set up
 * @throws IOException declared for caller compatibility; failures are surfaced as
 *         {@link RuntimeException} wrapping the original cause
 */
private QueryPlan getQueryPlan(final JobContext context, final Configuration configuration) throws IOException {
    Preconditions.checkNotNull(context);
    try {
        // NOTE(review): this connection is never closed — likely a leak. Confirm whether the
        // returned QueryPlan needs the live connection before adding try-with-resources here.
        final Connection connection = ConnectionUtil.getConnection(configuration);
        final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
        Preconditions.checkNotNull(selectStatement);
        final Statement statement = connection.createStatement();
        final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
        // Optimize the query plan so that we potentially use secondary indexes
        final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
        // Initialize the query plan so it sets up the parallel scans
        queryPlan.iterator();
        return queryPlan;
    } catch (Exception exception) {
        // Pass the throwable to the logger so the full stack trace is preserved
        // (previously only the message was logged).
        LOG.error(String.format("Failed to get the query plan with error [%s]", exception.getMessage()), exception);
        throw new RuntimeException(exception);
    }
}
@Test
public void testStartKeyStopKey() throws SQLException {
    // Creates a pre-split table and verifies a bounded range scan yields at least one split.
    long ts = nextTimestamp();
    Properties props = new Properties();
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
    // try-with-resources so the connection is closed even if CREATE TABLE throws.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        conn.createStatement().execute("CREATE TABLE start_stop_test (pk char(2) not null primary key) SPLIT ON ('EA','EZ')");
    }
    String query = "select count(*) from start_stop_test where pk >= 'EA' and pk < 'EZ'";
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2));
    // The second connection and its statement were previously leaked; close both.
    try (Connection conn = DriverManager.getConnection(getUrl(), props);
         Statement statement = conn.createStatement()) {
        statement.execute(query);
        PhoenixStatement pstatement = statement.unwrap(PhoenixStatement.class);
        List<KeyRange> splits = pstatement.getQueryPlan().getSplits();
        assertTrue(splits.size() > 0);
    }
}
@Test
public void testStartKeyStopKey() throws SQLException {
    // Creates a pre-split table (unique name per run) and verifies a bounded
    // range scan yields at least one split.
    Properties props = new Properties();
    String tableName = generateUniqueName();
    // try-with-resources so the connection is closed even if CREATE TABLE throws.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        conn.createStatement().execute("CREATE TABLE " + tableName
            + " (pk char(2) not null primary key) SPLIT ON ('EA','EZ')");
    }
    String query = "select count(*) from " + tableName + " where pk >= 'EA' and pk < 'EZ'";
    // The second connection and its statement were previously leaked; close both.
    try (Connection conn = DriverManager.getConnection(getUrl(), props);
         Statement statement = conn.createStatement()) {
        statement.execute(query);
        PhoenixStatement pstatement = statement.unwrap(PhoenixStatement.class);
        List<KeyRange> splits = pstatement.getQueryPlan().getSplits();
        assertTrue(splits.size() > 0);
    }
}
@Test
public void testStartKeyStopKey() throws SQLException {
    // Creates a pre-split table and verifies a bounded range scan yields at least one split.
    long ts = nextTimestamp();
    Properties props = new Properties();
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
    // try-with-resources so the connection is closed even if CREATE TABLE throws.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        conn.createStatement().execute("CREATE TABLE start_stop_test (pk char(2) not null primary key) SPLIT ON ('EA','EZ')");
    }
    String query = "select count(*) from start_stop_test where pk >= 'EA' and pk < 'EZ'";
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2));
    // The second connection and its statement were previously leaked; close both.
    try (Connection conn = DriverManager.getConnection(getUrl(), props);
         Statement statement = conn.createStatement()) {
        statement.execute(query);
        PhoenixStatement pstatement = statement.unwrap(PhoenixStatement.class);
        List<KeyRange> splits = pstatement.getQueryPlan().getSplits();
        assertTrue(splits.size() > 0);
    }
}
@Test
public void testUnwrapper() throws Throwable {
    try (Connection connection = getConnection()) {
        Statement statement = connection.createStatement();

        // The plain JDBC statement must report itself as a wrapper for the
        // Snowflake implementation class.
        if (!statement.isWrapperFor(SnowflakeStatementV1.class)) {
            fail("should be able to unwrap");
        }
        statement.execute("select 1");
        SnowflakeStatement sfstatement = statement.unwrap(SnowflakeStatement.class);
        assertNotNull(sfstatement.getQueryID());

        // Unwrapping to an unrelated class must be rejected with SQLException.
        try {
            statement.unwrap(SnowflakeConnectionV1.class);
            fail("should fail to cast");
        } catch (SQLException ignored) {
            // expected: a statement cannot be unwrapped to a connection type
        }
    }
}
/**
 * Compiles the given SELECT query and returns one {@link ColumnInfo} (name and
 * SQL type) per projected column.
 *
 * @param sqlQuery the SELECT statement to compile; must not be null
 * @return the projected columns' metadata, in projection order
 * @throws RuntimeException wrapping any {@link SQLException} raised while
 *         connecting, compiling, or closing
 */
@Override
public List<ColumnInfo> apply(String sqlQuery) {
    Preconditions.checkNotNull(sqlQuery);
    // try-with-resources replaces the manual finally/close dance; close failures
    // are still surfaced as RuntimeException via the catch below.
    try (Connection connection = ConnectionUtil.getConnection(this.configuration);
         Statement statement = connection.createStatement()) {
        final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
        final QueryPlan queryPlan = pstmt.compileQuery(sqlQuery);
        final List<? extends ColumnProjector> projectedColumns = queryPlan.getProjector().getColumnProjectors();
        // Build the result eagerly. The original returned a lazy Lists.transform
        // view (evaluated after the connection was closed) and also contained a
        // dead newArrayListWithCapacity assignment that was immediately overwritten.
        final List<ColumnInfo> columnInfos = Lists.newArrayListWithCapacity(projectedColumns.size());
        for (final ColumnProjector columnProjector : projectedColumns) {
            columnInfos.add(new ColumnInfo(columnProjector.getName(),
                columnProjector.getExpression().getDataType().getSqlType()));
        }
        return columnInfos;
    } catch (SQLException e) {
        // Log the throwable so the stack trace is preserved, not just the message.
        LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ", e.getMessage(), sqlQuery), e);
        throw new RuntimeException(e);
    }
}
/**
 * Compiles the given SELECT statement and returns the target table name paired
 * with a comma-separated list of the projected column names.
 *
 * @param selectStatement the SELECT statement to compile; must be non-null and non-empty
 * @return a pair of (table name, comma-joined column names)
 * @throws RuntimeException wrapping any {@link SQLException} raised while
 *         connecting, compiling, or closing
 */
@Override
public Pair<String, String> apply(final String selectStatement) {
    Preconditions.checkNotNull(selectStatement);
    Preconditions.checkArgument(!selectStatement.isEmpty(), "Select Query is empty!!");
    // try-with-resources closes both resources; the Statement was previously leaked
    // (only the Connection was closed in the finally block).
    try (Connection connection = ConnectionUtil.getConnection(this.configuration);
         Statement statement = connection.createStatement()) {
        final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
        final QueryPlan queryPlan = pstmt.compileQuery(selectStatement);
        isValidStatement(queryPlan);
        final String tableName = queryPlan.getTableRef().getTable().getName().getString();
        final List<? extends ColumnProjector> projectedColumns = queryPlan.getProjector().getColumnProjectors();
        final List<String> columns = Lists.transform(projectedColumns,
            new Function<ColumnProjector, String>() {
                @Override
                public String apply(ColumnProjector column) {
                    return column.getName();
                }
            });
        // The lazy transform view is fully consumed here, inside the try block.
        final String columnsAsStr = Joiner.on(",").join(columns);
        return new Pair<String, String>(tableName, columnsAsStr);
    } catch (SQLException e) {
        // Log the throwable so the stack trace is preserved, not just the message.
        LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ", e.getMessage(), selectStatement), e);
        throw new RuntimeException(e);
    }
}
@Test
public void testIteratorsPickedInRoundRobinFashionForSaltedTable() throws Exception {
    try (Connection conn = getConnection()) {
        String testTable = "testIteratorsPickedInRoundRobinFashionForSaltedTable".toUpperCase();
        Statement stmt = conn.createStatement();
        stmt.execute("CREATE TABLE " + testTable + "(K VARCHAR PRIMARY KEY) SALT_BUCKETS = 8");

        // Install a mock iterator factory on the Phoenix-native connection so we
        // can observe which iterator served each row.
        PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
        MockParallelIteratorFactory parallelIteratorFactory = new MockParallelIteratorFactory();
        phxConn.setIteratorFactory(parallelIteratorFactory);

        ResultSet rs = stmt.executeQuery("SELECT * FROM " + testTable);
        StatementContext ctx = rs.unwrap(PhoenixResultSet.class).getContext();
        PTable table = ctx.getResolver().getTables().get(0).getTable();
        parallelIteratorFactory.setTable(table);

        // One iterator per salt bucket is expected.
        PhoenixStatement pstmt = stmt.unwrap(PhoenixStatement.class);
        int numIterators = pstmt.getQueryPlan().getSplits().size();
        assertEquals(8, numIterators);

        // Fetch two full rounds worth of rows and record which iterator produced each.
        int numFetches = 2 * numIterators;
        List<String> iteratorOrder = new ArrayList<>(numFetches);
        int fetched = 0;
        while (fetched < numFetches) {
            rs.next();
            iteratorOrder.add(rs.getString(1));
            fetched++;
        }

        /*
         * Because TableResultIterators are created in parallel in multiple threads, their relative order is not
         * deterministic. However, once the iterators are assigned to a RoundRobinResultIterator, the order in which
         * the next iterator is picked is deterministic - i1, i2, .. i7, i8, i1, i2, .. i7, i8, i1, i2, ..
         */
        for (int pos = 0; pos < numIterators; pos++) {
            assertEquals(iteratorOrder.get(pos), iteratorOrder.get(pos + numIterators));
        }
    }
}
@Override
public Statement getNativeStatement(Statement stmt) throws SQLException {
    // Peel off wrapper layers to expose the driver-native statement type.
    final Statement nativeStatement = stmt.unwrap(this.statementType);
    return nativeStatement;
}
@Override
public Statement getNativeStatement(Statement stmt) throws SQLException {
    // Delegate to JDBC's unwrap to obtain the underlying vendor statement.
    Statement unwrapped = stmt.unwrap(this.statementType);
    return unwrapped;
}
/**
 * Unwraps the Cassandra-specific statement extras from a JDBC {@link Statement}.
 *
 * @param statement the statement expected to wrap the Cassandra driver type
 * @return the unwrapped {@code CassandraStatementExtras}
 * @throws Exception if the driver class is not on the classpath or the unwrap fails
 */
private CassandraStatementExtras statementExtras(Statement statement) throws Exception {
    // Class<?> instead of a raw Class — avoids the unchecked-call warning on unwrap().
    Class<?> cse = Class.forName("com.github.adejanovski.cassandra.jdbc.CassandraStatementExtras");
    return (CassandraStatementExtras) statement.unwrap(cse);
}
@Override
public Statement getNativeStatement(Statement stmt) throws SQLException {
    // Unwrap down to the configured native statement class and hand it back.
    return this.statementType.cast(stmt.unwrap(this.statementType));
}