RxJava 由浅入深之 Single 串行执行 (一)
本文的代码是在IntellJ环境中测试的,可以直接copy过去运行看看
IntellJ新建Gradle工程,引入依赖
implementation "io.reactivex.rxjava3:rxjava:3.0.0"
1.简单用法
在RxJava
中,Single
可以描述只发送一次事件值的事件,如一次网络请求,它有2个可订阅接口,onSuccess
和onError
,在任何一个接口触发后会自动解除订阅关系。
创建Single:
Single<String> single = Single.create(emitter -> {
emitter.onSuccess("ok");
});
订阅single:
single.subscribe(System.out::println);
2.多个Single串行执行
首先创建多个Single
,task1()
和task2()
返回2个Single
:
public Single<String> task1() {
return Single.create((SingleOnSubscribe<String>) em -> {
em.onSuccess("task-1");
}).doOnSuccess(s -> {
System.out.println("发射 ->" + s);
});
}
public Single<String> task2() {
return Single.create((SingleOnSubscribe<String>) em -> {
em.onSuccess("task-2");
}).doOnSuccess(s -> {
System.out.println("发射 ->" + s);
});
}
声明Single
任务:
Single<String> single1 = task1();
Single<String> single2 = task2();
工作线程,可以使用Schedulers.io()
或其他调度器,这里用一个单线程线程池,方便在同一个线程中打印日志。
Executor executor = Executors.newSingleThreadExecutor();
场景一:无条件串行
Single.concat(single1, single2)
.subscribeOn(Schedulers.from(executors))
.subscribe(s -> {
System.out.println("接收到->" + s);
});
执行结果:
发射 ->task-1
接收到->task-1
发射 ->task-2
接收到->task-2
场景二:根据前一个Single
的结果,判断是否执行后一个任务
如先从数据库缓存中读取数据,如果没有再从网络进行请求。
2.1 条件判断操作符实现串行
Single.concat(single1, single2)
.subscribeOn(Schedulers.from(executors))
.takeUntil(s -> {
return !"".equals(s);
})
.filter(s -> {
return !"".equals(s);
})
.subscribe(s -> {
System.out.println("接收到->" + s);
});
执行结果:
发射 ->task-1
接收到->task-1
这里使用takeUntil
和filter
操作符实现条件判断和过滤不需要的请求结果。
takeUntil
在条件返回true时,不再执行下一个Single
,并直接返回结果;如果takeUntil
条件返回false,其发射的值在filter
操作符中被过滤掉,并继续执行下一个Single
。
提供默认返回值和异常处理 如果多个
Single
的执行结果都被过滤,则订阅者将收不到任何值,这时可以提供一个默认返回值;
使用single()
操作符提供结果为空时的默认返回值;
使用onErrorReturn()
操作符处理异常;
Single.concat(single1, single2)
.subscribeOn(Schedulers.from(executors))
.takeUntil(s -> {
return false;//此处测试全都执行
})
.filter(s -> {
return true;//此处测试全都不满足过滤条件的情况
})
.single("default-value")
.onErrorReturn(Throwable::getLocalizedMessage)
.subscribe(s -> {
System.out.println("接收到->" + s);
});
执行结果:
发射 ->task-1
发射 ->task-2
接收到->default
2.2 阻塞的方式实现串行 (不建议使用)
使用blockingGet
阻塞工作线程,该方式会使线程阻塞而非挂起,在线程完成前系统其他资源无法使用该线程。
这里很像Kotlin
中的协程写法,但是协程是非阻塞的,而这里是会阻塞当前线程。
String str1 = single1.subscribeOn(Schedulers.from(executors)).blockingGet();
if (!"success".equals(str1)) {
single2.subscribeOn(Schedulers.from(executors))
.subscribe(System.out::println);
}
执行结果:
发射 ->task-1
发射 ->task-2
task-2
2.3 flatMap
操作符实现串行
在flatMap的接收函数中 对Single1
的结果进行判断,如果不需要执行其他请求,直接使用Single.just()
将结果返回给订阅者,或者返回其他的Single
流程。
single1.flatMap(
s -> {
if (!s.contains("success"))
return single2;
else return Single.just(s);
}
).subscribe(System.out::println);
执行结果:
发射 ->task-1
发射 ->task-2
task-2
转载自:https://juejin.cn/post/7119443402046832670