11import os
22import datetime
33import platform
4+ import json
5+ from typing import Dict , Any , Union
46
57from .errors import DuplicateError
68from .hash import key_hash
@@ -32,18 +34,19 @@ def __init__(self, conn, database):
3234 key_hash :char(32) # key hash
3335 ---
3436 status :enum('reserved','error','ignore','scheduled','success')
35- key=null :blob # structure containing the key
37+ key=null :json # structure containing the key for querying
3638 error_message="" :varchar({error_message_length}) # error message returned if failed
3739 error_stack=null :mediumblob # error stack if failed
3840 user="" :varchar(255) # database user
3941 host="" :varchar(255) # system hostname
4042 pid=0 :int unsigned # system process id
4143 connection_id = 0 : bigint unsigned # connection_id()
42- timestamp :timestamp # the scheduled time (UTC) for the job to run at or after
44+ timestamp :timestamp # timestamp of the job status change or scheduled time
4345 run_duration=null : float # run duration in seconds
4446 run_version="" : varchar(255) # some string representation of the code/env version of a run (e.g. git commit hash)
4547 index(table_name, status)
4648 index(status)
49+ index(timestamp) # for ordering jobs
4750 """ .format (
4851 database = database , error_message_length = ERROR_MESSAGE_LENGTH
4952 )
@@ -69,13 +72,24 @@ def drop(self):
6972
7073 def schedule (self , table_name , key , seconds_delay = 0 , force = False ):
7174 """
72- Schedule a job for computation.
75+ Schedule a job for computation in the DataJoint pipeline .
7376
74- :param table_name: `database`.`table_name`
75- :param key: the dict of the job's primary key
76- :param seconds_delay: add time delay (in second) in scheduling this job
77- :param force: force scheduling this job (even if it is in error/ignore status)
78- :return: True if schedule job successfully. False = the jobs already exists with a different status
77+ This method manages job scheduling with the following key behaviors:
78+ 1. Creates a new job entry if one doesn't exist
79+ 2. Updates existing jobs based on their current status:
80+ - Allows rescheduling if job is in error/ignore status and force=True
81+ - Prevents rescheduling if job is already scheduled/reserved/success
82+ 3. Records job metadata including host, process ID, and user info
83+ 4. Supports delayed execution through seconds_delay parameter
84+
85+ Args:
86+ table_name: Full table name in format `database`.`table_name`
87+ key: Dictionary containing the job's primary key
88+ seconds_delay: Optional delay in seconds before job execution (default: 0)
89+ force: If True, allows rescheduling jobs in error/ignore status (default: False)
90+
91+ Returns:
92+ bool: True if job was successfully scheduled, False if job already exists with incompatible status
7993 """
8094 job_key = dict (table_name = table_name , key_hash = key_hash (key ))
8195 if self & job_key :
@@ -91,7 +105,7 @@ def schedule(self, table_name, key, seconds_delay=0, force=False):
91105 host = platform .node (),
92106 pid = os .getpid (),
93107 connection_id = self .connection .connection_id ,
94- key = key ,
108+ key = _jsonify ( key ) ,
95109 user = self ._user ,
96110 timestamp = datetime .datetime .utcnow ()
97111 + datetime .timedelta (seconds = seconds_delay ),
@@ -122,7 +136,7 @@ def reserve(self, table_name, key):
122136 host = platform .node (),
123137 pid = os .getpid (),
124138 connection_id = self .connection .connection_id ,
125- key = key ,
139+ key = _jsonify ( key ) ,
126140 user = self ._user ,
127141 timestamp = datetime .datetime .utcnow (),
128142 )
@@ -160,7 +174,7 @@ def ignore(self, table_name, key, message=""):
160174 host = platform .node (),
161175 pid = os .getpid (),
162176 connection_id = self .connection .connection_id ,
163- key = key ,
177+ key = _jsonify ( key ) ,
164178 error_message = message ,
165179 user = self ._user ,
166180 timestamp = datetime .datetime .utcnow (),
@@ -196,7 +210,7 @@ def complete(self, table_name, key, run_duration=None, run_version=""):
196210 pid = os .getpid (),
197211 connection_id = self .connection .connection_id ,
198212 user = self ._user ,
199- key = key ,
213+ key = _jsonify ( key ) ,
200214 run_duration = run_duration ,
201215 run_version = run_version ,
202216 timestamp = datetime .datetime .utcnow (),
@@ -230,11 +244,19 @@ def error(self, table_name, key, error_message, error_stack=None):
230244 pid = os .getpid (),
231245 connection_id = self .connection .connection_id ,
232246 user = self ._user ,
233- key = key ,
247+ key = _jsonify ( key ) ,
234248 error_message = error_message ,
235249 error_stack = error_stack ,
236250 timestamp = datetime .datetime .utcnow (),
237251 ),
238252 replace = True ,
239253 ignore_extra_fields = True ,
240254 )
255+
256+
def _jsonify(key: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return a JSON-safe copy of ``key`` by round-tripping it through JSON.

    Values that the JSON encoder cannot handle natively (for example dates or
    decimals) are converted with ``str()``. Non-finite floats (NaN, Infinity,
    -Infinity) have no representation in standard JSON, so they are returned
    as the strings "NaN", "Infinity" and "-Infinity" instead of floats. The
    result can therefore always be re-serialized as strict JSON.

    Args:
        key: Structure to normalize; the callers in this module pass the
            job's key dictionary.

    Returns:
        A new structure composed only of dict, list, str, int, float, bool
        and None values (tuples become lists, dict keys become strings).

    Raises:
        TypeError: If a dict key is not a str, int, float, bool or None.
            The ``str()`` fallback applies to values only, not to keys.
    """
    # json.dumps writes non-finite floats as the JavaScript tokens
    # NaN/Infinity/-Infinity; parse_constant=str keeps those tokens as plain
    # strings on the way back instead of recreating the floats.
    return json.loads(json.dumps(key, default=str), parse_constant=str)
0 commit comments