一、单线程模型的设计
1. 最基础的单线程处理简单任务
假设有几个任务:
• 任务1: “姓名:” + “杭城小刘”
• 任务2: “年龄:” + “1995” + “02” + “20”
• 任务3: “大小:” + (2021 • 1995 + 1)
• 任务4: 打印任务1、2、3 的结果
在单线程中执行,代码可能如下:
//c
void mainThread () {
string name = "姓名:" + "杭城小刘";
string birthday = "年龄:" + "1995" + "02" + "20"
int age = 2021 • 1995 + 1;
printf("个人信息为:%s, %s, 大小:%d", name.c_str(), birthday.c_str(), age);
}
线程开始执行任务,按照需求,单线程依次执行每个任务,执行完毕后线程马上退出。
2. 线程运行过程中来了新的任务怎么处理?
问题1 介绍的线程模型太简单太理想了,不可能从一开始就 n 个任务就确定了,大多数情况下,会接收到新的 m 个任务。那么 section1 中的设计就无法满足该需求。
要在线程运行的过程中,能够接受并执行新的任务,就需要有一个事件循环机制。最基础的事件循环可以想到用一个循环来实现。
// c++
int getInput() {
int input = 0;
cout<< "请输入一个数";
cin>>input;
return input;
}
void mainThread () {
while(true) {
int input1 = getInput();
int input2 = getInput();
int sum = input1 + input2;
print("两数之和为:%d", sum);
}
}
相较于第一版线程设计,这一版做了以下改进:
• 引入了循环机制,线程不会做完事情马上退出。
• 引入了事件。线程一开始会等待用户输入,等待的时候线程处于暂停状态,当用户输入完毕,线程得到输入的信息,此时线程被激活。执行相加的操作,最终输出结果。不断的等待输入,并计算输出。
3. 处理来自其他线程的任务
真实环境中的线程模块远远没有这么简单。比如浏览器环境下,线程可能正在绘制,可能会接收到1个来自用户鼠标点击的事件,1个来自网络加载 css 资源完成的事件等等。第二版线程模型虽然引入了事件循环机制,可以接受新的事件任务,但是发现没?这些任务之来自线程内部,该设计是无法接受来自其他线程的任务的。
从上图可以看出,渲染主线程会频繁接收到来自于 IO 线程的一些事件任务,当接受到的资源加载完成后的消息,则渲染线程会开始 DOM 解析;当接收到来自鼠标点击的消息,渲染主线程则会执行绑定好的鼠标点击事件脚本(js)来处理事件。
需要一个合理的数据结构,来存放并获取其他线程发送的消息?
消息队列这个词大家都听过,在 GUI 系统中,事件队列是一个通用解决方案。
消息队列(事件队列)是一种合理的数据结构。要执行的任务添加到队列的尾部,需要执行的任务,从队列的头部取出。
有了消息队列之后,线程模型得到了升级。如下:
可以看出改造分为3个步骤:
• 构建一个消息队列
• IO 线程产生的新任务会被添加到消息队列的尾部
• 渲染主线程会循环的从消息队列的头部读取任务,执行任务
伪代码。构造队列接口部分
class TaskQueue {
public:
Task fetchTask (); // 从队列头部取出1个任务
void addTask (Task task); // 将任务插入到队列尾部
}
改造主线程
TaskQueue taskQueue;
void processTask ();
void mainThread () {
while (true) {
Task task = taskQueue.fetchTask();
processTask(task);
}
}
IO 线程
void handleIOTask () {
Task clickTask;
taskQueue.addTask(clickTask);
}
Tips: 事件队列是存在多线程访问的情况,所以需要加锁。
4. 处理来自其他线程的任务
浏览器环境中, 渲染进程经常接收到来自其他进程的任务,IO 线程专门用来接收来自其他进程传递来的消息。IPC 专门处理跨进程间的通信。
5. 消息队列中的任务类型
消息队列中有很多消息类型。内部消息:如鼠标滚动、点击、移动、宏任务、微任务、文件读写、定时器等等。
消息队列中还存在大量的与页面相关的事件。如 JS 执行、DOM 解析、样式计算、布局计算、CSS 动画等等。
上述事件都是在渲染主线程中执行的,因此编码时需注意,尽量减小这些事件所占用的时长。
6. 如何安全退出
Chrome 设计上,确定要退出当前页面时,页面主线程会设置一个退出标志的变量,每次执行完1个任务时,判断该标志。如果设置了,则中断任务,退出线程
7. 单线程的缺点
事件队列的特点是先进先出,后进后出。那后进的任务也许会被前面的任务因为执行时间过长而阻塞,等待前面的任务执行完毕才可以执行后面的任务。这样存在2个问题。
• 如何处理高优先级的任务
假如要监控 DOM 节点的变化情况(插入、删除、修改 innerHTML),然后触发对应的逻辑。最基础的做法就是设计一套监听接口,当 DOM 变化时,渲染引擎同步调用这些接口。不过这样子存在很大的问题,就是 DOM 变化会很频繁。如果每次 DOM 变化都触发对应的 JS 接口,则该任务执行会很长,导致执行效率的降低
如果将这些 DOM 变化做为异步消息,假如消息队列中。可能会存在因为前面的任务在执行导致当前的 DOM 消息不会被执行的问题,也就是影响了监控的实时性。
如何权衡效率和实时性?微任务 就是解决该类问题的。
通常,我们把消息队列中的任务成为宏任务,每个宏任务中都包含一个微任务队列,在执行宏任务的过程中,假如 DOM 有变化,则该变化会被添加到该宏任务的微任务队列中去,这样子效率问题得以解决。
当宏任务中的主要功能执行完毕欧,渲染引擎会执行微任务队列中的微任务。因此实时性问题得以解决
• 如何解决单个任务执行时间过长的问题
可以看出,假如 JS 计算超时导致动画 paint 超时,会造成卡顿。浏览器为避免该问题,采用 callback 回调的设计来规避,也就是让 JS 任务延后执行。
二、 flutter 里的单线程模型
1. event loop 机制
Dart 是单线程的,也就是代码会有序执行。此外 Dart 作为 Flutter 这一 GUI 框架的开发语言,必然支持异步。
一个 Flutter 应用包含一个或多个 isolate,默认方法的执行都是在 main isolate 中;一个 isolate 包含1个 Event loop 和1个 Task queue。其中,Task queue 包含1个 Event queue 事件队列和1个 MicroTask queue 微任务队列。如下:
为什么需要异步?因为大多数场景下 应用都并不是一直在做运算。比如一边等待用户的输入,输入后再去参与运算。这就是一个 IO 的场景。所以单线程可以再等待的时候做其他事情,而当真正需要处理运算的时候,再去处理。因此虽是单线程,但是给我们的感受是同事在做很多事情(空闲的时候去做其他事情)
某个任务涉及 IO 或者异步,则主线程会先去做其他需要运算的事情,这个动作是靠 event loop 驱动的。和 JS 一样,dart 中存储事件任务的角色是事件队列 event queue。
Event queue 负责存储需要执行的任务事件,比如 DB 的读取。
Dart 中存在2个队列,一个微任务队列(Microtask Queue)、一个事件队列(Event Queue)。
Event loop 不断的轮询,先判断微任务队列是否为空,从队列头部取出需要执行的任务。如果微任务队列为空,则判断事件队列是否为空,不为空则从头部取出事件(比如键盘、IO、网络事件等),然后在主线程执行其回调函数,如下:
2. 异步任务
微任务,即在一个很短的时间内就会完成的异步任务。微任务在事件循环中优先级最高,只要微任务队列不为空,事件循环就不断执行微任务,后续的事件队列中的任务持续等待。微任务队列可由 scheduleMicroTask
创建。
通常情况,微任务的使用场景比较少。Flutter 内部也在诸如手势识别、文本输入、滚动视图、保存页面效果等需要高优执行任务的场景用到了微任务。
所以,一般需求下,异步任务我们使用优先级较低的 Event Queue。比如 IO、绘制、定时器等,都是通过事件队列驱动主线程来执行的。
Dart 为 Event Queue 的任务提供了一层封装,叫做 Future。把一个函数体放入 Future 中,就完成了同步任务到异步任务的包装(类似于 iOS 中通过 GCD 将一个任务以同步、异步提交给某个队列)。Future 具备链式调用的能力,可以在异步执行完毕后执行其他任务(函数)。
看一段具体代码:
void main() {
print('normal task 1');
Future(() => print('Task1 Future 1'));
print('normal task 2');
Future(() => print('Task1 Future 2'))
.then((value) => print("subTask 1"))
.then((value) => print("subTask 2"));
}
//
lbp@MBP ~/Desktop dart index.dart
normal task 1
normal task 2
Task1 Future 1
Task1 Future 2
subTask 1
subTask 2
main 方法内,先添加了1个普通同步任务,然后以 Future 的形式添加了1个异步任务,Dart 会将异步任务加入到事件队列中,然后理解返回。后续代码继续以同步任务的方式执行。然后再添加了1个普通同步任务。然后再以 Future 的方式添加了1个异步任务,异步任务被加入到事件队列中。此时,事件队列中存在2个异步任务,Dart 在事件队列头部取出1个任务以同步的方式执行,全部执行(先进先出)完毕后再执行后续的 then。
Future 与 then 公用1个事件循环。如果存在多个 then,则按照顺序执行。
例2:
void main() {
Future(() => print('Task1 Future 1'));
Future(() => print('Task1 Future 2'));
Future(() => print('Task1 Future 3'))
.then((_) => print('subTask 1 in Future 3'));
Future(() => null).then((_) => print('subTask 1 in empty Future'));
}
lbp@MBP ~/Desktop dart index.dart
Task1 Future 1
Task1 Future 2
Task1 Future 3
subTask 1 in Future 3
subTask 1 in empty Future
main 方法内,Task 1 添加到 Future 1中,被 Dart 添加到 Event Queue 中。Task 1 添加到 Future 2中,被 Dart 添加到 Event Queue 中。Task 1 添加到 Future 3中,被 Dart 添加到 Event Queue 中,subTask 1 和 Task 1 共用 Event Queue。Future 4中任务为空,所以 then 里的代码会被加入到 Microtask Queue,以便下一轮事件循环中被执行。
综合例子
void main() {
Future(() => print('Task1 Future 1'));
Future fx = Future(() => null);
Future(() => print("Task1 Future 3")).then((value) {
print("subTask 1 Future 3");
scheduleMicrotask(() => print("Microtask 1"));
}).then((value) => print("subTask 3 Future 3"));
Future(() => print("Task1 Future 4"))
.then((value) => Future(() => print("sub subTask 1 Future 4")))
.then((value) => print("sub subTask 2 Future 4"));
Future(() => print("Task1 Future 5"));
fx.then((value) => print("Task1 Future 2"));
scheduleMicrotask(() => print("Microtask 2"));
print("normal Task");
}
lbp@MBP ~/Desktop dart index.dart
normal Task
Microtask 2
Task1 Future 1
Task1 Future 2
Task1 Future 3
subTask 1 Future 3
subTask 3 Future 3
Microtask 1
Task1 Future 4
Task1 Future 5
sub subTask 1 Future 4
sub subTask 2 Future 4
解释:
• Event Loop 优先执行 main 方法同步任务,再执行微任务,最后执行 Event Queue 的异步任务。所以 normal Task 先执行
• 同理微任务 Microtask 2 执行
• 其次,Event Queue FIFO,Task1 Future 1 被执行
• fx Future 内部为空,所以 then 里的内容被加到微任务队列中去,微任务优先级最高,所以 Task1 Future 2 被执行
• 其次,Task1 Future 3 被执行。由于存在2个 then,先执行第一个 then 中的 subTask 1 Future 3,然后遇到微任务,所以 Microtask 1 被添加到微任务队列中去,等待下一次 Event Loop 到来时触发。接着执行第二个 then 中的 subTask 3 Future 3。随着下一次 Event Loop 到来,Microtask 1 被执行
• 其次,Task1 Future 4 被执行。随后的第一个 then 中的任务又是被 Future 包装成一个异步任务,被添加到 Event Queue 中,第二个 then 中的内容也被添加到 Event Queue 中。
• 接着,执行 Task1 Future 5。本次事件循环结束
• 等下一轮事件循环到来,打印队列中的 sub subTask 1 Future 4、sub subTask 1 Future 5.
3. 异步函数
异步函数的结果在将来某个时刻才返回,所以需要返回一个 Future 对象,供调用者使用。调用者根据需求,判断是在 Future 对象上注册一个 then 等 Future 执行体结束后再进行异步处理,还是同步等到 Future 执行结束。Future 对象如果需要同步等待,则需要在调用处添加 await,且 Future 所在的函数需要使用 async 关键字。
await 并不是同步等待,而是异步等待。Event Loop 会将调用体所在的函数也当作异步函数,将等待语句的上下文整体添加到 Event Queue 中,一旦返回,Event Loop 会在 Event Queue 中取出上下文代码,等待的代码继续执行。
await 阻塞的是当前上下文的后续代码执行,并不能阻塞其调用栈上层的后续代码执行
void main() {
Future(() => print('Task1 Future 1'))
.then((_) async => await Future(() => print("subTask 1 Future 2")))
.then((_) => print("subTask 2 Future 2"));
Future(() => print('Task1 Future 2'));
}
lbp@MBP ~/Desktop dart index.dart
Task1 Future 1
Task1 Future 2
subTask 1 Future 2
subTask 2 Future 2
解析:
• Future 中的 Task1 Future 1 被添加到 Event Queue 中。其次遇到第一个 then,then 里面是 Future 包装的异步任务,所以 Future(() => print("subTask 1 Future 2"))
被添加到 Event Queue 中,所在的 await 函数也被添加到了 Event Queue 中。第二个 then 也被添加到 Event Queue 中
• 第二个 Future 中的 ‘Task1 Future 2 不会被 await 阻塞,因为 await 是异步等待(添加到 Event Queue)。所以执行 ‘Task1 Future 2。随后执行 “subTask 1 Future 2,接着取出 await 执行 subTask 2 Future 2
4. Isolate
Dart 为了利用多核 CPU,将 CPU 层面的密集型计算进行了隔离设计,提供了多线程机制,即 Isolate。每个 Isolate 资源隔离,都有自己的 Event Loop 和 Event Queue、Microtask Queue。Isolate 之间的资源共享通过消息机制通信(和进程一样)
使用很简单,创建时需要传递一个参数。
void coding(language) {
print("hello " + language);
}
void main() {
Isolate.spawn(coding, "Dart");
}
lbp@MBP ~/Desktop dart index.dart
hello Dart
大多数情况下,不仅仅需要并发执行。可能还需要某个 Isolate 运算结束后将结果告诉主 Isolate。可以通过 Isolate 的管道(SendPort)实现消息通信。可以在主 Isolate 中将管道作为参数传递给子 Isolate,当子 Isolate 运算结束后将结果利用这个管道传递给主 Isolate
void coding(SendPort port) {
const sum = 1 + 2;
// 给调用方发送结果
port.send(sum);
}
void main() {
testIsolate();
}
testIsolate() async {
ReceivePort receivePort = ReceivePort(); // 创建管道
Isolate isolate = await Isolate.spawn(coding, receivePort.sendPort); // 创建 Isolate,并传递发送管道作为参数
// 监听消息
receivePort.listen((message) {
print("data: $message");
receivePort.close();
isolate?.kill(priority: Isolate.immediate);
isolate = null;
});
}
lbp@MBP ~/Desktop dart index.dart
data: 3
此外 Flutter 中提供了执行并发计算任务的快捷方式-compute 函数。其内部对 Isolate 的创建和双向通信进行了封装。
实际上,业务开发中使用 compute 的场景很少,比如 JSON 的编解码可以用 compute。
计算阶乘:
int testCompute() async {
return await compute(syncCalcuateFactorial, 100);
}
int syncCalcuateFactorial(upperBounds) => upperBounds < 2
? upperBounds
: upperBounds * syncCalcuateFactorial(upperBounds • 1);
总结
• Dart 是单线程的,但通过事件循环可以实现异步
• Future 是异步任务的封装,借助于 await 与 async,我们可以通过事件循环实现非阻塞的同步等待
• Isolate 是 Dart 中的多线程,可以实现并发,有自己的事件循环与 Queue,独占资源。Isolate 之间可以通过消息机制进行单向通信,这些传递的消息通过对方的事件循环驱动对方进行异步处理。
• flutter 提供了 CPU 密集运算的 compute 方法,内部封装了 Isolate 和 Isolate 之间的通信
• 事件队列、事件循环的概念在 GUI 系统中非常重要,几乎在前端、Flutter、iOS、Android 甚至是 NodeJS 中都存在。