Android RxJava源码学习
RxJava
基础
特点:基于事件流的链式调用,实现异步操作,逻辑简洁实用简单。
原理:基于一种扩展的观察者模式
角色 | 作用 |
---|---|
Observable(被观察者) | 产生事件 |
Observer(观察者) | 接收事件,给出响应 |
Subscribe(订阅) | 连接被观察者与观察者 |
Event(事件) | 被观察者&观察者 沟通的载体 |
被观察者 通过 订阅 按顺序发送 事件 给 观察者,观察者 按顺序接收事件并做出响应
举个例子
餐馆中客户点菜,则客户作为被观察者来产生事件,厨房作为观察者接收具体事件并对事件进行处理,菜肴为具体事件,服务员作为subscribe连接食客与厨房。
简单使用
- 创建被观察者,被观察者生产指定事件
val observable = Observable.create(object: ObservableOnSubscribe<Int> {
override fun subscribe(emitter: ObservableEmitter<Int>) {
emitter.onNext(1)
emitter.onNext(2)
emitter.onComplete()
}
})
// 自动创建
val observableX = Observable.just("1", "2", "3")
// 通过数组创建
val array = intArrayOf(1, 2, 3)
val fromArray = Observable.fromArray(array)
- 创建观察者并定义响应事件行为
// 实现Observer接口
val observer = object: Observer<Int>() {
override fun onSubscribe(d: Disposable) {}
override fun onError(e: Throwable) {}
override fun onComplete() {}
override fun onNext(t: Int) {}
}
// 采用Subscriber抽象类
val observer = object: Subscriber<Int>() {
override fun onSubscribe(d: Disposable) {}
override fun onError(e: Throwable) {}
override fun onComplete() {}
override fun onNext(t: Int) {}
}
使用Subscriber抽象类与Observer接口的区别
- 在RxJava subscribe过程中Observer总是会被转化为Subscriber
- Subscriber对Observer接口进行了扩展,新增额外两个方法
- onStart:还未响应事件前调用,做一些初始化工作
- unsubscribe:取消订阅
- 通过订阅连接被观察者和观察者
// 注意这里主语是被观察者,subscribe可以理解为被观察
observable.subscribe(observer)
基于事件流优雅实现订阅流程
val observable = Observable.create(object: ObservableOnSubscribe<Int> {
override fun subscribe(emitter: ObservableEmitter<Int>) {
emitter.onNext(1)
emitter.onNext(2)
emitter.onComplete()
}
}).subscribe(object : Observer<Int> {
override fun onSubscribe(d: Disposable) {}
override fun onError(e: Throwable) {}
override fun onComplete() {}
override fun onNext(t: Int) {}
})
在onSubscribe中可采用Disposable的dispose方法,切断观察者与被观察者的连接,但此时被观察者还是可以继续发送事件
原理
create
public static <@NonNull T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
// 一般返回source,即ObservableCreate
public static <@NonNull T> @NonNull Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
return f != null ? (Observable)apply(f, source) : source;
}
即最终创建的是ObservableCreate对象,继承于抽象类Observable
subscribe
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
// 核心方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
RxJavaPlugins.onError(e);
}
}
// RxJavaPlugins
public static <@NonNull T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
// 一般为null
BiFunction<? super Observable, @NonNull ? super Observer, @NonNull ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}
// ObservaleCreate
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer);
// 调用Observer中的onSubscrible方法,与下面subscribe方法区分开来
observer.onSubscribe(parent);
try {
// 调用自定义的ObservableOnSubscribe的subscribe方法
// 随后开始执行自定义的方法,包括emitter.onNext
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
接下来流程就交给了ObservableEmitter来进行传递
// ObservaleCreate.CreateEmitter
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
return;
}
// 进行校验
if (!isDisposed()) {
// 调用绑定的observer的onNext方法
observer.onNext(t);
}
}
}
可以看到,onNext
dispose
// ObservaleCreate.CreateEmitter
// CreateEmitter extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
// DisposableHelper
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
// DISPOSED 是实现Disposed接口的常量的实例
Disposable d = DISPOSED;
if (current != d) {
// 通过Atomic类 CAS操作进行, 可能是避免多线程下错误情况
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
// CAS失败,则循环进行,直到正确为止
current.dispose();
}
return true;
}
}
return false;
}
总结
RxJava中设计模式采用了装饰者设计模式,例如CreateEmitter包装了自定义Observer接口实现类。缺点:整个调用栈变得很长
线程切换
核心: 装饰者模式下的逐层封装与拆包调用,依赖于内部Scheduler线程池进行线程调度
使用
Android中对RxJava一般的需求场景是 在子线程中实现耗时操作,然后回到主线程实现UI操作,即:
- 被观察者在子线程中产生事件,例如进行网络请求获取数据
- 观察者在主线程接收并响应事件,例如基于获取得到的数据进行UI更新
RxJava如何实现线程切换?
采用RxJava内置的线程调度器Scheduler,通过subscribeOn 与 observeOn 方法实现
- subscribeOn 指定被观察者发送事件的线程
- observeOn 指定观察者接收响应事件的线程
RxJava中内置的用于调度的线程类型,并使用线程池维护,因此线程调度效率高
类型 | 含义 | 应用场景 |
---|---|---|
Schedulers.immediate() | 当前线程 = 不指定线程 | 默认 |
AndroidSchedulers.mainThread() | Android主线程 | 操作UI |
Schedulers.newThread() | 常规新线程 | 耗时等操作 |
Schedulers.io() | io操作线程 | 网络请求、读写文件等io密集型操作 |
Schedulers.computation() | CPU计算操作线程 | 大量计算操作 |
需要注意的是subScribeOn只能执行一次,其后赋值无效。
而observeOn可以赋值多次,每次指定Scheduler即代表此后与Observer相关的代码均从指定的Scheduler处执行
val observer = Observable.create(ObservableOnSubscribe<Int> {
println("observable subscribe")
println("current Thread: ${Thread.currentThread().name}")
it.onNext(1)
it.onComplete()
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.doOnNext {
println("doOnNext")
println("current Thread: ${Thread.currentThread().name}")
}
.observeOn(Schedulers.newThread())
.doOnComplete {
println("doOnComplete")
println("current Thread: ${Thread.currentThread().name}")
}
.observeOn(Schedulers.newThread())
.subscribe (object : Observer<Int> {
override fun onSubscribe(p0: Disposable) {
println("onSubscribe")
println("current Thread: ${Thread.currentThread().name}")
}
override fun onError(p0: Throwable) {
}
override fun onComplete() {
println("onComplete")
println("current Thread: ${Thread.currentThread().name}")
}
override fun onNext(p0: Int) {
println("onNext")
println("current Thread: ${Thread.currentThread().name}")
}
})
Thread.sleep(2000)
输出
onSubscribe
current Thread: main
observable subscribe
current Thread: RxCachedThreadScheduler-1
doOnNext
current Thread: RxNewThreadScheduler-1
doOnComplete
current Thread: RxNewThreadScheduler-2
onNext
current Thread: RxNewThreadScheduler-3
onComplete
current Thread: RxNewThreadScheduler-3
subscribeOn
接下来源码分析可能会很绕,自己都花了一天的时间才勉强看懂大致流程,此处不了解可以先不管,后续会有流程图总结。
同时,阅读该部分源码一定要梳理清楚每个Observer或Observable中的source、observer参数
假设方法中只有subscribeOn,没有observerOn
调用subscribeOn时通过装饰模式,将scheduler与当前Observable包装为一个ObservableSubscribeOn并返回,返回是很重要的。因为此后的操作特性是基于ObservableSubscribeOn上的了
// Observable
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 装饰者模式构造ObservableSubscribeOn对象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
// ObservableSubscribeOn
@Override
public void subscribeActual(final Observer<? super T> observer) {
// 构建SubscribeOnObserver
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
// observer调用自定义的onSubscribe方法,此时还是在当前线程中,并不是指定线程中
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
// Scheduler
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 根据之前指定的scheduler的不同类来创建Worker
final Worker w = createWorker();
// 获取到Runnable
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 将Runnable和Wroker封装为DisposeTask
DisposeTask task = new DisposeTask(decoratedRun, w);
// Worker在指定delay后执行这个disposeTask
w.schedule(task, delay, unit);
return task;
}
// IoScheduler(其他Scheduler也可以)
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
// IoScheduler.EventLoopWorker
static final class EventLoopWorker extends Scheduler.Worker {
// 实质包含set集合
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
// 如果task已经取消与observer的绑定关系了
if (tasks.isDisposed()) {
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
// ThreadWorker(此处为NewThreadWorker)
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
// 获取按规则包装后的Runnbale
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 将runnable打包
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
// CompositeDisposable.add
// 通过Synchronized尝试将该Disposable加入到自身的set中
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
// 看是否是立刻执行还是等一会再执行
if (delayTime <= 0) {
// 通过ScheduledExecutorService执行,这里应该是自定义的线程池
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
public final class CompositeDisposable implements Disposable, DisposableContainer {
OpenHashSet<Disposable> resources;
}
到此,梳理一下通过subscribeOn后(也即通过ObservableSubscribeOn)调用subscribe方法的流程
- 调用Observable的subscribe,调用具体实现子类,即ObservableSubscribeOn的subscribeActual方法
- subscribeActual:
- 包装Observer为SubscribeOnObserver
- 调用observer的onSubscribe方法
- 包装SubscribeOnObserver为SubscribeTask(实现Runnable接口)
- 让subscribeOn指定的Scheduler通过scheduleDirect执行包装
- scheduleDirect:
- 创建基于不同Scheduler的Worker,实质是基于内部CachedWorkerPool创建继承于Worker的EventLoopWorker
- 基于Worker与Runnable封装成DisposeTask便于返回Disposable对象
- worker通过schedule准备加入该task
- schedule:Worker内部的threadWorker执行scheduleActual
- scheduleActual:
- 包装Runnable
- 通过线程池submit或者schedule当前Runnable
- 最终返回为一个Disposable
observeOn
假设这里既有subscribeOn又有observerOn,且subscribeOn先于observeOn
// Observable
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
// 包装为ObservableObserveOn对象
// 仔细品味这里的this
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
// protected final ObservableSource<T> source;
this.source = source;
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
到这里后,请问subscribeActual中source对象是什么?通过前面的this关键字,即可知,此时this对象就是调用observerOn方法前的那个基于Observable的对象,在此处就是 ObservableSubscribeOn,并最终调用subscribe,也就回到了我们之前所梳理的subscribeOn中的流程
不同之处是在于,此时调用subscribe方法时,对自定义Observer进行了包装,构成ObserveOnObserver,但之前是直接传递了一个自定义的Observer的。此时还为ObserveOnObserver指定了worker,为observeOn中指定的worker。所以再次进入到Observable的subscribe方法,并进入ObserveOnObserver的subscribeActual方法
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
此后与之前Subscribe步骤中无太大差别
那么observer的线程切换一定是发生在自定义ObservableOnSubscribe实现类subscribe方法中,通过onNext或onComplete
// ObservableCreate.CreateEmitter
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
// ObservableSubscribeOn.SubscribeOnObserver
public void onNext(T t) {
downstream.onNext(t);
}
// ObservableSubscribeOn
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
// 将结果入队
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
// 执行具体SchedulerWorker的schedule方法
worker.schedule(this);
}
}
流程总结
先关注这里的包装顺序关系:
- 包装自定义Observable(即ObservableOnSubscribe):
- 自定义的ObservableOnSubscribe通过Observable.create被包装为ObservableCreate
- 通过subscribeOn,被包装为ObservableSubscribeOn
- 通过observeOn,被包装为ObservableObserveOn
- 调用subscribe方法,根据具体包装类的subscribeActual方法实现不同,包装自定义Observer:
- 通过ObservableObserveOn.subscribeActual,自定义observer被包装为ObserveOnObserver,并调用其source的subscribe方法,传递该对象
- 调用ObservableSubscribeOn.subscribeActual,将ObserveOnObserver包装为SubscribeOnObserver,并最终包装为SubscribeTask(Runnable)传递给scheduler
- SubscribeTask 为SubscribeOnObserver的内部类,run方法为source.subscribe(parent); source为SubscribeOnObserver的实际承载类,parent为封装的SubscribeOnObserver
- 任务交给SubscribeOn所在线程池进行执行,该执行时:
- 取出SubscribeTask,执行run方法,调用source.subscribe(parent); 此时source为ObservableSubscribeOn的实际承载类,即ObservableCreate,parent为包装的SubscribeOnObserver
- 调用ObservableCreate的subscribeActual方法,创建CreateEmitter为parent,设置CreateEmitter的observer为parent。并调用source.subscribe(parent); source为自定义的ObservableOnSubscribe完成了回调过程
- 进行onNext回调时:
- 调用CreateEmitter.onNext方法,会调用到observer.onNext(t); observer为之前传递的SubscribeOnObserver
- 调用SubscribeOnObserver.onNext方法,其实是调用了downstream.onNext(t);(downstream即为source ObserveOnObserver)
- 调用ObserveOnObserver.onNext: 将onNext数据添加到队列中,并调用schedule方法准备执行任务。最终是进入到对应Scheduler的executor安排执行
用一张流程图总结一下全流程(注意理解其中包装关系)
![RxJava线程切换流程图](https://s401177923-1302493622.cos.ap-nanjing.myqcloud.com/mdImages/image-20230827193531680.png)
如何快速理解?
其实核心思想还是装饰者模式贯穿始终,查看流程图能比较直观的了解到其实还是顺序装包与逆序拆包的操作
- Observable装包:对Observable进行subscribeOn、observerOn操作:实质是对Observable进行封装,每调用一个函数就多包一层
- Observer装包:observable包装完后执行subscribe方法,接下来就是对observer包装,之前observable包装成什么样子,那就从外到里让每层observable根据自己需要又对observer进行包装,最终包装成一个可执行的Runnable,并提交给线程池
- 启动Subscribe Runnable:做重写ObservableOnSubscribe中的subscribe的事,并通过next等操作准备通知observer。对包装的observer逐层拆包,执行每一层的onNext,最终又启动一个Runnable,在另一个线程中执行自定义Observer的方法
- 启动Observer Runnable:在指定线程中运行自定义Observer的方法