RxJava
RxJava 入门
RxJava 入门理念
Rx 是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便地处理异步数据流。RxJava 充分体现了函数响应式编程思维,解决了传统异步代码过于繁琐的痛点,采用函数式方式使得代码更好维护以及编写。
函数式编程的概念和特性
概念:
为了解决烦琐复杂的并发编程以及分布式处理、多线程等常见问题。
函数式编程往往由于其数据不可变,因此没有并发编程中的问题,相比于并发编程,更安全。可以将解决多种业务逻辑单独抽离成对一个函数的求解运算,在其求解过程中,常常可运用于单一原则或多次复用某一函数,同时避免了状态以及变量的概念,,思维更接近数学运算。
特性: 
响应式编程的概念和特性
概念:
响应式编程是一种面向数据流及其变化传播的编程方式,意味着静态以及动态的数据流可以很直观地被表达展示,其计算模型会以一种变化的值通过数据流的方式进行传播。
特性: 
RxJava 基础知识
Rx 模型特性: 
比如:
1 2 3 4 5 6 7 8 9 10
| fun rxjava() { Observable .just("this is create Demo") .subscribe( { Log.d("zzx","(msg:$it)-->>")}, { Log.d("zzx","(error:${it.message?: "Unknown Error"})-->>")}, { Log.d("zzx","(onComplete:)-->>")}, ) }
|
RxJava 的生命周期
RxJava 有自己执行的生命周期,如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| fun rxjava() { Observable.just("this is lifeCycleExample") .doOnSubscribe { Log.d("zzx","(doOnSubscribe:)-->>"); }.doOnLifecycle( { Log.d("zzx","(doOnLifecycle is disposed${it.isDisposed}:)-->>");}, { Log.d("zzx","(doOnLifecycle is run:)-->>");} ).doOnEach { Log.d("zzx","doOnEach:${when { it.isOnNext -> "onNext" it.isOnError -> "onError" it.isOnComplete -> "onComplete" else -> "nothing" }}"); }.doOnNext { Log.d("zzx","(doOnNext:$it)-->>"); }.doAfterNext { Log.d("zzx","(doAfterNext:$it)-->>"); }.doOnError { Log.d("zzx","(doOnError:${it.message ?: "unknown error message"}:)-->>"); }.doOnComplete { Log.d("zzx","(doOnComplete:)-->>"); }.doAfterTerminate { Log.d("zzx","(doAfterTerminate:)-->>"); }.doFinally { Log.d("zzx","(doFinally:)-->>"); }.subscribe{ Log.d("zzx","(onNext value:$it)-->>");} }
|
其中 doFinally
与 doAfterTerminal
触发顺序与调用顺序相反。
Observable
RxJava 中的 Observable
是核心组件之一,它表示一个可观察的序列,可以发射多个数据项或事件,并且允许观察者(Observers)订阅以接收这些数据项或事件。
特点:
-
数据流:Observable
表示一个数据流,可以是有限的或无限的。它可以发射任意数量的数据项,并且可以在任何时候发射错误或完成事件。
-
异步:Observable
可以在不同的线程上运行,从而支持异步操作,这对于处理 IO 操作或其他耗时任务非常有用。
-
响应式编程:Observable
是响应式编程的基础,它允许你以声明式的方式处理数据流和事件流。
其中, Observable
有 observeOn()
和 subscribeOn()
两个用于控制线程调度的关键方法。这两个方法允许你指定代码在哪个线程上执行,从而实现异步编程和线程切换。
subscribeOn()
方法用于指定上游操作(Observable)在哪个线程上执行。它影响的是数据的发射和上游操作符的执行线程。通常用于指定 Observable 开始执行的线程。如:
1 2 3 4
| Observable.just(1, 2, 3) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(item -> System.out.println("Received: " + item));
|
observeOn()
方法用于指定下游操作(Observer)在哪个线程上执行。它影响的是数据的接收和下游操作符的执行线程。通常用于指定 Observer 处理数据的线程。如:
1 2 3 4
| Observable.just(1, 2, 3) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(item -> System.out.println("Received: " + item));
|
调用多次 subscribeOn()
, 只有第一次调用有效;调用多次 observeOn()
,每次都会切换线程。
原因:
subscribeOn()
属于从下游传到上游的操作符,用于指定Observable
在哪个线程运行,他是在subsribe
发起订阅后从下往上传,所以无论中间有多少个subscribeOn
,最后Observable
一定在第一个subsribeOn
的线程中执行。
常用的从下游往上游走的操作:
doOnSubscribe
subscribeOn
subscribe
其余基本都是从上游往下游走。
常用操作符
map
作用: 对观察者发射的事件进行处理,转换成另一个事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| Observable.just(1,2,3) .map { it.toString() }.subscribe(object : Observer<String> { override fun onSubscribe(d: Disposable) { Log.d("zzx","(${Thread.currentThread().name}:)-->>"); }
override fun onError(e: Throwable) {
}
override fun onNext(t: String) { Log.d("zzx","(int to str:$t)-->>"); }
override fun onComplete() {
}
})
|
flatmap
作用,为每一个事件创建一个 Observable
对象,再将每个 Observable
合并成一个总的 Observable
对象并发送个 Observer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Observable.create<Int>{ it.onNext(1) it.onNext(2) it.onNext(3) it.onNext(4) it.onNext(5) it.onNext(6) } .flatMap { Observable.create<Int> {emitter -> emitter.onNext(it+2) } .subscribeOn(Schedulers.io()) } .subscribe{ Log.d("zzx",it.toString() + " " + Thread.currentThread()) }
|
但因为在不同线程中,发射顺序与生产顺序就不是一致的了。
concatmap
这个就是直接进行连接,而不是先合并再产生新的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| Observable.create<Int>{ it.onNext(1) it.onNext(2) it.onNext(3) it.onNext(4) it.onNext(5) it.onNext(6) } .concatMap { Observable.create<Int> {emitter -> emitter.onNext(it) emitter.onComplete() } .subscribeOn(Schedulers.io()) } .subscribe{ println(it.toString() + " " + Thread.currentThread()) }
|
merge
就是最基本的合并操作,但在遇到错误时,错误后面的事件就不会再合并了。
1 2 3
| val odds = Observable.just(1, 3, 5, 7) val events = Observable.just(2, 4, 6, 8) Observable.merge(odds, events).subscribe{}
|
mergeDelayError
这个和 merge
操作符的区别就是他可以在遇到错误时继续发射数据,在最后才报告错误。
delay
作用: 延迟 subscribe
发射事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
delay(long delay,TimeUnit unit)
delay(long delay,TimeUnit unit,mScheduler scheduler)
delay(long delay,TimeUnit unit,boolean delayError)
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指 定延迟多长时间并添加调度器,错误通知可以设置是否延迟
|
repeat
作用: 就是无条件重复发送
1 2 3
| Observable.just(1,2,3) .repeat(3)
|
filter
作用: 根据指定条件进行过滤
1 2 3 4 5
| Observable.just(1,2,3) .filter { it >= 2 }
|
Rxjava 与网络请求结合
### 连续进行两次网络请求
比如我们现在需要先登录,然后用返回的 $\texttt{cookie}$ 或 $\texttt{token}$,然后接着进行网络请求用户信息等。
我们就可以用 flatmap
这个操作符将网络请求下来的 $\texttt{cookie}$ 携带着包装成一个新的 Observable
然后进行下一次网络请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| fun login(): Observable<String> { Thread.sleep(1000) val cookie:String = "xxxxxx" return Observable.just(cookie) }
fun requireInfo(cookie: String):Observable<String> { Thread.sleep(1000) val info = "xxx" return Observable.just(info) }
fun request() { login() .flatMap { requireInfo(it) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe { info -> binding.textView.text = info } }
|