下面列出了javax.jms.MessageEOFException#org.apache.qpid.proton.amqp.Binary 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Create request message
* @param component string representation component name
* @return created message
* @throws JMSException if unable to create message
*/
private Message createMessage(String component) throws JMSException {
logger.debug("Creating request message for: {}", component);
Message message = session.createObjectMessage();
message.setBooleanProperty("JMS_AMQP_TYPED_ENCODING", true);
message.setStringProperty("name", "self");
message.setStringProperty("operation", "QUERY");
message.setStringProperty("type", "org.amqp.management");
Binary requestFor = new Binary(("org.apache.qpid.dispatch." + component).getBytes());
((JmsMessage) message).getFacade().setProperty("entityType", requestFor);
HashMap<String,Object> map = new HashMap<>();
map.put("attributeNames", new ArrayList<>());
((ObjectMessage) message).setObject(map);
message.setJMSReplyTo(destination);
return message;
}
@Test
public void testAppendBinaryWithOffsetsToBuffer() throws Exception
{
DeliveryImpl delivery = new DeliveryImpl(null, Mockito.mock(LinkImpl.class), null);
byte[] data1 = new byte[] { 0, 1, 2, 3, 4, 5 };
byte[] data2 = new byte[] { 6, 7, 8, 9, 10, 11 };
Binary binary1 = new Binary(data1, 1, 2);
Binary binary2 = new Binary(data2, 0, 4);
delivery.append(binary1);
delivery.append(binary2);
assertEquals(binary1.getLength() + binary2.getLength(), delivery.getDataLength());
assertNotNull(delivery.getData());
assertEquals(binary1.getLength() + binary2.getLength(), delivery.getData().remaining());
}
@Override
public void setUserId(byte[] userId)
{
if(userId == null)
{
if(_properties != null)
{
_properties.setUserId(null);
}
}
else
{
if(_properties == null)
{
_properties = new Properties();
}
byte[] id = new byte[userId.length];
System.arraycopy(userId, 0, id,0, userId.length);
_properties.setUserId(new Binary(id));
}
}
@Test
public void testEncodeStringBinary32()
{
byte[] payload = createStringPayloadBytes(1372);
assertTrue("Length must be over 255 to ensure use of vbin32 encoding", payload.length > 255);
int encodedSize = 1 + 4 + payload.length; // 1b type + 4b length + content
ByteBuffer expectedEncoding = ByteBuffer.allocate(encodedSize);
expectedEncoding.put((byte) 0xB0);
expectedEncoding.putInt(payload.length);
expectedEncoding.put(payload);
Data data = new DataImpl();
data.putBinary(new Binary(payload));
Binary encoded = data.encode();
assertEquals("unexpected encoding", new Binary(expectedEncoding.array()), encoded);
}
@Override
public Binary readBinary(final Binary defaultValue)
{
byte encodingCode = _buffer.get();
switch (encodingCode)
{
case EncodingCodes.VBIN8:
return (Binary) _constructors[EncodingCodes.VBIN8 & 0xff].readValue();
case EncodingCodes.VBIN32:
return (Binary) _constructors[EncodingCodes.VBIN32 & 0xff].readValue();
case EncodingCodes.NULL:
return defaultValue;
default:
throw new ProtonException("Expected Binary type but found encoding: " + EncodingCodes.toString(encodingCode));
}
}
private void doTestEncodeBinaryTypeReservation(int size) throws IOException {
byte[] data = new byte[size];
for (int i = 0; i < size; ++i) {
data[i] = (byte) (i % 255);
}
Binary binary = new Binary(data);
WritableBuffer writable = new WritableBuffer.ByteBufferWrapper(this.buffer);
WritableBuffer spy = Mockito.spy(writable);
encoder.setByteBuffer(spy);
encoder.writeBinary(binary);
// Check that the BinaryType tries to reserve space, actual encoding size not computed here.
Mockito.verify(spy).ensureRemaining(Mockito.anyInt());
}
@Test(timeout = 20000)
public void testCreateContextWithTransactedSessionMode() throws Exception {
Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JMSContext context = testFixture.createJMSContext(testPeer, JMSContext.SESSION_TRANSACTED);
assertEquals(JMSContext.SESSION_TRANSACTED, context.getSessionMode());
// Session should be created and a coordinator should be attached since this
// should be a TX session, then a new TX is declared, once closed the TX should
// be discharged as a roll back.
testPeer.expectBegin();
testPeer.expectCoordinatorAttach();
testPeer.expectDeclare(txnId);
testPeer.expectDischarge(txnId, true);
testPeer.expectEnd();
testPeer.expectClose();
context.createTopic("TopicName");
context.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test
public void testGetTextWithUnknownEncodedDataThrowsJMSException() throws Exception {
String encodedString = "myEncodedString";
byte[] encodedBytes = encodedString.getBytes(Charset.forName("UTF-16"));
Message message = Message.Factory.create();
message.setBody(new Data(new Binary(encodedBytes)));
AmqpJmsTextMessageFacade amqpTextMessageFacade = createReceivedTextMessageFacade(createMockAmqpConsumer(), message);
try {
amqpTextMessageFacade.getText();
fail("expected exception not thrown");
} catch (JMSException ise) {
// expected
}
}
@Test
public void testPlainHelperEncodesExpectedResponse() {
TransportImpl transport = new TransportImpl();
SaslImpl sasl = new SaslImpl(transport, 512);
// Use a username + password with a unicode char that encodes
// differently under changing charsets
String username = "username-with-unicode" + "\u1F570";
String password = "password-with-unicode" + "\u1F570";
byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
byte[] expectedResponseBytes = new byte[usernameBytes.length + passwordBytes.length + 2];
System.arraycopy(usernameBytes, 0, expectedResponseBytes, 1, usernameBytes.length);
System.arraycopy(passwordBytes, 0, expectedResponseBytes, 2 + usernameBytes.length, passwordBytes.length);
sasl.plain(username, password);
assertEquals("Unexpected response data", new Binary(expectedResponseBytes), sasl.getChallengeResponse());
}
private static Binary getBinaryFromMessageBody(ServerJMSBytesMessage message) throws JMSException {
byte[] data = new byte[(int) message.getBodyLength()];
message.readBytes(data);
message.reset(); // Need to reset after readBytes or future readBytes
return new Binary(data);
}
private Message getFakeMessage(final String to, final Buffer payload, final String subject) {
final Message message = mock(Message.class);
when(message.getMessageId()).thenReturn("the-message-id");
when(message.getSubject()).thenReturn(subject);
when(message.getAddress()).thenReturn(to);
if (payload != null) {
final Data data = new Data(new Binary(payload.getBytes()));
when(message.getContentType()).thenReturn("text/plain");
when(message.getBody()).thenReturn(data);
}
return message;
}
/**
* Sets the payload of an AMQP message using a <em>Data</em> section.
*
* @param message The message.
* @param contentType The type of the payload. The message's <em>content-type</em> property
* will only be set if both this and the payload parameter are not {@code null}.
* @param payload The payload or {@code null} if there is no payload to convey in the message body.
*
* @throws NullPointerException If message is {@code null}.
*/
public static void setPayload(final Message message, final String contentType, final byte[] payload) {
Objects.requireNonNull(message);
if (payload != null) {
message.setBody(new Data(new Binary(payload)));
if (contentType != null) {
message.setContentType(contentType);
}
}
}
/**
* Verifies that the helper properly handles malformed JSON payload.
*/
@Test
public void testGetJsonPayloadHandlesMalformedJson() {
final Message msg = ProtonHelper.message();
msg.setBody(new Data(new Binary(new byte[] { 0x01, 0x02, 0x03, 0x04 }))); // not JSON
assertThatThrownBy(() -> MessageHelper.getJsonPayload(msg)).isInstanceOf(DecodeException.class);
}
/**
* Verifies that the helper does not throw an exception when reading
* invalid UTF-8 from a message's payload.
*/
@Test
public void testGetPayloadAsStringHandlesNonCharacterPayload() {
final Message msg = ProtonHelper.message();
msg.setBody(new Data(new Binary(new byte[] { (byte) 0xc3, (byte) 0x28 })));
assertThat(MessageHelper.getPayloadAsString(msg)).isNotNull();
msg.setBody(new Data(new Binary(new byte[] { (byte) 0xf0, (byte) 0x28, (byte) 0x8c, (byte) 0xbc })));
assertThat(MessageHelper.getPayloadAsString(msg)).isNotNull();
}
@Test(timeout=20000)
public void testPassthroughOfRollbackErrorCoordinatorClosedOnCommit() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final String testPeerURI = createPeerURI(testPeer);
LOG.info("Original peer is at: {}", testPeerURI);
testPeer.expectSaslAnonymous();
testPeer.expectOpen();
testPeer.expectBegin();
testPeer.expectBegin();
testPeer.expectCoordinatorAttach();
JmsConnection connection = establishAnonymousConnecton(testPeer);
connection.start();
Binary txnId1 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
Binary txnId2 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
testPeer.expectDeclare(txnId1);
testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(txnId1, false, true, txnId2);
testPeer.expectCoordinatorAttach();
testPeer.expectDeclare(txnId2);
testPeer.expectDischarge(txnId2, true);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
try {
session.commit();
fail("Transaction should have rolled back");
} catch (TransactionRolledBackException ex) {
LOG.info("Caught expected TransactionRolledBackException");
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
FrameSender(TestAmqpPeer testAmqpPeer, FrameType type, int channel, ListDescribedType frameDescribedType, Binary framePayload)
{
_testAmqpPeer = testAmqpPeer;
_type = type;
_channel = channel;
_frameDescribedType = frameDescribedType;
_framePayload = framePayload;
}
@Override
public void setObject(Serializable value) throws IOException {
if (value == null) {
parent.setBody(NULL_OBJECT_BODY);
} else {
byte[] bytes = getSerializedBytes(value);
parent.setBody(new Data(new Binary(bytes)));
}
localContent = true;
}
@Test
public void testGetTextUsingReceivedMessageWithZeroLengthDataSectionReturnsEmptyString() throws Exception {
org.apache.qpid.proton.codec.Data payloadData = org.apache.qpid.proton.codec.Data.Factory.create();
payloadData.putDescribedType(new DataDescribedType(new Binary(new byte[0])));
Binary b = payloadData.encode();
Message message = Message.Factory.create();
int decoded = message.decode(b.getArray(), b.getArrayOffset(), b.getLength());
assertEquals(decoded, b.getLength());
AmqpJmsTextMessageFacade amqpTextMessageFacade = createReceivedTextMessageFacade(createMockAmqpConsumer(), message);
assertEquals("expected zero-length string", "", amqpTextMessageFacade.getText());
}
@Override
public void handleFlow(Flow flow, Binary payload, Integer channel)
{
TransportSession transportSession = _remoteSessions.get(channel);
if(transportSession == null)
{
// TODO - fail due to attach on non-begun session
}
else
{
transportSession.handleFlow(flow);
}
}
@Override
public void handleTransfer(Transfer transfer, Binary payload, Integer channel)
{
// TODO - check channel < max_channel
TransportSession transportSession = _remoteSessions.get(channel);
if(transportSession != null)
{
transportSession.handleTransfer(transfer, payload);
}
else
{
// TODO - fail due to begin on begun session
}
}
private void handleAccepted(JmsInboundMessageDispatch envelope, Delivery delivery) {
LOG.debug("Accepted Ack of message: {}", envelope);
if (!delivery.remotelySettled()) {
if (session.isTransacted() && !getResourceInfo().isBrowser()) {
if (session.isTransactionFailed()) {
LOG.trace("Skipping ack of message {} in failed transaction.", envelope);
return;
}
Binary txnId = session.getTransactionContext().getAmqpTransactionId();
if (txnId != null) {
delivery.disposition(session.getTransactionContext().getTxnAcceptState());
delivery.settle();
session.getTransactionContext().registerTxConsumer(this);
}
} else {
delivery.disposition(Accepted.getInstance());
delivery.settle();
}
} else {
delivery.settle();
}
if (envelope.isDelivered()) {
deliveredCount--;
}
dispatchedCount--;
}
void doMessageEncodingWithDataBodySectionTestImpl(int bytesLength)
{
byte[] bytes = generateByteArray(bytesLength);
byte[] expectedBytes = generateExpectedDataSectionBytes(bytes);
byte[] encodedBytes = new byte[expectedBytes.length];
Message msg = Message.Factory.create();
msg.setBody(new Data(new Binary(bytes)));
int encodedLength = msg.encode(encodedBytes, 0, encodedBytes.length);
assertArrayEquals("Encoded bytes do not match expectation", expectedBytes, encodedBytes);
assertEquals("Encoded length different than expected length", encodedLength, encodedBytes.length);
}
private void assertDataBodyAsExpected(Section body, int length) {
assertNotNull("Expected body section to be present", body);
assertEquals("Unexpected body section type", Data.class, body.getClass());
Binary value = ((Data) body).getValue();
assertNotNull(value);
assertEquals("Unexpected body length", length, value.getLength());
}
@Test
public void testClearBodyHandlesErrorFromOutputStream() throws Exception {
byte[] bodyBytes = "myOrigBytes".getBytes();
Message message = Message.Factory.create();
message.setBody(new Data(new Binary(bodyBytes)));
AmqpJmsBytesMessageFacade amqpBytesMessageFacade = createReceivedBytesMessageFacade(createMockAmqpConsumer(), message);
OutputStream outputStream = amqpBytesMessageFacade.getOutputStream();
outputStream = substituteMockOutputStream(amqpBytesMessageFacade);
Mockito.doThrow(new IOException()).when(outputStream).close();
amqpBytesMessageFacade.clearBody();
}
@Test
public void testResetHandlesErrorFromOutputStream() throws Exception {
byte[] bodyBytes = "myOrigBytes".getBytes();
Message message = Message.Factory.create();
message.setBody(new Data(new Binary(bodyBytes)));
AmqpJmsBytesMessageFacade amqpBytesMessageFacade = createReceivedBytesMessageFacade(createMockAmqpConsumer(), message);
OutputStream outputStream = amqpBytesMessageFacade.getOutputStream();
outputStream = substituteMockOutputStream(amqpBytesMessageFacade);
Mockito.doThrow(new IOException()).when(outputStream).close();
amqpBytesMessageFacade.reset();
}
@Test
public void testGetUserIdOnReceievedMessageWithEmptyBinaryValue() throws Exception {
byte[] bytes = new byte[0];
Message message = Proton.message();
Properties props = new Properties();
props.setUserId(new Binary(bytes));
message.setProperties(props);
AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConsumer(), message);
assertNull("Expected a userid on received message", amqpMessageFacade.getUserId());
}
@Test
public void testEncodeArrayOfMaps()
{
// encode an array of two empty maps
Data data = new DataImpl();
data.putArray(false, Data.DataType.MAP);
data.enter();
data.putMap();
data.putMap();
data.exit();
int expectedEncodedSize = 8; // 1b type + 1b size + 1b length + 1b element constructor + 2 * (1b size + 1b count)
Binary encoded = data.encode();
assertEquals("unexpected encoding size", expectedEncodedSize, encoded.getLength());
ByteBuffer expectedEncoding = ByteBuffer.allocate(expectedEncodedSize);
expectedEncoding.put((byte) 0xe0); // constructor
expectedEncoding.put((byte) 6); // size
expectedEncoding.put((byte) 2); // count
expectedEncoding.put((byte) 0xc1); // element constructor
expectedEncoding.put((byte)1); // size
expectedEncoding.put((byte)0); // count
expectedEncoding.put((byte)1); // size
expectedEncoding.put((byte)0); // count
assertEquals("unexpected encoding", new Binary(expectedEncoding.array()), encoded);
}
/**
* Takes the provided id string and return the appropriate amqp messageId style object.
* Converts the type based on any relevant encoding information found as a prefix.
*
* @param origId the object to be converted
* @return the amqp messageId style object
* @throws IdConversionException if the provided baseId String indicates an encoded type but can't be converted to that type.
*/
public static Object toIdObject(final String origId) throws IdConversionException {
if (origId == null) {
return null;
}
if (!AmqpMessageIdHelper.hasMessageIdPrefix(origId)) {
// We have a string without any "ID:" prefix, it is an
// application-specific String, use it as-is.
return origId;
}
try {
if (hasAmqpNoPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
// Prefix telling us there was originally no "ID:" prefix,
// strip it and return the remainder
return origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_NO_PREFIX_LENGTH);
} else if (hasAmqpUuidPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
String uuidString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_UUID_PREFIX_LENGTH);
return UUID.fromString(uuidString);
} else if (hasAmqpUlongPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
String ulongString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_ULONG_PREFIX_LENGTH);
return UnsignedLong.valueOf(ulongString);
} else if (hasAmqpStringPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
return origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_STRING_PREFIX_LENGTH);
} else if (hasAmqpBinaryPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
String hexString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_BINARY_PREFIX_LENGTH);
byte[] bytes = convertHexStringToBinary(hexString);
return new Binary(bytes);
} else {
// We have a string without any encoding prefix needing processed,
// so transmit it as-is, including the "ID:"
return origId;
}
} catch (IllegalArgumentException e) {
throw new IdConversionException("Unable to convert ID value", e);
}
}
/**
* Test that clearing the body on a message results in the underlying body
* section being set with the null object body, ensuring getObject returns null.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testClearBodyWithExistingSerializedBodySection() throws Exception {
Message protonMessage = Message.Factory.create();
protonMessage.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
protonMessage.setBody(new Data(new Binary(new byte[0])));
AmqpJmsObjectMessageFacade amqpObjectMessageFacade = createReceivedObjectMessageFacade(createMockAmqpConsumer(), protonMessage);
assertNotNull("Expected existing body section to be found", amqpObjectMessageFacade.getBody());
amqpObjectMessageFacade.clearBody();
assertSame("Expected existing body section to be replaced", AmqpSerializedObjectDelegate.NULL_OBJECT_BODY, amqpObjectMessageFacade.getBody());
assertNull("Expected null object", amqpObjectMessageFacade.getObject());
}
/**
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a Binary
* when given a string indicating an encoded AMQP binary id, using upper case
* hex characters
*
* @throws Exception
* if an error occurs during the test.
*/
@Test
public void testToIdObjectWithEncodedBinaryUppercaseHexString() throws Exception {
byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF};
Binary binaryId = new Binary(bytes);
String provided = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
doToIdObjectTestImpl(provided, binaryId);
}