RxJava源码学习

文章将会同步到个人微信公众号:Android部落格

1 RxJava使用

RxJava是响应式数据流驱动框架,Retrokit提供了对RxJava的支持。

1.1 接入

在app module的build.gradle中添加依赖:

implementation "com.squareup.retrofit2:adapter-rxjava2:2.3.0"
implementation "io.reactivex.rxjava2:rxjava:2.0.6"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

接下来在Retrofit构建的时候添加RxJava CallFactory:

Retrofit retrofit = new Retrofit.Builder()
        .baseUrl("https://api.github.com/")
        .callbackExecutor(Executors.newSingleThreadExecutor())
        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
        .addConverterFactory(GsonConverterFactory.create())
        .build();

CallAdapter.Factory是典型的工厂模式,这个类的一般实现模式是:

public interface CallAdapter<R, T> {
    Type responseType();
    T adapt(Call<R> call);
    abstract class Factory {
        public abstract @Nullable CallAdapter<?, ?> get(
        Type returnType, Annotation[] annotations, Retrofit retrofit);
    }
}

先调用get方法返回CallAdapter,然后调用adapt方法获取请求Call执行请求。

在正式添加RxJava代码之前需要先看看RxJava的源码。

可以从官方的文档中获取相关知识。

http://reactivex.io/RxJava/3.x/javadoc/overview-summary.html

2 基本类

2.1 io.reactivex.rxjava3.core.Flowable

0..N flows, supporting Reactive-Streams and backpressure

0到多个流,支持响应流和背压。

http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html

The Flowable class that implements the Reactive Streams Publisher Pattern and offers factory methods, intermediate operators and the ability to consume reactive dataflows.

Flowable类实现了响应流发布模式,并提供工厂方法,中间运算符实现,以及有能力消费响应数据流。

他实现了Publisher接口:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    /**
     * Invoked after calling {@link Publisher#subscribe(Subscriber)}.
     * <p>
     * No data will start flowing until {@link Subscription#request(long)} is invoked.
     * <p>
     * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
     * <p>
     * The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
     * 
     * @param s
     *            {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
     */
    public void onSubscribe(Subscription s);

    /**
     * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.
     * 
     * @param t the element signaled
     */
    public void onNext(T t);

    /**
     * Failed terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
     *
     * @param t the throwable signaled
     */
    public void onError(Throwable t);

    /**
     * Successful terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
     */
    public void onComplete();
}

可以认为Flowable是一个发布者,数据流从这里开始发起请求。

subscribe是一个工厂方法,可以被调用多次,每次调用都会产生一次订阅,每次订阅对应一个Subscriber(订阅者)。每个订阅者每次只能订阅一个发布者,就是Publisher的各种实现类。

如果发布者拒绝订阅或因为其他原因导致失败,会回调一个错误信号给订阅者(Subscriber.onError)。

图片来自http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html

2.2 io.reactivex.rxjava3.core.Observable

0..N flows, no backpressure。

0到多个数据流,没有背压。

The Observable class is the non-backpressured, optionally multi-valued base reactive class that offers factory methods, intermediate operators and the ability to consume synchronous and/or asynchronous reactive dataflows.

Many operators in the class accept ObservableSource(s), the base reactive interface for such non-backpressured flows, which Observable itself implements as well.

Observable类是没有背压,可选多值的响应类,这个类能够提供工厂方法,中间运算符实现以及有能力消费同步和/或异步响应数据流。

该类中的许多运算符都实现ObservableSource(s),它是此类非背压流的基本响应接口,Observable本身也实现了该接口。

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}

public interface Observer<T> {

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(@NonNull T t);

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(@NonNull Throwable e);

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}

图片来自http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html

2.3 io.reactivex.rxjava3.core.Single

a flow of exactly 1 item or an error。

一个数据流正好对应一个item返回或一个错误回调。

The Single class implements the Reactive Pattern for a single value response.

Single behaves similarly to Observable except that it can only emit either a single successful value or an error (there is no onComplete notification as there is for an Observable).

The Single class implements the SingleSource base interface and the default consumer type it interacts with is the SingleObserver via the subscribe(SingleObserver) method.

Single类实现了单个数据返回的响应模式。

Single类的行为类似Observable,除了他只能发送一个成功的数据或一个错误(也没有一个onComplete回调通知,这在Observable中是存在的)。

Single类实现了SingleSource基本接口,与之交互的默认消费者类型是SingleObserver,通过SingleObserver的subscribe方法产生关联。

public interface SingleSource<T> {

    /**
     * Subscribes the given SingleObserver to this SingleSource instance.
     * @param observer the SingleObserver, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull SingleObserver<? super T> observer);
}

public interface SingleObserver<T> {

    /**
     * Provides the SingleObserver with the means of cancelling (disposing) the
     * connection (channel) with the Single in both
     * synchronous (from within {@code onSubscribe(Disposable)} itself) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Notifies the SingleObserver with a single item and that the {@link Single} has finished sending
     * push-based notifications.
     * <p>
     * The {@link Single} will not call this method if it calls {@link #onError}.
     *
     * @param t
     *          the item emitted by the Single
     */
    void onSuccess(@NonNull T t);

    /**
     * Notifies the SingleObserver that the {@link Single} has experienced an error condition.
     * <p>
     * If the {@link Single} calls this method, it will not thereafter call {@link #onSuccess}.
     *
     * @param e
     *          the exception encountered by the Single
     */
    void onError(@NonNull Throwable e);
}

图片来自:http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Single.html

2.4 io.reactivex.rxjava3.core.Completable

a flow without items but only a completion or error signal。

没有数据回调返回的数据流,但是只有完成或错误回调。

The Completable class represents a deferred computation without any value but only indication for completion or exception.

Completable behaves similarly to Observable except that it can only emit either a completion or error signal (there is no onNext or onSuccess as with the other reactive types).

The Completable class implements the CompletableSource base interface and the default consumer type it interacts with is the CompletableObserver via the subscribe(CompletableObserver) method.

Completable类意味着一个延迟的没有值返回的计算,但是只有完成或异常返回回调。

Completable的表现与Observable类似,除了他只能传递一个完成或错误异常。

Completable类实现了CompletableSource基本接口,默认的消费者类型是CompletableObserver,通过CompletableObserver.subscribe方法产生交互。

public interface CompletableSource {

    /**
     * Subscribes the given {@link CompletableObserver} to this {@code CompletableSource} instance.
     * @param observer the {@code CompletableObserver}, not {@code null}
     * @throws NullPointerException if {@code observer} is {@code null}
     */
    void subscribe(@NonNull CompletableObserver observer);
}

public interface CompletableObserver {
    /**
     * Called once by the {@link Completable} to set a {@link Disposable} on this instance which
     * then can be used to cancel the subscription at any time.
     * @param d the {@code Disposable} instance to call dispose on for cancellation, not null
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Called once the deferred computation completes normally.
     */
    void onComplete();

    /**
     * Called once if the deferred computation 'throws' an exception.
     * @param e the exception, not {@code null}.
     */
    void onError(@NonNull Throwable e);
}

图片来自:http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Completable.html

2.5 io.reactivex.rxjava3.core.Maybe

a flow with no items, exactly one item or an error.

一个没有反馈项的数据流,准确的说应该是反馈一个item或错误。

The Maybe class represents a deferred computation and emission of a single value, no value at all or an exception.

The Maybe class implements the MaybeSource base interface and the default consumer type it interacts with is the MaybeObserver via the subscribe(MaybeObserver) method.

Maybe类表示单个值(完全没有值或异常)的延迟计算和发射。

Maybe类实现了基类借口MaybeSource,与之交互的默认消费者类型是MaybeObserver,通过MaybeObserver.subscribe()实现交互。

public interface MaybeSource<T> {

    /**
     * Subscribes the given MaybeObserver to this MaybeSource instance.
     * @param observer the MaybeObserver, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull MaybeObserver<? super T> observer);
}

public interface MaybeObserver<T> {

    /**
     * Provides the MaybeObserver with the means of cancelling (disposing) the
     * connection (channel) with the Maybe in both
     * synchronous (from within {@code onSubscribe(Disposable)} itself) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Notifies the MaybeObserver with one item and that the {@link Maybe} has finished sending
     * push-based notifications.
     * <p>
     * The {@link Maybe} will not call this method if it calls {@link #onError}.
     *
     * @param t
     *          the item emitted by the Maybe
     */
    void onSuccess(@NonNull T t);

    /**
     * Notifies the MaybeObserver that the {@link Maybe} has experienced an error condition.
     * <p>
     * If the {@link Maybe} calls this method, it will not thereafter call {@link #onSuccess}.
     *
     * @param e
     *          the exception encountered by the Maybe
     */
    void onError(@NonNull Throwable e);

    /**
     * Called once the deferred computation completes normally.
     */
    void onComplete();
}

图片来自http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Maybe.html

3 RxJava中的线程

3.1 IO

在Schedulers类中通过静态调用获取IO线程,

public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

io方法中,有一个IO静态变量,在static块中被初始化:

static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

    IO = RxJavaPlugins.initIoScheduler(new IOTask());

    TRAMPOLINE = TrampolineScheduler.instance();

    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}

可以看到这里不止IO线程,还有NEW_THREAD等线程,这里以IO为例说明。

Schedulers

IO = RxJavaPlugins.initIoScheduler(new IOTask());

static final class IOTask implements Supplier<Scheduler> {
    @Override
    public Scheduler get() {
        return IoHolder.DEFAULT;
    }
}

static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

IoScheduler

public final class IoScheduler extends Scheduler {}

public IoScheduler() {
    this(WORKER_THREAD_FACTORY);
}

public IoScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<>(NONE);
    start();
}

    @Override
public void start() {
    CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}
    
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
    this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
    this.expiringWorkerQueue = new ConcurrentLinkedQueue<>();
    this.allWorkers = new CompositeDisposable();
    this.threadFactory = threadFactory;
    
    ScheduledExecutorService evictor = null;
    Future<?> task = null;
    if (unit != null) {
        evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
        task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
    }
    evictorService = evictor;
    evictorTask = task;
}

EVICTOR_THREAD_FACTORY是一个RxThreadFactory对象。

RxThreadFactory

public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
    @Override
    public Thread newThread(@NonNull Runnable r) {
        Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
        t.setPriority(priority);
        t.setDaemon(true);
        return t;
    }
    }
}

可以看到IO其实对应的是IoScheduler,中间包含了一个线程池执行对应的任务。

3.2 .mainThread

一般调用方法是:AndroidSchedulers.mainThread()。

AndroidSchedulers

public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
    
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
    new Callable<Scheduler>() {
        @Override public Scheduler call() throws Exception {
            return MainHolder.DEFAULT;
        }
    });
            
private static final class MainHolder {
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}

主线程的切换使用Handler MainLooper。

4 整体把握

Observable<String> observable = service.listRepos1("cg22983677");
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        
    }

    @Override
    public void onNext(String s) {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

4.1 subscribeOn

这两个方法创建了数据流被执行的环境。

Observable

    public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
    }

ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;

        final AtomicReference<Disposable> upstream;

        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<>();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }

        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(upstream);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
}

4.2 observeOn

Observable

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    }
    
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        final Observer<? super T> downstream;
        final Scheduler.Worker worker;
                SimpleQueue<T> queue;

        Disposable upstream;
        
        @Override
        public void onSubscribe(Disposable d) {
        }
        
        @Override
        public void onNext(T t) {
            queue.offer(t);
            schedule();
        }
        
        @Override
        public void onError(Throwable t) {
            done = true;
            schedule();
        }
        
        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }
        
        @Override
        public void dispose() {
            if (!disposed) {
                disposed = true;
                upstream.dispose();
                worker.dispose();
                if (!outputFused && getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }
        
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
        
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
    }
}

4.3 subscribe

一句话,在这里启动数据流。

5 流程图

Observable<String> observable = service.listRepos1("cg22983677");
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {});

6 总结

第5节中的流程图是Observable相关的,其实另外四种基础类都是一样的流程。

每个基础类中都有不同的操作符,每一个操作符就是一个Observable,意思就是可被观察的,内部又实现了观察者,用于接收上一步的数据流,处理完毕之后,将数据流往下传递,最终到了消费者。

处理数据的过程所处的线程,依赖于最初设置的Scheduler,他里面的Worker是具体实施者,并在最初设定的线程中运行。