Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions packages/streams-adapter/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ type RequestParams map[string]string

// Observation represents the data returned from an adapter
type Observation struct {
Data json.RawMessage `json:"data"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
Data json.RawMessage `json:"data"`
Timestamps json.RawMessage `json:"timestamps"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
}

// CacheItem represents a cached value with metadata
Expand Down
6 changes: 6 additions & 0 deletions packages/streams-adapter/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ type Config struct {
CacheTTLMinutes uint // Cache TTL in minutes (0 = default 5 minutes)
CacheCleanupInterval uint // Cache cleanup interval in minutes (0 = default 1 minute)

// Subscription configuration
SubscriptionRetryDelaySeconds uint // Delay before allowing re-subscription (0 = default 10s)

// Other configuration
LogLevel string
AdapterName string
Expand All @@ -39,6 +42,9 @@ func Load() *Config {
CacheTTLMinutes: getEnvAsInt("CACHE_TTL_MINUTES", 5),
CacheCleanupInterval: getEnvAsInt("CACHE_CLEANUP_INTERVAL", 1),

// Subscription
SubscriptionRetryDelaySeconds: getEnvAsInt("SUBSCRIPTION_RETRY_DELAY_SECONDS", 10),

// Other
LogLevel: getEnv("LOG_LEVEL", "info"),
AdapterName: adapterName,
Expand Down
4 changes: 2 additions & 2 deletions packages/streams-adapter/helpers/aliases.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (idx *endpointIndex) addAlias(alias, canonical string) {
// by scanning the Redis key (OriginalAdapterKey) for any known endpoint aliases.
func findEndpointInKey(key string) (string, error) {
if activeIndex == nil {
return "", fmt.Errorf("alias index not initialized")
panic("alias index not initialized: InitAliasIndex must be called before processing requests")
}
lowerKey := strings.ToLower(key)

Expand Down Expand Up @@ -144,7 +144,7 @@ func BuildCacheKeyParams(data map[string]interface{}) (types.RequestParams, erro
}

if activeIndex == nil {
return nil, fmt.Errorf("alias index not initialized")
panic("alias index not initialized: InitAliasIndex must be called before processing requests")
}
canonicalEndpoint, ok := activeIndex.alias[strings.ToLower(ep)]
if !ok || canonicalEndpoint == "" {
Expand Down
20 changes: 16 additions & 4 deletions packages/streams-adapter/helpers/aliases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,17 @@ func TestInitAliasIndex_Idempotent(t *testing.T) {
require.Equal(t, firstIdx, activeIndex)
}

// ---------------------------------------------------------------------------
// findEndpointInKey tests
// ---------------------------------------------------------------------------

func TestFindEndpointInKey_IndexNotInitialized(t *testing.T) {
resetGlobals()
require.Panics(t, func() {
findEndpointInKey("TEST-ADAPTER-price-ws-subscriptionSet")
})
}

// ---------------------------------------------------------------------------
// BuildCacheKeyParams tests
// ---------------------------------------------------------------------------
Expand All @@ -148,11 +159,12 @@ func initTestAdapter(t *testing.T) {

func TestBuildCacheKeyParams_AliasIndexNotInitialized(t *testing.T) {
resetGlobals()
_, err := BuildCacheKeyParams(map[string]interface{}{
"endpoint": "price",
"base": "ETH",
require.Panics(t, func() {
BuildCacheKeyParams(map[string]interface{}{
"endpoint": "price",
"base": "ETH",
})
})
require.Error(t, err)
}

func TestBuildCacheKeyParams_UnknownEndpoint(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions packages/streams-adapter/helpers/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ func TestRequestParamsFromKey_CannotDeriveEndpoint(t *testing.T) {
func TestRequestParamsFromKey_AliasIndexNotInitialized(t *testing.T) {
resetGlobals()

_, err := RequestParamsFromKey(`adapter-price-{"base":"eth"}`)
require.Error(t, err)
require.Panics(t, func() {
RequestParamsFromKey(`adapter-price-{"base":"eth"}`)
})
}

// ---------------------------------------------------------------------------
Expand Down
12 changes: 10 additions & 2 deletions packages/streams-adapter/helpers/keymapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"
)

const zaddNotificationTimeout = 2 * time.Second

// KeyMapper learns and stores mappings from raw cache keys (derived from client
// request params with endpoint-only alias resolution) to transformed cache keys
// (derived from the JS adapter's validated+transformed params seen in ZADD).
Expand Down Expand Up @@ -70,11 +72,17 @@ func (km *KeyMapper) SubscribeAndLearn(
rawCacheKey string,
subscribeFn func(),
) {
// Fast path: mapping already known, no lock needed.
if _, ok := km.Get(rawCacheKey); ok {
subscribeFn()
return
}

epLock := km.getEndpointLock(endpointTransport)
epLock.Lock()
defer epLock.Unlock()

// If we already have the mapping, no need to learn again.
// Re-check after acquiring lock — another goroutine may have learned it.
if _, ok := km.Get(rawCacheKey); ok {
subscribeFn()
return
Expand Down Expand Up @@ -105,7 +113,7 @@ func (km *KeyMapper) SubscribeAndLearn(
km.mu.Unlock()
km.logger.Debug("Learned key mapping", "rawKey", rawCacheKey, "transformedKey", transformedKey)
}
case <-time.After(2 * time.Second):
case <-time.After(zaddNotificationTimeout):
km.logger.Debug("Timed out waiting for ZADD notification", "endpoint", endpointTransport, "rawKey", rawCacheKey)
}
}
Expand Down
5 changes: 5 additions & 0 deletions packages/streams-adapter/redcon/redcon.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ func (s *RedconServer) handleEval(conn redcon.Conn, cmd redcon.Command) {
obs.Data = data
}

// Extract timestamps field
if timestamps, hasTimestamps := rawJSON["timestamps"]; hasTimestamps {
obs.Timestamps = timestamps
}

s.cache.Set(params, obs, time.Now(), key)
conn.WriteInt(1)
}
Expand Down
4 changes: 2 additions & 2 deletions packages/streams-adapter/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,8 @@ func (s *Server) adapterHandler(c *gin.Context) {
})

// Remove from tracker after subscription attempt completes.
// Allow retries after 10 seconds if data still not available.
time.Sleep(10 * time.Second)
// Allow retries after delay if data still not available.
time.Sleep(time.Duration(s.config.SubscriptionRetryDelaySeconds) * time.Second)
s.subscriptionTracker.Delete(key)
}(rawCacheKey, rawParams, reqData.Data)
} else if s.config.LogLevel == "debug" {
Expand Down
Loading