流式输出全解
Agent 任务往往较长:模型要逐段推理、还可能穿插工具调用。若只用一次性 run(),用户要等到整轮结束才能看到输出;流式则把过程拆成一条条 StreamChunk,你可以实时刷新终端、Web UI 或日志,并在界面上展示「正在调用哪个工具」——可读性与可调试性都会好得多。
本篇假设你已按 第 01 篇 配置好环境变量(模型 qwen3.5-plus、CODY_MODEL_BASE_URL=https://coding.dashscope.aliyuncs.com/v1 等)。下面先给可运行的最小示例,再逐项说明字段与 12 种事件类型。
最基础的流式示例
下面用 AsyncCodyClient(workdir="."),不显式传模型,依赖环境变量。循环里我们关心两类分片:正文增量 text_delta,以及结束时的 done(可读取 usage)。
import asyncio from cody.sdk import AsyncCodyClient async def main() -> None: # 模型、API Key、Base URL 来自环境变量(见第 01 篇) client = AsyncCodyClient(workdir=".") async for chunk in client.stream("用一句话说明 Cody 是什么框架"): if chunk.type == "text_delta": print(chunk.content, end="", flush=True) elif chunk.type == "done" and chunk.usage: print() print( f"完成:input={chunk.usage.input_tokens} " f"output={chunk.usage.output_tokens} total={chunk.usage.total_tokens}" ) if __name__ == "__main__": asyncio.run(main())
流中第一个有意义的分片通常是 session_start(携带 session_id),便于与 多轮对话 衔接;上面示例未打印它,但你在产品代码里应保存该 id。
StreamChunk 常用字段
StreamChunk 是基类,各事件类型会填充其中一部分字段。除下表外,compact / prune 等还会带上 original_messages、compacted_messages、estimated_tokens_saved 等(见后文分支示例)。
| 字段 | 类型 | 说明 |
|---|---|---|
type |
str |
事件类型,用于 if chunk.type == "..." 分支 |
content |
str |
文本增量、工具结果正文、熔断说明、交互提示文案等,依类型而定 |
session_id |
str | None |
当前会话 id |
tool_name |
str | None |
tool_call / tool_result 时的工具名 |
args |
dict | None |
tool_call 时的参数 |
tool_call_id |
str | None |
配对同一次调用的 tool_call 与 tool_result |
usage |
Usage | None |
done 时 token 用量(input_tokens / output_tokens / total_tokens) |
request_id |
str | None |
interaction_request 时的人机交互请求 id |
interaction_kind |
str | None |
如 "question" / "confirm" / "feedback" |
options |
list[str] | None |
交互场景下的可选答案列表 |
retry_attempt |
int | None |
retry 时:刚失败的是第几次尝试(1-based) |
retry_max_attempts |
int | None |
retry 时:最多尝试次数 |
12 种流式事件类型
下面按 chunk.type 的值列出教程约定掌握的 12 类(与 SDK 分片一一对应)。此外,实际流中还可能遇到 prune(工具输出裁剪)、user_input_received(人机流程中的用户输入确认)等,处理方式相同:按 type 增加分支即可。
type |
含义(简述) |
|---|---|
session_start |
始终靠前出现,携带 session_id,标志本轮流开始 |
text_delta |
模型面向用户的正文增量 |
thinking |
思考模式下的推理文本增量(需 Builder 开启 thinking) |
tool_call |
模型发起工具调用:tool_name、args、tool_call_id |
tool_result |
工具执行结果:content 为结果文本,并带 tool_call_id 与 tool_name |
done |
本轮正常结束,带 usage,content 为汇总输出 |
cancelled |
已通过 cancel_event 取消 |
compact |
上下文压缩事件,可观测节省了多少消息/ token |
retry |
LLM 调用失败且将重试前发出;应清空 UI 中未确认的流式缓冲 |
circuit_breaker |
熔断触发(token/费用/循环检测等),content 含原因摘要 |
interaction_request |
需要人工输入:request_id、interaction_kind、options |
unknown(类名 UnknownChunk) |
未知或未映射的事件类型,兜底分支 |
完整分支:一次处理所有常见事件
产品级代码建议为每种 type 写清分支:这样日志、UI 状态机、指标上报都能对齐。下面示例演示参数形态(含 stream() 的 session_id、include_tools / exclude_tools 占位)。
import asyncio from cody.sdk import AsyncCodyClient async def main() -> None: client = AsyncCodyClient(workdir=".") async for chunk in client.stream( "列出当前目录下的 Python 文件并统计数量", session_id=None, # 多轮时传入已有 id cancel_event=None, # 需要取消时传入 asyncio.Event include_tools=None, # 非 None 时仅允许列表内工具名 exclude_tools=None, # 非 None 时排除列表内工具 ): if chunk.type == "session_start": print(f"[会话] {chunk.session_id}") elif chunk.type == "text_delta": print(chunk.content, end="", flush=True) elif chunk.type == "thinking": print(f"\n[思考] {chunk.content}", end="", flush=True) elif chunk.type == "tool_call": print(f"\n[调用] {chunk.tool_name} id={chunk.tool_call_id} args={chunk.args}") elif chunk.type == "tool_result": print(f"\n[结果] {chunk.tool_name} id={chunk.tool_call_id} -> {chunk.content[:200]}...") elif chunk.type == "done": print("\n[完成]") if chunk.usage: print(f"usage: {chunk.usage}") elif chunk.type == "cancelled": print("\n[已取消]") break elif chunk.type == "compact": print( f"\n[压缩] {chunk.original_messages}->{chunk.compacted_messages} " f"saved≈{chunk.estimated_tokens_saved}" ) elif chunk.type == "retry": print( f"\n[重试] attempt={chunk.retry_attempt}/" f"{chunk.retry_max_attempts} err={chunk.content!r}" ) elif chunk.type == "circuit_breaker": print(f"\n[熔断] {chunk.content}") elif chunk.type == "interaction_request": print( f"\n[交互] kind={chunk.interaction_kind} id={chunk.request_id} " f"options={chunk.options}\n{chunk.content}" ) elif chunk.type == "unknown": print(f"\n[未知分片] {chunk!r}") else: # prune、user_input_received 等 print(f"\n[其他:{chunk.type}] {chunk.content[:120]!r}") if __name__ == "__main__": asyncio.run(main())
工具调用事件:tool_call 与 tool_result
Agent 执行真实任务时,模型会先发出 tool_call(要调什么、参数是什么),运行器执行后再来 tool_result(结果字符串在 content)。两者通过相同的 tool_call_id 配对,UI 上可以用 id 把「进行中的调用」与「返回结果」连成一条时间线。
调试建议:对 tool_result 的 content 做长度截断后再上屏,避免一次打印数万字 JSON;完整内容可写入日志文件。
流式取消:cancel_event
从 v1.10.3 起,stream(..., cancel_event=asyncio.Event()) 可在另一协程里 cancel.set(),运行器会尽快结束并发出 cancelled 分片。消费侧在收到 cancelled 后应 break,避免继续处理旧缓冲。
import asyncio from cody.sdk import AsyncCodyClient async def main() -> None: client = AsyncCodyClient(workdir=".") cancel = asyncio.Event() async def stop_soon() -> None: await asyncio.sleep(2.0) cancel.set() # 模拟用户点击「停止」 stop_task = asyncio.create_task(stop_soon()) try: async for chunk in client.stream("写一首很长的叙事诗", cancel_event=cancel): if chunk.type == "text_delta": print(chunk.content, end="", flush=True) elif chunk.type == "cancelled": print("\n[流已取消]") break finally: stop_task.cancel() if __name__ == "__main__": asyncio.run(main())
思考模式与 thinking 分片
部分模型支持「思考」内容:与面向用户的 text_delta 分离。通过 Builder .thinking(True, budget=...) 开启后,流中会出现 chunk.type == "thinking",content 为思考文本增量。下面示例同时演示如何显式指定 qwen3.5-plus 与 DashScope Coding 端点(也可继续只用环境变量)。
import asyncio from cody.sdk import Cody async def main() -> None: client = ( Cody() .workdir(".") .model("qwen3.5-plus") .base_url("https://coding.dashscope.aliyuncs.com/v1") .api_key("sk-xxx") # 建议使用环境变量,勿提交密钥 .thinking(True, budget=10000) .build() ) async for chunk in client.stream("简要分析采用异步 I/O 的利弊"): if chunk.type == "thinking": print(f"[思考] {chunk.content}", end="", flush=True) elif chunk.type == "text_delta": print(chunk.content, end="", flush=True) if __name__ == "__main__": asyncio.run(main())
retry 事件
当底层 LLM 请求失败且错误属于可重试类型(如限流、5xx)时,运行器在退避等待之前会发出 retry 分片。字段含义与 Core 层 RetryEvent 一致:retry_attempt 为刚失败的尝试序号(从 1 开始),retry_max_attempts 为计划中的最多尝试次数,content 为错误信息字符串。
收到 retry 后,建议在客户端丢弃本轮已缓冲的未完成文本/思考/工具状态,避免重试成功后与旧片段拼接错位。
stream() 与 run_stream()
二者完全等价:run_stream 仅是 stream 的别名,便于与文档或其它示例命名统一。任选其一即可。
# 以下两种写法相同 async for chunk in client.stream("任务"): ... async for chunk in client.run_stream("任务"): ...
小结
流式 API 的核心是 async for chunk in client.stream(...):用 chunk.type 驱动状态机,用 text_delta / thinking 做实时输出,用 tool_call 与 tool_result 展示 Agent 行为,用 done 收 usage,用 cancel_event 与 cancelled 做可中断任务。下一篇 工具直接调用 会进一步聚焦如何在代码里精确调用内置工具、与流式事件对照阅读。