Streaming Architecture¶
Goal¶
提供串流(streaming)相關的 helper 與管理:把來自 domain/orchestrator 的 chunked 輸出標準化並逐步推送到前端(或 CLI),並處理延遲、錯誤回報等機制。
Inputs¶
user_input: str(要送入 orchestrator 的文字)。- 環境變數:
STREAM_DELAY(每個 chunk 的 sleep fallback)、Gradio theme 可選。 - 可選的
build_streaming_orchestrator(從 domain 導入,若不可用會 fallback)。
Outputs¶
- Async generator of string chunks(格式化後的串流文字,範例:
[Module:field] text)。 - 當 orchestrator 不可用時,回傳 input 作為 fallback。錯誤時回傳錯誤字串。
Signature¶
format_stream_output(module: str, field: str, text: str) -> str:將 chunk 包裝成易於顯示的格式。async def stream_main_output(user_input: str) -> AsyncGenerator[str, None]:核心的 async generator;從build_streaming_orchestrator()取得output_stream並逐 chunk yield。若無 orchestrator,yield 原始輸入。async def chat_fn(message: str, history: List[Dict[str, Any]]):對 Gradio chat 的串流回調,將stream_main_output的結果逐步 yield 回前端。def test_stream_main_output():CLI 測試輔助函式。
Workflow & Upstream Mapping¶
stream_main_output被呼叫,若build_streaming_orchestrator可用,建立 orchestrator instance。- 從 orchestrator 取得 async output stream(每個 chunk 可能包含
signature_field_name,module_name,chunk等屬性)。 - 當偵測到新 field/module 時重置累積字串,並持續累加,yield 已格式化的累積內容給前端。每次 yield 後會根據
STREAM_DELAY暫停。 - 若發生例外,yield 錯誤訊息以便前端顯示。
簡要流程:
User message → stream_main_output → orchestrator (async chunks) → format & yield → Frontend
State Machine¶
┌─────────────────────────────────────────────────────────────────┐
│ stream_main_output(user_input) │
└────────────────────┬────────────────────────────────────────────┘
│
┌──────────▼──────────┐
│ Load Orchestrator? │
└──────┬────────┬─────┘
│ │
Yes ◄─┘ └─► No (Fallback)
│ │
┌───────▼─────────┐ ┌───▼──────────────┐
│ Build Instance │ │ Yield Raw Input │
└───────┬─────────┘ └──────────────────┘
│
┌───────▼──────────────────────────────┐
│ Iterate Async Output Stream │
│ (Each chunk: {field, module, text}) │
└───────┬──────────────────────────────┘
│
┌───────▼───────────────────────────────────────┐
│ Detect Field/Module Change? │
│ If yes: Reset accumulator │
│ If no: Append to accumulator │
└───────┬──────────────────────────────┬────────┘
│ │
┌───────▼────────────────────────┐ │
│ Format & Yield │ │
│ [Module:field] accumulated_txt │ │
└───────┬────────────────────────┘ │
│ ┌──────────────────────┘
│ │
┌───────▼──────▼───────────────────┐
│ Sleep STREAM_DELAY (env config) │
└────────────┬────────────────────┘
│
Continue iterate? ──Yes──► Loop back to field check
│
No
│
┌─────────▼────────────┐
│ Return (End Stream) │
└──────────┬───────────┘
│
Handle Exception?
│
┌──────────▼──────────┐
│ Yield Error Message │
│ Then Return │
└──────────────────────┘
Integration Points¶
With Domain Orchestrator¶
The orchestrator must provide:
class OutputStream:
"""Expected async iterator from domain.orchestrator"""
async def __aiter__(self):
# Yield chunks with:
# chunk.signature_field_name: str
# chunk.module_name: str
# chunk.chunk: str
pass
With Gradio UI¶
async def chat_fn(message: str, history: List[Dict[str, Any]]):
"""
Gradio callback that:
1. Calls stream_main_output(message)
2. Yields each chunk to Gradio chatbot component
3. Updates message history
"""
async for chunk in stream_main_output(message):
yield chunk
# Gradio handles frontend update
Configuration¶
| Env Variable | Default | Purpose |
|---|---|---|
STREAM_DELAY | 0.1 | Sleep seconds between chunk yields (simulates typing) |
GRADIO_THEME | (auto) | Gradio UI theme (affects color, layout) |
Error Handling¶
try:
async for chunk in orchestrator.output_stream:
# Process chunk
except Exception as e:
yield f"❌ Error: {str(e)}"
# Frontend displays error message
Testing¶
CLI test helper available:
def test_stream_main_output():
"""Run stream_main_output() in isolation, prints chunks to stdout"""
pass
Usage:
python -c "from time_compass.interface.streaming import test_stream_main_output; test_stream_main_output()"
Notes¶
STREAM_DELAY可透過環境變數調整,以模擬或穩定顯示速度。- 請確保 domain 層的 orchestrator 提供相容的 chunk 物件(帶有
signature_field_name、module_name、chunk屬性),否則 fallback 行為會啟動。 - Async generator ensures non-blocking UI updates in Gradio.