@@ -41,76 +41,74 @@ type MergedData struct {
4141// Merge reads Activity Statement CSVs and Flex Query cached data for all accounts,
4242// deduplicates trades, and returns the merged result.
4343// The dataDirV1Path is the versioned data directory containing per-account subdirectories.
44+ // The activityStatementsDirPath is the directory containing per-account CSV subdirectories.
4445// The accountAliases map contains alias → account ID mappings from config.
4546func Merge (
46- csvStatements []* ibkractivitycsv.ActivityStatement ,
4747 dataDirV1Path string ,
48+ activityStatementsDirPath string ,
4849 accountAliases map [string ]string ,
4950) (* MergedData , error ) {
5051 tradeMap := make (map [string ]* datav1.Trade )
5152 var allPositions []* datav1.Position
5253 var allTransfers []* datav1.Transfer
5354 var allTradeTransfers []* datav1.TradeTransfer
5455 var allCorporateActions []* datav1.CorporateAction
55- // Load Flex Query cached data per account .
56+ // Process each account: load Flex Query cache + Activity Statement CSVs .
5657 for alias := range accountAliases {
58+ // Load Flex Query cached data for this account.
5759 accountDir := filepath .Join (dataDirV1Path , alias )
58- // Read trades for this account.
5960 tradesPath := filepath .Join (accountDir , "trades.json" )
6061 cachedTrades , err := protoio .ReadMessagesJSON (tradesPath , func () * datav1.Trade { return & datav1.Trade {} })
6162 if err == nil {
6263 for _ , trade := range cachedTrades {
63- key := tradeKey (trade )
64- tradeMap [key ] = trade
64+ tradeMap [tradeKey (trade )] = trade
6565 }
6666 }
67- // Read positions for this account.
6867 positionsPath := filepath .Join (accountDir , "positions.json" )
6968 positions , err := protoio .ReadMessagesJSON (positionsPath , func () * datav1.Position { return & datav1.Position {} })
7069 if err == nil {
7170 allPositions = append (allPositions , positions ... )
7271 }
73- // Read transfers for this account.
7472 transfersPath := filepath .Join (accountDir , "transfers.json" )
7573 transfers , err := protoio .ReadMessagesJSON (transfersPath , func () * datav1.Transfer { return & datav1.Transfer {} })
7674 if err == nil {
7775 allTransfers = append (allTransfers , transfers ... )
7876 }
79- // Read trade transfers for this account.
8077 tradeTransfersPath := filepath .Join (accountDir , "trade_transfers.json" )
8178 tradeTransfers , err := protoio .ReadMessagesJSON (tradeTransfersPath , func () * datav1.TradeTransfer { return & datav1.TradeTransfer {} })
8279 if err == nil {
8380 allTradeTransfers = append (allTradeTransfers , tradeTransfers ... )
8481 }
85- // Read corporate actions for this account.
8682 corporateActionsPath := filepath .Join (accountDir , "corporate_actions.json" )
8783 corporateActions , err := protoio .ReadMessagesJSON (corporateActionsPath , func () * datav1.CorporateAction { return & datav1.CorporateAction {} })
8884 if err == nil {
8985 allCorporateActions = append (allCorporateActions , corporateActions ... )
9086 }
91- }
92- // Merge in Activity Statement CSV trades on top — CSV data takes precedence
93- // over Flex Query data since the user manages the CSVs directly.
94- var csvPositions []* datav1.Position
95- for _ , statement := range csvStatements {
96- for i := range statement .Trades {
97- // CSV trades don't have account IDs — they'll need to be mapped
98- // via the activity_statements_dir subdirectory structure.
99- trade , err := csvTradeToProto (& statement .Trades [i ])
100- if err != nil {
101- continue // Skip unparseable trades.
102- }
103- // CSV trades overwrite Flex Query trades with the same key.
104- key := tradeKey (trade )
105- tradeMap [key ] = trade
87+ // Load Activity Statement CSVs for this account from the matching subdirectory.
88+ // CSV data takes precedence over Flex Query data.
89+ csvDir := filepath .Join (activityStatementsDirPath , alias )
90+ csvStatements , err := ibkractivitycsv .ParseDirectory (csvDir )
91+ if err != nil {
92+ // No CSV directory for this account — skip (not an error).
93+ continue
10694 }
107- // Accumulate positions from CSVs (latest wins by overwrite).
108- for i := range statement .Positions {
109- pos , err := csvPositionToProto (& statement .Positions [i ])
110- if err != nil {
111- continue
95+ for _ , statement := range csvStatements {
96+ for i := range statement .Trades {
97+ trade , err := csvTradeToProto (& statement .Trades [i ], alias )
98+ if err != nil {
99+ continue
100+ }
101+ // CSV trades overwrite Flex Query trades with the same key.
102+ tradeMap [tradeKey (trade )] = trade
103+ }
104+ // CSV positions overwrite Flex Query positions for this account.
105+ for i := range statement .Positions {
106+ pos , err := csvPositionToProto (& statement .Positions [i ], alias )
107+ if err != nil {
108+ continue
109+ }
110+ allPositions = append (allPositions , pos )
112111 }
113- csvPositions = append (csvPositions , pos )
114112 }
115113 }
116114 // Collect and sort trades for deterministic output.
@@ -126,23 +124,18 @@ func Merge(
126124 }
127125 return trades [i ].GetTradeId () < trades [j ].GetTradeId ()
128126 })
129- // Merge positions: use CSV positions for accounts that have them, otherwise use Flex Query.
130- // If CSV positions exist, they take precedence.
131- positions := allPositions
132- if len (csvPositions ) > 0 {
133- positions = csvPositions
134- }
135127 return & MergedData {
136128 Trades : trades ,
137- Positions : positions ,
129+ Positions : allPositions ,
138130 Transfers : allTransfers ,
139131 TradeTransfers : allTradeTransfers ,
140132 CorporateActions : allCorporateActions ,
141133 }, nil
142134}
143135
144136// csvTradeToProto converts an Activity Statement CSV trade to a proto Trade.
145- func csvTradeToProto (csvTrade * ibkractivitycsv.Trade ) (* datav1.Trade , error ) {
137+ // The accountAlias is derived from the CSV subdirectory name.
138+ func csvTradeToProto (csvTrade * ibkractivitycsv.Trade , accountAlias string ) (* datav1.Trade , error ) {
146139 // Parse quantity as decimal (supports fractional shares).
147140 quantity , err := mathpb .NewDecimal (csvTrade .Quantity )
148141 if err != nil {
@@ -176,6 +169,7 @@ func csvTradeToProto(csvTrade *ibkractivitycsv.Trade) (*datav1.Trade, error) {
176169 tradeID := generateTradeID (csvTrade .Symbol , csvTrade .DateTime , csvTrade .Quantity , csvTrade .TradePrice )
177170 return & datav1.Trade {
178171 TradeId : tradeID ,
172+ AccountId : accountAlias ,
179173 TradeDate : protoDate ,
180174 SettleDate : protoDate , // CSV doesn't have settle date, use trade date.
181175 Symbol : csvTrade .Symbol ,
@@ -189,7 +183,8 @@ func csvTradeToProto(csvTrade *ibkractivitycsv.Trade) (*datav1.Trade, error) {
189183}
190184
191185// csvPositionToProto converts an Activity Statement CSV position to a proto Position.
192- func csvPositionToProto (csvPosition * ibkractivitycsv.Position ) (* datav1.Position , error ) {
186+ // The accountAlias is derived from the CSV subdirectory name.
187+ func csvPositionToProto (csvPosition * ibkractivitycsv.Position , accountAlias string ) (* datav1.Position , error ) {
193188 currencyCode := csvPosition .CurrencyCode
194189 quantity , err := mathpb .NewDecimal (csvPosition .Quantity )
195190 if err != nil {
@@ -209,6 +204,7 @@ func csvPositionToProto(csvPosition *ibkractivitycsv.Position) (*datav1.Position
209204 }
210205 return & datav1.Position {
211206 Symbol : csvPosition .Symbol ,
207+ AccountId : accountAlias ,
212208 AssetCategory : csvPosition .AssetCategory ,
213209 Quantity : quantity ,
214210 CostBasisPrice : costBasisPrice ,
0 commit comments