@@ -525,6 +525,59 @@ let rec next ~__context =
525525 else
526526 rpc_of_events relevant
527527
528+ type time = Xapi_database.Db_cache_types.Time .t
529+
530+ type entry = {table : string ; obj : string ; time : time }
531+
532+ type acc = {
533+ creates : entry list
534+ ; mods : entry list
535+ ; deletes : entry list
536+ ; last : time
537+ }
538+
539+ let collect_events (subs , tables , last_generation ) acc table =
540+ let open Xapi_database in
541+ let open Db_cache_types in
542+ let table_value = TableSet. find table tables in
543+ let prepend_recent obj stat _ ({creates; mods; last; _} as entries ) =
544+ let Stat. {created; modified; deleted} = stat in
545+ if Subscription. object_matches subs table obj then
546+ let last = max last (max modified deleted) in
547+ let creates =
548+ if created > last_generation then
549+ {table; obj; time= created} :: creates
550+ else
551+ creates
552+ in
553+ let mods =
554+ if modified > last_generation && not (created > last_generation) then
555+ {table; obj; time= modified} :: mods
556+ else
557+ mods
558+ in
559+ {entries with creates; mods; last}
560+ else
561+ entries
562+ in
563+ let prepend_deleted obj stat ({deletes; last; _} as entries ) =
564+ let Stat. {created; modified; deleted} = stat in
565+ if Subscription. object_matches subs table obj then
566+ let last = max last (max modified deleted) in
567+ let deletes =
568+ if created < = last_generation then
569+ {table; obj; time= deleted} :: deletes
570+ else
571+ deletes
572+ in
573+ {entries with deletes; last}
574+ else
575+ entries
576+ in
577+ acc
578+ |> Table. fold_over_recent last_generation prepend_recent table_value
579+ |> Table. fold_over_deleted last_generation prepend_deleted table_value
580+
528581let from_inner __context session subs from from_t timer batching =
529582 let open Xapi_database in
530583 let open From in
@@ -541,159 +594,118 @@ let from_inner __context session subs from from_t timer batching =
541594 in
542595 List. filter (fun table -> Subscription. table_matches subs table) all
543596 in
544- let last_generation = ref from in
545597 let last_msg_gen = ref from_t in
546- let grab_range t =
598+ let grab_range ~ since t =
547599 let tableset = Db_cache_types.Database. tableset (Db_ref. get_database t) in
548600 let msg_gen, messages =
549601 if Subscription. table_matches subs " message" then
550602 ! Message. get_since_for_events ~__context ! last_msg_gen
551603 else
552604 (0L , [] )
553605 in
554- ( msg_gen
555- , messages
556- , tableset
557- , List. fold_left
558- (fun acc table ->
559- (* Fold over the live objects *)
560- let acc =
561- Db_cache_types.Table. fold_over_recent ! last_generation
562- (fun objref {Db_cache_types.Stat. created; modified; deleted} _
563- (creates , mods , deletes , last ) ->
564- if Subscription. object_matches subs table objref then
565- let last = max last (max modified deleted) in
566- (* mtime guaranteed to always be larger than ctime *)
567- ( ( if created > ! last_generation then
568- (table, objref, created) :: creates
569- else
570- creates
571- )
572- , ( if
573- modified > ! last_generation
574- && not (created > ! last_generation)
575- then
576- (table, objref, modified) :: mods
577- else
578- mods
579- )
580- , (* Only have a mod event if we don't have a created event *)
581- deletes
582- , last
583- )
584- else
585- (creates, mods, deletes, last)
586- )
587- (Db_cache_types.TableSet. find table tableset)
588- acc
589- in
590- (* Fold over the deleted objects *)
591- Db_cache_types.Table. fold_over_deleted ! last_generation
592- (fun objref {Db_cache_types.Stat. created; modified; deleted}
593- (creates , mods , deletes , last ) ->
594- if Subscription. object_matches subs table objref then
595- let last = max last (max modified deleted) in
596- (* mtime guaranteed to always be larger than ctime *)
597- if created > ! last_generation then
598- (creates, mods, deletes, last)
599- (* It was created and destroyed since the last update *)
600- else
601- (creates, mods, (table, objref, deleted) :: deletes, last)
602- (* It might have been modified, but we can't tell now *)
603- else
604- (creates, mods, deletes, last)
605- )
606- (Db_cache_types.TableSet. find table tableset)
607- acc
608- )
609- ([] , [] , [] , ! last_generation)
610- tables
611- )
606+ let events =
607+ let initial = {creates= [] ; mods= [] ; deletes= [] ; last= since} in
608+ let folder = collect_events (subs, tableset, since) in
609+ List. fold_left folder initial tables
610+ in
611+ (msg_gen, messages, tableset, events)
612612 in
613613 (* Each event.from should have an independent subscription record *)
614- let msg_gen, messages, tableset, (creates, mods, deletes, last) =
614+ let msg_gen, messages, tableset, events =
615615 with_call session subs (fun sub ->
616616 let grab_nonempty_range =
617- Throttle.Batching. with_recursive_loop batching @@ fun self arg ->
618- let ( (msg_gen, messages, _tableset, (creates, mods, deletes, last))
619- as result
620- ) =
621- Db_lock. with_lock (fun () -> grab_range (Db_backend. make () ))
617+ Throttle.Batching. with_recursive_loop batching @@ fun self since ->
618+ let result =
619+ Db_lock. with_lock (fun () -> grab_range ~since (Db_backend. make () ))
622620 in
621+ let msg_gen, messages, _tables, events = result in
622+ let {creates; mods; deletes; last} = events in
623623 if
624624 creates = []
625625 && mods = []
626626 && deletes = []
627627 && messages = []
628628 && not (Clock.Timer. has_expired timer)
629629 then (
630- last_generation := last ;
631- (* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *)
630+ (* cur_id was bumped, but nothing relevent fell out of the database.
631+ Therefore the last ID the client got is equivalent to the current one. *)
632632 sub.cur_id < - last ;
633- (* last id the client got is equivalent to the current one *)
634633 last_msg_gen := msg_gen ;
635634 wait2 sub last timer ;
636- (self [@ tailcall]) arg
635+ (* The next iteration will fold over events starting after
636+ the last database event that matched a subscription. *)
637+ let next = last in
638+ (self [@ tailcall]) next
637639 ) else
638640 result
639641 in
640- grab_nonempty_range ()
642+ grab_nonempty_range from
641643 )
642644 in
643- last_generation := last ;
644- let event_of op ?snapshot ( table , objref , time ) =
645+ let {creates; mods; deletes; last} = events in
646+ let event_of op ?snapshot { table; obj; time} =
645647 {
646648 id= Int64. to_string time
647649 ; ts= " 0.0"
648650 ; ty= String. lowercase_ascii table
649651 ; op
650- ; reference= objref
652+ ; reference= obj
651653 ; snapshot
652654 }
653655 in
654- let events =
655- List. fold_left
656- (fun acc x ->
657- let ev = event_of `del x in
658- if Subscription. event_matches subs ev then ev :: acc else acc
659- )
660- [] deletes
661- in
662- let events =
663- List. fold_left
664- (fun acc (table , objref , mtime ) ->
665- let serialiser = Eventgen. find_get_record table in
666- try
667- let xml = serialiser ~__context ~self: objref () in
668- let ev = event_of `_mod ?snapshot:xml (table, objref, mtime) in
669- if Subscription. event_matches subs ev then ev :: acc else acc
670- with _ -> acc
671- )
672- events mods
656+ let events_of ~kind ?(with_snapshot = true ) entries acc =
657+ let rec go events ({table; obj; time = _ } as entry ) =
658+ try
659+ let snapshot =
660+ let serialiser = Eventgen. find_get_record table in
661+ if with_snapshot then
662+ serialiser ~__context ~self: obj ()
663+ else
664+ None
665+ in
666+ let event = event_of kind ?snapshot entry in
667+ if Subscription. event_matches subs event then
668+ event :: events
669+ else
670+ events
671+ with _ ->
672+ (* CA-91931: An exception may be raised here if an object's
673+ lifetime is too short.
674+
675+ The problem is that "collect_events" and "events_of" work
676+ on different versions of the database, so some `add and
677+ `mod events can be lost if the corresponding object is
678+ deleted before a snapshot is taken.
679+
680+ In practice, this has only been seen with the "task"
681+ object - which can be rapidly created and destroyed using
682+ helper functions.
683+
684+ These exceptions have been suppressed since [bc0cc5a9]. *)
685+ events
686+ in
687+ List. fold_left go acc entries
673688 in
674689 let events =
675- List. fold_left
676- (fun acc (table , objref , ctime ) ->
677- let serialiser = Eventgen. find_get_record table in
678- try
679- let xml = serialiser ~__context ~self: objref () in
680- let ev = event_of `add ?snapshot:xml (table, objref, ctime) in
681- if Subscription. event_matches subs ev then ev :: acc else acc
682- with _ -> acc
683- )
684- events creates
690+ [] (* Accumulate the events for objects stored in the database. *)
691+ |> events_of ~kind: `del ~with_snapshot: false deletes
692+ |> events_of ~kind: `_mod mods
693+ |> events_of ~kind: `add creates
685694 in
686695 let events =
696+ (* Messages require a special casing as their contents are not
697+ stored in the database. *)
687698 List. fold_left
688699 (fun acc mev ->
689700 let event =
701+ let table = " message" in
690702 match mev with
691703 | Message. Create (_ref , message ) ->
692704 event_of `add
693705 ?snapshot:(Some (API. rpc_of_message_t message))
694- ( " message " , Ref. string_of _ref, 0L )
706+ {table; obj = Ref. string_of _ref; time = 0L }
695707 | Message. Del _ref ->
696- event_of `del ( " message " , Ref. string_of _ref, 0L )
708+ event_of `del {table; obj = Ref. string_of _ref; time = 0L }
697709 in
698710 event :: acc
699711 )
0 commit comments