Skip to content

A2A 與 Koog 整合

Koog 提供與 A2A 協定的無縫整合,讓您可以將 Koog agent 公開為 A2A 伺服器,並將 Koog agent 連接至其他符合 A2A 規範的 agent。

相依性

A2A Koog 整合需要根據您的使用案例使用特定的功能模組:

將 Koog agent 公開為 A2A 伺服器

將這些相依性新增至您的 build.gradle.kts

kotlin
dependencies {
    // Koog A2A 伺服器整合功能
    implementation("ai.koog:agents-features-a2a-server:$koogVersion")

    // HTTP JSON-RPC 傳輸
    implementation("ai.koog:a2a-transport-server-jsonrpc-http:$koogVersion")

    // Ktor 伺服器引擎(選擇適合您需求的引擎)
    implementation("io.ktor:ktor-server-netty:$ktorVersion")
}

將 Koog agent 連接至 A2A agent

將這些相依性新增至您的 build.gradle.kts

kotlin
dependencies {
    // Koog A2A 用戶端整合功能
    implementation("ai.koog:agents-features-a2a-client:$koogVersion")

    // HTTP JSON-RPC 傳輸
    implementation("ai.koog:a2a-transport-client-jsonrpc-http:$koogVersion")

    // Ktor 用戶端引擎(選擇適合您需求的引擎)
    implementation("io.ktor:ktor-client-cio:$ktorVersion")
}

總覽

此整合支援兩種主要模式:

  1. 將 Koog agent 公開為 A2A 伺服器 - 讓您的 Koog agent 可以透過 A2A 協定被探索與存取
  2. 將 Koog agent 連接至 A2A agent - 讓您的 Koog agent 能與其他符合 A2A 規範的 agent 進行通訊

將 Koog agent 公開為 A2A 伺服器

定義具備 A2A 功能的 Koog agent

首先讓我們定義一個 Koog agent。agent 的邏輯可以有所不同,以下是一個基本的單次執行且包含工具的 agent 範例。 該 agent 接收來自使用者的訊息,並將其轉發給 LLM。 如果 LLM 的回應包含工具呼叫,agent 會執行該工具並將結果轉發給 LLM。 如果 LLM 的回應包含助理訊息,agent 會將該助理訊息傳送給使用者並結束。

在輸入調整大小時,agent 會向 A2A 用戶端傳送一個任務已提交事件以及輸入訊息。 在每次工具呼叫時,agent 會向 A2A 用戶端傳送一個任務執行中事件,其中包含工具呼叫與結果。 收到助理訊息時,agent 會向 A2A 用戶端傳送一個任務已完成事件以及該助理訊息。

kotlin
/**
 * 建立具備 A2A 功能的 Koog agent
 */
@OptIn(ExperimentalUuidApi::class)
private fun createAgent(
    context: RequestContext<MessageSendParams>,
    eventProcessor: SessionEventProcessor,
) = AIAgent(
    promptExecutor = MultiLLMPromptExecutor(
        LLMProvider.Google to GoogleLLMClient("api-key")
    ),
    toolRegistry = ToolRegistry {
        // 在此宣告工具
    },
    strategy = strategy<A2AMessage, Unit>("test") {
        val nodeSetup by node<A2AMessage, Unit> { inputMessage ->
            // 將 A2A 訊息轉換為 Koog 訊息的便利函式
            val input = inputMessage.toKoogMessage()
            llm.writeSession {
                appendPrompt {
                    message(input)
                }
            }
            // 傳送更新事件給 A2A 用戶端
            withA2AAgentServer {
                sendTaskUpdate("Request submitted: ${input.content}", TaskState.Submitted)
            }
        }

        // 呼叫 LLM
        val nodeLLMRequest by node<Unit, Message> {
            llm.writeSession {
                requestLLM()
            }
        }

        // 執行工具
        val nodeProcessTool by node<MessagePart.Tool.Call, Unit> { toolCall ->
            withA2AAgentServer {
                sendTaskUpdate("Executing tool: ${toolCall.content}", TaskState.Working)
            }

            val toolResult = environment.executeTool(toolCall)

            llm.writeSession {
                appendPrompt {
                    tool {
                        result(toolResult)
                    }
                }
            }
            withA2AAgentServer {
                sendTaskUpdate("Tool result: ${toolResult.content}", TaskState.Working)
            }
        }

        // 傳送助理訊息
        val nodeProcessAssistant by node<String, Unit> { assistantMessage ->
            withA2AAgentServer {
                sendTaskUpdate(assistantMessage, TaskState.Completed)
            }
        }

        edge(nodeStart forwardTo nodeSetup)
        edge(nodeSetup forwardTo nodeLLMRequest)

        // 如果 LLM 傳回工具呼叫,轉發至工具處理節點,然後回到 LLM
        edge(nodeLLMRequest forwardTo nodeProcessTool onToolCall { true })
        edge(nodeProcessTool forwardTo nodeLLMRequest)

        // 如果 LLM 傳回助理訊息,轉發至助理處理節點,然後結束
        edge(nodeLLMRequest forwardTo nodeProcessAssistant onAssistantMessage { true })
        edge(nodeProcessAssistant forwardTo nodeFinish)
    },
    agentConfig = AIAgentConfig(
        prompt = prompt("agent") { system("You are a helpful assistant.") },
        model = GoogleModels.Gemini2_5Pro,
        maxAgentIterations = 10
    ),
) {
    install(A2AAgentServer) {
        this.context = context
        this.eventProcessor = eventProcessor
    }
}

/**
 * 將任務更新事件傳送至 A2A 用戶端的便利函式
 * @param content 訊息內容
 * @param state 任務狀態
 */
@OptIn(ExperimentalUuidApi::class)
private suspend fun A2AAgentServer.sendTaskUpdate(
    content: String,
    state: TaskState,
) {
    val message = A2AMessage(
        messageId = Uuid.random().toString(),
        role = Role.Agent,
        parts = listOf(
            TextPart(content)
        ),
        contextId = context.contextId,
        taskId = context.taskId,
    )

    val task = Task(
        id = context.taskId,
        contextId = context.contextId,
        status = TaskStatus(
            state = state,
            message = message,
            timestamp = Clock.System.now(),
        )
    )
    eventProcessor.sendTaskEvent(task)
}

A2AAgentServer 功能機制

A2AAgentServer 是一個 Koog agent 功能,可實現 Koog agent 與 A2A 協定之間的無縫整合。 A2AAgentServer 功能提供了對 RequestContextSessionEventProcessor 實體的存取,這些實體用於在 Koog agent 內部與 A2A 用戶端進行通訊。

要安裝此功能,請在 agent 上呼叫 install 函式,並傳入 A2AAgentServer 功能以及 RequestContextSessionEventProcessor

kotlin
// 安裝功能
install(A2AAgentServer) {
    this.context = context
    this.eventProcessor = eventProcessor
}

為了從 Koog agent 策略中存取這些實體,該功能提供了一個 withA2AAgentServer 函式,允許 agent 節點在執行上下文中存取 A2A 伺服器功能。 它會檢索已安裝的 A2AAgentServer 功能,並將其作為操作區塊的接收者。

kotlin
// 在 agent 節點內使用
withA2AAgentServer {
    // 此處的 'this' 即為 A2AAgentServer 執行個體
    eventProcessor.sendTaskUpdate("Processing your request...", TaskState.Working)
}

啟動 A2A 伺服器

執行伺服器後,Koog agent 將可以透過 A2A 協定被探索與存取。

kotlin
val agentCard = AgentCard(
    name = "Koog Agent",
    url = "http://localhost:9999/koog",
    description = "Simple universal agent powered by Koog",
    version = "1.0.0",
    protocolVersion = "0.3.0",
    preferredTransport = TransportProtocol.JSONRPC,
    capabilities = AgentCapabilities(streaming = true),
    defaultInputModes = listOf("text"),
    defaultOutputModes = listOf("text"),
    skills = listOf(
        AgentSkill(
            id = "koog",
            name = "Koog Agent",
            description = "Universal agent powered by Koog. Supports tool calling.",
            tags = listOf("chat", "tool"),
        )
    )
)
// 伺服器設定
val server = A2AServer(agentExecutor = KoogAgentExecutor(), agentCard = agentCard)
val transport = HttpJSONRPCServerTransport(server)
transport.start(engineFactory = Netty, port = 8080, path = "/chat", wait = true)

將 Koog agent 連接至 A2A agent

建立 A2A 用戶端並連接至 A2A 伺服器

kotlin
val transport = HttpJSONRPCClientTransport(url = "http://localhost:9999/koog")
val agentCardResolver =
    UrlAgentCardResolver(baseUrl = "http://localhost:9999", path = "/koog")
val client = A2AClient(transport = transport, agentCardResolver = agentCardResolver)

val agentId = "koog"
client.connect()

建立 Koog Agent 並將 A2A 用戶端新增至 A2AAgentClient 功能

若要從您的 Koog agent 連接至 A2A agent,您可以使用 A2AAgentClient 功能,它提供了連接至 A2A agent 的用戶端 API。 用戶端的原理與伺服器相同:您安裝該功能,並傳入 A2AAgentClient 功能以及 RequestContextSessionEventProcessor

kotlin
val agent = AIAgent(
    promptExecutor = MultiLLMPromptExecutor(
        LLMProvider.Google to GoogleLLMClient("api-key")
    ),
    toolRegistry = ToolRegistry {
        // 在此宣告工具
    },
    strategy = strategy<String, Unit>("test") {

        val nodeCheckStreaming by nodeA2AClientGetAgentCard().transform { it.capabilities.streaming }

        val nodeA2ASendMessageStreaming by nodeA2AClientSendMessageStreaming()
        val nodeA2ASendMessage by nodeA2AClientSendMessage()

        val nodeProcessStreaming by node<Flow<Response<Event>>, Unit> {
            it.collect { response ->
                when (response.data) {
                    is Task -> {
                        // 處理任務
                    }

                    is A2AMessage -> {
                        // 處理訊息
                    }

                    is TaskStatusUpdateEvent -> {
                        // 處理任務狀態更新
                    }

                    is TaskArtifactUpdateEvent -> {
                        // 處理任務產物更新
                    }
                }
            }
        }

        val nodeProcessEvent by node<CommunicationEvent, Unit> { event ->
            when (event) {
                is Task -> {
                    // 處理任務
                }

                is A2AMessage -> {
                    // 處理訊息
                }
            }
        }

        // 如果支援串流,傳送訊息,處理回應並結束
        edge(nodeStart forwardTo nodeCheckStreaming transformed { agentId })
        edge(
            nodeCheckStreaming forwardTo nodeA2ASendMessageStreaming
                onCondition { it == true } transformed { buildA2ARequest(agentId) }
        )
        edge(nodeA2ASendMessageStreaming forwardTo nodeProcessStreaming)
        edge(nodeProcessStreaming forwardTo nodeFinish)

        // 如果不支援串流,傳送訊息,處理回應並結束
        edge(
            nodeCheckStreaming forwardTo nodeA2ASendMessage
                onCondition { it == false } transformed { buildA2ARequest(agentId) }
        )
        edge(nodeA2ASendMessage forwardTo nodeProcessEvent)
        edge(nodeProcessEvent forwardTo nodeFinish)

        // 如果無法取得 agent,則結束並列印錯誤
        edge(nodeCheckStreaming forwardTo nodeFinish onCondition { it == null }
            transformed { println("Failed to get agents card") }
        )

    },
    agentConfig = AIAgentConfig(
        prompt = prompt("agent") { system("You are a helpful assistant.") },
        model = GoogleModels.Gemini2_5Pro,
        maxAgentIterations = 10
    ),
) {
    install(A2AAgentClient) {
        this.a2aClients = mapOf(agentId to client)
    }
}

@OptIn(ExperimentalUuidApi::class)
private fun AIAgentGraphContextBase.buildA2ARequest(agentId: String): A2AClientRequest<MessageSendParams> =
    A2AClientRequest(
        agentId = agentId,
        callContext = ClientCallContext.Default,
        params = MessageSendParams(
            message = A2AMessage(
                messageId = Uuid.random().toString(),
                role = Role.User,
                parts = listOf(
                    TextPart(agentInput as String)
                )
            )
        )
    )