[ntuple] Add RPageSinkS3 for the basic S3 write path (Mode B) #22653
JasMehta08 wants to merge 1 commit into
Conversation
jblomer left a comment
Very nice! Some discussion points.
| target_sources(ROOTNTuple PRIVATE src/RPageStorageS3.cxx) | ||
| target_compile_definitions(ROOTNTuple PRIVATE R__ENABLE_S3) | ||
| target_link_libraries(ROOTNTuple PRIVATE nlohmann_json::nlohmann_json) | ||
| target_link_libraries(ROOTNTuple PRIVATE RCurlHttp) |
I think this should be linked publicly, i.e. I'd rather have this as an optional dependency of the ROOT_STANDARD_LIBRARY_PACKAGE above.
| /// Object ID counter; incremented for each object written. Atomic to match the DAOS pattern and | ||
| /// prepare for a future parallel CommitSealedPageVImpl. |
Suggested change:
| /// Object ID counter; incremented for each object written. Atomic to prepare for | |
| /// a future parallel CommitSealedPageVImpl. | |
| RNTupleAnchorS3 fAnchor; | ||
| /// Resolve a numeric object ID to its full HTTP URL | ||
| std::string ObjectUrl(std::uint64_t objId) const; |
Suggested change:
| std::string MakeObjectUrl(std::uint64_t objId) const; | |
| if (location.find("s3://") == 0 || location.find("s3+http://") == 0 || location.find("s3+https://") == 0) | ||
| throw RException(R__FAIL("S3 read support is not yet implemented.")); |
I realize that we have a problem here: we cannot distinguish between an RNTuple stored natively on S3 and a ROOT file on S3. I think we have to come up with a special scheme for the first case, e.g. ntpl+s3:// (or something else if you have a better suggestion). The advantage is that we then only need to check for one scheme.
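To make the single-scheme idea concrete, here is a minimal sketch of the check. The `ntpl+s3://` prefix is only the scheme floated in the comment above, not a settled choice, and `kNativeS3Scheme` and `IsNativeS3Location` are hypothetical names that do not exist in ROOT.

```cpp
#include <string_view>

// Hypothetical scheme for an RNTuple stored natively on S3 (the name is still under discussion).
constexpr std::string_view kNativeS3Scheme = "ntpl+s3://";

// Returns true only if the location starts with the dedicated scheme.
// Plain s3://, s3+http:// and s3+https:// locations are left to the ROOT-file-on-S3 path.
bool IsNativeS3Location(std::string_view location)
{
   // substr clamps the count, so locations shorter than the prefix are handled safely.
   return location.substr(0, kNativeS3Scheme.size()) == kNativeS3Scheme;
}
```

With this, `IsNativeS3Location("ntpl+s3://bucket/ntuple")` is true, while `IsNativeS3Location("s3://bucket/file.root")` is false, so only one prefix has to be tested.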
| #include <string> | ||
| #include <utility> | ||
| namespace { |
Why are the using declarations in an anonymous namespace?
| // one PUT per object on a fresh connection. | ||
| ROOT::Internal::RCurlConnection conn(url); | ||
| conn.SetCredentialsFromEnvironment(); |
I think we should already reuse the connection now, i.e. make the connection a class member.
The URL changes per object, but RCurlConnection fixes its URL at construction. I think it would be better to overload SendPutReq so that it first sets the URL and then sends the PUT request. Or should I implement it some other way?
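One possible shape for this, as a sketch only: the connection becomes a member of the sink, and an overload takes the URL together with the payload. `ReusableConnection` and `SinkSketch` are hypothetical stand-ins, not the real `RCurlConnection` or `RPageSinkS3` interfaces, and the request log replaces actual network I/O.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a reusable HTTP connection; this is NOT the RCurlConnection API.
class ReusableConnection {
   std::string fUrl;
   // Log of (url, payload size) pairs, used here in place of real network I/O.
   std::vector<std::pair<std::string, std::size_t>> fSentRequests;

public:
   void SetUrl(std::string url) { fUrl = std::move(url); }

   // PUT to the currently configured URL.
   void SendPutReq(const unsigned char * /*data*/, std::size_t length) { fSentRequests.emplace_back(fUrl, length); }

   // The overload discussed above: set the URL first, then send the PUT request.
   void SendPutReq(const std::string &url, const unsigned char *data, std::size_t length)
   {
      SetUrl(url);
      SendPutReq(data, length);
   }

   const std::vector<std::pair<std::string, std::size_t>> &SentRequests() const { return fSentRequests; }
};

// Hypothetical sink fragment: the connection is created once and reused for every object.
class SinkSketch {
   ReusableConnection fConnection;

public:
   void PutObject(const std::string &url, const unsigned char *data, std::size_t length)
   {
      fConnection.SendPutReq(url, data, length);
   }

   const ReusableConnection &Connection() const { return fConnection; }
};
```

Whether the URL is passed per request or set through a separate setter is a design choice; the overload keeps the call site to one line and avoids sending to a stale URL.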
| fAnchor.fLenHeader = length; | ||
| } | ||
| ROOT::RNTupleLocator ROOT::Experimental::Internal::RPageSinkS3::CommitPageImpl(ColumnHandle_t columnHandle, |
I think this is exactly the same code for file, DAOS, and S3. I suggest making a first, preparatory commit that moves this logic to RPagePersistentSink::CommitPage() (the only place where it is used) and removes CommitPageImpl() altogether.
Should I create a separate PR, or just add a commit in this PR before the S3 sink commit?
| std::string fBaseUrl; | ||
| /// Object ID counter; incremented for each object written. Atomic to match the DAOS pattern and | ||
| /// prepare for a future parallel CommitSealedPageVImpl. | ||
| std::atomic<std::uint64_t> fObjectId{0}; |
Actually, I don't think we would parallelize (as in: run in multiple threads) the creation of object IDs. Let's keep this a standard integer for the time being.
| // Upload the anchor LAST: once it exists at the base URL, a reader can assume the whole ntuple | ||
| // is complete. Never upload it before all other objects are in place. | ||
| const auto anchorJson = fAnchor.ToJSON(); | ||
| PutObject(fBaseUrl, reinterpret_cast<const unsigned char *>(anchorJson.data()), anchorJson.size()); |
One thing I forgot (not for this PR): we should checksum the anchor, e.g. using some canonical JSON normalization. Let's discuss this in the next meeting.
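For reference, the ordering constraint in the snippet under review (every other object first, the anchor at the base URL last) can be sketched against an in-memory store. `InMemoryStore`, `WriteDataset`, the `$baseurl/<id>` object layout and the placeholder payloads are all assumptions made for illustration, not the layout used by the PR.

```cpp
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-in for the object store; records PUTs in order.
struct InMemoryStore {
   std::vector<std::pair<std::string, std::string>> fObjects;
   void Put(const std::string &url, const std::string &payload) { fObjects.emplace_back(url, payload); }
};

// Writes header, pages, page list and footer as numbered objects, then the anchor last.
// The "$baseurl/<id>" naming is an assumption for this sketch.
void WriteDataset(InMemoryStore &store, const std::string &baseUrl, const std::vector<std::string> &pages)
{
   std::uint64_t nextId = 0; // plain integer counter, incremented once per object
   auto objectUrl = [&]() { return baseUrl + "/" + std::to_string(nextId++); };

   store.Put(objectUrl(), "header");
   for (const auto &page : pages)
      store.Put(objectUrl(), page); // one object per sealed page
   store.Put(objectUrl(), "pagelist");
   store.Put(objectUrl(), "footer");

   // The anchor goes to the base URL only after everything else is in place,
   // so a reader that finds it can assume the dataset is complete.
   store.Put(baseUrl, "anchor");
}
```

A writer that fails midway leaves numbered objects behind but no anchor, so a reader never sees a half-written ntuple.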
| } | ||
| std::unique_ptr<ROOT::Internal::RPageSink> | ||
| ROOT::Experimental::Internal::RPageSinkS3::CloneAsHidden(std::string_view /*name*/, |
That should be relatively simple to add. E.g., name could be appended to the base URL as $baseurl/name, and then we return a new S3 page sink with this extended base URL. @silverweed what do you think?
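A rough sketch of the URL part of that idea: `MakeChildBaseUrl` is a hypothetical helper, and it assumes that `name` needs no URL escaping, which would have to be checked for real dataset names.

```cpp
#include <string>
#include <string_view>

// Hypothetical helper: derive the base URL of a hidden child sink as "$baseurl/name".
// Assumes `name` is already URL-safe; no escaping is performed here.
std::string MakeChildBaseUrl(std::string_view baseUrl, std::string_view name)
{
   std::string result(baseUrl);
   // Avoid a double slash if the base URL already ends with one.
   if (result.empty() || result.back() != '/')
      result += '/';
   result.append(name);
   return result;
}
```

For example, `MakeChildBaseUrl("http://host/bucket/nt", "friend")` yields `http://host/bucket/nt/friend`; the cloned sink would then be constructed with that string as its base URL.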
This Pull request:
(Is a part of the GSoC 2026 project S3 Backend for RNTuple.)
Adds RPageSinkS3, the write path of the experimental S3 storage backend for RNTuple.
Changes or fixes:
RPageSinkS3 (Mode B full write path): InitImpl (header), CommitPageImpl / CommitSealedPageImpl (one S3 object per sealed page, kTypeObject64 locator), StageClusterImpl, CommitClusterGroupImpl (page list), and CommitDatasetImpl (footer, then the JSON anchor written last for atomicity).
Checklist: