|
204 | 204 | (a/<! (a/thread (mm/clean-tempfiles! attachments))) |
205 | 205 | (recur))))) |
206 | 206 |
|
| 207 | +(defn- get-xapi-version |
| 208 | + [config source-or-target] |
| 209 | + (get-in config [source-or-target :request-config :xapi-version])) |
| 210 | + |
207 | 211 | (defn- statement-loop |
208 | 212 | "Async loop to break GET batches into statements and handle attachments." |
209 | | - [get-chan |
| 213 | + [config |
| 214 | + get-chan |
210 | 215 | statement-chan |
211 | 216 | stop-chan] |
212 | | - (a/go-loop [] |
213 | | - (if-let [[tag x] (a/<! get-chan)] |
214 | | - (do |
215 | | - (case tag |
216 | | - :response |
217 | | - (let [{{:keys [attachments]} :body |
218 | | - :as resp} x |
219 | | - ;; Wrap errors and threadiness |
220 | | - {:keys [statements |
221 | | - error]} |
222 | | - (if (not-empty attachments) |
223 | | - ;; We have to do attachments in a thread |
224 | | - (a/<! |
225 | | - (a/thread |
226 | | - (try {:statements (xapi/response->statements resp)} |
227 | | - (catch Throwable ex |
228 | | - {:error ex})))) |
229 | | - (try {:statements (xapi/response->statements resp)} |
230 | | - (catch Throwable ex |
231 | | - {:error ex})))] |
232 | | - (if error |
233 | | - (a/>! stop-chan |
234 | | - {:status :error |
235 | | - :error {:message (ex-message error) |
236 | | - :type :job}}) |
237 | | - (a/<! (a/onto-chan! statement-chan statements false)))) |
238 | | - :exception |
239 | | - (a/>! stop-chan |
240 | | - {:status :error |
241 | | - :error {:message (ex-message x) |
242 | | - :type :source}})) |
243 | | - (recur)) |
244 | | - (a/close! statement-chan)))) |
| 217 | + (let [source-version (get-xapi-version config :source) |
| 218 | + target-version (get-xapi-version config :target) |
| 219 | + downgrade-version? (and (= "2.0.0" source-version) |
| 220 | + (= "1.0.3" target-version))] |
| 221 | + (when downgrade-version? |
| 222 | + (log/warnf "Source version 2.0.0 and target version 1.0.3. Statements will be downgraded.")) |
| 223 | + (a/go-loop [] |
| 224 | + (if-let [[tag x] (a/<! get-chan)] |
| 225 | + (do |
| 226 | + (case tag |
| 227 | + :response |
| 228 | + (let [{{:keys [attachments]} :body |
| 229 | + :as resp} x |
| 230 | + ;; Wrap errors and threadiness |
| 231 | + {:keys [statements |
| 232 | + error]} |
| 233 | + (if (not-empty attachments) |
| 234 | + ;; We have to do attachments in a thread |
| 235 | + (a/<! |
| 236 | + (a/thread |
| 237 | + (try {:statements (xapi/response->statements resp)} |
| 238 | + (catch Throwable ex |
| 239 | + {:error ex})))) |
| 240 | + (try {:statements (xapi/response->statements resp)} |
| 241 | + (catch Throwable ex |
| 242 | + {:error ex})))] |
| 243 | + (if error |
| 244 | + (a/>! stop-chan |
| 245 | + {:status :error |
| 246 | + :error {:message (ex-message error) |
| 247 | + :type :job}}) |
| 248 | + (a/<! (a/onto-chan! statement-chan |
| 249 | + (cond->> statements |
| 250 | + downgrade-version? |
| 251 | + (map xapi/convert-200-to-103)) |
| 252 | + false)))) |
| 253 | + :exception |
| 254 | + (a/>! stop-chan |
| 255 | + {:status :error |
| 256 | + :error {:message (ex-message x) |
| 257 | + :type :source}})) |
| 258 | + (recur)) |
| 259 | + (a/close! statement-chan))))) |
245 | 260 |
|
246 | 261 | (defn- init-buffers |
247 | 262 | [{{{source-batch-size :batch-size} :source |
|
291 | 306 | conn-mgr-opts |
292 | 307 | source-client-opts |
293 | 308 | target-client-opts] |
294 | | - :or {conn-mgr-opts {} |
295 | | - source-client-opts {} |
296 | | - target-client-opts {}}} :client-opts |
297 | | - reporter :reporter |
298 | | - :or {reporter (metrics/->NoopReporter)}}] |
299 | | - (let [{:keys [id] |
| 309 | + :or {conn-mgr-opts {} |
| 310 | + source-client-opts {} |
| 311 | + target-client-opts {}}} :client-opts |
| 312 | + reporter :reporter |
| 313 | + :or {reporter (metrics/->NoopReporter)}}] |
| 314 | + (let [{:keys [id] |
300 | 315 | {status-before :status |
301 | 316 | cursor-before :cursor |
302 | 317 | filter-before :filter |
303 | | - :as state-before} :state |
304 | | - {{:keys [poll-interval] |
| 318 | + :as state-before} :state |
| 319 | + {{:keys [poll-interval] |
305 | 320 | {?query-since :since |
306 | | - :as get-params} :get-params |
307 | | - get-req-config :request-config |
308 | | - source-backoff-opts :backoff-opts |
309 | | - :as source-config} :source |
| 321 | + :as get-params} :get-params |
| 322 | + get-req-config :request-config |
| 323 | + source-backoff-opts :backoff-opts |
| 324 | + :as source-config} :source |
310 | 325 | {target-batch-size :batch-size |
311 | 326 | post-req-config :request-config |
312 | 327 | target-backoff-opts :backoff-opts |
313 | | - :as target-config} :target |
314 | | - filter-config :filter |
315 | | - :keys [batch-timeout]} :config |
316 | | - :as job-before} |
| 328 | + :as target-config} :target |
| 329 | + filter-config :filter |
| 330 | + :keys [batch-timeout] |
| 331 | + :as config} :config |
| 332 | + :as job-before} |
317 | 333 | (update job :config config/ensure-defaults)] |
318 | 334 | (case status-before |
319 | 335 | :running (throw (ex-info "Job already running!" |
|
326 | 342 | {:type ::cannot-start-completed |
327 | 343 | :job job-before})) |
328 | 344 | (let [;; Get a timestamp for the instant before initialization |
329 | | - init-stamp (t/now-stamp) |
| 345 | + init-stamp (t/now-stamp) |
330 | 346 | ;; The states channel emits the job as it runs |
331 | 347 | states-chan (a/chan) |
332 | 348 |
|
|
336 | 352 | batch-buffer |
337 | 353 | cleanup-buffer]} (init-buffers job-before) |
338 | 354 | ;; Http async conn pool + client |
339 | | - conn-mgr (or conn-mgr |
340 | | - (client/init-conn-mgr |
341 | | - conn-mgr-opts)) |
342 | | - source-client (or http-client |
343 | | - (client/init-client |
344 | | - conn-mgr source-client-opts)) |
345 | | - target-client (or http-client |
346 | | - (client/init-client |
347 | | - conn-mgr target-client-opts)) |
| 355 | + conn-mgr (or conn-mgr |
| 356 | + (client/init-conn-mgr |
| 357 | + conn-mgr-opts)) |
| 358 | + source-client (or http-client |
| 359 | + (client/init-client |
| 360 | + conn-mgr source-client-opts)) |
| 361 | + target-client (or http-client |
| 362 | + (client/init-client |
| 363 | + conn-mgr target-client-opts)) |
348 | 364 |
|
349 | 365 | stop-chan (a/promise-chan) |
350 | 366 | stop-fn #(a/put! stop-chan {:status :paused}) |
351 | 367 |
|
352 | 368 | ;; A channel for get responses |
353 | | - get-chan (a/chan get-buffer) |
| 369 | + get-chan (a/chan get-buffer) |
354 | 370 | ;; A channel that holds statements + attachments |
355 | 371 | statement-chan (a/chan statement-buffer) |
356 | 372 | ;; A channel for batches of statements to target |
357 | | - batch-chan (a/chan batch-buffer) |
| 373 | + batch-chan (a/chan batch-buffer) |
358 | 374 | ;; A channel to get dropped records |
359 | | - cleanup-chan (a/chan cleanup-buffer |
360 | | - ;; Will only pass records with attachments |
361 | | - (filter |
362 | | - (comp |
363 | | - not-empty |
364 | | - :attachments))) |
| 375 | + cleanup-chan (a/chan cleanup-buffer |
| 376 | + ;; Will only pass records with attachments |
| 377 | + (filter |
| 378 | + (comp |
| 379 | + not-empty |
| 380 | + :attachments))) |
365 | 381 |
|
366 | 382 | ;; Cleanup loop deletes tempfiles from batch + post |
367 | 383 | cloop (cleanup-loop cleanup-chan) |
|
382 | 398 | :backoff-opts |
383 | 399 | source-backoff-opts |
384 | 400 | :conn-opts |
385 | | - {:conn-mgr conn-mgr |
| 401 | + {:conn-mgr conn-mgr |
386 | 402 | :http-client source-client} |
387 | 403 | :reporter reporter) |
388 | 404 |
|
389 | 405 | ;; Pipeline from get-chan to statement chan |
390 | | - sloop (statement-loop get-chan statement-chan stop-chan) |
| 406 | + ;; Performs transformations when required |
| 407 | + sloop (statement-loop |
| 408 | + config |
| 409 | + get-chan |
| 410 | + statement-chan |
| 411 | + stop-chan) |
391 | 412 |
|
392 | 413 | ;; Batch + filter statements from statement chan to batch chan |
393 | 414 | bloop (ua/batch-filter |
|
409 | 430 | {:state (-> state-before |
410 | 431 | (cond-> |
411 | 432 | ;; If a job is paused, re-init |
412 | | - (= :paused (:status state-before)) |
| 433 | + (= :paused (:status state-before)) |
413 | 434 | (state/set-status :init)) |
414 | 435 | (assoc :updated init-stamp))}) |
415 | 436 | states-chan |
416 | 437 | stop-chan |
417 | 438 | batch-chan |
418 | 439 | cleanup-chan |
419 | | - {:conn-mgr conn-mgr |
| 440 | + {:conn-mgr conn-mgr |
420 | 441 | :http-client target-client} |
421 | 442 | reporter)] |
422 | 443 |
|
|
438 | 459 | (a/close! states-chan)) |
439 | 460 |
|
440 | 461 | ;; Return the state emitter and stop fn |
441 | | - {:states states-chan |
| 462 | + {:states states-chan |
442 | 463 | :stop-fn stop-fn})))) |
443 | 464 |
|
444 | 465 | (s/fdef log-states |
|
0 commit comments