6565 Elastic8ConnectionError ,
6666 PostgresOperationalError ,
6767 ),
68- max_retries = 50 ,
69- retry_backoff = True ,
68+ retry_backoff = True , # exponential backoff, with jitter
69+ max_retries = 20 ,
7070)
7171
7272###
@@ -87,8 +87,8 @@ def migrate_unchanged_recordtype(es6_recordtype_name: str):
8787 _es8_recordtype (** _convert_kwargs (_hit ["_source" ]))
8888 for _hit in _es6_scan_all (_es6_recordtype )
8989 )
90- # _debug_migrate(_each_new)
91- return _es8_bulk_save (_es8_recordtype , _each_new )
90+ _debug_migrate (_each_new )
91+ # return _es8_bulk_save(_es8_recordtype, _each_new)
9292
9393
9494@celery_app .task (** _TASK_KWARGS )
@@ -103,8 +103,8 @@ def migrate_counted_usages(from_when: str, until_when: str):
103103 addl_filter = {"exists" : {"field" : "item_guid" }},
104104 )
105105 )
106- # _debug_migrate(_each_new)
107- return _es8_bulk_save (es8_metrics .OsfCountedUsageRecord , _each_new )
106+ _debug_migrate (_each_new )
107+ # return _es8_bulk_save(es8_metrics.OsfCountedUsageRecord, _each_new)
108108
109109
110110@celery_app .task (** _TASK_KWARGS )
@@ -115,8 +115,8 @@ def migrate_preprint_views(from_when: str, until_when: str):
115115 _convert_preprint_metric (_hit ["_source" ], _action_labels )
116116 for _hit in _es6_scan_range (PreprintView , from_when , until_when )
117117 )
118- # _debug_migrate(_each_new)
119- return _es8_bulk_save (es8_metrics .OsfCountedUsageRecord , _each_new )
118+ _debug_migrate (_each_new )
119+ # return _es8_bulk_save(es8_metrics.OsfCountedUsageRecord, _each_new)
120120
121121
122122@celery_app .task (** _TASK_KWARGS )
@@ -127,23 +127,32 @@ def migrate_preprint_downloads(from_when: str, until_when: str):
127127 _convert_preprint_metric (_hit ["_source" ], _action_labels )
128128 for _hit in _es6_scan_range (PreprintDownload , from_when , until_when )
129129 )
130- # _debug_migrate(_each_new)
131- return _es8_bulk_save (es8_metrics .OsfCountedUsageRecord , _each_new )
130+ _debug_migrate (_each_new )
131+ # return _es8_bulk_save(es8_metrics.OsfCountedUsageRecord, _each_new)
132132
133133
134134@celery_app .task (** _TASK_KWARGS )
135135def migrate_usage_reports (osfid : str ):
136136 # from PublicItemUsageReport to PublicItemUsageReportEs8
137137 # add cumulative count
138138 def _each_new ():
139- for _hit in _es6_scan_all (
139+ _each_hit = _es6_scan_all (
140140 es6_reports .PublicItemUsageReport ,
141141 query_body = {"query" : {"term" : {"item_osfid" : osfid }}},
142- ):
143- yield _convert_public_usage_report (_hit ["_source" ])
142+ )
143+ # only a few dozen of these per item; fine to hold all at once
144+ _sorted_sources = sorted (
145+ (_hit ["_source" ] for _hit in _each_hit ),
146+ key = lambda _s : _s ["report_yearmonth" ],
147+ )
148+ _prior_report = None
149+ for _source in _sorted_sources :
150+ yield (
151+ _prior_report := _convert_public_usage_report (_source , _prior_report )
152+ )
144153
145- # _debug_migrate(_each_new)
146- return _es8_bulk_save (es8_metrics .PublicItemUsageReportEs8 , _each_new )
154+ _debug_migrate (_each_new () )
155+ # return _es8_bulk_save(es8_metrics.PublicItemUsageReportEs8, _each_new)
147156
148157
149158###
@@ -175,6 +184,7 @@ def _delete_all_es8():
175184def _debug_migrate (each_new ):
176185 # TODO: remove this
177186 for _each in each_new :
187+ _each .full_clean ()
178188 pprint (_each .to_dict (include_meta = True ))
179189
180190
@@ -320,75 +330,89 @@ def _each_kwarg():
320330 return dict (_each_kwarg ())
321331
322332
323- def _convert_counted_usage (source_dict ) -> es8_metrics .OsfCountedUsageRecord :
324- _item_iri = _iri_from_osfid (source_dict ["item_guid" ])
325- _item_type = _convert_item_type (source_dict )
333+ def _convert_counted_usage (source : dict ) -> es8_metrics .OsfCountedUsageRecord :
334+ _item_iri = _iri_from_osfid (source ["item_guid" ])
335+ _item_type = _convert_item_type (source )
326336 return es8_metrics .OsfCountedUsageRecord (
327337 # fields from djelme.CountedUsageRecord:
328- timestamp = source_dict ["timestamp" ],
329- sessionhour_id = source_dict ["session_id" ],
330- platform_iri = source_dict [ "platform_iri" ] ,
331- database_iri = _convert_database_iri (source_dict .get ("provider_id" ), _item_type ),
338+ timestamp = source ["timestamp" ],
339+ sessionhour_id = source ["session_id" ],
340+ platform_iri = source . get ( "platform_iri" ) or website_settings . DOMAIN ,
341+ database_iri = _convert_database_iri (source .get ("provider_id" ), _item_type ),
332342 item_iri = _item_iri ,
333343 within_iris = [
334344 _iri_from_osfid (_within_osfid )
335- for _within_osfid in source_dict .get ("surrounding_guids" , ())
345+ for _within_osfid in source .get ("surrounding_guids" , ())
336346 ],
337347 # fields from OsfCountedUsageRecord:
338- item_osfid = source_dict ["item_guid" ],
348+ item_osfid = source ["item_guid" ],
339349 item_type = _item_type ,
340- item_public = source_dict ["item_public" ],
341- provider_id = source_dict .get ("provider_id" ),
342- user_is_authenticated = source_dict ["user_is_authenticated" ],
343- action_labels = source_dict ["action_labels" ],
350+ item_public = source ["item_public" ],
351+ provider_id = source .get ("provider_id" ),
352+ user_is_authenticated = source ["user_is_authenticated" ],
353+ action_labels = source ["action_labels" ],
344354 # TODO: does this need the PageviewInfo object or is the dictionary fine?
345- pageview_info = source_dict .get ("pageview_info" ),
355+ pageview_info = source .get ("pageview_info" ),
346356 )
347357
348358
349359def _convert_preprint_metric (
350- source_dict , action_labels : list [str ]
360+ source : dict , action_labels : list [str ]
351361) -> es8_metrics .OsfCountedUsageRecord :
352- _preprint_iri = _iri_from_osfid (source_dict ["preprint_id" ])
362+ _preprint_iri = _iri_from_osfid (source ["preprint_id" ])
353363 return es8_metrics .OsfCountedUsageRecord .record (
354364 using = False , # don't save yet; will save in bulk
355365 # fields used to compute a sessionhour_id:
356- timestamp = source_dict ["timestamp" ],
357- user_id = source_dict .get ("user_id" ),
366+ timestamp = source ["timestamp" ],
367+ user_id = source .get ("user_id" ),
358368 # fields from djelme.CountedUsageRecord:
359369 platform_iri = website_settings .DOMAIN ,
360- database_iri = _convert_database_iri (
361- source_dict .get ("provider_id" ), OSF .Preprint
362- ),
370+ database_iri = _convert_database_iri (source .get ("provider_id" ), OSF .Preprint ),
363371 item_iri = _preprint_iri ,
364372 within_iris = [_preprint_iri ],
365373 # fields from OsfCountedUsageRecord:
366- item_osfid = source_dict ["preprint_id" ],
374+ item_osfid = source ["preprint_id" ],
367375 item_type = OSF .Preprint ,
368376 item_public = True ,
369- provider_id = source_dict .get ("provider_id" ),
370- user_is_authenticated = bool (source_dict .get ("user_id" )),
377+ provider_id = source .get ("provider_id" ),
378+ user_is_authenticated = bool (source .get ("user_id" )),
371379 action_labels = action_labels ,
372380 )
373381
374382
375- def _convert_public_usage_report (source_dict ) -> es8_metrics .PublicItemUsageReportEs8 :
376- _c_views , _c_view_sess , _c_downloads , _c_download_sess = _get_cumulative_usage (
377- osfid = source_dict ["item_osfid" ],
378- until_when = YearMonth .from_str (source_dict ["report_yearmonth" ]).month_end (),
379- item_type = source_dict .get ("item_type" ),
380- )
383+ def _convert_public_usage_report (
384+ source : dict ,
385+ prior_report : es8_metrics .PublicItemUsageReportEs8 | None ,
386+ ) -> es8_metrics .PublicItemUsageReportEs8 :
387+ if prior_report is None :
388+ _c_views , _c_view_sess , _c_downloads , _c_download_sess = _get_cumulative_usage (
389+ osfid = source ["item_osfid" ],
390+ until_when = YearMonth .from_str (source ["report_yearmonth" ]).month_end (),
391+ item_type = source .get ("item_type" ),
392+ )
393+ else :
394+ _c_views = prior_report .cumulative_view_count + source .get ("view_count" , 0 )
395+ _c_view_sess = prior_report .cumulative_view_session_count + source .get (
396+ "view_session_count" , 0
397+ )
398+ _c_downloads = prior_report .cumulative_download_count + source .get (
399+ "download_count" , 0
400+ )
401+ _c_download_sess = prior_report .cumulative_download_session_count + source .get (
402+ "download_session_count" , 0
403+ )
381404 return es8_metrics .PublicItemUsageReportEs8 (
382- item_osfid = source_dict ["item_osfid" ],
383- item_type = source_dict .get ("item_type" ),
384- provider_id = source_dict .get ("provider_id" ),
385- platform_iri = source_dict .get ("platform_iri" ),
386- view_count = source_dict .get ("view_count" ),
387- view_session_count = source_dict .get ("view_session_count" ),
405+ cycle_coverage = _semverish_from_yearmonth (source ['report_yearmonth' ]),
406+ item_osfid = source ["item_osfid" ],
407+ item_type = source .get ("item_type" ),
408+ provider_id = source .get ("provider_id" ),
409+ platform_iri = source .get ("platform_iri" ) or website_settings .DOMAIN ,
410+ view_count = source .get ("view_count" ),
411+ view_session_count = source .get ("view_session_count" ),
388412 cumulative_view_count = _c_views ,
389413 cumulative_view_session_count = _c_view_sess ,
390- download_count = source_dict .get ("download_count" ),
391- download_session_count = source_dict .get ("download_session_count" ),
414+ download_count = source .get ("download_count" ),
415+ download_session_count = source .get ("download_session_count" ),
392416 cumulative_download_count = _c_downloads ,
393417 cumulative_download_session_count = _c_download_sess ,
394418 )
@@ -407,9 +431,7 @@ def _get_cumulative_usage(osfid: str, until_when, item_type: str | None):
407431 return (_views , _view_sess , _downloads , _download_sess )
408432
409433
410- def _cumulative_countedusage_views (
411- osfid : str , until_when : str
412- ) -> tuple [int , int ]:
434+ def _cumulative_countedusage_views (osfid : str , until_when : str ) -> tuple [int , int ]:
413435 """compute view_session_count separately to avoid double-counting
414436
415437 (the same session may be represented in both the composite agg on `item_guid`
@@ -651,7 +673,8 @@ def _handle_unchanged(self, *, start: bool):
651673 )
652674 if start : # schedule task
653675 self ._write_tabbed ("starting" , _es6_cls , "=>" , _es8_cls )
654- migrate_unchanged_recordtype .delay (_es6_cls .__name__ )
676+ #migrate_unchanged_recordtype.delay(_es6_cls.__name__)
677+ migrate_unchanged_recordtype (_es6_cls .__name__ )
655678 self .stdout .write ("---" )
656679
657680 def _handle_usage_events (self , * , start : bool ):
@@ -681,9 +704,12 @@ def _handle_usage_events(self, *, start: bool):
681704 for _from_date , _until_date in _date_range (_range_start , _range_end ):
682705 _from_str = _from_date .isoformat ()
683706 _until_str = _until_date .isoformat ()
684- migrate_counted_usages .delay (_from_str , _until_str )
685- migrate_preprint_views .delay (_from_str , _until_str )
686- migrate_preprint_downloads .delay (_from_str , _until_str )
707+ # migrate_counted_usages.delay(_from_str, _until_str)
708+ # migrate_preprint_views.delay(_from_str, _until_str)
709+ # migrate_preprint_downloads.delay(_from_str, _until_str)
710+ migrate_counted_usages (_from_str , _until_str )
711+ migrate_preprint_views (_from_str , _until_str )
712+ migrate_preprint_downloads (_from_str , _until_str )
687713 self .stdout .write ("---" )
688714
689715 def _handle_usage_reports (self , * , start : bool ):
@@ -709,15 +735,15 @@ def _handle_usage_reports(self, *, start: bool):
709735 )
710736 # (if --start) schedule task per item (by composite agg on es6 public usage reports)
711737 # each item-task iter thru reports oldest to newest, adding cumulative counts
712- if start : # schedule per-item tasks
738+ if start :
713739 self .stdout .write (
714740 f"starting per-item { es6_reports .PublicItemUsageReport } => { es8_metrics .PublicItemUsageReportEs8 } "
715741 )
716742 for _osfid in _each_usage_report_osfid (
717743 started_at = self ._migration_started_at
718744 ):
719745 migrate_usage_reports (_osfid )
720- # TODO: migrate_usage_reports.apply_async (...)
746+ # TODO: migrate_usage_reports.delay (...)
721747 self .stdout .write ("---" )
722748
723749 @functools .cached_property
0 commit comments