RxJava

RxJava 入门

RxJava 入门理念

Rx 是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便地处理异步数据流。RxJava 充分体现了函数响应式编程思维,解决了传统异步代码过于繁琐的痛点,采用函数式方式使得代码更好维护以及编写。

函数式编程的概念和特性

概念:

为了解决烦琐复杂的并发编程以及分布式处理、多线程等常见问题。

函数式编程往往由于其数据不可变,因此没有并发编程中的问题,相比于并发编程,更安全。可以将解决多种业务逻辑单独抽离成对一个函数的求解运算,在其求解过程中,常常可运用于单一原则或多次复用某一函数,同时避免了状态以及变量的概念,,思维更接近数学运算。

特性: 函数式编程的特性

响应式编程的概念和特性

概念:

响应式编程是一种面向数据流及其变化传播的编程方式,意味着静态以及动态的数据流可以很直观地被表达展示,其计算模型会以一种变化的值通过数据流的方式进行传播。

特性: 响应式编程

RxJava 基础知识

Rx 模型特性: Rx 模型特性

  • 创建 Observer(观察者),代表着不同的线程中执行异步处理事件业务逻辑回调通知观察者,观察者在未执行的时候永远待命。

  • 使用 subscribe 在两者间进行订阅。

比如:

1
2
3
4
5
6
7
8
9
10
//subsrcibr里面分别表示onNext,onError,onComplete
fun rxjava() {
Observable
.just("this is create Demo")
.subscribe(
{ Log.d("zzx","(msg:$it)-->>")},
{ Log.d("zzx","(error:${it.message?: "Unknown Error"})-->>")},
{ Log.d("zzx","(onComplete:)-->>")},
)
}

RxJava 的生命周期

RxJava 有自己执行的生命周期,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
fun rxjava() {
Observable.just("this is lifeCycleExample")
.doOnSubscribe {
//发生订阅后
Log.d("zzx","(doOnSubscribe:)-->>");
}.doOnLifecycle(
//订阅后是否可以取消
{ Log.d("zzx","(doOnLifecycle is disposed${it.isDisposed}:)-->>");},
{ Log.d("zzx","(doOnLifecycle is run:)-->>");}
).doOnEach {
//Observable每次发送数据都会执行
Log.d("zzx","doOnEach:${when {
it.isOnNext -> "onNext"
it.isOnError -> "onError"
it.isOnComplete -> "onComplete"
else -> "nothing"
}}");
}.doOnNext {
//onNext前调用
Log.d("zzx","(doOnNext:$it)-->>");
}.doAfterNext {
//onNext后调用
Log.d("zzx","(doAfterNext:$it)-->>");
}.doOnError {
//onError前调用
Log.d("zzx","(doOnError:${it.message ?: "unknown error message"}:)-->>");
}.doOnComplete {
//正常调用onComplete调用
Log.d("zzx","(doOnComplete:)-->>");
}.doAfterTerminate {
//onComplete或onError执行后触发
Log.d("zzx","(doAfterTerminate:)-->>");
}.doFinally {
//中止后调用,无论正常执行或异常终止
Log.d("zzx","(doFinally:)-->>");
}.subscribe{ Log.d("zzx","(onNext value:$it)-->>");}
}

其中 doFinallydoAfterTerminal 触发顺序与调用顺序相反。

Observable

RxJava 中的 Observable 是核心组件之一,它表示一个可观察的序列,可以发射多个数据项或事件,并且允许观察者(Observers)订阅以接收这些数据项或事件。

特点:

  1. 数据流Observable 表示一个数据流,可以是有限的或无限的。它可以发射任意数量的数据项,并且可以在任何时候发射错误或完成事件。

  2. 异步Observable 可以在不同的线程上运行,从而支持异步操作,这对于处理 IO 操作或其他耗时任务非常有用。

  3. 响应式编程Observable 是响应式编程的基础,它允许你以声明式的方式处理数据流和事件流。

其中, ObservableobserveOn()subscribeOn() 两个用于控制线程调度的关键方法。这两个方法允许你指定代码在哪个线程上执行,从而实现异步编程和线程切换。

subscribeOn() 方法用于指定上游操作(Observable)在哪个线程上执行。它影响的是数据的发射和上游操作符的执行线程。通常用于指定 Observable 开始执行的线程。如:

1
2
3
4
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.io()) // 指定上游操作在IO线程上执行
.observeOn(AndroidSchedulers.mainThread()) // 指定下游操作在主线程上执行
.subscribe(item -> System.out.println("Received: " + item));

observeOn() 方法用于指定下游操作(Observer)在哪个线程上执行。它影响的是数据的接收和下游操作符的执行线程。通常用于指定 Observer 处理数据的线程。如:

1
2
3
4
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.io()) // 指定上游操作在IO线程上执行
.observeOn(AndroidSchedulers.mainThread()) // 指定下游操作在主线程上执行
.subscribe(item -> System.out.println("Received: " + item));

调用多次 subscribeOn(), 只有第一次调用有效;调用多次 observeOn(),每次都会切换线程。

原因:

subscribeOn()属于从下游传到上游的操作符,用于指定Observable在哪个线程运行,他是在subsribe发起订阅后从下往上传,所以无论中间有多少个subscribeOn,最后Observable一定在第一个subsribeOn的线程中执行。

常用的从下游往上游走的操作:

  • doOnSubscribe
  • subscribeOn
  • subscribe

其余基本都是从上游往下游走。

常用操作符

map

作用: 对观察者发射的事件进行处理,转换成另一个事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.just(1,2,3)
.map {
it.toString()
}.subscribe(object : Observer<String> {
override fun onSubscribe(d: Disposable) {
Log.d("zzx","(${Thread.currentThread().name}:)-->>");
}

override fun onError(e: Throwable) {

}

override fun onNext(t: String) {
Log.d("zzx","(int to str:$t)-->>");
}

override fun onComplete() {

}

})

flatmap

作用,为每一个事件创建一个 Observable 对象,再将每个 Observable 合并成一个总的 Observable 对象并发送个 Observer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.create<Int>{
it.onNext(1)
it.onNext(2)
it.onNext(3)
it.onNext(4)
it.onNext(5)
it.onNext(6)
}
.flatMap {
Observable.create<Int> {emitter ->
emitter.onNext(it+2)
}
.subscribeOn(Schedulers.io())
}
.subscribe{
Log.d("zzx",it.toString() + " " + Thread.currentThread())
}

但因为在不同线程中,发射顺序与生产顺序就不是一致的了。

concatmap

这个就是直接进行连接,而不是先合并再产生新的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable.create<Int>{
it.onNext(1)
it.onNext(2)
it.onNext(3)
it.onNext(4)
it.onNext(5)
it.onNext(6)
}
.concatMap {
Observable.create<Int> {emitter ->
emitter.onNext(it)
emitter.onComplete() //每次调用均需要使用一个onComplete()方法
}
.subscribeOn(Schedulers.io())
}
.subscribe{
println(it.toString() + " " + Thread.currentThread())
}

merge

就是最基本的合并操作,但在遇到错误时,错误后面的事件就不会再合并了。

1
2
3
val odds = Observable.just(1, 3, 5, 7)
val events = Observable.just(2, 4, 6, 8)
Observable.merge(odds, events).subscribe{}

mergeDelayError

这个和 merge 操作符的区别就是他可以在遇到错误时继续发射数据,在最后才报告错误。

delay

作用: 延迟 subscribe 发射事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 1. 指定延迟时间
// 参数1 = 时间;参数2 = 时间单位
delay(long delay,TimeUnit unit)
// 2. 指定延迟时间 & 调度器
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器
delay(long delay,TimeUnit unit,mScheduler scheduler)
// 3. 指定延迟时间 & 错误延迟
// 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常
// 参数1 = 时间;参数2 = 时间单位;参数3 = 错误延迟参数
delay(long delay,TimeUnit unit,boolean delayError)
// 4. 指定延迟时间 & 调度器 & 错误延迟
// 参数1 = 时间;参数2 = 时间单位;参数3 = 线程调度器;参数4 = 错误延迟参数
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指
定延迟多长时间并添加调度器,错误通知可以设置是否延迟

repeat

作用: 就是无条件重复发送

1
2
3
Observable.just(1,2,3)
.repeat(3)
//重复发送三次事件

filter

作用: 根据指定条件进行过滤

1
2
3
4
5
//filter
Observable.just(1,2,3)
.filter {
it >= 2
}

Rxjava 与网络请求结合

### 连续进行两次网络请求

比如我们现在需要先登录,然后用返回的 $\texttt{cookie}$ 或 $\texttt{token}$,然后接着进行网络请求用户信息等。

我们就可以用 flatmap 这个操作符将网络请求下来的 $\texttt{cookie}$ 携带着包装成一个新的 Observable 然后进行下一次网络请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fun login(): Observable<String> {
Thread.sleep(1000)
val cookie:String = "xxxxxx"
return Observable.just(cookie)
}

fun requireInfo(cookie: String):Observable<String> {
Thread.sleep(1000)
val info = "xxx"
return Observable.just(info)
}

fun request() {
login()
.flatMap {
requireInfo(it)
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { info ->
binding.textView.text = info //更新UI,因为已经在主线程中
}
}