Time Compass 的 Rust 風格錯誤處理設計:Railway-Oriented Programming 深度分析¶
目錄¶
- 簡介:為什麼用 Result 而非 Exception
- Railway-Oriented Programming 的核心概念
- Result Monad 的實際應用
- Google API 認證錯誤 vs 業務邏輯錯誤的分類
- Batch API 層級的錯誤處理策略
- 錯誤類型定義:完整的 Error Model
- 日誌與可觀測性設計
- 復見策略與 Retry Logic
- 實戰案例:async_core 的錯誤流轉
簡介:為什麼用 Result 而非 Exception¶
傳統異常處理的問題¶
Python 開發者習慣使用 try/except 捕捉異常。但在分散式 API 呼叫(如 Google Calendar、Tasks、Moodle)的高並發場景中,這種方式暴露了三個核心問題:
1. 隱藏的控制流¶
異常隱藏了實際的控制分支,導致程式碼難以追蹤。在 src/time_compass/utils/result.py 中,我們實作了 Result Monad 來解決此問題:
# ❌ 異常隱藏了實際的控制分支
def fetch_events():
try:
response = api.get_events() # 可能拋出異常
process_events(response) # 某個步驟也可能拋出異常
return events # 成功分支不清晰
except ApiError as e:
# 不知道是 fetch 還是 process 失敗
handle_error(e)
2. 型別安全性喪失¶
# ❌ 函數型別簽名無法表達失敗情況
def get_calendar_list(calendar_id: str) -> List[Event]:
# 呼叫者無法從型別推斷可能的失敗情況
# 必須查看實作才知道會拋出異常
pass
3. Batch 操作的複雜性¶
# ❌ 在 Batch 中,某些請求成功、某些失敗
# 異常模型不適合這種「部分成功」情景
results = batch_execute([req1, req2, req3]) # req2 失敗
# 整個 batch 拋出異常?還是返回部分結果?
Result Monad 的優勢¶
Time Compass 採用 Railway-Oriented Programming (ROP) 的 Result Monad,將失敗狀態提升為一級概念:
from time_compass.utils.result import Result, Ok, Err
# ✅ 型別簽名明確表達失敗情況
def async_get_all_tasks(
client: ClientSession,
request: GoogleAllTasksRequest,
) -> Result[AllTaskResult]:
"""成功返回 Ok[AllTaskResult],失敗返回 Err[GoogleError]"""
...
# ✅ 呼叫者從型別簽名即知曉可能失敗
result = await async_get_all_tasks(...)
if is_ok(result):
tasks = result.value # 安全地解包
process_tasks(tasks)
elif is_err(result):
error = result.error # 清晰的失敗處理
log_error(error)
特別適用於 Time Compass 的場景¶
- 多源並行抓取:Google Calendar + Google Tasks + Moodle 並行執行,每個源獨立失敗
- Batch 部分成功:批次 API 中,單一請求失敗不應中斷其他請求
- 漸進式遠端取消:Moodle 爬蟲超時與 Google API 失敗應被區分對待
- 可觀測性:每個中間步驟的成敗狀態可被追蹤、日誌記錄
Railway-Oriented Programming 的核心概念¶
什麼是 Railway?¶
ROP 將程式執行比作兩條平行的鐵軌:
成功軌道 (Success Track) ──────────→ 最終成功結果
↖ 錯誤產生
分支軌道 (Failure Track) ──────────→ 最終失敗結果
↑ 依然流動,不中斷
核心原則:一旦進入失敗軌道,後續操作自動跳過,直到終點。
Time Compass 的 Result 容器¶
# src/time_compass/utils/result.py
@dataclass(frozen=True)
class Ok(Generic[T]):
"""成功情況:包含希望的值"""
value: T
@dataclass(frozen=True)
class Err:
"""失敗情況:包含異常物件"""
error: Exception
# 型別別名
Result = Union[Ok[T], Err]
為什麼用 @dataclass(frozen=True)?
- 不可變性:Result 建立後無法改變,適合並發場景
- 型別安全:型別檢查器可完整驗證 Ok/Err 訪問
- 易於序列化:支持日誌、追蹤、重試邏輯中的序列化
Type Guard 與型別縮窄¶
from time_compass.utils.result import is_ok, is_err
def process_result(result: Result[int]) -> None:
if is_ok(result):
# 型別檢查器現在知道 result 是 Ok[int]
value: int = result.value # ✅ 安全
elif is_err(result):
# 型別檢查器現在知道 result 是 Err
error: Exception = result.error # ✅ 安全
為什麼需要 is_ok() 和 is_err()?
- TypeGuard 支持:Python 3.10+ 的
TypeGuard允許型別檢查器在 if 分支後縮窄型別 - 避免類型轉換:無需
isinstance()檢查或強制轉換 - 可讀性:比
isinstance(result, Ok)更清晰意圖
Result Monad 的實際應用¶
1. map() - 轉換成功值¶
from time_compass.utils.result import map_result, Ok, Err
def double(x: int) -> int:
return x * 2
# ✅ 成功情況:轉換值
result: Result[int] = Ok(5)
doubled = map_result(result, double) # Ok(10)
# ✅ 失敗情況:保持原樣
result: Result[int] = Err(ValueError("invalid"))
doubled = map_result(result, double) # Err(ValueError(...)),double 不執行
實際應用場景:將 Google API 原始 JSON 轉換為領域模型
# src/time_compass/integrations/google_calendar/async_core.py
async def async_get_calendar_events(
calendar_id: str,
client: ClientSession,
) -> Result[CalendarEventsResult]:
try:
# 1. 取得原始 JSON
raw_items = await fetch_raw_events_from_api(...)
# 2. 轉換為讀取模型(使用 map_result)
def convert_to_read_model(items):
return [GoogleEventRead.from_raw_model(r) for r in items]
converted = map_result(
Ok(raw_items),
convert_to_read_model
)
# 3. 處理業務邏輯(過濾、分組)
def process_events(events):
return _process_read_models(events, calendar_id)
result_model = map_result(converted, process_events)
return result_model
except Exception as e:
return Err(GoogleAPIError(f"Calendar fetch failed: {e}"))
2. flat_map() 或 bind() - 鏈式操作¶
from time_compass.utils.result import flat_map_result, Ok, Err
def parse_int(s: str) -> Result[int]:
try:
return Ok(int(s))
except ValueError:
return Err(ValueError(f"Cannot parse '{s}'"))
def validate_positive(x: int) -> Result[int]:
if x > 0:
return Ok(x)
else:
return Err(ValueError(f"Must be positive, got {x}"))
# ✅ 鏈式操作
result: Result[str] = Ok("42")
parsed = flat_map_result(result, parse_int) # Ok(42)
validated = flat_map_result(parsed, validate_positive) # Ok(42)
# ✅ 若中途失敗,後續自動跳過
result: Result[str] = Ok("-5")
parsed = flat_map_result(result, parse_int) # Ok(-5)
validated = flat_map_result(parsed, validate_positive) # Err(ValueError(...))
3. is_ok() / is_err() - 模式匹配¶
from time_compass.utils.result import is_ok, is_err
# src/time_compass/integrations/get_all_information_from_api.py
# 處理 Google Tasks 結果
google_res = _coerce_result(results[google_task_idx], "Google tasks")
if google_res is not None:
if is_ok(google_res):
task_result = google_res.value
task_grouped = task_result.items_by_group_id or task_result.items
log.info("Google tasks fetched: %d lists", len(task_grouped))
elif is_err(google_res):
log.error("Google tasks fetching returned error: %s", google_res.error)
4. unwrap_or() - 提供預設值¶
from time_compass.utils.result import unwrap_or
# 取得 Moodle 結果,若失敗採用預設空列表
moodle_events = unwrap_or(moodle_result, [])
5. unwrap() - 強制解包(危險操作)¶
from time_compass.utils.result import unwrap
# 僅在確定不會失敗時使用
tasks = unwrap(task_result) # 若是 Err,拋出異常,中斷流程
什麼時候使用 unwrap()?
- ✅ 單元測試中確保預期成功時
- ✅ 應用初始化時,異常應立即中止程式
- ❌ 正常業務邏輯中(應改用
is_ok()+ 日誌)
Google API 認證錯誤 vs 業務邏輯錯誤的分類¶
完整的錯誤層級體系¶
Time Compass 設計了多層級的錯誤分類,每層承載特定的語義。
第一層:根異常類 GoogleError¶
# src/time_compass/integrations/common/exceptions.py
class GoogleError(Exception):
"""所有 Google API 相關異常的基類"""
pass
目的:統一 catch Google API 相關的所有異常,與其他系統錯誤區分。
第二層:功能性分類¶
class GoogleAuthError(GoogleError):
"""認證/授權異常
HTTP 狀態碼:401 (Unauthorized), 403 (Forbidden)
常見原因:
- invalid_grant: 刷新令牌已過期或無效
- token_expiry: Access token 已過期
- insufficient_scope: 令牌缺少必要的 scopes
優先級:高 - 應立即通知使用者重新授權
"""
pass
class GoogleParseError(GoogleError):
"""解析異常
JSON 解析失敗或 Pydantic 驗證失敗
常見原因:
- API 返回非預期的 JSON 結構
- 欄位型別不符(例如期望字串得到數字)
優先級:高 - 表示 API 契約破口或版本不相容
"""
pass
class GoogleNetworkError(GoogleError):
"""網路相關異常
常見原因:
- asyncio.TimeoutError: 單一請求超過時限
- aiohttp.ClientError: 連接失敗、DNS 解析失敗
- connection reset by peer
優先級:中 - 通常可重試
"""
pass
class GoogleNotFoundError(GoogleError):
"""資源不存在
HTTP 狀態碼:404
常見原因:
- calendar_id 不存在
- tasklist_id 已被刪除
優先級:低 - 通常表示使用者資源狀態變化
"""
pass
class GoogleRateLimitError(GoogleError):
"""速率限制
HTTP 狀態碼:429
常見原因:
- 在短時間內發送過多請求(超過配額)
- 批次請求過大
優先級:中 - 應實現指數退避重試
"""
pass
class GoogleBatchError(GoogleError):
"""批次操作級別異常
常見原因:
- 批次請求大小超過上限(>100 個子請求)
- 多部分邊界格式錯誤
優先級:高 - 表示客戶端程式碼錯誤
"""
pass
HTTP 狀態碼路由¶
Time Compass 實現了精確的 HTTP 狀態碼 → GoogleError 對應:
# src/time_compass/integrations/common/google_api_dispatcher.py
def _status_code_to_google_error(status_code: int, error_msg: str) -> GoogleError:
"""
根據 HTTP 狀態碼決定適當的 GoogleError 子類。
This implements HTTP status code routing:
- 401/403: GoogleAuthError (authentication/authorization)
- 404: GoogleNotFoundError (resource not found)
- 429: GoogleRateLimitError (rate limited)
- Anything else: GoogleAPIError (generic API error)
"""
if status_code in (401, 403):
return GoogleAuthError(f"HTTP {status_code}: {error_msg}")
if status_code == 404:
return GoogleNotFoundError(f"HTTP {status_code}: {error_msg}")
if status_code == 429:
return GoogleRateLimitError(f"HTTP {status_code}: {error_msg}")
# Default to generic APIError for 5xx or other codes
return GoogleAPIError(f"HTTP {status_code}: {error_msg}")
業務邏輯錯誤 vs 技術錯誤¶
| 類別 | 例子 | 處理方式 | 重試? |
|---|---|---|---|
| 認證錯誤 | 401 Unauthorized, token 過期 | 向使用者要求重新授權 | ❌ 否(需使用者操作) |
| 授權錯誤 | 403 Forbidden, 缺少 scope | 向使用者解釋權限不足 | ❌ 否(需使用者同意) |
| 資源不存在 | 404 Not Found | 記錄、忽略或提示使用者 | ❌ 否(資源確實不存在) |
| 速率限制 | 429 Too Many Requests | 指數退避重試 | ✅ 是(暫時性) |
| 網路超時 | asyncio.TimeoutError | 重試或使用備用方案 | ✅ 是(暫時性) |
| 解析錯誤 | JSON parsing fail | 記錄詳細資訊,上報 | ❌ 否(程式碼或 API 契約問題) |
Batch API 層級的錯誤處理策略¶
Batch 請求的特殊性¶
Google Batch API(multipart/mixed)允許在單一 HTTP 請求中包含多個子請求:
POST /batch/calendar/v3 HTTP/1.1
Content-Type: multipart/mixed; boundary=batch_boundary
--batch_boundary
Content-Type: application/http
GET /calendar/v3/calendars/primary/events?timeMin=...
Authorization: Bearer <token>
--batch_boundary
Content-Type: application/http
POST /calendar/v3/calendars/primary/events
Authorization: Bearer <token>
{...event body...}
--batch_boundary--
「部分成功」模式¶
Batch API 的核心特性:一個子請求的失敗不會中斷其他子請求。
# src/time_compass/integrations/common/google_api_dispatcher.py
async def batch_execute_async(
client: ClientSession,
credentials: Any,
requests: List[BaseGoogleRequest],
endpoint: str,
boundary: str = "batch_boundary",
raw: bool = False
) -> List[Result[BaseGoogleRaw]] | List[Dict[str, Any]]:
"""
Executes a list of Google request models as a single batch.
Type-Safe Behavior (via @overload):
- If raw=True: Returns List[Dict[str, Any]] (raw HTTP response dicts)
- If raw=False: Returns List[Result[BaseGoogleRaw]] (Ok or Err wrapped)
關鍵:返回的是 List,包含多個 Result,而非單一 Result
這樣呼叫者可以逐個檢查每個子請求的成敗
"""
多層級的異常捕捉¶
async def batch_execute_async(...) -> List[Result[BaseGoogleRaw]]:
"""
多層級異常處理的體現:
1. 網路層:整個 batch 請求失敗
2. HTTP 層:子請求返回不同的狀態碼
3. 解析層:JSON 結構不符預期
"""
# Layer 1: 網路請求失敗(整個 batch 無法發送)
try:
response_text, resp_boundary = await send_batch_request_payload(...)
except Exception as e:
network_error = _convert_to_google_error(e)
# 若網路失敗,所有子請求都返回同一個錯誤
return [Err(network_error) for _ in requests]
# Layer 2: 解析 Batch 響應信封
raw_results = parse_generic_batch_response(...)
# Layer 3: 逐個檢查子請求結果
parsed_results: List[Result[BaseGoogleRaw]] = []
for req, res in zip(requests, raw_results):
if res["ok"]:
# 子請求成功,嘗試解析
try:
model = req.parse_response(res["data"])
parsed_results.append(Ok(model))
except Exception as e:
# 解析錯誤 → Err[GoogleParseError]
parse_error = GoogleParseError(...)
parsed_results.append(Err(parse_error))
else:
# 子請求返回 HTTP 錯誤
status_code = res.get("status", 0)
error_msg = res.get("error", "Unknown API error")
api_error = _status_code_to_google_error(status_code, error_msg)
parsed_results.append(Err(api_error))
return parsed_results
錯誤類型定義:完整的 Error Model¶
異常分類矩陣¶
GoogleError (基類)
├─ GoogleAuthError (401/403)
├─ GoogleParseError (客戶端層)
├─ GoogleNetworkError (網路層)
├─ GoogleNotFoundError (404)
├─ GoogleRateLimitError (429)
├─ GoogleAPIError (其他)
└─ GoogleBatchError (Batch 層級)
異常訊息的設計規範¶
# ✅ 好的異常訊息(包含上下文)
GoogleAuthError("HTTP 401: invalid_grant - Refresh token expired at 2026-03-02T10:30:00Z")
GoogleNetworkError("Request timeout after 30s: GET /calendar/v3/calendars/primary/events")
GoogleParseError("Parse error in ListEventsRequest: 'start' field expected datetime, got string '2026-03-02'")
# ❌ 不好的異常訊息(缺乏上下文)
GoogleAuthError("401")
GoogleNetworkError("timeout")
GoogleParseError("Parse error")
日誌與可觀測性設計¶
Logger 的模組路徑追蹤¶
# src/time_compass/utils/logger.py
class _ModulePathFormatter(logging.Formatter):
"""Formatter that ensures `record.module_path` is present."""
def format(self, record: logging.LogRecord) -> str:
module_name = getattr(record, "name", "")
if "time_compass" in module_name:
module_path = module_name.split("time_compass.", 1)[-1]
else:
module_path = module_name
record.module_path = module_path
return super().format(record)
# 日誌格式
fmt = "[%(module_path)s] %(levelname)s %(message)s"
三層級的日誌記錄¶
# 1. 入口日誌(DEBUG)
logger.debug("async_get_all_information_from_api start: ...")
# 2. 中間步驟日誌(INFO)
logger.info("Awaiting %d tasks", len(tasks_to_wait))
# 3. 錯誤日誌(ERROR/WARNING)
logger.error("Google tasks fetching returned error: %s", result.error)
復見策略與 Retry Logic¶
建議的重試策略¶
1. 指數退避(Exponential Backoff)¶
async def retry_with_exponential_backoff(
fn: Callable[[], Awaitable[Result[T]]],
max_retries: int = 3,
initial_delay: float = 1.0,
max_delay: float = 32.0,
) -> Result[T]:
"""重試邏輯,適用於暫時性失敗"""
delay = initial_delay
for attempt in range(max_retries):
result = await fn()
if is_ok(result):
return result
error = result.error
if isinstance(error, (GoogleRateLimitError, GoogleNetworkError)):
if attempt < max_retries - 1:
logger.warning(
"Retryable error (attempt %d/%d): %s. Waiting %.1fs",
attempt + 1, max_retries, error, delay
)
await asyncio.sleep(delay)
delay = min(delay * 2, max_delay)
continue
return result
2. 斷路器(Circuit Breaker)¶
class CircuitBreaker:
"""防止 Cascading Failure"""
def is_available(self) -> bool:
"""檢查是否應發起請求"""
if self.state == CircuitState.CLOSED:
return True
# ... 詳細邏輯
實戰案例:async_core 的錯誤流轉¶
完整流程¶
async_get_all_information_from_api()
├─ 1. 並行建立三個任務
├─ 2. asyncio.gather(*tasks, return_exceptions=True)
├─ 3. 後處理:_coerce_result()
├─ 4. 逐個檢查結果
└─ 5. 最終合併為 ResourceContext
錯誤在層級間的流轉¶
Layer 1: Google API 返回 HTTP 401
↓
Layer 2: parse_generic_batch_response() 解析為
{"ok": False, "status": 401, "error": "invalid_grant"}
↓
Layer 3: batch_execute_async() 轉換為
Err(GoogleAuthError("HTTP 401: invalid_grant"))
↓
Layer 4: async_core 檢查並傳遞
if is_err(result) and isinstance(result.error, GoogleAuthError):
return Err(result.error)
↓
Layer 5: MCP 工具回傳失敗訊息給前端
最佳實踐清單¶
- ✅ 使用 Result[T] 表達可能失敗的操作
- ✅ 使用 is_ok() / is_err() 進行型別縮窄
- ✅ 使用 map_result / flat_map_result 鏈式操作
- ✅ 細分 GoogleError 子類,實現狀態碼路由
- ✅ 在 Batch 結果中逐個檢查,支持部分成功
- ✅ 記錄詳細的日誌上下文,便於分佈式追蹤
- ✅ 實現指數退避和斷路器,處理暫時性失敗
- ❌ 避免盲目重試 AuthError 或 ParseError
- ❌ 避免在熱路徑中使用 unwrap(),會拋出異常
文檔版本:Phase 4 (Result Monad 整合階段)
適用範圍:src/time_compass/integrations/
參考檔案: - src/time_compass/utils/result.py - src/time_compass/integrations/common/exceptions.py - src/time_compass/integrations/common/google_api_dispatcher.py - src/time_compass/integrations/get_all_information_from_api.py