-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathprotocol.py
More file actions
40 lines (32 loc) · 1.28 KB
/
protocol.py
File metadata and controls
40 lines (32 loc) · 1.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
"""Protocol for contacting the coordinator to make coordinator calls."""
import json
from typing import Dict, NamedTuple, Iterable, Optional
from .chk_manager import NULL_CHK_ID
from .consts import CheckpointID, Pid, Seqno
# Parameters attached to a coordinator call: string keys mapped to arbitrary
# values. Used as the type of FinalizedCoordinatorCall.params.
ParamDict = Dict[str, object]
class FinalizedCoordinatorCall(NamedTuple):
    """Represents the request sent to the coordinator for a single coordinator call.

    Instances are carried in ``Request.calls``. ``Request.__str__`` converts
    each one with ``_asdict()`` before JSON-encoding the request, so the field
    names below become the JSON keys of every entry in the ``"calls"`` array.
    """

    # Sequence number of this individual call.
    # NOTE(review): how it relates to Request.seqno is not visible here -- confirm.
    seqno: Seqno
    # Presumably the name of the coordinator operation to perform; the set of
    # valid names is not defined in this file.
    op: str
    # String-keyed arguments for the call. Values must be JSON-encodable for
    # Request.__str__ to succeed.
    params: ParamDict
class Request(NamedTuple):
    """Represents a request to send to the coordinator when making a batch of coordinator calls.

    ``str(request)`` produces the JSON text of the request (see ``__str__``).
    Use ``Request.make_blocked`` to build the call-less request that reports
    a blocked sender.
    """

    # Presumably identifies the sender of the request -- confirm against callers.
    pid: Pid
    # Sequence number of the request as a whole.
    seqno: Seqno
    # Checkpoint id sent with the request; make_blocked() uses NULL_CHK_ID.
    chk_id: CheckpointID
    # The batch of calls. Every element must provide ``_asdict()`` (as
    # FinalizedCoordinatorCall does) because __str__ relies on it.
    calls: Iterable[FinalizedCoordinatorCall]
    blocked: bool = False  # True if the lambda has terminated because it's blocked on a coordinator call.
    # Optional error text; None when no error is reported.
    err: Optional[str] = None

    def __str__(self) -> str:
        """Return this request encoded as a JSON object string.

        The object has the keys ``pid``, ``seqno``, ``chk_id``, ``calls``,
        ``blocked`` and ``err``; ``calls`` is a JSON array holding the
        ``_asdict()`` form of every call.

        Note that ``calls`` is iterated here: if it was given as a one-shot
        iterator it is consumed by the first ``str()`` call, so pass a list or
        tuple when the request may be rendered more than once.

        Raises:
            TypeError: if any field or call parameter is not JSON-encodable.
        """
        return json.dumps({
            "pid": self.pid,
            "seqno": self.seqno,
            "chk_id": self.chk_id,
            "calls": [call._asdict() for call in self.calls],
            "blocked": self.blocked,
            "err": self.err,
        })

    @classmethod
    def make_blocked(cls, pid: Pid, seqno: Seqno) -> "Request":
        """Return a request that carries no calls and is flagged as blocked.

        The result has ``calls=[]``, ``blocked=True``, ``chk_id=NULL_CHK_ID``
        and the default ``err=None``.

        Args:
            pid: value for the ``pid`` field.
            seqno: value for the ``seqno`` field.
        """
        # Alternate constructor: build through ``cls`` rather than naming the
        # class, which is the conventional form and also serves subclasses.
        return cls(pid=pid, seqno=seqno, chk_id=NULL_CHK_ID, calls=[], blocked=True)