Spring-Integration-Kafka outbound-channel-adapter 发送消息

IT小君   2022-09-15T08:18:06

使用 Spring-Integration-Kafka,使用 outbound-channel-adapter 我正在尝试将消息发送到名为“ test ”的主题

通过命令行终端,我启动了zookeeper、kafka并创建了名为“test”的主题

春季 XML 配置

<int:publish-subscribe-channel id="inputToKafka" />

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    kafka-template="template"
                                    sync="true"
                                    topic="test">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

JUnit 测试代码

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
        "classpath:kafka-outbound-context.xml"
        })
public class ProducerTest{

    @Autowired
    @Qualifier("inputToKafka")
    MessageChannel channel;

    @Test
    public void test_send_message() {

        channel.send(MessageBuilder.withPayload("Test Message")
                .setHeader(KafkaHeaders.TOPIC, "test").build());

    }

}

测试用例成功并且在调试时我发现 channel.send() 返回 true

我使用以下命令通过命令行检查主题,但在测试主题中没有看到任何消息。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

有人可以为什么我在我的测试主题上看不到任何消息吗?

点击广告,支持我们为你提供更好的服务
评论(1)
IT小君

你看过日志吗?您需要配置键和值序列化程序,否则您会得到

Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.

使用java时:

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

地图键是key.serializervalue.serializer

2022-09-15T08:18:06   回复