22import datetime
33import functools
44import logging
5- from pprint import pprint
65
76from django .core .management import call_command
87from django .core .management .base import BaseCommand
1110from elasticsearch6 import helpers as es6_helpers
1211from elasticsearch6_dsl .connections import connections as es6_connections
1312from elasticsearch8 .exceptions import ConnectionError as Elastic8ConnectionError
14- from elasticsearch8 .dsl .connections import connections as es8_connections
1513from elasticsearch_metrics .registry import djelme_registry
1614from elasticsearch_metrics .imps import elastic8 as djel8me
1715from psycopg2 import OperationalError as PostgresOperationalError
7371
7472
@celery_app.task(**_TASK_KWARGS)
def migrate_unchanged_recordtype(es6_recordtype_name: str, until_when: str):
    """Copy records of one "unchanged" es6 record type into its es8 counterpart.

    Args:
        es6_recordtype_name: name used to look up the es6 record type in
            `djelme_registry` (under the "osf" app label).
        until_when: exclusive upper bound on record `timestamp`, passed
            through to `_es6_scan_range`.

    Returns:
        Whatever `_es8_bulk_save` returns for the converted records.
    """
    _es6_recordtype = djelme_registry.get_recordtype("osf", es6_recordtype_name)
    _es8_recordtype = _UNCHANGED_RECORDTYPES[_es6_recordtype]
    if issubclass(_es8_recordtype, djel8me.CyclicRecord):
        _convert_kwargs = _convert_unchanged_cyclicrecord_kwargs
    else:
        # no conversion needed for event record
        def _convert_kwargs(_kw):
            return _kw

    # lazy generator: each es6 hit is converted as the bulk save consumes it
    _each_new = (
        _es8_recordtype(**_convert_kwargs(_hit["_source"]))
        for _hit in _es6_scan_range(_es6_recordtype, until_when=until_when)
    )
    return _es8_bulk_save(_es8_recordtype, _each_new)
9087
@@ -96,8 +93,8 @@ def migrate_counted_usages(from_when: str, until_when: str):
9693 _convert_counted_usage (_hit ["_source" ])
9794 for _hit in _es6_scan_range (
9895 CountedUsageEs6 ,
99- from_when ,
100- until_when ,
96+ from_when = from_when ,
97+ until_when = until_when ,
10198 addl_filter = {"exists" : {"field" : "item_guid" }},
10299 )
103100 )
@@ -110,7 +107,9 @@ def migrate_preprint_views(from_when: str, until_when: str):
110107 _action_labels = ["view" , "web" ]
111108 _each_new = (
112109 _convert_preprint_metric (_hit ["_source" ], _action_labels )
113- for _hit in _es6_scan_range (PreprintView , from_when , until_when )
110+ for _hit in _es6_scan_range (
111+ PreprintView , from_when = from_when , until_when = until_when
112+ )
114113 )
115114 return _es8_bulk_save (es8_metrics .OsfCountedUsageRecord , _each_new )
116115
@@ -121,23 +120,24 @@ def migrate_preprint_downloads(from_when: str, until_when: str):
121120 _action_labels = ["download" ]
122121 _each_new = (
123122 _convert_preprint_metric (_hit ["_source" ], _action_labels )
124- for _hit in _es6_scan_range (PreprintDownload , from_when , until_when )
123+ for _hit in _es6_scan_range (
124+ PreprintDownload , from_when = from_when , until_when = until_when
125+ )
125126 )
126127 return _es8_bulk_save (es8_metrics .OsfCountedUsageRecord , _each_new )
127128
128129
129130@celery_app .task (** _TASK_KWARGS )
130- def migrate_usage_reports (osfid : str ):
131+ def migrate_usage_reports (osfid : str , until_when : str ):
131132 # from PublicItemUsageReport to PublicItemUsageReportEs8
132133 def _each_new ():
133134 # go in sorted order to build cumulative counts
134135 # (only a few dozen of these per item; should be fine to sort and load all at once)
135- _each_hit = _es6_scan_all (
136+ _each_hit = _es6_scan_range (
136137 es6_reports .PublicItemUsageReport ,
137- query_body = {
138- "query" : {"term" : {"item_osfid" : osfid }},
139- "sort" : "report_yearmonth" ,
140- },
138+ until_when = until_when ,
139+ addl_filter = {"term" : {"item_osfid" : osfid }},
140+ sort = "report_yearmonth" ,
141141 )
142142 _prior_report = None
143143 for _hit in list (_each_hit ):
@@ -158,31 +158,6 @@ def _es6_connection():
158158 return es6_connections .get_connection ("osfmetrics_es6" )
159159
160160
161- def _es8_connection ():
162- return es8_connections .get_connection ("osfmetrics_es8" )
163-
164-
165- def _delete_all (recordtype ):
166- # TODO: REMOVE THIS
167- recordtype .search ().query ({"match_all" : {}}).delete ()
168- recordtype .refresh ()
169-
170-
171- def _delete_all_es8 ():
172- # TODO: REMOVE THIS
173- for _es8_recordtype in _UNCHANGED_RECORDTYPES .values ():
174- _delete_all (_es8_recordtype )
175- _delete_all (es8_metrics .PublicItemUsageReportEs8 )
176- _delete_all (es8_metrics .OsfCountedUsageRecord )
177-
178-
179- def _debug_migrate (each_new ):
180- # TODO: remove this
181- for _each in each_new :
182- _each .full_clean ()
183- pprint (_each .to_dict (include_meta = True ))
184-
185-
186161def _es8_bulk_save (es8_recordtype , each_new_record ):
187162 _success_count , _fail_count = es8_recordtype .bulk (
188163 each_new_record ,
@@ -203,24 +178,29 @@ def _date_range(
203178 (_from_date , _until_date ) = (_until_date , _until_date + step )
204179
205180
206- def _es6_scan_all (es6_recordtype , query_body = None ):
207- return es6_helpers .scan (
208- _es6_connection (),
209- index = es6_recordtype ._template_pattern ,
210- query = query_body ,
211- )
212-
213-
214- def _es6_scan_range (es6_recordtype , from_when : str , until_when : str , addl_filter = None ):
def _es6_scan_range(
    es6_recordtype,
    *,
    from_when: str = "",
    until_when: str,
    addl_filter=None,
    sort=None,
):
    """Scan es6 records of one type whose `timestamp` falls within a range.

    Args:
        es6_recordtype: es6 record class; its `_template_pattern` is used as
            the index pattern to scan.
        from_when: inclusive lower bound ("gte") on `timestamp`; when empty,
            no lower bound is applied.
        until_when: exclusive upper bound ("lt") on `timestamp` (required).
        addl_filter: optional extra query clause appended to the bool filter.
        sort: optional sort for the search body; when given, hits are
            returned in that order.

    Returns:
        The hit iterator produced by `es6_helpers.scan`.
    """
    _timestamp_range = {"lt": until_when}
    if from_when:
        _timestamp_range["gte"] = from_when
    _filters = [{"range": {"timestamp": _timestamp_range}}]
    if addl_filter:
        _filters.append(addl_filter)
    _query_body = {"query": {"bool": {"filter": _filters}}}
    if sort:
        _query_body["sort"] = sort
    return es6_helpers.scan(
        _es6_connection(),
        index=es6_recordtype._template_pattern,
        query=_query_body,
        # unless preserve_order is set, the scan helper replaces any
        # requested sort with "_doc", silently discarding `sort`
        preserve_order=bool(sort),
    )
225205
226206
@@ -332,7 +312,9 @@ def _convert_counted_usage(source: dict) -> es8_metrics.OsfCountedUsageRecord:
332312 timestamp = source ["timestamp" ],
333313 sessionhour_id = source ["session_id" ],
334314 platform_iri = source .get ("platform_iri" ) or website_settings .DOMAIN ,
335- database_iri = _convert_database_iri (source .get ("provider_id" ), source .get ("item_type" )),
315+ database_iri = _convert_database_iri (
316+ source .get ("provider_id" ), source .get ("item_type" )
317+ ),
336318 item_iri = _item_iri ,
337319 within_iris = [
338320 _iri_from_osfid (_within_osfid )
@@ -345,7 +327,6 @@ def _convert_counted_usage(source: dict) -> es8_metrics.OsfCountedUsageRecord:
345327 provider_id = source .get ("provider_id" ),
346328 user_is_authenticated = source .get ("user_is_authenticated" ),
347329 action_labels = source .get ("action_labels" ),
348- # TODO: does this need the PageviewInfo object or is the dictionary fine?
349330 pageview_info = source .get ("pageview_info" ),
350331 )
351332
@@ -550,10 +531,10 @@ def _fallback_iri():
550531 return _fallback_iri ()
551532
552533
553- def _each_usage_report_osfid (started_at , after_osfid = None ):
534+ def _each_usage_report_osfid (until_when , after_osfid = None ):
554535 _search = (
555536 es6_reports .PublicItemUsageReport .search ()
556- .filter ("range" , timestamp = {"lt" : started_at })
537+ .filter ("range" , timestamp = {"lt" : until_when })
557538 .extra (size = 0 )
558539 )
559540 _search .aggs .bucket (
@@ -600,6 +581,10 @@ def add_arguments(self, parser):
600581 action = "store_true" ,
601582 )
602583
@functools.cached_property
def _migration_started_at(self):
    """Return the migration start time recorded in `Elastic6To8State`.

    The value of `Elastic6To8State.get_started_at()` is fetched on first
    access and cached on the instance for subsequent accesses.  Callers in
    this command both call `.isoformat()` on it and guard it with `or`, so
    it may be falsy when no migration has been started -- TODO confirm
    against `get_started_at`.
    """
    return es8_metrics.Elastic6To8State.get_started_at()
587+
603588 def handle (
604589 self ,
605590 * ,
@@ -625,10 +610,13 @@ def handle(
625610 self ._handle_usage_events (start = start , no_counts = no_counts )
626611 if usage_reports or _default_all :
627612 self ._handle_usage_reports (start = start , no_counts = no_counts )
613+ if not no_counts :
614+ self .stdout .write ("(counts may be approximate)" )
628615
629616 def _handle_unchanged (self , * , start : bool , no_counts : bool ):
630617 # for each (unchanged) report/event:
631618 for _es6_cls , _es8_cls in _UNCHANGED_RECORDTYPES .items ():
619+ _assert_field_unchangedness (_es6_cls , _es8_cls )
632620 if not no_counts :
633621 # display counts
634622 _es6_count = _es6_cls .search ().count ()
@@ -641,34 +629,51 @@ def _handle_unchanged(self, *, start: bool, no_counts: bool):
641629 style = self ._eq_style (_es8_count , _es6_count ),
642630 )
643631 if start : # schedule task
644- self .stdout .write (f"starting { _es6_cls .__name__ } => { _es8_cls .__name__ } " )
645- migrate_unchanged_recordtype .delay (_es6_cls .__name__ )
632+ self .stdout .write (
633+ f"starting { _es6_cls .__name__ } => { _es8_cls .__name__ } "
634+ )
635+ migrate_unchanged_recordtype .delay (
636+ _es6_cls .__name__ , self ._migration_started_at .isoformat ()
637+ )
646638
647639 def _handle_usage_events (self , * , start : bool , no_counts : bool ):
648640 # for counted-usage events:
649- _started = self ._migration_started_at
641+ _started = self ._migration_started_at or datetime . datetime . now ()
650642 _range_start = (_started - datetime .timedelta (days = _USAGE_DAYS_BACK )).date ()
651643 _range_end = _started .date () + datetime .timedelta (days = 1 )
652644 if not no_counts :
653645 # display counts for each view/download event type
654- _range_q = {"range" : {"timestamp" : {"gte" : _range_start .isoformat (), "lt" : _range_end .isoformat ()}}}
646+ _range_q = {
647+ "range" : {
648+ "timestamp" : {
649+ "gte" : _range_start .isoformat (),
650+ "lt" : _range_end .isoformat (),
651+ }
652+ }
653+ }
655654 _es6_pview_count = PreprintView .search ().filter (_range_q ).count ()
656655 _es6_pdownload_count = PreprintDownload .search ().filter (_range_q ).count ()
657656 _es6_usage_event_count = CountedUsageEs6 .search ().filter (_range_q ).count ()
658- _es6_count = _es6_pview_count + _es6_pdownload_count + _es6_usage_event_count
657+ _es6_count = (
658+ _es6_pview_count + _es6_pdownload_count + _es6_usage_event_count
659+ )
659660 _es8_count = es8_metrics .OsfCountedUsageRecord .search ().count ()
660661 self ._write_tabbed ("es6" , PreprintView , _es6_pview_count )
661662 self ._write_tabbed ("es6" , PreprintDownload , _es6_pdownload_count )
662663 self ._write_tabbed ("es6" , CountedUsageEs6 , _es6_usage_event_count )
663- self ._write_tabbed ("es6" , f"(total between { _range_start } and { _range_end } )" , _es6_count )
664+ self ._write_tabbed (
665+ "es6" , f"(total between { _range_start } and { _range_end } )" , _es6_count
666+ )
664667 self ._write_tabbed (
665668 "es8" ,
666669 es8_metrics .OsfCountedUsageRecord ,
667670 _es8_count ,
668671 style = self ._eq_style (_es8_count , _es6_count ),
669672 )
670673 if start : # schedule (per-day?) tasks (if --start)
671- self .stdout .write (f"starting usages => { es8_metrics .OsfCountedUsageRecord .__name__ } " )
674+ self .stdout .write (
675+ f"starting usages => { es8_metrics .OsfCountedUsageRecord .__name__ } "
676+ )
672677 for _from_date , _until_date in _date_range (_range_start , _range_end ):
673678 _from_str = _from_date .isoformat ()
674679 _until_str = _until_date .isoformat ()
@@ -689,7 +694,10 @@ def _handle_usage_reports(self, *, start: bool, no_counts: bool):
689694 style = self ._eq_style (_es8_count , _es6_count ),
690695 )
691696 self ._write_tabbed (
692- "es6" , es6_reports .PublicItemUsageReport , "osfid count:" , _es6_item_count
697+ "es6" ,
698+ es6_reports .PublicItemUsageReport ,
699+ "osfid count:" ,
700+ _es6_item_count ,
693701 )
694702 self ._write_tabbed (
695703 "es8" ,
@@ -705,13 +713,11 @@ def _handle_usage_reports(self, *, start: bool, no_counts: bool):
705713 f"starting per-item { es6_reports .PublicItemUsageReport .__name__ } => { es8_metrics .PublicItemUsageReportEs8 .__name__ } "
706714 )
707715 for _osfid in _each_usage_report_osfid (
708- started_at = self ._migration_started_at
716+ until_when = self ._migration_started_at
709717 ):
710- migrate_usage_reports .delay (_osfid )
711-
712- @functools .cached_property
713- def _migration_started_at (self ):
714- return es8_metrics .Elastic6To8State .get_started_at ()
718+ migrate_usage_reports .delay (
719+ _osfid , self ._migration_started_at .isoformat ()
720+ )
715721
716722 def _check_started_at (self , start_now ):
717723 _started_at = self ._migration_started_at
@@ -736,9 +742,6 @@ def _clear_state(self):
736742 )
737743 es8_metrics .Elastic6To8State .search ().query ({"match_all" : {}}).delete ()
738744 es8_metrics .Elastic6To8State .refresh ()
739- # TODO: REMOVE THIS
740- self .stdout .write ("deleting all migration target data in es8" , self .style .ERROR )
741- _delete_all_es8 ()
742745
def _eq_style(self, num: int, should_be: int):
    """Pick an output style for a count: SUCCESS when it matches, else WARNING.

    Args:
        num: the observed count.
        should_be: the count it is expected to equal.

    Returns:
        `self.style.SUCCESS` if the two are equal, otherwise
        `self.style.WARNING`.
    """
    if num == should_be:
        return self.style.SUCCESS
    return self.style.WARNING
0 commit comments