Skip to content

Commit 3371d84

Browse files
committed
translate tcpcl tests to uts
1 parent fee5a51 commit 3371d84

4 files changed

Lines changed: 181 additions & 6 deletions

File tree

scapy/contrib/dtn/tcpcl_session.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
# scapy.contrib.description = TCP Convergence Layer version 4 (TCPCLv4)
2+
# scapy.contrib.status = loads
3+
4+
# These classes support unit testing of the TCPCL scapy layer (scapy.contrib.dtn.tcpcl) and illustrate how the protocol messages may be used to emulate a TCPCL session.
5+
6+
from scapy.all import Raw, raw, TCP, Packet, bind_layers, split_layers
7+
import scapy.contrib.dtn.tcpcl as TCPCL
8+
from typing import List
9+
10+
11+
class Session:
12+
"""
13+
TCPCL messages are conventionally, but not necessarily, sent on port 4556. Since this cannot be relied upon, especially on a localhost session, the best way to bind TCP packets to TCPCL message is to track the state of a TCPCL session. Once Contact Headers are successfuly exchanged, TCP packets can be assumed to carry payloads of TCPCL messages until the session ends.
14+
"""
15+
def __init__(self):
16+
self.contact_init = False
17+
self.contact_ack = False
18+
self.is_active = False
19+
self.term_begun = False
20+
self.sport = 0
21+
self.dport = 0
22+
23+
@staticmethod
24+
def bind_messages(sport, dport):
25+
bind_layers(TCP, TCPCL.MsgHeader, sport=sport, dport=dport)
26+
bind_layers(TCP, TCPCL.MsgHeader, sport=dport, dport=sport)
27+
28+
@staticmethod
29+
def split_messages(sport, dport):
30+
split_layers(TCP, TCPCL.MsgHeader, sport=sport, dport=dport)
31+
split_layers(TCP, TCPCL.MsgHeader, sport=dport, dport=sport)
32+
33+
def activate(self):
34+
if not (self.contact_init and self.contact_ack):
35+
raise Exception("tried to activate a session before initialization and acknowledgement")
36+
37+
self.is_active = self.contact_init and self.contact_ack
38+
39+
Session.bind_messages(self.sport, self.dport)
40+
41+
def terminate(self):
42+
if not (self.contact_init and self.contact_ack):
43+
raise Exception("tried to terminate a session while none was active")
44+
45+
self.is_active = self.contact_ack = self.contact_init = False
46+
47+
Session.split_messages(self.sport, self.dport)
48+
49+
def init_contact(self, sport, dport):
50+
self.contact_init=True
51+
self.sport = sport
52+
self.dport = dport
53+
54+
def init_timeout(self):
55+
self.contact_init=False
56+
57+
def proc_ack(self):
58+
self.contact_ack=True
59+
self.activate()
60+
61+
def proc_term(self):
62+
self.term_begun = True
63+
64+
def proc_term_ack(self):
65+
self.terminate()
66+
67+
class TestTcpcl():
68+
69+
@staticmethod
70+
def check_pkt(pkt: Packet, options: List[Packet]):
71+
"""Asserts that pkt is equal to one of the packets in options (according to the raw representation)"""
72+
for opt in options:
73+
assert raw(pkt) in list(map(raw, options)), "Failed to build a properly formatted TCPCL message"
74+
75+
@staticmethod
76+
def make_prn():
77+
"""Define a function for processing packets that closes over a new Session.
78+
Return it for use in Scapy.sniff."""
79+
80+
sess = Session()
81+
82+
def process(pkt):
83+
# Manage session initialization
84+
if not sess.is_active:
85+
try: # try to find a Contact Header
86+
pay = pkt[Raw].load
87+
contact = TCPCL.ContactHeader(pay) # should raise unhandled error if
88+
# the TCP payload does not fit ContactHeader
89+
# replace pkt's raw payload with a ContactHeader formatted payload
90+
pkt[TCP].remove_payload()
91+
pkt = pkt / contact
92+
93+
# process ContactHeader
94+
if sess.contact_init: # session aready initialized, Header is an ack
95+
sess.proc_ack()
96+
print("BEGIN TCPCL SESSION")
97+
else:
98+
sess.init_contact(pkt[TCP].sport, pkt[TCP].dport)
99+
except IndexError: # no TCP payload to process
100+
pass
101+
else: # currently in an active session
102+
if TCPCL.SessTerm in pkt:
103+
# process SessTerm
104+
if sess.term_begun:
105+
sess.proc_term_ack()
106+
print("END TCPCL SESSION")
107+
else:
108+
sess.proc_term()
109+
110+
return pkt # end of process
111+
112+
return process # end of make_prn

test/contrib/bpv7.uts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
= Test decode
66
~ dtn bpv7
7-
87
* dissecting valid bundle from string and checking all fields for accuracy
8+
99
import scapy.contrib.dtn.bpv7 as BPv7
1010

1111
bs = '9f8907040282028202018202820101820100821b000000b4e6fc6dae001a000f4240440512dd21860a021002448218640044db675d49860101000258640000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000044dd3243fcff'
@@ -48,8 +48,8 @@ assert block2.crc == b'\xdd\x32\x43\xfc', "Wrong crc in second canonical block"
4848

4949
= Test decode invalid bundle
5050
~ dtn bpv7
51-
5251
* attempting to decode a bundle with two primary block elements (should fail to dissect)
52+
5353
bs = '9f8907040282028202018202820403820105821b000000b700dfc451001a000f424044b3cf0f1d8907040282028202018202820403820105821b000000b700dfc451001a000f424044b3cf0f1dff'
5454

5555
try:
@@ -60,8 +60,8 @@ except:
6060

6161
= Test encode
6262
~ dtn bpv7
63+
* building a bundle
6364

64-
* building bundle
6565
block1 = BPv7.HopCountBlock(
6666
block_number=2,
6767
flags=BPv7.CanonicalBlock.CtrlFlags.DISCARD_IF_NOT_PROCESSED,
@@ -107,8 +107,8 @@ captured_bytes == bundle_bytes
107107

108108
= Test blocks
109109
~ dtn bpv7
110-
111110
* testing a bundle with default block instances can be built without error
111+
112112
bundle=BPv7.Bundle(canonical_blocks=[
113113
BPv7.PreviousNodeBlock(),
114114
BPv7.HopCountBlock(),
@@ -129,8 +129,8 @@ b is not None
129129

130130
= Test bpsec bundle integrity block
131131
~ dtn bpv7 bpsec
132+
* testing that a non-default BIB can be created
132133

133-
* test that a non-default BIB can be created
134134
b = BPv7.BlockIntegrityBlock(
135135
block_number=3,
136136
flags=BPv7.CanonicalBlock.CtrlFlags.BLOCK_MUST_BE_REPLICATED,
@@ -155,8 +155,8 @@ b is not None
155155

156156
= Test bpsec bundle confidentiality blocks
157157
~ dtn bpv7 bpsec
158-
159158
* testing confidentiality block
159+
160160
f = open(scapy_path("test/pcaps/bpv7_bundle_with_con.pcap"), "rb")
161161
content = f.read()
162162

test/contrib/tcpcl.uts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
% TCP Convergence Layer tests
2+
3+
+ Test full TCPCL session
4+
5+
= Test dissect and build
6+
~ dtn tcpcl
7+
* testing packet dissection and build for full TCPCL session from pcap
8+
9+
import scapy.contrib.dtn.tcpcl as TCPCL
10+
from scapy.contrib.dtn.tcpcl_session import TestTcpcl
11+
12+
# Test dissection from pcap
13+
pkts=sniff(offline=scapy_path("test/pcaps/tcpcl.pcap"),
14+
prn=TestTcpcl.make_prn())
15+
assert len(pkts) == 26, "Failed to dissect some packets"
16+
17+
# Define expected messages
18+
init1 = TCPCL.SessInit(
19+
keepalive=17,
20+
segment_mru=200000,
21+
transfer_mru=10000000,
22+
id=b"ipn:1.0"
23+
)
24+
init2 = TCPCL.SessInit(
25+
keepalive=15,
26+
segment_mru=200000,
27+
transfer_mru=10000000,
28+
id=b"ipn:10.0"
29+
)
30+
bundle0 = bytearray.fromhex('9f8907040282028202018202820101820100821b000000b4e6fc6dae001a000f4240440512dd21860a021002448218640044db675d49860101000258640000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000044dd3243fcff')
31+
bundle1 = bytearray.fromhex('9f8907040282028202018202820101820100821b000000b4e6fc6db8001a000f424044bb1ffd92860a021002448218640044db675d4986010100025864010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004419e13bfbff')
32+
bundle2 = bytearray.fromhex('9f8907040282028202018202820101820100821b000000b4e6fc6dc2001a000f42404418438afa860a021002448218640044db675d498601010002586402000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000445178c503ff')
33+
bundle3 = bytearray.fromhex('9f8907040282028202018202820101820100821b000000b4e6fcfb66001a000f4240449fdacc81860a021002448218640044db675d49860101000258642c0e000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000044b1439f4cff')
34+
flags = TCPCL.Xfer.Flag.START | TCPCL.Xfer.Flag.END
35+
xfer0 = TCPCL.XferSegment(flags=flags, id=0) / bundle0
36+
xfer1 = TCPCL.XferSegment(flags=flags, id=1) / bundle1
37+
xfer2 = TCPCL.XferSegment(flags=flags, id=2) / bundle2
38+
xfer3 = TCPCL.XferSegment(flags=flags, id=3628) / bundle3
39+
ack0 = TCPCL.XferAck(flags=flags, id=0, length=167)
40+
ack1 = TCPCL.XferAck(flags=flags, id=1, length=167)
41+
ack2 = TCPCL.XferAck(flags=flags, id=2, length=167)
42+
ack3 = TCPCL.XferAck(flags=flags, id=3628, length=167)
43+
term0 = TCPCL.SessTerm()
44+
term1 = TCPCL.SessTerm(flags=TCPCL.SessTerm.Flag.REPLY)
45+
46+
# Test that built TCPCL messages have the expected value
47+
# (including auto-computed fields, such as length fields).
48+
# They should be identical to the packets from the pcap.
49+
for pkt in pkts:
50+
try:
51+
msg = pkt[TCPCL.MsgHeader].payload
52+
mtype = type(msg)
53+
if mtype == TCPCL.SessInit:
54+
TestTcpcl.check_pkt(msg, [init1, init2])
55+
elif mtype == TCPCL.XferSegment:
56+
TestTcpcl.check_pkt(msg, [xfer0, xfer1, xfer2, xfer3])
57+
elif mtype == TCPCL.XferAck:
58+
TestTcpcl.check_pkt(msg, [ack0, ack1, ack2, ack3])
59+
elif mtype == TCPCL.SessTerm:
60+
TestTcpcl.check_pkt(msg, [term0, term1])
61+
except IndexError: # pkt contains no TCPCL msg
62+
continue
63+

test/pcaps/tcpcl.pcap

3.01 KB
Binary file not shown.

0 commit comments

Comments
 (0)