Appearance
SDK 与 IDE 集成 — 代码走读
Codex 提供两套 SDK:TypeScript SDK 和 Python SDK。它们的架构有本质区别——TypeScript SDK 使用 codex exec --experimental-json 的单向流模式,Python SDK 使用 codex app-server --listen stdio:// 的双向 JSON-RPC 模式。
sdk/typescript/src/codex.ts — TypeScript SDK 入口
Codex 类是轻量门面,创建单个 CodexExec 实例(只解析一次二进制路径),Thread 对象共享该实例:
typescript
export class Codex {
private exec: CodexExec;
private options: CodexOptions;
constructor(options: CodexOptions = {}) {
this.exec = new CodexExec(codexPathOverride, env, config);
}
startThread(options: ThreadOptions = {}): Thread {
return new Thread(this.exec, this.options, options);
}
resumeThread(id: string, options: ThreadOptions = {}): Thread {
return new Thread(this.exec, this.options, options, id);
}
}sdk/typescript/src/exec.ts — CodexExec 子进程管理
二进制路径解析
findCodexPath() 通过以下步骤定位 codex 二进制:
- 根据
process.platform+process.arch计算 Rust 风格的 target triple(如aarch64-apple-darwin) - 查找平台特定的 npm 包(如
@openai/codex-darwin-arm64) - 沿
require.resolve链找到 vendor 目录 - 检查两种布局:
vendor/<triple>/bin/codex(新)和vendor/<triple>/codex/codex(旧)
run() — 异步生成器
CodexExec.run() 是一个 async generator,启动子进程并逐行产出 JSON Lines:
typescript
async *run(args: CodexExecArgs): AsyncGenerator<string> {
const commandArgs = ["exec", "--experimental-json"];
// 构建 model, sandbox, cwd, config overrides 等参数
if (args.threadId) {
commandArgs.push("resume", args.threadId);
}
const child = spawn(this.executablePath, commandArgs, { env, signal: args.signal });
child.stdin.write(args.input); // 写入提示
child.stdin.end(); // 立即关闭 stdin
const rl = readline.createInterface({ input: child.stdout, crlfDelay: Infinity });
for await (const line of rl) {
yield line as string; // 产出原始 JSON Lines
}
}关键差异:立即关闭 stdin(stdin.end())——这是 fire-and-forget 模式,不是双向通信。每行 stdout 是一个 JSON 事件。
配置覆盖序列化
嵌套配置对象被展平为点分 --config key=value 对:
typescript
flattenConfigOverrides(configOverrides, "", overrides);
// { sandbox_workspace_write: { network_access: true } }
// → ["--config", "sandbox_workspace_write.network_access=true"]sdk/typescript/src/thread.ts — Thread 生命周期
Thread 类包装 CodexExec 的异步生成器,提供两种消费模式:
typescript
export class Thread {
private _exec: CodexExec;
private _id: string | null;
// 流式模式:逐事件产出
async runStreamed(input: Input, turnOptions = {}): Promise<StreamedTurn> {
return { events: this.runStreamedInternal(input, turnOptions) };
}
// 一次性模式:等待全部完成后返回
async run(input: Input, turnOptions = {}): Promise<Turn> {
const generator = this.runStreamedInternal(input, turnOptions);
const items: ThreadItem[] = [];
for await (const event of generator) {
if (event.type === "item.completed") items.push(event.item);
else if (event.type === "turn.completed") usage = event.usage;
else if (event.type === "turn.failed") throw new Error(event.error.message);
}
return { items, finalResponse, usage };
}
private async *runStreamedInternal(input, turnOptions): AsyncGenerator<ThreadEvent> {
const generator = this._exec.run({ input, threadId: this._id, ... });
for await (const item of generator) {
const parsed = JSON.parse(item) as ThreadEvent;
if (parsed.type === "thread.started") this._id = parsed.thread_id;
yield parsed;
}
}
}事件生命周期:thread.started → turn.started → [item.started → item.updated → item.completed]* → turn.completed
输入归一化:接受纯字符串或类型化项目数组({type: "text", text: "..."} 或 {type: "local_image", path: "..."}),文本部分用 \n\n 连接。
sdk/python/src/openai_codex/client.py — Python SDK 核心
CodexConfig 配置
python
@dataclass(slots=True)
class CodexConfig:
codex_bin: str | None = None # 自定义二进制路径
launch_args_override: tuple[str, ...] | None = None
config_overrides: tuple[str, ...] = ()
cwd: str | None = None
env: dict[str, str] | None = None
client_name: str = "codex_python_sdk"
client_version: str = SDK_VERSION
experimental_api: bool = True当 codex_bin 为 None 时,通过 _installed_codex_path() 解析,该方法从 openai-codex-cli-bin 伴随包获取固定运行时路径。
CodexClient 构造与传输层
python
class CodexClient:
def __init__(self, config=None, approval_handler=None):
self.config = config or CodexConfig()
self._approval_handler = approval_handler or self._default_approval_handler
self._proc: subprocess.Popen | None = None
self._lock = threading.Lock()
self._router = MessageRouter()
self._stderr_lines: deque[str] = deque(maxlen=400)start() 方法启动 codex app-server --listen stdio:// 作为子进程:
python
def start(self) -> None:
codex_bin = _resolve_codex_bin(self.config)
args = [str(codex_bin), "app-server", "--listen", "stdio://"]
self._proc = subprocess.Popen(
args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, encoding="utf-8", cwd=self.config.cwd, bufsize=1,
)
self._start_reader_thread() # 读取 stdout JSON-RPC
self._start_stderr_drain_thread()JSON-RPC 协议实现
Python SDK 实现了完整的 JSON-RPC 2.0 多路复用器:
python
def _request_raw(self, method, params=None) -> JsonValue:
request_id = str(uuid.uuid4())
waiter = self._router.create_response_waiter(request_id)
message = {"id": request_id, "method": method}
if params is not None:
message["params"] = params
self._write_message(message)
item = waiter.get() # 阻塞直到 reader 线程路由响应
return item
def notify(self, method, params=None):
# 无 id,不期待响应
message = {"method": method}
if params is not None:
message["params"] = params
self._write_message(message)写入路径通过线程锁串行化:self._lock + json.dumps(payload) + "\n" 写入 stdin。
Reader 线程 — 消息分类
python
def _reader_loop(self) -> None:
while True:
msg = self._read_message()
if "method" in msg and "id" in msg:
# 服务器发起的请求(如审批提示)
response = self._handle_server_request(msg)
self._write_message({"id": msg["id"], "result": response})
elif "method" in msg:
# 服务器通知 → 路由到 turn/login/global 队列
self._router.route_notification(...)
else:
# 客户端请求的响应 → 路由到等待的请求
self._router.route_response(msg)MessageRouter — 扇出引擎
MessageRouter 解决"单个 stdout 流、多个消费者"问题:
- Response waiters:一次性
maxsize=1队列,按 request ID 索引 - Turn notification queues:按
turn_id索引,用于流式 turn 事件 - Login notification queues:按
login_id索引,用于交互式登录 - Pending deques:缓冲在队列注册前到达的通知(防止竞态)
- Global queue:兜底队列,处理不属于特定 turn/login 的通知
Thread 生命周期方法
python
def thread_start(self, params=None) -> ThreadStartResponse:
return self.request("thread/start", _params_dict(params))
def turn_start(self, thread_id, input_items, params=None) -> TurnStartResponse:
payload = {"threadId": thread_id, "input": self._normalize_input_items(input_items)}
started = self.request("turn/start", payload)
self.register_turn_notifications(started.turn.id) # 立即注册,防止丢失事件
return started
def stream_text(self, thread_id, text, params=None) -> Iterator:
started = self.turn_start(thread_id, text, params)
while True:
notification = self.next_turn_notification(started.turn.id)
if notification.method == "item/agentMessage/delta":
yield notification.payload
elif notification.method == "turn/completed":
break审批处理与错误层级
默认审批处理器自动接受所有操作:
python
def _default_approval_handler(self, method, params):
if method == "item/commandExecution/requestApproval":
return {"decision": "accept"}
return {}错误层级遵循 JSON-RPC 标准:
CodexError (基类)
+-- JsonRpcError(code, message, data)
| +-- ParseError (-32700)
| +-- InvalidRequestError (-32600)
| +-- MethodNotFoundError (-32601)
| +-- InvalidParamsError (-32602)
| +-- InternalRpcError (-32603)
| +-- ServerBusyError (-32099..-32000)
+-- TransportClosedError重试机制使用指数退避 + 抖动,仅在 ServerBusyError 时触发:
python
def retry_on_overload(op, max_attempts=3, initial_delay_s=0.25, max_delay_s=2.0):
delay = initial_delay_s
while True:
try: return op()
except Exception as exc:
if not is_retryable_error(exc): raise
sleep(min(max_delay_s, delay) + random.uniform(-jitter, jitter))
delay = min(max_delay_s, delay * 2)架构对比
| 方面 | Python SDK | TypeScript SDK |
|---|---|---|
| 协议 | JSON-RPC over stdio(app-server) | JSON Lines over stdout(exec) |
| 进程模式 | 长驻服务器进程,多请求复用连接 | Fire-and-forget,每次 turn 新进程 |
| 传输方向 | 双向 stdin/stdout | 单向 stdout 流 |
| 并发模型 | threading.Thread + queue.Queue | AsyncGenerator + Node.js readline |
| 通知路由 | MessageRouter 多队列扇出 | 无需路由,单流 per turn |
| 审批处理 | 回调式,服务器推送审批请求 | 不适用(CLI exec 模式) |
| 二进制解析 | openai-codex-cli-bin 伴随包 | 平台 npm 包(@openai/codex-darwin-arm64) |
关键函数索引
| 函数/模块 | 文件路径 | 说明 |
|---|---|---|
Codex class | sdk/typescript/src/codex.ts | TypeScript SDK 入口门面 |
CodexExec.run() | sdk/typescript/src/exec.ts | 子进程 async generator |
findCodexPath() | sdk/typescript/src/exec.ts | 平台二进制路径解析 |
flattenConfigOverrides() | sdk/typescript/src/exec.ts | 嵌套配置展平为点分键 |
Thread.runStreamed() | sdk/typescript/src/thread.ts | 流式事件消费 |
Thread.run() | sdk/typescript/src/thread.ts | 一次性等待完成 |
CodexClient | sdk/python/.../client.py | Python SDK JSON-RPC 引擎 |
CodexClient.start() | sdk/python/.../client.py | 启动 app-server 子进程 |
_request_raw() | sdk/python/.../client.py | JSON-RPC 请求发送 |
_reader_loop() | sdk/python/.../client.py | 消息分类多路复用 |
MessageRouter | sdk/python/.../_message_router.py | 扇出路由引擎 |
thread_start() | sdk/python/.../client.py | 启动新线程 |
turn_start() | sdk/python/.../client.py | 启动新 turn + 注册通知 |
stream_text() | sdk/python/.../client.py | 流式文本迭代器 |
retry_on_overload() | sdk/python/.../retry.py | 指数退避重试 |