@@ -4,21 +4,22 @@ import (
44 "fmt"
55 "os"
66 "strings"
7+ "sync"
78 "time"
89
910 "github.com/devforth/OnLogs/app/util"
1011 "github.com/devforth/OnLogs/app/vars"
1112 "github.com/syndtr/goleveldb/leveldb"
1213 "github.com/syndtr/goleveldb/leveldb/iterator"
14+ leveldbUtil "github.com/syndtr/goleveldb/leveldb/util"
1315)
1416
1517func GetLogStatusKey (message string ) string {
16- if strings .Contains (message , "ERROR" ) || strings .Contains (message , "ERR" ) || // const statuses_errors = ["ERROR", "ERR", "Error", "Err"];
17- strings .Contains (message , "Error" ) || strings .Contains (message , "Err" ) {
18+ if strings .Contains (message , "ERROR" ) || strings .Contains (message , "ERR" ) {
1819 return "error"
19- } else if strings .Contains (message , "WARN" ) || strings .Contains (message , "WARNING" ) { // const statuses_warnings = ["WARN", "WARNING"];
20+ } else if strings .Contains (message , "WARN" ) || strings .Contains (message , "WARNING" ) {
2021 return "warn"
21- } else if strings .Contains (message , "DEBUG" ) { // const statuses_other = ["DEBUG", "INFO", "ONLOGS"];
22+ } else if strings .Contains (message , "DEBUG" ) {
2223 return "debug"
2324 } else if strings .Contains (message , "INFO" ) {
2425 return "info"
@@ -28,6 +29,215 @@ func GetLogStatusKey(message string) string {
2829 return "other"
2930}
3031
32+ func checkAndManageLogSize (host string , container string ) error {
33+ maxSize , err := util .ParseHumanReadableSize (os .Getenv ("MAX_LOGS_SIZE" ))
34+ if err != nil {
35+ return fmt .Errorf ("failed to parse MAX_LOGS_SIZE: %v" , err )
36+ }
37+
38+ for {
39+ hosts , err := os .ReadDir ("leveldb/hosts/" )
40+ if err != nil {
41+ return fmt .Errorf ("failed to read hosts directory: %v" , err )
42+ }
43+
44+ var totalSize int64
45+ for _ , h := range hosts {
46+ hostName := h .Name ()
47+ containers , _ := os .ReadDir ("leveldb/hosts/" + hostName + "/containers" )
48+ for _ , c := range containers {
49+ containerName := c .Name ()
50+ size := util .GetDirSize (hostName , containerName )
51+ totalSize += int64 (size * 1024 * 1024 )
52+ }
53+ }
54+
55+ fmt .Printf ("Max size: %d, current dir size: %d\n " , maxSize , totalSize )
56+ if totalSize <= maxSize {
57+ break
58+ }
59+
60+ var cutoffKeys [][]byte
61+ for _ , h := range hosts {
62+ hostName := h .Name ()
63+ containers , _ := os .ReadDir ("leveldb/hosts/" + hostName + "/containers" )
64+ for _ , c := range containers {
65+ containerName := c .Name ()
66+ logsDB := util .GetDB (hostName , containerName , "logs" )
67+ if logsDB == nil {
68+ continue
69+ }
70+
71+ cutoffKeysForContainer , err := getCutoffKeysForContainer (logsDB , 200 )
72+ if err != nil || len (cutoffKeysForContainer ) == 0 {
73+ continue
74+ }
75+ cutoffKeys = append (cutoffKeys , cutoffKeysForContainer )
76+ }
77+ }
78+
79+ if len (cutoffKeys ) == 0 {
80+ fmt .Println ("Nothing to delete, cutoff keys not found." )
81+ break
82+ }
83+
84+ oldestCutoffKey := findOldestCutoffKey (cutoffKeys )
85+ oldestTime , err := time .Parse (time .RFC3339Nano , getDateTimeFromKey (string (oldestCutoffKey )))
86+ if err != nil {
87+ fmt .Println ("Error parsing oldest time:" , err )
88+ break
89+ }
90+ fmt .Println ("Oldest time for deletion cutoff:" , oldestTime )
91+
92+ for _ , h := range hosts {
93+ hostName := h .Name ()
94+ containers , _ := os .ReadDir ("leveldb/hosts/" + hostName + "/containers" )
95+ for _ , c := range containers {
96+ containerName := c .Name ()
97+ logsDB := util .GetDB (hostName , containerName , "logs" )
98+ if logsDB == nil {
99+ continue
100+ }
101+
102+ batch := new (leveldb.Batch )
103+ deletedCount := 0
104+ iter := logsDB .NewIterator (nil , nil )
105+
106+ count := 0
107+ for ok := iter .First (); ok && count < 200 ; ok = iter .Next () {
108+ count ++
109+ keyTime , err := time .Parse (time .RFC3339Nano , getDateTimeFromKey (string (iter .Key ())))
110+ if err != nil {
111+ fmt .Println ("Error parsing key time:" , err )
112+ continue
113+ }
114+ if keyTime .Before (oldestTime ) || keyTime .Equal (oldestTime ) {
115+ batch .Delete (iter .Key ())
116+ deletedCount ++
117+ }
118+ }
119+ iter .Release ()
120+
121+ if deletedCount > 0 {
122+ err = logsDB .Write (batch , nil )
123+ if err != nil {
124+ fmt .Printf ("Failed to delete batch in %s/%s: %v\n " , hostName , containerName , err )
125+ } else {
126+ fmt .Printf ("Deleted %d logs from %s/%s\n " , deletedCount , hostName , containerName )
127+ }
128+ logsDB .CompactRange (leveldbUtil.Range {Start : nil , Limit : nil })
129+ }
130+
131+ statusesDB := util .GetDB (hostName , containerName , "statuses" )
132+ if statusesDB != nil {
133+ batch := new (leveldb.Batch )
134+ deletedCountStatuses := 0
135+ iter := statusesDB .NewIterator (nil , nil )
136+
137+ for ok := iter .First (); ok ; ok = iter .Next () {
138+ keyTime , err := time .Parse (time .RFC3339Nano , getDateTimeFromKey (string (iter .Key ())))
139+ if err != nil {
140+ fmt .Println ("Error parsing key time:" , err )
141+ continue
142+ }
143+ if keyTime .Before (oldestTime ) || keyTime .Equal (oldestTime ) {
144+ batch .Delete (iter .Key ())
145+ deletedCountStatuses ++
146+ }
147+ }
148+ iter .Release ()
149+
150+ if deletedCountStatuses > 0 {
151+ err := statusesDB .Write (batch , nil )
152+ if err != nil {
153+ fmt .Printf ("Failed to delete batch in statusesDB for %s/%s: %v\n " , hostName , containerName , err )
154+ }
155+ statusesDB .CompactRange (leveldbUtil.Range {Start : nil , Limit : nil })
156+ }
157+ }
158+ }
159+ }
160+
161+ time .Sleep (100 * time .Millisecond )
162+ }
163+
164+ return nil
165+ }
166+
167+ func getCutoffKeysForContainer (db * leveldb.DB , limit int ) ([]byte , error ) {
168+ iter := db .NewIterator (nil , nil )
169+ defer iter .Release ()
170+
171+ var cutoffKeys [][]byte
172+ for ok := iter .First (); ok && len (cutoffKeys ) < limit ; ok = iter .Next () {
173+ key := append ([]byte {}, iter .Key ()... )
174+ cutoffKeys = append (cutoffKeys , key )
175+ }
176+
177+ if len (cutoffKeys ) < limit {
178+ return nil , fmt .Errorf ("insufficient records to form cutoff keys" )
179+ }
180+
181+ return cutoffKeys [len (cutoffKeys )- 1 ], nil
182+ }
183+
184+ func findOldestCutoffKey (cutoffKeys [][]byte ) []byte {
185+ var oldestKey []byte
186+ var oldestTime time.Time
187+ first := true
188+
189+ for _ , key := range cutoffKeys {
190+ keyStr := string (key )
191+ keyTime , err := time .Parse (time .RFC3339Nano , getDateTimeFromKey (keyStr ))
192+ if err != nil {
193+ fmt .Println ("Error parsing key time:" , err )
194+ continue
195+ }
196+
197+ if first || keyTime .Before (oldestTime ) {
198+ oldestKey = key
199+ oldestTime = keyTime
200+ first = false
201+ }
202+ }
203+ return oldestKey
204+ }
205+
206+ var (
207+ logCleanupMu sync.Mutex
208+ nextCleanup time.Time
209+ isCleanupRunning bool
210+ )
211+
212+ func MaybeScheduleCleanup (host string , container string ) {
213+ logCleanupMu .Lock ()
214+
215+ defer logCleanupMu .Unlock ()
216+
217+ if isCleanupRunning {
218+ return
219+ }
220+ if time .Now ().Before (nextCleanup ) {
221+ return
222+ }
223+
224+ isCleanupRunning = true
225+
226+ go func () {
227+ err := checkAndManageLogSize (host , container )
228+
229+ logCleanupMu .Lock ()
230+ defer logCleanupMu .Unlock ()
231+
232+ isCleanupRunning = false
233+ nextCleanup = time .Now ().Add (1 * time .Minute )
234+
235+ if err != nil {
236+ fmt .Printf ("Log cleanup failed: %v\n " , err )
237+ }
238+ }()
239+ }
240+
31241func PutLogMessage (db * leveldb.DB , host string , container string , message_item []string ) error {
32242 if len (message_item [0 ]) < 30 {
33243 fmt .Println ("WARNING: got broken timestamp: " , "timestamp: " + message_item [0 ], "message: " + message_item [1 ])
@@ -37,6 +247,9 @@ func PutLogMessage(db *leveldb.DB, host string, container string, message_item [
37247 if host == "" {
38248 panic ("Host is not mentioned!" )
39249 }
250+
251+ MaybeScheduleCleanup (host , container )
252+
40253 location := host + "/" + container
41254 if vars .Statuses_DBs [location ] == nil {
42255 vars .Statuses_DBs [location ] = util .GetDB (host , container , "statuses" )
0 commit comments