Skip to content

Commit 5d002ef

Browse files
author
DogLooksGood
committed
Add retry in game account fetching after subscription
1 parent 251acc8 commit 5d002ef

3 files changed

Lines changed: 36 additions & 20 deletions

File tree

transactor-components/src/synchronizer.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ async fn maybe_send_sync(
3939

4040
// Drop duplicated updates
4141
if access_version <= prev_access_version {
42-
info!("{} Drop update, this version {} < previous version {}", env.log_prefix, access_version, prev_access_version);
42+
info!("{} Drop update, this version({}) is not newer than previous version({})", env.log_prefix, access_version, prev_access_version);
4343
return (prev_access_version, None);
4444
}
4545

@@ -161,9 +161,8 @@ impl Component<PipelinePorts, GameSynchronizerContext> for GameSynchronizer {
161161
}
162162
};
163163

164-
164+
// Do a first query, to handle those transactions made when our transactor is offline.
165165
let account = ctx.transport.get_game_account(&ctx.game_addr).await;
166-
167166
if let Ok(Some(game_account)) = account {
168167
let (new_access_version, close_reason) = maybe_send_sync(
169168
prev_access_version,
@@ -191,13 +190,16 @@ impl Component<PipelinePorts, GameSynchronizerContext> for GameSynchronizer {
191190
}
192191

193192
sub_item = sub.next() => {
194-
195193
// The retry mechanism is implemented in `WrappedTransport`. An Ok(Err(..))
196194
// means the transport has gave up on reading game state. The Ok(None) stands
197195
// for the end of the stream, which is supposed to be sent after an error. In
198196
// both cases, we shutdown the game by sending a Shutdown frame.
199197
match sub_item {
200198
Some(Ok(game_account)) => {
199+
200+
info!("{} Get account from subscription, access_version = {}, previous access version = {}",
201+
env.log_prefix, game_account.access_version, prev_access_version);
202+
201203
let (new_access_version, close_reason) = maybe_send_sync(
202204
prev_access_version,
203205
game_account,
@@ -227,6 +229,7 @@ impl Component<PipelinePorts, GameSynchronizerContext> for GameSynchronizer {
227229
}
228230
}
229231

232+
info!("{} Synchronizer exits.", env.log_prefix);
230233
return CloseReason::Complete;
231234
}
232235
}

transactor/src/reg.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ pub async fn start_reg_task(context: &ApplicationContext) -> JoinHandle<()> {
152152
}
153153
}
154154
} else {
155-
warn!("Failed to load registration at {}", addr);
155+
warn!("Failed to load registration at {}, skip it for this time.", addr);
156156
}
157157
}
158158

transport/src/solana.rs

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use constants::*;
77
use futures::{Stream, StreamExt};
88
use solana_transaction_status::UiTransactionEncoding;
99
use tokio::sync::Mutex;
10-
use tokio::time::timeout;
1110
use std::time::Duration;
1211
use tracing::{error, info, warn};
1312
use types::*;
@@ -65,7 +64,6 @@ mod nft;
6564
const METAPLEX_PROGRAM_ID: &str = "metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s";
6665

6766
const FEE_CAP: u64 = 20000;
68-
const RESTART_SUBSCRIPTION_TIMEOUT_SECS: u64 = 180;
6967

7068
const PLAYERS_REG_HEAD_LEN: usize = 152;
7169
const PLAYERS_REG_INIT_LEN: usize = PLAYERS_REG_HEAD_LEN + 4;
@@ -1233,9 +1231,8 @@ impl TransportT for SolanaTransport {
12331231
}
12341232
};
12351233

1236-
while let Ok(Some(rpc_response)) = timeout(
1237-
Duration::from_secs(RESTART_SUBSCRIPTION_TIMEOUT_SECS), stream.next()
1238-
).await {
1234+
info!("Start the subscription for {}", addr);
1235+
while let Some(rpc_response) = stream.next().await {
12391236
let ui_account: UiAccount = rpc_response.value;
12401237

12411238
let Some(data) = ui_account.data.decode() else {
@@ -1259,16 +1256,31 @@ impl TransportT for SolanaTransport {
12591256
}
12601257
};
12611258

1262-
let (game_state, players_reg) = match self.internal_get_game_state_and_players_reg_state(
1263-
&game_account_pubkey,
1264-
&state.players_reg_account,
1265-
).await {
1266-
Ok((game_state, players_reg)) => (game_state, players_reg),
1267-
Err(e) => {
1268-
error!("Get players reg error: {}", e.to_string());
1269-
unsub().await;
1270-
return;
1271-
}
1259+
info!("GameState From Sub, versions #A{}#S{}", state.access_version, state.settle_version);
1260+
1261+
1262+
// We have to make sure the we get the data that is as new as the one from subscription
1263+
// So we keep checking their access_version and settle_version.
1264+
let (game_state, players_reg) = loop {
1265+
match self.internal_get_game_state_and_players_reg_state(
1266+
&game_account_pubkey,
1267+
&state.players_reg_account,
1268+
).await {
1269+
Ok((game_state, players_reg)) => {
1270+
info!("GameState From Query, versions #A{}#S{}", game_state.access_version, game_state.settle_version);
1271+
if game_state.access_version != state.access_version
1272+
|| game_state.settle_version != state.settle_version {
1273+
info!("Retry query");
1274+
continue;
1275+
}
1276+
break (game_state, players_reg)
1277+
}
1278+
Err(e) => {
1279+
error!("Get players reg error: {}", e.to_string());
1280+
unsub().await;
1281+
return;
1282+
}
1283+
};
12721284
};
12731285

12741286
let players = players_reg.players;
@@ -1288,6 +1300,7 @@ impl TransportT for SolanaTransport {
12881300
// We restart subscription regularly to avoid broken connection.
12891301
// A broken connection is the case where the connection is established,
12901302
// no error raises, but no further update is delivered.
1303+
info!("Stop the subscription for {}", addr);
12911304
unsub().await;
12921305
return;
12931306
}))

0 commit comments

Comments
 (0)