TaskFlow-源码浅析-Executor类

类Executor是负责执行taskflow的类。

构造函数

构造函数中创建了一个大小为N的线程池 _threads 和 大小为N的Worker队列 _workers,并在 _spawn 中初始化

首先看线程池的部分,可以看到在 _spawn 中有一个for循环,初始化了使用 lambda表达式 去初始化了 _threads 中的每一个线程,所有线程在初始化后将开始运行

其中 Worker& w, std::mutex& mutex, std::condition_variable& cond, size_t& n 都是引用,为了保证线程中正确使用,对应的实参使用了std::ref

inline void Executor::_spawn(size_t N) {
    // ...
    for(size_t id=0; id<N; ++id) {
        // ...
        _threads[id] = std::thread([this] (
            Worker& w, std::mutex& mutex, std::condition_variable& cond, size_t& n
        ) -> void {
            // ...
        }, std::ref(_workers[id]), std::ref(mutex), std::ref(cond), std::ref(n));
    }
    // ...
}

在该lambda开始部分,worker的_threads指针指向自身线程,接着设置线程ID到worker ID的映射。

为了防止多线程读写_wids,mutex是主线程和_threads中所有线程共享的,对其上锁可以保证对_wids读写不会发生冲突。

lock是块级变量,在std::scoped_lock lock(mutex)中 构造函数调用mutex.lock上锁,在块结束时析构函数调用mutex.unlock释放锁

在主线程执行完for以后,将使用mutex构造unique_lock变量,使用condition_variable的wait函数堵塞主线程,wait被调用和会释放所在线程持有的 mutex 锁, 所以这里不会造成死锁

_threads里的各个线程会对启动的线程计数,需要的N个线程都启动后,使用cond.notify_one唤醒因cond.wait堵塞的主线程,构造函数完成

inline void Executor::_spawn(size_t N) {
    std::mutex mutex;
    std::condition_variable cond;
    size_t n=0;
    for(size_t id=0; id<N; ++id) {
        // ...
        _threads[id] = std::thread([this] (
            Worker& w, std::mutex& mutex, std::condition_variable& cond, size_t& n
        ) -> void {
            w._thread = &_threads[w._id];
            {
                std::scoped_lock lock(mutex);
                _wids[std::this_thread::get_id()] = w._id;
                if(n++; n == num_workers()) {
                cond.notify_one();
                }
            }
            // ...
        }, std::ref(_workers[id]), std::ref(mutex), std::ref(cond), std::ref(n));
    }
    std::unique_lock<std::mutex> lock(mutex);
    cond.wait(lock, [&](){ return n==N; });
}

消费者

thread 接受了一个lambda作为运行函数,处理了worker和threads中的映射后,剩下部分是一个死循环,不停的消费 worker 的 wsq 中的内容

有趣的是,这里是先执行_exploit_task,再执行_wait_for_task,而不是相反的操作。

这个死循环只有当 _wait_for_task 的返回值为false时才会结束;而 _wait_for_task 只有 _done 才会返回false;而 _done 只用在 Excutor被析构的时候才会置为true

inline void Executor::_spawn(size_t N) {
    //...
    _threads[id] = std::thread([this] (
        Worker& w, std::mutex& mutex, std::condition_variable& cond, size_t& n
    ) -> void {
        // ...
        while(1) {
          // execute the tasks.
          _exploit_task(w, t);
          // wait for tasks
          if(_wait_for_task(w, t) == false) {
            break;
          }
        }
    }, std::ref(_workers[id]), std::ref(mutex), std::ref(cond), std::ref(n));
    // ...
}

_exploit_task

其中, _exploit_task 对获取的任务(实际是Node*), 交由 _invoke 处理,然后从worker的TaskQueue中获取下一个

_invoke 在执行完 Node 后,会把新的可以执行 Node 放入当前 worker 的 _wsq

// Procedure: _exploit_task
inline void Executor::_exploit_task(Worker& w, Node*& t) {
  while(t) {
    _invoke(w, t);
    t = w._wsq.pop();
  }
}

_wait_for_task

在工作队列为空后,_wait_for_task 将使用 _explore_task 获取一个Node

当 _explore_task 成功获取到 Node 时会尝试唤醒另一个线程避免饿死

tips:线程饿死(Thread Starvation)是指某些线程在系统中无法获得所需的资源而无限期地等待的情况。这些资源可以是CPU时间、内存、锁、文件句柄等。线程饿死可能会导致系统性能下降和资源浪费。

源码这里的描述也很灵性:The last thief who successfully stole a task will wake up another thief worker to avoid starvation.

在没有获取到 Node 的时候,遍历所有的worker,检查他们的 _wsq 然后找到一个受害者, 这里的命名很有趣,需要去偷取任务的WorkerID被命名成了vtm(victim), 也就是受害者的意思,实属生动形象了

这里2PC guard是两阶段提交,一种常见的安全机制,用于确保只有所有参与方同意执行事务时才会执行。

_explore_task

过程如下:

  1. 获取新的任务,指去一个Worker中偷取(steal)一个任务
  2. 如果偷到了就直接返回了
  3. 如果没偷到且偷得过于频繁(偷取次数超过_MAX_STEALS)就歇歇(堵塞当前线程)
  4. 如果没偷到且歇歇的次数过于频繁(num_yields>100)就直接返回了
  5. 其余情况再找一个受害者

流程图如下:

源代码如下:

生产者

run 方法最终会调用 run_until(Taskflow& f, P&& p, C&& c)

_set_up_topology 获取了入度为0的Node,将这些 Node 组装成了一个 Node 的 list

最终调用了 _schedule 将这些Node放到了对应worker的_wsq中或Executor的_wsq,而消费者线程会不停的尝试从 _wsq 中获取 Node 并执行Node对应函数

以下是调度方法的不同重载形式,将一个(Node* node)或多个(SmallVector<Node*>& nodes)放到对应worker的_wsq中,如果没有对应的worker就放入Executor的_wsq