diff --git a/packages/streams-adapter/common/types.go b/packages/streams-adapter/common/types.go index bc9124ec1d7..1f043183e56 100644 --- a/packages/streams-adapter/common/types.go +++ b/packages/streams-adapter/common/types.go @@ -12,9 +12,10 @@ type RequestParams map[string]string // Observation represents the data returned from an adapter type Observation struct { - Data json.RawMessage `json:"data"` - Success bool `json:"success"` - Error string `json:"error,omitempty"` + Data json.RawMessage `json:"data"` + Timestamps json.RawMessage `json:"timestamps"` + Success bool `json:"success"` + Error string `json:"error,omitempty"` } // CacheItem represents a cached value with metadata diff --git a/packages/streams-adapter/config/config.go b/packages/streams-adapter/config/config.go index c8c10d837c2..52c42af0851 100644 --- a/packages/streams-adapter/config/config.go +++ b/packages/streams-adapter/config/config.go @@ -18,6 +18,9 @@ type Config struct { CacheTTLMinutes uint // Cache TTL in minutes (0 = default 5 minutes) CacheCleanupInterval uint // Cache cleanup interval in minutes (0 = default 1 minute) + // Subscription configuration + SubscriptionRetryDelaySeconds uint // Delay before allowing re-subscription (0 = default 10s) + // Other configuration LogLevel string AdapterName string @@ -39,6 +42,9 @@ func Load() *Config { CacheTTLMinutes: getEnvAsInt("CACHE_TTL_MINUTES", 5), CacheCleanupInterval: getEnvAsInt("CACHE_CLEANUP_INTERVAL", 1), + // Subscription + SubscriptionRetryDelaySeconds: getEnvAsInt("SUBSCRIPTION_RETRY_DELAY_SECONDS", 10), + // Other LogLevel: getEnv("LOG_LEVEL", "info"), AdapterName: adapterName, diff --git a/packages/streams-adapter/helpers/aliases.go b/packages/streams-adapter/helpers/aliases.go index 69c04b0bb20..1860b718149 100644 --- a/packages/streams-adapter/helpers/aliases.go +++ b/packages/streams-adapter/helpers/aliases.go @@ -111,7 +111,7 @@ func (idx *endpointIndex) addAlias(alias, canonical string) { // by scanning the Redis key (OriginalAdapterKey) for any known endpoint aliases. func findEndpointInKey(key string) (string, error) { if activeIndex == nil { - return "", fmt.Errorf("alias index not initialized") + panic("alias index not initialized: InitAliasIndex must be called before processing requests") } lowerKey := strings.ToLower(key) @@ -144,7 +144,7 @@ func BuildCacheKeyParams(data map[string]interface{}) (types.RequestParams, erro } if activeIndex == nil { - return nil, fmt.Errorf("alias index not initialized") + panic("alias index not initialized: InitAliasIndex must be called before processing requests") } canonicalEndpoint, ok := activeIndex.alias[strings.ToLower(ep)] if !ok || canonicalEndpoint == "" { diff --git a/packages/streams-adapter/helpers/aliases_test.go b/packages/streams-adapter/helpers/aliases_test.go index cd436c791e6..3933e1bd037 100644 --- a/packages/streams-adapter/helpers/aliases_test.go +++ b/packages/streams-adapter/helpers/aliases_test.go @@ -134,6 +134,17 @@ func TestInitAliasIndex_Idempotent(t *testing.T) { require.Equal(t, firstIdx, activeIndex) } +// --------------------------------------------------------------------------- +// findEndpointInKey tests +// --------------------------------------------------------------------------- + +func TestFindEndpointInKey_IndexNotInitialized(t *testing.T) { + resetGlobals() + require.Panics(t, func() { + findEndpointInKey("TEST-ADAPTER-price-ws-subscriptionSet") + }) +} + // --------------------------------------------------------------------------- // BuildCacheKeyParams tests // --------------------------------------------------------------------------- @@ -148,11 +159,12 @@ func initTestAdapter(t *testing.T) { func TestBuildCacheKeyParams_AliasIndexNotInitialized(t *testing.T) { resetGlobals() - _, err := BuildCacheKeyParams(map[string]interface{}{ - "endpoint": "price", - "base": "ETH", + require.Panics(t, func() { + BuildCacheKeyParams(map[string]interface{}{ + "endpoint": "price", + "base": "ETH", + }) }) - require.Error(t, err) } func TestBuildCacheKeyParams_UnknownEndpoint(t *testing.T) { diff --git a/packages/streams-adapter/helpers/helpers_test.go b/packages/streams-adapter/helpers/helpers_test.go index 0e723350dd0..57801d1880e 100644 --- a/packages/streams-adapter/helpers/helpers_test.go +++ b/packages/streams-adapter/helpers/helpers_test.go @@ -62,8 +62,9 @@ func TestRequestParamsFromKey_CannotDeriveEndpoint(t *testing.T) { func TestRequestParamsFromKey_AliasIndexNotInitialized(t *testing.T) { resetGlobals() - _, err := RequestParamsFromKey(`adapter-price-{"base":"eth"}`) - require.Error(t, err) + require.Panics(t, func() { + RequestParamsFromKey(`adapter-price-{"base":"eth"}`) + }) } // --------------------------------------------------------------------------- diff --git a/packages/streams-adapter/helpers/keymapper.go b/packages/streams-adapter/helpers/keymapper.go index 7fecdb24ef9..9743bcc91e3 100644 --- a/packages/streams-adapter/helpers/keymapper.go +++ b/packages/streams-adapter/helpers/keymapper.go @@ -7,6 +7,8 @@ import ( "time" ) +const zaddNotificationTimeout = 2 * time.Second + // KeyMapper learns and stores mappings from raw cache keys (derived from client // request params with endpoint-only alias resolution) to transformed cache keys // (derived from the JS adapter's validated+transformed params seen in ZADD). @@ -70,11 +72,17 @@ func (km *KeyMapper) SubscribeAndLearn( rawCacheKey string, subscribeFn func(), ) { + // Fast path: mapping already known, no lock needed. + if _, ok := km.Get(rawCacheKey); ok { + subscribeFn() + return + } + epLock := km.getEndpointLock(endpointTransport) epLock.Lock() defer epLock.Unlock() - // If we already have the mapping, no need to learn again. + // Re-check after acquiring lock — another goroutine may have learned it. if _, ok := km.Get(rawCacheKey); ok { subscribeFn() return @@ -105,7 +113,7 @@ func (km *KeyMapper) SubscribeAndLearn( km.mu.Unlock() km.logger.Debug("Learned key mapping", "rawKey", rawCacheKey, "transformedKey", transformedKey) } - case <-time.After(2 * time.Second): + case <-time.After(zaddNotificationTimeout): km.logger.Debug("Timed out waiting for ZADD notification", "endpoint", endpointTransport, "rawKey", rawCacheKey) } } diff --git a/packages/streams-adapter/redcon/redcon.go b/packages/streams-adapter/redcon/redcon.go index 894ac2434db..553841068d6 100644 --- a/packages/streams-adapter/redcon/redcon.go +++ b/packages/streams-adapter/redcon/redcon.go @@ -211,6 +211,11 @@ func (s *RedconServer) handleEval(conn redcon.Conn, cmd redcon.Command) { obs.Data = data } + // Extract timestamps field + if timestamps, hasTimestamps := rawJSON["timestamps"]; hasTimestamps { + obs.Timestamps = timestamps + } + s.cache.Set(params, obs, time.Now(), key) conn.WriteInt(1) } diff --git a/packages/streams-adapter/server/server.go b/packages/streams-adapter/server/server.go index cbf88a66175..25b5070756e 100644 --- a/packages/streams-adapter/server/server.go +++ b/packages/streams-adapter/server/server.go @@ -334,8 +334,8 @@ func (s *Server) adapterHandler(c *gin.Context) { }) // Remove from tracker after subscription attempt completes. - // Allow retries after 10 seconds if data still not available. - time.Sleep(10 * time.Second) + // Allow retries after delay if data still not available. + time.Sleep(time.Duration(s.config.SubscriptionRetryDelaySeconds) * time.Second) s.subscriptionTracker.Delete(key) }(rawCacheKey, rawParams, reqData.Data) } else if s.config.LogLevel == "debug" {