流式 API
简介
Koog 框架中的流式 API 允许您在大型语言模型 (LLM) 的结构化数据到达时立即处理它们,而无需等待整个响应。 本页解释了如何使用流式 API 来高效处理 Markdown 格式的结构化数据。
流式 API 概述
流式 API 实现了对 LLM 响应中结构化数据的实时处理。您无需等待完整响应,即可:
- 在数据块到达时进行处理
- 即时解析结构化信息
- 在结构化对象完成时发出它们
- 立即处理这些对象(收集或传递给工具)
这种方法特别有用,因为它提供了以下优势:
- 提高用户界面的响应能力
- 高效处理大型响应
- 实现实时数据处理流水线
流式 API 允许将输出解析为 .md 格式的结构化数据,或者解析为一系列纯文本数据块。
使用原始字符串流
值得注意的是,您可以通过直接使用原始字符串流来解析输出。 这种方法使您对解析过程具有更大的灵活性和控制力。
以下是一个带有输出结构 Markdown 定义的原始字符串流:
fun markdownBookDefinition(): MarkdownStructuredDataDefinition {
return MarkdownStructuredDataDefinition("name", schema = { /*...*/ })
}
val mdDefinition = markdownBookDefinition()
llm.writeSession {
val stream = requestLLMStreaming(mdDefinition)
// Access the raw string chunks directly
stream.collect { chunk ->
// Process each chunk of text as it arrives
println("Received chunk: $chunk") // The chunks together will be structured as a text following the mdDefinition schema
}
}
以下是一个没有定义的原始字符串流的示例:
llm.writeSession {
val stream = requestLLMStreaming()
// Access the raw string chunks directly
stream.collect { chunk ->
// Process each chunk of text as it arrives
println("Received chunk: $chunk") // The chunks will not be structured in a specific way
}
}
使用结构化数据流
尽管可以使用原始字符串流,但通常使用结构化数据更方便。
结构化数据方法包括以下关键组件:
- MarkdownStructuredDataDefinition:一个帮助您定义 Markdown 格式结构化数据的 schema 和 examples 的类。
- markdownStreamingParser:一个用于创建解析器(该解析器处理 Markdown 数据块流并发出事件)的函数。
以下部分提供了处理结构化数据流的分步说明和代码示例。
1. 定义你的数据结构
首先,定义一个 data class 来表示您的结构化数据:
@Serializable
data class Book(
val title: String,
val author: String,
val description: String
)
2. 定义 Markdown 结构
使用 MarkdownStructuredDataDefinition
类创建一个定义,用于指定您的数据应如何在 Markdown 中进行结构化:
fun markdownBookDefinition(): MarkdownStructuredDataDefinition {
return MarkdownStructuredDataDefinition("bookList", schema = {
markdown {
header(1, "title")
bulleted {
item("author")
item("description")
}
}
}, examples = {
markdown {
header(1, "The Great Gatsby")
bulleted {
item("F. Scott Fitzgerald")
item("A novel set in the Jazz Age that tells the story of Jay Gatsby's unrequited love for Daisy Buchanan.")
}
}
})
}
3. 为你的数据结构创建解析器
markdownStreamingParser
为不同的 Markdown 元素提供了多个处理程序:
markdownStreamingParser {
// Handle level 1 headings
// The heading level can be from 1 to 6
onHeader(1) { headerText ->
// Process heading text
}
// Handle bullet points
onBullet { bulletText ->
// Process bullet text
}
// Handle code blocks
onCodeBlock { codeBlockContent ->
// Process code block content
}
// Handle lines matching a regex pattern
onLineMatching(Regex("pattern")) { line ->
// Process matching lines
}
// Handle the end of the stream
onFinishStream { remainingText ->
// Process any remaining text or perform cleanup
}
}
使用已定义处理程序,您可以实现一个函数,该函数使用 markdownStreamingParser
函数解析 Markdown 流并发出您的数据对象。
fun parseMarkdownStreamToBooks(markdownStream: Flow<String>): Flow<Book> {
return flow {
markdownStreamingParser {
var currentBookTitle = ""
val bulletPoints = mutableListOf<String>()
// Handle the event of receiving the Markdown header in the response stream
onHeader(1) { headerText ->
// If there was a previous book, emit it
if (currentBookTitle.isNotEmpty() && bulletPoints.isNotEmpty()) {
val author = bulletPoints.getOrNull(0) ?: ""
val description = bulletPoints.getOrNull(1) ?: ""
emit(Book(currentBookTitle, author, description))
}
currentBookTitle = headerText
bulletPoints.clear()
}
// Handle the event of receiving the Markdown bullets list in the response stream
onBullet { bulletText ->
bulletPoints.add(bulletText)
}
// Handle the end of the response stream
onFinishStream {
// Emit the last book, if present
if (currentBookTitle.isNotEmpty() && bulletPoints.isNotEmpty()) {
val author = bulletPoints.getOrNull(0) ?: ""
val description = bulletPoints.getOrNull(1) ?: ""
emit(Book(currentBookTitle, author, description))
}
}
}.parseStream(markdownStream)
}
}
4. 在你的代理策略中使用解析器
val agentStrategy = strategy<String, List<Book>>("library-assistant") {
// Describe the node containing the output stream parsing
val getMdOutput by node<String, List<Book>> { booksDescription ->
val books = mutableListOf<Book>()
val mdDefinition = markdownBookDefinition()
llm.writeSession {
updatePrompt { user(booksDescription) }
// Initiate the response stream in the form of the definition `mdDefinition`
val markdownStream = requestLLMStreaming(mdDefinition)
// Call the parser with the result of the response stream and perform actions with the result
parseMarkdownStreamToBooks(markdownStream).collect { book ->
books.add(book)
println("Parsed Book: ${book.title} by ${book.author}")
}
}
books
}
// Describe the agent's graph making sure the node is accessible
edge(nodeStart forwardTo getMdOutput)
edge(getMdOutput forwardTo nodeFinish)
}
高级用法:使用工具进行流式处理
您还可以将流式 API 与工具结合使用,以在数据到达时进行处理。以下部分简要介绍了如何定义工具并将其与流式数据一起使用。
1. 为你的数据结构定义工具
@Serializable
data class Book(
val title: String,
val author: String,
val description: String
) : ToolArgs
class BookTool(): SimpleTool<Book>() {
companion object {
const val NAME = "book"
}
override suspend fun doExecute(args: Book): String {
println("${args.title} by ${args.author}:
${args.description}")
return "Done"
}
override val argsSerializer: KSerializer<Book>
get() = Book.serializer()
override val descriptor: ToolDescriptor
get() = ToolDescriptor(
name = NAME,
description = "A tool to parse book information from Markdown",
requiredParameters = listOf(),
optionalParameters = listOf()
)
}
2. 将工具与流式数据结合使用
val agentStrategy = strategy<String, Unit>("library-assistant") {
val getMdOutput by node<String, Unit> { input ->
val mdDefinition = markdownBookDefinition()
llm.writeSession {
updatePrompt { user(input) }
val markdownStream = requestLLMStreaming(mdDefinition)
parseMarkdownStreamToBooks(markdownStream).collect { book ->
callToolRaw(BookTool.NAME, book as ToolArgs)
/* Other possible options:
callTool(BookTool::class, book)
callTool<BookTool>(book)
findTool(BookTool::class).execute(book)
*/
}
// We can make parallel tool calls
parseMarkdownStreamToBooks(markdownStream).toParallelToolCallsRaw(toolClass=BookTool::class).collect {
println("Tool call result: $it")
}
}
}
edge(nodeStart forwardTo getMdOutput)
edge(getMdOutput forwardTo nodeFinish)
}
3. 在你的代理配置中注册工具
val toolRegistry = ToolRegistry {
tool(BookTool())
}
val runner = AIAgent(
promptExecutor = simpleOpenAIExecutor(token),
toolRegistry = toolRegistry,
strategy = agentStrategy,
agentConfig = agentConfig
)
最佳实践
定义清晰的结构:为您的数据创建清晰明确的 Markdown 结构。
提供良好的示例:在您的
MarkdownStructuredDataDefinition
中包含全面的示例以指导 LLM。处理不完整数据:在从流中解析数据时,始终检测空值或空数据。
清理资源:使用
onFinishStream
处理程序清理资源并处理任何剩余数据。处理错误:为格式错误的 Markdown 或意外数据实现适当的错误处理。
测试:使用各种输入场景(包括部分数据块和格式错误的输入)测试您的解析器。
并行处理:对于独立的数据项,考虑使用并行工具调用以获得更好的性能。