Skip to content

Commit 55c8523

Browse files
draft implementation of autopopulate2.0
1 parent 74b8c11 commit 55c8523

4 files changed

Lines changed: 39 additions & 53 deletions

File tree

datajoint/autopopulate.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
from .errors import DataJointError, LostConnectionError
1717
from .expression import AndList, QueryExpression
18-
from .jobs import JobTable2
18+
from .jobs import JobTable
1919

2020
# noinspection PyExceptionInherit,PyCallingNonCallable
2121

@@ -276,8 +276,7 @@ def populate(
276276
"error_list": the error list that is filled if `suppress_errors` is True
277277
"""
278278
if self.connection.in_transaction:
279-
raise DataJointError(
280-
"Populate cannot be called during a transaction.")
279+
raise DataJointError("Populate cannot be called during a transaction.")
281280

282281
valid_order = ["original", "reverse", "random"]
283282
if order not in valid_order:
@@ -349,8 +348,7 @@ def handler(signum, frame):
349348
del self.connection._conn.ctx # SSLContext is not pickleable
350349
with (
351350
mp.Pool(
352-
processes, _initialize_populate, (
353-
self, jobs, populate_kwargs)
351+
processes, _initialize_populate, (self, jobs, populate_kwargs)
354352
) as pool,
355353
(
356354
tqdm(desc="Processes: ", total=nkeys)
@@ -396,8 +394,7 @@ def _populate1(
396394
True if successfully invoke one `make()` call, otherwise False
397395
"""
398396
# use the legacy `_make_tuples` callback.
399-
make = self._make_tuples if hasattr(
400-
self, "_make_tuples") else self.make
397+
make = self._make_tuples if hasattr(self, "_make_tuples") else self.make
401398

402399
if reserve_jobs and not self.jobs.reserve(key):
403400
# unable to reserve the key for the job
@@ -467,8 +464,7 @@ def _populate1(
467464
return key, error if return_exception_objects else error_message
468465
else:
469466
self.connection.commit_transaction()
470-
logger.debug(
471-
f"Success making {key} -> {self.target.full_table_name}")
467+
logger.debug(f"Success making {key} -> {self.target.full_table_name}")
472468
if jobs is not None:
473469
run_duration = time.time() - start_time
474470
if jobs is not None:

datajoint/jobs.py

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,7 @@ class Job(Table):
1919
def __init__(self, computed_table: Table):
2020
"""
2121
Initialize a job table for a specific computed table.
22-
23-
:param computed_table: Database connection
24-
:param database: Database name
25-
:param table_name: Name of the computed table
26-
:param primary_key_attrs: List of primary key attribute names
22+
:param computed_table: AutoPopulate instance
2723
"""
2824
self.database = computed_table.database
2925
self._connection = computed_table.connection
@@ -37,30 +33,29 @@ def __init__(self, computed_table: Table):
3733
conn=computed_table.connection,
3834
database=computed_table.database,
3935
table_name=self.table_name,
40-
context=None
36+
context=None,
4137
)
4238
)
4339

4440
self._definition = f"# job table for {self.computed_table.full_table_name}\n"
4541

46-
# add references to parent tables to the definition
42+
# add primary foreign key references to parent tables to the definition
4743
parents = computed_table.parents(
48-
as_objects=False, primary_key=True, foreign_key_info=True)
44+
as_objects=False, primary_key=True, foreign_key_info=True
45+
)
4946
for parent_name, fk_props in parents:
5047
if not fk_props["aliased"]:
5148
# simple foreign key
5249
self._definition += f"->{parent_name}\n"
5350
else:
5451
# projected foreign key
55-
self._definition += (
56-
"->{parent_name}.proj({proj_list})\n".format(
57-
parent_name=parent_name,
58-
proj_list=",".join(
59-
'{}="{}"'.format(attr, ref)
60-
for attr, ref in fk_props["attr_map"].items()
61-
if ref != attr
62-
)
63-
)
52+
self._definition += "->{parent_name}.proj({proj_list})\n".format(
53+
parent_name=parent_name,
54+
proj_list=",".join(
55+
'{}="{}"'.format(attr, ref)
56+
for attr, ref in fk_props["attr_map"].items()
57+
if ref != attr
58+
),
6459
)
6560
self._definition += """
6661
---
@@ -105,22 +100,22 @@ def refresh(self, key_source):
105100
:param key_source: QueryExpression that defines available jobs
106101
"""
107102
# Get current jobs that should be preserved (reserved, error, ignore)
108-
preserve_statuses = ('reserved', 'error', 'ignore')
109-
existing_jobs = (self & f'status in {preserve_statuses}').fetch('KEY')
103+
preserve_statuses = ("reserved", "error", "ignore")
104+
existing_jobs = (self & f"status in {preserve_statuses}").fetch("KEY")
110105

111106
# Get all keys from key_source
112-
available_keys = key_source.fetch('KEY')
107+
available_keys = key_source.fetch("KEY")
113108

114109
# Remove jobs that are no longer in key_source
115110
for job_key in existing_jobs:
116111
if job_key not in available_keys:
117112
(self & job_key).delete_quick()
118113

119114
# Add new jobs that aren't already in the table
120-
existing_all = self.fetch('KEY')
115+
existing_all = self.fetch("KEY")
121116
for key in available_keys:
122117
if key not in existing_all:
123-
self.insert1(dict(key, status='scheduled', priority=3))
118+
self.insert1(dict(**key, status="scheduled", priority=3))
124119

125120
def reserve(self, key):
126121
"""
@@ -130,7 +125,7 @@ def reserve(self, key):
130125
:return: True if reserved job successfully. False = the job is already taken
131126
"""
132127
job = dict(
133-
key,
128+
**key,
134129
status="reserved",
135130
host=platform.node(),
136131
pid=os.getpid(),
@@ -151,7 +146,7 @@ def ignore(self, key):
151146
:return: True if ignore job successfully. False = the job is already taken
152147
"""
153148
job = dict(
154-
key,
149+
**key,
155150
status="ignore",
156151
host=platform.node(),
157152
pid=os.getpid(),
@@ -173,7 +168,7 @@ def complete(self, key, run_duration=None, run_version=None):
173168
:param run_version: version information (e.g., git commit hash)
174169
"""
175170
job = dict(
176-
key,
171+
**key,
177172
status="success",
178173
run_duration=run_duration,
179174
run_version=json.dumps(run_version) if run_version else None,
@@ -194,13 +189,12 @@ def error(self, key, error_message, error_stack=None):
194189
"""
195190
if len(error_message) > ERROR_MESSAGE_LENGTH:
196191
error_message = (
197-
error_message[: ERROR_MESSAGE_LENGTH -
198-
len(TRUNCATION_APPENDIX)]
192+
error_message[: ERROR_MESSAGE_LENGTH - len(TRUNCATION_APPENDIX)]
199193
+ TRUNCATION_APPENDIX
200194
)
201195

202196
job = dict(
203-
key,
197+
**key,
204198
status="error",
205199
error_message=error_message,
206200
error_stack=error_stack,
@@ -227,12 +221,12 @@ def get_scheduled_jobs(self, limit=None, order_by=None):
227221
query = query.fetch(order_by=order_by)
228222
else:
229223
# Default ordering: priority (ascending), then timestamp (ascending)
230-
query = query.fetch(order_by=['priority', 'timestamp'])
224+
query = query.fetch(order_by=["priority", "timestamp"])
231225

232226
if limit:
233227
query = query[:limit]
234228

235-
return query.fetch('KEY')
229+
return query.fetch("KEY")
236230

237231
def set_priority(self, key, priority):
238232
"""
@@ -241,4 +235,4 @@ def set_priority(self, key, priority):
241235
:param key: the dict of the job's primary key
242236
:param priority: priority value (lower = higher priority)
243237
"""
244-
self.update1(dict(key, priority=priority))
238+
self.update1(dict(**key, priority=priority))

docs/src/compute/distributed.md

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,9 @@
22

33
## Job reservations
44

5-
Running `populate` on the same table on multiple computers will causes them to attempt
5+
Running `populate` on the same table on multiple computers will cause them to attempt
66
to compute the same data all at once.
77
This will not corrupt the data since DataJoint will reject any duplication.
8-
One solution could be to cause the different computing nodes to populate the tables in
9-
random order.
10-
This would reduce some collisions but not completely prevent them.
118

129
To allow efficient distributed computing, DataJoint provides a built-in job reservation
1310
process.

docs/src/concepts/data-model.md

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,23 +36,22 @@ objects in memory with properties and methods for transformations of such data.
3636
## Relational data model
3737

3838
The **relational model** is a way of thinking about data as sets and operations on sets.
39-
Formalized almost a half-century ago ([Codd,
40-
1969](https://dl.acm.org/citation.cfm?doid=362384.362685)). The relational data model is
41-
one of the most powerful and precise ways to store and manage structured data. At its
42-
core, this model organizes all data into tables--representing mathematical
43-
relations---where each table consists of rows (representing mathematical tuples) and
44-
columns (often called attributes).
39+
Formalized half a century ago by Edgar F. Codd (@10.1145/362384.362685), the relational data model is
40+
one of the most powerful and precise ways to define and manage structured data. At its
41+
core, this model organizes all data into tables---representing mathematical
42+
relations---where each table consists of rows (mathematical tuples) and
43+
columns (attributes).
4544

4645
### Core principles of the relational data model
4746

4847
**Data representation:**
49-
Data are represented and manipulated in the form of relations.
48+
Data are represented and manipulated in the form of relations (mathematical sets).
5049
A relation is a set (i.e. an unordered collection) of entities, each consisting of values for each of
5150
the respective named attributes of the relation.
5251
Base relations represent stored data while derived relations are formed from base
53-
relations through query expressions.
52+
relations through relational operators.
5453
A collection of base relations with their attributes, domain constraints, uniqueness
55-
constraints, and referential constraints is called a schema.
54+
constraints, and referential constraints is called a [schema](../design/schema.md).
5655

5756
**Domain constraints:**
5857
Each attribute (column) in a table is associated with a specific attribute domain (or

0 commit comments

Comments
 (0)