11import collections
22import datetime
33import functools
4+ import heapq
5+ import itertools
46import logging
57
68from django .apps import apps
2830)
2931from osf .metrics import reports as es6_reports
3032from osf .metrics import es8_metrics , RegistriesModerationMetrics
31- from osf .metrics .reporters .public_item_usage import (
32- _iter_composite_bucket_keys ,
33- _zip_sorted ,
34- )
33+ from osf .metrics .reporters .public_item_usage import _iter_composite_bucket_keys
3534from osf .metrics .utils import (
3635 YearMonth ,
3736 get_database_iri ,
@@ -157,7 +156,7 @@ def _each_new():
157156 _each_hit = _es6_scan_range (
158157 es6_reports .PublicItemUsageReport ,
159158 until_when = until_when ,
160- addl_filter = {'term ' : {'item_osfid' : osfid }},
159+ addl_filter = {'terms ' : {'item_osfid' : _synonymous_osfids ( osfid ) }},
161160 sort = 'report_yearmonth' ,
162161 )
163162 _hits = list (_each_hit )
@@ -543,13 +542,9 @@ def _cumulative_countedusage_downloads(osfid, until_when) -> tuple[int, int]:
543542def _cumulative_preprint_count (preprint_metric_cls , osfid : str , until_when : str ) -> int :
544543 '''aggregate views on each preprint'''
545544 # copied/adapted from osf.metrics.preprint_metrics
546- _preprint_ids = [osfid ]
547- if osfid .endswith ('_v1' ):
548- # include pre-versioned-guid counts for v1
549- _preprint_ids .append (osfid .removesuffix ('_v1' ))
550545 _search = (
551546 preprint_metric_cls .search ()
552- .filter ('terms' , preprint_id = _preprint_ids )
547+ .filter ('terms' , preprint_id = _synonymous_osfids ( osfid ) )
553548 .filter ('range' , timestamp = {'lt' : until_when })
554549 .extra (size = 0 ) # no hits; only aggs
555550 )
@@ -563,6 +558,17 @@ def _cumulative_preprint_count(preprint_metric_cls, osfid: str, until_when: str)
563558 return _view_count
564559
565560
561+ def _synonymous_osfids (osfid : str ) -> list [str ]:
562+ _synonyms = [osfid ]
563+ if osfid .endswith ('_v1' ):
564+ # include pre-versioned-guid counts for v1
565+ _synonyms .append (osfid .removesuffix ('_v1' ))
566+ elif '_' not in osfid :
567+ # include v1 (if it exists) with unversioned guid
568+ _synonyms .append (f'{ osfid } _v1' )
569+ return _synonyms
570+
571+
566572def _convert_item_type_list (osf_model_names : list [str ] | str , has_surrounding_items : bool ):
567573 if isinstance (osf_model_names , str ):
568574 osf_model_names = [osf_model_names ]
@@ -675,6 +681,20 @@ def _each_preprintdownload_osfid(until_when, after_osfid=None) -> collections.ab
675681 return _iter_composite_bucket_keys (_search , 'agg_osfid' , 'osfid' , after = after_osfid )
676682
677683
684+ def _merge_sorted_osfids (* osfid_iterables ):
685+ def _osfids_group_key (osfid : str ):
686+ return ( # v1 same as unversioned
687+ osfid .removesuffix ('_v1' )
688+ if osfid .endswith ('_v1' )
689+ else osfid
690+ )
691+ for _k , _g in itertools .groupby (
692+ heapq .merge (* osfid_iterables ),
693+ key = _osfids_group_key ,
694+ ):
695+ yield _k
696+
697+
678698###
679699# the command itself
680700
@@ -845,7 +865,7 @@ def _handle_usage_reports(self, *, start: bool, no_counts: bool):
845865 self .stdout .write (
846866 f'starting per-item { es6_reports .PublicItemUsageReport .__name__ } => { es8_metrics .MonthlyPublicItemUsageReportEs8 .__name__ } '
847867 )
848- for _osfid in _zip_sorted (
868+ for _osfid in _merge_sorted_osfids (
849869 _each_usage_report_osfid (until_when = self ._migration_started_at ),
850870 _each_countedusage_osfid (until_when = self ._migration_started_at ),
851871 _each_preprintview_osfid (until_when = self ._migration_started_at ),
0 commit comments