<menu id="yymgu"></menu>
  • <menu id="yymgu"><menu id="yymgu"></menu></menu>
    <menu id="yymgu"></menu>
  • <xmp id="yymgu"><tt id="yymgu"></tt>
    鍍金池/ 教程/ Android/ Incrementally Agerifying legacy code
    Custom observables
    Compiled functions
    Reactive programming
    Reservoirs and parallelism
    Incrementally Agerifying legacy code
    Observables and updatables
    Compiled repositories
    Repositories

    Incrementally Agerifying legacy code

    Agera引入的代碼風格也許適合從零開始的新建app項目。這篇包括一些提示,來幫助想在遺留代碼中使用Agera的開發者,如此往下做。

    Upgrading legacy observer pattern

    --升級原有觀察者模式

    觀察者模式有很多種實現方式,但不是所有的都可以通過簡單的放入遷入到Agera中。下面是一個例子:演示將一個監聽(listenable)類添加到Observable接口的一種方法。

    MyListenable類可以增加(addListener)和刪除(removeListener)Listener,作為額外完整的演示,它繼承了SomeBaseClass。

    該實例使用UpdateDispatcher來解決Java的單繼承約束,使用一個內部類(Bridge)來做橋接, 保持其完整的原始API,同時也使Agera可見。

    public final class MyListenable extends SomeBaseClass implements Observable {
    
      private final UpdateDispatcher updateDispatcher;
    
      public MyListenable() {
        // Original constructor code here...
        updateDispatcher = Observables.updateDispatcher(new Bridge());
      }
    
      // Original class body here... including:
      public void addListener(Listener listener) { … }
      public void removeListener(Listener listener) { … }
    
      @Override
      public void addUpdatable(Updatable updatable) {
        updateDispatcher.addUpdatable(updatable);
      }
    
      @Override
      public void removeUpdatable(Updatable updatable) {
        updateDispatcher.removeUpdatable(updatable);
      }
    
      private final class Bridge implements ActivationHandler, Listener {
        @Override
        public void observableActivated(UpdateDispatcher caller) {
          addListener(this);
        }
    
        @Override
        public void observableDeactivated(UpdateDispatcher caller) {
          removeListener(this);
        }
    
        @Override
        public void onEvent() { // Listener implementation
          updateDispatcher.update();
        }
      }
    }

    Exposing synchronous operations as repositories

    --揭秘repository的同步操作

    Java本質是一種同步語言,如:在Java中最低級別的操作都是同步方法。 當操作可能會花一些時間才能返回(耗時操作),這種方法通常稱為阻塞方法,而且禁止開發者在主線程(UI Thread)調用。

    假設app的UI需要從阻塞的方法獲得數據。Agera可以很容易的通過后臺線程完成調用,然后UI可以在主線程中接收數據。首先,這個阻塞操作需要封裝成一個Agera操作,像這樣:

    public class NetworkCallingSupplier implements Supplier<Result<ResponseBlob>> {
      private final RequestBlob request = …;
    
      @Override
      public Result<ResponseBlob> get() {
        try {
           ResponseBlob blob = networkStack.execute(request); // blocking call
           return Result.success(blob);
        } catch (Throwable e) {
           return Result.failure(e);
        }
      }
    }
    
    Supplier<Result<ResponseBlob>> networkCall = new NetworkCallingSupplier();
    
    Repository<Result<ResponseBlob>> responseRepository =
        Repositories.repositoryWithInitialValue(Result.<ResponseBlob>absent())
            .observe() // no event source; works on activation
            .onUpdatesPerLoop() // but this line is still needed to compile
            .goTo(networkingExecutor)
            .thenGetFrom(networkCall)
            .compile();

    上面的代碼段假定了,在Repository.compile()之前這個請求是已知且永遠不變的。

    這個很容易升級成為一個動態請求,甚至在repository同樣的激活周期期間。

    要可以修改請求,簡單的方式是使用MutableRepository。 此外,為了在第一次請求為完成之前就可以提供數據,可以在Result中一個提供初始值:absent()。

    MutableRepository這種用法類似于是一個可變的變量(可為null),故命名為requestVariable。

    // MutableRepository<RequestBlob> requestVariable =
    //     mutableRepository(firstRequest);
    // OR:
    MutableRepository<Result<RequestBlob>> requestVariable =
        mutableRepository(Result.<RequestBlob>absent());

    然后, 不是在supplier中封裝阻塞方法,使用function實現動態請求:

    public class NetworkCallingFunction
        implements Function<RequestBlob, Result<ResponseBlob>> {
      @Override
      public Result<ResponseBlob> apply(RequestBlob request) {
        try {
           ResponseBlob blob = networkStack.execute(request);
           return Result.success(blob);
        } catch (Throwable e) {
           return Result.failure(e);
        }
      }
    }
    
    Function<RequestBlob, Result<ResponseBlob>> networkCallingFunction =
        new NetworkCallingFunction();

    升級后的repository可以像這樣compiled:

    Result<ResponseBlob> noResponse = Result.absent();
    Function<Throwable, Result<ResponseBlob>> withNoResponse =
        Functions.staticFunction(noResponse);
    Repository<Result<ResponseBlob>> responseRepository =
        Repositories.repositoryWithInitialValue(noResponse)
            .observe(requestVariable)
            .onUpdatesPerLoop()
            // .getFrom(requestVariable) if it does not supply Result, OR:
            .attemptGetFrom(requestVariable).orEnd(withNoResponse)
            .goTo(networkingExecutor)
            .thenTransform(networkCallingFunction)
            .compile();

    這部分代碼段還演示了一點:通過給操作一個特殊的名字,讓repository的編譯表達式更易讀。

    Wrapping asynchronous calls in repositories

    --repository的異步調用封裝

    現在的很多Library都有異步API和內置的線程,但是客戶端不能控制或禁用。

    app中有這樣的Library的話,引入Agera將是一個具有挑戰性的工作。 一個直接的辦法就是找到Library中同步選擇的點,采用[[如上段所述|Incrementally-Agerifying-legacy-code#exposing-synchronous-operations-as-repositories]]方法。

    另一個方式(反模式):切換后臺線程執行異步調用并等待結果,然后同步拿結果。上面方式不可行時,這一節討論一個合適的解決方法。

    異步調用的一個循環模式是請求-響應 結構。下面的示例假定這樣結構:未完成的工作可以被取消,但是不指定回調的線程。

    interface AsyncOperator<P, R> {
      Cancellable request(P param, Callback<R> callback);
    }
    
    interface Callback<R> {
      void onResponse(R response); // Can be called from any thread
    }
    
    interface Cancellable {
      void cancel();
    }

    下面repository例子,使用AsyncOperator提供數據, 完成響應式請求(一個抽象的supplier類)。

    這段代碼假定AsyncOperator已經有足夠的緩存,因此重復的請求不會影響性能。

    public class AsyncOperatorRepository<P, R> extends BaseObservable
        implements Repository<Result<R>>, Callback<R> {
    
      private final AsyncOperator<P, R> asyncOperator;
      private final Supplier<P> paramSupplier;
    
      private Result<R> result;
      private Cancellable cancellable;
    
      public AsyncOperatorRepository(AsyncOperator<P, R> asyncOperator,
          Supplier<P> paramSupplier) {
        this.asyncOperator = asyncOperator;
        this.paramSupplier = paramSupplier;
        this.result = Result.absent();
      }
    
      @Override
      protected synchronized void observableActivated() {
        cancellable = asyncOperator.request(paramSupplier.get(), this);
      }
    
      @Override
      protected synchronized void observableDeactivated() {
        if (cancellable != null) {
          cancellable.cancel();
          cancellable = null;
        }
      }
    
      @Override
      public synchronized void onResponse(R response) {
        cancellable = null;
        result = Result.absentIfNull(response);
        dispatchUpdate();
      }
    
      @Override
      public synchronized Result<R> get() {
        return result;
      }
    }

    這個類可以很容易地升級到可以修改請求參數,而這個過程就類似于前面的討論:讓repository提供請求參數,并讓AsyncOperatorRepository觀察請求參數變化。

    在激活期間,觀察請求參數的變化,取消任何正在進行的請求,并發出新的請求,如下所示:

    public class AsyncOperatorRepository<P, R> extends BaseObservable
        implements Repository<Result<R>>, Callback<R>, Updatable {
    
      private final AsyncOperator<P, R> asyncOperator;
      private final Repository<P> paramRepository;
    
      private Result<R> result;
      private Cancellable cancellable;
    
      public AsyncOperatorRepository(AsyncOperator<P, R> asyncOperator,
          Repository<P> paramRepository) {
        this.asyncOperator = asyncOperator;
        this.paramRepository = paramRepository;
        this.result = Result.absent();
      }
    
      @Override
      protected void observableActivated() {
        paramRepository.addUpdatable(this);
        update();
      }
    
      @Override
      protected synchronized void observableDeactivated() {
        paramRepository.removeUpdatable(this);
        cancelOngoingRequestLocked();
      }
    
      @Override
      public synchronized void update() {
        cancelOngoingRequestLocked();
        // Adapt accordingly if paramRepository supplies a Result.
        cancellable = asyncOperator.request(paramRepository.get(), this);
      }
    
      private void cancelOngoingRequestLocked() {
        if (cancellable != null) {
          cancellable.cancel();
          cancellable = null;
        }
      }
    
      @Override
      public synchronized void onResponse(R response) {
        cancellable = null;
        result = Result.absentIfNull(response);
        dispatchUpdate();
      }
    
      // Similar process for fallible requests (typically with an
      // onError(Throwable) callback): wrap the failure in a Result and
      // dispatchUpdate().
    
      @Override
      public synchronized Result<R> get() {
        return result;
      }
    }