Gparallel Save

A DAG based parallel task schedule framework for galois advertising|基于DAG(Directed Acyclic Graph)的并行任务调度系统,自动推导节点依赖生成DAG。

Project README

gParallel

Build Status

gparallel是一个针对具有复杂流程或逻辑的单体式信息检索系统而设计的并行任务调度框架。使用Meta Programming技术根据任务的输入和输出自动推导依赖关系,生成DAG(Directed acyclic graph)并进行并行任务调度。

Quick start

  • 编译依赖
    • g++8
    • boost_log-mt v1.70
    • gtest v1.10.0

下载编译test和demo

$ git clone [email protected]:galois-advertising/gparallel.git
$ cd gparallel
$ git submodule update --init --recursive
$ mkdir build
$ cd build
$ cmake ..
$ make
$ ./test
$ ./demo

将gparallel作为你项目的一部分

首先将gparallel以及所依赖的commongtest设置为git submodule

cd your-project
git add submodule -b master https://github.com/galois-advertising/gparallel
git add submodule -b master https://github.com/galois-advertising/common
git add submodule https://github.com/google/googletest.git gtest
cd gtest
git checkout release-1.10.0
git add gtest
git commit -m "Add gparallel"

并且修改CMakeLists.txt,加入:

INCLUDE_DIRECTORIES("${CMAKE_SOURCE_DIR}/common/util/include")
INCLUDE_DIRECTORIES("${CMAKE_SOURCE_DIR}/gparallel/include")
IF (NOT TARGET gtest)
    ADD_SUBDIRECTORY(gtest)
    ENABLE_TESTING()
    INCLUDE_DIRECTORIES(SYSTEM
        ${gtest_SOURCE_DIR}
        ${gtest_SOURCE_DIR}/include)
ENDIF()

IF (NOT TARGET common)
    ADD_SUBDIRECTORY(common)
ENDIF()

IF (NOT TARGET gparallel)
    ADD_SUBDIRECTORY(gparallel)
ENDIF()
ADD_EXECUTABLE(your-bin ...)
TARGET_LINK_LIBRARIES(your-bin common gparallel)

背景介绍

对于单体型业务系统,在系统建立初期,系统业务还比较简单,每次检索请求到来时需要执行的业务逻辑也比较单一,此时请求级别的数据变量比较少,这些变量的赋值顺序与依赖关系也一目了然,系统很容易维护。

但是,随着开发的人越来越多,大家都在上面加入自己的业务逻辑和新的数据变量,此时变量增加到了几百个,变量之间的赋值顺序与依赖关系开始变得复杂,一些代码逻辑甚至打破了已有的流程结构(俗称飞线),整个系统难以理解。没有一个人能说清楚这些变量之间的依赖关系以及这些业务逻辑之间的执行顺序,每次新的开发都如履薄冰,每次排查问题都耗时耗力。

对于多人参与,迭代密集的系统,如何更合理地组织各类型的数据,如何更好地实现繁杂的业务逻辑,这就是为需要gparallel的理由.

gparallel是一款基于DAG(Directed acyclic graph)的并且支持自动依赖推导的任务调度框架。

DAG在计算机领域有着广泛的应用,例如在大数据计算中可以使用DAG指导Hadoop任务的执行顺序等等。在软件设计中也被广泛应用,开源社区中DAG-based调度框架也有很多,例如cpp-taskflow。但是,几乎所有框架都采用了 手工配置的方式 生成调度DAG。例如在一个拥有四个任务的调度系统中,cpp-taskflow需要通过下面方式来配置DAG。

    auto [A, B, C, D] = taskflow.emplace(
      [] () { std::cout << "TaskA\n"; },
      [] () { std::cout << "TaskB\n"; },
      [] () { std::cout << "TaskC\n"; },
      [] () { std::cout << "TaskD\n"; }
    );
    A.precede(B);  // A runs before B 
    A.precede(C);  // A runs before C 
    B.precede(D);  // B runs before D 
    C.precede(D);  // C runs before D 
}

上面代码中想要生成预期的DAG图需要人工显式定义每两个节点之间的依赖关系,这种方式虽然理解比较直观,但是缺点也非常明显:

  • 在有大量任务的时候,人工定义DAG图比较困难并且容易出错。现实中的业务系统一般是多人同时开发,这就需要模块owner对所有的任务节点之间的依赖关系进行人工梳理,可维护性较差。
  • 工业环境中很多业务,往往以数据流驱动的方式表达会更加清晰,这就需要花费大量时间来将系统逻辑从数据驱动表示强行转化为任务驱动表示来适配调度系统,耗时耗力。

gparallel如何解决问题

gparallel的主要思想有3个:

  • 数据划分:将所有数据成员,按照业务逻辑数据状态划分为不同的集合。
  • 依赖推导:将所有的代码逻辑,按照功能划分为不同的task节点,并且自动推导task节点之间的依赖关系,建立DAG。
  • 任务调用:通过拓扑排序,将DAG转化为偏序表示,并使用thread或者coroutine对task进行调度。

数据划分

在检索系统中,数据一般就是指检索过程中用来存储中间结果和最终结果的变量,比如存储广告的std::list,存储标题的std::string。gparallel主要从2个角度进行划分:

  • 按照业务逻辑:把不同业务逻辑所需要用到的数据划分为不同的集合。比如有不同的广告营销产品,各自都有自己的User、Plan和Ad的集合,以及一些存储数中间结果的变量。为了方便描述,我们用大写字母来表示按照业务逻辑划分出来的数据即可,例如ABC等等。

  • 按照数据状态:更进一步,对于服务于同一个业务的数据集合(A),在不同的阶段,又可以划分为不同的状态。例如一个广告队列,初始状态是空(empty),经过填充以后有N条广告(inited),又经过了一次按照CTR的排序(ranked),最后经历了一次截断(truncate),那么对应四个状态A_emptyA_initedA_rankedA_truncated

为了方便理解,我们把划分出来的每个子集(包含不同的阶段),叫做meta。前面提到的A_emptyA_initedA_rankedA_truncate都是meta。在gparallel中,我们用DECL_META宏来定义一个meta

meta:用来描述指定业务所需要的所有数据(指定节点)的集合的一种数据结构

理解gparallel对数据的2层递进划分方式非常重要,因为gparallel的DAG自动推导过程正是依赖于不同的meta。

现实中的系统中,数据成员一般放置在一个叫做context或者thread_data的结构体中。顾名思义,这些数据的作用范围就是一次请求,一个比较常见的设计是一次请求由线程池中的一个线程来独立负责,所以请求级别的数据,往往也是线程级别的数据。这个context或者thread_data类型,在代码实现中通常用meta_storage_t来表示,即所一次检索中用到的所有数据,都统一存储在这里。

通过定义gettersetter可以对子集需要包含的元素进行指定,如果定义了gettersetter就代表这个meta中包含这个数据成员。

子集之间也可以互相包含,原理与面向对象中的继承是一样的。同理,如果一个任务依赖于一个meta,则也同样依赖于这个meta的父metagparallel在推导依赖时会自动将继承关系也考虑在内。继承机制的主要目的是为了避免重复定义集合的元素,增加代码的可维护性。在实践中,通常将很多业务公用的数据单独定义为一个meta,属于单独业务的meta,可以从前面的公共meta进行继承。

通过下面的例子可以理解meta_storage_tmeta继承的关系。

通过上图可以看到,所有数据被放置在thread_data这个对象中(meta_storage_t),通过三个metathread_data中的数据划分为三个集合:

  • meta_common包含了thread_data::id
  • meta_a包含了thread_data::business_a并且继承了meta_common的所有元素。
  • meta_b包含了thread_data::business_b并且继承了meta_common的所有元素。

任务定义

gparallel中,使用一个函数表示一个具体的任务,函数的参数表示任务的输入输出。任何一个meta既可以作为输入,也可以作为输出。这里引入2个模版包装器inputoutput。如果metainput包装,则任务函数会将其当作一个输入数据,同理如果用output包装,则会当作输出。

表示任务的函数,必须定义为类的静态成员函数,函数名字必须为process,返回类型为void。例如:

struct DemoNode {
    void process(input<meta_a> a, input<meta_b> b, output<meta_c> c) {
        // process code
        c->mutable_business_c() = a->get_business_a() + b->get_business_b();
    }
}

上面的DemoNode实现了将business_abusiness_b的和赋值给business_c的逻辑。 process函数可以拥有任意多个输入和任意多个输出。其中没有输入的节点作为起始节点之一,没有输出的节点作为终止节点之一。

任务调度

通过register_node函数将所有节点注册到DAG中后,使用setup_dag_schema函数可自动实现节点依赖分析以及DAG生成。针对生成的DAG进行拓扑排序,就能得到任务调度顺序。后面可根据具体情况,实现多线程或者多协程调度。

dag_schema<thread_data> nodes;
register_node<thread_data, node_a, node_b, node_c ...>::reg(nodes);
setup_dag_schema<thread_data>(nodes);
if (auto tasks = topological_sort<thread_data>(nodes); tasks) {
    for (auto task : tasks.value()) {
        task->mutable_executor()(&td);
    }
}

gparallel实战

在本小结中,会从一个现实的场景来描述gparallel的使用逻辑。

问题描述:对一个指定的广告队列,分别请求其CTR(点击率)值和CPM(千次展示成本)并填充到广告的对应字段,最后分别按照CTRCPM进行排序后生成2个新的广告队列供下游使用。

上述流程是广告检索系统里面一个比较典型的逻辑,完整的代码在./demo/advprocess.cpp

数据划分

我们首先梳理一下所需要用到的数据对象:

数据名称 类型 含义
advs_original advlist_t 原始的广告队列
ctr_data advlist_t 模型返回的ctr数据
cpm_data advlist_t 模型返回的cpm数据
advs_ctr_ordered ctr_response_t 输出的ctr排序的广告队列
advs_cpm_ordered cpm_response_t 输出的cpm排序的广告队列

根据上面的定义,我们定义数据集合,业务执行需要用到的所有数据都放在thread_data这个集合中:

demo_thread_data.png

class thread_data {
public:
    advlist_t advs_original;
    advlist_t advs_ctr_ordered;
    advlist_t advs_cpm_ordered;
    ctr_response_t ctr_data;
    cpm_response_t cpm_data;
};

接下来我们根据不同的业务,将集合划分为不同的子集,每个子集就是一个meta,一个元素可以同时属于多个metametameta之间可以互相包含。 根据问题的描述,我们可以很容易总结出5个子流程,每个子流程都对应一个数据处理节点:

流程节点名称输入输出
获取CTR值get_ctr_nodeadvs_originalctr_data
获取CPM值get_cpm_nodeadvs_originalcpm_data
填充字段fill_node advs_original
ctr_data
cpm_data
advs_original*
生成CTR
排序队列
gen_ctr_nodeadvs_original* advs_ctr_ordered
生成CPM
排序队列
gen_cpm_node advs_original*advs_cpm_ordered

可以看到原始输入的广告队列是advs_original,这里我们将其封装为metaoriginal

get_ctr_node节点和get_cpm_node节点通过metaoriginal分别获取ctr_datacpm_data两份数据,这两份数据我们用metactr和metacpm来封装。

fill_node节点对广告队列进行数据填充,这里注意,节点的输入中有metaoriginal,输出中有metaoriginal_with_ctr_cpm。这2个meta其实本质上都是advs_original的封装,但是因为属于2个阶段(即填充前和填充后),所以分别用2个不同的meta来表示,在实现上,我们可以直接使用继承功能,复用metaoriginal

gen_ctr_nodegen_cpm_node的输入都包含metaoriginal_with_ctr_cpm,表示其依赖于填充后的advs_original而不是填充前。

这样就可以根据不同的业务逻辑,把thread_data集合划分为不同的meta,每个业务只需要关注子集需要用到哪些meta,生成哪些meta即可。 demo_meta.png

任务定义

首先实现get_ctr_nodeget_cpm_node这两个任务节点。

struct get_ctr_node {
    static void process(input<original> ori, output<ctr> ctr) {
        INFO("[gparallel] get_ctr_node", "");
        ctr->mutable_ctr_data().resize(ori->mutable_advs_original().size());
        for (int pos = 0; pos < ori->mutable_advs_original().size(); pos++) {
            auto & adv = ori->mutable_advs_original()[pos];
            ctr->mutable_ctr_data()[pos] = 0.1 * static_cast<double>(adv.id);
        }
    }
};

struct get_cpm_node {
    static void process(input<original> ori, output<cpm> cpm) {
        INFO("[gparallel] get_cpm_node", "");
        cpm->mutable_cpm_data().resize(ori->mutable_advs_original().size());
        for (int pos = 0; pos < ori->mutable_advs_original().size(); pos++) {
            auto & adv = ori->mutable_advs_original()[pos];
            cpm->mutable_cpm_data()[pos] = 100.2 * static_cast<double>(adv.id);
        }

    }
};

这两个任务的功能相似,都是根据original中存储的所有广告,获取对应的点击率千次展示成本数据,并且分别保存到ctrcpm这两个meta。这里为了简化,我们取一些随机的数字作为ctr作为cpm。现实中往往需要同步或者异步请求模型服务器来获取。

接下来实现fill_node

struct fill_node {
    static void process(input<ctr> ctr, input<cpm> cpm, input<original> ori, 
        output<original_with_ctr_cpm> ori_ctr_cpm) {
        INFO("[gparallel] fill_node", "");
        for (int pos = 0; pos < ori->mutable_advs_original().size(); pos++) {
            auto & adv = ori_ctr_cpm->mutable_advs_original()[pos];
            adv.ctr = ctr->mutable_ctr_data()[pos];
            adv.cpm = cpm->mutable_cpm_data()[pos];
            INFO("[gparallel] ori_ctr_cpm adv:[%d] ctr:[%lf] cpm:[%lf]", adv.id, adv.ctr, adv.cpm);
        }
    }
};

fill_node节点将前面get_ctr_nodeget_cpm_node这两个节点的输出作为自己的输入,最后生成original_with_ctr_cpm。这里可以看到,originaloriginal_with_ctr_cpm分别作为节点的输入和输出,虽然这两个meta中包含的数据都是advs_original,但是对应了不同的状态original代表了advs_original一开始的状态,而original_with_ctr_cpm代表了已经填充了ctr作为cpm数据的状态。

然后实现gen_ctr_nodegen_cpm_node

struct gen_ctr_node {
    static void process(input<original_with_ctr_cpm> ori_ctr_cpm, 
        output<ctr_ordered_advlist> ctr_ordered) {
        INFO("[gparallel] gen_ctr_node", "");
        ctr_ordered->mutable_advs_ctr_ordered() = ori_ctr_cpm->mutable_advs_original();
        std::sort(ctr_ordered->mutable_advs_ctr_ordered().begin(),
            ctr_ordered->mutable_advs_ctr_ordered().end(), 
            [](const auto& a, const auto& b)->bool{return a.ctr > b.ctr;});

    }
};

struct gen_cpm_node {
    static void process(input<original_with_ctr_cpm> ori_ctr_cpm, 
        output<cpm_ordered_advlist> cpm_ordered) {
        INFO("[gparallel] gen_cpm_node", "");
        cpm_ordered->mutable_advs_cpm_ordered() = ori_ctr_cpm->mutable_advs_original();
        std::sort(cpm_ordered->mutable_advs_cpm_ordered().begin(),
            cpm_ordered->mutable_advs_cpm_ordered().end(),
            [](const auto& a, const auto& b)->bool{return a.cpm > b.cpm;});
    }
};

这里需要注意的是,gen_ctr_nodegen_cpm_node这两个节点,需要的是已经填充了ctr作为cpm数据advs_original,所以输入meta必须为original_with_ctr_cpm。这一点对于生成正确的DAG来说非常重要。gparallel正是通过区分同一数据的不同阶段,来实现正确的任务调度。

最后,end_node对所有的结果进行汇总,并反馈给下游。

struct end_node {
    static void process(input<ctr_ordered_advlist> ctr, input<cpm_ordered_advlist> cpm) {
        INFO("[gparallel] end_node", "");
        for (auto& adv : cpm->mutable_advs_cpm_ordered()) {
            INFO("CPM ordered:[%d]", adv.id);
        }
        for (auto& adv : ctr->mutable_advs_ctr_ordered()) {
            INFO("CTR ordered:[%d]", adv.id);
        }
    }
};

任务调度

在所有的meta和node都定义好以后,我们可以通过下面流程进行任务调度。 首先我们需要申请相应的内存来存储我们所有的数据变量,这里我们申请一个thread_data的实例:

thread_data td{{advertisement(1), advertisement(2), advertisement(3)}, {},{},{},{}};

我们还需要一个节点容器,来存放所有的数据处理节点:

dag_schema<thread_data> nodes;

接下来我们将所有的节点,注册到刚才申请的节点容器中。

register_node<thread_data, get_ctr_node, get_cpm_node>::reg(nodes);
register_node<thread_data, fill_node>::reg(nodes);
register_node<thread_data, gen_ctr_node, gen_cpm_node, end_node>::reg(nodes);

register_node模版负责注册节点到容器nodes中,其中第一个模版参数是我们所有数据最终存储的类,也就是meta_storage_t。剩余任意多个模版参数分别为各个node。reg函数的输入参数为dag_schema类型的节点容器实例。

接下来推导所有节点的依赖关系:

setup_dag_schema<thread_data>(nodes);

最后,我们对DAG上面的所有节点进行拓扑排序,并且按照排序后的顺序依次进行调用:

if (auto tasks = topological_sort<thread_data>(nodes); tasks) {
    for (auto task : tasks.value()) {
        INFO("Execute[%s]", task->name().c_str());
        task->mutable_executor()(&td);
    }
}

编译执行:

$ cd build
$ cmake ../
$ make demo
$ ./demo

如果我们打开了debug日志,就可以在demo的输出中看到下面的DAG信息:

[2020-05-20 23:24:11.082883] [] [info][~/gparallel/include/dag_schema.h][127]node_depends_after
http://graphviz.it/#
[2020-05-20 23:24:11.083212] [] [info][~/gparallel/include/dag_schema.h][128]
digraph node_depends_after{
rankdir=BT;
size="8,5";
"fill_node" -> "get_ctr_node";
"fill_node" -> "get_cpm_node";
"gen_ctr_node" -> "fill_node";
"gen_cpm_node" -> "fill_node";
"end_node" -> "gen_ctr_node";
"end_node" -> "gen_cpm_node";
}

通过http://graphviz.it/#,可以看到gparallel自动推导得到的DAG。

demo_dag.png

同时可以看到全部业务执行的结果:

[2020-05-20 23:24:11.084784] [] [info][advprocess.cpp][141]Execute[get_ctr_node]
[2020-05-20 23:24:11.084792] [] [info][advprocess.cpp][63][gparallel] get_ctr_node
[2020-05-20 23:24:11.084866] [] [info][advprocess.cpp][141]Execute[get_cpm_node]
[2020-05-20 23:24:11.084898] [] [info][advprocess.cpp][74][gparallel] get_cpm_node
[2020-05-20 23:24:11.084905] [] [info][advprocess.cpp][141]Execute[fill_node]
[2020-05-20 23:24:11.084910] [] [info][advprocess.cpp][87][gparallel] fill_node
[2020-05-20 23:24:11.084916] [] [info][advprocess.cpp][92][gparallel] ori_ctr_cpm adv:[1] ctr:[0.100000] cpm:[100.200000]
[2020-05-20 23:24:11.084920] [] [info][advprocess.cpp][92][gparallel] ori_ctr_cpm adv:[2] ctr:[0.200000] cpm:[200.400000]
[2020-05-20 23:24:11.084956] [] [info][advprocess.cpp][92][gparallel] ori_ctr_cpm adv:[3] ctr:[0.300000] cpm:[300.600000]
[2020-05-20 23:24:11.084966] [] [info][advprocess.cpp][141]Execute[gen_ctr_node]
[2020-05-20 23:24:11.084992] [] [info][advprocess.cpp][100][gparallel] gen_ctr_node
[2020-05-20 23:24:11.085003] [] [info][advprocess.cpp][141]Execute[gen_cpm_node]
[2020-05-20 23:24:11.085007] [] [info][advprocess.cpp][112][gparallel] gen_cpm_node
[2020-05-20 23:24:11.085011] [] [info][advprocess.cpp][141]Execute[end_node]
[2020-05-20 23:24:11.085015] [] [info][advprocess.cpp][122][gparallel] end_node
[2020-05-20 23:24:11.085046] [] [info][advprocess.cpp][124]CPM ordered:[3]
[2020-05-20 23:24:11.085055] [] [info][advprocess.cpp][124]CPM ordered:[2]
[2020-05-20 23:24:11.085062] [] [info][advprocess.cpp][124]CPM ordered:[1]
[2020-05-20 23:24:11.085069] [] [info][advprocess.cpp][127]CTR ordered:[3]
[2020-05-20 23:24:11.085076] [] [info][advprocess.cpp][127]CTR ordered:[2]
[2020-05-20 23:24:11.085082] [] [info][advprocess.cpp][127]CTR ordered:[1]

Some friends ask how to execute this DAG parallelly?

One solution is:

void invoke(Node &node, std::shared_ptr<std::latch> done) {
    std::shared_ptr<std::latch> dep_done = std::make_shared<>(node.dependency_list.size());
    for (auto &dep : node.dependency_list) {
        invoke(dep, dep_done);
    }
    thread_pool.push([&node, done, dep_done](){
        dep_done->wait();
        node.execute();
        done->count_down();
    })
}

int main() {
    std::list<Node> node_list;
    // Find the final node
    Node &final_node = xxx;
    std::shared_ptr<std::latch> done = std::make_shared<>(1);
    invoke(final_node, latch);
    latch->wait();
    // Final result is ready here
}

By huyifeng (2023/08/26)

Open Source Agenda is not affiliated with "Gparallel" Project. README Source: galois-advertising/gparallel

Open Source Agenda Badge

Open Source Agenda Rating