文档中心 / SDK 教程 / 基础篇

流式输出全解

Agent 任务往往较长:模型要逐段推理、还可能穿插工具调用。若只用一次性 run(),用户要等到整轮结束才能看到输出;流式则把过程拆成一条条 StreamChunk,你可以实时刷新终端、Web UI 或日志,并在界面上展示「正在调用哪个工具」——可读性与可调试性都会好得多。

本篇假设你已按 第 01 篇 配置好环境变量(模型 qwen3.5-plusCODY_MODEL_BASE_URL=https://coding.dashscope.aliyuncs.com/v1 等)。下面先给可运行的最小示例,再逐项说明字段与 12 种事件类型。

最基础的流式示例

下面用 AsyncCodyClient(workdir="."),不显式传模型,依赖环境变量。循环里我们关心两类分片:正文增量 text_delta,以及结束时的 done(可读取 usage)。

import asyncio
from cody.sdk import AsyncCodyClient


async def main() -> None:
    # 模型、API Key、Base URL 来自环境变量(见第 01 篇)
    client = AsyncCodyClient(workdir=".")
    async for chunk in client.stream("用一句话说明 Cody 是什么框架"):
        if chunk.type == "text_delta":
            print(chunk.content, end="", flush=True)
        elif chunk.type == "done" and chunk.usage:
            print()
            print(
                f"完成:input={chunk.usage.input_tokens} "
                f"output={chunk.usage.output_tokens} total={chunk.usage.total_tokens}"
            )


if __name__ == "__main__":
    asyncio.run(main())

流中第一个有意义的分片通常是 session_start(携带 session_id),便于与 多轮对话 衔接;上面示例未打印它,但你在产品代码里应保存该 id。

StreamChunk 常用字段

StreamChunk 是基类,各事件类型会填充其中一部分字段。除下表外,compact / prune 等还会带上 original_messagescompacted_messagesestimated_tokens_saved 等(见后文分支示例)。

字段 类型 说明
type str 事件类型,用于 if chunk.type == "..." 分支
content str 文本增量、工具结果正文、熔断说明、交互提示文案等,依类型而定
session_id str | None 当前会话 id
tool_name str | None tool_call / tool_result 时的工具名
args dict | None tool_call 时的参数
tool_call_id str | None 配对同一次调用的 tool_calltool_result
usage Usage | None done 时 token 用量(input_tokens / output_tokens / total_tokens
request_id str | None interaction_request 时的人机交互请求 id
interaction_kind str | None "question" / "confirm" / "feedback"
options list[str] | None 交互场景下的可选答案列表
retry_attempt int | None retry 时:刚失败的是第几次尝试(1-based)
retry_max_attempts int | None retry 时:最多尝试次数

12 种流式事件类型

下面按 chunk.type 的值列出教程约定掌握的 12 类(与 SDK 分片一一对应)。此外,实际流中还可能遇到 prune(工具输出裁剪)、user_input_received(人机流程中的用户输入确认)等,处理方式相同:按 type 增加分支即可。

type 含义(简述)
session_start 始终靠前出现,携带 session_id,标志本轮流开始
text_delta 模型面向用户的正文增量
thinking 思考模式下的推理文本增量(需 Builder 开启 thinking
tool_call 模型发起工具调用:tool_nameargstool_call_id
tool_result 工具执行结果:content 为结果文本,并带 tool_call_idtool_name
done 本轮正常结束,带 usagecontent 为汇总输出
cancelled 已通过 cancel_event 取消
compact 上下文压缩事件,可观测节省了多少消息/ token
retry LLM 调用失败且将重试前发出;应清空 UI 中未确认的流式缓冲
circuit_breaker 熔断触发(token/费用/循环检测等),content 含原因摘要
interaction_request 需要人工输入:request_idinteraction_kindoptions
unknown(类名 UnknownChunk 未知或未映射的事件类型,兜底分支

完整分支:一次处理所有常见事件

产品级代码建议为每种 type 写清分支:这样日志、UI 状态机、指标上报都能对齐。下面示例演示参数形态(含 stream()session_idinclude_tools / exclude_tools 占位)。

import asyncio
from cody.sdk import AsyncCodyClient


async def main() -> None:
    client = AsyncCodyClient(workdir=".")
    async for chunk in client.stream(
        "列出当前目录下的 Python 文件并统计数量",
        session_id=None,          # 多轮时传入已有 id
        cancel_event=None,      # 需要取消时传入 asyncio.Event
        include_tools=None,   # 非 None 时仅允许列表内工具名
        exclude_tools=None,   # 非 None 时排除列表内工具
    ):
        if chunk.type == "session_start":
            print(f"[会话] {chunk.session_id}")
        elif chunk.type == "text_delta":
            print(chunk.content, end="", flush=True)
        elif chunk.type == "thinking":
            print(f"\n[思考] {chunk.content}", end="", flush=True)
        elif chunk.type == "tool_call":
            print(f"\n[调用] {chunk.tool_name} id={chunk.tool_call_id} args={chunk.args}")
        elif chunk.type == "tool_result":
            print(f"\n[结果] {chunk.tool_name} id={chunk.tool_call_id} -> {chunk.content[:200]}...")
        elif chunk.type == "done":
            print("\n[完成]")
            if chunk.usage:
                print(f"usage: {chunk.usage}")
        elif chunk.type == "cancelled":
            print("\n[已取消]")
            break
        elif chunk.type == "compact":
            print(
                f"\n[压缩] {chunk.original_messages}->{chunk.compacted_messages} "
                f"saved≈{chunk.estimated_tokens_saved}"
            )
        elif chunk.type == "retry":
            print(
                f"\n[重试] attempt={chunk.retry_attempt}/"
                f"{chunk.retry_max_attempts} err={chunk.content!r}"
            )
        elif chunk.type == "circuit_breaker":
            print(f"\n[熔断] {chunk.content}")
        elif chunk.type == "interaction_request":
            print(
                f"\n[交互] kind={chunk.interaction_kind} id={chunk.request_id} "
                f"options={chunk.options}\n{chunk.content}"
            )
        elif chunk.type == "unknown":
            print(f"\n[未知分片] {chunk!r}")
        else:
            # prune、user_input_received 等
            print(f"\n[其他:{chunk.type}] {chunk.content[:120]!r}")


if __name__ == "__main__":
    asyncio.run(main())

工具调用事件:tool_calltool_result

Agent 执行真实任务时,模型会先发出 tool_call(要调什么、参数是什么),运行器执行后再来 tool_result(结果字符串在 content)。两者通过相同的 tool_call_id 配对,UI 上可以用 id 把「进行中的调用」与「返回结果」连成一条时间线。

调试建议:对 tool_resultcontent 做长度截断后再上屏,避免一次打印数万字 JSON;完整内容可写入日志文件。

流式取消:cancel_event

从 v1.10.3 起,stream(..., cancel_event=asyncio.Event()) 可在另一协程里 cancel.set(),运行器会尽快结束并发出 cancelled 分片。消费侧在收到 cancelled 后应 break,避免继续处理旧缓冲。

import asyncio
from cody.sdk import AsyncCodyClient


async def main() -> None:
    client = AsyncCodyClient(workdir=".")
    cancel = asyncio.Event()

    async def stop_soon() -> None:
        await asyncio.sleep(2.0)
        cancel.set()  # 模拟用户点击「停止」

    stop_task = asyncio.create_task(stop_soon())
    try:
        async for chunk in client.stream("写一首很长的叙事诗", cancel_event=cancel):
            if chunk.type == "text_delta":
                print(chunk.content, end="", flush=True)
            elif chunk.type == "cancelled":
                print("\n[流已取消]")
                break
    finally:
        stop_task.cancel()


if __name__ == "__main__":
    asyncio.run(main())

思考模式与 thinking 分片

部分模型支持「思考」内容:与面向用户的 text_delta 分离。通过 Builder .thinking(True, budget=...) 开启后,流中会出现 chunk.type == "thinking"content 为思考文本增量。下面示例同时演示如何显式指定 qwen3.5-plus 与 DashScope Coding 端点(也可继续只用环境变量)。

import asyncio
from cody.sdk import Cody


async def main() -> None:
    client = (
        Cody()
        .workdir(".")
        .model("qwen3.5-plus")
        .base_url("https://coding.dashscope.aliyuncs.com/v1")
        .api_key("sk-xxx")  # 建议使用环境变量,勿提交密钥
        .thinking(True, budget=10000)
        .build()
    )
    async for chunk in client.stream("简要分析采用异步 I/O 的利弊"):
        if chunk.type == "thinking":
            print(f"[思考] {chunk.content}", end="", flush=True)
        elif chunk.type == "text_delta":
            print(chunk.content, end="", flush=True)


if __name__ == "__main__":
    asyncio.run(main())

retry 事件

当底层 LLM 请求失败且错误属于可重试类型(如限流、5xx)时,运行器在退避等待之前会发出 retry 分片。字段含义与 Core 层 RetryEvent 一致:retry_attempt 为刚失败的尝试序号(从 1 开始),retry_max_attempts 为计划中的最多尝试次数,content 为错误信息字符串。

收到 retry 后,建议在客户端丢弃本轮已缓冲的未完成文本/思考/工具状态,避免重试成功后与旧片段拼接错位。

stream()run_stream()

二者完全等价run_stream 仅是 stream 的别名,便于与文档或其它示例命名统一。任选其一即可。

# 以下两种写法相同
async for chunk in client.stream("任务"):
    ...
async for chunk in client.run_stream("任务"):
    ...

小结

流式 API 的核心是 async for chunk in client.stream(...):用 chunk.type 驱动状态机,用 text_delta / thinking 做实时输出,用 tool_calltool_result 展示 Agent 行为,用 doneusage,用 cancel_eventcancelled 做可中断任务。下一篇 工具直接调用 会进一步聚焦如何在代码里精确调用内置工具、与流式事件对照阅读。

← 上一篇 多轮对话