类io.reactivex.FlowableOnSubscribe源码实例Demo

下面列出了 io.reactivex.FlowableOnSubscribe 的 API 类实例代码及写法,也可以点击链接到 GitHub 查看源代码。

源代码1 项目: AcgClub   文件: ZeroFiveNewsDetailModel.java
/**
 * Downloads the page at {@code url} with Jsoup and emits it parsed into a
 * {@link ZeroFiveNewsDetail}.
 *
 * @param url address of the news detail page
 * @return a Flowable that emits the parsed detail once and then completes, or
 *     signals an error when the fetched document is {@code null}
 */
@Override
public Flowable<ZeroFiveNewsDetail> getAcgNewsDetail(final String url) {
  final FlowableOnSubscribe<ZeroFiveNewsDetail> source =
      new FlowableOnSubscribe<ZeroFiveNewsDetail>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<ZeroFiveNewsDetail> emitter)
            throws Exception {
          final Element document = Jsoup.connect(url).get();
          if (document == null) {
            emitter.onError(new Throwable("element html is null"));
            return;
          }
          final ZeroFiveNewsDetail detail = JP.from(document, ZeroFiveNewsDetail.class);
          emitter.onNext(detail);
          emitter.onComplete();
        }
      };
  return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
源代码2 项目: AcgClub   文件: ZeroFiveNewsModel.java
/**
 * Downloads the news listing at {@code typeUrl} with Jsoup and emits it parsed
 * into a {@link ZeroFiveNewsPage}.
 *
 * @param typeUrl address of the news listing page
 * @return a Flowable that emits the parsed page once and then completes, or
 *     signals an error when the fetched document is {@code null}
 */
@Override
public Flowable<ZeroFiveNewsPage> getAcgNews(final String typeUrl) {
  final FlowableOnSubscribe<ZeroFiveNewsPage> source =
      new FlowableOnSubscribe<ZeroFiveNewsPage>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<ZeroFiveNewsPage> emitter)
            throws Exception {
          final Element document = Jsoup.connect(typeUrl).get();
          if (document == null) {
            emitter.onError(new Throwable("element html is null"));
            return;
          }
          final ZeroFiveNewsPage page = JP.from(document, ZeroFiveNewsPage.class);
          emitter.onNext(page);
          emitter.onComplete();
        }
      };
  return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
源代码3 项目: AcgClub   文件: ScheduleDetailModel.java
/**
 * Downloads a schedule detail page and emits it parsed into a
 * {@link ScheduleDetail}.
 *
 * <p>When {@code url} does not contain {@code "http"} it is treated as relative
 * and prefixed with {@link HtmlConstant#YHDM_M_URL}.
 *
 * @param url absolute or relative address of the detail page
 * @return a Flowable that emits the parsed detail once and then completes, or
 *     signals an error when the fetched document is {@code null}
 */
@Override
public Flowable<ScheduleDetail> getScheduleDetail(final String url) {
  final FlowableOnSubscribe<ScheduleDetail> source =
      new FlowableOnSubscribe<ScheduleDetail>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<ScheduleDetail> emitter)
            throws Exception {
          final String scheduleLink =
              url.contains("http") ? url : HtmlConstant.YHDM_M_URL + url;
          final Element document = Jsoup.connect(scheduleLink).get();
          if (document == null) {
            emitter.onError(new Throwable("element html is null"));
            return;
          }
          final ScheduleDetail detail = JP.from(document, ScheduleDetail.class);
          emitter.onNext(detail);
          emitter.onComplete();
        }
      };
  return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
源代码4 项目: AcgClub   文件: ScheduleNewModel.java
/**
 * Downloads the page at {@code url} with Jsoup and emits it parsed into a
 * {@link ScheduleNew}.
 *
 * @param url address of the page to fetch
 * @return a Flowable that emits the parsed result once and then completes, or
 *     signals an error when the fetched document is {@code null}
 */
@Override
public Flowable<ScheduleNew> getScheduleNew(final String url) {
  final FlowableOnSubscribe<ScheduleNew> source = new FlowableOnSubscribe<ScheduleNew>() {
    @Override
    public void subscribe(@NonNull FlowableEmitter<ScheduleNew> emitter) throws Exception {
      final Element document = Jsoup.connect(url).get();
      if (document == null) {
        emitter.onError(new Throwable("element html is null"));
        return;
      }
      final ScheduleNew parsed = JP.from(document, ScheduleNew.class);
      emitter.onNext(parsed);
      emitter.onComplete();
    }
  };
  return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
源代码5 项目: AcgClub   文件: ScheduleOtherModel.java
/**
 * Downloads the page at {@code url} with Jsoup and emits it parsed into a
 * {@link ScheduleOtherPage}.
 *
 * @param url address of the page to fetch
 * @return a Flowable that emits the parsed page once and then completes, or
 *     signals an error when the fetched document is {@code null}
 */
@Override
public Flowable<ScheduleOtherPage> getScheduleOtherPage(final String url) {
  final FlowableOnSubscribe<ScheduleOtherPage> source =
      new FlowableOnSubscribe<ScheduleOtherPage>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<ScheduleOtherPage> emitter)
            throws Exception {
          final Element document = Jsoup.connect(url).get();
          if (document == null) {
            emitter.onError(new Throwable("element html is null"));
            return;
          }
          final ScheduleOtherPage page = JP.from(document, ScheduleOtherPage.class);
          emitter.onNext(page);
          emitter.onComplete();
        }
      };
  return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
源代码6 项目: AcgClub   文件: ScheduleVideoModel.java
/**
 * Downloads the page at {@code url} with Jsoup, parses it into a
 * {@link ScheduleVideo} and, when the parsed video HTML is non-empty, rewrites
 * the video URL to point at the {@code tup.yhdm.tv} player.
 *
 * @param url address of the video page
 * @return a Flowable that emits the parsed video once and then completes, or
 *     signals an error when the fetched document is {@code null}
 */
@Override
public Flowable<ScheduleVideo> getScheduleVideo(final String url) {
  final FlowableOnSubscribe<ScheduleVideo> source = new FlowableOnSubscribe<ScheduleVideo>() {
    @Override
    public void subscribe(@NonNull FlowableEmitter<ScheduleVideo> emitter) throws Exception {
      final Element document = Jsoup.connect(url).get();
      if (document == null) {
        emitter.onError(new Throwable("element html is null"));
        return;
      }
      final ScheduleVideo scheduleVideo = JP.from(document, ScheduleVideo.class);
      // NOTE(review): the guard inspects getVideoHtml() but the rewrite uses
      // getVideoUrl() — confirm this asymmetry is intended.
      final boolean hasVideoHtml = !TextUtils.isEmpty(scheduleVideo.getVideoHtml());
      if (hasVideoHtml) {
        final String playerUrl = "http://tup.yhdm.tv/?m=1&vid=" + scheduleVideo.getVideoUrl();
        scheduleVideo.setVideoUrl(playerUrl);
      }
      // Disabled: wrapping the video HTML in the player container markup.
      /*StringBuilder scheduleVideoHtmlBuilder = new StringBuilder();
      scheduleVideoHtmlBuilder.append(HtmlConstant.SCHEDULE_VIDEO_CSS);
      scheduleVideoHtmlBuilder.append("<div class=\"player_main\" style=\"position: relative;\"> ");
      scheduleVideoHtmlBuilder.append(scheduleVideo.getVideoHtml());
      scheduleVideoHtmlBuilder.append("</div>");
      scheduleVideo.setVideoHtml(scheduleVideoHtmlBuilder.toString());*/
      emitter.onNext(scheduleVideo);
      emitter.onComplete();
    }
  };
  return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
/**
 * Bridges the in-app-messaging display callback of {@code instance} into a
 * Flowable and returns a {@link TestSubscriber} attached to it.
 *
 * <p>On subscription a display component is installed on {@code instance}; each
 * displayed message is emitted downstream and its callbacks are stored in
 * {@code callbacksHashMap} under the message's campaign id.
 *
 * @param instance the messaging instance whose display component is replaced
 * @return a test subscriber receiving every displayed message
 */
private static TestSubscriber<InAppMessage> listenerToFlowable(FirebaseInAppMessaging instance) {
  FlowableOnSubscribe<InAppMessage> source =
      new FlowableOnSubscribe<InAppMessage>() {
        @Override
        public void subscribe(FlowableEmitter<InAppMessage> emitter) throws Exception {
          FirebaseInAppMessagingDisplay display =
              new FirebaseInAppMessagingDisplay() {
                @Override
                public void displayMessage(
                    InAppMessage inAppMessage,
                    FirebaseInAppMessagingDisplayCallbacks callbacks) {
                  // Emit first, then record the callbacks for this campaign.
                  emitter.onNext(inAppMessage);
                  Log.i("FIAM", "Putting callback for IAM " + inAppMessage.getCampaignName());
                  callbacksHashMap.put(inAppMessage.getCampaignId(), callbacks);
                }
              };
          instance.setMessageDisplayComponent(display);
        }
      };
  Flowable<InAppMessage> messages = Flowable.create(source, BUFFER);
  return messages.test();
}
 
源代码8 项目: RuntimePermission   文件: RxPermissions.java
/**
 * Requests the given runtime permissions and exposes the outcome as a Flowable.
 *
 * <p>Pass an empty list to request all permissions declared in the manifest.
 * An accepted result is emitted via {@code onNext}; any other result is
 * signalled as an {@code Error} wrapping that result.
 *
 * @param permissions permissions to request
 * @return a Flowable (LATEST backpressure) carrying the permission result
 */
public Flowable<PermissionResult> requestAsFlowable(final List<String> permissions) {
    final FlowableOnSubscribe<PermissionResult> source = new FlowableOnSubscribe<PermissionResult>() {
        @Override
        public void subscribe(final FlowableEmitter<PermissionResult> emitter) throws Exception {
            runtimePermission
                    .request(permissions)
                    .onResponse(new ResponseCallback() {
                        @Override
                        public void onResponse(PermissionResult result) {
                            if (!result.isAccepted()) {
                                emitter.onError(new Error(result));
                                return;
                            }
                            emitter.onNext(result);
                        }
                    })
                    .ask();
        }
    };
    return Flowable.create(source, BackpressureStrategy.LATEST);
}
 
源代码9 项目: CrazyDaily   文件: PhotoPickerDataRepository.java
/**
 * Loads one page of media for a bucket on the io scheduler.
 *
 * <p>The bucket id selects the loader: {@code Integer.MAX_VALUE} (as a string)
 * loads images and videos together, {@code Integer.MIN_VALUE} loads all videos,
 * and any other non-empty id loads the images of that bucket.
 *
 * @param imageOffset offset passed to the image query
 * @param videoOffset offset passed to the video query
 * @param bucketId    bucket selector; an empty or null id yields an empty Flowable
 * @return a Flowable emitting a single response and then completing
 */
@Override
public Flowable<MediaEntity.MediaResponseData> getMediaList(int imageOffset, int videoOffset, String bucketId) {
    if (TextUtils.isEmpty(bucketId)) {
        return Flowable.empty();
    }

    final FlowableOnSubscribe<MediaEntity.MediaResponseData> source;
    if (TextUtils.equals(bucketId, String.valueOf(Integer.MAX_VALUE))) {
        // Images and videos together.
        source = emitter -> {
            emitter.onNext(handleImageAndVideoMediaList(imageOffset, videoOffset));
            emitter.onComplete();
        };
    } else if (TextUtils.equals(bucketId, String.valueOf(Integer.MIN_VALUE))) {
        // All videos.
        source = emitter -> {
            emitter.onNext(handleVideoMediaList(imageOffset, videoOffset));
            emitter.onComplete();
        };
    } else {
        // Images of one specific bucket.
        source = emitter -> {
            emitter.onNext(handleImageMediaList(imageOffset, videoOffset, bucketId));
            emitter.onComplete();
        };
    }
    return Flowable.create(source, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());
}
 
源代码10 项目: SAF-AOP   文件: AsyncAspect.java
/**
 * Runs the intercepted join point inside a Flowable subscribed on the io
 * scheduler, bracketed by {@code Looper.prepare()} and {@code Looper.loop()}.
 *
 * <p>The emitter is never signalled, and the stream is subscribed without any
 * consumers, so nothing is delivered downstream.
 *
 * @param joinPoint the intercepted call to proceed with
 * @throws Throwable declared by the signature; failures of {@code proceed()}
 *     itself are caught and printed inside the subscription
 */
private void asyncMethod(final ProceedingJoinPoint joinPoint) throws Throwable {

        Flowable.create(new FlowableOnSubscribe<Object>() {
                            @Override
                            public void subscribe(FlowableEmitter<Object> e) throws Exception {
                                // Prepare a Looper on the subscribing thread before running the join point.
                                Looper.prepare();
                                try {
                                    joinPoint.proceed();
                                } catch (Throwable throwable) {
                                    // Failures are only printed; they are not forwarded to the emitter.
                                    throwable.printStackTrace();
                                }
                                // NOTE(review): Looper.loop() presumably blocks this io thread until the
                                // looper quits, so this subscription would never finish — confirm intent.
                                Looper.loop();
                            }
                        }
                , BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();
    }
 
源代码11 项目: alchemy   文件: RxJava2Fetchable.java
/**
 * Exposes the result of {@code mCallable} as a Flowable.
 *
 * <p>On subscription the callable is invoked to obtain a closeable iterator,
 * whose elements are emitted until it is exhausted or the subscriber cancels.
 * The iterator is always closed afterwards.
 *
 * @param mode backpressure strategy applied to the created Flowable
 * @return a Flowable over the iterator's elements
 */
@Override
public Flowable<T> flowable(BackpressureStrategy mode) {
    final FlowableOnSubscribe<T> source = new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> emitter) throws Exception {
            final CloseableIterator<T> rows = mCallable.call();
            try {
                while (true) {
                    if (emitter.isCancelled() || !rows.hasNext()) {
                        break;
                    }
                    emitter.onNext(rows.next());
                }
                emitter.onComplete();
            } finally {
                rows.close();
            }
        }
    };
    return Flowable.create(source, mode);
}
 
源代码12 项目: SeeWeather   文件: ChoiceCityActivity.java
/**
 * Queries all provinces (loading them from the database when the in-memory
 * list is still empty) and shows their names in the list.
 */
private void queryProvinces() {
    getToolbar().setTitle("选择省份");

    final FlowableOnSubscribe<String> provinceNames = emitter -> {
        // Hit the database only when nothing has been loaded yet.
        if (provincesList.isEmpty()) {
            provincesList.addAll(WeatherDB.loadProvinces(DBManager.getInstance().getDatabase()));
        }
        dataList.clear();
        for (Province item : provincesList) {
            emitter.onNext(item.mProName);
        }
        emitter.onComplete();
    };

    // NOTE(review): RxUtil.ioF() / activityLifecycleF() presumably apply io
    // scheduling and lifecycle binding — confirm against RxUtil.
    Flowable.create(provinceNames, BackpressureStrategy.BUFFER)
        .compose(RxUtil.ioF())
        .compose(RxUtil.activityLifecycleF(this))
        .doOnNext(name -> dataList.add(name))
        .doOnComplete(() -> {
            mProgressBar.setVisibility(View.GONE);
            currentLevel = LEVEL_PROVINCE;
            mAdapter.notifyDataSetChanged();
        })
        .subscribe();
}
 
源代码13 项目: SeeWeather   文件: ChoiceCityActivity.java
/**
 * Queries all cities of the selected province from the database and shows
 * their names in the list.
 */
private void queryCities() {
    getToolbar().setTitle("选择城市");
    // Empty the list right away so stale entries are not shown while loading.
    dataList.clear();
    mAdapter.notifyDataSetChanged();

    final FlowableOnSubscribe<String> cityNames = emitter -> {
        cityList = WeatherDB.loadCities(DBManager.getInstance().getDatabase(), selectedProvince.mProSort);
        for (City item : cityList) {
            emitter.onNext(item.mCityName);
        }
        emitter.onComplete();
    };

    // NOTE(review): RxUtil.ioF() / activityLifecycleF() presumably apply io
    // scheduling and lifecycle binding — confirm against RxUtil.
    Flowable.create(cityNames, BackpressureStrategy.BUFFER)
        .compose(RxUtil.ioF())
        .compose(RxUtil.activityLifecycleF(this))
        .doOnNext(name -> dataList.add(name))
        .doOnComplete(() -> {
            currentLevel = LEVEL_CITY;
            mAdapter.notifyDataSetChanged();
            mRecyclerView.smoothScrollToPosition(0);
        })
        .subscribe();
}
 
源代码14 项目: RxCache   文件: RxCache.java
/**
 * Loads a cached entry and exposes it as a Flowable.
 *
 * <p>The cache is looked up under the MD5 digest of {@code key}. If the
 * subscriber has already cancelled, nothing is signalled. Otherwise a found
 * entry is emitted followed by completion, and a missing entry is signalled as
 * a {@link NullPointerException}.
 *
 * @param key                  cache key (hashed before lookup)
 * @param type                 type of the cached value
 * @param backpressureStrategy strategy applied to the created Flowable
 * @param <T>                  type of the cached value
 * @return a Flowable carrying the cache result
 */
public <T> Flowable<CacheResult<T>> load2Flowable(final String key, final Type type, BackpressureStrategy backpressureStrategy) {
    final FlowableOnSubscribe<CacheResult<T>> source = new FlowableOnSubscribe<CacheResult<T>>() {
        @Override
        public void subscribe(FlowableEmitter<CacheResult<T>> emitter) throws Exception {
            final CacheResult<T> cached = cacheCore.load(getMD5MessageDigest(key), type);
            if (emitter.isCancelled()) {
                return;
            }
            if (cached == null) {
                emitter.onError(new NullPointerException("Not find the key corresponding to the cache"));
                return;
            }
            emitter.onNext(cached);
            emitter.onComplete();
        }
    };
    return Flowable.create(source, backpressureStrategy);
}
 
源代码15 项目: actor4j-core   文件: ActorMessageFlowable.java
/**
 * Drains messages from {@code stash} into a Flowable.
 *
 * <p>Messages are polled and emitted until the subscriber cancels, the number
 * emitted reaches {@code emitter.requested()}, or the stash is empty. The
 * stream then completes unless it was cancelled. Any exception raised while
 * draining is forwarded via {@code onError}.
 *
 * @param stash queue to poll messages from
 * @return a Flowable (BUFFER backpressure) over the drained messages
 */
public static Flowable<ActorMessage<?>> getMessages(final Queue<ActorMessage<?>> stash) {
	final FlowableOnSubscribe<ActorMessage<?>> source = new FlowableOnSubscribe<ActorMessage<?>>() {
		@Override
		public void subscribe(FlowableEmitter<ActorMessage<?>> emitter) throws Exception {
			try {
				int emitted = 0;
				while (!emitter.isCancelled() && emitted < emitter.requested()) {
					final ActorMessage<?> next = stash.poll();
					if (next == null) {
						break;
					}
					emitter.onNext(next);
					emitted++;
				}

				if (!emitter.isCancelled()) {
					emitter.onComplete();
				}
			}
			catch (Exception failure) {
				emitter.onError(failure);
			}
		}
	};
	return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
/**
 * Sends a chat message to the server over the socket.
 *
 * <p>On subscription the message text is emitted on the socket as
 * {@code EVENT_NEW_MESSAGE} and the same {@link ChatMessage} is then emitted
 * downstream. No completion is signalled.
 *
 * @param chatMessage the message to send
 * @return a Flowable emitting {@code chatMessage} after it has been sent
 */
@Override
public Flowable<ChatMessage> sendMessage(@NonNull final ChatMessage chatMessage) {
    final FlowableOnSubscribe<ChatMessage> source = new FlowableOnSubscribe<ChatMessage>() {
        @Override
        public void subscribe(FlowableEmitter<ChatMessage> emitter) throws Exception {
            /*
             * Socket.io supports acknowledging messages. That feature could be
             * used like this:
             *
             * mSocket.emit("EVENT_NEW_MESSAGE", chatMessage.getMessage(), new Ack() {
             *   @Override
             *   public void call(Object... args) {
             *       // Do something with args
             *
             *       // On success
             *       emitter.onNext(chatMessage);
             *
             *       // On error
             *       emitter.onError(new Exception("Sending message failed."));
             *    }
             * });
             */
            mSocket.emit(EVENT_NEW_MESSAGE, chatMessage.getMessage());
            emitter.onNext(chatMessage);
        }
    };
    return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
源代码17 项目: RxBus   文件: RxBus.java
/**
 * Registers {@code callback} for events of its type under {@code tag}.
 *
 * <p>When {@code isSticky} is set and a cached sticky event exists for the type
 * and tag, that event is delivered first through its own subscription. The
 * regular subscription is then created. Both disposables are recorded against
 * {@code subscriber}.
 *
 * @param subscriber owner the disposables are registered under
 * @param tag        event tag
 * @param isSticky   whether to replay a cached sticky event
 * @param scheduler  scheduler to observe on; may be {@code null}
 * @param callback   receiver of the events
 * @param <T>        event type
 */
private <T> void subscribe(final Object subscriber,
                           final String tag,
                           final boolean isSticky,
                           final Scheduler scheduler,
                           final Callback<T> callback) {
    Utils.requireNonNull(subscriber, tag, callback);

    final Class<T> typeClass = Utils.getTypeClassFromParadigm(callback);

    final Consumer<T> eventConsumer = new Consumer<T>() {
        @Override
        public void accept(T event) {
            callback.onEvent(event);
        }
    };

    if (isSticky) {
        final TagMessage stickyEvent = CacheUtils.getInstance().findStickyEvent(typeClass, tag);
        if (stickyEvent == null) {
            Utils.logW("sticky event is empty.");
        } else {
            final Flowable<T> replay = Flowable.create(new FlowableOnSubscribe<T>() {
                @Override
                public void subscribe(FlowableEmitter<T> emitter) {
                    emitter.onNext(typeClass.cast(stickyEvent.mEvent));
                }
            }, BackpressureStrategy.LATEST);
            final Flowable<T> observedReplay =
                    scheduler == null ? replay : replay.observeOn(scheduler);
            final Disposable stickyDisposable =
                    FlowableUtils.subscribe(observedReplay, eventConsumer, mOnError);
            CacheUtils.getInstance().addDisposable(subscriber, stickyDisposable);
        }
    }

    final Disposable regularDisposable = FlowableUtils.subscribe(
            toFlowable(typeClass, tag, scheduler), eventConsumer, mOnError
    );
    CacheUtils.getInstance().addDisposable(subscriber, regularDisposable);
}
 
源代码18 项目: AcgClub   文件: ScheduleMainModel.java
/**
 * Downloads the page at {@link HtmlConstant#YHDM_M_URL} with Jsoup and emits it
 * parsed into a {@link DilidiliInfo}.
 *
 * @return a Flowable that emits the parsed info once and then completes, or
 *     signals an error when the fetched document is {@code null}
 */
@Override
public Flowable<DilidiliInfo> getDilidiliInfo() {
  final FlowableOnSubscribe<DilidiliInfo> source = new FlowableOnSubscribe<DilidiliInfo>() {
    @Override
    public void subscribe(@NonNull FlowableEmitter<DilidiliInfo> emitter) throws Exception {
      final Element document = Jsoup.connect(HtmlConstant.YHDM_M_URL).get();
      if (document == null) {
        emitter.onError(new Throwable("element html is null"));
        return;
      }
      final DilidiliInfo dilidiliInfo = JP.from(document, DilidiliInfo.class);
      // Disabled: filtering of schedule items and banners by link/image.
      /*Iterator<ScheduleWeek> scheudleWeekIterator = dilidiliInfo.getScheduleWeek().iterator();
      while (scheudleWeekIterator.hasNext()) {
        ScheduleWeek scheduleWeek = scheudleWeekIterator.next();
        Iterator<ScheduleWeek.ScheduleItem> scheduleItemIterator = scheduleWeek
            .getScheduleItems().iterator();
        while (scheduleItemIterator.hasNext()) {
          ScheduleWeek.ScheduleItem scheduleItem = scheduleItemIterator.next();
          if (scheduleItem.getAnimeLink().contains("www.005.tv")) {
            scheduleItemIterator.remove();
          }
        }
      }
      Iterator<ScheduleBanner> scheudleBannerIterator = dilidiliInfo.getScheduleBanners()
          .iterator();
      while (scheudleBannerIterator.hasNext()) {
        ScheduleBanner scheudleBanner = scheudleBannerIterator.next();
        if (TextUtils.isEmpty(scheudleBanner.getImgUrl()) |
            TextUtils.isEmpty(scheudleBanner.getAnimeLink()) |
            !scheudleBanner.getAnimeLink().contains("anime")) {
          scheudleBannerIterator.remove();
        }
      }*/
      emitter.onNext(dilidiliInfo);
      emitter.onComplete();
    }
  };
  return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
源代码19 项目: AcgClub   文件: RxRealmUtils.java
/**
 * Creates a Flowable whose subscription body runs against a Realm instance.
 *
 * <p>On subscription a Realm is opened for {@code configuration}, handed to
 * {@code emitter} together with the downstream emitter, and closed when the
 * consumer returns or throws.
 *
 * @param configuration configuration used to open the Realm
 * @param emitter       consumer receiving the (emitter, realm) pair; it is
 *                      responsible for signalling the downstream emitter
 * @param <T>           element type of the resulting Flowable
 * @return a Flowable (BUFFER backpressure) driven by {@code emitter}
 */
public static <T> Flowable<T> flowableExec(final RealmConfiguration configuration,
    final Consumer<Pair<FlowableEmitter, Realm>> emitter) {
  final FlowableOnSubscribe<T> source = new FlowableOnSubscribe<T>() {
    @Override
    public void subscribe(FlowableEmitter<T> downstream) throws Exception {
      try (Realm realm = Realm.getInstance(configuration)) {
        final Pair<FlowableEmitter, Realm> context =
            new Pair<FlowableEmitter, Realm>(downstream, realm);
        emitter.accept(context);
      }
    }
  };
  return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
源代码20 项目: AcgClub   文件: RxUtil.java
/**
 * Wraps a single value in a Flowable.
 *
 * @param t   the value to emit
 * @param <T> type of the value
 * @return a Flowable that emits {@code t} and completes; an exception raised
 *     while emitting is forwarded via {@code onError}
 */
public static <T> Flowable<T> createData(final T t) {
  final FlowableOnSubscribe<T> source = new FlowableOnSubscribe<T>() {
    @Override
    public void subscribe(FlowableEmitter<T> downstream) throws Exception {
      try {
        downstream.onNext(t);
        downstream.onComplete();
      } catch (Exception failure) {
        downstream.onError(failure);
      }
    }
  };
  return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
源代码21 项目: ShareLoginPayUtil   文件: WxShareInstance.java
/**
 * Decodes and compresses the share image on the io scheduler, then hands the
 * resulting thumbnail bytes to {@code handleShareWx} on the main thread.
 *
 * <p>On failure the listener is notified, this instance is recycled and the
 * activity is finished.
 *
 * @param platform         target platform identifier
 * @param title            share title
 * @param targetUrl        link to share
 * @param summary          share summary
 * @param miniId           mini-program id
 * @param miniPath         mini-program path
 * @param shareImageObject image to decode and compress
 * @param activity         hosting activity; finished on failure
 * @param listener         receiver of the failure callback
 */
@SuppressLint("CheckResult")
private void shareFunc(final int platform, final String title, final String targetUrl
        , final String summary, final String miniId, final String miniPath
        , final ShareImageObject shareImageObject
        , final Activity activity, final ShareListener listener) {

    mShareFunc = Flowable.create(new FlowableOnSubscribe<byte[]>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<byte[]> emitter) {
            try {
                String imagePath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(ImageDecoder.compress2Byte(imagePath, TARGET_SIZE, THUMB_SIZE));
                // Terminate the single-item stream, as the sibling share flows do;
                // otherwise the subscription stays open until disposed explicitly.
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<byte[]>() {
                @Override
                public void accept(@NonNull byte[] bytes) {
                    handleShareWx(platform, title, targetUrl, summary
                            , bytes, miniId, miniPath, listener);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) {
                    listener.shareFailure(new Exception(throwable));
                    recycle();
                    activity.finish();
                }
            });
}
 
源代码22 项目: ShareLoginPayUtil   文件: WxShareInstance.java
/**
 * Shares an image to WeChat.
 *
 * <p>The image is decoded and a compressed thumbnail is produced on the io
 * scheduler; the bitmap and thumbnail are then wrapped in a
 * {@code WXMediaMessage} and sent on the main thread. On failure the listener
 * is notified, this instance is recycled and the activity is finished.
 *
 * @param platform         target platform identifier
 * @param shareImageObject image to share
 * @param activity         hosting activity; finished on failure
 * @param listener         receiver of the failure callback
 */
@SuppressLint("CheckResult")
@Override
public void shareImage(final int platform, final ShareImageObject shareImageObject,
                       final Activity activity, final ShareListener listener) {

    mShareImage = Flowable.create(new FlowableOnSubscribe<Pair<Bitmap, byte[]>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<Pair<Bitmap, byte[]>> emitter) {
            try {
                String imagePath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(Pair.create(BitmapFactory.decodeFile(imagePath),
                        ImageDecoder.compress2Byte(imagePath, TARGET_SIZE, THUMB_SIZE)));
                // Terminate the single-item stream, as the sibling share flows do;
                // otherwise the subscription stays open until disposed explicitly.
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Pair<Bitmap, byte[]>>() {
                @Override
                public void accept(@NonNull Pair<Bitmap, byte[]> pair) {
                    WXImageObject imageObject = new WXImageObject(pair.first);

                    WXMediaMessage message = new WXMediaMessage();
                    message.mediaObject = imageObject;
                    message.thumbData = pair.second;

                    sendMessage(platform, message, buildTransaction("image"));
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) {
                    listener.shareFailure(new Exception(throwable));
                    recycle();
                    activity.finish();
                }
            });
}
 
源代码23 项目: ShareLoginPayUtil   文件: WeiboShareInstance.java
/**
 * Decodes and compresses the share image on the io scheduler, then passes the
 * (path, bytes) pair to {@code handleShare} on the main thread.
 *
 * <p>On failure the listener is notified, this instance is recycled and the
 * activity is finished.
 *
 * @param shareImageObject image to decode and compress
 * @param title            share title
 * @param targetUrl        link to share
 * @param summary          share summary
 * @param activity         hosting activity; finished on failure
 * @param listener         receiver of the failure callback
 */
@SuppressLint("CheckResult")
private void shareTextOrImage(final ShareImageObject shareImageObject, final String title, final String targetUrl, final String summary,
                              final Activity activity, final ShareListener listener) {

    mShareImage = Flowable.create(new FlowableOnSubscribe<Pair<String, byte[]>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<Pair<String, byte[]>> emitter) {
            try {
                String path = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(Pair.create(path, ImageDecoder.compress2Byte(path, TARGET_SIZE, TARGET_LENGTH)));
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Pair<String, byte[]>>() {
                @Override
                public void accept(@NonNull Pair<String, byte[]> stringPair) {
                    handleShare(stringPair, title, targetUrl, summary, listener);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) {
                    // Keep the same message but attach the original failure as the
                    // cause so its stack trace is not lost.
                    listener.shareFailure(new Exception(throwable.getMessage(), throwable));
                    recycle();
                    activity.finish();
                }
            });
}
 
源代码24 项目: ShareLoginPayUtil   文件: WeiboShareInstance.java
/**
 * Passes an already prepared (path, bytes) pair through an io-subscribed
 * Flowable and hands it to {@code handleShare} on the main thread.
 *
 * <p>On failure the listener is notified, this instance is recycled and the
 * activity is finished.
 *
 * @param shareImageObject prepared image path and thumbnail bytes
 * @param title            share title
 * @param targetUrl        link to share
 * @param summary          share summary
 * @param activity         hosting activity; finished on failure
 * @param listener         receiver of the failure callback
 */
@SuppressLint("CheckResult")
private void shareTextOrImage(final Pair<String, byte[]> shareImageObject, final String title, final String targetUrl, final String summary,
                              final Activity activity, final ShareListener listener) {

    final FlowableOnSubscribe<Pair<String, byte[]>> source = new FlowableOnSubscribe<Pair<String, byte[]>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<Pair<String, byte[]>> downstream) {
            try {
                downstream.onNext(shareImageObject);
                downstream.onComplete();
            } catch (Exception failure) {
                downstream.onError(failure);
            }
        }
    };
    final Consumer<Pair<String, byte[]>> onPrepared = new Consumer<Pair<String, byte[]>>() {
        @Override
        public void accept(@NonNull Pair<String, byte[]> prepared) {
            handleShare(prepared, title, targetUrl, summary, listener);
        }
    };
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable failure) {
            listener.shareFailure(new Exception(failure));
            recycle();
            activity.finish();
        }
    };

    mShareImage = Flowable.create(source, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onPrepared, onFailure);
}
 
源代码25 项目: ShareLoginPayUtil   文件: QQShareInstance.java
/**
 * Resolves the image path on the io scheduler and shares media to QQ or QZone
 * on the main thread.
 *
 * <p>When {@code immediate} is set the image's own path or URL is used as is;
 * otherwise the image is decoded first. On failure the listener is notified,
 * this instance is recycled and the activity is finished.
 *
 * @param platform         target platform; {@code SharePlatform.QZONE} selects QZone
 * @param title            share title
 * @param targetUrl        link to share
 * @param summary          share summary
 * @param shareImageObject image to share
 * @param immediate        whether to skip decoding and use the path/URL directly
 * @param activity         hosting activity; finished on failure
 * @param listener         receiver of share callbacks
 */
@SuppressLint("CheckResult")
private void shareFunc(final int platform, final String title, final String targetUrl, final String summary,
                       final ShareImageObject shareImageObject, final boolean immediate, final Activity activity, final ShareListener listener) {
    final FlowableOnSubscribe<String> source = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<String> downstream) {
            try {
                final String image = immediate
                        ? shareImageObject.getPathOrUrl()
                        : ImageDecoder.decode(activity, shareImageObject);
                downstream.onNext(image);
                downstream.onComplete();
            } catch (Exception failure) {
                downstream.onError(failure);
            }
        }
    };
    final Consumer<String> onImageReady = new Consumer<String>() {
        @Override
        public void accept(@NonNull String image) {
            if (platform != SharePlatform.QZONE) {
                shareToQQForMedia(title, summary, targetUrl, image, activity, listener);
                return;
            }
            shareToQZoneForMedia(title, targetUrl, summary, image, activity, listener);
        }
    };
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable failure) throws Exception {
            listener.shareFailure(new Exception(failure));
            recycle();
            activity.finish();
        }
    };

    mShareFunc = Flowable.create(source, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onImageReady, onFailure);
}
 
源代码26 项目: ShareLoginPayUtil   文件: QQShareInstance.java
/**
 * Decodes the image on the io scheduler and shares it to QQ or QZone on the
 * main thread.
 *
 * <p>On failure the listener is notified, this instance is recycled and the
 * activity is finished.
 *
 * @param platform         target platform; {@code SharePlatform.QZONE} selects QZone
 * @param shareImageObject image to share
 * @param activity         hosting activity; finished on failure
 * @param listener         receiver of share callbacks
 */
@SuppressLint("CheckResult")
@Override
public void shareImage(final int platform, final ShareImageObject shareImageObject,
                       final Activity activity, final ShareListener listener) {
    final FlowableOnSubscribe<String> source = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<String> downstream) throws Exception {
            try {
                final String decodedPath = ImageDecoder.decode(activity, shareImageObject);
                downstream.onNext(decodedPath);
                downstream.onComplete();
            } catch (Exception failure) {
                downstream.onError(failure);
            }
        }
    };
    final Consumer<String> onDecoded = new Consumer<String>() {
        @Override
        public void accept(@NonNull String localPath) throws Exception {
            if (platform != SharePlatform.QZONE) {
                shareToQQForImage(localPath, activity, listener);
                return;
            }
            shareToQzoneForImage(localPath, activity, listener);
        }
    };
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable failure) throws Exception {
            listener.shareFailure(new Exception(failure));
            recycle();
            activity.finish();
        }
    };

    mShareImage = Flowable.create(source, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onDecoded, onFailure);
}
 
源代码27 项目: ShareLoginPayUtil   文件: QQLoginInstance.java
/**
 * Fetches the QQ user profile for {@code token} on the io scheduler and
 * reports the outcome to {@code mLoginListener} on the main thread.
 *
 * <p>I/O and JSON failures are logged and forwarded as errors. In both the
 * success and failure paths {@code LoginUtil.recycle()} is called after the
 * listener has been notified.
 *
 * @param token token identifying the logged-in user
 */
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
    final FlowableOnSubscribe<QQUser> source = new FlowableOnSubscribe<QQUser>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<QQUser> downstream) {
            final OkHttpClient httpClient = new OkHttpClient();
            final Request userInfoRequest =
                    new Request.Builder().url(buildUserInfoUrl(token, URL)).build();

            try {
                final Response response = httpClient.newCall(userInfoRequest).execute();
                final JSONObject payload = new JSONObject(response.body().string());
                downstream.onNext(QQUser.parse(token.getOpenid(), payload));
                downstream.onComplete();
            } catch (IOException | JSONException failure) {
                ShareLogger.e(ShareLogger.INFO.FETCH_USER_INOF_ERROR);
                downstream.onError(failure);
            }
        }
    };
    final Consumer<QQUser> onUser = new Consumer<QQUser>() {
        @Override
        public void accept(@NonNull QQUser qqUser) {
            mLoginListener.loginSuccess(
                    new LoginResultData(LoginPlatform.QQ, token, qqUser));
            LoginUtil.recycle();
        }
    };
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable failure) {
            mLoginListener.loginFailure(new Exception(failure), ShareLogger.INFO.ERR_FETCH_CODE);
            LoginUtil.recycle();
        }
    };

    mSubscribe = Flowable.create(source, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onUser, onFailure);
}
 
源代码28 项目: ShareLoginPayUtil   文件: WeiboLoginInstance.java
/**
 * Fetches the Weibo user profile for {@code token} on the io scheduler and
 * reports the outcome to {@code mLoginListener} on the main thread.
 *
 * <p>I/O and JSON failures are logged and forwarded as errors. In both the
 * success and failure paths {@code LoginUtil.recycle()} is called after the
 * listener has been notified.
 *
 * @param token token identifying the logged-in user
 */
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
    final FlowableOnSubscribe<WeiboUser> source = new FlowableOnSubscribe<WeiboUser>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<WeiboUser> downstream) {
            final OkHttpClient httpClient = new OkHttpClient();
            final Request userInfoRequest =
                    new Request.Builder().url(buildUserInfoUrl(token, USER_INFO)).build();
            try {
                final Response response = httpClient.newCall(userInfoRequest).execute();
                final JSONObject payload = new JSONObject(response.body().string());
                downstream.onNext(WeiboUser.parse(payload));
                downstream.onComplete();
            } catch (IOException | JSONException failure) {
                ShareLogger.e(ShareLogger.INFO.FETCH_USER_INOF_ERROR);
                downstream.onError(failure);
            }
        }
    };
    final Consumer<WeiboUser> onUser = new Consumer<WeiboUser>() {
        @Override
        public void accept(@NonNull WeiboUser weiboUser) throws Exception {
            mLoginListener.loginSuccess(
                    new LoginResultData(LoginPlatform.WEIBO, token, weiboUser));
            LoginUtil.recycle();
        }
    };
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable failure) throws Exception {
            mLoginListener.loginFailure(new Exception(failure), ShareLogger.INFO.ERR_FETCH_CODE);
            LoginUtil.recycle();
        }
    };

    mSubscribe = Flowable.create(source, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onUser, onFailure);
}
 
源代码29 项目: ShareLoginPayUtil   文件: TwitterLoginInstance.java
/**
 * Verifies the Twitter credentials on the io scheduler and reports the
 * resulting user to {@code mLoginListener} on the main thread.
 *
 * <p>Any exception from the call is logged and forwarded as an error. In both
 * the success and failure paths {@code LoginUtil.recycle()} is called after the
 * listener has been notified.
 *
 * @param token token passed through to the login result
 */
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
    final FlowableOnSubscribe<TwitterUser> source = new FlowableOnSubscribe<TwitterUser>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<TwitterUser> downstream) {

            final TwitterApiClient apiClient = TwitterCore.getInstance().getApiClient();
            final Call<User> verifyCall =
                    apiClient.getAccountService().verifyCredentials(true, false, false);

            try {
                final Response<User> verified = verifyCall.execute();
                downstream.onNext(new TwitterUser(verified.body()));
                downstream.onComplete();
            } catch (Exception failure) {
                ShareLogger.e(ShareLogger.INFO.FETCH_USER_INOF_ERROR);
                downstream.onError(failure);
            }
        }
    };
    final Consumer<TwitterUser> onUser = new Consumer<TwitterUser>() {
        @Override
        public void accept(@NonNull TwitterUser user) {
            mLoginListener.loginSuccess(new LoginResultData(LoginPlatform.TWITTER, token, user));
            LoginUtil.recycle();
        }
    };
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable failure) {
            mLoginListener.loginFailure(new Exception(failure), ShareLogger.INFO.ERR_FETCH_CODE);
            LoginUtil.recycle();
        }
    };

    mSubscribe = Flowable.create(source, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onUser, onFailure);
}
 
源代码30 项目: ShareLoginPayUtil   文件: WxLoginInstance.java
/**
 * Exchanges an authorization {@code code} for a WeChat token on the io
 * scheduler and continues the login flow on the main thread.
 *
 * <p>On success, when {@code mFetchUserInfo} is set the listener is told a
 * user-info fetch is about to start and {@code fetchUserInfo} is invoked;
 * otherwise the login is reported as successful and {@code LoginUtil.recycle()}
 * is called. On failure the listener is notified and
 * {@code LoginUtil.recycle()} is called.
 *
 * @param code authorization code used to build the token URL
 */
@SuppressLint("CheckResult")
private void getToken(final String code) {
    mTokenSubscribe = Flowable.create(new FlowableOnSubscribe<WxToken>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<WxToken> wxTokenEmitter) {
            Request request = new Request.Builder().url(buildTokenUrl(code)).build();
            try {
                Response response = mClient.newCall(request).execute();
                JSONObject jsonObject = new JSONObject(response.body().string());
                WxToken token = new WxToken(jsonObject);
                wxTokenEmitter.onNext(token);
                wxTokenEmitter.onComplete();
            } catch (IOException | JSONException e) {
                wxTokenEmitter.onError(e);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<WxToken>() {
                @Override
                public void accept(@NonNull WxToken wxToken) {
                    if (mFetchUserInfo) {
                        mLoginListener.beforeFetchUserInfo(wxToken);
                        fetchUserInfo(wxToken);
                    } else {
                        mLoginListener.loginSuccess(new LoginResultData(LoginPlatform.WX, wxToken));
                        LoginUtil.recycle();
                    }

                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) {
                    // Keep the same message but attach the original failure as the
                    // cause so its stack trace is not lost.
                    mLoginListener.loginFailure(new Exception(throwable.getMessage(), throwable), ShareLogger.INFO.ERR_GET_TOKEN_CODE);
                    LoginUtil.recycle();
                }
            });
}
 
 类所在包
 同包方法