Skip to content

Commit 9015a0b

Browse files
committed
Split Storage_migrate.start
The original Storage_migrate.start function is large and does many things. For better readability and maintainability, split it into multiple functions that represent the different stages of SXM:

- prepare: makes a remote call to the destination host, asking it to prepare the VDIs that will receive the data
- mirror_pass_fds: passes the fd to tapdisk to establish a connection between the two tapdisk processes (sending & receiving)
- mirror_snapshot: takes a snapshot of the VDI to be mirrored, because tapdisk mirroring only mirrors data written after the mirror is initiated
- mirror_copy: copies the snapshot from the source to the destination

As these operations are all specific to SMAPIv1, move them to `storage_smapiv1_migrate` as well.

Also restructure the way cleanup is done. As there are multiple points at which the migration might fail, previously an `on_fail` list ref was constructed and populated as we went. Now that we have explicitly defined the different errors that can occur during the migration, we can handle these errors accordingly. The way to reason about cleanup is that we have three possible cleanup actions:

- receive_cancel: undoes the preparation work done on the destination host. This is added once preparation is done;
- destroy snapshot: destroys the snapshot. This is called after the snapshot is created;
- stop: disables mirroring, deletes the snapshot and also does what receive_cancel does, i.e. it cleans up everything. This is called after the mirror has been started.

Note that many of these functions are best-effort; for example, the stop function won't delete a snapshot if it does not exist, so it is generally safe to call them. We put these cleanup functions in the appropriate places; see the actual arrangement of these functions in the code, which should be self-explanatory.

Signed-off-by: Vincent Liu <shuntian.liu2@cloud.com>
1 parent 4005b4f commit 9015a0b

3 files changed

Lines changed: 263 additions & 149 deletions

File tree

ocaml/xapi/storage_migrate.ml

Lines changed: 37 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,20 @@ module MigrateLocal = struct
247247
| e ->
248248
raise e
249249

250+
let prepare ~dbg ~sr ~vdi ~dest ~local_vdi ~mirror_id ~mirror_vm ~url
251+
~verify_dest =
252+
try
253+
let (module Remote) = get_remote_backend url verify_dest in
254+
let similars = similar_vdis ~dbg ~sr ~vdi in
255+
256+
Remote.DATA.MIRROR.receive_start2 dbg dest local_vdi mirror_id similars
257+
mirror_vm
258+
with e ->
259+
error "%s Caught error %s while preparing for SXM" __FUNCTION__
260+
(Printexc.to_string e) ;
261+
raise
262+
(Storage_error (Migration_preparation_failure (Printexc.to_string e)))
263+
250264
let start ~task ~dbg ~sr ~vdi ~dp ~mirror_vm ~copy_vm ~url ~dest ~verify_dest
251265
=
252266
SXM.info
@@ -262,7 +276,6 @@ module MigrateLocal = struct
262276
(Storage_interface.Sr.string_of dest)
263277
verify_dest ;
264278

265-
let remote_url = Http.Url.of_string url in
266279
let (module Remote) = get_remote_backend url verify_dest in
267280
(* Find the local VDI *)
268281
let local_vdi = find_local_vdi ~dbg ~sr ~vdi in
@@ -285,176 +298,51 @@ module MigrateLocal = struct
285298
State.add mirror_id (State.Send_op alm) ;
286299
debug "%s Added mirror %s to active local mirrors" __FUNCTION__ mirror_id ;
287300
(* If any stage below fails, cleanup is performed by the exception handlers at the end of this [try], chosen according to the error that was raised. *)
288-
let on_fail : (unit -> unit) list ref = ref [] in
289301
try
290-
let similar_vdis = Local.VDI.similar_content dbg sr vdi in
291-
let similars =
292-
List.filter
293-
(fun x -> x <> "")
294-
(List.map (fun vdi -> vdi.content_id) similar_vdis)
302+
let (Vhd_mirror remote_mirror) =
303+
prepare ~dbg ~sr ~vdi ~dest ~local_vdi ~mirror_id ~mirror_vm ~url
304+
~verify_dest
295305
in
296-
debug "Similar VDIs to = [ %s ]"
297-
(String.concat "; "
298-
(List.map
299-
(fun x ->
300-
Printf.sprintf "(vdi=%s,content_id=%s)"
301-
(Storage_interface.Vdi.string_of x.vdi)
302-
x.content_id
303-
)
304-
similar_vdis
305-
)
306-
) ;
307-
let (Mirror.Vhd_mirror result) =
308-
Remote.DATA.MIRROR.receive_start2 dbg dest local_vdi mirror_id similars
309-
mirror_vm
310-
in
311-
(* Enable mirroring on the local machine *)
312-
let mirror_dp = result.Mirror.mirror_datapath in
313-
let uri =
314-
Printf.sprintf "/services/SM/nbd/%s/%s/%s/%s"
315-
(Storage_interface.Vm.string_of mirror_vm)
316-
(Storage_interface.Sr.string_of dest)
317-
(Storage_interface.Vdi.string_of result.Mirror.mirror_vdi.vdi)
318-
mirror_dp
319-
in
320-
debug "%s: uri of http request for mirroring is %s" __FUNCTION__ uri ;
321-
let dest_url = Http.Url.set_uri remote_url uri in
322-
let request =
323-
Http.Request.make
324-
~query:(Http.Url.get_query_params dest_url)
325-
~version:"1.0" ~user_agent:"smapiv2" Http.Put uri
326-
in
327-
let verify_cert = if verify_dest then Stunnel_client.pool () else None in
328-
let transport = Xmlrpc_client.transport_of_url ~verify_cert dest_url in
329-
debug "Searching for data path: %s" dp ;
330-
let attach_info = Local.DP.attach_info dbg sr vdi dp mirror_vm in
331-
on_fail :=
332-
(fun () -> Remote.DATA.MIRROR.receive_cancel dbg mirror_id) :: !on_fail ;
333306
let tapdev =
334-
match tapdisk_of_attach_info attach_info with
335-
| Some tapdev ->
336-
let pid = Tapctl.get_tapdisk_pid tapdev in
337-
let path =
338-
Printf.sprintf "/var/run/blktap-control/nbdclient%d" pid
339-
in
340-
with_transport ~stunnel_wait_disconnect:false transport
341-
(with_http request (fun (_response, s) ->
342-
let control_fd =
343-
Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0
344-
in
345-
finally
346-
(fun () ->
347-
Unix.connect control_fd (Unix.ADDR_UNIX path) ;
348-
let msg = dp in
349-
let len = String.length msg in
350-
let written =
351-
Unixext.send_fd_substring control_fd msg 0 len [] s
352-
in
353-
if written <> len then (
354-
error "Failed to transfer fd to %s" path ;
355-
failwith "Internal error transferring fd to tapdisk"
356-
)
357-
)
358-
(fun () -> Unix.close control_fd)
359-
)
360-
) ;
361-
tapdev
362-
| None ->
363-
failwith "Not attached"
364-
in
365-
debug "%s Updating active local mirrors: id=%s" __FUNCTION__ mirror_id ;
366-
let alm =
367-
State.Send_state.
368-
{
369-
url
370-
; dest_sr= dest
371-
; remote_info=
372-
Some
373-
{
374-
dp= mirror_dp
375-
; vdi= result.Mirror.mirror_vdi.vdi
376-
; url
377-
; verify_dest
378-
}
379-
; local_dp= dp
380-
; tapdev= Some tapdev
381-
; failed= false
382-
; watchdog= None
383-
}
307+
Storage_smapiv1_migrate.mirror_pass_fds ~dbg ~dp ~sr ~vdi ~mirror_vm
308+
~mirror_id ~url ~dest_sr:dest ~verify_dest ~remote_mirror
384309
in
385-
386-
State.add mirror_id (State.Send_op alm) ;
387-
debug "%s Updated mirror_id %s in the active local mirror" __FUNCTION__
388-
mirror_id ;
389-
390-
SXM.info "%s About to snapshot VDI = %s" __FUNCTION__
391-
(string_of_vdi_info local_vdi) ;
392-
let local_vdi = add_to_sm_config local_vdi "mirror" ("nbd:" ^ dp) in
393-
let local_vdi = add_to_sm_config local_vdi "base_mirror" mirror_id in
394310
let snapshot =
395-
try Local.VDI.snapshot dbg sr local_vdi with
396-
| Storage_interface.Storage_error (Backend_error (code, _))
397-
when code = "SR_BACKEND_FAILURE_44" ->
398-
raise
399-
(Api_errors.Server_error
400-
( Api_errors.sr_source_space_insufficient
401-
, [Storage_interface.Sr.string_of sr]
402-
)
403-
)
404-
| e ->
405-
raise e
311+
Storage_smapiv1_migrate.mirror_snapshot ~dbg ~sr ~dp ~mirror_id
312+
~local_vdi
406313
in
407-
SXM.info "%s: snapshot created, mirror initiated vdi:%s snapshot_of:%s"
408-
__FUNCTION__
409-
(Storage_interface.Vdi.string_of snapshot.vdi)
410-
(Storage_interface.Vdi.string_of local_vdi.vdi) ;
411-
on_fail := (fun () -> Local.VDI.destroy dbg sr snapshot.vdi) :: !on_fail ;
412-
(let rec inner () =
413-
let alm_opt = State.find_active_local_mirror mirror_id in
414-
match alm_opt with
415-
| Some alm ->
416-
let stats = Tapctl.stats (Tapctl.create ()) tapdev in
417-
if stats.Tapctl.Stats.nbd_mirror_failed = 1 then (
418-
error "Tapdisk mirroring has failed" ;
419-
Updates.add (Dynamic.Mirror mirror_id) updates
420-
) ;
421-
alm.State.Send_state.watchdog <-
422-
Some
423-
(Scheduler.one_shot scheduler (Scheduler.Delta 5)
424-
"tapdisk_watchdog" inner
425-
)
426-
| None ->
427-
()
428-
in
429-
inner ()
430-
) ;
431-
on_fail := (fun () -> stop ~dbg ~id:mirror_id) :: !on_fail ;
432-
(* Copy the snapshot to the remote *)
314+
Storage_smapiv1_migrate.mirror_checker mirror_id tapdev ;
433315
let new_parent =
434-
Storage_task.with_subtask task "copy" (fun () ->
435-
copy_into_vdi ~task ~dbg ~sr ~vdi:snapshot.vdi ~vm:copy_vm ~url
436-
~dest ~dest_vdi:result.Mirror.copy_diffs_to ~verify_dest
437-
)
438-
|> vdi_info
316+
Storage_smapiv1_migrate.mirror_copy ~task ~dbg ~sr ~snapshot ~copy_vm
317+
~url ~dest_sr:dest ~remote_mirror ~verify_dest
439318
in
440319
debug "Local VDI %s = remote VDI %s"
441320
(Storage_interface.Vdi.string_of snapshot.vdi)
442321
(Storage_interface.Vdi.string_of new_parent.vdi) ;
443322
debug "Local VDI %s now mirrored to remote VDI: %s"
444323
(Storage_interface.Vdi.string_of local_vdi.vdi)
445-
(Storage_interface.Vdi.string_of result.Mirror.mirror_vdi.vdi) ;
324+
(Storage_interface.Vdi.string_of remote_mirror.Mirror.mirror_vdi.vdi) ;
446325
debug "Destroying snapshot on src" ;
447326
Local.VDI.destroy dbg sr snapshot.vdi ;
448327
Some (Mirror_id mirror_id)
449328
with
450329
| Storage_error (Sr_not_attached sr_uuid) ->
451330
error " Caught exception %s:%s. Performing cleanup."
452331
Api_errors.sr_not_attached sr_uuid ;
453-
perform_cleanup_actions !on_fail ;
454332
raise (Api_errors.Server_error (Api_errors.sr_not_attached, [sr_uuid]))
333+
| ( Storage_error (Migration_mirror_fd_failure reason)
334+
| Storage_error (Migration_mirror_snapshot_failure reason) ) as e ->
335+
error "%s: Caught %s: during storage migration preparation" __FUNCTION__
336+
reason ;
337+
MigrateRemote.receive_cancel ~dbg ~id:mirror_id ;
338+
raise e
339+
| Storage_error (Migration_mirror_copy_failure reason) as e ->
340+
error "%s: Caught %s: during storage migration copy" __FUNCTION__ reason ;
341+
stop ~dbg ~id:mirror_id ;
342+
raise e
455343
| e ->
456-
error "Caught %s: performing cleanup actions" (Api_errors.to_string e) ;
457-
perform_cleanup_actions !on_fail ;
344+
error "Caught %s during SXM: " (Api_errors.to_string e) ;
345+
stop ~dbg ~id:mirror_id ;
458346
raise e
459347

460348
let stat ~dbg:_ ~id =

ocaml/xapi/storage_smapiv1_migrate.ml

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,161 @@ module Copy = struct
401401
raise (Storage_error (Internal_error (Printexc.to_string e)))
402402
end
403403

404+
let mirror_pass_fds ~dbg ~dp ~sr ~vdi ~mirror_vm ~mirror_id ~url ~dest_sr
405+
~verify_dest ~(remote_mirror : Mirror.mirror_receive_result_vhd_t) =
406+
let remote_vdi = remote_mirror.mirror_vdi.vdi in
407+
let mirror_dp = remote_mirror.mirror_datapath in
408+
409+
let uri =
410+
Printf.sprintf "/services/SM/nbd/%s/%s/%s/%s"
411+
(Storage_interface.Vm.string_of mirror_vm)
412+
(Storage_interface.Sr.string_of dest_sr)
413+
(Storage_interface.Vdi.string_of remote_vdi)
414+
mirror_dp
415+
in
416+
D.debug "%s: uri of http request for mirroring is %s" __FUNCTION__ uri ;
417+
let dest_url = Http.Url.set_uri (Http.Url.of_string url) uri in
418+
D.debug "%s url of http request for mirroring is %s" __FUNCTION__
419+
(Http.Url.to_string dest_url) ;
420+
let request =
421+
Http.Request.make
422+
~query:(Http.Url.get_query_params dest_url)
423+
~version:"1.0" ~user_agent:"smapiv2" Http.Put uri
424+
in
425+
let verify_cert = if verify_dest then Stunnel_client.pool () else None in
426+
let transport = Xmlrpc_client.transport_of_url ~verify_cert dest_url in
427+
D.debug "Searching for data path: %s" dp ;
428+
let attach_info = Local.DP.attach_info dbg sr vdi dp mirror_vm in
429+
430+
let tapdev =
431+
match tapdisk_of_attach_info attach_info with
432+
| Some tapdev ->
433+
let pid = Tapctl.get_tapdisk_pid tapdev in
434+
let path = Printf.sprintf "/var/run/blktap-control/nbdclient%d" pid in
435+
with_transport ~stunnel_wait_disconnect:false transport
436+
(with_http request (fun (_response, s) ->
437+
(* Enable mirroring on the local machine *)
438+
let control_fd = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
439+
finally
440+
(fun () ->
441+
Unix.connect control_fd (Unix.ADDR_UNIX path) ;
442+
let msg = dp in
443+
let len = String.length msg in
444+
let written =
445+
Unixext.send_fd_substring control_fd msg 0 len [] s
446+
in
447+
if written <> len then (
448+
D.error "Failed to transfer fd to %s" path ;
449+
failwith "Internal error transferring fd to tapdisk"
450+
)
451+
)
452+
(fun () -> Unix.close control_fd)
453+
)
454+
) ;
455+
tapdev
456+
| None ->
457+
D.error "%s: vdi %s not attached" __FUNCTION__ (Vdi.string_of vdi) ;
458+
raise
459+
(Storage_interface.Storage_error
460+
(Migration_mirror_fd_failure "VDI Not Attached")
461+
)
462+
| exception e ->
463+
D.error "%s Caught exception %s:. Performing cleanup." __FUNCTION__
464+
(Printexc.to_string e) ;
465+
raise
466+
(Storage_interface.Storage_error
467+
(Migration_mirror_fd_failure (Printexc.to_string e))
468+
)
469+
in
470+
D.debug "%s Updating active local mirrors: id=%s" __FUNCTION__ mirror_id ;
471+
let alm =
472+
State.Send_state.
473+
{
474+
url
475+
; dest_sr
476+
; remote_info=
477+
Some
478+
{
479+
dp= remote_mirror.mirror_datapath
480+
; vdi= remote_mirror.mirror_vdi.vdi
481+
; url
482+
; verify_dest
483+
}
484+
; local_dp= dp
485+
; tapdev= Some tapdev
486+
; failed= false
487+
; watchdog= None
488+
}
489+
in
490+
State.add mirror_id (State.Send_op alm) ;
491+
D.debug "%s Updated mirror_id %s in the active local mirror" __FUNCTION__
492+
mirror_id ;
493+
tapdev
494+
495+
let mirror_snapshot ~dbg ~sr ~dp ~mirror_id ~local_vdi =
496+
SXM.info "%s About to snapshot VDI = %s" __FUNCTION__
497+
(string_of_vdi_info local_vdi) ;
498+
let local_vdi = add_to_sm_config local_vdi "mirror" ("nbd:" ^ dp) in
499+
let local_vdi = add_to_sm_config local_vdi "base_mirror" mirror_id in
500+
let snapshot =
501+
try Local.VDI.snapshot dbg sr local_vdi with
502+
| Storage_interface.Storage_error (Backend_error (code, _))
503+
when code = "SR_BACKEND_FAILURE_44" ->
504+
raise
505+
(Storage_interface.Storage_error
506+
(Migration_mirror_snapshot_failure
507+
(Printf.sprintf "%s:%s" Api_errors.sr_source_space_insufficient
508+
(Storage_interface.Sr.string_of sr)
509+
)
510+
)
511+
)
512+
| e ->
513+
raise
514+
(Storage_interface.Storage_error
515+
(Migration_mirror_snapshot_failure (Printexc.to_string e))
516+
)
517+
in
518+
519+
SXM.info "%s: snapshot created, mirror initiated vdi:%s snapshot_of:%s"
520+
__FUNCTION__
521+
(Storage_interface.Vdi.string_of snapshot.vdi)
522+
(Storage_interface.Vdi.string_of local_vdi.vdi) ;
523+
524+
snapshot
525+
526+
let mirror_checker mirror_id tapdev =
527+
let rec inner () =
528+
let alm_opt = State.find_active_local_mirror mirror_id in
529+
match alm_opt with
530+
| Some alm ->
531+
let stats = Tapctl.stats (Tapctl.create ()) tapdev in
532+
if stats.Tapctl.Stats.nbd_mirror_failed = 1 then (
533+
D.error "Tapdisk mirroring has failed" ;
534+
Updates.add (Dynamic.Mirror mirror_id) updates
535+
) ;
536+
alm.State.Send_state.watchdog <-
537+
Some
538+
(Scheduler.one_shot scheduler (Scheduler.Delta 5) "tapdisk_watchdog"
539+
inner
540+
)
541+
| None ->
542+
()
543+
in
544+
inner ()
545+
546+
let mirror_copy ~task ~dbg ~sr ~snapshot ~copy_vm ~url ~dest_sr ~remote_mirror
547+
~verify_dest =
548+
(* Copy the snapshot to the remote *)
549+
try
550+
Storage_task.with_subtask task "copy" (fun () ->
551+
Copy.copy_into_vdi ~task ~dbg ~sr ~vdi:snapshot.vdi ~vm:copy_vm ~url
552+
~dest:dest_sr ~dest_vdi:remote_mirror.Mirror.copy_diffs_to
553+
~verify_dest
554+
)
555+
|> vdi_info
556+
with e ->
557+
raise (Storage_error (Migration_mirror_copy_failure (Printexc.to_string e)))
558+
404559
module MIRROR : SMAPIv2_MIRROR = struct
405560
type context = unit
406561

0 commit comments

Comments
 (0)