Rxjava源码学习

ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 http://reactivex.io/。

ReactiveX

微软给的定义是,Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers。

1
2
3
4
5
6
7
8
9
10
11
//C#3.0 LINQ 查询语法
//首先来看一个很简单的LINQ查询例子,查询一个int数组中小于5的数字,并按照大小顺序排列:
static void Main(string[] args)
{
int[] arr = new int[] { 8, 5, 89, 41, 1, 2, 3, 65, 1 };
var m = from n in arr where n < 5 orderby n descending select n;//小于5,并且倒叙排列显示
foreach (var n in m){
Console.WriteLine(n);
}
Console.ReadLine();
}

RxJava 简介

github介绍:一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库(RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.)

RxJava 特性

Zero Dependencies
< 1MB Jar
Java 6+ & Android 2.3+
Java 8 lambda support
Polyglot (Scala, Groovy, Clojure and Kotlin)
Non-opinionated about source of concurrency (threads, pools, event loops, fibers, actors, etc)
Async or synchronous execution
Virtual time and schedulers for parameterized concurrency
RxJava精华可以浓缩为Observable(被观察者)在一系列同步或者异步的操作后通知Observer/Subscriber(观察者)

1
2
3
4
Observable -> Operator 1 -> Operator 2 -> Operator 3 -> Subscriber


function

RxJava Demo

Rxjava主要是Observable(被观察者)与Observer(观察者),Observer子类subscriber拓展取消订阅

1.Observable -> Subscriber

建立观察者与订阅者,并建立两者间的联系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
//被观察者
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> subscriber) {
String str = "do some thing";
subscriber.onNext(str);
subscriber.onCompleted();
}
});

//观察者 or 订阅者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Toast.makeText(HelloWorldActivity.this, "complete", Toast.LENGTH_SHORT).show();
}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
Toast.makeText(HelloWorldActivity.this, s, Toast.LENGTH_SHORT).show();
}
};

//建立被观察者与订阅者关系
observableJust.subscribe(subscriber);

//代码简洁压缩一 just or from
Observable.just("do some thing")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Toast.makeText(HelloWorldActivity.this, s, Toast.LENGTH_SHORT).show();
}
});

//结合lambda压缩二 just or from
Observable.just("Hello, world!")

function (s)
{
} .subscribe(s -> Toast.makeText(HelloWorldActivity.this, s, Toast.LENGTH_SHORT).show());

2.Operator(Observable -> Operator -> Subscriber)

RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,观察者与订阅者的数据传递过程可以通过无数个Operator进行处理加工的,转换成不同的事件或事件序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
   //map operator 事件对象的直接变换,一对一的转化
Observable.just(R.mipmap.ic_launcher)
.map(new Func1<Integer, Drawable>() {

@Override
public Drawable call(Integer id) {
return getTheme().getDrawable(id.intValue());
}
}).subscribe(new Action1<Drawable>() {
@Override
public void call(Drawable img) {
Toast.makeText(OperatorsActivity.this, img.toString(), Toast.LENGTH_SHORT).show();
}
});

//flatMap 事件对象的直接变换,一对多的转化,flatMap() 中返回的是个 Observable 对象,常用于嵌套的异步操作
int[][] data = {{100, 200, 300}, {500, 600, 700, 800}, {900, 1000, 1100, 1200, 1300}};
Observable.just(data)
.flatMap(new Func1<int[][], Observable<int[]>>() {
@Override
public Observable<int[]> call(int[][] ints) {
return Observable.from(ints);
}
}).subscribe(new Action1<int[]>() {
@Override
public void call(int[] ints) {
Log.i("flatmap", "length:" + ints.length);
}
});
//filter 过滤
Observable.from(new Integer[]{12,34,53,342,6353,234})
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer %2 ==0;
}
}).take(2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i("filter", "Integer:" + integer);
}
});
//lift 大Boss Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。源码如下:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

//OnSubscribeLift 源码:
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}

除了以上介绍的Operator,还有其他的如下:

1
2
3
4
5
6
7
8
9
10
11
12
doOnNext          在处理下一个事件之前要做的事。
take 最多保留的事件数。
debounce 间隔指定时间内的事件将被丢弃
merge 合并两个Observable为一个Observable
concat 顺序执行多个Observable,个数为1 ~ 9
compose 与 flatMap 类似,都是进行变换,返回Observable对象,激活并发送事件
first 只发送符合条件的第一个事件
timer 定时操作
interval 定时的周期性操作
throttleFirst 与debounce类似,也是时间间隔太短,就丢弃事件
Single Observabled的精简版,回调OnSuccess/OnError
Subject 既是Observable又是Observer,既是事件的生产者,又是事件的消费者,相当于自身是一条管道,从一端进,又从另一端出

3.Scheduler

Rxjava默认事件的产生和消费均在同一个线程进行,如果需要对不同的事件Observable -> Operator -> Subscriber放在不同的线程进行处理,该流程主要是通过Scheduler线程调度器进行,提供的方法有两个subscribeOn()observeOn(),Rxjava通过这两个函数指定每一个代码片段应该运行在什么样的线程,subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。默认主要提供4个调度器:
3.1 Schedulers.io(): I/O 操作(读写文件、数据库、网络请求等),与newThread()差不多,区别在于io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 效率比 newThread() 更高。值得注意的是,在 io() 下,不要进行大量的计算,以免产生不必要的线程;

3.2 Schedulers.newThread(): 开启新线程操作;
3.3 Schedulers.immediate(): 默认指定的线程,也就是当前线程;
3.4 Schedulers.computation():计算所使用的调度器。这个计算指的是 CPU 密集型计算,即不会被 I/O等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。值得注意的是,不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU;

android 提供了拓展的线程:
AndroidSchedulers.mainThread(): RxJava 扩展的 Android 主线程;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext("do some thing" + i);
}
subscriber.onCompleted();
}
})
//生产事件在单独的线程池里面执行
.subscribeOn(Schedulers.io())
//消费事件在单独的主线程里面执行
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
Toast.makeText(SchedulerActivity.this, s, Toast.LENGTH_SHORT).show();
}
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (memoryCache != null) {
subscriber.onNext(memoryCache);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String cachePref = rxPreferences.getString("cache").get();
if (!TextUtils.isEmpty(cachePref)) {
subscriber.onNext(cachePref);
} else {
subscriber.onCompleted();
}
}
});

Observable<String> network = Observable.just("network");

//主要就是靠concat operator来实现
Observable.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
observeo()
.subscribe(s -> {
memoryCache = "memory";
System.out.println("--------------subscribe: " + s);
});
onbseron()
.

Rxjava MissingBackpressureException问题

消息发送的速度与消费速度不一致 对应策略:Backpressure 策略
很多 Rx 操作函数内部都使用了 backpressure 从而避免过多的数据填满内部的队列。这样处理慢的消费者就会把这种情况传递给前面的消费者,前面的消费者开始缓冲数据直到他也缓存满为止再告诉他前面的消费者。Backpressure 并没有消除这种情况。只是让错误延迟发生,我们还是需要处理这种情况。

参考资料
github源码库
RxJava适用场景小结
RxJava 从入门到出轨
关于 RxJava 最友好的文章—— RxJava 2.0 全新来袭
Grokking RxJava, Part 1: The Basics
RxJava 2.0有什么不同
RxJava 2.0中backpressure(背压)概念的理解
http://gank.io/post/560e15be2dca930e00da1083
NotRxJava guide for lazy folks
Agera Wiki 中文版
ReactiveX/RxJava文档中文版
[ Android RxJava使用介绍(三) RxJava的操作符]http://blog.csdn.net/job_hesc/article/details/46495281
Awesome-RxJava

文章目录
  1. 1. ReactiveX
  2. 2. RxJava 简介
  3. 3. RxJava 特性
  4. 4. RxJava Demo
    1. 4.1. 1.Observable -> Subscriber
    2. 4.2. 2.Operator(Observable -> Operator -> Subscriber)
    3. 4.3. 3.Scheduler
  5. 5. Rxjava MissingBackpressureException问题
,