Listed below are code examples showing how to use the org.apache.kafka.common.errors.SerializationException API class; you can also follow the links to GitHub to view the original source.
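Most of the examples below follow the same basic pattern: a custom serializer or deserializer wraps any encoding or decoding failure in a SerializationException, and consumer-side error handlers react to that exception (in older client versions the exception message embeds the topic, partition, and offset, which several of the handlers below parse in order to seek past the bad record). A minimal sketch of that pattern, using a purely illustrative LongPayloadDeserializer that is not taken from any of the examples:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical deserializer used only for this sketch.
public class LongPayloadDeserializer implements Deserializer<Long> {
    @Override
    public Long deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // tombstone records arrive as null payloads
        }
        try {
            return Long.parseLong(new String(data, StandardCharsets.UTF_8));
        } catch (NumberFormatException e) {
            // Wrap the failure so the consumer surfaces it uniformly as a SerializationException.
            throw new SerializationException("Payload is not a valid long on topic " + topic, e);
        }
    }
}
// It would be registered through consumer configuration, e.g.
// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongPayloadDeserializer.class.getName());

// Example: a consumer-side error handler that parses the topic, partition, and offset out of the
// SerializationException message and seeks the consumer past the record that failed to deserialize.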
@Override
public void accept(Consumer<?, ?> consumer, RuntimeException e) {
if (e instanceof SerializationException) {
Matcher m = EXCEPTION_PATTERN.matcher(e.getMessage());
if (m.find()) {
String topic = m.group(1);
int partition = Integer.parseInt(m.group(2));
long errorOffset = Long.parseLong(m.group(3));
TopicPartition tp = new TopicPartition(topic, partition);
long currentOffset = consumer.position(tp);
log.error("SerializationException - skipping records in partition {} from offset {} up to and including error offset {}",
tp, currentOffset, errorOffset, e);
consumer.seek(tp, errorOffset + 1);
} else {
super.accept(consumer, e);
}
} else {
super.accept(consumer, e);
}
}
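// Example: an Avro value serializer that writes a magic byte and a 4-byte schema version ahead of the
// binary-encoded payload, wrapping any Avro or runtime failure in a SerializationException.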
@Override
public byte[] serialize(String topic, NavigableMap<Long, VersionedValue> object) {
if (object == null) {
return null;
}
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(MAGIC_BYTE);
out.write(ByteBuffer.allocate(VERSION_SIZE).putInt(version).array());
BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
writer.write(toArray(object), encoder);
encoder.flush();
byte[] bytes = out.toByteArray();
out.close();
return bytes;
} catch (IOException | RuntimeException e) {
// avro serialization can throw AvroRuntimeException, NullPointerException,
// ClassCastException, etc
LOG.error("Error serializing Avro value " + e.getMessage());
throw new SerializationException("Error serializing Avro value", e);
}
}
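// Example: the corresponding Avro key serializer; keys are written without the magic-byte/version header.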
@Override
public byte[] serialize(String topic, Comparable[] object) {
if (object == null) {
return null;
}
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
writer.write(toRecord(object), encoder);
encoder.flush();
byte[] bytes = out.toByteArray();
out.close();
return bytes;
} catch (IOException | RuntimeException e) {
// avro serialization can throw AvroRuntimeException, NullPointerException,
// ClassCastException, etc
LOG.error("Error serializing Avro key " + e.getMessage());
throw new SerializationException("Error serializing Avro key", e);
}
}
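// Example: deserializing a 4-byte big-endian payload into an Integer, rejecting any other length.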
public static Integer kafkaIntDeserialize(byte[] data) {
if (data == null) {
return null;
}
if (data.length != 4) {
throw new SerializationException("Size of data received by IntegerDeserializer is not 4");
}
int value = 0;
for (byte b : data) {
value <<= 8;
value |= b & 0xFF;
}
return value;
}
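// Example: a listener exception handler that logs deserialization failures and, when configured to do so,
// seeks past the offending record.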
@Override
public void handle(KafkaListenerException exception) {
final Throwable cause = exception.getCause();
final Object consumerBean = exception.getKafkaListener();
if (cause instanceof SerializationException) {
if (LOG.isErrorEnabled()) {
LOG.error("Kafka consumer [" + consumerBean + "] failed to deserialize value: " + cause.getMessage(), cause);
}
if (skipRecordOnDeserializationFailure) {
final Consumer<?, ?> kafkaConsumer = exception.getKafkaConsumer();
seekPastDeserializationError((SerializationException) cause, consumerBean, kafkaConsumer);
}
} else {
if (LOG.isErrorEnabled()) {
Optional<ConsumerRecord<?, ?>> consumerRecord = exception.getConsumerRecord();
if (consumerRecord.isPresent()) {
LOG.error("Error processing record [" + consumerRecord.get() + "] for Kafka consumer [" + consumerBean + "] produced error: " + cause.getMessage(), cause);
} else {
LOG.error("Kafka consumer [" + consumerBean + "] produced error: " + cause.getMessage(), cause);
}
}
}
}
/**
* Seeks past a serialization exception if an error occurs.
* @param cause The cause
* @param consumerBean The consumer bean
* @param kafkaConsumer The kafka consumer
*/
protected void seekPastDeserializationError(
@Nonnull SerializationException cause,
@Nonnull Object consumerBean,
@Nonnull Consumer<?, ?> kafkaConsumer) {
try {
final String message = cause.getMessage();
final Matcher matcher = SERIALIZATION_EXCEPTION_MESSAGE_PATTERN.matcher(message);
if (matcher.find()) {
final String topic = matcher.group(1);
final int partition = Integer.parseInt(matcher.group(2));
final long offset = Long.parseLong(matcher.group(3));
TopicPartition tp = new TopicPartition(topic, partition);
if (LOG.isDebugEnabled()) {
LOG.debug("Seeking past unserializable consumer record for partition {}-{} and offset {}", topic, partition, offset);
}
kafkaConsumer.seek(tp, offset + 1);
}
} catch (Throwable e) {
if (LOG.isErrorEnabled()) {
LOG.error("Kafka consumer [" + consumerBean + "] failed to seek past unserializable value: " + e.getMessage(), e);
}
}
}
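// Example: serializing a GenericRow to Avro, converting array columns to lists before delegating to the
// underlying KafkaAvroSerializer.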
@Override
public byte[] serialize(final String topic, final GenericRow genericRow) {
if (genericRow == null) {
return null;
}
try {
GenericRecord avroRecord = new GenericData.Record(avroSchema);
for (int i = 0; i < genericRow.getColumns().size(); i++) {
Schema schema = getNonNullSchema(fields.get(i).schema());
if (schema.getType() == Schema.Type.ARRAY) {
if (genericRow.getColumns().get(i) != null) {
avroRecord.put(
fields.get(i).name(),
Arrays.asList((Object[]) genericRow.getColumns().get(i))
);
}
} else {
avroRecord.put(fields.get(i).name(), genericRow.getColumns().get(i));
}
}
return kafkaAvroSerializer.serialize(topic, avroRecord);
} catch (Exception e) {
throw new SerializationException(e);
}
}
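// Example: serializing a GenericRow as a single CSV line; the two-character record separator appended by
// CSVPrinter is stripped before the result is encoded as UTF-8 bytes.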
@Override
public byte[] serialize(final String topic, final GenericRow genericRow) {
if (genericRow == null) {
return null;
}
try {
StringWriter stringWriter = new StringWriter();
CSVPrinter csvPrinter = new CSVPrinter(stringWriter, CSVFormat.DEFAULT);
csvPrinter.printRecord(genericRow.getColumns());
String result = stringWriter.toString();
return result.substring(0, result.length() - 2).getBytes(StandardCharsets.UTF_8);
} catch (Exception e) {
throw new SerializationException("Error serializing CSV message", e);
}
}
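// The next two examples are a matching TopicPartition deserializer/serializer pair using a length-prefixed
// topic name followed by the partition number.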
@Override
public TopicPartition deserialize(String topic, byte[] data) {
if (data == null || data.length == 0) {
return null;
}
try {
ByteBuffer buf = ByteBuffer.wrap(data);
int topicLength = buf.getInt();
byte[] topicBytes = new byte[topicLength];
buf.get(topicBytes);
String otherTopic = new String(topicBytes, ENCODING);
int partition = buf.getInt();
return new TopicPartition(otherTopic, partition);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to TopicPartition due to unsupported encoding", e);
}
}
@Override
public byte[] serialize(String topic, TopicPartition data) {
if (data == null) {
return null;
}
try {
byte[] topicBytes = data.topic().getBytes(ENCODING);
ByteBuffer buf = ByteBuffer.allocate(ARRAY_LENGTH_SIZE + topicBytes.length
+ PARTITION_SIZE);
buf.putInt(topicBytes.length);
buf.put(topicBytes);
buf.putInt(data.partition());
return buf.array();
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing TopicPartition to byte[] due to unsupported encoding", e);
}
}
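// Example: validating the leading magic byte before handing the buffer to a decoder.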
public static ByteBuffer getByteBuffer(byte[] payload) {
ByteBuffer buffer = ByteBuffer.wrap(payload);
if (buffer.get() != MAGIC_BYTE) {
throw new SerializationException("Unknown magic byte!");
}
return buffer;
}
public NonRecordContainer(Schema schema, T value) {
if (schema == null) {
throw new SerializationException("Schema may not be null.");
}
this.schema = schema;
this.value = value;
}
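// Example: deriving an Avro schema from a Java class via reflection.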
static Schema getReflectSchema(Object object) {
Class<?> clazz = (object instanceof Class) ? (Class<?>) object : object.getClass();
Schema schema = ReflectData.get().getSchema(clazz);
if (schema == null) {
throw new SerializationException("No schema for class: " + clazz.getName());
}
return schema;
}
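// Example: resolving a registry artifact id from an Avro record schema's full name; non-record schemas are rejected.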
@Override
public String artifactId(String topic, boolean isKey, Schema schema) {
if (schema != null && schema.getType() == Schema.Type.RECORD) {
return schema.getFullName();
}
throw new SerializationException("The message must only be an Avro record schema!");
}
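// Example: a Jackson-based deserializer for schema metadata values.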
@Override
public KafkaSchemaValue deserialize(String topic, byte[] value) throws SerializationException {
try {
return new ObjectMapper().readValue(value, KafkaSchemaValue.class);
} catch (IOException e) {
throw new SerializationException("Error while deserializing schema value", e);
}
}
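// Example: deserializing an Avro-encoded, versioned value; a datum reader is created and cached per writer
// schema version and resolved against the current table schema.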
@Override
public NavigableMap<Long, VersionedValue> deserialize(String topic, byte[] payload) throws SerializationException {
if (payload == null) {
return null;
}
try {
ByteBuffer buffer = getByteBuffer(payload);
int version = buffer.getInt();
int length = buffer.limit() - 1 - VERSION_SIZE;
int start = buffer.position() + buffer.arrayOffset();
DatumReader<GenericArray<GenericRecord>> reader = readers.get(version);
if (reader == null) {
KafkaSchema schema = (KafkaSchema) table.getSchema();
KafkaSchemaValue schemaValue = schema.getSchemaValue(table.getName(), version);
Schema writerSchema = AvroUtils.parseSchema(schemaValue.getSchema());
Pair<Schema, Schema> schemas = getKeyValueSchemas(writerSchema);
Schema valueSchema = schemas.right;
reader = new GenericDatumReader<>(valueSchema, avroSchema, KafkaTable.GENERIC);
readers.put(version, reader);
}
GenericArray<GenericRecord> array = reader.read(
null, decoderFactory.binaryDecoder(buffer.array(), start, length, null));
return toValue(array);
} catch (IOException | RuntimeException e) {
// avro deserialization may throw AvroRuntimeException, NullPointerException, etc
LOG.error("Error deserializing Avro value " + e.getMessage());
throw new SerializationException("Error deserializing Avro value", e);
}
}
private ByteBuffer getByteBuffer(byte[] payload) {
ByteBuffer buffer = ByteBuffer.wrap(payload);
if (buffer.get() != MAGIC_BYTE) {
throw new SerializationException("Unknown magic byte!");
}
return buffer;
}
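// The next three examples are the matching Jackson-based serializer for schema values and the
// deserializer/serializer pair for schema keys.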
@Override
public byte[] serialize(String topic, KafkaSchemaValue value) {
try {
return new ObjectMapper().writeValueAsBytes(value);
} catch (IOException e) {
throw new SerializationException("Error while serializing schema value " + value.toString(),
e);
}
}
@Override
public KafkaSchemaKey deserialize(String topic, byte[] key) throws SerializationException {
try {
return new ObjectMapper().readValue(key, KafkaSchemaKey.class);
} catch (IOException e) {
throw new SerializationException("Error while deserializing schema key "
+ new String(key, StandardCharsets.UTF_8), e);
}
}
@Override
public byte[] serialize(String topic, KafkaSchemaKey key) {
try {
return new ObjectMapper().writeValueAsBytes(key);
} catch (IOException e) {
throw new SerializationException("Error while serializing schema key " + key.toString(),
e);
}
}
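// Example: deserializing an Avro-encoded key into a Comparable[].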
@Override
public Comparable[] deserialize(String topic, byte[] payload) throws SerializationException {
if (payload == null) {
return null;
}
try {
GenericRecord record = reader.read(null, decoderFactory.binaryDecoder(payload, null));
return toKey(record);
} catch (IOException | RuntimeException e) {
// avro deserialization may throw AvroRuntimeException, NullPointerException, etc
LOG.error("Error deserializing Avro key " + e.getMessage());
throw new SerializationException("Error deserializing Avro key", e);
}
}
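// Example: a hand-rolled deserializer that reads two length-prefixed UTF-8 fields into a Company object,
// rejecting payloads that are too short to contain both length prefixes.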
public Company deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
if (data.length < 8) {
throw new SerializationException("Size of data received " +
"by DemoDeserializer is shorter than expected!");
}
ByteBuffer buffer = ByteBuffer.wrap(data);
int nameLen, addressLen;
String name, address;
nameLen = buffer.getInt();
byte[] nameBytes = new byte[nameLen];
buffer.get(nameBytes);
addressLen = buffer.getInt();
byte[] addressBytes = new byte[addressLen];
buffer.get(addressBytes);
try {
name = new String(nameBytes, "UTF-8");
address = new String(addressBytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error occurred when deserializing!", e);
}
return Company.builder().name(name).address(address).build();
}
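// Example: a String deserializer/serializer pair with a configurable character encoding.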
@Override
public String deserialize(String topic, byte[] data) {
try {
if (data == null)
return null;
else
return new String(data, encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
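// Example: a generic Jackson ObjectMapper-based serializer/deserializer pair; any IOException is wrapped
// in a SerializationException.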
@Override
public byte[] serialize(final String topic, final T data) {
if (data == null)
return null;
try {
return this.objectMapper.writeValueAsBytes(data);
} catch (final IOException e) {
throw new SerializationException("Error serializing JSON message", e);
}
}
@Override
public T deserialize(final String topic, final byte[] bytes) {
if (bytes == null)
return null;
try {
return this.objectMapper.readValue(bytes, this.clazz);
} catch (final IOException e) {
throw new SerializationException(e);
}
}
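// Example: tests asserting that a one-byte payload (room for the magic byte only, no schema id) is rejected
// with a SerializationException.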
@Test
public void applySchemalessKeyBytesTooShort() {
configure(true);
// allocate enough space for the magic-byte
byte[] b = ByteBuffer.allocate(1).array();
ConnectRecord record = createRecord(null, b, null, null);
// The key payload is not long enough for schema registry wire-format
assertThrows(SerializationException.class, () -> smt.apply(record));
}
@Test
public void applySchemalessValueBytesTooShort() {
configure(false);
// allocate enough space for the magic-byte
byte[] b = ByteBuffer.allocate(1).array();
ConnectRecord record = createRecord(null, null, null, b);
// The value payload is not long enough for schema registry wire-format
assertThrows(SerializationException.class, () -> smt.apply(record));
}
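// Example: deserializing a protobuf GoalState message, wrapping InvalidProtocolBufferException.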
public GoalState deserialize(String topic, byte[] data) {
try {
return data == null ? null : GoalState.parseFrom(data);
} catch (InvalidProtocolBufferException bf_exp) {
throw new SerializationException("Error when deserializing byte[] to GoalState due to invalid protobuf exception", bf_exp);
}
}
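// Example: parsing raw bytes into a Jackson JsonNode.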
@Override
public JsonNode deserialize(String s, byte[] bytes) {
if (bytes == null) {
return null;
}
try {
return JSONUtils.getInstance().parseWithError(new String(bytes, StandardCharsets.UTF_8));
} catch (IOException e) {
throw new SerializationException(e);
}
}