-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathuser.py
More file actions
41 lines (35 loc) · 1.16 KB
/
user.py
File metadata and controls
41 lines (35 loc) · 1.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import functools
from typing import List, cast
from sift.common.type.v1.user_pb2 import User
from sift.users.v2.users_pb2 import ListActiveUsersRequest, ListActiveUsersResponse
from sift.users.v2.users_pb2_grpc import UserServiceStub
@functools.cache
def get_active_users(
    user_service: UserServiceStub,
    filter: str,
    page_size: int = 1_000,
    page_token: str = "",
) -> List[User]:
    """
    Get all active users from the user service that match the given filter.

    The filter must be a CEL expression.

    Pages are requested one after another, starting at ``page_token``, until
    the service returns an empty ``next_page_token``; the users from every
    page are collected into a single list.

    Because of ``functools.cache``, a repeated call with the same arguments
    returns the very same list object without contacting the service again,
    so callers should treat the returned list as read-only.

    Args:
        user_service: Stub used to issue the ``ListActiveUsers`` calls.
        filter: CEL expression forwarded unchanged with every page request.
        page_size: Number of users requested per page.
        page_token: Token of the first page to fetch; the empty string
            starts from the beginning.

    Returns:
        The users of all fetched pages, in the order they were received.
    """
    users_pb: List[User] = []
    next_page_token = page_token
    while True:
        req = ListActiveUsersRequest(
            filter=filter,
            page_size=page_size,
            # Send the token returned by the previous response. Re-sending the
            # caller's initial ``page_token`` would request the same page
            # forever and never terminate.
            page_token=next_page_token,
        )
        res = cast(ListActiveUsersResponse, user_service.ListActiveUsers(req))
        users_pb.extend(res.users)
        next_page_token = res.next_page_token
        # An empty token marks the last page.
        if not next_page_token:
            break
    return users_pb