async-await-parallel

最近学习 Node.JS,很多教程都是在 Node 早期版本发布的,控制并发的手段大多使用 eventproxy,或者是一些其他的第三方库类似于bluebird,async。而 Node.JS 早已实现对原生async/await 的支持,而且我学习的框架是 koa2,也是使用async/await的一个框架。为了避免语法混乱,我希望有一个使用async/await的并发控制模块。

async-await-parallel

很简单的一个模块,本质是一个Promise 的队列函数,我还没有读懂源码,但是它的语法是asycn/await,没有更多的api,非常简单易用的一个模块。
用法:

1
2
3
4
5
6
7
8
9
const parallel = require('async-await-parallel')
await parallel([
async () => { ... },
async () => { ... },
async () => { ... },
async () => { ... },
async () => { ... },
], 2)

就是这么简单,第一个参数是 async 函数数组,第二个参数是并发数

源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
async function parallelMap (thunks, concurrency = 5) {
if (concurrency > 0) {
concurrency = Math.min(concurrency, thunks.length)
} else {
concurrency = thunks.length
}
let index = 0
let results = [ ]
await parallelPool(concurrency, async () => {
if (index < thunks.length) {
const currentIndex = index++
const thunk = thunks[currentIndex]
results[currentIndex] = await thunk.call(this)
}
return (index < thunks.length)
})
return results
}
/**
* Executes a given async `task` multiple times in parallel with a guaranteed
* max concurrency given by `size`.
*
* The task should be an async function which resolves to a boolean for whether
* or not there are more tasks to process.
*
* If any single task fails (eg, returns a rejected Promise), the pool will drain
* any remaining active tasks and reject the resulting Promsie.
*
* @param {Number} size
* @param {async Function(Void) => Boolean} task
*
* @return {Promise<Void>}
*/
async function parallelPool (size, task) {
let done = false
let active = 0
let errors = [ ]
return new Promise((resolve, reject) => {
function poolIterator () {
while (active < size && !done) {
active++
task()
.then((more) => {
if (--active <= 0 && (done || !more)) {
if (errors.length > 0) {
// at least one task failed
return reject(new Error(errors))
} else {
// all tasks completed successfully
return resolve()
}
} else if (more) {
poolIterator()
} else {
done = true
}
})
.catch((err) => {
errors.push(err)
done = true
if (--active <= 0) {
return reject(new Error(errors))
} else {
// wait until all active tasks are drained before rejecting the
// final result (no new tasks will be started now that we're in
// this error state)
}
})
}
}
poolIterator()
})
}