queue_executor#
- class ExecutionSlot(job: _canary.job.BaseJob, qrank: int, qsize: int, spawned: float, worker_id: int, submitted: float = -1.0, started: float = -1.0, finished: float = -1.0)#
Bases:
object
- qrank: int#
- qsize: int#
- spawned: float#
- worker_id: int#
- submitted: float = -1.0#
- started: float = -1.0#
- finished: float = -1.0#
- queued() float#
- elapsed() float#
- running() float#
- class JobFunctor#
Bases:
object
- inner_ctx() Any#
Inner job process context.
Default: fork (fast for many tiny tests).
- Override (env var):
CANARY_INNER_START_METHOD=default|inherit -> use mp.get_context() (global default)
CANARY_INNER_START_METHOD=spawn -> force spawn
CANARY_INNER_START_METHOD=forkserver -> force forkserver
CANARY_INNER_START_METHOD=fork -> force fork
CANARY_INNER_START_METHOD=auto -> same as unset (defaults to fork)
- class ResourceQueueExecutor(queue: ResourceQueue, executor: Callable, max_workers: int = -1, busy_wait_time: float = 0.01)#
Bases:
object
Manages a pool of worker processes with timeout support and metrics collection.
- queue: ResourceQueue#
- submitted: dict[str, ExecutionSlot]#
- running: dict[str, ExecutionSlot]#
- finished: dict[str, ExecutionSlot]#
- entered: bool#
- started_on: float#
- listeners: list[Callable[[...], None]]#
- workers: list[dict[str, Any]]#
- worker_connections: list[Connection]#
- idle_workers: list[int]#
- busy_workers: dict[int, str]#
- slots_by_id: dict[str, ExecutionSlot]#
- property inflight: dict[str, ExecutionSlot]#
- add_listener(callback: Callable[[...], None]) None#
- remove_listener(callback: Callable[[...], None]) None#
- run(**kwargs: Any) int#
- notify_listeners(event: Literal['job_submitted', 'job_started', 'job_finished'], *args: Any) None#
- class Reporter(executor: ResourceQueueExecutor)#
Bases:
object
- live_columns: tuple[str, ...]#
- final_columns: tuple[str, ...]#
- validate_columns(columns: tuple[str, ...]) None#
- add_table_columns(table: Table, columns: tuple[str, ...]) None#
- add_table_row(table: Table, columns: tuple[str, ...], **kwargs: str) None#
- final_table() Group#
- class LiveReporter(executor: ResourceQueueExecutor)#
Bases:
Reporter
- mute_stream_handlers() None#
- unmute_stream_handlers() None#
- dynamic_table() Group#
- class EventReporter(executor: ResourceQueueExecutor)#
Bases:
Reporter
- on_event(event: str, *args, **kwargs) None#
- on_job_submit(slot: ExecutionSlot) None#
- on_job_start(slot: ExecutionSlot) None#
- on_job_finish(slot: ExecutionSlot) None#
- class StaticColumn(header: str, width: int, align: Literal['left', 'right'] = 'left')#
Bases:
object
- header: str#
- width: int#
- align: Literal['left', 'right'] = 'left'#
- class StaticTable(columns: list[StaticColumn] | None = None)#
Bases:
object
- add_column(header: str, width: int, align: Literal['left', 'right'] = 'left') None#
- render_header() Text#
- render_row(values: list[str]) Text#
- print_header()#
- fmt_secs(x: float, *, na: str = 'NA') str#
- exception CanaryKill#
Bases:
Exception
- exception StuckQueueError#
Bases:
Exception
- exception ParentGone#
Bases:
Exception