-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathenvelope.go
More file actions
148 lines (122 loc) · 3.42 KB
/
envelope.go
File metadata and controls
148 lines (122 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package rate_envelope_queue
import (
"context"
"errors"
"time"
)
// Envelope describes a single unit of work: the invoke callback together
// with optional hooks, timing settings, stamps and a payload.
// NewEnvelope rejects an envelope whose invoke callback is nil.
type Envelope struct {
// id is the identifier returned by GetId.
id uint64
// _type is the type label returned by GetType.
_type string
// interval is the schedule interval; 0 means run once, not a schedule.
interval time.Duration
// deadline is the value set through WithDeadline.
// NOTE(review): presumably the time budget for processing one envelope — confirm against the code that consumes it.
deadline time.Duration
// ----- hooks that allow intervening in envelope processing and stopping it by returning the ErrStopEnvelope error
beforeHook func(ctx context.Context, envelope *Envelope) error
invoke func(ctx context.Context, envelope *Envelope) error
afterHook func(ctx context.Context, envelope *Envelope) error
// -----
// hook that lets the user dynamically decide how the envelope should behave after an error
failureHook func(ctx context.Context, envelope *Envelope, err error) Decision
// successHook is the callback set through WithSuccessHook.
// NOTE(review): presumably called after the envelope is processed without error — confirm against the caller.
successHook func(ctx context.Context, envelope *Envelope)
stamps []Stamp // per-envelope stamps
// payload is the arbitrary user value exposed by GetPayload and replaced by UpdatePayload.
payload interface{}
}
// NewDynamicEnvelope builds an envelope from a deadline, an invoke callback
// and a payload, with the schedule interval fixed at 0 (run once, not a
// schedule). The result, including any validation error, is exactly what
// NewEnvelope returns for these options.
func NewDynamicEnvelope(deadline time.Duration, invoke Invoker, payload interface{}) (*Envelope, error) {
	options := []func(*Envelope){
		WithDeadline(deadline),
		WithInvoke(invoke),
		WithScheduleModeInterval(0),
		WithPayload(payload),
	}
	return NewEnvelope(options...)
}
// NewScheduleEnvelope builds an envelope from a schedule interval, a
// deadline, an invoke callback and a payload. The result, including any
// validation error, is exactly what NewEnvelope returns for these options.
func NewScheduleEnvelope(interval, deadline time.Duration, invoke Invoker, payload interface{}) (*Envelope, error) {
	return NewEnvelope(
		WithDeadline(deadline),
		WithInvoke(invoke),
		WithScheduleModeInterval(interval),
		WithPayload(payload),
	)
}
// NewEnvelope creates an empty Envelope and applies the given options to it
// in the order they were passed. Nil options are skipped.
//
// It returns an error when no invoke callback has been set once all options
// have been applied; in that case the returned envelope is nil.
func NewEnvelope(opt ...func(*Envelope)) (*Envelope, error) {
	envelope := &Envelope{}
	for _, o := range opt {
		if o == nil {
			// Calling a nil option would panic; treat it as a no-op instead.
			continue
		}
		o(envelope)
	}

	// validate required fields
	if envelope.invoke == nil {
		return nil, errors.New("envelope invoke is nil")
	}
	return envelope, nil
}
// GetId returns the envelope's numeric identifier.
func (e *Envelope) GetId() uint64 {
	return e.id
}
// GetType returns the envelope's type label.
func (e *Envelope) GetType() string {
	return e._type
}
// GetStamps returns the envelope's stamp slice as stored; no copy is made.
func (e *Envelope) GetStamps() []Stamp {
	return e.stamps
}
// GetPayload returns the payload currently stored in the envelope.
func (e *Envelope) GetPayload() interface{} {
	return e.payload
}
// UpdatePayload replaces the payload stored in the envelope with p.
func (e *Envelope) UpdatePayload(p interface{}) {
	e.payload = p
}
// WithStampsPerEnvelope returns an option that appends the given stamps to
// the envelope's own stamp list, keeping any stamps already present.
func WithStampsPerEnvelope(stamps ...Stamp) func(*Envelope) {
	appendStamps := func(env *Envelope) {
		env.stamps = append(env.stamps, stamps...)
	}
	return appendStamps
}
// WithBeforeHook returns an option that stores hook as the envelope's
// before hook.
func WithBeforeHook(hook Invoker) func(*Envelope) {
	return func(env *Envelope) { env.beforeHook = hook }
}
// WithAfterHook returns an option that stores hook as the envelope's
// after hook.
func WithAfterHook(hook Invoker) func(*Envelope) {
	setAfter := func(env *Envelope) {
		env.afterHook = hook
	}
	return setAfter
}
// WithInvoke returns an option that stores invoke as the envelope's invoke
// callback. NewEnvelope requires this callback to be non-nil.
func WithInvoke(invoke Invoker) func(*Envelope) {
	return func(env *Envelope) { env.invoke = invoke }
}
// WithFailureHook returns an option that stores hook as the envelope's
// failure hook, the callback that receives an error and answers with a
// Decision.
func WithFailureHook(hook func(ctx context.Context, envelope *Envelope, err error) Decision) func(*Envelope) {
	setFailure := func(env *Envelope) {
		env.failureHook = hook
	}
	return setFailure
}
// WithSuccessHook returns an option that stores hook as the envelope's
// success hook.
func WithSuccessHook(hook func(ctx context.Context, envelope *Envelope)) func(*Envelope) {
	return func(env *Envelope) { env.successHook = hook }
}
// WithScheduleModeInterval returns an option that sets the envelope's
// schedule interval to d. An interval of 0 means the envelope runs once
// rather than on a schedule.
func WithScheduleModeInterval(d time.Duration) func(*Envelope) {
	setInterval := func(env *Envelope) {
		env.interval = d
	}
	return setInterval
}
// WithDeadline returns an option that sets the envelope's deadline to d.
func WithDeadline(d time.Duration) func(*Envelope) {
	return func(env *Envelope) { env.deadline = d }
}
// WithType returns an option that sets the envelope's type label to t.
func WithType(t string) func(*Envelope) {
	setType := func(env *Envelope) {
		env._type = t
	}
	return setType
}
// WithId returns an option that sets the envelope's identifier to id.
func WithId(id uint64) func(*Envelope) {
	return func(env *Envelope) { env.id = id }
}
// WithPayload returns an option that stores p as the envelope's payload.
func WithPayload(p interface{}) func(*Envelope) {
	setPayload := func(env *Envelope) {
		env.payload = p
	}
	return setPayload
}