一、数据结构设计
stream线程池采用无锁队列实现,其目的是为了高效管理线程的出/入池操作。定义结构体ThreadSlot保存线程池中每一个线程的信息,包含:线程状态、线程号、线程对应的database oid、线程执行所需的信息StreamProducer,StreamProducer是父线程向子线程传递的唯一结构、线程唤醒所需的锁和条件变量。如下所示:
typedef struct { int status; uint32 idx; ThreadId tid; Oid dbOid; StreamProducer* streamObj; pthread_mutex_t m_mutex; pthread_cond_t m_cond; } ThreadSlot; |
定义结构体StreamThreadPool表征线程池,结构如下所示:
class StreamThreadPool: public BaseObject { public: StreamThreadPool(); void Init(int num); // StreamThreadPool init int Call(StreamProducer* obj); // 获取idle线程 或 create 新线程 bool Wait(); // idle线程等待唤醒或者超时退出 ThreadSlot* GetLocalSlot(); // get streamThreadSlot void SetLocalSlot(int slotIdx); // set streamThreadSlot StreamPool* GetLocalPool(); // 获取streamDBPool 或 新建一个 ThreadSlot* PopSlot(); // 从idleRing/emptyRing获取一slot void PushToEmpty(ThreadSlot* slot); // 将slot直接放入emptyRing void PushToIdle(StreamPool* pool, ThreadSlot* slot); //将slot直接放入idleRing void LocalPushToIdle(); // 根据状态,将slot放入idleRing void LocalPushToEmpty(); // 根据状态,将slot放入emptyRing int CleanStreamPool(const char *dbName, cleanOption cleanMode); //根据db信息清线程 void CleanInAllStreamPool(int desNum); // 调整线程池中stream线程个数 int GetStreamNum(); // 获取线程池中stream线程个数 bool Release(); // 判断超时线程是否需要清理 bool TimeoutClean(); // 清理超时idle线程 private: int size; ThreadSlot* threadSlots; ArrayLockFreeQueue emptyRing; StreamPool* PoolListHead; } |
结构体中,size表示线程池中拟预留的ThreadSlot个数,ThreadSlot被保存在threadSlots数组中;无锁队列emptyRing用来保存未创建线程的ThreadSlot,对应地,idleRing用来保存空闲的已创建stream线程的ThreadSlot。
由于stream线程的初始化信息和database是强相关的,如果不保留database相关的信息,那么线程初始化的时间代价仍然较高,所以线程池中的线程复用时,需要满足database信息匹配,所以一个emptyRing和一个database相匹配,保存在链表PoolListHead中,链表元素StreamPool对应结构如下:
typedef struct StreamPool { Oid dbOid; ArrayLockFreeQueue idleRing; struct StreamPool* next; } StreamPool; |
线程池中各结构间组织的直观图如下所示:
图一、基于无锁队列的线程池结构
上图中threadSlots可以放在idleRing(蓝色)、emptyRing(绿色)和运行空间(黄色)中,具体在下节介绍。
二、stream线程状态转移DFA设计
每一个记录线程信息的结构ThreadSlot中都保存了线程当前的状态status,记录线程状态的目的是为了保障线程执行过程的有序控制,也可以通过状态的互斥避免threadSlot不会被两个线程同时使用。
stream线程状态转移用确定性有限状态机(DFA,definite automata)表征,共包含4个状态:
STREAM_SLOT_EXIT、STREAM_SLOT_IDLE、STREAM_SLOT_HOLD和STREAM_SLOT_RUN状态。其物理含义如下:
- STREAM_SLOT_EXIT:线程退出状态,表示线程未被创建或线程已退出;
- STREAM_SLOT_IDLE:线程可复用状态,表示线程在idleRing中,可以被复用;
- STREAM_SLOT_HOLD:线程临时独占状态,表示线程在做进入下一个状态的准备工作;
- STREAM_SLOT_RUN:线程运行状态,表示线程正在执行任务。
状态间转移条件如下所示,图中粗箭头表示状态机主循环部分:
图二、stream线程状态转移
与状态对应的,是slot所处的位置,slot所处的位置有三处,分别是idleRing、emptyRing和运行空间,slot从无锁队列中拿出,运行时所处的位置,我们称之为运行空间。各状态所处的位置情况如下所示:
- STREAM_SLOT_EXIT:idleRing(idle线程超时)、emptyRing(初始化或者FATAL);
- STREAM_SLOT_IDLE:idleRing
- STREAM_SLOT_HOLD:运行空间(从无锁队列中取出)、idleRing(idle线程超时或中断);
- STREAM_SLOT_RUN:运行空间。
Slot的位置变化和状态转移的关系如下,图中粗箭头表示状态机主循环部分:
图三、stream线程状态转移和slot位置的转移关系
根据各状态所处的位置情况,从idleRing中取出的slot可能有三种状态:EXIT、IDLE、HOLD。当取出IDLE状态的slot,说明线程可复用;当取出EXIT状态的slot,说明线程已退出,此时需要将slot转存到emptyRing;当取出HOLD状态,说明线程正在被使用,此时需要放回idleRing。
EmptyRing中slot的状态只能是EXIT,运行空间中slot的状态要么是HOLD(刚取出还未运行),要么是RUN(正在运行),不再赘述。