Skip to content

Commit 8a6ee86

Browse files
committed
added support for checking trade results
1 parent f63c7eb commit 8a6ee86

9 files changed

Lines changed: 351 additions & 28 deletions

File tree

crates/binary_options_tools/src/pocketoption_pre/modules/trades/README.md renamed to crates/binary_options_tools/mkds/README.md

File renamed without changes.

crates/binary_options_tools/src/pocketoption_pre/error.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
use std::time::Duration;
2+
13
use binary_options_tools_core_pre::error::CoreError;
4+
use uuid::Uuid;
25

36

47
#[derive(thiserror::Error, Debug)]
@@ -18,8 +21,21 @@ pub enum PocketError {
1821
asset: String,
1922
},
2023

24+
/// Error finding deal.
25+
#[error("Failed to find deal: {0}")]
26+
DealNotFound(Uuid),
27+
28+
/// Timeout error.
29+
#[error("Timeout error: {task} in {context} after {duration:?}")]
30+
Timeout {
31+
task: String, // The task that timed out, eg "check-results",
32+
context: String,
33+
duration: Duration,
34+
},
35+
2136
#[error("General error: {0}")]
2237
General(String),
38+
2339
}
2440

2541
pub type PocketResult<T> = Result<T, PocketError>;

crates/binary_options_tools/src/pocketoption_pre/modules/assets.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use binary_options_tools_core_pre::{
88
reimports::{AsyncReceiver, AsyncSender},
99
traits::{LightweightModule, Rule},
1010
};
11-
use tracing::{info, warn};
11+
use tracing::{debug, warn};
1212

1313
/// Module for handling asset updates in PocketOption
1414
/// This module listens for asset-related messages and processes them accordingly.
@@ -33,7 +33,7 @@ impl LightweightModule<State> for AssetsModule {
3333
while let Ok(msg) = self.receiver.recv().await {
3434
if let Message::Binary(text) = &*msg {
3535
if let Ok(assets) = serde_json::from_slice::<Assets>(text) {
36-
info!("Loaded assets: {:?}", assets.names());
36+
debug!("Loaded assets: {:?}", assets.names());
3737
self.state.set_assets(assets).await;
3838
} else {
3939
warn!("Failed to parse assets message: {:?}", text);

crates/binary_options_tools/src/pocketoption_pre/modules/deals.rs

Lines changed: 221 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
1-
use std::sync::Arc;
1+
use std::{sync::{atomic::{AtomicBool, Ordering}, Arc}, time::Duration};
22

33
use async_trait::async_trait;
44
use binary_options_tools_core_pre::{
55
error::CoreError, reimports::{AsyncReceiver, AsyncSender, Message}, traits::{ApiModule, Rule}
66
};
7+
use serde::Deserialize;
8+
use tracing::info;
79
use uuid::Uuid;
810

9-
use crate::pocketoption_pre::{error::PocketResult, state::State, types::{Deal, MultiPatternRule}};
11+
use crate::pocketoption_pre::{error::{PocketError, PocketResult}, state::State, types::Deal};
1012

13+
const UPDATE_OPENED_DEALS: &str = r#"451-["updateOpenedDeals","#;
14+
const UPDATE_CLOSED_DEALS: &str = r#"451-["updateClosedDeals","#;
15+
const SUCCESS_CLOSE_ORDER: &str = r#"451-["successcloseOrder","#;
1116

1217
#[derive(Debug)]
1318
pub enum Command {
@@ -16,7 +21,22 @@ pub enum Command {
1621

1722
#[derive(Debug)]
1823
pub enum CommandResponse {
19-
CheckResult(PocketResult<Deal>),
24+
CheckResult(Box<Deal>),
25+
DealNotFound(Uuid),
26+
}
27+
28+
enum ExpectedMessage {
29+
UpdateClosedDeals,
30+
UpdateOpenedDeals,
31+
SuccessCloseOrder,
32+
None
33+
}
34+
35+
#[derive(Deserialize)]
36+
struct CloseOrder {
37+
#[serde(rename = "profit")]
38+
_profit: f64,
39+
deals: Vec<Deal>,
2040
}
2141

2242
#[derive(Clone)]
@@ -28,23 +48,69 @@ pub struct DealsHandle {
2848
impl DealsHandle {
2949
pub async fn check_result(&self, trade_id: Uuid) -> PocketResult<Deal> {
3050
self.sender.send(Command::CheckResult(trade_id)).await.map_err(CoreError::from)?;
31-
match self.receiver.recv().await.map_err(CoreError::from)? {
32-
CommandResponse::CheckResult(result) => result,
51+
loop {
52+
match self.receiver.recv().await {
53+
Ok(CommandResponse::CheckResult(deal)) => {
54+
if trade_id == deal.id {
55+
return Ok(*deal);
56+
} else {
57+
// If the request ID does not match, continue waiting for the correct response
58+
continue;
59+
}
60+
},
61+
Ok(CommandResponse::DealNotFound(id)) => return Err(PocketError::DealNotFound(id)),
62+
Err(e) => return Err(CoreError::from(e).into()),
63+
}
64+
}
65+
}
66+
67+
pub async fn check_result_with_timeout(&self, trade_id: Uuid, timeout: Duration) -> PocketResult<Deal> {
68+
self.sender.send(Command::CheckResult(trade_id)).await.map_err(CoreError::from)?;
69+
70+
let timeout_future = tokio::time::sleep(timeout);
71+
tokio::pin!(timeout_future);
72+
73+
loop {
74+
tokio::select! {
75+
result = self.receiver.recv() => {
76+
match result {
77+
Ok(CommandResponse::CheckResult(deal)) => {
78+
if trade_id == deal.id {
79+
return Ok(*deal);
80+
} else {
81+
// If the request ID does not match, continue waiting for the correct response
82+
continue;
83+
}
84+
},
85+
Ok(CommandResponse::DealNotFound(id)) => return Err(PocketError::DealNotFound(id)),
86+
Err(e) => return Err(CoreError::from(e).into()),
87+
}
88+
}
89+
_ = &mut timeout_future => {
90+
return Err(PocketError::Timeout {
91+
task: "check_result".to_string(),
92+
context: format!("Waiting for trade '{trade_id}' result"),
93+
duration: timeout,
94+
});
95+
}
96+
}
3397
}
3498
}
99+
35100
}
36101

37102
/// An API module responsible for listening to deal updates,
38103
/// maintaining the shared `TradeState`, and checking trade results.
39-
pub struct DealsUpdateModule {
104+
pub struct DealsApiModule {
40105
state: Arc<State>,
41106
ws_receiver: AsyncReceiver<Arc<Message>>,
42107
command_receiver: AsyncReceiver<Command>,
43108
command_responder: AsyncSender<CommandResponse>,
109+
waitlist: Vec<Uuid>,
44110
}
45111

46112
#[async_trait]
47-
impl ApiModule<State> for DealsUpdateModule {
113+
impl ApiModule<State> for DealsApiModule {
48114
type Command = Command;
49115
type CommandResponse = CommandResponse;
50116
type Handle = DealsHandle;
@@ -61,6 +127,7 @@ impl ApiModule<State> for DealsUpdateModule {
61127
ws_receiver,
62128
command_receiver,
63129
command_responder,
130+
waitlist: Vec::new(),
64131
}
65132
}
66133

@@ -80,18 +147,160 @@ impl ApiModule<State> for DealsUpdateModule {
80147
// 3. For `CheckResult` commands:
81148
// - Implement the logic described in README.md to wait for the deal to close.
82149
// - Send the result back via `command_responder`.
83-
Ok(())
150+
let mut expected = ExpectedMessage::None;
151+
loop {
152+
tokio::select! {
153+
Ok(msg) = self.ws_receiver.recv() => {
154+
info!("Received message: {:?}", msg);
155+
match msg.as_ref() {
156+
Message::Text(text) => {
157+
if text.starts_with(UPDATE_OPENED_DEALS) {
158+
expected = ExpectedMessage::UpdateOpenedDeals;
159+
} else if text.starts_with(UPDATE_CLOSED_DEALS) {
160+
expected = ExpectedMessage::UpdateClosedDeals;
161+
} else if text.starts_with(SUCCESS_CLOSE_ORDER) {
162+
expected = ExpectedMessage::SuccessCloseOrder;
163+
}
164+
},
165+
Message::Binary(data) => {
166+
// Handle binary messages if needed
167+
match expected {
168+
ExpectedMessage::UpdateOpenedDeals => {
169+
// Handle UpdateOpenedDeals
170+
match serde_json::from_slice::<Vec<Deal>>(data) {
171+
Ok(deals) => {
172+
self.state.trade_state.update_opened_deals(deals).await;
173+
},
174+
Err(e) => return Err(CoreError::from(e)),
175+
}
176+
}
177+
ExpectedMessage::UpdateClosedDeals => {
178+
// Handle UpdateClosedDeals
179+
match serde_json::from_slice::<Vec<Deal>>(data) {
180+
Ok(deals) => {
181+
self.state.trade_state.update_closed_deals(deals).await;
182+
// Check if some trades of the waitlist are now closed
183+
let mut remove = Vec::new();
184+
for id in &self.waitlist {
185+
if let Some(deal) = self.state.trade_state.get_closed_deal(*id).await {
186+
info!("Trade closed: {:?}", deal);
187+
self.command_responder.send(CommandResponse::CheckResult(Box::new(deal))).await?;
188+
remove.push(*id);
189+
}
190+
}
191+
self.waitlist.retain(|id| !remove.contains(id));
192+
},
193+
Err(e) => return Err(CoreError::from(e)),
194+
}
195+
}
196+
ExpectedMessage::SuccessCloseOrder => {
197+
// Handle SuccessCloseOrder
198+
match serde_json::from_slice::<CloseOrder>(data) {
199+
Ok(close_order) => {
200+
self.state.trade_state.update_closed_deals(close_order.deals).await;
201+
// Check if some trades of the waitlist are now closed
202+
let mut remove = Vec::new();
203+
for id in &self.waitlist {
204+
if let Some(deal) = self.state.trade_state.get_closed_deal(*id).await {
205+
info!("Trade closed: {:?}", deal);
206+
self.command_responder.send(CommandResponse::CheckResult(Box::new(deal))).await?;
207+
remove.push(*id);
208+
}
209+
}
210+
self.waitlist.retain(|id| !remove.contains(id));
211+
212+
},
213+
Err(e) => return Err(CoreError::from(e)),
214+
}
215+
},
216+
_ => {}
217+
}
218+
expected = ExpectedMessage::None;
219+
},
220+
_ => {}
221+
}
222+
223+
}
224+
Ok(cmd) = self.command_receiver.recv() => {
225+
match cmd {
226+
Command::CheckResult(trade_id) => {
227+
if self.state.trade_state.contains_opened_deal(trade_id).await {
228+
// If the deal is still opened, add it to the waitlist
229+
self.waitlist.push(trade_id);
230+
} else if let Some(deal) = self.state.trade_state.get_closed_deal(trade_id).await {
231+
// If the deal is already closed, send the result immediately
232+
self.command_responder.send(CommandResponse::CheckResult(Box::new(deal))).await?;
233+
} else {
234+
// If the deal is not found, send a DealNotFound response
235+
self.command_responder.send(CommandResponse::DealNotFound(trade_id)).await?;
236+
}
237+
// Implement logic to check the result of a trade
238+
// For example, wait for the deal to close and return the result
239+
}
240+
}
241+
}
242+
}
243+
}
84244
}
85245

86246
fn rule() -> Box<dyn Rule + Send + Sync> {
87247
// This rule will match messages like:
88248
// 451-["updateOpenedDeals",...]
89249
// 451-["updateClosedDeals",...]
90250
// 451-["successcloseOrder",...]
91-
Box::new(MultiPatternRule::new(vec![
92-
r#"451-\["updateOpenedDeals"#,
93-
r#"451-\["updateClosedDeals"#,
94-
r#"451-\["successcloseOrder"#,
251+
252+
Box::new(DealsUpdateRule::new(vec![
253+
UPDATE_CLOSED_DEALS,
254+
UPDATE_OPENED_DEALS,
255+
SUCCESS_CLOSE_ORDER
95256
]))
96257
}
97258
}
259+
260+
/// Create a new custom rule that matches the specific patterns and also returns true for strings
261+
/// that starts with any of the patterns
262+
struct DealsUpdateRule {
263+
valid: AtomicBool,
264+
patterns: Vec<String>,
265+
}
266+
267+
impl DealsUpdateRule {
268+
/// Create a new MultiPatternRule with the specified patterns
269+
///
270+
/// # Arguments
271+
/// * `patterns` - The string patterns to match against incoming messages
272+
pub fn new(patterns: Vec<impl ToString>) -> Self {
273+
Self {
274+
valid: AtomicBool::new(false),
275+
patterns: patterns.into_iter().map(|p| p.to_string()).collect(),
276+
}
277+
}
278+
}
279+
280+
impl Rule for DealsUpdateRule {
281+
fn call(&self, msg: &Message) -> bool {
282+
match msg {
283+
Message::Text(text) => {
284+
for pattern in &self.patterns {
285+
if text.starts_with(pattern) {
286+
self.valid.store(true, Ordering::SeqCst);
287+
return true;
288+
}
289+
}
290+
false
291+
}
292+
Message::Binary(_) => {
293+
if self.valid.load(Ordering::SeqCst) {
294+
self.valid.store(false, Ordering::SeqCst);
295+
true
296+
} else {
297+
false
298+
}
299+
}
300+
_ => false }
301+
}
302+
303+
fn reset(&self) {
304+
self.valid.store(false, Ordering::SeqCst)
305+
}
306+
}

crates/binary_options_tools/src/pocketoption_pre/modules/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::sync::Arc;
22

33
use binary_options_tools_core_pre::{error::CoreResult, reimports::Message};
4-
use tracing::debug;
54

65
use crate::pocketoption_pre::state::State;
76

@@ -81,9 +80,9 @@ pub mod deals;
8180
///
8281
/// ```rust
8382
/// // Add as a lightweight handler to the client
84-
/// client.add_lightweight_handler(print_handler);
83+
/// client.with_lightweight_handler(|msg, state, _| Box::pin(print_handler(msg, state)));
8584
/// ```
8685
pub async fn print_handler(msg: Arc<Message>, _state: Arc<State>) -> CoreResult<()> {
87-
debug!(target: "Lightweight", "Received: {msg:?}");
86+
tracing::debug!(target: "Lightweight", "Received: {msg:?}");
8887
Ok(())
8988
}

crates/binary_options_tools/src/pocketoption_pre/modules/trades.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use binary_options_tools_core_pre::{
88
};
99
use serde::Deserialize;
1010
use tokio::select;
11-
use tracing::warn;
11+
use tracing::{info, warn};
1212
use uuid::Uuid;
1313

1414
use crate::pocketoption_pre::{
@@ -171,8 +171,10 @@ impl ApiModule<State> for TradesApiModule {
171171
ServerResponse::Success(deal) => {
172172
// Handle successopenOrder.
173173
// Send CommandResponse::Success to command_responder.
174+
self.state.trade_state.add_opened_deal(*deal.clone()).await;
175+
info!(target: "TradesApiModule", "Trade opened: {}", deal.id);
174176
self.command_responder.send(CommandResponse::Success {
175-
req_id: deal.request_id,
177+
req_id: deal.request_id.unwrap_or_default(), // A request should always have a request_id, only for when returning updateOpenedDeals or updateClosedDeals it can not have any
176178
deal,
177179
}).await?;
178180
}

0 commit comments

Comments
 (0)