Skip to content

Commit 8b254db

Browse files
author
DogLooksGood
committed
Add recorder file writer
1 parent 50e9f96 commit 8b254db

3 files changed

Lines changed: 90 additions & 51 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ result
99
.blacklist
1010
logs
1111
bundles
12+
records

transactor-components/src/recorder.rs

Lines changed: 84 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,39 @@
1+
use async_trait::async_trait;
2+
use borsh::{BorshSerialize, BorshDeserialize};
3+
use race_core::context::Node;
4+
use race_api::event::Event;
5+
use race_transactor_frames::EventFrame;
6+
use std::fs::{File, create_dir};
7+
use std::io::{Write, BufWriter};
8+
use std::path::{Path, PathBuf};
19
use std::sync::Arc;
10+
use super::ComponentEnv;
11+
use super::common::ConsumerPorts;
212
use tokio::sync::Mutex;
3-
use async_trait::async_trait;
13+
use tracing::{info, error};
414

515
use crate::common::Component;
616
use crate::event_bus::CloseReason;
7-
use race_transactor_frames::EventFrame;
8-
9-
use tracing::info;
10-
11-
use super::common::ConsumerPorts;
12-
use super::ComponentEnv;
17+
use crate::utils::base64_encode;
18+
19+
#[derive(Debug, BorshSerialize, BorshDeserialize)]
20+
enum Record {
21+
Checkpoint {
22+
state: Vec<u8>,
23+
nodes: Vec<Node>,
24+
},
25+
Event {
26+
event: Event,
27+
timestamp: u64,
28+
}
29+
}
1330

1431
trait RecordWriter {
15-
fn write(&mut self, event_frame: EventFrame);
16-
17-
#[allow(unused)]
18-
fn dump_records(&self) -> Vec<EventFrame>;
32+
fn write(&mut self, record: Record);
1933
}
2034

2135
struct InMemoryRecordWriter {
22-
pub records: Vec<EventFrame>,
36+
pub records: Vec<Record>,
2337
}
2438

2539
impl InMemoryRecordWriter {
@@ -29,12 +43,39 @@ impl InMemoryRecordWriter {
2943
}
3044

3145
impl RecordWriter for InMemoryRecordWriter {
32-
fn write(&mut self, event_frame: EventFrame) {
33-
self.records.push(event_frame);
46+
47+
fn write(&mut self, record: Record) {
48+
self.records.push(record);
49+
}
50+
}
51+
52+
struct FileRecordWriter {
53+
pub writer: BufWriter<File>,
54+
}
55+
56+
impl FileRecordWriter {
57+
pub fn try_new(record_file_name: PathBuf) -> std::io::Result<Self> {
58+
let file = File::create(record_file_name)?;
59+
let writer = BufWriter::new(file);
60+
Ok(Self {
61+
writer
62+
})
63+
}
64+
}
65+
66+
impl FileRecordWriter {
67+
fn write_internal(&mut self, record: Record) -> std::io::Result<()> {
68+
let s = borsh::to_vec(&record)?;
69+
writeln!(&mut self.writer, "{}", base64_encode(&s))?;
70+
self.writer.flush()
3471
}
72+
}
3573

36-
fn dump_records(&self) -> Vec<EventFrame> {
37-
self.records.clone()
74+
impl RecordWriter for FileRecordWriter {
75+
fn write(&mut self, record: Record) {
76+
if let Err(e) = self.write_internal(record) {
77+
error!("Failed to write record: {}", e);
78+
}
3879
}
3980
}
4081

@@ -52,11 +93,20 @@ pub struct RecorderContext {
5293
impl Recorder {
5394
pub fn init(
5495
addr: String,
55-
_in_mem: bool,
96+
in_mem: bool,
5697
) -> (Self, RecorderContext) {
57-
58-
let writer = InMemoryRecordWriter::new();
59-
let writer = Arc::new(Mutex::new(writer));
98+
let writer: Arc<Mutex<dyn RecordWriter + Send + Sync>>;
99+
100+
if in_mem {
101+
writer = Arc::new(Mutex::new(InMemoryRecordWriter::new()))
102+
} else {
103+
let dir = Path::new("records");
104+
if !dir.exists() {
105+
create_dir(dir).expect("Failed to create records directory");
106+
}
107+
let file_path = format!("records/{}", addr);
108+
writer = Arc::new(Mutex::new(FileRecordWriter::try_new(file_path.into()).expect("Fail to create record writer")))
109+
}
60110

61111
(
62112
Self { addr, writer: writer.clone() },
@@ -86,43 +136,27 @@ impl Component<ConsumerPorts, RecorderContext> for Recorder {
86136

87137
let mut writer = writer.lock().await;
88138
match event_frame {
139+
140+
EventFrame::Broadcast {
141+
event,
142+
timestamp,
143+
..
144+
} => {
145+
let record = Record::Event {
146+
event, timestamp
147+
};
148+
149+
writer.write(record);
150+
}
151+
89152
EventFrame::Shutdown => {
90-
writer.write(event_frame);
91153
break;
92154
}
93155

94-
_ => {
95-
writer.write(event_frame);
96-
}
156+
_ => ()
97157
}
98158
}
99159

100160
CloseReason::Complete
101161
}
102162
}
103-
104-
#[cfg(test)]
105-
mod tests {
106-
107-
use super::*;
108-
109-
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
110-
async fn test_recorder() -> anyhow::Result<()> {
111-
let (recorder, recorder_ctx) = Recorder::init("test_addr".into(), true);
112-
// let mut recorder_handle = recorder.start("test_addr", recorder_ctx);
113-
114-
let (ports, io, env) = recorder.prepare("test_addr");
115-
116-
io.send(EventFrame::Empty).await?;
117-
io.send(EventFrame::Shutdown).await?;
118-
119-
let _close_reason = Recorder::run(ports, recorder_ctx, env).await;
120-
121-
let writer = recorder.writer.lock().await;
122-
assert!(matches!(writer.dump_records()[0], EventFrame::Empty));
123-
assert!(matches!(writer.dump_records()[1], EventFrame::Shutdown));
124-
125-
Ok(())
126-
}
127-
128-
}

transactor/src/handle/transactor.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::sync::Arc;
22

33
use race_transactor_components::{
4-
Broadcaster, Component, EventBridgeParent, EventBus, EventLoop, GameSynchronizer, LocalConnection, PortsHandle, Refunder, Submitter, WrappedClient, WrappedHandler
4+
Broadcaster, Component, EventBridgeParent, EventBus, EventLoop, GameSynchronizer, LocalConnection, PortsHandle, Refunder, Submitter, WrappedClient, WrappedHandler, Recorder
55
};
66
use race_transactor_frames::{EventFrame, SignalFrame};
77
use race_core::error::{Error, Result};
@@ -115,6 +115,9 @@ impl TransactorHandle {
115115
let (bridge, bridge_ctx) = EventBridgeParent::init(signal_tx);
116116
let mut bridge_handle = bridge.start(&game_account.addr, bridge_ctx);
117117

118+
let (recorder, recorder_ctx) = Recorder::init(game_account.addr.clone(), false);
119+
let mut recorder_handle = recorder.start(&game_account.addr, recorder_ctx);
120+
118121
let (event_loop, event_loop_ctx) = EventLoop::init(
119122
handler,
120123
game_context,
@@ -153,6 +156,7 @@ impl TransactorHandle {
153156
event_bus.attach(&mut event_loop_handle).await;
154157
event_bus.attach(&mut client_handle).await;
155158
event_bus.attach(&mut refunder_handle).await;
159+
event_bus.attach(&mut recorder_handle).await;
156160

157161
// Dispatch init state
158162
event_bus

0 commit comments

Comments
 (0)