diff --git a/scapy/packet.py b/scapy/packet.py index 8c558513157..232cfc53bfc 100644 --- a/scapy/packet.py +++ b/scapy/packet.py @@ -101,7 +101,7 @@ class Packet( "direction", "sniffed_on", # handle snaplen Vs real length "wirelen", - "comment", + "comments", "process_information" ] name = None @@ -179,7 +179,7 @@ def __init__(self, self.wirelen = None # type: Optional[int] self.direction = None # type: Optional[int] self.sniffed_on = None # type: Optional[_GlobInterfaceType] - self.comment = None # type: Optional[bytes] + self.comments = None # type: Optional[List[bytes]] self.process_information = None # type: Optional[Dict[str, Any]] self.stop_dissection_after = stop_dissection_after if _pkt: @@ -223,6 +223,26 @@ def __init__(self, Optional[bytes], ] + @property + def comment(self): + # type: () -> Optional[bytes] + """Get the comment of the packet""" + if self.comments and len(self.comments): + return self.comments[0] + return None + + @comment.setter + def comment(self, value): + # type: (Optional[bytes]) -> None + """ + Set the comment of the packet. + If value is None, it will clear the comments. + """ + if value is not None: + self.comments = [value] + else: + self.comments = None + def __reduce__(self): # type: () -> Tuple[Type[Packet], Tuple[bytes], Packet._PickleType] """Used by pickling methods""" @@ -435,7 +455,7 @@ def copy(self) -> Self: clone.payload = self.payload.copy() clone.payload.add_underlayer(clone) clone.time = self.time - clone.comment = self.comment + clone.comments = self.comments clone.direction = self.direction clone.sniffed_on = self.sniffed_on return clone @@ -1145,7 +1165,7 @@ def clone_with(self, payload=None, **kargs): self.raw_packet_cache_fields ) pkt.wirelen = self.wirelen - pkt.comment = self.comment + pkt.comments = self.comments pkt.sniffed_on = self.sniffed_on pkt.direction = self.direction if payload is not None: diff --git a/scapy/utils.py b/scapy/utils.py index f7ecf6d447c..648c8f01a19 100644 --- a/scapy/utils.py +++ b/scapy/utils.py @@ -1648,7 +1648,7 @@ class RawPcapNgReader(RawPcapReader): PacketMetadata = collections.namedtuple("PacketMetadataNg", # type: ignore ["linktype", "tsresol", "tshigh", "tslow", "wirelen", - "comment", "ifname", "direction", + "comments", "ifname", "direction", "process_information"]) def __init__(self, filename, fdesc=None, magic=None): # type: ignore @@ -1792,8 +1792,8 @@ def _read_packet(self, size=MTU): # type: ignore return res def _read_options(self, options): - # type: (bytes) -> Dict[int, bytes] - opts = dict() + # type: (bytes) -> Dict[int, Union[bytes, List[bytes]]] + opts = dict() # type: Dict[int, Union[bytes, List[bytes]]] while len(options) >= 4: try: code, length = struct.unpack(self.endian + "HH", options[:4]) @@ -1802,7 +1802,13 @@ def _read_options(self, options): "%d !" % len(options)) raise EOFError if code != 0 and 4 + length <= len(options): - opts[code] = options[4:4 + length] + # https://www.ietf.org/archive/id/draft-tuexen-opsawg-pcapng-05.html#name-options-format + if code in [1, 2988, 2989, 19372, 19373]: + if code not in opts: + opts[code] = [] + opts[code].append(options[4:4 + length]) # type: ignore + else: + opts[code] = options[4:4 + length] if code == 0: if length != 0: warning("PcapNg: invalid option " @@ -1821,6 +1827,12 @@ def _read_block_idb(self, block, _): options_raw = self._read_options(block[8:]) options = self.default_options.copy() # type: Dict[str, Any] for c, v in options_raw.items(): + if isinstance(v, list): + # Spec allows multiple occurrences (see + # https://www.ietf.org/archive/id/draft-tuexen-opsawg-pcapng-05.html#section-4.2-8.6) + # but does not define which to use. We take the first for + # backward compatibility. + v = v[0] if c == 9: length = len(v) if length == 1: @@ -1876,11 +1888,13 @@ def _read_block_epb(self, block, size): process_information = {} for code, value in options.items(): - if code in [0x8001, 0x8003]: # PCAPNG_EPB_PIB_INDEX, PCAPNG_EPB_E_PIB_INDEX + # PCAPNG_EPB_PIB_INDEX, PCAPNG_EPB_E_PIB_INDEX + if code in [0x8001, 0x8003]: try: - proc_index = struct.unpack(self.endian + "I", value)[0] + proc_index = struct.unpack( + self.endian + "I", value)[0] # type: ignore except struct.error: - warning("PcapNg: EPB invalid proc index" + warning("PcapNg: EPB invalid proc index " "(expected 4 bytes, got %d) !" % len(value)) raise EOFError if proc_index < len(self.process_information): @@ -1890,9 +1904,9 @@ def _read_block_epb(self, block, size): warning("PcapNg: EPB invalid process information index " "(%d/%d) !" % (proc_index, len(self.process_information))) - comment = options.get(1, None) + comments = options.get(1, None) epb_flags_raw = options.get(2, None) - if epb_flags_raw: + if epb_flags_raw and isinstance(epb_flags_raw, bytes): try: epb_flags, = struct.unpack(self.endian + "I", epb_flags_raw) except struct.error: @@ -1913,10 +1927,10 @@ def _read_block_epb(self, block, size): tshigh=tshigh, tslow=tslow, wirelen=wirelen, - comment=comment, ifname=ifname, direction=direction, - process_information=process_information)) + process_information=process_information, + comments=comments)) def _read_block_spb(self, block, size): # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] @@ -1940,10 +1954,10 @@ def _read_block_spb(self, block, size): tshigh=None, tslow=None, wirelen=wirelen, - comment=None, ifname=None, direction=None, - process_information={})) + process_information={}, + comments=None)) def _read_block_pkt(self, block, size): # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] @@ -1964,10 +1978,10 @@ def _read_block_pkt(self, block, size): tshigh=tshigh, tslow=tslow, wirelen=wirelen, - comment=None, ifname=None, direction=None, - process_information={})) + process_information={}, + comments=None)) def _read_block_dsb(self, block, size): # type: (bytes, int) -> None @@ -2039,10 +2053,11 @@ def _read_block_pib(self, block, _): options = self._read_options(block) for code, value in options.items(): if code == 2: - process_information["name"] = value.decode("ascii", "backslashreplace") + process_information["name"] = value.decode( # type: ignore + "ascii", "backslashreplace") elif code == 4: if len(value) == 16: - process_information["uuid"] = str(UUID(bytes=value)) + process_information["uuid"] = str(UUID(bytes=value)) # type: ignore else: warning("PcapNg: DPEB UUID length is invalid (%d)!", len(value)) @@ -2068,7 +2083,7 @@ def read_packet(self, size=MTU, **kwargs): rp = super(PcapNgReader, self)._read_packet(size=size) if rp is None: raise EOFError - s, (linktype, tsresol, tshigh, tslow, wirelen, comment, ifname, direction, process_information) = rp # noqa: E501 + s, (linktype, tsresol, tshigh, tslow, wirelen, comments, ifname, direction, process_information) = rp # noqa: E501 try: cls = conf.l2types.num2layer[linktype] # type: Type[Packet] p = cls(s, **kwargs) # type: Packet @@ -2084,7 +2099,7 @@ def read_packet(self, size=MTU, **kwargs): if tshigh is not None: p.time = EDecimal((tshigh << 32) + tslow) / tsresol p.wirelen = wirelen - p.comment = comment + p.comments = comments p.direction = direction p.process_information = process_information.copy() if ifname is not None: @@ -2110,9 +2125,9 @@ def _write_packet(self, usec=None, # type: Optional[int] caplen=None, # type: Optional[int] wirelen=None, # type: Optional[int] - comment=None, # type: Optional[bytes] ifname=None, # type: Optional[bytes] direction=None, # type: Optional[int] + comments=None, # type: Optional[List[bytes]] ): # type: (...) -> None raise NotImplementedError @@ -2193,7 +2208,7 @@ def write_packet(self, if wirelen is None: wirelen = caplen - comment = getattr(packet, "comment", None) + comments = getattr(packet, "comments", None) ifname = getattr(packet, "sniffed_on", None) direction = getattr(packet, "direction", None) if not isinstance(packet, bytes): @@ -2208,10 +2223,10 @@ def write_packet(self, rawpkt, sec=f_sec, usec=usec, caplen=caplen, wirelen=wirelen, - comment=comment, ifname=ifname, direction=direction, - linktype=linktype + linktype=linktype, + comments=comments, ) @@ -2363,9 +2378,9 @@ def _write_packet(self, usec=None, # type: Optional[int] caplen=None, # type: Optional[int] wirelen=None, # type: Optional[int] - comment=None, # type: Optional[bytes] ifname=None, # type: Optional[bytes] direction=None, # type: Optional[int] + comments=None, # type: Optional[List[bytes]] ): # type: (...) -> None """ @@ -2541,7 +2556,7 @@ def _write_block_epb(self, timestamp=None, # type: Optional[Union[EDecimal, float]] # noqa: E501 caplen=None, # type: Optional[int] orglen=None, # type: Optional[int] - comment=None, # type: Optional[bytes] + comments=None, # type: Optional[List[bytes]] flags=None, # type: Optional[int] ): # type: (...) -> None @@ -2576,11 +2591,12 @@ def _write_block_epb(self, # Options opts = b'' - if comment is not None: - comment = bytes_encode(comment) - opts += struct.pack(self.endian + "HH", 1, len(comment)) - # Pad Option Value to 32 bits - opts += self._add_padding(comment) + if comments and len(comments): + for c in comments: + comment = bytes_encode(c) + opts += struct.pack(self.endian + "HH", 1, len(comment)) + # Pad Option Value to 32 bits + opts += self._add_padding(comment) if type(flags) == int: opts += struct.pack(self.endian + "HH", 2, 4) opts += struct.pack(self.endian + "I", flags) @@ -2597,9 +2613,9 @@ def _write_packet(self, # type: ignore usec=None, # type: Optional[int] caplen=None, # type: Optional[int] wirelen=None, # type: Optional[int] - comment=None, # type: Optional[bytes] ifname=None, # type: Optional[bytes] direction=None, # type: Optional[int] + comments=None, # type: Optional[List[bytes]] ): # type: (...) -> None """ @@ -2655,7 +2671,7 @@ def _write_packet(self, # type: ignore flags = None self._write_block_epb(packet, timestamp=sec, caplen=caplen, - orglen=wirelen, comment=comment, ifid=ifid, flags=flags) + orglen=wirelen, comments=comments, ifid=ifid, flags=flags) if self.sync: self.f.flush() diff --git a/test/regression.uts b/test/regression.uts index 6ff43dae945..5e6b109028f 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -2250,6 +2250,12 @@ wrpcapng(tmpfile, p) l = rdpcap(tmpfile) assert l[0].comment == p.comment +p = Ether() / IPv6() / TCP() +p.comments = [b"Hello!", b"Scapy!", b"Pcapng!"] +wrpcapng(tmpfile, p) +l = rdpcap(tmpfile) +assert l[0].comments == p.comments + = rdpcap on fifo ~ linux f = get_temp_file()