Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,24 @@ public static TargetIdGenerator forSyncEngine() {
return new TargetIdGenerator(SYNC_ENGINE_ID, 1);
}

/**
* Creates and returns a new TargetIdGenerator for remote target cache watches.
*
* @return A new instance of TargetIdGenerator starting at 1000 (even).
*/
public static TargetIdGenerator forRemoteTargetCache() {
return new TargetIdGenerator(QUERY_CACHE_ID, 1000);
}

/**
* Creates and returns a new TargetIdGenerator for remote sync engine watches.
*
* @return A new instance of TargetIdGenerator starting at 1001 (odd).
*/
public static TargetIdGenerator forRemoteSyncEngine() {
return new TargetIdGenerator(SYNC_ENGINE_ID, 1001);
}

private static final int QUERY_CACHE_ID = 0;
private static final int SYNC_ENGINE_ID = 1;

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.firebase.firestore.remote;

import androidx.annotation.NonNull;

/** Represents a transient, session-specific target ID used strictly over the watch stream. */
public final class RemoteTargetId implements Comparable<RemoteTargetId> {
private final int value;

private RemoteTargetId(int value) {
this.value = value;
}

public static RemoteTargetId from(int value) {
return new RemoteTargetId(value);
}

public int value() {
return value;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
return value == ((RemoteTargetId) o).value;
}

@Override
public int hashCode() {
return value;
}

@Override
public int compareTo(@NonNull RemoteTargetId other) {
return Integer.compare(value, other.value);
}

@Override
public String toString() {
return "RemoteTargetId(" + value + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,32 +51,38 @@ public interface TargetMetadataProvider {
* Returns the set of remote document keys for the given target ID as of the last raised
* snapshot or an empty set of document keys for unknown targets.
*/
ImmutableSortedSet<DocumentKey> getRemoteKeysForTarget(int targetId);
ImmutableSortedSet<DocumentKey> getRemoteKeysForTarget(RemoteTargetId targetId);

/**
* Returns the TargetData for an active target ID or 'null' if this query is unknown or has
* become inactive.
*/
@Nullable
TargetData getTargetDataForTarget(int targetId);
TargetData getTargetDataForTarget(RemoteTargetId targetId);

/**
* Translates a RemoteTargetId to its stable SDK TargetId. Returns the remoteTargetId's value
* if no mapping exists.
*/
int getSdkTargetId(RemoteTargetId remoteTargetId);
}

private final TargetMetadataProvider targetMetadataProvider;

/** The internal state of all tracked targets. */
private final Map<Integer, TargetState> targetStates = new HashMap<>();
private final Map<RemoteTargetId, TargetState> targetStates = new HashMap<>();

/** Keeps track of the documents to update since the last raised snapshot. */
private Map<DocumentKey, MutableDocument> pendingDocumentUpdates = new HashMap<>();

/** A mapping of document keys to their set of target IDs. */
private Map<DocumentKey, Set<Integer>> pendingDocumentTargetMapping = new HashMap<>();
private Map<DocumentKey, Set<RemoteTargetId>> pendingDocumentTargetMapping = new HashMap<>();

/**
* A map of targets with existence filter mismatches. These targets are known to be inconsistent
* and their listens needs to be re-established by RemoteStore.
*/
private Map<Integer, QueryPurpose> pendingTargetResets = new HashMap<>();
private Map<RemoteTargetId, QueryPurpose> pendingTargetResets = new HashMap<>();

private final DatabaseId databaseId;

Expand All @@ -102,21 +108,23 @@ public void handleDocumentChange(DocumentChange documentChange) {
DocumentKey documentKey = documentChange.getDocumentKey();

for (int targetId : documentChange.getUpdatedTargetIds()) {
RemoteTargetId remoteTargetId = RemoteTargetId.from(targetId);
if (document != null && document.isFoundDocument()) {
addDocumentToTarget(targetId, document);
addDocumentToTarget(remoteTargetId, document);
} else {
removeDocumentFromTarget(targetId, documentKey, document);
removeDocumentFromTarget(remoteTargetId, documentKey, document);
}
}

for (int targetId : documentChange.getRemovedTargetIds()) {
removeDocumentFromTarget(targetId, documentKey, documentChange.getNewDocument());
RemoteTargetId remoteTargetId = RemoteTargetId.from(targetId);
removeDocumentFromTarget(remoteTargetId, documentKey, documentChange.getNewDocument());
}
}

/** Processes and adds the WatchTargetChange to the current set of changes. */
public void handleTargetChange(WatchTargetChange targetChange) {
for (int targetId : getTargetIds(targetChange)) {
for (RemoteTargetId targetId : getTargetIds(targetChange)) {
TargetState targetState = ensureTargetState(targetId);

switch (targetChange.getChangeType()) {
Expand All @@ -142,7 +150,7 @@ public void handleTargetChange(WatchTargetChange targetChange) {
// We need to decrement the number of pending acks needed from watch for this targetId.
targetState.recordTargetResponse();
if (!targetState.isPending()) {
removeTarget(targetId);
removeTarget(targetId.value());
}
hardAssert(
targetChange.getCause() == null,
Expand Down Expand Up @@ -173,18 +181,16 @@ public void handleTargetChange(WatchTargetChange targetChange) {
* Returns all targetIds that the watch change applies to: either the targetIds explicitly listed
* in the change or the targetIds of all currently active targets.
*/
private Collection<Integer> getTargetIds(WatchTargetChange targetChange) {
private Collection<RemoteTargetId> getTargetIds(WatchTargetChange targetChange) {
List<Integer> targetIds = targetChange.getTargetIds();
if (!targetIds.isEmpty()) {
return targetIds;
} else {
List<Integer> activeIds = new ArrayList<>();
for (Integer id : targetStates.keySet()) {
if (isActiveTarget(id)) {
activeIds.add(id);
}
List<RemoteTargetId> result = new ArrayList<>(targetIds.size());
for (int id : targetIds) {
result.add(RemoteTargetId.from(id));
}
return activeIds;
return result;
} else {
return targetStates.keySet();
}
}

Expand All @@ -193,7 +199,7 @@ private Collection<Integer> getTargetIds(WatchTargetChange targetChange) {
* invalidated by filter mismatches are added to `pendingTargetResets`.
*/
public void handleExistenceFilter(ExistenceFilterWatchChange watchChange) {
int targetId = watchChange.getTargetId();
RemoteTargetId targetId = RemoteTargetId.from(watchChange.getTargetId());
int expectedCount = watchChange.getExistenceFilter().getCount();

TargetData targetData = queryDataForActiveTarget(targetId);
Expand Down Expand Up @@ -285,7 +291,8 @@ private BloomFilterApplicationStatus applyBloomFilter(
BloomFilter bloomFilter, ExistenceFilterWatchChange watchChange, int currentCount) {
int expectedCount = watchChange.getExistenceFilter().getCount();

int removedDocumentCount = this.filterRemovedDocuments(bloomFilter, watchChange.getTargetId());
RemoteTargetId remoteTargetId = RemoteTargetId.from(watchChange.getTargetId());
int removedDocumentCount = this.filterRemovedDocuments(bloomFilter, remoteTargetId);

return (expectedCount == (currentCount - removedDocumentCount))
? BloomFilterApplicationStatus.SUCCESS
Expand All @@ -296,7 +303,7 @@ private BloomFilterApplicationStatus applyBloomFilter(
* Filter out removed documents based on bloom filter membership result and return number of
* documents removed.
*/
private int filterRemovedDocuments(BloomFilter bloomFilter, int targetId) {
private int filterRemovedDocuments(BloomFilter bloomFilter, RemoteTargetId targetId) {
ImmutableSortedSet<DocumentKey> existingKeys =
targetMetadataProvider.getRemoteKeysForTarget(targetId);
int removalCount = 0;
Expand All @@ -323,8 +330,8 @@ private int filterRemovedDocuments(BloomFilter bloomFilter, int targetId) {
public RemoteEvent createRemoteEvent(SnapshotVersion snapshotVersion) {
Map<Integer, TargetChange> targetChanges = new HashMap<>();

for (Map.Entry<Integer, TargetState> entry : targetStates.entrySet()) {
int targetId = entry.getKey();
for (Map.Entry<RemoteTargetId, TargetState> entry : targetStates.entrySet()) {
RemoteTargetId targetId = entry.getKey();
TargetState targetState = entry.getValue();

TargetData targetData = queryDataForActiveTarget(targetId);
Expand All @@ -343,7 +350,8 @@ public RemoteEvent createRemoteEvent(SnapshotVersion snapshotVersion) {
}

if (targetState.hasChanges()) {
targetChanges.put(targetId, targetState.toTargetChange());
int sdkTargetId = targetMetadataProvider.getSdkTargetId(targetId);
targetChanges.put(sdkTargetId, targetState.toTargetChange());
targetState.clearChanges();
}
}
Expand All @@ -355,13 +363,14 @@ public RemoteEvent createRemoteEvent(SnapshotVersion snapshotVersion) {
// that do not appear in the query cache.
//
// TODO(gsoltis): Expand on this comment once GC is available in the Android client.
for (Map.Entry<DocumentKey, Set<Integer>> entry : pendingDocumentTargetMapping.entrySet()) {
for (Map.Entry<DocumentKey, Set<RemoteTargetId>> entry :
pendingDocumentTargetMapping.entrySet()) {
DocumentKey key = entry.getKey();
Set<Integer> targets = entry.getValue();
Set<RemoteTargetId> targets = entry.getValue();

boolean isOnlyLimboTarget = true;

for (int targetId : targets) {
for (RemoteTargetId targetId : targets) {
TargetData targetData = queryDataForActiveTarget(targetId);
if (targetData != null && !targetData.getPurpose().equals(QueryPurpose.LIMBO_RESOLUTION)) {
isOnlyLimboTarget = false;
Expand All @@ -378,11 +387,17 @@ public RemoteEvent createRemoteEvent(SnapshotVersion snapshotVersion) {
document.setReadTime(snapshotVersion);
}

Map<Integer, QueryPurpose> translatedTargetMismatches = new HashMap<>();
for (Map.Entry<RemoteTargetId, QueryPurpose> entry : pendingTargetResets.entrySet()) {
int sdkTargetId = targetMetadataProvider.getSdkTargetId(entry.getKey());
translatedTargetMismatches.put(sdkTargetId, entry.getValue());
}

RemoteEvent remoteEvent =
new RemoteEvent(
snapshotVersion,
Collections.unmodifiableMap(targetChanges),
Collections.unmodifiableMap(pendingTargetResets),
Collections.unmodifiableMap(translatedTargetMismatches),
Collections.unmodifiableMap(pendingDocumentUpdates),
Collections.unmodifiableSet(resolvedLimboDocuments));

Expand All @@ -398,7 +413,7 @@ public RemoteEvent createRemoteEvent(SnapshotVersion snapshotVersion) {
* Adds the provided document to the internal list of document updates and its document key to the
* given target's mapping.
*/
private void addDocumentToTarget(int targetId, MutableDocument document) {
private void addDocumentToTarget(RemoteTargetId targetId, MutableDocument document) {
if (!isActiveTarget(targetId)) {
return;
}
Expand All @@ -423,7 +438,7 @@ private void addDocumentToTarget(int targetId, MutableDocument document) {
* provided to update the remote document cache.
*/
private void removeDocumentFromTarget(
int targetId, DocumentKey key, @Nullable MutableDocument updatedDocument) {
RemoteTargetId targetId, DocumentKey key, @Nullable MutableDocument updatedDocument) {
if (!isActiveTarget(targetId)) {
return;
}
Expand All @@ -445,15 +460,15 @@ private void removeDocumentFromTarget(
}

void removeTarget(int targetId) {
targetStates.remove(targetId);
targetStates.remove(RemoteTargetId.from(targetId));
}

/**
* Returns the current count of documents in the target. This includes both the number of
* documents that the LocalStore considers to be part of the target as well as any accumulated
* changes.
*/
private int getCurrentDocumentCountForTarget(int targetId) {
private int getCurrentDocumentCountForTarget(RemoteTargetId targetId) {
TargetState targetState = ensureTargetState(targetId);
TargetChange targetChange = targetState.toTargetChange();
return (targetMetadataProvider.getRemoteKeysForTarget(targetId).size()
Expand All @@ -467,11 +482,11 @@ private int getCurrentDocumentCountForTarget(int targetId) {
*/
void recordPendingTargetRequest(int targetId) {
// For each request we get we need to record we need a response for it.
TargetState targetState = ensureTargetState(targetId);
TargetState targetState = ensureTargetState(RemoteTargetId.from(targetId));
targetState.recordPendingTargetRequest();
}

private TargetState ensureTargetState(int targetId) {
private TargetState ensureTargetState(RemoteTargetId targetId) {
TargetState targetState = targetStates.get(targetId);
if (targetState == null) {
targetState = new TargetState();
Expand All @@ -481,8 +496,8 @@ private TargetState ensureTargetState(int targetId) {
return targetState;
}

private Set<Integer> ensureDocumentTargetMapping(DocumentKey key) {
Set<Integer> targetMapping = pendingDocumentTargetMapping.get(key);
private Set<RemoteTargetId> ensureDocumentTargetMapping(DocumentKey key) {
Set<RemoteTargetId> targetMapping = pendingDocumentTargetMapping.get(key);

if (targetMapping == null) {
targetMapping = new HashSet<>();
Expand All @@ -496,7 +511,7 @@ private Set<Integer> ensureDocumentTargetMapping(DocumentKey key) {
* Verifies that the user is still interested in this target (by calling
* `getTargetDataForTarget()`) and that we are not waiting for pending ADDs from watch.
*/
private boolean isActiveTarget(int targetId) {
private boolean isActiveTarget(RemoteTargetId targetId) {
return queryDataForActiveTarget(targetId) != null;
}

Expand All @@ -505,7 +520,7 @@ private boolean isActiveTarget(int targetId) {
* interested in that has no outstanding target change requests).
*/
@Nullable
private TargetData queryDataForActiveTarget(int targetId) {
private TargetData queryDataForActiveTarget(RemoteTargetId targetId) {
TargetState targetState = targetStates.get(targetId);
return targetState != null && targetState.isPending()
? null
Expand All @@ -516,7 +531,7 @@ private TargetData queryDataForActiveTarget(int targetId) {
* Resets the state of a Watch target to its initial state (sets 'current' to false, clears the
* resume token and removes its target mapping from all documents).
*/
private void resetTarget(int targetId) {
private void resetTarget(RemoteTargetId targetId) {
hardAssert(
targetStates.get(targetId) != null && !targetStates.get(targetId).isPending(),
"Should only reset active targets");
Expand All @@ -532,7 +547,7 @@ private void resetTarget(int targetId) {
}

/** Returns whether the LocalStore considers the document to be part of the specified target. */
private boolean targetContainsDocument(int targetId, DocumentKey key) {
private boolean targetContainsDocument(RemoteTargetId targetId, DocumentKey key) {
ImmutableSortedSet<DocumentKey> existingKeys =
targetMetadataProvider.getRemoteKeysForTarget(targetId);
return existingKeys.contains(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,12 @@ void writeWatchChange(WatchChange change, SnapshotVersion snapshotVersion) {
// Technically removing an unknown target is valid (e.g. it could race with a
// server-side removal), but we want to pay extra careful attention in tests
// that we only remove targets we listened too.
throw new IllegalStateException("Removing a non-active target");
if (!allowUnlistedTargetRemoval) {
throw new IllegalStateException("Removing a non-active target");
}
} else {
activeTargets.remove(targetId);
}
activeTargets.remove(targetId);
}
}
if (!targetChange.getTargetIds().isEmpty()) {
Expand Down Expand Up @@ -209,6 +212,8 @@ int getWritesSent() {
}
}

public boolean allowUnlistedTargetRemoval = false;

private MockWatchStream watchStream;
private MockWriteStream writeStream;
private int writeStreamRequestCount;
Expand Down
Loading
Loading