最近看了下网上的RxJava源码分析,发现所基于的源码版本和最新的略有不同,于是自己动手翻阅了一下最新的源码版本(rxjava:2.2.8,rxandroid:2.1.1),并写分析博客作分享。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
//示例代码
private static void rxJavaTest() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
//1
emitter.onNext("onNext");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//2
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
//3
Log.d(TAG, s);
}
@Override
public void onError(Throwable e) {
//4
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
//5
Log.d(TAG, "onComplete");
}
});
}
|
上面RxJava最简单的使用,主要涉及被观察者Observable、观察者Observer和事件订阅subscribe()三个角色。
首先分析Observable的创建过程,即Observable的create()方法:
1
2
3
4
5
|
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//这里传入的source对象是我们传入的匿名内部类
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
|
这里先看一下我们传入的匿名内部类类型ObservableOnSubscribe源码:
1
2
3
|
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
|
可见ObservableOnSubscribe是一个只含有一个抽象方法subscribe()的接口。
接着调用RxJavaPlugins的onAssembly()方法并传入一个新建的ObservableCreate对象,而ObservableCreate的构造函数如下:
1
2
3
4
|
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
|
内部操作很简单,只是把ObservableCreate的成员变量source赋值为传入的ObservableOnSubscribe对象,即最开始我们创建的匿名内部类。
RxJavaPlugins的onAssembly()方法调用如下:
1
2
3
4
5
6
7
|
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
|
首先会创建一个Function f,赋值为onObservableAssembly,而onObservableAssembly默认为null(当使用转换操作符时会进行赋值,在后面的文章中会进一步分析),所以会直接返回source。至此Observable创建完毕。
然后看Observer的内部实现,是一个包含4个抽象方法的接口:
1
2
3
4
5
6
|
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
|
最后来看重点subscribe()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);//1
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer...");
subscribeActual(observer);//2
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
|
在1处调用RxJavaPlugins.onSubscribe(),将我们传入的observer进行包装:
1
2
3
4
5
6
7
|
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}
|
和上文的onAssembly()方法一样,这里的onObservableSubscribe也默认为null,所以返回值还是我们传入的observer本身。
重中之重在于2处的subscribeActual(observer):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
//subscribeActual()是Observable中的抽象方法,本文示例的具体实现是在ObservableCreate类中
protected void subscribeActual(Observer<? super T> observer) {
//1、创建事件发射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//2、调用observer的onSubscribe()方法
observer.onSubscribe(parent);
try {
//3、调用source(即包装过的observer)的onSubscribe()方法,传入发射器
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
|
第1步,创建CreateEmitter,CreateEmitter类的主要代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
//将observer赋值给成员变量
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
...
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
...
//取消发送事件
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
...
}
|
第2步,调用observer的onSubscribe()方法,这样会走到示例代码的2处打印出"onSubscribe”。
第3步,调用source.subscribe(parent),source实际上就是示例代码中的传入的匿名内部类:
1
2
3
4
5
6
7
8
|
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
//1
emitter.onNext("onNext");
emitter.onComplete();
}
}
|
所以会走到示例代码的1处分别执行emitter的onNext()和onComplete()方法,而从CreateEmitter的内部实现可见emitter的onNext()和onComplete()方法的具体操作实际上就是调用观察者observer的onNext()和onComplete()方法,observer即示例代码中的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//2
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
//3
Log.d(TAG, s);
}
@Override
public void onError(Throwable e) {
//4
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
//5
Log.d(TAG, "onComplete");
}
}
|
于是就走到了示例代码3、4处,并在调用onNext()时传入1处emitter.onNext()方法传入的参数。
以上只是RxJava最基础的用法的分析,主要是对观察者模式的不同角色进行封装,达到链式调用形式的目的,并且设计了发射器Emitter的概念,形成流式事件订阅的模式。