From 9f9a65833d04c45b8ad4d1040d53dd9d7828b052 Mon Sep 17 00:00:00 2001 From: abram axel booth Date: Fri, 15 May 2026 09:20:48 -0400 Subject: [PATCH 1/2] fix: migrate pre-version preprint usage counts in osfmetrics 6to8 migration, when building cumulative view/download counts for preprints, make sure to include usage stored under the preprint's unversioned osfid with the osfid_v1 usage reports (since PreprintView/PreprintDownload started saving under versioned osfid) so those older views don't get lost in migration --- .../commands/migrate_osfmetrics_6to8.py | 159 ++++++++++++++---- osf/metrics/es8_metrics.py | 22 +-- osf/metrics/reporters/public_item_usage.py | 9 +- osf/metrics/utils.py | 32 ++++ osf_tests/metrics/test_es8_metrics.py | 6 +- 5 files changed, 179 insertions(+), 49 deletions(-) diff --git a/osf/management/commands/migrate_osfmetrics_6to8.py b/osf/management/commands/migrate_osfmetrics_6to8.py index 85e5c9443d5..ed67ed18ba7 100644 --- a/osf/management/commands/migrate_osfmetrics_6to8.py +++ b/osf/management/commands/migrate_osfmetrics_6to8.py @@ -18,16 +18,28 @@ from framework.celery_tasks import app as celery_app from osf.metadata.rdfutils import OSF -from osf.metadata.osfmap_utils import osfmap_type_from_model, is_osf_component +from osf.metadata.osfmap_utils import is_osf_component from osf.metrics.preprint_metrics import ( PreprintView, PreprintDownload, ) -from osf.metrics.counted_usage import CountedAuthUsage as CountedUsageEs6 +from osf.metrics.counted_usage import ( + CountedAuthUsage as CountedUsageEs6, + get_provider_id, +) from osf.metrics import reports as es6_reports from osf.metrics import es8_metrics, RegistriesModerationMetrics -from osf.metrics.reporters.public_item_usage import _iter_composite_bucket_keys -from osf.metrics.utils import YearMonth +from osf.metrics.reporters.public_item_usage import ( + _iter_composite_bucket_keys, + _zip_sorted, +) +from osf.metrics.utils import ( + YearMonth, + get_database_iri, + get_item_type, + 
get_item_type_from_model, + get_item_type_from_iri, +) from osf import models as osfdb from osf.models.base import osfid_iri from website import settings as website_settings @@ -42,6 +54,8 @@ _MAX_CARDINALITY_PRECISION = 40000 # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html#_precision_control +_COMPOSITE_CHUNK_SIZE = 500 + _UNCHANGED_RECORDTYPES = { # reports es6_reports.StorageAddonUsage: es8_metrics.DailyStorageAddonUsageReportEs8, @@ -135,8 +149,8 @@ def migrate_preprint_downloads(from_when: str, until_when: str): @celery_app.task(**_TASK_KWARGS) def migrate_usage_reports(osfid: str, until_when: str): # from PublicItemUsageReport to MonthlyPublicItemUsageReportEs8 - _osfguid = osfdb.Guid.load(osfid) - _item_is_component = is_osf_component(_osfguid.referent) if _osfguid else False + _osfobj, _ = osfdb.Guid.load_referent(osfid) + _item_is_component = is_osf_component(_osfobj) if _osfobj else False def _each_new(): # go in sorted order to build cumulative counts @@ -147,15 +161,21 @@ def _each_new(): addl_filter={'term': {'item_osfid': osfid}}, sort='report_yearmonth', ) - _prior_report = None - for _hit in list(_each_hit): - yield ( - _prior_report := _convert_public_usage_report( - _hit['_source'], - _prior_report, - item_is_component=_item_is_component, + _hits = list(_each_hit) + if _osfobj and not _hits: + # this item has usages, but only before the monthly usage reports started + # -- create one for cumulative counts (if the object still exists) + yield _backfill_old_usage_report(_osfobj, _item_is_component, until_when) + else: + _prior_report = None + for _hit in _hits: + yield ( + _prior_report := _convert_public_usage_report( + _hit['_source'], + _prior_report, + item_is_component=_item_is_component, + ) ) - ) _es8_bulk_save(es8_metrics.MonthlyPublicItemUsageReportEs8, _each_new()) @@ -289,8 +309,8 @@ def _assert_field_unchangedness(es6_recordtype, es8_recordtype): assert
_es6_fields == _es8_fields -def _semverish_from_yearmonth(given_yearmonth: str): - _ym = YearMonth.from_str(given_yearmonth) +def _semverish_from_yearmonth(given_yearmonth): + _ym = YearMonth.from_any(given_yearmonth) return f'{_ym.year}.{_ym.month}' @@ -362,7 +382,7 @@ def _convert_preprint_metric( ), # fields from OsfCountedUsageEvent: item_osfid=_source['preprint_id'], - item_type=OSF.Preprint, + item_type='Preprint', item_public=True, provider_id=_source.get('provider_id'), user_is_authenticated=bool(_source.get('user_id')), @@ -383,14 +403,14 @@ def _convert_public_usage_report( ) else: _c_views = prior_report.cumulative_view_count + source.get('view_count', 0) - _c_view_sess = prior_report.cumulative_view_session_count + source.get( - 'view_session_count', 0 + _c_view_sess = prior_report.cumulative_view_session_count + ( + source.get('view_session_count', 0) or source.get('view_count', 0) ) _c_downloads = prior_report.cumulative_download_count + source.get( 'download_count', 0 ) - _c_download_sess = prior_report.cumulative_download_session_count + source.get( - 'download_session_count', 0 + _c_download_sess = prior_report.cumulative_download_session_count + ( + source.get('download_session_count', 0) or source.get('download_count') ) return es8_metrics.MonthlyPublicItemUsageReportEs8( cycle_coverage=_semverish_from_yearmonth(source['report_yearmonth']), @@ -409,11 +429,37 @@ def _convert_public_usage_report( view_count=source.get('view_count', 0), view_session_count=source.get('view_session_count', 0), cumulative_view_count=_c_views, - cumulative_view_session_count=_c_view_sess, + cumulative_view_session_count=_c_view_sess or _c_views, download_count=source.get('download_count', 0), download_session_count=source.get('download_session_count', 0), cumulative_download_count=_c_downloads, - cumulative_download_session_count=_c_download_sess, + cumulative_download_session_count=_c_download_sess or _c_downloads, + ) + + +def _backfill_old_usage_report(osf_obj, 
is_component: bool, until_when: str): + _last_month = YearMonth.from_date(datetime.datetime.fromisoformat(until_when)).prior() + _c_views, _c_view_sess, _c_downloads, _c_download_sess = _get_cumulative_usage( + osfid=osf_obj._id, + until_when=_last_month.month_end().isoformat(), + is_preprint=isinstance(osf_obj, osfdb.Preprint), + ) + return es8_metrics.MonthlyPublicItemUsageReportEs8( + cycle_coverage=_semverish_from_yearmonth(_last_month), + item_iri=osfid_iri(osf_obj._id), + item_osfids=[osf_obj._id], + item_types=[get_item_type(osf_obj)], + provider_ids=[get_provider_id(osf_obj)], + database_iris=[get_database_iri(osf_obj)], + platform_iris=[website_settings.DOMAIN], + view_count=0, + view_session_count=0, + cumulative_view_count=_c_views, + cumulative_view_session_count=_c_view_sess or _c_views, + download_count=0, + download_session_count=0, + cumulative_download_count=_c_downloads, + cumulative_download_session_count=_c_download_sess or _c_downloads, ) @@ -497,9 +543,13 @@ def _cumulative_countedusage_downloads(osfid, until_when) -> tuple[int, int]: def _cumulative_preprint_count(preprint_metric_cls, osfid: str, until_when: str) -> int: '''aggregate views on each preprint''' # copied/adapted from osf.metrics.preprint_metrics + _preprint_ids = [osfid] + if osfid.endswith('_v1'): + # include pre-versioned-guid counts for v1 + _preprint_ids.append(osfid.removesuffix('_v1')) _search = ( preprint_metric_cls.search() - .filter('term', preprint_id=osfid) + .filter('terms', preprint_id=_preprint_ids) .filter('range', timestamp={'lt': until_when}) .extra(size=0) # no hits; only aggs ) @@ -525,13 +575,13 @@ def _convert_item_type_list(osf_model_names: list[str] | str, has_surrounding_it def _convert_item_type(osf_model_name: str | None, has_surrounding_items: bool): if osf_model_name: try: - return osfmap_type_from_model( + return get_item_type_from_model( apps.get_model('osf', osf_model_name), is_component=has_surrounding_items, ) except LookupError: pass - return 
OSF.Object # fine, fallback to abstract type + return get_item_type_from_iri(OSF.Object) # fallback abstract osf:Object def _convert_database_iri_list(provider_ids: list[str], osf_model_names: list[str]): @@ -578,10 +628,56 @@ def _each_usage_report_osfid(until_when, after_osfid=None): return _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid) +def _each_countedusage_osfid(until_when, after_osfid=None) -> collections.abc.Iterator[str]: + _search = ( + CountedUsageEs6.search() + .filter('term', item_public=True) + .filter('terms', action_labels=['view', 'download']) + .filter('range', timestamp={'lt': until_when}) + .extra(size=0) # only aggregations, no hits + ) + _search.aggs.bucket( + 'agg_osfid', + 'composite', + sources=[{'osfid': {'terms': {'field': 'item_guid'}}}], + size=_COMPOSITE_CHUNK_SIZE, + ) + return _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid) + + +def _each_preprintview_osfid(until_when, after_osfid=None) -> collections.abc.Iterator[str]: + _search = ( + PreprintView.search() + .filter('range', timestamp={'lt': until_when}) + .extra(size=0) # only aggregations, no hits + ) + _search.aggs.bucket( + 'agg_osfid', + 'composite', + sources=[{'osfid': {'terms': {'field': 'preprint_id'}}}], + size=_COMPOSITE_CHUNK_SIZE, + ) + return _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid) + + +def _each_preprintdownload_osfid(until_when, after_osfid=None) -> collections.abc.Iterator[str]: + _search = ( + PreprintDownload.search() + .filter('range', timestamp={'lt': until_when}) + .extra(size=0) # only aggregations, no hits + ) + _search.aggs.bucket( + 'agg_osfid', + 'composite', + sources=[{'osfid': {'terms': {'field': 'preprint_id'}}}], + size=_COMPOSITE_CHUNK_SIZE, + ) + return _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid) + + ### # the command itself - class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument( @@ -750,14 
+846,17 @@ def _handle_usage_reports(self, *, start: bool, no_counts: bool): _es8_item_count, style=self._eq_style(_es8_item_count, _es6_item_count), ) - # (if --start) schedule task per item (by composite agg on es6 public usage reports) + # (if --start) schedule task per item (by composite agg on es6 usage reports and events) # each item-task iter thru reports oldest to newest, adding cumulative counts if start: self.stdout.write( f'starting per-item {es6_reports.PublicItemUsageReport.__name__} => {es8_metrics.MonthlyPublicItemUsageReportEs8.__name__}' ) - for _osfid in _each_usage_report_osfid( - until_when=self._migration_started_at + for _osfid in _zip_sorted( + _each_usage_report_osfid(until_when=self._migration_started_at), + _each_countedusage_osfid(until_when=self._migration_started_at), + _each_preprintview_osfid(until_when=self._migration_started_at), + _each_preprintdownload_osfid(until_when=self._migration_started_at), ): migrate_usage_reports.delay( _osfid, self._migration_started_at.isoformat() diff --git a/osf/metrics/es8_metrics.py b/osf/metrics/es8_metrics.py index efa37547396..be68883a648 100644 --- a/osf/metrics/es8_metrics.py +++ b/osf/metrics/es8_metrics.py @@ -7,12 +7,13 @@ from elasticsearch_metrics import DAILY, MONTHLY, YEARLY import elasticsearch_metrics.imps.elastic8 as djelme -from osf.metadata.osfmap_utils import ( - osfmap_type, - osfid_from_iri, -) +from osf.metadata.osfmap_utils import osfid_from_iri from osf.metrics.counted_usage import _get_surrounding_guids -from osf.metrics.utils import YearMonth +from osf.metrics.utils import ( + YearMonth, + get_database_iri, + get_item_type, +) from osf import models as osfdb from osf.models.base import osfid_iri from website import settings as website_settings @@ -208,7 +209,7 @@ def _autofill_item_public(self): def _autofill_item_type(self): if self.item_osfid and not self.item_type: - self.item_type = osfmap_type(self._osfid_referent) + self.item_type = get_item_type(self._osfid_referent) 
def _autofill_provider_id(self): if self.item_osfid and not self.provider_id: @@ -245,14 +246,7 @@ def _autofill_pageview(self): def _autofill_database_iri(self): if self.item_osfid and not self.database_iri: - _provider = getattr(self._osfid_referent, 'provider', None) - if not _provider: - self.database_iri = website_settings.DOMAIN - elif isinstance(_provider, str): - # file providers are a different thing that don't really have an iri, just an id - self.database_iri = f'urn:files.osf.io:{self.provider_id}' - else: - self.database_iri = _provider.get_semantic_iri() + self.database_iri = get_database_iri(self._osfid_referent) def _clean_action_labels(self): if self.action_labels: diff --git a/osf/metrics/reporters/public_item_usage.py b/osf/metrics/reporters/public_item_usage.py index c32d318126c..ce33e24e87d 100644 --- a/osf/metrics/reporters/public_item_usage.py +++ b/osf/metrics/reporters/public_item_usage.py @@ -13,7 +13,6 @@ from osf.metadata.osf_gathering import OsfmapPartition from osf.metrics.counted_usage import ( CountedAuthUsage, - get_item_type, get_provider_id, ) from osf.metrics.preprint_metrics import ( @@ -21,7 +20,12 @@ PreprintView, ) from osf.metrics.reports import PublicItemUsageReport -from osf.metrics.utils import YearMonth, cycle_coverage_yearmonth +from osf.metrics.utils import ( + YearMonth, + cycle_coverage_yearmonth, + get_database_iri, + get_item_type, +) from osf import models as osfdb from osf.models.base import osfid_iri from website import settings as website_settings @@ -160,6 +164,7 @@ def _init_report(self, osf_obj) -> MonthlyPublicItemUsageReportEs8: item_osfids=[osf_obj._id], item_types=[get_item_type(osf_obj)], provider_ids=[get_provider_id(osf_obj)], + database_iris=[get_database_iri(osf_obj)], platform_iris=[website_settings.DOMAIN], # leave counts null; will be set if there's data ) diff --git a/osf/metrics/utils.py b/osf/metrics/utils.py index c5d49f293cf..87b2d48f6fd 100644 --- a/osf/metrics/utils.py +++ 
b/osf/metrics/utils.py @@ -8,6 +8,12 @@ from elasticsearch_metrics.util.timeparts import format_timeparts +from osf.metadata.osfmap_utils import ( + osfmap_type, + osfmap_type_from_model, +) +from website import settings as website_settings + def cycle_coverage_date(given_date: datetime.date) -> str: """ @@ -43,6 +49,32 @@ def stable_key(*key_parts): return sha256(bytes(plain_key, encoding='utf')).hexdigest() +def get_database_iri(osf_obj) -> str: + _provider = getattr(osf_obj, 'provider', None) + if not _provider: + return website_settings.DOMAIN + elif isinstance(_provider, str): + # file providers are a different thing that don't really have an iri, just an id + return f'urn:files.osf.io:{_provider}' + else: + return _provider.get_semantic_iri() + + +def get_item_type(osf_obj) -> str: + return get_item_type_from_iri(osfmap_type(osf_obj)) + + +def get_item_type_from_model(osf_model_cls, *, is_component: bool) -> str: + return get_item_type_from_iri( + osfmap_type_from_model(osf_model_cls, is_component=is_component), + ) + + +def get_item_type_from_iri(type_iri) -> str: + (_, _, _shortname) = type_iri.rpartition('/') + return _shortname + + @dataclasses.dataclass(frozen=True) class YearMonth: """YearMonth: represents a specific month in a specific year""" diff --git a/osf_tests/metrics/test_es8_metrics.py b/osf_tests/metrics/test_es8_metrics.py index 5bc6e4c4bc4..1560558abe1 100644 --- a/osf_tests/metrics/test_es8_metrics.py +++ b/osf_tests/metrics/test_es8_metrics.py @@ -26,7 +26,7 @@ def test_nested_pageview_autofill(self): item_iri='https://osf.example/itemm', item_osfid='itemm', item_public=True, - item_type='https://osf.example/Preprint', + item_type='Preprint', platform_iri='https://osf.example', user_is_authenticated=False, pageview_info=PageviewInfo( @@ -49,7 +49,7 @@ def test_nested_pageview_autofill_dict(self): item_iri='https://osf.example/itemm', item_osfid='itemm', item_public=True, - item_type='https://osf.example/Preprint', + item_type='Preprint', 
platform_iri='https://osf.example', user_is_authenticated=False, pageview_info={ @@ -72,7 +72,7 @@ def test_none_pageview_nested_autofill(self): item_iri='https://osf.example/itemm', item_osfid='itemm', item_public=True, - item_type='https://osf.example/Preprint', + item_type='Preprint', platform_iri='https://osf.example', user_is_authenticated=False, ) From 211fc0e2abdb977ce81a5b461bd5906c57926862 Mon Sep 17 00:00:00 2001 From: abram axel booth Date: Fri, 15 May 2026 12:15:28 -0400 Subject: [PATCH 2/2] remove setup from migrate_osfmetrics_6to8 --- admin/management/views.py | 1 - osf/management/commands/migrate_osfmetrics_6to8.py | 9 +-------- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/admin/management/views.py b/admin/management/views.py index d5415a87ad8..04034bfaa08 100644 --- a/admin/management/views.py +++ b/admin/management/views.py @@ -210,7 +210,6 @@ def post(self, request): class MigrateOsfmetrics6to8(ManagementCommandPermissionView): def post(self, request): _command_kwargs = { - 'no_setup': True, 'no_color': True, 'no_counts': request.POST.get('no_counts'), 'clear_state': request.POST.get('clear_state'), diff --git a/osf/management/commands/migrate_osfmetrics_6to8.py b/osf/management/commands/migrate_osfmetrics_6to8.py index ed67ed18ba7..b1081d55ddf 100644 --- a/osf/management/commands/migrate_osfmetrics_6to8.py +++ b/osf/management/commands/migrate_osfmetrics_6to8.py @@ -4,7 +4,6 @@ import logging from django.apps import apps -from django.core.management import call_command from django.core.management.base import BaseCommand from django.db import OperationalError as DjangoOperationalError from elasticsearch6.exceptions import ConnectionError as Elastic6ConnectionError @@ -438,6 +437,7 @@ def _convert_public_usage_report( def _backfill_old_usage_report(osf_obj, is_component: bool, until_when: str): + # add a "last month" report with cumulative counts up to that point _last_month = 
YearMonth.from_date(datetime.datetime.fromisoformat(until_when)).prior() _c_views, _c_view_sess, _c_downloads, _c_download_sess = _get_cumulative_usage( osfid=osf_obj._id, @@ -680,10 +680,6 @@ def _each_preprintdownload_osfid(until_when, after_osfid=None) -> collections.ab class Command(BaseCommand): def add_arguments(self, parser): - parser.add_argument( - '--no-setup', - action='store_true', - ) parser.add_argument( '--no-counts', action='store_true', @@ -720,7 +716,6 @@ def _migration_started_at(self): def handle( self, *, - no_setup, no_counts, clear_state, clear_es8_data, @@ -731,8 +726,6 @@ def handle( **kwargs, ): self._quiet_chatty_loggers() - if not no_setup: - call_command('djelme_backend_setup') if clear_state: self._clear_state() if clear_es8_data: