feat: ingest logic to use FileService - BED-8317,BED-8318 #2808
📝 Walkthrough
This PR refactors ingest file handling from filesystem paths to a
Changes
Storage-backed ingest file lifecycle refactor
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
c0f97d8 to 7b55a77
70b7597 to 1c28d46
	}
	defer sourceFile.Close()

	scratchFile, err = os.CreateTemp(scratchDirectory, "archive-*")
I am a little confused about why we're doing direct OS file operations here. Doesn't the file service have a local storage driver?
It does, but this is meant to be a stage earlier than the fileService. Here, spooling takes the user's file and validates it before saving it to the file service. If the file service is not backed by the local OS, this step handles the resources and ensures they are available to the fileService. Not in scope here, but the upload could perhaps be shifted to a presigned URL so that this step is not needed at all.
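To illustrate the spooling stage described in this reply, here is a minimal sketch, not the code in this PR. The `fileStore` interface, `memoryStore`, `validateJSON`, and `spoolAndStore` are hypothetical names, and JSON decoding stands in for whatever validation the real upload path performs. The upload is copied to a local scratch file, validated there, and only then handed to the storage abstraction.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"strings"
)

// fileStore is a hypothetical stand-in for the FileService write path.
type fileStore interface {
	Write(name string, src io.Reader) error
}

// memoryStore is a toy fileStore that keeps file contents in memory.
type memoryStore map[string]string

func (m memoryStore) Write(name string, src io.Reader) error {
	data, err := io.ReadAll(src)
	if err != nil {
		return err
	}
	m[name] = string(data)
	return nil
}

// validateJSON is a placeholder validator: it accepts any stream that starts
// with one well-formed JSON value.
func validateJSON(r io.Reader) error {
	var payload json.RawMessage
	return json.NewDecoder(r).Decode(&payload)
}

// spoolAndStore copies the upload to a local scratch file, validates it there,
// and only then hands it to the store. The scratch file is always removed.
func spoolAndStore(upload io.Reader, scratchDir, name string, store fileStore) error {
	scratch, err := os.CreateTemp(scratchDir, "upload-*")
	if err != nil {
		return fmt.Errorf("create scratch file: %w", err)
	}
	defer os.Remove(scratch.Name()) // runs after Close (defers are LIFO)
	defer scratch.Close()

	if _, err := io.Copy(scratch, upload); err != nil {
		return fmt.Errorf("spool upload: %w", err)
	}
	if _, err := scratch.Seek(0, io.SeekStart); err != nil {
		return fmt.Errorf("rewind scratch file: %w", err)
	}
	if err := validateJSON(scratch); err != nil {
		return fmt.Errorf("validate upload: %w", err)
	}
	if _, err := scratch.Seek(0, io.SeekStart); err != nil {
		return fmt.Errorf("rewind scratch file: %w", err)
	}
	return store.Write(name, scratch)
}

// storeString runs the full flow for an in-memory payload and returns what the
// store ended up holding.
func storeString(payload string) (string, error) {
	store := memoryStore{}
	if err := spoolAndStore(strings.NewReader(payload), os.TempDir(), "upload.json", store); err != nil {
		return "", err
	}
	return store["upload.json"], nil
}

func main() {
	stored, err := storeString(`{"a":1}`)
	fmt.Println(stored, err)

	_, err = storeString("not json")
	fmt.Println("rejected:", err != nil)
}
```

Because validation happens on the local scratch copy, nothing reaches the store unless the payload was accepted, regardless of where the store keeps its data.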
7b55a77 to c5996af
ceb50f8 to f1acc3a
37fba2f to aed5917
9673a51 to 48491d4
Actionable comments posted: 9
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@cmd/api/src/api/tools/ingest.go`:
- Around line 128-151: The handler currently calls
response.WriteHeader(http.StatusOK) before any archive data is produced, so
clients can get a 200 with an empty/broken .tar.gz; change this by delaying the
success status until the archive is actually created—either build the tar+gzip
into a temporary sink (e.g., a buffer or temp file) and only set headers and
stream it after the first successful file write, or defer writing the 200 until
after s.writeRetainedFileToTar has successfully written the first entry; locate
response.WriteHeader(http.StatusOK), gzip.NewWriter, tar.NewWriter and
s.writeRetainedFileToTar to implement the chosen approach and ensure errors from
GetFile/file writes result in an error status and not a premature 200.
- Around line 128-130: The response currently sets both Content-Type and
Content-Encoding for the attachment; remove the line that sets Content-Encoding
to "gzip" (the call response.Header().Set(headers.ContentEncoding.String(),
"gzip")) so the handler that builds the attachment only sets Content-Type and
Content-Disposition (the existing
response.Header().Set(headers.ContentType.String(), ...) and
response.Header().Set(headers.ContentDisposition.String(), ...)) and does not
advertise an HTTP-level gzip encoding for the attachment body.
- Around line 197-221: The cleanup goroutine currently uses
context.WithoutCancel(request.Context()) which can hang indefinitely; replace it
with a bounded background context (e.g. ctx, cancel :=
context.WithTimeout(context.Background(), 30*time.Second)) and use that ctx for
retainedFileService.ListFiles and DeleteFile calls, and ensure you call defer
cancel() inside the goroutine so the timeout is enforced; update the cleanupCtx
variable usage and add the time import as needed, keeping the defer
s.retainedFileLock.Unlock() in place.
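The first two findings for this file can be sketched as follows. This is an illustration under assumptions, not the handler in this PR: `retainedFile`, `buildArchive`, `archiveHandler`, and `statusFor` are hypothetical names, and the archive is buffered in memory, which only suits small archives (a temp file would be the analogous sink for larger ones). The status code is written only after the tar+gzip stream has been built, and no `Content-Encoding` header is set for the attachment.

```go
package main

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// retainedFile is a hypothetical in-memory stand-in for a retained ingest file.
type retainedFile struct {
	Name string
	Data []byte
}

// buildArchive writes every file into a gzip-compressed tar held in memory.
// Any failure is returned before a single response byte has been sent.
func buildArchive(files []retainedFile) (*bytes.Buffer, error) {
	buf := &bytes.Buffer{}
	gz := gzip.NewWriter(buf)
	tw := tar.NewWriter(gz)

	for _, f := range files {
		if f.Name == "" {
			return nil, errors.New("retained file has no name")
		}
		hdr := &tar.Header{Name: f.Name, Mode: 0o600, Size: int64(len(f.Data))}
		if err := tw.WriteHeader(hdr); err != nil {
			return nil, fmt.Errorf("write tar header for %s: %w", f.Name, err)
		}
		if _, err := tw.Write(f.Data); err != nil {
			return nil, fmt.Errorf("write tar body for %s: %w", f.Name, err)
		}
	}
	if err := tw.Close(); err != nil {
		return nil, err
	}
	if err := gz.Close(); err != nil {
		return nil, err
	}
	return buf, nil
}

// archiveHandler sends 200 only once the archive exists; otherwise it sends 500.
func archiveHandler(files []retainedFile) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		buf, err := buildArchive(files)
		if err != nil {
			http.Error(w, "could not build archive", http.StatusInternalServerError)
			return
		}
		// The body is itself a .tar.gz attachment, so no Content-Encoding is set.
		w.Header().Set("Content-Type", "application/gzip")
		w.Header().Set("Content-Disposition", `attachment; filename="retained.tar.gz"`)
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write(buf.Bytes())
	}
}

// statusFor runs the handler against a recorder and reports the status code.
func statusFor(files []retainedFile) int {
	rec := httptest.NewRecorder()
	archiveHandler(files)(rec, httptest.NewRequest(http.MethodGet, "/archive", nil))
	return rec.Code
}

func main() {
	fmt.Println(statusFor([]retainedFile{{Name: "a.json", Data: []byte("{}")}}))
	fmt.Println(statusFor([]retainedFile{{Name: ""}}))
}
```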
In `@cmd/api/src/api/v2/fileingest.go`:
- Around line 173-176: The error path uses the original request.Context() when
calling ingestFileService.DeleteFile, so if the request context is canceled the
cleanup won't run; change the delete call to use a non-cancelable short-lived
context (mirror upload.cleanupTempFile): create a ctx using
context.WithoutCancel(request.Context()) combined with a small timeout (e.g.,
context.WithTimeout(..., 5*time.Second)) and pass that ctx into
ingestFileService.DeleteFile; update imports if needed and ensure the cancel
function is deferred where applicable.
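The context handling suggested here can be sketched in isolation. `cleanupContext` and `deleteFile` are hypothetical helpers, and the example assumes Go 1.21 or later for `context.WithoutCancel`. The derived context ignores cancellation of the request context but still expires on its own deadline.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// cleanupContext detaches from the parent's cancellation while keeping its
// values, then bounds the result with a timeout of its own.
func cleanupContext(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
	return context.WithTimeout(context.WithoutCancel(parent), timeout)
}

// deleteFile is a toy delete that refuses to run on a finished context.
func deleteFile(ctx context.Context, path string) error {
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("delete %s: %w", path, err)
	}
	return nil
}

// survivesCancel reports whether cleanup still runs after the request context
// has already been canceled.
func survivesCancel() bool {
	request, cancel := context.WithCancel(context.Background())
	cancel() // simulate the client disconnecting

	ctx, stop := cleanupContext(request, 5*time.Second)
	defer stop()

	return deleteFile(request, "tmp/upload") != nil && deleteFile(ctx, "tmp/upload") == nil
}

// expiresQuickly reports whether the detached context still honors its timeout.
func expiresQuickly() bool {
	ctx, stop := cleanupContext(context.Background(), 10*time.Millisecond)
	defer stop()

	<-ctx.Done()
	return errors.Is(ctx.Err(), context.DeadlineExceeded)
}

func main() {
	fmt.Println("cleanup survives request cancel:", survivesCancel())
	fmt.Println("cleanup context still times out:", expiresQuickly())
}
```

The same shape applies to the cleanup goroutine finding above: what matters is that the context used for deletion has a deadline and is not tied to the request's cancellation.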
In `@cmd/api/src/daemons/datapipe/cleanup.go`:
- Around line 232-245: Normalize the storage path used for map lookup and
deletion by applying the same cleaning used elsewhere: replace uses of file.Path
with a cleaned version (logicalPath := path.Clean(file.Path)) before checking
expectedFiles and before calling ingestFileService.DeleteFile; ensure
addExpectedStoragePath and the expectedFiles map keys match that cleaned form,
and update the loop that references isExcludedStoragePath,
expectedFiles[logicalPath], and ingestFileService.DeleteFile to all use the same
logicalPath variable so lookups and deletions are consistent.
- Around line 293-307: The directory branch currently removes directories
immediately; modify the loop in cleanup.go (where cutoff is computed and entries
are iterated) to apply the same orphanMinimumAge gate used for files: call
entry.Info() and check the entry.Info().ModTime() against cutoff (as you do for
files) before invoking s.fileOps.RemoveAll(fullPath) for directories (keeping
isExpectedLocalEntry logic intact); ensure errors from entry.Info() are handled
similarly to file handling so directories younger than cutoff are skipped.
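Both cleanup findings come down to comparing like with like: one cleaned path form for lookups and deletes, and one age cutoff applied to every candidate. A minimal sketch under assumptions follows; `storedFile`, `orphans`, and `demoOrphans` are hypothetical names and do not mirror the daemon's actual types.

```go
package main

import (
	"fmt"
	"path"
	"time"
)

// storedFile is a hypothetical listing entry returned by a storage backend.
type storedFile struct {
	Path    string
	ModTime time.Time
}

// orphans returns the cleaned paths that are neither expected nor younger than
// minAge. Expected paths and listed paths are both passed through path.Clean so
// that lookups and later deletions use the same key.
func orphans(files []storedFile, expected []string, now time.Time, minAge time.Duration) []string {
	want := make(map[string]struct{}, len(expected))
	for _, p := range expected {
		want[path.Clean(p)] = struct{}{}
	}

	cutoff := now.Add(-minAge)
	var out []string
	for _, f := range files {
		logicalPath := path.Clean(f.Path)
		if _, ok := want[logicalPath]; ok {
			continue // still referenced
		}
		if f.ModTime.After(cutoff) {
			continue // too young to be treated as an orphan
		}
		out = append(out, logicalPath)
	}
	return out
}

// demoOrphans runs orphans over a fixed data set.
func demoOrphans() []string {
	now := time.Date(2024, 1, 2, 12, 0, 0, 0, time.UTC)
	files := []storedFile{
		{Path: "ingest//keep.json", ModTime: now.Add(-48 * time.Hour)},
		{Path: "ingest/./old.json", ModTime: now.Add(-48 * time.Hour)},
		{Path: "ingest/new.json", ModTime: now.Add(-time.Minute)},
	}
	return orphans(files, []string{"ingest/keep.json"}, now, time.Hour)
}

func main() {
	fmt.Println(demoOrphans())
}
```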
In `@cmd/api/src/services/entrypoint.go`:
- Around line 113-116: Entrypoint currently only checks that
dependencies.FileServiceResolver is non-nil but doesn't verify it can actually
provide the ingest service; update Entrypoint to preflight the resolver by
attempting to obtain the ingest service (e.g., call the resolver method used
elsewhere to produce storage.FileServiceIngest) and return an error if that call
fails or returns nil so the app fails fast on startup; apply the same preflight
check wherever CreateRuntimeDependencies expects a populated FileServiceResolver
(also update the second nil-check block referenced around lines 170-173) so
runtime ingest paths can't later 500 due to a missing storage.FileServiceIngest.
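A fail-fast preflight of this kind might look like the sketch below. The `resolver` and `ingestService` interfaces, `staticResolver`, and `localIngest` are hypothetical; the real resolver's method set is not shown in this thread, so `Ingest()` is an assumed name.

```go
package main

import (
	"errors"
	"fmt"
)

// ingestService is a hypothetical stand-in for the ingest file service.
type ingestService interface {
	Name() string
}

// resolver is a hypothetical stand-in for the file service resolver.
type resolver interface {
	Ingest() (ingestService, error)
}

// preflight verifies at startup that the resolver can actually hand out an
// ingest service, instead of discovering the problem on the first request.
func preflight(r resolver) error {
	if r == nil {
		return errors.New("file service resolver is nil")
	}
	svc, err := r.Ingest()
	if err != nil {
		return fmt.Errorf("resolve ingest file service: %w", err)
	}
	if svc == nil {
		return errors.New("resolver returned a nil ingest file service")
	}
	return nil
}

// staticResolver returns whatever it was constructed with.
type staticResolver struct {
	svc ingestService
	err error
}

func (s staticResolver) Ingest() (ingestService, error) { return s.svc, s.err }

// localIngest is a toy ingest service.
type localIngest struct{}

func (localIngest) Name() string { return "local" }

func main() {
	fmt.Println(preflight(staticResolver{svc: localIngest{}}))
	fmt.Println(preflight(staticResolver{}))
	fmt.Println(preflight(staticResolver{err: errors.New("no ingest driver configured")}))
}
```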
In `@cmd/api/src/services/graphify/tasks.go`:
- Around line 118-121: In ProcessIngestFile, do not early-return when
ExtractIngestFiles returns (fileData, err); instead assign the returned fileData
regardless of err and proceed into the per-file processing loop so valid members
are handled (keep ExtractIngestFiles, ProcessIngestFile and fileData references
to locate the code). Remove the "return fileData, err" early exit, optionally
log or stash the err for diagnostics, and let each fileData[i].Errors drive
skipping/failure logic in the loop; return the collected fileData and a combined
or nil error only after processing the loop.
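The control flow this finding asks for can be sketched with toy types. `ingestFile`, `extract`, and `processAll` are hypothetical and only mimic the shape described above: extraction may return both results and an error, per-file errors decide which members are skipped, and the extraction error is returned only after the loop. `errors.Join` requires Go 1.20 or later.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// ingestFile is a hypothetical per-member result of archive extraction.
type ingestFile struct {
	Name   string
	Errors []string
}

// extract is a toy extractor: members ending in ".bad" fail, the rest succeed.
// It returns every member together with a joined error for the failures.
func extract(names []string) ([]ingestFile, error) {
	var (
		files []ingestFile
		errs  []error
	)
	for _, name := range names {
		file := ingestFile{Name: name}
		if strings.HasSuffix(name, ".bad") {
			file.Errors = append(file.Errors, "could not extract member")
			errs = append(errs, fmt.Errorf("extract %s: unreadable member", name))
		}
		files = append(files, file)
	}
	return files, errors.Join(errs...)
}

// processAll keeps going after a partial extraction failure. It returns all
// members, the number actually processed, and the extraction error, if any.
func processAll(names []string) ([]ingestFile, int, error) {
	files, extractErr := extract(names) // no early return on extractErr

	processed := 0
	for i := range files {
		if len(files[i].Errors) > 0 {
			continue // this member failed extraction; skip it
		}
		processed++ // placeholder for the real per-file ingest work
	}
	return files, processed, extractErr
}

func main() {
	files, processed, err := processAll([]string{"a.json", "b.bad", "c.json"})
	fmt.Println(len(files), processed, err)
}
```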
In `@cmd/api/src/services/upload/upload.go`:
- Around line 100-113: The pipeline currently persists the raw request bytes
(teeReader := io.TeeReader(fileData, pw)) while the validator (validationFunc)
writes its normalized output to io.Discard, so the on-disk temp file can differ
from what validation accepted (e.g. UTF-16 -> UTF-8). Fix by making the
validator produce the stream that is persisted: invoke validationFunc with the
original request reader as src and the pipe writer (pw) as dst
(validationFunc(fileData, pw)) in the goroutine, then have the main writer read
from the pipe reader (pr) for fileService.WriteTempFile instead of using
io.TeeReader; remove the TeeReader usage so WriteTempFile consumes the
validator-normalized stream.
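The pipeline shape suggested here, where the persisted bytes are exactly what the validator emitted, can be sketched as below. `validateFunc`, `compactJSON`, `persistValidated`, and `persistString` are hypothetical names; JSON compaction stands in for the real normalization (for example UTF-16 to UTF-8), and a `bytes.Buffer` stands in for the temp-file writer.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// validateFunc reads src, rejects bad input, and writes the normalized form to dst.
type validateFunc func(src io.Reader, dst io.Writer) error

// compactJSON is a toy validator: it rejects invalid JSON and writes the
// whitespace-free form of valid JSON.
func compactJSON(src io.Reader, dst io.Writer) error {
	raw, err := io.ReadAll(src)
	if err != nil {
		return err
	}
	var out bytes.Buffer
	if err := json.Compact(&out, raw); err != nil {
		return fmt.Errorf("invalid JSON: %w", err)
	}
	_, err = dst.Write(out.Bytes())
	return err
}

// persistValidated runs the validator in a goroutine with the pipe writer as
// its destination, so the sink only ever receives validator output.
func persistValidated(src io.Reader, validate validateFunc, sink io.Writer) error {
	pr, pw := io.Pipe()
	defer pr.Close() // unblocks the validator if the sink fails mid-copy

	go func() {
		// CloseWithError(nil) behaves like Close, so the reader sees io.EOF on success.
		pw.CloseWithError(validate(src, pw))
	}()

	_, err := io.Copy(sink, pr)
	return err
}

// persistString runs the pipeline on a string and returns what was persisted.
func persistString(input string) (string, error) {
	var sink bytes.Buffer
	if err := persistValidated(strings.NewReader(input), compactJSON, &sink); err != nil {
		return "", err
	}
	return sink.String(), nil
}

func main() {
	stored, err := persistString(`{ "a" : 1 }`)
	fmt.Println(stored, err)

	_, err = persistString("{oops")
	fmt.Println("rejected:", err != nil)
}
```

A streaming validator may already have written part of its output before it fails, so the caller would still need to discard the partially written temp file on error.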
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Organization UI (inherited)
Review profile: CHILL
Plan: Pro
Run ID: 059df95f-e49a-42e8-8ca3-96bc1ddc13a8
📒 Files selected for processing (24)
- cmd/api/src/api/tools/ingest.go
- cmd/api/src/api/tools/ingest_test.go
- cmd/api/src/api/v2/fileingest.go
- cmd/api/src/api/v2/fileingest_test.go
- cmd/api/src/daemons/api/toolapi/api.go
- cmd/api/src/daemons/datapipe/cleanup.go
- cmd/api/src/daemons/datapipe/cleanup_internal_test.go
- cmd/api/src/daemons/datapipe/cleanup_test.go
- cmd/api/src/daemons/datapipe/datapipe_integration_test.go
- cmd/api/src/daemons/datapipe/delete_integration_test.go
- cmd/api/src/daemons/datapipe/pipeline.go
- cmd/api/src/daemons/datapipe/pipeline_integration_test.go
- cmd/api/src/services/entrypoint.go
- cmd/api/src/services/entrypoint_test.go
- cmd/api/src/services/fs/fs.go
- cmd/api/src/services/fs/mocks/fs.go
- cmd/api/src/services/graphify/graphify_integration_test.go
- cmd/api/src/services/graphify/ingest_storage.go
- cmd/api/src/services/graphify/ingest_storage_test.go
- cmd/api/src/services/graphify/service.go
- cmd/api/src/services/graphify/tasks.go
- cmd/api/src/services/graphify/tasks_integration_test.go
- cmd/api/src/services/upload/upload.go
- cmd/api/src/services/upload/upload_test.go
💤 Files with no reviewable changes (2)
- cmd/api/src/services/fs/mocks/fs.go
- cmd/api/src/services/fs/fs.go
b540eb4 to 70013a1
* feat: move storage to packages
* fix: remove config from test
* chore: pfc
* chore: pfc
* chore: pfc
* fix: move mockgen to appropriate file name
70013a1 to 0e1e1ff
Description
This change migrates the ingestion logic to use the new FileService. In addition, the cleanup daemon was updated to be able to delete files using the new FileService.
Motivation and Context
Resolves BED-8317,BED-8318
How Has This Been Tested?
The application was started and multiple ingest files were created. The data from these files was ingested correctly. In addition, new tests were added.
For the cleanup, files were added to the directories, and PruneData was run both with and without exemptions to confirm that the proper files were deleted.