文档中心 / SDK 教程 / 进阶篇

存储抽象

默认情况下,Cody 用本地 SQLite(及配套结构)持久化会话、审计与文件历史,适合开发与单机部署。到了生产环境,你往往希望换成 PostgreSQL、托管 KV 或公司自研存储。本篇说明如何通过 Protocol 注入自定义实现,并与无状态模式组合;最后梳理 SDK 的异常体系,做到分层捕获。演示模型与全系列一致:qwen3.5-plus(依赖环境变量,见第 01 篇)。

三大存储 Protocol 与默认实现

核心定义在 cody.core.storage:引擎只认「结构正确」的对象,不必继承任何基类。下表是三类存储的职责对照。

Protocol 默认实现 模块位置 职责
SessionStoreProtocol SessionStore(SQLite) cody/core/session.py 会话创建、消息追加与查询、列表与删除、压缩检查点(compaction)相关消息区间等。
AuditLoggerProtocol AuditLogger cody/core/audit.py 记录工具调用与关键事件,支持按条件查询、计数与清理。
FileHistoryProtocol FileHistory cody/core/file_history.py 记录文件改写、支持撤销/重做栈与变更列表。

另有一个 MemoryStoreProtocol(项目记忆)用于跨会话记忆,在 第 11 篇 已有侧重;本篇聚焦上述三类与 SDK 构建器注入。

完整示例:自定义 SessionStore(模拟 PostgreSQL)

下面是一段可直接运行的骨架:PostgresStyleSessionStore 用内存字典模拟「库表」,你把其中的读写换成 asyncpg / SQL 即可接入真实 PostgreSQL。客户端通过 Cody().session_store(...).build() 注入,模型仍读环境变量(CODY_MODEL=qwen3.5-plus)。

# 完整示例:自定义 SessionStore + 运行一次任务
import asyncio
import uuid
from datetime import datetime, timezone

from cody.core.session import Message, Session
from cody.sdk import Cody


class PostgresStyleSessionStore:
    """模拟 PG:内存实现;生产环境把 _sessions/_rows 换成 SQL。"""

    def __init__(self, dsn: str) -> None:
        self._dsn = dsn
        self._sessions: dict[str, Session] = {}
        # 每条消息带单调整数 id,语义对齐 SQLite messages.id
        self._rows: dict[str, list[tuple[int, Message]]] = {}

    def close(self) -> None:
        pass

    def create_session(
        self, title: str = "New session", model: str = "", workdir: str = ""
    ) -> Session:
        now = datetime.now(timezone.utc).isoformat()
        sid = uuid.uuid4().hex[:12]
        s = Session(
            id=sid,
            title=title,
            messages=[],
            model=model,
            workdir=workdir,
            created_at=now,
            updated_at=now,
        )
        self._sessions[sid] = s
        self._rows[sid] = []
        return s

    def add_message(
        self,
        session_id: str,
        role: str,
        content: str,
        images=None,
    ) -> Message:
        msg = Message(role=role, content=content, images=list(images or []))
        rows = self._rows.setdefault(session_id, [])
        new_id = (max((r[0] for r in rows), default=0) + 1)
        rows.append((new_id, msg))
        s = self._sessions[session_id]
        s.updated_at = datetime.now(timezone.utc).isoformat()
        return msg

    def get_session(self, session_id: str) -> Session | None:
        s = self._sessions.get(session_id)
        if s is None:
            return None
        msgs = [m for _, m in self._rows.get(session_id, [])]
        return Session(
            id=s.id,
            title=s.title,
            messages=msgs,
            model=s.model,
            workdir=s.workdir,
            created_at=s.created_at,
            updated_at=s.updated_at,
            message_count=None,
            compacted_summary=s.compacted_summary,
            compacted_up_to=s.compacted_up_to,
        )

    def list_sessions(self, limit: int = 20) -> list[Session]:
        items = sorted(
            self._sessions.values(),
            key=lambda x: x.updated_at,
            reverse=True,
        )[:limit]
        out = []
        for s in items:
            cnt = len(self._rows.get(s.id, []))
            out.append(
                Session(
                    id=s.id,
                    title=s.title,
                    messages=[],
                    model=s.model,
                    workdir=s.workdir,
                    created_at=s.created_at,
                    updated_at=s.updated_at,
                    message_count=cnt,
                )
            )
        return out

    def delete_session(self, session_id: str) -> bool:
        ok = session_id in self._sessions
        self._sessions.pop(session_id, None)
        self._rows.pop(session_id, None)
        return ok

    def get_latest_session(self, workdir: str | None = None) -> Session | None:
        cand = [
            s
            for s in self._sessions.values()
            if workdir is None or s.workdir == workdir
        ]
        if not cand:
            return None
        latest = max(cand, key=lambda x: x.updated_at)
        return self.get_session(latest.id)

    def get_message_count(self, session_id: str) -> int:
        return len(self._rows.get(session_id, []))

    def update_title(self, session_id: str, title: str) -> None:
        if s := self._sessions.get(session_id):
            s.title = title

    def save_compaction(
        self, session_id: str, summary: str, up_to_message_id: int
    ) -> None:
        if s := self._sessions.get(session_id):
            s.compacted_summary = summary
            s.compacted_up_to = up_to_message_id
            s.updated_at = datetime.now(timezone.utc).isoformat()

    def get_messages_after(
        self, session_id: str, after_id: int
    ) -> list[Message]:
        return [m for mid, m in self._rows.get(session_id, []) if mid > after_id]

    def get_last_message_id(self, session_id: str) -> int | None:
        rows = self._rows.get(session_id, [])
        if not rows:
            return None
        return max(mid for mid, _ in rows)


async def main() -> None:
    store = PostgresStyleSessionStore("postgresql://user:pass@localhost/cody")
    client = (
        Cody()
        .workdir(".")
        .session_store(store)
        .build()
    )
    async with client:
        r = await client.run("用一句话说明当前目录下有哪些文件")
        print(r.output)


if __name__ == "__main__":
    asyncio.run(main())

SessionStoreProtocol 必实现方法一览

下列方法与 cody/core/storage.pySessionStoreProtocol 一致;全部实现后,类型检查与运行期行为才能与默认 SessionStore 对齐(含上下文压缩用到的消息 id)。

方法 说明
close()释放连接或句柄;无持久化时可空实现。
create_session(title, model, workdir)创建会话并返回 Session
add_message(session_id, role, content, images)追加消息,返回 Message
get_session(session_id)按 id 取完整会话(含消息列表);不存在返回 None
list_sessions(limit)最近会话列表,通常不带消息体、可带 message_count
delete_session(session_id)删除成功返回 True
get_latest_session(workdir)可选按工作目录筛选最近更新的会话。
get_message_count(session_id)消息条数。
update_title(session_id, title)更新标题。
save_compaction(session_id, summary, up_to_message_id)写入压缩检查点。
get_messages_after(session_id, after_id)取 id 大于 after_id 的消息。
get_last_message_id(session_id)当前会话最后一条消息 id;无消息返回 None

在构建器上注入

cody.core.storage 导入的是 Protocol 类型,用于标注你自己的类;注入走 Cody 链式 API:

from cody.core.storage import (
    SessionStoreProtocol,
    AuditLoggerProtocol,
    FileHistoryProtocol,
)

client = (
    Cody()
    .session_store(my_session_store)
    .audit_logger(my_audit_logger)
    .file_history(my_file_history)
    .build()
)

可以只替换其中一项:例如仅自定义 session_store,其余仍为默认 SQLite 实现。按需组合,避免「为了换存储而重写全套」。

stateless() 的关系

Cody().stateless().build() 会为未显式指定的存储槽位填入空实现(NullSessionStoreNullAuditLoggerNullFileHistory 等),适合一次性脚本或无需落盘的场景。若在 stateless() 之后再调用 .audit_logger(real_logger),则只有审计会恢复为真实实现——其余仍为 null。

client = Cody().stateless().build()
client = Cody().stateless().audit_logger(real_logger).build()  # 仅覆盖审计

无状态模式下会话不会真正持久化;若你仍注入自定义 session_store,则以你注入的为准(stateless 只处理仍为「未设置」的槽位)。

SDK 错误类型与分级处理

所有具体异常均继承 CodyError,并带有 messagecodedetails 等字段。建议先捕获更具体的子类,最后再用 CodyError 兜底。

类型 典型场景
CodyRateLimitError模型 API 限流;可读 retry_after
CodyToolError工具执行失败;details["tool_name"]
CodyPermissionError工具或动作权限不足。
CodyNotFoundError会话、技能、文件等资源不存在。
CodySessionError会话状态异常(如无效 id)。
CodyModelError模型调用失败(非限流类)。
CodyConfigError配置错误(缺 key、非法项等)。
CodyTimeoutError请求超时。
CodyConnectionError网络或连接问题。
CodyError基类,兜底其他 SDK 错误。
import asyncio
from cody.sdk import (
    CodyError, CodyModelError, CodyToolError, CodyPermissionError,
    CodyNotFoundError, CodyRateLimitError, CodyConfigError,
    CodyTimeoutError, CodyConnectionError, CodySessionError,
)

try:
    result = await client.run("任务")
except CodyRateLimitError as e:
    await asyncio.sleep(e.retry_after)
    result = await client.run("任务")
except CodyToolError as e:
    print(f"工具 {e.details['tool_name']} 失败: {e.message}")
except CodyPermissionError as e:
    print(f"权限拒绝: {e.message}")
except CodyError as e:
    print(f"[{e.code}] {e.message}")

生产代码中应对 retry_afterNone 判断并设默认退避;限流与模型错误也可配合重试策略与日志上报。

系列总结

恭喜你读完 Cody SDK 教程全部 13 篇。你从一次性调用与多轮对话出发,掌握了流式输出、工具与自定义工具、Prompt 与 Skills、MCP、安全与事件、项目记忆、人机协同,直至本篇的存储注入与错误分层——这已经覆盖用 Cody 搭建「自己的 AI Agent」的主干路径。

建议把本系列当作路线图:日常开发先按篇实践,遇到深坑再对照 GitHub 仓库 中的 docs/、源码与测试用例。开源的意义在于可 fork、可改、可回馈;如果你在生产里沉淀了更好的存储实现或排错笔记,也欢迎提 PR 与 Issue,让更多人少走弯路。

保持好奇、小步验证、把抽象(Protocol、事件、存储)落在真实业务里,你就会越来越顺手。下一程,祝你编码愉快。

← 上一篇 人机协同