A2A サーバー
--8<-- "versioning-snippets.md:beta"
A2A サーバーを使用すると、標準化された A2A (Agent-to-Agent) プロトコルを介して AI エージェントを公開できます。これは A2A プロトコル仕様 の完全な実装を提供し、クライアントリクエストの処理、エージェントロジックの実行、複雑なタスクライフサイクルの管理、およびリアルタイムのストリーミングレスポンスをサポートします。
依存関係
プロジェクトで A2A サーバーを使用するには、以下の依存関係を build.gradle.kts に追加します。
dependencies {
// A2A サーバーのコアライブラリ
implementation("ai.koog:a2a-server:$koogVersion")
// HTTP JSON-RPC トランスポート(最も一般的)
implementation("ai.koog:a2a-transport-server-jsonrpc-http:$koogVersion")
// Ktor サーバーエンジン(ニーズに合わせて選択)
implementation("io.ktor:ktor-server-netty:$ktorVersion")
}概要
A2A サーバーは、A2A プロトコルのトランスポート層とカスタムエージェントロジックの間の架け橋として機能します。 プロトコルへの準拠を維持し、堅牢なセッション管理を提供しながら、リクエストライフサイクル全体をオーケストレーションします。
主要コンポーネント
A2AServer
完全な A2A プロトコルを実装するメインのサーバークラスです。以下の役割を果たす中心的なコーディネーターとして機能します。
- プロトコル仕様に照らした受信リクエストの検証
- 並列セッションとタスクライフサイクルの管理
- トランスポート、ストレージ、ビジネスロジック層間の通信のオーケストレーション
- すべてのプロトコル操作(メッセージ送信、タスク照会、キャンセル、プッシュ通知)の処理
A2AServer は 2 つの必須パラメータを受け取ります。
AgentExecutor: エージェントのビジネスロジックの実装を定義します。AgentCard: エージェントの機能とメタデータを定義します。
また、ストレージやトランスポートの動作をカスタマイズするために使用できる、いくつかのオプションパラメータもあります。
AgentExecutor
AgentExecutor インターフェースは、エージェントのコアビジネスロジックを実装する場所です。 これは、A2A プロトコルと特定のエージェント機能の間の架け橋として機能します。 エージェントの実行を開始するには、エージェントのロジックを定義する execute メソッドを実装する必要があります。 エージェントをキャンセルするには、cancel メソッドを実装する必要があります。
class MyAgentExecutor : AgentExecutor {
override suspend fun execute(
context: RequestContext<MessageSendParams>,
eventProcessor: SessionEventProcessor
) {
// ここにエージェントのロジックを記述
}
override suspend fun cancel(
context: RequestContext<TaskIdParams>,
eventProcessor: SessionEventProcessor,
agentJob: Deferred<Unit>?
) {
// エージェントのキャンセル処理(任意)
}
}RequestContext は、現在のセッションの contextId や taskId、送信された message、リクエストの params など、現在のリクエストに関する豊富な情報を提供します。
SessionEventProcessor はクライアントと通信します。
sendMessage(message): 即時レスポンス(チャット形式のインタラクション)を送信します。sendTaskEvent(event): タスク関連の更新(長時間実行される操作)を送信します。
// 即時レスポンス(チャットボットなど)の場合
eventProcessor.sendMessage(
Message(
messageId = generateId(),
role = Role.Agent,
parts = listOf(TextPart("こちらが回答です!")),
contextId = context.contextId
)
)
// タスクベースの操作の場合
eventProcessor.sendTaskEvent(
TaskStatusUpdateEvent(
contextId = context.contextId,
taskId = context.taskId,
status = TaskStatus(
state = TaskState.Working,
message = Message(/* 進捗の更新 */),
timestamp = Clock.System.now()
),
final = false // さらなる更新が続く場合
)
)AgentCard
AgentCard は、エージェントの自己記述的なマニフェストとして機能します。エージェントができること、通信方法、およびセキュリティ要件をクライアントに伝えます。
val agentCard = AgentCard(
// 基本的なアイデンティティ
name = "高度なレシピ・アシスタント",
description = "料理のアドバイス、レシピ生成、食事計画に特化した AI エージェント",
version = "2.1.0",
protocolVersion = "0.3.0",
// 通信設定
url = "https://api.example.com/a2a",
preferredTransport = TransportProtocol.JSONRPC,
// 任意:複数のトランスポートのサポート
additionalInterfaces = listOf(
AgentInterface("https://api.example.com/a2a", TransportProtocol.JSONRPC),
),
// 機能の宣言
capabilities = AgentCapabilities(
streaming = true, // リアルタイムレスポンスをサポート
pushNotifications = true, // 非同期通知を送信
stateTransitionHistory = true // タスク履歴を維持
),
// コンテンツタイプのサポート
defaultInputModes = listOf("text/plain", "text/markdown", "image/jpeg"),
defaultOutputModes = listOf("text/plain", "text/markdown", "application/json"),
// 利用可能なセキュリティスキームの定義
securitySchemes = mapOf(
"bearer" to HTTPAuthSecurityScheme(
scheme = "Bearer",
bearerFormat = "JWT",
description = "JWT トークン認証"
),
"api-key" to APIKeySecurityScheme(
`in` = In.Header,
name = "X-API-Key",
description = "サービス認証用 API キー"
)
),
// セキュリティ要件の指定(要件の論理和)
security = listOf(
mapOf("bearer" to listOf("read", "write")), // オプション 1: read/write スコープを持つ JWT
mapOf("api-key" to emptyList()) // オプション 2: API キー
),
// 認証済みユーザー向けの拡張カードを有効にする
supportsAuthenticatedExtendedCard = true,
// スキル/機能
skills = listOf(
AgentSkill(
id = "recipe-generation",
name = "レシピ生成",
description = "材料、食事制限、好みに基づいてカスタムレシピを生成します",
tags = listOf("料理", "レシピ", "栄養"),
examples = listOf(
"キノコを使ったヴィーガンパスタのレシピを作って",
"鶏肉、ご飯、野菜があります。何が作れますか?"
)
),
AgentSkill(
id = "meal-planning",
name = "食事計画",
description = "週間の食事を計画し、買い物リストを生成します",
tags = listOf("食事計画", "栄養", "買い物")
)
),
// 任意:ブランディング
iconUrl = "https://example.com/agent-icon.png",
documentationUrl = "https://docs.example.com/recipe-agent",
provider = AgentProvider(
organization = "CookingAI Inc.",
url = "https://cookingai.com"
)
)トランスポート層
A2A 自体は、クライアントとの通信のために複数のトランスポートプロトコルをサポートしています。 現在、Koog は HTTP 経由の JSON-RPC サーバー・トランスポートの実装を提供しています。
HTTP JSON-RPC トランスポート
val transport = HttpJSONRPCServerTransport(server)
transport.start(
engineFactory = CIO, // Ktor エンジン (CIO, Netty, Jetty)
port = 8080, // サーバーポート
path = "/a2a", // API エンドポイントパス
wait = true // サーバーが停止するまでブロック
)ストレージ
A2A サーバーは、異なるタイプのデータを分離するプラグイン可能なストレージアーキテクチャを使用しています。 すべてのストレージ実装はオプションであり、開発用にはデフォルトでインメモリ版が使用されます。
- TaskStorage: タスクライフサイクル管理 - タスクの状態、履歴、およびアーティファクトを保存・管理します。
- MessageStorage: 会話履歴 - 会話コンテキスト内のメッセージ履歴を管理します。
- PushNotificationConfigStorage: Webhook 管理 - 非同期通知用の Webhook 設定を管理します。
クイックスタート
1. AgentCard の作成
エージェントの機能とメタデータを定義します。
val agentCard = AgentCard(
name = "IO アシスタント",
description = "入力の修正に特化した AI エージェント",
version = "2.1.0",
protocolVersion = "0.3.0",
// 通信設定
url = "https://api.example.com/a2a",
preferredTransport = TransportProtocol.JSONRPC,
// 機能の宣言
capabilities =
AgentCapabilities(
streaming = true, // リアルタイムレスポンスをサポート
pushNotifications = true, // 非同期通知を送信
stateTransitionHistory = true // タスク履歴を維持
),
// コンテンツタイプのサポート
defaultInputModes = listOf("text/plain", "text/markdown", "image/jpeg"),
defaultOutputModes = listOf("text/plain", "text/markdown", "application/json"),
// スキル/機能
skills = listOf(
AgentSkill(
id = "echo",
name = "echo",
description = "ユーザーメッセージをそのまま返します",
tags = listOf("io"),
)
)
)2. AgentExecutor の作成
Executor でエージェントのロジックを実装し、受信リクエストの処理とレスポンスの送信を行います。
class EchoAgentExecutor : AgentExecutor {
override suspend fun execute(
context: RequestContext<MessageSendParams>,
eventProcessor: SessionEventProcessor
) {
val userMessage = context.params.message
val userText = userMessage.parts
.filterIsInstance<TextPart>()
.joinToString(" ") { it.text }
// ユーザーのメッセージをそのまま返す
val response = Message(
messageId = UUID.randomUUID().toString(),
role = Role.Agent,
parts = listOf(TextPart("あなたが言ったこと: $userText")),
contextId = context.contextId,
taskId = context.taskId
)
eventProcessor.sendMessage(response)
}
}3. サーバーの作成
エージェント Executor とエージェント Card をサーバーに渡します。
val server = A2AServer(
agentExecutor = EchoAgentExecutor(),
agentCard = agentCard
)4. トランスポート層の追加
トランスポート層を作成し、サーバーを開始します。
// HTTP JSON-RPC トランスポート
val transport = HttpJSONRPCServerTransport(server)
transport.start(
engineFactory = CIO,
port = 8080,
path = "/agent",
wait = true
)エージェントの実装パターン
単純なレスポンスエージェント
エージェントが単一のメッセージに応答するだけでよい場合は、単純なエージェントとして実装できます。 これは、エージェントの実行ロジックが複雑でなく、時間がかからない場合にも使用できます。
class SimpleAgentExecutor : AgentExecutor {
override suspend fun execute(
context: RequestContext<MessageSendParams>,
eventProcessor: SessionEventProcessor
) {
val response = Message(
messageId = UUID.randomUUID().toString(),
role = Role.Agent,
parts = listOf(TextPart("エージェントからの挨拶です!")),
contextId = context.contextId,
taskId = context.taskId
)
eventProcessor.sendMessage(response)
}
}タスクベースのエージェント
エージェントの実行ロジックが複雑で、複数のステップを必要とする場合は、タスクベースのエージェントとして実装できます。 また、エージェントの実行ロジックに時間がかかり、中断(サスペンド)が必要な場合にも使用できます。
class TaskAgentExecutor : AgentExecutor {
override suspend fun execute(
context: RequestContext<MessageSendParams>,
eventProcessor: SessionEventProcessor
) {
// 実行中のステータスを送信
eventProcessor.sendTaskEvent(
TaskStatusUpdateEvent(
contextId = context.contextId,
taskId = context.taskId,
status = TaskStatus(
state = TaskState.Working,
timestamp = Clock.System.now()
),
final = false
)
)
// 処理を実行...
// 完了を送信
eventProcessor.sendTaskEvent(
TaskStatusUpdateEvent(
contextId = context.contextId,
taskId = context.taskId,
status = TaskStatus(
state = TaskState.Completed,
timestamp = Clock.System.now()
),
final = true
)
)
}
}