跳轉到

Time Compass 的 Rust 風格錯誤處理設計:Railway-Oriented Programming 深度分析

目錄

  1. 簡介:為什麼用 Result 而非 Exception
  2. Railway-Oriented Programming 的核心概念
  3. Result Monad 的實際應用
  4. Google API 認證錯誤 vs 業務邏輯錯誤的分類
  5. Batch API 層級的錯誤處理策略
  6. 錯誤類型定義:完整的 Error Model
  7. 日誌與可觀測性設計
  8. 復見策略與 Retry Logic
  9. 實戰案例:async_core 的錯誤流轉

簡介:為什麼用 Result 而非 Exception

傳統異常處理的問題

Python 開發者習慣使用 try/except 捕捉異常。但在分散式 API 呼叫(如 Google Calendar、Tasks、Moodle)的高並發場景中,這種方式暴露了三個核心問題:

1. 隱藏的控制流

異常隱藏了實際的控制分支,導致程式碼難以追蹤。在 src/time_compass/utils/result.py 中,我們實作了 Result Monad 來解決此問題:

# ❌ 異常隱藏了實際的控制分支
def fetch_events():
    try:
        response = api.get_events()  # 可能拋出異常
        process_events(response)      # 某個步驟也可能拋出異常
        return events                 # 成功分支不清晰
    except ApiError as e:
        # 不知道是 fetch 還是 process 失敗
        handle_error(e)

2. 型別安全性喪失

# ❌ 函數型別簽名無法表達失敗情況
def get_calendar_list(calendar_id: str) -> List[Event]:
    # 呼叫者無法從型別推斷可能的失敗情況
    # 必須查看實作才知道會拋出異常
    pass

3. Batch 操作的複雜性

# ❌ 在 Batch 中,某些請求成功、某些失敗
# 異常模型不適合這種「部分成功」情景
results = batch_execute([req1, req2, req3])  # req2 失敗
# 整個 batch 拋出異常?還是返回部分結果?

Result Monad 的優勢

Time Compass 採用 Railway-Oriented Programming (ROP) 的 Result Monad,將失敗狀態提升為一級概念:

from time_compass.utils.result import Result, Ok, Err

# ✅ 型別簽名明確表達失敗情況
def async_get_all_tasks(
    client: ClientSession,
    request: GoogleAllTasksRequest,
) -> Result[AllTaskResult]:
    """成功返回 Ok[AllTaskResult],失敗返回 Err[GoogleError]"""
    ...

# ✅ 呼叫者從型別簽名即知曉可能失敗
result = await async_get_all_tasks(...)
if is_ok(result):
    tasks = result.value  # 安全地解包
    process_tasks(tasks)
elif is_err(result):
    error = result.error  # 清晰的失敗處理
    log_error(error)

特別適用於 Time Compass 的場景

  1. 多源並行抓取:Google Calendar + Google Tasks + Moodle 並行執行,每個源獨立失敗
  2. Batch 部分成功:批次 API 中,單一請求失敗不應中斷其他請求
  3. 漸進式遠端取消:Moodle 爬蟲超時與 Google API 失敗應被區分對待
  4. 可觀測性:每個中間步驟的成敗狀態可被追蹤、日誌記錄

Railway-Oriented Programming 的核心概念

什麼是 Railway?

ROP 將程式執行比作兩條平行的鐵軌:

成功軌道 (Success Track)  ──────────→ 最終成功結果
         ↖ 錯誤產生
分支軌道 (Failure Track)  ──────────→ 最終失敗結果
         ↑ 依然流動,不中斷

核心原則:一旦進入失敗軌道,後續操作自動跳過,直到終點。

Time Compass 的 Result 容器

# src/time_compass/utils/result.py

@dataclass(frozen=True)
class Ok(Generic[T]):
    """成功情況:包含希望的值"""
    value: T

@dataclass(frozen=True)
class Err:
    """失敗情況:包含異常物件"""
    error: Exception

# 型別別名
Result = Union[Ok[T], Err]

為什麼用 @dataclass(frozen=True)

  • 不可變性:Result 建立後無法改變,適合並發場景
  • 型別安全:型別檢查器可完整驗證 Ok/Err 訪問
  • 易於序列化:支持日誌、追蹤、重試邏輯中的序列化

Type Guard 與型別縮窄

from time_compass.utils.result import is_ok, is_err

def process_result(result: Result[int]) -> None:
    if is_ok(result):
        # 型別檢查器現在知道 result 是 Ok[int]
        value: int = result.value  # ✅ 安全
    elif is_err(result):
        # 型別檢查器現在知道 result 是 Err
        error: Exception = result.error  # ✅ 安全

為什麼需要 is_ok()is_err()

  • TypeGuard 支持:Python 3.10+ 的 TypeGuard 允許型別檢查器在 if 分支後縮窄型別
  • 避免類型轉換:無需 isinstance() 檢查或強制轉換
  • 可讀性:比 isinstance(result, Ok) 更清晰意圖

Result Monad 的實際應用

1. map() - 轉換成功值

from time_compass.utils.result import map_result, Ok, Err

def double(x: int) -> int:
    return x * 2

# ✅ 成功情況:轉換值
result: Result[int] = Ok(5)
doubled = map_result(result, double)  # Ok(10)

# ✅ 失敗情況:保持原樣
result: Result[int] = Err(ValueError("invalid"))
doubled = map_result(result, double)  # Err(ValueError(...)),double 不執行

實際應用場景:將 Google API 原始 JSON 轉換為領域模型

# src/time_compass/integrations/google_calendar/async_core.py

async def async_get_calendar_events(
    calendar_id: str,
    client: ClientSession,
) -> Result[CalendarEventsResult]:
    try:
        # 1. 取得原始 JSON
        raw_items = await fetch_raw_events_from_api(...)

        # 2. 轉換為讀取模型(使用 map_result)
        def convert_to_read_model(items):
            return [GoogleEventRead.from_raw_model(r) for r in items]

        converted = map_result(
            Ok(raw_items),
            convert_to_read_model
        )

        # 3. 處理業務邏輯(過濾、分組)
        def process_events(events):
            return _process_read_models(events, calendar_id)

        result_model = map_result(converted, process_events)

        return result_model
    except Exception as e:
        return Err(GoogleAPIError(f"Calendar fetch failed: {e}"))

2. flat_map()bind() - 鏈式操作

from time_compass.utils.result import flat_map_result, Ok, Err

def parse_int(s: str) -> Result[int]:
    try:
        return Ok(int(s))
    except ValueError:
        return Err(ValueError(f"Cannot parse '{s}'"))

def validate_positive(x: int) -> Result[int]:
    if x > 0:
        return Ok(x)
    else:
        return Err(ValueError(f"Must be positive, got {x}"))

# ✅ 鏈式操作
result: Result[str] = Ok("42")
parsed = flat_map_result(result, parse_int)        # Ok(42)
validated = flat_map_result(parsed, validate_positive)  # Ok(42)

# ✅ 若中途失敗,後續自動跳過
result: Result[str] = Ok("-5")
parsed = flat_map_result(result, parse_int)        # Ok(-5)
validated = flat_map_result(parsed, validate_positive)  # Err(ValueError(...))

3. is_ok() / is_err() - 模式匹配

from time_compass.utils.result import is_ok, is_err

# src/time_compass/integrations/get_all_information_from_api.py

# 處理 Google Tasks 結果
google_res = _coerce_result(results[google_task_idx], "Google tasks")

if google_res is not None:
    if is_ok(google_res):
        task_result = google_res.value
        task_grouped = task_result.items_by_group_id or task_result.items
        log.info("Google tasks fetched: %d lists", len(task_grouped))
    elif is_err(google_res):
        log.error("Google tasks fetching returned error: %s", google_res.error)

4. unwrap_or() - 提供預設值

from time_compass.utils.result import unwrap_or

# 取得 Moodle 結果,若失敗採用預設空列表
moodle_events = unwrap_or(moodle_result, [])

5. unwrap() - 強制解包(危險操作)

from time_compass.utils.result import unwrap

# 僅在確定不會失敗時使用
tasks = unwrap(task_result)  # 若是 Err,拋出異常,中斷流程

什麼時候使用 unwrap()

  • ✅ 單元測試中確保預期成功時
  • ✅ 應用初始化時,異常應立即中止程式
  • ❌ 正常業務邏輯中(應改用 is_ok() + 日誌)

Google API 認證錯誤 vs 業務邏輯錯誤的分類

完整的錯誤層級體系

Time Compass 設計了多層級的錯誤分類,每層承載特定的語義。

第一層:根異常類 GoogleError

# src/time_compass/integrations/common/exceptions.py

class GoogleError(Exception):
    """所有 Google API 相關異常的基類"""
    pass

目的:統一 catch Google API 相關的所有異常,與其他系統錯誤區分。

第二層:功能性分類

class GoogleAuthError(GoogleError):
    """認證/授權異常

    HTTP 狀態碼:401 (Unauthorized), 403 (Forbidden)
    常見原因:
    - invalid_grant: 刷新令牌已過期或無效
    - token_expiry: Access token 已過期
    - insufficient_scope: 令牌缺少必要的 scopes

    優先級:高 - 應立即通知使用者重新授權
    """
    pass

class GoogleParseError(GoogleError):
    """解析異常

    JSON 解析失敗或 Pydantic 驗證失敗
    常見原因:
    - API 返回非預期的 JSON 結構
    - 欄位型別不符(例如期望字串得到數字)

    優先級:高 - 表示 API 契約破口或版本不相容
    """
    pass

class GoogleNetworkError(GoogleError):
    """網路相關異常

    常見原因:
    - asyncio.TimeoutError: 單一請求超過時限
    - aiohttp.ClientError: 連接失敗、DNS 解析失敗
    - connection reset by peer

    優先級:中 - 通常可重試
    """
    pass

class GoogleNotFoundError(GoogleError):
    """資源不存在

    HTTP 狀態碼:404
    常見原因:
    - calendar_id 不存在
    - tasklist_id 已被刪除

    優先級:低 - 通常表示使用者資源狀態變化
    """
    pass

class GoogleRateLimitError(GoogleError):
    """速率限制

    HTTP 狀態碼:429
    常見原因:
    - 在短時間內發送過多請求(超過配額)
    - 批次請求過大

    優先級:中 - 應實現指數退避重試
    """
    pass

class GoogleBatchError(GoogleError):
    """批次操作級別異常

    常見原因:
    - 批次請求大小超過上限(>100 個子請求)
    - 多部分邊界格式錯誤

    優先級:高 - 表示客戶端程式碼錯誤
    """
    pass

HTTP 狀態碼路由

Time Compass 實現了精確的 HTTP 狀態碼 → GoogleError 對應:

# src/time_compass/integrations/common/google_api_dispatcher.py

def _status_code_to_google_error(status_code: int, error_msg: str) -> GoogleError:
    """
    根據 HTTP 狀態碼決定適當的 GoogleError 子類。

    This implements HTTP status code routing:
    - 401/403: GoogleAuthError (authentication/authorization)
    - 404: GoogleNotFoundError (resource not found)
    - 429: GoogleRateLimitError (rate limited)
    - Anything else: GoogleAPIError (generic API error)
    """
    if status_code in (401, 403):
        return GoogleAuthError(f"HTTP {status_code}: {error_msg}")

    if status_code == 404:
        return GoogleNotFoundError(f"HTTP {status_code}: {error_msg}")

    if status_code == 429:
        return GoogleRateLimitError(f"HTTP {status_code}: {error_msg}")

    # Default to generic APIError for 5xx or other codes
    return GoogleAPIError(f"HTTP {status_code}: {error_msg}")

業務邏輯錯誤 vs 技術錯誤

類別 例子 處理方式 重試?
認證錯誤 401 Unauthorized, token 過期 向使用者要求重新授權 ❌ 否(需使用者操作)
授權錯誤 403 Forbidden, 缺少 scope 向使用者解釋權限不足 ❌ 否(需使用者同意)
資源不存在 404 Not Found 記錄、忽略或提示使用者 ❌ 否(資源確實不存在)
速率限制 429 Too Many Requests 指數退避重試 ✅ 是(暫時性)
網路超時 asyncio.TimeoutError 重試或使用備用方案 ✅ 是(暫時性)
解析錯誤 JSON parsing fail 記錄詳細資訊,上報 ❌ 否(程式碼或 API 契約問題)

Batch API 層級的錯誤處理策略

Batch 請求的特殊性

Google Batch API(multipart/mixed)允許在單一 HTTP 請求中包含多個子請求:

POST /batch/calendar/v3 HTTP/1.1
Content-Type: multipart/mixed; boundary=batch_boundary

--batch_boundary
Content-Type: application/http

GET /calendar/v3/calendars/primary/events?timeMin=...
Authorization: Bearer <token>

--batch_boundary
Content-Type: application/http

POST /calendar/v3/calendars/primary/events
Authorization: Bearer <token>

{...event body...}

--batch_boundary--

「部分成功」模式

Batch API 的核心特性:一個子請求的失敗不會中斷其他子請求

# src/time_compass/integrations/common/google_api_dispatcher.py

async def batch_execute_async(
    client: ClientSession,
    credentials: Any,
    requests: List[BaseGoogleRequest],
    endpoint: str,
    boundary: str = "batch_boundary",
    raw: bool = False
) -> List[Result[BaseGoogleRaw]] | List[Dict[str, Any]]:
    """
    Executes a list of Google request models as a single batch.

    Type-Safe Behavior (via @overload):
    - If raw=True: Returns List[Dict[str, Any]] (raw HTTP response dicts)
    - If raw=False: Returns List[Result[BaseGoogleRaw]] (Ok or Err wrapped)

    關鍵:返回的是 List,包含多個 Result,而非單一 Result
    這樣呼叫者可以逐個檢查每個子請求的成敗
    """

多層級的異常捕捉

async def batch_execute_async(...) -> List[Result[BaseGoogleRaw]]:
    """
    多層級異常處理的體現:
    1. 網路層:整個 batch 請求失敗
    2. HTTP 層:子請求返回不同的狀態碼
    3. 解析層:JSON 結構不符預期
    """

    # Layer 1: 網路請求失敗(整個 batch 無法發送)
    try:
        response_text, resp_boundary = await send_batch_request_payload(...)
    except Exception as e:
        network_error = _convert_to_google_error(e)
        # 若網路失敗,所有子請求都返回同一個錯誤
        return [Err(network_error) for _ in requests]

    # Layer 2: 解析 Batch 響應信封
    raw_results = parse_generic_batch_response(...)

    # Layer 3: 逐個檢查子請求結果
    parsed_results: List[Result[BaseGoogleRaw]] = []
    for req, res in zip(requests, raw_results):
        if res["ok"]:
            # 子請求成功,嘗試解析
            try:
                model = req.parse_response(res["data"])
                parsed_results.append(Ok(model))
            except Exception as e:
                # 解析錯誤 → Err[GoogleParseError]
                parse_error = GoogleParseError(...)
                parsed_results.append(Err(parse_error))
        else:
            # 子請求返回 HTTP 錯誤
            status_code = res.get("status", 0)
            error_msg = res.get("error", "Unknown API error")
            api_error = _status_code_to_google_error(status_code, error_msg)
            parsed_results.append(Err(api_error))

    return parsed_results

錯誤類型定義:完整的 Error Model

異常分類矩陣

GoogleError (基類)
├─ GoogleAuthError (401/403)
├─ GoogleParseError (客戶端層)
├─ GoogleNetworkError (網路層)
├─ GoogleNotFoundError (404)
├─ GoogleRateLimitError (429)
├─ GoogleAPIError (其他)
└─ GoogleBatchError (Batch 層級)

異常訊息的設計規範

# ✅ 好的異常訊息(包含上下文)
GoogleAuthError("HTTP 401: invalid_grant - Refresh token expired at 2026-03-02T10:30:00Z")

GoogleNetworkError("Request timeout after 30s: GET /calendar/v3/calendars/primary/events")

GoogleParseError("Parse error in ListEventsRequest: 'start' field expected datetime, got string '2026-03-02'")

# ❌ 不好的異常訊息(缺乏上下文)
GoogleAuthError("401")
GoogleNetworkError("timeout")
GoogleParseError("Parse error")

日誌與可觀測性設計

Logger 的模組路徑追蹤

# src/time_compass/utils/logger.py

class _ModulePathFormatter(logging.Formatter):
    """Formatter that ensures `record.module_path` is present."""

    def format(self, record: logging.LogRecord) -> str:
        module_name = getattr(record, "name", "")
        if "time_compass" in module_name:
            module_path = module_name.split("time_compass.", 1)[-1]
        else:
            module_path = module_name
        record.module_path = module_path
        return super().format(record)

# 日誌格式
fmt = "[%(module_path)s] %(levelname)s %(message)s"

三層級的日誌記錄

# 1. 入口日誌(DEBUG)
logger.debug("async_get_all_information_from_api start: ...")

# 2. 中間步驟日誌(INFO)
logger.info("Awaiting %d tasks", len(tasks_to_wait))

# 3. 錯誤日誌(ERROR/WARNING)
logger.error("Google tasks fetching returned error: %s", result.error)

復見策略與 Retry Logic

建議的重試策略

1. 指數退避(Exponential Backoff)

async def retry_with_exponential_backoff(
    fn: Callable[[], Awaitable[Result[T]]],
    max_retries: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 32.0,
) -> Result[T]:
    """重試邏輯,適用於暫時性失敗"""
    delay = initial_delay

    for attempt in range(max_retries):
        result = await fn()

        if is_ok(result):
            return result

        error = result.error
        if isinstance(error, (GoogleRateLimitError, GoogleNetworkError)):
            if attempt < max_retries - 1:
                logger.warning(
                    "Retryable error (attempt %d/%d): %s. Waiting %.1fs",
                    attempt + 1, max_retries, error, delay
                )
                await asyncio.sleep(delay)
                delay = min(delay * 2, max_delay)
                continue

        return result

2. 斷路器(Circuit Breaker)

class CircuitBreaker:
    """防止 Cascading Failure"""

    def is_available(self) -> bool:
        """檢查是否應發起請求"""
        if self.state == CircuitState.CLOSED:
            return True
        # ... 詳細邏輯

實戰案例:async_core 的錯誤流轉

完整流程

async_get_all_information_from_api()
├─ 1. 並行建立三個任務
├─ 2. asyncio.gather(*tasks, return_exceptions=True)
├─ 3. 後處理:_coerce_result()
├─ 4. 逐個檢查結果
└─ 5. 最終合併為 ResourceContext

錯誤在層級間的流轉

Layer 1: Google API 返回 HTTP 401
  ↓
Layer 2: parse_generic_batch_response() 解析為
  {"ok": False, "status": 401, "error": "invalid_grant"}
  ↓
Layer 3: batch_execute_async() 轉換為
  Err(GoogleAuthError("HTTP 401: invalid_grant"))
  ↓
Layer 4: async_core 檢查並傳遞
  if is_err(result) and isinstance(result.error, GoogleAuthError):
      return Err(result.error)
  ↓
Layer 5: MCP 工具回傳失敗訊息給前端

最佳實踐清單

  • 使用 Result[T] 表達可能失敗的操作
  • 使用 is_ok() / is_err() 進行型別縮窄
  • 使用 map_result / flat_map_result 鏈式操作
  • 細分 GoogleError 子類,實現狀態碼路由
  • 在 Batch 結果中逐個檢查,支持部分成功
  • 記錄詳細的日誌上下文,便於分佈式追蹤
  • 實現指數退避和斷路器,處理暫時性失敗
  • 避免盲目重試 AuthError 或 ParseError
  • 避免在熱路徑中使用 unwrap(),會拋出異常

文檔版本:Phase 4 (Result Monad 整合階段)
適用範圍:src/time_compass/integrations/
參考檔案: - src/time_compass/utils/result.py - src/time_compass/integrations/common/exceptions.py - src/time_compass/integrations/common/google_api_dispatcher.py - src/time_compass/integrations/get_all_information_from_api.py