Skip to content

Commit ae0a111

Browse files
committed
p2p challenge and anti-spoof
1 parent 7321a5d commit ae0a111

4 files changed

Lines changed: 178 additions & 61 deletions

File tree

ex/lib/node/node_gen.ex

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ defmodule NodeGen do
77

88
def init([ip_tuple, _port]) do
99
ip = Tuple.to_list(ip_tuple) |> Enum.join(".")
10-
NodePeers.seed(ip)
10+
NodeANR.seed()
11+
#NodePeers.seed(ip)
1112

1213
state = %{
1314
ns: NodeState.init()
1415
}
1516

1617
:erlang.send_after(1000, self(), :tick)
1718
:erlang.send_after(1000, self(), :tick_ping)
19+
:erlang.send_after(1000, self(), :tick_anr)
1820
{:ok, state}
1921
end
2022

@@ -36,6 +38,21 @@ defmodule NodeGen do
3638
end)
3739
end
3840

41+
def broadcast_check_anr(state) do
42+
my_pk = Application.fetch_env!(:ama, :trainer_pk)
43+
NodeANR.get_random_unverified(3)
44+
|> Enum.filter(& elem(&1,0) != my_pk)
45+
|> Enum.reduce(state, fn({pk, ip}, state)->
46+
IO.inspect {:anr_check, ip}
47+
challenge = :os.system_time(1)
48+
:erlang.spawn(fn()->
49+
msg = NodeProto.new_phone_who_dis(challenge)
50+
send(get_socket_gen(), {:send_to_some, [ip], NodeProto.compress(msg)})
51+
end)
52+
put_in(state, [:ns, :challenges, pk], challenge)
53+
end)
54+
end
55+
3956
def broadcast(:txpool, who, [txs_packed]) do
4057
:erlang.spawn(fn()->
4158
msg = NodeProto.txpool(txs_packed)
@@ -81,20 +98,28 @@ defmodule NodeGen do
8198
end
8299

83100
def handle_info(msg, state) do
84-
case msg do
101+
state = case msg do
85102

86103
:tick ->
87104
:erlang.send_after(1000, self(), :tick)
88105
tick()
106+
state
89107

90108
:tick_ping ->
91109
:erlang.send_after(500, self(), :tick_ping)
92110
broadcast_ping()
111+
state
112+
113+
:tick_anr ->
114+
:erlang.send_after(1000, self(), :tick_anr)
115+
state = broadcast_check_anr(state)
116+
state
93117

94118
{:handle_sync, op, innerstate, args} ->
95119
#TODO: ns dropped
96120
innerstate = Map.put(innerstate, :ns, state.ns)
97-
NodeState.handle(op, innerstate, args)
121+
innerstate = NodeState.handle(op, innerstate, args)
122+
Map.put(state, :ns, innerstate)
98123

99124
end
100125
{:noreply, state}

ex/lib/node/node_gen_socket_gen.ex

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,26 +58,36 @@ defmodule NodeGenSocketGen do
5858
case msg do
5959
{:udp, _socket, ip, _inportno, data} ->
6060
#IO.puts IO.ANSI.red() <> inspect({:relay_from, ip, msg.op}) <> IO.ANSI.reset()
61-
6261
:erlang.spawn(fn()->
6362
try do
6463
case NodeProto.unpack_message_v2(data) do
6564
%{error: :signature, shard_total: 1, pk: pk, version: version, signature: signature, payload: payload} ->
6665
if !BlsEx.verify?(pk, signature, Blake3.hash(pk<>payload), BLS12AggSig.dst_node()), do: throw(%{error: :invalid_signature})
66+
6767
msg = payload
6868
|> NodeProto.deflate_decompress()
6969
|> :erlang.binary_to_term([:safe])
70+
7071
peer_ip = Tuple.to_list(ip) |> Enum.join(".")
7172
peer = %{ip: peer_ip, signer: pk, version: version}
72-
NodeState.handle(msg.op, %{peer: peer}, msg)
73+
74+
hasPermissionSlip = NodeANR.handshaked_and_valid_ip4(pk, peer_ip)
75+
cond do
76+
hasPermissionSlip -> NodeState.handle(msg.op, %{peer: peer}, msg)
77+
msg.op in [:ping, :new_phone_who_dis, :what?] -> NodeState.handle(msg.op, %{peer: peer}, msg)
78+
true -> nil
79+
end
7380

7481
%{error: :signature, pk: pk, signature: signature, ts_nano: ts_nano, shard_index: shard_index, shard_total: shard_total,
7582
version: version, original_size: original_size, payload: payload}
7683
->
7784
peer_ip = Tuple.to_list(ip) |> Enum.join(".")
7885

79-
gen = NodeGen.get_reassembly_gen(pk, ts_nano)
80-
send(gen, {:add_shard, {pk, ts_nano, shard_total}, {peer_ip, version, nil, signature, shard_index, original_size}, payload})
86+
hasPermissionSlip = NodeANR.handshaked_and_valid_ip4(pk, peer_ip)
87+
if hasPermissionSlip do
88+
gen = NodeGen.get_reassembly_gen(pk, ts_nano)
89+
send(gen, {:add_shard, {pk, ts_nano, shard_total}, {peer_ip, version, nil, signature, shard_index, original_size}, payload})
90+
end
8191

8292
%{error: :encrypted, shard_total: 1, pk: pk, version: version, ts_nano: ts_nano, payload: payload} ->
8393
shared_secret = NodePeers.get_shared_secret(pk)
@@ -92,20 +102,32 @@ defmodule NodeGenSocketGen do
92102

93103
peer_ip = Tuple.to_list(ip) |> Enum.join(".")
94104
peer = %{ip: peer_ip, signer: pk, version: version}
95-
NodeState.handle(msg.op, %{peer: peer}, msg)
105+
106+
hasPermissionSlip = NodeANR.handshaked_and_valid_ip4(pk, peer_ip)
107+
cond do
108+
hasPermissionSlip -> NodeState.handle(msg.op, %{peer: peer}, msg)
109+
msg.op in [:ping, :new_phone_who_dis, :what?] -> NodeState.handle(msg.op, %{peer: peer}, msg)
110+
true -> nil
111+
end
112+
96113

97114
%{error: :encrypted, pk: pk, ts_nano: ts_nano, shard_index: shard_index, shard_total: shard_total,
98115
version: version, original_size: original_size, payload: payload} ->
99-
shared_secret = NodePeers.get_shared_secret(pk)
100-
101116
peer_ip = Tuple.to_list(ip) |> Enum.join(".")
102-
gen = NodeGen.get_reassembly_gen(pk, ts_nano)
103-
send(gen, {:add_shard, {pk, ts_nano, shard_total}, {peer_ip, version, shared_secret, nil, shard_index, original_size}, payload})
104117

105-
_ -> nil
118+
hasPermissionSlip = NodeANR.handshaked_and_valid_ip4(pk, peer_ip)
119+
if hasPermissionSlip do
120+
shared_secret = NodePeers.get_shared_secret(pk)
121+
122+
gen = NodeGen.get_reassembly_gen(pk, ts_nano)
123+
send(gen, {:add_shard, {pk, ts_nano, shard_total}, {peer_ip, version, shared_secret, nil, shard_index, original_size}, payload})
124+
end
125+
126+
_data ->
127+
nil
106128
end
107129
catch
108-
_,_ -> nil
130+
_e, _r -> nil
109131
end
110132
end)
111133

ex/lib/node/node_peers.ex

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,53 @@
11
defmodule NodePeers do
2+
def clear_stale() do
3+
ts_m = :os.system_time(1000)
4+
5+
validators = Consensus.trainers_for_height(Consensus.chain_height()+1)
6+
validator_anr_ips = NodeANR.by_pks_ip(validators)
7+
validators_map = validators |> Enum.into(%{}, & {&1,true})
8+
9+
handshaked_ips = NodeANR.handshaked_pk_ip4() |> Enum.map(& elem(&1,1))
10+
11+
{cur_ips, cur_val_ips} = :ets.foldl(fn({key, v}, {acc, acc_vals})->
12+
lp = v[:last_msg]
13+
cond do
14+
!lp -> :ets.insert(NODEPeers, {key, Map.put(v, :last_msg, ts_m)})
15+
ts_m > lp + (1_000*60) -> :ets.delete(NODEPeers, key)
16+
true -> nil
17+
end
18+
if validators_map[v[:pk]] do
19+
{acc, acc_vals ++ [key]}
20+
else
21+
{acc ++ [key], acc_vals}
22+
end
23+
end, {[], []}, NODEPeers)
24+
25+
missing_vals = validator_anr_ips -- cur_val_ips
26+
missing_ips = handshaked_ips -- cur_ips
27+
28+
max_peers = Application.fetch_env!(:ama, :max_peers)
29+
add_size = max_peers - :ets.info(NODEPeers, :size) - length(cur_val_ips) - length(missing_vals)
30+
31+
missing_ips = Enum.take(Enum.shuffle(missing_ips), add_size)
32+
33+
Enum.each(missing_vals ++ missing_ips, fn(ip)->
34+
:ets.insert(NODEPeers, {ip, %{ip: ip}})
35+
end)
36+
end
37+
238
def seed(my_ip) do
339
seeds = Application.fetch_env!(:ama, :seednodes)
440
nodes = Application.fetch_env!(:ama, :othernodes)
541
filtered = Enum.uniq(seeds ++ nodes) -- [my_ip]
642
Enum.each(filtered, fn(ip)->
743
:ets.insert(NODEPeers, {ip, %{ip: ip, static: true}})
844
end)
45+
46+
validators = Consensus.trainers_for_height(Consensus.chain_height()+1)
47+
validators = NodeANR.by_pks_ip(validators)
48+
Enum.each(validators, fn(ip)->
49+
:ets.insert(NODEPeers, {ip, %{ip: ip}})
50+
end)
951
end
1052

1153
def random(no) do
@@ -18,20 +60,6 @@ defmodule NodePeers do
1860
end
1961
end
2062

21-
def clear_stale() do
22-
ts_m = :os.system_time(1000)
23-
:ets.foldl(fn({key, v}, acc)->
24-
lp = v[:last_msg]
25-
#60 minutes
26-
if !v[:static] and !!lp and ts_m > lp+(1_000*60*60) do
27-
acc ++ [key]
28-
else
29-
acc
30-
end
31-
end, [], NODEPeers)
32-
|> Enum.each(& :ets.delete(NODEPeers, &1))
33-
end
34-
3563
def all() do
3664
peers = :ets.tab2list(NODEPeers)
3765
|> Enum.map(& elem(&1,1))
@@ -91,7 +119,7 @@ defmodule NodePeers do
91119
cond do
92120
!peer[:pk] -> false
93121
Application.fetch_env!(:ama, :trainer_pk) == peer.pk -> true
94-
!!lp and ts_m - lp <= 3_000 -> true
122+
!!lp and ts_m - lp <= 6_000 -> true
95123
true -> false
96124
end
97125
end
@@ -109,6 +137,10 @@ defmodule NodePeers do
109137
|> List.first()
110138
end
111139

140+
def ips_by_pk(pk) do
141+
:ets.select(NODEPeers, [{{:"$1", %{pk: pk}}, [], [:"$1"]}])
142+
end
143+
112144
def by_pk(pk) do
113145
ip = :ets.select(NODEPeers, [{{:"$1", %{pk: pk}}, [], [:"$1"]}])
114146
|> List.first()

ex/lib/node/node_state.ex

Lines changed: 70 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,62 @@ defmodule NodeState do
33

44
def init() do
55
%{
6+
challenges: %{},
67
}
78
end
89

10+
#TODO: anti local compute dos here
11+
def handle(:new_phone_who_dis, istate, term) do
12+
anr = NodeANR.verify_and_unpack(term.anr)
13+
if !!anr and is_integer(term.challenge) and istate.peer.ip == anr.ip4 do
14+
sk = Application.fetch_env!(:ama, :trainer_sk)
15+
pk = Application.fetch_env!(:ama, :trainer_pk)
16+
sig = BlsEx.sign!(sk, <<pk::binary, :erlang.integer_to_binary(term.challenge)::binary>>, BLS12AggSig.dst_anr_challenge())
17+
send(NodeGen.get_socket_gen(), {:send_to_some, [istate.peer.ip], compress(NodeProto.what?(term.challenge, sig))})
18+
send(NodeGen, {:handle_sync, :new_phone_who_dis_ns, istate, %{anr: anr}})
19+
end
20+
end
21+
def handle(:new_phone_who_dis_ns, istate, term) do
22+
NodeANR.insert(term.anr)
23+
istate.ns
24+
end
25+
26+
def handle(:what?, istate, term) do
27+
anr = NodeANR.verify_and_unpack(term.anr)
28+
29+
#signed within 6 seconds
30+
ts = :os.system_time(1)
31+
delta = abs(ts - term.challenge)
32+
33+
if !!anr and istate.peer.ip == anr.ip4 and delta <= 6 do
34+
challenge_bin = :erlang.integer_to_binary(term.challenge)
35+
if BlsEx.verify?(anr.pk, term.signature, <<anr.pk::binary, challenge_bin::binary>>, BLS12AggSig.dst_anr_challenge()) do
36+
send(NodeGen, {:handle_sync, :'what?_ns', istate, %{pk: anr.pk, anr: anr}})
37+
end
38+
end
39+
end
40+
def handle(:'what?_ns', istate, term) do
41+
NodeANR.insert(term.anr)
42+
NodeANR.set_handshaked(term.anr.pk)
43+
istate.ns
44+
end
45+
946
def handle(:ping, istate, term) do
1047
temporal = Entry.unpack(term.temporal)
1148
rooted = Entry.unpack(term.rooted)
1249
try do
1350
%{error: :ok, hash: hasht} = Entry.validate_signature(temporal.header, temporal.signature, temporal.header_unpacked.signer, temporal[:mask])
1451
%{error: :ok, hash: hashr} = Entry.validate_signature(rooted.header, rooted.signature, rooted.header_unpacked.signer, rooted[:mask])
1552

53+
hasPermissionSlip = NodeANR.handshaked_and_valid_ip4(istate.peer.signer, istate.peer.ip)
1654
:erlang.spawn(fn()->
17-
#txs_packed = TXPool.random()
18-
#txs_packed && send(NodeGen.get_socket_gen(), {:send_to_some, [istate.peer.ip], compress(NodeProto.txpool(txs_packed))})
19-
20-
rng_peers = NodePeers.random(6) |> Enum.map(& &1.ip)
21-
if rng_peers != [], do: send(NodeGen.get_socket_gen(), {:send_to_some, [istate.peer.ip], compress(NodeProto.peers(rng_peers))})
55+
if hasPermissionSlip do
56+
anrs = NodeANR.get_random_verified(3)
57+
if anrs != [], do: send(NodeGen.get_socket_gen(), {:send_to_some, [istate.peer.ip], compress(NodeProto.peers_v2(anrs))})
58+
end
2259
end)
2360

24-
term = %{temporal: Map.put(temporal, :hash, hasht), rooted: Map.put(rooted, :hash, hashr), ts_m: term.ts_m}
61+
term = %{temporal: Map.put(temporal, :hash, hasht), rooted: Map.put(rooted, :hash, hashr), ts_m: term.ts_m, hasPermissionSlip: hasPermissionSlip}
2562
send(NodeGen, {:handle_sync, :ping_ns, istate, term})
2663
catch
2764
e,{:badmatch, %{error: :wrong_epoch}} -> nil
@@ -30,23 +67,26 @@ defmodule NodeState do
3067
end
3168
def handle(:ping_ns, istate, term) do
3269
peer_ip = istate.peer.ip
33-
peer = :ets.lookup_element(NODEPeers, peer_ip, 2, %{})
34-
peer = Map.merge(peer, %{
35-
ip: peer_ip,
36-
pk: istate.peer.signer, version: istate.peer.version,
37-
last_ping: :os.system_time(1000),
38-
last_msg: :os.system_time(1000),
39-
temporal: term.temporal, rooted: term.rooted,
40-
})
41-
42-
peer = if !!peer[:shared_secret] do peer else
43-
shared_key = BlsEx.get_shared_secret!(istate.peer.signer, Application.fetch_env!(:ama, :trainer_sk))
44-
Map.put(peer, :shared_secret, shared_key)
45-
end
4670

47-
:ets.insert(NODEPeers, {peer_ip, peer})
71+
if term.hasPermissionSlip or (istate.peer.signer in Consensus.trainers_for_height(Consensus.chain_height())) do
72+
peer = :ets.lookup_element(NODEPeers, peer_ip, 2, %{})
73+
peer = Map.merge(peer, %{
74+
ip: peer_ip,
75+
pk: istate.peer.signer, version: istate.peer.version,
76+
last_ping: :os.system_time(1000),
77+
last_msg: :os.system_time(1000),
78+
temporal: term.temporal, rooted: term.rooted,
79+
})
80+
81+
peer = if !!peer[:shared_secret] do peer else
82+
shared_key = BlsEx.get_shared_secret!(istate.peer.signer, Application.fetch_env!(:ama, :trainer_sk))
83+
Map.put(peer, :shared_secret, shared_key)
84+
end
85+
86+
:ets.insert(NODEPeers, {peer_ip, peer})
4887

49-
:erlang.spawn(fn()-> send(NodeGen.get_socket_gen(), {:send_to_some, [peer_ip], compress(NodeProto.pong(term.ts_m))}) end)
88+
:erlang.spawn(fn()-> send(NodeGen.get_socket_gen(), {:send_to_some, [peer_ip], compress(NodeProto.pong(term.ts_m))}) end)
89+
end
5090

5191
istate.ns
5292
end
@@ -73,14 +113,15 @@ defmodule NodeState do
73113
TXPool.insert(good)
74114
end
75115

76-
def handle(:peers, istate, term) do
77-
send(NodeGen, {:handle_sync, :peers_ns, istate, term})
116+
def handle(:peers_v2, istate, term) do
117+
anrs = Enum.map(term.anrs, & NodeANR.verify_and_unpack(&1))
118+
|> Enum.filter(& &1)
119+
term = %{anrs: anrs}
120+
send(NodeGen, {:handle_sync, :peers_v2_ns, istate, term})
78121
end
79-
def handle(:peers_ns, istate, term) do
80-
Enum.each(term.ips, fn(peer_ip)->
81-
peer = :ets.lookup_element(NODEPeers, peer_ip, 2, %{})
82-
peer = Map.merge(peer, %{ip: peer_ip})
83-
:ets.insert(NODEPeers, {peer_ip, peer})
122+
def handle(:peers_v2_ns, istate, term) do
123+
Enum.each(term.anrs, fn(anr)->
124+
NodeANR.insert(anr)
84125
end)
85126
istate.ns
86127
end
@@ -93,10 +134,7 @@ defmodule NodeState do
93134
sol.epoch != Consensus.chain_epoch() ->
94135
#IO.inspect {:broadcasted_sol_invalid_epoch, sol.epoch, Consensus.chain_epoch()}
95136
nil
96-
sol.epoch < 260 and !BIC.Sol.verify(term.sol) ->
97-
IO.inspect {:peer_sent_invalid_sol, :TODO_block_malicious_peer}
98-
nil
99-
sol.epoch >= 260 and !BIC.Sol.verify(term.sol, %{vr_b3: :crypto.strong_rand_bytes(32)}) ->
137+
!BIC.Sol.verify(term.sol, %{vr_b3: :crypto.strong_rand_bytes(32)}) ->
100138
IO.inspect {:peer_sent_invalid_sol, :TODO_block_malicious_peer}
101139
nil
102140
!BlsEx.verify?(sol.pk, sol.pop, sol.pk, BLS12AggSig.dst_pop()) ->

0 commit comments

Comments
 (0)