相關文件:
- 本文件涵蓋整個 merge pipeline 的上層流程設計
- 底層 merger 模組技術規格請參閱 docs/merger.md
- BLAST 過濾模組請參閱 docs/BLAST.md
- 對齊模組請參閱 docs/alignment.md
PowerBarcoder Merge Pipeline 是一個整合性的序列合併系統,負責協調 BLAST 比對、序列對齊和最終合併等步驟。整個流程由 src/merge/merge_pipeline.py 中的 MergePipeline 類別管理。
- 流程整合: 協調 BLAST、alignment、merger 三大子系統
- 資料流管理: 維持一致的檔案結構和資料流轉
- 錯誤控制: 提供統一的錯誤處理和回復機制
PowerBarcoder Main
├── run_merge(config, locus, locus_index)
└── MergePipeline
├── BlastManager # BLAST 資料庫建立與比對
├── nnSpliter # 序列拆分處理
├── src/merge/blast/ # BLAST 結果過濾
├── src/merge/align/ # 序列對齊處理
└── src/merge/merger/ # 核心合併模組
MergePipeline 類別位於 src/merge/merge_pipeline.py,負責統一管理整個合併流程。
主要屬性:
config: PowerBarcoderConfig- 系統配置物件path_manager: PowerBarcoderResultPathManager- 路徑管理器blast_manager: BlastManager- BLAST 操作管理器
核心方法:
run_merge_for_locus(locus: str, locus_index: int)- 執行單一 locus 的完整合併流程(在MergePipeline類別中)run_merge(config, locus, locus_index)- 模組層級的方便函數,會建立MergePipeline實例並呼叫run_merge_for_locus_run_nn_spliter(timing)- 由MergePipeline內部呼叫的 nnSpliter 執行器(參數用以標示是在 BLAST 前或後執行)BlastManager.prepare_query_file()/BlastManager.run_blastn()/BlastManager.create_blast_db()- 負責建立 BLAST 資料庫、準備查詢檔與執行blastn(實作於src/merge/merge_pipeline.py中的BlastManager)_run_alignment_pretreater()/_run_alignmenter()/_run_merger()- 分別呼叫 alignmentPretreater、alignmenter 與 merger 的入口函式
模組層級的執行入口位於 src/merge/merge_pipeline.py,提供 run_merge() 輔助函式,實際上會建立 MergePipeline 實例並呼叫 run_merge_for_locus()。上層程式(例如 src/powerBarcode.py)可以呼叫此函式或直接建立 MergePipeline。
執行流程(概要):
- 建立
MergePipeline實例 - 呼叫
run_merge_for_locus()執行單一 locus 的完整流程(包含 library 處理、nnSpliter、BLAST、對齊與合併) - 處理異常並回傳布林結果
使用範例:
from src.merge.merge_pipeline import run_merge
success = run_merge(config, "COI_1", 0)為每個 locus 建立專屬的 BLAST 參考資料庫。現行程式在建立 DB 前,會先依序判斷並「彈性合併」多種來源(多來源支援為新版特性):
來源優先順序與條件(_process_blast_library_options):
- 若設定
sseqidFileName_options[locus_index].use_dada2_merge = true且存在路徑result/{batch_name}/{locus}_result/mergeResult/dada2/merged,加入來源(可為目錄,後續展開所有 FASTA)。 - 若設定
sseqidFileName_options[locus_index].custom_library = true且sseqidFileName_custom_path[locus_index]存在,加入來源。 - 若上述來源均無,退回原始
sseqidFileName[locus_index]。 - 若來源是目錄,會自動收集副檔名為 .fasta|.fas|.fa|.fna 的所有檔案;若找不到符合副檔名,會再檢查無副檔名但首行以
>開頭的檔案。 - 來源數量 >1 時,合併為
{blast_root}/{locus}_combined_library.fasta;單一來源時直接使用其相對於ampliconInfo的路徑。
合併完成後,程式會「回寫覆蓋」原本的 config.sseqidFileName[locus_index](及 BlastManager 內部副本),後續 alignment 前處理會使用更新後的路徑(此為重要副作用)。
資料庫建立流程(合併後的最終檔案):
def create_blast_db(self, locus: str, locus_index: int) -> bool:
# 1. 設定路徑管理器
self.path_manager.set_locus(locus)
# 2. 準備輸入檔案
input_file = os.path.join(amplicon_info, sseqid_file)
output_db = os.path.join(blast_root, f"{locus}_refDB")
# 3. 執行 makeblastdb
makeblastdb_cmd = [
"makeblastdb", "-in", input_file,
"-dbtype", "nucl", "-out", output_db
]輸入: 若多來源合併則為 {blast_root}/{locus}_combined_library.fasta,否則為 ampliconInfo/{locus_sseqid_file}
輸出: {result_path}/{locus}_result/mergeResult/blast/{locus}_refDB.*
根據 blastReadChoosingMode[locus_index](此參數為 per-locus list)準備不同的查詢策略,輸出統一命名為 {blast_root}/{locus}_catQuery.fas:
模式說明:
- Mode 0: 直接使用 concat 檔(r1 + 10N + r2),延後拆分。實際來源為
nCatR1R2/內的 *.fas。 - Mode 1: 先行拆分,使用
r1/與r2/內各自的 *.fas。
nCatR1R2/ 在程式中被檢視以判斷是否有輸入 concat 序列;文件早期稱 nonmerged/,目前程式不再檢查 nonmerged/ 目錄(若存在視為 Legacy,後續可移除)。
BLAST 命令(實作細節):
程式會從 config.localBlastToolDir 指定的目錄呼叫本機 BLAST+ 可執行檔。當前實作組裝的命令範例如下(程式中使用 -outfmt 6):
{localBlastToolDir}/blastn -db {db_path} -query {query_file} -num_threads 20 -out {output_file} -outfmt 6
注意:num_threads 在程式碼中被設定為 20(硬編碼),若需變更請調整程式或將其轉為配置參數。
結果過濾與解析:
建立 BlastProcessorFactory 後呼叫 process_and_export():
過濾步驟(5+1,實作中的實際次序含可選第 0 步):
- (可選)依 ASV 名稱解析的豐度排名保留前
top_k_abundance(blastTopKAbundance[locus_index];設為 0 或 None 可視為關閉) - r1 與 r2 參考交集篩選
- 計算覆蓋長度(含 gap 懲罰)與一致性分數
- 針對每個 ASV 保留最大覆蓋長度配對
- 針對每個 ASV 保留最大一致性配對
- 以 (coverage * identity) 組合分數決定最終配對(平手取第一)
輸出檔案:{locus}_blastResult.txt(alignmentPretreater 相容 15 欄 tab 格式);文件先前提及的 refResult_filtered.txt 為舊版產物,現流程不再產生(保留於說明中僅會造成混淆,已移除)。
根據 blastReadChoosingMode[locus_index](list 形式)與 nCatR1R2/ 目錄是否存在 *.fas 決定:
- Mode 0: 若
nCatR1R2/有檔案,BLAST 先以 concat 執行,之後再_run_nn_spliter(..., "after_blast")拆分產生r1/、r2/。 - Mode 1: 先
_run_nn_spliter(..., "before_blast"),然後 BLAST 使用拆分後的r1/、r2/。
若 nCatR1R2/ 無檔案(空或缺),後續對齊與合併步驟會被跳過(流程僅建立 BLAST 結果)。
輸入格式: r1序列 + NNNNNNNNNN + r2序列
輸出: 獨立的 r1 和 r2 FASTA 檔案
處理流程:
# 1. 掃描輸入目錄
input_files = glob.glob(f"{input_path}/*.fas")
# 2. 逐檔案處理
for file in input_files:
r1_seq, r2_seq = split_sequence_by_nn(sequence)
write_r1_file(r1_seq)
write_r2_file(r2_seq)輸出目錄:
r1/- r1 序列檔案r2/- r2 序列檔案
調用 src/merge/align/ 模組進行序列對齊。
主要組件:
alignmentPretreater.py- 對齊前處理和方向判斷alignmenter.py- MAFFT 對齊執行
處理流程:
- 依據 BLAST 結果準備對齊輸入檔案
- 執行 MAFFT 四方對齊(r1, r2, ref, consensus)
- 產生
aligned/目錄下的對齊結果
詳細技術規格請參閱 docs/alignment.md
調用 src/merge/merger/ 模組執行核心合併邏輯。
處理特點:
- 使用滑窗合併演算法
- 自動假 gap 校正
- 並行處理支援
- 精確重疊區域處理
輸出結果:
rawMerged/- 含 gap 的原始合併結果merged/- 清理後的最終序列
詳細技術規格請參閱 docs/merger.md
{result_path}/{locus}_result/mergeResult/
├── nCatR1R2/ # r1+10N+r2 concat 序列(作為 Mode 0 的初始輸入)
│ ├── sample1.fas
│ └── sample2.fas
└── nonmerged/ (Legacy / 目前流程未引用)
├── sampleX_r1.fas
└── sampleX_r2.fas
{result_path}/{locus}_result/mergeResult/
├── r1/ # 拆分後的 r1 序列
├── r2/ # 拆分後的 r2 序列
├── blast/ # BLAST 相關檔案
│ ├── {locus}_refDB.* # BLAST 資料庫
│ ├── {locus}_refResult.txt # 原始 BLAST outfmt6 結果
│ ├── {locus}_blastResult.txt # 5+1 過濾後(15 欄 tab,alignmentPretreater 用)
│ ├── {locus}_combined_library.fasta # (可選)多來源或目錄合併後的參考檔案
│ └── {locus}_catQuery.fas # 組裝後查詢檔案(Mode 0 或 Mode 1)
└── aligned/ # 對齊結果
├── sample1_r1.fas
├── sample1_r2.fas
└── mafft/ # MAFFT 暫存檔案
{result_path}/{locus}_result/mergeResult/merger/
├── rawMerged/ # 原始合併結果(含 gap)
│ ├── sample1.fas
│ └── sample2.fas
└── merged/ # 最終清理結果
├── sample1.fas
└── sample2.fas
BLAST 相關:
blastReadChoosingMode: locus-indexed list,每個 locus 可為 0 或 1localBlastToolDir: BLAST 工具目錄路徑sseqidFileName: 各 locus 原始參考序列檔案名(可能被多來源合併流程覆寫)sseqidFileName_options: 多來源選項(use_dada2_merge,custom_library)sseqidFileName_custom_path: 自定義 library 路徑(locus-indexed)blastTopKAbundance: 豐度前 K 之 ASV 保留(0 或空可視為關閉)
路徑管理:
resultDataPath: 結果資料根目錄ampliconInfo: 參考序列目錄
處理控制:
- 並行處理 worker 數量(merger 模組預設 5;實際值請參考
merger/core.py) enable_4way_alignment/enable_overlap_realign/enable_fake_gap_correction/enable_sliding_extension:locus-indexed 布林旗標,不足長度時預設 True- 各階段的重試機制和超時設定
必要工具:
- BLAST+ suite (makeblastdb, blastn)
- MAFFT 序列對齊工具
Python 依賴:
- subprocess: 外部工具調用
- concurrent.futures: 並行處理
- pathlib: 路徑管理
- glob: 檔案搜尋
graph TB
A[PowerBarcoder Main] --> B[run_merge]
B --> C[MergePipeline.run_merge_for_locus]
C --> D{blastReadChoosingMode}
D -->|Mode 1| E[前拆分: nnSpliter]
D -->|Mode 0| F[跳過前拆分]
E --> G[BLAST 比對流程]
F --> G
G --> H[BlastManager.create_blast_db]
G --> I[prepare_query_file]
G --> J[run_blastn]
J --> K{檢查 nonmerged 檔案}
K -->|無檔案| L[流程結束]
K -->|有檔案| M{blastReadChoosingMode}
M -->|Mode 0| N[後拆分: nnSpliter]
M -->|Mode 1| O[跳過後拆分]
N --> P[BLAST 結果解析]
O --> P
P --> Q[對齊預處理]
Q --> R[序列對齊]
R --> S[序列合併]
S --> T[rawMerged 輸出]
T --> U[merged 最終輸出]
%% 錯誤處理
G -.-> V[BLAST 錯誤處理]
P -.-> W[解析錯誤處理]
Q -.-> X[對齊錯誤處理]
R -.-> Y[對齊錯誤處理]
S -.-> Z[合併錯誤處理]
%% 設定樣式
classDef processBox fill:#e1f5fe
classDef decisionBox fill:#fff3e0
classDef outputBox fill:#e8f5e8
classDef errorBox fill:#ffebee
classDef skipBox fill:#f3e5f5
class A,B,C,E,G,H,I,J,N,P,Q,R,S processBox
class D,M decisionBox
class K checkBox
class T,U outputBox
class F,O skipBox
class V,W,X,Y,Z errorBox
graph TB
A[輸入序列] --> B{blastReadChoosingMode}
B -->|Mode 1| C[前拆分 nnSpliter]
B -->|Mode 0| D[跳過前拆分]
C --> E[r1/, r2/ 分離結果]
D --> F[nCatR1R2/ 合併序列]
E --> G[prepare_query_file]
F --> G
G --> H[合併查詢檔案]
H --> I[執行 blastn 命令]
I --> J[原始 BLAST 結果]
J --> K{檢查檔案數量}
K -->|無檔案| L[跳過對齊與合併]
K -->|有檔案| M{blastReadChoosingMode}
M -->|Mode 0| N[後拆分 nnSpliter]
M -->|Mode 1| O[跳過後拆分]
N --> P[BLAST 結果解析]
O --> P
P --> Q["5+1 過濾 (含可選 TopK)"]
Q --> R[最終配對結果]
%% 平行處理參考資料庫
S[ampliconInfo 參考序列] --> T[makeblastdb]
T --> U[BLAST 資料庫]
U --> I
classDef inputBox fill:#e3f2fd
classDef processBox fill:#e8f5e8
classDef outputBox fill:#fff3e0
classDef skipBox fill:#f3e5f5
class A,S inputBox
class C,E,G,H,I,N,P,Q,T processBox
class R,U outputBox
class D,O skipBox
graph TB
A[BLAST 配對結果] --> B[對齊預處理階段]
subgraph "對齊預處理階段"
B --> C[alignmentPretreater]
C --> D[方向判斷與前處理]
end
subgraph "序列對齊階段"
D --> E[alignmenter]
E --> F[MAFFT 四方對齊]
F --> G[產生 aligned/ 結果]
end
subgraph "序列合併階段"
G --> H[run_merger_process]
H --> I[載入對齊序列]
subgraph "核心合併處理"
I --> J[滑動窗口合併]
J --> K[重疊區域處理]
K --> L[假 gap 檢測]
L --> M[gap 校正]
end
subgraph "結果輸出"
M --> N[rawMerged/ 原始結果]
N --> O[gap 清理]
O --> P[merged/ 最終結果]
end
end
classDef stageBox fill:#e1f5fe,stroke:#01579b,stroke-width:2px
classDef processBox fill:#e8f5e8,stroke:#2e7d32
classDef subProcessBox fill:#fff3e0,stroke:#ef6c00
classDef outputBox fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
class B,E,H stageBox
class C,D,F,I processBox
class J,K,L,M,N,O subProcessBox
class P outputBox
graph TB
subgraph "輸入資料"
A1[nCatR1R2/]
A2[nonmerged/]
A3[ampliconInfo/]
end
subgraph "BLAST 處理"
B1[r1/]
B2[r2/]
B3[blast/]
B4[BLAST DB]
end
subgraph "對齊處理"
C1[aligned/]
C2[mafft/]
end
subgraph "合併處理"
D1[merger/rawMerged/]
D2[merger/merged/]
end
A1 --> B1
A1 --> B2
A2 --> B1
A2 --> B2
A3 --> B4
B1 --> B3
B2 --> B3
B4 --> B3
B3 --> C1
C1 --> C2
C2 --> C1
C1 --> D1
D1 --> D2
%% 條件分支
B5{blastReadChoosingMode}
B5 -->|Mode 1| B1
B5 -->|Mode 0| A1
classDef inputDir fill:#e3f2fd
classDef blastDir fill:#fff3e0
classDef alignDir fill:#e1f5fe
classDef mergeDir fill:#e8f5e8
classDef decisionBox fill:#f3e5f5
class A1,A2,A3 inputDir
class B1,B2,B3,B4 blastDir
class C1,C2 alignDir
class D1,D2 mergeDir
class B5 decisionBox
並行化策略:
- BLAST 階段: 單執行緒(避免資料庫鎖定)
- Alignment 階段: 檔案級並行
- Merger 階段: 樣本級並行(5 workers)
記憶體管理:
- 串流處理大型檔案
- 及時清理暫存檔案
- 進程隔離避免記憶體洩漏
症狀: MergePipeline 建立失敗
解決方案:
- 檢查配置檔案格式
- 驗證路徑存在性
- 確認工具可執行性
症狀: makeblastdb 返回錯誤
解決方案:
- 檢查參考序列檔案格式
- 驗證 BLAST 工具安裝
- 確認檔案讀寫權限
症狀: nnSpliter 無法正確拆分
解決方案:
- 檢查輸入序列格式
- 驗證 NNNNNNNNNN 分隔符
- 確認檔案編碼格式
PowerBarcoder Merge Pipeline 提供了一個完整、穩定的序列合併解決方案。透過模組化設計和統一的流程控制,系統能夠高效地處理大量的序列資料,同時保持良好的可維護性和擴展性。各個子系統的協調運作確保了從 BLAST 比對到最終合併的整個流程的一致性和可靠性。