Kotlinv2.4.0

概览

并行节点执行允许您同时运行多个 AI 智能体节点,从而提高性能并支持复杂的工作流。此功能在以下情况下特别有用:

  • 同时通过不同的模型或方法处理相同的输入
  • 并行执行多个独立的操作
  • 实现竞争性评估模式,即生成多个解决方案然后进行比较

核心组件

Koog 中的并行节点执行由下述方法和数据结构组成。

方法

  • parallel():并行执行多个节点并收集其结果。

数据结构

  • ParallelResult:表示并行节点执行的已完成结果。
  • NodeExecutionResult:包含节点执行的输出和上下文。

基本用法

并行运行节点

要启动节点的并行执行,请按照以下格式使用 parallel 方法:

kotlin
val nodeName by parallel<Input, Output>(
   firstNode, secondNode, thirdNode /* 如果需要,可以添加更多节点 */
) {
   // 此处为合并策略,例如: 
   selectByMax { it.length }
}

以下是一个并行运行三个节点并选择长度最长结果的实际示例:

kotlin
val calc by parallel<String, Int>(
   nodeCalcTokens, nodeCalcSymbols, nodeCalcWords,
) {
   selectByMax { it }
}

上述代码并行运行 nodeCalcTokensnodeCalcSymbolsnodeCalcWords 节点,并返回具有最大值的结果。

合并策略

并行执行节点后,您需要指定如何合并结果。Koog 提供以下合并策略:

  • selectBy():根据谓词函数选择结果。
  • selectByMax():根据比较函数选择具有最大值的结果。
  • selectByIndex():根据选择函数返回的索引选择结果。
  • fold():使用操作函数将结果折叠为单个值。

selectBy

根据谓词函数选择结果:

kotlin
val nodeSelectJoke by parallel<String, String>(
   nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
) {
   selectBy { it.contains("programmer") }
}

这将选择第一个包含 "programmer" 单词的笑话。

selectByMax

根据比较函数选择具有最大值的结果:

kotlin
val nodeLongestJoke by parallel<String, String>(
   nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
) {
   selectByMax { it.length }
}

这将选择长度最长的笑话。

selectByIndex

根据选择函数返回的索引选择结果:

kotlin
val nodeBestJoke by parallel<String, String>(
   nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
) {
   selectByIndex { jokes ->
      // 使用另一个 LLM 来确定最佳笑话
      llm.writeSession {
         model = OpenAIModels.Chat.GPT4o
         appendPrompt {
            system("You are a comedy critic. Select the best joke.")
            user("Here are three jokes: ${jokes.joinToString("
\n")}")
         }
         val response = requestLLMStructured<JokeRating>()
         response.getOrNull()!!.data.bestJokeIndex
      }
   }
}

这将使用另一个 LLM 调用来确定最佳笑话的索引。

fold

使用操作函数将结果折叠为单个值:

kotlin
val nodeAllJokes by parallel<String, String>(
   nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
) {
   fold("Jokes:
") { result, joke -> "$result
$joke" }
}

这将把所有笑话组合成一个字符串。

示例:最佳笑话智能体

这里是一个完整的示例,它使用并行执行从不同的 LLM 模型生成笑话并选择最佳笑话:

kotlin
val strategy = strategy("best-joke") {
   // 为不同的 LLM 模型定义节点
   val nodeOpenAI by node<String, String> { topic ->
      llm.writeSession {
         model = OpenAIModels.Chat.GPT4o
         appendPrompt {
            system("You are a comedian. Generate a funny joke about the given topic.")
            user("Tell me a joke about $topic.")
         }
         val response = requestLLMWithoutTools()
         response.parts.filterIsInstance<MessagePart.Text>().joinToString("
") { it.text }
      }
   }

   val nodeAnthropicSonnet by node<String, String> { topic ->
      llm.writeSession {
         model = AnthropicModels.Sonnet_4_5
         appendPrompt {
            system("You are a comedian. Generate a funny joke about the given topic.")
            user("Tell me a joke about $topic.")
         }
         val response = requestLLMWithoutTools()
         response.parts.filterIsInstance<MessagePart.Text>().joinToString("
") { it.text }
      }
   }

   val nodeAnthropicOpus by node<String, String> { topic ->
      llm.writeSession {
         model = AnthropicModels.Opus_4_6
         appendPrompt {
            system("You are a comedian. Generate a funny joke about the given topic.")
            user("Tell me a joke about $topic.")
         }
         val response = requestLLMWithoutTools()
         response.parts.filterIsInstance<MessagePart.Text>().joinToString("
") { it.text }
      }
   }

   // 并行执行笑话生成并选择最佳笑话
   val nodeGenerateBestJoke by parallel(
      nodeOpenAI, nodeAnthropicSonnet, nodeAnthropicOpus,
   ) {
      selectByIndex { jokes ->
         // 另一个 LLM(例如 GPT4o)将找出最有趣的笑话:
         llm.writeSession {
            model = OpenAIModels.Chat.GPT4o
            appendPrompt {
               prompt("best-joke-selector") {
                  system("You are a comedy critic. Give a critique for the given joke.")
                  user(
                     """
                            Here are three jokes about the same topic:

                            ${jokes.mapIndexed { index, joke -> "Joke $index:
$joke" }.joinToString("
\n")}

                            Select the best joke and explain why it's the best.
                            """.trimIndent()
                  )
               }
            }

            val response = requestLLMStructured<JokeRating>()
            val bestJoke = response.getOrNull()!!.data
            bestJoke.bestJokeIndex
         }
      }
   }

   // 连接节点
   nodeStart then nodeGenerateBestJoke then nodeFinish
}

最佳实践

  1. 考虑资源约束:并行执行节点时,请注意资源使用情况,特别是在同时进行多个 LLM API 调用时。

  2. 上下文管理:每次并行执行都会创建一个分叉上下文。合并结果时,选择要保留哪个上下文或如何组合来自不同执行的上下文。

  3. 针对您的用例进行优化

    • 对于竞争性评估(如笑话示例),使用 selectByIndex 选择最佳结果
    • 对于查找最大值,使用 selectByMax
    • 对于根据条件进行过滤,使用 selectBy
    • 对于聚合,使用 fold 将所有结果组合成复合输出

性能注意事项

并行执行可以显著提高吞吐量,但也带来了一些开销:

  • 每个并行节点都会创建一个新的协程
  • 上下文分叉和合并会增加一些计算成本
  • 多个并行执行可能会发生资源竞争

为了获得最佳性能,请对满足以下条件的项进行并行化:

  • 彼此独立
  • 执行时间较长
  • 不共享可变状态