queue_executor#

class ExecutionSlot(job: _canary.job.BaseJob, qrank: int, qsize: int, spawned: float, worker_id: int, submitted: float = -1.0, started: float = -1.0, finished: float = -1.0)#

Bases: object

job: BaseJob#
qrank: int#
qsize: int#
spawned: float#
worker_id: int#
submitted: float = -1.0#
started: float = -1.0#
finished: float = -1.0#
queued() float#
elapsed() float#
running() float#
class JobFunctor#

Bases: object

inner_ctx() Any#

Inner job process context.

Default: fork (fast for many tiny tests).

Override (env var):

CANARY_INNER_START_METHOD=default|inherit -> use mp.get_context() (global default) CANARY_INNER_START_METHOD=spawn -> force spawn CANARY_INNER_START_METHOD=forkserver -> force forkserver CANARY_INNER_START_METHOD=fork -> force fork CANARY_INNER_START_METHOD=auto -> same as unset (defaults to fork)

class ResourceQueueExecutor(queue: ResourceQueue, executor: Callable, max_workers: int = -1, busy_wait_time: float = 0.01)#

Bases: object

Manages a pool of worker processes with timeout support and metrics collection.

queue: ResourceQueue#
submitted: dict[str, ExecutionSlot]#
running: dict[str, ExecutionSlot]#
finished: dict[str, ExecutionSlot]#
entered: bool#
started_on: float#
listeners: list[Callable[[...], None]]#
workers: list[dict[str, Any]]#
worker_connections: list[Connection]#
idle_workers: list[int]#
busy_workers: dict[int, str]#
slots_by_id: dict[str, ExecutionSlot]#
property inflight: dict[str, ExecutionSlot]#
add_listener(callback: Callable[[...], None]) None#
remove_listener(callback: Callable[[...], None]) None#
run(**kwargs: Any) int#
notify_listeners(event: Literal['job_submitted', 'job_started', 'job_finished'], *args: Any) None#
class Reporter(executor: ResourceQueueExecutor)#

Bases: object

live_columns: tuple[str, ...]#
final_columns: tuple[str, ...]#
validate_columns(columns: tuple[str, ...]) None#
add_table_columns(table: Table, columns: tuple[str, ...]) None#
add_table_row(table: Table, columns: tuple[str, ...], **kwargs: str) None#
final_table() Group#
class LiveReporter(executor: ResourceQueueExecutor)#

Bases: Reporter

mute_stream_handlers() None#
unmute_stream_handlers() None#
dynamic_table() Group#
class EventReporter(executor: ResourceQueueExecutor)#

Bases: Reporter

on_event(event: str, *args, **kwargs) None#
on_job_submit(slot: ExecutionSlot) None#
on_job_start(slot: ExecutionSlot) None#
on_job_finish(slot: ExecutionSlot) None#
class StaticColumn(header: str, width: int, align: Literal['left', 'right'] = 'left')#

Bases: object

header: str#
width: int#
align: Literal['left', 'right'] = 'left'#
class StaticTable(columns: list[StaticColumn] | None = None)#

Bases: object

add_column(header: str, width: int, align: Literal['left', 'right'] = 'left') None#
render_header() Text#
render_row(values: list[str]) Text#
print_header()#
fmt_secs(x: float, *, na: str = 'NA') str#
exception CanaryKill#

Bases: Exception

exception StuckQueueError#

Bases: Exception

exception ParentGone#

Bases: Exception