Skip to content

Commit 21868d8

Browse files
committed
decouling of the AI indexer from dlockss and extracting the code to a different repo (cid-indexer-ipfs)
1 parent 153abba commit 21868d8

25 files changed

Lines changed: 530 additions & 2182 deletions

cmd/dlockss-monitor/main.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,7 @@ func main() {
5353
if geoDBPath == "" {
5454
geoDBPath = os.Getenv("DLOCKSS_MONITOR_GEOIP_DB")
5555
}
56-
saiaAPIKey := os.Getenv("SAIA_API_KEY")
57-
58-
m := monitor.NewMonitor(cfg, geoDBPath, saiaAPIKey)
56+
m := monitor.NewMonitor(cfg, geoDBPath)
5957
defer m.Close()
6058

6159
h, err := monitor.StartLibP2P(ctx, m)

docs/DLOCKSS_PROTOCOL.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,14 +247,13 @@ services:
247247
depends_on: [ipfs]
248248
volumes:
249249
- ipfs-data:/ipfs-repo:ro
250-
- dlockss-data:/data
250+
- ./dlockss-files:/data
251251
environment:
252252
DLOCKSS_IPFS_CONFIG: /ipfs-repo/config
253253
DLOCKSS_IPFS_NODE: /dns4/ipfs/tcp/5001
254254
DLOCKSS_DATA_DIR: /data/ingest
255255
volumes:
256256
ipfs-data:
257-
dlockss-data:
258257
```
259258
260259
**Legacy migration:** If a `dlockss.key` exists in the working directory but not at the configured identity path (e.g. after upgrading), it is automatically copied to the new location so the node retains its Peer ID.

internal/config/config.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,21 @@ func getEnvBool(key string, defaultValue bool) bool {
6161
return defaultValue
6262
}
6363

64+
func getEnvStringSlice(key string) []string {
65+
if value := os.Getenv(key); value != "" {
66+
parts := strings.Split(value, ",")
67+
var result []string
68+
for _, p := range parts {
69+
p = strings.TrimSpace(p)
70+
if p != "" {
71+
result = append(result, p)
72+
}
73+
}
74+
return result
75+
}
76+
return nil
77+
}
78+
6479
func clusterStorePath(dataDir string) string {
6580
if p := os.Getenv("DLOCKSS_CLUSTER_STORE"); p != "" {
6681
return p
@@ -79,10 +94,15 @@ func nodeNamePath(dataDir string) string {
7994
return filepath.Join(filepath.Dir(dataDir), "node_name")
8095
}
8196

97+
// DefaultTopicName is the archive topic when none is configured.
98+
const DefaultTopicName = "creative-commons"
99+
82100
// Config holds all runtime configuration for a D-LOCKSS node.
83101
type Config struct {
84102
DiscoveryServiceTag string
85103
PubsubTopicPrefix string
104+
TopicName string
105+
IngestAllowlist []string
86106
FileWatchFolder string
87107
ClusterStorePath string
88108
MinReplication int
@@ -160,6 +180,8 @@ func DefaultConfig() *Config {
160180
return &Config{
161181
DiscoveryServiceTag: "dlockss-prod",
162182
PubsubTopicPrefix: DefaultPubsubVersion,
183+
TopicName: DefaultTopicName,
184+
IngestAllowlist: nil,
163185
FileWatchFolder: dataDir,
164186
ClusterStorePath: filepath.Join(filepath.Dir(dataDir), "cluster_store"),
165187
MinReplication: 5,
@@ -238,6 +260,8 @@ func LoadFromEnv() *Config {
238260
return &Config{
239261
DiscoveryServiceTag: getEnvString("DLOCKSS_DISCOVERY_TAG", "dlockss-prod"),
240262
PubsubTopicPrefix: getEnvString("DLOCKSS_PUBSUB_TOPIC_PREFIX", DefaultPubsubVersion),
263+
TopicName: getEnvString("DLOCKSS_TOPIC_NAME", DefaultTopicName),
264+
IngestAllowlist: getEnvStringSlice("DLOCKSS_INGEST_ALLOWLIST"),
241265
FileWatchFolder: dataDir,
242266
ClusterStorePath: clusterStorePath(dataDir),
243267
MinReplication: getEnvInt("DLOCKSS_MIN_REPLICATION", 5),
@@ -372,9 +396,15 @@ func (c *Config) ValidatePathSafetyCheck() string {
372396
func (c *Config) Log() {
373397
c.Validate()
374398

399+
ingestMode := "open"
400+
if len(c.IngestAllowlist) > 0 {
401+
ingestMode = fmt.Sprintf("allowlist (%d peers)", len(c.IngestAllowlist))
402+
}
375403
slog.Info("config: network",
376404
"discovery_tag", c.DiscoveryServiceTag,
377405
"pubsub_prefix", c.PubsubTopicPrefix,
406+
"topic_name", c.TopicName,
407+
"ingest_mode", ingestMode,
378408
"ipfs_node", c.IPFSNodeAddress,
379409
"api_port", c.APIPort,
380410
"bootstrap_timeout", c.BootstrapTimeout,

internal/fileops/fileops.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type ShardIdentity interface {
3131
GetShardInfo() (string, int)
3232
AnnouncePinned(manifestCID string)
3333
AmIResponsibleFor(key string) bool
34+
IsLocalNodeIngestor() bool
3435
}
3536

3637
// ShardPublisher handles ingest publishing and cluster pinning within the node's own shard.

internal/fileops/fileops_process.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ func (fp *FileProcessor) validateFilePath(path string) bool {
4545
func (fp *FileProcessor) processNewFile(path string) {
4646
slog.Info("processing file", "path", path)
4747

48+
if !fp.shardMgr.IsLocalNodeIngestor() {
49+
slog.Warn("node is not an authorized ingestor, skipping file", "path", path)
50+
return
51+
}
52+
4853
if !fp.validateFilePath(path) {
4954
slog.Warn("file validation failed", "path", path)
5055
return

internal/fileops/fileops_test.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ func (m *mockIPFS) IsPinned(context.Context, cid.Cid) (bool, error) { return f
5151
func (m *mockIPFS) GetFileSize(context.Context, cid.Cid) (uint64, error) {
5252
return 0, nil
5353
}
54-
func (m *mockIPFS) GetPeerID(context.Context) (string, error) { return "test-peer", nil }
55-
func (m *mockIPFS) SwarmConnect(context.Context, []string) error { return nil }
54+
func (m *mockIPFS) GetPeerID(context.Context) (string, error) { return "test-peer", nil }
55+
func (m *mockIPFS) SwarmConnect(context.Context, []string) error { return nil }
5656

5757
// ---------------------------------------------------------------------------
5858
// Mock: ShardCoordinator (ShardIdentity + ShardPublisher + CustodialInjector)
@@ -64,15 +64,16 @@ type mockShardCoordinator struct {
6464
shardDepth int
6565
responsible bool
6666

67-
mu sync.Mutex
68-
announced []string
69-
pinned []cid.Cid
70-
publishedTo []string
67+
mu sync.Mutex
68+
announced []string
69+
pinned []cid.Cid
70+
publishedTo []string
7171
}
7272

73-
func (m *mockShardCoordinator) PeerID() peer.ID { return m.peerID }
74-
func (m *mockShardCoordinator) GetShardInfo() (string, int) { return m.shardID, m.shardDepth }
75-
func (m *mockShardCoordinator) AmIResponsibleFor(string) bool { return m.responsible }
73+
func (m *mockShardCoordinator) PeerID() peer.ID { return m.peerID }
74+
func (m *mockShardCoordinator) GetShardInfo() (string, int) { return m.shardID, m.shardDepth }
75+
func (m *mockShardCoordinator) AmIResponsibleFor(string) bool { return m.responsible }
76+
func (m *mockShardCoordinator) IsLocalNodeIngestor() bool { return true }
7677

7778
func (m *mockShardCoordinator) AnnouncePinned(manifestCID string) {
7879
m.mu.Lock()
@@ -96,11 +97,11 @@ func (m *mockShardCoordinator) PublishIngestMessageToCurrentAndChildIfSplit(data
9697
func (m *mockShardCoordinator) ResolveTargetShardForCustodial(nominal, _ string) string {
9798
return nominal
9899
}
99-
func (m *mockShardCoordinator) JoinShardAsObserver(string) bool { return true }
100-
func (m *mockShardCoordinator) LeaveShardAsObserver(string) {}
100+
func (m *mockShardCoordinator) JoinShardAsObserver(string) bool { return true }
101+
func (m *mockShardCoordinator) LeaveShardAsObserver(string) {}
101102
func (m *mockShardCoordinator) EnsureClusterForShard(context.Context, string) error { return nil }
102103
func (m *mockShardCoordinator) PinToShard(context.Context, string, cid.Cid) error { return nil }
103-
func (m *mockShardCoordinator) PublishToShardCBOR([]byte, string) {}
104+
func (m *mockShardCoordinator) PublishToShardCBOR([]byte, string) {}
104105

105106
// ---------------------------------------------------------------------------
106107
// Mock: StorageTracker

0 commit comments

Comments
 (0)