Skip to content

Latest commit

 

History

History
570 lines (403 loc) · 17.5 KB

File metadata and controls

570 lines (403 loc) · 17.5 KB

參考序列導引合併 (Reference-Guided Merge)

相關文件

1. Pipeline 概述

PowerBarcoder Merge Pipeline 是一個整合性的序列合併系統,負責協調 BLAST 比對、序列對齊和最終合併等步驟。整個流程由 src/merge/merge_pipeline.py 中的 MergePipeline 類別管理。

1.1 主要目標

  • 流程整合: 協調 BLAST、alignment、merger 三大子系統
  • 資料流管理: 維持一致的檔案結構和資料流轉
  • 錯誤控制: 提供統一的錯誤處理和回復機制

1.2 系統架構

PowerBarcoder Main
├── run_merge(config, locus, locus_index)
└── MergePipeline
    ├── BlastManager                    # BLAST 資料庫建立與比對
    ├── nnSpliter                      # 序列拆分處理
    ├── src/merge/blast/               # BLAST 結果過濾
    ├── src/merge/align/               # 序列對齊處理
    └── src/merge/merger/              # 核心合併模組

2. 流程控制 (MergePipeline)

2.1 類別設計

MergePipeline 類別位於 src/merge/merge_pipeline.py,負責統一管理整個合併流程。

主要屬性:

  • config: PowerBarcoderConfig - 系統配置物件
  • path_manager: PowerBarcoderResultPathManager - 路徑管理器
  • blast_manager: BlastManager - BLAST 操作管理器

核心方法:

  • run_merge_for_locus(locus: str, locus_index: int) - 執行單一 locus 的完整合併流程(在 MergePipeline 類別中)
  • run_merge(config, locus, locus_index) - 模組層級的方便函數,會建立 MergePipeline 實例並呼叫 run_merge_for_locus
  • _run_nn_spliter(timing) - 由 MergePipeline 內部呼叫的 nnSpliter 執行器(參數用以標示是在 BLAST 前或後執行)
  • BlastManager.prepare_query_file() / BlastManager.run_blastn() / BlastManager.create_blast_db() - 負責建立 BLAST 資料庫、準備查詢檔與執行 blastn(實作於 src/merge/merge_pipeline.py 中的 BlastManager)
  • _run_alignment_pretreater() / _run_alignmenter() / _run_merger() - 分別呼叫 alignmentPretreater、alignmenter 與 merger 的入口函式

2.2 流程入口

run_merge(config: PowerBarcoderConfig, locus: str, locus_index: int)

模組層級的執行入口位於 src/merge/merge_pipeline.py,提供 run_merge() 輔助函式,實際上會建立 MergePipeline 實例並呼叫 run_merge_for_locus()。上層程式(例如 src/powerBarcode.py)可以呼叫此函式或直接建立 MergePipeline

執行流程(概要):

  1. 建立 MergePipeline 實例
  2. 呼叫 run_merge_for_locus() 執行單一 locus 的完整流程(包含 library 處理、nnSpliter、BLAST、對齊與合併)
  3. 處理異常並回傳布林結果

使用範例:

from src.merge.merge_pipeline import run_merge

success = run_merge(config, "COI_1", 0)

3. 處理階段詳解

3.1 BLAST 管理 (BlastManager)

3.1.1 參考 Library 處理與資料庫建立

為每個 locus 建立專屬的 BLAST 參考資料庫。現行程式在建立 DB 前,會先依序判斷並「彈性合併」多種來源(多來源支援為新版特性):

來源優先順序與條件(_process_blast_library_options):

  1. 若設定 sseqidFileName_options[locus_index].use_dada2_merge = true 且存在路徑 result/{batch_name}/{locus}_result/mergeResult/dada2/merged,加入來源(可為目錄,後續展開所有 FASTA)。
  2. 若設定 sseqidFileName_options[locus_index].custom_library = truesseqidFileName_custom_path[locus_index] 存在,加入來源。
  3. 若上述來源均無,退回原始 sseqidFileName[locus_index]
  4. 若來源是目錄,會自動收集副檔名為 .fasta|.fas|.fa|.fna 的所有檔案;若找不到符合副檔名,會再檢查無副檔名但首行以 > 開頭的檔案。
  5. 來源數量 >1 時,合併為 {blast_root}/{locus}_combined_library.fasta;單一來源時直接使用其相對於 ampliconInfo 的路徑。

合併完成後,程式會「回寫覆蓋」原本的 config.sseqidFileName[locus_index](及 BlastManager 內部副本),後續 alignment 前處理會使用更新後的路徑(此為重要副作用)。

資料庫建立流程(合併後的最終檔案):

def create_blast_db(self, locus: str, locus_index: int) -> bool:
    # 1. 設定路徑管理器
    self.path_manager.set_locus(locus)
    
    # 2. 準備輸入檔案
    input_file = os.path.join(amplicon_info, sseqid_file)
    output_db = os.path.join(blast_root, f"{locus}_refDB")
    
    # 3. 執行 makeblastdb
    makeblastdb_cmd = [
        "makeblastdb", "-in", input_file, 
        "-dbtype", "nucl", "-out", output_db
    ]

輸入: 若多來源合併則為 {blast_root}/{locus}_combined_library.fasta,否則為 ampliconInfo/{locus_sseqid_file}

輸出: {result_path}/{locus}_result/mergeResult/blast/{locus}_refDB.*

3.1.2 查詢檔案準備

根據 blastReadChoosingMode[locus_index](此參數為 per-locus list)準備不同的查詢策略,輸出統一命名為 {blast_root}/{locus}_catQuery.fas

模式說明:

  • Mode 0: 直接使用 concat 檔(r1 + 10N + r2),延後拆分。實際來源為 nCatR1R2/ 內的 *.fas。
  • Mode 1: 先行拆分,使用 r1/r2/ 內各自的 *.fas。

nCatR1R2/ 在程式中被檢視以判斷是否有輸入 concat 序列;文件早期稱 nonmerged/,目前程式不再檢查 nonmerged/ 目錄(若存在視為 Legacy,後續可移除)。

3.1.3 BLAST 執行與結果處理

BLAST 命令(實作細節):

程式會從 config.localBlastToolDir 指定的目錄呼叫本機 BLAST+ 可執行檔。當前實作組裝的命令範例如下(程式中使用 -outfmt 6):

{localBlastToolDir}/blastn -db {db_path} -query {query_file} -num_threads 20 -out {output_file} -outfmt 6

注意:num_threads 在程式碼中被設定為 20(硬編碼),若需變更請調整程式或將其轉為配置參數。

結果過濾與解析:

建立 BlastProcessorFactory 後呼叫 process_and_export()

過濾步驟(5+1,實作中的實際次序含可選第 0 步):

  1. (可選)依 ASV 名稱解析的豐度排名保留前 top_k_abundance(blastTopKAbundance[locus_index];設為 0 或 None 可視為關閉)
  2. r1 與 r2 參考交集篩選
  3. 計算覆蓋長度(含 gap 懲罰)與一致性分數
  4. 針對每個 ASV 保留最大覆蓋長度配對
  5. 針對每個 ASV 保留最大一致性配對
  6. 以 (coverage * identity) 組合分數決定最終配對(平手取第一)

輸出檔案:{locus}_blastResult.txt(alignmentPretreater 相容 15 欄 tab 格式);文件先前提及的 refResult_filtered.txt 為舊版產物,現流程不再產生(保留於說明中僅會造成混淆,已移除)。

3.2 序列拆分 (nnSpliter)

3.2.1 執行時機與條件

根據 blastReadChoosingMode[locus_index](list 形式)與 nCatR1R2/ 目錄是否存在 *.fas 決定:

  • Mode 0: 若 nCatR1R2/ 有檔案,BLAST 先以 concat 執行,之後再 _run_nn_spliter(..., "after_blast") 拆分產生 r1/r2/
  • Mode 1: 先 _run_nn_spliter(..., "before_blast"),然後 BLAST 使用拆分後的 r1/r2/

nCatR1R2/ 無檔案(空或缺),後續對齊與合併步驟會被跳過(流程僅建立 BLAST 結果)。

3.2.2 拆分邏輯

輸入格式: r1序列 + NNNNNNNNNN + r2序列

輸出: 獨立的 r1 和 r2 FASTA 檔案

處理流程:

# 1. 掃描輸入目錄
input_files = glob.glob(f"{input_path}/*.fas")

# 2. 逐檔案處理
for file in input_files:
    r1_seq, r2_seq = split_sequence_by_nn(sequence)
    write_r1_file(r1_seq)
    write_r2_file(r2_seq)

輸出目錄:

  • r1/ - r1 序列檔案
  • r2/ - r2 序列檔案

3.3 序列對齊處理

調用 src/merge/align/ 模組進行序列對齊。

主要組件:

  • alignmentPretreater.py - 對齊前處理和方向判斷
  • alignmenter.py - MAFFT 對齊執行

處理流程:

  1. 依據 BLAST 結果準備對齊輸入檔案
  2. 執行 MAFFT 四方對齊(r1, r2, ref, consensus)
  3. 產生 aligned/ 目錄下的對齊結果

詳細技術規格請參閱 docs/alignment.md

3.4 最終合併處理

調用 src/merge/merger/ 模組執行核心合併邏輯。

處理特點:

  • 使用滑窗合併演算法
  • 自動假 gap 校正
  • 並行處理支援
  • 精確重疊區域處理

輸出結果:

  • rawMerged/ - 含 gap 的原始合併結果
  • merged/ - 清理後的最終序列

詳細技術規格請參閱 docs/merger.md

4. 檔案流與目錄結構

4.1 輸入資料結構

{result_path}/{locus}_result/mergeResult/
├── nCatR1R2/                          # r1+10N+r2 concat 序列(作為 Mode 0 的初始輸入)
│   ├── sample1.fas
│   └── sample2.fas
└── nonmerged/ (Legacy / 目前流程未引用)
    ├── sampleX_r1.fas
    └── sampleX_r2.fas

4.2 中間處理檔案

{result_path}/{locus}_result/mergeResult/
├── r1/                                # 拆分後的 r1 序列
├── r2/                                # 拆分後的 r2 序列
├── blast/                             # BLAST 相關檔案
│   ├── {locus}_refDB.*                # BLAST 資料庫
│   ├── {locus}_refResult.txt          # 原始 BLAST outfmt6 結果
│   ├── {locus}_blastResult.txt        # 5+1 過濾後(15 欄 tab,alignmentPretreater 用)
│   ├── {locus}_combined_library.fasta # (可選)多來源或目錄合併後的參考檔案
│   └── {locus}_catQuery.fas           # 組裝後查詢檔案(Mode 0 或 Mode 1)
└── aligned/                           # 對齊結果
    ├── sample1_r1.fas
    ├── sample1_r2.fas
    └── mafft/                        # MAFFT 暫存檔案

4.3 最終輸出結構

{result_path}/{locus}_result/mergeResult/merger/
├── rawMerged/                         # 原始合併結果(含 gap)
│   ├── sample1.fas
│   └── sample2.fas
└── merged/                            # 最終清理結果
    ├── sample1.fas
    └── sample2.fas

5. 配置與參數

5.1 關鍵配置參數(新增多來源與對齊旗標)

BLAST 相關:

  • blastReadChoosingMode: locus-indexed list,每個 locus 可為 0 或 1
  • localBlastToolDir: BLAST 工具目錄路徑
  • sseqidFileName: 各 locus 原始參考序列檔案名(可能被多來源合併流程覆寫)
  • sseqidFileName_options: 多來源選項(use_dada2_merge, custom_library)
  • sseqidFileName_custom_path: 自定義 library 路徑(locus-indexed)
  • blastTopKAbundance: 豐度前 K 之 ASV 保留(0 或空可視為關閉)

路徑管理:

  • resultDataPath: 結果資料根目錄
  • ampliconInfo: 參考序列目錄

處理控制:

  • 並行處理 worker 數量(merger 模組預設 5;實際值請參考 merger/core.py)
  • enable_4way_alignment / enable_overlap_realign / enable_fake_gap_correction / enable_sliding_extension:locus-indexed 布林旗標,不足長度時預設 True
  • 各階段的重試機制和超時設定

5.2 環境依賴

必要工具:

  • BLAST+ suite (makeblastdb, blastn)
  • MAFFT 序列對齊工具

Python 依賴:

  • subprocess: 外部工具調用
  • concurrent.futures: 並行處理
  • pathlib: 路徑管理
  • glob: 檔案搜尋

6. 流程圖

6.1 整體流程架構

graph TB
    A[PowerBarcoder Main] --> B[run_merge]
    B --> C[MergePipeline.run_merge_for_locus]

    C --> D{blastReadChoosingMode}
    D -->|Mode 1| E[前拆分: nnSpliter]
    D -->|Mode 0| F[跳過前拆分]

    E --> G[BLAST 比對流程]
    F --> G

    G --> H[BlastManager.create_blast_db]
    G --> I[prepare_query_file]
    G --> J[run_blastn]

    J --> K{檢查 nonmerged 檔案}
    K -->|無檔案| L[流程結束]
    K -->|有檔案| M{blastReadChoosingMode}

    M -->|Mode 0| N[後拆分: nnSpliter]
    M -->|Mode 1| O[跳過後拆分]

    N --> P[BLAST 結果解析]
    O --> P

    P --> Q[對齊預處理]
    Q --> R[序列對齊]
    R --> S[序列合併]

    S --> T[rawMerged 輸出]
    T --> U[merged 最終輸出]

    %% 錯誤處理
    G -.-> V[BLAST 錯誤處理]
    P -.-> W[解析錯誤處理]
    Q -.-> X[對齊錯誤處理]
    R -.-> Y[對齊錯誤處理]
    S -.-> Z[合併錯誤處理]

    %% 設定樣式
    classDef processBox fill:#e1f5fe
    classDef decisionBox fill:#fff3e0
    classDef outputBox fill:#e8f5e8
    classDef errorBox fill:#ffebee
    classDef skipBox fill:#f3e5f5

    class A,B,C,E,G,H,I,J,N,P,Q,R,S processBox
    class D,M decisionBox
    class K checkBox
    class T,U outputBox
    class F,O skipBox
    class V,W,X,Y,Z errorBox
Loading

6.2 BLAST 處理流程

graph TB
    A[輸入序列] --> B{blastReadChoosingMode}
    
    B -->|Mode 1| C[前拆分 nnSpliter]
    B -->|Mode 0| D[跳過前拆分]
    
    C --> E[r1/, r2/ 分離結果]
    D --> F[nCatR1R2/ 合併序列]
    
    E --> G[prepare_query_file]
    F --> G
    
    G --> H[合併查詢檔案]
    H --> I[執行 blastn 命令]
    I --> J[原始 BLAST 結果]
    
    J --> K{檢查檔案數量}
    K -->|無檔案| L[跳過對齊與合併]
    K -->|有檔案| M{blastReadChoosingMode}
    
    M -->|Mode 0| N[後拆分 nnSpliter]
    M -->|Mode 1| O[跳過後拆分]
    
    N --> P[BLAST 結果解析]
    O --> P
    
    P --> Q["5+1 過濾 (含可選 TopK)"]
    Q --> R[最終配對結果]
    
    %% 平行處理參考資料庫
    S[ampliconInfo 參考序列] --> T[makeblastdb]
    T --> U[BLAST 資料庫]
    U --> I
    
    classDef inputBox fill:#e3f2fd
    classDef processBox fill:#e8f5e8  
    classDef outputBox fill:#fff3e0
    classDef skipBox fill:#f3e5f5
    
    class A,S inputBox
    class C,E,G,H,I,N,P,Q,T processBox
    class R,U outputBox
    class D,O skipBox
Loading

6.3 序列對齊與合併流程

graph TB
    A[BLAST 配對結果] --> B[對齊預處理階段]
    
    subgraph "對齊預處理階段"
        B --> C[alignmentPretreater]
        C --> D[方向判斷與前處理]
    end
    
    subgraph "序列對齊階段"
        D --> E[alignmenter]
        E --> F[MAFFT 四方對齊]
        F --> G[產生 aligned/ 結果]
    end
    
    subgraph "序列合併階段"
        G --> H[run_merger_process]
        H --> I[載入對齊序列]
        
        subgraph "核心合併處理"
            I --> J[滑動窗口合併]
            J --> K[重疊區域處理]
            K --> L[假 gap 檢測]
            L --> M[gap 校正]
        end
        
        subgraph "結果輸出"
            M --> N[rawMerged/ 原始結果]
            N --> O[gap 清理]
            O --> P[merged/ 最終結果]
        end
    end
    
    classDef stageBox fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    classDef processBox fill:#e8f5e8,stroke:#2e7d32
    classDef subProcessBox fill:#fff3e0,stroke:#ef6c00
    classDef outputBox fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
    
    class B,E,H stageBox
    class C,D,F,I processBox
    class J,K,L,M,N,O subProcessBox
    class P outputBox
Loading

6.4 資料流向圖

graph TB
    subgraph "輸入資料"
        A1[nCatR1R2/]
        A2[nonmerged/]
        A3[ampliconInfo/]
    end
    
    subgraph "BLAST 處理"
        B1[r1/]
        B2[r2/] 
        B3[blast/]
        B4[BLAST DB]
    end
    
    subgraph "對齊處理"
        C1[aligned/]
        C2[mafft/]
    end
    
    subgraph "合併處理"
        D1[merger/rawMerged/]
        D2[merger/merged/]
    end
    
    A1 --> B1
    A1 --> B2
    A2 --> B1
    A2 --> B2
    A3 --> B4
    
    B1 --> B3
    B2 --> B3
    B4 --> B3
    
    B3 --> C1
    C1 --> C2
    C2 --> C1
    
    C1 --> D1
    D1 --> D2
    
    %% 條件分支
    B5{blastReadChoosingMode}
    B5 -->|Mode 1| B1
    B5 -->|Mode 0| A1
    
    classDef inputDir fill:#e3f2fd
    classDef blastDir fill:#fff3e0
    classDef alignDir fill:#e1f5fe
    classDef mergeDir fill:#e8f5e8
    classDef decisionBox fill:#f3e5f5
    
    class A1,A2,A3 inputDir
    class B1,B2,B3,B4 blastDir
    class C1,C2 alignDir
    class D1,D2 mergeDir
    class B5 decisionBox
Loading

7. 效能考量

7.1 處理效能

並行化策略:

  • BLAST 階段: 單執行緒(避免資料庫鎖定)
  • Alignment 階段: 檔案級並行
  • Merger 階段: 樣本級並行(5 workers)

記憶體管理:

  • 串流處理大型檔案
  • 及時清理暫存檔案
  • 進程隔離避免記憶體洩漏

8. 疑難排解

8.1 常見問題

Pipeline 初始化失敗

症狀: MergePipeline 建立失敗

解決方案:

  1. 檢查配置檔案格式
  2. 驗證路徑存在性
  3. 確認工具可執行性

BLAST 資料庫建立失敗

症狀: makeblastdb 返回錯誤

解決方案:

  1. 檢查參考序列檔案格式
  2. 驗證 BLAST 工具安裝
  3. 確認檔案讀寫權限

序列拆分異常

症狀: nnSpliter 無法正確拆分

解決方案:

  1. 檢查輸入序列格式
  2. 驗證 NNNNNNNNNN 分隔符
  3. 確認檔案編碼格式

結語

PowerBarcoder Merge Pipeline 提供了一個完整、穩定的序列合併解決方案。透過模組化設計和統一的流程控制,系統能夠高效地處理大量的序列資料,同時保持良好的可維護性和擴展性。各個子系統的協調運作確保了從 BLAST 比對到最終合併的整個流程的一致性和可靠性。