跳轉到

Streaming Architecture

Goal

提供串流(streaming)相關的 helper 與管理:把來自 domain/orchestrator 的 chunked 輸出標準化並逐步推送到前端(或 CLI),並處理延遲、錯誤回報等機制。


Inputs

  • user_input: str(要送入 orchestrator 的文字)。
  • 環境變數:STREAM_DELAY(每個 chunk 的 sleep fallback)、Gradio theme 可選。
  • 可選的 build_streaming_orchestrator(從 domain 導入,若不可用會 fallback)。

Outputs

  • Async generator of string chunks(格式化後的串流文字,範例:[Module:field] text)。
  • 當 orchestrator 不可用時,回傳 input 作為 fallback。錯誤時回傳錯誤字串。

Signature

  • format_stream_output(module: str, field: str, text: str) -> str:將 chunk 包裝成易於顯示的格式。
  • async def stream_main_output(user_input: str) -> AsyncGenerator[str, None]:核心的 async generator;從 build_streaming_orchestrator() 取得 output_stream 並逐 chunk yield。若無 orchestrator,yield 原始輸入。
  • async def chat_fn(message: str, history: List[Dict[str, Any]]):對 Gradio chat 的串流回調,將 stream_main_output 的結果逐步 yield 回前端。
  • def test_stream_main_output():CLI 測試輔助函式。

Workflow & Upstream Mapping

  1. stream_main_output 被呼叫,若 build_streaming_orchestrator 可用,建立 orchestrator instance。
  2. 從 orchestrator 取得 async output stream(每個 chunk 可能包含 signature_field_name, module_name, chunk 等屬性)。
  3. 當偵測到新 field/module 時重置累積字串,並持續累加,yield 已格式化的累積內容給前端。每次 yield 後會根據 STREAM_DELAY 暫停。
  4. 若發生例外,yield 錯誤訊息以便前端顯示。

簡要流程:

User message → stream_main_output → orchestrator (async chunks) → format & yield → Frontend

State Machine

┌─────────────────────────────────────────────────────────────────┐
│  stream_main_output(user_input)                                │
└────────────────────┬────────────────────────────────────────────┘
                     │
          ┌──────────▼──────────┐
          │ Load Orchestrator?  │
          └──────┬────────┬─────┘
                 │        │
           Yes ◄─┘        └─► No (Fallback)
             │                 │
     ┌───────▼─────────┐   ┌───▼──────────────┐
     │ Build Instance  │   │ Yield Raw Input  │
     └───────┬─────────┘   └──────────────────┘
             │
     ┌───────▼──────────────────────────────┐
     │ Iterate Async Output Stream          │
     │ (Each chunk: {field, module, text})  │
     └───────┬──────────────────────────────┘
             │
     ┌───────▼───────────────────────────────────────┐
     │ Detect Field/Module Change?                   │
     │ If yes: Reset accumulator                     │
     │ If no: Append to accumulator                  │
     └───────┬──────────────────────────────┬────────┘
             │                              │
     ┌───────▼────────────────────────┐   │
     │ Format & Yield                 │   │
     │ [Module:field] accumulated_txt │   │
     └───────┬────────────────────────┘   │
             │      ┌──────────────────────┘
             │      │
     ┌───────▼──────▼───────────────────┐
     │ Sleep STREAM_DELAY (env config)  │
     └────────────┬────────────────────┘
                  │
        Continue iterate? ──Yes──► Loop back to field check
                  │
                 No
                  │
        ┌─────────▼────────────┐
        │ Return (End Stream)  │
        └──────────┬───────────┘
                   │
        Handle Exception?
                   │
        ┌──────────▼──────────┐
        │ Yield Error Message  │
        │ Then Return          │
        └──────────────────────┘

Integration Points

With Domain Orchestrator

The orchestrator must provide:

class OutputStream:
    """Expected async iterator from domain.orchestrator"""
    async def __aiter__(self):
        # Yield chunks with:
        # chunk.signature_field_name: str
        # chunk.module_name: str
        # chunk.chunk: str
        pass

With Gradio UI

async def chat_fn(message: str, history: List[Dict[str, Any]]):
    """
    Gradio callback that:
    1. Calls stream_main_output(message)
    2. Yields each chunk to Gradio chatbot component
    3. Updates message history
    """
    async for chunk in stream_main_output(message):
        yield chunk
        # Gradio handles frontend update

Configuration

Env Variable Default Purpose
STREAM_DELAY 0.1 Sleep seconds between chunk yields (simulates typing)
GRADIO_THEME (auto) Gradio UI theme (affects color, layout)

Error Handling

try:
    async for chunk in orchestrator.output_stream:
        # Process chunk
except Exception as e:
    yield f"❌ Error: {str(e)}"
    # Frontend displays error message

Testing

CLI test helper available:

def test_stream_main_output():
    """Run stream_main_output() in isolation, prints chunks to stdout"""
    pass

Usage:

python -c "from time_compass.interface.streaming import test_stream_main_output; test_stream_main_output()"


Notes

  • STREAM_DELAY 可透過環境變數調整,以模擬或穩定顯示速度。
  • 請確保 domain 層的 orchestrator 提供相容的 chunk 物件(帶有 signature_field_namemodule_namechunk 屬性),否則 fallback 行為會啟動。
  • Async generator ensures non-blocking UI updates in Gradio.