batchspec#

class BatchSpec(layout: str, jobs: list[_canary.job.Job], dependencies: list['BatchSpec'] = <factory>)#

Bases: object

layout: str#
jobs: list[Job]#
dependencies: list[BatchSpec]#
id: str#
session: str#
rparameters: dict[str, int]#
exclusive: bool = False#
required_resources() list[dict[str, Any]]#
class TestBatch(spec: BatchSpec, workspace: ExecutionSpace, dependencies: list[TestBatch] | None = None, backend_supports_dependencies: bool = False)#

Bases: BaseJob

A batch of jobs

Parameters:

jobs – The list of jobs in this batch

status: BatchStatus#
runtime: float#
jobid: str | None#
dependencies: list[TestBatch]#
property id: str#
display_name(**kwargs: Any) str#
cost() float#
property cpus: int#
property gpus: int#
property cpu_ids: list[str]#
property gpu_ids: list[str]#
find_approximate_runtime() float#
property timeout_multiplier: float#
property timeout: float#
property queue_timeout: float#
total_timeout() float#
estimated_runtime() float#
property resources: dict[str, list[dict]]#

resources is of the form:

resources[type] = [{"id": str, "slots": int}]

If the test required 2 cpus and 2 gpus, resources would look like:

resources = {
    "cpus": [{"id": "1", "slots": 1}, {"id": "2", "slots": 1}],
    "gpus": [{"id": "1", "slots": 1}, {"id": "2", "slots": 1}],
}
assign_resources(arg: dict[str, list[dict]]) None#
free_resources() dict[str, list[dict]]#
required_resources() list[dict[str, Any]]#
dependency_batches_submitted() bool#
dependency_batches_complete() bool#
refresh_readiness() None#

Side-effecting: may mark job DONE + BLOCKED, etc.

is_runnable() bool#

True if it could still run in the future.

is_ready() bool#

True if it can be dispatched now.

run(backend: Backend, queue: SimpleQueue) None#
getstate() dict[str, Any]#
setstate(data: dict[str, Any])#

Update my state.

Called by the resource queue executor with the results put into the multiprocessing queue after a run

finish() None#
refresh() None#
times() tuple[float | None, float | None, float | None]#

Return total, running, and time in queue

static loadconfig(workspace: str) dict[str, Any]#
setup() None#
save()#
set_status(category: str | None = None, outcome: str | None = None, reason: str | None = None, code: int = -1) None#