Appearance
MCP 协议 — 代码走读
MCP(Model Context Protocol)实现跨越两个 Rust crate:rmcp-client 是底层协议客户端,codex-mcp 是高层编排层,聚合多个 MCP 服务器并管理工具发现、调用路由和交互策略。
codex-rs/rmcp-client/src/rmcp_client.rs — 底层 MCP 客户端
RmcpClient 结构体
rust
pub struct RmcpClient {
state: Mutex<ClientState>, // 状态机:Connecting → Ready → Closed
stdio_process: Option<StdioServerProcessHandle>,
transport_recipe: TransportRecipe, // 可克隆的传输层"配方",用于会话恢复
initialize_context: Mutex<Option<InitializeContext>>,
session_recovery_lock: Semaphore, // 确保同时只有一个恢复尝试
elicitation_pause_state: ElicitationPauseState, // 暂停超时计时的 watch 通道
}传输层抽象
三种传输变体,通过 TransportRecipe(配方)和 PendingTransport(具体实例)分离:
rust
enum TransportRecipe {
InProcess { factory: Arc<dyn InProcessTransportFactory> },
Stdio { command: StdioServerCommand, launcher: Arc<dyn StdioServerLauncher> },
StreamableHttp { url, bearer_token, http_headers, auth_provider, ... },
}- InProcess:使用 Tokio
DuplexStream(内存字节管道),通过InProcessTransportFactorytrait 创建 - Stdio:生成子进程。
LocalStdioServerLauncher在本地启动,ExecutorStdioServerLauncher委托远程执行器 - StreamableHttp:使用
rmcp::transport::StreamableHttpClientTransport,通过自定义StreamableHttpClientAdapter实现 POST/DELETE/GET 操作
连接生命周期
初始化流程(initialize 方法):
rust
pub async fn initialize(
&self,
params: InitializeRequestParams,
timeout: Option<Duration>,
send_elicitation: SendElicitation,
) -> Result<InitializeResult>- 从
Connecting状态提取待用传输层 - 创建
ElicitationClientService(拦截服务器发起的请求) - 调用
rmcp::service::serve_client(client_service, transport)完成 JSON-RPC 握手 - 将
RunningService存入Ready状态 - 保存
InitializeContext(超时 + 服务克隆)用于会话恢复
工具调用流
rust
pub async fn call_tool(
&self, name: String, arguments: Option<serde_json::Value>,
meta: Option<serde_json::Value>, timeout: Option<Duration>,
) -> Result<CallToolResult>所有操作(list_tools、call_tool、list_resources 等)通过 run_service_operation 统一处理:
- 从状态获取
service - 运行操作(含可选的 active_time_timeout)
- 若遇到 404 会话过期错误,调用
reinitialize_after_session_expiry从配方重建传输层,重新握手,重试操作
交互暂停机制
rust
async fn active_time_timeout<T, Fut>(
duration: Duration,
pause_state: watch::Receiver<bool>, // true = 暂停
operation: Fut,
) -> Result<T, ()>当 MCP 服务器发起交互(elicitation)时,暂停计时器。只有在未暂停时才消耗超时预算,防止服务器与用户的交互吃掉工具调用的超时额度。
ElicitationClientService — RMCP 服务层
rust
struct ElicitationClientService {
handler: LoggingClientHandler,
send_elicitation: Arc<SendElicitation>,
pause_state: ElicitationPauseState,
}实现 rmcp::service::Service<RoleClient>:
handle_request— 拦截CreateElicitationRequest,调用send_elicitation回调(路由到 UI),进入暂停守卫handle_notification— 委托给LoggingClientHandler(记录进度、取消、资源变更等)
codex-rs/codex-mcp/src/connection_manager.rs — MCP 连接管理器
McpConnectionManager 结构体
rust
pub struct McpConnectionManager {
clients: HashMap<String, AsyncManagedClient>,
server_metadata: HashMap<String, McpServerMetadata>,
tool_plugin_provenance: Arc<ToolPluginProvenance>,
host_owned_codex_apps_enabled: bool,
prefix_mcp_tool_names: bool,
elicitation_requests: ElicitationRequestManager,
startup_cancellation_token: CancellationToken,
}服务器初始化(并发启动)
构造函数并发启动所有服务器连接:
- 遍历
mcp_servers,过滤启用的服务器 - 为每个服务器创建
AsyncManagedClient,包装共享 future:- 验证服务器名称
- 调用
make_rmcp_client构建RmcpClient - 调用
start_server_task:MCP 握手 → 列出工具 → 应用过滤 → 缓存快照
- 每个 future spawn 到
JoinSet,通过tx_event发出状态事件(Starting/Ready/Failed/Cancelled) - 所有客户端完成后发出
McpStartupComplete事件
AsyncManagedClient 与 ManagedClient
rust
struct ManagedClient {
client: Arc<RmcpClient>,
server_info: McpServerInfo,
tools: Vec<ToolInfo>,
tool_filter: ToolFilter,
tool_timeout: Option<Duration>,
server_instructions: Option<String>,
}
struct AsyncManagedClient {
client: Shared<BoxFuture<'static, Result<ManagedClient>>>, // .boxed().shared()
cached_tool_info_snapshot: Option<Vec<ToolInfo>>,
startup_complete: Arc<AtomicBool>,
}使用 futures::future::Shared 使多个调用者可以并发 .clone().await 同一个启动 future,不重复工作。
工具发现与调用路由
rust
// 聚合所有服务器的工具列表
pub async fn list_all_tools(&self) -> Vec<ToolInfo>
// 路由工具调用到正确的服务器
pub async fn call_tool(
&self, server: &str, tool: &str,
arguments: Option<serde_json::Value>,
) -> Result<CallToolResult>list_all_tools 遍历所有客户端,await 各自的 listed_tools(),合并为单一列表后调用 normalize_tools_for_model_with_prefix 生成唯一的、经过清理的模型可见名称。
call_tool 通过 client_by_name 查找客户端,检查 ToolFilter,委托给 client.call_tool()。
资源操作
list_all_resources 和 list_all_resource_templates 使用 JoinSet 并发查询所有服务器(支持游标分页),收集结果到 HashMap<String, Vec<Resource>>。
codex-rs/codex-mcp/src/elicitation.rs — 交互请求管理
rust
struct ElicitationRequestManager {
requests: Arc<Mutex<HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>>>,
approval_policy: Arc<StdMutex<AskForApproval>>,
auto_deny: Arc<StdMutex<bool>>,
reviewer: Option<ElicitationReviewerHandle>,
}交互策略链(按优先级执行):
auto_deny→ 若为 true,立即拒绝- 自动审批 → 若审批策略允许且 schema 无必填字段,自动接受
- 策略拒绝 → 若
AskForApproval::Never,拒绝 - 审阅器 → 若注册了
ElicitationReviewer,调用reviewer.review() - 用户事件 → 创建 oneshot channel,发出
ElicitationRequest事件,等待用户响应
UI 层通过 resolve() 方法传递用户响应:
rust
pub async fn resolve(&self, server_name: String, id: RequestId, response: ElicitationResponse) -> Result<()>codex-rs/codex-mcp/src/tools.rs — 工具名称规范化
rust
pub struct ToolInfo {
pub server_name: String,
pub callable_name: String,
pub callable_namespace: String,
pub tool: Tool,
pub connector_id: Option<String>,
pub plugin_display_names: Vec<String>,
}normalize_tools_for_model_with_prefix 确保 64 字节限制内的唯一名称,名称冲突时使用 SHA-1 后缀。tool_with_model_visible_input_schema 将文件路径参数替换为 type: string + 引导文本。
连接器元数据隔离:非 codex_apps 服务器的连接器元数据被剥离,防止欺骗。只有可信的 codex_apps 服务允许传播连接器 ID。
OAuth 令牌管理(rmcp-client/src/oauth.rs)
rust
pub struct StoredOAuthTokens {
pub server_name: String,
pub client_id: String,
pub token_response: WrappedOAuthTokenResponse,
pub expires_at: Option<u64>,
}分层存储策略:
- Keyring(macOS Keychain / Windows Credential Manager / Linux Secret Service)— 首选
- 文件回退(
CODEX_HOME/.credentials.json)— keyring 不可用时使用
OAuthPersistor 在每次操作后持久化刷新令牌,在每次操作前检查令牌是否在 30 秒内过期并主动刷新。
关键函数索引
| 函数/模块 | 文件路径 | 说明 |
|---|---|---|
RmcpClient | rmcp-client/src/rmcp_client.rs | 底层 MCP 客户端 |
RmcpClient.initialize() | rmcp-client/src/rmcp_client.rs | JSON-RPC 握手 |
RmcpClient.call_tool() | rmcp-client/src/rmcp_client.rs | 工具调用 + 会话恢复 |
TransportRecipe | rmcp-client/src/rmcp_client.rs | 传输层配方(工厂模式) |
active_time_timeout() | rmcp-client/src/rmcp_client.rs | 暂停感知超时 |
ElicitationClientService | rmcp-client/src/elicitation_client_service.rs | RMCP 服务层 |
McpConnectionManager | codex-mcp/src/connection_manager.rs | 多服务器聚合管理 |
list_all_tools() | codex-mcp/src/connection_manager.rs | 并发工具发现 |
call_tool() | codex-mcp/src/connection_manager.rs | 工具调用路由 |
AsyncManagedClient | codex-mcp/src/rmcp_client.rs | 共享 future 启动 |
start_server_task() | codex-mcp/src/rmcp_client.rs | 服务器启动序列 |
ElicitationRequestManager | codex-mcp/src/elicitation.rs | 交互策略链 |
normalize_tools_for_model() | codex-mcp/src/tools.rs | 工具名称规范化 |
OAuthPersistor | rmcp-client/src/oauth.rs | OAuth 令牌持久化 |
StreamableHttpClientAdapter | rmcp-client/src/http_client_adapter.rs | HTTP 传输适配器 |
tool_is_model_visible() | codex-mcp/src/connection_manager.rs | 工具可见性过滤 |
read_mcp_resource() | core/src/codex_thread.rs | 读取 MCP 资源 |
call_mcp_tool() | core/src/codex_thread.rs | 调用 MCP 工具 |