下面列出了怎么用org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Opens a streaming response that carries the events of an existing job.
 *
 * <p>The job is looked up by id through {@code jobsManager}; the lookup result is
 * passed to {@code ExtendedAssert.notFound} (presumably rejecting a missing job;
 * confirm against that helper). A {@link ResponseBodyEmitter} with a ten minute
 * timeout is then bound to a subscription on the job's info key.
 *
 * @param job id of the job, taken from the request path
 * @return the emitter through which job events are delivered to the client
 */
@RequestMapping(value = "/{job:.*}/logStream", method = GET, produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseBodyEmitter getJobLogStream(@PathVariable("job") String job) {
    JobInstance instance = jobsManager.getJob(job);
    ExtendedAssert.notFound(instance, "Job was not found by id: " + job);
    long timeoutMillis = TimeUnit.MINUTES.toMillis(10L);
    ResponseBodyEmitter stream = new ResponseBodyEmitter(timeoutMillis);
    JobEventConsumer eventConsumer = new JobEventConsumer(this.jobsManager, stream, instance);
    // Needed for jobs that finish before the request is made:
    // the stream is completed as soon as the job reaches its end.
    instance.atEnd().addListener(stream::complete, ExecutorUtils.DIRECT);
    // TODO we may want to consume history, also.
    Subscription subscription = jobsManager.getSubscriptions().openSubscriptionOnKey(eventConsumer, instance.getInfo());
    // The emitter does not invoke this callback when the client disconnects;
    // that may be fixed in a future version of the framework.
    stream.onCompletion(subscription::close);
    return stream;
}
/**
 * Answers requests on {@code Constants.API_RBE} with a {@link ResponseBodyEmitter}
 * whose content is produced by a task handed to {@code nonBlockingService}.
 *
 * <p>The task sends a single plain-text message (the configured message followed
 * by the current date) and then completes the emitter. If sending or completing
 * fails, a generic message is printed to standard output and the emitter is
 * completed with the caught exception.
 *
 * @return a {@code 200 OK} response entity wrapping the emitter
 */
@GetMapping(Constants.API_RBE)
public ResponseEntity<ResponseBodyEmitter> handleRbe() {
    ResponseBodyEmitter emitter = new ResponseBodyEmitter();
    nonBlockingService.execute(() -> {
        try {
            emitter.send(Constants.API_RBE_MSG + " @ " + new Date(), MediaType.TEXT_PLAIN);
            emitter.complete();
        } catch (Exception ex) {
            System.out.println(Constants.GENERIC_EXCEPTION);
            emitter.completeWithError(ex);
        }
    });
    // Diamond instead of the raw ResponseEntity type: avoids the unchecked
    // conversion to ResponseEntity<ResponseBodyEmitter>.
    return new ResponseEntity<>(emitter, HttpStatus.OK);
}
/**
 * Creates an observer bound to the given emitter.
 *
 * <p>This instance is registered on the emitter as both its timeout callback
 * and its completion callback.
 *
 * @param mediaType           media type stored for later use by this observer
 * @param responseBodyEmitter emitter this observer is attached to
 */
public ResponseBodyEmitterObserver(MediaType mediaType, ResponseBodyEmitter responseBodyEmitter) {
    this.responseBodyEmitter = responseBodyEmitter;
    this.mediaType = mediaType;
    // Hook this observer into the emitter's lifecycle notifications.
    responseBodyEmitter.onTimeout(this);
    responseBodyEmitter.onCompletion(this);
}
/**
 * Builds an observer attached to the supplied emitter.
 *
 * <p>The new instance installs itself on the emitter as the handler for both
 * the timeout and the completion notifications.
 *
 * @param mediaType           media type kept by this observer
 * @param responseBodyEmitter emitter whose lifecycle this observer follows
 */
public ResponseBodyEmitterObserver(MediaType mediaType, ResponseBodyEmitter responseBodyEmitter) {
    this.responseBodyEmitter = responseBodyEmitter;
    this.mediaType = mediaType;
    // Timeout first, then completion: same registration order as before.
    responseBodyEmitter.onTimeout(this);
    responseBodyEmitter.onCompletion(this);
}
/**
 * Creates an observer attached to the given emitter and subscribes it to the
 * given observable.
 *
 * <p>This instance is first registered on the emitter as its timeout and
 * completion callback; only after that is it subscribed to the observable.
 *
 * @param mediaType           media type stored by this observer
 * @param observable          source this observer subscribes to
 * @param responseBodyEmitter emitter this observer is attached to
 */
public ResponseBodyEmitterObserver(MediaType mediaType, Observable<T> observable, ResponseBodyEmitter responseBodyEmitter) {
    this.responseBodyEmitter = responseBodyEmitter;
    this.mediaType = mediaType;
    responseBodyEmitter.onTimeout(this);
    responseBodyEmitter.onCompletion(this);
    // Subscribe last, once the fields and emitter callbacks are in place.
    observable.subscribe(this);
}
/**
 * Creates and starts a container update job for a cluster and streams its
 * events back to the caller.
 *
 * <p>Job parameters are built from the cluster name and the request body, a job
 * is created from them, an event consumer writing to a ten minute
 * {@link ResponseBodyEmitter} is subscribed on the job's info key, and the job
 * is started.
 *
 * @param cluster name of the cluster, taken from the request path
 * @param req     update request taken from the request body
 * @return the emitter through which the job's events are delivered
 */
@RequestMapping(value = "/clusters/{cluster}/containers/update", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseBodyEmitter update(@PathVariable("cluster") String cluster,
                                  @RequestBody UiUpdateContainers req) {
    log.info("got scale update request: {}", req);
    JobParameters jobParams = createParametersString(cluster, req);
    long timeoutMillis = TimeUnit.MINUTES.toMillis(10L);
    ResponseBodyEmitter stream = new ResponseBodyEmitter(timeoutMillis);
    JobInstance job = jobsManager.create(jobParams);
    JobApi.JobEventConsumer eventConsumer = new JobApi.JobEventConsumer(this.jobsManager, stream, job);
    // Subscribe before starting the job, as the original ordering does.
    jobsManager.getSubscriptions().subscribeOnKey(eventConsumer, job.getInfo());
    log.info("Try start job: {}", jobParams);
    job.start();
    return stream;
}
/**
 * Creates a consumer that holds the collaborators it was given.
 *
 * @param jobsManager jobs manager stored by this consumer
 * @param emitter     response emitter stored by this consumer
 * @param jobInstance job instance stored by this consumer
 */
public JobEventConsumer(JobsManager jobsManager, ResponseBodyEmitter emitter, JobInstance jobInstance) {
    this.jobInstance = jobInstance;
    this.emitter = emitter;
    this.jobsManager = jobsManager;
}