ストリーミングAPI
はじめに
KoogフレームワークのストリーミングAPIを使用すると、大規模言語モデル(LLM)からの構造化データを、応答全体を待つことなく、到着と同時に処理できます。 このページでは、ストリーミングAPIを使用してMarkdown形式の構造化データを効率的に処理する方法について説明します。
ストリーミングAPIの概要
ストリーミングAPIは、LLMの応答から構造化データをリアルタイムで処理することを可能にします。応答全体を待つ代わりに、以下のことができます。
- データがチャンクで到着するたびに処理する
- 構造化情報をその場でパースする
- 構造化されたオブジェクトが完成したら出力する
- これらのオブジェクトを即座に処理する(収集またはツールに渡す)
このアプローチは、特に以下の利点を提供するため、非常に有用です。
- ユーザーインターフェースの応答性を向上させる
- 大規模な応答を効率的に処理する
- リアルタイムデータ処理パイプラインを実装する
ストリーミングAPIでは、出力を.md形式の構造化データとして、またはプレーンテキストのチャンクのセットとしてパースできます。
生の文字列ストリームを操作する
生の文字列ストリームを直接操作して出力をパースできることに注意することが重要です。 このアプローチにより、パース処理に対する柔軟性と制御が向上します。
以下は、出力構造のMarkdown定義を含む生の文字列ストリームの例です。
fun markdownBookDefinition(): MarkdownStructuredDataDefinition {
return MarkdownStructuredDataDefinition("name", schema = { /*...*/ })
}
val mdDefinition = markdownBookDefinition()
llm.writeSession {
val stream = requestLLMStreaming(mdDefinition)
// Access the raw string chunks directly
stream.collect { chunk ->
// Process each chunk of text as it arrives
println("Received chunk: $chunk") // The chunks together will be structured as a text following the mdDefinition schema
}
}
以下は、定義なしの生の文字列ストリームの例です。
llm.writeSession {
val stream = requestLLMStreaming()
// Access the raw string chunks directly
stream.collect { chunk ->
// Process each chunk of text as it arrives
println("Received chunk: $chunk") // The chunks will not be structured in a specific way
}
}
構造化データのストリームを操作する
生の文字列ストリームを操作することも可能ですが、 構造化データを操作する方がより便利な場合が多くあります。
構造化データのアプローチには、以下の主要なコンポーネントが含まれます。
- MarkdownStructuredDataDefinition: Markdown形式で構造化データのスキーマと例を定義するのに役立つクラス。
- markdownStreamingParser: Markdownチャンクのストリームを処理し、イベントを出力するパーサーを作成する関数。
以下のセクションでは、構造化データのストリームを処理することに関連する段階的な手順とコードサンプルを提供します。
1. データ構造を定義する
まず、構造化データを表すデータクラスを定義します。
@Serializable
data class Book(
val title: String,
val author: String,
val description: String
)
2. Markdown構造を定義する
MarkdownStructuredDataDefinition
クラスを使用して、Markdownでデータがどのように構造化されるべきかを指定する定義を作成します。
fun markdownBookDefinition(): MarkdownStructuredDataDefinition {
return MarkdownStructuredDataDefinition("bookList", schema = {
markdown {
header(1, "title")
bulleted {
item("author")
item("description")
}
}
}, examples = {
markdown {
header(1, "The Great Gatsby")
bulleted {
item("F. Scott Fitzgerald")
item("A novel set in the Jazz Age that tells the story of Jay Gatsby's unrequited love for Daisy Buchanan.")
}
}
})
}
3. データ構造のパーサーを作成する
markdownStreamingParser
は、さまざまなMarkdown要素に対応する複数のハンドラを提供します。
markdownStreamingParser {
// Handle level 1 headings
// The heading level can be from 1 to 6
onHeader(1) { headerText ->
// Process heading text
}
// Handle bullet points
onBullet { bulletText ->
// Process bullet text
}
// Handle code blocks
onCodeBlock { codeBlockContent ->
// Process code block content
}
// Handle lines matching a regex pattern
onLineMatching(Regex("pattern")) { line ->
// Process matching lines
}
// Handle the end of the stream
onFinishStream { remainingText ->
// Process any remaining text or perform cleanup
}
}
定義されたハンドラを使用して、markdownStreamingParser
関数でMarkdownストリームをパースし、データオブジェクトを出力する関数を実装できます。
fun parseMarkdownStreamToBooks(markdownStream: Flow<String>): Flow<Book> {
return flow {
markdownStreamingParser {
var currentBookTitle = ""
val bulletPoints = mutableListOf<String>()
// Handle the event of receiving the Markdown header in the response stream
onHeader(1) { headerText ->
// If there was a previous book, emit it
if (currentBookTitle.isNotEmpty() && bulletPoints.isNotEmpty()) {
val author = bulletPoints.getOrNull(0) ?: ""
val description = bulletPoints.getOrNull(1) ?: ""
emit(Book(currentBookTitle, author, description))
}
currentBookTitle = headerText
bulletPoints.clear()
}
// Handle the event of receiving the Markdown bullets list in the response stream
onBullet { bulletText ->
bulletPoints.add(bulletText)
}
// Handle the end of the response stream
onFinishStream {
// Emit the last book, if present
if (currentBookTitle.isNotEmpty() && bulletPoints.isNotEmpty()) {
val author = bulletPoints.getOrNull(0) ?: ""
val description = bulletPoints.getOrNull(1) ?: ""
emit(Book(currentBookTitle, author, description))
}
}
}.parseStream(markdownStream)
}
}
4. エージェント戦略でパーサーを使用する
val agentStrategy = strategy<String, List<Book>>("library-assistant") {
// Describe the node containing the output stream parsing
val getMdOutput by node<String, List<Book>> { booksDescription ->
val books = mutableListOf<Book>()
val mdDefinition = markdownBookDefinition()
llm.writeSession {
updatePrompt { user(booksDescription) }
// Initiate the response stream in the form of the definition `mdDefinition`
val markdownStream = requestLLMStreaming(mdDefinition)
// Call the parser with the result of the response stream and perform actions with the result
parseMarkdownStreamToBooks(markdownStream).collect { book ->
books.add(book)
println("Parsed Book: ${book.title} by ${book.author}")
}
}
books
}
// Describe the agent's graph making sure the node is accessible
edge(nodeStart forwardTo getMdOutput)
edge(getMdOutput forwardTo nodeFinish)
}
高度な使用法:ツールとのストリーミング
ストリーミングAPIをツールと組み合わせて使用し、データが到着と同時に処理することもできます。以下のセクションでは、ツールを定義し、ストリーミングデータで使用する方法について簡単な段階的ガイドを提供します。
1. データ構造のツールを定義する
@Serializable
data class Book(
val title: String,
val author: String,
val description: String
) : ToolArgs
class BookTool(): SimpleTool<Book>() {
companion object {
const val NAME = "book"
}
override suspend fun doExecute(args: Book): String {
println("${args.title} by ${args.author}:
${args.description}")
return "Done"
}
override val argsSerializer: KSerializer<Book>
get() = Book.serializer()
override val descriptor: ToolDescriptor
get() = ToolDescriptor(
name = NAME,
description = "A tool to parse book information from Markdown",
requiredParameters = listOf(),
optionalParameters = listOf()
)
}
2. ストリーミングデータでツールを使用する
val agentStrategy = strategy<String, Unit>("library-assistant") {
val getMdOutput by node<String, Unit> { input ->
val mdDefinition = markdownBookDefinition()
llm.writeSession {
updatePrompt { user(input) }
val markdownStream = requestLLMStreaming(mdDefinition)
parseMarkdownStreamToBooks(markdownStream).collect { book ->
callToolRaw(BookTool.NAME, book as ToolArgs)
/* Other possible options:
callTool(BookTool::class, book)
callTool<BookTool>(book)
findTool(BookTool::class).execute(book)
*/
}
// We can make parallel tool calls
parseMarkdownStreamToBooks(markdownStream).toParallelToolCallsRaw(toolClass=BookTool::class).collect {
println("Tool call result: $it")
}
}
}
edge(nodeStart forwardTo getMdOutput)
edge(getMdOutput forwardTo nodeFinish)
}
3. エージェント構成でツールを登録する
val toolRegistry = ToolRegistry {
tool(BookTool())
}
val runner = AIAgent(
promptExecutor = simpleOpenAIExecutor(token),
toolRegistry = toolRegistry,
strategy = agentStrategy,
agentConfig = agentConfig
)
ベストプラクティス
明確な構造を定義する: データのための明確で曖昧さのないMarkdown構造を作成します。
良い例を提供する:
MarkdownStructuredDataDefinition
に包括的な例を含めて、LLMをガイドします。不完全なデータを処理する: ストリームからデータをパースする際は、常にnullまたは空の値をチェックします。
リソースをクリーンアップする:
onFinishStream
ハンドラを使用して、リソースをクリーンアップし、残りのデータを処理します。エラーを処理する: 不正な形式のMarkdownや予期せぬデータに対して、適切なエラー処理を実装します。
テスト: パーサーを、部分的なチャンクや不正な形式の入力を含む、さまざまな入力シナリオでテストします。
並列処理: 独立したデータ項目については、パフォーマンス向上のために並列ツール呼び出しの使用を検討します。