1818
1919from framework .celery_tasks import app as celery_app
2020from osf .metadata .rdfutils import OSF
21- from osf .metadata .osfmap_utils import osfmap_type_from_model , is_osf_component
21+ from osf .metadata .osfmap_utils import is_osf_component
2222from osf .metrics .preprint_metrics import (
2323 PreprintView ,
2424 PreprintDownload ,
2525)
26- from osf .metrics .counted_usage import CountedAuthUsage as CountedUsageEs6
26+ from osf .metrics .counted_usage import (
27+ CountedAuthUsage as CountedUsageEs6 ,
28+ get_provider_id ,
29+ )
2730from osf .metrics import reports as es6_reports
2831from osf .metrics import es8_metrics , RegistriesModerationMetrics
29- from osf .metrics .reporters .public_item_usage import _iter_composite_bucket_keys
30- from osf .metrics .utils import YearMonth
32+ from osf .metrics .reporters .public_item_usage import (
33+ _iter_composite_bucket_keys ,
34+ _zip_sorted ,
35+ )
36+ from osf .metrics .utils import (
37+ YearMonth ,
38+ get_database_iri ,
39+ get_item_type ,
40+ get_item_type_from_model ,
41+ get_item_type_from_iri ,
42+ )
3143from osf import models as osfdb
3244from osf .models .base import osfid_iri
3345from website import settings as website_settings
4254
4355_MAX_CARDINALITY_PRECISION = 40000 # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html#_precision_control
4456
57+ _COMPOSITE_CHUNK_SIZE = 500
58+
4559_UNCHANGED_RECORDTYPES = {
4660 # reports
4761 es6_reports .StorageAddonUsage : es8_metrics .DailyStorageAddonUsageReportEs8 ,
@@ -135,8 +149,8 @@ def migrate_preprint_downloads(from_when: str, until_when: str):
135149@celery_app .task (** _TASK_KWARGS )
136150def migrate_usage_reports (osfid : str , until_when : str ):
137151 # from PublicItemUsageReport to MonthlyPublicItemUsageReportEs8
138- _osfguid = osfdb .Guid .load (osfid )
139- _item_is_component = is_osf_component (_osfguid . referent ) if _osfguid else False
152+ _osfobj , _ = osfdb .Guid .load_referent (osfid )
153+ _item_is_component = is_osf_component (_osfobj ) if _osfobj else False
140154
141155 def _each_new ():
142156 # go in sorted order to build cumulative counts
@@ -147,15 +161,21 @@ def _each_new():
147161 addl_filter = {'term' : {'item_osfid' : osfid }},
148162 sort = 'report_yearmonth' ,
149163 )
150- _prior_report = None
151- for _hit in list (_each_hit ):
152- yield (
153- _prior_report := _convert_public_usage_report (
154- _hit ['_source' ],
155- _prior_report ,
156- item_is_component = _item_is_component ,
164+ _hits = list (_each_hit )
165+ if _osfobj and not _hits :
166+ # this item has usages, but only before the monthly usage reparts started
167+ # -- create one for cumulative counts (if the object still exists)
168+ yield _backfill_old_usage_report (_osfobj , _item_is_component , until_when )
169+ else :
170+ _prior_report = None
171+ for _hit in _hits :
172+ yield (
173+ _prior_report := _convert_public_usage_report (
174+ _hit ['_source' ],
175+ _prior_report ,
176+ item_is_component = _item_is_component ,
177+ )
157178 )
158- )
159179
160180 _es8_bulk_save (es8_metrics .MonthlyPublicItemUsageReportEs8 , _each_new ())
161181
@@ -289,8 +309,8 @@ def _assert_field_unchangedness(es6_recordtype, es8_recordtype):
289309 assert _es6_fields == _es8_fields
290310
291311
292- def _semverish_from_yearmonth (given_yearmonth : str ):
293- _ym = YearMonth .from_str (given_yearmonth )
312+ def _semverish_from_yearmonth (given_yearmonth ):
313+ _ym = YearMonth .from_any (given_yearmonth )
294314 return f'{ _ym .year } .{ _ym .month } '
295315
296316
@@ -362,7 +382,7 @@ def _convert_preprint_metric(
362382 ),
363383 # fields from OsfCountedUsageEvent:
364384 item_osfid = _source ['preprint_id' ],
365- item_type = OSF . Preprint ,
385+ item_type = ' Preprint' ,
366386 item_public = True ,
367387 provider_id = _source .get ('provider_id' ),
368388 user_is_authenticated = bool (_source .get ('user_id' )),
@@ -383,14 +403,14 @@ def _convert_public_usage_report(
383403 )
384404 else :
385405 _c_views = prior_report .cumulative_view_count + source .get ('view_count' , 0 )
386- _c_view_sess = prior_report .cumulative_view_session_count + source . get (
387- 'view_session_count' , 0
406+ _c_view_sess = prior_report .cumulative_view_session_count + (
407+ source . get ( 'view_session_count' , 0 ) or source . get ( 'view_count' , 0 )
388408 )
389409 _c_downloads = prior_report .cumulative_download_count + source .get (
390410 'download_count' , 0
391411 )
392- _c_download_sess = prior_report .cumulative_download_session_count + source . get (
393- 'download_session_count' , 0
412+ _c_download_sess = prior_report .cumulative_download_session_count + (
413+ source . get ( 'download_session_count' , 0 ) or source . get ( 'download_count' )
394414 )
395415 return es8_metrics .MonthlyPublicItemUsageReportEs8 (
396416 cycle_coverage = _semverish_from_yearmonth (source ['report_yearmonth' ]),
@@ -409,11 +429,37 @@ def _convert_public_usage_report(
409429 view_count = source .get ('view_count' , 0 ),
410430 view_session_count = source .get ('view_session_count' , 0 ),
411431 cumulative_view_count = _c_views ,
412- cumulative_view_session_count = _c_view_sess ,
432+ cumulative_view_session_count = _c_view_sess or _c_views ,
413433 download_count = source .get ('download_count' , 0 ),
414434 download_session_count = source .get ('download_session_count' , 0 ),
415435 cumulative_download_count = _c_downloads ,
416- cumulative_download_session_count = _c_download_sess ,
436+ cumulative_download_session_count = _c_download_sess or _c_downloads ,
437+ )
438+
439+
440+ def _backfill_old_usage_report (osf_obj , is_component : bool , until_when : str ):
441+ _last_month = YearMonth .from_date (datetime .datetime .fromisoformat (until_when )).prior ()
442+ _c_views , _c_view_sess , _c_downloads , _c_download_sess = _get_cumulative_usage (
443+ osfid = osf_obj ._id ,
444+ until_when = _last_month .month_end ().isoformat (),
445+ is_preprint = isinstance (osf_obj , osfdb .Preprint ),
446+ )
447+ return es8_metrics .MonthlyPublicItemUsageReportEs8 (
448+ cycle_coverage = _semverish_from_yearmonth (_last_month ),
449+ item_iri = osfid_iri (osf_obj ._id ),
450+ item_osfids = [osf_obj ._id ],
451+ item_types = [get_item_type (osf_obj )],
452+ provider_ids = [get_provider_id (osf_obj )],
453+ database_iris = [get_database_iri (osf_obj )],
454+ platform_iris = [website_settings .DOMAIN ],
455+ view_count = 0 ,
456+ view_session_count = 0 ,
457+ cumulative_view_count = _c_views ,
458+ cumulative_view_session_count = _c_view_sess or _c_views ,
459+ download_count = 0 ,
460+ download_session_count = 0 ,
461+ cumulative_download_count = _c_downloads ,
462+ cumulative_download_session_count = _c_download_sess or _c_downloads ,
417463 )
418464
419465
@@ -497,9 +543,13 @@ def _cumulative_countedusage_downloads(osfid, until_when) -> tuple[int, int]:
497543def _cumulative_preprint_count (preprint_metric_cls , osfid : str , until_when : str ) -> int :
498544 '''aggregate views on each preprint'''
499545 # copied/adapted from osf.metrics.preprint_metrics
546+ _preprint_ids = [osfid ]
547+ if osfid .endswith ('_v1' ):
548+ # include pre-versioned-guid counts for v1
549+ _preprint_ids .append (osfid .removesuffix ('_v1' ))
500550 _search = (
501551 preprint_metric_cls .search ()
502- .filter ('term ' , preprint_id = osfid )
552+ .filter ('terms ' , preprint_id = _preprint_ids )
503553 .filter ('range' , timestamp = {'lt' : until_when })
504554 .extra (size = 0 ) # no hits; only aggs
505555 )
@@ -525,13 +575,13 @@ def _convert_item_type_list(osf_model_names: list[str] | str, has_surrounding_it
525575def _convert_item_type (osf_model_name : str | None , has_surrounding_items : bool ):
526576 if osf_model_name :
527577 try :
528- return osfmap_type_from_model (
578+ return get_item_type_from_model (
529579 apps .get_model ('osf' , osf_model_name ),
530580 is_component = has_surrounding_items ,
531581 )
532582 except LookupError :
533583 pass
534- return OSF .Object # fine, fallback to abstract type
584+ return get_item_type_from_iri ( OSF .Object ) # fallback abstract osf:Object
535585
536586
537587def _convert_database_iri_list (provider_ids : list [str ], osf_model_names : list [str ]):
@@ -578,10 +628,56 @@ def _each_usage_report_osfid(until_when, after_osfid=None):
578628 return _iter_composite_bucket_keys (_search , 'agg_osfid' , 'osfid' , after = after_osfid )
579629
580630
631+ def _each_countedusage_osfid (until_when , after_osfid = None ) -> collections .abc .Iterator [str ]:
632+ _search = (
633+ CountedUsageEs6 .search ()
634+ .filter ('term' , item_public = True )
635+ .filter ('terms' , action_labels = ['view' , 'download' ])
636+ .filter ('range' , timestamp = {'lt' : until_when })
637+ .extra (size = 0 ) # only aggregations, no hits
638+ )
639+ _search .aggs .bucket (
640+ 'agg_osfid' ,
641+ 'composite' ,
642+ sources = [{'osfid' : {'terms' : {'field' : 'item_guid' }}}],
643+ size = _COMPOSITE_CHUNK_SIZE ,
644+ )
645+ return _iter_composite_bucket_keys (_search , 'agg_osfid' , 'osfid' , after = after_osfid )
646+
647+
648+ def _each_preprintview_osfid (until_when , after_osfid = None ) -> collections .abc .Iterator [str ]:
649+ _search = (
650+ PreprintView .search ()
651+ .filter ('range' , timestamp = {'lt' : until_when })
652+ .extra (size = 0 ) # only aggregations, no hits
653+ )
654+ _search .aggs .bucket (
655+ 'agg_osfid' ,
656+ 'composite' ,
657+ sources = [{'osfid' : {'terms' : {'field' : 'preprint_id' }}}],
658+ size = _COMPOSITE_CHUNK_SIZE ,
659+ )
660+ return _iter_composite_bucket_keys (_search , 'agg_osfid' , 'osfid' , after = after_osfid )
661+
662+
663+ def _each_preprintdownload_osfid (until_when , after_osfid = None ) -> collections .abc .Iterator [str ]:
664+ _search = (
665+ PreprintDownload .search ()
666+ .filter ('range' , timestamp = {'lt' : until_when })
667+ .extra (size = 0 ) # only aggregations, no hits
668+ )
669+ _search .aggs .bucket (
670+ 'agg_osfid' ,
671+ 'composite' ,
672+ sources = [{'osfid' : {'terms' : {'field' : 'preprint_id' }}}],
673+ size = _COMPOSITE_CHUNK_SIZE ,
674+ )
675+ return _iter_composite_bucket_keys (_search , 'agg_osfid' , 'osfid' , after = after_osfid )
676+
677+
581678###
582679# the command itself
583680
584-
585681class Command (BaseCommand ):
586682 def add_arguments (self , parser ):
587683 parser .add_argument (
@@ -750,14 +846,17 @@ def _handle_usage_reports(self, *, start: bool, no_counts: bool):
750846 _es8_item_count ,
751847 style = self ._eq_style (_es8_item_count , _es6_item_count ),
752848 )
753- # (if --start) schedule task per item (by composite agg on es6 public usage reports)
849+ # (if --start) schedule task per item (by composite agg on es6 usage reports and events )
754850 # each item-task iter thru reports oldest to newest, adding cumulative counts
755851 if start :
756852 self .stdout .write (
757853 f'starting per-item { es6_reports .PublicItemUsageReport .__name__ } => { es8_metrics .MonthlyPublicItemUsageReportEs8 .__name__ } '
758854 )
759- for _osfid in _each_usage_report_osfid (
760- until_when = self ._migration_started_at
855+ for _osfid in _zip_sorted (
856+ _each_usage_report_osfid (until_when = self ._migration_started_at ),
857+ _each_countedusage_osfid (until_when = self ._migration_started_at ),
858+ _each_preprintview_osfid (until_when = self ._migration_started_at ),
859+ _each_preprintdownload_osfid (until_when = self ._migration_started_at ),
761860 ):
762861 migrate_usage_reports .delay (
763862 _osfid , self ._migration_started_at .isoformat ()
0 commit comments