-
Notifications
You must be signed in to change notification settings - Fork 73
Expand file tree
/
Copy pathqueue.py
More file actions
152 lines (123 loc) · 4.83 KB
/
Copy pathqueue.py
File metadata and controls
152 lines (123 loc) · 4.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import collections.abc
import datetime
from typing import List, Dict, Any, Union, Optional
from azure.functions import _abc as azf_abc
from azure.functions import _queue as azf_queue
from . import meta
from ._jsonutils import json
class QueueMessage(azf_queue.QueueMessage):
"""A Queue message object."""
def __init__(self, *,
id=None, body=None,
dequeue_count=None,
expiration_time=None,
insertion_time=None,
time_next_visible=None,
pop_receipt=None):
super().__init__(id=id, body=body, pop_receipt=pop_receipt)
self.__dequeue_count = dequeue_count
self.__expiration_time = expiration_time
self.__insertion_time = insertion_time
self.__time_next_visible = time_next_visible
@property
def dequeue_count(self):
return self.__dequeue_count
@property
def expiration_time(self):
return self.__expiration_time
@property
def insertion_time(self):
return self.__insertion_time
@property
def time_next_visible(self):
return self.__time_next_visible
def __repr__(self) -> str:
return (
f'<azure.QueueMessage id={self.id} '
f'dequeue_count={self.dequeue_count} '
f'insertion_time={self.insertion_time} '
f'expiration_time={self.expiration_time} '
f'at 0x{id(self):0x}>'
)
class QueueMessageInConverter(meta.InConverter,
binding='queueTrigger', trigger=True):
@classmethod
def check_input_type_annotation(cls, pytype: type) -> bool:
return issubclass(pytype, azf_abc.QueueMessage)
@classmethod
def decode(cls, data: meta.Datum, *,
trigger_metadata) -> Any:
data_type = data.type
if data_type in ['string', 'bytes']:
body = data.value
else:
raise NotImplementedError(
f'unsupported queue payload type: {data_type}')
if trigger_metadata is None:
raise NotImplementedError(
'missing trigger metadata for queue input')
return QueueMessage(
id=cls._decode_trigger_metadata_field(
trigger_metadata, 'Id', python_type=str),
body=body,
dequeue_count=cls._decode_trigger_metadata_field(
trigger_metadata, 'DequeueCount', python_type=int),
expiration_time=cls._parse_datetime_metadata(
trigger_metadata, 'ExpirationTime'),
insertion_time=cls._parse_datetime_metadata(
trigger_metadata, 'InsertionTime'),
time_next_visible=cls._parse_datetime_metadata(
trigger_metadata, 'NextVisibleTime'),
pop_receipt=cls._decode_trigger_metadata_field(
trigger_metadata, 'PopReceipt', python_type=str)
)
class QueueMessageOutConverter(meta.OutConverter, binding='queue'):
@classmethod
def check_output_type_annotation(cls, pytype: type) -> bool:
valid_types = (azf_abc.QueueMessage, str, bytes)
return (
meta.is_iterable_type_annotation(pytype, valid_types)
or (isinstance(pytype, type) and issubclass(pytype, valid_types))
)
@classmethod
def encode(cls, obj: Any, *,
expected_type: Optional[type]) -> meta.Datum:
if isinstance(obj, str):
return meta.Datum(type='string', value=obj)
elif isinstance(obj, bytes):
return meta.Datum(type='bytes', value=obj)
elif isinstance(obj, azf_queue.QueueMessage):
return meta.Datum(
type='json',
value=json.dumps({
'id': obj.id,
'body': obj.get_body().decode('utf-8'),
})
)
elif isinstance(obj, collections.abc.Iterable):
msgs: List[Union[str, Dict]] = []
for item in obj:
if isinstance(item, str):
msgs.append(item)
elif isinstance(item, azf_queue.QueueMessage):
msgs.append({
'id': item.id,
'body': item.get_body().decode('utf-8')
})
else:
raise NotImplementedError(
'invalid data type in output '
'queue message list: {}'.format(type(item)))
return meta.Datum(
type='json',
value=json.dumps(msgs)
)
raise NotImplementedError
@classmethod
def _format_datetime(cls, dt: Optional[datetime.datetime]):
if dt is None:
return None
else:
return dt.isoformat()