下面列出了 org.apache.hadoop.hbase.io.ImmutableBytesWritable#copyBytes() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Counts one more occurrence of the value referenced by {@code ptr}.
 *
 * The map key either wraps {@code ptr} directly or wraps a private copy of its
 * bytes; a copy is taken only when the backing array is larger than
 * {@code FIXED_COPY_THRESHOLD} and also larger than the referenced length
 * scaled by {@code COPY_THRESHOLD}. The heap-size estimate grows only when a
 * key is seen for the first time.
 *
 * @param tuple the current tuple (not read by this method)
 * @param ptr   pointer to the bytes of the value being aggregated
 */
@Override
public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
    final boolean copyIsWorthwhile = ptr.get().length > FIXED_COPY_THRESHOLD
            && ptr.get().length > ptr.getLength() * COPY_THRESHOLD;
    final ImmutableBytesPtr key;
    if (copyIsWorthwhile) {
        key = new ImmutableBytesPtr(ptr.copyBytes());
    } else {
        key = new ImmutableBytesPtr(ptr);
    }

    final Integer occurrences = this.valueVsCount.get(key);
    if (occurrences != null) {
        // Already tracked: only the counter changes, not the size estimate.
        this.valueVsCount.put(key, occurrences + 1);
        return;
    }

    this.valueVsCount.put(key, 1);
    // New entry: map-entry overhead, one int, the key bytes and array overhead.
    heapSize += SizedUtil.MAP_ENTRY_SIZE + Bytes.SIZEOF_INT + key.getLength() + SizedUtil.ARRAY_SIZE;
}
/**
 * Accumulates the sizes reported for {@code key} and records a split point
 * whenever the leading byte of the key differs from the previously seen one
 * or the accumulated size has exceeded {@code splitLimit}.
 *
 * When the leading byte changed, the split is that single byte; otherwise the
 * split is a copy of the whole key.
 *
 * @param key     the key being reduced; its first byte is treated as the region marker
 * @param values  sizes to add to the running total
 * @param context task context, used here only to publish a status line
 */
@Override
public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
    final byte region = key.get()[key.getOffset()];
    final boolean regionChanged = lastRegion != region;
    if (regionChanged || size > splitLimit) {
        final byte[] split;
        if (regionChanged) {
            split = new byte[]{region};
        } else {
            split = key.copyBytes();
        }
        splits.add(split);
        context.setStatus("#" + splits.size() + " " + Arrays.toString(split));
        lastRegion = key.get()[key.getOffset()];
        // Start counting afresh after every recorded split.
        size = 0;
    }
    for (LongWritable increment : values) {
        size += increment.get();
    }
}
/**
 * Returns the bytes referenced by {@code ptr} as a byte array.
 *
 * If the pointer starts at offset 0 and spans the entire backing array, the
 * backing array itself is returned (no copy); otherwise the referenced bytes
 * are copied into a new array.
 */
public static byte[] copyKeyBytesIfNecessary(ImmutableBytesWritable ptr) {
    return (ptr.getOffset() == 0 && ptr.getLength() == ptr.get().length)
            ? ptr.get()
            : ptr.copyBytes();
}
/**
 * Convert a line of TSV text into an HBase table row after transforming the
 * values by multiplying them by 3.
 *
 * The line is split on the escape character ({@code \u001b}); the first token
 * becomes the row key and every following token, expected to look like
 * {@code VALUE<n>}, is rewritten to {@code VALUE<3n>} and stored under family
 * {@code FAM} with qualifier {@code A} or {@code B}.
 *
 * @param offset  position of the line in the input (unused)
 * @param value   the raw input line
 * @param context task context the resulting {@link Put} is written to
 * @throws IOException if writing to the context fails
 */
@Override
public void map(LongWritable offset, Text value, Context context)
    throws IOException {
  byte[] family = Bytes.toBytes("FAM");
  final byte[][] qualifiers = { Bytes.toBytes("A"), Bytes.toBytes("B") };

  // do some basic line parsing
  // Text.getBytes() hands back the backing buffer, of which only the first
  // getLength() bytes are valid; decoding the whole array could append stale
  // bytes left over from a previous, longer record.
  byte[] lineBytes = value.getBytes();
  String[] valueTokens =
      new String(lineBytes, 0, value.getLength(), StandardCharsets.UTF_8).split("\u001b");

  // create the rowKey and Put
  ImmutableBytesWritable rowKey =
      new ImmutableBytesWritable(Bytes.toBytes(valueTokens[0]));
  Put put = new Put(rowKey.copyBytes());
  put.setDurability(Durability.SKIP_WAL);

  // The value should look like this: VALUE1 or VALUE2. Let's multiply
  // the integer by 3
  for (int i = 1; i < valueTokens.length; i++) {
    String prefix = valueTokens[i].substring(0, "VALUE".length());
    String suffix = valueTokens[i].substring("VALUE".length());
    String newValue = prefix + Integer.parseInt(suffix) * 3;

    KeyValue kv = new KeyValue(rowKey.copyBytes(), family,
        qualifiers[i - 1], Bytes.toBytes(newValue));
    put.add(kv);
  }

  try {
    context.write(rowKey, put);
  } catch (InterruptedException e) {
    e.printStackTrace();
    // Restore the interrupt flag so the framework can still observe the interruption.
    Thread.currentThread().interrupt();
  }
}
/**
 * Truncates a key range so that each bound covers at most {@code rangeSpan} fields.
 *
 * A bound that gets truncated is also made inclusive. The everything-range and
 * the empty-range are returned untouched.
 *
 * @param schema     row key schema
 * @param fieldIndex starting index of the field within the row key schema
 * @param rangeSpan  maximum number of fields to keep
 * @param range      the range to clip
 * @return the same range instance if neither bound was clipped, otherwise a new range
 */
public static KeyRange clipRange(RowKeySchema schema, int fieldIndex, int rangeSpan, KeyRange range) {
    if (range == KeyRange.EVERYTHING_RANGE || range == KeyRange.EMPTY_RANGE) {
        return range;
    }

    final ImmutableBytesWritable scratch = new ImmutableBytesWritable();
    boolean clipped = false;

    final boolean noLowerBound = range.lowerUnbound();
    boolean includeLower = range.isLowerInclusive();
    byte[] lowerBytes = range.getLowerRange();
    if (!noLowerBound && lowerBytes.length > 0
            && clipKeyRangeBytes(schema, fieldIndex, rangeSpan, lowerBytes, scratch, true)) {
        // The trailing part of the lower bound was chopped off, so the bound becomes inclusive.
        includeLower = true;
        lowerBytes = scratch.copyBytes();
        clipped = true;
    }

    final boolean noUpperBound = range.upperUnbound();
    boolean includeUpper = range.isUpperInclusive();
    byte[] upperBytes = range.getUpperRange();
    if (!noUpperBound && upperBytes.length > 0
            && clipKeyRangeBytes(schema, fieldIndex, rangeSpan, upperBytes, scratch, false)) {
        // The trailing part of the upper bound was chopped off, so the bound becomes inclusive.
        includeUpper = true;
        upperBytes = scratch.copyBytes();
        clipped = true;
    }

    if (!clipped) {
        return range;
    }
    return KeyRange.getKeyRange(lowerBytes, includeLower, upperBytes, includeUpper);
}
/**
 * Returns the payload of {@code ptr} as a byte array, copying only when needed.
 *
 * A copy is made when the pointer does not start at offset 0 or does not span
 * the whole backing array; otherwise the backing array is returned as-is.
 */
public static byte[] copyKeyBytesIfNecessary(ImmutableBytesWritable ptr) {
    if (ptr.getOffset() != 0 || ptr.getLength() != ptr.get().length) {
        return ptr.copyBytes();
    }
    return ptr.get();
}
/**
 * Builds a {@link Get} for the row whose key is composed, via
 * {@code catalogTable.newKey}, from the tenant id, the upper-cased schema name,
 * the upper-cased view name, the view column name and the view column family.
 *
 * @param catalogTable table used to assemble the row key
 * @param tenantId     tenant id bytes forming the first key part
 * @param viewName     view name; upper-cased before being encoded
 * @return a Get addressing exactly the assembled key bytes
 */
public Get getGet(PTable catalogTable, byte[] tenantId, String viewName) {
    final byte[][] keyParts = {
        tenantId,
        Bytes.toBytes(SCHEMA_NAME.toUpperCase()),
        Bytes.toBytes(viewName.toUpperCase()),
        Bytes.toBytes(VIEW_COLUMN_NAME),
        VIEW_COLUMN_FAMILY_BYTES
    };
    final ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
    catalogTable.newKey(rowKey, keyParts);
    // The backing array may carry spare room past the key, so copyBytes() is
    // used to slice out only the referenced bytes.
    final byte[] exactKey = rowKey.copyBytes();
    return new Get(exactKey);
}
/**
 * Encodes the given key strings and lets {@code t.newKey} assemble them into a
 * row key, which is returned as a standalone byte array.
 *
 * @param t    table used to assemble the key
 * @param keys key parts; a {@code null} entry is encoded as an empty byte array
 * @return a copy of the assembled key bytes
 */
private static byte[] getKeyPart(PTable t, String... keys) throws SQLException {
    final ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
    final byte[][] encodedParts = new byte[keys.length][];
    for (int idx = 0; idx < keys.length; idx++) {
        final String part = keys[idx];
        if (part == null) {
            encodedParts[idx] = ByteUtil.EMPTY_BYTE_ARRAY;
        } else {
            encodedParts[idx] = Bytes.toBytes(part);
        }
    }
    t.newKey(keyPtr, encodedParts);
    return keyPtr.copyBytes();
}
/**
 * Returns the bytes that {@code ptr} refers to as a byte array.
 *
 * The backing array is returned directly when the pointer covers all of it
 * (offset 0, length equal to the array length); in every other case the
 * referenced bytes are copied out.
 */
public static byte[] copyKeyBytesIfNecessary(ImmutableBytesWritable ptr) {
    final boolean coversBackingArray =
            ptr.getOffset() == 0 && ptr.getLength() == ptr.get().length;
    if (coversBackingArray) {
        return ptr.get();
    } else {
        return ptr.copyBytes();
    }
}
/**
 * Returns the bytes referenced by {@code ptr}, avoiding a copy when the
 * pointer already spans its entire backing array from offset 0.
 */
public static byte[] copyBytesIfNecessary(ImmutableBytesWritable ptr) {
    return (ptr.getOffset() == 0 && ptr.getLength() == ptr.get().length)
            ? ptr.get()
            : ptr.copyBytes();
}
/**
 * Scans the system catalog for link rows that point at {@code table} as their
 * physical table, i.e. looks for child views of that table.
 *
 * TODO: should we pass a timestamp here?
 *
 * @param region   region against which each found row key is checked
 * @param tenantId tenant id used to constrain the scan when the table is not multi-tenant
 * @param table    the parent table
 * @return the matching rows, flagged when at least one of them lies outside {@code region}
 * @throws IOException if the catalog scan fails
 */
private TableViewFinderResult findChildViews(HRegion region, byte[] tenantId, PTable table) throws IOException {
    final byte[] schemaBytes = table.getSchemaName().getBytes();
    final byte[] tableBytes = table.getTableName().getBytes();
    final boolean multiTenant = table.isMultiTenant();
    final Scan catalogScan = new Scan();
    if (!multiTenant) {
        // For a non-multi-tenant table every view shares the tenant id, so the
        // row key range can be constrained. A multi-tenant table has to be
        // checked across all tenant ids, hence no constraint in that case.
        final byte[] firstRow = ByteUtil.concat(tenantId, QueryConstants.SEPARATOR_BYTE_ARRAY);
        final byte[] lastRow = ByteUtil.nextKey(firstRow);
        catalogScan.setStartRow(firstRow);
        catalogScan.setStopRow(lastRow);
    }

    final SingleColumnValueFilter physicalLinkFilter =
            new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareOp.EQUAL, PHYSICAL_TABLE_BYTES);
    physicalLinkFilter.setFilterIfMissing(true);
    final byte[] keySuffix = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY,
            SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes));
    final SuffixFilter suffixFilter = new SuffixFilter(keySuffix);
    final Filter combined = new FilterList(physicalLinkFilter, suffixFilter);
    catalogScan.setFilter(combined);
    catalogScan.addColumn(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);

    // A region-only scanner was used originally and replaced due to PHOENIX-1208.
    // Fetching the table through env.getTable(...) *should* work but does not
    // because of HBASE-11837, so the deprecated helper below is the workaround.
    final HTableInterface catalog =
            ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
    try {
        boolean everyViewInRegion = true;
        int childViewCount = 0;
        final List<Result> matches = Lists.newArrayList();
        final ResultScanner rows = catalog.getScanner(catalogScan);
        try {
            Result row = rows.next();
            while (row != null) {
                childViewCount++;
                final ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
                final ResultTuple asTuple = new ResultTuple(row);
                asTuple.getKey(keyPtr);
                final byte[] rowKey = keyPtr.copyBytes();
                if (checkTableKeyInRegion(rowKey, region) != null) {
                    everyViewInRegion = false;
                }
                matches.add(row);
                row = rows.next();
            }
            final TableViewFinderResult found = new TableViewFinderResult(matches);
            if (childViewCount > 0 && !everyViewInRegion) {
                found.setAllViewsNotInSingleRegion();
            }
            return found;
        } finally {
            rows.close();
        }
    } finally {
        catalog.close();
    }
}
/**
 * Creates a data table and an index named {@code idx} on it, upserts one row,
 * and checks that the index row key built by the {@link IndexMaintainer}
 * matches the row key of the index mutation produced by
 * {@code IndexTestUtil.generateIndexData}, and that the data row key can be
 * rebuilt from the index row key.
 *
 * The connection is closed on every exit path, including a failure while
 * computing names or creating the table; the table is dropped whenever it was
 * created.
 *
 * @param schemaName     schema of the data table
 * @param tableName      name of the data table
 * @param dataColumns    column definitions for the CREATE TABLE statement
 * @param pk             primary key column list
 * @param indexColumns   indexed column list
 * @param values         bind values for the single upserted row
 * @param includeColumns covered column list, or empty for none
 * @param dataProps      table properties appended to CREATE TABLE, or empty
 * @param indexProps     index properties appended to CREATE INDEX, or empty
 * @param builder        key-value builder passed to the index maintainer
 */
@SuppressWarnings("deprecation")
private void testIndexRowKeyBuilding(String schemaName, String tableName, String dataColumns,
        String pk, String indexColumns, Object[] values, String includeColumns,
        String dataProps, String indexProps, KeyValueBuilder builder) throws Exception {
    Connection conn = DriverManager.getConnection(getUrl());
    try {
        String fullTableName = SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(schemaName),SchemaUtil.normalizeIdentifier(tableName));
        String fullIndexName = SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(schemaName),SchemaUtil.normalizeIdentifier("idx"));
        conn.createStatement().execute("CREATE TABLE " + fullTableName + "(" + dataColumns + " CONSTRAINT pk PRIMARY KEY (" + pk + ")) " + (dataProps.isEmpty() ? "" : dataProps) );
        try {
            conn.createStatement().execute("CREATE INDEX idx ON " + fullTableName + "(" + indexColumns + ") " + (includeColumns.isEmpty() ? "" : "INCLUDE (" + includeColumns + ") ") + (indexProps.isEmpty() ? "" : indexProps));
            PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
            PTable table = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), fullTableName));
            PTable index = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(),fullIndexName));
            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
            table.getIndexMaintainers(ptr, pconn);
            List<IndexMaintainer> c1 = IndexMaintainer.deserialize(ptr, builder);
            assertEquals(1,c1.size());
            IndexMaintainer im1 = c1.get(0);

            // Build "UPSERT INTO t VALUES(?,?,...)" with one placeholder per value;
            // the trailing comma is overwritten with the closing parenthesis.
            StringBuilder buf = new StringBuilder("UPSERT INTO " + fullTableName + " VALUES(");
            for (int i = 0; i < values.length; i++) {
                buf.append("?,");
            }
            buf.setCharAt(buf.length()-1, ')');
            PreparedStatement stmt = conn.prepareStatement(buf.toString());
            for (int i = 0; i < values.length; i++) {
                stmt.setObject(i+1, values[i]);
            }
            stmt.execute();

            // Read back the not-yet-committed key values of the upserted row.
            Iterator<Pair<byte[],List<KeyValue>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn);
            List<KeyValue> dataKeyValues = iterator.next().getSecond();
            Map<ColumnReference,byte[]> valueMap = Maps.newHashMapWithExpectedSize(dataKeyValues.size());
            byte[] row = dataKeyValues.get(0).getRow();
            ImmutableBytesWritable rowKeyPtr = new ImmutableBytesWritable(row);
            Put dataMutation = new Put(rowKeyPtr.copyBytes());
            for (KeyValue kv : dataKeyValues) {
                valueMap.put(new ColumnReference(kv.getFamily(),kv.getQualifier()), kv.getValue());
                dataMutation.add(kv);
            }
            ValueGetter valueGetter = newValueGetter(row, valueMap);

            List<Mutation> indexMutations =
                    IndexTestUtil.generateIndexData(index, table, dataMutation, ptr, builder);
            assertEquals(1,indexMutations.size());
            assertTrue(indexMutations.get(0) instanceof Put);
            Mutation indexMutation = indexMutations.get(0);
            ImmutableBytesWritable indexKeyPtr = new ImmutableBytesWritable(indexMutation.getRow());

            // The maintainer-built index key must equal the key of the generated index mutation.
            ptr.set(rowKeyPtr.get(), rowKeyPtr.getOffset(), rowKeyPtr.getLength());
            byte[] mutableIndexRowKey = im1.buildRowKey(valueGetter, ptr, null, null);
            byte[] immutableIndexRowKey = indexKeyPtr.copyBytes();
            assertArrayEquals(immutableIndexRowKey, mutableIndexRowKey);

            // NOTE(review): the looked-up values are discarded; kept as in the original test.
            for (ColumnReference ref : im1.getCoverededColumns()) {
                valueMap.get(ref);
            }

            // Round trip: the data row key rebuilt from the index key must match the original.
            byte[] dataRowKey = im1.buildDataRowKey(indexKeyPtr, null);
            assertArrayEquals(dataRowKey, dataKeyValues.get(0).getRow());
        } finally {
            conn.createStatement().execute("DROP TABLE " + fullTableName);
        }
    } finally {
        conn.close();
    }
}
/**
 * Convert a line of TSV text into an HBase table row.
 *
 * The parsed line supplies the row key, timestamp, cell visibility expression
 * and TTL. Columns that carry those attributes (and, when configured, empty
 * columns) are skipped; every other column is added to the {@link Put} that is
 * written to the context. A malformed line is reported on stderr and either
 * counted and skipped or rethrown as an {@link IOException}, depending on
 * {@code skipBadLines}.
 */
@Override
public void map(LongWritable offset, Text value,
    Context context)
    throws IOException {
  final byte[] rawLine = value.getBytes();
  try {
    final ImportTsv.TsvParser.ParsedLine line = parser.parse(rawLine, value.getLength());
    final ImmutableBytesWritable rowKey =
        new ImmutableBytesWritable(rawLine, line.getRowKeyOffset(), line.getRowKeyLength());

    // Pick up the per-line timestamp (if any), visibility expression and TTL.
    ts = line.getTimestamp(ts);
    cellVisibilityExpr = line.getCellVisibility();
    ttl = line.getCellTTL();

    // Tags are rebuilt for each parsed line, and only when an HFile output path is set.
    if (hfileOutPath != null) {
      tags.clear();
      if (cellVisibilityExpr != null) {
        tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags(
            cellVisibilityExpr));
      }
      // The TTL goes directly on the KV so it can vary when more than one KV
      // is packed into a put.
      if (ttl > 0) {
        tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
      }
    }

    final Put rowPut = new Put(rowKey.copyBytes());
    for (int col = 0; col < line.getColumnCount(); col++) {
      final boolean skipColumn = col == parser.getRowKeyColumnIndex()
          || col == parser.getTimestampKeyColumnIndex()
          || col == parser.getAttributesKeyColumnIndex()
          || col == parser.getCellVisibilityColumnIndex()
          || col == parser.getCellTTLColumnIndex()
          || (skipEmptyColumns && line.getColumnLength(col) == 0);
      if (!skipColumn) {
        populatePut(rawLine, line, rowPut, col);
      }
    }
    context.write(rowKey, rowPut);
  } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException
      | InvalidLabelException badLine) {
    if (logBadLines) {
      System.err.println(value);
    }
    System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
    if (!skipBadLines) {
      throw new IOException(badLine);
    }
    incrementBadLineCount(1);
  } catch (InterruptedException e) {
    LOG.error("Interrupted while emitting put", e);
    Thread.currentThread().interrupt();
  }
}
/**
 * Intersects an RVC that starts at pkPos with an overlapping range that starts at otherPKPos.
 * For example, ((A, B) - (J, K)) intersected with (F - *) would return ((A,F) - (J, K))
 * ((A, B) - (J, K)) intersected with (M - P) would return (A-J) since both of the trailing
 * parts of the RVC, B and K, do not intersect with (M - P).
 * <p>
 * The lower and upper bounds of {@code result} are processed one after the other: each is
 * positioned at the overlapping field, the trailing bytes are intersected with
 * {@code otherRange}, and if that intersection is empty the trailing part is cut off the bound.
 * The temp pointer obtained from {@code context} is reused as scratch space throughout.
 * @param result an RVC expression starting from pkPos and with length of at least otherPKPos - pkPos.
 * @param pkPos the PK position of the leading part of the RVC expression
 * @param otherRange the other range to intersect with the overlapping part of the RVC.
 * @param otherPKPos the PK position of the leading part of the other range
 * @return resulting KeyRange from the intersection, potentially an empty range if the result RVC
 * is a single key and the trailing part of the key does not intersect with the RVC. The same
 * {@code result} instance is returned when neither bound had to be changed.
 */
private KeyRange intersectTrailing(KeyRange result, int pkPos, KeyRange otherRange, int otherPKPos) {
RowKeySchema rowKeySchema = table.getRowKeySchema();
ImmutableBytesWritable ptr = context.getTempPtr();
// 0 when the PK column just before otherPKPos is fixed width, otherwise 1 (one separator byte)
int separatorLength = table.getPKColumns().get(otherPKPos-1).getDataType().isFixedWidth() ? 0 : 1;
boolean lowerInclusive = result.isLowerInclusive();
byte[] lowerRange = result.getLowerRange();
ptr.set(lowerRange);
// Position ptr at the point at which the two ranges overlap
if (rowKeySchema.position(ptr, pkPos, otherPKPos)) {
int lowerOffset = ptr.getOffset();
// Increase the length of the ptr to include the entire trailing bytes
ptr.set(ptr.get(), lowerOffset, lowerRange.length - lowerOffset);
byte[] trailingBytes = ptr.copyBytes();
// Special case for single key since single keys of different span lengths
// will never overlap. We do not need to process both the lower and upper
// ranges since they are the same.
if (result.isSingleKey() && otherRange.isSingleKey()) {
int minSpan = rowKeySchema.computeMinSpan(pkPos, result, ptr);
int otherMinSpan =
rowKeySchema.computeMinSpan(otherPKPos, otherRange, ptr);
byte[] otherLowerRange;
boolean isFixedWidthAtEnd;
// Arrange the two keys so that trailingBytes is the one tested as a prefix of
// otherLowerRange below; which is which depends on where each key's span ends.
if (pkPos + minSpan <= otherPKPos + otherMinSpan) {
otherLowerRange = otherRange.getLowerRange();
isFixedWidthAtEnd = table.getPKColumns().get(pkPos + minSpan -1).getDataType().isFixedWidth();
} else {
otherLowerRange = trailingBytes;
trailingBytes = otherRange.getLowerRange();
isFixedWidthAtEnd = table.getPKColumns().get(otherPKPos + otherMinSpan -1).getDataType().isFixedWidth();
}
// If the otherRange starts with the overlapping trailing byte *and* we're comparing
// the entire key (i.e. not just a leading subset), then we have an intersection.
if (Bytes.startsWith(otherLowerRange, trailingBytes) &&
(isFixedWidthAtEnd ||
otherLowerRange.length == trailingBytes.length ||
otherLowerRange[trailingBytes.length] == QueryConstants.SEPARATOR_BYTE)) {
return result;
}
// Otherwise, there is no overlap
return KeyRange.EMPTY_RANGE;
}
// If we're not dealing with single keys, then we can use our normal intersection
if (otherRange.intersect(KeyRange.getKeyRange(trailingBytes)) == KeyRange.EMPTY_RANGE) {
// Exit early since the upper range is the same as the lower range
if (result.isSingleKey()) {
return KeyRange.EMPTY_RANGE;
}
// No overlap: keep only the leading part of the lower bound, dropping the
// trailing bytes and the separator (if any) that preceded them.
ptr.set(result.getLowerRange(), 0, lowerOffset - separatorLength);
lowerRange = ptr.copyBytes();
}
}
boolean upperInclusive = result.isUpperInclusive();
byte[] upperRange = result.getUpperRange();
ptr.set(upperRange);
// Same treatment for the upper bound: position at the overlap, then test the trailing bytes
if (rowKeySchema.position(ptr, pkPos, otherPKPos)) {
int upperOffset = ptr.getOffset();
ptr.set(ptr.get(), upperOffset, upperRange.length - upperOffset);
if (otherRange.intersect(KeyRange.getKeyRange(ptr.copyBytes())) == KeyRange.EMPTY_RANGE) {
// No overlap: truncate the upper bound to its leading part (minus any separator)
ptr.set(ptr.get(), 0, upperOffset - separatorLength);
upperRange = ptr.copyBytes();
}
}
// Reference comparison: both arrays are untouched only if neither bound was replaced above
if (lowerRange == result.getLowerRange() && upperRange == result.getUpperRange()) {
return result;
}
KeyRange range = KeyRange.getKeyRange(lowerRange, lowerInclusive, upperRange, upperInclusive);
return range;
}
/**
 * Returns the bytes referenced by {@code ptr}. A fresh copy is produced unless
 * the pointer begins at offset 0 and covers the full backing array, in which
 * case that array is returned directly.
 */
public static byte[] copyBytesIfNecessary(ImmutableBytesWritable ptr) {
    if (ptr.getOffset() != 0 || ptr.getLength() != ptr.get().length) {
        return ptr.copyBytes();
    }
    return ptr.get();
}
/**
 * Creates a data table and an index named {@code idx} on it, upserts one row,
 * and checks that the index row key built by the {@link IndexMaintainer}
 * matches the row key of the index mutation produced by
 * {@code IndexTestUtil.generateIndexData}, and that the data row key can be
 * rebuilt from the index row key.
 *
 * The connection is closed on every exit path, including a failure while
 * computing names or creating the table; once the table has been created the
 * uncommitted upsert is rolled back and the table dropped.
 *
 * @param schemaName     schema of the data table
 * @param tableName      name of the data table
 * @param dataColumns    column definitions for the CREATE TABLE statement
 * @param pk             primary key column list
 * @param indexColumns   indexed column list
 * @param values         bind values for the single upserted row
 * @param includeColumns covered column list, or empty for none
 * @param dataProps      table properties appended to CREATE TABLE, or empty
 * @param indexProps     index properties appended to CREATE INDEX, or empty
 * @param builder        key-value builder passed to the index maintainer
 */
private void testIndexRowKeyBuilding(String schemaName, String tableName, String dataColumns,
        String pk, String indexColumns, Object[] values, String includeColumns,
        String dataProps, String indexProps, KeyValueBuilder builder) throws Exception {
    Connection conn = DriverManager.getConnection(getUrl());
    try {
        String fullTableName = SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(schemaName),SchemaUtil.normalizeIdentifier(tableName));
        String fullIndexName = SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(schemaName),SchemaUtil.normalizeIdentifier("idx"));
        conn.createStatement().execute("CREATE TABLE " + fullTableName + "(" + dataColumns + " CONSTRAINT pk PRIMARY KEY (" + pk + ")) " + (dataProps.isEmpty() ? "" : dataProps) );
        try {
            conn.createStatement().execute("CREATE INDEX idx ON " + fullTableName + "(" + indexColumns + ") " + (includeColumns.isEmpty() ? "" : "INCLUDE (" + includeColumns + ") ") + (indexProps.isEmpty() ? "" : indexProps));
            PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
            PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName));
            PTable index = pconn.getTable(new PTableKey(pconn.getTenantId(),fullIndexName));
            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
            table.getIndexMaintainers(ptr, pconn);
            List<IndexMaintainer> c1 = IndexMaintainer.deserialize(ptr, builder, true);
            assertEquals(1,c1.size());
            IndexMaintainer im1 = c1.get(0);

            // Build "UPSERT INTO t VALUES(?,?,...)" with one placeholder per value;
            // the trailing comma is overwritten with the closing parenthesis.
            StringBuilder buf = new StringBuilder("UPSERT INTO " + fullTableName + " VALUES(");
            for (int i = 0; i < values.length; i++) {
                buf.append("?,");
            }
            buf.setCharAt(buf.length()-1, ')');
            PreparedStatement stmt = conn.prepareStatement(buf.toString());
            for (int i = 0; i < values.length; i++) {
                stmt.setObject(i+1, values[i]);
            }
            stmt.execute();

            // Read back the not-yet-committed cells of the upserted row.
            Iterator<Pair<byte[],List<Cell>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn);
            List<Cell> dataKeyValues = iterator.next().getSecond();
            Map<ColumnReference,byte[]> valueMap = Maps.newHashMapWithExpectedSize(dataKeyValues.size());
            ImmutableBytesWritable rowKeyPtr = new ImmutableBytesWritable(dataKeyValues.get(0).getRowArray(), dataKeyValues.get(0).getRowOffset(), dataKeyValues.get(0).getRowLength());
            byte[] row = rowKeyPtr.copyBytes();
            Put dataMutation = new Put(row);
            for (Cell kv : dataKeyValues) {
                valueMap.put(new ColumnReference(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()), CellUtil.cloneValue(kv));
                dataMutation.add(kv);
            }
            ValueGetter valueGetter = newValueGetter(row, valueMap);

            List<Mutation> indexMutations = IndexTestUtil.generateIndexData(index, table, dataMutation, ptr, builder);
            assertEquals(1,indexMutations.size());
            assertTrue(indexMutations.get(0) instanceof Put);
            Mutation indexMutation = indexMutations.get(0);
            ImmutableBytesWritable indexKeyPtr = new ImmutableBytesWritable(indexMutation.getRow());

            // The maintainer-built index key must equal the key of the generated index mutation.
            ptr.set(rowKeyPtr.get(), rowKeyPtr.getOffset(), rowKeyPtr.getLength());
            byte[] mutableIndexRowKey = im1.buildRowKey(valueGetter, ptr, null, null, HConstants.LATEST_TIMESTAMP);
            byte[] immutableIndexRowKey = indexKeyPtr.copyBytes();
            assertArrayEquals(immutableIndexRowKey, mutableIndexRowKey);

            // NOTE(review): the looked-up values are discarded; kept as in the original test.
            for (ColumnReference ref : im1.getCoveredColumns()) {
                valueMap.get(ref);
            }

            // Round trip: the data row key rebuilt from the index key must match the original.
            byte[] dataRowKey = im1.buildDataRowKey(indexKeyPtr, null);
            assertArrayEquals(dataRowKey, CellUtil.cloneRow(dataKeyValues.get(0)));
        } finally {
            conn.rollback();
            conn.createStatement().execute("DROP TABLE " + fullTableName);
        }
    } finally {
        conn.close();
    }
}
/**
 * Returns the bytes that {@code ptr} refers to, handing back the backing array
 * itself when the pointer spans all of it and a copy otherwise.
 */
public static byte[] copyBytesIfNecessary(ImmutableBytesWritable ptr) {
    final boolean coversBackingArray =
            ptr.getOffset() == 0 && ptr.getLength() == ptr.get().length;
    if (coversBackingArray) {
        return ptr.get();
    } else {
        return ptr.copyBytes();
    }
}
/**
 * Returns a new {@link ImmutableBytesWritable} wrapping a copy of the bytes
 * referenced by {@code bytes}.
 */
private ImmutableBytesWritable copy(ImmutableBytesWritable bytes) {
    final byte[] duplicate = bytes.copyBytes();
    return new ImmutableBytesWritable(duplicate);
}
/**
 * Creates a data table and an index named {@code idx} on it, upserts one row,
 * and checks that the index row key built by the {@link IndexMaintainer}
 * matches the row key of the index mutation produced by
 * {@code IndexTestUtil.generateIndexData}.
 *
 * The connection is closed on every exit path, including a failure while
 * computing the table name or creating the table; the table is dropped
 * whenever it was created.
 *
 * @param schemaName     schema of the data table
 * @param tableName      name of the data table
 * @param dataColumns    column definitions for the CREATE TABLE statement
 * @param pk             primary key column list
 * @param indexColumns   indexed column list
 * @param values         bind values for the single upserted row
 * @param includeColumns covered column list, or empty for none
 * @param dataProps      table properties appended to CREATE TABLE, or empty
 * @param indexProps     index properties appended to CREATE INDEX, or empty
 * @param builder        key-value builder passed to the index maintainer
 */
private void testIndexRowKeyBuilding(String schemaName, String tableName, String dataColumns,
        String pk, String indexColumns, Object[] values, String includeColumns,
        String dataProps, String indexProps, KeyValueBuilder builder) throws Exception {
    Connection conn = DriverManager.getConnection(getUrl());
    try {
        String fullTableName = SchemaUtil.getTableName(schemaName, tableName) ;
        conn.createStatement().execute("CREATE TABLE " + fullTableName + "(" + dataColumns + " CONSTRAINT pk PRIMARY KEY (" + pk + ")) " + (dataProps.isEmpty() ? "" : dataProps) );
        try {
            conn.createStatement().execute("CREATE INDEX idx ON " + fullTableName + "(" + indexColumns + ") " + (includeColumns.isEmpty() ? "" : "INCLUDE (" + includeColumns + ") ") + (indexProps.isEmpty() ? "" : indexProps));
            PTable table = conn.unwrap(PhoenixConnection.class).getPMetaData().getTable(SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(schemaName),SchemaUtil.normalizeIdentifier(tableName)));
            PTable index = conn.unwrap(PhoenixConnection.class).getPMetaData().getTable(SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(schemaName),SchemaUtil.normalizeIdentifier("idx")));
            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
            table.getIndexMaintainers(ptr);
            List<IndexMaintainer> c1 = IndexMaintainer.deserialize(ptr, builder);
            assertEquals(1,c1.size());
            IndexMaintainer im1 = c1.get(0);

            // Build "UPSERT INTO t VALUES(?,?,...)" with one placeholder per value;
            // the trailing comma is overwritten with the closing parenthesis.
            StringBuilder buf = new StringBuilder("UPSERT INTO " + fullTableName + " VALUES(");
            for (int i = 0; i < values.length; i++) {
                buf.append("?,");
            }
            buf.setCharAt(buf.length()-1, ')');
            PreparedStatement stmt = conn.prepareStatement(buf.toString());
            for (int i = 0; i < values.length; i++) {
                stmt.setObject(i+1, values[i]);
            }
            stmt.execute();

            // Read back the not-yet-committed key values of the upserted row.
            Iterator<Pair<byte[],List<KeyValue>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn);
            List<KeyValue> dataKeyValues = iterator.next().getSecond();
            Map<ColumnReference,byte[]> valueMap = Maps.newHashMapWithExpectedSize(dataKeyValues.size());
            ImmutableBytesWritable rowKeyPtr = new ImmutableBytesWritable(dataKeyValues.get(0).getRow());
            Put dataMutation = new Put(rowKeyPtr.copyBytes());
            for (KeyValue kv : dataKeyValues) {
                valueMap.put(new ColumnReference(kv.getFamily(),kv.getQualifier()), kv.getValue());
                dataMutation.add(kv);
            }
            ValueGetter valueGetter = newValueGetter(valueMap);

            List<Mutation> indexMutations =
                    IndexTestUtil.generateIndexData(index, table, dataMutation, ptr, builder);
            assertEquals(1,indexMutations.size());
            assertTrue(indexMutations.get(0) instanceof Put);
            Mutation indexMutation = indexMutations.get(0);
            ImmutableBytesWritable indexKeyPtr = new ImmutableBytesWritable(indexMutation.getRow());

            // The maintainer-built index key must equal the key of the generated index mutation.
            ptr.set(rowKeyPtr.get(), rowKeyPtr.getOffset(), rowKeyPtr.getLength());
            byte[] mutableIndexRowKey = im1.buildRowKey(valueGetter, ptr);
            byte[] immutableIndexRowKey = indexKeyPtr.copyBytes();
            assertArrayEquals(immutableIndexRowKey, mutableIndexRowKey);

            // NOTE(review): the looked-up values are discarded; kept as in the original test.
            for (ColumnReference ref : im1.getCoverededColumns()) {
                valueMap.get(ref);
            }
        } finally {
            conn.createStatement().execute("DROP TABLE " + fullTableName);
        }
    } finally {
        conn.close();
    }
}