-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmock_file_source.cpp
More file actions
73 lines (58 loc) · 2.22 KB
/
Copy pathmock_file_source.cpp
File metadata and controls
73 lines (58 loc) · 2.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Copyright 2026 Davide Faconti
// SPDX-License-Identifier: Apache-2.0
#include <pj_base/sdk/data_source_patterns.hpp>
#include <string>
namespace {
class MockFileSource : public PJ::FileSourceBase {
public:
uint64_t extraCapabilities() const override {
return PJ::kCapabilityDirectIngest;
}
std::string saveConfig() const override {
return config_;
}
PJ::Status loadConfig(std::string_view config_json) override {
config_ = std::string(config_json);
return PJ::okStatus();
}
PJ::Status importData() override {
// Test-only failure injection: the host creates the dataset in the live engine
// before importData() runs, so returning an error here lets tests exercise the
// host's abandoned-dataset cleanup. config_ is opaque to this example (no JSON
// dependency), so the marker is matched by substring — safe because only tests
// set this config.
if (config_.find("fail_start") != std::string::npos) {
return PJ::unexpected("mock_file_source: configured start failure");
}
if (!runtimeHost().progressStart("Importing", 3, true)) {
return PJ::unexpected("progress unavailable");
}
auto topic = writeHost().ensureTopic("mock/file_data");
if (!topic) {
return PJ::unexpected(topic.error());
}
for (uint64_t i = 1; i <= 3; ++i) {
if (runtimeHost().isStopRequested()) {
return PJ::unexpected("import cancelled");
}
auto status = writeHost().appendRecord(
*topic, PJ::Timestamp{static_cast<int64_t>(i * 100)},
{{.name = "value", .value = static_cast<double>(i) * 10.0}});
if (!status) {
return PJ::unexpected(status.error());
}
if (!runtimeHost().progressUpdate(i)) {
return PJ::unexpected("import cancelled via progress");
}
}
runtimeHost().reportMessage(PJ::DataSourceMessageLevel::kInfo, "imported 3 records");
return PJ::okStatus();
}
private:
std::string config_ = "{}";
};
} // namespace
PJ_DATA_SOURCE_PLUGIN(
MockFileSource, R"({"id":"mock-file-source","name":"Mock File Source","version":"1.0.0",)"
R"("description":"Test FileSourceBase lifecycle and progress",)"
R"("file_extensions":[".mock"]})")