|
18 | 18 | import java.util.concurrent.atomic.AtomicBoolean; |
19 | 19 | import java.util.stream.Collectors; |
20 | 20 |
|
| 21 | +import static com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition; |
| 22 | +import static com.launchdarkly.sdk.server.FDv2DataSourceConditions.ConditionFactory; |
| 23 | +import static com.launchdarkly.sdk.server.FDv2DataSourceConditions.FallbackCondition; |
| 24 | +import static com.launchdarkly.sdk.server.FDv2DataSourceConditions.RecoveryCondition; |
| 25 | + |
21 | 26 | class FDv2DataSource implements DataSource { |
22 | 27 | /** |
23 | 28 | * Default fallback timeout of 2 minutes. The timeout is only configurable for testing. |
@@ -50,199 +55,6 @@ class FDv2DataSource implements DataSource { |
50 | 55 |
|
51 | 56 | private final LDLogger logger; |
52 | 57 |
|
53 | | - /** |
54 | | - * Package-private for testing. |
55 | | - */ |
56 | | - interface Condition { |
57 | | - enum ConditionType { |
58 | | - FALLBACK, |
59 | | - RECOVERY, |
60 | | - } |
61 | | - CompletableFuture<Condition> execute(); |
62 | | - |
63 | | - void inform(FDv2SourceResult sourceResult); |
64 | | - |
65 | | - void close() throws IOException; |
66 | | - |
67 | | - ConditionType getType(); |
68 | | - } |
69 | | - |
70 | | - interface ConditionFactory { |
71 | | - Condition build(); |
72 | | - |
73 | | - Condition.ConditionType getType(); |
74 | | - } |
75 | | - |
76 | | - |
77 | | - static abstract class TimedCondition implements Condition { |
78 | | - protected final CompletableFuture<Condition> resultFuture = new CompletableFuture<>(); |
79 | | - |
80 | | - protected final ScheduledExecutorService sharedExecutor; |
81 | | - |
82 | | - /** |
83 | | - * Future for the timeout task, if any. Will be null when no timeout is active. |
84 | | - */ |
85 | | - protected ScheduledFuture<Void> timerFuture; |
86 | | - |
87 | | - /** |
88 | | - * Timeout duration for the fallback operation. |
89 | | - */ |
90 | | - protected final long timeoutSeconds; |
91 | | - |
92 | | - public TimedCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { |
93 | | - this.sharedExecutor = sharedExecutor; |
94 | | - this.timeoutSeconds = timeoutSeconds; |
95 | | - } |
96 | | - |
97 | | - @Override |
98 | | - public CompletableFuture<Condition> execute() { |
99 | | - return resultFuture; |
100 | | - } |
101 | | - |
102 | | - @Override |
103 | | - public void close() throws IOException { |
104 | | - if (timerFuture != null) { |
105 | | - timerFuture.cancel(false); |
106 | | - timerFuture = null; |
107 | | - } |
108 | | - } |
109 | | - |
110 | | - static abstract class Factory implements ConditionFactory { |
111 | | - protected final ScheduledExecutorService sharedExecutor; |
112 | | - protected final long timeoutSeconds; |
113 | | - |
114 | | - public Factory(ScheduledExecutorService sharedExecutor, long timeout) { |
115 | | - this.sharedExecutor = sharedExecutor; |
116 | | - this.timeoutSeconds = timeout; |
117 | | - } |
118 | | - } |
119 | | - } |
120 | | - |
121 | | - /** |
122 | | - * This condition is used to determine if a fallback should be performed. It is informed of each data source result |
123 | | - * via {@link #inform(FDv2SourceResult)}. Based on the results, it updates its internal state. When the fallback |
124 | | - * condition is met, then the {@link Future} returned by {@link #execute()} will complete. |
125 | | - * <p> |
126 | | - * This is package-private, instead of private, for ease of testing. |
127 | | - */ |
128 | | - static class FallbackCondition extends TimedCondition { |
129 | | - static class Factory extends TimedCondition.Factory { |
130 | | - public Factory(ScheduledExecutorService sharedExecutor, long timeout) { |
131 | | - super(sharedExecutor, timeout); |
132 | | - } |
133 | | - @Override |
134 | | - public Condition build() { |
135 | | - return new FallbackCondition(sharedExecutor, timeoutSeconds); |
136 | | - } |
137 | | - |
138 | | - @Override |
139 | | - public ConditionType getType() { |
140 | | - return ConditionType.FALLBACK; |
141 | | - } |
142 | | - } |
143 | | - |
144 | | - public FallbackCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { |
145 | | - super(sharedExecutor, timeoutSeconds); |
146 | | - } |
147 | | - |
148 | | - @Override |
149 | | - public void inform(FDv2SourceResult sourceResult) { |
150 | | - if(sourceResult == null) { |
151 | | - return; |
152 | | - } |
153 | | - if(sourceResult.getResultType() == FDv2SourceResult.ResultType.CHANGE_SET) { |
154 | | - if(timerFuture != null) { |
155 | | - timerFuture.cancel(false); |
156 | | - timerFuture = null; |
157 | | - } |
158 | | - } |
159 | | - if(sourceResult.getResultType() == FDv2SourceResult.ResultType.STATUS && sourceResult.getStatus().getState() == FDv2SourceResult.State.INTERRUPTED) { |
160 | | - if (timerFuture == null) { |
161 | | - timerFuture = sharedExecutor.schedule(() -> { |
162 | | - resultFuture.complete(this); |
163 | | - return null; |
164 | | - }, timeoutSeconds, TimeUnit.SECONDS); |
165 | | - } |
166 | | - } |
167 | | - } |
168 | | - |
169 | | - @Override |
170 | | - public ConditionType getType() { |
171 | | - return ConditionType.FALLBACK; |
172 | | - } |
173 | | - } |
174 | | - |
175 | | - static class RecoveryCondition extends TimedCondition { |
176 | | - |
177 | | - static class Factory extends TimedCondition.Factory { |
178 | | - public Factory(ScheduledExecutorService sharedExecutor, long timeout) { |
179 | | - super(sharedExecutor, timeout); |
180 | | - } |
181 | | - @Override |
182 | | - public Condition build() { |
183 | | - return new RecoveryCondition(sharedExecutor, timeoutSeconds); |
184 | | - } |
185 | | - |
186 | | - @Override |
187 | | - public ConditionType getType() { |
188 | | - return ConditionType.RECOVERY; |
189 | | - } |
190 | | - } |
191 | | - |
192 | | - public RecoveryCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { |
193 | | - super(sharedExecutor, timeoutSeconds); |
194 | | - timerFuture = sharedExecutor.schedule(() -> { |
195 | | - resultFuture.complete(this); |
196 | | - return null; |
197 | | - }, timeoutSeconds, TimeUnit.SECONDS); |
198 | | - } |
199 | | - |
200 | | - @Override |
201 | | - public void inform(FDv2SourceResult sourceResult) { |
202 | | - // Time-based recovery. |
203 | | - } |
204 | | - |
205 | | - @Override |
206 | | - public ConditionType getType() { |
207 | | - return ConditionType.RECOVERY; |
208 | | - } |
209 | | - } |
210 | | - |
211 | | - private static class SynchronizerFactoryWithState { |
212 | | - public enum State { |
213 | | - /** |
214 | | - * This synchronizer is available to use. |
215 | | - */ |
216 | | - Available, |
217 | | - |
218 | | - /** |
219 | | - * This synchronizer is no longer available to use. |
220 | | - */ |
221 | | - Blocked |
222 | | - } |
223 | | - |
224 | | - private final DataSourceFactory<Synchronizer> factory; |
225 | | - |
226 | | - private State state = State.Available; |
227 | | - |
228 | | - |
229 | | - public SynchronizerFactoryWithState(DataSourceFactory<Synchronizer> factory) { |
230 | | - this.factory = factory; |
231 | | - } |
232 | | - |
233 | | - public State getState() { |
234 | | - return state; |
235 | | - } |
236 | | - |
237 | | - public void block() { |
238 | | - state = State.Blocked; |
239 | | - } |
240 | | - |
241 | | - public Synchronizer build() { |
242 | | - return factory.build(); |
243 | | - } |
244 | | - } |
245 | | - |
246 | 58 | public interface DataSourceFactory<T> { |
247 | 59 | T build(); |
248 | 60 | } |
|
0 commit comments