源码分析:C++ 信号槽机制的实现

前置知识

  1. C++11 智能指针
  2. 多线程知识
  3. 设计模式 Observer

什么是信号槽

程序需要一种监听结构,当对象发生改变时,监听者能知道,非常常用的监听是输入事件的监听,例如按钮点击之后会产生一个监听事件,该事件会被监听者「听到」。

虽然叫做监听者,直观理解上是监听者能够知道对象的变化,但实际上这种知道是被动的,需要对象主动通知监听者,对象发生了改变。

这实际上就是一种函数回调,在对象中注册一个回调事件,当对象发生改变时,调用该回调函数通知监听者。

设计模式中专门有 Observer 模式来应对这种需求,但其有一定的缺陷(后续再讨论),在 Qt 中,信号槽就是为了满足这种监听需求而设计的。

简单来说,信号槽机制就是 Sender 产生了一个信号,该信号会发送给 Receiver,接着 Receiver 会调用槽函数来处理信号。换句话说,无非就是发出信号调用槽函数,通过信号和槽建立对象之间的联系。

这里,一个信号可以触发多个槽,同时一个槽也可以由多个信号触发

为什么不用 Observer?

陈硕:《Linux 多线程服务端编程》1.14 Observer 之谬

Observer 模式的本质问题在于其面向对象的设计。换句话说,我认为正是面向对象(OO)本身造成了 Observer 的缺点。Observer 是基类,这带来了非常强的耦合,强度仅次于友元(friend)。这种耦合不仅限制了成员函数的名字、参数、返回值,还限制了成员函数所属的类型(必须是observer的派生类)。

Observer class 是基类,这意味着如果 Foo 想要观察两个类型的事件(比如时钟和温度),需要使用多继承。这还不是最糟糕的,如果要重复观察同一类型的事件(比如 1 秒一次的心跳和 30 秒一次的自检),就要用到一些伎俩来 work around,因为不能从一个 Base class 继承两次。

在 C++ 里为了替换 Observer,可以用 Signal/Slots,我指的不是 QT 那种靠语言扩展的实现,而是完全靠标准库实现的 thread safe、race condition free、thread contention free 的 Signal /Slots,并且不强制要求 shared_ptr 来管理对象,也就是说完全解决了 1.8 列出的 Observer 遗留问题。这会用到 2.8 介绍的“借 shared_ptr 实现 copy-on-write”技术。

简单来说,Observer 的缺陷:

  1. 通过继承实现,耦合性太强
  2. 想要观察多个事件,需要多继承(当然可以强行将两个事件的通知合并到一起,但这显然是一个十分错误的凑合做法)

Signal/Slot:通过聚合实现,弱耦合,可以嵌入到任何对象中

使用示例

要理解一个程序,最好从如何使用开始,从应用的角度宏观地理解程序,然后再回到细节。

一个最简单的信号槽:

class Observer {
public:
    void update(int i) {
        cout << "Detected signal " << i << endl;
    }
    Slot slot_;
};

class Subject {
public:
    void doSomething() {
        // some task
        // ...

        // call the signal
        signal_.call(data_);
    }
    void addListener(Observer& obs) {
        obs.slot_ = signal_.connect(std::bind(&Observer::update, &obs, std::placeholders::_1));
    }
    void delListener(Observer& obs) {
	    obs.slot_.reset();
    }
    int data_ = 0;
    Signal<void(int&)> signal_; // 模板参数对应 Observer 中的 update
};

int main() {
    Observer obs;
    Subject sbj;
    sbj.data_ = 1024;
    sbj.addListener(obs); // 注册回调函数
    sbj.doSomething();

	sbj.delListener(obs); // 注销回调函数
	sbj.doSomething();

	return 0;
}

// output:
// Detected signal 1024

源码分析

完整源码

// 前置声明
template <typename Callback> struct SlotImpl;

// SignalImpl 仅仅实现了对 Slot 的储存
template <typename Callback> struct SignalImpl {
    typedef std::vector<std::weak_ptr<SlotImpl<Callback>>> SlotList;

    SignalImpl() : slots_(new SlotList) {}

    // 该函数用于实现 race condition free 和 thread contention free
    // 后续再详细讨论
    void copyOnWrite() {
        if (!slots_.unique()) {
            slots_.reset(new SlotList(*slots_));
        }
    }

    // Signal 并不持有 Slot 对象,该函数用于清除失效的 Slot
    void clean() {
        std::unique_lock<std::mutex> lock(mutex_);
        copyOnWrite();
        SlotList& list(*slots_);
        typename SlotList::iterator it(list.begin());
        while (it != list.end()) {
            if (it->expired()) {
                it = list.erase(it);
            } else {
                ++it;
            }
        }
    }

    std::mutex mutex_; // 保证 thread safe
    std::shared_ptr<SlotList> slots_;
};

该部分没有什么特别的功能,只是储存了信号所对应的槽,mutex_ 用于保证线程安全,同时只能有一个线程对列表进行写操作。

copyOnWrite() 用于保证对列表进行写操作不会影响其他线程的读操作,该部分实现的是 race condition free 和 thread contention free,不用条件变量来控制,实现最高程度的并发。也就是说,可以对列表同时进行读写,具体在后续再详细讨论。

// 槽的实现
template <typename Callback> struct SlotImpl {
    typedef SignalImpl<Callback> Data;
    SlotImpl(const std::shared_ptr<Data>& data, Callback&& cb)
        : data_(data), cb_(cb), tie_(), tied_(false) {}

    SlotImpl(const std::shared_ptr<Data>& data, Callback&& cb,
             const std::shared_ptr<void>& tie)
        : data_(data), cb_(cb), tie_(tie), tied_(true) {}

	// 当 Slot 失效时,清除所对应信号中失效的 Slot
    ~SlotImpl() {
        std::shared_ptr<Data> data(data_.lock());
        if (data) {
            data->clean();
        }
    }

	// 槽所对应的信号,但并不持有
    std::weak_ptr<Data> data_;
    // 槽所对应的槽函数
    Callback cb_;

	// 特殊需求:对槽绑定一个对象,只有该对象有效时才会调用槽函数
    std::weak_ptr<void> tie_;
    bool tied_;
};

typedef std::shared_ptr<void> Slot;

该部分实现了槽的实体,std::shared_ptr<void> 可以指向任何对象,由于 SlotImpl<Callback> 是个模板类,通过 Slot 可以实现接口的统一,无需考虑 Slot 指向的 SlotImpl<Callback> 到底是什么类型,用就完事了。

可以这样用的一个原因是,用户并不需要使用 Slot 进行什么操作,信号与槽本身的关联关系,即注销回调是靠 Slot 的有效性来实现的,具体在后续再说明。

这里涉及到 shared_ptr 的「析构动作在创建时被捕获」特性,即 shared_ptr 的析构操作并不取决于模板参数,析构操作从构造参数中捕获,举个例子。

class Test {
public:
    ~Test() {
        cout << "~Test()" << endl;
    }
};

int main() {
	// test_ptr 的析构操作由 new Test 给出,所以模板参数不影响析构
    shared_ptr<void> test_ptr(new Test);
}

最后一个类是 Signal,将上述的类进行拼合

template <typename Signature> class Signal;

// Signal 的函数模板是函数类型,例如 Signal<void(int)>
// 代表该信号所对应的槽函数的函数类型是 void(int)
template <typename RET, typename... ARGS> class Signal<RET(ARGS...)> {
public:
	// 虽然槽函数的返回类型可以是非 void,但信号槽机制无法获得其返回值
    typedef std::function<void(ARGS...)> Callback;
    typedef SignalImpl<Callback> SignalImpl;
    typedef SlotImpl<Callback> SlotImpl;

    Signal() : impl_(new SignalImpl) {}

    ~Signal() {}

    // 连接一个槽函数,即注册回调函数
    // 返回一个 Slot,该变量不该忽略,所以添加 [[nodiscard]]
    // 因为信号与槽的对应关系由 Slot 的有效性确定,该参数忽略
    // 意味着 Slot 失效,就无法进行回调的操作
    [[nodiscard]] Slot connect(Callback&& func) {
        std::shared_ptr<SlotImpl> slotImpl(
            new SlotImpl(impl_, std::forward<Callback>(func)));
        add(slotImpl);
        return slotImpl;
    }

	// 绑定槽函数的对象
    [[nodiscard]] Slot connect(Callback&& func, const std::shared_ptr<void>& tie) {
        std::shared_ptr<SlotImpl> slotImpl(new SlotImpl(impl_, func, tie));
        add(slotImpl);
        return slotImpl;
    }

	// 调用信号所对应的所有槽函数
    void call(ARGS&&... args) {
        SignalImpl& impl(*impl_);
        std::shared_ptr<typename SignalImpl::SlotList> slots;
        {
	        // 保证多线程调用 call 时的线程安全
            std::unique_lock<std::mutex> lock(impl.mutex_);
            slots = impl.slots_; // 一个新的 shared_ptr 指向
        }
        typename SignalImpl::SlotList& s(*slots);
        for (auto it = s.begin(); it != s.end(); ++it) {
            // 根据 weak_ptr 生成一个 shared_ptr
            // 如果成功则代表槽有效,可以调用
            std::shared_ptr<SlotImpl> slotImpl = it->lock();
            if (slotImpl) {
                std::shared_ptr<void> guard;
                // 如果有绑定对象,检查对象的有效性
                if (slotImpl->tied_) {
                    guard = slotImpl->tie_.lock();
                    if (guard) {
                        slotImpl->cb_(args...);
                    }
                } else {
                    slotImpl->cb_(args...);
                }
            }
        }
    }

private:
    void add(const std::shared_ptr<SlotImpl>& slot) {
        SignalImpl& impl(*impl_);
        {
	        // 保证多线程调用 add 时的线程安全
            std::unique_lock<std::mutex> lock(impl.mutex_);
            impl.copyOnWrite();
            impl.slots_->push_back(slot);
        }
    }

	// 指针指向不可变
    const std::shared_ptr<SignalImpl> impl_;
};

这个类要复杂的多,首先 connect() 连接一个槽函数,表示在该信号中注册一个槽函数,对应的在 call() 中进行调用。至于为什么返回值不能忽略,我们就得回到 SignalImpl,由于其槽列表保存的是 weak_ptr,所以不负责槽的生命周期,我们连接一个槽函数,即注册一个回调函数,理应有注销的操作,这个注销操作由返回的 Slot 对象的有效性负责。

即如果想要注销该回调,只要销毁对应 connect 返回的 Slot 即可。

接着到 call() 函数,该函数中的互斥量用来保证在多线程情况下,slots 不会获得到一个 clean 了一半的列表。实际上该函数在多线程中属于消费者,并不会修改列表,但互斥量只保护了 shared_ptr 的构造,在后续的循环中,列表有可能被写,这种情况怎么办呢?

接下来就涉及到 race condition free 和 thread contention free 的巧妙实现了:Copy on write

我们回到 copyOnWrite() 函数,他有一个判断,如果指针独享列表,则不进行复制,如果列表还有其他指针指向,则进行复制,为什么?

Copy on write 顾名思义就是需要写操作的时候进行复制,即当 call() 函数在循环中不断读取列表时,此时另一个线程想要对列表进行 clean,也就是写操作,发现该列表有多于一个指针指向他,所以此时我并不能随意对该列表进行写操作,因为有其他线程可能正在读。

那么此时我就进行复制操作,保证我进行的写操作不会影响其他线程的读,也就是 Copy on write 的由来。所以,当 call() 函数执行到循环部分时,可以保证列表不会被更改,这样就避开用复杂的条件变量来实现互斥操作,可以实现最大程度的并发,同时进行读写操作。

那么会造成内存泄漏吗?都是智能指针,显然不会。我们要知道,发生 Copy on write 之后的列表才是正源,其他正在读的线程里所保存的列表可以看作是临时变量,在函数退出之后如果指针引用变为 0,自然会销毁。

最后一个 add() 函数则同理,该函数和 clean() 一样是写操作,当对象有多个指针指向时,则进行复制,不影响其他线程的读操作,互斥量用来保证同时只能有一个写操作在进行。

总结

该机制实现起来虽然只有短短 100+ 行,但其中使用到的技巧令人叹为观止,对 shared_ptr 的引用计数有非常巧妙的应用,实现了 race condition free 和 thread contention free,以达到最大程度的并发,阅读完受益匪浅。

简单做个要点的总结:

  1. 一个信号可以注册多个槽,但一个槽(Slot)只由一个信号触发,如果想要多个信号触发同一个槽函数,可以在多个信号里注册同一个槽函数
  2. 回调注册通过 Signal 类完成,回调注销通过 Slot 的有效性完成
  3. Copy on write 实现 race condition free 和 thread contention free,做到可以同时读写,最大程度并发
上一篇 下一篇

评论 | 0条评论