1. 创建线程池和线程管理策略分析
1 2 3 4 5
| Schedulers.io() Schedulers.computation() Schedulers.newThread() AndroidSchedulers.mainThread()
|
当我们调用以上方法中的任意一个,都会调到Schedulers类中,Schedulers使用策略模式封装了所有线程切换策略(因此后面以io()分析)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| static { SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask()); }
static final class IOTask implements Callable<Scheduler> { @Override public Scheduler call() throws Exception { return IoHolder.DEFAULT; } }
static final class IoHolder { static final Scheduler DEFAULT = new IoScheduler(); }
public IoScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; this.pool = new AtomicReference<CachedWorkerPool>(NONE); start(); } public void start() { CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory); if (!pool.compareAndSet(NONE, update)) { update.shutdown(); } }
static final class CachedWorkerPool implements Runnable { private final long keepAliveTime; private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue; final CompositeDisposable allWorkers; private final ScheduledExecutorService evictorService; private final Future<?> evictorTask; private final ThreadFactory threadFactory;
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { ...... if (unit != null) { evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS); } ...... } ThreadWorker get() { ..... while (!expiringWorkerQueue.isEmpty()) { ThreadWorker threadWorker = expiringWorkerQueue.poll(); if (threadWorker != null) { return threadWorker; } }
ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; } ...... }
|
用一张图可能说明得比较清楚一些。
2. Rxjava上游任务在子线程中执行分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("JackOu"); } }) .subscribeOn(Schedulers.io()) .subscribe(new Observer<String>() { ......
@Override public void onNext(String s) {
} ...... });
|
从上面使用过程的代码看下面的图,分析Rxjava封装任务和抛任务到线程池的过程。
当我们一订阅(调用subscribe(Observer)方法)的时候,Rxjava将会把上游需要执行的任务和下游的观察者经过层层包裹,包裹好之后,就会得到一个Scheduler.Worker任务对象。当调用发射器的onNext的方式的时候,结合第一小节的图片,ObservableSubscribeOn就会将任务抛到线程池执行,在子线程中执行任务并且返回,从而完成线程切换功能。
3. Rxjava下游任务在主线程中执行分析
3.1 创建AndroidSchedulers.mainThread的过程
如第一节Schedulers的创建流程一样,当调用AndroidSchedulers.mainThread()之后,最终会创建HandlerScheduler。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| public final class AndroidSchedulers {
private static final class MainHolder { static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper())); }
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( new Callable<Scheduler>() { @Override public Scheduler call() throws Exception { return MainHolder.DEFAULT; } });
public static Scheduler mainThread() { return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); } }
final class HandlerScheduler extends Scheduler { private final Handler handler;
HandlerScheduler(Handler handler) { this.handler = handler; }
@Override public Worker createWorker() { return new HandlerWorker(handler); } }
private static final class HandlerWorker extends Worker { private final Handler handler; HandlerWorker(Handler handler) { this.handler = handler; }
@Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { ...... run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this; handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
...... return scheduled; } }
|
其实真正将任务放在主线程中执行就是上面三个步骤,但是Rxjava增加了很多其他功能,例如解除订阅(将任务包装在Disposable中),增加hook功能(在任务外面在包装了ScheduledRunnable)等等,其最内层的本质就是我们需要执行的任务。细化的包裹情况如下图:
4.多个线程切换,以哪个为准
如下面代码,我们作死得切换线程,那么哪些线程会最终执行我们的任务呢
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("JackOu"); } }) .subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.newThread()) .subscribeOn(Schedulers.computation()) .observeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<String>() { ...... @Override public void onNext(String s) {
} ...... });
|
我们可以从第二节和第三节看出,当我们每调用一次subscribeOn方法,上游就会多包装一层Scheduler,在订阅之后,解包裹的时候越靠近“待执行任务”的subscribeOn越后解包,所以最靠近任务的subscribeOn调用会是最终被执行,也就是最终被执行的线程。
因此我们可以总结得到:
总结一: 在多次调用线程切换的时候,第一次调用subscribeOn的线程切换会是最后执行任务的线程;最后调用observeOn切换的线程会是最后执行的线程。
总结二:从调用关系来看,越靠近上游的线程切换,将是最终执行任务的线程;越靠近下游的线程切换,将是最终执行任务的线程。
版权声明: 此文章版权归Jack Ou所有,如有转载,请註明来自原作者