Skip to content

Commit afe047b

Browse files
author
xml
committed
feat: Implement support for online and local rate limiting
Task: https://pms.uniontech.com/story-view-40501.html
1 parent 3842487 commit afe047b

19 files changed

Lines changed: 487 additions & 80 deletions

File tree

src/internal/config/config.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,11 @@ type Config struct {
9191
NonUnknownList []string // 非未知来源更新list文件
9292
OtherSourceList []string // 其他类型更新list文件路径
9393

94-
DownloadSpeedLimitConfig string
95-
lastoreDaemonStatus LastoreDaemonStatus
96-
UpdateStatus string
97-
PlatformUpdate bool
94+
DownloadSpeedLimitConfig string
95+
LocalDownloadSpeedLimitConfig string
96+
lastoreDaemonStatus LastoreDaemonStatus
97+
UpdateStatus string
98+
PlatformUpdate bool
9899

99100
PlatformUrl string // 更新接口地址
100101
StartCheckRange []int // 开机检查更新区间
@@ -188,6 +189,7 @@ const (
188189
dSettingsKeySystemSourceList = "system-sources"
189190
dSettingsKeyNonUnknownList = "non-unknown-sources"
190191
dSettingsKeyDownloadSpeedLimit = "download-speed-limit"
192+
dSettingsKeyLocalDownloadSpeedLimit = "local-download-speed-limit"
191193
DSettingsKeyLastoreDaemonStatus = "lastore-daemon-status"
192194
dSettingsKeyUpdateStatus = "update-status"
193195
dSettingsKeyPlatformUpdate = "platform-update"
@@ -467,6 +469,13 @@ func getConfigFromDSettings() *Config {
467469
c.DownloadSpeedLimitConfig = v.Value().(string)
468470
}
469471

472+
v, err = c.dsLastoreManager.Value(0, dSettingsKeyLocalDownloadSpeedLimit)
473+
if err != nil {
474+
logger.Warning(err)
475+
} else {
476+
c.LocalDownloadSpeedLimitConfig = v.Value().(string)
477+
}
478+
470479
updateLastoreDaemonStatus := func() {
471480
v, err = c.dsLastoreManager.Value(0, DSettingsKeyLastoreDaemonStatus)
472481
if err != nil {
@@ -899,6 +908,11 @@ func (c *Config) SetDownloadSpeedLimitConfig(config string) error {
899908
return c.save(dSettingsKeyDownloadSpeedLimit, config)
900909
}
901910

911+
func (c *Config) SetLocalDownloadSpeedLimitConfig(config string) error {
912+
c.LocalDownloadSpeedLimitConfig = config
913+
return c.save(dSettingsKeyLocalDownloadSpeedLimit, config)
914+
}
915+
902916
func (c *Config) SetLastoreDaemonStatus(status LastoreDaemonStatus) error {
903917
c.statusMu.Lock()
904918
c.lastoreDaemonStatus = status

src/internal/system/apt/apt.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,19 +120,21 @@ func createCommandLine(cmdType string, cmdArgs []string) *exec.Cmd {
120120
return exec.Command("apt-get", args...)
121121
}
122122

123-
func newAPTCommand(cmdSet system.CommandSet, jobId string, cmdType string, fn system.Indicator, cmdArgs []string) *system.Command {
123+
func newAPTCommand(cmdSet system.CommandSet, jobId string, cmdType string, fn system.Indicator, deliveryFn system.DeliveryIndicator, cmdArgs []string) *system.Command {
124124
cmd := createCommandLine(cmdType, cmdArgs)
125125

126126
// See aptCommand.Abort
127127
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
128128
r := &system.Command{
129-
JobId: jobId,
130-
CmdSet: cmdSet,
131-
Indicator: fn,
132-
ParseJobError: parseJobError,
133-
ParseProgressInfo: parseProgressInfo,
134-
Cmd: cmd,
135-
Cancelable: true,
129+
JobId: jobId,
130+
CmdSet: cmdSet,
131+
Indicator: fn,
132+
DeliveryIndicator: deliveryFn,
133+
ParseJobError: parseJobError,
134+
ParseProgressInfo: parseProgressInfo,
135+
ParseDeliveryDownloadInfo: parseDeliveryDownloadInfo,
136+
Cmd: cmd,
137+
Cancelable: true,
136138
}
137139
cmd.Stdout = &r.Stdout
138140
cmd.Stderr = &r.Stderr

src/internal/system/apt/proxy.go

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type APTSystem struct {
2929
CmdSet map[string]*system.Command
3030
Indicator system.Indicator
3131
IncrementalUpdate bool
32+
DeliveryIndicator system.DeliveryIndicator
3233
}
3334

3435
func NewSystem(nonUnknownList []string, otherList []string, incrementalUpdate bool) system.System {
@@ -56,6 +57,53 @@ func parseProgressField(v string) (float64, error) {
5657
return progress, nil
5758
}
5859

60+
func parseDeliveryDownloadInfo(id, line string) (system.JobDeliveryDownloadInfo, error) {
61+
if strings.Contains(line, "102 Status") {
62+
var jobDeliveryDownloadInfo system.JobDeliveryDownloadInfo
63+
jobDeliveryDownloadInfo.JobId = id
64+
line = strings.TrimPrefix(line, "102 Status[")
65+
line = strings.TrimSuffix(line, "]")
66+
parts := strings.Split(line, "} {")
67+
var isFinish bool
68+
var err error
69+
var speed int64
70+
var proto string
71+
for i := range parts {
72+
parts[i] = strings.Trim(parts[i], "{} ")
73+
}
74+
for _, part := range parts {
75+
kv := strings.SplitN(part, " ", 2)
76+
if len(kv) != 2 {
77+
continue
78+
}
79+
key := kv[0]
80+
value := kv[1]
81+
82+
if key != "IsFinish" && key != "Speed" && key != "Proto" {
83+
continue
84+
} else if key == "IsFinish" {
85+
isFinish, err = strconv.ParseBool(value)
86+
if err != nil {
87+
logger.Warningf("failed to parse isFinish %v", err)
88+
isFinish = true
89+
}
90+
} else if key == "Speed" {
91+
speed, _ = strconv.ParseInt(value, 10, 64)
92+
} else if key == "Proto" {
93+
proto = value
94+
}
95+
}
96+
if !isFinish {
97+
jobDeliveryDownloadInfo.Speed = speed
98+
jobDeliveryDownloadInfo.Proto = proto
99+
} else {
100+
jobDeliveryDownloadInfo.Speed = -1
101+
}
102+
return jobDeliveryDownloadInfo, nil
103+
}
104+
return system.JobDeliveryDownloadInfo{JobId: id}, nil
105+
}
106+
59107
func parseProgressInfo(id, line string) (system.JobProgressInfo, error) {
60108
fs := strings.SplitN(line, ":", 4)
61109
if len(fs) != 4 {
@@ -109,6 +157,10 @@ func (p *APTSystem) AttachIndicator(f system.Indicator) {
109157
p.Indicator = f
110158
}
111159

160+
func (p *APTSystem) AttachDeliveryIndicator(f system.DeliveryIndicator) {
161+
p.DeliveryIndicator = f
162+
}
163+
112164
func WaitDpkgLockRelease() {
113165
for {
114166
msg, wait := system.CheckLock("/var/lib/dpkg/lock")
@@ -268,7 +320,7 @@ func (p *APTSystem) DownloadPackages(jobId string, packages []string, environ ma
268320
if err != nil {
269321
return err
270322
}
271-
c := newAPTCommand(p, jobId, system.DownloadJobType, p.Indicator, append(packages, OptionToArgs(args)...))
323+
c := newAPTCommand(p, jobId, system.DownloadJobType, p.Indicator, p.DeliveryIndicator, append(packages, OptionToArgs(args)...))
272324
c.SetEnv(environ)
273325
return c.Start()
274326
}
@@ -300,11 +352,11 @@ func (p *APTSystem) DownloadSource(jobId string, packages []string, environ map[
300352
environ["DEEPIN_IMMUTABLE_UPGRADE_APT_OPTION"] = upgradeArgString
301353
logger.Info("DownloadSource set env DEEPIN_IMMUTABLE_UPGRADE_APT_OPTION:", upgradeArgString)
302354

303-
c := newAPTCommand(p, jobId, system.IncrementalDownloadJobType, p.Indicator, cmdArgs)
355+
c := newAPTCommand(p, jobId, system.IncrementalDownloadJobType, p.Indicator, p.DeliveryIndicator, cmdArgs)
304356
c.SetEnv(environ)
305357
return c.Start()
306358
}
307-
c := newAPTCommand(p, jobId, system.PrepareDistUpgradeJobType, p.Indicator, append(packages, OptionToArgs(args)...))
359+
c := newAPTCommand(p, jobId, system.PrepareDistUpgradeJobType, p.Indicator, p.DeliveryIndicator, append(packages, OptionToArgs(args)...))
308360
c.SetEnv(environ)
309361
return c.Start()
310362
}
@@ -316,7 +368,7 @@ func (p *APTSystem) Remove(jobId string, packages []string, environ map[string]s
316368
return err
317369
}
318370

319-
c := newAPTCommand(p, jobId, system.RemoveJobType, p.Indicator, packages)
371+
c := newAPTCommand(p, jobId, system.RemoveJobType, p.Indicator, p.DeliveryIndicator, packages)
320372
environ["IMMUTABLE_DISABLE_REMOUNT"] = "false"
321373
c.SetEnv(environ)
322374
return safeStart(c)
@@ -328,7 +380,7 @@ func (p *APTSystem) Install(jobId string, packages []string, environ map[string]
328380
if err != nil {
329381
return err
330382
}
331-
c := newAPTCommand(p, jobId, system.InstallJobType, p.Indicator, append(OptionToArgs(args), packages...))
383+
c := newAPTCommand(p, jobId, system.InstallJobType, p.Indicator, p.DeliveryIndicator, append(OptionToArgs(args), packages...))
332384
environ["IMMUTABLE_DISABLE_REMOUNT"] = "false"
333385
c.SetEnv(environ)
334386
return safeStart(c)
@@ -365,12 +417,12 @@ func (p *APTSystem) DistUpgrade(jobId string, packages []string, environ map[str
365417
environ["DEEPIN_IMMUTABLE_UPGRADE_APT_OPTION"] = upgradeArgString
366418
logger.Info("DistUpgrade set env DEEPIN_IMMUTABLE_UPGRADE_APT_OPTION:", upgradeArgString)
367419

368-
c := newAPTCommand(p, jobId, system.IncrementalUpdateJobType, p.Indicator, cmdArgs)
420+
c := newAPTCommand(p, jobId, system.IncrementalUpdateJobType, p.Indicator, p.DeliveryIndicator, cmdArgs)
369421
c.SetEnv(environ)
370422
return c.Start()
371423
}
372424

373-
c := newAPTCommand(p, jobId, system.DistUpgradeJobType, p.Indicator, append(OptionToArgs(args), packages...))
425+
c := newAPTCommand(p, jobId, system.DistUpgradeJobType, p.Indicator, p.DeliveryIndicator, append(OptionToArgs(args), packages...))
374426
environ["IMMUTABLE_DISABLE_REMOUNT"] = "false"
375427
c.SetEnv(environ)
376428
return safeStart(c)
@@ -384,7 +436,7 @@ func (p *APTSystem) UpdateSource(jobId string, environ map[string]string, args m
384436
logger.Warningf("Failed to update remotes: %v, %s", err, string(output))
385437
}
386438
}
387-
c := newAPTCommand(p, jobId, system.UpdateSourceJobType, p.Indicator, OptionToArgs(args))
439+
c := newAPTCommand(p, jobId, system.UpdateSourceJobType, p.Indicator, p.DeliveryIndicator, OptionToArgs(args))
388440
c.AtExitFn = func() bool {
389441
// 无网络时检查更新失败,exitCode为0,空间不足(不确定exit code)导致需要特殊处理
390442
if c.ExitCode == system.ExitSuccess && bytes.Contains(c.Stderr.Bytes(), []byte("Some index files failed to download")) {
@@ -402,7 +454,7 @@ func (p *APTSystem) UpdateSource(jobId string, environ map[string]string, args m
402454
}
403455

404456
func (p *APTSystem) Clean(jobId string) error {
405-
c := newAPTCommand(p, jobId, system.CleanJobType, p.Indicator, nil)
457+
c := newAPTCommand(p, jobId, system.CleanJobType, p.Indicator, p.DeliveryIndicator, nil)
406458
return c.Start()
407459
}
408460

@@ -422,7 +474,7 @@ func (p *APTSystem) AbortWithFailed(jobId string) error {
422474

423475
func (p *APTSystem) FixError(jobId string, errType string, environ map[string]string, args map[string]string) error {
424476
WaitDpkgLockRelease()
425-
c := newAPTCommand(p, jobId, system.FixErrorJobType, p.Indicator, append([]string{errType}, OptionToArgs(args)...))
477+
c := newAPTCommand(p, jobId, system.FixErrorJobType, p.Indicator, p.DeliveryIndicator, append([]string{errType}, OptionToArgs(args)...))
426478
environ["IMMUTABLE_DISABLE_REMOUNT"] = "false"
427479
c.SetEnv(environ)
428480
if system.JobErrorType(errType) == system.ErrorDependenciesBroken { // 修复依赖错误的时候,会有需要卸载dde的情况,因此需要用safeStart来进行处理
@@ -655,7 +707,7 @@ func parseBackupJobError(stdErrStr string, stdOutStr string) *system.JobError {
655707
}
656708

657709
func (p *APTSystem) OsBackup(jobId string) error {
658-
c := newAPTCommand(p, jobId, system.BackupJobType, p.Indicator, nil)
710+
c := newAPTCommand(p, jobId, system.BackupJobType, p.Indicator, p.DeliveryIndicator, nil)
659711
c.ParseJobError = parseBackupJobError
660712
c.ParseProgressInfo = func(id, line string) (system.JobProgressInfo, error) {
661713
type info struct {

src/internal/system/command.go

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@ type Command struct {
3434

3535
pipe *os.File
3636

37-
Indicator Indicator
38-
ParseProgressInfo ParseProgressInfo
39-
ParseJobError ParseJobError
37+
Indicator Indicator
38+
DeliveryIndicator DeliveryIndicator
39+
ParseProgressInfo ParseProgressInfo
40+
ParseJobError ParseJobError
41+
ParseDeliveryDownloadInfo ParseDeliveryDownloadInfo
4042

4143
Stdout bytes.Buffer
4244
Stderr bytes.Buffer
@@ -275,17 +277,27 @@ func (c *Command) updateProgress() {
275277
return
276278
}
277279

278-
info, err := c.ParseProgressInfo(c.JobId, line)
279-
if err != nil {
280-
logger.Errorf("aptCommand.updateProgress %v -> %v\n", info, err)
281-
c.Indicator(JobProgressInfo{
282-
OnlyLog: true,
283-
OriginalLog: line,
284-
})
280+
if strings.Contains(line, "102 Status") {
281+
deliveryInfo, err := c.ParseDeliveryDownloadInfo(c.JobId, line)
282+
if err != nil {
283+
logger.Errorf("aptCommand.updateProgress %v -> %v\n", deliveryInfo, err)
284+
continue
285+
}
286+
c.DeliveryIndicator(deliveryInfo)
285287
continue
288+
} else {
289+
info, err := c.ParseProgressInfo(c.JobId, line)
290+
if err != nil {
291+
logger.Errorf("aptCommand.updateProgress %v -> %v\n", info, err)
292+
c.Indicator(JobProgressInfo{
293+
OnlyLog: true,
294+
OriginalLog: line,
295+
})
296+
continue
297+
}
298+
info.OriginalLog = line
299+
c.Cancelable = info.Cancelable
300+
c.Indicator(info)
286301
}
287-
info.OriginalLog = line
288-
c.Cancelable = info.Cancelable
289-
c.Indicator(info)
290302
}
291303
}

src/internal/system/system.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,16 @@ const (
6262
NotifyExpireTimeoutPrivateLong = 600000
6363
)
6464

65+
type JobDeliveryDownloadInfo struct {
66+
JobId string
67+
FileName string
68+
Proto string
69+
DownloadSize int64
70+
DownloadedSize int64
71+
Speed int64
72+
Progress float64
73+
}
74+
6575
type JobProgressInfo struct {
6676
JobId string
6777
Progress float64
@@ -132,7 +142,9 @@ var NotSupportError = errors.New("not support operation")
132142
var ResourceExitError = errors.New("resource exists")
133143

134144
type Indicator func(JobProgressInfo)
145+
type DeliveryIndicator func(JobDeliveryDownloadInfo)
135146
type ParseProgressInfo func(id, line string) (JobProgressInfo, error)
147+
type ParseDeliveryDownloadInfo func(id, line string) (JobDeliveryDownloadInfo, error)
136148
type ParseJobError func(stdErrStr string, stdOutStr string) *JobError
137149

138150
type System interface {
@@ -146,6 +158,7 @@ type System interface {
146158
Abort(jobId string) error
147159
AbortWithFailed(jobId string) error
148160
AttachIndicator(Indicator)
161+
AttachDeliveryIndicator(DeliveryIndicator)
149162
FixError(jobId string, errType string, environ map[string]string, cmdArgs map[string]string) error
150163
OsBackup(jobId string) error
151164
CheckSystem(jobId string, checkType string, environ map[string]string, cmdArgs map[string]string) error

0 commit comments

Comments
 (0)