Skip to content

Commit 596d0b3

Browse files
committed
refactor(iv): align jwt-invalidated replay with #2613 buffer-and-consume
Rewrite the listener replay to match the reference design from #2613 commit 89cca43: - Lock-protected pendingJwtInvalidatedExternalId (consume-on-first-subscribe) replaces the @volatile lastJwtInvalidatedExternalId (always-replay) - fireJwtInvalidated buffers ONLY when no subscribers exist; otherwise schedules an async fire. Closes the double-fire race. - onModelReplaced clears the buffer on IdentityModel replacement (login or logout switch via UserSwitcher.createAndSwitchToNewUser → replace). - Drop the explicit clearLastJwtInvalidated() method and its calls in OneSignalImp.logout() / logoutSuspend() — clear is now automatic. Match #2599 conventions: - Drop the isInitialized throw in addUserJwtInvalidatedListener / removeUserJwtInvalidatedListener. - Add updateUserJwtSuspend. Update IUserJwtInvalidatedListener docstring to clarify replay is synchronous on the caller's thread; regular fire is async. Tests rewritten for the new semantics: first-subscriber-replay, consume-on-first, fire-with-subscribers-doesn't-buffer, onModelReplaced clears.
1 parent 6b90ff2 commit 596d0b3

5 files changed

Lines changed: 130 additions & 58 deletions

File tree

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/IOneSignal.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,4 +255,16 @@ interface IOneSignal {
255255
* Logout the current user (suspend version).
256256
*/
257257
suspend fun logoutSuspend()
258+
259+
/**
260+
* Update the JWT bearer token associated with [externalId] (suspend version). Suspends
261+
* until SDK initialization is complete, then stores the JWT and wakes the operation queue.
262+
*
263+
* @param externalId The external ID the JWT belongs to.
264+
* @param token The new JWT bearer token issued by your backend.
265+
*/
266+
suspend fun updateUserJwtSuspend(
267+
externalId: String,
268+
token: String,
269+
)
258270
}

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/IUserJwtInvalidatedListener.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ package com.onesignal
66
* detected that the JWT for a user is no longer valid (typically a 401 from
77
* the OneSignal backend on a request signed with that JWT).
88
*
9-
* Callbacks are delivered on a background thread.
9+
* Threading: regular fire delivery happens on a background dispatcher. Replay
10+
* delivery (when an invalidation occurred before any listener was subscribed)
11+
* happens synchronously on the thread that calls
12+
* [IOneSignal.addUserJwtInvalidatedListener]. Implementations should not assume
13+
* a specific thread.
1014
*/
1115
fun interface IUserJwtInvalidatedListener {
1216
/**

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/internal/OneSignalImp.kt

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -420,10 +420,6 @@ internal class OneSignalImp(
420420
}
421421
}
422422

423-
// Clear the replay cache so a stale jwt-invalidated event from the previous user
424-
// doesn't fire for a listener registered after logout switches us to a new user.
425-
services.getServiceOrNull<UserManager>()?.clearLastJwtInvalidated()
426-
427423
val context = logoutHelper.switchUser() ?: return
428424

429425
if (isBackgroundThreadingEnabled) {
@@ -457,16 +453,10 @@ internal class OneSignalImp(
457453
}
458454

459455
override fun addUserJwtInvalidatedListener(listener: IUserJwtInvalidatedListener) {
460-
if (!isInitialized) {
461-
throw IllegalStateException("Must call 'initWithContext' before 'addUserJwtInvalidatedListener'")
462-
}
463456
services.getService<UserManager>().addJwtInvalidatedListener(listener)
464457
}
465458

466459
override fun removeUserJwtInvalidatedListener(listener: IUserJwtInvalidatedListener) {
467-
if (!isInitialized) {
468-
throw IllegalStateException("Must call 'initWithContext' before 'removeUserJwtInvalidatedListener'")
469-
}
470460
services.getService<UserManager>().removeJwtInvalidatedListener(listener)
471461
}
472462

@@ -711,6 +701,22 @@ internal class OneSignalImp(
711701
loginHelper.enqueueLogin(context)
712702
}
713703

704+
override suspend fun updateUserJwtSuspend(
705+
externalId: String,
706+
token: String,
707+
) = withContext(runtimeIoDispatcher) {
708+
Logging.log(LogLevel.DEBUG, "updateUserJwtSuspend(externalId: $externalId, token: ...${token.takeLast(8)})")
709+
710+
suspendUntilInit(operationName = "updateUserJwt")
711+
712+
if (!isInitialized) {
713+
throw IllegalStateException("'initWithContext failed' before 'updateUserJwt'")
714+
}
715+
716+
jwtTokenStore.putJwt(externalId, token)
717+
operationRepo.forceExecuteOperations()
718+
}
719+
714720
override suspend fun logoutSuspend() =
715721
withContext(runtimeIoDispatcher) {
716722
Logging.log(LogLevel.DEBUG, "logoutSuspend()")
@@ -721,8 +727,6 @@ internal class OneSignalImp(
721727
throw IllegalStateException("'initWithContext failed' before 'logout'")
722728
}
723729

724-
services.getServiceOrNull<UserManager>()?.clearLastJwtInvalidated()
725-
726730
val context = logoutHelper.switchUser() ?: return@withContext
727731
logoutHelper.enqueueLogout(context)
728732
}

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/internal/UserManager.kt

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,18 @@ internal open class UserManager(
5656
private val jwtInvalidatedNotifier = EventProducer<IUserJwtInvalidatedListener>()
5757

5858
/**
59-
* Most recent invalidation event, replayed to listeners that subscribe after the fact.
60-
* Cleared on logout (via [clearLastJwtInvalidated]) so a stale event from a previous
61-
* user doesn't fire for an unrelated session.
59+
* Buffers a fired invalidation when no listeners are subscribed yet (e.g. SDK init / cold-start
60+
* 401 before app code wires up its listener). Consumed-on-first-subscribe by the next
61+
* [addJwtInvalidatedListener] call. Cleared automatically when the IdentityModel is replaced
62+
* (login or logout) so a stale event doesn't leak across users.
6263
*/
63-
@Volatile
64-
private var lastJwtInvalidatedExternalId: String? = null
64+
private val jwtInvalidatedLock = Any()
65+
private var pendingJwtInvalidatedExternalId: String? = null
6566

6667
/**
6768
* Async dispatch of [IUserJwtInvalidatedListener] callbacks so HYDRATE / op-repo paths
6869
* that synchronously trigger invalidation don't run app code on the SDK's internal thread.
70+
* Replay (synchronous, on the calling thread) bypasses this scope.
6971
*/
7072
private val jwtInvalidatedDispatchScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
7173

@@ -91,17 +93,22 @@ internal open class UserManager(
9193

9294
/**
9395
* Subscribe a developer-facing listener for JWT-invalidated events. If an invalidation
94-
* has already fired before this listener subscribed, the most recent event is replayed
95-
* so apps that subscribe late don't miss the signal.
96+
* has already fired before any listener was subscribed (e.g. early-startup 401), the
97+
* buffered event is delivered to this listener synchronously and consumed — subsequent
98+
* subscribers do not receive that same event.
9699
*/
97100
fun addJwtInvalidatedListener(listener: IUserJwtInvalidatedListener) {
98-
jwtInvalidatedNotifier.subscribe(listener)
99-
val replayId = lastJwtInvalidatedExternalId
100-
if (replayId != null) {
101-
jwtInvalidatedDispatchScope.launch {
102-
runCatching { listener.onUserJwtInvalidated(UserJwtInvalidatedEvent(replayId)) }
103-
.onFailure { Logging.warn("UserManager: replayed jwt-invalidated listener threw: ${it.message}") }
104-
}
101+
val pendingExternalId: String?
102+
synchronized(jwtInvalidatedLock) {
103+
jwtInvalidatedNotifier.subscribe(listener)
104+
pendingExternalId = pendingJwtInvalidatedExternalId
105+
pendingJwtInvalidatedExternalId = null
106+
}
107+
// Deliver the replay outside the lock so a slow listener doesn't block other
108+
// subscribe/unsubscribe/fire calls. Replay runs on the caller's thread (sync).
109+
pendingExternalId?.let {
110+
runCatching { listener.onUserJwtInvalidated(UserJwtInvalidatedEvent(it)) }
111+
.onFailure { ex -> Logging.warn("UserManager: replayed jwt-invalidated listener threw: ${ex.message}") }
105112
}
106113
}
107114

@@ -111,25 +118,24 @@ internal open class UserManager(
111118

112119
/**
113120
* Fire [IUserJwtInvalidatedListener.onUserJwtInvalidated] to all subscribed listeners on
114-
* a background dispatcher. Caches the event for late-subscribing listeners (replay).
121+
* a background dispatcher. If no listener is currently subscribed, buffer the externalId
122+
* to be delivered to the next listener that subscribes (consume-on-first-subscribe).
115123
*/
116124
fun fireJwtInvalidated(externalId: String) {
117-
lastJwtInvalidatedExternalId = externalId
118-
jwtInvalidatedDispatchScope.launch {
119-
runCatching {
120-
jwtInvalidatedNotifier.fire { listener ->
121-
runCatching { listener.onUserJwtInvalidated(UserJwtInvalidatedEvent(externalId)) }
122-
.onFailure { Logging.warn("UserManager: jwt-invalidated listener threw: ${it.message}") }
125+
synchronized(jwtInvalidatedLock) {
126+
if (jwtInvalidatedNotifier.hasSubscribers) {
127+
jwtInvalidatedDispatchScope.launch {
128+
jwtInvalidatedNotifier.fire { listener ->
129+
runCatching { listener.onUserJwtInvalidated(UserJwtInvalidatedEvent(externalId)) }
130+
.onFailure { ex -> Logging.warn("UserManager: jwt-invalidated listener threw: ${ex.message}") }
131+
}
123132
}
124-
}.onFailure { Logging.warn("Failed to deliver JWT invalidated event for externalId=$externalId: ${it.message}") }
133+
} else {
134+
pendingJwtInvalidatedExternalId = externalId
135+
}
125136
}
126137
}
127138

128-
/** Clear the replay cache. Called on logout so stale events don't fire for the next user. */
129-
fun clearLastJwtInvalidated() {
130-
lastJwtInvalidatedExternalId = null
131-
}
132-
133139
// IJwtUpdateListener — JwtTokenStore -> developer-facing event bridge.
134140
override fun onJwtInvalidated(externalId: String) {
135141
fireJwtInvalidated(externalId)
@@ -336,7 +342,13 @@ internal open class UserManager(
336342
override fun onModelReplaced(
337343
model: IdentityModel,
338344
tag: String,
339-
) { }
345+
) {
346+
// IdentityModel replacement = login or logout switch. Clear any buffered invalidation
347+
// so the next user's listener doesn't replay the previous user's stale event.
348+
synchronized(jwtInvalidatedLock) {
349+
pendingJwtInvalidatedExternalId = null
350+
}
351+
}
340352

341353
override fun onModelUpdated(
342354
args: ModelChangedArgs,

OneSignalSDK/onesignal/core/src/test/java/com/onesignal/user/internal/UserManagerTests.kt

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ class UserManagerTests : FunSpec({
227227
firedExternalId shouldBe "alice"
228228
}
229229

230-
test("listener replay: subscribers added after fire receive the most recent event") {
230+
test("listener replay: first subscriber after a no-listener fire receives the buffered event") {
231231
// Given
232232
val jwtTokenStore = JwtTokenStore(MockPreferencesService())
233233
val userManager =
@@ -239,24 +239,18 @@ class UserManagerTests : FunSpec({
239239
MockHelper.languageContext(),
240240
jwtTokenStore,
241241
)
242-
// Fire an invalidation BEFORE any listener is registered.
242+
// Fire BEFORE any listener is registered → buffered.
243243
userManager.fireJwtInvalidated("alice")
244244

245-
var lateExternalId: String? = null
246-
val waiter = com.onesignal.common.threading.Waiter()
247-
248245
// When: listener subscribes after the fire.
249-
userManager.addJwtInvalidatedListener { event ->
250-
lateExternalId = event.externalId
251-
waiter.wake()
252-
}
253-
waiter.waitForWake()
246+
var lateExternalId: String? = null
247+
userManager.addJwtInvalidatedListener { event -> lateExternalId = event.externalId }
254248

255-
// Then: late subscriber receives the cached event.
249+
// Then: replay delivers synchronously on subscribe.
256250
lateExternalId shouldBe "alice"
257251
}
258252

259-
test("clearLastJwtInvalidated stops replay for new subscribers") {
253+
test("buffered event is consumed by the first subscriber; second subscriber gets nothing") {
260254
val jwtTokenStore = JwtTokenStore(MockPreferencesService())
261255
val userManager =
262256
UserManager(
@@ -269,16 +263,62 @@ class UserManagerTests : FunSpec({
269263
)
270264
userManager.fireJwtInvalidated("alice")
271265

272-
// When: cache cleared (e.g. on logout)
273-
userManager.clearLastJwtInvalidated()
266+
// First subscriber consumes the buffered event.
267+
var firstFired: String? = null
268+
userManager.addJwtInvalidatedListener { event -> firstFired = event.externalId }
269+
firstFired shouldBe "alice"
270+
271+
// Second subscriber must NOT receive a replay (buffer was already consumed).
272+
var secondFired = false
273+
userManager.addJwtInvalidatedListener { secondFired = true }
274+
secondFired shouldBe false
275+
}
274276

277+
test("fire when subscribers exist does NOT buffer for late subscribers") {
278+
val jwtTokenStore = JwtTokenStore(MockPreferencesService())
279+
val userManager =
280+
UserManager(
281+
mockk<ISubscriptionManager>(),
282+
MockHelper.identityModelStore(),
283+
MockHelper.propertiesModelStore(),
284+
MockHelper.customEventController(),
285+
MockHelper.languageContext(),
286+
jwtTokenStore,
287+
)
288+
289+
// Existing subscriber at the time of fire.
290+
userManager.addJwtInvalidatedListener { /* no-op */ }
291+
userManager.fireJwtInvalidated("alice")
292+
Thread.sleep(50) // allow async fire to dispatch
293+
294+
// Late subscriber must NOT receive a replay (event was not buffered, since
295+
// there was already a listener at fire time).
275296
var lateFired = false
276297
userManager.addJwtInvalidatedListener { lateFired = true }
298+
lateFired shouldBe false
299+
}
277300

278-
// Give async dispatcher a chance to run if it would have.
279-
Thread.sleep(50)
301+
test("onModelReplaced clears any buffered invalidation event (login/logout switch)") {
302+
val identityModelStore = MockHelper.identityModelStore()
303+
val jwtTokenStore = JwtTokenStore(MockPreferencesService())
304+
val userManager =
305+
UserManager(
306+
mockk<ISubscriptionManager>(),
307+
identityModelStore,
308+
MockHelper.propertiesModelStore(),
309+
MockHelper.customEventController(),
310+
MockHelper.languageContext(),
311+
jwtTokenStore,
312+
)
313+
userManager.fireJwtInvalidated("alice")
314+
315+
// Simulate a user-switch: IdentityModelStore replaces the model and notifies
316+
// subscribers (UserManager subscribes itself in init).
317+
userManager.onModelReplaced(identityModelStore.model, "")
280318

281-
// Then: no replay fired.
319+
// Late subscriber must NOT receive the (now-cleared) buffered event.
320+
var lateFired = false
321+
userManager.addJwtInvalidatedListener { lateFired = true }
282322
lateFired shouldBe false
283323
}
284324

0 commit comments

Comments
 (0)