Skip to content

Commit cd610de

Browse files
committed
Make State a first class object
1 parent 68ba4d5 commit cd610de

File tree

6 files changed

+453
-111
lines changed

6 files changed

+453
-111
lines changed

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
'simplejson==3.11.1',
1616
'python-dateutil>=2.6.0',
1717
'backoff==1.8.0',
18-
'ciso8601',
18+
'ciso8601',
19+
'typing-extensions'
1920
],
2021
extras_require={
2122
'dev': [

singer/__init__.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,7 @@
6161
)
6262
from singer.schema import Schema
6363

64-
from singer.bookmarks import (
65-
write_bookmark,
66-
get_bookmark,
67-
clear_bookmark,
68-
reset_stream,
69-
set_offset,
70-
clear_offset,
71-
get_offset,
72-
set_currently_syncing,
73-
get_currently_syncing,
74-
)
64+
from singer.bookmarks import State
7565

7666
if __name__ == "__main__":
7767
import doctest

singer/bookmarks.py

Lines changed: 96 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,96 @@
1-
def ensure_bookmark_path(state, path):
    """Make sure every key along ``path`` exists as a nested mapping in ``state``.

    Walks ``state`` one path component at a time. Whenever a component is
    absent, or present with the value ``None``, an empty dict is stored
    under it before descending. ``state`` is modified in place and the very
    same object is returned.
    """
    node = state
    for component in path:
        child = node.get(component)
        if child is None:
            # Missing and explicit-None entries are both replaced by a dict.
            child = {}
            node[component] = child
        node = child
    return state
9-
10-
def write_bookmark(state, tap_stream_id, key, val):
    """Store ``val`` under ``key`` in the bookmark entry of ``tap_stream_id``.

    The ``state['bookmarks'][tap_stream_id]`` path is created first if
    needed. Returns the state object handed back by
    ``ensure_bookmark_path``.
    """
    prepared = ensure_bookmark_path(state, ['bookmarks', tap_stream_id])
    stream_bookmarks = prepared['bookmarks'][tap_stream_id]
    stream_bookmarks[key] = val
    return prepared
14-
15-
def clear_bookmark(state, tap_stream_id, key):
    """Remove ``key`` from the bookmark entry of ``tap_stream_id``.

    The ``state['bookmarks'][tap_stream_id]`` path is created first if
    needed; removing a key that is not present is a no-op. Returns the
    state object handed back by ``ensure_bookmark_path``.
    """
    prepared = ensure_bookmark_path(state, ['bookmarks', tap_stream_id])
    stream_bookmarks = prepared['bookmarks'][tap_stream_id]
    stream_bookmarks.pop(key, None)
    return prepared
19-
20-
def reset_stream(state, tap_stream_id):
    """Replace the bookmark entry of ``tap_stream_id`` with an empty dict.

    The ``state['bookmarks']`` path is created first if needed. Returns the
    state object handed back by ``ensure_bookmark_path``.
    """
    prepared = ensure_bookmark_path(state, ['bookmarks', tap_stream_id])
    all_bookmarks = prepared['bookmarks']
    all_bookmarks[tap_stream_id] = {}
    return prepared
24-
25-
def get_bookmark(state, tap_stream_id, key, default=None):
    """Look up ``key`` in the bookmark entry of ``tap_stream_id``.

    Returns ``default`` when ``state`` has no ``'bookmarks'`` key, when the
    stream has no entry, or when the entry lacks ``key``.
    """
    all_bookmarks = state.get('bookmarks', {})
    stream_bookmarks = all_bookmarks.get(tap_stream_id, {})
    return stream_bookmarks.get(key, default)
27-
28-
def set_offset(state, tap_stream_id, offset_key, offset_value):
    """Store ``offset_value`` under ``offset_key`` in the stream's ``"offset"`` map.

    The ``state['bookmarks'][tap_stream_id]["offset"][offset_key]`` path is
    created first if needed, then the final slot is overwritten with
    ``offset_value``. Returns the state object handed back by
    ``ensure_bookmark_path``.
    """
    path = ['bookmarks', tap_stream_id, "offset", offset_key]
    prepared = ensure_bookmark_path(state, path)
    offsets = prepared['bookmarks'][tap_stream_id]["offset"]
    offsets[offset_key] = offset_value
    return prepared
32-
33-
def clear_offset(state, tap_stream_id):
    """Replace the ``"offset"`` map of ``tap_stream_id`` with an empty dict.

    The ``state['bookmarks'][tap_stream_id]`` path is created first if
    needed. Returns the state object handed back by
    ``ensure_bookmark_path``.
    """
    path = ['bookmarks', tap_stream_id, "offset"]
    prepared = ensure_bookmark_path(state, path)
    stream_bookmarks = prepared['bookmarks'][tap_stream_id]
    stream_bookmarks["offset"] = {}
    return prepared
37-
38-
def get_offset(state, tap_stream_id, default=None):
    """Return the whole ``"offset"`` value stored for ``tap_stream_id``.

    Returns ``default`` when ``state`` has no ``'bookmarks'`` key, when the
    stream has no entry, or when the entry has no ``"offset"`` key.
    """
    all_bookmarks = state.get('bookmarks', {})
    stream_bookmarks = all_bookmarks.get(tap_stream_id, {})
    return stream_bookmarks.get("offset", default)
40-
41-
def set_currently_syncing(state, tap_stream_id):
    """Record ``tap_stream_id`` under the ``'currently_syncing'`` key of ``state``.

    ``state`` is modified in place and the same object is returned.
    """
    marker_key = 'currently_syncing'
    state[marker_key] = tap_stream_id
    return state
44-
45-
def get_currently_syncing(state, default=None):
    """Return the ``'currently_syncing'`` value of ``state``.

    ``default`` is returned only when the key is absent; a stored ``None``
    is returned as-is.
    """
    stream_id = state.get('currently_syncing', default)
    return stream_id
1+
import json
2+
import sys
3+
from typing import Any, Dict, Optional, Sequence, Union
4+
from .logger import get_logger
5+
6+
7+
LOGGER = get_logger()
8+
9+
def write_state(state):
    """Write ``state.to_dict()`` to standard output as JSON indented by 2 spaces.

    No trailing newline is written and the stream is not flushed.
    """
    payload = state.to_dict()
    json.dump(payload, sys.stdout, indent=2)
11+
12+
class State:
    """Mutable container for a tap's sync state.

    Wraps two pieces of data: a nested ``bookmarks`` dict keyed by
    ``tap_stream_id`` and an optional ``currently_syncing`` stream id.
    Mutating methods change the instance in place and return ``None``.
    """

    def __init__(
        self, bookmarks: Optional[Dict] = None, currently_syncing: Optional[str] = None
    ) -> None:
        # A falsy ``bookmarks`` (None or empty) is replaced by a fresh dict;
        # a non-empty dict is stored by reference, not copied.
        self._bookmarks = bookmarks or {}
        self._currently_syncing = currently_syncing

    def __str__(self) -> str:
        return str(self.__dict__)

    def __eq__(self, other: Any) -> bool:
        # Comparing against None, ints, etc. must yield False rather than
        # raise AttributeError on the missing ``__dict__``.
        if not isinstance(other, State):
            return NotImplemented
        return self.__dict__ == other.__dict__

    @property
    def bookmarks(self) -> Dict:
        """The live bookmarks dict (not a copy)."""
        return self._bookmarks

    @classmethod
    def load(cls, filename: str) -> "State":
        """Build an instance from the JSON document stored in ``filename``."""
        # JSON interchange is UTF-8; do not depend on the locale's encoding.
        with open(filename, encoding="utf-8") as fp:  # pylint: disable=invalid-name
            return cls.from_dict(json.load(fp))

    @classmethod
    def from_dict(cls, data: Dict) -> "State":
        """Build an instance from a dict with optional ``"bookmarks"`` and
        ``"currently_syncing"`` keys."""
        # Use ``cls`` so that subclasses get instances of their own type.
        return cls(
            bookmarks=data.get("bookmarks"),
            currently_syncing=data.get("currently_syncing"),
        )

    def to_dict(self) -> Dict:
        """Return a dict with ``"bookmarks"`` and, when truthy,
        ``"currently_syncing"``."""
        state: Dict[str, Any] = {"bookmarks": self.bookmarks}
        currently_syncing = self.get_currently_syncing()
        if currently_syncing:
            state["currently_syncing"] = currently_syncing
        return state

    def dump(self) -> None:
        """Write ``to_dict()`` to standard output as JSON indented by 2 spaces."""
        json.dump(self.to_dict(), sys.stdout, indent=2)

    def _ensure_bookmark_path(self, path: Sequence) -> None:
        """Create nested dicts in ``bookmarks`` for every component of ``path``.

        A component that is absent or mapped to ``None`` is replaced by an
        empty dict before descending.
        """
        submap = self.bookmarks
        for path_component in path:
            if submap.get(path_component) is None:
                submap[path_component] = {}

            submap = submap[path_component]

    def write_bookmark(self, tap_stream_id: str, key: str, val: Any) -> None:
        """Store ``val`` under ``key`` in the stream's bookmark entry."""
        self._ensure_bookmark_path((tap_stream_id,))
        self.bookmarks[tap_stream_id][key] = val

    def clear_bookmark(self, tap_stream_id: str, key: str) -> None:
        """Remove ``key`` from the stream's bookmark entry; absent keys are ignored."""
        self._ensure_bookmark_path((tap_stream_id,))
        self.bookmarks[tap_stream_id].pop(key, None)

    def reset_stream(self, tap_stream_id: str) -> None:
        """Replace the stream's bookmark entry with an empty dict."""
        # The entry is overwritten unconditionally, so there is no need to
        # create it first.
        self.bookmarks[tap_stream_id] = {}

    def get_bookmark(self, tap_stream_id: str, key: str, default: Any = None) -> Any:
        """Return the stream's value for ``key``, or ``default`` if unavailable."""
        # ``or {}`` treats a ``None`` stream entry as missing, matching
        # ``_ensure_bookmark_path``.
        stream_bookmarks = self.bookmarks.get(tap_stream_id) or {}
        return stream_bookmarks.get(key, default)

    def set_offset(
        self, tap_stream_id: str, offset_key: str, offset_value: Any
    ) -> None:
        """Store ``offset_value`` under ``offset_key`` in the stream's ``"offset"`` map."""
        # Only the path down to "offset" has to exist; the final slot is
        # assigned directly.
        self._ensure_bookmark_path((tap_stream_id, "offset"))
        self.bookmarks[tap_stream_id]["offset"][offset_key] = offset_value

    def clear_offset(self, tap_stream_id: str) -> None:
        """Replace the stream's ``"offset"`` map with an empty dict."""
        self._ensure_bookmark_path((tap_stream_id,))
        self.bookmarks[tap_stream_id]["offset"] = {}

    def get_offset(
        self, tap_stream_id: str, offset_key: str, default: Any = None
    ) -> Any:
        """Return the stream's offset for ``offset_key``, or ``default`` if unavailable."""
        # ``or {}`` treats ``None`` stream / offset entries as missing,
        # matching ``_ensure_bookmark_path``.
        stream_bookmarks = self.bookmarks.get(tap_stream_id) or {}
        offsets = stream_bookmarks.get("offset") or {}
        return offsets.get(offset_key, default)

    def get_currently_syncing(self, default: Optional[str] = None) -> Optional[str]:
        """Return the current stream id, or ``default`` when it is unset or falsy."""
        return self._currently_syncing or default

    def set_currently_syncing(self, value: Union[str, None]) -> None:
        """Set (or clear, with ``None``) the current stream id."""
        self._currently_syncing = value

singer/catalog.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import sys
44

55
from . import metadata as metadata_module
6-
from .bookmarks import get_currently_syncing
76
from .logger import get_logger
87
from .schema import Schema
98

@@ -132,7 +131,7 @@ def get_stream(self, tap_stream_id):
132131
return None
133132

134133
def _shuffle_streams(self, state):
135-
currently_syncing = get_currently_syncing(state)
134+
currently_syncing = state.get_currently_syncing()
136135

137136
if currently_syncing is None:
138137
return self.streams

0 commit comments

Comments
 (0)