作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.
克里斯托弗Arriola
验证专家 在工程

Christopher是一名移动工程师,拥有7年以上创建原生安卓和iOS应用程序的经验.

专业知识

以前在

区块链.com
分享

并发性和异步性是移动编程所固有的.

通过命令式编程处理并发性, 在安卓上编程通常涉及到什么, 会引起很多问题吗. 使用响应式编程 RxJava,您可以通过提供更简洁、更不容易出错的解决方案来避免潜在的并发问题.

除了简化并发, 异步任务, RxJava还提供了执行函数式转换操作的能力, 结合, 并汇总可观测到的排放,直到我们达到我们想要的结果.

通过结合RxJava的响应式范式和函数式风格的操作, 我们可以用响应式的方式对大范围的并发构造进行建模, 即使在安卓的无反应世界中也是如此. 在本文中,您将学习如何做到这一点. 您还将学习如何逐步将RxJava应用到现有项目中.

如果您是RxJava的新手,我建议您阅读这篇文章 在这里 讲了一些RxJava的基础知识.

将非反应性连接到反应性世界

将RxJava作为库之一添加到项目中的挑战之一是,它从根本上改变了您对代码进行推理的方式.

RxJava要求您考虑数据是被推的,而不是被拉的. 虽然概念本身很简单, 更改基于拉式范式的完整代码库可能有点令人生畏. 虽然一致性总是理想的, 您可能并不总是有特权一次在整个代码库中进行这种转换, 因此,可能需要更多的渐进式方法.

考虑下面的代码:

/**
* @返回拥有博客的用户列表
*/
public List<用户> get用户sWith博客s() {
   final List<用户> all用户s = 用户Cache.getAll用户s ();
   final List<用户> 用户sWith博客s = new ArrayList<>();
   for (用户 用户: all用户s) {
       如果(用户.博客 !=零 && !用户.博客.isEmpty ()) {
           用户sWith博客s.添加(用户);
       }
   }
   集合.sort(用户sWith博客s, (用户1, 用户2) -> 用户1.名字.compareTo (用户2.名字));
   返回用户sWith博客s;
}

这个函数获取一个列表 用户 缓存中的对象, 根据用户是否有博客来过滤每一个, 按用户名对它们进行排序, 最后将它们返回给调用者. 看看这个片段, we notice that many of these operations can take advantage of RxJava 操作符s; e.g., filter ()排序().

重写这段代码会得到:

/**
* @返回拥有博客的用户列表
*/
public 可观测的<用户> get用户sWith博客s() {
   返回可观察到的.from可迭代的 (用户Cache.getAll用户s ())
                    .filter(用户 -> 用户.博客 !=零 && !用户.博客.isEmpty ())
                    .sorted((用户1, 用户2) -> 用户1.名字.compareTo (用户2.名字));
}

函数的第一行转换 List<用户> 返回的 用户Cache.getAll用户s () 到一个 可观测的<用户> 通过 from可迭代的 (). 这是使我们的代码响应的第一步. 现在我们在做手术 可观测的,这使我们能够执行任何 可观测的 RxJava工具包中的操作符 filter ()排序() 在这种情况下.

关于这个变化,还有其他几点需要注意.

首先,方法签名不再相同. This may not be a huge deal if this 方法 call is only 使用 in a few places 和 it’s easy to propagate the changes up to other areas of the stack; however, 如果它破坏了依赖此方法的客户端, 这是有问题的,应该恢复方法签名.

第二,RxJava在设计时就考虑到了惰性. 也就是说,当没有订阅者时,不应该执行长操作 可观测的. 经过这个修改,这个假设不再正确,因为 用户Cache.getAll用户s () 甚至在有订阅者之前调用.

离开反应式世界

解决我们变更的第一个问题, 我们可以使用任何可用的阻塞操作符 可观测的blockingFirst ()blockingNext (). 从本质上讲,这两个操作符都将阻塞,直到一个项被发送到下游: blockingFirst () 将返回发出的第一个元素并结束,而 blockingNext () 将返回一个 可迭代的 它允许您对底层数据执行for-each循环(循环中的每次迭代都会阻塞).

使用阻塞操作的副作用,需要注意, 虽然, 异常是在调用线程上抛出的,而不是传递给观察者的 onError () 方法.

使用阻塞操作符将方法签名更改回 List<用户>,我们的代码片段现在看起来像这样:

/**
* @返回拥有博客的用户列表
*/
public List<用户> get用户sWith博客s() {
   返回可观察到的.from可迭代的 (用户Cache.getAll用户s ())
           .filter(用户 -> 用户.博客 !=零 && !用户.博客.isEmpty ())
           .sorted((用户1, 用户2) -> 用户1.名字.compareTo (用户2.的名字)
           .toList ()
           .blockingGet ();
}

在调用阻塞操作符(i.e. blockingGet ()),我们首先需要链接聚合操作符 toList () 这样流就从 可观测的<用户> 到一个 > (a 是特殊类型的吗 可观测的 只发出一个值 onSuccess (),或通过错误 onError ()).

之后,我们可以调用阻塞操作符 blockingGet () 打开 并返回a List<用户>.

尽管RxJava支持这个, 应该尽可能地避免这种情况,因为这不是习惯的响应式编程. 但在绝对必要的时候, 阻塞操作符是走出响应式世界的一种很好的初始方式.

懒惰的方法

如前所述,RxJava在设计时就考虑了惰性. 也就是说,长时间运行的操作应该尽可能地延迟.e.,直到在对象上调用订阅为止 可观测的). 为了使我们的解偷懒,我们使用 推迟() 操作符.

推迟() 需要一个 可观测的Source ,它创建一个 可观测的 对于每个订阅的新观察者. 在我们的例子中,我们想要返回 可观测的.from可迭代的 (用户Cache.getAll用户 ()) 每当观察者订阅时.

/**
* @返回拥有博客的用户列表
*/
public 可观测的<用户> get用户sWith博客s() {
   返回可观察到的.defer(() -> 可观测的.from可迭代的 (用户Cache.getAll用户s ()))
                    .filter(用户 -> 用户.博客 !=零 && !用户.博客.isEmpty ())
                    .sorted((用户1, 用户2) -> 用户1.名字.compareTo (用户2.名字));
}

现在,长时间运行的操作被包装在 推迟(),我们可以完全控制应该在哪个线程中运行,只需指定适当的 调度器 in subscribeOn (). 有了这个变化, 我们的代码是完全响应式的,订阅应该只在需要数据的时候发生.

/**
* @返回拥有博客的用户列表
*/
public 可观测的<用户> get用户sWith博客s() {
   返回可观察到的.defer(() -> 可观测的.from可迭代的 (用户Cache.getAll用户s ()))
                    .filter(用户 -> 用户.博客 !=零 && !用户.博客.isEmpty ())
                    .sorted((用户1, 用户2) -> 用户1.名字.compareTo (用户2.的名字)
                    .subscribeOn(调度器.io());
}

延迟计算的另一个非常有用的操作符是 fromCallable () 方法. 不像 推迟(),它期望 可观测的 要在lambda函数中返回,然后将返回的内容“变平” 可观测的, fromCallable () 将调用lambda并返回下游的值吗.

/**
* @返回拥有博客的用户列表
*/
public 可观测的<用户> get用户sWith博客s() {
   final 可观测的> 用户s可观测的 = 可观测的.fromCallable(() -> 用户Cache.getAll用户s ());
   final 可观测的<用户> 用户可观测的 = 用户s可观测的.flatMap(用户s -> 可观测的.from可迭代的(用户));
   返回用户可观测的.filter(用户 -> 用户.博客 !=零 && !用户.博客.isEmpty ())
                        .sorted((用户1, 用户2) -> 用户1.名字.compareTo (用户2.名字));
}

单独的使用 fromCallable () 在列表上返回一个 可观测的>,我们需要使用 flatMap ().

无功-everything

从前面的示例中,我们已经看到可以将任何对象包装在 可观测的 用阻塞操作在非反应态和反应态之间跳转 推迟()/fromCallable (). 使用这些结构,我们就可以开始将安卓应用的部分转换为响应式.

长时间操作

首先考虑使用RxJava的一个好地方是,每当您有一个需要一段时间才能执行的进程时, 如网络呼叫(退房) 以前的文章 例如),磁盘读写等. 下面的例子演示了一个将文本写入文件系统的简单函数:

/**
*将{@code text}写入文件系统.
*
@param context是一个上下文
* @param file名字文件的名称
* @param text要写的文本
* @如果文本写入成功返回true,否则返回false
*/
公共布尔writeTextToFile(Context Context, String file名字, String text) {
   FileOutputStream outputStream;
   尝试{
       outputStream =上下文.openFileOutput(文件名、上下文.MODE_PRIVATE);
       outputStream.写(文本.getBytes ());
       outputStream.close ();
       返回true;
   } catch(异常e) {
       e.printStackTrace ();
       返回错误;
   }
}

调用这个函数时, 我们需要确保它是在一个单独的线程上完成的,因为这个操作是阻塞的. 对调用者施加这样的限制会使开发人员的事情变得复杂,这增加了出现bug的可能性,并可能减慢开发速度.

向函数中添加注释当然有助于避免调用者犯错误, 但这还远远不能做到刀枪不入.

然而,使用RxJava,我们可以很容易地将它包装成 可观测的 并指定 调度器 它应该继续运行. 这种方式, the caller doesn’t need to be concerned at all with invoking the function in a separate thread; the function will take care of this itself.

/**
*将{@code text}写入文件系统.
*
@param context是一个上下文
* @param file名字文件的名称
* @param text要写的文本
* @return一个可观察对象,发出一个布尔值,指示文本是否被成功写入.
*/
public 可观测的 writeTextToFile(Context context, String file名字, String text) {
   返回可观察到的.fromCallable(() -> {
       FileOutputStream outputStream;
       outputStream =上下文.openFileOutput(文件名、上下文.MODE_PRIVATE);
       outputStream.写(文本.getBytes ());
       outputStream.close ();
       返回true;
   }).subscribeOn(调度器.io());
}

使用 fromCallable (),将文本写入文件将延迟到订阅时间.

因为异常是RxJava中的一等对象, 更改的另一个好处是,我们不再需要将操作包装在try/catch块中. 异常将简单地向下游传播,而不是被吞下. 这允许调用者在他/她认为合适的时候处理异常.g. 根据抛出的异常向用户显示错误,等等.).

我们可以执行的另一个优化是返回a 可完备化的 而不是 可观测的. A 可完备化的 本质上是一种特殊类型的 可观测的 -类似于 -仅表示计算是否成功,通过 onComplete (),或失败,通过 onError (). 返回一个 可完备化的 在这种情况下似乎更有意义,因为在an中返回单个true似乎很愚蠢 可观测的 流.

/**
*将{@code text}写入文件系统.
*
@param context是一个上下文
* @param file名字文件的名称
* @param text要写的文本
* @return A 可完备化的
*/
public 可完备化的 writeTextToFile(Context Context, String file名字, String text) {
   返回可完备化的.fromAction(() -> {
       FileOutputStream outputStream;
       outputStream =上下文.openFileOutput(文件名、上下文.MODE_PRIVATE);
       outputStream.写(文本.getBytes ());
       outputStream.close ();
   }).subscribeOn(调度器.io());
}

来完成这个操作,我们使用 fromAction () a的操作 可完备化的 因为我们对返回值不再感兴趣. 如果需要,像一个 可观测的, a 可完备化的 还支持 fromCallable ()推迟() 功能.

替换回调

到目前为止,我们看到的所有示例都输出一个值(i.e.,可以建模为 ),或告诉我们操作是否成功或失败(例如.e.,可以建模为 可完备化的).

如何在应用中转换区域, 虽然, 接收持续的更新或事件(例如位置更新), 查看点击事件, 传感器事件, 等等)?

我们将看两种方法,使用 create () 和使用 主题.

create () 允许我们显式地调用观察者的 onNext () | onComplete () | onError () 方法,当我们从数据源接收更新时. 使用 create (),我们通过一个 可观测的OnSubscribe 它接收到 可观测的发射器 每当观察者订阅时. 使用接收到的发射器, 然后,我们可以执行所有必要的设置调用来开始接收更新,然后调用适当的 发射器 事件.

在位置更新情况下, 我们可以在这个地方注册接收更新,并发出收到的位置更新.

公共类LocationManager {

   /**
    *呼叫接收设备位置更新.
    @return一个发出位置更新的可观测的
    */
   public 可观测的 observeLocation () {
       返回可观察到的.create(发射器 -> {
           //确保下列条件适用,如果不适用,调用发射器的onError ()方法
           // (1) googleApiClient已连接
           //(2)设置位置权限
           final LocationRequest LocationRequest = new LocationRequest()
           locationRequest.setInterval (1000);
           locationRequest.setPriority (LocationRequest.PRIORITY_HIGH_ACCURACY);

           LocationServices.F使用LocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener()) {
               @Override public void onLocationChanged(Location Location) {
                   if (!发射器.isDisposed ()) {
                       发射器.onNext(位置);
                   }
               }
           });
       });
   }
}

内部的函数 create () Call请求位置更新并传入一个回调,该回调在设备位置更改时被调用. 正如我们在这里看到的, 我们本质上替换了回调风格的接口,而在创建的可观测的流中发出接收到的位置(出于教育目的), 我跳过了构造位置请求的一些细节, 如果你想更深入地了解细节,你可以阅读它 在这里).

还有一件事需要注意 create () 是这样的吗? 订阅() ,则提供一个新的发射器. 换句话说, create () 返回 一个寒冷的 可观测的. 这意味着, 在上面的函数中, 我们可能会多次请求位置更新, 这不是我们想要的.

为了解决这个问题,我们希望将该函数更改为返回一个热 可观测的 在…的帮助下 主题.

进入主题

A 主题 扩展一个 可观测的 并实现了 观察者 同时. 当我们想要同时向多个订阅者发出或强制转换同一个事件时,这一点特别有用. 在实现方面,我们希望公开 主题 作为一个 可观测的 给客户,同时把它作为 主题 对于提供者.

公共类LocationManager {

   private 主题 location主题 = Publish主题.create ();
   
   /**
    *当这个LocationManager应该开始监听位置更新时调用这个方法.
    */
   Public void connect () {
       final LocationRequest LocationRequest = new LocationRequest()
       locationRequest.setInterval (1000);
       locationRequest.setPriority (LocationRequest.PRIORITY_HIGH_ACCURACY);

       LocationServices.F使用LocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener()) {
           @Override public void onLocationChanged(Location Location) {
               location主题.onNext(位置);
           }
       });
   }
   
   /**
    *呼叫接收设备位置更新.
    @return一个发出位置更新的可观测的
    */
   public 可观测的 observeLocation () {
       返回location主题;
   }
}

在这个新的实现中,子类型 Publish主题 在事件从订阅时间开始到达时发出事件的方法. 相应的, 如果在已经发出位置更新时执行订阅, 过去的排放将不会被观察员接收到, 只有后续的. 如果不需要这种行为,还有一些其他的 主题 RxJava工具包中的子类型 使用.

此外,我们还创建了一个单独的 connect () 函数,它启动接收位置更新的请求. 的 observeLocation () 我还能做 connect () 调用,但为了清晰/简单,我们从函数中重构了它.

Summary

我们已经研究了一些机制和技术:

  • 推迟() 以及它的变体,将计算的执行延迟到订阅为止
  • 可见 通过生成 create ()
  • 可见 使用 主题
  • 当我们想要离开响应世界时,阻塞x操作

希望, 本文中提供的示例启发了一些关于应用中可以转换为响应式的不同领域的想法. 我们已经讲了很多,如果你有任何问题, 建议, 或者有什么不清楚的, 欢迎在下面留言!

如果您有兴趣了解更多关于RxJava的知识, 我正在写一本深入的书,用安卓的例子解释如何以响应式的方式看待问题. 如果你想收到最新消息,请订阅 在这里.

聘请Toptal这方面的专家.
现在雇佣
克里斯托弗Arriola

克里斯托弗Arriola

验证专家 在工程

伯克利,美国

2016年5月4日成为会员

作者简介

Christopher是一名移动工程师,拥有7年以上创建原生安卓和iOS应用程序的经验.

作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.

专业知识

以前在

区块链.com

世界级的文章,每周发一次.

输入您的电子邮件,即表示您同意我们的 隐私政策.

世界级的文章,每周发一次.

输入您的电子邮件,即表示您同意我们的 隐私政策.

Toptal开发者

加入总冠军® 社区.