Skip to content

MCP 协议 — 代码走读

MCP(Model Context Protocol)实现跨越两个 Rust crate:rmcp-client 是底层协议客户端,codex-mcp 是高层编排层,聚合多个 MCP 服务器并管理工具发现、调用路由和交互策略。

codex-rs/rmcp-client/src/rmcp_client.rs — 底层 MCP 客户端

RmcpClient 结构体

rust
pub struct RmcpClient {
    state: Mutex<ClientState>,           // 状态机:Connecting → Ready → Closed
    stdio_process: Option<StdioServerProcessHandle>,
    transport_recipe: TransportRecipe,   // 可克隆的传输层"配方",用于会话恢复
    initialize_context: Mutex<Option<InitializeContext>>,
    session_recovery_lock: Semaphore,    // 确保同时只有一个恢复尝试
    elicitation_pause_state: ElicitationPauseState,  // 暂停超时计时的 watch 通道
}

传输层抽象

三种传输变体,通过 TransportRecipe(配方)和 PendingTransport(具体实例)分离:

rust
enum TransportRecipe {
    InProcess { factory: Arc<dyn InProcessTransportFactory> },
    Stdio { command: StdioServerCommand, launcher: Arc<dyn StdioServerLauncher> },
    StreamableHttp { url, bearer_token, http_headers, auth_provider, ... },
}
  • InProcess:使用 Tokio DuplexStream(内存字节管道),通过 InProcessTransportFactory trait 创建
  • Stdio:生成子进程。LocalStdioServerLauncher 在本地启动,ExecutorStdioServerLauncher 委托远程执行器
  • StreamableHttp:使用 rmcp::transport::StreamableHttpClientTransport,通过自定义 StreamableHttpClientAdapter 实现 POST/DELETE/GET 操作

连接生命周期

初始化流程(initialize 方法):

rust
pub async fn initialize(
    &self,
    params: InitializeRequestParams,
    timeout: Option<Duration>,
    send_elicitation: SendElicitation,
) -> Result<InitializeResult>
  1. Connecting 状态提取待用传输层
  2. 创建 ElicitationClientService(拦截服务器发起的请求)
  3. 调用 rmcp::service::serve_client(client_service, transport) 完成 JSON-RPC 握手
  4. RunningService 存入 Ready 状态
  5. 保存 InitializeContext(超时 + 服务克隆)用于会话恢复

工具调用流

rust
pub async fn call_tool(
    &self, name: String, arguments: Option<serde_json::Value>,
    meta: Option<serde_json::Value>, timeout: Option<Duration>,
) -> Result<CallToolResult>

所有操作(list_toolscall_toollist_resources 等)通过 run_service_operation 统一处理:

  1. 从状态获取 service
  2. 运行操作(含可选的 active_time_timeout)
  3. 若遇到 404 会话过期错误,调用 reinitialize_after_session_expiry 从配方重建传输层,重新握手,重试操作

交互暂停机制

rust
async fn active_time_timeout<T, Fut>(
    duration: Duration,
    pause_state: watch::Receiver<bool>,  // true = 暂停
    operation: Fut,
) -> Result<T, ()>

当 MCP 服务器发起交互(elicitation)时,暂停计时器。只有在未暂停时才消耗超时预算,防止服务器与用户的交互吃掉工具调用的超时额度。

ElicitationClientService — RMCP 服务层

rust
struct ElicitationClientService {
    handler: LoggingClientHandler,
    send_elicitation: Arc<SendElicitation>,
    pause_state: ElicitationPauseState,
}

实现 rmcp::service::Service<RoleClient>

  • handle_request — 拦截 CreateElicitationRequest,调用 send_elicitation 回调(路由到 UI),进入暂停守卫
  • handle_notification — 委托给 LoggingClientHandler(记录进度、取消、资源变更等)

codex-rs/codex-mcp/src/connection_manager.rs — MCP 连接管理器

McpConnectionManager 结构体

rust
pub struct McpConnectionManager {
    clients: HashMap<String, AsyncManagedClient>,
    server_metadata: HashMap<String, McpServerMetadata>,
    tool_plugin_provenance: Arc<ToolPluginProvenance>,
    host_owned_codex_apps_enabled: bool,
    prefix_mcp_tool_names: bool,
    elicitation_requests: ElicitationRequestManager,
    startup_cancellation_token: CancellationToken,
}

服务器初始化(并发启动)

构造函数并发启动所有服务器连接:

  1. 遍历 mcp_servers,过滤启用的服务器
  2. 为每个服务器创建 AsyncManagedClient,包装共享 future:
    • 验证服务器名称
    • 调用 make_rmcp_client 构建 RmcpClient
    • 调用 start_server_task:MCP 握手 → 列出工具 → 应用过滤 → 缓存快照
  3. 每个 future spawn 到 JoinSet,通过 tx_event 发出状态事件(Starting/Ready/Failed/Cancelled
  4. 所有客户端完成后发出 McpStartupComplete 事件

AsyncManagedClient 与 ManagedClient

rust
struct ManagedClient {
    client: Arc<RmcpClient>,
    server_info: McpServerInfo,
    tools: Vec<ToolInfo>,
    tool_filter: ToolFilter,
    tool_timeout: Option<Duration>,
    server_instructions: Option<String>,
}

struct AsyncManagedClient {
    client: Shared<BoxFuture<'static, Result<ManagedClient>>>,  // .boxed().shared()
    cached_tool_info_snapshot: Option<Vec<ToolInfo>>,
    startup_complete: Arc<AtomicBool>,
}

使用 futures::future::Shared 使多个调用者可以并发 .clone().await 同一个启动 future,不重复工作。

工具发现与调用路由

rust
// 聚合所有服务器的工具列表
pub async fn list_all_tools(&self) -> Vec<ToolInfo>

// 路由工具调用到正确的服务器
pub async fn call_tool(
    &self, server: &str, tool: &str,
    arguments: Option<serde_json::Value>,
) -> Result<CallToolResult>

list_all_tools 遍历所有客户端,await 各自的 listed_tools(),合并为单一列表后调用 normalize_tools_for_model_with_prefix 生成唯一的、经过清理的模型可见名称。

call_tool 通过 client_by_name 查找客户端,检查 ToolFilter,委托给 client.call_tool()

资源操作

list_all_resourceslist_all_resource_templates 使用 JoinSet 并发查询所有服务器(支持游标分页),收集结果到 HashMap<String, Vec<Resource>>

codex-rs/codex-mcp/src/elicitation.rs — 交互请求管理

rust
struct ElicitationRequestManager {
    requests: Arc<Mutex<HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>>>,
    approval_policy: Arc<StdMutex<AskForApproval>>,
    auto_deny: Arc<StdMutex<bool>>,
    reviewer: Option<ElicitationReviewerHandle>,
}

交互策略链(按优先级执行):

  1. auto_deny → 若为 true,立即拒绝
  2. 自动审批 → 若审批策略允许且 schema 无必填字段,自动接受
  3. 策略拒绝 → 若 AskForApproval::Never,拒绝
  4. 审阅器 → 若注册了 ElicitationReviewer,调用 reviewer.review()
  5. 用户事件 → 创建 oneshot channel,发出 ElicitationRequest 事件,等待用户响应

UI 层通过 resolve() 方法传递用户响应:

rust
pub async fn resolve(&self, server_name: String, id: RequestId, response: ElicitationResponse) -> Result<()>

codex-rs/codex-mcp/src/tools.rs — 工具名称规范化

rust
pub struct ToolInfo {
    pub server_name: String,
    pub callable_name: String,
    pub callable_namespace: String,
    pub tool: Tool,
    pub connector_id: Option<String>,
    pub plugin_display_names: Vec<String>,
}

normalize_tools_for_model_with_prefix 确保 64 字节限制内的唯一名称,名称冲突时使用 SHA-1 后缀。tool_with_model_visible_input_schema 将文件路径参数替换为 type: string + 引导文本。

连接器元数据隔离:非 codex_apps 服务器的连接器元数据被剥离,防止欺骗。只有可信的 codex_apps 服务允许传播连接器 ID。

OAuth 令牌管理(rmcp-client/src/oauth.rs)

rust
pub struct StoredOAuthTokens {
    pub server_name: String,
    pub client_id: String,
    pub token_response: WrappedOAuthTokenResponse,
    pub expires_at: Option<u64>,
}

分层存储策略:

  • Keyring(macOS Keychain / Windows Credential Manager / Linux Secret Service)— 首选
  • 文件回退CODEX_HOME/.credentials.json)— keyring 不可用时使用

OAuthPersistor 在每次操作后持久化刷新令牌,在每次操作前检查令牌是否在 30 秒内过期并主动刷新。

关键函数索引

函数/模块文件路径说明
RmcpClientrmcp-client/src/rmcp_client.rs底层 MCP 客户端
RmcpClient.initialize()rmcp-client/src/rmcp_client.rsJSON-RPC 握手
RmcpClient.call_tool()rmcp-client/src/rmcp_client.rs工具调用 + 会话恢复
TransportRecipermcp-client/src/rmcp_client.rs传输层配方(工厂模式)
active_time_timeout()rmcp-client/src/rmcp_client.rs暂停感知超时
ElicitationClientServicermcp-client/src/elicitation_client_service.rsRMCP 服务层
McpConnectionManagercodex-mcp/src/connection_manager.rs多服务器聚合管理
list_all_tools()codex-mcp/src/connection_manager.rs并发工具发现
call_tool()codex-mcp/src/connection_manager.rs工具调用路由
AsyncManagedClientcodex-mcp/src/rmcp_client.rs共享 future 启动
start_server_task()codex-mcp/src/rmcp_client.rs服务器启动序列
ElicitationRequestManagercodex-mcp/src/elicitation.rs交互策略链
normalize_tools_for_model()codex-mcp/src/tools.rs工具名称规范化
OAuthPersistorrmcp-client/src/oauth.rsOAuth 令牌持久化
StreamableHttpClientAdapterrmcp-client/src/http_client_adapter.rsHTTP 传输适配器
tool_is_model_visible()codex-mcp/src/connection_manager.rs工具可见性过滤
read_mcp_resource()core/src/codex_thread.rs读取 MCP 资源
call_mcp_tool()core/src/codex_thread.rs调用 MCP 工具