Skip to content

Commit 52f9c46

Browse files
committed
feat: add path matching and rules for API route inclusion/exclusion
- Implemented MatchPath function to match URL paths against glob patterns. - Added PathRule struct to define include/exclude rules for API paths. - Introduced ShouldIncludePath function to evaluate path rules. - Created InternalPathRules function for default exclusion of internal endpoints. - Enhanced BuildPathRules to combine internal and user-defined rules. - Added comprehensive tests for path matching and rule evaluation. - Updated RoutingConfig to include PathRules for controlling published routes.
1 parent 8407831 commit 52f9c46

57 files changed

Lines changed: 12110 additions & 90 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

discovery/consul/consul.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ type Config struct {
5656
// ConsulDiscovery implements discovery.ServiceDiscovery using Consul.
5757
// It also implements farp.StorageBackend for KV operations.
5858
type ConsulDiscovery struct {
59-
client *consulapi.Client
60-
config Config
61-
mu sync.RWMutex
62-
closed bool
59+
client *consulapi.Client
60+
config Config
61+
mu sync.RWMutex
62+
closed bool
6363
}
6464

6565
// New creates a new Consul-based discovery backend.

discovery/mdns/mdns.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,12 +228,12 @@ func (m *MDNSDiscovery) Register(_ context.Context, instance discovery.ServiceIn
228228
}
229229

230230
server, err := zeroconf.Register(
231-
instance.ID, // instance name
232-
m.config.ServiceType, // service type
233-
m.config.Domain, // domain
234-
port, // port
235-
txt, // TXT records
236-
ifaces, // interfaces
231+
instance.ID, // instance name
232+
m.config.ServiceType, // service type
233+
m.config.Domain, // domain
234+
port, // port
235+
txt, // TXT records
236+
ifaces, // interfaces
237237
)
238238
if err != nil {
239239
return fmt.Errorf("%w: mDNS registration failed: %w", farp.ErrRegistrationFailed, err)

discovery/node.go

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func NewServiceNode(config ServiceNodeConfig) (*ServiceNode, error) {
5555
manifest := farp.NewManifest(config.ServiceName, config.ServiceVersion, config.InstanceID)
5656
manifest.Routing.Strategy = config.MountStrategy
5757
manifest.Routing.BasePath = config.BasePath
58+
manifest.Routing.PathRules = config.PathRules
5859

5960
// Wire endpoints from config, falling back to FARP defaults
6061
manifest.Endpoints.Health = "/_farp/health"
@@ -387,6 +388,11 @@ func (n *ServiceNode) healthLoop(ctx context.Context, discovery ServiceDiscovery
387388

388389
// pushHealthCheck sends a heartbeat with routes_checksum and handles
389390
// reconciliation if the gateway's state doesn't match (§17.4.1).
391+
//
392+
// Gateway restart recovery: if the heartbeat returns an error (e.g. 404
393+
// because the gateway lost all state), we re-register with the full
394+
// manifest so the gateway can rebuild its route table immediately
395+
// without needing to fetch the manifest back from the service.
390396
func (n *ServiceNode) pushHealthCheck(ctx context.Context, pushDisc *PushDiscovery) {
391397
n.mu.RLock()
392398
expectedChecksum := n.manifest.RoutesChecksum
@@ -398,22 +404,36 @@ func (n *ServiceNode) pushHealthCheck(ctx context.Context, pushDisc *PushDiscove
398404
expectedChecksum,
399405
)
400406
if err != nil {
401-
// Gateway unreachable — retry registration
402-
n.retryRegister(ctx, pushDisc)
407+
// Gateway unreachable or returned 404 (service unknown after restart).
408+
// Re-register with full manifest so the gateway can rebuild immediately.
409+
n.pushReRegisterWithManifest(ctx, pushDisc)
403410
return
404411
}
405412

406413
// Reconciliation: if gateway checksum doesn't match, re-register WITH
407414
// manifest so the gateway can apply schemas without fetching.
408415
if resp.RoutesChecksum != expectedChecksum {
409-
n.mu.RLock()
410-
manifest := n.manifest
411-
n.mu.RUnlock()
416+
n.pushReRegisterWithManifest(ctx, pushDisc)
417+
}
418+
}
412419

413-
instance := n.buildInstance()
414-
if _, err := pushDisc.RegisterWithManifest(ctx, instance, manifest); err != nil {
415-
// Non-fatal: will retry on next heartbeat
416-
_ = err
420+
// pushReRegisterWithManifest re-registers with the gateway, sending the
421+
// full manifest inline. This handles both gateway restarts (404) and
422+
// checksum mismatches without requiring the gateway to fetch back.
423+
func (n *ServiceNode) pushReRegisterWithManifest(ctx context.Context, pushDisc *PushDiscovery) {
424+
n.mu.RLock()
425+
manifest := n.manifest
426+
n.mu.RUnlock()
427+
428+
instance := n.buildInstance()
429+
430+
for attempt := range n.config.MaxRetries {
431+
if attempt > 0 {
432+
time.Sleep(n.config.RetryBackoff * time.Duration(attempt))
433+
}
434+
435+
if _, err := pushDisc.RegisterWithManifest(ctx, instance, manifest); err == nil {
436+
return
417437
}
418438
}
419439
}

discovery/options.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type ServiceNodeConfig struct {
5050
// Routing configuration (flows into manifest)
5151
MountStrategy farp.MountStrategy
5252
BasePath string
53+
PathRules []farp.PathRule
5354

5455
// Routes provides route information for OpenAPI schema generation.
5556
// Can be []farp.RouteDescriptor, map[string]any (OpenAPI paths), or any
@@ -144,4 +145,3 @@ func (c *GatewayNodeConfig) setDefaults() {
144145
c.HeartbeatTimeout = 30 * time.Second
145146
}
146147
}
147-

discovery/push_handler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ type PushHandler struct {
3232
}
3333

3434
type pushEntry struct {
35-
Instance ServiceInstance
36-
Manifest *farp.SchemaManifest
37-
LastSeen time.Time
35+
Instance ServiceInstance
36+
Manifest *farp.SchemaManifest
37+
LastSeen time.Time
3838
}
3939

4040
// NewPushHandler creates a new push registration handler.

farp-rust/src/merger/mod.rs

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ pub struct MergerConfig {
3434
pub merged_version: String,
3535
/// Whether to include service tags in operations
3636
pub include_service_tags: bool,
37+
/// When true, all operations from a service are grouped under a single
38+
/// tag matching the service name, instead of prefixing individual tags.
39+
/// Takes precedence over include_service_tags when both are true.
40+
pub collapse_service_tags: bool,
3741
/// Whether to sort merged content alphabetically
3842
pub sort_output: bool,
3943
/// Custom server URLs for the merged spec
@@ -48,6 +52,7 @@ impl Default for MergerConfig {
4852
merged_description: "Merged API specification from multiple services".to_string(),
4953
merged_version: "1.0.0".to_string(),
5054
include_service_tags: true,
55+
collapse_service_tags: false,
5156
sort_output: true,
5257
servers: Vec::new(),
5358
}
@@ -254,10 +259,14 @@ impl Merger {
254259
&operation_id_prefix,
255260
&tag_prefix,
256261
&service_name,
262+
self.config.collapse_service_tags,
257263
&mut seen_operation_ids,
258264
&mut result,
259265
);
260266

267+
// Rewrite $ref strings to match prefixed component names
268+
rewrite_path_item_refs(&mut path_item, &component_prefix);
269+
261270
result.spec.paths.insert(path.clone(), path_item);
262271
seen_paths.insert(path, service_name.clone());
263272
}
@@ -372,26 +381,40 @@ impl Merger {
372381
}
373382

374383
// Merge tags
375-
for mut tag in parsed.tags.clone() {
376-
if !tag_prefix.is_empty() && self.config.include_service_tags {
377-
tag.name = format!("{}_{}", tag_prefix, tag.name);
384+
if self.config.collapse_service_tags {
385+
// Collapse all tags into a single service-level tag
386+
let service_tag = Tag {
387+
name: service_name.clone(),
388+
description: Some(format!("Routes from {service_name}")),
389+
extensions: HashMap::new(),
390+
};
391+
if !seen_tags.contains_key(&service_tag.name) {
392+
seen_tags.insert(service_tag.name.clone(), service_tag.clone());
393+
result.spec.tags.push(service_tag);
378394
}
395+
} else {
396+
for mut tag in parsed.tags.clone() {
397+
if !tag_prefix.is_empty() && self.config.include_service_tags {
398+
tag.name = format!("{}_{}", tag_prefix, tag.name);
399+
}
379400

380-
if let Some(existing) = seen_tags.get(&tag.name) {
381-
// Merge descriptions
382-
if tag.description.is_some() && existing.description.is_none() {
383-
let mut updated = existing.clone();
384-
updated.description = tag.description;
385-
seen_tags.insert(tag.name.clone(), updated.clone());
386-
// Update in result as well
387-
if let Some(pos) = result.spec.tags.iter().position(|t| t.name == tag.name)
388-
{
389-
result.spec.tags[pos] = updated;
401+
if let Some(existing) = seen_tags.get(&tag.name) {
402+
// Merge descriptions
403+
if tag.description.is_some() && existing.description.is_none() {
404+
let mut updated = existing.clone();
405+
updated.description = tag.description;
406+
seen_tags.insert(tag.name.clone(), updated.clone());
407+
// Update in result as well
408+
if let Some(pos) =
409+
result.spec.tags.iter().position(|t| t.name == tag.name)
410+
{
411+
result.spec.tags[pos] = updated;
412+
}
390413
}
414+
} else {
415+
seen_tags.insert(tag.name.clone(), tag.clone());
416+
result.spec.tags.push(tag);
391417
}
392-
} else {
393-
seen_tags.insert(tag.name.clone(), tag.clone());
394-
result.spec.tags.push(tag);
395418
}
396419
}
397420
}

farp-rust/src/merger/openapi.rs

Lines changed: 101 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ fn apply_mount_strategy(path: &str, manifest: &SchemaManifest) -> String {
334334
}
335335
}
336336

337-
/// Adds prefix to component schema names
337+
/// Adds prefix to component schema names and rewrites $ref strings.
338338
pub fn prefix_component_names(components: &Components, prefix: &str) -> Components {
339339
if prefix.is_empty() {
340340
return components.clone();
@@ -344,7 +344,11 @@ pub fn prefix_component_names(components: &Components, prefix: &str) -> Componen
344344
schemas: components
345345
.schemas
346346
.iter()
347-
.map(|(name, schema)| (format!("{prefix}_{name}"), schema.clone()))
347+
.map(|(name, schema)| {
348+
let mut rewritten = schema.clone();
349+
rewrite_refs(&mut rewritten, prefix);
350+
(format!("{prefix}_{name}"), rewritten)
351+
})
348352
.collect(),
349353
responses: components
350354
.responses
@@ -362,8 +366,97 @@ pub fn prefix_component_names(components: &Components, prefix: &str) -> Componen
362366
.map(|(name, body)| (format!("{prefix}_{name}"), body.clone()))
363367
.collect(),
364368
headers: HashMap::new(),
365-
security_schemes: components.security_schemes.clone(), // Don't prefix security schemes
369+
security_schemes: components.security_schemes.clone(),
370+
}
371+
}
372+
373+
/// Recursively rewrites $ref strings in a JSON value.
374+
/// Transforms "#/components/schemas/Foo" → "#/components/schemas/prefix_Foo".
375+
pub fn rewrite_refs(value: &mut serde_json::Value, prefix: &str) {
376+
match value {
377+
serde_json::Value::Object(map) => {
378+
if let Some(serde_json::Value::String(ref_str)) = map.get("$ref") {
379+
let rewritten = rewrite_ref_string(ref_str, prefix);
380+
map.insert("$ref".to_string(), serde_json::Value::String(rewritten));
381+
}
382+
for (key, val) in map.iter_mut() {
383+
if key != "$ref" {
384+
rewrite_refs(val, prefix);
385+
}
386+
}
387+
}
388+
serde_json::Value::Array(arr) => {
389+
for item in arr.iter_mut() {
390+
rewrite_refs(item, prefix);
391+
}
392+
}
393+
_ => {}
394+
}
395+
}
396+
397+
fn rewrite_ref_string(ref_str: &str, prefix: &str) -> String {
398+
let component_prefixes = [
399+
"#/components/schemas/",
400+
"#/components/responses/",
401+
"#/components/parameters/",
402+
"#/components/requestBodies/",
403+
"#/components/headers/",
404+
];
405+
406+
for cp in &component_prefixes {
407+
if let Some(name) = ref_str.strip_prefix(cp) {
408+
return format!("{cp}{prefix}_{name}");
409+
}
410+
}
411+
412+
ref_str.to_string()
413+
}
414+
415+
/// Rewrites $ref strings in all operations of a PathItem.
416+
pub fn rewrite_path_item_refs(item: &mut PathItem, prefix: &str) {
417+
if prefix.is_empty() {
418+
return;
366419
}
420+
421+
let rewrite_op = |op: &mut Option<Operation>| {
422+
if let Some(operation) = op {
423+
// Rewrite refs in responses
424+
if let Some(ref mut responses) = operation.responses {
425+
for resp in responses.values_mut() {
426+
if let Some(ref mut content) = resp.content {
427+
for media in content.values_mut() {
428+
if let Some(ref mut schema) = media.schema {
429+
rewrite_refs(schema, prefix);
430+
}
431+
}
432+
}
433+
}
434+
}
435+
// Rewrite refs in parameters
436+
for param in &mut operation.parameters {
437+
if let Some(ref mut schema) = param.schema {
438+
rewrite_refs(schema, prefix);
439+
}
440+
}
441+
// Rewrite refs in request body
442+
if let Some(ref mut rb) = operation.request_body {
443+
for media in rb.content.values_mut() {
444+
if let Some(ref mut schema) = media.schema {
445+
rewrite_refs(schema, prefix);
446+
}
447+
}
448+
}
449+
}
450+
};
451+
452+
rewrite_op(&mut item.get);
453+
rewrite_op(&mut item.post);
454+
rewrite_op(&mut item.put);
455+
rewrite_op(&mut item.delete);
456+
rewrite_op(&mut item.patch);
457+
rewrite_op(&mut item.options);
458+
rewrite_op(&mut item.head);
459+
rewrite_op(&mut item.trace);
367460
}
368461

369462
/// Applies prefixes to operation IDs and tags
@@ -372,6 +465,7 @@ pub fn apply_operation_prefixes(
372465
op_id_prefix: &str,
373466
tag_prefix: &str,
374467
service_name: &str,
468+
collapse_service_tags: bool,
375469
seen_operation_ids: &mut HashMap<String, String>,
376470
result: &mut MergeResult,
377471
) -> PathItem {
@@ -399,8 +493,10 @@ pub fn apply_operation_prefixes(
399493
operation.operation_id = Some(new_id);
400494
}
401495

402-
// Prefix tags
403-
if !tag_prefix.is_empty() {
496+
// Handle tags: collapse all to service name, or prefix individually
497+
if collapse_service_tags {
498+
operation.tags = vec![service_name.to_string()];
499+
} else if !tag_prefix.is_empty() {
404500
operation.tags = operation
405501
.tags
406502
.iter()

0 commit comments

Comments
 (0)