[cuebot/proto/pycue/cuegui/docs] Add startTime() and stopTime() to Layer#2338
Conversation
Adds `submissionTime()` to `Frame`, exposing the parent job's submission timestamp directly on the frame object. This avoids requiring callers to fetch the parent job or overload `eligibleTime()` to infer submission time. `Frame.startTime()` represents when the frame began executing on a render host, not when the job was submitted. Since `Job.startTime()` and `Layer.startTime()` already serve as submission timestamps for those objects, only `Frame` needed this additional accessor. With `submissionTime()`, callers can now compute frame lifecycle timing directly from a single `Frame` object: blocked_by_depends = frame.eligibleTime() - frame.submissionTime() blocked_by_pickup = frame.startTime() - frame.eligibleTime() total_turnaround = frame.stopTime() - frame.submissionTime() Proto files - Adds `submission_time` field to `Frame`. Cuebot - Updates `WhiteboardDaoJdbc.FRAME_MAPPER` to populate `submission_time` from the existing `job.ts_started` join (already aliased as `job_ts_started` for the `eligibleTime()` fallback). - No database migration is required, since the source column already exists. PyCue - Adds `Frame.submissionTime()`. - Includes a new unit test covering the Python wrapper. CueGUI - Adds a new "Submission Time" column to `FrameMonitorTree`, positioned next to the existing "Eligible Time" column. Docs - Adds `submissionTime` to the Frame REST API reference schema and example payloads. - Updates the Cuetopia monitoring guide to document the new column.
- New "Submission Time" column shifted "Last Line" visual index by +1 - LASTLINE_COLUMN bounds the redrawRunning() dataChanged range; bump to keep running-row repaints aligned.
- LASTLINE_COLUMN, RUNTIME_COLUMN, MEMORY_COLUMN, CHECKPOINT_COLUMN, and PROC_COLUMN drifted from their intended columns over years of insertions (last touched 2018). - redrawRunning() was emitting dataChanged for the wrong range, silently missing Runtime/Memory/Last Line repaints on running frames. - PROC_COLUMN highlighting/alignment was landing on GPUs, not Host; CHECKPOINT_COLUMN icon was landing on Retries, not CheckP. - Update all stale constants and add a header note so future column insertions don't re-stale them.
Layer previously had no start or stop time accessor - callers wanting the window over which a layer's frames actually ran had to fetch every frame and compute MIN/MAX client-side. This is wasteful in cuegui (N extra RPCs per refresh for a job with N layers) and missing entirely for tools that only query layers. Both values are aggregated server-side from the layer's frames: - `Layer.startTime() = MIN(frame.ts_started)` - `Layer.stopTime() = MAX(frame.ts_stopped), only once every frame stopped` `stopTime` stays 0 while any frame is still pending, running, or in DEPEND, mirroring `Job.stopTime()` so callers can use the same "is it done?" idiom on both. Proto files - Adds `start_time` (24) and `stop_time` (25) to the `Layer` message. Cuebot - Extends `GET_LAYER_WITH_LIMITS` with two correlated subqueries against the `frame` table: `MIN(ts_started)` and a `CASE WHEN COUNT(*) FILTER (WHERE ts_stopped IS NULL) = 0 THEN MAX(ts_stopped) END` that only returns a value when every frame has stopped. - Updates `WhiteboardDaoJdbc.LAYER_MAPPER` to set `start_time`/`stop_time` from those subqueries; leaves the proto default (0) when the subquery returns NULL. - No database migration is required; the aggregate uses the existing `i_frame_pkjoblayer` index. PyCue - Adds `Layer.startTime(format=None)` and `Layer.stopTime(format=None)`, mirroring `Job.startTime()`'s format-string behavior. - New unit tests for epoch and formatted output of both accessors. CueGUI - Adds "Start Time" and "Stop Time" columns to `LayerMonitorTree`, alongside the existing "Eligible" column. Docs - Adds `startTime` and `stopTime` to the Layer example payload in the REST API reference. - Documents the new monitor-tree columns in the Cuetopia monitoring guide.
📝 WalkthroughWalkthroughThis PR extends the OpenCue job tracking system to surface aggregated layer lifecycle timestamps (start_time, stop_time) and frame submission timestamps across the full stack: proto message contracts, PostgreSQL schema with real-time trigger aggregation, Java data access layer mapping, Python wrapper accessors, CueGUI display columns, REST API documentation, and a version bump. ChangesFrame and Layer Timestamp Tracking
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related issues
Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
Note: PR created on top of PR #2337 |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
proto/src/job.proto (1)
526-526: 🏗️ Heavy liftAvoid adding new public timestamps as
int32.These newly added fields overflow on January 19, 2038. Since you're introducing them now, this is the optimal point to use
int64epoch seconds orgoogle.protobuf.Timestampinstead. A schema widening later would be a breaking change.Also applies to: 725-726
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@proto/src/job.proto` at line 526, The new timestamp field submission_time is declared as int32 which will overflow in 2038; change its type to either int64 (epoch seconds) or google.protobuf.Timestamp and update the proto imports accordingly (add google/protobuf/timestamp.proto if using Timestamp); apply the same change to the other newly added int32 timestamp fields referenced in this diff (the other timestamp fields added later) and regenerate any affected stubs so consumers use the widened type.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@proto/src/job.proto`:
- Line 526: The new timestamp field submission_time is declared as int32 which
will overflow in 2038; change its type to either int64 (epoch seconds) or
google.protobuf.Timestamp and update the proto imports accordingly (add
google/protobuf/timestamp.proto if using Timestamp); apply the same change to
the other newly added int32 timestamp fields referenced in this diff (the other
timestamp fields added later) and regenerate any affected stubs so consumers use
the widened type.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 59982177-a699-412a-bb14-e29cf5765061
📒 Files selected for processing (11)
VERSION.incuebot/src/main/java/com/imageworks/spcue/dao/postgres/WhiteboardDaoJdbc.javacuegui/cuegui/FrameMonitorTree.pycuegui/cuegui/LayerMonitorTree.pydocs/_docs/reference/rest-api-reference.mddocs/_docs/user-guides/cuetopia-monitoring-guide.mdproto/src/job.protopycue/opencue/wrappers/frame.pypycue/opencue/wrappers/layer.pypycue/tests/wrappers/test_frame.pypycue/tests/wrappers/test_layer.py
|
Hi @DiegoTavares / @lithorus |
Script to test the startTime() and stopTime() of a Layer#!/usr/bin/env python3
"""
Smoke-tests the new Layer.startTime() and Layer.stopTime() accessors.
Submits a job with two layers wired so layer_b depends on layer_a. Each
layer has multiple frames so the layer-level aggregation has something
to chew on:
Layer.startTime() = MIN(frame.ts_started)
Layer.stopTime() = MAX(frame.ts_stopped), only once every frame stopped
The dependency chain guarantees the timeline below is reachable in one run:
t0 submit layer_a.startTime == 0, layer_a.stopTime == 0
layer_b.startTime == 0, layer_b.stopTime == 0
t1 layer_a frames running layer_a.startTime > 0, layer_a.stopTime == 0
layer_b.startTime == 0, layer_b.stopTime == 0
t2 layer_a done, layer_b run layer_a.startTime > 0, layer_a.stopTime > 0
layer_b.startTime > 0, layer_b.stopTime == 0
t3 job finished both layers: startTime > 0 and stopTime > 0
For each snapshot the script also cross-checks the server-side aggregate
against the same MIN/MAX computed client-side from the layer's frames -
they should agree once frames have stopped, and the layer.stopTime() must
stay at 0 while any frame is still pending/running/DEPEND.
Usage:
python test_layer_start_stop_time_api.py # submit + initial snapshot, exit
python test_layer_start_stop_time_api.py --sleep 20 # /bin/sleep 20 per frame
python test_layer_start_stop_time_api.py --wait # submit + poll until done + final dump
python test_layer_start_stop_time_api.py --verify # skip submission; dump matching jobs
python test_layer_start_stop_time_api.py --prefix my_test # custom job-name prefix
"""
import argparse
import sys
import time
import opencue
import outline
import outline.cuerun
from outline.modules.shell import Shell
DEFAULT_SLEEP_SECONDS = 10
DEFAULT_FRAMES_PER_LAYER = 3
DEFAULT_PREFIX = "layer_time_test"
SHOW = "testing"
SHOT = "testshot"
def fmt(epoch):
if not epoch:
return "--"
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(epoch))
def fmt_delta(seconds):
if seconds is None:
return "--"
return "%d s" % seconds
def submit_job(short_name, sleep_seconds, frames_per_layer):
"""Submit a two-layer job; layer_b depends on layer_a."""
ol = outline.Outline(short_name, shot=SHOT, show=SHOW)
layer_a = Shell(
"layer_a",
command=["/bin/sleep", str(sleep_seconds)],
range="1-%d" % frames_per_layer,
)
layer_b = Shell(
"layer_b",
command=["/bin/sleep", str(sleep_seconds)],
range="1-%d" % frames_per_layer,
)
ol.add_layer(layer_a)
ol.add_layer(layer_b)
layer_b.depend_all(layer_a)
outline.cuerun.launch(ol, use_pycuerun=False)
def find_job(short_name):
"""Resolve a short outline name to its opencue Job (matches any state)."""
candidates = opencue.api.getJobs(substr=[short_name], include_finished=True)
for job in candidates:
if job.name().endswith("_%s" % short_name):
return job
raise RuntimeError("Could not find submitted job: %s" % short_name)
def frame_minmax(layer):
"""Compute MIN(start) / MAX(stop) client-side from the layer's frames.
Returns (client_min_start, client_max_stop, any_unfinished).
`any_unfinished` is True when at least one frame has not stopped yet
(state != SUCCEEDED/EATEN); the server-side stopTime() must be 0 in
that case.
"""
frames = layer.getFrames()
starts = [f.startTime() for f in frames if f.startTime()]
stops = [f.stopTime() for f in frames if f.stopTime()]
finished_states = (
opencue.api.job_pb2.SUCCEEDED,
opencue.api.job_pb2.EATEN,
)
any_unfinished = any(f.state() not in finished_states for f in frames)
return (
min(starts) if starts else 0,
max(stops) if stops else 0,
any_unfinished,
)
def dump_layer(layer, job_submission):
layer_start = layer.startTime()
layer_stop = layer.stopTime()
client_min_start, client_max_stop, any_unfinished = frame_minmax(layer)
print(" LAYER: %s" % layer.name())
print(" startTime() : %s" % fmt(layer_start))
print(" stopTime() : %s" % fmt(layer_stop))
print(" client MIN(frame start) : %s" % fmt(client_min_start))
print(" client MAX(frame stop) : %s" % fmt(client_max_stop))
print(" any frame unfinished? : %s" % any_unfinished)
# Cross-checks against the documented semantics.
if layer_start and client_min_start and layer_start != client_min_start:
print(" WARNING: layer.startTime() (%d) != MIN(frame.startTime) (%d)"
% (layer_start, client_min_start))
if any_unfinished and layer_stop:
print(" WARNING: layer.stopTime() (%d) is non-zero while frames "
"are still unfinished" % layer_stop)
if not any_unfinished and client_max_stop and layer_stop != client_max_stop:
print(" WARNING: layer.stopTime() (%d) != MAX(frame.stopTime) (%d)"
% (layer_stop, client_max_stop))
if layer_start and layer_stop and layer_stop < layer_start:
print(" WARNING: layer.stopTime() (%d) precedes startTime() (%d)"
% (layer_stop, layer_start))
if job_submission and layer_start and layer_start < job_submission:
print(" WARNING: layer.startTime() (%d) precedes job submission (%d)"
% (layer_start, job_submission))
span = (layer_stop - layer_start) if (layer_start and layer_stop) else None
wait_after_submit = (
(layer_start - job_submission)
if (layer_start and job_submission) else None
)
print(" layer wall-clock span (stop-start) : %s" % fmt_delta(span))
print(" wait after job submission : %s" % fmt_delta(wait_after_submit))
def dump_job(job):
print("=" * 72)
print("JOB: %s" % job.name())
print(" state : %s" % opencue.api.job_pb2.JobState.Name(job.state()))
print(" startTime() : %s <-- job submission time" % fmt(job.startTime()))
print(" stopTime() : %s" % fmt(job.stopTime()))
for layer in job.getLayers():
dump_layer(layer, job.startTime())
def all_finished(job_short_name):
finished_state = opencue.api.job_pb2.FINISHED
try:
live = find_job(job_short_name)
except RuntimeError:
return False
return live.state() == finished_state
def wait_for_finish(short_name, poll_interval=5, max_wait=None):
print("\nWaiting for %s to finish (poll every %ds)..." % (short_name, poll_interval))
start = time.time()
while True:
if all_finished(short_name):
print("Job finished after %d seconds." % int(time.time() - start))
return True
if max_wait is not None and (time.time() - start) > max_wait:
print("Timed out after %d seconds; job still running." % max_wait)
return False
time.sleep(poll_interval)
def load_jobs_by_prefix(prefix):
return opencue.api.getJobs(substr=[prefix], include_finished=True)
def main():
parser = argparse.ArgumentParser(
description="Submit a two-layer job and verify Layer.startTime()/stopTime().")
parser.add_argument("--sleep", type=int, default=DEFAULT_SLEEP_SECONDS,
help="Seconds to sleep per frame (default: %d)" % DEFAULT_SLEEP_SECONDS)
parser.add_argument("--frames", type=int, default=DEFAULT_FRAMES_PER_LAYER,
help="Frames per layer (default: %d)" % DEFAULT_FRAMES_PER_LAYER)
parser.add_argument("--prefix", default=DEFAULT_PREFIX,
help="Job-name prefix for submit/verify (default: %s)" % DEFAULT_PREFIX)
parser.add_argument("--verify", action="store_true",
help="Skip submission; dump existing jobs matching --prefix.")
parser.add_argument("--wait", action="store_true",
help="After submitting, poll until the job finishes before final dump.")
parser.add_argument("--max-wait", type=int, default=None,
help="Max seconds to wait when --wait is set (default: no limit).")
args = parser.parse_args()
if args.verify:
jobs = load_jobs_by_prefix(args.prefix)
if not jobs:
print("No jobs found matching prefix: %s" % args.prefix)
sys.exit(1)
print("Found %d jobs matching '%s':" % (len(jobs), args.prefix))
for job in jobs:
dump_job(job)
return
short_name = "%s_%d" % (args.prefix, int(time.time()))
print("Submitting job %s (2 layers, %d frames each, sleep %ds)..."
% (short_name, args.frames, args.sleep))
submit_job(short_name, args.sleep, args.frames)
# Give Cuebot a moment to land the job before the first lookup.
time.sleep(1.0)
job = find_job(short_name)
print("\nInitial snapshot (layer_a probably still pending/running, "
"layer_b in DEPEND):")
dump_job(job)
if args.wait:
wait_for_finish(short_name, max_wait=args.max_wait)
print("\nFinal snapshot:")
dump_job(find_job(short_name))
else:
print("\nSubmission complete. Re-run with --verify --prefix %s once the"
" job finishes, or use --wait to block." % args.prefix)
if __name__ == "__main__":
main() |
DiegoTavares
left a comment
There was a problem hiding this comment.
Please report on the query impact, specially taking into consideration that some layers can have a few thousand frames.
- GET_LAYER_WITH_LIMITS previously aggregated MIN(ts_started) and MAX(ts_stopped) from the frame table via two correlated subqueries on every read. For an N-layer job each CueGUI layer-panel refresh ran 2N aggregates, each scanning every frame row for the layer via i_frame_pkjoblayer -- O(total frames in the job) per refresh, repeated on every poll. - Add ts_started / ts_stopped (TIMESTAMP WITH TIME ZONE) to layer_stat and maintain them from the existing trigger__update_frame_status_counts trigger that already fires AFTER UPDATE ON frame on every state change: * entry to RUNNING stamps ts_started from NEW.ts_started via COALESCE (first-writer-wins; retries do not bump it). * exit from RUNNING stamps ts_stopped from NEW.ts_stopped (latest-writer-wins). - The trigger copies values from the just-updated frame row instead of sampling current_timestamp, so layer_stat.ts_stopped agrees exactly with MAX(frame.ts_stopped). This matters because FrameDaoJdbc.UPDATE_FRAME_STOPPED writes "ts_stopped = current_timestamp + interval '1' second" on the frame to guarantee a non-zero duration on instant frames -- a current_timestamp sample in the trigger would land the layer one second earlier than the frame and break client-side cross-checks. COALESCE(..., current_timestamp) remains as a defensive fallback for callers that update state without restamping the timestamps. - V43 migration backfills both columns from existing frame aggregates so current layers report immediately. No follow-up admin step required. - GET_LAYER_WITH_LIMITS now reads layer_stat.ts_started and layer_stat.ts_stopped directly -- two column reads instead of two aggregates. - The "stay at 0 until the whole layer is done" stop-time semantic moves to WhiteboardDaoJdbc.LAYER_MAPPER as a counter check (int_waiting_count + int_running_count + int_depend_count == 0) against columns already on layer_stat -- no extra aggregate at write or read time. - Matches how job.ts_started / job.ts_stopped are already denormalized on the job row and updated on state transitions.
DiegoTavares
left a comment
There was a problem hiding this comment.
Pre-approved. There are still conflicts to be solved before merging.
# Conflicts: # VERSION.in # cuebot/src/main/java/com/imageworks/spcue/dao/postgres/WhiteboardDaoJdbc.java
- Re-add layer_stat.ts_started AS layer_ts_started and layer_stat.ts_stopped AS layer_ts_stopped to the GET_LAYER SELECT - Master merge dropped these columns while LAYER_MAPPER still reads them, which would throw a SQLException on any getLayer/getLayers call - Restores the layer start/stop time feature end-to-end
Conflicts fixed! Code updated. Thanks, @DiegoTavares |
aa359d5
into
AcademySoftwareFoundation:master

Related Issues
Summarize your change.
[cuebot/proto/pycue/cuegui/docs] Add Layer startTime() and stopTime()
Layer previously had no start/stop time accessors. Callers that needed the execution window for a layer had to fetch every frame and compute MIN/MAX client-side, which was wasteful in CueGUI (N extra RPCs per refresh for a job with N layers) and unavailable to tools that only queried layers.
This change denormalizes layer timing onto
layer_statand exposes the values directly on the Layer API:Layer.startTime()=layer_stat.ts_started(stamped on first entry to RUNNING)Layer.stopTime()=layer_stat.ts_stopped(stamped on each exit from RUNNING, but surfaced as 0 until every frame on the layer has stopped)stopTime()remains0while any frame is still pending, running, or in DEPEND, mirroringJob.stopTime()so callers can use the same "is it done?" idiom consistently.Proto
start_time(field 24) andstop_time(field 25) to theLayermessage.Cuebot
ts_startedandts_stopped(TIMESTAMP WITH TIME ZONE) tolayer_stat.trigger__update_frame_status_countstrigger (AFTER UPDATE ON frame):ts_startedfromNEW.ts_startedviaCOALESCE(...)(first-writer-wins; retries do not update it)ts_stoppedfromNEW.ts_stopped(latest-writer-wins)current_timestamp, ensuringlayer_stat.ts_stoppedexactly matchesMAX(frame.ts_stopped).COALESCE(..., current_timestamp)as a fallback for callers that change state without explicitly updating timestamps.GET_LAYERandGET_LAYER_WITH_LIMITSto readlayer_stat.ts_startedandlayer_stat.ts_stoppeddirectly, replacing two correlated frame-table aggregates per layer query.WhiteboardDaoJdbc.LAYER_MAPPERusing existing counters:int_waiting_count + int_running_count + int_depend_count == 0job.ts_started/job.ts_stopped.PyCue
Layer.startTime(format=None)andLayer.stopTime(format=None), matchingJob.startTime()formatting behavior.CueGUI
LayerMonitorTree, alongside the existing "Eligible" column.Docs
startTimeandstopTimeto the Layer example payload in the REST API reference.VERSION.in