雖然 Queuer 提供了具備時序控制的同步佇列功能,AsyncQueuer 則是專門設計來處理並發的非同步操作。它實現了傳統上稱為「任務池 (task pool)」或「工作池 (worker pool)」的模式,允許多個操作同時處理,同時保持對並發數量和時序的控制。此實現主要複製自 Swimmer,這是 Tanner 自 2017 年以來為 JavaScript 社群提供的原始任務池工具。
非同步佇列透過增加並發處理能力擴展了基本佇列概念。相較於一次處理一個項目,非同步佇列可以同時處理多個項目,同時仍保持執行順序和控制。這在處理 I/O 操作、網路請求或任何大部分時間處於等待狀態而非消耗 CPU 的任務時特別有用。
Async Queueing (concurrency: 2, wait: 2 ticks)
Timeline: [1 second per tick]
Calls: ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️
Queue: [ABC] [C] [CDE] [E] []
Active: [A,B] [B,C] [C,D] [D,E] [E]
Completed: - A B C D,E
[=================================================================]
^ 與常規佇列不同,多個項目
可以並發處理
[項目排隊] [一次處理 2 個] [完成]
當忙碌時 在等待間隔後 所有項目
Async Queueing (concurrency: 2, wait: 2 ticks)
Timeline: [1 second per tick]
Calls: ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️
Queue: [ABC] [C] [CDE] [E] []
Active: [A,B] [B,C] [C,D] [D,E] [E]
Completed: - A B C D,E
[=================================================================]
^ 與常規佇列不同,多個項目
可以並發處理
[項目排隊] [一次處理 2 個] [完成]
當忙碌時 在等待間隔後 所有項目
非同步佇列在以下情況特別有效:
常見使用案例包括:
AsyncQueuer 非常多功能,可以在許多情況下使用。實際上,只有當你不打算利用其所有功能時,它才不是一個好的選擇。如果你不需要所有排隊的執行都通過,請改用 [Throttling][../guides/throttling]。如果你不需要並發處理,請改用 [Queueing][../guides/queueing]。
TanStack Pacer 透過簡單的 asyncQueue 函式和更強大的 AsyncQueuer 類別提供非同步佇列功能。
asyncQueue 函式提供了一種簡單的方式來創建一個始終運行的非同步佇列:
import { asyncQueue } from '@tanstack/pacer'
// 創建一個最多同時處理 2 個項目的佇列
const processItems = asyncQueue<string>({
concurrency: 2,
onItemsChange: (queuer) => {
console.log('Active tasks:', queuer.getActiveItems().length)
}
})
// 添加非同步任務進行處理
processItems(async () => {
const result = await fetchData(1)
return result
})
processItems(async () => {
const result = await fetchData(2)
return result
})
import { asyncQueue } from '@tanstack/pacer'
// 創建一個最多同時處理 2 個項目的佇列
const processItems = asyncQueue<string>({
concurrency: 2,
onItemsChange: (queuer) => {
console.log('Active tasks:', queuer.getActiveItems().length)
}
})
// 添加非同步任務進行處理
processItems(async () => {
const result = await fetchData(1)
return result
})
processItems(async () => {
const result = await fetchData(2)
return result
})
asyncQueue 函式的使用有些限制,因為它只是 AsyncQueuer 類別的包裝,僅公開了 addItem 方法。如需更多對佇列的控制,請直接使用 AsyncQueuer 類別。
AsyncQueuer 類別提供了對非同步佇列行為的完整控制:
import { AsyncQueuer } from '@tanstack/pacer'
const queue = new AsyncQueuer<string>({
concurrency: 2, // 一次處理 2 個項目
wait: 1000, // 在開始新項目之間等待 1 秒
started: true // 立即開始處理
})
// 添加錯誤和成功處理器
queue.onError((error) => {
console.error('Task failed:', error)
})
queue.onSuccess((result) => {
console.log('Task completed:', result)
})
// 添加非同步任務
queue.addItem(async () => {
const result = await fetchData(1)
return result
})
queue.addItem(async () => {
const result = await fetchData(2)
return result
})
import { AsyncQueuer } from '@tanstack/pacer'
const queue = new AsyncQueuer<string>({
concurrency: 2, // 一次處理 2 個項目
wait: 1000, // 在開始新項目之間等待 1 秒
started: true // 立即開始處理
})
// 添加錯誤和成功處理器
queue.onError((error) => {
console.error('Task failed:', error)
})
queue.onSuccess((result) => {
console.log('Task completed:', result)
})
// 添加非同步任務
queue.addItem(async () => {
const result = await fetchData(1)
return result
})
queue.addItem(async () => {
const result = await fetchData(2)
return result
})
AsyncQueuer 支援不同的佇列策略來處理各種處理需求。每種策略決定了任務如何被添加和從佇列中處理。
FIFO 佇列按照任務添加的順序處理它們,非常適合保持順序:
const queue = new AsyncQueuer<string>({
addItemsTo: 'back', // 預設
getItemsFrom: 'front', // 預設
concurrency: 2
})
queue.addItem(async () => 'first') // [first]
queue.addItem(async () => 'second') // [first, second]
// 處理順序: first 和 second 並發
const queue = new AsyncQueuer<string>({
addItemsTo: 'back', // 預設
getItemsFrom: 'front', // 預設
concurrency: 2
})
queue.addItem(async () => 'first') // [first]
queue.addItem(async () => 'second') // [first, second]
// 處理順序: first 和 second 並發
LIFO 堆疊優先處理最近添加的任務,適合優先處理新任務:
const stack = new AsyncQueuer<string>({
addItemsTo: 'back',
getItemsFrom: 'back', // 優先處理最新項目
concurrency: 2
})
stack.addItem(async () => 'first') // [first]
stack.addItem(async () => 'second') // [first, second]
// 處理順序: second 先,然後 first
const stack = new AsyncQueuer<string>({
addItemsTo: 'back',
getItemsFrom: 'back', // 優先處理最新項目
concurrency: 2
})
stack.addItem(async () => 'first') // [first]
stack.addItem(async () => 'second') // [first, second]
// 處理順序: second 先,然後 first
優先佇列根據任務的優先級值處理任務,確保重要任務優先處理。有兩種指定優先級的方式:
const priorityQueue = new AsyncQueuer<string>({
concurrency: 2
})
// 創建帶有靜態優先級值的任務
const lowPriorityTask = Object.assign(
async () => 'low priority result',
{ priority: 1 }
)
const highPriorityTask = Object.assign(
async () => 'high priority result',
{ priority: 3 }
)
const mediumPriorityTask = Object.assign(
async () => 'medium priority result',
{ priority: 2 }
)
// 以任意順序添加任務 - 它們將按優先級處理(數字越大優先級越高)
priorityQueue.addItem(lowPriorityTask)
priorityQueue.addItem(highPriorityTask)
priorityQueue.addItem(mediumPriorityTask)
// 處理順序: high 和 medium 並發,然後 low
const priorityQueue = new AsyncQueuer<string>({
concurrency: 2
})
// 創建帶有靜態優先級值的任務
const lowPriorityTask = Object.assign(
async () => 'low priority result',
{ priority: 1 }
)
const highPriorityTask = Object.assign(
async () => 'high priority result',
{ priority: 3 }
)
const mediumPriorityTask = Object.assign(
async () => 'medium priority result',
{ priority: 2 }
)
// 以任意順序添加任務 - 它們將按優先級處理(數字越大優先級越高)
priorityQueue.addItem(lowPriorityTask)
priorityQueue.addItem(highPriorityTask)
priorityQueue.addItem(mediumPriorityTask)
// 處理順序: high 和 medium 並發,然後 low
const dynamicPriorityQueue = new AsyncQueuer<string>({
concurrency: 2,
getPriority: (task) => {
// 根據任務屬性或其它因素計算優先級
// 數字越大優先級越高
return calculateTaskPriority(task)
}
})
// 添加任務 - 優先級將動態計算
dynamicPriorityQueue.addItem(async () => {
const result = await processTask('low')
return result
})
dynamicPriorityQueue.addItem(async () => {
const result = await processTask('high')
return result
})
const dynamicPriorityQueue = new AsyncQueuer<string>({
concurrency: 2,
getPriority: (task) => {
// 根據任務屬性或其它因素計算優先級
// 數字越大優先級越高
return calculateTaskPriority(task)
}
})
// 添加任務 - 優先級將動態計算
dynamicPriorityQueue.addItem(async () => {
const result = await processTask('low')
return result
})
dynamicPriorityQueue.addItem(async () => {
const result = await processTask('high')
return result
})
優先佇列在以下情況特別重要:
AsyncQueuer 提供了全面的錯誤處理能力,確保穩健的任務處理。你可以在佇列級別和單個任務級別處理錯誤:
const queue = new AsyncQueuer<string>()
// 全域錯誤處理
const queue = new AsyncQueuer<string>({
onError: (error) => {
console.error('Task failed:', error)
},
onSuccess: (result) => {
console.log('Task succeeded:', result)
},
onSettled: (result) => {
if (result instanceof Error) {
console.log('Task failed:', result)
} else {
console.log('Task succeeded:', result)
}
}
})
// 每個任務的錯誤處理
queue.addItem(async () => {
throw new Error('Task failed')
}).catch(error => {
console.error('Individual task error:', error)
})
const queue = new AsyncQueuer<string>()
// 全域錯誤處理
const queue = new AsyncQueuer<string>({
onError: (error) => {
console.error('Task failed:', error)
},
onSuccess: (result) => {
console.log('Task succeeded:', result)
},
onSettled: (result) => {
if (result instanceof Error) {
console.log('Task failed:', result)
} else {
console.log('Task succeeded:', result)
}
}
})
// 每個任務的錯誤處理
queue.addItem(async () => {
throw new Error('Task failed')
}).catch(error => {
console.error('Individual task error:', error)
})
AsyncQueuer 提供了多種方法來監控和控制佇列狀態:
// 佇列檢查
queue.getPeek() // 查看下一個項目而不移除它
queue.getSize() // 獲取當前佇列大小
queue.getIsEmpty() // 檢查佇列是否為空
queue.getIsFull() // 檢查佇列是否達到 maxSize
queue.getAllItems() // 獲取所有排隊項目的副本
queue.getActiveItems() // 獲取當前正在處理的項目
queue.getPendingItems() // 獲取等待處理的項目
// 佇列操作
queue.clear() // 移除所有項目
queue.reset() // 重置到初始狀態
queue.getExecutionCount() // 獲取已處理項目的數量
// 處理控制
queue.start() // 開始處理項目
queue.stop() // 暫停處理
queue.getIsRunning() // 檢查佇列是否正在處理
queue.getIsIdle() // 檢查佇列是否為空且未處理
// 佇列檢查
queue.getPeek() // 查看下一個項目而不移除它
queue.getSize() // 獲取當前佇列大小
queue.getIsEmpty() // 檢查佇列是否為空
queue.getIsFull() // 檢查佇列是否達到 maxSize
queue.getAllItems() // 獲取所有排隊項目的副本
queue.getActiveItems() // 獲取當前正在處理的項目
queue.getPendingItems() // 獲取等待處理的項目
// 佇列操作
queue.clear() // 移除所有項目
queue.reset() // 重置到初始狀態
queue.getExecutionCount() // 獲取已處理項目的數量
// 處理控制
queue.start() // 開始處理項目
queue.stop() // 暫停處理
queue.getIsRunning() // 檢查佇列是否正在處理
queue.getIsIdle() // 檢查佇列是否為空且未處理
AsyncQueuer 提供了三種類型的回調來監控任務執行:
const queue = new AsyncQueuer<string>()
// 處理成功的任務完成
const unsubSuccess = queue.onSuccess((result) => {
console.log('Task succeeded:', result)
})
// 處理任務錯誤
const unsubError = queue.onError((error) => {
console.error('Task failed:', error)
})
// 無論成功或失敗都處理任務完成
const unsubSettled = queue.onSettled((result) => {
if (result instanceof Error) {
console.log('Task failed:', result)
} else {
console.log('Task succeeded:', result)
}
})
// 不再需要時取消訂閱回調
unsubSuccess()
unsubError()
unsubSettled()
const queue = new AsyncQueuer<string>()
// 處理成功的任務完成
const unsubSuccess = queue.onSuccess((result) => {
console.log('Task succeeded:', result)
})
// 處理任務錯誤
const unsubError = queue.onError((error) => {
console.error('Task failed:', error)
})
// 無論成功或失敗都處理任務完成
const unsubSettled = queue.onSettled((result) => {
if (result instanceof Error) {
console.log('Task failed:', result)
} else {
console.log('Task succeeded:', result)
}
})
// 不再需要時取消訂閱回調
unsubSuccess()
unsubError()
unsubSettled()
當佇列達到其最大大小(由 maxSize 選項設置)時,新任務將被拒絕。AsyncQueuer 提供了處理和監控這些拒絕的方法:
const queue = new AsyncQueuer<string>({
maxSize: 2, // 只允許 2 個任務在佇列中
onReject: (task, queuer) => {
console.log('Queue is full. Task rejected:', task)
}
})
queue.addItem(async () => 'first') // 接受
queue.addItem(async () => 'second') // 接受
queue.addItem(async () => 'third') // 拒絕,觸發 onReject 回調
console.log(queue.getRejectionCount()) // 1
const queue = new AsyncQueuer<string>({
maxSize: 2, // 只允許 2 個任務在佇列中
onReject: (task, queuer) => {
console.log('Queue is full. Task rejected:', task)
}
})
queue.addItem(async () => 'first') // 接受
queue.addItem(async () => 'second') // 接受
queue.addItem(async () => 'third') // 拒絕,觸發 onReject 回調
console.log(queue.getRejectionCount()) // 1
你可以在創建非同步佇列時預先填充初始任務:
const queue = new AsyncQueuer<string>({
initialItems: [
async () => 'first',
async () => 'second',
async () => 'third'
],
started: true // 立即開始處理
})
// 佇列開始時有三個任務並立即開始處理它們
const queue = new AsyncQueuer<string>({
initialItems: [
async () => 'first',
async () => 'second',
async () => 'third'
],
started: true // 立即開始處理
})
// 佇列開始時有三個任務並立即開始處理它們
AsyncQueuer 的選項可以在創建後使用 setOptions() 修改,並使用 getOptions() 檢索:
const queue = new AsyncQueuer<string>({
concurrency: 2,
started: false
})
// 更改配置
queue.setOptions({
concurrency: 4, // 同時處理更多任務
started: true // 開始處理
})
// 獲取當前配置
const options = queue.getOptions()
console.log(options.concurrency) // 4
const queue = new AsyncQueuer<string>({
concurrency: 2,
started: false
})
// 更改配置
queue.setOptions({
concurrency: 4, // 同時處理更多任務
started: true // 開始處理
})
// 獲取當前配置
const options = queue.getOptions()
console.log(options.concurrency) // 4
AsyncQueuer 提供了監控活動和待處理任務的方法:
const queue = new AsyncQueuer<string>({
concurrency: 2
})
// 添加一些任務
queue.addItem(async () => {
await new Promise(resolve => setTimeout(resolve, 1000))
return 'first'
})
queue.addItem(async () => {
await new Promise(resolve => setTimeout(resolve, 1000))
return 'second'
})
queue.addItem(async () => 'third')
// 監控任務狀態
console.log(queue.getActiveItems().length) // 當前正在處理的任務
console.log(queue.getPendingItems().length) // 等待處理的任務
const queue = new AsyncQueuer<string>({
concurrency: 2
})
// 添加一些任務
queue.addItem(async () => {
await new Promise(resolve => setTimeout(resolve, 1000))
return 'first'
})
queue.addItem(async () => {
await new Promise(resolve => setTimeout(resolve, 1000))
return 'second'
})
queue.addItem(async () => 'third')
// 監控任務狀態
console.log(queue.getActiveItems().length) // 當前正在處理的任務
console.log(queue.getPendingItems().length) // 等待處理的任務
每個框架適配器圍繞非同步佇列類別構建了方便的鉤子和函式。像 useAsyncQueuer 或 useAsyncQueuedState 這樣的鉤子是小包裝器,可以減少你在自己代碼中對一些常見用例所需的樣板代碼。
Your weekly dose of JavaScript news. Delivered every Monday to over 100,000 devs, for free.