下面列出了 com.datastax.driver.core.DataType#serializeValue() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Returns the partition key related to a given {@link Cells}.
 *
 * <p>When the key validator is a {@link CompositeType}, every cell flagged as a key is
 * serialized and the components are packed, in encounter order, into a composite key.
 * Otherwise the first cell is serialized on its own.
 *
 * @param cells {@link Cells} from Cassandra to extract the partition key.
 * @param keyValidator Cassandra key type.
 * @param numberOfKeys Number of keys; sizes the component array of a composite key.
 * @return Partition key.
 */
public static ByteBuffer getPartitionKey(Cells cells, AbstractType<?> keyValidator, int numberOfKeys) {
    if (!(keyValidator instanceof CompositeType)) {
        // Non-composite key: the first cell is taken as the partition key.
        Cell cell = cells.getCellByIdx(0);
        return DataType.serializeValue(cell.getValue(), CassandraDeepJobConfig.PROTOCOL_VERSION);
    }

    ByteBuffer[] keys = new ByteBuffer[numberOfKeys];
    // The key slot is tracked separately from the cell position: indexing the array with
    // the cell position leaves null holes (or overruns the array) whenever a key cell is
    // not among the first numberOfKeys cells.
    int keyIdx = 0;
    for (int i = 0; i < cells.size(); i++) {
        Cell c = cells.getCellByIdx(i);
        if (c.isKey()) {
            keys[keyIdx++] = DataType.serializeValue(c.getValue(), CassandraDeepJobConfig.PROTOCOL_VERSION);
        }
    }
    return CompositeType.build(keys);
}
/**
 * Serializes the log key components into a single buffer.
 *
 * <p>The components are written in order: application UUID, application type, entity id,
 * entity type. Each one is emitted as a 2-byte length, the serialized bytes, and a single
 * zero byte.
 *
 * @param appUUID application UUID
 * @param applicationType application type name
 * @param entityId entity UUID
 * @param entityType entity type name
 * @return buffer positioned at 0 whose limit is the end of the written key
 */
private ByteBuffer serializeLogKey(UUID appUUID, String applicationType, UUID entityId, String entityType){
    List<Object> keys = new ArrayList<>(4);
    keys.add(appUUID);
    keys.add(applicationType);
    keys.add(entityId);
    keys.add(entityType);

    // Serialize first and size the buffer from the bytes that will actually be written.
    // Sizing with String.getBytes() depends on the platform default charset and can
    // disagree with the serialized length, overflowing the buffer.
    List<ByteBuffer> parts = new ArrayList<>(keys.size());
    int size = 0;
    for (Object key : keys) {
        ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
        if (kb == null) {
            // a missing serialization is written as a zero-length component
            kb = ByteBuffer.allocate(0);
        }
        parts.add(kb);
        // 2 byte length short + payload + 1 byte equality
        size += 2 + kb.remaining() + 1;
    }

    ByteBuffer stuff = ByteBuffer.allocate(size);
    for (ByteBuffer kb : parts) {
        stuff.putShort((short) kb.remaining());
        stuff.put(kb.slice());
        stuff.put((byte) 0);
    }
    stuff.flip();
    return stuff;
}
/**
 * Writes a message body and loads it back by id; the loaded body is not asserted on.
 */
@Test
public void writeNewMessageData(){
    final QueueMessageSerialization serialization =
        getInjector().getInstance( QueueMessageSerialization.class );

    // constructed as in the other tests of this group, but not used further here
    final Shard shard = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);

    final UUID id = QakkaUtils.getTimeUuid();
    final String payload = "my test data";

    final DatabaseQueueMessageBody body = new DatabaseQueueMessageBody(
        DataType.serializeValue(payload, ProtocolVersion.NEWEST_SUPPORTED), "text/plain");
    serialization.writeMessageData(id, body);

    final DatabaseQueueMessageBody loaded = serialization.loadMessageData( id );
}
/**
 * Writes a text message body, loads it back by id and checks the decoded text matches.
 */
@Test
public void loadMessageData() throws Exception {
    final QueueMessageSerialization serialization =
        getInjector().getInstance( QueueMessageSerialization.class );

    final Shard shard = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);

    final UUID id = QakkaUtils.getTimeUuid();
    final String expected = "my test data";

    serialization.writeMessageData(id, new DatabaseQueueMessageBody(
        DataType.serializeValue(expected, ProtocolVersion.NEWEST_SUPPORTED), "text/plain"));

    // decode the backing array of the loaded blob as UTF-8 text
    final byte[] rawBytes = serialization.loadMessageData( id ).getBlob().array();
    final String actual = new String( rawBytes, "UTF-8");

    assertEquals(expected, actual);
}
/**
 * Round-trips a text message body through write and load and compares the decoded text.
 */
@Test
public void loadMessageObjectData() throws Exception {
    final QueueMessageSerialization serialization =
        getInjector().getInstance( QueueMessageSerialization.class );

    final Shard shard = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);

    final UUID id = QakkaUtils.getTimeUuid();
    final String expected = "my test data";

    final DatabaseQueueMessageBody outgoing = new DatabaseQueueMessageBody(
        DataType.serializeValue(expected, ProtocolVersion.NEWEST_SUPPORTED), "text/plain");
    serialization.writeMessageData(id, outgoing);

    final DatabaseQueueMessageBody incoming = serialization.loadMessageData( id );

    // decode the backing array of the loaded blob as UTF-8 text
    assertEquals(expected, new String( incoming.getBlob().array(), "UTF-8"));
}
/**
 * Sends 100 messages to a fresh queue, waits until the queue depth reaches 100, then
 * drains the queue in four batches of 25 while checking the remaining depth after each.
 */
@Test
public void testGetMultipleQueueMessages() throws InterruptedException {

    final Injector injector = getInjector();

    final ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
    final String region = actorSystemFig.getRegionLocal();

    // App app = injector.getInstance( App.class );
    // app.start("localhost", getNextAkkaPort(), region);

    final DistributedQueueService queueService = injector.getInstance( DistributedQueueService.class );
    final QueueMessageSerialization messageSerialization = injector.getInstance( QueueMessageSerialization.class );
    final TransferLogSerialization transferLog = injector.getInstance( TransferLogSerialization.class );
    final QueueMessageManager messageManager = injector.getInstance( QueueMessageManager.class );

    final String queueName = "queue_testGetMultipleQueueMessages_" + UUID.randomUUID();
    final QueueManager queueManager = injector.getInstance( QueueManager.class );

    try {
        queueManager.createQueue(
            new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );

        // write the body, record the transfer and send, once per message
        for (int sent = 0; sent < 100; sent++) {
            final UUID messageId = UUIDGen.getTimeUUID();
            final String payload = "my test data";

            final DatabaseQueueMessageBody body = new DatabaseQueueMessageBody(
                DataType.serializeValue( payload, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
            messageSerialization.writeMessageData( messageId, body );

            transferLog.recordTransferLog(
                queueName, actorSystemFig.getRegionLocal(), region, messageId );

            queueService.sendMessageToRegion(
                queueName, region, region, messageId, null, null );
        }

        final DatabaseQueueMessage.Type type = DatabaseQueueMessage.Type.DEFAULT;

        // poll up to 30 times, one second apart, until all 100 messages are visible
        final int maxRetries = 30;
        long depth = 0;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            queueService.refresh();
            depth = messageManager.getQueueDepth( queueName, type );
            if ( depth == 100 ) {
                break;
            }
            Thread.sleep( 1000 );
        }

        Assert.assertEquals( 100, depth );

        // take 25 at a time; the depth must drop to 75, 50, 25 and finally 0
        for (int expectedDepth = 75; expectedDepth >= 0; expectedDepth -= 25) {
            Assert.assertEquals( 25, queueService.getNextMessages( queueName, 25 ).size() );
            Assert.assertEquals( expectedDepth, messageManager.getQueueDepth( queueName, type ) );
        }

        queueService.shutdown();

    } finally {
        queueManager.deleteQueue( queueName );
    }
}
/**
 * Sends a single message to a fresh queue and checks that the reported queue depth is 1.
 */
@Test
public void testQueueMessageCounter() throws InterruptedException {

    final Injector injector = getInjector();

    final ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
    final String region = actorSystemFig.getRegionLocal();

    // App app = injector.getInstance( App.class );
    // app.start("localhost", getNextAkkaPort(), region);

    final DistributedQueueService queueService = injector.getInstance( DistributedQueueService.class );
    final QueueMessageSerialization messageSerialization = injector.getInstance( QueueMessageSerialization.class );
    final TransferLogSerialization transferLog = injector.getInstance( TransferLogSerialization.class );
    final QueueMessageManager messageManager = injector.getInstance( QueueMessageManager.class );

    final String queueName = "queue_testGetMultipleQueueMessages_" + UUID.randomUUID();
    final QueueManager queueManager = injector.getInstance( QueueManager.class );

    try {
        queueManager.createQueue(
            new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );

        final UUID messageId = UUIDGen.getTimeUUID();
        final String payload = "my test data";

        final DatabaseQueueMessageBody body = new DatabaseQueueMessageBody(
            DataType.serializeValue( payload, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
        messageSerialization.writeMessageData( messageId, body );

        transferLog.recordTransferLog(
            queueName, actorSystemFig.getRegionLocal(), region, messageId );

        queueService.sendMessageToRegion(
            queueName, region, region, messageId, null, null );

        final DatabaseQueueMessage.Type type = DatabaseQueueMessage.Type.DEFAULT;

        Thread.sleep(5000);

        // poll up to 10 times, one second apart, until a non-zero depth is reported
        final int maxRetries = 10;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            queueService.refresh();
            if ( messageManager.getQueueDepth( queueName, type ) > 0 ) {
                break;
            }
            Thread.sleep( 1000 );
        }

        Thread.sleep( 1000 );

        Assert.assertEquals( 1, messageManager.getQueueDepth( queueName, type ) );

        queueService.shutdown();

    } finally {
        queueManager.deleteQueue( queueName );
    }
}
/**
 * Writes 100 queue messages, only every other one with message data, and verifies that
 * the messages that do have data (half of them) can be fetched one at a time.
 */
@Test
public void testGetWithMissingData() throws InterruptedException {

    Injector injector = getInjector();

    injector.getInstance( App.class ); // init the INJECTOR

    ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
    DistributedQueueService qas = injector.getInstance( DistributedQueueService.class );
    QueueManager qm = injector.getInstance( QueueManager.class );
    QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
    QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class );
    String region = actorSystemFig.getRegionLocal();

    // App app = injector.getInstance( App.class );
    // app.start( "localhost", getNextAkkaPort(), region );

    // create queue messages, every other one with missing data

    int numMessages = 100;
    String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
    qm.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));

    for ( int i=0; i<numMessages; i++ ) {

        final UUID messageId = QakkaUtils.getTimeUuid();

        if ( i % 2 == 0 ) { // only even-numbered messages get a stored body
            final String data = "my test data";
            final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
                DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
            qms.writeMessageData( messageId, messageBody );
        }

        UUID queueMessageId = QakkaUtils.getTimeUuid();

        DatabaseQueueMessage message = new DatabaseQueueMessage(
            messageId,
            DatabaseQueueMessage.Type.DEFAULT,
            queueName,
            actorSystemFig.getRegionLocal(),
            null,
            System.currentTimeMillis(),
            null,
            queueMessageId);
        qms.writeMessage( message );
    }

    qas.refresh();
    Thread.sleep(1000);

    int count = 0;
    while ( count < numMessages / 2 ) {
        List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
        Assert.assertFalse( messages.isEmpty() );
        count += messages.size();
        // log the running total without mutating it; incrementing inside the log call
        // counted every message twice and ended the loop after about half the fetches
        logger.debug("Got {} messages", count);
    }

    DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
    distributedQueueService.shutdown();
}
/**
 * Serializes a unique-value log column name from a field entry.
 *
 * <p>Four components are written in order: version, field name, lower-cased field value,
 * field type name. The version component is prefixed with a 2-byte alias header; every
 * other component is prefixed with the 2-byte length of the comparator name followed by
 * the comparator name ("UTF8Type"). Each component then carries a 2-byte value length,
 * the serialized value and a single zero byte.
 *
 * @param fieldEntry entry supplying the version and the field
 * @return buffer positioned at 0 whose limit is the end of the written column name
 */
@Override
protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){

    // values are serialized as strings and always lower cased
    String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase();

    List<Object> keys = new ArrayList<>(4);
    keys.add(fieldEntry.getVersion());
    keys.add(fieldEntry.getField().getName());
    keys.add(fieldValueString);
    keys.add(fieldEntry.getField().getTypeName().name());

    // Alias header for the version component: high bit set, low byte is the type alias.
    boolean desc = UUID_TYPE_REVERSED.indexOf("(reversed=true)") >= 0;
    byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data
    if (desc) {
        // 85 is 'U', which is already upper case, so the alias byte is the same either way
        a = (byte) Character.toUpperCase((char) a);
    }
    // 0x8000 is spelled numerically; a non-ASCII char literal depends on source encoding
    short versionHeader = (short) (0x8000 | a);

    // only strings are being serialized other than the version, all tagged as UTF8Type
    ByteBuffer utf8Comparator = DataType.serializeValue("UTF8Type", ProtocolVersion.NEWEST_SUPPORTED);

    // Serialize every component up front and size the buffer from the real byte counts;
    // sizing from String.getBytes() of the non-lower-cased value depends on the platform
    // charset and on case mapping, and can disagree with what is written.
    List<ByteBuffer> values = new ArrayList<>(keys.size());
    int size = 0;
    for (Object key : keys) {
        ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
        if (kb == null) {
            kb = ByteBuffer.allocate(0);
        }
        values.add(kb);
        if (key.equals(fieldEntry.getVersion())) {
            size += 2; // alias header only
        } else {
            size += 2 + utf8Comparator.remaining(); // comparator length short + name
        }
        // 2 byte length short + payload + 1 byte equality
        size += 2 + kb.remaining() + 1;
    }

    ByteBuffer stuff = ByteBuffer.allocate(size);
    for (int i = 0; i < keys.size(); i++) {
        if (keys.get(i).equals(fieldEntry.getVersion())) {
            stuff.putShort(versionHeader);
        } else {
            stuff.putShort((short) utf8Comparator.remaining());
            // duplicate so the shared comparator buffer is not consumed
            stuff.put(utf8Comparator.duplicate());
        }

        ByteBuffer kb = values.get(i);
        // put a short that indicates how big the buffer is for this item
        stuff.putShort((short) kb.remaining());
        // put the actual item
        stuff.put(kb.slice());
        // put an equality byte ( again not used by part of legacy thrift Astyanax schema)
        stuff.put((byte) 0);
    }

    stuff.flip();
    return stuff;
}
/**
 * Serializes a unique-value column name from an entity version.
 *
 * <p>Three components are written in order: entity version, entity UUID, entity type.
 * Each component is emitted as the 2-byte length of its comparator name, the comparator
 * name ("UUIDType" for UUIDs, "UTF8Type" otherwise), a 2-byte value length, the serialized
 * value and a single zero byte.
 *
 * @param entityVersion supplies the version and the entity id (UUID and type)
 * @return buffer positioned at 0 whose limit is the end of the written column name
 */
@Override
protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){

    List<Object> keys = new ArrayList<>(3);
    keys.add(entityVersion.getEntityVersion());
    keys.add(entityVersion.getEntityId().getUuid());
    keys.add(entityVersion.getEntityId().getType());

    // Serialize comparators and values first and size the buffer from the real byte
    // counts; sizing with String.getBytes() depends on the platform default charset.
    List<ByteBuffer> comparators = new ArrayList<>(keys.size());
    List<ByteBuffer> values = new ArrayList<>(keys.size());
    int size = 0;
    for (Object key : keys) {
        // the datastax driver has no comparator alias concept, so the actual type names
        // are written; if it's not a UUID, the only other thing being serialized is text
        String comparator = (key instanceof UUID) ? "UUIDType" : "UTF8Type";
        ByteBuffer cb = DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED);

        ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
        if (kb == null) {
            kb = ByteBuffer.allocate(0);
        }

        comparators.add(cb);
        values.add(kb);
        // comparator length short + comparator + value length short + value + equality byte
        size += 2 + cb.remaining() + 2 + kb.remaining() + 1;
    }

    ByteBuffer stuff = ByteBuffer.allocate(size);
    for (int i = 0; i < keys.size(); i++) {
        ByteBuffer cb = comparators.get(i);
        ByteBuffer kb = values.get(i);

        stuff.putShort((short) cb.remaining());
        stuff.put(cb);
        // put a short that indicates how big the buffer is for this item
        stuff.putShort((short) kb.remaining());
        // put the actual item
        stuff.put(kb.slice());
        // put an equality byte ( again not used by part of legacy thrift Astyanax schema)
        stuff.put((byte) 0);
    }

    stuff.flip();
    return stuff;
}
/**
 * Serializes the unique-value row key.
 *
 * <p>Six components are written in order: application UUID, application type, entity type,
 * field type, field name, lower-cased field value. Each one is emitted as a 2-byte length,
 * the serialized bytes, and a single zero byte.
 *
 * @param appUUID application UUID
 * @param applicationType application type name
 * @param entityType entity type name
 * @param fieldType field type name
 * @param fieldName field name
 * @param fieldValue field value; its {@code toString()} is lower-cased before serialization
 * @return buffer positioned at 0 whose limit is the end of the written key
 */
private ByteBuffer serializeKey(UUID appUUID,
                                String applicationType,
                                String entityType,
                                String fieldType,
                                String fieldName,
                                Object fieldValue ){

    // values are serialized as strings and always lower cased
    String fieldValueString = fieldValue.toString().toLowerCase();

    List<Object> keys = new ArrayList<>(6);
    keys.add(appUUID);
    keys.add(applicationType);
    keys.add(entityType);
    keys.add(fieldType);
    keys.add(fieldName);
    keys.add(fieldValueString);

    // Serialize first and size the buffer from the bytes that will actually be written.
    // Sizing with String.getBytes() depends on the platform default charset and can
    // disagree with the serialized length, overflowing the buffer.
    List<ByteBuffer> parts = new ArrayList<>(keys.size());
    int size = 0;
    for (Object key : keys) {
        ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
        if (kb == null) {
            // a missing serialization is written as a zero-length component
            kb = ByteBuffer.allocate(0);
        }
        parts.add(kb);
        // 2 byte length short + payload + 1 byte equality
        size += 2 + kb.remaining() + 1;
    }

    ByteBuffer stuff = ByteBuffer.allocate(size);
    for (ByteBuffer kb : parts) {
        stuff.putShort((short) kb.remaining());
        stuff.put(kb.slice());
        stuff.put((byte) 0);
    }
    stuff.flip();
    return stuff;
}
/**
 * Serializes a unique-value log column name from a field entry.
 *
 * <p>Four components are written in order: version, field name, lower-cased field value,
 * field type name. The version component is prefixed with a 2-byte alias header; every
 * other component is prefixed with the 2-byte length of the comparator name followed by
 * the comparator name ("UTF8Type"). Each component then carries a 2-byte value length,
 * the serialized value and a single zero byte.
 *
 * @param fieldEntry entry supplying the version and the field
 * @return buffer positioned at 0 whose limit is the end of the written column name
 */
@Override
protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){

    // values are serialized as strings and always lower cased
    String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase();

    List<Object> keys = new ArrayList<>(4);
    keys.add(fieldEntry.getVersion());
    keys.add(fieldEntry.getField().getName());
    keys.add(fieldValueString);
    keys.add(fieldEntry.getField().getTypeName().name());

    // Alias header for the version component: high bit set, low byte is the type alias.
    boolean desc = UUID_TYPE_REVERSED.indexOf("(reversed=true)") >= 0;
    byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data
    if (desc) {
        // 85 is 'U', which is already upper case, so the alias byte is the same either way
        a = (byte) Character.toUpperCase((char) a);
    }
    // 0x8000 is spelled numerically; a non-ASCII char literal depends on source encoding
    short versionHeader = (short) (0x8000 | a);

    // only strings are being serialized other than the version, all tagged as UTF8Type
    ByteBuffer utf8Comparator = DataType.serializeValue("UTF8Type", ProtocolVersion.NEWEST_SUPPORTED);

    // Serialize every component up front and size the buffer from the real byte counts;
    // sizing from String.getBytes() of the non-lower-cased value depends on the platform
    // charset and on case mapping, and can disagree with what is written.
    List<ByteBuffer> values = new ArrayList<>(keys.size());
    int size = 0;
    for (Object key : keys) {
        ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
        if (kb == null) {
            kb = ByteBuffer.allocate(0);
        }
        values.add(kb);
        if (key.equals(fieldEntry.getVersion())) {
            size += 2; // alias header only
        } else {
            size += 2 + utf8Comparator.remaining(); // comparator length short + name
        }
        // 2 byte length short + payload + 1 byte equality
        size += 2 + kb.remaining() + 1;
    }

    ByteBuffer stuff = ByteBuffer.allocate(size);
    for (int i = 0; i < keys.size(); i++) {
        if (keys.get(i).equals(fieldEntry.getVersion())) {
            stuff.putShort(versionHeader);
        } else {
            stuff.putShort((short) utf8Comparator.remaining());
            // duplicate so the shared comparator buffer is not consumed
            stuff.put(utf8Comparator.duplicate());
        }

        ByteBuffer kb = values.get(i);
        // put a short that indicates how big the buffer is for this item
        stuff.putShort((short) kb.remaining());
        // put the actual item
        stuff.put(kb.slice());
        // put an equality byte ( again not used by part of legacy thrift Astyanax schema)
        stuff.put((byte) 0);
    }

    stuff.flip();
    return stuff;
}
/**
 * Serializes a unique-value column name from an entity version.
 *
 * <p>Three components are written in order: entity version, entity UUID, entity type.
 * Each component is emitted as the 2-byte length of its comparator name, the comparator
 * name ("UUIDType" for UUIDs, "UTF8Type" otherwise), a 2-byte value length, the serialized
 * value and a single zero byte.
 *
 * @param entityVersion supplies the version and the entity id (UUID and type)
 * @return buffer positioned at 0 whose limit is the end of the written column name
 */
@Override
protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){

    List<Object> keys = new ArrayList<>(3);
    keys.add(entityVersion.getEntityVersion());
    keys.add(entityVersion.getEntityId().getUuid());
    keys.add(entityVersion.getEntityId().getType());

    // Serialize comparators and values first and size the buffer from the real byte
    // counts; sizing with String.getBytes() depends on the platform default charset.
    List<ByteBuffer> comparators = new ArrayList<>(keys.size());
    List<ByteBuffer> values = new ArrayList<>(keys.size());
    int size = 0;
    for (Object key : keys) {
        // if it's not a UUID, the only other thing being serialized is text
        String comparator = (key instanceof UUID) ? "UUIDType" : "UTF8Type";
        ByteBuffer cb = DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED);

        ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
        if (kb == null) {
            kb = ByteBuffer.allocate(0);
        }

        comparators.add(cb);
        values.add(kb);
        // comparator length short + comparator + value length short + value + equality byte
        size += 2 + cb.remaining() + 2 + kb.remaining() + 1;
    }

    ByteBuffer stuff = ByteBuffer.allocate(size);
    for (int i = 0; i < keys.size(); i++) {
        ByteBuffer cb = comparators.get(i);
        ByteBuffer kb = values.get(i);

        stuff.putShort((short) cb.remaining());
        stuff.put(cb);
        // put a short that indicates how big the buffer is for this item
        stuff.putShort((short) kb.remaining());
        // put the actual item
        stuff.put(kb.slice());
        // put an equality byte ( again not used by part of legacy thrift Astyanax schema)
        stuff.put((byte) 0);
    }

    stuff.flip();
    return stuff;
}
/**
 * Serializes the collection-prefixed unique-value row key.
 *
 * <p>Eight components are written in order: application UUID, application type, application
 * UUID again, application type again, collection scope name (derived from the entity type),
 * field type, field name, lower-cased field value. Each one is emitted as a 2-byte length,
 * the serialized bytes, and a single zero byte.
 *
 * @param appUUID application UUID
 * @param applicationType application type name
 * @param entityType entity type name, used to look up the collection scope name
 * @param fieldType field type name
 * @param fieldName field name
 * @param fieldValue field value; its {@code toString()} is lower-cased before serialization
 * @return buffer positioned at 0 whose limit is the end of the written key
 */
private ByteBuffer serializeKey( UUID appUUID,
                                 String applicationType,
                                 String entityType,
                                 String fieldType,
                                 String fieldName,
                                 Object fieldValue ){

    final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType );

    // values are serialized as strings and always lower cased
    String fieldValueString = fieldValue.toString().toLowerCase();

    // NOTE(review): the application id pair is written twice; presumably this mirrors the
    // org id / scope id pair of the legacy CollectionPrefixedKey layout - confirm.
    List<Object> keys = new ArrayList<>(8);
    keys.add(appUUID);
    keys.add(applicationType);
    keys.add(appUUID);
    keys.add(applicationType);
    keys.add(collectionName);
    keys.add(fieldType);
    keys.add(fieldName);
    keys.add(fieldValueString);

    // Serialize first and size the buffer from the bytes that will actually be written.
    // Sizing with String.getBytes() depends on the platform default charset and can
    // disagree with the serialized length, overflowing the buffer.
    List<ByteBuffer> parts = new ArrayList<>(keys.size());
    int size = 0;
    for (Object key : keys) {
        ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
        if (kb == null) {
            // a missing serialization is written as a zero-length component
            kb = ByteBuffer.allocate(0);
        }
        parts.add(kb);
        // 2 byte length short + payload + 1 byte equality
        size += 2 + kb.remaining() + 1;
    }

    ByteBuffer stuff = ByteBuffer.allocate(size);
    for (ByteBuffer kb : parts) {
        stuff.putShort((short) kb.remaining());
        stuff.put(kb.slice());
        stuff.put((byte) 0);
    }
    stuff.flip();
    return stuff;
}
/**
 * Serializes the collection-prefixed log key.
 *
 * <p>Seven components are written in order: application UUID, application type, application
 * UUID again, application type again, collection scope name (derived from the entity type),
 * entity id, entity type. Each one is emitted as a 2-byte length, the serialized bytes, and
 * a single zero byte.
 *
 * @param appUUID application UUID
 * @param applicationType application type name
 * @param entityId entity UUID
 * @param entityType entity type name, also used to look up the collection scope name
 * @return buffer positioned at 0 whose limit is the end of the written key
 */
private ByteBuffer serializeLogKey(UUID appUUID, String applicationType, UUID entityId, String entityType){

    final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType );

    // NOTE(review): the application id pair is written twice; presumably this mirrors the
    // legacy CollectionPrefixedKey layout - confirm.
    List<Object> keys = new ArrayList<>(7);
    keys.add(appUUID);
    keys.add(applicationType);
    keys.add(appUUID);
    keys.add(applicationType);
    keys.add(collectionName);
    keys.add(entityId);
    keys.add(entityType);

    // Serialize first and size the buffer from the bytes that will actually be written.
    // Sizing with String.getBytes() depends on the platform default charset and can
    // disagree with the serialized length, overflowing the buffer.
    List<ByteBuffer> parts = new ArrayList<>(keys.size());
    int size = 0;
    for (Object key : keys) {
        ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
        if (kb == null) {
            // a missing serialization is written as a zero-length component
            kb = ByteBuffer.allocate(0);
        }
        parts.add(kb);
        // 2 byte length short + payload + 1 byte equality
        size += 2 + kb.remaining() + 1;
    }

    ByteBuffer stuff = ByteBuffer.allocate(size);
    for (ByteBuffer kb : parts) {
        stuff.putShort((short) kb.remaining());
        stuff.put(kb.slice());
        stuff.put((byte) 0);
    }
    stuff.flip();
    return stuff;
}
/**
 * Writes a message body, checks it can be loaded, deletes it, and checks that loading
 * the same id afterwards yields null.
 */
@Test
public void deleteMessageData() throws UnsupportedEncodingException {
    final QueueMessageSerialization serialization =
        getInjector().getInstance( QueueMessageSerialization.class );

    final Shard shard = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);

    final UUID id = QakkaUtils.getTimeUuid();
    final String expected = "my test data";

    serialization.writeMessageData(id, new DatabaseQueueMessageBody(
        DataType.serializeValue(expected, ProtocolVersion.NEWEST_SUPPORTED), "text/plain"));

    // the body must be readable before the delete
    final DatabaseQueueMessageBody loaded = serialization.loadMessageData( id );
    final byte[] rawBytes = loaded.getBlob().array();
    assertEquals(expected, new String( rawBytes, "UTF-8"));

    // and gone after it
    serialization.deleteMessageData(id);
    assertNull(serialization.loadMessageData( id ));
}
/**
 * Sends one message to a fresh queue, fetches it back and checks that the stored body
 * decodes to the text that was sent.
 */
@Test
public void testBasicOperation() throws Exception {

    final Injector injector = getInjector();

    final ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
    final String region = actorSystemFig.getRegionLocal();

    // App app = injector.getInstance( App.class );
    // app.start( "localhost", getNextAkkaPort(), region );

    final DistributedQueueService queueService = injector.getInstance( DistributedQueueService.class );
    final QueueMessageSerialization messageSerialization = injector.getInstance( QueueMessageSerialization.class );

    final String queueName = "testqueue_" + UUID.randomUUID();
    final QueueManager queueManager = injector.getInstance( QueueManager.class );
    queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));

    try {

        // send 1 queue message, get back one queue message
        final UUID messageId = UUIDGen.getTimeUUID();
        final String expected = "my test data";

        messageSerialization.writeMessageData( messageId, new DatabaseQueueMessageBody(
            DataType.serializeValue( expected, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" ) );

        queueService.sendMessageToRegion(
            queueName, region, region, messageId, null, null );

        queueService.refresh();
        Thread.sleep( 1000 );

        final Collection<DatabaseQueueMessage> fetched = queueService.getNextMessages( queueName, 1 );
        Assert.assertEquals( 1, fetched.size() );

        // load the body of the fetched message and decode its backing array as UTF-8
        final DatabaseQueueMessage first = fetched.iterator().next();
        final ByteBuffer blob = messageSerialization.loadMessageData( first.getMessageId() ).getBlob();
        Assert.assertEquals( expected, new String( blob.array(), "UTF-8" ) );

        queueService.shutdown();

    } finally {
        queueManager.deleteQueue( queueName );
    }
}