Skip to content

Commit 8b9ac0f

Browse files
author
Thinh Nguyen
committed
remove JobConfigTable and register_key_source
1 parent ef3adc2 commit 8b9ac0f

3 files changed

Lines changed: 2 additions & 171 deletions

File tree

datajoint/autopopulate.py

Lines changed: 2 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import inspect
77
from tqdm import tqdm
88
from .hash import key_hash
9-
from .expression import QueryExpression, AndList, _SQLExpression
9+
from .expression import QueryExpression, AndList
1010
from .errors import DataJointError, LostConnectionError
1111
from .settings import config
1212
from .utils import user_choice, to_camel_case
@@ -90,18 +90,6 @@ def _rename_attributes(table, props):
9090
for q in parents[1:]:
9191
self._key_source *= _rename_attributes(*q)
9292

93-
jobs_config = self.connection.schemas[self.target.database].jobs_config
94-
key_source_query = jobs_config & {"table_name": self.target.table_name}
95-
if key_source_query:
96-
key_source_uuid = key_hash({"sql": self._key_source.make_sql()})
97-
if not (key_source_query & {"key_source_uuid": key_source_uuid}):
98-
# different key_source stored in jobs_config
99-
return _SQLExpression(
100-
self._key_source.proj(), key_source_query.fetch1("key_source")
101-
)
102-
else:
103-
self.register_key_source(self._key_source.proj().make_sql())
104-
10593
return self._key_source
10694

10795
def make(self, key):
@@ -146,7 +134,7 @@ def _jobs_to_do(self, restrictions):
146134
if inspect.isclass(todo) and issubclass(todo, QueryExpression):
147135
todo = todo()
148136

149-
if not isinstance(todo, (QueryExpression, _SQLExpression)):
137+
if not isinstance(todo, QueryExpression):
150138
raise DataJointError("Invalid key_source value")
151139

152140
try:
@@ -403,66 +391,12 @@ def _Jobs(self):
403391
def jobs(self):
404392
return self._Jobs & {"table_name": self.target.table_name}
405393

406-
def register_key_source(self, sql=None, safemode=None):
407-
key_source_sql = sql or self.key_source.proj().make_sql()
408-
key_source_uuid = key_hash({"sql": key_source_sql})
409-
410-
jobs_config = self.connection.schemas[self.target.database].jobs_config
411-
412-
entry = {
413-
"table_name": self.target.table_name,
414-
"key_source": key_source_sql,
415-
"key_source_uuid": key_source_uuid,
416-
"last_refresh_time": datetime.datetime.utcnow(),
417-
}
418-
419-
if jobs_config & {"table_name": self.target.table_name}:
420-
if jobs_config & {
421-
"table_name": self.target.table_name,
422-
"key_source_uuid": key_source_uuid,
423-
}:
424-
return
425-
426-
safemode = config["safemode"] if safemode is None else safemode
427-
if (
428-
not safemode
429-
or user_choice(
430-
f"Modified key_source for table `{to_camel_case(self.target.table_name)}`, Re-register?",
431-
default="no",
432-
)
433-
== "yes"
434-
):
435-
jobs_config.insert1(entry, replace=True)
436-
else:
437-
jobs_config.insert1(entry)
438-
439394
def schedule_jobs(self, purge_invalid_jobs=True):
440395
"""
441396
Schedule new jobs for this autopopulate table
442397
:param purge_invalid_jobs: if True, remove invalid entry from the jobs table (potentially expensive operation)
443398
:return:
444399
"""
445-
jobs_config = self.connection.schemas[self.target.database].jobs_config
446-
key_source_query = jobs_config & {"table_name": self.target.table_name}
447-
if not key_source_query:
448-
self.register_key_source()
449-
450-
if key_source_query.fetch1("refresh_reserved"):
451-
# scheduling is ongoing
452-
return
453-
454-
last_refresh_time, refresh_rate = key_source_query.fetch1(
455-
"last_refresh_time", "refresh_rate"
456-
)
457-
new_refresh_time = last_refresh_time + datetime.timedelta(seconds=refresh_rate)
458-
if datetime.datetime.utcnow() < new_refresh_time:
459-
# not yet time to refresh jobs
460-
return
461-
462-
# add new scheduled jobs to JobTable
463-
jobs_config.update1(
464-
{"table_name": self.target.table_name, "refresh_reserved": 1}
465-
)
466400
try:
467401
with self.connection.transaction:
468402
schedule_count = 0
@@ -477,9 +411,6 @@ def schedule_jobs(self, purge_invalid_jobs=True):
477411
finally:
478412
if purge_invalid_jobs:
479413
self.purge_invalid_jobs()
480-
jobs_config.update1(
481-
{"table_name": self.target.table_name, "refresh_reserved": 0}
482-
)
483414

484415
def purge_invalid_jobs(self):
485416
"""

datajoint/expression.py

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -931,61 +931,3 @@ def aggr(self, group, **named_attributes):
931931
)
932932

933933
aggregate = aggr # alias for aggr
934-
935-
936-
class _SQLExpression:
937-
def __init__(self, expression, sql):
938-
self._expression = expression
939-
self.make_sql = lambda: sql
940-
self._expression.cursor = self.cursor
941-
942-
self._select, self._from_where = re.match(
943-
r"SELECT (.*) FROM (.*)", sql
944-
).groups()
945-
946-
assert set(self._expression.heading.names) == set(
947-
v.strip("`") for v in self._select.split(",")
948-
)
949-
950-
def cursor(self, offset=0, limit=None, order_by=None, as_dict=False):
951-
"""
952-
See expression.fetch() for input description.
953-
:return: query cursor
954-
"""
955-
if offset and limit is None:
956-
raise DataJointError("limit is required when offset is set")
957-
sql = self.make_sql()
958-
if order_by is not None:
959-
sql += " ORDER BY " + ", ".join(order_by)
960-
if limit is not None:
961-
sql += " LIMIT %d" % limit + (" OFFSET %d" % offset if offset else "")
962-
logger.debug(sql)
963-
return self._expression.connection.query(sql, as_dict=as_dict)
964-
965-
@property
966-
def heading(self):
967-
return self._expression.heading
968-
969-
@property
970-
def fetch1(self):
971-
return Fetch1(self._expression)
972-
973-
@property
974-
def fetch(self):
975-
return Fetch(self._expression)
976-
977-
def __len__(self):
978-
""":return: number of elements in the result set e.g. ``len(q1)``."""
979-
return self._expression.connection.query(
980-
"SELECT {select_} FROM {from_where}".format(
981-
select_=f"count(DISTINCT {self._select})",
982-
from_where=self._from_where,
983-
)
984-
).fetchone()[0]
985-
986-
def __bool__(self):
987-
"""
988-
:return: True if the result is not empty. Equivalent to len(self) > 0 but often
989-
faster e.g. ``bool(q1)``.
990-
"""
991-
return bool(self._expression.connection.query(self.make_sql()).fetchone()[0])

datajoint/jobs.py

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -233,45 +233,3 @@ def error(self, table_name, key, error_message, error_stack=None):
233233
replace=True,
234234
ignore_extra_fields=True,
235235
)
236-
237-
238-
class JobConfigTable(Table):
239-
"""
240-
A base table with no definition. Allows job configuration for imported/computed tables
241-
"""
242-
243-
def __init__(self, conn, database):
244-
self.database = database
245-
self._connection = conn
246-
self._heading = Heading(
247-
table_info=dict(
248-
conn=conn, database=database, table_name=self.table_name, context=None
249-
)
250-
)
251-
self._support = [self.full_table_name]
252-
253-
self._definition = """ # job configuration table for `{database}`
254-
table_name :varchar(255) # className of the table
255-
---
256-
key_source_uuid: UUID # hash of the key_source
257-
key_source :mediumblob # sql statement for the key_source of the table - from make_sql()
258-
unique index (table_name, key_source_uuid)
259-
unique index (key_source_uuid)
260-
refresh_rate=1: int unsigned # (second) how often should the jobs for the table be refreshed
261-
refresh_reserved=0: bool # is the jobs for the table currently being refreshed
262-
last_refresh_time: datetime # timestamp (UTC) of the last refresh time for the table
263-
additional_config=null: longblob # (TODO: change to json) any additional configuration (e.g. retries, docker image, etc.)
264-
""".format(
265-
database=database
266-
)
267-
if not self.is_declared:
268-
self.declare()
269-
self._user = self.connection.get_user()
270-
271-
@property
272-
def definition(self):
273-
return self._definition
274-
275-
@property
276-
def table_name(self):
277-
return "~jobs_config"

0 commit comments

Comments
 (0)