@@ -13,22 +13,26 @@ use tokio::net::TcpStream;
1313use tokio:: sync:: mpsc;
1414use tokio:: task:: JoinSet ;
1515use tokio:: time:: timeout;
16+ use std:: collections:: HashSet ;
17+ use std:: sync:: { Arc , Mutex } ;
1618use zond_common:: models:: host:: Host ;
1719use zond_common:: models:: ip:: set:: IpSet ;
18- use zond_common:: models:: port:: { Port , PortState , Protocol } ;
19- use zond_common:: models:: target:: Target ;
20+ use zond_common:: models:: port:: { Port , PortSet , PortState , Protocol } ;
21+ use zond_common:: models:: target:: { Target , TargetMap , TargetSet } ;
2022
2123use super :: STOP_SIGNAL ;
22-
24+ use super :: dispatcher :: Dispatcher ;
2325use crate :: scanner:: increment_host_count;
2426
25- /// Performs an asynchronous, highly-concurrent full TCP connect port scan over a randomized target stream.
27+ /// TCP ports probed during host discovery, chosen as the most common across Linux, Windows, and networking gear: SSH (22), HTTP (80), HTTPS (443), SMB (445), and RDP (3389).
28+ const DISCOVERY_PORTS : & [ u16 ] = & [ 22 , 80 , 443 , 445 , 3389 ] ;
29+
30+ /// Performs a high-concurrency, unprivileged port scan.
2631///
27- /// This leverages an [`mpsc::Receiver`] (typically from a [`Dispatcher`]), bounds the number
28- /// of concurrent connections via `concurrency_limit`, and aggregates the findings into
29- /// a minimal set of [`Host`]s. IP addresses that return at least one non-closed port
30- /// will be mapped to a [`Host`] with their discovered [`Port`]s. This is the primary
31- /// port scanning strategy for users without root privileges.
32+ /// This engine is the primary scanning strategy for users without root privileges.
33+ /// It consumes a randomized stream of [`Target`]s from a [`Dispatcher`], maintaining
34+ /// a strictly bounded concurrency set to prevent OS socket exhaustion. Discovered
35+ /// open or filtered ports are aggregated into a collection of [`Host`] entities.
3236pub async fn scan (
3337 mut rx : mpsc:: Receiver < Target > ,
3438 concurrency_limit : usize ,
@@ -107,39 +111,119 @@ async fn port_prober(target: Target) -> anyhow::Result<Option<(IpAddr, Port)>> {
107111 }
108112}
109113
110- /// Discovers live hosts using a standard, unprivileged TCP connect sweep .
114+ /// High-fidelity, multi-port host discovery for unprivileged environments .
111115///
112- /// Sweeps through the provided [`IpSet`], attempting a basic connection to port 443.
113- /// This approach serves as a fallback for users without root privileges where raw
114- /// socket-based ARP/NDP sweeps are not possible. Returns a [`Vec<Host>`] for all IPs
115- /// that responded within the timeout window.
116+ /// This engine performs a rapid sweep of target networks by probing a curated
117+ /// set of infrastructure ports: SSH (22), HTTP (80), HTTPS (443), SMB (445),
118+ /// and RDP (3389). This multi-port approach ensures high discovery fidelity
119+ /// across Linux, Windows, and embedded network targets.
120+ ///
121+ /// ### Characteristics
122+ /// - **Early-Exit**: Probes for an IP are immediately bypassed if the host
123+ /// has already been confirmed alive by a parallel task.
124+ /// - **Randomized**: Target distribution is handled by a shuffling [`Dispatcher`]
125+ /// to minimize local network congestion.
126+ /// - **Fidelity Range**: Uses an adjustable 1000ms timeout window to capture
127+ /// hosts on high-latency or geographically distant links.
116128pub async fn discover ( ips : IpSet ) -> anyhow:: Result < Vec < Host > > {
117- let mut result: Vec < Host > = Vec :: new ( ) ;
118- for ip in ips {
129+ const CONCURRENCY_LIMIT : usize = 2048 ;
130+
131+ // 1. Prepare Target Map for all IP x Common Port combinations
132+ let mut target_map = TargetMap :: new ( ) ;
133+ let port_set = PortSet :: try_from (
134+ DISCOVERY_PORTS . iter ( )
135+ . map ( |p| p. to_string ( ) )
136+ . collect :: < Vec < _ > > ( )
137+ . join ( "," )
138+ . as_str ( )
139+ ) ?;
140+ target_map. add_unit ( TargetSet :: new ( ips, port_set) ) ;
141+
142+ // 2. Setup Dispatcher and Shared State
143+ let dispatcher = Dispatcher :: new ( target_map) . with_batch_size ( 1024 ) ;
144+ let mut rx = dispatcher. run_shuffled ( ) ;
145+ let mut set = JoinSet :: new ( ) ;
146+ let found_hosts = Arc :: new ( Mutex :: new ( HashSet :: new ( ) ) ) ;
147+ let mut hosts = Vec :: new ( ) ;
148+
149+ // 3. Concurrent Execution Loop
150+ while let Some ( target) = rx. recv ( ) . await {
119151 if STOP_SIGNAL . load ( Ordering :: Relaxed ) {
120152 break ;
121153 }
122- if let Some ( found) = prober ( ip) . await ? {
123- result. push ( found) ;
154+
155+ while set. len ( ) >= CONCURRENCY_LIMIT {
156+ if let Some ( res) = set. join_next ( ) . await {
157+ if let Ok ( Ok ( Some ( host) ) ) = res {
158+ hosts. push ( host) ;
159+ }
160+ }
161+ }
162+
163+ let inner_found = Arc :: clone ( & found_hosts) ;
164+ set. spawn ( async move { prober ( target, inner_found) . await } ) ;
165+ }
166+
167+ // 4. Final Collection
168+ while let Some ( res) = set. join_next ( ) . await {
169+ if let Ok ( Ok ( Some ( host) ) ) = res {
170+ hosts. push ( host) ;
124171 }
125172 }
126- Ok ( result)
173+
174+ Ok ( hosts)
127175}
128176
129- /// Attempts a basic TCP connection to port 443 on the specified [`IpAddr`] .
177+ /// Concurrent network host prober .
130178///
131- /// Returns a basic [`Host`] object on success. Times out quickly (`100ms`) since it's
132- /// primarily intended for rapid host discovery, not deep service enumeration.
133- async fn prober ( ip : IpAddr ) -> anyhow:: Result < Option < Host > > {
134- let socket_addr: SocketAddr = SocketAddr :: new ( ip, 443 ) ;
135- let probe_timeout: Duration = Duration :: from_millis ( 100 ) ;
179+ /// Attempts a TCP connection to a specific [`Target`]. To minimize unnecessary
180+ /// network traffic and OS resource usage, it employs a thread-safe early-exit
181+ /// mechanism: if the host has already been identified by a parallel probe
182+ /// (e.g., SSH responded before HTTP), this task terminates immediately.
183+ async fn prober ( target : Target , found_set : Arc < Mutex < HashSet < IpAddr > > > ) -> anyhow:: Result < Option < Host > > {
184+ // 1. Early exit if already discovered
185+ {
186+ let set = found_set. lock ( ) . unwrap ( ) ;
187+ if set. contains ( & target. ip ) {
188+ return Ok ( None ) ;
189+ }
190+ }
191+
192+ let socket_addr: SocketAddr = SocketAddr :: new ( target. ip , target. port ) ;
193+ let probe_timeout: Duration = Duration :: from_millis ( 1000 ) ;
136194
137195 let start: Instant = Instant :: now ( ) ;
138196 match timeout ( probe_timeout, TcpStream :: connect ( socket_addr) ) . await {
139- Ok ( Ok ( _) ) | Ok ( Err ( _) ) => {
140- increment_host_count ( ) ;
141- let host: Host = Host :: new ( ip) . with_rtt ( start. elapsed ( ) ) ;
142- Ok ( Some ( host) )
197+ Ok ( Ok ( _) ) => {
198+ // 2. Successful handshake -> Host is alive
199+ let mut set = found_set. lock ( ) . unwrap ( ) ;
200+ if set. insert ( target. ip ) {
201+ increment_host_count ( ) ;
202+ let host: Host = Host :: new ( target. ip ) . with_rtt ( start. elapsed ( ) ) ;
203+ Ok ( Some ( host) )
204+ } else {
205+ Ok ( None )
206+ }
207+ }
208+ Ok ( Err ( e) ) => {
209+ use std:: io:: ErrorKind ;
210+ // 3. Only specific TCP errors imply the target host responded at the IP/TCP layer
211+ match e. kind ( ) {
212+ ErrorKind :: ConnectionRefused | ErrorKind :: ConnectionReset | ErrorKind :: ConnectionAborted => {
213+ let mut set = found_set. lock ( ) . unwrap ( ) ;
214+ if set. insert ( target. ip ) {
215+ increment_host_count ( ) ;
216+ let host: Host = Host :: new ( target. ip ) . with_rtt ( start. elapsed ( ) ) ;
217+ Ok ( Some ( host) )
218+ } else {
219+ Ok ( None )
220+ }
221+ }
222+ _ => {
223+ // Ignore local network errors (No route, Permission denied, etc.)
224+ Ok ( None )
225+ }
226+ }
143227 }
144228 Err ( _elapsed) => Ok ( None ) ,
145229 }
0 commit comments