前置知识
- C++11 智能指针
- 多线程知识
- 设计模式 Observer
什么是信号槽
程序需要一种监听结构,当对象发生改变时,监听者能知道,非常常用的监听是输入事件的监听,例如按钮点击之后会产生一个监听事件,该事件会被监听者「听到」。
虽然叫做监听者,直观理解上是监听者能够知道对象的变化,但实际上这种知道是被动的,需要对象主动通知监听者,对象发生了改变。
这实际上就是一种函数回调,在对象中注册一个回调事件,当对象发生改变时,调用该回调函数通知监听者。
设计模式中专门有 Observer 模式来应对这种需求,但其有一定的缺陷(后续再讨论),在 Qt 中,信号槽就是为了满足这种监听需求而设计的。
简单来说,信号槽机制就是 Sender 产生了一个信号,该信号会发送给 Receiver,接着 Receiver 会调用槽函数来处理信号。换句话说,无非就是发出信号调用槽函数,通过信号和槽建立对象之间的联系。
这里,一个信号可以触发多个槽,同时一个槽也可以由多个信号触发
为什么不用 Observer?
陈硕:《Linux 多线程服务端编程》1.14 Observer 之谬
Observer 模式的本质问题在于其面向对象的设计。换句话说,我认为正是面向对象(OO)本身造成了 Observer 的缺点。Observer 是基类,这带来了非常强的耦合,强度仅次于友元(friend)。这种耦合不仅限制了成员函数的名字、参数、返回值,还限制了成员函数所属的类型(必须是observer的派生类)。
Observer class 是基类,这意味着如果 Foo 想要观察两个类型的事件(比如时钟和温度),需要使用多继承。这还不是最糟糕的,如果要重复观察同一类型的事件(比如 1 秒一次的心跳和 30 秒一次的自检),就要用到一些伎俩来 work around,因为不能从一个 Base class 继承两次。
在 C++ 里为了替换 Observer,可以用 Signal/Slots,我指的不是 QT 那种靠语言扩展的实现,而是完全靠标准库实现的 thread safe、race condition free、thread contention free 的 Signal /Slots,并且不强制要求 shared_ptr 来管理对象,也就是说完全解决了 1.8 列出的 Observer 遗留问题。这会用到 2.8 介绍的“借 shared_ptr 实现 copy-on-write”技术。
简单来说,Observer 的缺陷:
- 通过继承实现,耦合性太强
- 想要观察多个事件,需要多继承(当然可以强行将两个事件的通知合并到一起,但这显然是一个十分错误的凑合做法)
Signal/Slot:通过聚合实现,弱耦合,可以嵌入到任何对象中
使用示例
要理解一个程序,最好从如何使用开始,从应用的角度宏观地理解程序,然后再回到细节。
一个最简单的信号槽:
class Observer {
public:
void update(int i) {
cout << "Detected signal " << i << endl;
}
Slot slot_;
};
class Subject {
public:
void doSomething() {
// some task
// ...
// call the signal
signal_.call(data_);
}
void addListener(Observer& obs) {
obs.slot_ = signal_.connect(std::bind(&Observer::update, &obs, std::placeholders::_1));
}
void delListener(Observer& obs) {
obs.slot_.reset();
}
int data_ = 0;
Signal<void(int&)> signal_; // 模板参数对应 Observer 中的 update
};
int main() {
Observer obs;
Subject sbj;
sbj.data_ = 1024;
sbj.addListener(obs); // 注册回调函数
sbj.doSomething();
sbj.delListener(obs); // 注销回调函数
sbj.doSomething();
return 0;
}
// output:
// Detected signal 1024
源码分析
// 前置声明
template <typename Callback> struct SlotImpl;
// SignalImpl 仅仅实现了对 Slot 的储存
template <typename Callback> struct SignalImpl {
typedef std::vector<std::weak_ptr<SlotImpl<Callback>>> SlotList;
SignalImpl() : slots_(new SlotList) {}
// 该函数用于实现 race condition free 和 thread contention free
// 后续再详细讨论
void copyOnWrite() {
if (!slots_.unique()) {
slots_.reset(new SlotList(*slots_));
}
}
// Signal 并不持有 Slot 对象,该函数用于清除失效的 Slot
void clean() {
std::unique_lock<std::mutex> lock(mutex_);
copyOnWrite();
SlotList& list(*slots_);
typename SlotList::iterator it(list.begin());
while (it != list.end()) {
if (it->expired()) {
it = list.erase(it);
} else {
++it;
}
}
}
std::mutex mutex_; // 保证 thread safe
std::shared_ptr<SlotList> slots_;
};
该部分没有什么特别的功能,只是储存了信号所对应的槽,mutex_
用于保证线程安全,同时只能有一个线程对列表进行写操作。
copyOnWrite()
用于保证对列表进行写操作不会影响其他线程的读操作,该部分实现的是 race condition free 和 thread contention free,不用条件变量来控制,实现最高程度的并发。也就是说,可以对列表同时进行读写,具体在后续再详细讨论。
// 槽的实现
template <typename Callback> struct SlotImpl {
typedef SignalImpl<Callback> Data;
SlotImpl(const std::shared_ptr<Data>& data, Callback&& cb)
: data_(data), cb_(cb), tie_(), tied_(false) {}
SlotImpl(const std::shared_ptr<Data>& data, Callback&& cb,
const std::shared_ptr<void>& tie)
: data_(data), cb_(cb), tie_(tie), tied_(true) {}
// 当 Slot 失效时,清除所对应信号中失效的 Slot
~SlotImpl() {
std::shared_ptr<Data> data(data_.lock());
if (data) {
data->clean();
}
}
// 槽所对应的信号,但并不持有
std::weak_ptr<Data> data_;
// 槽所对应的槽函数
Callback cb_;
// 特殊需求:对槽绑定一个对象,只有该对象有效时才会调用槽函数
std::weak_ptr<void> tie_;
bool tied_;
};
typedef std::shared_ptr<void> Slot;
该部分实现了槽的实体,std::shared_ptr<void>
可以指向任何对象,由于 SlotImpl<Callback>
是个模板类,通过 Slot
可以实现接口的统一,无需考虑 Slot
指向的 SlotImpl<Callback>
到底是什么类型,用就完事了。
可以这样用的一个原因是,用户并不需要使用 Slot
进行什么操作,信号与槽本身的关联关系,即注销回调是靠 Slot
的有效性来实现的,具体在后续再说明。
这里涉及到 shared_ptr
的「析构动作在创建时被捕获」特性,即 shared_ptr
的析构操作并不取决于模板参数,析构操作从构造参数中捕获,举个例子。
class Test {
public:
~Test() {
cout << "~Test()" << endl;
}
};
int main() {
// test_ptr 的析构操作由 new Test 给出,所以模板参数不影响析构
shared_ptr<void> test_ptr(new Test);
}
最后一个类是 Signal
,将上述的类进行拼合
template <typename Signature> class Signal;
// Signal 的函数模板是函数类型,例如 Signal<void(int)>
// 代表该信号所对应的槽函数的函数类型是 void(int)
template <typename RET, typename... ARGS> class Signal<RET(ARGS...)> {
public:
// 虽然槽函数的返回类型可以是非 void,但信号槽机制无法获得其返回值
typedef std::function<void(ARGS...)> Callback;
typedef SignalImpl<Callback> SignalImpl;
typedef SlotImpl<Callback> SlotImpl;
Signal() : impl_(new SignalImpl) {}
~Signal() {}
// 连接一个槽函数,即注册回调函数
// 返回一个 Slot,该变量不该忽略,所以添加 [[nodiscard]]
// 因为信号与槽的对应关系由 Slot 的有效性确定,该参数忽略
// 意味着 Slot 失效,就无法进行回调的操作
[[nodiscard]] Slot connect(Callback&& func) {
std::shared_ptr<SlotImpl> slotImpl(
new SlotImpl(impl_, std::forward<Callback>(func)));
add(slotImpl);
return slotImpl;
}
// 绑定槽函数的对象
[[nodiscard]] Slot connect(Callback&& func, const std::shared_ptr<void>& tie) {
std::shared_ptr<SlotImpl> slotImpl(new SlotImpl(impl_, func, tie));
add(slotImpl);
return slotImpl;
}
// 调用信号所对应的所有槽函数
void call(ARGS&&... args) {
SignalImpl& impl(*impl_);
std::shared_ptr<typename SignalImpl::SlotList> slots;
{
// 保证多线程调用 call 时的线程安全
std::unique_lock<std::mutex> lock(impl.mutex_);
slots = impl.slots_; // 一个新的 shared_ptr 指向
}
typename SignalImpl::SlotList& s(*slots);
for (auto it = s.begin(); it != s.end(); ++it) {
// 根据 weak_ptr 生成一个 shared_ptr
// 如果成功则代表槽有效,可以调用
std::shared_ptr<SlotImpl> slotImpl = it->lock();
if (slotImpl) {
std::shared_ptr<void> guard;
// 如果有绑定对象,检查对象的有效性
if (slotImpl->tied_) {
guard = slotImpl->tie_.lock();
if (guard) {
slotImpl->cb_(args...);
}
} else {
slotImpl->cb_(args...);
}
}
}
}
private:
void add(const std::shared_ptr<SlotImpl>& slot) {
SignalImpl& impl(*impl_);
{
// 保证多线程调用 add 时的线程安全
std::unique_lock<std::mutex> lock(impl.mutex_);
impl.copyOnWrite();
impl.slots_->push_back(slot);
}
}
// 指针指向不可变
const std::shared_ptr<SignalImpl> impl_;
};
这个类要复杂的多,首先 connect()
连接一个槽函数,表示在该信号中注册一个槽函数,对应的在 call()
中进行调用。至于为什么返回值不能忽略,我们就得回到 SignalImpl
,由于其槽列表保存的是 weak_ptr
,所以不负责槽的生命周期,我们连接一个槽函数,即注册一个回调函数,理应有注销的操作,这个注销操作由返回的 Slot
对象的有效性负责。
即如果想要注销该回调,只要销毁对应 connect
返回的 Slot
即可。
接着到 call()
函数,该函数中的互斥量用来保证在多线程情况下,slots
不会获得到一个 clean 了一半的列表。实际上该函数在多线程中属于消费者,并不会修改列表,但互斥量只保护了 shared_ptr
的构造,在后续的循环中,列表有可能被写,这种情况怎么办呢?
接下来就涉及到 race condition free 和 thread contention free 的巧妙实现了:Copy on write
我们回到 copyOnWrite()
函数,他有一个判断,如果指针独享列表,则不进行复制,如果列表还有其他指针指向,则进行复制,为什么?
Copy on write 顾名思义就是需要写操作的时候进行复制,即当 call()
函数在循环中不断读取列表时,此时另一个线程想要对列表进行 clean
,也就是写操作,发现该列表有多于一个指针指向他,所以此时我并不能随意对该列表进行写操作,因为有其他线程可能正在读。
那么此时我就进行复制操作,保证我进行的写操作不会影响其他线程的读,也就是 Copy on write 的由来。所以,当 call()
函数执行到循环部分时,可以保证列表不会被更改,这样就避开用复杂的条件变量来实现互斥操作,可以实现最大程度的并发,同时进行读写操作。
那么会造成内存泄漏吗?都是智能指针,显然不会。我们要知道,发生 Copy on write 之后的列表才是正源,其他正在读的线程里所保存的列表可以看作是临时变量,在函数退出之后如果指针引用变为 0,自然会销毁。
最后一个 add()
函数则同理,该函数和 clean()
一样是写操作,当对象有多个指针指向时,则进行复制,不影响其他线程的读操作,互斥量用来保证同时只能有一个写操作在进行。
总结
该机制实现起来虽然只有短短 100+ 行,但其中使用到的技巧令人叹为观止,对 shared_ptr
的引用计数有非常巧妙的应用,实现了 race condition free 和 thread contention free,以达到最大程度的并发,阅读完受益匪浅。
简单做个要点的总结:
- 一个信号可以注册多个槽,但一个槽(Slot)只由一个信号触发,如果想要多个信号触发同一个槽函数,可以在多个信号里注册同一个槽函数
- 回调注册通过
Signal
类完成,回调注销通过Slot
的有效性完成 - Copy on write 实现 race condition free 和 thread contention free,做到可以同时读写,最大程度并发