下面列出了com.google.protobuf.AbstractMessageLite#org.apache.mesos.v1.scheduler.Protos.Event 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void readEvents_multipleEventsInOneChunk() throws Exception {
final List<Event> subHbOffer = newArrayList(
TestingProtos.SUBSCRIBED,
TestingProtos.HEARTBEAT,
TestingProtos.OFFER
);
final List<byte[]> eventChunks = subHbOffer.stream()
.map(AbstractMessageLite::toByteArray)
.map(RecordIOUtils::createChunk)
.collect(Collectors.toList());
final List<ByteBuf> singleChunk = newArrayList(Unpooled.copiedBuffer(concatAllChunks(eventChunks)));
final List<Event> events = runTestOnChunks(singleChunk);
assertThat(events).isEqualTo(subHbOffer);
}
@NotNull
static List<Event> runTestOnChunks(@NotNull final List<ByteBuf> chunks) {
final TestSubscriber<byte[]> child = new TestSubscriber<>();
final Subscriber<ByteBuf> call = new RecordIOOperator().call(child);
assertThat(call).isInstanceOf(RecordIOOperator.RecordIOSubscriber.class);
final RecordIOOperator.RecordIOSubscriber subscriber = (RecordIOOperator.RecordIOSubscriber) call;
chunks.stream().forEach(subscriber::onNext);
child.assertNoErrors();
child.assertNotCompleted();
child.assertNoTerminalEvent();
assertThat(subscriber.messageSizeBytesBuffer).isEmpty();
assertThat(subscriber.messageBytes).isNull();
assertThat(subscriber.remainingBytesForMessage).isEqualTo(0);
return CollectionUtils.listMap(child.getOnNextEvents(), (bs) -> {
try {
return Event.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
});
}
@Test
public void test() {
final byte[] chunk = RecordIOUtils.createChunk(TestingProtos.SUBSCRIBED.toByteArray());
final List<byte[]> bytes = RecordIOOperatorTest.partitionIntoArraysOfSize(chunk, chunkSize);
final List<ByteBuf> chunks = CollectionUtils.listMap(bytes, Unpooled::copiedBuffer);
final List<Event> events = RecordIOOperatorTest.runTestOnChunks(chunks);
final List<Event.Type> eventTypes = CollectionUtils.listMap(events, Event::getType);
assertThat(eventTypes).isEqualTo(newArrayList(Event.Type.SUBSCRIBED));
}
@Test
public void readEvents_eventsNotSpanningMultipleChunks() throws Exception {
final List<ByteBuf> eventBufs = CollectionUtils.listMap(EVENT_CHUNKS, Unpooled::copiedBuffer);
final List<Event> events = runTestOnChunks(eventBufs);
assertThat(events).isEqualTo(EVENT_PROTOS);
}
@Test
public void readEvents_eventsSpanningMultipleChunks() throws Exception {
final byte[] allBytes = concatAllChunks(EVENT_CHUNKS);
final List<byte[]> arrayChunks = partitionIntoArraysOfSize(allBytes, 10);
final List<ByteBuf> bufChunks = CollectionUtils.listMap(arrayChunks, Unpooled::copiedBuffer);
final List<Event> events = runTestOnChunks(bufChunks);
assertThat(events).isEqualTo(EVENT_PROTOS);
}
private void initializeEventMetrics() {
// For variable named metrics that are keyed on mesos enums, this ensures that we set
// all possible metrics to 0.
for (Event.Type type : Event.Type.values()) {
this.counters.get(eventMetricNameCache.getUnchecked(type));
}
}
@Test
public void correctlyAbleToReadEventsFromEventsBinFile() throws Exception {
final InputStream inputStream = this.getClass().getResourceAsStream("/events.bin");
final List<ByteBuf> chunks = new ArrayList<>();
final byte[] bytes = new byte[100];
int read;
while ((read = inputStream.read(bytes)) != -1) {
chunks.add(Unpooled.copiedBuffer(bytes, 0, read));
}
final List<Event> events = runTestOnChunks(chunks);
final List<Event.Type> eventTypes = CollectionUtils.listMap(events, Event::getType);
assertThat(eventTypes).isEqualTo(newArrayList(
Event.Type.SUBSCRIBED,
Event.Type.HEARTBEAT,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.HEARTBEAT,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.HEARTBEAT,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.HEARTBEAT,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.HEARTBEAT,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.HEARTBEAT,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.HEARTBEAT,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.HEARTBEAT,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.HEARTBEAT,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.HEARTBEAT,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.OFFERS,
Event.Type.HEARTBEAT,
Event.Type.OFFERS,
Event.Type.HEARTBEAT,
Event.Type.HEARTBEAT,
Event.Type.HEARTBEAT
));
}
@Override
public void heartbeat(Event event) {
long now = System.currentTimeMillis();
long delta = (now - lastHeartbeatTime.getAndSet(now));
LOG.debug("Heartbeat from mesos. Delta since last heartbeat is {}ms", delta);
}
@Override
public String load(Event.Type key) throws Exception {
return EVENT_COUNTER_STAT_PREFIX + key.name();
}
private void countEventMetrics(Event event) {
this.counters.get(eventMetricNameCache.getUnchecked(event.getType())).incrementAndGet();
}
@TimedInterceptor.Timed("scheduler_received")
@Override
public void received(Mesos mesos, Event event) {
countEventMetrics(event);
switch(event.getType()) {
case SUBSCRIBED:
Event.Subscribed subscribed = event.getSubscribed();
if (isRegistered.get()) {
handler.handleReregistration(subscribed.getMasterInfo());
} else {
isRegistered.set(true);
handler.handleRegistration(subscribed.getFrameworkId(), subscribed.getMasterInfo());
}
isSubscribed.set(true);
break;
case OFFERS:
checkState(isSubscribed.get(), "Must be registered before receiving offers.");
handler.handleOffers(event.getOffers().getOffersList());
break;
case RESCIND:
handler.handleRescind(event.getRescind().getOfferId());
break;
case INVERSE_OFFERS:
handler.handleInverseOffer(event.getInverseOffers().getInverseOffersList());
break;
case RESCIND_INVERSE_OFFER:
Protos.OfferID id = event.getRescindInverseOffer().getInverseOfferId();
LOG.warn("Ignoring rescinded inverse offer: {}", id);
break;
case UPDATE:
Protos.TaskStatus status = event.getUpdate().getStatus();
handler.handleUpdate(status);
break;
case MESSAGE:
Event.Message m = event.getMessage();
handler.handleMessage(m.getExecutorId(), m.getAgentId());
break;
case ERROR:
handler.handleError(event.getError().getMessage());
break;
case FAILURE:
Event.Failure failure = event.getFailure();
if (failure.hasExecutorId()) {
handler.handleLostExecutor(
failure.getExecutorId(),
failure.getAgentId(),
failure.getStatus());
} else {
handler.handleLostAgent(failure.getAgentId());
}
break;
// TODO(zmanji): handle HEARTBEAT in a graceful manner
// For now it is ok to silently ignore heart beats because the driver wil
// detect disconnections for us.
case HEARTBEAT:
break;
default:
LOG.warn("Unknown event from Mesos \n{}", event);
break;
}
}
/**
* Periodic message sent by the Mesos master according to
* 'Subscribed.heartbeat_interval_seconds'. If the scheduler does not
* receive any events (including heartbeats) for an extended period of time
* (e.g., 5 x heartbeat_interval_seconds), there is likely a network
* partition. In such a case the scheduler should close the existing
* subscription connection and resubscribe using a backoff strategy.
*/
public abstract void heartbeat(Event event);