-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathtools.py
More file actions
27 lines (20 loc) · 746 Bytes
/
tools.py
File metadata and controls
27 lines (20 loc) · 746 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import Any, Callable, Dict, Iterable, Optional
import dpath
from airbyte_cdk.models import AirbyteStream
def get_first(
iterable: Iterable[Any], predicate: Callable[[Any], bool] = lambda m: True
) -> Optional[Any]:
return next(filter(predicate, iterable), None)
def get_defined_id(stream: AirbyteStream, data: Dict[str, Any]) -> Optional[str]:
if not stream.source_defined_primary_key:
return None
primary_key = []
for key in stream.source_defined_primary_key:
try:
primary_key.append(str(dpath.get(data, key)))
except KeyError:
primary_key.append("__not_found__")
return "_".join(primary_key)