RxSwift, 异步操作组合处理

工作期间项目中RxSwift的应用

Posted by P36348 on November 27, 2018

响应式编程&链式编程

接手thinker.vc的几个iOS共享经济项目, 有较多后台定时的网络请求,定位和蓝牙操作的组合. 原实现方案是直接把不同操作通过闭包嵌套起来, 如此一来有些比较头疼的问题:

  • 异步操作组合出现”回调地狱”, 每个组合操作的业务上有变动需要做大修.

  • 任务管理不便,无法获取或取消上一次的请求/操作.

  • 异步响应不及时可能造成之前的请求后至, 让数据出错. 或在页面退出之后仍然在进行未完成的请求.

网上较推荐的解决方案,就是使用响应式编程框架. 大体分为RxSwiftReactiveCocoa / ReactiveSwift.

ReactiveCocoa是从OC时代就开始有的, 在Swift时代迁移过来, 开始的时候只是单纯的调用OC源码混编, 到后来衍生出了ReactiveSwift. 而RxSwift则是”原生”Swift诞生的.

刚好公司的项目里面本来已经使用了Swagger来自动生成网络请求业务的代码, 自带Moya框架并选择的是RxSwift. 于是就理所当然用了RxSwift.


串行异步请求处理

举个例子需要更新坐标然后请求附近的设备列表, 需要考虑定位超时或者定位出错的情况(例如没有开启权限), 以及提示用户当前正在更新数据.

分解之后需要实现以下函数:

1
2
3
4
5
6
7
8
// 更新坐标
func updateLocation(success: (CLLocation) -> Void, failure: (Error) -> Void)
// 请求设备
func fetchDevices(near location: CLLocation, success: ([Device]) -> Void, failure: (Error) -> Void)
// 更新数据以及界面
func update(with devices: [Device])
// 处理错误
func handlError(_ error: Error)

原来的传统闭包实现方式

1
2
3
4
5
6
7
8
9
10
11
self.updateLocation(success: { [weak self] location in
         self?.fetchDevices(near: location, 
                            success: { res in 
                               self?.update(with: res)
                            }
                            failure: { err in 
                               self?.handlError(err)
                            })}, 
                     failure: { [weak self] err in 
                         self?.handlError(err)
                     })

类似这样的业务在共享系列项目经常出现, 如:

1.首页地图上轮询用户坐标附近的设备

2.设备列表界面

先讨论第2种情况, 这个时候上面的代码应该在下拉刷新中调用, 就是实际上项目写出的是这种嵌套的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
self.tableView.pullToRefresh(action: { [unowned self] in
        self.updateLocation(success: { [weak self] location in
                 self?.fetchDevices(near: location, 
                                    success: { res in 
                                       self?.update(with: res)
                                    }
                                    failure: { err in 
                                       self?.handlError(err)
                                    })}, 
                             failure: { [weak self] err in 
                                 self?.handlError(err)
                             })
})

一开始我尝试把更新定位和数据这一段操作封装为一个函数func updateData(), 然后在tableView的下拉刷新闭包里面调用.

如此类推也可以把updateData里面的操作继续拆分, 视觉上就不会面对这么难看的代码块.

可是即使这样, 下拉刷新之后一系列的请求动作还是没有很直观的呈现出来, 代码也会因此变得相对零散, 最后甚至更加难看出原来的业务逻辑.

这个时候搬出RxSwift. 在接入之后, 代码就可以改成以下这个样子:

1
2
3
4
5
6
self.tableView.rx_pullToRefresh
    .flatMap({[unowned self] in self.rx_updateLocation()})
    .flatMap({[unowned self] in self.rx_fetchDevices(near: $0)})
    .subscribe(onNext:  {[weak self] in self?.update(with: $0)},
               onError: {[weak self] in self?.handleError($0)})
    .disposed(by: self.disposeBag)

然后逐步解析这段代码, 关键点主要有:

  • 链式

    把原来通过闭包回调的函数都改造成了返回Observable的函数, 改造后回调的处理就通过链式调用ObservableOperator结合函数式编程来传递.

  • 函数式

    把处理回调的函数作为参数传递给flatMapsubscribe函数.

  • Error聚合处理

    原来每个异步操作都有一个Error的返回, 用闭包回调的方式需要在每次调用函数的时候传入. 而使用RxSwift的话, 其实所有Observable是被聚合成了1个, 就是在subscribe调用的那个, 而之前聚合起来的Observable的Error都可以在subscribe的onError参数里传递处理.

这段代码仍不够完善, 优化方式后面会提到. 但是显而易见, 这一系列异步操作的条理顺序已经出来了, 而如果中间某一环需要修改, 也并不难操作.


并行异步请求处理

照样举例, 下载多张图片然后更新数据库:

分解之后有以下几个函数:

1
2
3
4
5
6
7
8
// 下载图片
func downloadImage(withURL: URL) -> (imageData: Data?, error: Error?)
// 处理下载好的图片
func handleDownloadImage(imageData: Data)

func handleError(error: Error)
// 写入数据库
func updateDatabase()

原来的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
let group = DispatchGroup()

DispatchQueue.global().async(group: group, execute: { [unowned self] in
    let res = self.downloadImage(withURL: url1)
    if let data = res.imageData {
        self.handleDownloadImage(imageData: data)
    }else if let error = res.error {
        self.handleError(error: error)
    }
})
DispatchQueue.global().async(group: group, execute: { [unowned self] in
    let res = self.downloadImage(withURL: url2)
    if let data = res.imageData {
        self.handleDownloadImage(imageData: data)
    }else if let error = res.error {
             self.handleError(error: error)
    }
})
DispatchQueue.global().async(group: group, execute: { [unowned self] in
    let res = self.downloadImage(withURL: url2)
    if let data = res.imageData {
        self.handleDownloadImage(imageData: data)
    }else if let error = res.error {
        self.handleError(error: error)
    }
})

group.notify(queue: DispatchQueue.main) { [unowned self] in
    self.updateDatabase()
}

RxSwift的做法, 利用Observable的zip函数把多个下载任务聚合:

1
2
3
4
Observable.zip(self.rx_downloadImage(withURL: url1), self.rx_downloadImage(withURL: url2), self.rx_downloadImage(withURL: url3))
          .subscribe(onNext: {[unowned self] res in self.updateDatabase()}
                     onError: {[unowned self] err in self.handleError(error: error)})
          .disposed(by: self.disposeBag)

改造耗时的同步函数

ReactiveX框架确实很适合用来处理异步场景,只要使用者习惯了用return Observable来代替block, 并且要在Observable中传递处理结果.

RxSwift提供了一个比较简单的方式创建Observable, 就是Observable的静态函数create:

Creates an observable sequence from a specified subscribe method implementation.

1
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> Disposable) -> RxSwift.Observable<Self.E>

这个函数只有一个闭包参数需要传入, 而这个闭包会给我们提供一个Observer的实例, 我们的操作都会在这个闭包里面执行, 而结果就通过Observer来派发, 比方上面的下载图片函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func rx_downloadImage(withURL url: URL) -> Observable<Data> {
    return Observable.create({ [unowned self] observer -> Disposable in 
        DispatchQueue.global().async({
        let res = self.downloadImage(withURL: url)
                if let data = res.data {
                    observer.onNext(data)
                    observer.onCompleted()
                }else if let error = res.error {
                    observer.onError(error)
                }
        })
        return Disposables.create()
    })
}

这样就封装了一个在内部子线程调用旧函数downloadImage的函数. 对于子线程调用这个需求, 简单地使用了global队列来异步执行下载.

队列的切换

以上的做法实现了在函数内异步下载并回调的需求, 但是这个做法会让调用者无法控制下载任务在哪个队列执行.

如果我们有必要手动去控制这类函数执行的队列, 可以通过Rx提供的解决方案实现, 就是以下两个函数observeOnsubscribeOn:

Wraps the source sequence in order to run its observer callbacks on the specified scheduler.

1
public func observeOn(_ scheduler: ImmediateSchedulerType) -> PrimitiveSequence<Trait, Element> 

Wraps the source sequence in order to run its subscription and unsubscription logic on the specified

1
public func subscribeOn(_ scheduler: ImmediateSchedulerType)

其中subscribeOn是指定了Observable是在什么队列被订阅的, 这个队列同时也会是被订阅的Observable任务执行所在的队列.

observeOn则是指定了处理结果所在的队列, 就是我们最后调用subscribe(onNext, onError,…)的时候, 闭包里的任务的执行所在队列.

这两个函数接收的参数ImmediateSchedulerType是RxSwift定义的协议, 实际上已经帮我们实现了几个结构体可以直接使用, 比较常用的是SerialDispatchQueueScheduler, ConcurrentDispatchQueueScheduler, ConcurrentMainSchedulerMainScheduler.

我们可以在SerialDispatchQueueSchedulerConcurrentDispatchQueueScheduler的init函数中传入对应的队列. 或者直接指定qos, 类似DispatchQueue的global函数.

通过这2个函数, 可以便捷地把原来同步执行的函数放到子队列里面执行, 然后回到主线程处理结果, 比方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func rx_downloadImage(withURL url: URL) -> Observable<Data> {
    return Observable.create({ [unowned self] observer -> Disposable in 
        let res = self.downloadImage(withURL: url)
            if let data = res.data {
                observer.onNext(data)
                observer.onCompleted()
            }else if let error = res.error {
                observer.onError(error)
            }
        return Disposables.create()
    })
}

Observable.zip(self.rx_downloadImage(withURL: url1), self.rx_downloadImage(withURL: url2), self.rx_downloadImage(withURL: url3))
          .subscribeOn(ConcurrentDispatchQueueScheduler(qos: DispatchQoS.default))
          .observeOn(ConcurrentMainScheduler.asyncInstance)
          .subscribe(onNext: {[unowned self] res in self.updateDatabase()}
                     onError: {[unowned self] err in self.handleError(error: error)})

Observable的”缺陷”

在串行异步请求处理里面提到了这样的处理方式并不完善, 因为Observable有一个特点就是抛出Error之后就会自动销毁. 而这段代码里面, 下拉刷新的Observable是与另外两个异步操作聚合在一起的, 就是说如果网络请求或者定位的操作抛出Error, 那么用户下一次下拉刷新也是不会被处理的.

所以, 为了避免这个问题, 解决方式有两种:

1.不要把下拉刷新的Observable与另外两个异步操作的Observable聚合;

2.拦截另外两个Observable可能抛出的Error

第1个方案的解决代码:

1
2
3
4
5
6
7
8
self.tableView.rx_pullToRefresh
    .subscribe(onNext: { [unowned self] in
        _ = self.rx_updateLocation()
                .flatMap({self.rx_fetchDevices(near: $0)})
                .subscribe(onNext:  {self.update(with: $0)},
                           onError: {self.handleError($0)})
    })    
    .disposed(by: self.disposeBag)

这样就可以保证rx_pullToRefresh会一直被observe, 但是这个处理方式会让代码可读性又下降了.

所幸RxSwift还给我们提供了另外的选择, 利用Observable的catchError函数或者catchErrorJustReturn函数, 我们就可以把以上2个异步操作的错误拦截.

Continues an observable sequence that is terminated by an error with the observable sequence produced by the handler.

1
public func catchError(_ handler: @escaping (Error) throws -> RxSwift.Observable<Self.E>) -> RxSwift.Observable<Self.E>

Continues an observable sequence that is terminated by an error with a single element.

1
public func catchErrorJustReturn(_ element: Self.E) -> RxSwift.Observable<Self.E>

其中catchErrorJustReturn函数可以让我们产生一个默认值来继续事件链, 而catchError则接收一个闭包参数, 可以通过我们产生的Error来制定一个值给后续事件链或者直接在闭包里面处理Error. 显然这个场景我们是需要处理Error的, 所以选用catchError. 而这样的话, subscribe函数的onError就永远都不会执行了, 等于是把Error的处理提前到了catchError:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 处理Error并且返回默认值
func rx_handleError(_ error: Error) -> Observable<[Device]> {
    // 处理Error, 生成默认参数
    ...
    return Observable.of(__defaultValue__)
}
// 请求列表的数据(聚合2个异步操作的Observable)
func rx_fetchTableViewData() -> Observable<[Device]> {
    return self.rx_updateLocation().flatMap({self.rx_fetchDevices(near: $0)})
}

self.tableView.rx_pullToRefresh
    .flatMap({ [unowned self] in 
        self.rx_fetchTableViewData()
            .catchError({self.rx_handleError($0)})
    })
    .subscribe(onNext: {[weak self] in self?.update(with: $0)})
    .disposed(by: self.disposeBag)

以上这样操作算是把Error处理了, 但是还是存在问题, 我们还需要再改进一下, 不要让handleResponseValue和handleError的代码分散.

配合enum以及泛型更舒服地处理Error

Swift里面的枚举类型是可以带参数的, 我们可以把我们要的结果抽象出来, 定义一个枚举:

1
2
3
4
enum Result {
    case value([Device])
    case error(Error)
}

然后改动一下我们的fetch以及handle方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 改进后的fetch函数把[Device]和Error转换成Result
func rx_fetchTableViewData() -> Observable<Result> {
    return self.rx_updateLocation()
               .flatMap({self.rx_fetchDevices(near: $0)})
               .map({Result.value($0)})
               .catchError({Observable.of(Result.error($0))})
}
// 统一处理Result
func handleResult(_ result: Result) {
    switch result {
        case .value(let value):
            self.update(with: value)
        case .error(let error):
            self.handleError(error)
    }
}

最后我们这个下拉刷新->更新定位坐标->获取附近设备->更新table的代码就可以变成:

1
2
3
4
self.tableView.rx_pullToRefresh
    .flatMap({[unowned self] in self.rx_fetchTableViewData()})
    .subscribe(onNext: {[weak self] in self?.handleResult($0)})
    .disposed(by: self.disposeBag)

但是, 实际上有这种场景的不止是设备列表, 还有比如附近的设备中心(Station), 这个Result显然可以承担更多的任务.

利用Swift的泛型改进, 让Result适用于通用的场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
enum Result<T> {
    case value(T)
    case error(Error)
}
func rx_fetchDeviceTableViewData() -> Observable<Result<[Device]>> {
    return ...
}
func handleDeviceResult(_ result: Result<[Device]>) {
    ...
}
func rx_fetchStationTableViewData() -> Observable<Result<[Station]>> {
    return ...
}
func handleStationResult(_ result: Result<[Station]>) {
    ...
}

改造原来的GCD异步函数

如果在在项目中途接入Rx, 原来项目中已经存在大量通过GCD回调的函数了.

这个时候把全部函数都改造是很高成本的, 而且部分函数可能在项目中被调用了很多次, 涉及的模块可能比较多, 但是不一定每个调用了这个函数的模块都有必要接入Rx. 在这种情况下通用可以使用Observable的create函数去封装原来的函数.

比如有一个加载本地数据的函数:

1
2
3
4
5
function loadDataFromLocal(filePath: URL, success: (Data)->Void, failure: (Error)->Void) {
    ...
    ...
    ...
}

在不改动原函数的情况下, 增加一个新的函数:

1
2
3
4
5
6
7
8
9
10
function rx_loadDataFromLocal(filePath: URL) -> Observable<Result<Data>> {
    return Observable.create({ observer in
    
        loadDataFromLocal(filePath: filePath,
                          success: {data in observer.onNext(.value(data))}, 
                          failure: {error in observer.onNext(.error(error))})
                          
        return Disposables.create()
    })
}

这样的函数改造和同步函数的改造一样, 有一个类似的缺陷, 就是不可以再改变loadDataFromLocal函数在哪个队列执行.

相对的, 这个改造方式可以比较简便地复用现有的函数.

Delegate回调

除了GCD, delegate也是异步回调的一种常用方式, 而delegate回调也是有可能出现在异步函数嵌套组合中.比如: 一个弹出的选择框.

用传统Delegate的做法, 需要在调用端实现delegate的函数.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class ModalViewController: UIViewController {
    weak var delegate: ModalViewControllerDelegate? = nil
    ...
    
    private func internalSelect(at index: Int) {
        self.delegate?.modalViewController(didSelectAt: index)
    }
}
protocol ModalViewController: NSObjectProtocol {
    func modalViewController(didSelectAt index: Int)
}
// 调用端
class ViewController: UIViewController, ModalViewControllerDelegate {
    func performSelect() {
        let modalViewController = ModalViewController()
        modalViewController.delegate = self
        self.present(viewController: modalViewController, animated: true, completion: nil)
    }
    
    func modalViewController(didSelectAt index: Int) {
        // 处理回调
        self.handleSelect(at: index)
    }
    
    func handleSelect(at index: Int) {
        ...
    }
}

这种做法有一个明显的弊端: 当ViewController是一个业务量大的页面的时候, 它一般会有很多个这样的回调, 如此一来ViewController需要在定义的时候就声明实现很多个delegate, 并且会有很多个回调函数. 这样会对ViewController造成很大的代码量和增大了维护难度.

如果运用RxSwift去处理这种回调, 我们可以压缩ViewController的代码:

设想我们需要ModalViewController提供一个Observbable给外部观察选择事件. 那调用的时候就可以直接通过这个Observable知道选中值.如此一来, ViewController中回调对应的代码就可以控制在performSelect函数中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 调用端
class ViewController: UIViewController {
    func performSelect() {
        let modalViewController = ModalViewController()
        
        self.present(viewController: modalViewController, animated: true, completion: nil)
        
        modalViewController.rx_didSelect
        	.subscribe(onNext: { [weak self] index in
				// 处理回调
            	...
        	})
    }
}

为了实现这个目的, 我们只需如下改造ModalViewController:

1
2
3
4
5
6
7
8
9
10
11
class ModalViewController: UIViewController {
    // PublishSubject不适合对外公开, 避免外部调用入口函数.
    var rx_didSelect: Obsevable<Int> {
        return self.rx_internal_didSelect.asObservable()
    }
    private let rx_internal_didSelect: PublishSubject<Int> = PublishSubject()
    ...
    private func internalSelect(at index: Int) {
        self.rx_internal_didSelect.onNext(index)
    }
}

取消异步任务

这个比较容易操作, 使用Observabledisposed(by:)函数, 在想要取消这个任务的时候把传入的DisposeBag实例销毁.

比方说让ViewController持有一个DisposeBag, 在ViewController调用deinit的时候, 绑定在这个DisposeBag上的Observable就都不会继续处理了.

小结

以上是Thinker.vc项目中接入RxSwift对异步/并行操作的优化改进经历.概括来说, 就是:

1.把多层的闭包嵌套降维成链式单层闭包

2.把散落的Error处理整合起来

3.更灵活方便地切换队列

4.更明确展现异步操作之间的关系

5.更方便顺应业务改变异步操作组合

6.可以取消任务响应


持续更新...