|
31 | 31 | import java.nio.ByteBuffer; |
32 | 32 | import java.util.ArrayDeque; |
33 | 33 | import java.util.Deque; |
| 34 | +import java.util.HashSet; |
34 | 35 | import java.util.Map; |
35 | 36 | import java.util.Map.Entry; |
36 | 37 | import java.util.Set; |
|
54 | 55 | import org.apache.zookeeper.server.Request; |
55 | 56 | import org.apache.zookeeper.server.ServerCnxn; |
56 | 57 | import org.apache.zookeeper.server.ServerMetrics; |
| 58 | +import org.apache.zookeeper.server.SessionTracker; |
57 | 59 | import org.apache.zookeeper.server.TxnLogEntry; |
| 60 | +import org.apache.zookeeper.server.ZKDatabase; |
58 | 61 | import org.apache.zookeeper.server.ZooTrace; |
59 | 62 | import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; |
60 | 63 | import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; |
@@ -558,6 +561,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { |
558 | 561 | boolean snapshotNeeded = true; |
559 | 562 | boolean syncSnapshot = false; |
560 | 563 | readPacket(qp); |
| 564 | + boolean diffSync = qp.getType() == Leader.DIFF; |
561 | 565 | Deque<Long> packetsCommitted = new ArrayDeque<>(); |
562 | 566 | Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>(); |
563 | 567 | Deque<Request> requestsToAck = new ArrayDeque<>(); |
@@ -613,6 +617,10 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { |
613 | 617 | } |
614 | 618 | zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); |
615 | 619 | zk.createSessionTracker(); |
| 620 | + // DIFF keeps the local tree; clear ephemerals without sessions before applying new transactions. |
| 621 | + if (diffSync) { |
| 622 | + purgeOrphanedEphemerals(); |
| 623 | + } |
616 | 624 |
|
617 | 625 | long lastQueued = 0; |
618 | 626 |
|
@@ -845,6 +853,43 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { |
845 | 853 | // New server type need to handle in-flight packets |
846 | 854 | throw new UnsupportedOperationException("Unknown server type"); |
847 | 855 | } |
| 856 | + |
| 857 | + } |
| 858 | + |
| 859 | + void purgeOrphanedEphemerals() { |
| 860 | + if (zk == null) { |
| 861 | + return; |
| 862 | + } |
| 863 | + SessionTracker sessionTracker = zk.getSessionTracker(); |
| 864 | + if (sessionTracker == null) { |
| 865 | + return; |
| 866 | + } |
| 867 | + ZKDatabase zkDatabase = zk.getZKDatabase(); |
| 868 | + if (zkDatabase == null) { |
| 869 | + return; |
| 870 | + } |
| 871 | + |
| 872 | + Set<Long> globalSessions = sessionTracker.globalSessions(); |
| 873 | + Set<Long> localSessions = sessionTracker.localSessions(); |
| 874 | + Set<Long> sessionsWithEphemerals = new HashSet<>(zkDatabase.getSessions()); |
| 875 | + if (sessionsWithEphemerals.isEmpty()) { |
| 876 | + return; |
| 877 | + } |
| 878 | + |
| 879 | + long zxid = zkDatabase.getDataTreeLastProcessedZxid(); |
| 880 | + for (Long sessionId : sessionsWithEphemerals) { |
| 881 | + if (globalSessions.contains(sessionId) |
| 882 | + || localSessions.contains(sessionId) |
| 883 | + || (sessionTracker instanceof UpgradeableSessionTracker |
| 884 | + && ((UpgradeableSessionTracker) sessionTracker).isUpgradingSession(sessionId))) { |
| 885 | + continue; |
| 886 | + } |
| 887 | + LOG.warn( |
| 888 | + "Removing ephemeral nodes for unknown session 0x{} after DIFF sync", |
| 889 | + Long.toHexString(sessionId)); |
| 890 | + zkDatabase.killSession(sessionId, zxid); |
| 891 | + sessionTracker.removeSession(sessionId); |
| 892 | + } |
848 | 893 | } |
849 | 894 |
|
850 | 895 | protected void revalidate(QuorumPacket qp) throws IOException { |
|
0 commit comments