Skip to content
This repository was archived by the owner on Jun 1, 2026. It is now read-only.

Commit 6297425

Browse files
committed
Add handling for fetching nft asset
1 parent 309bfb2 commit 6297425

5 files changed

Lines changed: 141 additions & 103 deletions

File tree

apps/daemon/src/consumers/store/store_transactions_consumer.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use std::collections::HashSet;
22
use std::{collections::HashMap, error::Error};
33

44
use async_trait::async_trait;
5-
use primitives::{AssetAddress, DeviceSubscription, Transaction, TransactionId, TransactionState, TransactionType};
6-
use storage::{AssetsAddressesRepository, AssetsRepository, Database, TransactionsRepository, WalletsRepository};
5+
use primitives::{AssetAddress, DeviceSubscription, NFTAssetId, Transaction, TransactionId, TransactionState, TransactionType};
6+
use storage::{AssetsAddressesRepository, AssetsRepository, Database, NftAssetFilter, NftRepository, TransactionsRepository, WalletsRepository};
77
use streamer::{AssetId, NotificationsPayload, StreamProducer, StreamProducerQueue, TransactionsPayload, WalletStreamEvent, WalletStreamPayload, consumer::MessageConsumer};
88
use swapper::cross_chain::{self, DepositAddressMap, SendAddressMap};
99

@@ -64,6 +64,17 @@ impl MessageConsumer<TransactionsPayload, usize> for StoreTransactionsConsumer {
6464

6565
let _ = self.stream_producer.publish_fetch_assets(missing_assets).await;
6666

67+
let nft_asset_ids: Vec<NFTAssetId> = transactions
68+
.iter()
69+
.filter(|x| x.addresses().iter().any(|addr| subscription_addresses.contains(addr)))
70+
.filter_map(|x| x.nft_asset_id())
71+
.collect::<HashSet<_>>()
72+
.into_iter()
73+
.collect();
74+
75+
let missing_nft_assets = self.get_missing_nft_assets(nft_asset_ids).await?;
76+
let _ = self.stream_producer.publish_fetch_nft_assets(missing_nft_assets).await;
77+
6778
let mut transactions_map: HashMap<TransactionId, Transaction> = HashMap::new();
6879
let mut assets_addresses = HashSet::new();
6980
let mut notifications: Vec<NotificationsPayload> = Vec::new();
@@ -225,6 +236,16 @@ impl StoreTransactionsConsumer {
225236
Ok((assets_with_prices, missing_assets_ids))
226237
}
227238

239+
async fn get_missing_nft_assets(&self, nft_asset_ids: Vec<NFTAssetId>) -> Result<Vec<NFTAssetId>, Box<dyn Error + Send + Sync>> {
240+
if nft_asset_ids.is_empty() {
241+
return Ok(Vec::new());
242+
}
243+
let identifiers: Vec<String> = nft_asset_ids.iter().map(|id| id.to_string()).collect();
244+
let existing = self.database.nft()?.get_nft_assets_by_filter(vec![NftAssetFilter::Identifiers(identifiers)])?;
245+
let existing_ids: HashSet<NFTAssetId> = existing.into_iter().map(|row| row.identifier.0).collect();
246+
Ok(nft_asset_ids.into_iter().filter(|id| !existing_ids.contains(id)).collect())
247+
}
248+
228249
async fn store_transactions(&self, transactions: Vec<Transaction>) -> Result<usize, Box<dyn Error + Send + Sync>> {
229250
if transactions.is_empty() {
230251
return Ok(0);

crates/gem_ton/src/provider/transactions_mapper.rs

Lines changed: 97 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -59,79 +59,103 @@ pub fn map_trace_transactions(traces: Vec<Trace>) -> Vec<Transaction> {
5959
traces.into_iter().filter_map(map_root_trace_transaction).collect()
6060
}
6161

62+
struct TransferDetails {
63+
asset_id: AssetId,
64+
from: String,
65+
to: String,
66+
value: String,
67+
transaction_type: TransactionType,
68+
memo: Option<String>,
69+
metadata: Option<serde_json::Value>,
70+
}
71+
6272
fn map_root_trace_transaction(trace: Trace) -> Option<Transaction> {
6373
let state = if trace.is_incomplete || trace.has_actions() { Some(trace.action_state()) } else { None };
64-
let swap = jetton_swap(&trace.actions);
65-
let jetton_transfer = jetton_transfer(&trace.actions);
6674
let mut transactions = trace.transactions;
6775
let root_hash = trace.transactions_order.into_iter().next()?;
6876
let root = transactions.remove(&root_hash)?;
6977

70-
if let Some(details) = jetton_transfer {
71-
return map_jetton_transfer_transaction(&root, state, details);
72-
}
73-
74-
let mut transaction = map_transaction_message_with_state(root, state)?;
75-
if let Some((sender, metadata)) = swap
76-
&& let Ok(value) = serde_json::to_value(metadata)
77-
{
78-
transaction.transaction_type = TransactionType::Swap;
79-
transaction.from = sender.clone();
80-
transaction.to = sender;
81-
transaction.metadata = Some(value);
82-
}
83-
Some(transaction)
84-
}
78+
let details = if let Some(d) = jetton_transfer_details(&trace.actions) {
79+
d
80+
} else {
81+
let mut d = simple_transfer_details(&root)?;
82+
apply_jetton_swap(&mut d, &trace.actions);
83+
d
84+
};
8585

86-
fn jetton_transfer(actions: &[TraceAction]) -> Option<JettonTransferDetails> {
87-
let action = actions
88-
.iter()
89-
.find(|action| action.action_type.as_deref() == Some(TRACE_ACTION_JETTON_TRANSFER) && action.success == Some(true))?;
90-
serde_json::from_value(action.details.clone()?).ok()
86+
build_transaction(&root, state, details)
9187
}
9288

93-
fn map_jetton_transfer_transaction(transaction: &TransactionMessage, state: Option<TransactionState>, details: JettonTransferDetails) -> Option<Transaction> {
89+
fn build_transaction(message: &TransactionMessage, state: Option<TransactionState>, details: TransferDetails) -> Option<Transaction> {
9490
let fee_asset_id = Chain::Ton.as_asset_id();
95-
let state = state.unwrap_or_else(|| map_transaction_state(transaction));
96-
let created_at = DateTime::from_timestamp(transaction.now, 0)?;
97-
let hash = base64_hash_to_hex(&transaction.hash)?;
98-
let token_id = hex_to_base64_address(&details.asset)?;
99-
let asset_id = AssetId::from_token(Chain::Ton, &token_id);
100-
let from = parse_address(&details.sender)?;
101-
let to = parse_address(&details.receiver)?;
102-
let memo = details.comment.filter(|comment| !comment.is_empty());
91+
let state = state.unwrap_or_else(|| map_transaction_state(message));
92+
let created_at = DateTime::from_timestamp(message.now, 0)?;
93+
let hash = base64_hash_to_hex(&message.hash)?;
10394

10495
Some(Transaction::new(
10596
hash,
106-
asset_id,
107-
from,
108-
to,
97+
details.asset_id,
98+
details.from,
99+
details.to,
109100
None,
110-
TransactionType::Transfer,
101+
details.transaction_type,
111102
state,
112-
transaction.total_fees.to_string(),
103+
message.total_fees.to_string(),
113104
fee_asset_id,
114-
details.amount,
115-
memo,
116-
None,
105+
details.value,
106+
details.memo,
107+
details.metadata,
117108
created_at,
118109
))
119110
}
120111

121-
fn jetton_swap(actions: &[TraceAction]) -> Option<(String, TransactionSwapMetadata)> {
122-
let action = actions
112+
fn find_action<'a>(actions: &'a [TraceAction], action_type: &str) -> Option<&'a TraceAction> {
113+
actions
123114
.iter()
124-
.find(|action| action.action_type.as_deref() == Some(TRACE_ACTION_JETTON_SWAP) && action.success == Some(true))?;
125-
let details: JettonSwapDetails = serde_json::from_value(action.details.clone()?).ok()?;
126-
let sender = parse_address(&details.sender)?;
115+
.find(|action| action.action_type.as_deref() == Some(action_type) && action.success == Some(true))
116+
}
117+
118+
fn jetton_transfer_details(actions: &[TraceAction]) -> Option<TransferDetails> {
119+
let details: JettonTransferDetails = serde_json::from_value(find_action(actions, TRACE_ACTION_JETTON_TRANSFER)?.details.clone()?).ok()?;
120+
let token_id = hex_to_base64_address(&details.asset)?;
121+
Some(TransferDetails {
122+
asset_id: AssetId::from_token(Chain::Ton, &token_id),
123+
from: parse_address(&details.sender)?,
124+
to: parse_address(&details.receiver)?,
125+
value: details.amount,
126+
transaction_type: TransactionType::Transfer,
127+
memo: details.comment.filter(|comment| !comment.is_empty()),
128+
metadata: None,
129+
})
130+
}
131+
132+
fn apply_jetton_swap(details: &mut TransferDetails, actions: &[TraceAction]) {
133+
let Some(action) = find_action(actions, TRACE_ACTION_JETTON_SWAP) else {
134+
return;
135+
};
136+
let Some(swap): Option<JettonSwapDetails> = action.details.clone().and_then(|value| serde_json::from_value(value).ok()) else {
137+
return;
138+
};
139+
let Some(sender) = parse_address(&swap.sender) else {
140+
return;
141+
};
142+
let (Some(from_asset), Some(to_asset)) = (ton_asset_id(swap.asset_in.as_deref()), ton_asset_id(swap.asset_out.as_deref())) else {
143+
return;
144+
};
127145
let metadata = TransactionSwapMetadata {
128-
from_asset: ton_asset_id(details.asset_in.as_deref())?,
129-
from_value: details.dex_incoming_transfer.amount,
130-
to_asset: ton_asset_id(details.asset_out.as_deref())?,
131-
to_value: details.dex_outgoing_transfer.amount,
132-
provider: details.dex,
146+
from_asset,
147+
from_value: swap.dex_incoming_transfer.amount,
148+
to_asset,
149+
to_value: swap.dex_outgoing_transfer.amount,
150+
provider: swap.dex,
133151
};
134-
Some((sender, metadata))
152+
let Ok(metadata) = serde_json::to_value(metadata) else {
153+
return;
154+
};
155+
details.transaction_type = TransactionType::Swap;
156+
details.from = sender.clone();
157+
details.to = sender;
158+
details.metadata = Some(metadata);
135159
}
136160

137161
fn ton_asset_id(raw_address: Option<&str>) -> Option<AssetId> {
@@ -141,63 +165,38 @@ fn ton_asset_id(raw_address: Option<&str>) -> Option<AssetId> {
141165
}
142166
}
143167

144-
fn map_transaction_message_with_state(transaction: TransactionMessage, state: Option<TransactionState>) -> Option<Transaction> {
168+
fn simple_transfer_details(message: &TransactionMessage) -> Option<TransferDetails> {
145169
let asset_id = Chain::Ton.as_asset_id();
146-
let state = state.unwrap_or_else(|| map_transaction_state(&transaction));
147-
let created_at = DateTime::from_timestamp(transaction.now, 0)?;
148-
let hash = base64_hash_to_hex(&transaction.hash)?;
149-
150-
if transaction.out_msgs.len() == 1 && is_simple_transfer(transaction.out_msgs.first()?) {
151-
let out_message = transaction.out_msgs.first()?;
152-
let from = parse_address(&out_message.source)?;
153-
let to = match &out_message.destination {
154-
Some(destination) => parse_address(destination)?,
155-
None => return None,
156-
};
157-
let value = out_message.value.as_ref().unwrap_or(&"0".to_string()).clone();
158-
let memo = extract_memo(out_message);
159-
160-
return Some(Transaction::new(
161-
hash,
162-
asset_id.clone(),
163-
from,
164-
to,
165-
None,
166-
TransactionType::Transfer,
167-
state,
168-
transaction.total_fees.to_string(),
170+
171+
if message.out_msgs.len() == 1 && is_simple_transfer(message.out_msgs.first()?) {
172+
let out_message = message.out_msgs.first()?;
173+
let to = parse_address(out_message.destination.as_deref()?)?;
174+
return Some(TransferDetails {
169175
asset_id,
170-
value,
171-
memo,
172-
None,
173-
created_at,
174-
));
176+
from: parse_address(&out_message.source)?,
177+
to,
178+
value: out_message.value.clone().unwrap_or_else(|| "0".to_string()),
179+
transaction_type: TransactionType::Transfer,
180+
memo: extract_memo(out_message),
181+
metadata: None,
182+
});
175183
}
176184

177-
if transaction.out_msgs.is_empty()
178-
&& let Some(in_msg) = &transaction.in_msg
185+
if message.out_msgs.is_empty()
186+
&& let Some(in_msg) = &message.in_msg
179187
&& let (Some(value), Some(source)) = (&in_msg.value, &in_msg.source)
180188
&& let Ok(value_int) = value.parse::<i64>()
181189
&& value_int > 0
182190
{
183-
let from = parse_address(source)?;
184-
let to = parse_address(&in_msg.destination)?;
185-
186-
return Some(Transaction::new(
187-
hash,
188-
asset_id.clone(),
189-
from,
190-
to,
191-
None,
192-
TransactionType::Transfer,
193-
state,
194-
transaction.total_fees.to_string(),
191+
return Some(TransferDetails {
195192
asset_id,
196-
value.clone(),
197-
None,
198-
None,
199-
created_at,
200-
));
193+
from: parse_address(source)?,
194+
to: parse_address(&in_msg.destination)?,
195+
value: value.clone(),
196+
transaction_type: TransactionType::Transfer,
197+
memo: None,
198+
metadata: None,
199+
});
201200
}
202201

203202
None

crates/primitives/src/transaction.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
2-
AddressName, AssetAddress, Chain, TransactionId, TransactionSwapMetadata, asset_id::AssetId, transaction_direction::TransactionDirection, transaction_state::TransactionState,
3-
transaction_type::TransactionType, transaction_utxo::TransactionUtxoInput,
2+
AddressName, AssetAddress, Chain, NFTAssetId, TransactionId, TransactionNFTTransferMetadata, TransactionSwapMetadata, asset_id::AssetId,
3+
transaction_direction::TransactionDirection, transaction_state::TransactionState, transaction_type::TransactionType, transaction_utxo::TransactionUtxoInput,
44
};
55

66
use chrono::{DateTime, Utc};
@@ -254,6 +254,16 @@ impl Transaction {
254254
self.metadata.as_ref().and_then(|value| TransactionSwapMetadata::deserialize(value).ok())
255255
}
256256

257+
pub fn nft_asset_id(&self) -> Option<NFTAssetId> {
258+
if self.transaction_type != TransactionType::TransferNFT {
259+
return None;
260+
}
261+
self.metadata
262+
.as_ref()
263+
.and_then(|value| TransactionNFTTransferMetadata::deserialize(value).ok())
264+
.map(|metadata| metadata.asset_id)
265+
}
266+
257267
pub fn asset_ids(&self) -> Vec<AssetId> {
258268
match self.transaction_type {
259269
TransactionType::Transfer

crates/storage/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub use self::database::{
1919
assets::{AssetFilter, AssetUpdate},
2020
charts::ChartFilter,
2121
fiat::FiatAssetFilter,
22-
nft::NftCollectionFilter,
22+
nft::{NftAssetFilter, NftCollectionFilter},
2323
perpetuals::PerpetualFilter,
2424
prices::{AssetsWithPricesFilter, PriceUpdate},
2525
referrals::{AbusePatterns, ReferralUpdate},

crates/streamer/src/steam_producer_queue.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::{
1111
pub trait StreamProducerQueue {
1212
async fn publish_fetch_assets(&self, asset_ids: Vec<AssetId>) -> Result<bool, Box<dyn Error + Send + Sync>>;
1313
async fn publish_fetch_nft_asset(&self, asset_id: NFTAssetId) -> Result<bool, Box<dyn Error + Send + Sync>>;
14+
async fn publish_fetch_nft_assets(&self, asset_ids: Vec<NFTAssetId>) -> Result<bool, Box<dyn Error + Send + Sync>>;
1415
async fn publish_fetch_prices(&self, payload: FetchPricesPayload) -> Result<bool, Box<dyn Error + Send + Sync>>;
1516
async fn publish_fetch_prices_assets(&self, asset_ids: Vec<AssetId>) -> Result<bool, Box<dyn Error + Send + Sync>>;
1617
async fn publish_transactions(&self, payload: TransactionsPayload) -> Result<bool, Box<dyn Error + Send + Sync>>;
@@ -44,6 +45,13 @@ impl StreamProducerQueue for StreamProducer {
4445
self.publish(QueueName::FetchNFTCollectionAssets, &FetchNFTAssetPayload::new(asset_id)).await
4546
}
4647

48+
async fn publish_fetch_nft_assets(&self, asset_ids: Vec<NFTAssetId>) -> Result<bool, Box<dyn Error + Send + Sync>> {
49+
for asset_id in asset_ids {
50+
self.publish_fetch_nft_asset(asset_id).await?;
51+
}
52+
Ok(true)
53+
}
54+
4755
async fn publish_fetch_prices(&self, payload: FetchPricesPayload) -> Result<bool, Box<dyn Error + Send + Sync>> {
4856
self.publish(QueueName::FetchPrices, &payload).await
4957
}

0 commit comments

Comments
 (0)