RACSignal的子类们

研究和使用RAC有一段时间了,但一直很懒,不想写任何东西。决定还是写一点东西吧,也算是强迫自己整理一下。计划从最常用的也是RAC的核心(信号) RACSignal 这个类说起,来引出RAC的方方面面:冷热信号实现及转换、RAC中FRP的体现、rac中的类和协议(RACDisposable、RACSubscriber、RACCommand、 RACChannel…)、 对信号的处理和控制(combineLatest、bind…)、ReactiveCocoa的设计思想及结构体系、遇到的一些坑…

RACSignal是什么?

在RAC中RACSignal是一个工厂类,他通过各个工厂方法生成不同的子类实例。同时他继承基类RACStream(RAC的抽象类,只是定义了处理信号流的方法,子类通过继承来具体实现)。

下面具体讨论RACSignal的几个子类


RACDynamicSignal

用法:

//1. 创建信号
RACSignal *dynamicSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"send next"];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@" to release some object because signal had completed or error");
}];
}];
//2. 订阅信号
[dynamicSignal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];

上面的代码创建了一个signal, 如果需要dynamicSignal信号工作,我们需要对其订阅;订阅之后subscriber就会将”send next”传递到x中,因此NSLog的打印就是”send next”,如果你使用过RAC一定对此不会陌生。 平时我们所指的signal其实就是RACDynamicSignal(通过createSignal工厂方法创建的一个子类)。还有一些特殊用途的signal,例如RACErrorSignal、RACEmptySignal、RACReturnSignal都是通过工厂方法获得其实例。

下面通过源码逐步解析:
1 . 创建信号

+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
// 1、创建RACDynamicSignal实例:signal
RACDynamicSignal *signal = [[self alloc] init];
// 2、signal 中保存 传入的didSubscribe 以便后续对其调用
signal->_didSubscribe = [didSubscribe copy];
//tip: rac在创建不同实例的时候都会调用这个方法,给name属性赋值(RACStream中唯一具体实现的方法),给不同对象不同的标记用于输出日志等
return [signal setNameWithFormat:@"+createSignal:"];
}

2. 订阅信号

// subscribeNext方法是对订阅过程的一个封装,便于上层调用
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
//1、创建订阅者,将nextBlock参数再次传递
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
//2、传入订阅者,进行订阅
return [self subscribe:o];
}

2.1创建订阅者

// 这里是对订阅者的初步创建,后面还有对它的二次处理
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
//1、创建RACSubscriber实例:subscriber
RACSubscriber *subscriber = [[self alloc] init];
//2、分别记录传入的next、error、completed
subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];
return subscriber;
}

2.2订阅

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
/*
* subscriber 被赋给了一个新的RACPassthroughSubscriber实例
* 并将原来的subscriber以及signal本身和disposable作为新的subscriber的实例对象保存
*/
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
if (self.didSubscribe != NULL) {
//下列方法会执行传入的bock参数(因涉及到RACDisposable和RACScheduler两大块,不便具体解释过程,可以自己看源码)
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
// 这里对《1.创建信号》中保存的didSubscribe进行调用,并将subscriber作为block参数传入
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}

思路整理:首先createSignal方法创建了一个信号dynamicSignal,并把传入的block参数记录在了dynamicSignal的didSubscribe中;然后subscribeNext方法中创建了一个订阅者 o 用其属性next保存了从subscribeNext方法传入的block,接下来subscribeNext中调用subscribe方法并将 o 作为参数传入;而在subscribe方法中执行了 self.didSubscribe(subscriber) 其中subscriber就是传入subscribe方法的o。也就是执行了一开始保存在dynamicSignal中的didSubscribe。
也就是执行了createSignal方法中传入的block

^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"send next"];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@" to release some object because signal had completed or error");
}];
}

终于到了最后一步,我们只需要弄清[subscriber sendNext:@"send next"];这一方法的具体实现,整个RACDynamicSignal的工作原理就全部揭晓。(sendCompleted 以及 RACDisposable 相关计划下次介绍)
sendNext的实现如下:

- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
nextBlock(value);
}
}

可以看到这里其实就是对上文保存在 o 中的 next 进行执行,同时将传入的 value 再次当作 next 的参数传入。也就是执行了订阅信号方法subscribeNext 时传入的 block ,如下

^(id x) {
NSLog(@"%@",x);
}

直到这里,整个信号的 “创建-发送-订阅” 的过程全部分析完毕。
这里强调一下:RACDynamicSignal 中的信号需要被订阅,然后信号才会被激活。


RACSubject

RACSubject 是 RACSignal 的一个子类,用法如下:

//1、创建实例
RACSubject *subject = [RACSubject subject];
//2、订阅信号
[subject subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
//3、发送信号
[subject sendNext:@"send next"];

和RACSignal不同的是 RACSubject 是先订阅信号,再发送信号。RACSignal是冷信号,而RACSubject及其子类是热信号,这也是为什么RACSubject的创建不是通过RACSignal的工厂方法而是单独拿出来做为一个体系的原因。下面通过源码具体分析:

1 . 创建信号

//仅仅只是实例了一个对象
+ (instancetype)subject {
return [[self alloc] init];
}

2. 订阅信号

//执行的还是其父类RACSignal中的方法
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}

2.1 创建订阅者

// 这里和RACSignal中也是一样,执行的RACSubscriber中的方法
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
RACSubscriber *subscriber = [[self alloc] init];
subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];
return subscriber;
}

2.2 订阅

// 这里开始不同,RACSubject对其进行了重写。其实这个方法,在RACSignal中没有实现,RACDynamicSignal也是执行的RACDynamicSignal中重写的方法
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
// 这里和RACDynamicSignal中不同,RACSubject持有一个数组subscribers,来储存所有的订阅者,对subject的每一次订阅都会将订阅者加入该数组保存起来
NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}
return [RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
// Since newer subscribers are generally shorter-lived, search
// starting from the end of the list.
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}];
if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
}
}];
}

3. 发送信号

// 循环遍历subscribers数组,逐个执行:[subscriber sendNext:value]
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:value];
}];
}
- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}
for (id<RACSubscriber> subscriber in subscribers) {
block(subscriber);
}
}

和RACSignal不同的是:RACSubject 通过subscribeNext方法将订阅者保存在subscribers数组中,供sendNext方法调用;而RACSignal是先保存一个didSubscribe对象,然后通过subscribeNext方法创建一个订阅者subscriber作为参数传入didSubscribe中,进而subscriber才能发送信号。
简言之:RACSubject会保存所有subscriber(状态),sendNext只是对其操作。RACSignal 在被订阅后才会生成subscriber,并立即对其操作。
这也是冷信号和热信号的区别(冷信号需要通过订阅来激活)

RACReplaySubject

RACReplaySubject 是 RACSubject 的一个子类,他们都是热信号,那么他们有什么不同呢,还是从源码入手

//1、创建实例
RACReplaySubject *subject = [RACReplaySubject replaySubjectWithCapacity:2];
//2、订阅信号
[subject subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
//3、发送信号
[subject sendNext:@"send next"];

1 . 创建信号

+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity {
return [(RACReplaySubject *)[self alloc] initWithCapacity:capacity];
}
- (instancetype)initWithCapacity:(NSUInteger)capacity {
self = [super init];
if (self == nil) return nil;
// 这里会用一个属性capacity 将 传入的数记录下来,下面会有这个数的用途
_capacity = capacity;
_valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
return self;
}
// 也可以直接对其实例,capacity为最大无符号整数
- (instancetype)init {
// const NSUInteger RACReplaySubjectUnlimitedCapacity = NSUIntegerMax;
return [self initWithCapacity:RACReplaySubjectUnlimitedCapacity];
}

2. 订阅信号

// 也是在subscribe方法发生变化,走的是RACReplaySubject中的方法
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
// 这里会从valuesReceived里sendNext未被执行过value,第三步中会有 valuesReceived 的操作
for (id value in self.valuesReceived) {
if (compoundDisposable.disposed) return;
[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
}
if (compoundDisposable.disposed) return;
if (self.hasCompleted) {
[subscriber sendCompleted];
} else if (self.hasError) {
[subscriber sendError:self.error];
} else {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
[compoundDisposable addDisposable:subscriptionDisposable];
}
}
}];
[compoundDisposable addDisposable:schedulingDisposable];
return compoundDisposable;
}

3. 发送信号

- (void)sendNext:(id)value {
@synchronized (self) {
// 每次将发送过的value添加到valuesReceived中记录下来
[self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
// 调用父类也就是RACSubject的sendNext
[super sendNext:value];
// 通过初始化时的capacity值截取valuesReceived的元素(从数组头部开始,也就是最先添加进来的value)
if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
[self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
}
}
}

为了解释发生了什么,进行如下测试

RACReplaySubject *subject = [RACReplaySubject subject];
[subject subscribeNext:^(id x) {
NSLog(@"%@--1",x);
}];
[subject subscribeNext:^(id x) {
NSLog(@"%@--2",x);
}];
[subject sendNext:@"send next1"];
[subject sendNext:@"send next2"];
[subject subscribeNext:^(id x) {
NSLog(@"%@--3",x);
}];

日志:

//1
2017-03-23 22:03:03.722 testdemo[3509:143199] send next1--1
//2
2017-03-23 22:03:03.722 testdemo[3509:143199] send next1--2
//3
2017-03-23 22:03:03.722 testdemo[3509:143199] send next2--1
//4
2017-03-23 22:03:03.723 testdemo[3509:143199] send next2--2
//5
2017-03-23 22:03:03.723 testdemo[3509:143199] send next1--3
//6
2017-03-23 22:03:03.723 testdemo[3509:143199] send next2--3

分析:

  1. [subject sendNext:@”send next1”] 毫无疑问会打印 1 2 两条日志,同时将value存入了valuesReceived;
  2. [subject sendNext:@”send next2”] 会打印 3 4 两条日志,也同时将value存入了valuesReceived,到此valuesReceived里就存有两个value了;
  3. 对subject 进行第三次 subscribeNext 时,因为valuesReceived里是有值的,所以会执行两次subscriber sendNext,也就是打印出日志 5 和 6 。

RACReplaySubject 会将每次sendNext的值记录到valuesReceived(根据capacity大小会移除最先添加的元素),之后对RACReplaySubject的订阅会将valuesReceived里的记录通过订阅者来发送。 这也就是RACReplaySubject的区别


对于RACSignal的介绍到此结束,我们可以发现对RACDynamicSignal的每次订阅都会重复执行didSubscribe,而RACSubject则不会。

在我们平时使用时,很多场景是需要用到 RACDynamicSignal 的。
例如:我们会把数据处理、网络请求、UI刷新等等一系列操作放在didSubscribe里(这也是RAC的宗旨啊,将平时的各种操作都转化为炫酷的信号的形式,优雅的进行传递与交互)。但是我们不希望每次对RACSignal的订阅都重复执行didSubscribe,因为这样会带来bug或者性能问题。

这时,我们要做的就是将冷信号转换为热信号,因为热信号的重复订阅是不会带来这个问题的。那么如何将冷信号转化为热信号呢,后面再说吧。

坚持原创技术分享,您的支持将鼓励我继续创作!