Skip to content

Commit c4ee063

Browse files
committed
feat(logging): add HomeAppLogForwarder for application log forwarding
1 parent f28258d commit c4ee063

4 files changed

Lines changed: 351 additions & 2 deletions

File tree

internal/home/client.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828
redisKeyModels = "models"
2929
redisKeyUsage = "usage"
3030
redisKeyRequestLog = "request-log"
31+
redisKeyAppLog = "app-log"
3132

3233
homeReconnectInterval = time.Second
3334
homeReconnectFailoverThreshold = 3
@@ -650,6 +651,17 @@ func (c *Client) RPushRequestLog(ctx context.Context, payload []byte) error {
650651
return cmd.RPush(ctx, redisKeyRequestLog, payload).Err()
651652
}
652653

654+
func (c *Client) RPushAppLog(ctx context.Context, payload []byte) error {
655+
cmd, errClient := c.commandClient()
656+
if errClient != nil {
657+
return errClient
658+
}
659+
if len(payload) == 0 {
660+
return nil
661+
}
662+
return cmd.RPush(ctx, redisKeyAppLog, payload).Err()
663+
}
664+
653665
func (c *Client) handleSubscriptionPayload(channel string, payload string, onConfig func([]byte) error) error {
654666
payload = strings.TrimSpace(payload)
655667
if payload == "" {
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package logging
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"strings"
8+
"sync"
9+
"sync/atomic"
10+
"time"
11+
12+
"github.com/router-for-me/CLIProxyAPI/v7/internal/home"
13+
log "github.com/sirupsen/logrus"
14+
)
15+
16+
const defaultHomeAppLogQueueSize = 1024
17+
18+
type homeAppLogClient interface {
19+
HeartbeatOK() bool
20+
RPushAppLog(ctx context.Context, payload []byte) error
21+
}
22+
23+
type homeAppLogPayload struct {
24+
Line string `json:"line"`
25+
Level string `json:"level,omitempty"`
26+
Timestamp string `json:"timestamp,omitempty"`
27+
}
28+
29+
var currentHomeAppLogClient = func() homeAppLogClient {
30+
return home.Current()
31+
}
32+
33+
// HomeAppLogForwarder forwards application logs to Home after the control connection is healthy.
34+
type HomeAppLogForwarder struct {
35+
formatter log.Formatter
36+
queue chan homeAppLogPayload
37+
stop chan struct{}
38+
stopOnce sync.Once
39+
wg sync.WaitGroup
40+
enabled atomic.Bool
41+
}
42+
43+
// StartHomeAppLogForwarder installs a logrus hook that forwards future application logs to Home.
44+
func StartHomeAppLogForwarder(queueSize int) *HomeAppLogForwarder {
45+
if queueSize <= 0 {
46+
queueSize = defaultHomeAppLogQueueSize
47+
}
48+
forwarder := &HomeAppLogForwarder{
49+
formatter: &LogFormatter{},
50+
queue: make(chan homeAppLogPayload, queueSize),
51+
stop: make(chan struct{}),
52+
}
53+
forwarder.enabled.Store(true)
54+
forwarder.wg.Add(1)
55+
go forwarder.run()
56+
log.AddHook(forwarder)
57+
return forwarder
58+
}
59+
60+
// Stop disables forwarding and waits for the background sender to exit.
61+
func (f *HomeAppLogForwarder) Stop() {
62+
if f == nil {
63+
return
64+
}
65+
f.stopOnce.Do(func() {
66+
f.enabled.Store(false)
67+
close(f.stop)
68+
f.wg.Wait()
69+
})
70+
}
71+
72+
// Levels implements logrus.Hook.
73+
func (f *HomeAppLogForwarder) Levels() []log.Level {
74+
return log.AllLevels
75+
}
76+
77+
// Fire implements logrus.Hook.
78+
func (f *HomeAppLogForwarder) Fire(entry *log.Entry) error {
79+
if f == nil || entry == nil || !f.enabled.Load() {
80+
return nil
81+
}
82+
client := currentHomeAppLogClient()
83+
if client == nil || !client.HeartbeatOK() {
84+
return nil
85+
}
86+
line, errFormat := f.formatEntry(entry)
87+
if errFormat != nil || strings.TrimSpace(line) == "" {
88+
return nil
89+
}
90+
91+
payload := homeAppLogPayload{
92+
Line: line,
93+
Level: entry.Level.String(),
94+
Timestamp: entry.Time.Format(time.RFC3339Nano),
95+
}
96+
select {
97+
case f.queue <- payload:
98+
default:
99+
}
100+
return nil
101+
}
102+
103+
func (f *HomeAppLogForwarder) formatEntry(entry *log.Entry) (string, error) {
104+
formatter := f.formatter
105+
if formatter == nil {
106+
formatter = &LogFormatter{}
107+
}
108+
raw, errFormat := formatter.Format(entry)
109+
if errFormat != nil {
110+
return "", errFormat
111+
}
112+
return string(raw), nil
113+
}
114+
115+
func (f *HomeAppLogForwarder) run() {
116+
defer f.wg.Done()
117+
for {
118+
select {
119+
case <-f.stop:
120+
return
121+
case payload := <-f.queue:
122+
f.forward(payload)
123+
}
124+
}
125+
}
126+
127+
func (f *HomeAppLogForwarder) forward(payload homeAppLogPayload) {
128+
if !f.enabled.Load() {
129+
return
130+
}
131+
client := currentHomeAppLogClient()
132+
if client == nil || !client.HeartbeatOK() {
133+
return
134+
}
135+
raw, errMarshal := json.Marshal(&payload)
136+
if errMarshal != nil {
137+
return
138+
}
139+
if errPush := client.RPushAppLog(context.Background(), raw); errPush != nil && isHomeAppLogUnsupported(errPush) {
140+
f.enabled.Store(false)
141+
}
142+
}
143+
144+
func isHomeAppLogUnsupported(err error) bool {
145+
if err == nil {
146+
return false
147+
}
148+
msg := strings.ToLower(strings.TrimSpace(err.Error()))
149+
if msg == "" {
150+
return false
151+
}
152+
for {
153+
switch {
154+
case strings.Contains(msg, "unsupported key"):
155+
return true
156+
case strings.Contains(msg, "unknown command"):
157+
return true
158+
case strings.Contains(msg, "unsupported command"):
159+
return true
160+
}
161+
err = errors.Unwrap(err)
162+
if err == nil {
163+
return false
164+
}
165+
msg = strings.ToLower(strings.TrimSpace(err.Error()))
166+
}
167+
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package logging
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"errors"
8+
"strings"
9+
"sync"
10+
"testing"
11+
"time"
12+
13+
log "github.com/sirupsen/logrus"
14+
)
15+
16+
type stubHomeAppLogClient struct {
17+
mu sync.Mutex
18+
heartbeatOK bool
19+
err error
20+
pushed [][]byte
21+
}
22+
23+
func (c *stubHomeAppLogClient) HeartbeatOK() bool { return c.heartbeatOK }
24+
25+
func (c *stubHomeAppLogClient) RPushAppLog(_ context.Context, payload []byte) error {
26+
c.mu.Lock()
27+
defer c.mu.Unlock()
28+
if c.err != nil {
29+
return c.err
30+
}
31+
c.pushed = append(c.pushed, bytes.Clone(payload))
32+
return nil
33+
}
34+
35+
func (c *stubHomeAppLogClient) pushedCount() int {
36+
c.mu.Lock()
37+
defer c.mu.Unlock()
38+
return len(c.pushed)
39+
}
40+
41+
func (c *stubHomeAppLogClient) pushedAt(index int) []byte {
42+
c.mu.Lock()
43+
defer c.mu.Unlock()
44+
if index < 0 || index >= len(c.pushed) {
45+
return nil
46+
}
47+
return bytes.Clone(c.pushed[index])
48+
}
49+
50+
func TestHomeAppLogForwarder_ForwardsFormattedLogWhenHomeHealthy(t *testing.T) {
51+
original := currentHomeAppLogClient
52+
defer func() {
53+
currentHomeAppLogClient = original
54+
}()
55+
56+
stub := &stubHomeAppLogClient{heartbeatOK: true}
57+
currentHomeAppLogClient = func() homeAppLogClient {
58+
return stub
59+
}
60+
61+
forwarder := &HomeAppLogForwarder{
62+
formatter: &LogFormatter{},
63+
queue: make(chan homeAppLogPayload, 4),
64+
stop: make(chan struct{}),
65+
}
66+
forwarder.enabled.Store(true)
67+
forwarder.wg.Add(1)
68+
go forwarder.run()
69+
defer forwarder.Stop()
70+
71+
entry := log.NewEntry(log.StandardLogger())
72+
entry.Time = time.Date(2026, 5, 29, 8, 0, 0, 0, time.Local)
73+
entry.Level = log.DebugLevel
74+
entry.Message = "debug details"
75+
76+
if errFire := forwarder.Fire(entry); errFire != nil {
77+
t.Fatalf("Fire error: %v", errFire)
78+
}
79+
80+
deadline := time.Now().Add(time.Second)
81+
for stub.pushedCount() == 0 && time.Now().Before(deadline) {
82+
time.Sleep(10 * time.Millisecond)
83+
}
84+
if stub.pushedCount() != 1 {
85+
t.Fatalf("pushed records = %d, want 1", stub.pushedCount())
86+
}
87+
88+
var got homeAppLogPayload
89+
if errUnmarshal := json.Unmarshal(stub.pushedAt(0), &got); errUnmarshal != nil {
90+
t.Fatalf("unmarshal payload: %v", errUnmarshal)
91+
}
92+
if got.Level != "debug" {
93+
t.Fatalf("level = %q, want debug", got.Level)
94+
}
95+
if !strings.Contains(got.Line, "debug details") {
96+
t.Fatalf("line %q missing log message", got.Line)
97+
}
98+
if strings.TrimSpace(got.Timestamp) == "" {
99+
t.Fatal("timestamp empty, want non-empty")
100+
}
101+
}
102+
103+
func TestHomeAppLogForwarder_SkipsWhenHomeHeartbeatIsDown(t *testing.T) {
104+
original := currentHomeAppLogClient
105+
defer func() {
106+
currentHomeAppLogClient = original
107+
}()
108+
109+
stub := &stubHomeAppLogClient{heartbeatOK: false}
110+
currentHomeAppLogClient = func() homeAppLogClient {
111+
return stub
112+
}
113+
114+
forwarder := &HomeAppLogForwarder{
115+
formatter: &LogFormatter{},
116+
queue: make(chan homeAppLogPayload, 4),
117+
stop: make(chan struct{}),
118+
}
119+
forwarder.enabled.Store(true)
120+
121+
entry := log.NewEntry(log.StandardLogger())
122+
entry.Time = time.Now()
123+
entry.Level = log.InfoLevel
124+
entry.Message = "should stay local"
125+
126+
if errFire := forwarder.Fire(entry); errFire != nil {
127+
t.Fatalf("Fire error: %v", errFire)
128+
}
129+
if stub.pushedCount() != 0 {
130+
t.Fatalf("pushed records = %d, want 0", stub.pushedCount())
131+
}
132+
}
133+
134+
func TestHomeAppLogForwarder_DisablesForwardingWhenHomeDoesNotSupportAppLog(t *testing.T) {
135+
original := currentHomeAppLogClient
136+
defer func() {
137+
currentHomeAppLogClient = original
138+
}()
139+
140+
stub := &stubHomeAppLogClient{
141+
heartbeatOK: true,
142+
err: errors.New("ERR unsupported key"),
143+
}
144+
currentHomeAppLogClient = func() homeAppLogClient {
145+
return stub
146+
}
147+
148+
forwarder := &HomeAppLogForwarder{
149+
formatter: &LogFormatter{},
150+
queue: make(chan homeAppLogPayload, 4),
151+
stop: make(chan struct{}),
152+
}
153+
forwarder.enabled.Store(true)
154+
155+
forwarder.forward(homeAppLogPayload{Line: "legacy home cannot receive app logs"})
156+
if forwarder.enabled.Load() {
157+
t.Fatal("forwarder still enabled, want disabled after unsupported app-log response")
158+
}
159+
}

0 commit comments

Comments
 (0)