|
| 1 | +// Copyright 2026 Redpanda Data, Inc. |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +// you may not use this file except in compliance with the License. |
| 5 | +// You may obtain a copy of the License at |
| 6 | +// |
| 7 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +// |
| 9 | +// Unless required by applicable law or agreed to in writing, software |
| 10 | +// distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +// See the License for the specific language governing permissions and |
| 13 | +// limitations under the License. |
| 14 | + |
| 15 | +package io |
| 16 | + |
| 17 | +import ( |
| 18 | + "bytes" |
| 19 | + "context" |
| 20 | + "errors" |
| 21 | + "fmt" |
| 22 | + "io" |
| 23 | + "io/fs" |
| 24 | + "os" |
| 25 | + "path/filepath" |
| 26 | + "runtime" |
| 27 | + "strings" |
| 28 | + "sync" |
| 29 | + |
| 30 | + "github.com/redpanda-data/benthos/v4/public/service" |
| 31 | +) |
| 32 | + |
// Names of the fields accepted by the file output configuration spec.
const (
	// foFieldPath names the interpolated field holding the destination path.
	foFieldPath = "path"
	// foFieldCodec names the field selecting how messages are framed on disk.
	foFieldCodec = "codec"
)
| 37 | + |
| 38 | +func fileOutputSpec() *service.ConfigSpec { |
| 39 | + return service.NewConfigSpec(). |
| 40 | + Stable(). |
| 41 | + Categories("Local"). |
| 42 | + Summary(`Writes messages to files on disk based on a chosen codec.`). |
| 43 | + Description(`Messages can be written to different files by using xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions] in the path field. However, only one file is ever open at a given time, and therefore when the path changes the previously open file is closed.`). |
| 44 | + Fields( |
| 45 | + service.NewInterpolatedStringField(foFieldPath). |
| 46 | + Description("The file to write to, if the file does not yet exist it will be created."). |
| 47 | + Examples( |
| 48 | + "/tmp/data.txt", |
| 49 | + "/tmp/${! timestamp_unix() }.txt", |
| 50 | + `/tmp/${! json("document.id") }.json`, |
| 51 | + ). |
| 52 | + Version("3.33.0"), |
| 53 | + service.NewStringAnnotatedEnumField(foFieldCodec, map[string]string{ |
| 54 | + "all-bytes": "Only applicable to file based outputs. Writes each message to a file in full, if the file already exists the old content is deleted.", |
| 55 | + "append": "Append each message to the output stream without any delimiter or special encoding.", |
| 56 | + "lines": "Append each message to the output stream followed by a line break.", |
| 57 | + "delim:x": "Append each message to the output stream followed by a custom delimiter.", |
| 58 | + }). |
| 59 | + Description("The way in which the bytes of messages should be written out into the output data stream. It's possible to write lines using a custom delimiter with the `delim:x` codec, where x is the character sequence custom delimiter."). |
| 60 | + LintRule(""). |
| 61 | + Examples("lines", "delim:\t", "delim:foobar"). |
| 62 | + Default("lines"). |
| 63 | + Version("3.33.0"), |
| 64 | + ) |
| 65 | +} |
| 66 | + |
| 67 | +func init() { |
| 68 | + service.MustRegisterOutput("file", fileOutputSpec(), |
| 69 | + func(pConf *service.ParsedConfig, res *service.Resources) (out service.Output, mif int, err error) { |
| 70 | + var conf fileOutputConfig |
| 71 | + if conf, err = fileOutputConfigFromParsed(pConf); err != nil { |
| 72 | + return |
| 73 | + } |
| 74 | + mif = 1 |
| 75 | + out, err = newFileWriter(conf.Path, conf.Codec, res) |
| 76 | + return |
| 77 | + }) |
| 78 | +} |
| 79 | + |
| 80 | +//------------------------------------------------------------------------------ |
| 81 | + |
| 82 | +type fileOutputConfig struct { |
| 83 | + Path *service.InterpolatedString |
| 84 | + Codec string |
| 85 | +} |
| 86 | + |
| 87 | +func fileOutputConfigFromParsed(pConf *service.ParsedConfig) (conf fileOutputConfig, err error) { |
| 88 | + if conf.Path, err = pConf.FieldInterpolatedString(foFieldPath); err != nil { |
| 89 | + return |
| 90 | + } |
| 91 | + if conf.Codec, err = pConf.FieldString(foFieldCodec); err != nil { |
| 92 | + return |
| 93 | + } |
| 94 | + return |
| 95 | +} |
| 96 | + |
| 97 | +//------------------------------------------------------------------------------ |
| 98 | + |
// suffixFn inspects the bytes of a message and reports which bytes, if any,
// should be written after it. The boolean result is true only when the
// returned suffix must be appended.
type suffixFn func(data []byte) ([]byte, bool)
| 100 | + |
| 101 | +func getWriterCodec(codec string) (sFn suffixFn, appendMode bool, err error) { |
| 102 | + switch codec { |
| 103 | + case "all-bytes": |
| 104 | + return func([]byte) ([]byte, bool) { return nil, false }, false, nil |
| 105 | + case "append": |
| 106 | + return customDelimSuffix(""), true, nil |
| 107 | + case "lines": |
| 108 | + return customDelimSuffix("\n"), true, nil |
| 109 | + } |
| 110 | + if after, ok := strings.CutPrefix(codec, "delim:"); ok { |
| 111 | + if after == "" { |
| 112 | + return nil, false, errors.New("custom delimiter codec requires a non-empty delimiter") |
| 113 | + } |
| 114 | + return customDelimSuffix(after), true, nil |
| 115 | + } |
| 116 | + return nil, false, fmt.Errorf("codec was not recognised: %v", codec) |
| 117 | +} |
| 118 | + |
| 119 | +func customDelimSuffix(suffix string) suffixFn { |
| 120 | + suffixB := []byte(suffix) |
| 121 | + return func(data []byte) ([]byte, bool) { |
| 122 | + if len(suffixB) == 0 { |
| 123 | + return nil, false |
| 124 | + } |
| 125 | + if !bytes.HasSuffix(data, suffixB) { |
| 126 | + return suffixB, true |
| 127 | + } |
| 128 | + return nil, false |
| 129 | + } |
| 130 | +} |
| 131 | + |
| 132 | +//------------------------------------------------------------------------------ |
| 133 | + |
| 134 | +type fileWriter struct { |
| 135 | + log *service.Logger |
| 136 | + nm *service.Resources |
| 137 | + |
| 138 | + path *service.InterpolatedString |
| 139 | + suffixFn suffixFn |
| 140 | + appendMode bool |
| 141 | + |
| 142 | + handleMut sync.Mutex |
| 143 | + handlePath string |
| 144 | + handle io.WriteCloser |
| 145 | +} |
| 146 | + |
| 147 | +func newFileWriter(path *service.InterpolatedString, codecStr string, mgr *service.Resources) (*fileWriter, error) { |
| 148 | + codec, appendMode, err := getWriterCodec(codecStr) |
| 149 | + if err != nil { |
| 150 | + return nil, err |
| 151 | + } |
| 152 | + return &fileWriter{ |
| 153 | + suffixFn: codec, |
| 154 | + appendMode: appendMode, |
| 155 | + path: path, |
| 156 | + log: mgr.Logger(), |
| 157 | + nm: mgr, |
| 158 | + }, nil |
| 159 | +} |
| 160 | + |
| 161 | +func (*fileWriter) Connect(_ context.Context) error { |
| 162 | + return nil |
| 163 | +} |
| 164 | + |
| 165 | +func (w *fileWriter) writeTo(wtr io.Writer, p *service.Message) error { |
| 166 | + mBytes, err := p.AsBytes() |
| 167 | + if err != nil { |
| 168 | + return err |
| 169 | + } |
| 170 | + |
| 171 | + suffix, addSuffix := w.suffixFn(mBytes) |
| 172 | + |
| 173 | + if _, err := wtr.Write(mBytes); err != nil { |
| 174 | + return err |
| 175 | + } |
| 176 | + if addSuffix { |
| 177 | + if _, err := wtr.Write(suffix); err != nil { |
| 178 | + return err |
| 179 | + } |
| 180 | + } |
| 181 | + return nil |
| 182 | +} |
| 183 | + |
| 184 | +func (w *fileWriter) Write(_ context.Context, msg *service.Message) error { |
| 185 | + path, err := w.path.TryString(msg) |
| 186 | + if err != nil { |
| 187 | + return fmt.Errorf("path interpolation error: %w", err) |
| 188 | + } |
| 189 | + path = filepath.Clean(path) |
| 190 | + |
| 191 | + if err := validateFilePath(path, runtime.GOOS); err != nil { |
| 192 | + return err |
| 193 | + } |
| 194 | + |
| 195 | + w.handleMut.Lock() |
| 196 | + defer w.handleMut.Unlock() |
| 197 | + |
| 198 | + if w.handle != nil && path == w.handlePath { |
| 199 | + return w.writeTo(w.handle, msg) |
| 200 | + } |
| 201 | + if w.handle != nil { |
| 202 | + if err := w.handle.Close(); err != nil { |
| 203 | + return err |
| 204 | + } |
| 205 | + w.handle = nil |
| 206 | + } |
| 207 | + |
| 208 | + flag := os.O_CREATE | os.O_RDWR |
| 209 | + if w.appendMode { |
| 210 | + flag |= os.O_APPEND |
| 211 | + } else { |
| 212 | + flag |= os.O_TRUNC |
| 213 | + } |
| 214 | + |
| 215 | + if err := w.nm.FS().MkdirAll(filepath.Dir(path), fs.FileMode(0o777)); err != nil { |
| 216 | + return err |
| 217 | + } |
| 218 | + |
| 219 | + file, err := w.nm.FS().OpenFile(path, flag, fs.FileMode(0o666)) |
| 220 | + if err != nil { |
| 221 | + return err |
| 222 | + } |
| 223 | + |
| 224 | + handle, ok := file.(io.WriteCloser) |
| 225 | + if !ok { |
| 226 | + _ = file.Close() |
| 227 | + return errors.New("opening file for writing") |
| 228 | + } |
| 229 | + |
| 230 | + w.handlePath = path |
| 231 | + if err := w.writeTo(handle, msg); err != nil { |
| 232 | + _ = handle.Close() |
| 233 | + return err |
| 234 | + } |
| 235 | + |
| 236 | + if w.appendMode { |
| 237 | + w.handle = handle |
| 238 | + } else { |
| 239 | + _ = handle.Close() |
| 240 | + } |
| 241 | + return nil |
| 242 | +} |
| 243 | + |
| 244 | +func (w *fileWriter) Close(_ context.Context) error { |
| 245 | + w.handleMut.Lock() |
| 246 | + defer w.handleMut.Unlock() |
| 247 | + |
| 248 | + var err error |
| 249 | + if w.handle != nil { |
| 250 | + err = w.handle.Close() |
| 251 | + w.handle = nil |
| 252 | + } |
| 253 | + return err |
| 254 | +} |
| 255 | + |
| 256 | +//------------------------------------------------------------------------------ |
| 257 | + |
| 258 | +// validateFilePath checks that the base file name component of path does not |
| 259 | +// contain characters that would cause silent data loss or confusing behavior on |
| 260 | +// the target operating system. |
| 261 | +// |
| 262 | +// The goos parameter mirrors runtime.GOOS and is accepted explicitly so that |
| 263 | +// tests can exercise all platform branches regardless of the build host. |
| 264 | +// |
| 265 | +// Rules: |
| 266 | +// - All platforms: NUL bytes are rejected in any path component. |
| 267 | +// - Windows: <, >, :, ", |, ?, * and control characters 0x01–0x1F are rejected |
| 268 | +// in the base file name. The drive-letter colon (C:) is not part of the base |
| 269 | +// name and is therefore not rejected. |
| 270 | +// - macOS/Darwin: colons are rejected in the base file name because HFS+/APFS |
| 271 | +// maps ':' to '/', silently placing the file in a different directory. |
| 272 | +func validateFilePath(path, goos string) error { |
| 273 | + if strings.ContainsRune(path, '\x00') { |
| 274 | + return fmt.Errorf( |
| 275 | + "file name %q in path %q contains a NUL byte which is invalid on all platforms", |
| 276 | + crossPlatformBase(path), path, |
| 277 | + ) |
| 278 | + } |
| 279 | + |
| 280 | + base := crossPlatformBase(path) |
| 281 | + |
| 282 | + switch goos { |
| 283 | + case "windows": |
| 284 | + return validateWindowsFileName(base, path) |
| 285 | + case "darwin": |
| 286 | + return validateDarwinFileName(base, path) |
| 287 | + } |
| 288 | + return nil |
| 289 | +} |
| 290 | + |
// crossPlatformBase returns the final element of path, treating both '/' and
// '\' as separators whatever the build host is. filepath.Base is not used
// because it only honours the host separator. When nothing follows the last
// separator, or path is empty, "." is returned.
func crossPlatformBase(path string) string {
	// LastIndexAny yields -1 when no separator exists, so adding one selects
	// the whole string in that case.
	base := path[strings.LastIndexAny(path, `/\`)+1:]
	if base == "" {
		return "."
	}
	return base
}
| 306 | + |
// validateWindowsFileName reports an error when base contains a character
// from the set <>:"|?* or a control character in the range 0x01-0x1F. Every
// offending character is listed together with its byte offset within base;
// fullPath is only used to give context in the error message.
func validateWindowsFileName(base, fullPath string) error {
	var problems []string
	for pos, ch := range base {
		if ch >= 0x01 && ch <= 0x1F {
			problems = append(problems, fmt.Sprintf("control character 0x%02X at position %d", ch, pos))
			continue
		}
		if strings.IndexRune(`<>:"|?*`, ch) >= 0 {
			problems = append(problems, fmt.Sprintf("%q at position %d", string(ch), pos))
		}
	}

	if len(problems) == 0 {
		return nil
	}
	return fmt.Errorf(
		"file name %q in path %q contains characters invalid on windows: %s",
		base, fullPath, strings.Join(problems, ", "),
	)
}
| 326 | + |
// validateDarwinFileName reports an error when base contains a colon. fullPath
// is only used to give context in the error message.
func validateDarwinFileName(base, fullPath string) error {
	if !strings.Contains(base, ":") {
		return nil
	}
	return fmt.Errorf(
		"file name %q in path %q contains a colon which is invalid on macOS "+
			"(HFS+/APFS maps ':' to '/', creating a file in the wrong directory)",
		base, fullPath,
	)
}
0 commit comments