跳过正文

2025-01-24-CPP Net Standard & The Asio asynchronous model

·23425 字·47 分钟
作者
菜狗
Focus
目录

CPP Net Standard & The Asio asynchronous model
#

因为CPP的Sender & Receiver标准看的比较晚,所以实际上是先写的Asio的网络模型。

我这几天在看这篇论文《The Asio asynchronous model》,

我这几天看到有一些人在攻击陈硕的一些观点,甚至reactor proactor模式的观点。个人觉得是比较好笑的,因为空谈设计哲学是最没有意义的。但是有一点说的很对,send事件是需要关注的,因为send比方说100mb,不代表100mb能都发完

下面的内容,除了我把

  • 里面会穿插一部分官方文档(https://www.boost.org/doc/libs/1_87_0/doc/html/boost_asio/overview/core.html),官方文档会打散穿插到这篇论文里面,如果看到标题带有Plus或者++那就是官方文档里面的内容。关于Asio,我认为重点是理解executor,dispatcher,scheduler(dispatcher,这个词在不同的框架中可能有不同的含义。比如在Boost.Asio中,dispatcher可能指代分发事件或消息的机制。io_context既是executor,也负责调度处理程序的执行;scheduler通常与任务调度相关,决定任务执行的顺序、优先级等。)概念理解之后,我们就回去看看源代码,看看背后的设计哲学
  • 一些临时想到的想法,都写为了引用

The Asio asynchronous model
#

阅读完了Asio的异步模型之后,可以看看Asio的作者的视频,对应文字翻译请参考这个https://zhuanlan.zhihu.com/p/662972177

1 引论
#

网络领域长期以来一直采用事件驱动和异步编程设计来开发高效、可扩展、面向网络的软件。基于proactor模型的事件模型,其函数快可视为连续的异步操作,对理解和组合提供了一个很好的概念模型。异步操作可以被连接,每个连接都会启动下一个操作。细粒度的连接可以抽象为单个、更高级别的异步操作连接。 然而,随着异步组合越来越多,纯粹基于回调的方法会大大增加明显的代码复杂度和损害代码可读性。程序员转而使用机制,如状态机、纤程,C++20基于语言的协程,以提高代码清晰度,同时保留异步实现的好处。这里并没有什么银弹。 本文从高层次对Asio库核心的异步模型进行了总结。这种模型将异步操作作为异步组合的基本构建块,但并未使用组合机制。Asio中的异步操作支持回调、future(eager和laze模式)、fiber、协程和尚未想到的方法。从而应用程序程序员根据适当的权衡选择一种方法。

//代码,未粘贴

2 动机
#

2.1 同步形式作为灵感
#

最简单的网络程序采用 thread-per-connection 方法实现。这里我们来看看基本的 echo 服务器,下面是一个只用同步函数编写的Echo服务器:

这个程序的结构和流程很清晰,同步操作都是一个个的函数调用。这些函数带来了很多不错的的句法和语义属性,包括:

  • 组合可以使用该语言来管理控制流(即 for、if、while 等)。
  • 组合可以重构为使用在同一线程上运行的子函数(也就是直接调用子函数)而不改变功能。
  • 如果同步操作需要临时资源(例如内存、文件描述符或线程),此资源在从函数返回之前被释放。
  • 当同步操作是泛型的(就是模板)时,返回类型可以从函数及其参数确定性的推到出来
  • 要传递给同步操作的参数的生命周期是明确的,即使是传递临时变量。

然而,使用每个线程处理一个连接的方法有几个问题限制了其普遍的可用性。

2.2 线程的可扩展性有限
#

顾名思义,每个线程一个连接的设计使用单独的线程来处理每个连接。对于处理数千或数百万并发连接的服务器,这代表着程序占据巨量的资源使用,尽管近年来64位的广泛可用性操作系统缓解了这种情况。

从性能敏感的角度考虑,上下文切换的消耗可能更需要考虑在内。在通用操作系统线程之间进行上下文切换的成本以数千个CPU周期来衡量。当可运行线程数量超过执行资源(如 CPU)时,就会发生排队,而最后一个排队的任务会因多次上下文切换的成本而延迟。

// 差个图片

即使网络服务器看起来总体负载较轻,时间相关的事件仍然可能产生排队排队。例如,在金融市场中,所有参与者都在处理和响应相同的市场数据流,因此很可能不止一个参与者会通过向服务器发送交易来响应相同的刺激。这种排队增加了参与者所经历的平均延迟和抖动。

相比之下,专门为事件处理设计的调度程序可以在任务之间“上下文切换”速度提高一到两个数量级,在几十到几百个周期内就结束调度。这里排队可能仍然会发生,但处理相关的队列的总体开销大大减少。

最后,我们还必须注意到,我们的thread-per-connection回显服务器非常简单:线程一旦启动,就能够独立运行。在现实世界的用例中,服务器程序可能需要访问共享数据以响应客户端,处理同步成本、处理CPU之间的数据移动、并增加代码的复杂度。

2.3 半双工和全双工协议
#

每个线程一个连接的方法对于简单的协议可能比较试用,比方说上面所示的回显服务器,协议是一个半双工协议。服务器要么发送,要么接收,但绝不能同时处理发送和接受。

然而,许多现实世界的应用协议是全双工的,这意味着数据可以在任何时候的任何一个方向传输。考虑一些基于FIX协议的消息:

//插入一些图片

如你所见,像这样的协议需要响应来自许多不同来源的事件。这里面隐藏几个含义:

  • 协议逻辑中不同部分,它们可能并发执行(concurrent的含义请自行理解),可能需要访问共享的状态。
  • 复杂事件处理流程可能不容易以线性形式表示(例如基于thread-per-connection机制设计,再比方说使用协程时流程)。

因此,我们经常发现这些协议的作者利用其他组合机制,例如状态机,作为管理复杂性和确保正确性的一种方式(状态机能简化理解和设计难度,对于状态机的应用理解有困难的同学,可以去看TCP状态机和TLS状态机,在RFC后面都有图片看。

  • Reactor模式的核心事件循环非阻塞I/O事件的分发机制,通过解耦事件监听与处理实现高效并发。

Reactor和Proactor模式的区别并不体现在全双工和半双工的区别上,上面实际上想说的是Reactor模式处理一个复杂的协议时,要使用一些非常麻烦(可以理解为不好懂)的实现方法,比方说状态机来确保正确实现了协议。

2.4 快速执行关乎性能表现
#

一些网络应用程序需要向许多消费者传递一条消息。比方说量化交易需要向所有参与者实时传播市场数据。异步地传递此信息时,一种常见的方法是将消息包装在引用计数指针中(例如shared_ptr)保持内存有效,直到它被传输到所有人再将之释放。

然而,出于效率原因,任何一个传输操作中都尝试投机性发送。符合期望的情况,从统计来说大概率确实如此(前提是确保未超出硬件和软件的负载能力),即投机发送数据成功并立刻传输数据。这种情况下,则无需再维护有效的共享指针。这避免了维护引用计数的开销。

相比较而言,原子计数计算成本可衡量为几十个CPU周期,与以数百CPU周期为基本耗时使用系统调用传输数据的操作相比。避免原子计数的额外成本在实践中可以获得5-10%的收益。(可能是金融交易领域会比较关注,对于普通工程从业者,还好)。

惰性执行模型(lazy execution model)无法避免这种成本,因为它必须在操作第一次时复制共享数据调用。

这段话到底是想表明什么呢?shared_ptr引用计数带来的成本往往优化很麻烦。boost自己实现了一种buffer。

除此之外boost给了一种intrusive_ptr,https://www.boost.org/doc/libs/1_88_0/libs/smart_ptr/doc/html/smart_ptr.html#intrusive_ptr 。其实现的方案就是将控制块和对象嵌入到一起。

实际上针对shared_ptr带来的性能问题,往往就是几种常见的手段

  • 还是shared_ptr,但是减少原子操作的同步开销
  • intrusive_ptr,将引用计数嵌入对象内部,消除控制块的内存开销和间接访问
  • 完全放弃引用计数,在极端低延迟场景中,放弃智能指针,改用手动或作用域管理

2.5 设计哲学
#

上述问题激发了以下异步模型的设计理念:

  • 异步模型需要灵活支持组合机制,因为具体的选择因人(用例)而异。

  • 尽可能多的支持同步操作的语义和句法属性,因为它们可以更简单地组合和抽象。

    这里实际上是想说,支持所谓的状态机,支持所谓的组合,支持所谓的协程。来简化代码开发的难度

  • 应用程序代码应该在很大程度上避免线程和同步的复杂性,因为线程 & 同步会带来自不同来源的事件的复杂性。

    这里实际上是想说,要把事件分发机制隐藏。和下面的CPP 标准相比,这里隐藏了dispatcher,而scheduler是CPP标准的重点

2 Plus
#

https://www.boost.org/doc/libs/1_87_0/doc/html/boost_asio/overview/core/async.html

很明显Asio选择的是Proactor模式,和同步或者Reactor模式相比,Asio的Proactor模式的优缺点都很明显。当然Asio也支持Reactor模式,可以嵌入第三方库,具体不翻译了,参考https://www.boost.org/doc/libs/1_87_0/doc/html/boost_asio/overview/core/reactor.html

先看看Asoi的实现当中有哪些东西。

Proactor模式的关键词(改编自[POSA2])

  • 异步操作:定义一个异步执行的操作,例如对套接字的异步读或写。
  • 异步操作处理器(Asynchronous Operation Processor):执行异步操作,并在操作完成时将事件排入完成事件队列。从高层次来看,像reactive_socket_service这样的内部服务是异步操作处理器。
  • 完成事件队列 缓冲完成事件,直到被异步事件解复用器出列。
  • completion handler 处理异步操作的结果。这些是函数对象,通常使用boost::bind创建(当然我个人有的时候用std::bind)。
  • 异步事件解复用器(Asynchronous Event Demultiplexer)阻塞等待完成事件队列上的事件发生,并将已完成的事件返回给调用者。
  • Proactor 调用异步事件解复用器出列事件,并分派与事件相关的完成处理程序(即调用函数对象)。这个抽象由io_context类表示。
  • Initiator 特定于应用程序的代码,启动异步操作。发起者通过像basic_stream_socket这样的高层次接口与异步操作处理器交互,而后者又委托给像reactive_socket_service这样的服务。

Asio的Proactor模式实际上也是在Reactor模式上实现的

  • 异步操作处理器(Asynchronous Operation Processor)使用select、epoll或kqueue实现的Reactor。当Reactor指示资源准备好执行操作时,处理器执行异步操作,并将相关的completion handler排入完成事件队列。
  • 完成事件队列 完成处理程序(即函数对象)的链表。
  • 异步事件解复用器 这是通过等待事件或条件变量来实现的,直到完成事件队列中有可用的完成处理程序。

  • 可移植性:许多操作系统都提供了原生异步 I/O API(例如Windows上就支持这种),作为开发高性能网络应用程序的首选。这个库可以基于原生异步 I/O 来实现。然而,如果原生支持不可用,这个库也可以使用典型的reactor模式的同步事件解复用器来实现,例如POSIX的select()。
  • 将线程与并发解耦:需要一直跑的complete handler丢在队列里面,由executor执行。因此,应用程序不需要生成许多线程来提高并发性。
  • 性能和可扩展性:诸如每个连接一个线程(只有纯同步通信的情况下会需要这种实现)这样的实现策略会降低系统性能,因为会增加 CPU 之间的上下文切换、同步和数据移动。使用异步操作,可以通过最小化操作系统线程的数量(线程往往是有限的资源),并且只在有活干的时候,再让线程进入运行状态,来避免上下文切换的成本。
  • 简化应用程序同步:异步操作completion handler写起来和单线程同步通信下的代码差不多,因此应用程序逻辑可以在很少或不考虑同步问题的情况下开发。而不是一堆回调飞来飞去
  • 函数组合:函数组合是指completion handler组合以提供更高级别的操作,例如以特定格式发送消息。每个函数都是通过对较低级别读或写操作的多次调用来实现的。 例如,考虑一种协议,其中每个消息由固定长度的头部和可变长度的主体组成,主体的长度在头部中指定。一个假设的 read_message 操作可以使用两个较低级别的读取来实现,第一次读取接收头部,一旦知道长度,第二次读取接收主体。 在异步模型中组合函数时,异步操作可以链接在一起。也就是说,一个操作的完成处理程序可以启动下一个操作。可以封装链中的第一个调用,以便调用者不必知道高级操作是作为异步操作链实现的。 以这种方式组合新操作的能力简化了支持特定协议的时候开发更高层次抽象的流程。

3 模型
#

3.1 异步操作(Operation)
#

请注意,这里我将(completion handler)翻译为completion handler,我之所没翻译为句柄,是因为句柄就有点类似鲁棒,属于你懂了才会理解,不懂根本就不理解的词语

异步操作是Asio异步模型中的基本组合单元。异步操作代表在后台启动和执行的工作,而用户的代码发起的工作可以继续做其他事情。

从概念上讲,异步操作的生命周期可以用以下事件和阶段来描述。

//需要插入一张图片

  • 初始化函数是用户调用以启动异步操作,进行初始化操作的函数。(initiating function这玩意你就当成是什么async_read之类的就行了)
  • completion handler是用户提供的、可移动(move-only)的函数对象,最多被调用一次,通知异步操作的结果。completion handler的调用用于通知用户一些事情已经发生了:操作完成,操作的副作用产生了。

初始化函数和completion handler被嵌入到用户的代码中,用法如下所示:

// 需要插入一张图片

同步操作的表现形式为单个函数,具有几个固有的语义属性。异步操作从这些同步操作选取一部分语意以便于灵活高效的组合。(这句话我觉得主要是针对设计哲学第二条说的,是想说Aiso的一步操作采用了同步操作的相同语意)

同步操作属性异步操作相同的语意
当同步操作是泛型的(即模板,其返回类型可由派生自函数及其参数确定当异步操作是泛型时,completion handler的参数类型和顺序可确定地从初始化函数函数及其参数推导。
如果同步操作需要临时资源(例如内存、文件描述符或线程),此资源在函数返回之前被释放。如果异步操作需要临时资源(例如内存、文件描述符或线程),此资源在调用completion handler之前就被释放掉。

第二个语意是异步操作的一个重要属性,因为它允许completion handler在不重叠资源使用的情况下启动进一步的异步操作。想想下面这种琐碎的(也是相对常见的)的情况,在异步操作链中一遍又一遍地重复相同操作:

通过确保在completion handler运行之前释放资源,我们避免了峰值翻倍运营链的资源使用。(这句话我也没看懂)

这里多插一句,Proactor & Reqctor模式是两种传统网络编程模式,这里Boost选择的是Proactor模式。因此这里需要考虑

3.2 异步Agent
#

asio_asynchronous_agent

异步Agent是异步操作的顺序组合。每个异步操作被认为是作为异步Agent的一部分运行,即使该Agent仅包含该单个操作。异步Agent可以与其他异步Agent同时执行工作。异步Agent之于异步操作就像线程之于同步操作一样。

然而,异步Agent是一个纯粹的概念结构,它允许我们理解异步操作上下文如何组织,异步操作如何组合。库中没有“异步Agent”这个名词 Agent如何组织异步操作也不重要。我们可以视异步Agent工作流程如下

// 插入图片

异步Agent交替地等待异步操作完成,然后运行该操作的completion handler。在异步Agent的上下文中,这些completion handler表示不可分割的可调度工作。

3.3 关联特性和关联器
#

这部分请注意,关联特性是Associated characteristics,关联器是associators

这部分的概念出现的似乎很突然,实则不然。这东西实际上简化了用户代码,比方说使用了某种executor(比方说,这里用了strand),然后异步操作不断传播,怎么样继续使用这种executor呢,我自己再不断地传递template?

ASIO中的 Associated Characteristics(关联特性)和 Associator Traits(关联器特征)是用于隐式传递异步操作的上下文信息定制异步操作行为的底层机制。它们的设计目的是在异步操作链中自动传递某些关键属性(如执行器、分配器、取消状态等),从而简化用户代码并提高灵活性。就是

  • Associated Characteristics 避免在异步链中重复传递上下文(如执行器)

  • Associator traits允许自定义特性的提取逻辑。可以associated_executor,可以associated_allocator,可以associated_cancellation_slot

帮助库自动推断和传递这些属性。例如,当用户编写一个完成处理程序(completion handler)时,ASIO可能需要确定该处理程序应该使用哪个执行器,这时候associator traits会起作用,自动关联正确的执行器,而无需用户显式指定。

关联特性指的是组合为异步Agent工作流程中的一部分时,异步操作(asynchronous operations)应当如何表现(就是问how),例如(下面这几个都是一种关联特性):

  • 一个分配器,指明异步操作如何获取内存资源
  • 取消槽(cancellation slot),指明异步操作取消时采取什么行为
  • 一个执行器(executor),它指明Agent的completion handler(completion handlers)将如何排队和运行。

异步操作在异步Agent的执行流程中运行,每个异步操作都需要使用某些关联特性(可以类比为满足golang里面的接口)。异步操作或者说initing function调用的的completion handler需要满足相应的关联器特征,异步操作会查询某个类型(如 Handler)是否关联了特定特性(实际上是C++ traits)。每种关联特性(characteristic)实际上都具有相应的关联器特征(traits)。

说的好像都不是人话了,说白了就是每个异步操作调用

具体的completion handler的关联器特征(associator trait)需要给出下列的信息:

  • 接受异步操作提供的默认关联特性(characteristic),按原样返回此默认值
  • 返回和关联特性(characteristic)不相关的具体实现,或
  • 调整提供的默认特征(characteristic)以引入completion handler所需的其他行为。

如何定义关联器(associator)

给定一个名为associated_R的关联器特征,它应当具有:

  • 默认必须有的S类型的s,用于定义completion handler,定义completion handler类型,
  • 定义关联特性的语法(可以理解为函数原型)和语义要求(可以理解为语法上层做什么)的一组类型要求(或C++ concept),称之为R,以及
  • 满足上面要求R的C类型候选值c,由异步操作提供的,代表默认提供的满足关联特性的实现

异步操作真正计算时使用下述关联器特征:

  • 类型的定义,associated_R<S, C>::type
  • 实际实现(值)associated_R<S, C>::get(s,c)

上面两个满足R中定义的要求。为方便起见,这些也可以通过类型别名访问associated_R_t<S, C>或调用可能变化的函数get_associated_R(s,c)。

关联器特征的模板应当定义为:

  • 如果S::R_type格式良好,定义一个嵌套类型的别名为S::R_type和静态成员函数用于get此类型s.get_R()
  • 其他情况,如果关联器<associated_R, S,C>::type已经定义清晰,直接继承关联器<associated_R, S,C>即可
  • 其他情况,将嵌套类型别名定义为C,再定义一个静态成员函数get返回c。

3.4 子Agent
#

这段的子Agent和父Agent的关系我实际上感觉读起来乖乖的,总觉得是论文有问题

异步Agent中的异步操作可以组合为子Agent(在Aiso里面,异步操作也被称为组合操作)。就父(异步)Agent而言,它在等待最终异步操作的完成。构成子Agent的异步操作依次运行,子Agent执行完成,最后的completion handler运行时父Agent才继续运行。

与单个异步操作一样,构成子Agent构建的异步操作必须在调用completion handler之前释放临时资源。我们可以认为这些子Agent的生命周期在调用completion handler之前结束。 当异步操作创建子Agent时,它可能会传播父Agent的特征到子Agent,然后递归传播这些相关特征。这些传递的特征复制了同步操作的另一种属性。

同步操作异步操作
同步操作的组合可以重构为在同一个线程使用相同的函数(即简单地调用)而不改变功能异步Agent可以重构为共享父Agent关联特性的异步操作和子Agent,而无需改变功能。

最终,这些异步操作可以实现为并发运行的多个子Agent。在这种情况下,异步操作可以选择选择性地传播父Agent的关联特性。

3.5 Executors
#

这里请注意,不能将executor认为是某个线程,或者执行者。它是一种抽象的概念

每个异步Agent都有一个关联的执行器。Agent的执行器决定Agent的completion handler如何排队并最终运行。

执行器的例子包括:

  • 协调一组操作共享数据结构的异步Agent,确保Agent的completion handler永远不会同时运行(Aiso里面这种类型的执行器被称为strand)。
  • 确保Agent在靠近数据或数据的指定执行资源(例如CPU)上运行事件源(例如NIC)。
  • 表示一组相关Agent,从而启用动态线程池以进行更智能的调度决策(例如将Agent作为一个单元在执行资源之间移动)。
  • 将所有completion handler排队以在GUI应用程序线程上运行,以便它们可以安全地更新用户交互界面元素。
  • 返回异步操作的默认执行程序,尽可能快地运行completion handler在触发操作完成的事件之后。
  • 调整异步操作的默认执行器,以便在每个之前和之后运行诸如日志记录、用户授权或异常处理等行为
  • 指定异步Agent及其completion handler的优先级。

异步Agent中的异步操作使用Agent的关联执行器来:

  • 在异步操作未完成时,记录异步操作表示的工作的存在。
  • 在操作完成时,确保completion handler进入执行队列。
  • 确保completion handler不会重新运行,如果这样做可能导致疏忽递归和堆栈溢出。

因此,异步Agent的关联执行器表示Agent应该以何种方式、地点和时间的策略运行,是实际组成Agent的代码的横切关注点。

这段我觉得读起来也有一些拗口,我目前看到asio的执行器(也包括执行器上下文)就是io_context(执行器上下文),thread_pool等就是单纯的供线程调用的对象,线程在这个上下文上调用boost::asio::io_context::run() 执行相关的任务。如果没有任务,那么boost::asio::io_context::run() 就会直接返回

3.6 Allocators
#

每个异步Agent都有一个关联的分配器。Agent的分配器是组成Agent的异步操作用以获取每个操作的稳定内存资源(POSM)使用的接口。这个名字反映了这样一个事实:内存是per操作的,因为内存仅在该操作的生命周期内保留;并且保证内存在整个异步操作过程中始终可用。

异步操作可以通过多种不同方式利用POSM:

  • 该异步操作不需要任何POSM。例如,该操作包裹了一个现有的API执行自己的内存管理,或者将一些数据拷贝进环形队列。
  • 异步操作未完成时,只使用单个固定大小的POSM。例如,将某些状态存储在链表中。
  • 该异步操作使用单个,运行时确定大小的POSM。例如,异步操作存储用户提供的缓冲区的副本,或运行时确定大小的iovec结构数组。
  • 该操作同时使用多个POSM。例如,链表调用固定大小POSM外加一个用于缓冲区的运行时大小的POSM。
  • 操作串行使用多个POSM,大小可能会有所不同

POSM优化是组合异步操作的横切关注点(横切关注点是指在多种模块或组件中重复出现的功能或操作)。此外,使用分配器作为接口来获取POSM授予保证异步操作的实现和调用方的灵活性:

  • 代码使用方可以忽略分配器的存在并接受应用程序采用的任何默认策略。
  • 代码实现者可以忽略分配器,尤其是在操作不认为是性能的关键点的前提下。
  • 用户可以为相关的异步操作共同定位POSM,以获得更好剧不行。
  • 对于串行使用不同大小的POSM的组合,内存使用只需调用当前现存的POSM即可。例如,考虑一个短期异步操作的组合使用内存需求大POSM(连接建立和握手),然后进行长寿命异步操作它使用小型POSM(在对等点之间传输数据,存储这些数据)。

如前所述,在调用completion handler之前必须释放所有资源,从而为Agent内其他的后续异步操作回收内存。这允许应用程序即使有长寿命异步Agent,也不会热路径(hot-path,“hot path”是指程序或系统中执行频率非常高的代码路径)内存分配,而用户代码并知道关联的分配器存在。

3.7 Cancellation
#

在Asio中,许多对象,例如套接字和计时器,都支持通过它们的关闭或取消成员函数来取消和当前套接字或者计时器相关的未完成的异步操作。但是,某些异步操作还支持单独的、有针对性的取消。这种操作取消的方式通过指定异步Agent关联的取消槽实现。

为了支持取消,异步操作将取消handler安装到Agent的插槽中。取消handler是一个函数对象,当用户发出取消信号时将调用它进入插槽。由于取消槽与单个Agent相关联,因此该槽最多可容纳一个处理程序时间,安装新的取消handler将覆盖任何以前安装的处理程序。因此,相同的插槽可重用于Agent内的后续异步操作(后面的取消handler直接插进去覆盖就完事了)。

当异步操作包含多个子Agent时,取消特别有用。例如,一个子Agent可能已完成,另一个随后的子Agent立刻被取消,子Agent不会造成任何后续的副作用。

3.8 Completion tokens
#

// 图片

Asio异步模型的一个关键目标是支持多种组合机制。用户通过将完成令牌( Completion token)传递给异步操作的初始化函数来调用。按照惯例,完成令牌是异步的操作初始化函数的最后一个参数。

这段可能读起来有些拗口,实际上参考https://www.boost.org/doc/libs/1_87_0/doc/html/boost_asio/tutorial/tuttimer2.html的解释,a completion token, which determines how the result will be delivered to a completion handler when an asynchronous operation completes.

如果觉得看起来还是有点难以理解,可以理解为completion token是对completion handler的一个包裹。在asio初期,initiating function(比方说async_read)的参数必须是completion handler,而initiating function(比方说async_read)的返回值必须是void(也就是说completion handler对ininting function返回值并不关心)。14-15年,asio的作者Christopher Kohlhoff这时候改为initiating function接受completion token。为什么非得用completion token呢?实际上不是非得用completion token,而是为了增加代码的灵活性,让ininting function可以做更多的事情:换言之initiating function可以有返回值,可以嵌套更多的操作。

什么意思呢?你可以在completion token里面传递use_future,而initiating function(比方说async_read)会返回一个future。这个future你可以用来等待做些事情。实际上asio还支持了诸如协程之类的方法

当然,这还满足兼容性的要求,如果你原本实现,不处理initing function的completion handler实际上不需要改变。因为它实际上还是让initiating function返回为void。

完成令牌的实例参考https://www.boost.org/doc/libs/1_87_0/doc/html/boost_asio/tutorial/tuttimer3.html,如何将一些参数绑定传递至completion handler:使用bind将一些参数绑定为function对象,再将包裹之后的function传递为completion token,这里注意一下需要调用的是boost::bind(),如果调用的是std::bind,那么就需要传递boost::asio::placeholders::error,比方说https://www.boost.org/doc/libs/1_87_0/doc/html/boost_asio/tutorial/tutdaytime3.html 的

void start_accept()
{
 tcp_connection::pointer new_connection =
   tcp_connection::create(io_context_);

 acceptor_.async_accept(new_connection->socket(),
     std::bind(&tcp_server::handle_accept, this, new_connection,
       boost::asio::placeholders::error));
}

例如,如果用户传递一个lambda(或其他函数对象)作为完成令牌,异步操作行为如前所述:操作启动,当操作完成时,结果传递给lambda表达式。

当用户将use_future作为完成令牌调用(初始化函数)时,该操作的行为就像是调用promise和future一样。初始化函数并不只启动异步操作,还会返回一个future用于等待结果。

future<size_t> f =
	socket.async_read_some(
		buffer, use_future
	);
// ...
size_t n = f.get();

类似地,当用户传递use_awaitable为完成令牌,初始化函数表现的好像它是一个awaitable-based协程。然而,在这种情况下,初始化函数不会启动异步操作。它只返回awaitable对象,只有在进入co_await-ed之后,才启动操作。

awaitable<void> foo()
{
	size_t n =
 	co_await socket.async_read_some(
 		buffer, use_awaitable
 	);
 // ...
}

最后一种,将纤程(Fiber)的yield作为完成令牌传递,看起来似乎初始化函数能感知到纤程的同步操作:除了开始异步操作,还会阻塞纤程,直到它完成。对纤程而言,这是一个同步操作。

void foo()
{
 	size_t n = socket.async_read_some(
 	buffer, fibers::yield
 );
 // ...
}

初始化函数,就是这里的async_read_some,其实现需要支持上述所有这些用途。

为了实现这一点,异步操作必须首先指定一个完成签名(completion signature )(或者简写为签名),该签名描述了将传递给completion handler的参数。(注意这里是异步操作指定完成签名)

然后,异步操作的初始化函数获取完成签名、完成令牌及其内部实现,并将它们传递给 async_result 关联器特征(trait)。async_result 特征是一个自定义点,它结合这些参数先生成一个具体的completion handler(concrete completion handler),然后启动操作。

这个下面的例子里面,completion signature是 void(error_code, size_t)。

asio_completion_token

听上去似乎不可思议,现在让我们实践试试,使用一个分离的线程将同步操作调整为异步操作

template <class CompletionToken>
auto async_read_some(tcp::socket& s, const mutable_buffer& b, CompletionToken&& token)
{
 // 这段代码包含运行异步操作的代码,任何需要传递给async_result 特征的都得在里面给出
 auto init = [](
 	auto completion_handler,
 	tcp::socket* s,
 	const mutable_buffer& b)
 {
    // 启动一个新线程来执行操作
 	std::thread(
 		[](
 			auto completion_handler,
 			tcp::socket* s,
 			const mutable_buffer& b
 		  )
 		{
 		  error_code ec;
 		  size_t n = s->read_some(b, ec);
 		  std::move(completion_handler)(ec, n);
 		},
 		std::move(completion_handler),
 		s,
 		b
 		).detach();
 };
 return async_result<
 	decay_t<CompletionToken>,
 	void(error_code, size_t)
 >::initiate(
 	init,
 	std::forward<CompletionToken>(token),
 	&s,
 	b
  );
}

可以将完成令牌视为一种completion handler的原型。如果传递一个函数对象(如 lambda表达式)作为完成令牌,它已经满足completion handler的要求。async_result 主模板通过简单地将参数转发调用我们的“completion handler原型”来处理这种情况:

template <class CompletionToken, completion_signature... Signatures>
struct async_result
{
 template <
 	class Initiation,
 	completion_handler_for<Signatures...> CompletionHandler,
 	class... Args>
 static void initiate(
 	Initiation&& initiation,
 	CompletionHandler&& completion_handler,
 	Args&&... args)
 {
 	std::forward<Initiation>(initiation)(
 		std::forward<CompletionHandler>(completion_handler),
 		std::forward<Args>(args)...);
 }
};

我们可以在这里看到,这个默认实现避免了拷贝所有参数,从而确保尽快初始化来实现尽可能高效。 另一方面,惰性的完成令牌(如上面use_awaitable)可能会捕获这些参数来延迟异步操作的启动。例如,一个简单的延迟令牌的实现(把异步操作打包,用于稍后的操作)看起来可能像这样:

template <completion_signature... Signatures>
struct async_result<deferred_t, Signatures...>
{
 template <class Initiation, class... Args>
 static auto initiate(Initiation initiation, deferred_t, Args... args)
 {
 return [
 	initiation = std::move(initiation),
 	arg_pack = std::make_tuple(std::move(args)...)
 ](auto&& token) mutable
 {
 	return std::apply(
 		[&](auto&&... args)
 		{
 			return async_result<decay_t<decltype(token)>, Signatures...>::initiate(
 				std::move(initiation),
 				std::forward<decltype(token)>(token),
 				std::forward<decltype(args)>(args)...
 			);
 		},
 		std::move(arg_pack)
 		);
 	};
 }
};

3.9 Asio库元素有哪些
#

RTFM https://www.boost.org/doc/libs/1_87_0/doc/html/boost_asio/reference.html

  • completion_signature概念-定义有效的完成签名形式。

  • completion_handler_for概念-确定completion handler是否满足给定的完成签名。

  • async_result特征-将完成签名和完成令牌转换为具体的完成处理函数,并启动异步操作。

    这里多扯一句,翻译自https://www.boost.org/doc/libs/1_87_0/doc/html/boost_asio/overview/core/streams.html#boost_asio.overview.core.streams.why_eof_is_an_error

    Boost Asio提供面的向流的 I/O 模型的对象满足以下一种或多种类型要求:

    • SyncReadStream,在这里同步读取操作是通过一个名为read_some()的成员函数来执行的。
    • AsyncReadStream,其中异步读取操作是通过一个名为async_read_some()的成员函数执行的。
    • SyncWriteStream,在这里同步写入操作通过一个名为write_some()的成员函数来执行。
    • AsyncWriteStream,其中异步写操作是通过一个名为async_write_some()的成员函数来执行的。在这个函数的注释部分有提到The write operation may not transmit all of the data to the peer. Consider using the async_write function if you need to ensure that all data is written before the asynchronous operation completes.

    不过,比方说TCP这种一般是有流的概念的(换言之不确定边界),可能会发生还没读完就触发对应用层的通知。

    但程序通常希望传输确切数量的字节。当出现读取不足或写入不足的情况时,程序必须重新启动操作,并持续进行,直到传输所需数量的字节为止。Boost.Asio 提供了自动执行此操作的通用函数:read()、async_read()、write()和async_write()。所以这里注意,async_write只有两种情况会停止发送,一个是全部要写入的数据发送完了,要不就是写入数据出错了

    许多常用的互联网协议是基于行的,这意味着它们具有由字符序列"\r\n"分隔的协议元素。asio可以使用async_read_until来对应这种情况,只需要第三个参数指定终止字符,终止符可以指定为单个char、一个std::string或一个boost::regex。

  • async_initiate函数——帮助函数,以简化如何使用async_result特征。

  • associator trait ——在Agent这个抽象层里通过关联器传播,需要满足特定的trait。

  • associated_executor特征-定义异步Agent关联的执行器。

  • associated_executor_t 模板类型,是上面这个的别名

  • get_associated_executor函数

  • associated_allocator特征-定义异步Agent关联的分配器。

  • associated_allocator_t 模板类型,是上面这个的别名

  • get_associated_allocator 函数

  • associated_cancellation_slot 特征-定义异步Agent关联的取消槽。

  • associated_cancellation_slot_t 模板类型,是上面这个的别名

  • get_associated_cancellation_slot 函数

3.10 高层次抽象
#

本文提出的异步模型为定义更高级别的抽象提供了基础,但这些概念的定义实际超出了本文的范围。本文的范围仅限于指定用于高层次组合异步操作是什么。

然而,Asio库在此核心模型的基础上提供一些额外的组件,例如:

  • 基于此模型,暴露异步操作的IO对象,比方说套接字和计时器。

    我这个时候有个问题,就是这些IO对象对应ASIO的异步模型里面的什么?实际上啥也不对应。IO对象是构建异步Agent的“原子组件”,通过组合多个IO对象的异步操作,形成完整的业务逻辑。也就是说IO对象提供了我们组成ASIO异步模型的抓手。所以不需要疑惑这些玩意是啥

  • 具体执行器,比方说io_context执行器、thread_pool执行器,还有比方说用来保证completion handler不会并发执行的strand adapter。

    我个人觉得io_context是一个非常奇特的存在,它实际上是上下文,不是单纯的executor:比方说strand就可以通过io_context获得。参考https://www.boost.org/doc/libs/1_87_0/doc/html/boost_asio/tutorial/tuttimer5.html

    printer(boost::asio::io_context& io)
     : strand_(boost::asio::make_strand(io)),
       timer1_(io, boost::asio::chrono::seconds(1)),
       timer2_(io, boost::asio::chrono::seconds(1)),
       count_(0)
    {
    

    这里的概念建议参考Asio的官方文档,https://www.boost.org/doc/libs/1_87_0/doc/html/boost_asio/overview/basics.html

    one I/O execution context, such as an boost::asio::io_context object, boost::asio::thread_pool object, or boost::asio::system_context. This I/O execution context represents your program’s link to the operating system’s I/O services.

    关于Strand

    Strand被定义为事件处理程序的严格顺序调用(即没有并发调用)。链的使用允许在多线程程序中执行代码而无需显式锁定(例如使用互斥锁)。strand可以显式或者非显式的定义,比方说

    • 仅从一个线程调用 io_context::run()意味着所有事件处理程序在隐式执行链中执行,因为 io_context 保证处理程序仅从 run()内部被调用,毕竟只有一个线程,所以同时只有一个在运行
    • 如果与连接相关联的一系列异步操作只有单链条,(比方说在像 HTTP 这样的半双工协议实现中),不可能并发执行处理程序。这是一个隐式执行链。
    • 显式执行链是strand<>或io_context::strand的实例。所有事件处理函数对象都需要使用boost::asio::bind_executor()绑定到执行链,或者通过执行链对象进行发布/调度。

    在组合异步操作的情况下,例如async_read()或async_read_until(),如果一个completion handler通过一个 strand,那么所有其他处于中间态的completion handler也应该通过相同的 strand。这是为了确保对调用者和组合操作(比方说async_read(),调用者可以close()以取消操作,同时async_read又不会出现和当前操作出现冲突)之间共享的任何对象(比方说Socket)进行线程安全访问。

    使用get_associated_executor来获取对应的executor

    boost::asio::associated_executor_t<Handler> a = boost::asio::get_associated_executor(h);
    

    如果定义这种类型的executor需要定义嵌套类型executor_type和成员函数get_executor()针对特定的处理程序类型进行定制:

  • 促进不同组合机制的完成令牌,如协程、纤程、future和deferred操作。

    多扯一句,想调试的话,可以定义BOOST_ASIO_ENABLE_HANDLER_TRACKING。具体参考https://www.boost.org/doc/libs/1_87_0/doc/html/boost_asio/overview/core/handler_tracking.html,就不多赘述了

  • 对C++协程的高层次支持,将执行器和取消槽结合到一起,以便于协调并发异步Agent的coordination。

4 例子
#

我建议这部分好好读一读,理解一下,尤其是里面的逻辑或和逻辑与运算符,非常的牛逼,非常的好

关于Asio对于C++ 20协程的支持,有这个网页https://think-async.com/Asio/asio-1.22.0/doc/asio/overview/core/cpp20_coroutines.html

Asio对于C++ 20的协程支持通过 awaitable 类模板, use_awaitable completion token, co_spawn()

co_spawn接受三个参数:

  • executor that determines the context in which the coroutine is permitted to execute
  • awaitable<R>, that is the result of the coroutine’s entry point function

如果觉得使用抛异常的方式不好,可以使用 experimental::as_tuple or redirect_error completion token adapters。就可以这样子调用代码

std::tuple<asio::error_code, std::size_t> result =
      co_await socket.async_read_some(...)

关于Asio的协程,有个好玩的地方。

逻辑运算符 || 和 && 已针对 awaitable<> 进行了重载,以便协程可以轻松并行。

当使用 && 进行等待时,co_await 表达式会等待两个操作都成功完成。作为一种“短路”求值,如果一个操作因异常而失败,另一个操作将立即被取消。对于这种组合起来的多个co_await表达式,需要使用tuple来获取每个的结果

std::tuple<std::size_t, std::size_t> results =
  co_await (
    async_read(socket, input_buffer, use_awaitable)
      && async_write(socket, output_buffer, use_awaitable)
  );

当使用 || 进行等待时,co_await 表达式会等待,直到其中一个操作成功。作为一种“短路”求值,如果一个操作成功且未抛出异常,另一个操作将立即被取消。这种的结果需要使用variant获取结果

std::variant<std::size_t, std::monostate> results =
  co_await (
    async_read(socket, input_buffer, use_awaitable)
      || timer.async_wait(use_awaitable)
  );

使用的时候需要加入这么几句话

#include <asio/experimental/awaitable_operators.hpp>
using namespace asio::experimental::awaitable_operators;

第一个例子是每个连接独占一个线程的echo server,和asio基于协程的代码实现

#include <asio.hpp>
using asio::buffer;
using asio::ip::tcp;
void echo(tcp::socket s)
{
	try
 	{
 		char data[1024];
 		for (;;)
 		{
 			std::size_t n =
 				s.read_some(
 					buffer(data)
 				);
 				write(
 					s,
 					buffer(data, n)
 				);
 		}
 	}
 	catch (const std::exception& e)
 	{
 	}
}
void listen(tcp::acceptor a)
{
 	for (;;)
 	{
		std::thread(
 			echo,
 			a.accept()
 		).detach();
 	}
}

int main()
{
	asio::io_context ctx;
 	listen({ctx, {tcp::v4(), 55555}});
}
#include <asio.hpp>
using asio::awaitable;
using asio::buffer;
using asio::detached;
using asio::ip::tcp;
using asio::use_awaitable;
awaitable<void> echo(tcp::socket s)
{
 	try
 	{
 		char data[1024];
 		for (;;)
 		{
 		std::size_t n =
 			co_await s.async_read_some(
 				buffer(data),
 				use_awaitable
 			);
 			co_await async_write(
 				s,
 				buffer(data, n),
 				use_awaitable
 			);
 		}
 	}
 	catch (const std::exception& e)
 	{
 	}
}

awaitable<void> listen(tcp::acceptor a)
{
 	for (;;)
 	{
 		co_spawn(
 			a.get_executor(),
 			echo(co_await a.async_accept(use_awaitable)),
 			detached
 		);
 	}
}

int main()
{
     asio::io_context ctx;
     co_spawn(
        ctx,
        listen({ctx, {tcp::v4(), 55555}}),
        detached
     );
     ctx.run();
}

下一个示例是来自简单TCP套接字代理的片段。它演示了协程如何组合取消一方面优雅的协调各种异步Agent,另一部分支持高效的取消超时

constexpr auto use_nothrow_awaitable = as_tuple(use_awaitable);
awaitable<void> transfer(tcp::socket& from, tcp::socket& to, steady_clock::time_point& deadline)
{
     std::array<char, 1024> data;
     for (;;)
     {
         deadline = std::max(deadline, steady_clock::now() + 5s);
         auto [e1, n1] = co_await from.async_read_some(buffer(data), use_nothrow_awaitable);
         if (e1)
         co_return;
         auto [e2, n2] = co_await async_write(to, buffer(data, n1), use_nothrow_awaitable);
         if (e2)
         co_return;
     }
}
awaitable<void> watchdog(steady_clock::time_point& deadline)
{
     steady_timer timer(co_await this_coro::executor);
     auto now = steady_clock::now();
     while (deadline > now)
     {
         timer.expires_at(deadline);
         co_await timer.async_wait(use_nothrow_awaitable);
         now = steady_clock::now();
     }
}
awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
     tcp::socket server(client.get_executor());
     steady_clock::time_point deadline{};
     auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
     if (!e)
     {
         co_await (
             transfer(client, server, deadline) ||
             transfer(server, client, deadline) ||
             watchdog(deadline)
         );
     }
}
awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
     for (;;)
     {
         auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);
         if (e)
         	break;
         auto ex = client.get_executor();
         co_spawn(ex, proxy(std::move(client), target), detached);
     }
}

最后,有一段代码片段展示了按名称连接的实现。这个基于协程的算法尝试并行连接多个主机。一旦其中一个连接成功,其余操作将自动取消。

tcp::socket selected(std::variant<tcp::socket, tcp::socket> v)
{
     switch (v.index())
     {
         case 0:
         	return std::move(std::get<0>(v));
         case 1:
         	return std::move(std::get<1>(v));
         default:
         	throw std::logic_error(__func__);
     }
}
awaitable<tcp::socket> connect(ip::tcp::endpoint ep)
{
     auto sock = tcp::socket(co_await this_coro::executor);
     co_await sock.async_connect(ep, use_awaitable);
     co_return std::move(sock);
}
awaitable<tcp::socket> connect_range(
     tcp::resolver::results_type::const_iterator first,
     tcp::resolver::results_type::const_iterator last)
{
     assert(first != last);
     auto next = std::next(first);
     if (next == last)
     	co_return co_await connect(first->endpoint());
     else
     	co_return selected(co_await(connect(first->endpoint()) || connect_range(next, last)));
}
awaitable<tcp::socket> connect_by_name(std::string host, std::string service)
{
     auto resolver = tcp::resolver(co_await this_coro::executor);
     auto results = co_await resolver.async_resolve(host, service, use_awaitable);
     co_return co_await connect_range(results.begin(), results.end());
}

5
#

Folly 的异步模式
#

CPP Net Standard Sender & Receiver
#

草案是这个https://wg21.link/P2300R10。我建议有能力还是直接阅读草案吧。下面的很多内容我都只会节选一部分,所以序号看起来是跳跃的。如果看起来有一些困难,我觉得直接阅读https://www.bilibili.com/video/BV1WrRsYWEjX,更为合适

1.1 For What?
#

如今,C++软件越来越多地采用异步和并行方式,这种趋势很可能会持续下去。异步和并行性无处不在,从处理器硬件接口到网络、文件 I/O、图形用户界面再到加速器。每个 C++领域和每个平台都需要处理异步和并行性,从科学计算到视频游戏再到金融服务,从最小的移动设备到你的笔记本电脑再到世界上最快的超级计算机中的图形处理器。

虽然 C++标准库有丰富的并发原语(std::atomic、std::mutex、std::counting_semaphore 等)和较低级别的构建块(std::thread 等),但我们缺乏 C++程序员迫切需要的异步和并行的标准词汇和框架。std::async/std::future/std::promise,作为 C++11 中用于异步的预期特性,效率低下、难以正确使用,并且通用性严重不足,使其在许多情况下无法使用。我们在 C++17 中向 C++标准库引入了并行算法,虽然它们是一个很好的开始,但它们本质上都是同步的,并且不可组合。

本文提出了一个基于三个关键抽象(调度器、发送器和接收器)和一组可定制异步算法的标准 C++异步模型。

1.2 前置
#

  • 具有可组合性和通用性,允许用户编写可用于许多不同类型执行资源的代码。
  • 将常见的异步模式封装在可定制和可重用的算法中,这样用户就不必自己去发明。
  • 通过构造使正确性更容易实现(我没看懂这句话啥意思)
  • 支持执行资源和执行Agent的多样性,因为并非所有执行Agent都是平等创建的;有些能力较弱,但并非不重要。
  • 允许执行资源对所有内容进行定制,包括转移到其他执行资源,但不要求执行资源定制所有内容。——关注所有合理的用例、领域和平台。——错误必须传播,但错误处理不能成为负担。——支持取消,这不是错误。——对事物在哪里执行有清晰简洁的答案。——能够异步管理和终止对象的生命周期。

如何理解三种概念?

  • Sender == lazy value to be compute。核心就是sender,一边传递着sender
  • Receiver == continuation or callback
  • Scheduler == handle to compute resource

CPP Standard & Asio 对比
#

CPP Coroutines
#

如果希望看看简单好理解的教学,看这个

协程的概念基本每个语言都有,C++的稍微需要多说几句:

  • C++协程是一种可以暂停执行以便稍后恢复的函数(实际上就这一句话)。协程是无栈的:它们通过返回给调用者来暂停执行,而恢复执行所需的数据则与栈分开存储。这使得顺序代码能够异步执行(例如,在无需显式回调的情况下处理非阻塞I/O),并且还支持对延迟计算的无限序列等的算法和其他用途。
  • 用户用的时候需要自己实现promise类型 & 包裹promise类型的返回值(这个一般还得有个能返回awaitable的co_await operator重载)。

还有另外一点需要注意,coroutine是会在执行完成后流程继续,但是当返回时是否保证执行完成是未必的(我这里主要想说的是网络里面的send事件,不代表全部发送完成),所以如果要求某个执行完成可能需要在外面再包个while循环啥的

判断C++的协程说白了就三种表达式

  • co_await表达式——暂停执行,直到恢复执行。
  • co_yield表达式——暂停执行并返回一个值。
  • co_return语句 —— 完成执行并返回一个值。

这里注意,协程必须是明确的,不能使用可变参数、普通的返回语句或占位符返回类型(auto 或概念)。立即求值函数、常量表达式函数、构造函数、析构函数以及主函数不能是协程。

协程的原理
#

记这三个东西实际上用处不大,重点是理解这东西存在的意义。C++把coroutine frame重命名为coroutine state,而且在这三个玩意后面隐藏了太多的东西,导致阅读起来非常费劲。

C++用户侧协程需要什么:

  • Wrapper_t, wrapper_t存储着要和外界交互的数据
  • promise_type
  • awaitbale type for co_await

Task是执行的任务,没有返回值。

Generator生成内容,并返回结果。

简单来说就是coroutine frame存储着用户用到,用不到的东西。这里的东西包括promise对象

  • promise对象,每个协程都与promise 对象相关联,协程内部对该对象进行操作以提交其结果或异常。这个promise与std::promise毫无关系。一般我们实现的时候,这个东西都自己写,是某个类型的子类型。而最外层的子类型,既不是协程句柄(handle)也不是协程状态(coroutine state)本身,而是一个协程的返回值包装器,它的核心作用是让协程的调用者能够与协程交互(例如等待结果、恢复执行、检查状态等)。
  • 协程句柄,在协程外部使用“这个非拥有句柄”进行操作。用于恢复协程的执行或销毁协程帧。我看的时候不理解这个handle和promise的对象是什么关系?这里多说一句,就是可以从promise能够获取handle。handle也可以跟promise联系起来,从promise拿到东西
  • 协程状态coroutine state实际上也就是coroutine frame,这是内部的(实际上就是说用户是看不到的)、动态分配的存储(除非分配被优化掉),该对象包含
    • promise对象、可以理解为通过frame->promise.get_return_object()获取,就是在frame里面保存
    • 参数(全部按值复制)、 (这里要注意一个事情,promise对象和参数是没有直接关系的)
    • 当前挂起点的某种表示,以便恢复运行时知道从何处继续,销毁时知道哪些局部变量在作用域内
    • 生命周期跨越当前挂起点的局部变量和临时变量。

当协程开始执行时,它执行以下操作:

  • 使用operator new分配协程状态对象。
  • 将所有函数参数复制到协程状态:按值传递的参数被移动或复制,按引用传递的参数保持为引用(因此,如果在被引用对象的生命周期结束后恢复协程,可能会变成悬空引用 —— 示例见下文)。
  • 调用promise对象的构造函数。如果承诺类型有一个接受所有协程参数的构造函数,则使用复制后的协程参数调用该构造函数。否则,调用默认构造函数。
  • 调用promise.get_return_object()并将结果保存在局部变量中。当协程首次挂起时,该调用的结果将返回给调用者。在此步骤及之前抛出的任何异常都会传播回调用者,而不会放入promise对象中。
  • 调用promise.initial_suspend()并等待其结果。典型的Promise类型要么返回std::suspend_always(用于延迟启动的协程),要么返回std::suspend_never(用于立即启动的协程)。
  • 当co_await promise.initial_suspend()恢复时,开始执行协程体。

当协程到达暂停点时

  • 如有必要,在隐式转换为协程的返回类型后,先前获得的返回对象将返回给调用者/恢复者。

当协程到达co_return语句时,它会执行以下操作:

  • 表达式为co_return; & co_return expr; (其中expr的类型为void),调用promise.return_void();
  • 对于co_return expr; (其中expr的类型为非void),调用promise.return_value(expr);
  • 以与创建顺序相反的顺序销毁所有具有自动存储期的变量。
  • 调用promise.final_suspend() 并co_await结果。从协程末尾退出等同于co_return; ,

执行到协程末尾等同于co_return,但如果在Promise的作用域中找不到return_void的声明,则行为未定义。

在其函数体中没有任何定义关键字的函数不是协程,无论其返回类型如何,如果返回类型不是(可能带有cv限定符的)void,则从末尾退出会导致未定义行为。

协程的动态分配
#

协程状态是通过非数组形式的new运算符动态分配的。

如果Promise类型定义了类级别的替换函数,将使用该函数,否则将使用全局的new运算符。

如果Promise类型定义了带额外参数的new运算符的定位形式,且这些参数与一个参数列表匹配,其中第一个参数是请求的大小(类型为std::size_t),其余参数是协程函数的参数,那么这些参数将被传递给new运算符(这使得协程可以使用前导分配器约定)。

如果满足以下条件,对new运算符的调用可以被优化掉(即使使用了自定义分配器):

  1. 协程状态的生命周期严格嵌套在调用者的生命周期内;
  2. 协程帧的大小在调用点是已知的。

在这种情况下,协程状态会嵌入到调用者的栈帧中(如果调用者是个function,很合理,因为对于普通函数栈是会一直存在直到推出),或者嵌入到调用者的协程状态中(如果调用者是协程,也很合理,协程嵌套协程嘛)。

如果分配失败,协程会抛出std::bad_alloc,除非Promise类型定义了成员函数Promise::get_return_object_on_allocation_failure()。如果定义了该成员函数,分配将使用new运算符的不抛出异常形式,并且在分配失败时,协程会立即将从Promise::get_return_object_on_allocation_failure()获得的对象返回给调用者,例如:

看看关键的数据和表达式
#

这里我建议还是看看https://theshoemaker.de/posts/yet-another-cpp-coroutine-tutorial,这个感觉比cppreference说的更好理解,毕竟不会上来就塞你一堆概念,而且重点放到了Task里面

Promise
#

co_await
#

这里需要注意下,corountine不一定是awaitable的,这样子

一元运算符 co_await 会暂停协程并将控制权返回给调用方。co_await对应的地方实际上很多时候都执行的是一些根本不知道啥时候执行完成的东西,所以这里有个问题,怎么知道await对象是不是执行完了呢?当第一次执行到co_await的时候,通过awaiter对象的await_ready判断(这里需要重点理解awaitable对象,也就是awaiter是我们需要重点实现的)。但是后面恢复的时候,就不管了。co_await 的核心设计目标是实现异步操作的挂起与恢复,而不是直接暴露“某个暂停点是否完成”的状态。要判断 co_await 暂停的操作是否完成,需依赖 awaitable 对象内部的状态管理

部分实现是将awaiter的await_suspend函数里面,把要执行的东西做了,然后调用handle.resume()。handle.resume()会调用awaiter的await_resume函数。

格式为 co_await expr

co_await 表达式只能出现在常规函数体(包括 lambda 表达式的函数体)中可能被求值的表达式内,不能出现在

  • 在处理程序中,在声明语句中(除非它出现在该声明语句的初始化器中),在初始化语句(见 if、switch、for 和 [[../range-for|范围 for]])的简单声明中(除非它出现在该初始化语句的初始化器中),在默认实参中,或者在具有静态或线程存储期的块作用域变量的初始化器中。
  1. 执行1:首先,表达式按如下方式转换为可等待对象(awaitable):

    • 如果表达式由初始挂起点、最终挂起点或yield表达式生成,则可等待对象即为该表达式本身。

    • 否则,如果当前协程的Promise类型具有成员函数await_transform,那么可等待对象为promise.await_transform(expr)。

    • 否则,可等待对象即为表达式 expr,保持原样。

  2. 执行2:然后,获取等待器(awaiter )对象(从这里我们就知道了,实际上co_await是返回了一个awaiter对象,并且在awaiter对象上面执行操作),如下所示:

    • 如果对co_await操作符的重载解析给出单个最佳重载,等待器就是该调用的结果:
    • co_await的对象,重载了co_await运算符,换言之有awaitable.operator co_await(),对于非成员重载为operator co_await(static_cast<Awaitable&&>(awaitable))。否则,如果重载解析找不到co_await操作符,等待器就是awaitable本身。
    • 否则,如果重载解析不明确,程序格式错误。如果上述表达式是一个纯右值,等待器对象是由此具体化的临时对象。
    • 否则,如果上述表达式是一个左值,等待器对象就是它所引用的对象。
  3. 执行3:然后,调用awaiter.await_ready()(如果已知结果已准备好或可以同步完成,这是一种避免挂起开销的捷径)。如果其结果在上下文转换为bool类型后为false,则协程被挂起(其协程状态填充有局部变量和当前挂起点)。调用awaiter.await_suspend(handle),一些具体执行的操作可以在这里执行。其中handle是表示当前协程的协程句柄,这句话的理解实际上很乱,说白了就是谁调用对应awaitable对象,就是谁的handle。假设如果co_await的expr是个协程,那么调用co_await expr这句话的协程,它的句柄会被放到返回Awaiter::await_suspend里面。在该函数内部,通过该句柄可以观察到被挂起的协程状态,并且该函数有责任在某个执行器上调度它恢复执行,或者将其销毁(返回false视为调度)。

    • 如果await_suspend返回void,控制权立即返回给当前协程的调用者/恢复者(此协程仍保持挂起状态),

      • 否则,如果await_suspend返回bool
        • true将控制权返回给当前协程的调用者/恢复者,
        • false恢复当前协程。
    • 如果await_suspend返回另一个协程的协程句柄,则通过调用handle.resume()恢复该句柄(注意这可能最终导致当前协程恢复)。

    • 如果await_suspend抛出异常,捕获该异常,恢复协程,并立即重新抛出该异常。

  4. 执行4 :最后,调用awaiter.await_resume()(无论协程是否被挂起),其结果就是整个co_await expr表达式的结果。如果协程在co_await表达式中被挂起,之后又被恢复,恢复点就在调用awaiter.await_resume()之前。请注意,协程在进入awaiter.await_suspend()之前已完全挂起。它的句柄可以与另一个线程共享,并在await_suspend()函数返回之前恢复。(请注意,默认的内存安全规则仍然适用,因此如果协程句柄在没有锁的情况下跨线程共享,等待器至少应使用释放语义,恢复者至少应使用获取语义。)例如,协程句柄可以放在一个回调函数中,安排在异步I/O操作完成时在线程池上运行。在这种情况下,由于当前协程可能已被恢复并因此执行了等待器对象的析构函数,在await_suspend()在当前线程上继续执行的同时,await_suspend()应将*this视为已销毁,并且在将句柄发布到其他线程后不应再访问它。

结尾
#

唉,尴尬

狗头的赞赏码.jpg