概觀
並行節點執行(Parallel node execution)可讓您同時執行多個 AI agent 節點,提升效能並實現複雜的工作流程。當您需要執行以下操作時,此功能特別有用:
- 同時透過不同的模型或方法處理相同的輸入
- 並行執行多個獨立操作
- 實作競爭性評估模式,即產生多個解決方案後進行比較
關鍵組件
Koog 中的並行節點執行由下列方法與資料結構組成。
方法
parallel():並行執行多個節點並收集其結果。
資料結構
ParallelResult:代表並行節點執行的完成結果。NodeExecutionResult:包含節點執行的輸出與內容(context)。
基本用法
並行執行節點
若要啟動節點的並行執行,請使用以下格式的 parallel 方法:
kotlin
val nodeName by parallel<Input, Output>(
firstNode, secondNode, thirdNode /* 如果需要,可增加更多節點 */
) {
// 合併策略放置於此,例如:
selectByMax { it.length }
}以下是並行執行三個節點並選擇長度最大之結果的實際範例:
kotlin
val calc by parallel<String, Int>(
nodeCalcTokens, nodeCalcSymbols, nodeCalcWords,
) {
selectByMax { it }
}上述程式碼並行執行 nodeCalcTokens、nodeCalcSymbols 和 nodeCalcWords 節點,並回傳值最大的結果。
合併策略
並行執行節點後,您需要指定如何合併結果。Koog 提供下列合併策略:
selectBy():根據述詞函式(predicate function)選擇結果。selectByMax():根據比較函式選擇具有最大值的結果。selectByIndex():根據選擇函式回傳的索引選擇結果。fold():使用運算函式將結果摺疊為單一值。
selectBy
根據述詞函式選擇結果:
kotlin
val nodeSelectJoke by parallel<String, String>(
nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
) {
selectBy { it.contains("programmer") }
}這會選擇第一個包含「programmer」單字的笑話。
selectByMax
根據比較函式選擇具有最大值的結果:
kotlin
val nodeLongestJoke by parallel<String, String>(
nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
) {
selectByMax { it.length }
}這會選擇長度最長的笑話。
selectByIndex
根據選擇函式回傳的索引選擇結果:
kotlin
val nodeBestJoke by parallel<String, String>(
nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
) {
selectByIndex { jokes ->
// 使用另一個 LLM 來判定最佳笑話
llm.writeSession {
model = OpenAIModels.Chat.GPT4o
appendPrompt {
system("You are a comedy critic. Select the best joke.")
user("Here are three jokes: ${jokes.joinToString("
\n")}")
}
val response = requestLLMStructured<JokeRating>()
response.getOrNull()!!.data.bestJokeIndex
}
}
}這會使用另一個 LLM 呼叫來判定最佳笑話的索引。
fold
使用運算函式將結果摺疊為單一值:
kotlin
val nodeAllJokes by parallel<String, String>(
nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
) {
fold("Jokes:
") { result, joke -> "$result
$joke" }
}這會將所有笑話結合成單一字串。
範例:最佳笑話 Agent
這是一個完整範例,使用並行執行從不同的 LLM 模型產生笑話並選出最佳笑話:
kotlin
val strategy = strategy("best-joke") {
// 為不同的 LLM 模型定義節點
val nodeOpenAI by node<String, String> { topic ->
llm.writeSession {
model = OpenAIModels.Chat.GPT4o
appendPrompt {
system("You are a comedian. Generate a funny joke about the given topic.")
user("Tell me a joke about $topic.")
}
val response = requestLLMWithoutTools()
response.parts.filterIsInstance<MessagePart.Text>().joinToString("
") { it.text }
}
}
val nodeAnthropicSonnet by node<String, String> { topic ->
llm.writeSession {
model = AnthropicModels.Sonnet_4_5
appendPrompt {
system("You are a comedian. Generate a funny joke about the given topic.")
user("Tell me a joke about $topic.")
}
val response = requestLLMWithoutTools()
response.parts.filterIsInstance<MessagePart.Text>().joinToString("
") { it.text }
}
}
val nodeAnthropicOpus by node<String, String> { topic ->
llm.writeSession {
model = AnthropicModels.Opus_4_6
appendPrompt {
system("You are a comedian. Generate a funny joke about the given topic.")
user("Tell me a joke about $topic.")
}
val response = requestLLMWithoutTools()
response.parts.filterIsInstance<MessagePart.Text>().joinToString("
") { it.text }
}
}
// 並行執行笑話產生並選擇最佳笑話
val nodeGenerateBestJoke by parallel(
nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
) {
selectByIndex { jokes ->
// 另一個 LLM(例如 GPT4o)將找出最有趣的笑話:
llm.writeSession {
model = OpenAIModels.Chat.GPT4o
appendPrompt {
prompt("best-joke-selector") {
system("You are a comedy critic. Give a critique for the given joke.")
user(
"""
Here are three jokes about the same topic:
${jokes.mapIndexed { index, joke -> "Joke $index:
$joke" }.joinToString("
\n")}
Select the best joke and explain why it's the best.
""".trimIndent()
)
}
}
val response = requestLLMStructured<JokeRating>()
val bestJoke = response.getOrNull()!!.data
bestJoke.bestJokeIndex
}
}
}
// 連接節點
nodeStart then nodeGenerateBestJoke then nodeFinish
}最佳實務
考量資源限制:並行執行節點時,請留意資源使用情況,特別是同時進行多個 LLM API 呼叫時。
內容管理:每次並行執行都會建立一個分叉的內容(forked context)。合併結果時,請選擇要保留哪個內容,或如何結合來自不同執行的內容。
針對您的使用案例進行優化:
- 對於競爭性評估(如笑話範例),使用
selectByIndex選擇最佳結果 - 對於尋找最大值,使用
selectByMax - 對於基於條件的篩選,使用
selectBy - 對於聚合操作,使用
fold將所有結果結合成複合輸出
- 對於競爭性評估(如笑話範例),使用
效能考量
並行執行可顯著提升吞吐量,但也會帶來一些開銷:
- 每個並行節點都會建立一個新的協同程式(coroutine)
- 內容的分叉與合併會增加一些運算成本
- 當並行執行過多時,可能會發生資源爭奪
為了獲得最佳效能,請對以下操作進行並行化:
- 彼此獨立的操作
- 具有顯著執行時間的操作
- 不共享可變狀態的操作
