下面列出了 org.apache.kafka.common.requests.AbstractRequest 类 API 的使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Drains the pending request queue and answers every queued request with an error
 * response, writing each serialized response to the given channel.
 *
 * <p>For each queued entry the response future is completed with the error response
 * built from a {@link LeaderNotAvailableException}; the value then read back from that
 * future is serialized with {@code responseToByteBuf} and flushed to {@code channel}.
 * A failure while handling one entry is logged and the loop moves on to the next one.
 *
 * @param channel the channel the serialized responses are written and flushed to
 */
protected void writeAndFlushWhenInactiveChannel(Channel channel) {
    // Loop from the first queued response future and return them all.
    // poll() dequeues and null-checks in a single step, so a concurrent drain of the
    // queue can never trigger the NoSuchElementException that a separate
    // peek()/remove() pair is exposed to.
    ResponseAndRequest pair;
    while (requestsQueue != null && (pair = requestsQueue.poll()) != null) {
        try {
            if (log.isDebugEnabled()) {
                log.debug("Channel Closing! Write kafka cmd responseFuture back to client. request: {}",
                    pair.getRequest().getHeader());
            }
            AbstractRequest request = pair.getRequest().getRequest();
            AbstractResponse apiResponse = request
                .getErrorResponse(new LeaderNotAvailableException("Channel is closing!"));
            // complete() is a no-op if the future already holds a result; get() below
            // therefore returns whichever response won.
            pair.getResponseFuture().complete(apiResponse);
            ByteBuf result = responseToByteBuf(pair.getResponseFuture().get(), pair.getRequest());
            channel.writeAndFlush(result);
        } catch (Exception e) {
            // Should not get here; log and keep draining the remaining entries.
            log.error("error to get Response ByteBuf:", e);
        }
    }
}
/**
 * Drives a join/sync round in which this member receives the leader join response and a
 * {@code NO_ERROR} assignment naming itself, then checks the listener saw one successful
 * assignment carrying {@code LEADER_INFO}.
 */
@Test
public void testNormalJoinGroupLeader() {
    final String selfId = LEADER_ID;

    // Coordinator lookup must succeed before any group request is sent.
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

    // normal join group
    final Map<String, KarelDbIdentity> members = Collections.singletonMap(selfId, LEADER_INFO);
    client.prepareResponse(joinGroupLeaderResponse(1, selfId, members, Errors.NONE));

    final SyncGroupResponse syncResponse =
        syncGroupResponse(KarelDbProtocol.Assignment.NO_ERROR, selfId, LEADER_INFO, Errors.NONE);
    // The sync request must come from this member, in generation 1, and carry an
    // assignment entry for it.
    final MockClient.RequestMatcher syncMatcher = body -> {
        SyncGroupRequest sync = (SyncGroupRequest) body;
        if (!sync.data.memberId().equals(selfId)) {
            return false;
        }
        if (sync.data.generationId() != 1) {
            return false;
        }
        return sync.groupAssignments().containsKey(selfId);
    };
    client.prepareResponse(syncMatcher, syncResponse);

    coordinator.ensureActiveGroup();

    assertFalse(coordinator.rejoinNeededOrPending());
    assertEquals(0, rebalanceListener.revokedCount);
    assertEquals(1, rebalanceListener.assignedCount);
    assertFalse(rebalanceListener.assignments.get(0).failed());
    assertEquals(selfId, rebalanceListener.assignments.get(0).leader());
    assertEquals(LEADER_INFO, rebalanceListener.assignments.get(0).leaderIdentity());
}
/**
 * Drives a join/sync round where the only member advertises {@code INELIGIBLE_LEADER_INFO}
 * and the sync response names no leader; the resulting assignment must not be flagged as
 * failed and must carry a {@code null} leader and leader identity.
 */
@Test
public void testJoinGroupLeaderNoneEligible() {
    final String selfId = LEADER_ID;

    // Coordinator lookup must succeed before any group request is sent.
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

    final Map<String, KarelDbIdentity> members =
        Collections.singletonMap(selfId, INELIGIBLE_LEADER_INFO);
    client.prepareResponse(joinGroupLeaderResponse(1, selfId, members, Errors.NONE));

    final SyncGroupResponse syncResponse =
        syncGroupResponse(KarelDbProtocol.Assignment.NO_ERROR, null, null, Errors.NONE);
    // The sync request must come from this member, in generation 1, and carry an
    // assignment entry for it.
    final MockClient.RequestMatcher syncMatcher = body -> {
        SyncGroupRequest sync = (SyncGroupRequest) body;
        if (!sync.data.memberId().equals(selfId)) {
            return false;
        }
        if (sync.data.generationId() != 1) {
            return false;
        }
        return sync.groupAssignments().containsKey(selfId);
    };
    client.prepareResponse(syncMatcher, syncResponse);

    coordinator.ensureActiveGroup();

    assertFalse(coordinator.rejoinNeededOrPending());
    assertEquals(0, rebalanceListener.revokedCount);
    assertEquals(1, rebalanceListener.assignedCount);
    // No leader isn't considered a failure
    assertFalse(rebalanceListener.assignments.get(0).failed());
    assertNull(rebalanceListener.assignments.get(0).leader());
    assertNull(rebalanceListener.assignments.get(0).leaderIdentity());
}
/**
 * Drives a join/sync round where two member ids map to the same identity and the sync
 * response reports {@code DUPLICATE_URLS}; the resulting assignment must be flagged as
 * failed with a {@code null} leader and leader identity.
 */
@Test
public void testJoinGroupLeaderDuplicateUrls() {
    final String selfId = LEADER_ID;

    // Coordinator lookup must succeed before any group request is sent.
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

    // intentionally duplicate info to get duplicate URLs
    final Map<String, KarelDbIdentity> members = new HashMap<>();
    members.put(LEADER_ID, LEADER_INFO);
    members.put(MEMBER_ID, LEADER_INFO);
    client.prepareResponse(joinGroupLeaderResponse(1, selfId, members, Errors.NONE));

    final SyncGroupResponse syncResponse =
        syncGroupResponse(KarelDbProtocol.Assignment.DUPLICATE_URLS, null, null, Errors.NONE);
    // The sync request must come from this member, in generation 1, and carry an
    // assignment entry for it.
    final MockClient.RequestMatcher syncMatcher = body -> {
        SyncGroupRequest sync = (SyncGroupRequest) body;
        if (!sync.data.memberId().equals(selfId)) {
            return false;
        }
        if (sync.data.generationId() != 1) {
            return false;
        }
        return sync.groupAssignments().containsKey(selfId);
    };
    client.prepareResponse(syncMatcher, syncResponse);

    coordinator.ensureActiveGroup();

    assertFalse(coordinator.rejoinNeededOrPending());
    assertEquals(0, rebalanceListener.revokedCount);
    assertEquals(1, rebalanceListener.assignedCount);
    assertTrue(rebalanceListener.assignments.get(0).failed());
    assertNull(rebalanceListener.assignments.get(0).leader());
    assertNull(rebalanceListener.assignments.get(0).leaderIdentity());
}
/**
 * Drives a join/sync round in which this member receives the follower join response and
 * a {@code NO_ERROR} assignment naming {@code LEADER_ID}; the listener must see one
 * successful assignment pointing at that leader and {@code LEADER_INFO}.
 */
@Test
public void testNormalJoinGroupFollower() {
    final String selfId = MEMBER_ID;

    // Coordinator lookup must succeed before any group request is sent.
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

    // normal join group
    client.prepareResponse(joinGroupFollowerResponse(1, selfId, LEADER_ID, Errors.NONE));

    final SyncGroupResponse syncResponse =
        syncGroupResponse(KarelDbProtocol.Assignment.NO_ERROR, LEADER_ID, LEADER_INFO, Errors.NONE);
    // The sync request must come from this member, in generation 1, and carry no
    // group assignments at all.
    final MockClient.RequestMatcher syncMatcher = body -> {
        SyncGroupRequest sync = (SyncGroupRequest) body;
        if (!sync.data.memberId().equals(selfId)) {
            return false;
        }
        if (sync.data.generationId() != 1) {
            return false;
        }
        return sync.groupAssignments().isEmpty();
    };
    client.prepareResponse(syncMatcher, syncResponse);

    coordinator.ensureActiveGroup();

    assertFalse(coordinator.rejoinNeededOrPending());
    assertEquals(0, rebalanceListener.revokedCount);
    assertEquals(1, rebalanceListener.assignedCount);
    assertFalse(rebalanceListener.assignments.get(0).failed());
    assertEquals(LEADER_ID, rebalanceListener.assignments.get(0).leader());
    assertEquals(LEADER_INFO, rebalanceListener.assignments.get(0).leaderIdentity());
}
/**
 * Answers a request with an error response built from a
 * {@link LeaderNotAvailableException} and logs the API key of the request.
 *
 * @param kafkaHeaderAndRequest the request (with its header) to reject
 * @param resultFuture the future completed with the error response
 */
protected void handleInactive(KafkaHeaderAndRequest kafkaHeaderAndRequest,
                              CompletableFuture<AbstractResponse> resultFuture) {
    final LeaderNotAvailableException closing = new LeaderNotAvailableException("Channel is closing!");
    final AbstractResponse errorResponse = kafkaHeaderAndRequest.getRequest().getErrorResponse(closing);

    log.error("Kafka API {} is send to a closing channel", kafkaHeaderAndRequest.getHeader().apiKey());
    resultFuture.complete(errorResponse);
}
KafkaHeaderAndRequest(RequestHeader header,
AbstractRequest request,
ByteBuf buffer,
SocketAddress remoteAddress) {
this.header = header;
this.request = request;
this.buffer = buffer.retain();
this.remoteAddress = remoteAddress;
}
/**
 * Builds a consumer fetch request wrapped as a {@code KafkaHeaderAndRequest}.
 *
 * @param maxResponseBytes upper bound passed to {@code setMaxBytes} on the fetch builder
 * @param maxPartitionBytes per-partition limit forwarded to {@code createPartitionMap}
 * @param topicPartitions the partitions to fetch from
 * @param offsetMap offsets forwarded to {@code createPartitionMap} together with the partitions
 * @return the result of {@code buildRequest} for the configured fetch builder
 */
private KafkaHeaderAndRequest createFetchRequest(int maxResponseBytes,
                                                 int maxPartitionBytes,
                                                 List<TopicPartition> topicPartitions,
                                                 Map<TopicPartition, Long> offsetMap) {
    // Declared with the concrete builder type instead of the raw AbstractRequest.Builder,
    // so the generic request type is not erased and no raw-type warning is produced.
    FetchRequest.Builder builder = FetchRequest.Builder
        .forConsumer(Integer.MAX_VALUE, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap))
        .setMaxBytes(maxResponseBytes);
    return buildRequest(builder);
}
Request(ChannelHandlerContext ctx, ByteBuffer buffer) {
this.requestId = buffer.getShort();
buffer.rewind();
header = RequestHeader.parse(buffer);
if (header.apiKey() == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey(), header.apiVersion())) {
body = new ApiVersionsRequest();
} else {
body = AbstractRequest.getRequest(header.apiKey(), header.apiVersion(), buffer);
}
this.clientAddress = ((InetSocketAddress) ctx.getChannel().getRemoteAddress()).getAddress();
}
/**
 * Returns the request held by this object.
 *
 * @return the value of the {@code request} field
 */
public AbstractRequest getRequest() {
    return request;
}
/**
 * Builds a metadata request for the given topics wrapped as a {@code KafkaHeaderAndRequest}.
 *
 * @param topics the topic names handed to the metadata request builder
 * @return the result of {@code buildRequest} for the metadata builder
 */
private KafkaHeaderAndRequest createTopicMetadataRequest(List<String> topics) {
    // Declared with the concrete builder type instead of the raw AbstractRequest.Builder,
    // so the generic request type is not erased and no raw-type warning is produced.
    // The second constructor argument is passed as true, unchanged from before.
    MetadataRequest.Builder builder = new MetadataRequest.Builder(topics, true);
    return buildRequest(builder);
}
/**
 * Returns the request body held by this object.
 *
 * @return the value of the {@code body} field
 */
AbstractRequest getBody() {
    return this.body;
}
/**
 * Sends a request to the given node through the network client and stores what the
 * client returns in {@code clientResponse}.
 *
 * @param node the destination node
 * @param requestBuilder builder for the request to send
 */
public void sendApiRequest(final Node node, final AbstractRequest.Builder<?> requestBuilder) {
    clientResponse = networkClient.send(node, requestBuilder);
}
/**
 * Creates a client request addressed to the configured node.
 *
 * @param requestBuilder builder for the request to wrap
 * @return the new client request, or {@code null} when either {@code client} or
 *     {@code node} is {@code null}
 */
public ClientRequest newClientRequest(AbstractRequest.Builder<?> requestBuilder) {
    if (client != null && node != null) {
        return client.newClientRequest(node.idString(), requestBuilder, 1000L, true);
    }
    return null;
}