44import logging
55
66from django .apps import apps
7- from django .core .management import call_command
87from django .core .management .base import BaseCommand
98from django .db import OperationalError as DjangoOperationalError
109from elasticsearch6 .exceptions import ConnectionError as Elastic6ConnectionError
1817
1918from framework .celery_tasks import app as celery_app
2019from osf .metadata .rdfutils import OSF
21- from osf .metadata .osfmap_utils import osfmap_type_from_model , is_osf_component
20+ from osf .metadata .osfmap_utils import is_osf_component
2221from osf .metrics .preprint_metrics import (
2322 PreprintView ,
2423 PreprintDownload ,
2524)
26- from osf .metrics .counted_usage import CountedAuthUsage as CountedUsageEs6
25+ from osf .metrics .counted_usage import (
26+ CountedAuthUsage as CountedUsageEs6 ,
27+ get_provider_id ,
28+ )
2729from osf .metrics import reports as es6_reports
2830from osf .metrics import es8_metrics , RegistriesModerationMetrics
29- from osf .metrics .reporters .public_item_usage import _iter_composite_bucket_keys
30- from osf .metrics .utils import YearMonth
31+ from osf .metrics .reporters .public_item_usage import (
32+ _iter_composite_bucket_keys ,
33+ _zip_sorted ,
34+ )
35+ from osf .metrics .utils import (
36+ YearMonth ,
37+ get_database_iri ,
38+ get_item_type ,
39+ get_item_type_from_model ,
40+ get_item_type_from_iri ,
41+ )
3142from osf import models as osfdb
3243from osf .models .base import osfid_iri
3344from website import settings as website_settings
4253
4354_MAX_CARDINALITY_PRECISION = 40000 # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html#_precision_control
4455
56+ _COMPOSITE_CHUNK_SIZE = 500
57+
4558_UNCHANGED_RECORDTYPES = {
4659 # reports
4760 es6_reports .StorageAddonUsage : es8_metrics .DailyStorageAddonUsageReportEs8 ,
@@ -135,8 +148,8 @@ def migrate_preprint_downloads(from_when: str, until_when: str):
135148@celery_app .task (** _TASK_KWARGS )
136149def migrate_usage_reports (osfid : str , until_when : str ):
137150 # from PublicItemUsageReport to MonthlyPublicItemUsageReportEs8
138- _osfguid = osfdb .Guid .load (osfid )
139- _item_is_component = is_osf_component (_osfguid . referent ) if _osfguid else False
151+ _osfobj , _ = osfdb .Guid .load_referent (osfid )
152+ _item_is_component = is_osf_component (_osfobj ) if _osfobj else False
140153
141154 def _each_new ():
142155 # go in sorted order to build cumulative counts
@@ -147,15 +160,21 @@ def _each_new():
147160 addl_filter = {'term' : {'item_osfid' : osfid }},
148161 sort = 'report_yearmonth' ,
149162 )
150- _prior_report = None
151- for _hit in list (_each_hit ):
152- yield (
153- _prior_report := _convert_public_usage_report (
154- _hit ['_source' ],
155- _prior_report ,
156- item_is_component = _item_is_component ,
163+ _hits = list (_each_hit )
164+ if _osfobj and not _hits :
165+ # this item has usages, but only before the monthly usage reparts started
166+ # -- create one for cumulative counts (if the object still exists)
167+ yield _backfill_old_usage_report (_osfobj , _item_is_component , until_when )
168+ else :
169+ _prior_report = None
170+ for _hit in _hits :
171+ yield (
172+ _prior_report := _convert_public_usage_report (
173+ _hit ['_source' ],
174+ _prior_report ,
175+ item_is_component = _item_is_component ,
176+ )
157177 )
158- )
159178
160179 _es8_bulk_save (es8_metrics .MonthlyPublicItemUsageReportEs8 , _each_new ())
161180
@@ -289,8 +308,8 @@ def _assert_field_unchangedness(es6_recordtype, es8_recordtype):
289308 assert _es6_fields == _es8_fields
290309
291310
292- def _semverish_from_yearmonth (given_yearmonth : str ):
293- _ym = YearMonth .from_str (given_yearmonth )
311+ def _semverish_from_yearmonth (given_yearmonth ):
312+ _ym = YearMonth .from_any (given_yearmonth )
294313 return f'{ _ym .year } .{ _ym .month } '
295314
296315
@@ -362,7 +381,7 @@ def _convert_preprint_metric(
362381 ),
363382 # fields from OsfCountedUsageEvent:
364383 item_osfid = _source ['preprint_id' ],
365- item_type = OSF . Preprint ,
384+ item_type = ' Preprint' ,
366385 item_public = True ,
367386 provider_id = _source .get ('provider_id' ),
368387 user_is_authenticated = bool (_source .get ('user_id' )),
@@ -383,14 +402,14 @@ def _convert_public_usage_report(
383402 )
384403 else :
385404 _c_views = prior_report .cumulative_view_count + source .get ('view_count' , 0 )
386- _c_view_sess = prior_report .cumulative_view_session_count + source . get (
387- 'view_session_count' , 0
405+ _c_view_sess = prior_report .cumulative_view_session_count + (
406+ source . get ( 'view_session_count' , 0 ) or source . get ( 'view_count' , 0 )
388407 )
389408 _c_downloads = prior_report .cumulative_download_count + source .get (
390409 'download_count' , 0
391410 )
392- _c_download_sess = prior_report .cumulative_download_session_count + source . get (
393- 'download_session_count' , 0
411+ _c_download_sess = prior_report .cumulative_download_session_count + (
412+ source . get ( 'download_session_count' , 0 ) or source . get ( 'download_count' )
394413 )
395414 return es8_metrics .MonthlyPublicItemUsageReportEs8 (
396415 cycle_coverage = _semverish_from_yearmonth (source ['report_yearmonth' ]),
@@ -409,11 +428,38 @@ def _convert_public_usage_report(
409428 view_count = source .get ('view_count' , 0 ),
410429 view_session_count = source .get ('view_session_count' , 0 ),
411430 cumulative_view_count = _c_views ,
412- cumulative_view_session_count = _c_view_sess ,
431+ cumulative_view_session_count = _c_view_sess or _c_views ,
413432 download_count = source .get ('download_count' , 0 ),
414433 download_session_count = source .get ('download_session_count' , 0 ),
415434 cumulative_download_count = _c_downloads ,
416- cumulative_download_session_count = _c_download_sess ,
435+ cumulative_download_session_count = _c_download_sess or _c_downloads ,
436+ )
437+
438+
439+ def _backfill_old_usage_report (osf_obj , is_component : bool , until_when : str ):
440+ # add a "last month" report with cumulative counts up to that point
441+ _last_month = YearMonth .from_date (datetime .datetime .fromisoformat (until_when )).prior ()
442+ _c_views , _c_view_sess , _c_downloads , _c_download_sess = _get_cumulative_usage (
443+ osfid = osf_obj ._id ,
444+ until_when = _last_month .month_end ().isoformat (),
445+ is_preprint = isinstance (osf_obj , osfdb .Preprint ),
446+ )
447+ return es8_metrics .MonthlyPublicItemUsageReportEs8 (
448+ cycle_coverage = _semverish_from_yearmonth (_last_month ),
449+ item_iri = osfid_iri (osf_obj ._id ),
450+ item_osfids = [osf_obj ._id ],
451+ item_types = [get_item_type (osf_obj )],
452+ provider_ids = [get_provider_id (osf_obj )],
453+ database_iris = [get_database_iri (osf_obj )],
454+ platform_iris = [website_settings .DOMAIN ],
455+ view_count = 0 ,
456+ view_session_count = 0 ,
457+ cumulative_view_count = _c_views ,
458+ cumulative_view_session_count = _c_view_sess or _c_views ,
459+ download_count = 0 ,
460+ download_session_count = 0 ,
461+ cumulative_download_count = _c_downloads ,
462+ cumulative_download_session_count = _c_download_sess or _c_downloads ,
417463 )
418464
419465
@@ -497,9 +543,13 @@ def _cumulative_countedusage_downloads(osfid, until_when) -> tuple[int, int]:
497543def _cumulative_preprint_count (preprint_metric_cls , osfid : str , until_when : str ) -> int :
498544 '''aggregate views on each preprint'''
499545 # copied/adapted from osf.metrics.preprint_metrics
546+ _preprint_ids = [osfid ]
547+ if osfid .endswith ('_v1' ):
548+ # include pre-versioned-guid counts for v1
549+ _preprint_ids .append (osfid .removesuffix ('_v1' ))
500550 _search = (
501551 preprint_metric_cls .search ()
502- .filter ('term ' , preprint_id = osfid )
552+ .filter ('terms ' , preprint_id = _preprint_ids )
503553 .filter ('range' , timestamp = {'lt' : until_when })
504554 .extra (size = 0 ) # no hits; only aggs
505555 )
@@ -525,13 +575,13 @@ def _convert_item_type_list(osf_model_names: list[str] | str, has_surrounding_it
525575def _convert_item_type (osf_model_name : str | None , has_surrounding_items : bool ):
526576 if osf_model_name :
527577 try :
528- return osfmap_type_from_model (
578+ return get_item_type_from_model (
529579 apps .get_model ('osf' , osf_model_name ),
530580 is_component = has_surrounding_items ,
531581 )
532582 except LookupError :
533583 pass
534- return OSF .Object # fine, fallback to abstract type
584+ return get_item_type_from_iri ( OSF .Object ) # fallback abstract osf:Object
535585
536586
537587def _convert_database_iri_list (provider_ids : list [str ], osf_model_names : list [str ]):
@@ -578,16 +628,58 @@ def _each_usage_report_osfid(until_when, after_osfid=None):
578628 return _iter_composite_bucket_keys (_search , 'agg_osfid' , 'osfid' , after = after_osfid )
579629
580630
631+ def _each_countedusage_osfid (until_when , after_osfid = None ) -> collections .abc .Iterator [str ]:
632+ _search = (
633+ CountedUsageEs6 .search ()
634+ .filter ('term' , item_public = True )
635+ .filter ('terms' , action_labels = ['view' , 'download' ])
636+ .filter ('range' , timestamp = {'lt' : until_when })
637+ .extra (size = 0 ) # only aggregations, no hits
638+ )
639+ _search .aggs .bucket (
640+ 'agg_osfid' ,
641+ 'composite' ,
642+ sources = [{'osfid' : {'terms' : {'field' : 'item_guid' }}}],
643+ size = _COMPOSITE_CHUNK_SIZE ,
644+ )
645+ return _iter_composite_bucket_keys (_search , 'agg_osfid' , 'osfid' , after = after_osfid )
646+
647+
648+ def _each_preprintview_osfid (until_when , after_osfid = None ) -> collections .abc .Iterator [str ]:
649+ _search = (
650+ PreprintView .search ()
651+ .filter ('range' , timestamp = {'lt' : until_when })
652+ .extra (size = 0 ) # only aggregations, no hits
653+ )
654+ _search .aggs .bucket (
655+ 'agg_osfid' ,
656+ 'composite' ,
657+ sources = [{'osfid' : {'terms' : {'field' : 'preprint_id' }}}],
658+ size = _COMPOSITE_CHUNK_SIZE ,
659+ )
660+ return _iter_composite_bucket_keys (_search , 'agg_osfid' , 'osfid' , after = after_osfid )
661+
662+
663+ def _each_preprintdownload_osfid (until_when , after_osfid = None ) -> collections .abc .Iterator [str ]:
664+ _search = (
665+ PreprintDownload .search ()
666+ .filter ('range' , timestamp = {'lt' : until_when })
667+ .extra (size = 0 ) # only aggregations, no hits
668+ )
669+ _search .aggs .bucket (
670+ 'agg_osfid' ,
671+ 'composite' ,
672+ sources = [{'osfid' : {'terms' : {'field' : 'preprint_id' }}}],
673+ size = _COMPOSITE_CHUNK_SIZE ,
674+ )
675+ return _iter_composite_bucket_keys (_search , 'agg_osfid' , 'osfid' , after = after_osfid )
676+
677+
581678###
582679# the command itself
583680
584-
585681class Command (BaseCommand ):
586682 def add_arguments (self , parser ):
587- parser .add_argument (
588- '--no-setup' ,
589- action = 'store_true' ,
590- )
591683 parser .add_argument (
592684 '--no-counts' ,
593685 action = 'store_true' ,
@@ -624,7 +716,6 @@ def _migration_started_at(self):
624716 def handle (
625717 self ,
626718 * ,
627- no_setup ,
628719 no_counts ,
629720 clear_state ,
630721 clear_es8_data ,
@@ -635,8 +726,6 @@ def handle(
635726 ** kwargs ,
636727 ):
637728 self ._quiet_chatty_loggers ()
638- if not no_setup :
639- call_command ('djelme_backend_setup' )
640729 if clear_state :
641730 self ._clear_state ()
642731 if clear_es8_data :
@@ -750,14 +839,17 @@ def _handle_usage_reports(self, *, start: bool, no_counts: bool):
750839 _es8_item_count ,
751840 style = self ._eq_style (_es8_item_count , _es6_item_count ),
752841 )
753- # (if --start) schedule task per item (by composite agg on es6 public usage reports)
842+ # (if --start) schedule task per item (by composite agg on es6 usage reports and events )
754843 # each item-task iter thru reports oldest to newest, adding cumulative counts
755844 if start :
756845 self .stdout .write (
757846 f'starting per-item { es6_reports .PublicItemUsageReport .__name__ } => { es8_metrics .MonthlyPublicItemUsageReportEs8 .__name__ } '
758847 )
759- for _osfid in _each_usage_report_osfid (
760- until_when = self ._migration_started_at
848+ for _osfid in _zip_sorted (
849+ _each_usage_report_osfid (until_when = self ._migration_started_at ),
850+ _each_countedusage_osfid (until_when = self ._migration_started_at ),
851+ _each_preprintview_osfid (until_when = self ._migration_started_at ),
852+ _each_preprintdownload_osfid (until_when = self ._migration_started_at ),
761853 ):
762854 migrate_usage_reports .delay (
763855 _osfid , self ._migration_started_at .isoformat ()
0 commit comments