存储抽象
默认情况下,Cody 用本地 SQLite(及配套结构)持久化会话、审计与文件历史,适合开发与单机部署。到了生产环境,你往往希望换成 PostgreSQL、托管 KV 或公司自研存储。本篇说明如何通过 Protocol 注入自定义实现,并与无状态模式组合;最后梳理 SDK 的异常体系,做到分层捕获。演示模型与全系列一致:qwen3.5-plus(依赖环境变量,见第 01 篇)。
三大存储 Protocol 与默认实现
核心定义在 cody.core.storage:引擎只认「结构正确」的对象,不必继承任何基类。下表是三类存储的职责对照。
| Protocol | 默认实现 | 模块位置 | 职责 |
|---|---|---|---|
SessionStoreProtocol |
SessionStore(SQLite) |
cody/core/session.py |
会话创建、消息追加与查询、列表与删除、压缩检查点(compaction)相关消息区间等。 |
AuditLoggerProtocol |
AuditLogger |
cody/core/audit.py |
记录工具调用与关键事件,支持按条件查询、计数与清理。 |
FileHistoryProtocol |
FileHistory |
cody/core/file_history.py |
记录文件改写、支持撤销/重做栈与变更列表。 |
另有一个 MemoryStoreProtocol(项目记忆)用于跨会话记忆,在 第 11 篇 已有侧重;本篇聚焦上述三类与 SDK 构建器注入。
完整示例:自定义 SessionStore(模拟 PostgreSQL)
下面是一段可直接运行的骨架:PostgresStyleSessionStore 用内存字典模拟「库表」,你把其中的读写换成 asyncpg / SQL 即可接入真实 PostgreSQL。客户端通过 Cody().session_store(...).build() 注入,模型仍读环境变量(CODY_MODEL=qwen3.5-plus)。
# 完整示例:自定义 SessionStore + 运行一次任务 import asyncio import uuid from datetime import datetime, timezone from cody.core.session import Message, Session from cody.sdk import Cody class PostgresStyleSessionStore: """模拟 PG:内存实现;生产环境把 _sessions/_rows 换成 SQL。""" def __init__(self, dsn: str) -> None: self._dsn = dsn self._sessions: dict[str, Session] = {} # 每条消息带单调整数 id,语义对齐 SQLite messages.id self._rows: dict[str, list[tuple[int, Message]]] = {} def close(self) -> None: pass def create_session( self, title: str = "New session", model: str = "", workdir: str = "" ) -> Session: now = datetime.now(timezone.utc).isoformat() sid = uuid.uuid4().hex[:12] s = Session( id=sid, title=title, messages=[], model=model, workdir=workdir, created_at=now, updated_at=now, ) self._sessions[sid] = s self._rows[sid] = [] return s def add_message( self, session_id: str, role: str, content: str, images=None, ) -> Message: msg = Message(role=role, content=content, images=list(images or [])) rows = self._rows.setdefault(session_id, []) new_id = (max((r[0] for r in rows), default=0) + 1) rows.append((new_id, msg)) s = self._sessions[session_id] s.updated_at = datetime.now(timezone.utc).isoformat() return msg def get_session(self, session_id: str) -> Session | None: s = self._sessions.get(session_id) if s is None: return None msgs = [m for _, m in self._rows.get(session_id, [])] return Session( id=s.id, title=s.title, messages=msgs, model=s.model, workdir=s.workdir, created_at=s.created_at, updated_at=s.updated_at, message_count=None, compacted_summary=s.compacted_summary, compacted_up_to=s.compacted_up_to, ) def list_sessions(self, limit: int = 20) -> list[Session]: items = sorted( self._sessions.values(), key=lambda x: x.updated_at, reverse=True, )[:limit] out = [] for s in items: cnt = len(self._rows.get(s.id, [])) out.append( Session( id=s.id, title=s.title, messages=[], model=s.model, workdir=s.workdir, created_at=s.created_at, updated_at=s.updated_at, message_count=cnt, ) ) return out def delete_session(self, session_id: str) -> bool: ok = session_id in self._sessions self._sessions.pop(session_id, None) self._rows.pop(session_id, None) return ok def get_latest_session(self, workdir: str | None = None) -> Session | None: cand = [ s for s in self._sessions.values() if workdir is None or s.workdir == workdir ] if not cand: return None latest = max(cand, key=lambda x: x.updated_at) return self.get_session(latest.id) def get_message_count(self, session_id: str) -> int: return len(self._rows.get(session_id, [])) def update_title(self, session_id: str, title: str) -> None: if s := self._sessions.get(session_id): s.title = title def save_compaction( self, session_id: str, summary: str, up_to_message_id: int ) -> None: if s := self._sessions.get(session_id): s.compacted_summary = summary s.compacted_up_to = up_to_message_id s.updated_at = datetime.now(timezone.utc).isoformat() def get_messages_after( self, session_id: str, after_id: int ) -> list[Message]: return [m for mid, m in self._rows.get(session_id, []) if mid > after_id] def get_last_message_id(self, session_id: str) -> int | None: rows = self._rows.get(session_id, []) if not rows: return None return max(mid for mid, _ in rows) async def main() -> None: store = PostgresStyleSessionStore("postgresql://user:pass@localhost/cody") client = ( Cody() .workdir(".") .session_store(store) .build() ) async with client: r = await client.run("用一句话说明当前目录下有哪些文件") print(r.output) if __name__ == "__main__": asyncio.run(main())
SessionStoreProtocol 必实现方法一览
下列方法与 cody/core/storage.py 中 SessionStoreProtocol 一致;全部实现后,类型检查与运行期行为才能与默认 SessionStore 对齐(含上下文压缩用到的消息 id)。
| 方法 | 说明 |
|---|---|
close() | 释放连接或句柄;无持久化时可空实现。 |
create_session(title, model, workdir) | 创建会话并返回 Session。 |
add_message(session_id, role, content, images) | 追加消息,返回 Message。 |
get_session(session_id) | 按 id 取完整会话(含消息列表);不存在返回 None。 |
list_sessions(limit) | 最近会话列表,通常不带消息体、可带 message_count。 |
delete_session(session_id) | 删除成功返回 True。 |
get_latest_session(workdir) | 可选按工作目录筛选最近更新的会话。 |
get_message_count(session_id) | 消息条数。 |
update_title(session_id, title) | 更新标题。 |
save_compaction(session_id, summary, up_to_message_id) | 写入压缩检查点。 |
get_messages_after(session_id, after_id) | 取 id 大于 after_id 的消息。 |
get_last_message_id(session_id) | 当前会话最后一条消息 id;无消息返回 None。 |
在构建器上注入
从 cody.core.storage 导入的是 Protocol 类型,用于标注你自己的类;注入走 Cody 链式 API:
from cody.core.storage import ( SessionStoreProtocol, AuditLoggerProtocol, FileHistoryProtocol, ) client = ( Cody() .session_store(my_session_store) .audit_logger(my_audit_logger) .file_history(my_file_history) .build() )
可以只替换其中一项:例如仅自定义 session_store,其余仍为默认 SQLite 实现。按需组合,避免「为了换存储而重写全套」。
与 stateless() 的关系
Cody().stateless().build() 会为未显式指定的存储槽位填入空实现(NullSessionStore、NullAuditLogger、NullFileHistory 等),适合一次性脚本或无需落盘的场景。若在 stateless() 之后再调用 .audit_logger(real_logger),则只有审计会恢复为真实实现——其余仍为 null。
client = Cody().stateless().build()
client = Cody().stateless().audit_logger(real_logger).build() # 仅覆盖审计
无状态模式下会话不会真正持久化;若你仍注入自定义 session_store,则以你注入的为准(stateless 只处理仍为「未设置」的槽位)。
SDK 错误类型与分级处理
所有具体异常均继承 CodyError,并带有 message、code、details 等字段。建议先捕获更具体的子类,最后再用 CodyError 兜底。
| 类型 | 典型场景 |
|---|---|
CodyRateLimitError | 模型 API 限流;可读 retry_after。 |
CodyToolError | 工具执行失败;details["tool_name"]。 |
CodyPermissionError | 工具或动作权限不足。 |
CodyNotFoundError | 会话、技能、文件等资源不存在。 |
CodySessionError | 会话状态异常(如无效 id)。 |
CodyModelError | 模型调用失败(非限流类)。 |
CodyConfigError | 配置错误(缺 key、非法项等)。 |
CodyTimeoutError | 请求超时。 |
CodyConnectionError | 网络或连接问题。 |
CodyError | 基类,兜底其他 SDK 错误。 |
import asyncio from cody.sdk import ( CodyError, CodyModelError, CodyToolError, CodyPermissionError, CodyNotFoundError, CodyRateLimitError, CodyConfigError, CodyTimeoutError, CodyConnectionError, CodySessionError, ) try: result = await client.run("任务") except CodyRateLimitError as e: await asyncio.sleep(e.retry_after) result = await client.run("任务") except CodyToolError as e: print(f"工具 {e.details['tool_name']} 失败: {e.message}") except CodyPermissionError as e: print(f"权限拒绝: {e.message}") except CodyError as e: print(f"[{e.code}] {e.message}")
生产代码中应对 retry_after 做 None 判断并设默认退避;限流与模型错误也可配合重试策略与日志上报。
系列总结
恭喜你读完 Cody SDK 教程全部 13 篇。你从一次性调用与多轮对话出发,掌握了流式输出、工具与自定义工具、Prompt 与 Skills、MCP、安全与事件、项目记忆、人机协同,直至本篇的存储注入与错误分层——这已经覆盖用 Cody 搭建「自己的 AI Agent」的主干路径。
建议把本系列当作路线图:日常开发先按篇实践,遇到深坑再对照 GitHub 仓库 中的 docs/、源码与测试用例。开源的意义在于可 fork、可改、可回馈;如果你在生产里沉淀了更好的存储实现或排错笔记,也欢迎提 PR 与 Issue,让更多人少走弯路。
保持好奇、小步验证、把抽象(Protocol、事件、存储)落在真实业务里,你就会越来越顺手。下一程,祝你编码愉快。