likes
comments
collection
share

RxJava 由浅入深之 Single 串行执行 (一)

作者站长头像
站长
· 阅读数 71

本文的代码是在IntellJ环境中测试的,可以直接copy过去运行看看

IntellJ新建Gradle工程,引入依赖

implementation "io.reactivex.rxjava3:rxjava:3.0.0"

1.简单用法

RxJava中,Single可以描述只发送一次事件值的事件,如一次网络请求,它有2个可订阅接口,onSuccessonError,在任何一个接口触发后会自动解除订阅关系。

创建Single:

Single<String> single = Single.create(emitter -> {
    emitter.onSuccess("ok");
});

订阅single:

single.subscribe(System.out::println);

2.多个Single串行执行

首先创建多个Single,task1()task2() 返回2个Single

public Single<String> task1() {
    return Single.create((SingleOnSubscribe<String>) em -> {
        em.onSuccess("task-1");
    }).doOnSuccess(s -> {
        System.out.println("发射 ->" + s);
    });
}

public Single<String> task2() {
    return Single.create((SingleOnSubscribe<String>) em -> {
        em.onSuccess("task-2");
    }).doOnSuccess(s -> {
        System.out.println("发射 ->" + s);
    });
}

声明Single任务:

Single<String> single1 = task1();
Single<String> single2 = task2();

工作线程,可以使用Schedulers.io()或其他调度器,这里用一个单线程线程池,方便在同一个线程中打印日志。

Executor executor = Executors.newSingleThreadExecutor();

场景一:无条件串行

Single.concat(single1, single2)
        .subscribeOn(Schedulers.from(executors))
        .subscribe(s -> {
            System.out.println("接收到->" + s);
        });

执行结果:

发射 ->task-1
接收到->task-1
发射 ->task-2
接收到->task-2

场景二:根据前一个Single的结果,判断是否执行后一个任务

如先从数据库缓存中读取数据,如果没有再从网络进行请求。

2.1 条件判断操作符实现串行

Single.concat(single1, single2)
        .subscribeOn(Schedulers.from(executors))
        .takeUntil(s -> {
            return !"".equals(s);
        })
        .filter(s -> {
            return !"".equals(s);
        })
        .subscribe(s -> {
            System.out.println("接收到->" + s);
        });

执行结果:

发射 ->task-1
接收到->task-1

这里使用takeUntilfilter操作符实现条件判断和过滤不需要的请求结果。

takeUntil在条件返回true时,不再执行下一个Single,并直接返回结果;如果takeUntil条件返回false,其发射的值在filter操作符中被过滤掉,并继续执行下一个Single

提供默认返回值和异常处理 如果多个Single的执行结果都被过滤,则订阅者将收不到任何值,这时可以提供一个默认返回值;

使用single()操作符提供结果为空时的默认返回值;

使用onErrorReturn()操作符处理异常;

Single.concat(single1, single2)
        .subscribeOn(Schedulers.from(executors))
        .takeUntil(s -> {
            return false;//此处测试全都执行
        })
        .filter(s -> {
            return true;//此处测试全都不满足过滤条件的情况
        })
        .single("default-value")
        .onErrorReturn(Throwable::getLocalizedMessage)
        .subscribe(s -> {
            System.out.println("接收到->" + s);
        });

执行结果:

发射 ->task-1
发射 ->task-2
接收到->default

2.2 阻塞的方式实现串行 (不建议使用)

使用blockingGet阻塞工作线程,该方式会使线程阻塞而非挂起,在线程完成前系统其他资源无法使用该线程。

这里很像Kotlin中的协程写法,但是协程是非阻塞的,而这里是会阻塞当前线程。

String str1 = single1.subscribeOn(Schedulers.from(executors)).blockingGet();
if (!"success".equals(str1)) {
    single2.subscribeOn(Schedulers.from(executors))
            .subscribe(System.out::println);
}

执行结果:

发射 ->task-1
发射 ->task-2
task-2

2.3 flatMap操作符实现串行

在flatMap的接收函数中 对Single1的结果进行判断,如果不需要执行其他请求,直接使用Single.just()将结果返回给订阅者,或者返回其他的Single流程。

single1.flatMap(
        s -> {
            if (!s.contains("success"))
                return single2;
            else return Single.just(s);
        }
).subscribe(System.out::println);

执行结果:

发射 ->task-1
发射 ->task-2
task-2
转载自:https://juejin.cn/post/7119443402046832670
评论
请登录