-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.go
More file actions
64 lines (48 loc) · 1.61 KB
/
worker.go
File metadata and controls
64 lines (48 loc) · 1.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package workerPool
import (
"sync/atomic"
"time"
)
//===========[STATIC/CACHE]====================================================================================================
var uniqueIdCounter int64 = 0
//===========[STRUCTS]====================================================================================================
type worker[TWork any] struct {
//A channel of work that this worker will be dealing with
workBucket chan TWork
//This function will be processing the work from workBucket
workHandler func(TWork)
//Holds pointer to the pool this worker resides in
workerPool *WorkerPool[TWork]
timeout time.Duration
//ID of the worker
id int
}
//spawnGoroutine spawns a new goroutine that monitors this worker
func (w *worker[TWork]) spawnGoroutine() {
go func() {
defer w.workerPool.workers.Remove(w.id)
exit := make(chan struct{})
t := time.AfterFunc(w.timeout, func() {
exit <- struct{}{}
})
for {
select {
case work := <-w.workBucket:
stoppedOnTime := t.Stop()
w.workHandler(work)
//If this check is not made, you can restart the timer after this goroutine has already exited
//and once the timer fires it will try to send signal down the channel that nobody is listening to
if stoppedOnTime {
t.Reset(w.timeout)
}
case <-exit:
return
}
}
}()
}
//===========[FUNCTIONALITY]====================================================================================================
//issueNewWorkerId returns an int incremented by one every time this function is invoked
func issueNewWorkerId() int {
return int(atomic.AddInt64(&uniqueIdCounter, 1))
}