RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作
地址:http://www.jianshu.com/p/fa1828d70192
声明:本文是泽毛原创,已获其授权发布,未经原作者允许请勿转载
一、示例
1.1 应用场景
今天,我们介绍一种新的场景:轮询操作。也就是说,我们会每间隔一段时间就向服务器发起一次请求。在使用 `RxJava` 之前,该需求的实现一般有两种方式:
- 通过 `Handler` 发送延时消息,在 `handleMessage` 中请求服务器之后,再次发送一个延时消息,直到达到循环次数为止。
- 使用 `Java` 提供的定时器 `Timer`。
我们尝试使用 `RxJava2` 提供的操作符来实现这一需求,这里演示两种方式的轮询,并将每种方式的轮询总次数限制在 `5` 次:
- 固定时延:使用 `intervalRange` 操作符,每间隔 `3s` 执行一次任务。
- 变长时延:使用 `repeatWhen` 操作符实现,第一次执行完任务后,等待 `4s` 再执行第二次任务,在第二次任务执行完成后,等待 `5s`,依次递增。
1.2 示例
public class PollingActivity extends AppCompatActivity {
private static final String TAG = PollingActivity.class.getSimpleName();
private TextView mTvSimple;
private TextView mTvAdvance;
private CompositeDisposable mCompositeDisposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_polling);
mTvSimple = (TextView) findViewById(R.id.tv_simple);
mTvSimple.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
startSimplePolling();
mTvAdvance = (TextView) findViewById(R.id.tv_advance);
mTvAdvance.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
startAdvancePolling();
mCompositeDisposable = new CompositeDisposable();
private void startSimplePolling() {
Log.d(TAG, "startSimplePolling");
Observable<Long> observable = Observable.intervalRange(0, 5, 0
, 3000, TimeUnit.MILLISECONDS).take(5).doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
doWork();
DisposableObserver<Long> disposableObserver = getDisposableObserver();
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(disposableObserver);
mCompositeDisposable.add(disposableObserver);
private void startAdvancePolling() {
Log.d(TAG, "startAdvancePolling click");
Observable<Long> observable = Observable.just(0L)
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
doWork();
}).repeatWhen(new Function<Observable<Object>
, ObservableSource<Long>>() {
private long mRepeatCount;
@Override
public ObservableSource<Long> apply(Observable<Object> o
bjectObservable) throws Exception {
return objectObservable.flatMap(new Function<Object
, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Object o)
throws Exception {
if (++mRepeatCount > 4) {
return Observable.error(new
Throwable("Polling work finished"));
Log.d(TAG, "startAdvancePolling apply");
return Observable.timer(3000 + mRepeatCount * 1000
, TimeUnit.MILLISECONDS);
DisposableObserver<Long> disposableObserver = getDisposableObserver();
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(disposableObserver);
mCompositeDisposable.add(disposableObserver);
private DisposableObserver<Long> getDisposableObserver() {
return new DisposableObserver<Long>() {
@Override
public void onNext(Long aLong) {}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "DisposableObserver onError, threadId="
+ Thread.currentThread().getId() + ",reason=" + throwable.getMessage());
@Override
public void onComplete() {
Log.d(TAG, "DisposableObserver onComplete, threadId="
+ Thread.currentThread().getId());
private void doWork() {
long workTime = (long) (Math.random() * 500) + 500;
try {
Log.d(TAG, "doWork start, threadId=" +
Thread.currentThread().getId());
Thread.sleep(workTime);
Log.d(TAG, "doWork finished");
} catch (InterruptedException e) {
e.printStackTrace();
@Override
protected void onDestroy() {
super.onDestroy();
mCompositeDisposable.clear();