-
Notifications
You must be signed in to change notification settings - Fork 228
Expand file tree
/
Copy pathsample_kclpy_app.py
More file actions
executable file
·180 lines (153 loc) · 8.38 KB
/
sample_kclpy_app.py
File metadata and controls
executable file
·180 lines (153 loc) · 8.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/usr/bin/env python
# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from __future__ import print_function
import sys
import time
from amazon_kclpy import kcl
from amazon_kclpy.v2 import processor
class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
      to a scaling change.

    While records are flowing, progress is checkpointed at the end of a batch once more than
    ``_CHECKPOINT_FREQ_SECONDS`` seconds have passed since the previous checkpoint. A final checkpoint is taken
    when the shard ends (shutdown reason ``TERMINATE``).
    """

    def __init__(self):
        # Seconds to wait between checkpoint attempts after a failed attempt.
        self._CHECKPOINT_SLEEP_SECONDS = 5
        # Maximum number of checkpoint attempts before giving up.
        self._CHECKPOINT_RETRIES = 5
        # Minimum number of seconds between periodic checkpoints taken in process_records.
        self._CHECKPOINT_FREQ_SECONDS = 60
        # (sequence_number, sub_sequence_number) of the largest record processed so far;
        # (None, None) until the first record has been seen.
        self._largest_seq = (None, None)
        # NOTE(review): not read anywhere in this class; kept in case external code relies on it.
        self._largest_sub_seq = None
        # time.time() value of the last periodic checkpoint; set by initialize.
        self._last_checkpoint_time = None

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records.

        Resets the largest-seen sequence tracker and starts the periodic checkpoint timer.

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        Each record is handed to :meth:`_process_record`, and the largest (sequence, sub-sequence) pair is tracked.
        After the batch, a checkpoint is taken at that position if more than ``_CHECKPOINT_FREQ_SECONDS`` have
        elapsed since the last one. Any exception is reported on stderr and not re-raised.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self._process_record(data, key, seq, sub_seq)
                if self._should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                largest_seq, largest_sub_seq = self._largest_seq
                # Without this guard, a timer expiring before any record arrived would checkpoint at the
                # literal sequence number "None". In that case, wait for data and try again on a later batch.
                if largest_seq is not None:
                    self._checkpoint(process_records_input.checkpointer, str(largest_seq), largest_sub_seq)
                    self._last_checkpoint_time = time.time()
        except Exception as e:
            sys.stderr.write("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def _process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param data: The record's payload, as returned by ``record.binary_data``.
        :param str partition_key: The key associated with this record.
        :param int sequence_number: The sequence number associated with this record.
        :param int or None sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        return

    def _checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        A ``ShutdownException`` aborts immediately without retrying. Any other :class:`kcl.CheckpointError` is
        reported, and the checkpoint is retried up to ``_CHECKPOINT_RETRIES`` attempts in total, sleeping
        ``_CHECKPOINT_SLEEP_SECONDS`` between attempts. If every attempt fails, a "giving up" message is written
        to stderr and the method returns without raising.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for attempt in range(1, self._CHECKPOINT_RETRIES + 1):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is overburdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if attempt < self._CHECKPOINT_RETRIES:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._CHECKPOINT_SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            # No point sleeping after the final attempt; we are about to give up.
            if attempt < self._CHECKPOINT_RETRIES:
                time.sleep(self._CHECKPOINT_SLEEP_SECONDS)
        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=self._CHECKPOINT_RETRIES))

    def _should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available.

        A record is newer when its sequence number is larger. With equal sequence numbers, the record is newer when
        its sub sequence number is larger. A ``None`` sub sequence number ranks below any integer one, so comparing
        two ``None`` values returns False instead of raising TypeError on Python 3.

        :param int sequence_number: the sequence number from the current record
        :param int or None sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        largest_seq, largest_sub_seq = self._largest_seq
        if largest_seq is None:
            return True
        if sequence_number != largest_seq:
            return sequence_number > largest_seq
        current_sub = -1 if sub_sequence_number is None else sub_sequence_number
        largest_sub = -1 if largest_sub_seq is None else largest_sub_seq
        return current_sub > largest_sub

    def shutdown(self, shutdown_input):
        """
        Called by a KCLProcess instance to indicate that this record processor should shutdown. After this is called,
        there will be no more calls to any other methods of this record processor.

        As part of the shutdown process you must inspect :attr:`amazon_kclpy.messages.ShutdownInput.reason` to
        determine the steps to take.

            * Shutdown Reason ZOMBIE:
                **ATTEMPTING TO CHECKPOINT ONCE A LEASE IS LOST WILL FAIL**

                A record processor will be shutdown if it loses its lease. In this case the KCL will terminate the
                record processor. It is not possible to checkpoint once a record processor has lost its lease.
            * Shutdown Reason TERMINATE:
                **THE RECORD PROCESSOR MUST CHECKPOINT OR THE KCL WILL BE UNABLE TO PROGRESS**

                A record processor will be shutdown once it reaches the end of a shard. A shard ending indicates that
                it has been either split into multiple shards or merged with another shard. To begin processing the new
                shard(s) it's required that a final checkpoint occurs.

        Exceptions raised while shutting down are reported on stderr and not re-raised.

        :param amazon_kclpy.messages.ShutdownInput shutdown_input: Information related to the shutdown request
        """
        try:
            if shutdown_input.reason == 'TERMINATE':
                # The sequence number is passed as None; per the original author this checkpoints at the
                # largest sequence number reached by this processor on this shard id.
                print('Was told to terminate, will attempt to checkpoint.')
                # Previously this called the non-existent self.checkpoint, and the AttributeError was silently
                # swallowed, so the mandatory final checkpoint never happened.
                self._checkpoint(shutdown_input.checkpointer, None)
            else:  # reason == 'ZOMBIE'
                print('Shutting down due to failover. Will not checkpoint.')
        except Exception as e:
            sys.stderr.write('Encountered an exception while shutting down. Exception was {e}\n'.format(e=e))
if __name__ == "__main__":
    # Build the record processor first, then wrap it in a KCLProcess and run it.
    record_processor = RecordProcessor()
    kcl.KCLProcess(record_processor).run()