-
Notifications
You must be signed in to change notification settings - Fork 38
feat: discover team members in large teams - WPB-24947 #4720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
2e59f37
66b9b22
56510ce
29bab2f
ff7a378
bb75833
eccfe5b
7310667
0ef13fd
67d3909
fde00ad
25baea1
b5b9a1d
f1ffdc9
6d8a95a
f283cc1
e8b1b9b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,6 +45,7 @@ | |
| private let journal: Journal | ||
| private let mlsGroupRepairAgent: MLSGroupRepairAgentProtocol | ||
| private let earService: EARServiceInterface | ||
| private let teamMemberDiscoveryAgent: any TeamMemberDiscoveryAgentProtocol | ||
|
|
||
| public init( | ||
| selfClientID: String, | ||
|
|
@@ -59,7 +60,8 @@ | |
| liveBrokenGroupSubject: PassthroughSubject<Set<String>, Never>, | ||
| journal: Journal, | ||
| mlsGroupRepairAgent: MLSGroupRepairAgentProtocol, | ||
| earService: EARServiceInterface | ||
| earService: EARServiceInterface, | ||
| teamMemberDiscoveryAgent: any TeamMemberDiscoveryAgentProtocol | ||
| ) { | ||
| self.selfClientID = selfClientID | ||
| self.pushChannelAPI = pushChannelAPI | ||
|
|
@@ -74,6 +76,7 @@ | |
| self.journal = journal | ||
| self.mlsGroupRepairAgent = mlsGroupRepairAgent | ||
| self.earService = earService | ||
| self.teamMemberDiscoveryAgent = teamMemberDiscoveryAgent | ||
| } | ||
|
|
||
| public func perform() async throws -> Token { | ||
|
|
@@ -161,14 +164,18 @@ | |
|
|
||
| await mlsGroupRepairAgent.repairConversations() | ||
|
|
||
| if !backgroundAccessibleOnly { | ||
| await teamMemberDiscoveryAgent.discoverMembers() | ||
| } | ||
|
|
||
| let liveEventTask = Task { @Sendable [self] in | ||
|
Check warning on line 171 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift
|
||
| logger.info("handling live event stream", attributes: .incrementalSyncV2, .safePublic) | ||
| syncStateSubject.send(.liveSyncing(.ongoing)) | ||
|
|
||
| do { | ||
| // because we might be interrupted when in background, we wrap the sync in an expiringActivity that | ||
| // will cancel the task - not keeping any db operation (sqlite file opened) in suspend mode | ||
| try await withExpiringActivity(reason: "processLiveStream IncrementalSync") { | ||
|
Check warning on line 178 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift
|
||
| await processLiveEvents( | ||
| liveEventStream: liveEventStream, | ||
| processedEnvelopeIDs: processedEnvelopeIDs, | ||
|
|
@@ -349,7 +356,7 @@ | |
| privateKeys: privateKeys, | ||
| backgroundAccessibleOnly: backgroundAccessibleOnly | ||
| ) | ||
| let envelopes = envelopesWithObjectIDs.map(\.envelope) | ||
|
Check warning on line 359 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift
|
||
| let envelopesObjectIDs = envelopesWithObjectIDs.map(\.objectID) | ||
|
|
||
| guard !envelopes.isEmpty else { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,7 +20,7 @@ | |
| import Foundation | ||
| import WireCoreCrypto | ||
| import WireDataModel | ||
| import WireLogging | ||
|
Check warning on line 23 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSyncV2.swift
|
||
| import WireNetwork | ||
|
|
||
| public typealias CreatePushChannelStateClosure = () -> PushChannelStateProtocol | ||
|
|
@@ -52,6 +52,7 @@ | |
| private let createPushChannelState: CreatePushChannelStateClosure | ||
| private let mlsGroupRepairAgent: MLSGroupRepairAgentProtocol | ||
| private let earService: EARServiceInterface | ||
| private let teamMemberDiscoveryAgent: any TeamMemberDiscoveryAgentProtocol | ||
|
|
||
| weak var delegate: (any LiveSyncDelegate)? | ||
|
|
||
|
|
@@ -70,6 +71,7 @@ | |
| journal: Journal, | ||
| mlsGroupRepairAgent: MLSGroupRepairAgentProtocol, | ||
| earService: EARServiceInterface, | ||
| teamMemberDiscoveryAgent: any TeamMemberDiscoveryAgentProtocol, | ||
| createPushChannelState: @escaping CreatePushChannelStateClosure, | ||
| syncMarkerGenerator: @escaping SyncMarkerGenerator = { UUID().uuidString } | ||
| ) { | ||
|
|
@@ -87,6 +89,7 @@ | |
| self.journal = journal | ||
| self.mlsGroupRepairAgent = mlsGroupRepairAgent | ||
| self.earService = earService | ||
| self.teamMemberDiscoveryAgent = teamMemberDiscoveryAgent | ||
| self.syncMarkerGenerator = syncMarkerGenerator | ||
| self.createPushChannelState = createPushChannelState | ||
| } | ||
|
|
@@ -152,6 +155,8 @@ | |
|
|
||
| await mlsGroupRepairAgent.repairConversations() | ||
|
|
||
| await teamMemberDiscoveryAgent.discoverMembers() | ||
|
|
||
| let task = Task { @Sendable [self] in | ||
| logger.debug("handling live event stream", attributes: logAttributes) | ||
| syncStateSubject.send(.liveSyncing(.ongoing)) | ||
|
|
@@ -160,14 +165,14 @@ | |
| // because we might be interrupted when in background, we wrap the sync in an expiringActivity that will | ||
| // cancel the task (not keeping any file lock in suspend mode) | ||
| try await withExpiringActivity(reason: "processLiveStream IncrementalSyncV2") { | ||
| await processLiveStream( | ||
|
Check warning on line 168 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSyncV2.swift
|
||
| liveEventStream, | ||
| pushChannel: pushChannel, | ||
| syncMarker: syncMarker | ||
| ) | ||
|
|
||
| WireLogger.sync.debug("Live stream ended, close push channel", attributes: logAttributes) | ||
| await pushChannel.close() | ||
|
Check warning on line 175 in WireDomain/Sources/WireDomain/Synchronization/IncrementalSyncV2.swift
|
||
| await pushChannelState.markAsClosed() | ||
| } | ||
| } catch { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| // | ||
| // Wire | ||
| // Copyright (C) 2026 Wire Swiss GmbH | ||
| // | ||
| // This program is free software: you can redistribute it and/or modify | ||
| // it under the terms of the GNU General Public License as published by | ||
| // the Free Software Foundation, either version 3 of the License, or | ||
| // (at your option) any later version. | ||
| // | ||
| // This program is distributed in the hope that it will be useful, | ||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| // GNU General Public License for more details. | ||
| // | ||
| // You should have received a copy of the GNU General Public License | ||
| // along with this program. If not, see http://www.gnu.org/licenses/. | ||
| // | ||
|
|
||
| import Foundation | ||
|
|
||
| // sourcery: AutoMockable | ||
| /// Discovers self-team members by replaying recent team notifications, | ||
| /// working around the 2000-member cap on the legacy bulk team-members endpoint. | ||
| public protocol TeamMemberDiscoveryAgentProtocol { | ||
|
|
||
| /// Fetch recent team notifications and apply any discovered | ||
| /// `team.member-join` events to local storage. | ||
| func discoverMembers() async | ||
|
|
||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,105 @@ | ||
| // | ||
| // Wire | ||
| // Copyright (C) 2026 Wire Swiss GmbH | ||
| // | ||
| // This program is free software: you can redistribute it and/or modify | ||
| // it under the terms of the GNU General Public License as published by | ||
| // the Free Software Foundation, either version 3 of the License, or | ||
| // (at your option) any later version. | ||
| // | ||
| // This program is distributed in the hope that it will be useful, | ||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| // GNU General Public License for more details. | ||
| // | ||
| // You should have received a copy of the GNU General Public License | ||
| // along with this program. If not, see http://www.gnu.org/licenses/. | ||
| // | ||
|
|
||
| import Foundation | ||
| import WireLogging | ||
| import WireNetwork | ||
|
|
||
| struct TeamMemberDiscoveryAgent: TeamMemberDiscoveryAgentProtocol { | ||
|
|
||
| private let api: any TeamsAPI | ||
| private let store: any TeamLocalStoreProtocol | ||
| private let journal: Journal | ||
|
|
||
| // MARK: - Life cycle | ||
|
|
||
| init( | ||
| api: any TeamsAPI, | ||
| store: any TeamLocalStoreProtocol, | ||
| journal: Journal | ||
| ) { | ||
| self.api = api | ||
| self.store = store | ||
| self.journal = journal | ||
| } | ||
|
|
||
| // MARK: - TeamMemberDiscoveryAgentProtocol | ||
|
|
||
| func discoverMembers() async { | ||
| guard let selfTeamID = await store.selfTeamID() else { | ||
| WireLogger.sync.debug("team member discovery: self user is not in a team") | ||
| return | ||
| } | ||
|
|
||
| do { | ||
| try await runDiscovery(selfTeamID: selfTeamID) | ||
| } catch TeamsAPIError.missedEvents { | ||
| WireLogger.sync.warn( | ||
| "team member discovery: server lost cursor history, resetting and re-running", | ||
| attributes: .safePublic | ||
| ) | ||
| journal[.lastTeamNotificationID] = nil | ||
| do { | ||
| try await runDiscovery(selfTeamID: selfTeamID) | ||
| } catch { | ||
| WireLogger.sync.error( | ||
| "team member discovery failed after cursor reset: \(String(describing: error))" | ||
| ) | ||
| } | ||
| } catch { | ||
| WireLogger.sync.error("team member discovery failed: \(String(describing: error))") | ||
| } | ||
| } | ||
|
|
||
| private func runDiscovery(selfTeamID: UUID) async throws { | ||
| let sinceNotificationID = journal[.lastTeamNotificationID].flatMap(UUID.init(uuidString:)) | ||
| let pager = try api.getNotifications(sinceNotificationID: sinceNotificationID) | ||
|
|
||
| var discoveredCount = 0 | ||
| for try await notifications in pager { | ||
| let teamMembersInfo = notifications.map { notification -> TeamMemberInfo in | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit-pick(subjective): Consider renaming |
||
| switch notification.kind { | ||
| case let .memberJoin(event): | ||
| TeamMemberInfo( | ||
| id: event.userID, | ||
| selfPermission: nil, | ||
| creatorID: nil, | ||
| creationDate: event.time | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| guard !teamMembersInfo.isEmpty else { continue } | ||
|
|
||
| try await store.storeTeamMembers( | ||
| selfTeamID: selfTeamID, | ||
| teamMembersInfo: teamMembersInfo | ||
| ) | ||
| discoveredCount += teamMembersInfo.count | ||
|
|
||
| // Persist forward progress per page so an interruption | ||
| // doesn't force a full re-walk on the next run. | ||
| if let lastID = notifications.last?.id { | ||
| journal[.lastTeamNotificationID] = lastID.uuidString | ||
| } | ||
| } | ||
|
|
||
| WireLogger.sync.debug("team member discovery: stored \(discoveredCount) member(s)") | ||
| } | ||
|
|
||
| } | ||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Uh oh!
There was an error while loading. Please reload this page.