Skip to content

feat(selector): subscribe to token DB notifications to invalidate cache#1613

Open
atharrva01 wants to merge 3 commits into
hyperledger-labs:mainfrom
atharrva01:feat/token-cache-notifier-subscription
Open

feat(selector): subscribe to token DB notifications to invalidate cache#1613
atharrva01 wants to merge 3 commits into
hyperledger-labs:mainfrom
atharrva01:feat/token-cache-notifier-subscription

Conversation

@atharrva01
Copy link
Copy Markdown
Contributor

@atharrva01 atharrva01 commented Apr 29, 2026

Closes #884


While working in this area for the sherdlock metrics PR (#1459), I noticed the cachedFetcher only goes stale through two mechanisms: a time-based freshnessInterval and a query-count cap. That means a token written to the DB can sit invisible to the selector for up to a full freshness interval, even though the infrastructure to know about it immediately was already there.

driver.TokenNotifier (with its Subscribe callback) was already implemented and tested in the storage layer. It just wasn't wired anywhere.

What I did:

  • Added a dirty atomic.Int32 flag to cachedFetcher. The subscription callback sets it on any DB write; isCacheStale() checks it so the next query forces an immediate refresh.
  • The flag is cleared before the DB fetch, not after , so a notification that arrives mid-fetch re-sets the flag and triggers another refresh on the next query rather than getting lost.
  • If the DB fetch fails, the flag is restored so the retry happens automatically.
  • The Mixed fetchFunc obtains the notifier from db.Notifier() and passes it down. If the notifier is unavailable, we log a warning and fall back gracefully to the existing time-based behavior , nothing breaks.

Tests added:

  • TestCachedFetcher_NotifierMarksDirtyOnInsert , Insert notification immediately makes cache stale
  • TestCachedFetcher_NotifierMarksDirtyOnDelete , Delete notification immediately makes cache stale
  • TestCachedFetcher_DirtyFlagClearedAfterUpdate , flag is gone after a successful refresh
  • TestCachedFetcher_DirtyFlagRestoredOnDBError , flag survives a failed DB fetch so retries work

@atharrva01
Copy link
Copy Markdown
Contributor Author

Hey @adecaro, I noticed the cachedFetcher had no way to react to DB writes in real time, so I wired driver.TokenNotifier into it. The cache now marks itself stale immediately on any insert or delete, rather than waiting out the freshness interval.

@adecaro adecaro self-requested a review April 29, 2026 04:46
@adecaro adecaro self-assigned this Apr 29, 2026
@adecaro adecaro added this to the Q2/26 milestone Apr 29, 2026
@adecaro adecaro force-pushed the feat/token-cache-notifier-subscription branch from ea96a12 to c8f3782 Compare April 29, 2026 04:47
@adecaro
Copy link
Copy Markdown
Contributor

adecaro commented Apr 29, 2026

Hi @atharrva01 , thanks for picking this up 🙏

So, in a running system, we can have tokens being continuously added and this will trigger a refresh of the cache that will prevent the selector any read. I see a possible substantial degradation of performance here. But what if instead we take the input passed to the listener and surgically update the cache with the new entry? We would only need to keep the lock to add the new token for a very short period. What do you think?

It might be good to check the benchmarks here token/services/selector/benchmark_test.go. Not sure, if they can already catch the scenario this PR is concerned with.

What do you think? 😄

@atharrva01
Copy link
Copy Markdown
Contributor Author

Hey @adecaro, you're right, I missed that.

The tricky part is that TokenRecordReference only gives us {TxID, Index} and no WalletID or token Type, which are what we need to find the right cache bucket. So we'd still need a DB call either way. For inserts the idea would be to call QueryTokenDetails with just that one ID to get the wallet and type, then append to the right bucket. For deletes we'd need a small reverse index alongside the cache (TxID:Index to cache key) so we can find and remove it without scanning everything.

Still much cheaper than a full table scan under the write lock for high-write scenarios, so the direction makes sense to me.

One thing worth flagging, the cache value is currently an immutable iterators.Slice, so to do in-place appends and removes we'd need to switch to a mutable slice covered by the existing mu. Let me know if that's the way to go.

Also the existing benchmarks only cover read throughput on a static cache, they wouldn't catch the write-contention case you're describing. I can add a concurrent-write benchmark to make the improvement visible.

Does this sound like the right direction?

@adecaro
Copy link
Copy Markdown
Contributor

adecaro commented Apr 29, 2026

Hi @atharrva01 , if more is needed we can modify what is put inside TokenRecordReference by updating the storage service. Please, have an assessment there. What do you think?

@atharrva01
Copy link
Copy Markdown
Contributor Author

Hey @adecaro, had a look at the storage layer. The reference gets populated from a Postgres LISTEN/NOTIFY trigger and right now it only emits tx_id and idx in the payload. The notifier already supports extra columns through its PrimaryKey struct so extending it is pretty straightforward, we just pass owner_wallet_id, token_type and quantity as additional columns to NewTokenNotifier, update Subscribe to map them, and add those three fields to TokenRecordReference. That gives the callback everything it needs for a surgical update with no extra DB call at all. 4 files touched total. Happy to go ahead if this sounds right!

@adecaro
Copy link
Copy Markdown
Contributor

adecaro commented Apr 29, 2026

HI @atharrva01 , yes, let's try that. There is a limit to the amount of data that Postgres can send. Let's see if we are in the boundaries 🤞

@adecaro
Copy link
Copy Markdown
Contributor

adecaro commented Apr 29, 2026

@atharrva01 , if you have cycles, please, think also about a benchmark that we can run against Postgres under different loads to see how the selector react. The current benchmarks work against a fake DB. What do you think?

@atharrva01
Copy link
Copy Markdown
Contributor Author

Hey @adecaro, good news, the infrastructure is already there. manager_test.go already spins up a real Postgres container via StartPostgres and wires up the real TokenNotifier. I can add a BenchmarkSelectorWithConcurrentWrites in that same file using startManagers, drive concurrent inserts at different rates alongside selections, and measure throughput. That should make the improvement visible end to end. I'll include this alongside the surgical update changes.

@adecaro
Copy link
Copy Markdown
Contributor

adecaro commented Apr 29, 2026

Hi @atharrva01 , I was double checking and we have an 8,000 bytes limit on the payload Postgres would send back with the current approach. We need to be carefully. I was also checking the performance of this mechanism under high load. It is not design to be efficient under high load so each use case must evaluate if it makes sense to relay on this feature.

So, it might make sense also to have the possibility to disable completely the token notifier in case it starts being a bottleneck.

@atharrva01
Copy link
Copy Markdown
Contributor Author

Hey @adecaro, good catches. On the 8KB limit, individual token fields should fit fine but I'll add a safeguard so the trigger fails loudly rather than silently truncating anything unexpected.

For the disable option, the fetcher already falls back to time-based polling when the notifier is unavailable, I'll just wire an explicit config flag into the selector config so operators can turn it off cleanly if it becomes a bottleneck under load.

@atharrva01 atharrva01 force-pushed the feat/token-cache-notifier-subscription branch from c8f3782 to 1587f6b Compare April 29, 2026 13:30
@atharrva01
Copy link
Copy Markdown
Contributor Author

Hey @adecaro, addressed all three points!

For the surgical update, I extended TokenRecordReference with WalletID, Type, and Quantity so the callback can patch the right cache bucket directly without any extra DB call. Inserts append, deletes filter by TxID+Index. The dirty flag fallback is still there for Updates and for stores that don't wire the notifier (like SQLite).

For the 8KB limit, the trigger now raises an explicit exception before calling pg_notify if the payload exceeds 8000 bytes, so any overflow fails loudly instead of delivering a malformed message.

For the disable option, added tokenNotifierDisabled: true to the selector config. When set, the fetcher skips the notifier entirely and falls back to the time-based freshness interval.

Also added BenchmarkSelectorWithConcurrentWrites in manager_test.go against a real Postgres container. It seeds tokens, drives inserts at ~10/s in the background, and measures selector throughput end to end.

Let me know if anything needs tweaking!

@adecaro adecaro force-pushed the feat/token-cache-notifier-subscription branch from 99af372 to 3cbd485 Compare May 4, 2026 09:45
@adecaro
Copy link
Copy Markdown
Contributor

adecaro commented May 4, 2026

Hi @atharrva01 , thanks for the effort and patience. I will review ASAP 🙏

@atharrva01
Copy link
Copy Markdown
Contributor Author

hey @adecaro , a gentle ping on this pr , whenever you get time , thanks :)

@adecaro adecaro force-pushed the feat/token-cache-notifier-subscription branch 2 times, most recently from 8eaa2b4 to 67dbc86 Compare May 7, 2026 14:23
@adecaro
Copy link
Copy Markdown
Contributor

adecaro commented May 7, 2026

Hi @EvanYan1024 ,
Given that you have pushed recently changes related to the selector service, I would like to ask you some feedback on this PR, if you can 🙏
I want to make sure we introduce something that is helpful especially when running the stack at scale.

Many thanks

Wire driver.TokenNotifier into cachedFetcher so the cache is marked
stale immediately when a token is inserted or deleted from the DB,
eliminating the polling staleness window caused by freshnessInterval.

The dirty flag (atomic.Int32) is set by the subscription callback and
checked in isCacheStale(), ensuring the next query forces a DB refresh.
The flag is cleared before fetching and restored if the DB call fails,
so transient errors retry on the next query.

Closes hyperledger-labs#884

Signed-off-by: atharrva01 <atharvaborade568@gmail.com>
- Extend TokenRecordReference with WalletID, Type, Quantity
- Update Postgres trigger to include owner_wallet_id, token_type, quantity in NOTIFY payload
- Replace dirty-flag invalidation with surgical insert/delete in onTokenChange
- Add 8KB payload guard to trigger (RAISE EXCEPTION on overflow)
- Add TokenNotifierDisabled config flag for operators to opt out
- Add BenchmarkSelectorWit
Signed-off-by: Atharva Borade <atharvaborade568@gmail.com>

Signed-off-by: atharrva01 <atharvaborade568@gmail.com>
Signed-off-by: Atharva Borade <atharvaborade568@gmail.com>
Signed-off-by: atharrva01 <atharvaborade568@gmail.com>
@adecaro adecaro force-pushed the feat/token-cache-notifier-subscription branch from 67dbc86 to 1ae9618 Compare May 10, 2026 06:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

shared token selector: subscribe to the token DB write operations

2 participants