Skip to content

Commit e4555ae

Browse files
Read lz4-compressed files and remove after reading (#729)
* lz4 decompress in file worker * lz4: start read from offset * file pligin: remove_after * lz4-reader: get min offset * lz4read: avoid eof error * add TestReadCompressedEmpty * add TestReadCompressedEmpty * save last read timestamp in offsets * add last read timestamp * add parseOptionalLine on read offset file * save last read time offset * rm eofTimestamp in doneJob * add customMimeTypes * add TestWorkerRemoveAfter * file remove after: update timestamp on append * use sync_atomic in eofInfo * fix linter * move remove_after to maintenanceJob * use nano unixtime in remove_after && fix after review * fix after review
1 parent 302372d commit e4555ae

10 files changed

Lines changed: 797 additions & 45 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ require (
3030
github.com/klauspost/compress v1.18.1
3131
github.com/minio/minio-go v6.0.14+incompatible
3232
github.com/ozontech/insane-json v0.1.9
33+
github.com/pierrec/lz4/v4 v4.1.22
3334
github.com/prometheus/client_golang v1.16.0
3435
github.com/prometheus/client_model v0.3.0
3536
github.com/prometheus/common v0.42.0
@@ -117,7 +118,6 @@ require (
117118
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
118119
github.com/opencontainers/runtime-spec v1.0.2 // indirect
119120
github.com/pascaldekloe/name v1.0.1 // indirect
120-
github.com/pierrec/lz4/v4 v4.1.22 // indirect
121121
github.com/pkg/errors v0.9.1 // indirect
122122
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
123123
github.com/ryanuber/go-glob v1.0.0 // indirect

plugin/input/file/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,13 @@ Job maintenance `fstat` tracked files to detect if new portion of data have been
119119

120120
<br>
121121

122+
**`remove_after`** *`cfg.Duration`* *`default=0`*
123+
124+
After reaching EOF, the duration to wait before removing the file, unless new data is written.
125+
If not specified, files are not removed.
126+
127+
<br>
128+
122129
**`should_watch_file_changes`** *`bool`* *`default=false`*
123130

124131
It turns on watching for file modifications. Turning it on cause more CPU work, but it is more probable to catch file truncation

plugin/input/file/file.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,13 @@ type Config struct {
152152
MaintenanceInterval cfg.Duration `json:"maintenance_interval" default:"10s" parse:"duration"` // *
153153
MaintenanceInterval_ time.Duration
154154

155+
// > @3@4@5@6
156+
// >
157+
// > After reaching EOF, the duration to wait before removing the file, unless new data is written.
158+
// > If not specified, files are not removed.
159+
RemoveAfter cfg.Duration `json:"remove_after" default:"0" parse:"duration"` // *
160+
RemoveAfter_ time.Duration
161+
155162
// > @3@4@5@6
156163
// >
157164
// > It turns on watching for file modifications. Turning it on cause more CPU work, but it is more probable to catch file truncation

plugin/input/file/file_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"time"
1616

1717
"github.com/alecthomas/units"
18+
"github.com/pierrec/lz4/v4"
1819
uuid "github.com/satori/go.uuid"
1920
"github.com/stretchr/testify/assert"
2021
"github.com/stretchr/testify/require"
@@ -86,6 +87,7 @@ func pluginConfig(opts ...string) *Config {
8687
PersistenceMode: "async",
8788
OffsetsOp: op,
8889
MaintenanceInterval: "5s",
90+
RemoveAfter: "0",
8991
}
9092
test.NewConfig(config, map[string]int{"gomaxprocs": runtime.GOMAXPROCS(0)})
9193

@@ -172,6 +174,16 @@ func createTempFile() string {
172174
return file.Name()
173175
}
174176

177+
func createTempLZ4File() string {
178+
u := uuid.NewV4().String()
179+
file, err := os.Create(path.Join(filesDir, u+".lz4"))
180+
if err != nil {
181+
panic(err.Error())
182+
}
183+
184+
return file.Name()
185+
}
186+
175187
func createOffsetFile() string {
176188
file, err := os.Create(path.Join(offsetsDir, offsetsFile))
177189
if err != nil {
@@ -237,6 +249,27 @@ func addString(file string, str string, isLine bool, doSync bool) {
237249
}
238250
}
239251

252+
func addEventsToLZ4File(file string, events []string) error {
253+
outputFile, err := os.Create(file)
254+
if err != nil {
255+
return err
256+
}
257+
defer outputFile.Close() // Ensure file descriptor is closed
258+
259+
lz4Writer := lz4.NewWriter(outputFile)
260+
defer lz4Writer.Close() // Ensure LZ4 writer is closed
261+
262+
// Join events with newlines and write
263+
data := strings.Join(events, "\n") + "\n"
264+
_, err = lz4Writer.Write([]byte(data))
265+
if err != nil {
266+
return err
267+
}
268+
269+
// Sync to disk before closing
270+
return outputFile.Sync()
271+
}
272+
240273
func addLines(file string, from int, to int) int {
241274
f, err := os.OpenFile(file, os.O_APPEND|os.O_WRONLY, perm)
242275
if err != nil {
@@ -488,6 +521,46 @@ func TestReadContinue(t *testing.T) {
488521
}, blockSize+blockSize-processed, "dirty")
489522
}
490523

524+
// TestReadCompressed test if works of compressed file
525+
func TestReadCompressed(t *testing.T) {
526+
eventCount := 5
527+
events := make([]string, 0)
528+
529+
run(&test.Case{
530+
Prepare: func() {
531+
for i := 0; i < eventCount; i++ {
532+
events = append(events, fmt.Sprintf(`{"field":"value_%d"}`, i))
533+
}
534+
},
535+
Act: func(p *pipeline.Pipeline) {
536+
file := createTempLZ4File()
537+
addEventsToLZ4File(file, events)
538+
},
539+
Assert: func(p *pipeline.Pipeline) {
540+
assert.Equal(t, eventCount, p.GetEventsTotal(), "wrong event count")
541+
for i, s := range events {
542+
assert.Equal(t, s, p.GetEventLogItem(i), "wrong event")
543+
}
544+
},
545+
}, eventCount)
546+
}
547+
548+
// TestReadCompressedEmpty test if works of empty compressed file
549+
func TestReadCompressedEmpty(t *testing.T) {
550+
eventCount := 0
551+
552+
run(&test.Case{
553+
Prepare: func() {
554+
},
555+
Act: func(p *pipeline.Pipeline) {
556+
createTempLZ4File()
557+
},
558+
Assert: func(p *pipeline.Pipeline) {
559+
assert.Equal(t, eventCount, p.GetEventsTotal(), "wrong event count")
560+
},
561+
}, eventCount)
562+
}
563+
491564
// TestOffsetsSaveSimple tests if offsets saving works right in the simple case
492565
func TestOffsetsSaveSimple(t *testing.T) {
493566
eventCount := 5

plugin/input/file/offset.go

Lines changed: 59 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/ozontech/file.d/logger"
1212
"github.com/ozontech/file.d/pipeline"
13+
"github.com/ozontech/file.d/xtime"
1314
"go.uber.org/atomic"
1415
)
1516

@@ -24,9 +25,10 @@ type offsetDB struct {
2425
}
2526

2627
type inodeOffsets struct {
27-
filename string
28-
sourceID pipeline.SourceID
29-
streams map[pipeline.StreamName]int64
28+
filename string
29+
sourceID pipeline.SourceID
30+
streams map[pipeline.StreamName]int64
31+
lastReadTimestamp int64
3032
}
3133

3234
type (
@@ -88,6 +90,7 @@ func (o *offsetDB) parseOne(content string, offsets fpOffsets) (string, error) {
8890
filename := ""
8991
inodeStr := ""
9092
sourceIDStr := ""
93+
lastReadTimestampStr := ""
9194
var err error
9295

9396
filename, content, err = o.parseLine(content, "- file: ")
@@ -103,6 +106,11 @@ func (o *offsetDB) parseOne(content string, offsets fpOffsets) (string, error) {
103106
return "", fmt.Errorf("can't parse source_id: %w", err)
104107
}
105108

109+
lastReadTimestampStr, content, err = o.parseOptionalLine(content, " last_read_timestamp: ")
110+
if err != nil {
111+
return "", fmt.Errorf("can't parse last_read_timestamp: %w", err)
112+
}
113+
106114
sysInode, err := strconv.ParseUint(inodeStr, 10, 64)
107115
if err != nil {
108116
return "", fmt.Errorf("wrong offsets format, can't parse inode: %s: %w", inodeStr, err)
@@ -120,10 +128,21 @@ func (o *offsetDB) parseOne(content string, offsets fpOffsets) (string, error) {
120128
return "", fmt.Errorf("wrong offsets format, duplicate inode %d", inode)
121129
}
122130

131+
var lastReadTimestampVal int64
132+
if lastReadTimestampStr != "" {
133+
lastReadTimestampVal, err = strconv.ParseInt(lastReadTimestampStr, 10, 64)
134+
if err != nil {
135+
return "", fmt.Errorf("invalid timestamp format %q: %w", lastReadTimestampStr, err)
136+
}
137+
} else {
138+
lastReadTimestampVal = xtime.GetInaccurateUnixNano()
139+
}
140+
123141
offsets[fp] = &inodeOffsets{
124-
streams: make(map[pipeline.StreamName]int64),
125-
filename: filename,
126-
sourceID: fp,
142+
streams: make(map[pipeline.StreamName]int64),
143+
filename: filename,
144+
sourceID: fp,
145+
lastReadTimestamp: lastReadTimestampVal,
127146
}
128147

129148
return o.parseStreams(content, offsets[fp].streams)
@@ -172,21 +191,43 @@ func (o *offsetDB) parseStreams(content string, streams streamsOffsets) (string,
172191
return content, nil
173192
}
174193

175-
func (o *offsetDB) parseLine(content string, start string) (string, string, error) {
176-
l := len(start)
194+
func (o *offsetDB) parseLine(content string, prefix string) (string, string, error) {
195+
if content == "" {
196+
return "", "", fmt.Errorf("unexpected end of content while looking for %q", prefix)
197+
}
177198

178199
linePos := strings.IndexByte(content, '\n')
179200
if linePos < 0 {
180-
return "", "", fmt.Errorf("wrong offsets format, no nl: %q", content)
201+
return "", "", fmt.Errorf("no newline found in content")
202+
}
203+
204+
line := content[:linePos]
205+
remaining := content[linePos+1:]
206+
207+
if len(line) < len(prefix) || line[:len(prefix)] != prefix {
208+
return "", "", fmt.Errorf("expected prefix %q, got %q", prefix, safeSubstring(line, len(prefix)))
209+
}
210+
211+
return line[len(prefix):], remaining, nil
212+
}
213+
214+
func (o *offsetDB) parseOptionalLine(content string, prefix string) (string, string, error) {
215+
if content == "" {
216+
return "", content, nil // No content, return empty value
181217
}
182-
line := content[0:linePos]
183218

184-
content = content[linePos+1:]
185-
if linePos < l || line[0:l] != start {
186-
return "", "", fmt.Errorf("wrong offsets file format expected=%q, got=%q", start, line[0:l])
219+
if len(content) >= len(prefix) && content[:len(prefix)] == prefix {
220+
return o.parseLine(content, prefix)
187221
}
188222

189-
return line[l:], content, nil
223+
return "", content, nil
224+
}
225+
226+
// safeSubstring returns at most the first length bytes of s, never panicking
// when s is shorter than length.
func safeSubstring(s string, length int) string {
	if len(s) >= length {
		return s[:length]
	}
	return s
}
191232

192233
func (o *offsetDB) save(jobs map[pipeline.SourceID]*Job, mu *sync.RWMutex) {
@@ -234,6 +275,10 @@ func (o *offsetDB) save(jobs map[pipeline.SourceID]*Job, mu *sync.RWMutex) {
234275
o.buf = strconv.AppendUint(o.buf, uint64(job.sourceID), 10)
235276
o.buf = append(o.buf, '\n')
236277

278+
o.buf = append(o.buf, " last_read_timestamp: "...)
279+
o.buf = strconv.AppendInt(o.buf, job.eofReadInfo.getUnixNanoTimestamp(), 10)
280+
o.buf = append(o.buf, '\n')
281+
237282
o.buf = append(o.buf, " streams:\n"...)
238283
for _, strOff := range job.offsets {
239284
o.buf = append(o.buf, " "...)

0 commit comments

Comments
 (0)