javax.jms.MessageEOFException#org.apache.qpid.proton.amqp.messaging.Data源码实例Demo

下面列出了javax.jms.MessageEOFException#org.apache.qpid.proton.amqp.messaging.Data 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

/**
 * Verifies that a message whose body is an AMQP {@code Data} section reaches the
 * subscriber as a {@code byte[]} payload carrying the bytes that were produced.
 */
@Test
public void testSourceWithDataContent() {
    String topic = UUID.randomUUID().toString();
    Map<String, Object> config = getConfig(topic);
    List<Message<byte[]>> messages = new ArrayList<>();
    provider = new AmqpConnector();
    provider.setup(executionHolder);

    PublisherBuilder<? extends Message<?>> builder = provider.getPublisherBuilder(new MapBasedConfig(config));
    AtomicBoolean opened = new AtomicBoolean();

    builder.to(createSubscriber(messages, opened)).run();
    await().until(opened::get);

    await().until(() -> provider.isReady(config.get(CHANNEL_NAME_ATTRIBUTE).toString()));

    List<String> list = new ArrayList<>();
    list.add("hello");
    list.add("world");
    String expected = list.toString();
    // Encode and decode with an explicit charset so the comparison does not depend
    // on the platform default encoding of the machine running the test.
    usage.produce(topic, 1,
            () -> new Data(new Binary(expected.getBytes(java.nio.charset.StandardCharsets.UTF_8))));

    await().atMost(2, TimeUnit.MINUTES).until(() -> !messages.isEmpty());
    byte[] result = messages.get(0).getPayload();
    assertThat(new String(result, java.nio.charset.StandardCharsets.UTF_8))
            .isEqualTo(expected);
}
 
源代码2 项目: hono   文件: AbstractRequestResponseClientTest.java
/**
 * Verifies that sending a request produces a message carrying the given payload in a
 * Data section and the given headers as application properties, and that a timer is
 * set for timing out the request.
 */
@Test
public void testCreateAndSendRequestSendsProperRequestMessage() {

    // GIVEN a request-response client that times out requests after 200 ms
    client.setRequestTimeout(200);

    // WHEN sending a request message with some headers and payload
    final JsonObject requestPayload = new JsonObject().put("key", "value");
    final Map<String, Object> headers = Collections.singletonMap("test-key", "test-value");
    client.createAndSendRequest("get", headers, requestPayload.toBuffer(), s -> {});

    // THEN exactly the captured message has been handed to the sender ...
    final ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
    verify(sender).send(captor.capture(), VertxMockSupport.anyHandler());
    final Message sentMessage = captor.getValue();
    assertThat(sentMessage).isNotNull();

    // ... its body is a Data section containing the request payload ...
    assertThat(sentMessage.getBody()).isNotNull();
    assertThat(sentMessage.getBody()).isInstanceOf(Data.class);
    final Buffer sentPayload = MessageHelper.getPayload(sentMessage);
    assertThat(sentPayload.getBytes()).isEqualTo(requestPayload.toBuffer().getBytes());

    // ... and the headers are conveyed as application properties
    assertThat(sentMessage.getApplicationProperties()).isNotNull();
    assertThat(sentMessage.getApplicationProperties().getValue().get("test-key")).isEqualTo("test-value");

    // and a timer has been set to time out the request after 200 ms
    verify(vertx).setTimer(eq(200L), VertxMockSupport.anyHandler());
}
 
/**
 * Converts a Kafka consumer record into an AMQP message addressed to {@code address}.
 * The record's partition, offset, key and topic are exposed as message annotations and
 * the record's value is carried in a Data section.
 *
 * @param address the address to set on the created message
 * @param record the Kafka record to convert
 * @return the created AMQP message
 */
@Override
public Message toMessage(String address, KafkaConsumerRecord<String, byte[]> record) {

    Message message = Proton.message();
    message.setAddress(address);

    // put message annotations about partition, offset, key and topic; the key
    // annotation is always added and simply carries null for records without a key
    Map<Symbol, Object> map = new HashMap<>();
    map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), record.partition());
    map.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION), record.offset());
    map.put(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION), record.key());
    map.put(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION), record.topic());

    MessageAnnotations messageAnnotations = new MessageAnnotations(map);
    message.setMessageAnnotations(messageAnnotations);

    // A Kafka record may legitimately have a null value (e.g. a tombstone). Wrapping
    // null in a Binary would fail, so such records get a Data section without content.
    byte[] value = record.value();
    message.setBody(new Data(value == null ? null : new Binary(value)));

    return message;
}
 
源代码4 项目: enmasse   文件: MessageSendTester.java
/**
 * Handles a received message: logs a (possibly truncated) dump of it, then validates
 * that the body is a Data section holding a JSON object with this tester's
 * {@code test-id} and a {@code timestamp}. Valid messages are passed to
 * {@code handleValidMessage}, everything else to {@code handleInvalidMessage}.
 *
 * @param message the received message
 */
private void handleMessage(final Message message) {

    if (log.isInfoEnabled()) {
        String str = message.toString();
        if (str.length() > MAX_MESSAGE_DUMP_LENGTH) {
            str = str.substring(0, MAX_MESSAGE_DUMP_LENGTH) + "…";
        }
        log.info("Received message - {}", str);
    }

    var body = message.getBody();
    if (!(body instanceof Data)) {
        handleInvalidMessage(message);
        return;
    }

    // a Data section may carry no binary at all; treat that like any other invalid body
    var binary = ((Data) body).getValue();
    if (binary == null) {
        handleInvalidMessage(message);
        return;
    }

    // A Binary can be a view onto a larger backing array, so only the region described
    // by its offset and length is payload; the raw array alone may contain foreign bytes.
    var payload = Buffer.buffer(binary.getLength())
            .appendBytes(binary.getArray(), binary.getArrayOffset(), binary.getLength());
    var json = new JsonObject(payload);
    var testId = json.getString("test-id");
    var timestamp = json.getLong("timestamp");
    if (!this.testId.equals(testId) || timestamp == null) {
        handleInvalidMessage(message);
        return;
    }

    handleValidMessage(message, timestamp, json);
}
 
/**
 * Verifies that converting a core bytes message yields an AMQP message whose body
 * is a Data section holding exactly the bytes that were written.
 */
@Test
public void testConvertBytesMessageToAmqpMessageWithDataBody() throws Exception {
   byte[] expectedPayload = new byte[]{8, 16, 24, 32};
   ServerJMSBytesMessage source = createBytesMessage();
   source.writeBytes(expectedPayload);
   source.encode();

   AMQPMessage converted = AMQPConverter.getInstance().fromCore(source.getInnerMessage(), null);

   // the body must be a Data section before it is safe to look inside it
   assertNotNull(converted.getBody());
   assertTrue(converted.getBody() instanceof Data);

   Binary actual = ((Data) converted.getBody()).getValue();
   assertTrue(actual instanceof Binary);
   assertEquals(4, actual.getLength());

   Binary expected = new Binary(expectedPayload);
   assertTrue(expected.equals(actual));
}
 
/**
 * Verifies that an object message flagged with an unknown original encoding is
 * converted to a non-empty Data section which deserializes back to a UUID.
 */
@Test
public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
   ServerJMSObjectMessage source = createObjectMessage(TEST_OBJECT_VALUE);
   source.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
   source.encode();

   AMQPMessage converted = AMQPConverter.getInstance().fromCore(source.getInnerMessage(), null);

   assertNotNull(converted.getBody());
   assertTrue(converted.getBody() instanceof Data);

   Binary serialized = ((Data) converted.getBody()).getValue();
   assertFalse(serialized.getLength() == 0);

   Object restored = deserialize(serialized.getArray());
   assertNotNull(restored);
   assertTrue(restored instanceof UUID);
}
 
/**
 * Verifies that a text message whose original encoding is marked as AMQP_DATA is
 * converted to a Data section containing the text as UTF-8 bytes.
 */
@Test
public void testConvertTextMessageCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
   String expectedText = "myTextMessageContent";
   ServerJMSTextMessage source = createTextMessage(expectedText);
   source.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
   source.encode();

   AMQPMessage converted = AMQPConverter.getInstance().fromCore(source.getInnerMessage(), null);

   assertNotNull(converted.getBody());
   assertTrue(converted.getBody() instanceof Data);

   Binary encoded = ((Data) converted.getBody()).getValue();
   assertTrue(encoded instanceof Binary);

   // decode only the region of the backing array that belongs to this Binary
   String actualText = new String(encoded.getArray(), encoded.getArrayOffset(), encoded.getLength(),
         StandardCharsets.UTF_8);
   assertEquals(expectedText, actualText);
}
 
/**
 * Verifies that a text message marked with the AMQP_DATA original encoding ends up
 * as a Data section whose bytes decode (as UTF-8) to the original text.
 */
@Test
public void testConvertTextMessageContentNotStoredCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
   String expectedText = "myTextMessageContent";
   ServerJMSTextMessage textMessage = createTextMessage(expectedText);
   textMessage.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
   textMessage.encode();

   AMQPMessage result = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage(), null);

   assertNotNull(result.getBody());
   assertTrue(result.getBody() instanceof Data);

   Binary payload = ((Data) result.getBody()).getValue();
   assertTrue(payload instanceof Binary);

   byte[] backingArray = payload.getArray();
   int start = payload.getArrayOffset();
   int count = payload.getLength();
   assertEquals(expectedText, new String(backingArray, start, count, StandardCharsets.UTF_8));
}
 
源代码9 项目: qpid-jms   文件: AmqpJmsTextMessageFacadeTest.java
/**
 * Verifies that the text facade returns the original string for a received message
 * whose body is a Data section containing the string's UTF-8 bytes.
 */
@Test
public void testGetTextUsingReceivedMessageWithDataSectionContainingStringBytes() throws Exception {
    String encodedString = "myEncodedString";
    byte[] encodedBytes = encodedString.getBytes(Charset.forName("UTF-8"));

    // build the wire encoding of a Data section by hand, then decode it into a message
    org.apache.qpid.proton.codec.Data payloadData = org.apache.qpid.proton.codec.Data.Factory.create();
    payloadData.putDescribedType(new DataDescribedType(new Binary(encodedBytes)));
    Binary b = payloadData.encode();

    Message message = Message.Factory.create();
    int decoded = message.decode(b.getArray(), b.getArrayOffset(), b.getLength());
    // expected value first, so a failure reports the two numbers the right way round
    assertEquals(b.getLength(), decoded);
    AmqpJmsTextMessageFacade amqpTextMessageFacade = createReceivedTextMessageFacade(createMockAmqpConsumer(), message);

    assertEquals(encodedString, amqpTextMessageFacade.getText());
}
 
源代码10 项目: qpid-jms   文件: AmqpJmsTextMessageFacadeTest.java
/**
 * Verifies that asking the text facade for the text of a received message whose Data
 * section holds UTF-16 encoded bytes results in a JMSException.
 */
@Test
public void testGetTextWithUnknownEncodedDataThrowsJMSException() throws Exception {
    byte[] utf16Bytes = "myEncodedString".getBytes(Charset.forName("UTF-16"));

    Message received = Message.Factory.create();
    received.setBody(new Data(new Binary(utf16Bytes)));
    AmqpJmsTextMessageFacade facade = createReceivedTextMessageFacade(createMockAmqpConsumer(), received);

    try {
        facade.getText();
        fail("expected exception not thrown");
    } catch (JMSException expected) {
        // expected
    }
}
 
源代码11 项目: qpid-jms   文件: AmqpCodecTest.java
/**
 * Test that an empty data body combined with the content type
 * {@value AmqpMessageSupport#OCTET_STREAM_CONTENT_TYPE} is decoded as a BytesMessage
 * when no annotation indicates the type of JMS message.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateBytesMessageFromDataWithEmptyBinaryAndContentType() throws Exception {
    Message amqpMessage = Proton.message();
    amqpMessage.setBody(new Data(new Binary(new byte[0])));
    amqpMessage.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE.toString());

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(amqpMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsBytesMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsBytesMessageFacade.class, decodedFacade.getClass());
}
 
源代码12 项目: qpid-jms   文件: AmqpCodecTest.java
/**
 * Test that an empty data body without any content type is decoded as a
 * BytesMessage when no annotation indicates the type of JMS message.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateBytesMessageFromDataWithEmptyBinaryAndNoContentType() throws Exception {
    Message amqpMessage = Proton.message();
    amqpMessage.setBody(new Data(new Binary(new byte[0])));

    // precondition: no content type has been set on the message
    assertNull(amqpMessage.getContentType());

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(amqpMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsBytesMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsBytesMessageFacade.class, decodedFacade.getClass());
}
 
源代码13 项目: qpid-jms   文件: AmqpCodecTest.java
/**
 * Test that an empty data body combined with the content type
 * {@value AmqpMessageSupport#SERIALIZED_JAVA_OBJECT_CONTENT_TYPE} is decoded as an
 * ObjectMessage backed by a serialized-object delegate when no annotation indicates
 * the type of JMS message.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testCreateObjectMessageFromDataWithContentTypeAndEmptyBinary() throws Exception {
    Message amqpMessage = Proton.message();
    amqpMessage.setBody(new Data(new Binary(new byte[0])));
    amqpMessage.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());

    JmsMessage decoded = AmqpCodec.decodeMessage(mockConsumer, encodeMessage(amqpMessage)).asJmsMessage();
    assertNotNull("Message should not be null", decoded);
    assertEquals("Unexpected message class type", JmsObjectMessage.class, decoded.getClass());

    JmsMessageFacade decodedFacade = decoded.getFacade();
    assertNotNull("Facade should not be null", decodedFacade);
    assertEquals("Unexpected facade class type", AmqpJmsObjectMessageFacade.class, decodedFacade.getClass());

    AmqpObjectTypeDelegate objectDelegate = ((AmqpJmsObjectMessageFacade) decodedFacade).getDelegate();
    assertTrue("Unexpected delegate type: " + objectDelegate, objectDelegate instanceof AmqpSerializedObjectDelegate);
}
 
源代码14 项目: qpid-jms   文件: AmqpJmsObjectMessageFacadeTest.java
/**
 * Test that setting an object on a new message results in the expected
 * content in the body section of the underlying message.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test
public void testSetObjectOnNewMessage() throws Exception {
    String content = "myStringContent";

    AmqpJmsObjectMessageFacade amqpObjectMessageFacade = createNewObjectMessageFacade(false);
    amqpObjectMessageFacade.setObject(content);

    // serialize the same object independently to obtain the expected bytes;
    // try-with-resources closes the streams even if serialization fails
    byte[] bytes;
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(baos)) {
        oos.writeObject(content);
        oos.flush();
        bytes = baos.toByteArray();
    }

    // retrieve the bytes from the underlying message, check they match expectation
    Section section = amqpObjectMessageFacade.getBody();
    assertNotNull(section);
    assertEquals(Data.class, section.getClass());
    assertArrayEquals("Underlying message data section did not contain the expected bytes", bytes, ((Data) section).getValue().getArray());
}
 
源代码15 项目: qpid-jms   文件: AmqpJmsBytesMessageFacadeTest.java
/**
 * Verifies that clearing the body after an input stream has already been obtained
 * leaves the facade with an empty Data body and a stream that is at its end.
 */
@Test
public void testClearBodyWithExistingInputStream() throws Exception {
    Message received = Message.Factory.create();
    received.setBody(new Data(new Binary("myBytes".getBytes())));
    AmqpJmsBytesMessageFacade facade = createReceivedBytesMessageFacade(createMockAmqpConsumer(), received);

    // obtain a stream first so that clearBody() runs while one already exists
    @SuppressWarnings("unused")
    InputStream unused = facade.getInputStream();

    facade.clearBody();

    int readResult = facade.getInputStream().read(new byte[1]);
    assertEquals("Expected no data from facade, but got some", END_OF_STREAM, readResult);

    assertDataBodyAsExpected(facade.getBody(), 0);
}
 
源代码16 项目: qpid-jms   文件: AmqpJmsBytesMessageFacadeTest.java
/**
 * Verifies that the input stream of a received message with a Data section yields
 * exactly the bytes of that section and then signals end of stream.
 */
@Test
public void testInputStreamUsingReceivedMessageWithDataSection() throws Exception {
    byte[] bytes = "myBytes".getBytes();

    Message message = Message.Factory.create();
    message.setBody(new Data(new Binary(bytes)));

    AmqpJmsBytesMessageFacade amqpBytesMessageFacade = createReceivedBytesMessageFacade(createMockAmqpConsumer(), message);
    InputStream bytesStream = amqpBytesMessageFacade.getInputStream();
    assertNotNull(bytesStream);

    // retrieve the expected bytes, check they match; read() may return fewer bytes than
    // requested, so the count is asserted instead of being silently ignored
    byte[] receivedBytes = new byte[bytes.length];
    assertEquals("Unexpected number of bytes read from input stream", bytes.length, bytesStream.read(receivedBytes));
    assertTrue(Arrays.equals(bytes, receivedBytes));

    // verify no more bytes remain, i.e EOS
    assertEquals("Expected input stream to be at end but data was returned", END_OF_STREAM, bytesStream.read(new byte[1]));
}
 
源代码17 项目: qpid-jms   文件: AmqpJmsBytesMessageFacadeTest.java
/**
 * Asserts that the given body is a Data section with a non-null binary value of the
 * given length.
 *
 * @param body the body section to inspect
 * @param length the expected length of the contained binary
 */
private void assertDataBodyAsExpected(Section body, int length) {
    assertNotNull("Expected body section to be present", body);
    assertEquals("Unexpected body section type", Data.class, body.getClass());

    Data dataSection = (Data) body;
    Binary binary = dataSection.getValue();
    assertNotNull(binary);
    assertEquals("Unexpected body length", length, binary.getLength());
}
 
源代码18 项目: hono   文件: VertxBasedAmqpProtocolAdapterTest.java
/**
 * Creates a mocked message with a fixed message id and the given subject and address.
 * When a payload is given, the mock also reports a {@code text/plain} content type and
 * a Data section body holding the payload's bytes.
 *
 * @param to the address the mock returns
 * @param payload the payload, or {@code null} to leave content type and body unstubbed
 * @param subject the subject the mock returns
 * @return the mocked message
 */
private Message getFakeMessage(final String to, final Buffer payload, final String subject) {

    final Message fake = mock(Message.class);
    when(fake.getMessageId()).thenReturn("the-message-id");
    when(fake.getSubject()).thenReturn(subject);
    when(fake.getAddress()).thenReturn(to);

    if (payload == null) {
        // no payload: content type and body remain unstubbed
        return fake;
    }

    final Binary payloadBytes = new Binary(payload.getBytes());
    final Data body = new Data(payloadBytes);
    when(fake.getContentType()).thenReturn("text/plain");
    when(fake.getBody()).thenReturn(body);
    return fake;
}
 
源代码19 项目: qpid-jms   文件: AmqpJmsBytesMessageFacadeTest.java
/**
 * Verifies that a received bytes message whose Data section holds content reports
 * that it has a body.
 */
@Test
public void testHasBodyWithContent() throws Exception {
    Message received = Message.Factory.create();
    Binary content = new Binary("myOrigBytes".getBytes());
    received.setBody(new Data(content));

    AmqpJmsBytesMessageFacade facade = createReceivedBytesMessageFacade(createMockAmqpConsumer(), received);

    assertTrue(facade.hasBody());
}
 
源代码20 项目: hono   文件: RequestResponseEndpointTest.java
/**
 * Verifies that a request with a malformed payload is accepted but not forwarded
 * to the service, and that a response with status code 400 is sent instead.
 */
@SuppressWarnings("unchecked")
@Test
public void testHandleMessageSendsResponseForMalformedPayload() {

    // GIVEN a request whose body consists of three arbitrary bytes
    final byte[] malformedPayload = new byte[] { 0x01, 0x02, 0x03 };
    final Message request = ProtonHelper.message();
    request.setSubject("get");
    request.setReplyTo(REPLY_RESOURCE.toString());
    request.setCorrelationId(UUID.randomUUID().toString());
    request.setBody(new Data(new Binary(malformedPayload)));

    final ProtonDelivery requestDelivery = mock(ProtonDelivery.class);

    final RequestResponseEndpoint<ServiceConfigProperties> testEndpoint = getEndpoint(true);
    testEndpoint.onLinkAttach(connection, sender, REPLY_RESOURCE);

    // WHEN the request is handled by the endpoint
    testEndpoint.handleRequestMessage(connection, receiver, resource, requestDelivery, request);

    // THEN the message is accepted
    verify(requestDelivery).disposition(argThat(d -> d instanceof Accepted), eq(Boolean.TRUE));

    // and not forwarded to the service instance
    verify(eventBus, never()).request(eq(EVENT_BUS_ADDRESS), any(JsonObject.class), any(DeliveryOptions.class), any(Handler.class));

    // and a response with the expected status is sent to the client
    verify(sender).send(argThat(m -> hasStatusCode(m, HttpURLConnection.HTTP_BAD_REQUEST)));
    verify(receiver).flow(1);
}
 
源代码21 项目: qpid-jms   文件: AmqpJmsObjectMessageFacadeTest.java
/**
 * Verifies that the object facade returns {@code null} for a received message whose
 * content type marks a serialized Java object but whose Data section has no value.
 */
@Test
public void testGetObjectUsingReceivedMessageWithDataSectionContainingNothingReturnsNull() throws Exception {
    Message received = Message.Factory.create();
    received.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
    received.setBody(new Data(null));

    AmqpJmsObjectMessageFacade facade = createReceivedObjectMessageFacade(createMockAmqpConsumer(), received);

    Object result = facade.getObject();
    assertNull("Expected null object", result);
}
 
源代码22 项目: hono   文件: MessageHelper.java
/**
 * Puts the given payload into an AMQP message's body as a <em>Data</em> section.
 * <p>
 * Without a payload the message is left untouched. With a payload, the message's
 * <em>content-type</em> property is set as well if a content type is given.
 *
 * @param message The message.
 * @param contentType The type of the payload or {@code null} if the content type should not be set.
 * @param payload The payload or {@code null} if there is no payload to convey in the message body.
 *
 * @throws NullPointerException If message is {@code null}.
 */
public static void setPayload(final Message message, final String contentType, final byte[] payload) {
    Objects.requireNonNull(message);

    if (payload == null) {
        // nothing to convey: neither body nor content type are modified
        return;
    }

    final Binary content = new Binary(payload);
    message.setBody(new Data(content));
    if (contentType != null) {
        message.setContentType(contentType);
    }
}
 
源代码23 项目: hono   文件: MessageHelperTest.java
/**
 * Verifies that reading a non-JSON Data section as JSON fails with a
 * {@code DecodeException}.
 */
@Test
public void testGetJsonPayloadHandlesMalformedJson() {

    final byte[] notJson = new byte[] { 0x01, 0x02, 0x03, 0x04 };
    final Message message = ProtonHelper.message();
    message.setBody(new Data(new Binary(notJson)));

    assertThatThrownBy(() -> MessageHelper.getJsonPayload(message)).isInstanceOf(DecodeException.class);
}
 
源代码24 项目: hono   文件: MessageHelperTest.java
/**
 * Verifies that reading a payload consisting of invalid UTF-8 as a string
 * yields a non-null result instead of an exception.
 */
@Test
public void testGetPayloadAsStringHandlesNonCharacterPayload() {

    final byte[] twoByteSample = new byte[] { (byte) 0xc3, (byte) 0x28 };
    final byte[] fourByteSample = new byte[] { (byte) 0xf0, (byte) 0x28, (byte) 0x8c, (byte) 0xbc };
    final Message message = ProtonHelper.message();

    message.setBody(new Data(new Binary(twoByteSample)));
    assertThat(MessageHelper.getPayloadAsString(message)).isNotNull();

    message.setBody(new Data(new Binary(fourByteSample)));
    assertThat(MessageHelper.getPayloadAsString(message)).isNotNull();
}
 
源代码25 项目: hono   文件: MessageHelperTest.java
/**
 * Verifies that reading JSON from a Data section containing zero bytes
 * returns {@code null} rather than throwing an exception.
 */
@Test
public void testGetJsonPayloadHandlesEmptyDataSection() {

    final Binary empty = new Binary(new byte[0]);
    final Message message = ProtonHelper.message();
    message.setBody(new Data(empty));

    assertThat(MessageHelper.getJsonPayload(message)).isNull();
}
 
源代码26 项目: hono   文件: MessageTapTest.java
/**
 * Creates a test message with the given content type and payload.
 * <p>
 * The message is obtained from {@code createTestMessageWithoutBody}; a non-null
 * payload is set as a Data section of its UTF-8 bytes, and the payload is also
 * handed to {@code MessageHelper.setJsonPayload}.
 *
 * @param contentType the content type to set on the message
 * @param payload the payload, may be {@code null}
 * @return the created message
 */
private Message createTestMessage(final String contentType, final String payload) {
    final Message testMessage = createTestMessageWithoutBody(payload);
    if (payload != null) {
        final byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);
        testMessage.setBody(new Data(new Binary(payloadBytes)));
    }
    testMessage.setContentType(contentType);
    // NOTE(review): setJsonPayload runs last and may replace the body/content type set above — confirm
    MessageHelper.setJsonPayload(testMessage, payload);
    return testMessage;
}
 
源代码27 项目: qpid-proton-j   文件: FastPathDataType.java
/**
 * Reads the binary body of a Data section from the decoder's buffer.
 * <p>
 * The encoding code selects how the size is read: one unsigned byte for VBIN8, a
 * four byte int for VBIN32. A NULL encoding yields a Data section without a value.
 *
 * @return the decoded Data section
 * @throws ProtonException if the encoding code is none of VBIN8, VBIN32 or NULL
 * @throws IllegalArgumentException if the encoded size is negative or exceeds the
 *         number of bytes remaining in the buffer
 */
@Override
public Data readValue() {
    ReadableBuffer buffer = getDecoder().getBuffer();
    byte encodingCode = buffer.get();

    int size = 0;

    switch (encodingCode) {
        case EncodingCodes.VBIN8:
            size = buffer.get() & 0xFF;
            break;
        case EncodingCodes.VBIN32:
            size = buffer.getInt();
            break;
        case EncodingCodes.NULL:
            return new Data(null);
        default:
            throw new ProtonException("Expected Binary type but found encoding: " + encodingCode);
    }

    // A VBIN32 size with the high bit set reads as a negative int. It would slip past
    // the remaining() check below and fail later with a NegativeArraySizeException.
    if (size < 0) {
        throw new IllegalArgumentException("Binary data size is too large to be decoded " +
                                           "(encoded size reads as " + size + ")");
    }

    if (size > buffer.remaining()) {
        throw new IllegalArgumentException("Binary data size " + size + " is specified to be greater than the " +
                                           "amount of data available ("+ buffer.remaining()+")");
    }

    byte[] data = new byte[size];
    buffer.get(data, 0, size);

    return new Data(new Binary(data));
}
 
源代码28 项目: qpid-proton-j   文件: FastPathDataType.java
/**
 * Writes a Data section to the encoder's buffer: the described-type indicator, the
 * descriptor code encoded as a SMALLULONG, and finally the section's binary value.
 *
 * @param data the Data section to encode
 */
@Override
public void write(Data data) {
    WritableBuffer out = getEncoder().getBuffer();

    // descriptor header: described type marker followed by the small ulong descriptor
    out.put(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
    out.put(EncodingCodes.SMALLULONG);
    out.put(DESCRIPTOR_CODE);

    // the value itself is written by the encoder
    getEncoder().writeBinary(data.getValue());
}
 
源代码29 项目: qpid-jms   文件: AmqpJmsStreamMessageFacadeTest.java
/**
 * Verifies that creating a stream message facade for a received message whose body
 * is a Data section fails with an IllegalStateException.
 */
@Test
public void testCreateWithUnexpectedBodySectionTypeThrowsISE() throws Exception {
    Binary emptyBinary = new Binary(new byte[0]);
    Message received = Message.Factory.create();
    received.setBody(new Data(emptyBinary));

    try {
        createReceivedStreamMessageFacade(createMockAmqpConsumer(), received);
        fail("expected exception to be thrown");
    } catch (IllegalStateException expected) {
        // expected
    }
}
 
源代码30 项目: qpid-proton-j   文件: MessageImplTest.java
/**
 * Encodes a message whose body is a Data section of the given size and checks both the
 * encoded bytes and the reported encoded length against the expected encoding.
 *
 * @param bytesLength the number of payload bytes to put into the Data section
 */
void doMessageEncodingWithDataBodySectionTestImpl(int bytesLength)
{
    byte[] bytes = generateByteArray(bytesLength);

    byte[] expectedBytes = generateExpectedDataSectionBytes(bytes);
    // sized to the expected encoding, so its length is the expected encoded length
    byte[] encodedBytes = new byte[expectedBytes.length];

    Message msg = Message.Factory.create();
    msg.setBody(new Data(new Binary(bytes)));

    int encodedLength = msg.encode(encodedBytes, 0, encodedBytes.length);

    assertArrayEquals("Encoded bytes do not match expectation", expectedBytes, encodedBytes);
    // expected value first, so a failure reports the two numbers the right way round
    assertEquals("Encoded length different than expected length", encodedBytes.length, encodedLength);
}