Skip to content

Commit fb70608

Browse files
authored
Merge pull request #3 from PavelAgarkov/pre-release
rebuild architecture of package
2 parents ac0a4e5 + 0ef52ab commit fb70608

6 files changed

Lines changed: 160 additions & 139 deletions

File tree

README.md

Lines changed: 101 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
11
# rate-envelope-queue
22

3-
Лёгкий пакет для управления пулом задач (**envelopes**) поверх `k8s.io/client-go/util/workqueue` с ограничением параллелизма, ретраями, периодическим планированием, **stamps (middleware)** и хуками до/после выполнения.
3+
Лёгкий пакет для управления пулом задач (**envelopes**) поверх `k8s.io/client-go/util/workqueue` с ограничением
4+
параллелизма, ретраями, периодическим планированием, **stamps (middleware)** и хуками до/после выполнения.
45

5-
> Основано на `workqueue` из client-go: очередь дедуплицирует одинаковые элементы (один и тот же **указатель**) и поддерживает rate-limiting / отложенное перепланирование.
6+
> Основано на `workqueue` из client-go: очередь дедуплицирует одинаковые элементы (один и тот же **указатель**) и
7+
> поддерживает rate-limiting / отложенное перепланирование.
68
79
---
810

911
## Что нового в API
1012

1113
- **Builder-подход для Envelope** — поля неэкспортируемые, настройка через `NewEnvelope(opts...)` и `With*`-опции:
1214
```go
13-
e := pkg.NewEnvelope(
14-
pkg.WithId(1),
15-
pkg.WithType("email"),
16-
pkg.WithInterval(5*time.Second),
17-
pkg.WithDeadline(3*time.Second),
18-
pkg.WithBeforeHook(func(ctx context.Context, e *pkg.Envelope) error { return nil }),
19-
pkg.WithInvoke(func(ctx context.Context) error { return nil }),
20-
pkg.WithAfterHook(func(ctx context.Context, e *pkg.Envelope) error { return nil }),
21-
pkg.WithStampsPerEnvelope(/* per-envelope stamps */),
15+
e := NewEnvelope(
16+
WithId(1),
17+
WithType("email"),
18+
WithInterval(5*time.Second),
19+
WithDeadline(3*time.Second),
20+
WithBeforeHook(func(ctx context.Context, e *Envelope) error { return nil }),
21+
WithInvoke(func(ctx context.Context) error { return nil }),
22+
WithAfterHook(func(ctx context.Context, e *Envelope) error { return nil }),
23+
WithStampsPerEnvelope(/* per-envelope stamps */),
2224
)
2325
```
2426
Для чтения используйте геттеры: `GetId()`, `GetType()`, `GetStamps()`.
@@ -28,7 +30,8 @@
2830
- Per-envelope — через `WithStampsPerEnvelope(...)` в `NewEnvelope(...)`.
2931
- Порядок исполнения: **сначала глобальные, затем per-envelope** (глобальные — внешние).
3032

31-
- **Тайм‑бюджеты для хуков** в `BeforeAfterStamp`: по умолчанию рекомендуем `frac=0.5` и `min=800ms``max(50% от deadline, 800ms)`.
33+
- **Тайм‑бюджеты для хуков** в `BeforeAfterStamp`: по умолчанию рекомендуем `frac=0.5` и `min=800ms`
34+
`max(50% от deadline, 800ms)`.
3235

3336
---
3437

@@ -39,10 +42,12 @@
3942
- **Дедлайны**: `deadline > 0` ограничивает время выполнения `invoke` (оборачивается таймаутом в воркере).
4043
- **Хуки**: `beforeHook` / `afterHook` с отдельным тайм-бюджетом (через `BeforeAfterStamp(WithHookTimeout)`).
4144
- **Stamps (middleware)**: глобальные и per-envelope; компонуются в цепочку (**chain**).
42-
- **Остановка типа**: `ErrStopEnvelope` из любого места (`beforeHook`/`invoke`/`afterHook`) кладёт `_type` в **blacklist**.
45+
- **Остановка типа**: `ErrStopEnvelope` из любого места (`beforeHook`/`invoke`/`afterHook`) кладёт `_type` в **blacklist
46+
**.
4347
- **Backoff/ретраи**: дефолтный лимитер = `MaxOf(Exponential(1s..30s), TokenBucket(5 rps, burst=10))`.
4448
- **Грациозная остановка**: режимы `Drain`/`Stop`.
45-
- **Безопасность при паниках**: паника внутри обработки **конверта**`Forget+Done` и лог стека; паника воркера также перехватывается.
49+
- **Безопасность при паниках**: паника внутри обработки **конверта**`Forget+Done` и лог стека; паника воркера также
50+
перехватывается.
4651

4752
---
4853

@@ -58,11 +63,11 @@
5863
## Установка
5964

6065
```bash
61-
go get github.com/PavelAgarkov/rate-pool/pkg
66+
go get github.com/PavelAgarkov/rate-envelope-queue
6267
```
6368

6469
```go
65-
import "github.com/PavelAgarkov/rate-pool/pkg"
70+
import "github.com/PavelAgarkov/rate-envelope-queue"
6671
```
6772

6873
---
@@ -73,47 +78,47 @@ import "github.com/PavelAgarkov/rate-pool/pkg"
7378
ctx, cancel := context.WithCancel(context.Background())
7479
defer cancel()
7580

76-
q := pkg.NewRateEnvelopeQueue(
77-
pkg.WithLimitOption(3), // 3 воркера
78-
pkg.WithWaitingOption(true), // ждать завершения горутин при Stop()
79-
pkg.WithStopModeOption(pkg.Drain),
80-
pkg.WithStamps( // глобальные stamps (внешние)
81-
pkg.BeforeAfterStamp(pkg.WithHookTimeout), // 50% от deadline, минимум 800ms
82-
pkg.LoggingStamp(log.Default()),
83-
),
81+
q := NewRateEnvelopeQueue(
82+
WithLimitOption(3), // 3 воркера
83+
WithWaitingOption(true), // ждать завершения горутин при Stop()
84+
WithStopModeOption(Drain),
85+
WithStamps( // глобальные stamps (внешние)
86+
BeforeAfterStamp(WithHookTimeout), // 50% от deadline, минимум 800ms
87+
LoggingStamp(log.Default()),
88+
),
8489
)
8590

86-
email := pkg.NewEnvelope(
87-
pkg.WithId(1),
88-
pkg.WithType("email"),
89-
pkg.WithInterval(5*time.Second),
90-
pkg.WithDeadline(3*time.Second),
91-
pkg.WithBeforeHook(func(ctx context.Context, e *pkg.Envelope) error {
92-
fmt.Println("before:", e.GetId(), time.Now())
93-
return nil
94-
}),
95-
pkg.WithInvoke(func(ctx context.Context) error {
96-
// имитируем работу; уважайте ctx.Done()
97-
time.Sleep(5 * time.Second)
98-
fmt.Println("invoke email", time.Now())
99-
return nil
100-
}),
101-
pkg.WithAfterHook(func(ctx context.Context, e *pkg.Envelope) error {
102-
fmt.Println("after:", e.GetId(), time.Now())
103-
// Остановим дальнейшие email
104-
return pkg.ErrStopEnvelope
105-
}),
91+
email := NewEnvelope(
92+
WithId(1),
93+
WithType("email"),
94+
WithInterval(5*time.Second),
95+
WithDeadline(3*time.Second),
96+
WithBeforeHook(func (ctx context.Context, e *Envelope) error {
97+
fmt.Println("before:", e.GetId(), time.Now())
98+
return nil
99+
}),
100+
WithInvoke(func (ctx context.Context) error {
101+
// имитируем работу; уважайте ctx.Done()
102+
time.Sleep(5 * time.Second)
103+
fmt.Println("invoke email", time.Now())
104+
return nil
105+
}),
106+
WithAfterHook(func (ctx context.Context, e *Envelope) error {
107+
fmt.Println("after:", e.GetId(), time.Now())
108+
// Остановим дальнейшие email
109+
return ErrStopEnvelope
110+
}),
106111
)
107112

108-
metrics := pkg.NewEnvelope(
109-
pkg.WithId(2),
110-
pkg.WithType("metrics"),
111-
pkg.WithInterval(3*time.Second),
112-
pkg.WithDeadline(1*time.Second),
113-
pkg.WithInvoke(func(ctx context.Context) error {
114-
fmt.Println("metrics tick", time.Now())
115-
return nil
116-
}),
113+
metrics := NewEnvelope(
114+
WithId(2),
115+
WithType("metrics"),
116+
WithInterval(3*time.Second),
117+
WithDeadline(1*time.Second),
118+
WithInvoke(func (ctx context.Context) error {
119+
fmt.Println("metrics tick", time.Now())
120+
return nil
121+
}),
117122
)
118123

119124
q.Start(ctx)
@@ -129,17 +134,18 @@ q.Stop()
129134

130135
## Поведение очереди
131136

132-
| Сценарий | Действие очереди |
133-
|------------------------------------------------------------------|----------------------------------------------------------------------------------|
134-
| `invoke` вернул `nil` | `Forget`; если `interval > 0``AddAfter(interval)` |
135-
| Контекст задачи истёк/отменён (`DeadlineExceeded`/`Canceled`) | `Forget`; если периодическая → `AddAfter(interval)` |
136-
| `ErrStopEnvelope` (из `beforeHook`/`invoke`/`afterHook`) | `Forget` + поместить `_type` в **blacklist** |
137-
| Ошибка в `beforeHook` (не `ErrStopEnvelope`) | Периодические: `AddRateLimited`; одноразовые: `Forget` |
138-
| Ошибка в `invoke` (не `ErrStopEnvelope`) | Периодические: `AddRateLimited`; одноразовые: `Forget` |
139-
| Ошибка в `afterHook` (не `ErrStopEnvelope`) | Возвращается наверх → те же правила, что и для обычной ошибки |
140-
| Паника внутри обработки элемента | Элемент `Forget+Done`, стек логируется; воркер продолжает работу |
137+
| Сценарий | Действие очереди |
138+
|---------------------------------------------------------------|------------------------------------------------------------------|
139+
| `invoke` вернул `nil` | `Forget`; если `interval > 0``AddAfter(interval)` |
140+
| Контекст задачи истёк/отменён (`DeadlineExceeded`/`Canceled`) | `Forget`; если периодическая → `AddAfter(interval)` |
141+
| `ErrStopEnvelope` (из `beforeHook`/`invoke`/`afterHook`) | `Forget` + поместить `_type` в **blacklist** |
142+
| Ошибка в `beforeHook` (не `ErrStopEnvelope`) | Периодические: `AddRateLimited`; одноразовые: `Forget` |
143+
| Ошибка в `invoke` (не `ErrStopEnvelope`) | Периодические: `AddRateLimited`; одноразовые: `Forget` |
144+
| Ошибка в `afterHook` (не `ErrStopEnvelope`) | Возвращается наверх → те же правила, что и для обычной ошибки |
145+
| Паника внутри обработки элемента | Элемент `Forget+Done`, стек логируется; воркер продолжает работу |
141146

142-
> Валидация: для периодических задач `deadline` **не должен превышать** `interval` — иначе `ErrAdditionEnvelopeToQueueBadIntervals`.
147+
> Валидация: для периодических задач `deadline` **не должен превышать** `interval` — иначе
148+
`ErrAdditionEnvelopeToQueueBadIntervals`.
143149

144150
---
145151

@@ -150,11 +156,13 @@ Stamps — это обёртки вокруг `Invoker` (обработчика
150156
- **Глобальные stamps** — задаются на очередь через `WithStamps(...)`.
151157
- **Per-envelope stamps** — через `WithStampsPerEnvelope(...)` в `NewEnvelope(...)`.
152158

153-
Порядок: глобальные идут **первее** и становятся **внешними** (самыми «оборачивающими»), затем per-envelope — **внутренние**.
159+
Порядок: глобальные идут **первее** и становятся **внешними** (самыми «оборачивающими»), затем per-envelope — *
160+
*внутренние**.
154161

155162
### Встроенные stamps
156163

157-
- `BeforeAfterStamp(withTimeout)` — исполняет `beforeHook` и `afterHook` с отдельными тайм-бюджетами; любые ошибки, кроме `ErrStopEnvelope`, **возвращаются** наверх. Рекомендуемая функция тайм-бюджета:
164+
- `BeforeAfterStamp(withTimeout)` — исполняет `beforeHook` и `afterHook` с отдельными тайм-бюджетами; любые ошибки,
165+
кроме `ErrStopEnvelope`, **возвращаются** наверх. Рекомендуемая функция тайм-бюджета:
158166
`WithHookTimeout(ctx, base=deadline, frac=0.5, min=800ms)``max(50% от deadline, 800ms)`.
159167
- `LoggingStamp(l *log.Logger)` — логирует длительность и ошибку обработки конверта.
160168

@@ -165,55 +173,58 @@ Stamps — это обёртки вокруг `Invoker` (обработчика
165173
### Конструктор и геттеры
166174

167175
```go
168-
e := pkg.NewEnvelope(opts...)
176+
e := NewEnvelope(opts...)
169177

170-
id := e.GetId()
178+
id := e.GetId()
171179
name := e.GetType()
172-
st := e.GetStamps()
180+
st := e.GetStamps()
173181
```
174182

175183
### Опции `Envelope`
176184

177185
```go
178-
pkg.WithId(id uint64)
179-
pkg.WithType(t string)
180-
pkg.WithInterval(d time.Duration) // 0 = одноразовая задача
181-
pkg.WithDeadline(d time.Duration) // 0 = без таймаута
182-
pkg.WithBeforeHook(func(ctx context.Context, e *Envelope) error)
183-
pkg.WithInvoke(func(ctx context.Context) error)
184-
pkg.WithAfterHook(func(ctx context.Context, e *Envelope) error)
185-
pkg.WithStampsPerEnvelope(stamps ...Stamp)
186+
WithId(id uint64)
187+
WithType(t string)
188+
WithInterval(d time.Duration) // 0 = одноразовая задача
189+
WithDeadline(d time.Duration) // 0 = без таймаута
190+
WithBeforeHook(func(ctx context.Context, e *Envelope) error)
191+
WithInvoke(func (ctx context.Context) error)
192+
WithAfterHook(func (ctx context.Context, e *Envelope) error)
193+
WithStampsPerEnvelope(stamps ...Stamp)
186194
```
187195

188196
### Опции очереди
189197

190198
```go
191-
pkg.WithLimitOption(n) // число воркеров (>0)
192-
pkg.WithWaitingOption(true|false) // ждать ли завершения воркеров в Stop()
193-
pkg.WithStopModeOption(pkg.Drain|pkg.Stop)
194-
pkg.WithLimiterOption(customLimiter) // если не задан — дефолтный
195-
pkg.WithWorkqueueConfigOption(conf) // конфиг workqueue
196-
pkg.WithStamps(stamps...) // глобальные stamps
199+
WithLimitOption(n) // число воркеров (>0)
200+
WithWaitingOption(true|false) // ждать ли завершения воркеров в Stop()
201+
WithStopModeOption(Drain|Stop)
202+
WithLimiterOption(customLimiter) // если не задан — дефолтный
203+
WithWorkqueueConfigOption(conf) // конфиг workqueue
204+
WithStamps(stamps...) // глобальные stamps
197205
```
198206

199207
### Ошибки
200208

201209
```go
202-
pkg.ErrStopEnvelope // поместить `_type` в blacklist
203-
pkg.ErrEnvelopeInBlacklist // попытка добавить тип из blacklist
204-
pkg.ErrEnvelopeQueueIsNotRunning // Add до Start/после Stop
205-
pkg.ErrAdditionEnvelopeToQueueBadFields // пустой тип / nil invoke / отрицательные интервалы
206-
pkg.ErrAdditionEnvelopeToQueueBadIntervals // deadline > interval для периодических
210+
ErrStopEnvelope // поместить `_type` в blacklist
211+
ErrEnvelopeInBlacklist // попытка добавить тип из blacklist
212+
ErrEnvelopeQueueIsNotRunning // Add до Start/после Stop
213+
ErrAdditionEnvelopeToQueueBadFields // пустой тип / nil invoke / отрицательные интервалы
214+
ErrAdditionEnvelopeToQueueBadIntervals // deadline > interval для периодических
207215
```
208216

209217
---
210218

211219
## Эксплуатационные заметки
212220

213-
- **Один объект — один запуск**: текущая реализация рассчитана на одноразовый жизненный цикл `Start/Stop`. Для повторного использования создайте **новый объект** очереди.
214-
- **Дедупликация**: для указателей — по адресу. Не «переиспользуйте» один и тот же указатель для разных логических задач.
221+
- **Один объект — один запуск**: текущая реализация рассчитана на одноразовый жизненный цикл `Start/Stop`. Для
222+
повторного использования создайте **новый объект** очереди.
223+
- **Дедупликация**: для указателей — по адресу. Не «переиспользуйте» один и тот же указатель для разных логических
224+
задач.
215225
- **Jitter**: чтобы периодические задачи не «стреляли строем», можно добавить случайный сдвиг к `AddAfter`.
216-
- **Соблюдайте контекст** в `invoke`/хуках: долгие операции должны уважать `ctx.Done()`; иначе получится «карусель» таймаутов с перепланированием.
226+
- **Соблюдайте контекст** в `invoke`/хуках: долгие операции должны уважать `ctx.Done()`; иначе получится «карусель»
227+
таймаутов с перепланированием.
217228

218229
---
219230

0 commit comments

Comments
 (0)