Samuel Toh
Back to blog

從土法煉鋼到系統化:設計一個能 scale 的資料 pipeline

有一段時間,我在工作上負責幫產品接入不同地區的結構化資料。產品的核心是一個 AI agent,需要從這些資料裡做 semantic search 和 keyword search 來回答使用者的問題。每個地區的資料來源格式不一樣、語言不一樣、更新頻率也不一樣,有些甚至有存取限制。

第一個資料源做完的時候,我覺得還好 — 寫個 scraper、處理一下格式、塞進 DB、建個 search service,end-to-end 能跑了。第二個開始做的時候,我發現很多 code 概念上是重複的,但細節又不一樣,沒辦法直接複製。到第三個的時候,我開始覺得痛了 — 每次 onboard 新的資料源都在重新踩差不多的坑,而且每個人接手的時候都要從頭搞清楚整個流程。

這時候我意識到,我需要的不是再寫一個 scraper,而是一個 framework。


我把整個 data pipeline 拆成六層,每一層解決一個問題:資料取得、資料儲存、資料維護、檢索服務、agent tool、onboarding 流程。會這樣分,是因為我之前吃過虧。有一次我 end-to-end 做完整條 pipeline,上線後才發現 schema 設計有問題,結果上下游全部要跟著改,花了很多時間在收拾。分層之後,每一層都有明確的 deliverable — Layer 1 做完你應該有乾淨的結構化資料,Layer 2 做完你應該能在 DB 裡跑 query 拿到正確結果。不用等到全部串起來才發現某一層有問題。


在設計的過程中,有幾個 decision 我覺得蠻值得分享的。

Schema-first:先定義資料結構,再寫 scraper。

以前我的習慣是先把資料抓下來看看長什麼樣,再決定怎麼存。一個資料源的時候沒問題,但多個來源的時候,這種做法會讓每個來源的 schema 長得完全不一樣,下游的 service 就要寫一堆 if-else。後來我改成先用 Pydantic model 把資料結構定好,當作 contract — 所有來源共用一組核心欄位,再各自加 extension fields。這樣下游可以針對核心欄位寫通用邏輯,不用管資料從哪來。這個改變聽起來很小,但對整個 pipeline 的 maintainability 影響很大。

Data version control:追蹤每一筆資料的歷史。

同一筆資料今天跟三個月前的內容可能不一樣,而你的系統可能需要回答「三個月前這筆資料長什麼樣」這種問題。如果你每次更新都直接覆蓋舊的,這種查詢就做不到。

我的做法是每筆資料都帶 version_hasheffective_time,用 is_current_version flag 標記最新版本。更新的時候不是覆蓋,而是新增一筆新版本,把舊版的 is_current_version 設成 false。這樣一般查詢只拿 is_current_version=true 的資料,速度不受影響;需要查歷史的時候,用 effective_time 去撈對應時間點的版本。

搭配這個機制,我還設計了一個 tombstone pattern 來處理上游資料被刪除的情況 — 不直接從 DB 移除,而是建一筆 is_removed=Trueembedding=None 的 record。這樣做是為了避免歷史查詢拿到已經不存在的資料,也讓整個 version history 保持完整。

兩層 change detection:省掉 99% 的 embedding 成本。

有了 version control 的機制之後,下一個問題是怎麼有效率地偵測上游的變更。如果每次都把所有資料重新抓一遍、重新算 embedding,cost 會很可怕。

我設計了兩層機制 — 第一層先看整份資料的 last_modified_date 有沒有變,大部分資料在任何一次 update cycle 裡都不會變,這層就能過濾掉 99% 以上。只有真的變了的才進第二層,用 version_hash 比對具體哪些內容有更新,只對這些建立新版本、重新算 embedding。實際跑起來,原本每次要處理幾萬筆 embedding 的工作量降到幾十到幾百筆,省下來的不只是 API cost,還有 pipeline 的執行時間。


Infra 方面也踩了不少坑。維護層跑在 AWS 上 — ECS Fargate 跑定期更新、Lambda 跑 health check、EventBridge scheduler 排程、S3 存 checkpoint。 有一次我在 Mac 上 build Docker image push 到 ECR,ECS 怎麼都跑不起來,後來才發現是 arm64 跟 x86 的 architecture 不一致。加一個 --platform linux/amd64 就好了,但第一次遇到真的會卡很久。 另一個讓我印象深刻的是 checkpoint 機制。有些資料源的網站不太穩定,scraper 跑到一半常常會斷。一開始每次斷掉都要從頭跑,非常浪費時間。後來我做了 resumable checkpoint — 每處理完一批就存進度到 S3,中斷了下次從上次的位置繼續。這在 scraping 這種充滿不確定性的場景裡,幫我省了非常多時間。後來我也把這些經驗都寫進 checklist 裡,讓接手的人不用再踩一次。


我覺得這整件事最有價值的產出不是 code,而是那六層 checklist。

新的資料源進來的時候,工程師不用從零開始想「我該做哪些事」,按 checklist 走就好 — 每一層該產出什麼、驗收標準是什麼、常見的坑在哪裡,都寫在裡面。後來新同事接手 onboard 新的資料源,靠著 checklist 跟現有 code 當參考,兩個禮拜就從零到上線。對我來說這是最有成就感的部分。不是因為寫了什麼厲害的 code,而是把一個原本只在我腦袋裡的流程,變成任何人都能 follow 的系統。

這段經歷讓我學到一件事:當你發現自己在重複做類似的事情,每次都踩差不多的坑,那就是該停下來好好設計 framework 的時候。短期看起來像在浪費時間做 infrastructure,但長期來看,這才是讓團隊能 scale 的關鍵。