1- use std:: net:: { IpAddr , SocketAddr } ;
1+ use std:: { net:: { IpAddr , SocketAddr } , sync :: Arc } ;
22
33use crate :: {
44 config:: Configuration ,
@@ -8,6 +8,7 @@ use futures::{
88 stream:: { SplitSink , SplitStream } ,
99 SinkExt , StreamExt ,
1010} ;
11+ use roles_logic_sv2:: utils:: Mutex ;
1112use tokio:: {
1213 net:: { TcpListener , TcpStream } ,
1314 sync:: mpsc:: { channel, Receiver , Sender } ,
@@ -72,9 +73,10 @@ impl Downstream {
7273 sender : Sender < String > ,
7374 ) {
7475 let ( writer, reader) = framed. split ( ) ;
76+ let firmware = Arc :: new ( Mutex :: new ( Firmware :: Uninitialized ) ) ;
7577 let result = tokio:: select! {
76- result1 = Self :: receive_from_downstream_and_relay_up( reader, sender) => result1,
77- result2 = Self :: receive_from_upstream_and_relay_down( writer, receiver) => result2,
78+ result1 = Self :: receive_from_downstream_and_relay_up( reader, sender, firmware . clone ( ) ) => result1,
79+ result2 = Self :: receive_from_upstream_and_relay_down( writer, receiver, firmware . clone ( ) ) => result2,
7880 } ;
7981 // upstream disconnected make sure to clean everything before exit
8082 match result {
@@ -86,12 +88,24 @@ impl Downstream {
8688 async fn receive_from_downstream_and_relay_up (
8789 mut recv : SplitStream < Framed < TcpStream , LinesCodec > > ,
8890 send : Sender < String > ,
91+ firmware : Arc < Mutex < Firmware > > ,
8992 ) -> Sv1IngressError {
93+ let mut is_subscribed = false ;
9094 let task = tokio:: spawn ( async move {
9195 while let Some ( Ok ( message) ) = recv. next ( ) . await {
9296 if Configuration :: sv1_ingress_log ( ) {
9397 info ! ( "Sending msg to upstream: {}" , message) ;
9498 }
99+ if ! is_subscribed {
100+ if message. contains ( "mining.subscribe" ) {
101+ is_subscribed = true ;
102+ if message. contains ( "LUXminer" ) {
103+ firmware. safe_lock ( |f| * f = Firmware :: Luxor ) . unwrap ( ) ;
104+ } else {
105+ firmware. safe_lock ( |f| * f = Firmware :: Other ) . unwrap ( ) ;
106+ }
107+ }
108+ }
95109 if send. send ( message) . await . is_err ( ) {
96110 error ! ( "Upstream dropped trying to send" ) ;
97111 return Sv1IngressError :: TranslatorDropped ;
@@ -109,12 +123,21 @@ impl Downstream {
109123 async fn receive_from_upstream_and_relay_down (
110124 mut send : SplitSink < Framed < TcpStream , LinesCodec > , String > ,
111125 mut recv : Receiver < String > ,
126+ firmware_ : Arc < Mutex < Firmware > > ,
112127 ) -> Sv1IngressError {
128+ let mut firmware = Firmware :: Uninitialized ;
113129 let task = tokio:: spawn ( async move {
114130 while let Some ( message) = recv. recv ( ) . await {
115- let message = message. replace ( [ '\n' , '\r' ] , "" ) ;
131+ let mut message = message. replace ( [ '\n' , '\r' ] , "" ) ;
132+ if !firmware. is_initialized ( ) {
133+ firmware = firmware_. safe_lock ( |f| * f) . unwrap ( ) ;
134+ } else if firmware. is_luxor ( ) {
135+ if let Some ( pos) = message. find ( '{' ) {
136+ message. insert_str ( pos + 1 , r#""id":null,"# ) ;
137+ }
138+ }
116139 if Configuration :: sv1_ingress_log ( ) {
117- info ! ( "Sending msg to downstream : {}" , message) ;
140+ info ! ( "Sending msg to downstream_ : {}" , message) ;
118141 }
119142 if send. send ( message) . await . is_err ( ) {
120143 warn ! ( "Downstream dropped while trying to send message down" ) ;
@@ -134,3 +157,19 @@ impl Downstream {
134157 }
135158 }
136159}
160+
161+ #[ derive( Debug , Clone , Copy ) ]
162+ enum Firmware {
163+ Luxor ,
164+ Other ,
165+ Uninitialized ,
166+ }
167+
168+ impl Firmware {
169+ fn is_initialized ( & self ) -> bool {
170+ !matches ! ( self , Firmware :: Uninitialized )
171+ }
172+ fn is_luxor ( & self ) -> bool {
173+ matches ! ( self , Firmware :: Luxor )
174+ }
175+ }
0 commit comments