Skip to content

Commit d6ec222

Browse files
committed
service configuration deployment done
1 parent 1a08317 commit d6ec222

9 files changed

Lines changed: 99 additions & 101 deletions

File tree

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
.idea
55
logs/*
66
temp
7-
cli-config.json
7+
cli-config*.json
File renamed without changes.

config/services/fetch.yaml

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,24 @@ serviceConfigurations:
2222
orchestration:
2323
type: Deployment
2424
spec:
25-
replicas: 1
26-
selector:
27-
matchLabels:
28-
app: fetch
29-
template:
30-
metadata:
31-
labels:
25+
apiVersion: apps/v1
26+
kind: Deployment
27+
metadata:
28+
name: fetch
29+
spec:
30+
replicas: 1
31+
selector:
32+
matchLabels:
3233
app: fetch
33-
spec:
34-
containers:
35-
- name: fetch
36-
image: fetch
37-
env:
38-
- name: GET_URL
39-
value: "$(URL)"
34+
template:
35+
metadata:
36+
labels:
37+
app: fetch
38+
spec:
39+
containers:
40+
- name: fetch
41+
image: fetch
42+
imagePullPolicy: Never
43+
env:
44+
- name: GET_URL
45+
value: "$(URL)"

config/services/pacsoi.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ serviceConfigurations:
4343
containers:
4444
- name: pacsoi
4545
image: pacsoi-service
46+
imagePullPolicy: Never
4647
env:
4748
- name: SOURCES
4849
value: "$(SOURCES)"

containers/aggregator/config/service_collection.go

Lines changed: 28 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -253,24 +253,16 @@ func (collec *ServiceCollection) postService(w http.ResponseWriter, r *http.Requ
253253

254254
for pred, output := range service.Configuration.Spec.OutputMapping {
255255

256-
if output.Distribution == nil || output.Distribution.Access == nil {
257-
continue // abstract output → no endpoint
258-
}
259-
260256
externalPath := output.Distribution.Access.ExternalPath
261-
262-
// Validate (now required)
263257
if externalPath == "" {
264258
errMsg := fmt.Sprintf("externalPath is required for output %s", pred)
265259
logrus.Error(errMsg)
266260
http.Error(w, errMsg, http.StatusInternalServerError)
267261
return
268262
}
269263

270-
// Normalize and join
271264
path := util.JoinPaths(aggPath, externalPath)
272265

273-
// Prevent duplicates
274266
if seen[path] {
275267
errMsg := fmt.Sprintf("duplicate externalPath resolved to %s", path)
276268
logrus.Error(errMsg)
@@ -279,15 +271,37 @@ func (collec *ServiceCollection) postService(w http.ResponseWriter, r *http.Requ
279271
}
280272
seen[path] = true
281273

282-
// Register handler
283-
err = collec.HandleFunc(path, collec.HandleServiceOutput, []model.Scope{model.Read, model.Write})
274+
// Build forward URL at registration time, nil if abstract
275+
var forwardURL string
276+
if output.Distribution != nil && output.Distribution.Access != nil {
277+
access := output.Distribution.Access
278+
internalPath := access.InternalPath
279+
if internalPath == "" {
280+
internalPath = "/"
281+
}
282+
if !strings.HasPrefix(internalPath, "/") {
283+
internalPath = "/" + internalPath
284+
}
285+
forwardURL = fmt.Sprintf(
286+
"%s://%s.%s.svc.cluster.local:%d%s",
287+
access.Protocol,
288+
service.NamespaceID,
289+
model.Namespace,
290+
access.ServicePort,
291+
internalPath,
292+
)
293+
}
294+
295+
err = collec.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
296+
collec.HandleServiceOutput(w, r, forwardURL)
297+
}, []model.Scope{model.Read, model.Write})
284298
if err != nil {
285299
logrus.WithError(err).Errorf("Error registering handler for output %s", pred)
286300
http.Error(w, "Failed to create service from request", http.StatusInternalServerError)
287301
return
288302
}
289303

290-
logrus.Infof("Registered endpoint: %s → %s", path, pred)
304+
logrus.Infof("Registered endpoint: %s → %s", pred, path)
291305
}
292306

293307
// Return service information
@@ -328,99 +342,40 @@ func (collec *ServiceCollection) deleteService(w http.ResponseWriter, _ *http.Re
328342
}
329343

330344
// Handles all incoming service requests <service path>/<output>
331-
func (collec *ServiceCollection) HandleServiceOutput(w http.ResponseWriter, r *http.Request) {
332-
// Trim leading/trailing slashes and split path
333-
parts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
334-
if len(parts) < 2 {
335-
http.Error(w, "Invalid path, must be /<service-path>/<output>", http.StatusBadRequest)
336-
return
337-
}
338-
339-
// Last segment is the output
340-
pred := parts[len(parts)-1]
341-
342-
// Everything else is the service path
343-
servicePath := parts[:len(parts)-1]
344-
serviceID := strings.Join(servicePath, "-")
345-
service, ok := collec.services[serviceID]
346-
if !ok {
347-
http.Error(w, "Service not found", http.StatusNotFound)
348-
return
349-
}
350-
351-
// Lookup output mapping from ServiceConfiguration
352-
output, exists := service.Configuration.Spec.OutputMapping[pred]
353-
if !exists {
354-
logrus.Errorf("No output mapping found for %s", pred)
355-
http.Error(w, fmt.Sprintf("Requested service has no output %s", pred), http.StatusNotFound)
356-
return
357-
}
358-
359-
if output.Distribution == nil || output.Distribution.Access == nil {
360-
logrus.Errorf("Output %s has no distribution access defined", pred)
361-
http.Error(w, fmt.Sprintf("Output %s is not accessible", pred), http.StatusNotFound)
345+
func (collec *ServiceCollection) HandleServiceOutput(w http.ResponseWriter, r *http.Request, forwardURL string) {
346+
if forwardURL == "" {
347+
http.Error(w, "Output has no distribution", http.StatusNotFound)
362348
return
363349
}
364350

365-
access := output.Distribution.Access
366-
367-
// Extract servicePort
368-
port := access.ServicePort
369-
370-
// Normalize internalPath (default "/")
371-
internalPath := access.InternalPath
372-
if internalPath == "" {
373-
internalPath = "/"
374-
}
375-
376-
// ensure it starts with "/"
377-
if !strings.HasPrefix(internalPath, "/") {
378-
internalPath = "/" + internalPath
379-
}
380-
381-
// Build forwarding URL
382-
forwardURL := fmt.Sprintf(
383-
"%s://%s.%s.svc.cluster.local:%d%s",
384-
access.Protocol,
385-
service.NamespaceID,
386-
model.Namespace,
387-
port,
388-
internalPath,
389-
)
390-
391351
req, err := http.NewRequest(r.Method, forwardURL, r.Body)
392352
if err != nil {
393353
logrus.WithError(err).Error("Failed to create forward request")
394354
http.Error(w, "Failed to reach requested service", http.StatusInternalServerError)
395355
return
396356
}
397357

398-
// Copy headers from original request
399358
for k, vv := range r.Header {
400359
for _, v := range vv {
401360
req.Header.Add(k, v)
402361
}
403362
}
404363

405-
// Send request using proxy client
406364
resp, err := model.HttpClient.Do(req)
407365
if err != nil {
408366
http.Error(w, fmt.Sprintf("Failed to forward request: %v", err), http.StatusBadGateway)
409367
return
410368
}
411369
defer resp.Body.Close()
412370

413-
// Copy response headers
414371
for k, vv := range resp.Header {
415372
for _, v := range vv {
416373
w.Header().Add(k, v)
417374
}
418375
}
419376

420-
// Write status code
421377
w.WriteHeader(resp.StatusCode)
422378

423-
// Copy response body
424379
if _, err := io.Copy(w, resp.Body); err != nil {
425380
logrus.WithError(err).Error("Failed to copy response body")
426381
}

containers/aggregator/model/service-description.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -189,14 +189,15 @@ func (service *Service) InitDescription() error {
189189
description.Service.AddQuad(quad)
190190
}
191191

192-
// OUTPUTS (UPDATED)
192+
// OUTPUTS
193193
for pred, output := range service.Configuration.Spec.OutputMapping {
194194

195+
id := uuid.New().String()
195196
outputStore := rdfgo.NewStore()
196197
description.Outputs[pred] = outputStore
197198

198-
// Dataset node
199-
datasetNode := rdfgo.NewNamedNode(service.FullPath + "#" + pred + "Dataset")
199+
// Dataset node
200+
datasetNode := rdfgo.NewNamedNode(service.FullPath + "#" + id + "Dataset")
200201

201202
quad, err := rdfgo.NewQuad(
202203
datasetNode,
@@ -209,7 +210,7 @@ func (service *Service) InitDescription() error {
209210
}
210211
outputStore.AddQuad(quad)
211212

212-
// Service serves dataset
213+
// Service serves dataset
213214
quad, err = rdfgo.NewQuad(
214215
svcNode,
215216
Dcat("servesDataset"),
@@ -221,7 +222,7 @@ func (service *Service) InitDescription() error {
221222
}
222223
outputStore.AddQuad(quad)
223224

224-
// DATASET METADATA
225+
// DATASET METADATA
225226
if output.Dataset != nil {
226227

227228
if output.Dataset.Title != "" {
@@ -250,7 +251,7 @@ func (service *Service) InitDescription() error {
250251
outputStore.AddQuad(quad)
251252
}
252253

253-
// Extra RDF properties
254+
// Extra RDF properties
254255
for predURI, val := range output.Dataset.ExtraProperties {
255256
quad, err = rdfgo.NewQuad(
256257
datasetNode,
@@ -265,10 +266,10 @@ func (service *Service) InitDescription() error {
265266
}
266267
}
267268

268-
// DISTRIBUTION
269+
// DISTRIBUTION
269270
if output.Distribution != nil && output.Distribution.Access != nil {
270271

271-
distNode := rdfgo.NewNamedNode(service.FullPath + "#" + pred + "Distribution")
272+
distNode := rdfgo.NewNamedNode(service.FullPath + "#" + id + "Distribution")
272273

273274
// rdf:type
274275
quad, err = rdfgo.NewQuad(
@@ -294,7 +295,7 @@ func (service *Service) InitDescription() error {
294295
}
295296
outputStore.AddQuad(quad)
296297

297-
// Distribution metadata
298+
// Distribution metadata
298299
if output.Distribution.Title != "" {
299300
quad, err = rdfgo.NewQuad(
300301
distNode,
@@ -321,7 +322,7 @@ func (service *Service) InitDescription() error {
321322
outputStore.AddQuad(quad)
322323
}
323324

324-
// Access URL
325+
// Access URL
325326
access := output.Distribution.Access
326327

327328
externalPath := access.ExternalPath
@@ -347,7 +348,7 @@ func (service *Service) InitDescription() error {
347348
}
348349
outputStore.AddQuad(quad)
349350

350-
// Link to DataService
351+
// Link to DataService
351352
quad, err = rdfgo.NewQuad(
352353
distNode,
353354
Dcat("accessService"),
@@ -359,7 +360,7 @@ func (service *Service) InitDescription() error {
359360
}
360361
outputStore.AddQuad(quad)
361362

362-
// Distribution extra RDF
363+
// Distribution extra RDF
363364
for predURI, val := range output.Distribution.ExtraProperties {
364365
quad, err = rdfgo.NewQuad(
365366
distNode,

containers/aggregator/services/deploy-service.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package services
33
import (
44
"aggregator/model"
55
"context"
6+
"encoding/json"
67
"fmt"
78
"strings"
89
"time"
@@ -73,12 +74,31 @@ func deploy(
7374
return err
7475
}
7576

77+
// Inject UMA Proxy envs
78+
if useUMA {
79+
err = injectUMAEnv(obj)
80+
if err != nil {
81+
return err
82+
}
83+
}
84+
85+
// Inject namespace
86+
err = injectNamespace(obj, model.Namespace)
87+
if err != nil {
88+
return err
89+
}
90+
7691
// Inject labels
7792
err = injectLabels(obj, service)
7893
if err != nil {
7994
return err
8095
}
8196

97+
// Log final spec
98+
if finalBytes, err := json.MarshalIndent(obj, "", " "); err == nil {
99+
logrus.Debugf("Final deployment spec for service %s:\n%s", service.NamespaceID, string(finalBytes))
100+
}
101+
82102
// Apply
83103
err = applyObject(ctx, obj)
84104
if err != nil {

containers/aggregator/services/util.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,20 @@ func injectIntoPodSpec(podSpec *corev1.PodSpec, envVars []corev1.EnvVar) {
162162
}
163163
}
164164

165+
func injectNamespace(obj interface{}, namespace string) error {
166+
switch r := obj.(type) {
167+
case *appsv1.Deployment:
168+
r.Namespace = namespace
169+
case *batchv1.Job:
170+
r.Namespace = namespace
171+
case *batchv1.CronJob:
172+
r.Namespace = namespace
173+
default:
174+
return fmt.Errorf("unsupported object type")
175+
}
176+
return nil
177+
}
178+
165179
func injectLabels(obj interface{}, service *model.Service) error {
166180

167181
labels := map[string]string{

makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# ------------------------
1313

1414
CONFIG_DIR ?= ./config
15+
CONFIG_FILE ?= local.yaml
1516
SERVICE_DIR := $(CONFIG_DIR)/services
1617
FNO_DIR := $(CONFIG_DIR)/fno
1718
SERVICE_FILES := $(wildcard $(SERVICE_DIR)/*.yaml)
@@ -34,7 +35,7 @@ deploy:
3435

3536
kind-deploy:
3637
$(MAKE) configure-etc-hosts HOSTS="aggregator.local wsl.local"
37-
$(MAKE) deploy CONFIG=kind.yaml
38+
$(MAKE) deploy CONFIG=$(CONFIG_FILE)
3839

3940
undeploy:
4041
@echo "🧹 Stopping aggregator deployment..."

0 commit comments

Comments
 (0)