Skip to content

概觀

平行節點執行允許您同步執行多個 AI 代理節點,從而提升效能並啟用複雜的工作流程。此功能在以下情境中特別有用:

  • 同時透過不同的模型或方法處理相同的輸入
  • 平行執行多個獨立操作
  • 實作競爭性評估模式,其中會產生多個解決方案然後進行比較

關鍵元件

Koog 中的平行節點執行由以下所述的方法和資料結構組成。

方法

  • parallel(): 平行執行多個節點並收集其結果。

資料結構

  • ParallelResult: 代表平行節點執行完成的結果。
  • NodeExecutionResult: 包含節點執行的輸出和上下文。

基本用法

平行執行節點

若要啟動節點的平行執行,請使用以下格式的 parallel 方法:

kotlin
val nodeName by parallel<Input, Output>(
   firstNode, secondNode, thirdNode /* Add more nodes if needed */
) {
   // Merge strategy goes here, for example: 
   selectByMax { it.length }
}

這是一個實際範例,它平行執行三個節點並選取長度最大的結果:

kotlin
val calc by parallel<String, Int>(
   nodeCalcTokens, nodeCalcSymbols, nodeCalcWords,
) {
   selectByMax { it }
}

上述程式碼平行執行 nodeCalcTokensnodeCalcSymbolsnodeCalcWords 節點,並返回具有最大值的結果。

合併策略

平行執行節點後,您需要指定如何合併結果。Koog 提供以下合併策略:

  • selectBy(): 根據謂詞函式選取結果。
  • selectByMax(): 根據比較函式選取具有最大值的結果。
  • selectByIndex(): 根據選取函式返回的索引選取結果。
  • fold(): 使用操作函式將結果折疊成單一值。

selectBy

根據謂詞函式選取結果:

kotlin
val nodeSelectJoke by parallel<String, String>(
   nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
) {
   selectBy { it.contains("programmer") }
}

這會選取第一個包含「programmer」這個字的笑話。

selectByMax

根據比較函式選取具有最大值的結果:

kotlin
val nodeLongestJoke by parallel<String, String>(
   nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
) {
   selectByMax { it.length }
}

這會選取長度最大的笑話。

selectByIndex

根據選取函式返回的索引選取結果:

kotlin
val nodeBestJoke by parallel<String, String>(
   nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
) {
   selectByIndex { jokes ->
      // 使用另一個大型語言模型 (LLM) 來判斷最佳笑話
      llm.writeSession {
         model = OpenAIModels.Chat.GPT4o
         updatePrompt {
            system("You are a comedy critic. Select the best joke.")
            user("Here are three jokes: ${jokes.joinToString("
\n")}")
         }
         val response = requestLLMStructured<JokeRating>()
         response.getOrNull()!!.structure.bestJokeIndex
      }
   }
}

這會使用另一個大型語言模型 (LLM) 呼叫來判斷最佳笑話的索引。

fold

使用操作函式將結果折疊成單一值:

kotlin
val nodeAllJokes by parallel<String, String>(
   nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
) {
   fold("Jokes:
") { result, joke -> "$result
$joke" }
}

這會將所有笑話組合成一個單一字串。

範例:最佳笑話代理

這是一個完整範例,它使用平行執行從不同的 LLM 模型產生笑話並選取最佳笑話:

kotlin
val strategy = strategy("best-joke") {
   // 為不同的 LLM 模型定義節點
   val nodeOpenAI by node<String, String> { topic ->
      llm.writeSession {
         model = OpenAIModels.Chat.GPT4o
         updatePrompt {
            system("You are a comedian. Generate a funny joke about the given topic.")
            user("Tell me a joke about $topic.")
         }
         val response = requestLLMWithoutTools()
         response.content
      }
   }

   val nodeAnthropicSonnet by node<String, String> { topic ->
      llm.writeSession {
         model = AnthropicModels.Sonnet_3_5
         updatePrompt {
            system("You are a comedian. Generate a funny joke about the given topic.")
            user("Tell me a joke about $topic.")
         }
         val response = requestLLMWithoutTools()
         response.content
      }
   }

   val nodeAnthropicOpus by node<String, String> { topic ->
      llm.writeSession {
         model = AnthropicModels.Opus_3
         updatePrompt {
            system("You are a comedian. Generate a funny joke about the given topic.")
            user("Tell me a joke about $topic.")
         }
         val response = requestLLMWithoutTools()
         response.content
      }
   }

   // 平行執行笑話生成並選取最佳笑話
   val nodeGenerateBestJoke by parallel(
      nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
   ) {
      selectByIndex { jokes ->
         // 另一個 LLM (例如 GPT4o) 會找到最有趣的笑話:
         llm.writeSession {
            model = OpenAIModels.Chat.GPT4o
            updatePrompt {
               prompt("best-joke-selector") {
                  system("You are a comedy critic. Give a critique for the given joke.")
                  user(
                     """
                            Here are three jokes about the same topic:

                            ${jokes.mapIndexed { index, joke -> "Joke $index:
$joke" }.joinToString("
\n")}

                            Select the best joke and explain why it's the best.
                            """.trimIndent()
                  )
               }
            }

            val response = requestLLMStructured<JokeRating>()
            val bestJoke = response.getOrNull()!!.structure
            bestJoke.bestJokeIndex
         }
      }
   }

   // 連接節點
   nodeStart then nodeGenerateBestJoke then nodeFinish
}

最佳實踐

  1. 考慮資源限制:在平行執行節點時,請注意資源使用情況,尤其是在同時發出多個 LLM API 呼叫時。

  2. 上下文管理:每個平行執行都會創建一個分叉上下文。合併結果時,請選擇要保留哪個上下文或如何組合來自不同執行的上下文。

  3. 針對您的用例進行最佳化

    • 對於競爭性評估(例如笑話範例),請使用 selectByIndex 來選取最佳結果。
    • 若要尋找最大值,請使用 selectByMax
    • 若要根據條件進行篩選,請使用 selectBy
    • 若要進行聚合,請使用 fold 將所有結果組合成複合輸出。

效能考量

平行執行可以顯著提高吞吐量,但會產生一些額外負荷:

  • 每個平行節點都會創建一個新的協程。
  • 上下文分叉和合併會增加一些計算成本。
  • 許多平行執行可能會發生資源爭用。

為獲得最佳效能,請平行處理以下操作:

  • 彼此獨立
  • 執行時間顯著
  • 不共用可變狀態