这篇文章主要介绍了RxJava的消息发送和线程切换实现原理,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
RxJava是一个在Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)虚拟机上的响应式扩展,通过使用可观察的序列将异步和基于事件的程序组合起来的一个库。
它扩展了观察者模式来支持数据/事件序列,并且添加了操作符,这些操作符允许你声明性地组合序列,同时抽象出要关注的问题:比如低级线程、同步、线程安全和并发数据结构等。
RxJava相信大家都非常了解吧,今天分享一下RxJava的消息发送和线程源码的分析。最后并分享一个相关演示,让大家更加熟悉我们天天都在用的框架。
消息订阅发送
首先让我们看看消息订阅发送最基本的代码组成:
可观察的。create(new ObservableOnSubscribeString(){
@覆盖
公共无效订阅(observablemitterstring发射器)引发异常{
发射器。在下一个上(“插孔1”);
发射器。下一次(“杰克2”);
发射器。下一次(“杰克3”);
发射器。on complete();
}
});
观察者字符串observer=新观察者字符串(){
@覆盖
公共订阅无效(一次性d) {
Log.d(标签,‘在订阅上’);
}
@覆盖
公共void onNext(String s) {
Log.d(TAG, on next: s );
}
@覆盖
public void one rror(Throwable e) {
Log.d(TAG, on error: e . tostring());
}
@覆盖
公共void onComplete() {
Log.d(标签,‘完成时’);
}
};
observable.subscribe(观察者);
代码很简单,可观察为被观察者,观察者为观察者,然后通过observable.subscribe(观察者),把观察者和被观察者关联起来。被观察者发送消息(emitter.onNext(内容)),观察者就可以在onNext()方法里回调出来。
我们先来看可观察的,创建是用Observable.create()方法进行创建,源码如下:
公共静态测试可观察的创建(可观察订阅源){
对象助手。要求非空(source, source为null’);
返回rxjavaplugins。组装时(新的可观察的创建测试(源));
}
公共静态测试T要求非空(T对象,字符串消息){
if (object==null) {
抛出新的NullPointerException(message);
}
返回对象;
}
装配上的公共静态测试ObservableT(@非空ObservableT source){
功能?超可观测,扩展可观察f=onObservableAssembly
如果(f!=null) {
返回应用(f,来源);
}
返回来源;
}
可以看出,创建()方法里最主要的还是创建用可观察订户传入创建了一个可观察的创建对象并且保存而已。
公共最终类ObservableCreateT扩展可观察的
最终可观测订阅源;
公共可观察创建(observableonsubscribe源){
来源=来源
}
}
接着是创建观察者,这比较简单只是单纯创建一个接口对象而已
公共接口观察器{
void on subscribe(@ NonNull Disposable d);
void on next(@ NonNull T T);
出错时void(@ NonNull Throwable e);
完成时作废();
}
订阅发送消息
observable.subscribe(观察者)的订阅方法如下:
公开最终作废订阅(观察者?超级测试观察者){
对象助手。要求非空(观察者,观察者为null’);
尝试{
observer=rxjavaplugins。onsubscribe(这个,观察者);
对象助手。要求非空(观察者,插件返回空观察者’);
subscribeActual实际(观察者);
} catch(NullPointerException e){//没有PMD
扔e;
} catch(可投掷e) {
例外情况。throwif fatal(e);
rxjavaplugins。关于误差(e);
NullPointerException npe=new NullPointerException(实际上不是,但是不能因为标准英语而抛出其他异常);
npe。初始化原因(e);
扔npe
}
}
//ObjectHelper.requireNonNull()方法
公共静态测试T要求非空(T对象,字符串消息){
if (object==null) {
抛出新的NullPointerException(message);
}
返回对象;
}
//RxJavaPlugins.onSubscribe()方法
公共静态测试观察者?subscribe上的超级T(@非空可观察到的来源,@非空观察者?超级测试观察者){
双功能?超可观测,超级观察者,扩展观察器f=onObservableSubscribe
如果(f!=null) {
返回应用(f,源,观察者);
}
回报观察者;
}
从上面源码可以看出requireNonNull()只是做非空判断而已,而RxJavaPlugins.onSubscribe()也只是返回最终的观察者而已。所以关键代码是抽象方法subscribeActual实际(观察者);那么订户实际对应哪个代码段呢?
还记得Observable.create()创建的可观察的创建类吗,这就是订户实际实际()具体实现类,源码如下:
受保护的void subscribeActual(Observer?超级测试观察者){
createemittert parent=new createemittert(观察者);
观察者网(父);
尝试{
源.订阅(父);
} catch (Throwable ex) {
例外情况。throwif fatal(ex);
父母。出错时(例如);
}
}
从上面的代码可以看出,首先创建了一个创建发射器对象并传入观察者,然后回到观察者的订阅()方法,而来源就是我们之前创建可观察的创建传入的可观察订户对象。
类CreateEmitterT扩展AtomicReferenceDisposable
实施ObservableEmitterT,一次性{
}
而创建发射器又继承观测发射器接口,又回调可观察订户的订阅方法,对应着我们的:
可观察的。create(new ObservableOnSubscribeString(){
@覆盖
公共无效订阅(observablemitterstring发射器)引发异常{
发射器。在下一个上(“插孔1”);
发射器。下一次(“杰克2”);
发射器。下一次(“杰克3”);
发射器。on complete();
}
});
当它发送消息既调用emitter.onNext()方法时,既调用了创建发射器的onNext()方法:
公共void onNext(T t) {
if (t==null) {
on error(new NullPointerException( on next用空调用2.x运算符和源中通常不允许空值。));
返回;
}
如果(!isDisposed()) {
观察者。在下一次(t);
}
}
可以看到最终又回调了观察者的onNext()方法,把被观察者的数据传输给了观察者。有人会问
isDisposed()是什么意思,是判断要不要终止传递的,我们看emitter.onComplete()源码:
公共void onComplete() {
如果(!isDisposed()) {
尝试{
观察者。on complete();
}最后{
dispose();
}
}
}
公共静态布尔dispose(AtomicReferenceDisposable字段){
可支配电流=字段。get();
一次性d=已处理;
如果(当前!=d) {
当前=字段。getandset(d);
如果(当前!=d) {
如果(当前!=null) {
当前。dispose();
}
返回真实的
}
}
返回错误的
}
public static boolean被释放(Disposable d){
返回d==已处理
}
处置()方法是终止消息传递,也就付了个处理常量,而isDisposed()方法就是判断这个常量而已。这就是整个消息订阅发送的过程,用的是观察者模式。
线程切换
在上面模板代码的基础上,线程切换只是改变了如下代码:
可观察的。订阅(计划程序。io())。观察者离子(机器人调度程序。主线程())。订阅(观察员);
下面我们对线程切换的源码进行一下分析,分为两部分:subscribeOn()和观察()
subscribeOn()
首先是订阅()源码如下:
公众最终意见可在(调度程序调度程序){
对象助手。要求非空(调度程序,调度程序为null’);
返回rxjavaplugins。on assembly(new ObservableSubscribeOnT(this,scheduler));
}
我们传进去了一个调度程序类,调度程序是一个调度类,能够延时或周期性地去执行一个任务。
调度程序有如下类型:
类型
使用方式
含义
使用场景
IoScheduler
Schedulers.io()
超正析象管操作线程
读写南达科他州卡文件,查询数据库,访问网络等超正析象管(图片Orthicon)密集型操作
NewThreadScheduler
Schedulers.newThread()
创建新线程
耗时操作等
单一调度程序
Schedulers.single()
单例线程
只需一个单例线程时
计算调度程序
Schedulers.computation()
中央处理器计算操作线程
图片压缩取样、xml、json解析等中央处理器密集型计算
蹦床时间表
Schedulers.trampoline()
当前线程
需要在当前线程立即执行任务时
处理器调度程序
AndroidSchedulers.mainThread()
机器人主线程
更新用户界面等
接着就没什么了,只是返回一个可观察订户b对象而已。
observeOn()
首先看源码如下:
公共最终观察(调度程序调度程序){
返回观察者ion(scheduler,false,buffer size());
}
public final ObservableT observion(Scheduler Scheduler,boolean delayError,int bufferSize) {
对象助手。要求非空(调度程序,调度程序为null’);
对象助手。验证正数(缓冲区大小,“缓冲区大小”);
返回rxjavaplugins。装配时(新的可观察观察ont(this,scheduler,delayError,buffer size));
}
这里也是没什么,只是最终返回一个可观察的对象而已。
接着还是像原来那样调用订阅()方法进行订阅,看起来好像整体变化不大,就是封装了一些对象而已,不过着恰恰是RxJava源码的精华,当他再次调用订户实际实际()方法时,已经不是之前的ObservableCreate()里订户实际方法了,而是最先调用可观察的的订户实际实际()方法,对应源码如下:
受保护的void subscribeActual(Observer?超级测试观察者){
蹦床时间表的调度程序实例){
源.订阅(观察者);
}否则{
调度程序工人w=调度。创建worker();
来源。subscribe(new observeonobserver(observer,w,delayError,buffer size));
}
}
在这里有两点要讲,一点是观察者是执行观察者的线程,后面还会详解,然后就是来源。订阅,这个来源。订阅调的是可观察订户b的订阅方法,而订阅方法因为继承的也是可观察的,是可观察量里的方法,所以和上面的可观察的创建一样的方法,所以会调用可观察订户b里的订户实际实际()方法,对应的代码如下:
公共void subscribeActual实际(最终观察者?超级测试){
最终SubscribeOnObserverT父级=新subscribeonobserver
订阅(母公司);
父母。设置一次性(调度程序。直接调度(新订阅任务(父));
}
上面代码中,首先把观察者返回给来的用SubscribeOnObserver "包装"起来,然后在回调观察者的onSubscribe(),就是对应模板代码的订阅()方法。
接着看订阅任务类的源码:
最终类订阅任务实现可运行{
私有最终SubscribeOnObserverT父级;
订阅任务(subscribeonobserver父级){
this.parent=parent
}
@覆盖
公共无效运行(){
源.订阅(父);
}
}
其中的源.订阅(父级),就是我们执行子线程的回调方法,对应我们模板代码里的被观察者的订阅()方法。它放在运行()方法里,并且继承可运行,说明这个类主要是线程运行。接着看scheduler.scheduleDirect()方法对应的源码如下:
公共一次性计划直接(@非空可运行运行){
返回scheduleDirect(运行,0L,时间单位.纳秒);
}
public Disposable schedule direct(@ NonNull Runnable run,long delay,@NonNull TimeUnit unit) {
最终工人w=创建Worker();
最终可运行修饰运行=rxjavaplugins。按时(运行);
dispose task task=new dispose task(修饰运行,w);
w .时间表(任务、延迟、单位);
返回任务;
}
在这里,createWorker()也是一个抽象方法,调用的是我们的调度类对应的调度程序类里面的方法,这里是IoScheduler类,
公共最终类IoScheduler扩展调度程序{
最终AtomicReferenceCachedWorkerPool池;
//省略.
公共工作者createWorker() {
返回新的EventLoopWorker(池。get());
}
静态最终类EventLoopWorker扩展调度程序。工人{
私有最终复合可释放任务;
私有最终CachedWorkerPool池;
私有最终线程工作器线程工作器;
final原子布尔once=new原子布尔();
EventLoopWorker(CachedWorkerPool池){
this.pool=池;
这个。tasks=new CompositeDisposable();
这个。线程工作者=池。get();
}
//省略.
@NonNull
@覆盖
公共可处置计划(@NonNull Runnable action,long delayTime,@NonNull TimeUnit单位){
if (tasks.isDisposed()) {
//不要预定,我们退订了
返回空的一次性的.实例;
}
返回线程工作器。实际计划(行动、延迟时间、单位、任务);
}
}
}
静态最终类CachedWorkerPool实现可运行{
//省略.
ThreadWorker get() {
if (allWorkers.isDisposed()) {
返回关机_线程_工作线程
}
而(!expiringworkerqueue。isempty()){
线程工作器线程工作器=expiringworkerqueue。poll();
如果(threadWorker!=null) {
返回线程工人
}
}
线程工人w=新线程工人(线程工厂);
所有工人。添加(w);
返回w;
}
//省略.
}
这就是IoScheduler的createWorker()的方法,其实最主要的意思就是获取线程池,以便于生成子线程,让订阅任务()可以运行。然后直接调用w。进度表(任务、延迟、单位)方法让它在线程池里执行。上面中那线程工人的源码如下:
静态最终类线程工人扩展了NewThreadWorker {
私有长到期时间
线程工作器(线程工厂线程工厂){
超级(线厂);
this.expirationTime=0L
}
//省略代码.
}
公共类NewThreadWorker扩展调度程序。工人实施一次性{
私有最终ScheduledExecutorService执行器;
公共新线程工人(线程工厂线程工厂){
executor=schedulerpoolfactory。创建(thread factory);
}
公共计划的可运行计划实际(最终可运行运行,long delayTime,@NonNull TimeUnit unit,@ Nullable disposable container parent){
runnable修饰run=rxjavaplugins。按时(运行);
预定可运行Sr=新预定可运行(修饰运行,父);
如果(家长!=null) {
如果(!parent.add(sr)) {
返回Sr;
}
}
未来?f;
尝试{
if (delayTime=0) {
f=执行者。提交((可调用对象)Sr);
}否则{
f=执行者。schedule((可调用对象)Sr,delayTime,unit);
}
高级设置未来(f)和:
} catch(RejectedExecutionException ex){
如果(家长!=null) {
父母。移除(Sr);
}
rxjavaplugins。出错时(例如);
}
返回Sr;
}
}
可以看到,这就调了原始的javaAPI来进行线程池操作。
然后最后一环在子线程调用源.订阅(父)方法,然后回调刚开始创建的可观察的创建的subscribeActual(),既:
受保护的void subscribeActual(Observer?超级测试观察者){
createemittert parent=new createemittert(观察者);
观察者网(父);
尝试{
源.订阅(父);
} catch (Throwable ex) {
例外情况。throwif fatal(ex);
父母。出错时(例如);
}
}
进行消息的订阅绑定。
当我们在调用emitter.onNext(内容)时,是在超正析象管线程里的,那回调的onNext()又是什么时候切换的?那就是前面为了整个流程流畅性没讲的在观察()里的观察者是执行观察者的线程的过程。
observer类扩展了基本队列处置
实现观察,可运行{
//省略代码.
观察者观察器(观察者?超级T实际,调度程序Worker worker,boolean delayError,int bufferSize) {
this.actual=实际;
工人=工人
这个。延迟误差=延迟误差;
这个。缓冲区大小=缓冲区大小;
}
@覆盖
公共取消订阅(一次性s) {
如果(一次性助手。验证(这个。s,s)) {
这. s=s
一次性队列的实例){
@SuppressWarnings(未选中)
QueueDisposableT qd=(QueueDisposableT)s;
int m=qd。请求融合(队列可任意处理.任何|队列一次性。边界);
if (m==QueueDisposable .同步){
源模式=m;
队列=qd
完成=真
实际。关于订阅(本);
调度();
返回;
}
if (m==QueueDisposable .异步){
源模式=m;
队列=qd
实际。关于订阅(本);
返回;
}
}
queue=new spsclinkdarrayqueuet(缓冲区大小);
实际。关于订阅(本);
}
}
@覆盖
公共void onNext(T t) {
如果(完成){
返回;
}
if (sourceMode!=一次性队列.异步){
排队。报价(t);
}
调度();
}
无效计划(){
if (getAndIncrement()==0) {
工人。附表(本);
}
}
//省略代码.
}
当调用emitter.onNext(内容)方法,会调用上面的onNext()方法,然后在这个方法里会把数据压入一个队列,然后执行worker.schedule(this)方法,工作是什么呢,还记得AndroidSchedulers.mainThread()吗,这个对应这个处理器调度程序这个类,所以createWorker()对应着:
私有静态最终类主要持有者{
静态最终调度程序默认=新处理程序调度程序(新处理程序(looper。getmainlooper()));
}
公共工作者createWorker() {
返回新的搬运工人(处理程序);
}
私有静态最终类处理程序工人扩展工人{
私有最终处理程序处理者;
私有易失布尔值已释放;
搬运工人(处理程序处理程序){
this.handler=处理程序
}
@覆盖
公共一次性计划(可运行运行、长期延迟、时间单位单位){
if (run==null)抛出新的NullPointerException( run==null );
if(单位==空)抛出new NullPointerException( unit==null );
如果(已处置){
返回一次性用品。disposed();
}
run=rxjavaplugins。按时(运行);
scheduled runnable scheduled=new scheduled runnable(handler,run);
消息消息=消息。获取(处理程序,已调度);
message.obj=this//用作此工作线程的可运行的的批处理标记。
处理程序。发送消息延迟(消息,单位。tomi llis(delay));
如果(已处置){
handler.removeCallbacks(已调度);
返回一次性用品。disposed();
}
预定返回;
}
}
在下一个()方法里,运用机器人自带的处理者消息机制,通过把方法包裹在消息里,同通过handler.sendMessageDelayed()发送消息,就会在用户界面线程里回调下一个()方法,从而实现从子线程切换到机器人主线程的操作。我们在主线程拿到数据就可以进行各种在主线程的操作了。
总结一下:
可观察的创建一可观察订户b一可观察的为初始化顺序
当调用observable.subscribe(观察者)时的执行顺序
可观察的一可观察订户b一可观察的创建
当发送消息的执行顺序
可观察的创建一可观察订户b一可观察的
以上就是消息订阅和线程切换的源码的所有讲解了。
为了让你们理解更清楚,我仿照RxJava写了大概的消息订阅和线程切换的最基本代码和基本功能,以帮助你们理解
https://github.com/jack921/RxJava2Demo
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。