异步任务队列实现

我承认确实有些标题党,我认为任务队列就是为异步任务而生的,同步任务不需要任务队列,所以任务队列本就应该是异步任务队列。这个任务队列也可以叫做任务调度器,主要是控制任务在NodeJavaScript任务队列的的入队。其需求来源于上一篇文章《一个批量链接替换小工具》,上一篇文章水话比较多,这篇算是对上一篇的补充,加点干粮,建议在读之前先了解下实际需求。

任务队列可以理解为存储任务的队列,其本质还是队列,特点是先进先出,这个不必多说。不同于普通队列,只存储数据,任务队列是存放的要执行的任务,所以任务队列的先进先出应该理解为先进先执行。先来列举一下这个任务队列的特性:

  1. 只有一个 enqueue API,用于任务入队
  2. 可以控制任务执行并发数
  3. 支持同步和异步任务,执行顺序和入队顺序相同

设计思路

队列的表示

设计一个队列应该首先思考,能不能基于JavaScript的数据类型来表示队列,如果没有再自己实现,一个队列应满足两个要求:

  1. 可以存储任意数量的值
  2. 各值的存储或者访问是有序的

因此,基于 JavaScript 数据类型来实现,可以采用这些数据结构:

  • Array
  • Object(key 全为数字字符串,或者非数字字符串、Symbol)
  • Map
  • WeakMap

任务的表示

任务就是一个函数,一个函数最重要的部分是输入和执行输出,可以没有输出,所以表示一个任务应该有函数体和参数两部分,可以如下表示:

JavaScript
1
2
3
4
5
6
7
8
const task = {
func: (...props) => {
props.forEach((prop) => {
console.log(prop);
});
},
payload: [prop1, prop2, ...props],
};

执行任务

这个任务队列并没有提供出队(也就是执行任务)API,原因是没有必要,如果手动调用出队,和直接手动调用任务函数无异,先放入队列再手动调用多此一举,所以这个任务队列设计之初就是自执行任务的。先不考虑自执行任务,一个任务的执行应该是这样的:

JavaScript
1
2
3
4
5
6
new Promise((resolve) => {
task[func](...payload);
resolve();
}).then(() => {
console.log("run complete");
});

之所以使用Promise包裹,主要是让同时含同步和异步任务的队列按照入队顺序执行,同时添加一个 then 作为回调,可以作为任务执行完毕的通知,后边我们会用到。

并发控制

并发控制是我们写任务队列的主要目的,因为 JavaScript 的事件循环只会将你的异步代码胡乱的塞进任务队列,不管有多少,这样虽然更快,但在一些情况下无法满足业务要求,甚至还会引发一些问题。所谓并发控制,就是控制哪些任务『同时执行』,举个银行排队叫号的例子,假如只有 1 个窗口,那么这个银行同一时间只能办理一个业务,如果有 5 个窗口,那么同一时间就能处理 5 个业务,银行同时能处理几个业务取决于窗口的数量,所以并发控制依赖于一个有固定长度的队列。

要实现并发控制,我们至少需要两个队列,接着上面的例子,一个队列表示银行正在排队的客户人数,一个队列来表示正在办理业务的客户数,我们限制正在办理的客户数,而不限制排队的人数。就是说两个队列,一个队列的长度是固定的,其值代表并发执行的数量,另一个队列长度无限制,两个队列的每一个元素都是相同的结构。

让队列动起来

现在有两个队列,入队、执行和出队就不像只有一个队列的时候那么简单了。个人认为这个是任务队列的核心,任务队列的工作流程如下:入队时,将任务放进无长度限制的排队队列,然后判断执行队列长度是否达到最高并发数,如果没有,就将该任务移动到执行队列中,并执行该任务,任务执行完成后,执行回调,将该任务从执行队列中移除。执行队列有任务被移除后检查排队队列是否有任务,有任务就将排队队列任务移动到执行任务队列中执行。如此往复,直至排队队列为空且执行队列也为空。

代码实现

一个简单的队列用了约 57 行 JavaScript 代码实现。

队列的选型

在一开始就提到,在 JavaScript 中可以借助四种内置的数据类型来实现,那到底应该选哪一种呢?分析一下,可能很多同学会选择Array来作为底层的数据结构,但是我们要实现的是任务队列,有同步有异步任务,异步任务的执行时机是确定的,即为任务队列的入队顺序。但是,其出队顺序是不确定的,异步任务耗时可长可短,存在后入队任务先执行完成的情况,如果按照记录的索引来出队,一旦前面的节点更改,后面的节点索引就会变化,这样就会造成错误的出队。

对于Object类型,主要问题在于其 key,当字符串数字作为 key 时会使其迭代顺序混乱,而且 key 不能重复,假如以随机字符串作为 key,这样会多一个随机字符串的生成步骤,耗费性能。假如以Symbol作为 key,我们用Object.keys方法无法获取到其 key,这样就不能取到第一个元素,找不到队头,就不能实现队列。WeakMap作为 key 也是同样的问题,其无法迭代,也不能获取第一个元素,同样是找不到队头的问题。

Map作为底层数据结构的话,不仅不会出现上述问题,而且 JavaScript 引擎也专门对 Map 做了优化,在频繁增删键值对的场景下性能更好,而且使用Symbol作为 key 的性能也比使用字符串作为 key 好。所以队列采用了 Map 的数据类型,Symbol作为 key,不需要生成随机字符串。在我们提到的两个队列中,有固定长度的执行队列其实并不一定要是队列,但如果两个都是相同的数据结构就能用同一套方法去处理他们。

整体结构与初始化

首先,这个任务队列初始化时一定要接收一个参数,即最大并发数。

JavaScript
1
function Queue(max = 1) {}

任务队列的初始化可以采用 new 和无 new 的方式,由于 this 的指向易混乱,就采用无 new 的方式,直接返回一个对象。根据上边的队列选型,构建两个队列:

JavaScript
1
2
3
4
function Queue(max = 1) {
const queue = new Map(); // 未执行任务队列(排队队列)
const active = new Map(); // 正在执行的任务队列(执行队列)
}

整个队列只需要一个 API,即 enqueue 入队方法,而且操作的对象都是 queue 对象, active 只是辅助作用,为了避免在外部直接访问到他们的方法,可以使用 Proxy 做代理,屏蔽掉外部访问其他方法和属性,只允许 enqueue 方法,于是,整体代码应该是这样的:

JavaScript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
function Queue(max = 1) {
const queue = new Map(); // 未执行任务队列
const active = new Map(); // 正在执行的任务队列

return new Proxy(queue, {
get(target, property) {
const value = Reflect.get(...arguments);
if (target[property] instanceof Function) {
if (target[property]["name"] === "enqueue") {
// 只允许 enqueue 方法
return value.bind(target[property]);
}
return new Error(`${property} of queue is not a function`);
}
return undefined;
},
});
}

入队操作

dequeue 方法在内部使用,出队即执行,所以这个方法有执行任务的功能。

JavaScript
1
2
3
4
5
6
7
queue.enqueue = function enqueue(task) {
const key = Symbol();
queue.set(key, task);
// 入队即判断是否能执行
queue.dequeue();
return;
};

执行任务

任务的执行需要先判断执行队列是否满了,满了就不执行。 ok 方法用于通知一个任务执行,接下来应该更新队列。因为我们使用了 Promise 的 thenable 特性,所以需要手动控制触发 resolve 方法,在任务代码里边第一个参数用于触发,假如用 next 表示,在执行任务时,会将resolve作为next参数传入。

JavaScript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
queue.dequeue = function dequeue() {
if (active.size >= max || queue.size === 0) {
// 限制最多同时运行的任务数
return;
}
const key = Array.from(queue.keys())[0];
const run = queue.get(key);

// 将任务添加在执行队列中,并将该任务从排队队列删除
active.set(key, run);
queue.delete(key);

// 执行新加入的任务
const { func, payload = [] } = run;
new Promise((resolve) => {
// 第一个参数用于触发 resolve
func(() => {
resolve();
}, ...payload);
}).then(() => {
queue.ok(key);
});
return;
};

执行完成回调

JavaScript
1
2
3
4
5
6
7
8
// 执行完成
queue.ok = function ok(task) {
// 将执行完成的任务从执行队列移除
active.delete(task);
// 通知可以执行任务
queue.dequeue();
return;
};

使用方法

初始化

由于使用无 new 方式构造队列,所以应该这样初始化:

JavaScript
1
const taskQueue = Queue(2); // 最大并发数 2

执行顺序

JavaScript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 带参数的异步任务
taskQueue.enqueue({
func: (next, prop1, prop2) => {
console.log("Task1开始执行时间:", Date.now());
console.log(prop1);
setTimeout(() => {
console.log(prop2);
console.log("异步执行完成");
next();
}, 200);
},
payload: ["hello", "world"],
});

// 不带参数的同步任务
taskQueue.enqueue({
func: (next) => {
console.log("Task2开始执行时间:", Date.now());
next();
},
});

可以看到不管是同步任务还是异步任务都是按照入队顺序执行的:

运行结果
1
2
3
4
5
Task1开始执行时间: 1614776269577
hello
Task2开始执行时间: 1614776269583
world
异步执行完成

控制并发

刚才的最大并发数为 2,当这个值是默认值 1 时会是什么效果呢:

JavaScript
1
const taskQueue = Queue();
运行结果
1
2
3
4
5
Task1开始执行时间: 1614776456817
hello
world
异步执行完成
Task2开始执行时间: 1614776457029

不同于普通的 JavaScript 事件循环,同步任务在等待异步任务之后才执行,完全按照入队的顺序执行。

总结

很庆幸在写需求的时候能花时间来研究,其实写的时候还有很多思考的细节,就是过去的时间太久,没有总结,很多思考的过程都已经忘了,有逻辑不正确的地方希望能够指正。最开始做的时候想了很久,现在看来,整个设计方式以及实现都是非常简单的,但再看时又会觉得有的地方处理地非常巧妙,我也很难说清楚当时是怎么想到的了,如果再让我写一次不一定还能写到这种程度,觉得颇为神奇。

因为这个思路完全是按照我自己的想法写的,甚至没有参考代码与阅读资料,实现的思路肯定不止一种,如果有其他的实现思路,不妨评论留言。

其实这个缺点还是很明显的,对代码本身侵入性太强,需要手动传入一个next函数,这个是和上一篇的需求是相结合的,模仿了 Koanext机制,实际上我也不知道Koa是如何实现的,只能说是形似神不似。文章设计的全部代码实现已放在此处,如有问题欢迎指正。

作者

KylinLee

发布于

2021-03-26

更新于

2022-02-11

许可协议

CC BY-NC-SA 4.0

评论

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×