@@ -490,36 +490,9 @@ impl Plan {
490490 Self :: refine_union_negate_consolidation ( & mut dataflow) ;
491491 }
492492
493- if dataflow. is_single_time ( ) {
494- Self :: refine_single_time_operator_selection ( & mut dataflow) ;
495-
496- // The relaxation of the `must_consolidate` flag performs an LIR-based
497- // analysis and transform under checked recursion. By a similar argument
498- // made in `from_mir`, we do not expect the recursion limit to be hit.
499- // However, if that happens, we propagate an error to the caller.
500- // To apply the transform, we first obtain monotonic source and index
501- // global IDs and add them to a `TransformConfig` instance.
502- let monotonic_ids = dataflow
503- . source_imports
504- . iter ( )
505- . filter_map ( |( id, source_import) | source_import. monotonic . then_some ( * id) )
506- . chain (
507- dataflow
508- . index_imports
509- . iter ( )
510- . filter_map ( |( _id, index_import) | {
511- if index_import. monotonic {
512- Some ( index_import. desc . on_id )
513- } else {
514- None
515- }
516- } ) ,
517- )
518- . collect :: < BTreeSet < _ > > ( ) ;
493+ Self :: refine_single_time_operator_selection ( & mut dataflow) ;
519494
520- let config = TransformConfig { monotonic_ids } ;
521- Self :: refine_single_time_consolidation ( & mut dataflow, & config) ?;
522- }
495+ Self :: refine_single_time_consolidation ( & mut dataflow) ?;
523496
524497 soft_assert_eq_no_log ! ( dataflow. check_invariants( ) , Ok ( ( ) ) ) ;
525498
@@ -639,18 +612,36 @@ impl Plan {
639612 mz_repr:: explain:: trace_plan ( dataflow) ;
640613 }
641614
642- /// Refines the plans of objects to be built as part of `dataflow` to take advantage
643- /// of monotonic operators if the dataflow refers to a single-time, i.e., is for a
644- /// one-shot SELECT query.
615+ /// If the dataflow refers to a single-time, i.e., is for a one-shot SELECT query, this function
616+ /// refines the `dataflow` to use monotonic operators.
617+ ///
618+ /// Note that we set the `must_consolidate` flag on the monotonic operators, i.e., we don't
619+ /// assume physical monotonicity here. Reasoning about physical monotonicity and refining the
620+ /// `must_consolidate` flag is the responsibility of `refine_single_time_consolidation`.
621+ ///
622+ /// Note that, strictly speaking, choosing monotonic operators is valid only by assuming that
623+ /// - all inputs of single-time dataflows are logically (i.e., after consolidation) monotonic;
624+ /// - it's not possible to introduce logically non-monotonic collections inside a single-time
625+ /// dataflow when the inputs are logically monotonic.
626+ ///
627+ /// This assumption is not true when there are negative accumulations, which can be introduced
628+ /// by bugs (in Materialize or external systems), or by user error when using `repeat_row`.
629+ /// In such cases, the introduced monotonic operators won't produce correct results. This is
630+ /// considered ok, because:
631+ /// - In most cases, monotonic operators will detect non-monotonic input, and cleanly error out.
632+ /// - In some cases, a monotonic operator might produce garbage output when given non-monotonic
633+ /// input. Even this is considered ok, because we generally don't expect the system to always
634+ /// detect negative accumulations (because this seems to be ~impossible to achieve without
635+ /// adding further resource usage to various operators, not just monotonic operators).
645636 #[ mz_ore:: instrument(
646637 target = "optimizer" ,
647638 level = "debug" ,
648639 fields( path. segment = "refine_single_time_operator_selection" )
649640 ) ]
650641 fn refine_single_time_operator_selection ( dataflow : & mut DataflowDescription < Self > ) {
651- // We should only reach here if we have a one-shot SELECT query, i.e.,
652- // a single-time dataflow.
653- assert ! ( dataflow . is_single_time ( ) ) ;
642+ if !dataflow . is_single_time ( ) {
643+ return ;
644+ }
654645
655646 // Upgrade single-time plans to monotonic.
656647 for build_desc in dataflow. objects_to_build . iter_mut ( ) {
@@ -698,16 +689,41 @@ impl Plan {
698689 ) ]
699690 fn refine_single_time_consolidation (
700691 dataflow : & mut DataflowDescription < Self > ,
701- config : & TransformConfig ,
702692 ) -> Result < ( ) , String > {
703- // We should only reach here if we have a one-shot SELECT query, i.e.,
704- // a single-time dataflow.
705- assert ! ( dataflow. is_single_time( ) ) ;
693+ if !dataflow. is_single_time ( ) {
694+ return Ok ( ( ) ) ;
695+ }
696+
697+ // The relaxation of the `must_consolidate` flag performs an LIR-based
698+ // analysis and transform under checked recursion. By a similar argument
699+ // made in `from_mir`, we do not expect the recursion limit to be hit.
700+ // However, if that happens, we propagate an error to the caller.
701+ // To apply the transform, we first obtain monotonic source and index
702+ // global IDs and add them to a `TransformConfig` instance.
703+ let monotonic_ids = dataflow
704+ . source_imports
705+ . iter ( )
706+ . filter_map ( |( id, source_import) | source_import. monotonic . then_some ( * id) )
707+ . chain (
708+ dataflow
709+ . index_imports
710+ . iter ( )
711+ . filter_map ( |( _id, index_import) | {
712+ if index_import. monotonic {
713+ Some ( index_import. desc . on_id )
714+ } else {
715+ None
716+ }
717+ } ) ,
718+ )
719+ . collect :: < BTreeSet < _ > > ( ) ;
720+
721+ let config = TransformConfig { monotonic_ids } ;
706722
707723 let transform = transform:: RelaxMustConsolidate ;
708724 for build_desc in dataflow. objects_to_build . iter_mut ( ) {
709725 transform
710- . transform ( config, & mut build_desc. plan )
726+ . transform ( & config, & mut build_desc. plan )
711727 . map_err ( |_| "Maximum recursion limit error in consolidation relaxation." ) ?;
712728 }
713729 mz_repr:: explain:: trace_plan ( dataflow) ;
0 commit comments