下面列出了 java.sql.Connection#unwrap() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Looks a table up through the connection's metadata cache, asking the server only when
 * the cache has no entry for it.
 * <p>
 * A cached entry is returned as-is, so changes made to the table's metadata after it was
 * cached are not necessarily reflected in the result. This is different from how a table is
 * resolved when it is referenced from a query (there a call is made to the server to fetch
 * the latest metadata depending on the UPDATE_CACHE_FREQUENCY property).
 * See https://issues.apache.org/jira/browse/PHOENIX-4475
 *
 * @param conn connection that must unwrap to a {@link PhoenixConnection}
 * @param name requires a pre-normalized table name or a pre-normalized schema and table name
 * @return the cached table, or the table carried by the cache-update result when it was
 *         not cached
 * @throws SQLException if the connection cannot be unwrapped or the cache update fails; the
 *         original {@link TableNotFoundException} is rethrown when the cache update does not
 *         report {@code TABLE_ALREADY_EXISTS}
 */
public static PTable getTable(Connection conn, String name) throws SQLException {
    final PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
    try {
        // Fast path: the table is already known to this connection.
        return phoenixConn.getTable(new PTableKey(phoenixConn.getTenantId(), name));
    } catch (TableNotFoundException notCached) {
        // Slow path: refresh the cache entry for this schema/table pair.
        final String schemaName = SchemaUtil.getSchemaNameFromFullName(name);
        final String tableName = SchemaUtil.getTableNameFromFullName(name);
        final MetaDataMutationResult refreshed =
                new MetaDataClient(phoenixConn).updateCache(schemaName, tableName);
        if (refreshed.getMutationCode() == MutationCode.TABLE_ALREADY_EXISTS) {
            return refreshed.getTable();
        }
        // Still unknown after the refresh: surface the original failure.
        throw notCached;
    }
}
/**
 * Builds a Calcite connection whose root schema holds a JDBC-backed schema named "DB"
 * (pointing at a temp database in which {@code table1} has just been created and filled
 * with two rows) and a reflective schema named "hr".
 *
 * @return the open Calcite connection; the caller is responsible for closing it
 * @throws SQLException if creating the table or opening either connection fails
 */
private Connection setup() throws SQLException {
    // Create a jdbc database & table
    final String db = TempDb.INSTANCE.getUrl();
    // try-with-resources releases the bootstrap connection and its statement even when
    // one of the DDL/DML statements throws.
    try (Connection c1 = DriverManager.getConnection(db, "", "");
         Statement stmt1 = c1.createStatement()) {
        // This is a table we can join with the emps from the hr schema
        stmt1.execute("create table table1(id integer not null primary key, "
            + "field1 varchar(10))");
        stmt1.execute("insert into table1 values(100, 'foo')");
        stmt1.execute("insert into table1 values(200, 'bar')");
    }
    // Make a Calcite schema with both a jdbc schema and a non-jdbc schema
    Connection connection = DriverManager.getConnection("jdbc:calcite:");
    CalciteConnection calciteConnection =
        connection.unwrap(CalciteConnection.class);
    SchemaPlus rootSchema = calciteConnection.getRootSchema();
    rootSchema.add("DB",
        JdbcSchema.create(rootSchema, "DB",
            JdbcSchema.dataSource(db, "org.hsqldb.jdbcDriver", "", ""),
            null, null));
    rootSchema.add("hr", new ReflectiveSchema(new JdbcTest.HrSchema()));
    return connection;
}
/**
 * Borrows one connection from the pool, holds it for three seconds and checks that the
 * pool has flagged it as suspect.
 * <p>
 * NOTE(review): the suspect timeout is set to 1 and the abandon timeout to 100; both are
 * presumably in seconds, which is why a 3 second hold is expected to trigger "suspect" but
 * not "abandoned" - confirm against the pool documentation.
 */
@Test
public void testSuspect() throws Exception {
    this.datasource.setMaxActive(100);
    this.datasource.setMaxIdle(100);
    this.datasource.setInitialSize(0);
    this.datasource.getPoolProperties().setAbandonWhenPercentageFull(0);
    this.datasource.getPoolProperties().setTimeBetweenEvictionRunsMillis(100);
    this.datasource.getPoolProperties().setRemoveAbandoned(true);
    this.datasource.getPoolProperties().setRemoveAbandonedTimeout(100);
    this.datasource.getPoolProperties().setSuspectTimeout(1);
    this.datasource.getPoolProperties().setLogAbandoned(true);
    // try-with-resources hands the connection back to the pool even when an assertion fails.
    try (Connection con = datasource.getConnection()) {
        Assert.assertEquals("Number of connections active/busy should be 1",1,datasource.getPool().getActive());
        // Hold the connection long enough for the pool's periodic check to run.
        Thread.sleep(3000);
        PooledConnection pcon = con.unwrap(PooledConnection.class);
        Assert.assertTrue("Connection should be marked suspect",pcon.isSuspect());
    }
}
/**
 * Returns the primary key columns of {@code ptable} without its leading bookkeeping columns.
 * <p>
 * The salting column and the view index id column are skipped when present. The tenant id
 * column is skipped too when the connection is tenant specific and the table is
 * multi-tenant. When {@code ptable} is an index and {@code forDataTable} is set, the
 * remaining index columns are mapped to the corresponding columns of the parent table.
 *
 * @param ptable table (or index) whose primary key columns are wanted
 * @param conn connection that must unwrap to a {@link PhoenixConnection}
 * @param forDataTable whether index columns should be translated to data table columns
 * @return the primary key columns after skipping the leading columns described above
 * @throws SQLException if the connection cannot be unwrapped or the data columns cannot be resolved
 */
@Deprecated
private static List<PColumn> getPkColumns(PTable ptable, Connection conn, boolean forDataTable) throws SQLException {
    final PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
    final List<PColumn> allPkColumns = ptable.getPKColumns();

    // Count how many leading primary key columns have to be dropped.
    int leadingColumnsToSkip = 0;
    if (ptable.getBucketNum() != null) {
        leadingColumnsToSkip++; // salting column
    }
    if (ptable.isMultiTenant() && phoenixConn.getTenantId() != null) {
        leadingColumnsToSkip++; // tenant id column
    }
    if (ptable.getViewIndexId() != null) {
        leadingColumnsToSkip++; // view index id column
    }
    final List<PColumn> remainingPkColumns =
            allPkColumns.subList(leadingColumnsToSkip, allPkColumns.size());

    if (ptable.getType() != PTableType.INDEX || !forDataTable) {
        return remainingPkColumns;
    }
    // index tables have the same schema name as their parent/data tables.
    final String fullDataTableName = ptable.getParentName().getString();
    // Get the corresponding columns of the data table.
    return IndexUtil.getDataColumns(fullDataTableName, remainingPkColumns, phoenixConn);
}
/**
 * Tests a table function that implements {@link ScannableTable} and returns
 * a single column.
 *
 * <p>The function is registered as {@code "Maze"} in schema {@code "s"} and invoked with
 * arguments {@code (5, 3, 1)}; the rendered result set must match the expected text exactly.
 */
@Test void testScannableTableFunction()
    throws SQLException, ClassNotFoundException {
    final String sql = "select *\n"
        + "from table(\"s\".\"Maze\"(5, 3, 1))";
    final String result = "S=abcde\n"
        + "S=xyz\n"
        + "S=generate(w=5, h=3, s=1)\n";
    // try-with-resources so the connection is released even when the query or assertion fails.
    try (Connection connection = DriverManager.getConnection("jdbc:calcite:")) {
        CalciteConnection calciteConnection =
            connection.unwrap(CalciteConnection.class);
        SchemaPlus rootSchema = calciteConnection.getRootSchema();
        SchemaPlus schema = rootSchema.add("s", new AbstractSchema());
        final TableFunction table = TableFunctionImpl.create(Smalls.MAZE_METHOD);
        schema.add("Maze", table);
        // The statement itself is released when the enclosing connection closes.
        try (ResultSet resultSet = connection.createStatement().executeQuery(sql)) {
            assertThat(CalciteAssert.toString(resultSet), is(result));
        }
    }
}
/** Test case for
 * <a href="https://issues.apache.org/jira/browse/CALCITE-937">[CALCITE-937]
 * User-defined function within view</a>.
 *
 * <p>Registers {@code MY_INCREMENT} and the view {@code V_EMP} in schema {@code POST},
 * then runs the view's defining query directly and through the view, expecting the same
 * rendered rows from both. */
@Test void testUserDefinedFunctionInView() throws Exception {
    Class.forName("org.apache.calcite.jdbc.Driver");
    final String viewSql = "select \"empid\" as EMPLOYEE_ID,\n"
        + " \"name\" || ' ' || \"name\" as EMPLOYEE_NAME,\n"
        + " \"salary\" as EMPLOYEE_SALARY,\n"
        + " POST.MY_INCREMENT(\"empid\", 10) as INCREMENTED_SALARY\n"
        + "from \"hr\".\"emps\"";
    final String result = ""
        + "EMPLOYEE_ID=100; EMPLOYEE_NAME=Bill Bill; EMPLOYEE_SALARY=10000.0; INCREMENTED_SALARY=110.0\n"
        + "EMPLOYEE_ID=200; EMPLOYEE_NAME=Eric Eric; EMPLOYEE_SALARY=8000.0; INCREMENTED_SALARY=220.0\n"
        + "EMPLOYEE_ID=150; EMPLOYEE_NAME=Sebastian Sebastian; EMPLOYEE_SALARY=7000.0; INCREMENTED_SALARY=165.0\n"
        + "EMPLOYEE_ID=110; EMPLOYEE_NAME=Theodore Theodore; EMPLOYEE_SALARY=11500.0; INCREMENTED_SALARY=121.0\n";
    // try-with-resources: a failing assertion must not leave the connection,
    // statement or result sets open.
    try (Connection connection = DriverManager.getConnection("jdbc:calcite:")) {
        CalciteConnection calciteConnection =
            connection.unwrap(CalciteConnection.class);
        SchemaPlus rootSchema = calciteConnection.getRootSchema();
        rootSchema.add("hr", new ReflectiveSchema(new JdbcTest.HrSchema()));
        SchemaPlus post = rootSchema.add("POST", new AbstractSchema());
        post.add("MY_INCREMENT",
            ScalarFunctionImpl.create(Smalls.MyIncrement.class, "eval"));
        post.add("V_EMP",
            ViewTable.viewMacro(post, viewSql, ImmutableList.of(),
                ImmutableList.of("POST", "V_EMP"), null));
        try (Statement statement = connection.createStatement()) {
            // First the raw query the view is defined by ...
            try (ResultSet resultSet = statement.executeQuery(viewSql)) {
                assertThat(CalciteAssert.toString(resultSet), is(result));
            }
            // ... then the same rows read through the view.
            try (ResultSet viewResultSet =
                     statement.executeQuery("select * from \"POST\".\"V_EMP\"")) {
                assertThat(CalciteAssert.toString(viewResultSet), is(result));
            }
        }
    }
}
/**
 * Resolves a table by asking {@link MetaDataClient#updateCache} for it rather than reading
 * the connection's cached entry first.
 * <p>
 * NOTE(review): the trailing {@code true} passed to {@code updateCache} presumably forces a
 * server round trip - confirm against {@code MetaDataClient}.
 *
 * @param conn connection that must unwrap to a {@link PhoenixConnection}
 * @param name full table name, split into schema and table parts via {@code SchemaUtil}
 * @return the table carried by the cache-update result
 * @throws SQLException if the connection cannot be unwrapped or the update fails;
 *         {@link TableNotFoundException} when the update does not report
 *         {@code TABLE_ALREADY_EXISTS}
 */
public static PTable getTableNoCache(Connection conn, String name) throws SQLException {
    final String schemaName = SchemaUtil.getSchemaNameFromFullName(name);
    final String tableName = SchemaUtil.getTableNameFromFullName(name);
    final PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
    final MetaDataClient metaDataClient = new MetaDataClient(phoenixConn);
    final MetaDataMutationResult refreshed =
            metaDataClient.updateCache(phoenixConn.getTenantId(), schemaName, tableName, true);
    if (refreshed.getMutationCode() == MutationCode.TABLE_ALREADY_EXISTS) {
        return refreshed.getTable();
    }
    throw new TableNotFoundException(schemaName, tableName);
}
/** Test case for
 * <a href="https://issues.apache.org/jira/browse/CALCITE-937">[CALCITE-937]
 * User-defined function within view</a>.
 *
 * <p>Registers {@code MY_INCREMENT} and the view {@code V_EMP} in schema {@code POST},
 * then runs the view's defining query directly and through the view, expecting the same
 * rendered rows from both. */
@Test public void testUserDefinedFunctionInView() throws Exception {
    Class.forName("com.qihoo.qsql.org.apache.calcite.jdbc.Driver");
    final String viewSql = "select \"empid\" as EMPLOYEE_ID,\n"
        + " \"name\" || ' ' || \"name\" as EMPLOYEE_NAME,\n"
        + " \"salary\" as EMPLOYEE_SALARY,\n"
        + " POST.MY_INCREMENT(\"empid\", 10) as INCREMENTED_SALARY\n"
        + "from \"hr\".\"emps\"";
    final String result = ""
        + "EMPLOYEE_ID=100; EMPLOYEE_NAME=Bill Bill; EMPLOYEE_SALARY=10000.0; INCREMENTED_SALARY=110.0\n"
        + "EMPLOYEE_ID=200; EMPLOYEE_NAME=Eric Eric; EMPLOYEE_SALARY=8000.0; INCREMENTED_SALARY=220.0\n"
        + "EMPLOYEE_ID=150; EMPLOYEE_NAME=Sebastian Sebastian; EMPLOYEE_SALARY=7000.0; INCREMENTED_SALARY=165.0\n"
        + "EMPLOYEE_ID=110; EMPLOYEE_NAME=Theodore Theodore; EMPLOYEE_SALARY=11500.0; INCREMENTED_SALARY=121.0\n";
    // try-with-resources: a failing assertion must not leave the connection,
    // statement or result sets open.
    try (Connection connection = DriverManager.getConnection("jdbc:calcite:")) {
        CalciteConnection calciteConnection =
            connection.unwrap(CalciteConnection.class);
        SchemaPlus rootSchema = calciteConnection.getRootSchema();
        rootSchema.add("hr", new ReflectiveSchema(new JdbcTest.HrSchema()));
        SchemaPlus post = rootSchema.add("POST", new AbstractSchema());
        post.add("MY_INCREMENT",
            ScalarFunctionImpl.create(Smalls.MyIncrement.class, "eval"));
        post.add("V_EMP",
            ViewTable.viewMacro(post, viewSql, ImmutableList.of(),
                ImmutableList.of("POST", "V_EMP"), null));
        try (Statement statement = connection.createStatement()) {
            // First the raw query the view is defined by ...
            try (ResultSet resultSet = statement.executeQuery(viewSql)) {
                assertThat(CalciteAssert.toString(resultSet), is(result));
            }
            // ... then the same rows read through the view.
            try (ResultSet viewResultSet =
                     statement.executeQuery("select * from \"POST\".\"V_EMP\"")) {
                assertThat(CalciteAssert.toString(viewResultSet), is(result));
            }
        }
    }
}
/**
 * Asserts that, after querying the table once, {@code getIndexMaintainers} leaves a
 * non-empty payload in the pointer handed to it.
 *
 * @param conn connection that must unwrap to a {@link PhoenixConnection}
 * @param schemaName schema part of the table name
 * @param tableName table part of the table name
 * @throws SQLException if the query fails or the connection cannot be unwrapped
 */
private static void assertActiveIndex(Connection conn, String schemaName, String tableName) throws SQLException {
    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
    String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
    // Run a query so that the client side cache will update. The statement and result set
    // are closed right away; fully qualified names are used because this block did not
    // previously reference these JDBC types.
    try (java.sql.Statement stmt = conn.createStatement();
         java.sql.ResultSet rs = stmt.executeQuery("SELECT count(*) FROM " + fullTableName)) {
        rs.next();
    }
    PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
    pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)).getIndexMaintainers(ptr, pconn);
    assertTrue(ptr.getLength() > 0);
}
/**
 * Asserts that, after querying the table once, {@code getIndexMaintainers} leaves an
 * empty payload in the pointer handed to it.
 *
 * @param conn connection that must unwrap to a {@link PhoenixConnection}
 * @param schemaName schema part of the table name
 * @param tableName table part of the table name
 * @throws SQLException if the query fails or the connection cannot be unwrapped
 */
private static void assertNoActiveIndex(Connection conn, String schemaName, String tableName) throws SQLException {
    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
    String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
    // Run a query so that the client side cache will update. The statement and result set
    // are closed right away; fully qualified names are used because this block did not
    // previously reference these JDBC types.
    try (java.sql.Statement stmt = conn.createStatement();
         java.sql.ResultSet rs = stmt.executeQuery("SELECT count(*) FROM " + fullTableName)) {
        rs.next();
    }
    PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
    pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), fullTableName)).getIndexMaintainers(ptr, pconn);
    assertTrue(ptr.getLength() == 0);
}
/**
 * Encode the primary key values from the table as a byte array. The values must
 * be in the same order as the primary key constraint. If the connection and
 * table are both tenant-specific, the tenant ID column must not be present in
 * the values. A salting column, when the table has one, is likewise not part of
 * the values.
 * @param conn an open connection
 * @param fullTableName the full table name
 * @param values the values of the primary key columns ordered in the same order
 * as the primary key constraint
 * @return the encoded byte array
 * @throws SQLException if the table cannot be found or the incorrect number
 * of values are provided
 * @see #decodePK(Connection, String, byte[]) to decode the byte[] back to the
 * values
 */
@Deprecated
public static byte[] encodePK(Connection conn, String fullTableName, Object[] values) throws SQLException {
    final PTable table = getTable(conn, fullTableName);
    final PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);

    // Leading primary key columns the caller does not supply values for.
    int offset = 0;
    if (table.getBucketNum() != null) {
        offset++;
    }
    if (table.isMultiTenant() && phoenixConn.getTenantId() != null) {
        offset++;
    }

    final List<PColumn> pkColumns = table.getPKColumns();
    final int expectedValueCount = pkColumns.size() - offset;
    if (expectedValueCount != values.length) {
        throw new SQLException("Expected " + (pkColumns.size() - offset) + " but got " + values.length);
    }

    final TrustedByteArrayOutputStream output =
            new TrustedByteArrayOutputStream(table.getRowKeySchema().getEstimatedValueLength());
    try {
        PDataType previousType = null;
        for (int pkIndex = offset; pkIndex < pkColumns.size(); pkIndex++) {
            // A separator is only needed after a column whose type is not fixed width.
            if (previousType != null && !previousType.isFixedWidth()) {
                output.write(QueryConstants.SEPARATOR_BYTE);
            }
            final PColumn column = pkColumns.get(pkIndex);
            final PDataType currentType = column.getDataType();
            previousType = currentType;
            //for fixed width data types like CHAR and BINARY, we need to pad values to be of max length.
            final Object paddedValue = currentType.pad(values[pkIndex - offset], column.getMaxLength());
            output.write(currentType.toBytes(paddedValue));
        }
        return output.toByteArray();
    } finally {
        try {
            output.close();
        } catch (IOException e) {
            throw new RuntimeException(e); // Impossible
        }
    }
}
/**
 * Deletes every row of the table, switches it to {@code IMMUTABLE_ROWS=true} and verifies
 * that the table seen through the connection reports immutable rows afterwards.
 *
 * @param tableName not used by this method; kept so existing callers keep compiling
 * @param fullTableName full name of the table to modify
 * @throws Exception if any statement fails or the connection cannot be unwrapped
 */
private static void makeImmutableAndDeleteData(String tableName, String fullTableName) throws Exception {
    // try-with-resources closes the connection and statement and keeps the primary failure
    // visible (a failing close() is attached as suppressed instead of replacing it).
    try (Connection conn = getConnection()) {
        conn.setAutoCommit(true);
        // Fully qualified because this block did not previously reference java.sql.Statement.
        try (java.sql.Statement stmt = conn.createStatement()) {
            stmt.execute("DELETE FROM " + fullTableName);
            stmt.execute("ALTER TABLE " + fullTableName + " SET IMMUTABLE_ROWS=true");
            // Query once so the connection picks up the altered table before it is inspected.
            stmt.executeQuery("SELECT COUNT(*) FROM " + fullTableName).next();
        }
        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
        assertTrue(pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)).isImmutableRows());
    }
}
/**
 * Opens a Calcite connection configured with the {@code MYSQL_ANSI} lexical policy.
 *
 * @return the connection, already unwrapped to {@link CalciteConnection}
 * @throws ProcessException wrapping any exception raised while connecting or unwrapping
 */
private CalciteConnection createConnection() {
    final Properties connectionProps = new Properties();
    final String lexKey = CalciteConnectionProperty.LEX.camelName();
    final String lexValue = Lex.MYSQL_ANSI.name();
    connectionProps.put(lexKey, lexValue);
    try {
        return DriverManager.getConnection("jdbc:calcite:", connectionProps)
                .unwrap(CalciteConnection.class);
    } catch (final Exception e) {
        throw new ProcessException(e);
    }
}
/**
 * Unwraps {@code connection} to the MySQL driver's own connection type, chosen by the XA
 * data source class in use.
 *
 * @param xaDataSourceClassName class name of the XA data source; must match one of the two
 *        supported MySQL constants
 * @param connection connection to unwrap
 * @return the unwrapped connection
 * @throws UnsupportedOperationException if the XA data source class is not supported
 */
@SneakyThrows
private Connection unwrapPhysicalConnection(final String xaDataSourceClassName, final Connection connection) {
    // Pick the driver-specific connection class first; it is loaded reflectively by name.
    final String physicalConnectionClassName;
    switch (xaDataSourceClassName) {
        case MYSQL_XA_DATASOURCE_5:
            physicalConnectionClassName = "com.mysql.jdbc.Connection";
            break;
        case MYSQL_XA_DATASOURCE_8:
            physicalConnectionClassName = "com.mysql.cj.jdbc.JdbcConnection";
            break;
        default:
            throw new UnsupportedOperationException(String.format("Cannot support xa datasource: `%s`", xaDataSourceClassName));
    }
    return (Connection) connection.unwrap(Class.forName(physicalConnectionClassName));
}
/**
 * Checks that a connection opened with {@code phoenix.query.dateFormatTimeZone} set to
 * {@code timeZoneId} uses that time zone for its DATE, TIME and TIMESTAMP formatters and
 * that a value written as a formatted string reads back as the same instant.
 * <p>
 * Relies on a {@code date} member declared outside this method as the reference instant.
 *
 * @param timeZoneId time zone id to configure on the connection
 * @throws Exception if any JDBC call or verification fails
 */
public void testDateFormatTimeZone(String timeZoneId) throws Exception {
    Properties props = new Properties();
    props.setProperty("phoenix.query.dateFormatTimeZone", timeZoneId);
    // try-with-resources keeps the connection from leaking on any failure below.
    try (Connection conn1 = DriverManager.getConnection(getUrl(), props)) {
        String tableName = generateUniqueName();
        String ddl = "CREATE TABLE IF NOT EXISTS " + tableName +
                " (k1 INTEGER PRIMARY KEY," +
                " v_date DATE," +
                " v_time TIME," +
                " v_timestamp TIMESTAMP)";
        conn1.createStatement().execute(ddl);
        PhoenixConnection pConn = conn1.unwrap(PhoenixConnection.class);
        verifyTimeZoneIDWithConn(pConn, PDate.INSTANCE, timeZoneId);
        verifyTimeZoneIDWithConn(pConn, PTime.INSTANCE, timeZoneId);
        verifyTimeZoneIDWithConn(pConn, PTimestamp.INSTANCE, timeZoneId);
        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone(timeZoneId));
        cal.setTime(date);
        String dateStr = DateUtil.getDateFormatter(DateUtil.DEFAULT_MS_DATE_FORMAT).format(date);
        String dml = "UPSERT INTO " + tableName + " VALUES (" +
                "1," +
                "'" + dateStr + "'," +
                "'" + dateStr + "'," +
                "'" + dateStr + "'" +
                ")";
        conn1.createStatement().execute(dml);
        conn1.commit();
        // The statement and result set are closed even when one of the assertions fails.
        try (PhoenixStatement stmt = conn1.createStatement().unwrap(PhoenixStatement.class);
             ResultSet rs = stmt.executeQuery("SELECT v_date, v_time, v_timestamp FROM " + tableName)) {
            assertTrue(rs.next());
            // Expected value first, so a failure message labels the two values correctly.
            assertEquals(new Date(cal.getTimeInMillis()).toString(), rs.getDate(1).toString());
            assertEquals(new Time(cal.getTimeInMillis()).toString(), rs.getTime(2).toString());
            assertEquals(cal.getTimeInMillis(), rs.getTimestamp(3).getTime());
            assertFalse(rs.next());
            StatementContext stmtContext = stmt.getQueryPlan().getContext();
            verifyTimeZoneIDWithFormatter(stmtContext.getDateFormatter(), timeZoneId);
            verifyTimeZoneIDWithFormatter(stmtContext.getTimeFormatter(), timeZoneId);
            verifyTimeZoneIDWithFormatter(stmtContext.getTimestampFormatter(), timeZoneId);
        }
    }
}
/**
 * Asserts that the table held in the connection's metadata cache reports the expected
 * immutable-rows flag.
 *
 * @param conn connection that must unwrap to a {@link PhoenixConnection}
 * @param fullTableName full name of the table to inspect
 * @param expectedValue expected result of {@code isImmutableRows()}
 * @throws SQLException if the connection cannot be unwrapped or the table lookup fails
 */
private static void assertImmutableRows(Connection conn, String fullTableName, boolean expectedValue) throws SQLException {
    final PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
    final boolean actualValue = phoenixConn.getMetaDataCache()
            .getTable(new PTableKey(phoenixConn.getTenantId(), fullTableName))
            .isImmutableRows();
    assertEquals(expectedValue, actualValue);
}
/**
 * Asserts that the table obtained from the connection reports the expected WAL-disabled flag.
 *
 * @param conn connection that must unwrap to a {@link PhoenixConnection}
 * @param fullTableName full name of the table to inspect
 * @param expectedValue expected result of {@code isWALDisabled()}
 * @throws SQLException if the connection cannot be unwrapped or the table lookup fails
 */
private static void asssertIsWALDisabled(Connection conn, String fullTableName, boolean expectedValue) throws SQLException {
    final PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
    final boolean actualValue = phoenixConn
            .getTable(new PTableKey(phoenixConn.getTenantId(), fullTableName))
            .isWALDisabled();
    assertEquals(expectedValue, actualValue);
}
/**
 * Obtains a connection: loads the given JDBC driver class, opens a connection to
 * {@code url} and unwraps it to an {@link OlapConnection}.
 * <p>
 * If the opened connection cannot be unwrapped it is closed before the failure is
 * propagated, so a mismatched driver/URL pair does not leak a connection.
 *
 * @param driver fully qualified class name of the JDBC driver to load
 * @param url JDBC URL to connect to
 * @return the connection unwrapped to {@link OlapConnection}
 * @throws ClassNotFoundException if the driver class cannot be loaded
 * @throws SQLException if connecting fails or the connection does not unwrap to
 *         {@link OlapConnection}
 */
public static OlapConnection getConnection(String driver, String url) throws ClassNotFoundException,
    SQLException, Exception {
    Class.forName(driver);
    Connection connection = DriverManager.getConnection(url);
    try {
        return connection.unwrap(OlapConnection.class);
    } catch (SQLException | RuntimeException e) {
        // The caller never receives this connection, so it has to be released here.
        try {
            connection.close();
        } catch (SQLException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Retrieves transaction information for the database behind an existing connection.
 * <p>
 * The connection is unwrapped to {@link FirebirdConnection} and its {@code getFbDatabase()}
 * result is handed to the overload of this method that accepts it.
 *
 * @param connection
 *         Database connection; must unwrap to {@link FirebirdConnection}.
 * @return Database transaction information
 * @throws SQLException
 *         If {@code connection} does not unwrap to {@link FirebirdConnection}, or for failures to
 *         retrieve information
 */
public static DatabaseTransactionInfo getDatabaseTransactionInfo(Connection connection) throws SQLException {
    return getDatabaseTransactionInfo(
            connection.unwrap(FirebirdConnection.class).getFbDatabase());
}
/**
 * Clears the metrics collected on the connection by calling
 * {@code PhoenixConnection.clearMetrics()}; this covers the mutation and
 * reads-for-mutations metrics.
 *
 * @param conn connection that must unwrap to a {@link PhoenixConnection}
 * @throws SQLException if the connection cannot be unwrapped
 * @see #getReadMetricsForMutationsSinceLastReset(Connection)
 * @see #getWriteMetricsForMutationsSinceLastReset(Connection)
 */
public static void resetMetrics(Connection conn) throws SQLException {
    conn.unwrap(PhoenixConnection.class).clearMetrics();
}