嵌入式事件驱动代码的难题
事件驱动(event driven)是一种常见的代码模型,其通常会有一个主循环(mainloop)不断的从队列中接收事件,然后分发给相应的函数/模块处理。常见使用事件驱动模型的软件包括图形用户界面(GUI),嵌入式设备软件,网络服务端等。
本文以一个高度简化的嵌入式处理模块做为事件驱动代码的例子:假设该模块需要处理用户命令、外部消息、告警等各种事件,并在主循环中进行分发,那么示例代码如下:
#include <iostream>
#include <vector>
enum class EventType {
COMMAND,
MESSAGE,
ALARM
};
// 仅用于模拟接收的事件序列
std::vector<EventType> g_events{EventType::MESSAGE, EventType::COMMAND, EventType::MESSAGE};
void ProcessCmd()
{
std::cout << "Processing Command" << std::endl;
}
void ProcessMsg()
{
std::cout << "Processing Message" << std::endl;
}
void ProcessAlm()
{
std::cout << "Processing Alarm" << std::endl;
}
int main()
{
for (auto event : g_events) {
switch (event) {
case EventType::COMMAND:
ProcessCmd();
break;
case EventType::MESSAGE:
ProcessMsg();
break;
case EventType::ALARM:
ProcessAlm();
break;
}
}
return 0;
}
这只是一个极简的模型示例,真实的代码要远比它复杂得多,可能还会包含:从特定接口获取事件,解析不同的事件类型,使用表驱动方法进行分发……不过这些和本文关系不大,可暂时先忽略。
用顺序图表示这个模型,大体上是这样:
在实际项目中,常常碰到的一个问题是:有些事件的处理时间很长,比如某个命令可能需要批量的进行上千次硬件操作:
void ProcessCmd()
{
for (int i{0}; i < 1000; ++i) {
// 操作硬件接口……
}
}
这种事件处理函数会长时间的阻塞主循环,导致其他事件一直排队等待。如果所有事件对响应速度都没有要求,那也不会造成问题。但是实际场景中经常会有些事件是需要及时响应的,比如某些告警事件出现后,需要很快的执行业务倒换,否则就会给用户造成损失。这个时候,处理时间很长的事件就会产生问题。
有人会想到额外增加一个线程专用于处理高优先级事件,实践中这确实是个常用方法。然而在嵌入式系统中,事件处理函数会读写很多公共数据结构,还会操作硬件接口,如果并发调用,极容易导致各类数据竞争和硬件操作冲突,而且这些问题常常很难定位和解决。那在多线程的基础上加锁呢?——设计哪些锁,加在哪些地方,也是非常烧脑而且容易出错的工作,如果互斥等待过多,还会影响性能,甚至出现死锁等麻烦的问题。
另一种解决方案是:把处理时间很长的任务切割成很多个小任务,并重新加入到事件队列中。这样就不会长时间的阻塞主循环。这个方案避免了并发编程产生的各种头疼问题,但是却带来另一个难题:如何把一个大流程切割成很多独立小流程?在编码时,这需要程序员解析函数流程的所有上下文信息,设计数据结构单独存储,并建立关联这些数据结构的特殊事件。这往往会带来几倍的额外代码量和工作量。
这个问题几乎在所有事件驱动型软件中都会存在,但在嵌入式软件中尤为突出。这是因为嵌入式环境下的CPU、线程等资源受限,而实时性要求高,并发编程受限。
C++20语言给这个问题提供了一种新的解决方案:协程。
C++20的协程简介
关于协程(coroutine)是什么,在wikipedia[1]等资料中有很好的介绍,本文就不赘述了。在C++20中,协程的关键字只是语法糖:编译器会将函数执行的上下文(包括局部变量等)打包成一个对象,并让未执行完的函数先返回给调用者。之后,调用者使用这个对象,可以让函数从原来的“断点”处继续往下执行。
使用协程,编码时就不再需要费心费力的去把函数“切割”成多个小任务,只用按照习惯的流程写函数内部代码,并在允许暂时中断执行的地方加上co_yield语句,编译器就可以将该函数处理为可“分段执行”。
协程用起来的感觉有点像线程切换,因为函数的栈帧(stack frame)被编译器保存成了对象,可以随时恢复出来接着往下运行。但是实际执行时,协程其实还是单线程顺序运行的,并没有物理线程切换,一切都只是编译器的“魔法”。所以用协程可以完全避免多线程切换的性能开销以及资源占用,也不用担心数据竞争等问题。
可惜的是,C++20标准只提供了协程基础机制,并未提供真正实用的协程库(在C++23中可能会改善)。目前要用协程写实际业务的话,可以借助开源库,比如著名的cppcoro[2]。然而对于本文所述的场景,cppcoro也没有直接提供对应的工具(generator经过适当的包装可以解决这个问题,但是不太直观),因此我自己写了一个切割任务的协程工具类用于示例。
自定义的协程工具
下面是我写的SegmentedTask工具类的代码。这段代码看起来相当复杂,但是它作为可重用的工具存在,没有必要让程序员都理解它的内部实现,一般只要知道它怎么用就行了。SegmentedTask的使用很容易:它只有3个对外接口:Resume、IsFinished和GetReturnValue,其功能可根据接口名字自解释。
#include <optional>
#include <coroutine>
template<typename T>
class SegmentedTask {
public:
struct promise_type {
SegmentedTask<T> get_return_object()
{
return SegmentedTask{Handle::from_promise(*this)};
}
static std::suspend_never initial_suspend() noexcept { return {}; }
static std::suspend_always final_suspend() noexcept { return {}; }
std::suspend_always yield_value(std::nullopt_t) noexcept { return {}; }
std::suspend_never return_value(T value) noexcept
{
returnValue = value;
return {};
}
static void unhandled_exception() { throw; }
std::optional<T> returnValue;
};
using Handle = std::coroutine_handle<promise_type>;
explicit SegmentedTask(const Handle coroutine) : coroutine{coroutine} {}
~SegmentedTask()
{
if (coroutine) {
coroutine.destroy();
}
}
SegmentedTask(const SegmentedTask&) = delete;
SegmentedTask& operator=(const SegmentedTask&) = delete;
SegmentedTask(SegmentedTask&& other) noexcept : coroutine(other.coroutine) { other.coroutine = {}; }
SegmentedTask& operator=(SegmentedTask&& other) noexcept
{
if (this != &other) {
if (coroutine) {
coroutine.destroy();
}
coroutine = other.coroutine;
other.coroutine = {};
}
return *this;
}
void Resume() const { coroutine.resume(); }
bool IsFinished() const { return coroutine.promise().returnValue.has_value(); }
T GetReturnValue() const { return coroutine.promise().returnValue.value(); }
private:
Handle coroutine;
};
自己编写协程的工具类不光需要深入了解C++协程机制,而且很容易产生悬空引用等未定义行为。因此强烈建议项目组统一使用编写好的协程类。如果读者想深入学习协程工具的编写方法,可以参考Rainer Grimm的博客文章[3]。
接下来,我们使用SegmentedTask来改造前面的事件处理代码。当一个C++函数中使用了co_await、co_yield、co_return中的任何一个关键字时,这个函数就变成了协程,其返回值也会变成对应的协程工具类。在示例代码中,需要内层函数提前返回时,使用的是co_yield。但是C++20的co_yield后必须跟随一个表达式,这个表达式在示例场景下并没必要,就用了std::nullopt让其能编译通过。实际业务环境下,co_yield可以返回一个数字或者对象用于表示当前任务执行的进度,方便外层查询。
协程不能使用普通return语句,必须使用co_return来返回值,而且其返回类型也不直接等同于co_return后面的表达式类型。
enum class EventType {
COMMAND,
MESSAGE,
ALARM
};
std::vector<EventType> g_events{EventType::COMMAND, EventType::ALARM};
std::optional<SegmentedTask<int>> suspended; // 没有执行完的任务保存在这里
SegmentedTask<int> ProcessCmd()
{
for (int i{0}; i < 10; ++i) {
std::cout << "Processing step " << i << std::endl;
co_yield std::nullopt;
}
co_return 0;
}
void ProcessMsg()
{
std::cout << "Processing Message" << std::endl;
}
void ProcessAlm()
{
std::cout << "Processing Alarm" << std::endl;
}
int main()
{
for (auto event : g_events) {
switch (event) {
case EventType::COMMAND:
suspended = ProcessCmd();
break;
case EventType::MESSAGE:
ProcessMsg();
break;
case EventType::ALARM:
ProcessAlm();
break;
}
}
while (suspended.has_value() && !suspended->IsFinished()) {
suspended->Resume();
}
if (suspended.has_value()) {
std::cout << "Final return: " << suspended->GetReturnValue() << endl;
}
return 0;
}
出于让示例简单的目的,事件队列中只放入了一个COMMAND和一个ALARM,COMMAND是可以分段执行的协程,执行完第一段后,主循环会优先执行队列中剩下的事件,最后再来继续执行COMMAND余下的部分。实际场景下,可根据需要灵活选择各种调度策略,比如专门用一个队列存放所有未执行完的分段任务,并在空闲时依次执行。
本文中的代码使用gcc 10.3版本编译运行,编译时需要同时加上-std=c++20和-fcoroutines两个参数才能支持协程。代码运行结果如下:
Processing step 0
Processing Alarm
Processing step 1
Processing step 2
Processing step 3
Processing step 4
Processing step 5
Processing step 6
Processing step 7
Processing step 8
Processing step 9
Final return: 0
可以看到ProcessCmd函数(协程)的for循环语句并没有一次执行完,在中间插入了ProcessAlm的执行。如果分析运行线程还会发现,整个过程中并没有物理线程的切换,所有代码都是在同一个线程上顺序执行的。
使用了协程的顺序图变成了这样:
事件处理函数的执行时间长不再是问题,因为可以中途“插入”其他的函数运行,之后再返回断点继续向下运行。
总结
一个较普遍的认识误区是:使用多线程可以提升软件性能。但事实上,只要CPU没有空跑,那么当物理线程数超过了CPU核数,就不再会提升性能,相反还会由于线程的切换开销而降低性能。大多数开发实践中,并发编程的主要好处并非为了提升性能,而是为了编码的方便,因为现实中的场景模型很多都是并发的,容易直接对应成多线程代码。
协程可以像多线程那样方便直观的编码,但是同时又没有物理线程的开销,更没有互斥、同步等并发编程中令人头大的设计负担,在嵌入式应用等很多场景下,常常是比物理线程更好的选择。
相信随着C++20的逐步普及,协程将来会得到越来越广泛的使用。
尾注
[1] https://en.wikipedia.org/wiki/Coroutine
[2] https://github.com/lewissbaker/cppcoro
[3] https://www.modernescpp.com/index.php/tag/coroutines