Skip to content
This repository was archived by the owner on Nov 5, 2024. It is now read-only.

Commit 30a1726

Browse files
committed
简化Agent连接代码
1 parent 08af230 commit 30a1726

13 files changed

Lines changed: 140 additions & 139 deletions

File tree

Lines changed: 1 addition & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,102 +1,15 @@
11
package agentutils
22

3-
import (
4-
"sync"
5-
)
6-
73
// Agent事件
84
type AgentEvent struct {
95
Name string `json:"name"`
106
Data interface{} `json:"data"`
117
}
128

13-
var eventQueueMap = map[string]map[chan *AgentEvent]*AgentState{} // agentId => { chan => State }
14-
var eventQueueLocker = sync.Mutex{}
15-
16-
// 新Agent事件
9+
// 获取新对象
1710
func NewAgentEvent(name string, data interface{}) *AgentEvent {
1811
return &AgentEvent{
1912
Name: name,
2013
Data: data,
2114
}
2215
}
23-
24-
// 等待Agent事件
25-
func WaitAgentQueue(agentId string, agentVersion string, osName string, speed float64, ip string, c chan *AgentEvent) {
26-
eventQueueLocker.Lock()
27-
defer eventQueueLocker.Unlock()
28-
_, found := eventQueueMap[agentId]
29-
if found {
30-
eventQueueMap[agentId][c] = &AgentState{
31-
Version: agentVersion,
32-
OsName: osName,
33-
Speed: speed,
34-
IP: ip,
35-
IsAvailable: true,
36-
}
37-
} else {
38-
eventQueueMap[agentId] = map[chan *AgentEvent]*AgentState{
39-
c: {
40-
Version: agentVersion,
41-
OsName: osName,
42-
Speed: speed,
43-
IP: ip,
44-
IsAvailable: true,
45-
},
46-
}
47-
}
48-
}
49-
50-
// 禁用Channel
51-
func DisableAgentQueue(agentId string, c chan *AgentEvent) {
52-
eventQueueLocker.Lock()
53-
defer eventQueueLocker.Unlock()
54-
m, found := eventQueueMap[agentId]
55-
if found {
56-
state, ok := m[c]
57-
if ok {
58-
state.IsAvailable = false
59-
}
60-
}
61-
}
62-
63-
// 删除Agent
64-
func RemoveAgentQueue(agentId string, c chan *AgentEvent) {
65-
eventQueueLocker.Lock()
66-
defer eventQueueLocker.Unlock()
67-
_, ok := eventQueueMap[agentId]
68-
if ok {
69-
delete(eventQueueMap[agentId], c)
70-
71-
if len(eventQueueMap[agentId]) == 0 {
72-
delete(eventQueueMap, agentId)
73-
}
74-
}
75-
}
76-
77-
// 发送Agent事件
78-
func PostAgentEvent(agentId string, event *AgentEvent) {
79-
eventQueueLocker.Lock()
80-
defer eventQueueLocker.Unlock()
81-
m, found := eventQueueMap[agentId]
82-
if found {
83-
for c, state := range m {
84-
if state.IsAvailable {
85-
c <- event
86-
}
87-
}
88-
}
89-
}
90-
91-
// 检查Agent是否正在运行
92-
func CheckAgentIsWaiting(agentId string) (state *AgentState, isWaiting bool) {
93-
eventQueueLocker.Lock()
94-
defer eventQueueLocker.Unlock()
95-
queue, _ := eventQueueMap[agentId]
96-
if len(queue) > 0 {
97-
for _, v := range queue {
98-
return v, true
99-
}
100-
}
101-
return nil, false
102-
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package agentutils
2+
3+
import (
4+
"sync"
5+
"time"
6+
)
7+
8+
var agentQueueMap = map[string]*AgentQueue{} // agentId => queue
9+
var agentQueueLocker = &sync.Mutex{}
10+
11+
type AgentQueue struct {
12+
c chan *AgentEvent
13+
}
14+
15+
func NewAgentQueue() *AgentQueue {
16+
return &AgentQueue{
17+
c: make(chan *AgentEvent, 32),
18+
}
19+
}
20+
21+
// 通知事件
22+
func PostAgentEvent(agentId string, event *AgentEvent) {
23+
state := FindAgentState(agentId)
24+
if !state.IsActive {
25+
return
26+
}
27+
28+
agentQueueLocker.Lock()
29+
queue, ok := agentQueueMap[agentId]
30+
if !ok {
31+
agentQueueLocker.Unlock()
32+
return
33+
}
34+
agentQueueLocker.Unlock()
35+
select {
36+
case queue.c <- event:
37+
default:
38+
}
39+
}
40+
41+
// 等待事件
42+
func Wait(agentId string) *AgentEvent {
43+
agentQueueLocker.Lock()
44+
queue, ok := agentQueueMap[agentId]
45+
if !ok {
46+
queue = NewAgentQueue()
47+
agentQueueMap[agentId] = queue
48+
}
49+
agentQueueLocker.Unlock()
50+
51+
timer := time.NewTimer(59 * time.Second)
52+
53+
select {
54+
case event := <-queue.c:
55+
timer.Stop()
56+
return event
57+
case <-timer.C:
58+
}
59+
60+
return nil
61+
}
Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,29 @@
11
package agentutils
22

3+
import "sync"
4+
5+
var agentStateMap = map[string]*AgentState{} //agentId => state
6+
var agentStateLocker = sync.Mutex{}
7+
38
// Agent状态
49
type AgentState struct {
5-
Version string // 版本号
6-
OsName string // 操作系统
7-
Speed float64 // 连接速度,ms
8-
IP string // IP地址
9-
IsAvailable bool // 是否可用
10+
Version string // 版本号
11+
OsName string // 操作系统
12+
Speed float64 // 连接速度,ms
13+
IP string // IP地址
14+
IsActive bool // 是否在线
15+
}
16+
17+
// 查找Agent状态
18+
func FindAgentState(agentId string) *AgentState {
19+
agentStateLocker.Lock()
20+
defer agentStateLocker.Unlock()
21+
22+
state, ok := agentStateMap[agentId]
23+
if ok {
24+
return state
25+
}
26+
state = &AgentState{}
27+
agentStateMap[agentId] = state
28+
return state
1029
}

teaweb/actions/default/agents/agentutils/init.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ func checkConnecting() {
3939
runtimeAgent := FindAgentRuntime(agent)
4040

4141
// 监控连通性
42-
_, isWaiting := CheckAgentIsWaiting(agent.Id)
43-
if !isWaiting {
42+
state := FindAgentState(agent.Id)
43+
if !state.IsActive {
4444
runtimeAgent.CountDisconnections++
4545

4646
if runtimeAgent.CountDisconnections > 0 && runtimeAgent.CountDisconnections%maxDisconnections == 0 { // 失去连接 N 次则提醒

teaweb/actions/default/agents/agentutils/menu.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ func AddTabbar(actionWrapper actions.ActionWrapper) {
4747
}
4848
defaultGroup := agents.SharedGroupList().FindGroup("default")
4949

50-
state, isWaiting := CheckAgentIsWaiting("local")
50+
state := FindAgentState("local")
5151
menu := menuGroup.FindMenu("default", defaultGroup.Name+topSubName)
52-
if isWaiting {
52+
if state.IsActive {
5353
subName := "已连接"
5454
if state != nil && len(state.OsName) > 0 {
5555
subName = state.OsName
@@ -67,7 +67,7 @@ func AddTabbar(actionWrapper actions.ActionWrapper) {
6767
counterMapping := map[string]int{} // groupId => count
6868
maxCount := 50
6969
for _, agent := range allAgents {
70-
state, isWaiting := CheckAgentIsWaiting(agent.Id)
70+
state := FindAgentState(agent.Id)
7171

7272
var menu *utils.Menu = nil
7373
if len(agent.GroupIds) > 0 {
@@ -111,7 +111,7 @@ func AddTabbar(actionWrapper actions.ActionWrapper) {
111111
}
112112
}
113113

114-
if isWaiting {
114+
if state.IsActive {
115115
subName := "已连接"
116116
if state != nil && len(state.OsName) > 0 {
117117
subName = state.OsName

teaweb/actions/default/agents/apps/addTask.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/TeaWeb/code/teaweb/actions/default/agents/agentutils"
88
"github.com/iwind/TeaGo/actions"
99
"github.com/iwind/TeaGo/lists"
10+
"github.com/iwind/TeaGo/logs"
1011
"github.com/iwind/TeaGo/maps"
1112
"github.com/iwind/TeaGo/types"
1213
"reflect"
@@ -143,19 +144,19 @@ func (this *AddTaskAction) RunPost(params struct {
143144

144145
switch timeTypeString {
145146
case "second":
146-
schedule.AddSecondRanges(ranges ...)
147+
schedule.AddSecondRanges(ranges...)
147148
case "minute":
148-
schedule.AddMinuteRanges(ranges ...)
149+
schedule.AddMinuteRanges(ranges...)
149150
case "hour":
150-
schedule.AddHourRanges(ranges ...)
151+
schedule.AddHourRanges(ranges...)
151152
case "day":
152-
schedule.AddDayRanges(ranges ...)
153+
schedule.AddDayRanges(ranges...)
153154
case "month":
154-
schedule.AddMonthRanges(ranges ...)
155+
schedule.AddMonthRanges(ranges...)
155156
case "year":
156-
schedule.AddYearRanges(ranges ...)
157+
schedule.AddYearRanges(ranges...)
157158
case "weekDay":
158-
schedule.AddWeekDayRanges(ranges ...)
159+
schedule.AddWeekDayRanges(ranges...)
159160
}
160161
}
161162

@@ -180,10 +181,13 @@ func (this *AddTaskAction) RunPost(params struct {
180181
}))
181182

182183
if app.IsSharedWithGroup {
183-
agentutils.SyncApp(agent.Id, agent.GroupIds, app, agentutils.NewAgentEvent("ADD_TASK", maps.Map{
184+
err := agentutils.SyncApp(agent.Id, agent.GroupIds, app, agentutils.NewAgentEvent("ADD_TASK", maps.Map{
184185
"appId": app.Id,
185186
"taskId": task.Id,
186187
}), nil)
188+
if err != nil {
189+
logs.Error(err)
190+
}
187191
}
188192

189193
this.Success()

teaweb/actions/default/agents/apps/execItemSource.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"github.com/TeaWeb/code/teaconfigs/agents"
55
"github.com/TeaWeb/code/teaweb/actions/default/agents/agentutils"
66
"github.com/iwind/TeaGo/actions"
7+
"github.com/iwind/TeaGo/logs"
78
"github.com/iwind/TeaGo/maps"
89
)
910

@@ -38,13 +39,16 @@ func (this *ExecItemSourceAction) Run(params struct {
3839

3940
// 同步
4041
if app.IsSharedWithGroup {
41-
agentutils.SyncAppEvent(agent.Id, agent.GroupIds, app, &agentutils.AgentEvent{
42+
err := agentutils.SyncAppEvent(agent.Id, agent.GroupIds, app, &agentutils.AgentEvent{
4243
Name: "RUN_ITEM",
4344
Data: maps.Map{
4445
"appId": app.Id,
4546
"itemId": params.ItemId,
4647
},
4748
})
49+
if err != nil {
50+
logs.Error(err)
51+
}
4852
}
4953

5054
this.Success()

teaweb/actions/default/agents/apps/itemDetail.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ func (this *ItemDetailAction) Run(params struct {
6767
// 是否在线
6868
this.Data["isWaiting"] = false
6969
if agent.On && app.On && item.On {
70-
state, isWaiting := agentutils.CheckAgentIsWaiting(params.AgentId)
71-
if state != nil && isWaiting {
70+
state := agentutils.FindAgentState(params.AgentId)
71+
if state.IsActive {
7272
if stringutil.VersionCompare(state.Version, "0.1") > 0 {
7373
this.Data["isWaiting"] = true
7474
}

teaweb/actions/default/agents/apps/itemOn.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"github.com/TeaWeb/code/teaconfigs/agents"
55
"github.com/TeaWeb/code/teaweb/actions/default/agents/agentutils"
66
"github.com/iwind/TeaGo/actions"
7+
"github.com/iwind/TeaGo/logs"
78
"github.com/iwind/TeaGo/maps"
89
)
910

@@ -43,10 +44,13 @@ func (this *ItemOnAction) Run(params struct {
4344
}))
4445

4546
if app.IsSharedWithGroup {
46-
agentutils.SyncApp(agent.Id, agent.GroupIds, app, agentutils.NewAgentEvent("UPDATE_ITEM", maps.Map{
47+
err := agentutils.SyncApp(agent.Id, agent.GroupIds, app, agentutils.NewAgentEvent("UPDATE_ITEM", maps.Map{
4748
"appId": app.Id,
4849
"itemId": params.ItemId,
4950
}), nil)
51+
if err != nil {
52+
logs.Error(err)
53+
}
5054
}
5155

5256
this.Success()

teaweb/actions/default/agents/apps/off.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"github.com/TeaWeb/code/teaconfigs/agents"
55
"github.com/TeaWeb/code/teaweb/actions/default/agents/agentutils"
66
"github.com/iwind/TeaGo/actions"
7+
"github.com/iwind/TeaGo/logs"
78
"github.com/iwind/TeaGo/maps"
89
)
910

@@ -37,9 +38,12 @@ func (this *OffAction) Run(params struct {
3738

3839
// 同步
3940
if app.IsSharedWithGroup {
40-
agentutils.SyncApp(agent.Id, agent.GroupIds, app, agentutils.NewAgentEvent("UPDATE_APP", maps.Map{
41+
err := agentutils.SyncApp(agent.Id, agent.GroupIds, app, agentutils.NewAgentEvent("UPDATE_APP", maps.Map{
4142
"appId": app.Id,
4243
}), nil)
44+
if err != nil {
45+
logs.Error(err)
46+
}
4347
}
4448

4549
this.Success()

0 commit comments

Comments
 (0)