Skip to content

Commit 37ecf5a

Browse files
authored
Add cog proxy endpoint (#615)
1 parent fb42974 commit 37ecf5a

4 files changed

Lines changed: 174 additions & 3 deletions

File tree

api/handlers/productfile_cog.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package handlers
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"net/http"
7+
8+
"github.com/aws/aws-sdk-go/aws"
9+
"github.com/aws/aws-sdk-go/aws/session"
10+
"github.com/aws/aws-sdk-go/service/s3"
11+
"github.com/google/uuid"
12+
"github.com/labstack/echo/v4"
13+
14+
"github.com/USACE/cumulus-api/api/middleware"
15+
"github.com/USACE/cumulus-api/api/models"
16+
17+
_ "github.com/jackc/pgx/v4"
18+
"github.com/jackc/pgx/v4/pgxpool"
19+
)
20+
21+
// ListProductfilesCOG returns the productfiles for a product over a time range as
22+
// directly-readable COG proxy URLs (each backed by StreamProductfileCOG, which is
23+
// Range-capable so a GDAL /vsicurl/ client can read tiles). Same query params as
24+
// ListProductfiles: 'after' and 'before' (RFC3339).
25+
func ListProductfilesCOG(db *pgxpool.Pool) echo.HandlerFunc {
26+
return func(c echo.Context) error {
27+
id, err := uuid.Parse(c.Param("product_id"))
28+
if err != nil {
29+
return c.String(http.StatusBadRequest, "Malformed ID")
30+
}
31+
after := c.QueryParam("after")
32+
before := c.QueryParam("before")
33+
if after == "" || before == "" {
34+
return c.String(
35+
http.StatusBadRequest,
36+
"Missing query parameter 'after' or 'before'",
37+
)
38+
}
39+
40+
ff, err := models.ListProductfiles(db, id, after, before)
41+
if err != nil {
42+
return c.String(http.StatusInternalServerError, err.Error())
43+
}
44+
45+
cogs := make([]models.ProductfileCOG, 0, len(ff))
46+
for _, f := range ff {
47+
cogs = append(cogs, models.ProductfileCOG{
48+
ID: f.ID,
49+
Datetime: f.Datetime,
50+
Version: f.Version,
51+
CogURL: fmt.Sprintf("/api/products/%s/cog/%s", id.String(), f.ID.String()),
52+
})
53+
}
54+
return c.JSON(http.StatusOK, cogs)
55+
}
56+
}
57+
58+
// StreamProductfileCOG streams a productfile's COG object from S3, passing the
59+
// inbound HTTP Range header through to S3 and returning 206 Partial Content with
60+
// Content-Range/Accept-Ranges so a GDAL /vsicurl/ client can read tiles directly
61+
// (no full download). HEAD returns size + range support for /vsicurl/ probing.
62+
// Every request is authenticated (private route) and logged for metering.
63+
func StreamProductfileCOG(db *pgxpool.Pool, awsCfg *aws.Config) echo.HandlerFunc {
64+
return func(c echo.Context) error {
65+
pfID, err := uuid.Parse(c.Param("productfile_id"))
66+
if err != nil {
67+
return c.String(http.StatusBadRequest, "Malformed productfile ID")
68+
}
69+
70+
obj, err := models.GetProductfileObject(db, pfID)
71+
if err != nil {
72+
return c.String(http.StatusNotFound, "Productfile not found")
73+
}
74+
75+
client := s3.New(session.New(awsCfg))
76+
77+
// HEAD: metadata only (size + range support) for /vsicurl/ probing.
78+
if c.Request().Method == http.MethodHead {
79+
head, err := client.HeadObject(&s3.HeadObjectInput{Bucket: &obj.Bucket, Key: &obj.Key})
80+
if err != nil {
81+
return c.String(http.StatusInternalServerError, err.Error())
82+
}
83+
h := c.Response().Header()
84+
h.Set("Accept-Ranges", "bytes")
85+
if head.ContentLength != nil {
86+
h.Set(echo.HeaderContentLength, fmt.Sprintf("%d", *head.ContentLength))
87+
}
88+
c.Response().WriteHeader(http.StatusOK)
89+
return nil
90+
}
91+
92+
in := &s3.GetObjectInput{Bucket: &obj.Bucket, Key: &obj.Key}
93+
if rangeHeader := c.Request().Header.Get("Range"); rangeHeader != "" {
94+
in.Range = aws.String(rangeHeader)
95+
}
96+
97+
out, err := client.GetObject(in)
98+
if err != nil {
99+
return c.String(http.StatusInternalServerError, err.Error())
100+
}
101+
defer out.Body.Close()
102+
103+
h := c.Response().Header()
104+
h.Set("Accept-Ranges", "bytes")
105+
status := http.StatusOK
106+
if out.ContentRange != nil {
107+
h.Set("Content-Range", *out.ContentRange)
108+
status = http.StatusPartialContent
109+
}
110+
if out.ContentLength != nil {
111+
h.Set(echo.HeaderContentLength, fmt.Sprintf("%d", *out.ContentLength))
112+
}
113+
114+
// Metering hook: who pulled which productfile and how many bytes. This is the
115+
// natural place to enforce a future per-user rate limit / quota.
116+
var bytesServed int64
117+
if out.ContentLength != nil {
118+
bytesServed = *out.ContentLength
119+
}
120+
logCOGAccess(c, pfID, bytesServed)
121+
122+
return c.Stream(status, "application/octet-stream", out.Body)
123+
}
124+
}
125+
126+
func logCOGAccess(c echo.Context, productfileID uuid.UUID, bytes int64) {
127+
sub := "key-auth"
128+
if ui, ok := c.Get("userInfo").(middleware.UserInfo); ok && ui.Sub != nil {
129+
sub = ui.Sub.String()
130+
}
131+
log.Printf("cog-access sub=%s productfile=%s bytes=%d", sub, productfileID.String(), bytes)
132+
}

api/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ func main() {
160160
// Additional Information About Products
161161
public.GET("/products/:product_id/availability", handlers.GetProductAvailability(db))
162162
public.GET("/products/:product_id/files", handlers.ListProductfiles(db))
163+
// Direct, Range-capable COG access (authenticated + metered) for desktop clients
164+
private.GET("/products/:product_id/cog-files", handlers.ListProductfilesCOG(db))
165+
private.GET("/products/:product_id/cog/:productfile_id", handlers.StreamProductfileCOG(db, &awsCfg))
166+
private.HEAD("/products/:product_id/cog/:productfile_id", handlers.StreamProductfileCOG(db, &awsCfg))
163167

164168
// Productfiles
165169
private.POST("/productfiles", handlers.CreateProductfiles(db),

api/middleware/gzip.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@ import (
1212
var GZIP = middleware.GzipWithConfig(middleware.GzipConfig{
1313
Level: 5,
1414
Skipper: func(c echo.Context) bool {
15-
// Skip GZIP compression for routes starting with /features
16-
// as compression messes with pg_featureserv
17-
return strings.Contains(c.Request().URL.Path, "/features")
15+
p := c.Request().URL.Path
16+
// Skip GZIP compression for routes starting with /features as compression
17+
// messes with pg_featureserv, and for COG byte-range streaming where
18+
// compression breaks Range / Content-Length / Content-Range semantics.
19+
return strings.Contains(p, "/features") || strings.Contains(p, "/cog/")
1820
},
1921
})

api/models/productfiles.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,39 @@ type ProductfileAvailability struct {
2626
IsAvailable bool `json:"is_available"`
2727
}
2828

29+
// ProductfileCOG is a productfile exposed as a directly-readable, Range-capable
30+
// COG proxy URL (served by StreamProductfileCOG) instead of a raw S3 key.
31+
type ProductfileCOG struct {
32+
ID uuid.UUID `json:"id"`
33+
Datetime time.Time `json:"datetime"`
34+
Version *time.Time `json:"version"`
35+
CogURL string `json:"cog_url"`
36+
}
37+
38+
// ProductfileObject is the S3 bucket + key backing a single productfile.
39+
type ProductfileObject struct {
40+
Key string `json:"key" db:"key"`
41+
Bucket string `json:"bucket" db:"bucket"`
42+
}
43+
44+
// GetProductfileObject returns the S3 bucket and key for a productfile id.
45+
// The bucket mirrors the download view: the 'write_to_bucket' config value; the
46+
// key is the productfile.file column.
47+
func GetProductfileObject(db *pgxpool.Pool, ID uuid.UUID) (*ProductfileObject, error) {
48+
var obj ProductfileObject
49+
if err := pgxscan.Get(
50+
context.Background(), db, &obj,
51+
`SELECT f.file AS key,
52+
(SELECT config.config_value FROM config WHERE config.config_name::text = 'write_to_bucket'::text) AS bucket
53+
FROM productfile f
54+
WHERE f.id = $1`,
55+
ID,
56+
); err != nil {
57+
return nil, err
58+
}
59+
return &obj, nil
60+
}
61+
2962
// ListProductfiles returns array of productfiles
3063
func ListProductfiles(db *pgxpool.Pool, ID uuid.UUID, after string, before string) ([]Productfile, error) {
3164
ff := make([]Productfile, 0)

0 commit comments

Comments
 (0)