What is the relationship between coroutines and async? How do C++20 coroutines work? And how do you build a high-performance network framework with Asio?
Coroutines and async
In network development, asynchronous code usually performs far better than synchronous code. The benefit of async is that the corresponding logic only runs when an event actually fires, so you can reach very high concurrency with relatively few system resources. The main reason is that most of the cost in network programming is network I/O.
Coroutines mainly solve the problems of asynchronous code, the biggest of which is callbacks: once the logic gets a little complex, you end up in callback hell (unless your abstractions are very good). Envoy, for example, is implemented asynchronously on top of libevent. Why can coroutines solve this? A coroutine is essentially a function that can be suspended and resumed, and async works the same way: creating an async task effectively suspends a function, and when the event actually fires, your callback is resumed.
Coroutines support suspending and resuming a function at the language level, which requires support from the language and compiler; C++20, JavaScript, C#, and Rust all provide it. How it works should be easy to picture: the function is split at its suspension points, so one function becomes several, and when an event fires the part after the suspension point is executed. So a coroutine is essentially no different from async code, just far more readable; you can think of it as syntactic sugar. That is also why implementing coroutines still requires a scheduler. In C++ the best-known schedulers are probably libevent and Asio (both with good cross-platform support); they handle timers, network I/O, and other event scheduling.
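To make that concrete, here is a hedged, purely conceptual sketch (hand-written, not actual compiler output) of how a function with one suspension point can be lowered into a resumable state machine:

```cpp
#include <iostream>

// Conceptual sketch only: a real coroutine frame also stores locals and the
// promise, but the core idea is the same - the body is split at the
// suspension point and an explicit state records where to continue.
struct split_function {
    int state = 0;  // which suspension point we are paused at

    void resume() {
        switch (state) {
        case 0:
            std::cout << "part 1: runs until the suspension point\n";
            state = 1;  // remember the resume point, then yield to the scheduler
            return;
        case 1:
            std::cout << "part 2: runs when the awaited event fires\n";
            return;
        }
    }
};

int main() {
    split_function f;
    f.resume();  // executes up to the suspension point
    f.resume();  // a scheduler would call this once the event triggers
}
```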
If you have used Go: I think Go has the best coroutine design of any language. It is so simple that anyone can quickly build a highly concurrent server, and that is the charm of coroutines. In the multithreading era, building a highly concurrent server was hard, and high-concurrency multithreaded servers were mostly built on async. Many people argue that Go's goroutines are really fibers; the terminology hardly matters, since programmers are pragmatic and fast is what counts.
Building a timer scheduler on top of coroutines
Official documentation: https://en.cppreference.com/w/cpp/language/coroutines
In the C++ coroutine design, a function is a coroutine if it returns a coroutine object. That object must contain a promise_type, and promise_type implements several hook functions that control the coroutine object; in the example below, async is the coroutine object. Inside a coroutine we can co_await an awaitable object. The awaitable can manipulate the coroutine_handle to resume the coroutine, while suspension is driven by the hook functions. Also note that main is not allowed to return a coroutine object, so main cannot be a coroutine and cannot use co_await and related keywords.
The following example should give you a good feel for how C++ coroutines are designed.
| #include "spdlog/spdlog.h" #include <coroutine> #include <iostream> #include <list>
template <typename T> struct event { event() = default; virtual ~event() = default; [[nodiscard]] virtual bool ready() const = 0; [[nodiscard]] virtual T return_value() const = 0; };
struct task { task() = default; virtual ~task() = default; [[nodiscard]] virtual bool ready() = 0; virtual void resume() = 0; };
struct executor { executor() = default; ~executor() = default; void push(task* task) { tasks_.push_back(task); } void run() { std::vector<task*> rm_tasks; while (true) { if (tasks_.empty()) { return; } for (auto const& this_task : tasks_) { if (this_task->ready()) { this_task->resume(); rm_tasks.push_back(this_task); } } for (auto const& rm_task : rm_tasks) { tasks_.remove(rm_task); delete rm_task; } rm_tasks.clear(); std::this_thread::yield(); } }
private: std::list<task*> tasks_{}; };
struct timer_event final : virtual event<std::chrono::time_point<std::chrono::system_clock>> { [[nodiscard]] bool ready() const override { return std::chrono::system_clock::now() >= this->point_; } [[nodiscard]] std::chrono::time_point<std::chrono::system_clock> return_value() const override { return this->point_; } template <typename Rep, typename Period> void reset(const std::chrono::duration<Rep, Period>& sleep) { this->point_ = std::chrono::system_clock::now() + sleep; } template <typename Rep, typename Period> explicit timer_event(const std::chrono::duration<Rep, Period>& sleep) : point_(std::chrono::system_clock::now() + sleep) { }
private: std::chrono::time_point<std::chrono::system_clock> point_; };
struct simple_task final : task { bool ready() override { return this->is_ready_(); } void resume() override { this->resume_(); } simple_task(const std::function<bool()>& is_ready, const std::function<void()>& resume) : is_ready_(is_ready), resume_(resume) { }
private: std::function<bool()> is_ready_; std::function<void()> resume_; };
struct async { struct promise_type; using coroutine_handle = std::coroutine_handle<promise_type>;
struct promise_type { async get_return_object() { SPDLOG_INFO("async::promise_type::get_return_object"); return async{coroutine_handle::from_promise(*this)}; }
std::suspend_never initial_suspend() { SPDLOG_INFO("async::promise_type::initial_suspend"); return {}; } std::suspend_never final_suspend() noexcept { SPDLOG_INFO("async::promise_type::final_suspend"); return {}; } void return_void() { SPDLOG_INFO("async::promise_type::return_void"); } void unhandled_exception() { SPDLOG_INFO("async::promise_type::unhandled_exception"); } void resume() { SPDLOG_INFO("async::promise_type::resume"); const auto handle = coroutine_handle::from_promise(*this); if (!handle) { return; } handle(); } };
template <typename T> struct event_awaiter { [[nodiscard]] auto await_ready() const noexcept { SPDLOG_INFO("async::event_awaiter::await_ready()"); return this->event_.ready(); }
[[nodiscard]] auto await_suspend(std::coroutine_handle<promise_type> handler) noexcept { SPDLOG_INFO("async::event_awaiter::await_suspend()"); this->executor_.push(new simple_task{[this]() -> bool { return this->event_.ready(); }, [handler]() -> void { handler.promise().resume(); }}); } [[nodiscard]] auto await_resume() const noexcept { SPDLOG_INFO("async::event_awaiter::await_resume()"); return this->event_.return_value(); }
event_awaiter(event<T>& event, executor& executor) : event_(event), executor_(executor) { }
private: event<T>& event_; executor& executor_; };
explicit async(const coroutine_handle& handle) : handle_(handle) { }
private: coroutine_handle handle_{}; };
template <typename Rep, typename Period> async sleep(executor& executor, const std::string& name, std::chrono::duration<Rep, Period> sleep_time) { timer_event event(sleep_time); SPDLOG_INFO("[{}] start event", name); co_await async::event_awaiter(event, executor); SPDLOG_INFO("[{}] done event.", name); }
int main() { using namespace std::chrono_literals; executor executor{}; sleep(executor, "coroutine-1", 1s); sleep(executor, "coroutine-2", 2s); executor.run(); }
|
asio - async style
| #include "asio.hpp" #include "spdlog/spdlog.h"
int main() { asio::io_context ctx; using namespace std::chrono_literals; asio::steady_timer timer(ctx, 1s); SPDLOG_INFO("steady_timer start");
timer.async_wait([](auto) { SPDLOG_INFO("steady_timer tigger"); });
ctx.run(); }
|
asio - coroutine style
| #include "asio.hpp" #include "spdlog/spdlog.h"
asio::awaitable<void> sleep() { using namespace std::chrono_literals; SPDLOG_INFO("steady_timer start"); asio::steady_timer timer(co_await asio::this_coro::executor, 1s); co_await timer.async_wait(asio::use_awaitable); SPDLOG_INFO("steady_timer tigger"); }
int main() { asio::io_context ctx; asio::co_spawn(ctx, sleep(), asio::detached); ctx.run(); }
|
Summary
As you can see, whether you use async or coroutines, with proper encapsulation there is really not much difference, but the coroutine's synchronous style is much nicer to program against. C++26 is almost here; isn't it time to use C++20 coroutines?
This article does not cover the coroutine generator model, since it is rarely needed in network development.
Introduction to asio
The official asio example code is here and worth a look: https://think-async.com/Asio/asio-1.28.0/doc/asio/examples/cpp20_examples.html
asio provides a rich set of coroutine primitives on which we can build higher-level business code. You could almost say that asio is a coroutine language in its own right.
TCP service
asio's overall design is very simple: create an io_context, bind asynchronous tasks (coroutines) to the io_context, then run the io_context.
| #include "asio.hpp" #include "spdlog/spdlog.h" #include "fmt/chrono.h"
template <class Rep, class Period> asio::awaitable<void> do_print(const std::string& task_name, std::chrono::duration<Rep, Period> sleep) { auto executor = co_await asio::this_coro::executor;
for (;;) { auto steady_timer = asio::steady_timer(executor, sleep); co_await steady_timer.async_wait(asio::use_awaitable); SPDLOG_INFO("[{}] sleep {}", task_name, sleep); } }
int main() { using namespace std::chrono_literals;
asio::io_context ctx{};
std::string task1="task1"; std::string task2="task2";
asio::co_spawn(ctx, do_print(task1, 1s), asio::detached); asio::co_spawn(ctx, do_print(task2, 2s), asio::detached);
ctx.run(); }
|
co_return
co_return essentially calls the promise_type::return_value function. Coroutines that return a value may need special handling; the usual approach is to make the coroutine type itself an awaitable object.
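As a minimal, hedged sketch (value_task is a made-up type for this illustration, not an asio or standard name), this is roughly what a value-returning coroutine looks like: co_return 42 lands in promise_type::return_value.

```cpp
#include <coroutine>
#include <iostream>
#include <utility>

template <typename T>
struct value_task {
    struct promise_type {
        T value{};
        value_task get_return_object() {
            return value_task{std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        std::suspend_never initial_suspend() { return {}; }          // run the body eagerly
        std::suspend_always final_suspend() noexcept { return {}; }  // keep the frame so `value` stays readable
        void return_value(T v) { value = std::move(v); }             // invoked by `co_return v;`
        void unhandled_exception() { std::terminate(); }
    };

    T get() const { return handle_.promise().value; }

    explicit value_task(std::coroutine_handle<promise_type> h) : handle_(h) {}
    value_task(const value_task&) = delete;
    value_task& operator=(const value_task&) = delete;
    ~value_task() {
        if (handle_) handle_.destroy();
    }

private:
    std::coroutine_handle<promise_type> handle_;
};

value_task<int> answer() {
    co_return 42;  // calls promise_type::return_value(42)
}

int main() { std::cout << answer().get() << "\n"; }
```

Making such a type awaitable on top of this just means giving it await_ready/await_suspend/await_resume so a parent coroutine can co_await it, which is exactly the service asio::awaitable provides for you.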
asio::detached
asio::detached actually creates and starts the coroutine, so here we cannot directly grab the read_data result; we need to wrap it in a run coroutine instead.
| #include "asio.hpp" #include "asio/experimental/coro.hpp" #include "spdlog/spdlog.h"
asio::awaitable<std::string> read_data() { const auto executor = co_await asio::this_coro::executor; asio::system_timer timer(executor, std::chrono::seconds(1)); co_await timer.async_wait(asio::use_awaitable); co_return "hello world"; }
asio::awaitable<void> run() { SPDLOG_INFO("run start"); auto result = co_await read_data(); SPDLOG_INFO("run end {}", result); }
int main(int argc, char* argv[]) { asio::io_context ctx; asio::co_spawn(ctx, run(), asio::detached); ctx.run(); }
|
channel
| #include "asio.hpp" #include "asio/experimental/channel.hpp" #include "spdlog/spdlog.h"
using int_channel = asio::experimental::channel<void(asio::error_code, int)>;
asio::awaitable<void> produce(int_channel& channel) { for (int x = 0; x < 10; x++) { co_await channel.async_send(asio::error_code{}, x + 1, asio::use_awaitable); } channel.close(); }
asio::awaitable<void> consume(int_channel& channel) { for (;;) { auto [errcode, num] = co_await channel.async_receive(asio::as_tuple(asio::use_awaitable)); if (errcode) { if (errcode == asio::experimental::channel_errc::channel_closed) { SPDLOG_INFO("channel_closed"); co_return; } SPDLOG_INFO("system error code: {}, message: {}", errcode.value(), errcode.message()); co_return; } SPDLOG_INFO("receive: {}", num); } }
asio::awaitable<void> consume_try_catch(int_channel& channel) { asio::steady_timer timer(channel.get_executor(), std::chrono::seconds(1)); co_await timer.async_wait(asio::use_awaitable); SPDLOG_INFO("start receive"); for (;;) { try { auto num = co_await channel.async_receive(asio::use_awaitable); SPDLOG_INFO("receive: {}", num); } catch (const std::exception& err) { SPDLOG_INFO("error: {}", err.what()); co_return; } } }
int main(int argc, char* argv[]) { asio::io_context ctx; int_channel channel(ctx); asio::co_spawn(ctx, produce(channel), asio::detached); asio::co_spawn(ctx, consume(channel), asio::detached); ctx.run(); }
|
The difference between deferred and use_awaitable
In short, deferred creates a deferred_async_operation (an awaitable object), and we co_await it only when we actually want to execute it; try the example below. One annoying catch is that deferred_async_operation does not seem to implement move!
| #include "asio.hpp" #include "spdlog/spdlog.h" using namespace std::chrono_literals;
asio::awaitable<void> do_sleep(asio::io_context& context) { asio::steady_timer timer(context, std::chrono::seconds(1)); SPDLOG_INFO("do_sleep start"); const auto timer_async_op = timer.async_wait(asio::deferred);
SPDLOG_INFO("do_sleep end");
co_await timer_async_op(asio::use_awaitable); SPDLOG_INFO("do_sleep end"); }
asio::awaitable<void> do_sleep_use_awaitable(asio::io_context& context) { asio::steady_timer timer(context, std::chrono::seconds(1)); SPDLOG_INFO("do_sleep_use_awaitable start"); co_await timer.async_wait(asio::use_awaitable); SPDLOG_INFO("do_sleep_use_awaitable end"); }
int main() { asio::io_context context{}; asio::co_spawn(context, do_sleep(context), asio::detached); context.run(); }
|
asio::make_work_guard
work_guard is very important. From the examples above it is easy to see that once all events finish, io_context.run() returns and exits. Why is that a problem? Sometimes we need to add tasks to the io_context from some later flow, and that is exactly where work_guard is useful; the master + worker model below uses it, and a minimal sketch follows.
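Here is a minimal sketch of the idea (the posted log message and the extra thread are my own illustration, not from the article's project): without the guard, run() would return immediately because nothing is queued yet.

```cpp
#include "asio.hpp"
#include "spdlog/spdlog.h"
#include <thread>

int main() {
    asio::io_context ctx;

    // The guard counts as outstanding work, so ctx.run() keeps blocking
    // even while the task queue is empty.
    auto guard = asio::make_work_guard(ctx);

    std::thread runner([&ctx] { ctx.run(); });

    // Some later flow can still post work to the context.
    asio::post(ctx, [] { SPDLOG_INFO("late task executed"); });

    guard.reset();  // drop the guard: run() exits once the queued work drains
    runner.join();
}
```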
asio's exception design
C++20's design is that an exception a coroutine does not catch is handed to the promise's unhandled_exception hook and otherwise simply dropped. For performance, asio offers two options: manually receiving the error_code as a return value, or catching the exception. The former is recommended; the channel example above shows both.
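A minimal sketch contrasting the two styles on a plain timer wait (both patterns also appear in the channel example above):

```cpp
#include "asio.hpp"
#include "spdlog/spdlog.h"

// Style 1: asio::as_tuple delivers the error as a value - nothing is thrown.
asio::awaitable<void> wait_with_error_code() {
    asio::steady_timer timer(co_await asio::this_coro::executor, std::chrono::seconds(1));
    auto [ec] = co_await timer.async_wait(asio::as_tuple(asio::use_awaitable));
    if (ec) {
        SPDLOG_INFO("timer error: {}", ec.message());
    }
}

// Style 2: plain use_awaitable throws asio::system_error on failure.
asio::awaitable<void> wait_with_exception() {
    asio::steady_timer timer(co_await asio::this_coro::executor, std::chrono::seconds(1));
    try {
        co_await timer.async_wait(asio::use_awaitable);
    } catch (const std::exception& e) {
        SPDLOG_INFO("timer error: {}", e.what());
    }
}

int main() {
    asio::io_context ctx;
    asio::co_spawn(ctx, wait_with_error_code(), asio::detached);
    asio::co_spawn(ctx, wait_with_exception(), asio::detached);
    ctx.run();
}
```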
Combining multiple coroutines: awaitable_operators
Go natively supports channel select and WaitGroup; in asio, both capabilities require awaitable_operators.
or
or (operator||) is the counterpart of channel select: we handle whichever awaited event completes first.
| #include "asio.hpp" #include "asio/experimental/awaitable_operators.hpp" #include "fmt/chrono.h" #include "spdlog/spdlog.h"
template <class Rep, class Period> asio::awaitable<std::string> create_task(std::chrono::duration<Rep, Period> spend, std::string task_name) { const auto executor = co_await asio::this_coro::executor; asio::steady_timer timer(executor, spend); SPDLOG_INFO("create_io_task - {} start. spend {}", task_name, spend); co_await timer.async_wait(asio::use_awaitable); SPDLOG_INFO("create_io_task - {} end.", task_name); co_return task_name; }
asio::awaitable<void> select_one_task() { using namespace asio::experimental::awaitable_operators; using namespace std::chrono_literals; const auto result = co_await (create_task(1s, "task1") || create_task(2s, "task2")); if (const auto name = std::get_if<0>(&result); name != nullptr) { SPDLOG_INFO("select task0 - {}", *name); } if (const auto name = std::get_if<1>(&result); name != nullptr) { SPDLOG_INFO("select task1 - {}", *name); } }
asio::awaitable<void> wait_group() { using namespace asio::experimental::awaitable_operators; using namespace std::chrono_literals; const auto& [task1, task2] = co_await (create_task(1s, "task1") && create_task(2s, "task2")); SPDLOG_INFO("result: {} {}", task1, task2); }
int main(int argc, char* argv[]) { asio::io_context context{}; asio::co_spawn(context, select_one_task(), asio::detached); context.run(); }
|
and
and (operator&&) is the counterpart of WaitGroup: we wait until several events have all finished before continuing.
```cpp
asio::awaitable<void> wait_group() {
    using namespace asio::experimental::awaitable_operators;
    using namespace std::chrono_literals;
    const auto& [task1, task2] = co_await (create_task(1s, "task1") && create_task(2s, "task2"));
    SPDLOG_INFO("result: {} {}", task1, task2);
}

int main(int argc, char* argv[]) {
    asio::io_context context{};
    asio::co_spawn(context, wait_group(), asio::detached);
    context.run();
}
```
Note
The examples above use the binary form, but the operators actually chain, as shown below; the result type is then std::variant<std::string, std::string, std::string>.
```cpp
const auto result = co_await (create_task(1s, "task1") || create_task(2s, "task2") || create_task(3s, "task3"));
```
Network programming
It is not hard to see that asio's whole design revolves around the io_context. The examples above are obviously single-threaded, but real services cannot run on one thread, so a few models are common:
- Single io_context + single thread: a hard performance ceiling.
- Single io_context + multiple threads: simple to use, but you cannot tell which thread a given coroutine is currently running on, so you inherently have to deal with multithreading, data races, and cross-thread data consistency.
- Master + worker model: the master io_context handles framework concerns, i.e. connection accept/read/write/close events, and hands the real business processing to worker io_contexts. The benefit is that a given request always stays on one io_context, so multithreading issues disappear by construction.
[1] Single io_context + single thread
| #include "asio.hpp" #include "spdlog/spdlog.h"
using namespace asio::ip;
asio::awaitable<void> echo(tcp::socket socket) { try { char data[1024]; for (;;) { const auto size = co_await socket.async_read_some(asio::buffer(data), asio::use_awaitable); co_await async_write(socket, asio::buffer(data, size), asio::use_awaitable); } } catch (std::exception& e) { SPDLOG_INFO("{} {} - echo Exception: {}", thread_id(), socket.remote_endpoint(), e.what()); socket.close(); } }
asio::awaitable<void> handler_listener(tcp::acceptor& listener) { for (;;) { auto conn = co_await listener.async_accept(asio::use_awaitable); SPDLOG_INFO("{} - receive conn {} -> {}", thread_id(), conn.remote_endpoint(), conn.local_endpoint()); asio::co_spawn(listener.get_executor(), echo(std::move(conn)), asio::detached); } }
int main() { asio::io_context ctx; std::allocator<void> alloc; tcp::acceptor listener(ctx, tcp::endpoint(tcp::v4(), 8080)); asio::co_spawn(ctx, handler_listener(listener), asio::detached); ctx.run(); }
|
[2] Single io_context + multiple threads
| #include "asio.hpp" #include "spdlog/spdlog.h"
using namespace asio::ip;
struct multi_thread { explicit multi_thread(const size_t size) : threads_(std::vector<std::thread>(size)) { }
~multi_thread() { for (auto& thread : threads_) { if (thread.joinable()) { thread.join(); } } }
void run(const std::function<void()>& foo) { for (auto& thread : threads_) { thread = std::move(std::thread(foo)); } }
private: std::vector<std::thread> threads_; };
int main() { asio::io_context ctx; tcp::acceptor listener(ctx, tcp::endpoint(tcp::v4(), 8080)); asio::co_spawn(ctx, handler_listener(listener), asio::detached); multi_thread multi_threads(4); multi_threads.run([&] { ctx.run(); }); }
|
[3] Master + worker model
| #include "asio.hpp" #include "spdlog/spdlog.h"
struct worker_context { worker_context() = default;
void run() { thread_ = std::move(std::thread([this] { io_ctx.run(); })); }
~worker_context() { io_ctx.stop(); if (this->thread_.joinable()) { thread_.join(); } }
asio::awaitable<void> echo(asio::ip::tcp::socket socket) { SPDLOG_INFO("{} - handle conn {}", thread_id(), socket.remote_endpoint()); try { char data[1024]; for (;;) { const auto size = co_await socket.async_read_some(asio::buffer(data), asio::use_awaitable); co_await async_write(socket, asio::buffer(data, size), asio::use_awaitable); } } catch (std::exception& e) { SPDLOG_INFO("{} {} - echo Exception: {}", thread_id(), socket.remote_endpoint(), e.what()); socket.close(); } } asio::io_context& get_context() { return io_ctx; }
private: asio::io_context io_ctx{}; std::thread thread_{}; };
struct main_context { using work_guard_type = decltype(asio::make_work_guard(std::declval<worker_context>().get_context()));
explicit main_context(size_t worker_size) : worker_contexts_(std::vector<worker_context>(worker_size)){};
void run() { asio::ip::tcp::acceptor listener(io_ctx_, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), 8080)); asio::co_spawn(io_ctx_, handler_listener(listener), asio::detached);
std::vector<work_guard_type> work_guards{}; for (auto& worker_context : worker_contexts_) { work_guards.push_back(asio::make_work_guard(worker_context.get_context())); worker_context.run(); } io_ctx_.run(); }
~main_context() { io_ctx_.stop(); }
asio::awaitable<void> handler_listener(asio::ip::tcp::acceptor& listener) { for (;;) { auto conn = co_await listener.async_accept(asio::use_awaitable); conn_counter = conn_counter + 1; SPDLOG_INFO("{} - receive conn[{}] {} -> {}", thread_id(), conn_counter, conn.remote_endpoint(), conn.local_endpoint()); auto& worker_context = worker_contexts_[conn_counter % worker_contexts_.size()]; asio::co_spawn(worker_context.get_context(), worker_context.echo(std::move(conn)), asio::detached); } }
private: asio::io_context io_ctx_{}; std::vector<worker_context> worker_contexts_{}; size_t conn_counter{}; };
int main() { main_context main_context(4); main_context.run(); }
|
Performance comparison
- Single io_context + single thread
```text
~ devtool tcp benchmark_echo_service --conn 100 --count 10000
[INFO] 00:21:21.818 addr=localhost:8080, max_conn=100, max_req=10000, data_size=64, run_time=10s, interval=0s
[INFO] 00:21:31.819 latency avg(req): 870.994µs
[INFO] 00:21:31.819 throughput avg(s): 114507
[INFO] 00:21:31.819 total success request: 1145075
[INFO] 00:21:31.819 total error request: 0
```
- Single io_context + multiple threads
```text
~ devtool tcp benchmark_echo_service --conn 100 --count 10000
[INFO] 00:15:45.176 addr=localhost:8080, max_conn=100, max_req=10000, data_size=64, run_time=10s, interval=0s
[INFO] 00:15:55.179 latency avg(req): 371.247µs
[INFO] 00:15:55.179 throughput avg(s): 268017
[INFO] 00:15:55.179 total success request: 2680170
[INFO] 00:15:55.179 total error request: 0
```
- Master + worker model
```text
~ devtool tcp benchmark_echo_service --conn 100 --count 10000
[INFO] 00:16:43.651 addr=localhost:8080, max_conn=100, max_req=10000, data_size=64, run_time=10s, interval=0s
[INFO] 00:16:53.653 latency avg(req): 512.29µs
[INFO] 00:16:53.653 throughput avg(s): 194573
[INFO] 00:16:53.653 total success request: 1945732
[INFO] 00:16:53.653 total error request: 0
```
Summary
Single io_context + single thread is the weakest even on something as simple as echo, which is expected: it simply cannot use the machine's resources.
Single io_context + multiple threads does perform better thanks to asio's strong scheduler, but it has a problem: the read/write callbacks of one request may be dispatched to different threads, so thread-switching overhead grows.
The master + worker model is not used quite right here: the master io_context only accepts connections (in this example it handled just 100 events), while the worker io_contexts handle the reads and writes. The upside is that all work for a given request runs on one thread, which keeps overhead low.
To improve: have the master io_context handle all connection/read/write events, and let the worker io_contexts run only the actual business logic. My example cannot show the difference, since it is just an echo; I may build a simple HTTP server later to try it.
Why is master + worker generally the better model? Because in real services the business logic usually dominates the cost, and mixing I/O and business work on the same threads distributes resources unevenly, e.g. I/O events see higher latency and jitter.
asio really is a very strong network framework with a rich set of coroutine primitives such as channels and joins; among today's open-source options it is clearly the most feature-complete.
Other notes
For network programming, timeout control is also very important. asio and the lower-level mechanisms (epoll/io_uring) do not directly provide timeout options for reads, writes, or connects, so the application usually has to enforce them itself, typically by pairing the operation with a timer. As mentioned above, awaitable_operators can be used for this; a sketch follows.
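As a hedged sketch of that pattern (read_with_timeout is my own helper, not an asio API): race the operation against a timer with operator|| and treat the timer winning as a timeout.

```cpp
#include "asio.hpp"
#include "asio/experimental/awaitable_operators.hpp"
#include <stdexcept>

using namespace asio::experimental::awaitable_operators;

// Hypothetical helper: whichever side finishes first wins, and operator||
// cancels the loser; the variant index says whether the read (0) or the
// timer (1) completed.
asio::awaitable<std::size_t> read_with_timeout(asio::ip::tcp::socket& socket, char* data, std::size_t size,
                                               std::chrono::milliseconds timeout) {
    asio::steady_timer timer(co_await asio::this_coro::executor, timeout);
    auto result = co_await (socket.async_read_some(asio::buffer(data, size), asio::use_awaitable) ||
                            timer.async_wait(asio::use_awaitable));
    if (result.index() == 1) {  // the timer fired first; the pending read was cancelled
        throw std::runtime_error("read timeout");
    }
    co_return std::get<0>(result);  // bytes actually read
}
```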
Notes
- Two helper libraries are used throughout: spdlog and fmt.
| #include "fmt/core.h" #include "fmt/ostream.h" #include "spdlog/spdlog.h"
template <> struct fmt::formatter<decltype(std::this_thread::get_id())> : fmt::ostream_formatter {};
std::string thread_id() { return fmt::to_string(std::this_thread::get_id()); }
template <typename T> struct fmt::formatter<asio::ip::basic_endpoint<T>> : fmt::formatter<std::string_view> { auto format(const asio::ip::basic_endpoint<T>& endpoint, fmt::format_context& ctx) { return fmt::format_to(ctx.out(), "[{}]:{}", endpoint.address().to_string(), endpoint.port()); } };
|
- All the code in this article lives in this repo: https://github.com/Anthony-Dong/cpp