|
31 | 31 | import java.nio.ByteBuffer; |
32 | 32 | import java.util.ArrayDeque; |
33 | 33 | import java.util.Deque; |
| 34 | +import java.util.HashSet; |
34 | 35 | import java.util.Map; |
35 | 36 | import java.util.Map.Entry; |
36 | 37 | import java.util.Set; |
|
54 | 55 | import org.apache.zookeeper.server.Request; |
55 | 56 | import org.apache.zookeeper.server.ServerCnxn; |
56 | 57 | import org.apache.zookeeper.server.ServerMetrics; |
| 58 | +import org.apache.zookeeper.server.SessionTracker; |
57 | 59 | import org.apache.zookeeper.server.TxnLogEntry; |
| 60 | +import org.apache.zookeeper.server.ZKDatabase; |
58 | 61 | import org.apache.zookeeper.server.ZooTrace; |
59 | 62 | import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; |
60 | 63 | import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; |
@@ -579,6 +582,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { |
579 | 582 | boolean snapshotNeeded = true; |
580 | 583 | boolean syncSnapshot = false; |
581 | 584 | readPacket(qp); |
| 585 | + boolean diffSync = qp.getType() == Leader.DIFF; |
582 | 586 | Deque<Long> packetsCommitted = new ArrayDeque<>(); |
583 | 587 | Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>(); |
584 | 588 |
|
@@ -633,6 +637,10 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { |
633 | 637 | } |
634 | 638 | zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); |
635 | 639 | zk.createSessionTracker(); |
| 640 | + // DIFF keeps the local tree; clear ephemerals without sessions before applying new transactions. |
| 641 | + if (diffSync) { |
| 642 | + purgeOrphanedEphemerals(); |
| 643 | + } |
636 | 644 |
|
637 | 645 | // TODO: Ideally, this should be lastProcessZxid(a.k.a. QuorumPacket::zxid from above), but currently |
638 | 646 | // LearnerHandler does not guarantee this. So, let's be conservative and keep it unchange for now. |
@@ -869,6 +877,43 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { |
869 | 877 | // New server type need to handle in-flight packets |
870 | 878 | throw new UnsupportedOperationException("Unknown server type"); |
871 | 879 | } |
| 880 | + |
| 881 | + } |
| 882 | + |
| 883 | + void purgeOrphanedEphemerals() { |
| 884 | + if (zk == null) { |
| 885 | + return; |
| 886 | + } |
| 887 | + SessionTracker sessionTracker = zk.getSessionTracker(); |
| 888 | + if (sessionTracker == null) { |
| 889 | + return; |
| 890 | + } |
| 891 | + ZKDatabase zkDatabase = zk.getZKDatabase(); |
| 892 | + if (zkDatabase == null) { |
| 893 | + return; |
| 894 | + } |
| 895 | + |
| 896 | + Set<Long> globalSessions = sessionTracker.globalSessions(); |
| 897 | + Set<Long> localSessions = sessionTracker.localSessions(); |
| 898 | + Set<Long> sessionsWithEphemerals = new HashSet<>(zkDatabase.getSessions()); |
| 899 | + if (sessionsWithEphemerals.isEmpty()) { |
| 900 | + return; |
| 901 | + } |
| 902 | + |
| 903 | + long zxid = zkDatabase.getDataTreeLastProcessedZxid(); |
| 904 | + for (Long sessionId : sessionsWithEphemerals) { |
| 905 | + if (globalSessions.contains(sessionId) |
| 906 | + || localSessions.contains(sessionId) |
| 907 | + || (sessionTracker instanceof UpgradeableSessionTracker |
| 908 | + && ((UpgradeableSessionTracker) sessionTracker).isUpgradingSession(sessionId))) { |
| 909 | + continue; |
| 910 | + } |
| 911 | + LOG.warn( |
| 912 | + "Removing ephemeral nodes for unknown session 0x{} after DIFF sync", |
| 913 | + Long.toHexString(sessionId)); |
| 914 | + zkDatabase.killSession(sessionId, zxid); |
| 915 | + sessionTracker.removeSession(sessionId); |
| 916 | + } |
872 | 917 | } |
873 | 918 |
|
874 | 919 | protected void revalidate(QuorumPacket qp) throws IOException { |
|
0 commit comments