0%

C++20协程原理和Asio的使用介绍

协程和异步的关系是什么?C++20协程的原理是什么?如何使用Asio开发高性能网络框架!

开发环境

  • llvm14
  • c++20

协程和异步

网络开发中异步的性能往往会远远高于同步,异步的好处在于当事件真正触发的时候才会去执行对应逻辑,因此可以在消耗较少的系统资源下就可以完成非常高的并发!主要原因还是网络开发中大部分开销都在网络IO上!

coroutine(协程) 主要解决的就是异步的问题,异步代码最大的问题就是回掉,当你逻辑复杂点,简直的回掉地狱(如果你代码抽象的不好的话),例如envoy就是采用的异步(libevent)去实现的!coroutine为啥能解决异步的问题呢?coroutine本质上是一个可以暂停(suspend)和恢复(resumed)的函数,异步本质上也是因为当我们创建一个异步任务的时候那么实际上就是一个函数暂停的操作,当真正事件触发的时候才会恢复执行你的回掉函数!

coroutine 是在语言层面支持了函数的暂停和恢复,要实现只能对应的语言、编译器去支持,例如 c++20、javascript、c# 、rust 这几种就是在语言层面支持了函数的暂停和恢复,具体怎么实现的大家应该都能理解,就是将函数在暂停点进行拆分,从而一个函数被拆分成为多个函数,当事件触发的时候执行暂停点之后的函数即可!所以它本质上和异步并无差异,只不过可读性会高很多,你可以理解为是语法糖,所以我们在实现coroutine的时候也需要实现一个调度器!在C++中调度器比较出名的就 libeventasio吧(跨平台能力会比较好),他们都支持计时器、网络IO等事件调度!

不清楚大家用过Go有没有,Go语言是我觉得对于协程设计最完美的语言!它太简单了,简单到任何人都可以快速的实现一个高并发服务器!这就是协程的魅力,在多线程时代你要实现高并发服务器,太难了,多线程高并发基于都是基于异步去做的!很多人认为Go的协程是Fiber(纤程),其实概念这个东西无可厚非,程序员都比较务实只要快就行了!

基于 coroutine 自己实现一个 timer 调度器

官方文档: https://en.cppreference.com/w/cpp/language/coroutines

在C++ coroutine 设计中,一个函数是协程要求需要返回一个coroutine对象,coroutine对象要求内部有一个promise_type类型,promise_type需要实现几个钩子函数来控制coroutine对象的,下面例子的async实际上就是coroutine对象。我们在协程中可以调用 co_await一个 awaitable 对象,awaitable对象可以操作coroutine_handle进而实现恢复操作,暂停操作是通过勾子函数实现的!其次main函数不允许返回coroutine对象,因此main函数不能是一个coroutine,也就不能调用co_await等关键词了!

下面这个例子你可以更好的了解c++ coroutine的设计原理!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#include "spdlog/spdlog.h"
#include <coroutine>
#include <iostream>
#include <list>

// 事件
template <typename T>
struct event {
event() = default;
virtual ~event() = default;
[[nodiscard]] virtual bool ready() const = 0;
[[nodiscard]] virtual T return_value() const = 0;
};

// 任务,一个可以被恢复执行的任务
struct task {
task() = default;
virtual ~task() = default;
[[nodiscard]] virtual bool ready() = 0;
virtual void resume() = 0;
};

// 执行器,执行任务!
struct executor {
executor() = default;
~executor() = default;
void push(task* task) {
tasks_.push_back(task);
}
void run() {
std::vector<task*> rm_tasks;
while (true) {
if (tasks_.empty()) {
return;
}
for (auto const& this_task : tasks_) {
if (this_task->ready()) {
this_task->resume();
rm_tasks.push_back(this_task);
}
}
for (auto const& rm_task : rm_tasks) {
tasks_.remove(rm_task);
delete rm_task;
}
rm_tasks.clear();
std::this_thread::yield(); // 让出cpu调度防止空转
}
}

private:
std::list<task*> tasks_{};
};

// timer_event
struct timer_event final : virtual event<std::chrono::time_point<std::chrono::system_clock>> {
[[nodiscard]] bool ready() const override {
return std::chrono::system_clock::now() >= this->point_;
}
[[nodiscard]] std::chrono::time_point<std::chrono::system_clock> return_value() const override {
return this->point_;
}
template <typename Rep, typename Period>
void reset(const std::chrono::duration<Rep, Period>& sleep) {
this->point_ = std::chrono::system_clock::now() + sleep;
}
template <typename Rep, typename Period>
explicit timer_event(const std::chrono::duration<Rep, Period>& sleep) : point_(std::chrono::system_clock::now() + sleep) {
}

private:
std::chrono::time_point<std::chrono::system_clock> point_;
};

struct simple_task final : task {
bool ready() override {
return this->is_ready_();
}
void resume() override {
this->resume_();
}
simple_task(const std::function<bool()>& is_ready, const std::function<void()>& resume) : is_ready_(is_ready), resume_(resume) {
}

private:
std::function<bool()> is_ready_;
std::function<void()> resume_;
};

// coroutine 对象
struct async {
struct promise_type;
using coroutine_handle = std::coroutine_handle<promise_type>;

struct promise_type {
// 1. coroutine 创建的时候因为它没办法直接调用 coroutine 的构造函数,因为开放了一个 coroutine::promise_type::get_return_object 函数来创建coroutine对象!因此 promise_type 的构造函数必须是无参的!
async get_return_object() {
SPDLOG_INFO("async::promise_type::get_return_object");
return async{coroutine_handle::from_promise(*this)};
}

// 2. get_return_object 后会立即调用 initial_suspend 需要返回一个 awaitables 对象
// 2.1 如果返回 std::suspend_never,表示继续执行,即进入coroutine函数体执行
std::suspend_never initial_suspend() {
SPDLOG_INFO("async::promise_type::initial_suspend");
return {};
}
// coroutine 结束
std::suspend_never final_suspend() noexcept {
SPDLOG_INFO("async::promise_type::final_suspend");
return {};
}
// co_return void 结束
void return_void() {
SPDLOG_INFO("async::promise_type::return_void");
}
// coroutine 抛出异常
void unhandled_exception() {
SPDLOG_INFO("async::promise_type::unhandled_exception");
}
// 这个是我们自己实现的函数!
void resume() {
SPDLOG_INFO("async::promise_type::resume");
const auto handle = coroutine_handle::from_promise(*this);
if (!handle) {
return;
}
handle();
}
};

template <typename T>
struct event_awaiter {
[[nodiscard]] auto await_ready() const noexcept {
SPDLOG_INFO("async::event_awaiter::await_ready()");
// 在async函数中
// 1. 事件是否准备好
// 2. 没有准备好执行 await_suspend
return this->event_.ready();
}

[[nodiscard]] auto await_suspend(std::coroutine_handle<promise_type> handler) noexcept {
SPDLOG_INFO("async::event_awaiter::await_suspend()");
// 3. 没有准备好,放入 executor_ 的队列中,并且让出当前执行函数,等待调度器恢复
this->executor_.push(new simple_task{[this]() -> bool {
return this->event_.ready();
},
[handler]() -> void {
handler.promise().resume();
}});
}
[[nodiscard]] auto await_resume() const noexcept {
SPDLOG_INFO("async::event_awaiter::await_resume()");
// 执行成功,获取返回值
return this->event_.return_value();
}

event_awaiter(event<T>& event, executor& executor) : event_(event), executor_(executor) {
}

private:
event<T>& event_;
executor& executor_;
};

explicit async(const coroutine_handle& handle) : handle_(handle) {
}

private:
coroutine_handle handle_{};
};

template <typename Rep, typename Period>
async sleep(executor& executor, const std::string& name, std::chrono::duration<Rep, Period> sleep_time) {
timer_event event(sleep_time);
SPDLOG_INFO("[{}] start event", name);
co_await async::event_awaiter(event, executor);
SPDLOG_INFO("[{}] done event.", name);
}


int main() {
using namespace std::chrono_literals;
executor executor{};
sleep(executor, "coroutine-1", 1s); // 会创建一个 coroutine !
sleep(executor, "coroutine-2", 2s);
executor.run();
}

asio - 异步 写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include "asio.hpp"
#include "spdlog/spdlog.h"

int main() {
asio::io_context ctx;
using namespace std::chrono_literals;
// 注册时间
asio::steady_timer timer(ctx, 1s);
SPDLOG_INFO("steady_timer start");

timer.async_wait([](auto) {
SPDLOG_INFO("steady_timer tigger"); // 当事件触发回回掉此函数
});

// 启动调度器
ctx.run();
}

asio - coroutine 写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include "asio.hpp"
#include "spdlog/spdlog.h"

asio::awaitable<void> sleep() {
using namespace std::chrono_literals;
SPDLOG_INFO("steady_timer start");
asio::steady_timer timer(co_await asio::this_coro::executor, 1s);
co_await timer.async_wait(asio::use_awaitable);
SPDLOG_INFO("steady_timer tigger");
}

int main() {
asio::io_context ctx;
asio::co_spawn(ctx, sleep(), asio::detached); // 把 sleep这个coroutine(任务) 交给 ctx 去调度
ctx.run();
}

总结

我们可以看到不论是异步还是coroutine,只要封装得当,其实没有啥差异,不过coroutine这种同步写法更适合我们去编程!所以c++26都快出来了,还不用c++20的coroutine吗?

本文没有降到 coroutine - generator 模型,对于网络开发来说基本上用不到也!

asio 介绍

这个是 asio 官方的示例代码 https://think-async.com/Asio/asio-1.28.0/doc/asio/examples/cpp20_examples.html 可以看下

asio 提供了丰富的coroutine原语,我们可以基于其实现更上层的业务代码!可以说 asio 就是一门coroutine语言!

tcp服务

asio 整体设计的非常的简单,整体上来看就是创建一个io_context,然后将异步任务(coroutine)绑定到io_context中,然后运行io_context

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include "asio.hpp"
#include "spdlog/spdlog.h"
#include "fmt/chrono.h"

template <class Rep, class Period>
asio::awaitable<void> do_print(const std::string& task_name, std::chrono::duration<Rep, Period> sleep) {
// 获取当前coroutine的执行器
auto executor = co_await asio::this_coro::executor;

for (;;) {
// 创建一个timer
auto steady_timer = asio::steady_timer(executor, sleep);
// 等待这个timer触发
co_await steady_timer.async_wait(asio::use_awaitable);
SPDLOG_INFO("[{}] sleep {}", task_name, sleep);
}
}

int main() {
using namespace std::chrono_literals;

// 1. 初始化io_context, 你可以理解为是一个调度器
asio::io_context ctx{};

std::string task1="task1";
std::string task2="task2";

// 2. 创建异步任务 绑定 到 io_context
asio::co_spawn(ctx, do_print(task1, 1s), asio::detached);
asio::co_spawn(ctx, do_print(task2, 2s), asio::detached);

// 3. 运行io_context
ctx.run();
}

co_return

co_return 本质上会调用 promise_type::return_value 函数! 对于一些有返回值的 coroutine 可能需要特殊处理下!通常做法就是coroutine也是一个awaitable对象!

asio::detached 实际上会创建并开始协程,所以这里我们是没办法直接获取 read_data 数据,需要使用 run 函数包装下!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include "asio.hpp"
#include "asio/experimental/coro.hpp"
#include "spdlog/spdlog.h"

asio::awaitable<std::string> read_data() {
const auto executor = co_await asio::this_coro::executor;
asio::system_timer timer(executor, std::chrono::seconds(1));
co_await timer.async_wait(asio::use_awaitable);
co_return "hello world";
}

asio::awaitable<void> run() {
SPDLOG_INFO("run start");
auto result = co_await read_data();
SPDLOG_INFO("run end {}", result);
}

int main(int argc, char* argv[]) {
asio::io_context ctx;
asio::co_spawn(ctx, run(), asio::detached);
ctx.run();
}

channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include "asio.hpp"
#include "asio/experimental/channel.hpp"
#include "spdlog/spdlog.h"

// 注意 channel 必需第一个参数是 asio::error_code,其他是自己需要传递的参数,可以多个!
// channel是绑定了io_context不能跨io_context调度!
using int_channel = asio::experimental::channel<void(asio::error_code, int)>;

asio::awaitable<void> produce(int_channel& channel) {
for (int x = 0; x < 10; x++) {
co_await channel.async_send(asio::error_code{}, x + 1, asio::use_awaitable);
}
// 用完后记得close
channel.close();
}

asio::awaitable<void> consume(int_channel& channel) {
for (;;) {
auto [errcode, num] = co_await channel.async_receive(asio::as_tuple(asio::use_awaitable));
if (errcode) {
if (errcode == asio::experimental::channel_errc::channel_closed) {
SPDLOG_INFO("channel_closed");
co_return;
}
SPDLOG_INFO("system error code: {}, message: {}", errcode.value(), errcode.message());
co_return;
}
SPDLOG_INFO("receive: {}", num);
}
}

asio::awaitable<void> consume_try_catch(int_channel& channel) {
asio::steady_timer timer(channel.get_executor(), std::chrono::seconds(1));
co_await timer.async_wait(asio::use_awaitable);
SPDLOG_INFO("start receive");
for (;;) {
try {
auto num = co_await channel.async_receive(asio::use_awaitable);
SPDLOG_INFO("receive: {}", num);
} catch (const std::exception& err) {
// 也可以 try catch抓
SPDLOG_INFO("error: {}", err.what());
co_return;
}
}
}

int main(int argc, char* argv[]) {
asio::io_context ctx;
int_channel channel(ctx);
asio::co_spawn(ctx, produce(channel), asio::detached);
asio::co_spawn(ctx, consume(channel), asio::detached);
ctx.run();
}

deferred 和 use_awaitable 区别

总结一下就是 deferred 会创建一个 deferred_async_operation (awaitable对象) ,然后需要我们需要执行的时候再使用 co_await!可以通过下面例子体验下!

但是有个坑爹的地方在于 deferred_async_operation 貌似没有实现move!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include "asio.hpp"
#include "spdlog/spdlog.h"
using namespace std::chrono_literals;


asio::awaitable<void> do_sleep(asio::io_context& context) {
asio::steady_timer timer(context, std::chrono::seconds(1));
// 创建一个 deferred_async_operation
SPDLOG_INFO("do_sleep start");
const auto timer_async_op = timer.async_wait(asio::deferred);

// 中间可以做一些别的事情!
SPDLOG_INFO("do_sleep end");

// 需要执行的时候再执行 deferred_async_operation
co_await timer_async_op(asio::use_awaitable);
SPDLOG_INFO("do_sleep end");
}

asio::awaitable<void> do_sleep_use_awaitable(asio::io_context& context) {
asio::steady_timer timer(context, std::chrono::seconds(1));
// 如果你需要直接 await 那么你只需要 use_awaitable,不推荐用 deferred
SPDLOG_INFO("do_sleep_use_awaitable start");
co_await timer.async_wait(asio::use_awaitable);
SPDLOG_INFO("do_sleep_use_awaitable end");
}


int main() {
asio::io_context context{};
asio::co_spawn(context, do_sleep(context), asio::detached);
context.run();
}

asio::make_work_guard

work_guard 非常重要,通过上面的例子其实我没发现当事件全部结束那么 io_context.run()将会执行结束退出!那么问题是啥呢?有些时候我们可能需要在一些后置流程中给 io_context 添加任务,那么此时 work_guard 就非常有用了! 下面这个 master+worker模型中就用到了!

asio 异常设计

c++20的异常设计是coroutine如果不抓异常,那么直接忽略了,asio为了高性能,提供了两种方案,一种是手动抓取返回值 err_code ,一种是抓取异常,推荐前者,上面讲到的 channel 例子中就有提到!

多个 coroutine awaitable_operators

在Go里面实际上是支持channel select + waitgroup的,但是在asio中实际上实现这俩能力需要借助 awaitable_operators !

or

or 对标的就是channel select,我们可以选择一个等待的事件去处理!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include "asio.hpp"
#include "asio/experimental/awaitable_operators.hpp"
#include "fmt/chrono.h"
#include "spdlog/spdlog.h"

template <class Rep, class Period>
asio::awaitable<std::string> create_task(std::chrono::duration<Rep, Period> spend, std::string task_name) {
const auto executor = co_await asio::this_coro::executor;
asio::steady_timer timer(executor, spend);
SPDLOG_INFO("create_io_task - {} start. spend {}", task_name, spend);
co_await timer.async_wait(asio::use_awaitable);
SPDLOG_INFO("create_io_task - {} end.", task_name);
co_return task_name;
}

asio::awaitable<void> select_one_task() {
using namespace asio::experimental::awaitable_operators;
using namespace std::chrono_literals;
const auto result = co_await (create_task(1s, "task1") || create_task(2s, "task2"));
if (const auto name = std::get_if<0>(&result); name != nullptr) {
SPDLOG_INFO("select task0 - {}", *name);
}
if (const auto name = std::get_if<1>(&result); name != nullptr) {
SPDLOG_INFO("select task1 - {}", *name);
}
}

asio::awaitable<void> wait_group() {
using namespace asio::experimental::awaitable_operators;
using namespace std::chrono_literals;
const auto& [task1, task2] = co_await (create_task(1s, "task1") && create_task(2s, "task2"));
SPDLOG_INFO("result: {} {}", task1, task2);
}

int main(int argc, char* argv[]) {
asio::io_context context{};
asio::co_spawn(context, select_one_task(), asio::detached);
context.run();
}
// 输出
// [2024-04-12 14:34:07.631] [info] [awaitable_operators_asio.cpp:10] create_io_task - task1 start. spend 1s
// [2024-04-12 14:34:07.632] [info] [awaitable_operators_asio.cpp:10] create_io_task - task2 start. spend 2s
// [2024-04-12 14:34:08.631] [info] [awaitable_operators_asio.cpp:12] create_io_task - task1 end.
// [2024-04-12 14:34:08.632] [info] [awaitable_operators_asio.cpp:21] select task0 - task1

and

and 对标的就是 waitgroup,我们可以等待多个事件结束后再处理!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
asio::awaitable<void> wait_group() {
using namespace asio::experimental::awaitable_operators;
using namespace std::chrono_literals;
const auto& [task1, task2] = co_await (create_io_task(1s, "task1") && create_io_task(2s, "task2"));
SPDLOG_INFO("result: {} {}", task1, task2);
}

int main(int argc, char* argv[]) {
asio::io_context context{};
asio::co_spawn(context, wait_group(), asio::detached);
context.run();
}

//输出:
//[2024-04-12 14:34:57.945] [info] [awaitable_operators_asio.cpp:10] create_io_task - task1 start. spend 1s
//[2024-04-12 14:34:57.946] [info] [awaitable_operators_asio.cpp:10] create_io_task - task2 start. spend 2s
//[2024-04-12 14:34:58.945] [info] [awaitable_operators_asio.cpp:12] create_io_task - task1 end.
//[2024-04-12 14:34:59.947] [info] [awaitable_operators_asio.cpp:12] create_io_task - task2 end.
//[2024-04-12 14:34:59.947] [info] [awaitable_operators_asio.cpp:32] result: task1 task2

备注

这里我举的是 二元操作,实际上是支持下面这个操作!具体返回值是 std::variant<string, string, string> 的!

1
const auto result = co_await (create_task(1s, "task1") || create_task(2s, "task2") || create_task(3s, "task3"));

网络编程

其实在asio中我们不难发现,aiso整体设计都是围绕着io_context去走,很显然上面例子都是单线程的,但是实际业务不可能用单线程,因此通常有几种模型

  • 单io_context + 单线程,性能瓶颈较大!
  • 单io_context + 多线程,用法简单,但是坏处是不清楚当前coroutine(代码)执行在哪个线程上,天然的需要考虑多线程/数据竞争/多线程数据一致性等问题!
  • master + worker 模型,master io_context 负责一些框架,网络请求的连接/读/写/关闭事件,真正的业务处理交给 worker io_context,这种好处就是同一个请求会走一个io_context 天然的没有多线程问题!

[1] 单io_context + 单线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include "asio.hpp"
#include "spdlog/spdlog.h"

using namespace asio::ip;

asio::awaitable<void> echo(tcp::socket socket) {
try {
char data[1024];
for (;;) {
const auto size = co_await socket.async_read_some(asio::buffer(data), asio::use_awaitable);
// SPDLOG_INFO("{} {} - read", thread_id(), socket.remote_endpoint());
co_await async_write(socket, asio::buffer(data, size), asio::use_awaitable);
// SPDLOG_INFO("{} {} - write", thread_id(), socket.remote_endpoint());
}
} catch (std::exception& e) {
SPDLOG_INFO("{} {} - echo Exception: {}", thread_id(), socket.remote_endpoint(), e.what());
socket.close();
}
}

asio::awaitable<void> handler_listener(tcp::acceptor& listener) {
for (;;) {
auto conn = co_await listener.async_accept(asio::use_awaitable);
SPDLOG_INFO("{} - receive conn {} -> {}", thread_id(), conn.remote_endpoint(), conn.local_endpoint());
asio::co_spawn(listener.get_executor(), echo(std::move(conn)), asio::detached);
}
}

int main() {
asio::io_context ctx;
std::allocator<void> alloc;
tcp::acceptor listener(ctx, tcp::endpoint(tcp::v4(), 8080));
asio::co_spawn(ctx, handler_listener(listener), asio::detached);
ctx.run();
}

[2] 单io_context + 多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include "asio.hpp"
#include "spdlog/spdlog.h"

using namespace asio::ip;

struct multi_thread {
explicit multi_thread(const size_t size) : threads_(std::vector<std::thread>(size)) {
}

~multi_thread() {
for (auto& thread : threads_) {
if (thread.joinable()) {
thread.join();
}
}
}

void run(const std::function<void()>& foo) {
for (auto& thread : threads_) {
thread = std::move(std::thread(foo));
}
}

private:
std::vector<std::thread> threads_;
};


int main() {
asio::io_context ctx;
tcp::acceptor listener(ctx, tcp::endpoint(tcp::v4(), 8080));
asio::co_spawn(ctx, handler_listener(listener), asio::detached);
multi_thread multi_threads(4);
multi_threads.run([&] {
ctx.run();
});
}

[3] master + worker 模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include "asio.hpp"
#include "spdlog/spdlog.h"

struct worker_context {
worker_context() = default;

void run() {
thread_ = std::move(std::thread([this] {
io_ctx.run();
}));
}

~worker_context() {
io_ctx.stop();
if (this->thread_.joinable()) {
thread_.join();
}
}

asio::awaitable<void> echo(asio::ip::tcp::socket socket) {
SPDLOG_INFO("{} - handle conn {}", thread_id(), socket.remote_endpoint());
try {
char data[1024];
for (;;) {
const auto size = co_await socket.async_read_some(asio::buffer(data), asio::use_awaitable);
// SPDLOG_INFO("{} {} - read", thread_id(), socket.remote_endpoint());
co_await async_write(socket, asio::buffer(data, size), asio::use_awaitable);
// SPDLOG_INFO("{} {} - write", thread_id(), socket.remote_endpoint());
}
} catch (std::exception& e) {
SPDLOG_INFO("{} {} - echo Exception: {}", thread_id(), socket.remote_endpoint(), e.what());
socket.close();
}
}
asio::io_context& get_context() {
return io_ctx;
}

private:
asio::io_context io_ctx{};
std::thread thread_{};
};

struct main_context {
using work_guard_type = decltype(asio::make_work_guard(std::declval<worker_context>().get_context()));

explicit main_context(size_t worker_size) : worker_contexts_(std::vector<worker_context>(worker_size)){};

void run() {
asio::ip::tcp::acceptor listener(io_ctx_, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), 8080));
asio::co_spawn(io_ctx_, handler_listener(listener), asio::detached);

std::vector<work_guard_type> work_guards{};
for (auto& worker_context : worker_contexts_) {
work_guards.push_back(asio::make_work_guard(worker_context.get_context()));
worker_context.run();
}
io_ctx_.run();
}

~main_context() {
io_ctx_.stop();
}

asio::awaitable<void> handler_listener(asio::ip::tcp::acceptor& listener) {
for (;;) {
auto conn = co_await listener.async_accept(asio::use_awaitable);
conn_counter = conn_counter + 1;
SPDLOG_INFO("{} - receive conn[{}] {} -> {}", thread_id(), conn_counter, conn.remote_endpoint(), conn.local_endpoint());
auto& worker_context = worker_contexts_[conn_counter % worker_contexts_.size()];
asio::co_spawn(worker_context.get_context(), worker_context.echo(std::move(conn)), asio::detached);
}
}

private:
asio::io_context io_ctx_{};
std::vector<worker_context> worker_contexts_{};
size_t conn_counter{};
};

int main() {
main_context main_context(4);
main_context.run();
}

性能对比

  1. 单 io_context + 单线程
1
2
3
4
5
6
~ devtool tcp  benchmark_echo_service --conn 100  --count 10000
[INFO] 00:21:21.818 addr=localhost:8080, max_conn=100, max_req=10000, data_size=64, run_time=10s, interval=0s
[INFO] 00:21:31.819 latency avg(req): 870.994µs
[INFO] 00:21:31.819 throughput avg(s): 114507
[INFO] 00:21:31.819 total success request: 1145075
[INFO] 00:21:31.819 total error request: 0
  1. 单 io_context + 多线程
1
2
3
4
5
6
7
~ devtool tcp  benchmark_echo_service --conn 100  --count 10000
[INFO] 00:15:45.176 addr=localhost:8080, max_conn=100, max_req=10000, data_size=64, run_time=10s, interval=0s
[INFO] 00:15:55.179 latency avg(req): 371.247µs
[INFO] 00:15:55.179 throughput avg(s): 268017
[INFO] 00:15:55.179 total success request: 2680170
[INFO] 00:15:55.179 total error request: 0

  1. master + worker 模型
1
2
3
4
5
6
~ devtool tcp  benchmark_echo_service --conn 100  --count 10000
[INFO] 00:16:43.651 addr=localhost:8080, max_conn=100, max_req=10000, data_size=64, run_time=10s, interval=0s
[INFO] 00:16:53.653 latency avg(req): 512.29µs
[INFO] 00:16:53.653 throughput avg(s): 194573
[INFO] 00:16:53.653 total success request: 1945732
[INFO] 00:16:53.653 total error request: 0

总结

单 io_context + 单线程在echo这种简单的模型下都显得最弱,确实如此因为利用不了系统资源!

单 io_context + 多线程 模型依赖于asio的强大的调度器来看确实性能会优秀一些,但是会出现一个问题,导致同一个请求读写事件回掉会分配到不同的线程执行,会导致线程切换开销较大

master + worker 模型用的不太对,因为 master io_context仅处理接收连接(在这个例子中它仅处理了100个事件),worker io_context仅处理连接的读写事件,好处在于 同一个请求的全部任务都在一个线程上执行,开销小!

待优化:master io_context 处理所有请求的连接/读/写事件,worker io_context 仅去执行具体的业务逻辑,但是我这个例子里体现不出来,因为我是一个echo!后续我可以实现一个简单的HTTP服务器试试!

为什么 master + worker 模型 会比较好呢?因为实际业务中我们业务逻辑的开销往往占大头!而IO和业务线程混合在一起会导致资源分配不均匀,比如io事件延时较高抖动等!

asio 确实是一个非常强力的网络框架,提供了非常丰富的 coroutine 原语,例如 channle、join 等,目前在开源的现状看确实asio的功能最为丰富!

其他

对于网络编程来说,超时控制也非常的重要,asio/更底层的网络库(epoll/io_uring)等并不会直接提供读、写、连接等超时选项,所以通常需要业务自己控制请求!通常的做法就是与 timer 一起维护!上文我们也讲了,可以使用 awaitable_operators 进行处理!

备注

  1. 使用到两个库: spdlog & fmt!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include "fmt/core.h"
#include "fmt/ostream.h"
#include "spdlog/spdlog.h"

template <>
struct fmt::formatter<decltype(std::this_thread::get_id())> : fmt::ostream_formatter {};

std::string thread_id() {
return fmt::to_string(std::this_thread::get_id());
}

template <typename T>
struct fmt::formatter<asio::ip::basic_endpoint<T>> : fmt::formatter<std::string_view> {
auto format(const asio::ip::basic_endpoint<T>& endpoint, fmt::format_context& ctx) {
return fmt::format_to(ctx.out(), "[{}]:{}", endpoint.address().to_string(), endpoint.port());
}
};
  1. 本文的代码都在这个项目里:https://github.com/Anthony-Dong/cpp
本人坚持原创技术分享,如果你觉得文章对您有用,请随意打赏! 如果有需要咨询的请发送到我的邮箱!