下面列出了io.reactivex.MaybeOnSubscribe#com.safframework.tony.common.utils.Preconditions 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Builds a status snapshot for the spider registered under the given name.
 *
 * @param spiderName name used as the key into the {@code spiders} registry
 * @return a populated {@link SpiderBean}, or {@code null} when the name is blank or no
 *         spider is registered under it
 */
@Override
public SpiderBean detail(String spiderName) {
    // Only touch the registry when the name is usable.
    Spider spider = Preconditions.isNotBlank(spiderName) ? spiders.get(spiderName) : null;
    if (spider == null) {
        return null;
    }

    SpiderBean bean = new SpiderBean();
    bean.setSpiderName(spiderName);
    bean.setSpiderStatus(spider.getSpiderStatus());
    bean.setLeftRequestSize(spider.getQueue().getLeftRequests(spiderName));
    bean.setTotalRequestSize(spider.getQueue().getTotalRequests(spiderName));
    // Consumed = total - left, derived from the two values stored just above.
    bean.setConsumedRequestSize(bean.getTotalRequestSize() - bean.getLeftRequestSize());
    bean.setQueueType(spider.getQueue().getClass().getSimpleName());
    bean.setDownloaderType(spider.getDownloader().getClass().getSimpleName());
    return bean;
}
/**
 * Scheduled-job entry point: reads the {@code "spider"} and {@code "requests"} entries from
 * the job data map, makes sure the spider is running, and pushes every request to it.
 *
 * @param context execution context supplying the job detail and its data map
 * @throws JobExecutionException declared by the job contract; not thrown directly here
 */
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
    log.info("定时任务开始");
    log.info("jobName="+context.getJobDetail().getKey().getName());

    JobDataMap jobData = context.getJobDetail().getJobDataMap();
    Spider spider = (Spider) jobData.get("spider");
    Request[] requests = (Request[]) jobData.get("requests");

    // Nothing to do without both a spider and at least one request.
    if (spider == null || !Preconditions.isNotBlank(requests)) {
        return;
    }

    log.info("spiderName="+spider.getName());

    boolean notStarted = spider.getSpiderStatus() == Spider.SPIDER_STATUS_INIT
            || spider.getSpiderStatus() == Spider.SPIDER_STATUS_STOPPED;
    if (notStarted) {
        spider.run();
    } else if (spider.getSpiderStatus() == Spider.SPIDER_STATUS_PAUSE) {
        spider.resume();
    }

    for (Request request : requests) {
        spider.getQueue().pushToRunninSpider(request, spider);
    }
}
/**
 * Creates a spider whose queue implementation is chosen from the queue type configured in
 * {@link SpiderConfig}; a {@link DefaultQueue} is used when no queue has been set afterwards.
 */
private Spider() {
    String configuredType = SpiderConfig.getInstance().getQueueType();

    if (Preconditions.isNotBlank(configuredType)) {
        if (Constant.QUEUE_TYPE_DEFAULT.equals(configuredType)) {
            this.queue = new DefaultQueue();
        } else if (Constant.QUEUE_TYPE_DISRUPTOR.equals(configuredType)) {
            this.queue = new DisruptorQueue();
        }
        // Any other configured value selects nothing here.
    }

    // Fallback for a blank or unrecognised queue type.
    if (this.queue == null) {
        this.queue = new DefaultQueue();
    }

    initSpiderConfig();
}
/**
 * Queues the given requests for this spider, tagging each with this spider's name.
 *
 * @param requests requests to enqueue; nothing is queued when the check
 *                 {@code Preconditions.isNotBlank(requests)} fails
 * @return this spider, for call chaining
 */
public Spider request(Request... requests) {
    checkIfRunning();

    if (!Preconditions.isNotBlank(requests)) {
        return this;
    }

    for (Request request : requests) {
        queue.push(request.spiderName(name));
    }
    // Signal once after the whole batch has been pushed.
    signalNewRequest();
    return this;
}
/**
 * Writes the values held by {@code resultItems} to {@code csvFile} using the GBK charset.
 *
 * <p>All values are concatenated with a trailing comma each and the result is split on
 * commas again, so the exported list is the outcome of that round trip.
 *
 * @param resultItems result container whose {@code getAll()} map supplies the values
 */
@Override
public void process(ResultItems resultItems) {
    StringBuilder joined = new StringBuilder();
    for (Object value : resultItems.getAll().values()) {
        joined.append(value).append(",");
    }

    String[] cells = joined.toString().split(",");
    if (!Preconditions.isNotBlank(cells)) {
        return;
    }

    List<String> rows = Arrays.asList(cells);
    SpiderUtils.exportCsv(csvFile, rows, Charset.forName("GBK"));
}
/**
 * Convenience for pipelines that want to enqueue a follow-up crawl task (deep crawling).
 *
 * @param spider          spider whose queue receives the new request; its name is used to
 *                        create the request
 * @param originalRequest request whose headers are copied onto the new request
 * @param url             URL of the new request; the call is a no-op when it is blank or
 *                        when {@code spider} or {@code originalRequest} is {@code null}
 */
public void push(Spider spider, Request originalRequest, String url) {
    boolean missingInput = spider == null
            || originalRequest == null
            || Preconditions.isBlank(url);
    if (missingInput) {
        return;
    }

    // The new request is created under the spider's name.
    Request followUp = new Request(url, spider.getName());

    // Carry the original request's headers over to the new one.
    Map<String, String> headers = originalRequest.getHeader();
    if (Preconditions.isNotBlank(headers)) {
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            followUp.header(entry.getKey(), entry.getValue());
        }
    }

    spider.getQueue().push(followUp);
}
/**
 * Exports the given lines to a file, one entry per record.
 *
 * <p>Each entry is written followed by a single carriage return ({@code "\r"}).
 * NOTE(review): a bare CR is an unusual record separator; it is kept unchanged because
 * existing consumers of the file may rely on it — confirm whether CRLF was intended.
 *
 * <p>The streams are managed with try-with-resources and flushed explicitly, so a failure
 * while flushing or closing is reported as {@code false} instead of being silently ignored.
 *
 * @param file     destination file; it is created or truncated
 * @param dataList entries to write; {@code null} or empty produces an empty file
 * @param charset  charset used to encode the entries
 * @return {@code true} when everything was written and the streams closed cleanly,
 *         {@code false} when any exception occurred
 */
public static boolean exportCsv(File file, List<String> dataList, Charset charset) {
    try (FileOutputStream out = new FileOutputStream(file);
         OutputStreamWriter osw = new OutputStreamWriter(out, charset);
         BufferedWriter bw = new BufferedWriter(osw)) {
        if (dataList != null) {
            for (String data : dataList) {
                bw.append(data).append("\r");
            }
        }
        // Flush inside the try so buffered-write failures count as an unsuccessful export.
        bw.flush();
        return true;
    } catch (Exception e) {
        // Contract of this helper: any failure is reported through the return value.
        return false;
    }
}
/**
 * Stores a single cookie string in the cookie group of the request's host.
 *
 * <p>When the pool has no group for that host yet, a new group is created, filled and
 * added to the pool; otherwise the parsed cookies are put into the existing group.
 *
 * @param request request whose URL host selects the cookie group
 * @param cookie  cookie header string to parse; nothing happens when the check
 *                {@code Preconditions.isNotBlank(cookie)} fails
 */
public void saveCookie(Request request, String cookie) {
    if (!Preconditions.isNotBlank(cookie)) {
        return;
    }

    CookiesGroup group = CookiesPool.getInsatance().getCookieGroup(request.getUrlParser().getHost());
    boolean isNewGroup = group == null;
    if (isNewGroup) {
        group = new CookiesGroup(request.getUrlParser().getHost());
    }

    List<HttpCookie> parsedCookies = new ArrayList<>(HttpCookie.parse(cookie));
    group.putAllCookies(parsedCookies);

    // Only a freshly created group has to be registered with the pool.
    if (isNewGroup) {
        CookiesPool.getInsatance().addCookieGroup(group);
    }
}
/**
 * Collects the absolute {@code href} of every anchor in the document.
 *
 * @param doc document to scan with the selector {@code a[href]}
 * @return the non-blank {@code abs:href} values in document order; empty when none match
 */
@Override
public List<String> parse(Document doc) {
    List<String> urls = new ArrayList<>();

    Elements anchors = doc.select("a[href]");
    if (!Preconditions.isNotBlank(anchors)) {
        return urls;
    }

    for (Element anchor : anchors) {
        String href = anchor.attr("abs:href");
        if (!Preconditions.isNotBlank(href)) {
            continue;
        }
        log.info(href);
        urls.add(href);
    }
    return urls;
}
/**
 * Downloads all images found on a single web page.
 *
 * <p>Marks this instance as working on a web page, installs a {@link PicParser} as the page
 * parser, fetches the page, extracts the image URLs and hands them to {@code downloadPics}.
 * Errors are reported by printing the throwable's message to standard output.
 *
 * @param url page address; the call is a no-op when the check
 *            {@code Preconditions.isNotBlank(url)} fails
 */
public void downloadWebPageImages(String url) {
    if (!Preconditions.isNotBlank(url)) {
        return;
    }

    isWebPage = true;
    pageParser = new PicParser();

    Flowable.just(url)
            .map(pageUrl -> httpManager.createHttpWithGet(pageUrl))
            .map(pageResponse -> parseHtmlToImages(pageResponse, (PicParser) pageParser))
            .subscribe(
                    imageUrls -> downloadPics(imageUrls),
                    error -> System.out.println(error.getMessage()));
}
/**
 * Downloads all images found on several web pages.
 *
 * <p>Marks this instance as working on web pages, installs a {@link PicParser} as the page
 * parser, then fetches and parses the pages on a parallel flow before handing each page's
 * image list to {@code downloadPics}. Errors are reported by printing the throwable's
 * message to standard output.
 *
 * @param urls page addresses; the call is a no-op when the check
 *             {@code Preconditions.isNotBlank(urls)} fails
 */
public void downloadWebPageImages(List<String> urls) {
    if (!Preconditions.isNotBlank(urls)) {
        return;
    }

    isWebPage = true;
    pageParser = new PicParser();

    Flowable.fromIterable(urls)
            .parallel()
            .map(pageUrl -> httpManager.createHttpWithGet(pageUrl))
            .map(pageResponse -> parseHtmlToImages(pageResponse, (PicParser) pageParser))
            .sequential()
            .subscribe(
                    imageUrls -> downloadPics(imageUrls),
                    error -> System.out.println(error.getMessage()));
}
/**
 * Inserts or updates a resource plan in the {@code Constant.COL_NAME_RESOURCE_PLAN}
 * collection.
 *
 * <p>A plan whose add time is {@code 0} is treated as new and saved with fresh add and
 * modification timestamps. Any other plan is updated by id: start page, end page and
 * modification time are overwritten.
 *
 * @param resourcePlan plan to persist
 * @return for an insert, whether the plan has a non-blank id after saving; for an update,
 *         whether the write result reports at least one affected document
 */
@Override
public boolean saveResourcePlan(ResourcePlan resourcePlan) {
    if (resourcePlan.getAddTime() == 0) {
        // Insert path.
        resourcePlan.setAddTime(new Date().getTime());
        resourcePlan.setModTime(new Date().getTime());
        mongoTemplate.save(resourcePlan, Constant.COL_NAME_RESOURCE_PLAN);
        return Preconditions.isNotBlank(resourcePlan.getId());
    }

    // Update path.
    Query byId = new Query().addCriteria(Criteria.where("_id").is(resourcePlan.getId()));
    Update changes = new Update()
            .set("startPageNum", resourcePlan.getStartPageNum())
            .set("endPageNum", resourcePlan.getEndPageNum())
            .set("modTime", new Date().getTime());
    WriteResult outcome = mongoTemplate.updateFirst(byId, changes, Constant.COL_NAME_RESOURCE_PLAN);
    return outcome != null && outcome.getN() > 0;
}
/**
 * Around advice that measures how long an advised method or constructor takes and logs it.
 *
 * <p>The pointcut also matches constructors, whose signature is not a
 * {@link MethodSignature}. The signature is therefore only cast after an
 * {@code instanceof} check; an unconditional cast would fail with a
 * {@code ClassCastException} on every advised constructor. The class and member names are
 * read from the generic signature, which is available for both kinds of join point.
 *
 * <p>For method join points the {@link Trace} annotation is consulted and tracing is skipped
 * when {@code enable()} is {@code false}. For constructor join points the annotation is not
 * inspected here, so they are always traced.
 *
 * @param joinPoint the advised join point
 * @return whatever the advised member returns
 * @throws Throwable anything thrown by the advised member
 */
@Around("methodAnnotatedWithTrace() || constructorAnnotatedTrace()")
public Object traceMethod(final ProceedingJoinPoint joinPoint) throws Throwable {
    Trace trace = null;
    if (joinPoint.getSignature() instanceof MethodSignature) {
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        trace = methodSignature.getMethod().getAnnotation(Trace.class);
    }

    if (trace != null && !trace.enable()) {
        return joinPoint.proceed();
    }

    String className = joinPoint.getSignature().getDeclaringType().getSimpleName();
    String methodName = joinPoint.getSignature().getName();

    final StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    Object result = joinPoint.proceed();
    stopWatch.stop();

    // getSimpleName() is empty for anonymous classes; use a readable tag instead.
    if (Preconditions.isBlank(className)) {
        className = "Anonymous class";
    }

    L.i(className, buildLogMessage(methodName, stopWatch.getElapsedTime()));
    return result;
}
/**
 * Binds one grid cell: position 0 shows the unfiltered bitmap, every other position shows
 * the bitmap with the filter named at that position applied. The filter name is always
 * used as the cell's caption.
 *
 * @param holder   view holder providing the {@code image} and {@code text} views
 * @param position adapter position; also the index into {@code mList}
 */
@Override
public void onBindViewHolder(final GridViewFilterAdapter.ViewHolder holder, int position) {
    String filterName = mList.get(position);

    if (position == 0) {
        holder.image.setImageBitmap(mBitmap);
    } else if (Preconditions.isNotBlank(filterName)) {
        CommonFilter filter = (CommonFilter) getFilter(filterName);
        RxImageData.bitmap(mBitmap)
                .addFilter(filter)
                .into(holder.image);
    }

    holder.text.setText(filterName);
}
/**
 * Binds one cell: position 0 shows the unfiltered bitmap, every other position shows the
 * bitmap with the filter named in {@code mList} at that position applied. The caption is
 * taken from {@code map} for the same position.
 *
 * @param holder   view holder providing the {@code image} and {@code text} views
 * @param position adapter position
 */
@Override
public void onBindViewHolder(final SpitalConvAdapter.ViewHolder holder, int position) {
    if (position != 0) {
        String filterName = mList.get(position);
        if (Preconditions.isNotBlank(filterName)) {
            CommonFilter filter = (CommonFilter) getFilter(filterName);
            RxImageData.bitmap(mBitmap)
                    // .placeHolder(R.drawable.test_spital_conv)
                    .addFilter(filter)
                    .into(holder.image);
        }
    } else {
        holder.image.setImageBitmap(mBitmap);
    }

    holder.text.setText(map.get(position));
}
/**
 * Loads the request's URL in the web driver, runs the configured actions against it and
 * emits the resulting page source wrapped in a {@link Response}.
 *
 * <p>When no web driver is available the returned {@link Maybe} completes empty. Without
 * that signal the source would never terminate and a subscriber would wait forever.
 *
 * <p>NOTE(review): {@code html.getBytes()} uses the platform default charset — confirm this
 * matches how the response content is decoded later.
 *
 * @param request request whose URL is loaded; also passed to the delay transformer
 * @return a {@code Maybe} emitting the response, or completing empty when there is no
 *         web driver
 */
@Override
public Maybe<Response> download(Request request) {
    return Maybe.create(new MaybeOnSubscribe<String>() {
        @Override
        public void subscribe(MaybeEmitter<String> emitter) throws Exception {
            if (webDriver == null) {
                // Terminate the stream explicitly; there is nothing to emit.
                emitter.onComplete();
                return;
            }

            webDriver.get(request.getUrl());

            if (Preconditions.isNotBlank(actions)) {
                actions.forEach(
                        action -> action.perform(webDriver)
                );
            }

            emitter.onSuccess(webDriver.getPageSource());
        }
    })
    .compose(new DownloaderDelayTransformer(request))
    .map(new Function<String, Response>() {
        @Override
        public Response apply(String html) throws Exception {
            Response response = new Response();
            response.setContent(html.getBytes());
            response.setStatusCode(Constant.OK_STATUS_CODE);
            response.setContentType(getContentType(webDriver));
            return response;
        }
    });
}
/**
 * Subscribes the consumer to a topic/tag, registers a listener that buffers incoming
 * messages per topic in {@code map}, and starts the consumer.
 *
 * <p>The listener always acknowledges with {@code CONSUME_SUCCESS}. A client exception
 * raised while subscribing or starting is printed and otherwise ignored.
 *
 * @param topic topic to subscribe to; also the key under which messages are buffered
 * @param tag   tag expression passed to the consumer's subscription
 */
public void subscribe(String topic,String tag) {
    try {
        consumer.subscribe(topic, tag);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // The monitor is this listener instance.
                synchronized (this) {
                    if (Preconditions.isNotBlank(msgs)) {
                        ConcurrentLinkedQueue<MessageExt> buffered = map.get(topic);
                        if (!Preconditions.isNotBlank(buffered)) {
                            // No usable queue for this topic: install a new one holding the batch.
                            ConcurrentLinkedQueue<MessageExt> fresh = new ConcurrentLinkedQueue<MessageExt>(msgs);
                            map.put(topic, fresh);
                        } else {
                            buffered.addAll(msgs);
                        }
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    } catch (MQClientException e) {
        e.printStackTrace();
    }
}
/**
 * Tells whether a request carries anything beyond its URL: headers, a charset, extras or
 * a priority greater than zero.
 *
 * @param request request to inspect; may be {@code null}
 * @return {@code true} when at least one of those is present, {@code false} otherwise or
 *         when {@code request} is {@code null}
 */
private boolean hasExtraRequestInfo(Request request) {
    if (request == null) {
        return false;
    }
    if (Preconditions.isNotBlank(request.getHeader())) {
        return true;
    }
    if (Preconditions.isNotBlank(request.getCharset())) {
        return true;
    }
    if (Preconditions.isNotBlank(request.getExtras())) {
        return true;
    }
    return request.getPriority() > 0;
}
/**
 * Takes the next request for the given spider from Redis.
 *
 * <p>A connection is opened for this call and closed again before returning.
 *
 * @param spiderName name passed to the Redis lookups for the spider's requests
 * @return the request built by {@code getExtrasInItem}, or {@code null} when no URL is
 *         available
 */
@Override
public synchronized Request poll(String spiderName) {
    StatefulRedisConnection<String, String> connection = redisClient.connect();
    RedisCommands<String, String> redis = connection.sync();
    try {
        String nextUrl = getRequest(redis, spiderName);
        return Preconditions.isBlank(nextUrl)
                ? null
                : getExtrasInItem(redis, nextUrl, spiderName);
    } finally {
        connection.close();
    }
}
/**
 * Stops every registered spider.
 */
public void stopSpiders() {
    if (!Preconditions.isNotBlank(spiders)) {
        return;
    }
    spiders.values().forEach(spider -> spider.stop());
}
/**
 * Installs the proxy map and starts the proxy manager. This has to happen before the
 * SpiderEngine itself is started.
 *
 * @param proxyMap map assigned to {@code ProxyPool.proxyMap}; the call is a no-op when the
 *                 check {@code Preconditions.isNotBlank(proxyMap)} fails
 */
public void startProxyPool(Map<String, Class> proxyMap) {
    if (!Preconditions.isNotBlank(proxyMap)) {
        return;
    }
    ProxyPool.proxyMap = proxyMap;
    ProxyManager.get().start();
}
/**
 * Pushes a URL to the queue of the spider registered under the given name.
 *
 * @param spiderName key into the {@code spiders} registry; nothing happens when it is blank
 *                   or no spider is registered under it
 * @param url        URL handed to the spider's queue
 */
@Override
public void push(String spiderName, String url) {
    Spider target = Preconditions.isNotBlank(spiderName) ? spiders.get(spiderName) : null;
    if (target == null) {
        return;
    }
    target.getQueue().pushToRunninSpider(url, target);
}
/**
 * Creates a watch manager for an etcd registry.
 *
 * @param etcdStr  etcd endpoint(s); a client is only built when this is not blank
 * @param etcdPath registry path; {@code Constant.DEFAULT_REGISTRY_PATH} is used when blank
 */
public EtcdWatchManager(String etcdStr, String etcdPath) {
    if (Preconditions.isNotBlank(etcdStr)) {
        client = Client.builder().endpoints(etcdStr).build();
    }

    this.path = Preconditions.isBlank(etcdPath) ? Constant.DEFAULT_REGISTRY_PATH : etcdPath;

    vertx = Vertx.vertx();
}
/**
 * Creates a watch manager for a ZooKeeper registry: connects, makes sure the registry node
 * exists, reads its children with this instance as watcher and marks every child as
 * {@code ONLINE} in {@code stateMap}.
 *
 * <p>All ZooKeeper calls use the resolved {@code this.path}. Using the raw {@code zkPath}
 * argument instead would bypass the default and fail whenever the argument is blank.
 *
 * <p>Nothing is initialised when {@code zkStr} is blank.
 *
 * @param zkStr  ZooKeeper connection string
 * @param zkPath registry path; {@code Constant.DEFAULT_REGISTRY_PATH} is used when blank
 */
public ZooKeeperWatchManager(String zkStr, String zkPath) {
    if (Preconditions.isNotBlank(zkStr)) {
        log.info("zkStr: {}", zkStr);

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);
        // Operations on the client are only valid after start().
        client.start();

        // Resolve the effective path first so every call below uses the same value.
        this.path = Preconditions.isBlank(zkPath) ? Constant.DEFAULT_REGISTRY_PATH : zkPath;

        try {
            Stat stat = client.checkExists().forPath(this.path);
            if (stat == null) {
                client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(this.path);
            }
            znodes = client.getChildren().usingWatcher(this).forPath(this.path);
        } catch (Exception e) {
            log.error("failed to initialise the ZooKeeper registry path", e);
        }

        if (Preconditions.isNotBlank(znodes)) {
            znodes.forEach(node -> stateMap.put(node, SpiderEngineState.ONLINE));
        }

        vertx = Vertx.vertx();
    }
}
/**
 * Returns the value of the first element matched under the given element.
 *
 * @param element root passed to {@code selectElements}
 * @return {@code getValue} of the first match, or {@code null} when there is no match
 */
@Override
public String select(Element element) {
    List<Element> matches = selectElements(element);
    return Preconditions.isBlank(matches) ? null : getValue(matches.get(0));
}
/**
 * Returns the values of all elements matched under the given element.
 *
 * @param doc root passed to {@code selectElements}
 * @return the non-null {@code getValue} results in match order; empty when nothing matches
 */
@Override
public List<String> selectList(Element doc) {
    List<String> values = new ArrayList<String>();

    List<Element> matches = selectElements(doc);
    if (!Preconditions.isNotBlank(matches)) {
        return values;
    }

    for (Element match : matches) {
        String value = getValue(match);
        if (value == null) {
            continue;
        }
        values.add(value);
    }
    return values;
}
/**
 * Returns the first element matching {@code selectorText} under the given element.
 *
 * @param element root the selector is applied to
 * @return the first match, or {@code null} when nothing matches
 */
@Override
public Element selectElement(Element element) {
    Elements matches = element.select(selectorText);
    return Preconditions.isNotBlank(matches) ? matches.get(0) : null;
}
/**
 * Returns the first element produced by {@code selectElements} for the given element.
 *
 * @param element root passed to {@code selectElements}
 * @return the first match, or {@code null} when there is none
 */
@Override
public Element selectElement(Element element) {
    List<Element> matches = selectElements(element);
    if (!Preconditions.isNotBlank(matches)) {
        return null;
    }
    return matches.get(0);
}
/**
 * Returns the first entry of {@code all()}.
 *
 * @return the first entry, or {@code null} when the check
 *         {@code Preconditions.isNotBlank(all())} fails
 */
@Override
public String get() {
    return Preconditions.isNotBlank(all()) ? all().get(0) : null;
}
/**
 * Compiles the given expression (DOTALL, case-insensitive) and stores both the compiled
 * pattern and its source string on this instance.
 *
 * @param regexStr regular expression source
 * @throws IllegalArgumentException when {@code regexStr} is blank or not a valid pattern
 */
private void compileRegex(String regexStr) {
    if (Preconditions.isBlank(regexStr)) {
        throw new IllegalArgumentException("regex must not be empty");
    }

    final Pattern compiled;
    try {
        compiled = Pattern.compile(regexStr, Pattern.DOTALL | Pattern.CASE_INSENSITIVE);
    } catch (PatternSyntaxException e) {
        throw new IllegalArgumentException("invalid regex "+regexStr, e);
    }

    // Fields are only assigned once compilation has succeeded.
    this.regex = compiled;
    this.regexStr = regexStr;
}