Skip to content

Commit 5c4f978

Browse files
authored
Merge pull request dmnd-pool#89 from Fi3/RemoveRecentNotifies
Remove Downstream::recent_notifies
2 parents 99792ca + b09cc29 commit 5c4f978

6 files changed

Lines changed: 201 additions & 213 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "demand-cli"
3-
version = "0.1.11"
3+
version = "0.1.12"
44
edition = "2021"
55

66
[dependencies]

src/translator/downstream/diff_management.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,10 @@ impl Downstream {
111111

112112
// Get the last notify
113113
let recent_notify = self_
114-
.safe_lock(|d| d.recent_notifies.back().cloned())
114+
.safe_lock(|d| d.recent_jobs.clone_last())
115115
.map_err(|_| Error::TranslatorDiffConfigMutexPoisoned)?;
116116

117-
if let Some(mut notify) = recent_notify {
118-
let id = self_.safe_lock(|d| d.job_ids.new_v1(notify.job_id.parse::<u32>().unwrap())).unwrap();
119-
notify.job_id = id.to_string();
117+
if let Some(notify) = recent_notify {
120118
Downstream::send_message_downstream(self_.clone(), notify.into()).await;
121119
}
122120

src/translator/downstream/downstream.rs

Lines changed: 120 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,19 @@ use roles_logic_sv2::{
2828
utils::Mutex,
2929
};
3030

31-
use std::{collections::{hash_map::Entry, HashMap, VecDeque}, net::IpAddr, sync::Arc};
31+
use rand::Rng;
32+
use server_to_client::Notify;
33+
use std::{
34+
collections::{hash_map::Entry, HashMap, VecDeque},
35+
net::IpAddr,
36+
sync::Arc,
37+
};
3238
use sv1_api::{
3339
client_to_server, json_rpc, server_to_client,
3440
utils::{Extranonce, HexU32Be},
3541
IsServer,
3642
};
3743
use tracing::{error, info, warn};
38-
use rand::Rng;
3944

4045
#[derive(Debug, Clone)]
4146
pub struct DownstreamDifficultyConfig {
@@ -106,15 +111,13 @@ pub struct Downstream {
106111
/// translation into a SV2 `SubmitSharesExtended`.
107112
tx_sv1_bridge: Sender<DownstreamMessages>,
108113
tx_outgoing: Sender<json_rpc::Message>,
109-
/// True if this is the first job received from `Upstream`.
110-
pub(super) first_job_received: bool,
111114
extranonce2_len: usize,
112115
pub(super) difficulty_mgmt: DownstreamDifficultyConfig,
113116
pub(super) upstream_difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
114117
pub last_call_to_update_hr: u128,
115-
pub(super) recent_notifies: VecDeque<server_to_client::Notify<'static>>,
116118
pub(super) stats_sender: StatsSender,
117-
pub job_ids: JobIds,
119+
pub recent_jobs: RecentJobs,
120+
pub first_job: Notify<'static>,
118121
}
119122

120123
impl Downstream {
@@ -179,11 +182,6 @@ impl Downstream {
179182
initial_difficulty,
180183
};
181184

182-
let mut recent_notifies = VecDeque::with_capacity(2);
183-
if let Some(notify) = last_notify.clone() {
184-
recent_notifies.push_back(notify);
185-
}
186-
187185
let downstream = Arc::new(Mutex::new(Downstream {
188186
connection_id,
189187
authorized_names: vec![],
@@ -192,14 +190,13 @@ impl Downstream {
192190
version_rolling_min_bit: None,
193191
tx_sv1_bridge,
194192
tx_outgoing,
195-
first_job_received: false,
196193
extranonce2_len,
197194
difficulty_mgmt,
198195
upstream_difficulty_config,
199196
last_call_to_update_hr: 0,
200-
recent_notifies: recent_notifies.clone(),
201197
stats_sender,
202-
job_ids: JobIds::new(),
198+
recent_jobs: RecentJobs::new(),
199+
first_job: last_notify.expect("we have an assertion at the beginning of this function"),
203200
}));
204201

205202
if let Err(e) = start_receive_downstream(
@@ -231,7 +228,6 @@ impl Downstream {
231228
task_manager.clone(),
232229
downstream.clone(),
233230
rx_sv1_notify,
234-
recent_notifies,
235231
host.clone(),
236232
connection_id,
237233
)
@@ -354,7 +350,6 @@ impl Downstream {
354350
version_rolling_min_bit: Option<HexU32Be>,
355351
tx_sv1_bridge: Sender<DownstreamMessages>,
356352
tx_outgoing: Sender<json_rpc::Message>,
357-
first_job_received: bool,
358353
extranonce2_len: usize,
359354
difficulty_mgmt: DownstreamDifficultyConfig,
360355
upstream_difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
@@ -368,13 +363,13 @@ impl Downstream {
368363
version_rolling_min_bit,
369364
tx_sv1_bridge,
370365
tx_outgoing,
371-
first_job_received,
372366
extranonce2_len,
373367
difficulty_mgmt,
374368
upstream_difficulty_config,
375369
last_call_to_update_hr: 0,
376-
recent_notifies: VecDeque::with_capacity(2),
370+
first_job: Notify,
377371
stats_sender,
372+
recent_jobs: RecentJobs::new(),
378373
}
379374
}
380375
}
@@ -393,6 +388,10 @@ impl IsServer<'static> for Downstream {
393388

394389
self.version_rolling_mask = Some(version_rolling_mask.clone());
395390
self.version_rolling_min_bit = Some(version_rolling_min_bit_count.clone());
391+
let mut first_job = self.first_job.clone();
392+
self.recent_jobs
393+
.add_job(&mut first_job, self.version_rolling_mask.clone());
394+
self.first_job = first_job;
396395

397396
(
398397
Some(server_to_client::VersionRollingParams::new(
@@ -441,76 +440,73 @@ impl IsServer<'static> for Downstream {
441440
request.id, request.user_name, request.job_id, request.nonce
442441
);
443442

444-
// check first job received
445-
if !self.first_job_received {
446-
self.stats_sender.update_rejected_shares(self.connection_id);
447-
return false;
448-
}
449443
let mut request = request.clone();
450444
let job_id_as_number = request.job_id.parse::<u32>();
451445
if job_id_as_number.is_err() {
452-
error!("Share rejected: can not convert v1 job id to number. v1 id: {}", request.job_id);
446+
error!(
447+
"Share rejected: can not convert v1 job id to number. v1 id: {}",
448+
request.job_id
449+
);
453450
self.stats_sender.update_rejected_shares(self.connection_id);
454451
return false;
455452
}
456-
let v2_id = self.job_ids.get_v2(job_id_as_number.unwrap());
457-
if v2_id.is_none() {
458-
error!("Share rejected: can not convert from v1 to v2 job id. v1 id: {}", request.job_id);
459-
self.stats_sender.update_rejected_shares(self.connection_id);
460-
return false;
461-
}
462-
request.job_id = v2_id.unwrap();
463-
//check allowed to send shares
464453
match allow_submit_share() {
465454
Ok(true) => {
466-
if self.recent_notifies.is_empty() {
467-
error!("Share rejected: No last job found");
468-
self.stats_sender.update_rejected_shares(self.connection_id);
469-
return false;
470-
};
471455
crate::translator::utils::update_share_count(self.connection_id); // update share count
472-
473-
//check share is valid
474-
if let Some(met_difficulty) = validate_share(
475-
&request,
476-
&self.recent_notifies,
477-
&self.difficulty_mgmt.current_difficulties,
478-
self.extranonce1.clone(),
479-
self.version_rolling_mask.clone(),
480-
) {
481-
// Only forward upstream if the share meets the latest difficulty
482-
if let Some(latest_difficulty) =
483-
self.difficulty_mgmt.current_difficulties.back()
484-
{
485-
if met_difficulty == *latest_difficulty {
486-
let to_send = SubmitShareWithChannelId {
487-
channel_id: self.connection_id,
488-
share: request.clone(),
489-
extranonce: self.extranonce1.clone(),
490-
extranonce2_len: self.extranonce2_len,
491-
version_rolling_mask: self.version_rolling_mask.clone(),
492-
};
493-
if let Err(e) = self
494-
.tx_sv1_bridge
495-
.try_send(DownstreamMessages::SubmitShares(to_send))
496-
{
497-
error!("Failed to start receive downstream task: {e:?}");
498-
self.stats_sender.update_rejected_shares(self.connection_id);
499-
// Return false because submit was not properly handled
500-
return false;
456+
if let Some(job) = self
457+
.recent_jobs
458+
.get_matching_job(job_id_as_number.expect("checked above"))
459+
{
460+
request.job_id = job.job_id.clone();
461+
//check share is valid
462+
if let Some(met_difficulty) = validate_share(
463+
&request,
464+
&job,
465+
&self.difficulty_mgmt.current_difficulties,
466+
self.extranonce1.clone(),
467+
self.version_rolling_mask.clone(),
468+
) {
469+
// Only forward upstream if the share meets the latest difficulty
470+
if let Some(latest_difficulty) =
471+
self.difficulty_mgmt.current_difficulties.back()
472+
{
473+
if met_difficulty == *latest_difficulty {
474+
let to_send = SubmitShareWithChannelId {
475+
channel_id: self.connection_id,
476+
share: request.clone(),
477+
extranonce: self.extranonce1.clone(),
478+
extranonce2_len: self.extranonce2_len,
479+
version_rolling_mask: self.version_rolling_mask.clone(),
480+
};
481+
if let Err(e) = self
482+
.tx_sv1_bridge
483+
.try_send(DownstreamMessages::SubmitShares(to_send))
484+
{
485+
error!("Failed to start receive downstream task: {e:?}");
486+
self.stats_sender.update_rejected_shares(self.connection_id);
487+
// Return false because submit was not properly handled
488+
return false;
489+
}
501490
}
502491
}
492+
self.stats_sender.update_accepted_shares(self.connection_id);
493+
info!(
494+
"Share for Job {} and difficulty {} is accepted",
495+
request.job_id, met_difficulty
496+
);
497+
return true;
498+
} else {
499+
error!("Share rejected: Invalid share");
500+
self.stats_sender.update_rejected_shares(self.connection_id);
501+
return false;
503502
}
504-
self.stats_sender.update_accepted_shares(self.connection_id);
505-
info!(
506-
"Share for Job {} and difficulty {} is accepted",
507-
request.job_id, met_difficulty
508-
);
509-
true
510503
} else {
511-
error!("Share rejected: Invalid share");
504+
error!(
505+
"Share rejected: can not find job with id {}",
506+
request.job_id
507+
);
512508
self.stats_sender.update_rejected_shares(self.connection_id);
513-
false
509+
return false;
514510
}
515511
}
516512
Ok(false) => {
@@ -596,14 +592,55 @@ impl IsDownstream for Downstream {
596592
}
597593

598594
#[derive(Debug)]
599-
pub struct JobIds {
600-
v1_to_v2: HashMap<u32,u32>,
601-
v2_to_v1: HashMap<u32,Vec<u32>>,
602-
last_v2s: CircularBuffer<u32,3>,
595+
pub struct RecentJobs {
596+
v1_to_v2: HashMap<u32, u32>,
597+
v2_to_v1: HashMap<u32, Vec<u32>>,
598+
jobs: VecDeque<Notify<'static>>,
599+
last_v2s: CircularBuffer<u32, 3>,
600+
tracked_jobs: usize,
601+
}
602+
fn apply_mask(mask: Option<HexU32Be>, message: &mut server_to_client::Notify<'static>) {
603+
if let Some(mask) = mask {
604+
message.version = HexU32Be(message.version.0 & !mask.0);
605+
}
603606
}
607+
impl RecentJobs {
608+
pub fn add_job(&mut self, notify: &mut Notify<'static>, mask: Option<HexU32Be>) {
609+
apply_mask(mask, notify);
610+
// save it with the v2 id
611+
self.jobs.push_back(notify.clone());
612+
let new_id = self.new_v1(notify.job_id.parse::<u32>().unwrap());
613+
// send it with the v1 id
614+
notify.job_id = new_id.to_string();
615+
if self.jobs.len() > self.tracked_jobs {
616+
self.jobs.pop_front();
617+
};
618+
}
619+
620+
pub fn clone_last(&mut self) -> Option<Notify<'static>> {
621+
if let Some(job) = self.jobs.back() {
622+
let mut job = job.clone();
623+
let new_id = self.new_v1(job.job_id.parse::<u32>().unwrap());
624+
job.job_id = new_id.to_string();
625+
Some(job.clone())
626+
} else {
627+
None
628+
}
629+
}
630+
631+
pub fn current_jobs(&self) -> VecDeque<Notify<'static>> {
632+
self.jobs.clone()
633+
}
604634

605-
impl JobIds {
606-
pub fn new_v1(&mut self, v2_id: u32) -> u32 {
635+
pub fn get_matching_job(&self, v1_id: u32) -> Option<Notify<'static>> {
636+
let v2_id = self.get_v2(v1_id)?;
637+
self.current_jobs()
638+
.iter()
639+
.find(|notify| notify.job_id == v2_id)
640+
.cloned()
641+
}
642+
643+
fn new_v1(&mut self, v2_id: u32) -> u32 {
607644
let mut v1_id = rand::thread_rng().gen();
608645
while self.v1_to_v2.contains_key(&v1_id) {
609646
v1_id = rand::thread_rng().gen();
@@ -637,17 +674,18 @@ impl JobIds {
637674
v1_to_v2: HashMap::new(),
638675
v2_to_v1: HashMap::new(),
639676
last_v2s: CircularBuffer::new(),
677+
jobs: VecDeque::new(),
678+
tracked_jobs: 3,
640679
}
641680
}
642681
}
643682

644-
impl Default for JobIds {
683+
impl Default for RecentJobs {
645684
fn default() -> Self {
646685
Self::new()
647686
}
648687
}
649688

650-
651689
#[derive(Debug)]
652690
struct CircularBuffer<T, const N: usize> {
653691
buffer: [Option<T>; N],
@@ -680,7 +718,6 @@ impl<T, const N: usize> CircularBuffer<T, N> {
680718
}
681719
}
682720

683-
684721
//#[cfg(test)]
685722
//mod tests {
686723
// use super::*;

0 commit comments

Comments
 (0)