@@ -15070,6 +15070,267 @@ def send_from_fileobj(fileobj, host, port, proto="tcp", path_text=None, **kwargs
1507015070 return _udp_quic_send(fileobj, host, port, **kwargs)
1507115071 return _udp_seq_send(fileobj, host, port, **kwargs)
1507215072
15073+ def _udp_raw_send(fileobj, host, port, **kwargs):
15074+ logger = _logger_from_kwargs(kwargs)
15075+ addr = (host or "127.0.0.1", int(port))
15076+
15077+ # ---- normalize bool-ish flags (URL query values may be strings) ----
15078+ handshake = _kw_bool(kwargs.get("handshake", True), True)
15079+ raw_ack = _kw_bool(kwargs.get("raw_ack", False), False)
15080+ raw_meta = _kw_bool(kwargs.get("raw_meta", True), True)
15081+ raw_sha = _kw_bool(kwargs.get("raw_sha", False), False)
15082+ wait = _kw_bool(kwargs.get("wait", True), True) or _kw_bool(kwargs.get("connect_wait", False), False)
15083+
15084+ verbose = _kw_bool(kwargs.get("verbose", False), False)
15085+
15086+ def _log(msg):
15087+ _net_log(verbose, msg, logger=logger)
15088+
15089+ # ---- numeric params ----
15090+ try:
15091+ chunk = int(kwargs.get("chunk", 1200))
15092+ except Exception:
15093+ chunk = 1200
15094+ if chunk < 256:
15095+ chunk = 256
15096+
15097+ try:
15098+ wt = kwargs.get("wait_timeout", None)
15099+ wt = float(wt) if wt is not None else None
15100+ except Exception:
15101+ wt = None
15102+
15103+ try:
15104+ hello_iv = float(kwargs.get("hello_interval", 0.1) or 0.1)
15105+ except Exception:
15106+ hello_iv = 0.1
15107+ if hello_iv <= 0:
15108+ hello_iv = 0.1
15109+
15110+ # ---- compute total remaining length (for META and/or HASH) ----
15111+ total_len = None
15112+ pos = None
15113+ if raw_meta or raw_sha:
15114+ try:
15115+ pos = fileobj.tell()
15116+ fileobj.seek(0, os.SEEK_END)
15117+ end = fileobj.tell()
15118+ fileobj.seek(pos, os.SEEK_SET)
15119+ total_len = int(end - pos)
15120+ if total_len < 0:
15121+ total_len = None
15122+ except Exception:
15123+ total_len = None
15124+ try:
15125+ if pos is not None:
15126+ fileobj.seek(pos, os.SEEK_SET)
15127+ except Exception:
15128+ pass
15129+
15130+ # ---- precompute expected hash (optional) ----
15131+ expected_hex = None
15132+ raw_hash = (kwargs.get("raw_hash", "sha256") or "sha256").lower()
15133+ if raw_sha and total_len is not None:
15134+ try:
15135+ h = hashlib.sha256() if raw_hash != "md5" else hashlib.md5()
15136+ cur = fileobj.tell()
15137+ while True:
15138+ b = fileobj.read(65536)
15139+ if not b:
15140+ break
15141+ h.update(_to_bytes(b))
15142+ expected_hex = h.hexdigest()
15143+ fileobj.seek(cur, os.SEEK_SET)
15144+ except Exception:
15145+ expected_hex = None
15146+ try:
15147+ if pos is not None:
15148+ fileobj.seek(pos, os.SEEK_SET)
15149+ except Exception:
15150+ pass
15151+
15152+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
15153+
15154+ try:
15155+ # ---- handshake / wait-for-receiver ----
15156+ tok = kwargs.get("token")
15157+ tok = _hs_token() if tok is None else _to_bytes(tok)
15158+
15159+ if wait:
15160+ start_t = time.time()
15161+ while True:
15162+ if wt is not None and wt >= 0 and (time.time() - start_t) >= wt:
15163+ _log("UDP raw: wait_timeout reached; no receiver READY")
15164+ try:
15165+ sock.close()
15166+ except Exception:
15167+ pass
15168+ return False
15169+
15170+ # announce
15171+ if handshake:
15172+ try:
15173+ sock.sendto(b"HELLO " + tok + b"\n", addr)
15174+ except Exception:
15175+ pass
15176+
15177+ if raw_meta and total_len is not None:
15178+ try:
15179+ sock.sendto(b"META " + str(total_len).encode("ascii") + b"\n", addr)
15180+ except Exception:
15181+ pass
15182+
15183+ if raw_sha and expected_hex:
15184+ try:
15185+ sock.sendto(b"HASH " + raw_hash.encode("ascii") + b" " + expected_hex.encode("ascii") + b"\n", addr)
15186+ except Exception:
15187+ pass
15188+
15189+ # wait briefly for READY
15190+ try:
15191+ sock.settimeout(hello_iv)
15192+ except Exception:
15193+ pass
15194+
15195+ try:
15196+ pkt, _a = sock.recvfrom(1024)
15197+ if pkt.startswith(b"READY"):
15198+ # READY or READY <token>
15199+ if b" " in pkt:
15200+ rt = pkt.split(None, 1)[1].strip()
15201+ if rt and rt != tok:
15202+ continue
15203+ _log("UDP raw: received READY from receiver")
15204+ break
15205+ except socket.timeout:
15206+ continue
15207+ except Exception:
15208+ continue
15209+ else:
15210+ # if not waiting, still send META/HASH once up front
15211+ if handshake:
15212+ try:
15213+ sock.sendto(b"HELLO " + tok + b"\n", addr)
15214+ except Exception:
15215+ pass
15216+ if raw_meta and total_len is not None:
15217+ try:
15218+ sock.sendto(b"META " + str(total_len).encode("ascii") + b"\n", addr)
15219+ except Exception:
15220+ pass
15221+ if raw_sha and expected_hex:
15222+ try:
15223+ sock.sendto(b"HASH " + raw_hash.encode("ascii") + b" " + expected_hex.encode("ascii") + b"\n", addr)
15224+ except Exception:
15225+ pass
15226+
15227+ # ---- send data ----
15228+ if raw_ack:
15229+ # sliding window retransmit
15230+ try:
15231+ ack_to = float(kwargs.get("raw_ack_timeout", 0.5) or 0.5)
15232+ except Exception:
15233+ ack_to = 0.5
15234+ try:
15235+ retries_max = int(kwargs.get("raw_ack_retries", 40) or 40)
15236+ except Exception:
15237+ retries_max = 40
15238+ try:
15239+ win = int(kwargs.get("raw_ack_window", 1) or 1)
15240+ except Exception:
15241+ win = 1
15242+ if win < 1:
15243+ win = 1
15244+
15245+ try:
15246+ sock.settimeout(ack_to)
15247+ except Exception:
15248+ pass
15249+
15250+ def _make_pkt(seq, data):
15251+ return b"PKT " + str(seq).encode("ascii") + b" " + _to_bytes(data)
15252+
15253+ base_seq = 0
15254+ next_seq = 0
15255+ pkts = {}
15256+ eof = False
15257+ timeout_tries = 0
15258+
15259+ while True:
15260+ # fill window
15261+ while (not eof) and next_seq < base_seq + win:
15262+ data = fileobj.read(chunk)
15263+ if not data:
15264+ eof = True
15265+ break
15266+ pkt = _make_pkt(next_seq, data)
15267+ pkts[next_seq] = pkt
15268+ try:
15269+ sock.sendto(pkt, addr)
15270+ except Exception:
15271+ pass
15272+ next_seq += 1
15273+
15274+ if eof and base_seq == next_seq:
15275+ break
15276+
15277+ try:
15278+ apkt, _a = sock.recvfrom(1024)
15279+ if apkt.startswith(b"ACK "):
15280+ try:
15281+ aseq = int(apkt.split()[1])
15282+ except Exception:
15283+ aseq = -1
15284+ new_base = aseq + 1
15285+ if new_base > base_seq:
15286+ for s in list(pkts.keys()):
15287+ if s < new_base:
15288+ pkts.pop(s, None)
15289+ base_seq = new_base
15290+ timeout_tries = 0
15291+ except socket.timeout:
15292+ timeout_tries += 1
15293+ if retries_max >= 0 and timeout_tries >= retries_max:
15294+ _log("UDP raw: too many ACK timeouts, giving up")
15295+ return False
15296+ # retransmit all in-flight
15297+ for s in range(base_seq, next_seq):
15298+ pkt = pkts.get(s)
15299+ if pkt is None:
15300+ continue
15301+ try:
15302+ sock.sendto(pkt, addr)
15303+ except Exception:
15304+ pass
15305+ except Exception:
15306+ # treat as timeout-ish
15307+ timeout_tries += 1
15308+
15309+ else:
15310+ # legacy raw: just send datagrams
15311+ while True:
15312+ data = fileobj.read(chunk)
15313+ if not data:
15314+ break
15315+ try:
15316+ sock.sendto(_to_bytes(data), addr)
15317+ except Exception:
15318+ pass
15319+
15320+ # ---- finish ----
15321+ try:
15322+ sock.sendto(b"DONE", addr)
15323+ except Exception:
15324+ pass
15325+
15326+ return True
15327+
15328+ finally:
15329+ try:
15330+ sock.close()
15331+ except Exception:
15332+ pass
15333+
1507315334def _udp_raw_recv(fileobj, host, port, **kwargs):
1507415335 logger = _logger_from_kwargs(kwargs)
1507515336 addr = (host or "", int(port))
0 commit comments