Skip to content

Commit 45f260e

Browse files
committed
unified labels and name consistancy
1 parent 21868d8 commit 45f260e

7 files changed

Lines changed: 67 additions & 103 deletions

File tree

internal/managers/shard/shard_merge.go

Lines changed: 0 additions & 1 deletion
This file was deleted.

internal/monitor/monitor_handlers.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@ import (
1414
"dlockss/pkg/schema"
1515
)
1616

17-
// writeJSONError writes a JSON {"error":"..."} response with the given status code.
17+
func writeJSON(w http.ResponseWriter, v interface{}) {
18+
w.Header().Set("Content-Type", "application/json")
19+
json.NewEncoder(w).Encode(v)
20+
}
21+
1822
func writeJSONError(w http.ResponseWriter, msg string, code int) {
1923
w.Header().Set("Content-Type", "application/json")
2024
w.WriteHeader(code)
@@ -127,7 +131,7 @@ func (m *Monitor) handleHeartbeatWithRole(ctx context.Context, senderID peer.ID,
127131
if nodeName != "" {
128132
logName = nodeName + " (" + peerIDStr + ")"
129133
}
130-
slog.Info("new node discovered via heartbeat", "peer", logName, "shard", shardLogLabel(shardID), "pinned", pinnedCount, "role", role)
134+
slog.Info("new node discovered via heartbeat", "peer", logName, "shard", shardLabel(shardID), "pinned", pinnedCount, "role", role)
131135
nodeState = &NodeState{
132136
PeerID: peerIDStr,
133137
NodeName: nodeName,
@@ -158,9 +162,7 @@ func (m *Monitor) handleHeartbeatWithRole(ctx context.Context, senderID peer.ID,
158162
}
159163
// Ignore pinned=0 during grace period: stale heartbeats can arrive
160164
// before the node finishes its first pin cycle.
161-
if now.Sub(firstSeen) < unpinGracePeriod {
162-
// nop
163-
} else {
165+
if now.Sub(firstSeen) >= unpinGracePeriod {
164166
removedFromManifests := 0
165167
for manifest, peers := range m.manifestReplication {
166168
if _, had := peers[peerIDStr]; had {
@@ -172,7 +174,7 @@ func (m *Monitor) handleHeartbeatWithRole(ctx context.Context, senderID peer.ID,
172174
}
173175
}
174176
if removedFromManifests > 0 {
175-
slog.Info("unpin all", "peer", peerIDStr, "shard", shardLogLabel(shardID), "removed_manifests", removedFromManifests)
177+
slog.Info("unpin all", "peer", peerIDStr, "shard", shardLabel(shardID), "removed_manifests", removedFromManifests)
176178
}
177179
}
178180
} else {
@@ -246,12 +248,12 @@ func (m *Monitor) updateNodeShardLocked(node *NodeState, newShard string, timest
246248
}
247249

248250
m.treeDirty = true
249-
slog.Info("shard move", "peer", node.PeerID, "from", shardLogLabel(lastShard), "to", shardLogLabel(newShard))
251+
slog.Info("shard move", "peer", node.PeerID, "from", shardLabel(lastShard), "to", shardLabel(newShard))
250252

251253
isSplit := len(newShard) > len(lastShard) && strings.HasPrefix(newShard, lastShard)
252254
if isSplit {
253255
if !m.hasSplitEvent(lastShard, newShard) {
254-
slog.Info("detected shard split", "parent", shardLogLabel(lastShard), "child", newShard, "peer", node.PeerID)
256+
slog.Info("detected shard split", "parent", shardLabel(lastShard), "child", newShard, "peer", node.PeerID)
255257
}
256258
m.lastSplitTime = timestamp
257259
m.splitEvents = append(m.splitEvents, ShardSplitEvent{
@@ -298,7 +300,7 @@ func (m *Monitor) updateNodeShardLocked(node *NodeState, newShard string, timest
298300
}
299301
}
300302
if removed > 0 {
301-
slog.Info("shard move removed peer from manifests", "peer", peerIDStr, "removed_manifests", removed, "shard", shardLogLabel(newShard))
303+
slog.Info("shard move removed peer from manifests", "peer", peerIDStr, "removed_manifests", removed, "shard", shardLabel(newShard))
302304
}
303305
if isSiblingShard(lastShard, newShard) {
304306
m.peerLastSiblingMove[peerIDStr] = siblingMoveRecord{from: lastShard, to: newShard, when: timestamp}

internal/monitor/monitor_models.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (n *NodeState) EffectiveShard() string {
143143
return ""
144144
}
145145

146-
func shardLogLabel(shardID string) string {
146+
func shardLabel(shardID string) string {
147147
if shardID == "" {
148148
return "root"
149149
}

internal/monitor/monitor_replication.go

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,6 @@ func (mr manifestResult) isAtTarget() bool {
134134
return mr.count >= minRep && mr.count <= mr.maxRep
135135
}
136136

137-
// --- Public methods using the shared helpers ---
138-
139137
func (m *Monitor) runReplicationCleanup() {
140138
ticker := time.NewTicker(ReplicationCleanupEvery)
141139
defer ticker.Stop()
@@ -147,15 +145,15 @@ func (m *Monitor) runReplicationCleanup() {
147145
}
148146
m.mu.Lock()
149147
cutoff := time.Now().Add(-ReplicationAnnounceTTL)
150-
for manifest, peers := range m.manifestReplication {
148+
for cid, peers := range m.manifestReplication {
151149
for peerID, lastSeen := range peers {
152150
if lastSeen.Before(cutoff) {
153151
delete(peers, peerID)
154152
}
155153
}
156154
if len(peers) == 0 {
157-
delete(m.manifestReplication, manifest)
158-
delete(m.manifestShard, manifest)
155+
delete(m.manifestReplication, cid)
156+
delete(m.manifestShard, cid)
159157
}
160158
}
161159
m.mu.Unlock()
@@ -178,11 +176,7 @@ func (m *Monitor) runReplicationCleanup() {
178176
for _, shard := range shardLabels {
179177
peers := membership[shard]
180178
atTarget := byShard[shard]
181-
shardLabel := shard
182-
if shardLabel == "" {
183-
shardLabel = "(root)"
184-
}
185-
fmt.Fprintf(&b, " %s: %d nodes [%s] %d files at target;", shardLabel, len(peers), strings.Join(peers, ","), atTarget)
179+
fmt.Fprintf(&b, " %s: %d nodes [%s] %d files at target;", shardLabel(shard), len(peers), strings.Join(peers, ","), atTarget)
186180
}
187181
slog.Info("replication snapshot", "total_nodes", totalNodes, "total_manifests", totalFiles, "total_at_target", filesAtTarget, "avg_replication", fmt.Sprintf("%.2f", avgLevel), "shards", strings.TrimSpace(b.String()))
188182
if filesAtTarget == 0 && totalFiles > 0 && totalNodes > 0 {
@@ -277,12 +271,12 @@ func (m *Monitor) getReplicationStats() (distribution [11]int, avgLevel float64,
277271
rs := m.newReplicationSnapshotUnlocked()
278272
var totalReplication, manifestCount int
279273

280-
for manifest, peers := range m.manifestReplication {
274+
for cid, peers := range m.manifestReplication {
281275
if len(peers) == 0 {
282276
continue
283277
}
284278
shardCounts := rs.buildShardCounts(peers)
285-
mr := rs.resolveManifest(manifest, peers, shardCounts)
279+
mr := rs.resolveManifest(cid, peers, shardCounts)
286280
if mr.count == 0 {
287281
continue
288282
}
@@ -313,22 +307,18 @@ func (m *Monitor) getReplicationCIDsByLevel(level int) []CIDEntry {
313307
rs := m.newReplicationSnapshotUnlocked()
314308
var result []CIDEntry
315309

316-
for manifest, peers := range m.manifestReplication {
310+
for cid, peers := range m.manifestReplication {
317311
if len(peers) == 0 {
318312
continue
319313
}
320314
shardCounts := rs.buildShardCounts(peers)
321-
mr := rs.resolveManifest(manifest, peers, shardCounts)
315+
mr := rs.resolveManifest(cid, peers, shardCounts)
322316
if mr.count == 0 {
323317
continue
324318
}
325319
matches := (level == 10 && mr.count >= 10) || (level < 10 && mr.count == level)
326320
if matches {
327-
shardLabel := m.manifestShard[manifest]
328-
if shardLabel == "" {
329-
shardLabel = "root"
330-
}
331-
result = append(result, CIDEntry{CID: manifest, Shard: shardLabel, Replicas: mr.count})
321+
result = append(result, CIDEntry{CID: cid, Shard: shardLabel(m.manifestShard[cid]), Replicas: mr.count})
332322
}
333323
}
334324
sort.Slice(result, func(i, j int) bool { return result[i].CID < result[j].CID })
@@ -347,19 +337,19 @@ func (m *Monitor) getReplicationByShard() map[string]int {
347337
peers map[string]time.Time
348338
}
349339
manifests := make(map[string]manifestInfo, len(m.manifestReplication))
350-
for manifest, peers := range m.manifestReplication {
340+
for cid, peers := range m.manifestReplication {
351341
if len(peers) == 0 {
352342
continue
353343
}
354-
manifests[manifest] = manifestInfo{
344+
manifests[cid] = manifestInfo{
355345
shardCounts: rs.buildShardCounts(peers),
356346
peers: peers,
357347
}
358348
}
359349

360350
filesAtTargetPerShard := make(map[string]int)
361-
for manifest, info := range manifests {
362-
mr := rs.resolveManifest(manifest, info.peers, info.shardCounts)
351+
for cid, info := range manifests {
352+
mr := rs.resolveManifest(cid, info.peers, info.shardCounts)
363353
if mr.count == 0 || mr.maxRep == 0 {
364354
continue
365355
}

internal/monitor/monitor_routes.go

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,12 @@ func (m *Monitor) handleNodes(w http.ResponseWriter, r *http.Request) {
132132
strings.Contains(strings.ToLower(node.NodeName), query)
133133
})
134134
response := m.buildNodeResponse(snapshot)
135-
w.Header().Set("Content-Type", "application/json")
136-
json.NewEncoder(w).Encode(response)
135+
writeJSON(w, response)
137136
}
138137

139138
func (m *Monitor) handleShardTree(w http.ResponseWriter, r *http.Request) {
140139
tree := m.GetShardTree()
141-
w.Header().Set("Content-Type", "application/json")
142-
json.NewEncoder(w).Encode(tree)
140+
writeJSON(w, tree)
143141
}
144142

145143
func (m *Monitor) handleShardNodes(w http.ResponseWriter, r *http.Request) {
@@ -148,16 +146,10 @@ func (m *Monitor) handleShardNodes(w http.ResponseWriter, r *http.Request) {
148146
return shard == shardFilter
149147
})
150148
response := m.buildNodeResponse(snapshot)
151-
shardLabel := shardFilter
152-
if shardLabel == "" {
153-
shardLabel = "root"
154-
}
155-
w.Header().Set("Content-Type", "application/json")
156-
json.NewEncoder(w).Encode(map[string]interface{}{"shard_id": shardFilter, "shard_label": shardLabel, "nodes": response, "count": len(response)})
149+
writeJSON(w, map[string]interface{}{"shard_id": shardFilter, "shard_label": shardLabel(shardFilter), "nodes": response, "count": len(response)})
157150
}
158151

159152
func (m *Monitor) handleRootTopic(w http.ResponseWriter, r *http.Request) {
160-
w.Header().Set("Content-Type", "application/json")
161153
if r.Method == http.MethodPost {
162154
var body struct {
163155
TopicPrefix string `json:"topic_prefix,omitempty"`
@@ -183,7 +175,7 @@ func (m *Monitor) writeRootTopicResponse(w http.ResponseWriter) {
183175
prefix := m.getTopicPrefix()
184176
topic := m.getTopicName()
185177
rootTopic := fmt.Sprintf("%s-%s-shard-", prefix, topic)
186-
json.NewEncoder(w).Encode(map[string]string{
178+
writeJSON(w, map[string]string{
187179
"root_topic": rootTopic,
188180
"topic_prefix": prefix,
189181
"topic_name": topic,
@@ -204,16 +196,14 @@ func (m *Monitor) handleNodeFiles(w http.ResponseWriter, r *http.Request) {
204196
entries = []CIDEntry{}
205197
}
206198
m.mu.RUnlock()
207-
w.Header().Set("Content-Type", "application/json")
208-
json.NewEncoder(w).Encode(map[string]interface{}{"peer_id": peerID, "cids": entries, "count": len(entries)})
199+
writeJSON(w, map[string]interface{}{"peer_id": peerID, "cids": entries, "count": len(entries)})
209200
}
210201

211202
func (m *Monitor) handleUniqueCIDs(w http.ResponseWriter, r *http.Request) {
212203
m.mu.RLock()
213204
entries := m.buildCIDEntriesUnlocked(m.uniqueCIDs)
214205
m.mu.RUnlock()
215-
w.Header().Set("Content-Type", "application/json")
216-
json.NewEncoder(w).Encode(map[string]interface{}{"cids": entries, "count": len(entries)})
206+
writeJSON(w, map[string]interface{}{"cids": entries, "count": len(entries)})
217207
}
218208

219209
func (m *Monitor) handleReplication(w http.ResponseWriter, r *http.Request) {
@@ -223,9 +213,8 @@ func (m *Monitor) handleReplication(w http.ResponseWriter, r *http.Request) {
223213
}
224214
dist, avg, atTarget := m.getReplicationStats()
225215
byShard := m.getReplicationByShard()
226-
w.Header().Set("Content-Type", "application/json")
227216
w.Header().Set("Cache-Control", "no-store, no-cache, must-revalidate")
228-
json.NewEncoder(w).Encode(map[string]interface{}{
217+
writeJSON(w, map[string]interface{}{
229218
"replication_distribution": dist,
230219
"avg_replication_level": avg,
231220
"files_at_target": atTarget,
@@ -246,8 +235,7 @@ func (m *Monitor) handleReplicationCIDs(w http.ResponseWriter, r *http.Request)
246235
return
247236
}
248237
entries := m.getReplicationCIDsByLevel(level)
249-
w.Header().Set("Content-Type", "application/json")
250-
json.NewEncoder(w).Encode(map[string]interface{}{"level": level, "cids": entries, "count": len(entries)})
238+
writeJSON(w, map[string]interface{}{"level": level, "cids": entries, "count": len(entries)})
251239
}
252240

253241
func (m *Monitor) handleManifestPayload(w http.ResponseWriter, r *http.Request) {
@@ -284,17 +272,16 @@ func (m *Monitor) handleManifestPayload(w http.ResponseWriter, r *http.Request)
284272
"size": ro.TotalSize,
285273
"sig": base64.StdEncoding.EncodeToString(ro.Signature),
286274
}
287-
w.Header().Set("Content-Type", "application/json")
288-
json.NewEncoder(w).Encode(map[string]interface{}{"payload_cid": ro.Payload.String(), "manifest": manifest})
275+
writeJSON(w, map[string]interface{}{"payload_cid": ro.Payload.String(), "manifest": manifest})
289276
}
290277

291278
func (m *Monitor) handleIdentify(w http.ResponseWriter, r *http.Request) {
292-
peerStr := strings.TrimSpace(r.URL.Query().Get("peer"))
293-
if peerStr == "" {
279+
peerIDStr := strings.TrimSpace(r.URL.Query().Get("peer"))
280+
if peerIDStr == "" {
294281
writeJSONError(w, "missing peer parameter", http.StatusBadRequest)
295282
return
296283
}
297-
pid, err := peer.Decode(peerStr)
284+
pid, err := peer.Decode(peerIDStr)
298285
if err != nil {
299286
writeJSONError(w, "invalid peer ID", http.StatusBadRequest)
300287
return
@@ -335,8 +322,7 @@ func (m *Monitor) handleIdentify(w http.ResponseWriter, r *http.Request) {
335322
"connected": connected,
336323
"region": region,
337324
}
338-
w.Header().Set("Content-Type", "application/json")
339-
json.NewEncoder(w).Encode(result)
325+
writeJSON(w, result)
340326
}
341327

342328
func (m *Monitor) handleDashboard(w http.ResponseWriter, r *http.Request) {
@@ -376,7 +362,7 @@ func (m *Monitor) RunStatusLogger(ctx context.Context) {
376362
sort.Strings(shardIDs)
377363
parts := make([]string, 0, len(shardIDs))
378364
for _, sid := range shardIDs {
379-
parts = append(parts, fmt.Sprintf("%s: %d", shardLogLabel(sid), shardCounts[sid]))
365+
parts = append(parts, fmt.Sprintf("%s: %d", shardLabel(sid), shardCounts[sid]))
380366
}
381367
slog.Info("status", "nodes", nodeCount, "shards", len(shardCounts), "pinned", totalPinned, "detail", strings.Join(parts, ", "))
382368
}

internal/monitor/monitor_subscription.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func (m *Monitor) ensureShardSubscriptionUnlocked(ctx context.Context, shardID s
9696
_ = topic.Publish(ctx, probeMsg)
9797
}
9898
go m.handleShardMessages(m.subCtx, sub, shardID, topicName)
99-
slog.Info("subscribed to shard topic", "shard", shardLogLabel(shardID))
99+
slog.Info("subscribed to shard topic", "shard", shardLabel(shardID))
100100
}
101101

102102
// resolveIPFromPeer extracts the best public IP address for the given peer
@@ -169,7 +169,7 @@ func (m *Monitor) handleShardMessages(ctx context.Context, sub *pubsub.Subscript
169169
if peerStr := strings.TrimSpace(string(data[6:])); peerStr != "" {
170170
if leaveID, err := decodePeerIDWithFallback(peerStr); err == nil {
171171
m.handleLeaveShard(ctx, leaveID, shardID)
172-
slog.Info("shard leave", "peer", leaveID.String(), "shard", shardLogLabel(shardID))
172+
slog.Info("shard leave", "peer", leaveID.String(), "shard", shardLabel(shardID))
173173
}
174174
}
175175

@@ -263,7 +263,7 @@ func (m *Monitor) dispatchJoin(ctx context.Context, data []byte, senderID peer.I
263263
if nodeName != "" {
264264
logLabel = nodeName + " (" + joinID.String() + ")"
265265
}
266-
slog.Info("shard join", "peer", logLabel, "shard", shardLogLabel(shardID), "role", role)
266+
slog.Info("shard join", "peer", logLabel, "shard", shardLabel(shardID), "role", role)
267267
}
268268
}
269269

0 commit comments

Comments
 (0)