使用 Spring-Integration-Kafka,使用 outbound-channel-adapter 我正在尝试将消息发送到名为“ test ”的主题
通过命令行终端,我启动了zookeeper、kafka并创建了名为“test”的主题
春季 XML 配置
<int:publish-subscribe-channel id="inputToKafka" />
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
auto-startup="false"
channel="inputToKafka"
kafka-template="template"
sync="true"
topic="test">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
JUnit 测试代码
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
"classpath:kafka-outbound-context.xml"
})
public class ProducerTest{
@Autowired
@Qualifier("inputToKafka")
MessageChannel channel;
@Test
public void test_send_message() {
channel.send(MessageBuilder.withPayload("Test Message")
.setHeader(KafkaHeaders.TOPIC, "test").build());
}
}
测试用例成功并且在调试时我发现 channel.send() 返回 true
我使用以下命令通过命令行检查主题,但在测试主题中没有看到任何消息。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
有人可以为什么我在我的测试主题上看不到任何消息吗?
你看过日志吗?您需要配置键和值序列化程序,否则您会得到
使用java时:
地图键是
key.serializer
和value.serializer
。