11import collections
22import datetime
33import functools
4+ import heapq
5+ import itertools
46import logging
57
68from django .apps import apps
2830)
2931from osf .metrics import reports as es6_reports
3032from osf .metrics import es8_metrics , RegistriesModerationMetrics
31- from osf .metrics .reporters .public_item_usage import (
32- _iter_composite_bucket_keys ,
33- _zip_sorted ,
34- )
33+ from osf .metrics .reporters .public_item_usage import _iter_composite_bucket_keys
3534from osf .metrics .utils import (
3635 YearMonth ,
3736 get_database_iri ,
@@ -145,35 +144,40 @@ def migrate_preprint_downloads(from_when: str, until_when: str):
145144 _es8_bulk_save (es8_metrics .OsfCountedUsageEvent , _each_new )
146145
147146
147+ @celery_app .task (** _TASK_KWARGS )
148+ def schedule_migrate_usage_reports (until_when : str ):
149+ for _osfid in _merge_sorted_osfids (
150+ _each_usage_report_osfid (until_when = until_when ),
151+ _each_countedusage_osfid (until_when = until_when ),
152+ _each_preprintview_osfid (until_when = until_when ),
153+ _each_preprintdownload_osfid (until_when = until_when ),
154+ ):
155+ migrate_usage_reports .delay (_osfid , until_when )
156+
157+
148158@celery_app .task (** _TASK_KWARGS )
149159def migrate_usage_reports (osfid : str , until_when : str ):
150160 # from PublicItemUsageReport to MonthlyPublicItemUsageReportEs8
151161 _osfobj , _ = osfdb .Guid .load_referent (osfid )
152162 _item_is_component = is_osf_component (_osfobj ) if _osfobj else False
153163
154164 def _each_new ():
155- # go in sorted order to build cumulative counts
156- # (only a few dozen of these per item; should be fine to sort and load all at once)
157165 _each_hit = _es6_scan_range (
158166 es6_reports .PublicItemUsageReport ,
159167 until_when = until_when ,
160- addl_filter = {'term' : {'item_osfid' : osfid }},
161- sort = 'report_yearmonth' ,
168+ addl_filter = {'terms' : {'item_osfid' : _synonymous_osfids (osfid )}},
162169 )
170+ # (only a few dozen of these per item; should be fine to load all at once)
163171 _hits = list (_each_hit )
164172 if _osfobj and not _hits :
165173 # this item has usages, but only before the monthly usage reparts started
166174 # -- create one for cumulative counts (if the object still exists)
167175 yield _backfill_old_usage_report (_osfobj , _item_is_component , until_when )
168176 else :
169- _prior_report = None
170177 for _hit in _hits :
171- yield (
172- _prior_report := _convert_public_usage_report (
173- _hit ['_source' ],
174- _prior_report ,
175- item_is_component = _item_is_component ,
176- )
178+ yield _convert_public_usage_report (
179+ _hit ['_source' ],
180+ item_is_component = _item_is_component ,
177181 )
178182
179183 _es8_bulk_save (es8_metrics .MonthlyPublicItemUsageReportEs8 , _each_new ())
@@ -213,7 +217,6 @@ def _es6_scan_range(
213217 from_when : str = '' ,
214218 until_when : str ,
215219 addl_filter = None ,
216- sort = None ,
217220):
218221 _timestamp_range = {'lt' : until_when }
219222 if from_when :
@@ -224,8 +227,6 @@ def _es6_scan_range(
224227 if addl_filter :
225228 _filters .append (addl_filter )
226229 _query_body = {'query' : {'bool' : {'filter' : _filters }}}
227- if sort :
228- _query_body ['sort' ] = sort
229230 return es6_helpers .scan (
230231 _es6_connection (),
231232 index = es6_recordtype ._template_pattern ,
@@ -391,26 +392,13 @@ def _convert_preprint_metric(
391392
392393def _convert_public_usage_report (
393394 source : dict ,
394- prior_report : es8_metrics .MonthlyPublicItemUsageReportEs8 | None ,
395395 item_is_component : bool ,
396396) -> es8_metrics .MonthlyPublicItemUsageReportEs8 :
397- if prior_report is None :
398- _c_views , _c_view_sess , _c_downloads , _c_download_sess = _get_cumulative_usage (
399- osfid = source ['item_osfid' ],
400- until_when = YearMonth .from_str (source ['report_yearmonth' ]).month_end (),
401- is_preprint = (source .get ('item_type' ) == 'preprint' ),
402- )
403- else :
404- _c_views = prior_report .cumulative_view_count + source .get ('view_count' , 0 )
405- _c_view_sess = prior_report .cumulative_view_session_count + (
406- source .get ('view_session_count' , 0 ) or source .get ('view_count' , 0 )
407- )
408- _c_downloads = prior_report .cumulative_download_count + source .get (
409- 'download_count' , 0
410- )
411- _c_download_sess = prior_report .cumulative_download_session_count + (
412- source .get ('download_session_count' , 0 ) or source .get ('download_count' )
413- )
397+ _c_views , _c_view_sess , _c_downloads , _c_download_sess = _get_cumulative_usage (
398+ osfid = source ['item_osfid' ],
399+ until_when = YearMonth .from_str (source ['report_yearmonth' ]).month_end (),
400+ is_preprint = ('preprint' in source .get ('item_type' , ())),
401+ )
414402 return es8_metrics .MonthlyPublicItemUsageReportEs8 (
415403 cycle_coverage = _semverish_from_yearmonth (source ['report_yearmonth' ]),
416404 item_iri = osfid_iri (source ['item_osfid' ]),
@@ -426,11 +414,11 @@ def _convert_public_usage_report(
426414 provider_ids = source .get ('provider_id' ),
427415 platform_iris = source .get ('platform_iri' ) or [website_settings .DOMAIN ],
428416 view_count = source .get ('view_count' , 0 ),
429- view_session_count = source .get ('view_session_count' , 0 ),
417+ view_session_count = source .get ('view_session_count' ) or source . get ( 'view_count' , 0 ),
430418 cumulative_view_count = _c_views ,
431419 cumulative_view_session_count = _c_view_sess or _c_views ,
432420 download_count = source .get ('download_count' , 0 ),
433- download_session_count = source .get ('download_session_count' , 0 ),
421+ download_session_count = source .get ('download_session_count' ) or source . get ( 'download_count' , 0 ),
434422 cumulative_download_count = _c_downloads ,
435423 cumulative_download_session_count = _c_download_sess or _c_downloads ,
436424 )
@@ -541,26 +529,32 @@ def _cumulative_countedusage_downloads(osfid, until_when) -> tuple[int, int]:
541529
542530
543531def _cumulative_preprint_count (preprint_metric_cls , osfid : str , until_when : str ) -> int :
544- '''aggregate views on each preprint'''
532+ '''aggregate counts on given preprint'''
545533 # copied/adapted from osf.metrics.preprint_metrics
546- _preprint_ids = [osfid ]
547- if osfid .endswith ('_v1' ):
548- # include pre-versioned-guid counts for v1
549- _preprint_ids .append (osfid .removesuffix ('_v1' ))
550534 _search = (
551535 preprint_metric_cls .search ()
552- .filter ('terms' , preprint_id = _preprint_ids )
536+ .filter ('terms' , preprint_id = _synonymous_osfids ( osfid ) )
553537 .filter ('range' , timestamp = {'lt' : until_when })
554538 .extra (size = 0 ) # no hits; only aggs
555539 )
556540 _search .aggs .metric ('agg_count' , 'sum' , field = 'count' )
557541 _response = _search .execute ()
558- _view_count = (
542+ return (
559543 int (_response .aggregations .agg_count .value )
560544 if hasattr (_response .aggregations , 'agg_count' )
561545 else 0
562546 )
563- return _view_count
547+
548+
549+ def _synonymous_osfids (osfid : str ) -> list [str ]:
550+ _synonyms = [osfid ]
551+ if osfid .endswith ('_v1' ):
552+ # include pre-versioned-guid counts for v1
553+ _synonyms .append (osfid .removesuffix ('_v1' ))
554+ elif '_' not in osfid :
555+ # include v1 (if it exists) with unversioned guid
556+ _synonyms .append (f'{ osfid } _v1' )
557+ return _synonyms
564558
565559
566560def _convert_item_type_list (osf_model_names : list [str ] | str , has_surrounding_items : bool ):
@@ -675,6 +669,20 @@ def _each_preprintdownload_osfid(until_when, after_osfid=None) -> collections.ab
675669 return _iter_composite_bucket_keys (_search , 'agg_osfid' , 'osfid' , after = after_osfid )
676670
677671
672+ def _merge_sorted_osfids (* osfid_iterables ):
673+ def _osfids_group_key (osfid : str ):
674+ return ( # v1 same as unversioned
675+ osfid .removesuffix ('_v1' )
676+ if osfid .endswith ('_v1' )
677+ else osfid
678+ )
679+ for _k , _g in itertools .groupby (
680+ heapq .merge (* osfid_iterables ),
681+ key = _osfids_group_key ,
682+ ):
683+ yield _k
684+
685+
678686###
679687# the command itself
680688
@@ -732,12 +740,12 @@ def handle(
732740 self ._clear_es8_data (unchanged , usage_events , usage_reports )
733741 self ._check_started_at (start_now = start )
734742 _default_all = not any ((unchanged , usage_events , usage_reports ))
735- if unchanged or _default_all :
736- self ._handle_unchanged (start = start , no_counts = no_counts )
737- if usage_events or _default_all :
738- self ._handle_usage_events (start = start , no_counts = no_counts )
739743 if usage_reports or _default_all :
740744 self ._handle_usage_reports (start = start , no_counts = no_counts )
745+ if usage_events or _default_all :
746+ self ._handle_usage_events (start = start , no_counts = no_counts )
747+ if unchanged or _default_all :
748+ self ._handle_unchanged (start = start , no_counts = no_counts )
741749 if not no_counts :
742750 self .stdout .write ('(counts may be approximate)' )
743751
@@ -845,15 +853,7 @@ def _handle_usage_reports(self, *, start: bool, no_counts: bool):
845853 self .stdout .write (
846854 f'starting per-item { es6_reports .PublicItemUsageReport .__name__ } => { es8_metrics .MonthlyPublicItemUsageReportEs8 .__name__ } '
847855 )
848- for _osfid in _zip_sorted (
849- _each_usage_report_osfid (until_when = self ._migration_started_at ),
850- _each_countedusage_osfid (until_when = self ._migration_started_at ),
851- _each_preprintview_osfid (until_when = self ._migration_started_at ),
852- _each_preprintdownload_osfid (until_when = self ._migration_started_at ),
853- ):
854- migrate_usage_reports .delay (
855- _osfid , self ._migration_started_at .isoformat ()
856- )
856+ schedule_migrate_usage_reports .delay (self ._migration_started_at .isoformat ())
857857
858858 def _check_started_at (self , start_now ):
859859 _started_at = self ._migration_started_at
@@ -885,9 +885,9 @@ def _clear_es8_data(self, unchanged, usage_events, usage_reports):
885885 if _default_all or unchanged :
886886 _to_clear .extend (_UNCHANGED_RECORDTYPES .values ())
887887 if _default_all or usage_events :
888- _to_clear .append (es8_metrics .MonthlyPublicItemUsageReportEs8 )
889- if _default_all or usage_reports :
890888 _to_clear .append (es8_metrics .OsfCountedUsageEvent )
889+ if _default_all or usage_reports :
890+ _to_clear .append (es8_metrics .MonthlyPublicItemUsageReportEs8 )
891891 for _es8_recordtype in _to_clear :
892892 self .stdout .write (
893893 f'clearing { _es8_recordtype .__name__ } ' , self .style .NOTICE
0 commit comments