Application scenarios of workflow

Mind map of workflow

Network model

Server: receiving a client request and replying

#include <stdio.h>
#include "workflow/WFHttpServer.h"

void mul(int a, int b, int &res) {
    res = a * b;
}

int main()
{
    WFHttpServer server([](WFHttpTask *task) {
        task->get_resp()->append_output_body("<html>Hello World!</html>");
        int a = 365;
        int b = 7;
        /* The result must outlive this function: the go task and its callback
         * run after this lambda has returned, so keep 'res' on the heap. */
        int *res = new int;
        WFGoTask *go = WFTaskFactory::create_go_task("test", &mul, a, b, std::ref(*res));
        go->set_callback([res](WFGoTask *t){
            protocol::HttpResponse *resp = (protocol::HttpResponse *)t->user_data;
            char buf[1024] = {0};
            sprintf(buf, "<br>7*365 = %d</br>", *res);
            resp->append_output_body(buf);
            delete res;
        });
        go->user_data = task->get_resp();

        series_of(task)->push_back(go);
    });

    if (server.start(8888) == 0) { // start server on port 8888
        getchar(); // press "Enter" to end.
        server.stop();
    }

    return 0;
}

Client: request-response mode

/*
  Copyright (c) 2019 Sogou, Inc.

  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

	  http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.

  Author: Xie Han (xiehan@sogou-inc.com;63350856@qq.com)
*/

#include <netdb.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <string>
#include "workflow/RedisMessage.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"

#define RETRY_MAX       2

struct tutorial_task_data
{
    std::string url;
    std::string key;
};

// This redis_callback is triggered only after the upper layer has received a complete Redis message
void redis_callback(WFRedisTask *task)
{
    protocol::RedisRequest *req = task->get_req();
    protocol::RedisResponse *resp = task->get_resp();
    int state = task->get_state();
    int error = task->get_error();
    protocol::RedisValue val;

    switch (state)
        {
            case WFT_STATE_SYS_ERROR:
                fprintf(stderr, "system error: %s\n", strerror(error));
                break;
            case WFT_STATE_DNS_ERROR:
                fprintf(stderr, "DNS error: %s\n", gai_strerror(error));
                break;
            case WFT_STATE_SSL_ERROR:
                fprintf(stderr, "SSL error: %d\n", error);
                break;
            case WFT_STATE_TASK_ERROR:
                fprintf(stderr, "Task error: %d\n", error);
                break;
            case WFT_STATE_SUCCESS:
                resp->get_result(val);
                if (val.is_error())
                {
                    fprintf(stderr, "Error reply. Need a password?\n");
                    state = WFT_STATE_TASK_ERROR;
                }
                break;
        }

    if (state != WFT_STATE_SUCCESS)
    {
        fprintf(stderr, "Failed. Press Ctrl-C to exit.\n");
        return;
    }

    std::string cmd;
    req->get_command(cmd);
    if (cmd == "SET")
    {
        tutorial_task_data *data = (tutorial_task_data *)task->user_data;
        WFRedisTask *next = WFTaskFactory::create_redis_task(data->url,
        RETRY_MAX,
        redis_callback);

        next->get_req()->set_request("GET", { data->key });
        /* Push next task(GET task) to current series. */
        series_of(task)->push_back(next);
        fprintf(stderr, "Redis SET request success. Trying to GET...\n");
    }
    else /* if (cmd == "GET") */
    {
        if (val.is_string())
        {
            fprintf(stderr, "Redis GET success. value = %s\n",
                val.string_value().c_str());
        }
        else
        {
            fprintf(stderr, "Error: Not a string value. \n");
        }

        fprintf(stderr, "Finished. Press Ctrl-C to exit.\n");
    }
}

static WFFacilities::WaitGroup wait_group(1);

void sig_handler(int signo)
{
    wait_group.done();
}

int main(int argc, char *argv[])
{
    WFRedisTask *task;

    if (argc != 4)
    {
        fprintf(stderr, "USAGE: %s <redis URL> <key> <value>\n", argv[0]);
        exit(1);
    }

    signal(SIGINT, sig_handler);

    /* This struct is used only in this tutorial. */
    struct tutorial_task_data data;

	/* Redis URL format: redis://:password@host:port/dbnum
	   examples:
	   redis://127.0.0.1
	   redis://:12345@redis.sogou:6379/3
	*/
	data.url = argv[1];
	if (strncasecmp(argv[1], "redis://", 8) != 0 &&
		strncasecmp(argv[1], "rediss://", 9) != 0)
	{
		data.url = "redis://" + data.url;
	}

	data.key = argv[2];

	task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
											redis_callback);
	protocol::RedisRequest *req = task->get_req();
	req->set_request("SET", { data.key, argv[3] });

	/* task->user_data is a public (void *), can store anything. */
	task->user_data = &data;

	/* task->start() is equivalent to:
	 * Workflow::start_series_work(task, nullptr) or
	 * Workflow::create_series_work(task, nullptr)->start() */
	task->start();

	wait_group.wait();
	return 0;
}

Encapsulation of asynchronous tasks

Serial composition (series)

Execution flow encapsulated by SeriesWork
  1. Find the series the task belongs to, and push the go task onto the back of that series

  2. Execute the tasks in the series

Parallel composition (ParallelWork)

/*
  Copyright (c) 2019 Sogou, Inc.

  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

	  http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.

  Author: Xie Han (xiehan@sogou-inc.com;63350856@qq.com)
*/

#include <stdio.h>
#include <string.h>
#include <utility>
#include <string>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/HttpMessage.h"
#include "workflow/HttpUtil.h"
#include "workflow/WFFacilities.h"

using namespace protocol;

#define REDIRECT_MAX    5
#define RETRY_MAX       2

struct tutorial_series_context
{
    std::string url;
    int state;
    int error;
    HttpResponse resp;
};

void callback(const ParallelWork *pwork)
{
    tutorial_series_context *ctx;
    const void *body;
    size_t size;
    size_t i;

    for (i = 0; i < pwork->size(); i++)
        {
            ctx = (tutorial_series_context *)pwork->series_at(i)->get_context();
            printf("%s\n", ctx->url.c_str());
            if (ctx->state == WFT_STATE_SUCCESS)
            {
                ctx->resp.get_parsed_body(&body, &size);
                printf("%zu%s\n", size, ctx->resp.is_chunked() ? " chunked" : "");
                fwrite(body, 1, size, stdout);
                printf("\n");
            }
            else
                printf("ERROR! state = %d, error = %d\n", ctx->state, ctx->error);

            delete ctx;
        }
}

int main(int argc, char *argv[])
{
    ParallelWork *pwork = Workflow::create_parallel_work(callback);
    SeriesWork *series;
    WFHttpTask *task;
    HttpRequest *req;
    tutorial_series_context *ctx;
    int i;

    for (i = 1; i < argc; i++)
        {
            std::string url(argv[i]);

            if (strncasecmp(argv[i], "http://", 7) != 0 &&
                strncasecmp(argv[i], "https://", 8) != 0)
            {
                url = "http://" +url;
            }

            task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
                [](WFHttpTask *task)
                {
                    tutorial_series_context *ctx =
                    (tutorial_series_context *)series_of(task)->get_context();
                    ctx->state = task->get_state();
                    ctx->error = task->get_error();
                    ctx->resp = std::move(*task->get_resp());
                });

            req = task->get_req();
            req->add_header_pair("Accept", "*/*");
            req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)");
            req->add_header_pair("Connection", "close");

            ctx = new tutorial_series_context;
            ctx->url = std::move(url);
            series = Workflow::create_series_work(task, nullptr);
            series->set_context(ctx);
            pwork->add_series(series);
        }

    WFFacilities::WaitGroup wait_group(1);

    Workflow::start_series_work(pwork, [&wait_group](const SeriesWork *) {
        wait_group.done();
    });

    wait_group.wait();
    return 0;
}

Create the parallel task (ParallelWork)

Create multiple concurrent tasks

Execute the concurrent tasks

Callback executed after all concurrent tasks have finished

DAG (directed acyclic graph)

/*
  Copyright (c) 2021 Sogou, Inc.

  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

	  http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.

  Author: Xie Han (xiehan@sogou-inc.com;63350856@qq.com)
*/

#include <stdio.h>
#include "workflow/WFTaskFactory.h"
#include "workflow/WFGraphTask.h"
#include "workflow/HttpMessage.h"
#include "workflow/WFFacilities.h"

using namespace protocol;

static WFFacilities::WaitGroup wait_group(1);

void go_func(const size_t *size1, const size_t *size2)
{
    printf("page1 size = %zu, page2 size = %zu\n", *size1, *size2);
}

void http_callback(WFHttpTask *task)
{
    size_t *size = (size_t *)task->user_data;
    const void *body;

    if (task->get_state() == WFT_STATE_SUCCESS)
        task->get_resp()->get_parsed_body(&body, size);
    else
        *size = (size_t)-1;
}

#define REDIRECT_MAX	3
#define RETRY_MAX		1

int main()
{
    WFTimerTask *timer;
    WFHttpTask *http_task1;
    WFHttpTask *http_task2;
    WFGoTask *go_task;
    size_t size1;
    size_t size2;

    timer = WFTaskFactory::create_timer_task(1000000, [](WFTimerTask *) {
        printf("timer task complete(1s).\n");
    });

    /* Http task1 */
    http_task1 = WFTaskFactory::create_http_task("https://www.sogou.com/",
        REDIRECT_MAX, RETRY_MAX,
        http_callback);
    http_task1->user_data = &size1;

    /* Http task2 */
    http_task2 = WFTaskFactory::create_http_task("https://www.baidu.com/",
        REDIRECT_MAX, RETRY_MAX,
        http_callback);
    http_task2->user_data = &size2;

    /* go task will print the http pages size */
    go_task = WFTaskFactory::create_go_task("go", go_func, &size1, &size2);

    /* Create a graph. Graph is also a kind of task */
    WFGraphTask *graph = WFTaskFactory::create_graph_task([](WFGraphTask *) {
        printf("Graph task complete. Wakeup main process\n");
        wait_group.done();
    });

    /* Create graph nodes */
    WFGraphNode& a = graph->create_graph_node(timer);
    WFGraphNode& b = graph->create_graph_node(http_task1);
    WFGraphNode& c = graph->create_graph_node(http_task2);
    WFGraphNode& d = graph->create_graph_node(go_task);

    /* Build the graph */
    a-->b;
    a-->c;
    b-->d;
    c-->d;

    graph->start();
    wait_group.wait();
    return 0;
}

How the DAG is created

Execution flow of the DAG

Abstractions in workflow

Interfaces for creating a series

  • create_series_work only creates the series; it does not run the task flow
  • start_series_work not only creates the series but also runs it (dispatch); see the sketch below
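
A minimal sketch contrasting the two interfaces; the URL and the empty callbacks are placeholders, not part of the original notes:

#include "workflow/WFTaskFactory.h"

void demo_series_creation()
{
    WFHttpTask *task = WFTaskFactory::create_http_task("http://example.com",
                                                       2, 2,
                                                       [](WFHttpTask *) { });

    /* create_series_work: the series is only created; nothing runs yet. */
    SeriesWork *series = Workflow::create_series_work(task, nullptr);
    series->start();    /* dispatch() is called on the first task here */

    /* start_series_work: create the series and dispatch it in one call. */
    /* Workflow::start_series_work(task, nullptr); */
}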

Encapsulation of SeriesWork

class SeriesWork
{
    public:
    void start()
    {
        assert(!this->in_parallel);
        this->first->dispatch();
    }

    /* Call dismiss() only when you don't want to start a created series.
	 * This operation is recursive, so only call on the "root". */
    void dismiss()
    {
        assert(!this->in_parallel);
        this->dismiss_recursive();
    }

    public:
    void push_back(SubTask *task);
    void push_front(SubTask *task);

    public:
    void *get_context() const { return this->context; }
    void set_context(void *context) { this->context = context; }

    public:
    /* Cancel a running series. Typically, called in the callback of a task
	 * that belongs to the series. All subsequent tasks in the series will be
	 * destroyed immediately and recursively (ParallelWork), without callback.
	 * But the callback of this canceled series will still be called. */
    virtual void cancel() { this->canceled = true; }

    /* Parallel work's callback may check the cancellation state of each
	 * sub-series, and cancel its super-series recursively. */
    bool is_canceled() const { return this->canceled; }

    /* 'false' until the time of callback. Mainly for sub-class. */
    bool is_finished() const { return this->finished; }

    public:
    void set_callback(series_callback_t callback)
    {
        this->callback = std::move(callback);
    }

    public:
    virtual void *get_specific(const char *key) { return NULL; }

    public:
    /* The following functions are intended for task implementations only. */
    SubTask *pop();

    void set_last_task(SubTask *last)
    {
        last->set_pointer(this);
        this->last = last;
    }

    void unset_last_task() { this->last = NULL; }

    const ParallelTask *get_in_parallel() const { return this->in_parallel; }

    protected:
    SubTask *get_last_task() const { return this->last; }

    void set_in_parallel(const ParallelTask *task) { this->in_parallel = task; }

    void dismiss_recursive();

    protected:
    void *context;
    series_callback_t callback;

    private:
    SubTask *pop_task();
    void expand_queue();

    private:
    SubTask *buf[4];
    SubTask *first;
    SubTask *last;
    SubTask **queue;
    int queue_size;
    int front;
    int back;
    bool canceled;
    bool finished;
    const ParallelTask *in_parallel;
    std::mutex mutex;

    protected:
    SeriesWork(SubTask *first, series_callback_t&& callback);
    virtual ~SeriesWork();
    friend class ParallelWork;
    friend class Workflow;
};

  • dispatch: the operation that launches the series
Context of a series

All tasks in a series can access the series context (the context is data shared by every task in the same series); see the sketch below.
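
A minimal sketch of sharing data through the context; the my_context struct and the first_task parameter are placeholders introduced only for illustration:

#include <string>
#include "workflow/Workflow.h"

struct my_context
{
    std::string result;    /* data shared by every task in the series */
};

void demo_series_context(SubTask *first_task)
{
    SeriesWork *series = Workflow::create_series_work(first_task,
        [](const SeriesWork *s) {
            /* series callback: every task has finished, clean up the context */
            delete (my_context *)s->get_context();
        });

    series->set_context(new my_context);
    series->start();
}

/* Inside the callback of any task belonging to this series:
 *     my_context *ctx = (my_context *)series_of(task)->get_context();
 *     ctx->result = "...";
 */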

The pop operation of a series

When a task in the series finishes execution, the finished task is popped out of the series

Encapsulation of WFTaskFactory

class WFTaskFactory
{
	public:
	static WFHttpTask *create_http_task(const std::string& url,
									int redirect_max,
									int retry_max,
									http_callback_t callback);

	static WFHttpTask *create_http_task(const ParsedURI& uri,
									int redirect_max,
									int retry_max,
									http_callback_t callback);

	static WFHttpTask *create_http_task(const std::string& url,
									const std::string& proxy_url,
									int redirect_max,
									int retry_max,
									http_callback_t callback);

	static WFHttpTask *create_http_task(const ParsedURI& uri,
									const ParsedURI& proxy_uri,
									int redirect_max,
									int retry_max,
									http_callback_t callback);

	static WFRedisTask *create_redis_task(const std::string& url,
									  int retry_max,
									  redis_callback_t callback);

	static WFRedisTask *create_redis_task(const ParsedURI& uri,
									  int retry_max,
									  redis_callback_t callback);

	static WFMySQLTask *create_mysql_task(const std::string& url,
									  int retry_max,
									  mysql_callback_t callback);

	static WFMySQLTask *create_mysql_task(const ParsedURI& uri,
									  int retry_max,
									  mysql_callback_t callback);

	static WFDnsTask *create_dns_task(const std::string& url,
								  int retry_max,
								  dns_callback_t callback);

	static WFDnsTask *create_dns_task(const ParsedURI& uri,
								  int retry_max,
								  dns_callback_t callback);

	public:
	static WFFileIOTask *create_pread_task(int fd,
									   void *buf,
									   size_t count,
									   off_t offset,
									   fio_callback_t callback);

	static WFFileIOTask *create_pwrite_task(int fd,
										const void *buf,
										size_t count,
										off_t offset,
										fio_callback_t callback);

	/* preadv and pwritev tasks are supported by Linux aio only.
	 * On macOS or others, you will get an ENOSYS error in callback. */

	static WFFileVIOTask *create_preadv_task(int fd,
										 const struct iovec *iov,
										 int iovcnt,
										 off_t offset,
										 fvio_callback_t callback);

	static WFFileVIOTask *create_pwritev_task(int fd,
										  const struct iovec *iov,
										  int iovcnt,
										  off_t offset,
										  fvio_callback_t callback);

	static WFFileSyncTask *create_fsync_task(int fd,
										 fsync_callback_t callback);

	/* On systems that do not support fdatasync(), like macOS,
	 * fdsync task is equal to fsync task. */
	static WFFileSyncTask *create_fdsync_task(int fd,
										  fsync_callback_t callback);

	/* File tasks with path name. */
	public:
	static WFFileIOTask *create_pread_task(const std::string& pathname,
									   void *buf,
									   size_t count,
									   off_t offset,
									   fio_callback_t callback);

	static WFFileIOTask *create_pwrite_task(const std::string& pathname,
										const void *buf,
										size_t count,
										off_t offset,
										fio_callback_t callback);

	static WFFileVIOTask *create_preadv_task(const std::string& pathname,
										 const struct iovec *iov,
										 int iovcnt,
										 off_t offset,
										 fvio_callback_t callback);

	static WFFileVIOTask *create_pwritev_task(const std::string& pathname,
										  const struct iovec *iov,
										  int iovcnt,
										  off_t offset,
										  fvio_callback_t callback);

	public:
	static WFTimerTask *create_timer_task(unsigned int microseconds,
									  timer_callback_t callback);

	static WFTimerTask *create_timer_task(time_t seconds, long nanoseconds,
									  timer_callback_t callback);

	/* Counter is like semaphore. The callback of counter is called when
	 * 'count' operations reach target_value & after the task is started.
	 * It's perfectly legal to call 'count' before the task is started. */

	/* Create an unnamed counter. Call counter->count() directly.
	 * NOTE: never call count() exceeding target_value. */
	static WFCounterTask *create_counter_task(unsigned int target_value,
										  counter_callback_t callback)
	{
		return new WFCounterTask(target_value, std::move(callback));
	}

	/* Create a named counter. */
	static WFCounterTask *create_counter_task(const std::string& counter_name,
										  unsigned int target_value,
										  counter_callback_t callback);

	/* Count by a counter's name. When count_by_name(), it's safe to count
	 * exceeding target_value. When multiple counters share a same name,
	 * this operation will be performed on the first created. If no counter
	 * matches the name, nothing is performed. */
	static void count_by_name(const std::string& counter_name)
	{
		WFTaskFactory::count_by_name(counter_name, 1);
	}

	/* Count by name with a value n. When multiple counters share this name,
	 * the operation is performed on the counters in the sequence of its
	 * creation, and more than one counter may reach target value. */
	static void count_by_name(const std::string& counter_name, unsigned int n);

	public:
	static WFMailboxTask *create_mailbox_task(void **mailbox,
										  mailbox_callback_t callback)
	{
		return new WFMailboxTask(mailbox, std::move(callback));
	}

	/* Use 'user_data' as mailbox. */
	static WFMailboxTask *create_mailbox_task(mailbox_callback_t callback)
	{
		return new WFMailboxTask(std::move(callback));
	}

	public:
	static WFConditional *create_conditional(SubTask *task, void **msgbuf)
	{
		return new WFConditional(task, msgbuf);
	}

	static WFConditional *create_conditional(SubTask *task)
	{
		return new WFConditional(task);
	}

	static WFConditional *create_conditional(const std::string& cond_name,
										 SubTask *task, void **msgbuf);

	static WFConditional *create_conditional(const std::string& cond_name,
										 SubTask *task);

	static void signal_by_name(const std::string& cond_name, void *msg)
	{
		WFTaskFactory::signal_by_name(cond_name, msg, (size_t)-1);
	}

	static void signal_by_name(const std::string& cond_name, void *msg,
						   size_t max);

	public:
	template<class FUNC, class... ARGS>
		static WFGoTask *create_go_task(const std::string& queue_name,
								FUNC&& func, ARGS&&... args);

	/* Create 'Go' task with running time limit in seconds plus nanoseconds.
	 * If time exceeded, state WFT_STATE_ABORTED will be got in callback. */
	template<class FUNC, class... ARGS>
		static WFGoTask *create_timedgo_task(time_t seconds, long nanoseconds,
									 const std::string& queue_name,
									 FUNC&& func, ARGS&&... args);

	/* Create 'Go' task on user's executor and execution queue. */
	template<class FUNC, class... ARGS>
		static WFGoTask *create_go_task(ExecQueue *queue, Executor *executor,
								FUNC&& func, ARGS&&... args);

	template<class FUNC, class... ARGS>
		static WFGoTask *create_timedgo_task(time_t seconds, long nanoseconds,
									 ExecQueue *queue, Executor *executor,
									 FUNC&& func, ARGS&&... args);

	/* For capturing 'task' itself in go task's running function. */
	template<class FUNC, class... ARGS>
		static void reset_go_task(WFGoTask *task, FUNC&& func, ARGS&&... args);

	public:
	static WFGraphTask *create_graph_task(graph_callback_t callback)
	{
		return new WFGraphTask(std::move(callback));
	}

	public:
	static WFEmptyTask *create_empty_task()
	{
		return new WFEmptyTask;
	}

	static WFDynamicTask *create_dynamic_task(dynamic_create_t create);

	static WFRepeaterTask *create_repeater_task(repeated_create_t create,
											repeater_callback_t callback)
	{
		return new WFRepeaterTask(std::move(create), std::move(callback));
	}

	public:
	static WFModuleTask *create_module_task(SubTask *first,
										module_callback_t callback)
	{
		return new WFModuleTask(first, std::move(callback));
	}

	static WFModuleTask *create_module_task(SubTask *first, SubTask *last,
										module_callback_t callback)
	{
		WFModuleTask *task = new WFModuleTask(first, std::move(callback));
		task->sub_series()->set_last_task(last);
		return task;
	}
};

WFTaskFactory enumerates the task types for the protocols workflow supports natively (e.g. HTTP, Redis, MySQL, DNS)

Task flow built with WFTaskFactory
  1. Create a series
  2. Add tasks to the series (the tasks come from the basic task types abstracted by WFTaskFactory)
    1. purely serial execution flows
    2. purely parallel execution flows
    3. mixed serial/parallel execution flows (see the sketch after this list)
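
A minimal sketch of a mixed flow (URLs and empty callbacks are placeholders): a series that first runs a timer task and then a ParallelWork holding two HTTP sub-series; since a ParallelWork is itself a task, it can simply be pushed into the series:

#include "workflow/WFTaskFactory.h"

void demo_mixed_flow()
{
    WFTimerTask *timer = WFTaskFactory::create_timer_task(1000000,
                                                          [](WFTimerTask *) { });

    ParallelWork *pwork = Workflow::create_parallel_work([](const ParallelWork *) { });
    pwork->add_series(Workflow::create_series_work(
        WFTaskFactory::create_http_task("http://example.com/a", 2, 2,
                                        [](WFHttpTask *) { }), nullptr));
    pwork->add_series(Workflow::create_series_work(
        WFTaskFactory::create_http_task("http://example.com/b", 2, 2,
                                        [](WFHttpTask *) { }), nullptr));

    SeriesWork *series = Workflow::create_series_work(timer, nullptr);
    series->push_back(pwork);    /* the parallel work runs after the timer */
    series->start();
}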

How serial and parallel composition are implemented inside a series

The SubTask pure virtual (abstract) class

  • dispatch member function: launches the task
  • done member function: called when the task has completed; the finished task is popped from its series
    • subtask_done is the member function that solves how control is passed along the series: when a task reports completion, subtask_done invokes done and then dispatches the next task (see the sketch below)
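
A minimal sketch of a hypothetical SubTask subclass (MyTask is not part of workflow), mirroring the way the framework's own leaf tasks are written: dispatch launches the work and must eventually call subtask_done; done pops the finished task out of its series and returns the next task to run:

#include "workflow/Workflow.h"

class MyTask : public SubTask
{
protected:
    virtual void dispatch()
    {
        /* start the work here, or hand it to another thread; when the
         * work has finished, report completion to the framework */
        this->subtask_done();
    }

    virtual SubTask *done()
    {
        SeriesWork *series = series_of(this);

        /* the finished task leaves its series; whatever this function
         * returns is dispatched next */
        delete this;
        return series->pop();
    }
};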

Implementation of dispatch for a parallel task

The thread pool model of workflow

  • The main thread holds at least one server task; start is called inside the server task to dispatch tasks
  • Tasks dispatched inside the server task are handed off to a poller thread
  • Load balancing on the fd (e.g. fd % 4) decides which poller thread the dispatched task ends up on
  • Each poller thread owns one epoll instance
  • The server task covers several steps of the network flow:
    • Accept the connection the client establishes with the server
    • Register the read event and wait for the client to send a request (note that the workflow kernel layer only triggers the callback registered at the application layer after the complete message has been received)
  • One of the poller threads listens on the port (listenfd). After a client completes the three-way handshake, the server uses the load-balancing rule to compute which poller thread the new clientfd should land on and registers the clientfd there; in other words, watching the client's read events involves switching threads. That thread is also where the received bytes are split into complete HTTP messages
  • An HTTP message received by a poller thread is handed to a worker thread for processing (there is a queue between the poller threads and the worker threads: the poller thread pushes the complete HTTP message into the queue, and a worker thread pops the data and processes it; once the worker thread has pulled the data from the queue, the task's callback is triggered, so the callback runs in the worker thread)
  • Event detection for net/timer/fileio happens in the poller threads; once the condition is met, the net/timer/fileio callback is triggered, and the callback always executes in a worker thread
  • What a poller thread does:
    • Check whether the condition is met; for a network event it also assembles the complete message;
      • Once the message is fully assembled, the condition counts as met;
      • When the condition is met, push the data into the queue
    • Timer events are handled much like network events
  • CPU-heavy computation tasks are executed in the go thread pool

  • A single client request (three-way handshake, sending the request, querying Redis, writing data to Redis) may pass through many threads (poller thread <--> worker thread <--> go thread). Does this raise thread-safety issues?
    • In the request-response working mode, as long as no critical resource (the same piece of memory) is accessed concurrently, no thread-safety problem arises.

Application scenarios

High fan-out

Scenarios mixing multiple clients

SRPC

Component implementation in workflow

Mind map

Setting up a CMake debugging environment on Linux

Debugging the helloworld code

Run the helloworld process and observe the threads it starts

Enter 192.168.43.56:8888 in the browser

The code then stops at the breakpoint

Analysis of the networking component

Entry function of the network (poller) thread

static void *__poller_thread_routine(void *arg)
{
	poller_t *poller = (poller_t *)arg;
	__poller_event_t events[POLLER_EVENTS_MAX];
	struct __poller_node time_node;
	struct __poller_node *node;
	int has_pipe_event;
	int nevents;
	int i;
	pthread_setname_np(pthread_self(), "poller");

	while (1)
	{
		__poller_set_timer(poller);
		nevents = __poller_wait(events, POLLER_EVENTS_MAX, poller);
		clock_gettime(CLOCK_MONOTONIC, &time_node.timeout);
		has_pipe_event = 0;
		for (i = 0; i < nevents; i++)
		{
			node = (struct __poller_node *)__poller_event_data(&events[i]);
			if (node > (struct __poller_node *)1)
			{
				switch (node->data.operation)
				{
				case PD_OP_READ:
					__poller_handle_read(node, poller);
					break;
				case PD_OP_WRITE:
					__poller_handle_write(node, poller);
					break;
				case PD_OP_LISTEN:
					__poller_handle_listen(node, poller);
					break;
				case PD_OP_CONNECT:
					__poller_handle_connect(node, poller);
					break;
				case PD_OP_SSL_ACCEPT:
					__poller_handle_ssl_accept(node, poller);
					break;
				case PD_OP_SSL_CONNECT:
					__poller_handle_ssl_connect(node, poller);
					break;
				case PD_OP_SSL_SHUTDOWN:
					__poller_handle_ssl_shutdown(node, poller);
					break;
				case PD_OP_EVENT:
					__poller_handle_event(node, poller);
					break;
				case PD_OP_NOTIFY:
					__poller_handle_notify(node, poller);
					break;
				case PD_OP_RECVFROM:
					__poller_handle_recvfrom(node, poller);
					break;
				}
			}
			else if (node == (struct __poller_node *)1)
				has_pipe_event = 1;
		}

		if (has_pipe_event)
		{
			if (__poller_handle_pipe(poller))
				break;
		}

		__poller_handle_timeout(&time_node, poller);
	}

#if OPENSSL_VERSION_NUMBER < 0x10100000L
# ifdef CRYPTO_LOCK_ECDH
	ERR_remove_thread_state(NULL);
# else
	ERR_remove_state(0);
# endif
#endif
	return NULL;
}

Differences from other network frameworks

  • In most other network frameworks, every epoll_ctl operation (add, modify, delete) is requested by worker threads writing to a Linux pipe, which notifies the poller thread (the thread that owns the epoll) to perform the operation
  • In workflow, only the delete operation goes through the pipe: the worker thread notifies the poller thread, and the poller thread removes the node from its red-black tree; the other epoll operations (add, modify) are performed directly by the worker thread (see the sketch below)
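
A small illustration (not workflow source) of why the add and modify paths need no pipe round-trip: epoll_ctl is thread-safe, so a worker thread may register an fd on the poller thread's epoll instance directly, even while that thread is blocked in epoll_wait:

#include <sys/epoll.h>

/* called from a worker thread; epfd belongs to a poller thread */
int add_fd_from_worker(int epfd, int fd, void *node)
{
    struct epoll_event ev;

    ev.events = EPOLLIN | EPOLLET;    /* the flags here are illustrative only */
    ev.data.ptr = node;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}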

Queue design

workflow uses a locked multi-producer multi-consumer queue

Data structure

Design of the multi-producer multi-consumer queue model

  • The many-producers-to-many-consumers problem is reduced to a many-producers-to-single-consumer problem, which lowers lock contention
  • The many-consumers-to-many-producers problem is reduced to a many-consumers-to-single-producer problem, which lowers lock contention

Generic design of the queue elements

How the queue is applied

After a network (poller) thread has received a complete message, it hands the message over to a worker thread for processing

Creating the queue

When the queue is created, its data layout is specified (what kind of data the queue stores and at which offset the link pointer lives)

The poller thread pushes data into the queue

The data structure the poller thread pushes into the queue

The __poller_node data structure

  • The first members of struct __poller_node and struct poller_result are identical (state, error, data)
  • msgqueue_create specifies the linkoff offset when the queue is created (its value is sizeof(struct poller_result))
  • Whatever follows the first sizeof(struct poller_result) bytes of each element can be customized by the user, but the head of every element, i.e. the struct poller_result part, always has the same layout
  • This is a typical C-style design; in C++ the same variability would normally be achieved with inheritance (see the usage sketch below)
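
A minimal usage sketch of the msgqueue C API; the my_msg struct, queue length and include path are assumptions made for illustration. linkoff tells the queue at which byte offset inside every message it may store its internal "next" pointer, so the queue itself never allocates nodes:

#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
#include "workflow/msgqueue.h"    /* install path of this kernel header may differ */

struct my_msg
{
    int value;     /* user payload; the common header (poller_result) would normally sit here */
    void *link;    /* space reserved for the queue's internal "next" pointer (linkoff) */
};

int main()
{
    msgqueue_t *mq = msgqueue_create(1024, offsetof(struct my_msg, link));
    struct my_msg *in = (struct my_msg *)malloc(sizeof *in);
    struct my_msg *out;

    in->value = 42;
    msgqueue_put(in, mq);                         /* producer (poller thread) side */

    out = (struct my_msg *)msgqueue_get(mq);      /* consumer (handler thread) side */
    printf("%d\n", out->value);
    free(out);

    msgqueue_destroy(mq);
    return 0;
}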

Thread pool design

Design of the go thread pool

Design of the handler_thread_routine thread task

void Communicator::handler_thread_routine(void *context)
{
	Communicator *comm = (Communicator *)context;
	struct poller_result *res;

	while (1)
	{
		// take a message out of the queue
		res = (struct poller_result *)msgqueue_get(comm->msgqueue);
		if (!res)
			break;

		switch (res->data.operation)
		{
		case PD_OP_TIMER:
			comm->handle_sleep_result(res);
			break;
		case PD_OP_READ:
			comm->handle_read_result(res);
			break;
		case PD_OP_WRITE:
			comm->handle_write_result(res);
			break;
		case PD_OP_CONNECT:
		case PD_OP_SSL_CONNECT:
			comm->handle_connect_result(res);
			break;
		case PD_OP_LISTEN:
			comm->handle_listen_result(res);
			break;
		case PD_OP_SSL_ACCEPT:
			comm->handle_ssl_accept_result(res);
			break;
		case PD_OP_EVENT:
		case PD_OP_NOTIFY:
			comm->handle_aio_result(res);
			break;
		}

		free(res);
	}
}

__thrdpool_schedule

  • Note that the msgqueue used in __thrdpool_schedule and the msgqueue used in handler_thread_routine are not the same queue

Interfaces of the workflow thread pool
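
Before looking at the individual functions, here is a minimal usage sketch of the pool's public C interface (the thread count, stack size and include path below are assumptions):

#include <stdio.h>
#include "workflow/thrdpool.h"    /* install path of this kernel header may differ */

static void my_routine(void *context)
{
    printf("running task: %s\n", (const char *)context);
}

static void my_pending(const struct thrdpool_task *task)
{
    printf("discarding a task that never ran\n");
}

int main()
{
    thrdpool_t *pool = thrdpool_create(4, 0);    /* 4 threads, default stack size */
    struct thrdpool_task task = { my_routine, (void *)"hello" };

    thrdpool_schedule(&task, pool);              /* hand the task to a pool thread */
    thrdpool_destroy(my_pending, pool);          /* my_pending receives unexecuted tasks */
    return 0;
}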

thrdpool_increase

Tracing the code shows that a created thread simply keeps taking tasks from the message queue and executing them; the concrete implementation is in __thrdpool_routine

static void *__thrdpool_routine(void *arg)
{
	thrdpool_t *pool = (thrdpool_t *)arg;
	struct __thrdpool_task_entry *entry;
	void (*task_routine)(void *);
	void *task_context;
	pthread_t tid;

	pthread_setspecific(pool->key, pool);
	pthread_setname_np(pthread_self(), "thrdpool");
	while (!pool->terminate)
	{
		entry = (struct __thrdpool_task_entry *)msgqueue_get(pool->msgqueue);
		if (!entry)
			break;

		task_routine = entry->task.routine;
		task_context = entry->task.context;
		free(entry);
		task_routine(task_context);

		if (pool->nthreads == 0)
		{
			/* Thread pool was destroyed by the task. */
			free(pool);
			return NULL;
		}
	}

	/* One thread joins another. Don't need to keep all thread IDs. */
	pthread_mutex_lock(&pool->mutex);
	tid = pool->tid;
	pool->tid = pthread_self();
	if (--pool->nthreads == 0)
		pthread_cond_signal(pool->terminate);

	pthread_mutex_unlock(&pool->mutex);
	if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0)
		pthread_join(tid, NULL);

	return NULL;
}

The created threads keep taking tasks out of the message queue and executing them

If the message queue is empty (i.e. there is currently no task to run), the thread pool sleeps

As the msgqueue_get function shows, when the get list is empty it is swapped with the put list (by calling __msgqueue_swap)

In other words, the thread pool design in workflow is: whether a thread blocks depends on whether its msgqueue_get call on the get list blocks (a simplified sketch of this two-list idea follows)
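
A self-contained miniature of that two-list idea (an illustration, not workflow's msgqueue): producers contend only on put_mutex, consumers only on get_mutex, and the put list is swapped in only when the get list runs dry. The real msgqueue additionally blocks on condition variables instead of returning NULL:

#include <pthread.h>
#include <stddef.h>

struct node { struct node *next; int value; };

struct two_list_queue
{
    struct node *get_head;           /* consumers pop here, under get_mutex  */
    struct node *put_head;           /* producers push here, under put_mutex */
    struct node *put_tail;
    pthread_mutex_t get_mutex;
    pthread_mutex_t put_mutex;
};

void queue_put(struct two_list_queue *q, struct node *n)
{
    n->next = NULL;
    pthread_mutex_lock(&q->put_mutex);
    if (q->put_tail)
        q->put_tail->next = n;
    else
        q->put_head = n;
    q->put_tail = n;
    pthread_mutex_unlock(&q->put_mutex);
}

struct node *queue_get(struct two_list_queue *q)
{
    struct node *n;

    pthread_mutex_lock(&q->get_mutex);
    if (!q->get_head)
    {
        /* get list is empty: take the put lock once and swap the lists */
        pthread_mutex_lock(&q->put_mutex);
        q->get_head = q->put_head;
        q->put_head = NULL;
        q->put_tail = NULL;
        pthread_mutex_unlock(&q->put_mutex);
    }

    n = q->get_head;
    if (n)
        q->get_head = n->next;

    pthread_mutex_unlock(&q->get_mutex);
    return n;    /* NULL means the queue was empty at this moment */
}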

thrdpool_destroy

Learning links

https://xxetb.xetslk.com/s/12PH1r
