com.datastax.driver.core.DataType#serializeValue ( )源码实例Demo

下面列出了com.datastax.driver.core.DataType#serializeValue ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: deep-spark   文件: CassandraUtils.java
/**
 * Returns the partition key related to a given {@link Cells}.
 *
 * @param cells        {@link Cells} from Cassandra to extract the partition key.
 * @param keyValidator Cassandra key type.
 * @param numberOfKeys Number of keys.
 * @return Partition key.
 */
public static ByteBuffer getPartitionKey(Cells cells, AbstractType<?> keyValidator, int numberOfKeys) {
    ByteBuffer partitionKey;
    if (keyValidator instanceof CompositeType) {
        ByteBuffer[] keys = new ByteBuffer[numberOfKeys];

        for (int i = 0; i < cells.size(); i++) {
            Cell c = cells.getCellByIdx(i);

            if (c.isKey()) {
                keys[i] = DataType.serializeValue(c.getValue(), CassandraDeepJobConfig.PROTOCOL_VERSION);
            }
        }

        partitionKey = CompositeType.build(keys);
    } else {
        Cell cell = cells.getCellByIdx(0);
        partitionKey = DataType.serializeValue(cell.getValue(), CassandraDeepJobConfig.PROTOCOL_VERSION);
    }
    return partitionKey;
}
 
private ByteBuffer serializeLogKey(UUID appUUID, String applicationType, UUID entityId, String entityType){

        List<Object> keys = new ArrayList<>(4);
        keys.add(appUUID);
        keys.add(applicationType);
        keys.add(entityId);
        keys.add(entityType);

        int size = 16+applicationType.getBytes().length+16+entityType.getBytes().length;

        // we always need to add length for the 2 byte short and 1 byte equality
        size += keys.size()*3;

        ByteBuffer stuff = ByteBuffer.allocate(size);

        for (Object key : keys) {

            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
            if (kb == null) {
                kb = ByteBuffer.allocate(0);
            }

            stuff.putShort((short) kb.remaining());
            stuff.put(kb.slice());
            stuff.put((byte) 0);


        }
        stuff.flip();
        return stuff;

    }
 
@Test
public void writeNewMessageData(){

    QueueMessageSerialization queueMessageSerialization =
            getInjector().getInstance( QueueMessageSerialization.class );

    Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);

    UUID messageId = QakkaUtils.getTimeUuid();

    final String data = "my test data";

    final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
            DataType.serializeValue(data, ProtocolVersion.NEWEST_SUPPORTED), "text/plain");

    queueMessageSerialization.writeMessageData(messageId, messageBody);

    final DatabaseQueueMessageBody returnedData = queueMessageSerialization.loadMessageData( messageId );
}
 
@Test
public void loadMessageData() throws Exception {

    QueueMessageSerialization queueMessageSerialization =
            getInjector().getInstance( QueueMessageSerialization.class );

    Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);

    UUID messageId = QakkaUtils.getTimeUuid();

    final String data = "my test data";

    final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( DataType.serializeValue(data,
            ProtocolVersion.NEWEST_SUPPORTED), "text/plain");

    queueMessageSerialization.writeMessageData(messageId, messageBody);

    final DatabaseQueueMessageBody returnedBody = queueMessageSerialization.loadMessageData( messageId );
    String returnedData = new String( returnedBody.getBlob().array(), "UTF-8");

    assertEquals(data, returnedData);
}
 
@Test
public void loadMessageObjectData() throws Exception {

    QueueMessageSerialization queueMessageSerialization =
        getInjector().getInstance( QueueMessageSerialization.class );

    Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);

    UUID messageId = QakkaUtils.getTimeUuid();

    final String data = "my test data";

    final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( DataType.serializeValue(data,
        ProtocolVersion.NEWEST_SUPPORTED), "text/plain");

    queueMessageSerialization.writeMessageData(messageId, messageBody);

    final DatabaseQueueMessageBody returnedBody = queueMessageSerialization.loadMessageData( messageId );
    String returnedData = new String( returnedBody.getBlob().array(), "UTF-8");

    assertEquals(data, returnedData);
}
 
源代码6 项目: usergrid   文件: QueueActorServiceTest.java
@Test
    public void testGetMultipleQueueMessages() throws InterruptedException {

        Injector injector = getInjector();

        ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
        String region = actorSystemFig.getRegionLocal();

//        App app = injector.getInstance( App.class );
//        app.start("localhost", getNextAkkaPort(), region);

        DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
        QueueMessageSerialization serialization         = injector.getInstance( QueueMessageSerialization.class );
        TransferLogSerialization xferLogSerialization   = injector.getInstance( TransferLogSerialization.class );
        QueueMessageManager queueMessageManager         = injector.getInstance( QueueMessageManager.class );

        String queueName = "queue_testGetMultipleQueueMessages_" + UUID.randomUUID();
        QueueManager queueManager = injector.getInstance( QueueManager.class );

        try {

            queueManager.createQueue(
                new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );

            for (int i = 0; i < 100; i++) {

                UUID messageId = UUIDGen.getTimeUUID();

                final String data = "my test data";
                final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
                    DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
                serialization.writeMessageData( messageId, messageBody );

                xferLogSerialization.recordTransferLog(
                    queueName, actorSystemFig.getRegionLocal(), region, messageId );

                distributedQueueService.sendMessageToRegion(
                    queueName, region, region, messageId, null, null );
            }

            DatabaseQueueMessage.Type type = DatabaseQueueMessage.Type.DEFAULT;

            int maxRetries = 30;
            int retries = 0;
            long count = 0;
            while (retries++ < maxRetries) {
                distributedQueueService.refresh();
                count = queueMessageManager.getQueueDepth(  queueName, type );
                if ( count == 100 ) {
                    break;
                }
                Thread.sleep( 1000 );
            }

            Assert.assertEquals( 100, count );

            Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
            Assert.assertEquals( 75, queueMessageManager.getQueueDepth(  queueName, type ) );

            Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
            Assert.assertEquals( 50, queueMessageManager.getQueueDepth(  queueName, type ) );

            Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
            Assert.assertEquals( 25, queueMessageManager.getQueueDepth(  queueName, type ) );

            Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
            Assert.assertEquals( 0,  queueMessageManager.getQueueDepth(  queueName, type ) );

            distributedQueueService.shutdown();

        } finally {
            queueManager.deleteQueue( queueName );
        }
    }
 
源代码7 项目: usergrid   文件: QueueActorServiceTest.java
@Test
    public void testQueueMessageCounter() throws InterruptedException {

        Injector injector = getInjector();

        ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
        String region = actorSystemFig.getRegionLocal();

//        App app = injector.getInstance( App.class );
//        app.start("localhost", getNextAkkaPort(), region);

        DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
        QueueMessageSerialization serialization         = injector.getInstance( QueueMessageSerialization.class );
        TransferLogSerialization xferLogSerialization   = injector.getInstance( TransferLogSerialization.class );
        QueueMessageManager queueMessageManager         = injector.getInstance( QueueMessageManager.class );

        String queueName = "queue_testGetMultipleQueueMessages_" + UUID.randomUUID();
        QueueManager queueManager = injector.getInstance( QueueManager.class );

        try {

            queueManager.createQueue(
                new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );

            UUID messageId = UUIDGen.getTimeUUID();

            final String data = "my test data";
            final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
                DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
            serialization.writeMessageData( messageId, messageBody );

            xferLogSerialization.recordTransferLog(
                queueName, actorSystemFig.getRegionLocal(), region, messageId );

            distributedQueueService.sendMessageToRegion(
                queueName, region, region, messageId, null, null );

            DatabaseQueueMessage.Type type = DatabaseQueueMessage.Type.DEFAULT;

            Thread.sleep(5000);

            int maxRetries = 10;
            int retries = 0;
            long count = 0;
            while (retries++ < maxRetries) {
                distributedQueueService.refresh();
                count = queueMessageManager.getQueueDepth(  queueName, type );
                if ( count > 0 ) {
                    break;
                }
                Thread.sleep( 1000 );
            }

            Thread.sleep( 1000 );

            Assert.assertEquals( 1, queueMessageManager.getQueueDepth( queueName, type ) );

            distributedQueueService.shutdown();

        } finally {
            queueManager.deleteQueue( queueName );
        }
    }
 
源代码8 项目: usergrid   文件: QueueMessageManagerTest.java
@Test
    public void testGetWithMissingData() throws InterruptedException {

        Injector injector = getInjector();

        injector.getInstance( App.class ); // init the INJECTOR

        ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
        DistributedQueueService qas         = injector.getInstance( DistributedQueueService.class );
        QueueManager qm               = injector.getInstance( QueueManager.class );
        QueueMessageManager qmm       = injector.getInstance( QueueMessageManager.class );
        QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class );

        String region = actorSystemFig.getRegionLocal();
//        App app = injector.getInstance( App.class );
//        app.start( "localhost", getNextAkkaPort(), region );

        // create queue messages, every other one with missing data

        int numMessages = 100;
        String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
        qm.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));

        for ( int i=0; i<numMessages; i++ ) {

            final UUID messageId = QakkaUtils.getTimeUuid();

            if ( i % 2 == 0 ) { // every other it
                final String data = "my test data";
                final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
                        DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
                qms.writeMessageData( messageId, messageBody );
            }

            UUID queueMessageId = QakkaUtils.getTimeUuid();

            DatabaseQueueMessage message = new DatabaseQueueMessage(
                    messageId,
                    DatabaseQueueMessage.Type.DEFAULT,
                    queueName,
                    actorSystemFig.getRegionLocal(),
                    null,
                    System.currentTimeMillis(),
                    null,
                    queueMessageId);
            qms.writeMessage( message );
        }

        qas.refresh();
        Thread.sleep(1000);

        int count = 0;
        while ( count < numMessages / 2 ) {
            List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
            Assert.assertTrue( !messages.isEmpty() );
            count += messages.size();
            logger.debug("Got {} messages", ++count);
        }

        DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
        distributedQueueService.shutdown();
    }
 
@Override
protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){

    /**
     *   final UUID version = value.getVersion();
         final Field<?> field = value.getField();

         final FieldTypeName fieldType = field.getTypeName();
         final String fieldValue = field.getValue().toString().toLowerCase();


         DynamicComposite composite = new DynamicComposite(  );

         //we want to sort ascending to descending by version
         composite.addComponent( version,  UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
         composite.addComponent( field.getName(), STRING_SERIALIZER );
         composite.addComponent( fieldValue, STRING_SERIALIZER );
         composite.addComponent( fieldType.name() , STRING_SERIALIZER);
     */

    // values are serialized as strings, not sure why, and always lower cased
    String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase();


    List<Object> keys = new ArrayList<>(4);
    keys.add(fieldEntry.getVersion());
    keys.add(fieldEntry.getField().getName());
    keys.add(fieldValueString);
    keys.add(fieldEntry.getField().getTypeName().name());

    String comparator = UUID_TYPE_REVERSED;

    int size = 16+fieldEntry.getField().getName().getBytes().length
        +fieldEntry.getField().getValue().toString().getBytes().length+
        fieldEntry.getField().getTypeName().name().getBytes().length;

    // we always need to add length for the 2 byte comparator short,  2 byte length short and 1 byte equality
    size += keys.size()*5;

    // uuid type comparator is longest, ensure we allocate buffer using the max size to avoid overflow
    size += keys.size()*comparator.getBytes().length;

    ByteBuffer stuff = ByteBuffer.allocate(size);


    for (Object key : keys) {

        if(key.equals(fieldEntry.getVersion())) {
            int p = comparator.indexOf("(reversed=true)");
            boolean desc = false;
            if (p >= 0) {
                comparator = comparator.substring(0, p);
                desc = true;
            }

            byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data
            if (desc) {
                a = (byte) Character.toUpperCase((char) a);
            }

            stuff.putShort((short) ('耀' | a));
        }else{
            comparator = "UTF8Type"; // only strings are being serialized other than UUIDs here
            stuff.putShort((short)comparator.getBytes().length);
            stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
        }

        ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
        if (kb == null) {
            kb = ByteBuffer.allocate(0);
        }

        // put a short that indicates how big the buffer is for this item
        stuff.putShort((short) kb.remaining());

        // put the actual item
        stuff.put(kb.slice());

        // put an equality byte ( again not used by part of legacy thrift Astyanax schema)
        stuff.put((byte) 0);


    }

    stuff.flip();
    return stuff;

}
 
@Override
protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){

    /**
     *   final Id entityId = ev.getEntityId();
         final UUID entityUuid = entityId.getUuid();
         final String entityType = entityId.getType();

         CompositeBuilder builder = Composites.newDynamicCompositeBuilder();

         builder.addUUID( entityVersion );
         builder.addUUID( entityUuid );
         builder.addString(entityType );
     */

    String comparator = "UTF8Type";

    List<Object> keys = new ArrayList<>(3);
    keys.add(entityVersion.getEntityVersion());
    keys.add(entityVersion.getEntityId().getUuid());
    keys.add(entityVersion.getEntityId().getType());

    // UUIDs are 16 bytes
    int size = 16+16+entityVersion.getEntityId().getType().getBytes().length;

    // we always need to add length for the 2 byte comparator short,  2 byte length short and 1 byte equality
    size += keys.size()*5;

    // we always add comparator to the buffer as well
    size += keys.size()*comparator.getBytes().length;

    ByteBuffer stuff = ByteBuffer.allocate(size);

    for (Object key : keys) {

        // custom comparator alias to comparator mappings in  CQLUtils.COMPOSITE_TYPE ( more leftover from Asytanax )
        // the custom mapping is used for schema creation, but datastax driver does not have the alias concept and
        // we must work with the actual types
        if(key instanceof UUID){
            comparator = "UUIDType";
        }else{
            comparator = "UTF8Type"; // if it's not a UUID, the only other thing we're serializing is text
        }

        stuff.putShort((short)comparator.getBytes().length);
        stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));

        ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
        if (kb == null) {
            kb = ByteBuffer.allocate(0);
        }

        // put a short that indicates how big the buffer is for this item
        stuff.putShort((short) kb.remaining());

        // put the actual item
        stuff.put(kb.slice());

        // put an equality byte ( again not used by part of legacy thrift Astyanax schema)
        stuff.put((byte) 0);


    }

    stuff.flip();
    return stuff;

}
 
private ByteBuffer serializeKey(UUID appUUID,
                                String applicationType,
                                String entityType,
                                String fieldType,
                                String fieldName,
                                Object fieldValue  ){

    // values are serialized as strings, not sure why, and always lower cased
    String fieldValueString = fieldValue.toString().toLowerCase();

    List<Object> keys = new ArrayList<>(6);
    keys.add(0, appUUID);
    keys.add(1, applicationType);
    keys.add(2, entityType);
    keys.add(3, fieldType);
    keys.add(4, fieldName);
    keys.add(5, fieldValueString);


    // UUIDs are 16 bytes, allocate the buffer accordingly
    int size = 16 + applicationType.getBytes().length + entityType.getBytes().length
        + fieldType.getBytes().length + fieldName.getBytes().length+fieldValueString.getBytes().length;


    // we always need to add length for the 2 byte short and 1 byte equality
    size += keys.size()*3;

    ByteBuffer stuff = ByteBuffer.allocate(size);

    for (Object key : keys) {

        ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
        if (kb == null) {
            kb = ByteBuffer.allocate(0);
        }

        stuff.putShort((short) kb.remaining());
        stuff.put(kb.slice());
        stuff.put((byte) 0);


    }
    stuff.flip();
    return stuff;

}
 
@Override
protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){

    /**
     *  final UUID version = value.getVersion();
        final Field<?> field = value.getField();

         final FieldTypeName fieldType = field.getTypeName();
         final String fieldValue = field.getValue().toString().toLowerCase();


         DynamicComposite composite = new DynamicComposite(  );

         //we want to sort ascending to descending by version
         composite.addComponent( version,  UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
         composite.addComponent( field.getName(), STRING_SERIALIZER );
         composite.addComponent( fieldValue, STRING_SERIALIZER );
         composite.addComponent( fieldType.name() , STRING_SERIALIZER);
     */

    // values are serialized as strings, not sure why, and always lower cased
    String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase();


    List<Object> keys = new ArrayList<>(4);
    keys.add(fieldEntry.getVersion());
    keys.add(fieldEntry.getField().getName());
    keys.add(fieldValueString);
    keys.add(fieldEntry.getField().getTypeName().name());

    String comparator = UUID_TYPE_REVERSED;

    int size = 16+fieldEntry.getField().getName().getBytes().length
        +fieldEntry.getField().getValue().toString().getBytes().length+
        fieldEntry.getField().getTypeName().name().getBytes().length;

    // we always need to add length for the 2 byte comparator short,  2 byte length short and 1 byte equality
    size += keys.size()*5;

    // uuid type comparator is longest, ensure we allocate buffer using the max size to avoid overflow
    size += keys.size()*comparator.getBytes().length;

    ByteBuffer stuff = ByteBuffer.allocate(size);


    for (Object key : keys) {

        if(key.equals(fieldEntry.getVersion())) {
            int p = comparator.indexOf("(reversed=true)");
            boolean desc = false;
            if (p >= 0) {
                comparator = comparator.substring(0, p);
                desc = true;
            }

            byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data
            if (desc) {
                a = (byte) Character.toUpperCase((char) a);
            }

            stuff.putShort((short) ('耀' | a));
        }else{
            comparator = "UTF8Type"; // only strings are being serialized other than UUIDs here
            stuff.putShort((short)comparator.getBytes().length);
            stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
        }

        ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
        if (kb == null) {
            kb = ByteBuffer.allocate(0);
        }

        // put a short that indicates how big the buffer is for this item
        stuff.putShort((short) kb.remaining());

        // put the actual item
        stuff.put(kb.slice());

        // put an equality byte ( again not used by part of legacy thrift Astyanax schema)
        stuff.put((byte) 0);


    }

    stuff.flip();
    return stuff;

}
 
@Override
protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){

    /**
     *  final Id entityId = ev.getEntityId();
        final UUID entityUuid = entityId.getUuid();
        final String entityType = entityId.getType();

        CompositeBuilder builder = Composites.newDynamicCompositeBuilder();

        builder.addUUID( entityVersion );
        builder.addUUID( entityUuid );
        builder.addString(entityType );
     */

    String comparator = "UTF8Type";

    List<Object> keys = new ArrayList<>(3);
    keys.add(entityVersion.getEntityVersion());
    keys.add(entityVersion.getEntityId().getUuid());
    keys.add(entityVersion.getEntityId().getType());

    // UUIDs are 16 bytes
    int size = 16+16+entityVersion.getEntityId().getType().getBytes().length;

    // we always need to add length for the 2 byte comparator short,  2 byte length short and 1 byte equality
    size += keys.size()*5;

    // we always add comparator to the buffer as well
    size += keys.size()*comparator.getBytes().length;

    ByteBuffer stuff = ByteBuffer.allocate(size);

    for (Object key : keys) {

        if(key instanceof UUID){
            comparator = "UUIDType";
        }else{
            comparator = "UTF8Type"; // if it's not a UUID, the only other thing we're serializing is text
        }

        stuff.putShort((short)comparator.getBytes().length);
        stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));

        ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
        if (kb == null) {
            kb = ByteBuffer.allocate(0);
        }

        // put a short that indicates how big the buffer is for this item
        stuff.putShort((short) kb.remaining());

        // put the actual item
        stuff.put(kb.slice());

        // put an equality byte ( again not used by part of legacy thrift Astyanax schema)
        stuff.put((byte) 0);


    }

    stuff.flip();
    return stuff;

}
 
private ByteBuffer serializeKey( UUID appUUID,
                                 String applicationType,
                                 String entityType,
                                 String fieldType,
                                 String fieldName,
                                 Object fieldValue  ){

    final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType );

    /**
        final CollectionPrefixedKey<Field> uniquePrefixedKey =
            new CollectionPrefixedKey<>( collectionName, applicationId, field );

        final Id orgId = ID_SER.fromComposite( parser );
        final Id scopeId = ID_SER.fromComposite( parser );
        final String scopeName = parser.readString();
        final K value = keySerializer.fromComposite( parser );
    **/

    // values are serialized as strings, not sure why, and always lower cased
    String fieldValueString = fieldValue.toString().toLowerCase();

    List<Object> keys = new ArrayList<>(8);
    keys.add(0, appUUID);
    keys.add(1, applicationType);
    keys.add(2, appUUID);
    keys.add(3, applicationType);
    keys.add(4, collectionName);
    keys.add(5, fieldType);
    keys.add(6, fieldName);
    keys.add(7, fieldValueString);


    // UUIDs are 16 bytes, allocate the buffer accordingly
    int size = 16 + applicationType.getBytes().length + 16 + applicationType.getBytes().length +
        collectionName.getBytes().length + fieldType.getBytes().length + fieldName.getBytes().length
        + fieldValueString.getBytes().length;


    // we always need to add length for the 2 byte short and 1 byte equality
    size += keys.size()*3;

    ByteBuffer stuff = ByteBuffer.allocate(size);

    for (Object key : keys) {

        ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
        if (kb == null) {
            kb = ByteBuffer.allocate(0);
        }

        stuff.putShort((short) kb.remaining());
        stuff.put(kb.slice());
        stuff.put((byte) 0);


    }
    stuff.flip();
    return stuff;

}
 
private ByteBuffer serializeLogKey(UUID appUUID, String applicationType, UUID entityId, String entityType){


       final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType );

      /**
            final CollectionPrefixedKey<Id> collectionPrefixedEntityKey =
                new CollectionPrefixedKey<>( collectionName, applicationId, uniqueValueId );
       **/

        List<Object> keys = new ArrayList<>(4);
        keys.add(appUUID);
        keys.add(applicationType);
        keys.add(appUUID);
        keys.add(applicationType);
        keys.add(collectionName);
        keys.add(entityId);
        keys.add(entityType);

        int size = 16+applicationType.getBytes().length+16+applicationType.getBytes().length
            +collectionName.getBytes().length+16+entityType.getBytes().length;

        // we always need to add length for the 2 byte short and 1 byte equality
        size += keys.size()*3;

        ByteBuffer stuff = ByteBuffer.allocate(size);

        for (Object key : keys) {

            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
            if (kb == null) {
                kb = ByteBuffer.allocate(0);
            }

            stuff.putShort((short) kb.remaining());
            stuff.put(kb.slice());
            stuff.put((byte) 0);


        }
        stuff.flip();
        return stuff;

    }
 
@Test
public void deleteMessageData() throws UnsupportedEncodingException {

    QueueMessageSerialization queueMessageSerialization =
            getInjector().getInstance( QueueMessageSerialization.class );

    Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);

    UUID messageId = QakkaUtils.getTimeUuid();

    final String data = "my test data";

    final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( DataType.serializeValue(data,
            ProtocolVersion.NEWEST_SUPPORTED), "text/plain");

    queueMessageSerialization.writeMessageData(messageId, messageBody);

    final DatabaseQueueMessageBody returnedBody = queueMessageSerialization.loadMessageData( messageId );
    final String returnedData = new String( returnedBody.getBlob().array(), "UTF-8");

    assertEquals(data, returnedData);

    queueMessageSerialization.deleteMessageData(messageId);

    assertNull(queueMessageSerialization.loadMessageData( messageId ));


}
 
源代码17 项目: usergrid   文件: QueueActorServiceTest.java
@Test
    public void testBasicOperation() throws Exception {

        Injector injector = getInjector();

        ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
        String region = actorSystemFig.getRegionLocal();

//        App app = injector.getInstance( App.class );
//        app.start( "localhost", getNextAkkaPort(), region );

        DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
        QueueMessageSerialization serialization = injector.getInstance( QueueMessageSerialization.class );

        String queueName = "testqueue_" + UUID.randomUUID();
        QueueManager queueManager = injector.getInstance( QueueManager.class );
        queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));

        try {

            // send 1 queue message, get back one queue message
            UUID messageId = UUIDGen.getTimeUUID();

            final String data = "my test data";
            final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
                DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
            serialization.writeMessageData( messageId, messageBody );

            distributedQueueService.sendMessageToRegion(
                queueName, region, region, messageId, null, null );

            distributedQueueService.refresh();
            Thread.sleep( 1000 );

            Collection<DatabaseQueueMessage> qmReturned = distributedQueueService.getNextMessages( queueName, 1 );
            Assert.assertEquals( 1, qmReturned.size() );

            DatabaseQueueMessage dqm = qmReturned.iterator().next();
            DatabaseQueueMessageBody dqmb = serialization.loadMessageData( dqm.getMessageId() );
            ByteBuffer blob = dqmb.getBlob();

            String returnedData = new String( blob.array(), "UTF-8" );

            Assert.assertEquals( data, returnedData );

            distributedQueueService.shutdown();

        } finally {
            queueManager.deleteQueue( queueName );
        }

    }