Source code examples for the class org.apache.kafka.common.errors.SerializationException

The examples below show how the org.apache.kafka.common.errors.SerializationException API is used in practice; each snippet comes from an open-source project whose full source can be browsed on GitHub.

@Override
public void accept(Consumer<?, ?> consumer, RuntimeException e) {
    if (e instanceof SerializationException) {
        Matcher m = EXCEPTION_PATTERN.matcher(e.getMessage());
        if (m.find()) {
            String topic = m.group(1);
            int partition = Integer.parseInt(m.group(2));
            long errorOffset = Long.parseLong(m.group(3));
            TopicPartition tp = new TopicPartition(topic, partition);
            long currentOffset = consumer.position(tp);
            log.error("SerializationException - skipping records in partition {} from offset {} up to and including error offset {}",
                    tp, currentOffset, errorOffset, e);
            consumer.seek(tp, errorOffset + 1);
        } else {
            super.accept(consumer, e);
        }
    } else {
        super.accept(consumer, e);
    }
}
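
The EXCEPTION_PATTERN constant referenced above is not shown in the snippet. A plausible definition is sketched below, keyed to the standard client error message, which resembles "Error deserializing key/value for partition <topic>-<partition> at offset <offset>"; the project's actual pattern may differ.

// Assumed definition, not the project's actual constant.
// Groups: (1) topic, (2) partition, (3) offset.
private static final Pattern EXCEPTION_PATTERN =
        Pattern.compile("partition (.+)-(\\d+) at offset (\\d+)");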
 
Source 2: project kareldb, file KafkaValueSerializer.java
@Override
public byte[] serialize(String topic, NavigableMap<Long, VersionedValue> object) {
    if (object == null) {
        return null;
    }
    try {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(MAGIC_BYTE);
        out.write(ByteBuffer.allocate(VERSION_SIZE).putInt(version).array());
        BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
        writer.write(toArray(object), encoder);
        encoder.flush();
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException | RuntimeException e) {
        // avro serialization can throw AvroRuntimeException, NullPointerException,
        // ClassCastException, etc
        LOG.error("Error serializing Avro value " + e.getMessage());
        throw new SerializationException("Error serializing Avro value", e);
    }
}
 
Source 3: project kareldb, file KafkaKeySerializer.java
@Override
public byte[] serialize(String topic, Comparable[] object) {
    if (object == null) {
        return null;
    }
    try {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
        writer.write(toRecord(object), encoder);
        encoder.flush();
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException | RuntimeException e) {
        // avro serialization can throw AvroRuntimeException, NullPointerException,
        // ClassCastException, etc
        LOG.error("Error serializing Avro key " + e.getMessage());
        throw new SerializationException("Error serializing Avro key", e);
    }
}
 
Source 4: project kop, file KopProtocolHandlerTestBase.java
public static Integer kafkaIntDeserialize(byte[] data) {
    if (data == null) {
        return null;
    }

    if (data.length != 4) {
        throw new SerializationException("Size of data received by IntegerDeserializer is not 4");
    }

    int value = 0;
    for (byte b : data) {
        value <<= 8;
        value |= b & 0xFF;
    }
    return value;
}
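
A matching serializer simply writes the int in big-endian byte order. The companion method below is a hypothetical sketch (kafkaIntSerialize is not part of the project), mirroring Kafka's own IntegerSerializer:

// Hypothetical counterpart to kafkaIntDeserialize: big-endian encoding of an int.
public static byte[] kafkaIntSerialize(Integer data) {
    if (data == null) {
        return null;
    }
    return new byte[] {
        (byte) (data >>> 24),
        (byte) (data >>> 16),
        (byte) (data >>> 8),
        data.byteValue()
    };
}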
 
@Override
public void handle(KafkaListenerException exception) {
    final Throwable cause = exception.getCause();
    final Object consumerBean = exception.getKafkaListener();
    if (cause instanceof SerializationException) {
        if (LOG.isErrorEnabled()) {
            LOG.error("Kafka consumer [" + consumerBean + "] failed to deserialize value: " + cause.getMessage(), cause);
        }

        if (skipRecordOnDeserializationFailure) {
            final Consumer<?, ?> kafkaConsumer = exception.getKafkaConsumer();
            seekPastDeserializationError((SerializationException) cause, consumerBean, kafkaConsumer);
        }
    } else {
        if (LOG.isErrorEnabled()) {
            Optional<ConsumerRecord<?, ?>> consumerRecord = exception.getConsumerRecord();
            if (consumerRecord.isPresent()) {
                LOG.error("Error processing record [" + consumerRecord + "] for Kafka consumer [" + consumerBean + "] produced error: " + cause.getMessage(), cause);

            } else {
                LOG.error("Kafka consumer [" + consumerBean + "] produced error: " + cause.getMessage(), cause);
            }
        }
    }
}
 
/**
 * Seeks past a serialization exception if an error occurs.
 * @param cause The cause
 * @param consumerBean The consumer bean
 * @param kafkaConsumer The kafka consumer
 */
protected void seekPastDeserializationError(
        @Nonnull SerializationException cause,
        @Nonnull Object consumerBean,
        @Nonnull Consumer<?, ?> kafkaConsumer) {
    try {
        final String message = cause.getMessage();
        final Matcher matcher = SERIALIZATION_EXCEPTION_MESSAGE_PATTERN.matcher(message);
        if (matcher.find()) {
            final String topic = matcher.group(1);
            final int partition = Integer.parseInt(matcher.group(2));
            // Kafka offsets are 64-bit, so parse as long rather than int
            final long offset = Long.parseLong(matcher.group(3));
            TopicPartition tp = new TopicPartition(topic, partition);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Seeking past unserializable consumer record for partition {}-{} and offset {}", topic, partition, offset);
            }
            kafkaConsumer.seek(tp, offset + 1);
        }
    } catch (Throwable e) {
        if (LOG.isErrorEnabled()) {
            LOG.error("Kafka consumer [" + consumerBean + "] failed to seek past unserializable value: " + e.getMessage(), e);
        }
    }
}
 
@Override
public byte[] serialize(final String topic, final GenericRow genericRow) {
  if (genericRow == null) {
    return null;
  }
  try {
    GenericRecord avroRecord = new GenericData.Record(avroSchema);
    for (int i = 0; i < genericRow.getColumns().size(); i++) {
      Schema schema = getNonNullSchema(fields.get(i).schema());
      if (schema.getType() == Schema.Type.ARRAY) {
        if (genericRow.getColumns().get(i) != null) {
          avroRecord.put(
              fields.get(i).name(),
              Arrays.asList((Object[]) genericRow.getColumns().get(i))
          );
        }
      } else {
        avroRecord.put(fields.get(i).name(), genericRow.getColumns().get(i));
      }
    }
    return kafkaAvroSerializer.serialize(topic, avroRecord);
  } catch (Exception e) {
    throw new SerializationException(e);
  }
}
 
@Override
public byte[] serialize(final String topic, final GenericRow genericRow) {
  if (genericRow == null) {
    return null;
  }
  try {
    StringWriter stringWriter = new StringWriter();
    CSVPrinter csvPrinter = new CSVPrinter(stringWriter, CSVFormat.DEFAULT);
    csvPrinter.printRecord(genericRow.getColumns());
    String result = stringWriter.toString();
    return result.substring(0, result.length() - 2).getBytes(StandardCharsets.UTF_8);
  } catch (Exception e) {
    throw new SerializationException("Error serializing CSV message", e);
  }
}
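
CSVFormat.DEFAULT terminates every record with "\r\n", which is why serialize() trims the last two characters before encoding. The helper below is an illustrative sketch (printOneRecord is not part of the project) showing the behavior being compensated for:

// Illustrative only: shows the trailing "\r\n" that serialize() strips.
static String printOneRecord() throws IOException {
    StringWriter sw = new StringWriter();
    try (CSVPrinter printer = new CSVPrinter(sw, CSVFormat.DEFAULT)) {
        printer.printRecord(Arrays.asList("a", "b"));
    }
    return sw.toString(); // "a,b\r\n"; the serializer keeps only "a,b"
}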
 
Source 9: project kafka-graphs, file TopicPartitionDeserializer.java
@Override
public TopicPartition deserialize(String topic, byte[] data) {
    if (data == null || data.length == 0) {
        return null;
    }
    try {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int topicLength = buf.getInt();
        byte[] topicBytes = new byte[topicLength];
        buf.get(topicBytes);
        String otherTopic = new String(topicBytes, ENCODING);
        int partition = buf.getInt();

        return new TopicPartition(otherTopic, partition);
    } catch (UnsupportedEncodingException e) {
        throw new SerializationException("Error when deserializing byte[] to string");
    }
}
 
Source 10: project kafka-graphs, file TopicPartitionSerializer.java
@Override
public byte[] serialize(String topic, TopicPartition data) {
    if (data == null) {
        return null;
    }
    try {
        byte[] topicBytes = data.topic().getBytes(ENCODING);

        ByteBuffer buf = ByteBuffer.allocate(ARRAY_LENGTH_SIZE + topicBytes.length
            + PARTITION_SIZE);
        buf.putInt(topicBytes.length);
        buf.put(topicBytes);
        buf.putInt(data.partition());
        return buf.array();
    } catch (UnsupportedEncodingException e) {
        throw new SerializationException("Error when serializing string to byte[]");
    }
}
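
A round trip through the two kafka-graphs classes above would look like the sketch below, assuming ENCODING names a supported charset (such as "UTF8") and that both classes have no-argument constructors:

// Round-trip sketch; constructor details are assumptions.
TopicPartitionSerializer serializer = new TopicPartitionSerializer();
TopicPartitionDeserializer deserializer = new TopicPartitionDeserializer();
byte[] bytes = serializer.serialize("unused", new TopicPartition("events", 3));
TopicPartition tp = deserializer.deserialize("unused", bytes);
// tp.topic() is "events", tp.partition() is 3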
 
Source 11: project apicurio-registry, file AbstractKafkaSerDe.java
public static ByteBuffer getByteBuffer(byte[] payload) {
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
    }
    return buffer;
}
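
This guard reflects the Confluent-style wire format, in which a payload starts with a single magic byte followed by a 4-byte identifier and then the serialized data. A payload that passes the check can be built as in this sketch (assuming MAGIC_BYTE is 0x0; the id value is illustrative):

// Sketch: [0x0 magic][4-byte id][body]; getByteBuffer() consumes the magic byte.
byte[] body = new byte[0]; // serialized data would go here
ByteBuffer payload = ByteBuffer.allocate(1 + 4 + body.length);
payload.put((byte) 0x0);   // MAGIC_BYTE, assumed to be 0x0
payload.putInt(42);        // schema/artifact id, illustrative value
payload.put(body);
ByteBuffer buffer = getByteBuffer(payload.array()); // positioned at the id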
 
Source 12: project apicurio-registry, file NonRecordContainer.java
public NonRecordContainer(Schema schema, T value) {
    if (schema == null) {
        throw new SerializationException("Schema may not be null.");
    }
    this.schema = schema;
    this.value = value;
}
 
Source 13: project apicurio-registry, file AvroSchemaUtils.java
static Schema getReflectSchema(Object object) {
    Class<?> clazz = (object instanceof Class) ? (Class) object : object.getClass();
    Schema schema = ReflectData.get().getSchema(clazz);
    if (schema == null) {
        throw new SerializationException("No schema for class: " + clazz.getName());
    }
    return schema;
}
 
Source 14: project apicurio-registry, file RecordIdStrategy.java
@Override
public String artifactId(String topic, boolean isKey, Schema schema) {
    if (schema != null && schema.getType() == Schema.Type.RECORD) {
        return schema.getFullName();
    }
    throw new SerializationException("The message must only be an Avro record schema!");
}
 
Source 15: project kareldb, file KafkaSchemaValueDeserializer.java
@Override
public KafkaSchemaValue deserialize(String topic, byte[] value) throws SerializationException {
    try {
        return new ObjectMapper().readValue(value, KafkaSchemaValue.class);
    } catch (IOException e) {
        throw new SerializationException("Error while deserializing schema value", e);
    }
}
 
Source 16: project kareldb, file KafkaValueDeserializer.java
@Override
public NavigableMap<Long, VersionedValue> deserialize(String topic, byte[] payload) throws SerializationException {
    if (payload == null) {
        return null;
    }
    try {
        ByteBuffer buffer = getByteBuffer(payload);
        int version = buffer.getInt();
        int length = buffer.limit() - 1 - VERSION_SIZE;
        int start = buffer.position() + buffer.arrayOffset();
        DatumReader<GenericArray<GenericRecord>> reader = readers.get(version);
        if (reader == null) {
            KafkaSchema schema = (KafkaSchema) table.getSchema();
            KafkaSchemaValue schemaValue = schema.getSchemaValue(table.getName(), version);
            Schema writerSchema = AvroUtils.parseSchema(schemaValue.getSchema());
            Pair<Schema, Schema> schemas = getKeyValueSchemas(writerSchema);
            Schema valueSchema = schemas.right;
            reader = new GenericDatumReader<>(valueSchema, avroSchema, KafkaTable.GENERIC);
            readers.put(version, reader);
        }
        GenericArray<GenericRecord> array = reader.read(
            null, decoderFactory.binaryDecoder(buffer.array(), start, length, null));
        return toValue(array);
    } catch (IOException | RuntimeException e) {
        // avro deserialization may throw AvroRuntimeException, NullPointerException, etc
        LOG.error("Error deserializing Avro value " + e.getMessage());
        throw new SerializationException("Error deserializing Avro value", e);
    }
}
 
Source 17: project kareldb, file KafkaValueDeserializer.java
private ByteBuffer getByteBuffer(byte[] payload) {
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
    }
    return buffer;
}
 
Source 18: project kareldb, file KafkaSchemaValueSerializer.java
@Override
public byte[] serialize(String topic, KafkaSchemaValue value) {
    try {
        return new ObjectMapper().writeValueAsBytes(value);
    } catch (IOException e) {
        throw new SerializationException("Error while serializing schema value " + value.toString(),
            e);
    }
}
 
Source 19: project kareldb, file KafkaSchemaKeyDeserializer.java
@Override
public KafkaSchemaKey deserialize(String topic, byte[] key) throws SerializationException {
    try {
        return new ObjectMapper().readValue(key, KafkaSchemaKey.class);
    } catch (IOException e) {
        throw new SerializationException("Error while deserializing schema key "
            + new String(key, StandardCharsets.UTF_8), e);
    }
}
 
Source 20: project kareldb, file KafkaSchemaKeySerializer.java
@Override
public byte[] serialize(String topic, KafkaSchemaKey key) {
    try {
        return new ObjectMapper().writeValueAsBytes(key);
    } catch (IOException e) {
        throw new SerializationException("Error while serializing schema key " + key.toString(),
            e);
    }
}
 
Source 21: project kareldb, file KafkaKeyDeserializer.java
@Override
public Comparable[] deserialize(String topic, byte[] payload) throws SerializationException {
    if (payload == null) {
        return null;
    }
    try {
        GenericRecord record = reader.read(null, decoderFactory.binaryDecoder(payload, null));
        return toKey(record);
    } catch (IOException | RuntimeException e) {
        // avro deserialization may throw AvroRuntimeException, NullPointerException, etc
        LOG.error("Error deserializing Avro key " + e.getMessage());
        throw new SerializationException("Error deserializing Avro key", e);
    }
}
 
Source 22: project kafka_book_demo, file CompanyDeserailizer.java
public Company deserialize(String topic, byte[] data) {
    if (data == null) {
        return null;
    }
    if (data.length < 8) {
        throw new SerializationException("Size of data received " +
                "by DemoDeserializer is shorter than expected!");
    }
    ByteBuffer buffer = ByteBuffer.wrap(data);
    int nameLen, addressLen;
    String name, address;

    nameLen = buffer.getInt();
    byte[] nameBytes = new byte[nameLen];
    buffer.get(nameBytes);
    addressLen = buffer.getInt();
    byte[] addressBytes = new byte[addressLen];
    buffer.get(addressBytes);

    try {
        name = new String(nameBytes, "UTF-8");
        address = new String(addressBytes, "UTF-8");
    } catch (UnsupportedEncodingException e) {
        throw new SerializationException("Error occur when deserializing!");
    }

    return Company.builder().name(name).address(address).build();
}
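
The corresponding serializer writes the same two length-prefixed fields in order. The sketch below is hypothetical (the getName() and getAddress() accessors are assumed from the builder usage above) and may differ from the book's actual serializer:

// Hypothetical counterpart writing [nameLen][name][addressLen][address].
public byte[] serialize(String topic, Company data) {
    if (data == null) {
        return null;
    }
    byte[] name = data.getName() == null
            ? new byte[0] : data.getName().getBytes(StandardCharsets.UTF_8);
    byte[] address = data.getAddress() == null
            ? new byte[0] : data.getAddress().getBytes(StandardCharsets.UTF_8);
    ByteBuffer buffer = ByteBuffer.allocate(4 + name.length + 4 + address.length);
    buffer.putInt(name.length);
    buffer.put(name);
    buffer.putInt(address.length);
    buffer.put(address);
    return buffer.array();
}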
 
Source 23: project kafka_book_demo, file StringDeserializer.java
@Override
public String deserialize(String topic, byte[] data) {
    try {
        if (data == null)
            return null;
        else
            return new String(data, encoding);
    } catch (UnsupportedEncodingException e) {
        throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
    }
}
 
Source 24: project kafka_book_demo, file StringSerializer.java
@Override
public byte[] serialize(String topic, String data) {
    try {
        if (data == null)
            return null;
        else
            return data.getBytes(encoding);
    } catch (UnsupportedEncodingException e) {
        throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
    }
}
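
Passing a Charset object instead of a charset name removes the checked UnsupportedEncodingException entirely. The variant below is a sketch, not the book's code; the "serializer.encoding" config key matches the one Kafka's own StringSerializer reads, and resolving it once in configure() is the point being illustrated:

// Sketch: resolve the charset once so serialize() needs no try/catch.
private Charset charset = StandardCharsets.UTF_8;

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
    Object enc = configs.get("serializer.encoding");
    if (enc instanceof String) {
        charset = Charset.forName((String) enc); // fails fast if unsupported
    }
}

@Override
public byte[] serialize(String topic, String data) {
    return data == null ? null : data.getBytes(charset);
}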
 
@Override
public byte[] serialize(final String topic, final T data) {
    if (data == null)
        return null;

    try {
        return this.objectMapper.writeValueAsBytes(data);
    } catch (final IOException e) {
        throw new SerializationException("Error serializing JSON message", e);
    }
}
 
@Override
public T deserialize(final String topic, final byte[] bytes) {
    if (bytes == null)
        return null;

    try {
        return this.objectMapper.readValue(bytes, this.clazz);
    } catch (final IOException e) {
        throw new SerializationException(e);
    }
}
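
A serializer/deserializer pair like the two JSON examples above can be bundled into a single Serde via the Kafka clients API. The sketch below assumes the classes are named JsonSerializer and JsonDeserializer and expose the constructors shown; MyEvent is a placeholder type:

// Sketch: Serdes.serdeFrom pairs a Serializer and Deserializer into one Serde.
Serde<MyEvent> eventSerde = Serdes.serdeFrom(
        new JsonSerializer<>(),                 // assumed no-arg constructor
        new JsonDeserializer<>(MyEvent.class)); // assumed (Class<T>) constructor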
 
@Test
public void applySchemalessKeyBytesTooShort() {
    configure(true);

    // allocate enough space for the magic-byte
    byte[] b = ByteBuffer.allocate(1).array();
    ConnectRecord record = createRecord(null, b, null, null);

    // The key payload is not long enough for schema registry wire-format
    assertThrows(SerializationException.class, () -> smt.apply(record));
}
 
@Test
public void applySchemalessValueBytesTooShort() {
    configure(false);

    // allocate enough space for the magic-byte
    byte[] b = ByteBuffer.allocate(1).array();
    ConnectRecord record = createRecord(null, null, null, b);

    // The value payload is not long enough for schema registry wire-format
    assertThrows(SerializationException.class, () -> smt.apply(record));
}
 
Source 29: project alcor, file GoalStateDeserializer.java
public GoalState deserialize(String topic, byte[] data) {
    try {
        return data == null ? null : GoalState.parseFrom(data);

    } catch (InvalidProtocolBufferException bf_exp) {
        throw new SerializationException("Error when deserializing byte[] to string due to invalid protobuf exception " + bf_exp);
    }
}
 
Source 30: project SkaETL, file JsonNodeDeserializer.java
@Override
public JsonNode deserialize(String s, byte[] bytes) {
    if (bytes == null) {
        return null;
    }
    try {
        return JSONUtils.getInstance().parseWithError(new String(bytes));
    } catch (IOException e) {
        throw new SerializationException(e);
    }
}
 