batchspec#

class BatchSpec(layout: str, cases: list[_canary.testcase.TestCase])#

Bases: object

layout: str#
cases: list[TestCase]#
id: str#
session: str#
rparameters: dict[str, int]#
exclusive: bool = False#
required_resources() list[dict[str, Any]]#
class TestBatch(spec: BatchSpec, workspace: ExecutionSpace)#

Bases: object

A batch of test cases

Parameters:

cases – The list of test cases in this batch

runtime: float#
jobid: str | None#
dependencies: list[TestBatch]#
display_name(**kwargs: Any) str#
cost() float#
property cpus: int#
property gpus: int#
property cpu_ids: list[str]#
property gpu_ids: list[str]#
find_approximate_runtime() float#
property timeout_multiplier: float#
property timeout: float#
property queue_timeout: float#
total_timeout() float#
estimated_runtime() float#
property resources: dict[str, list[dict]]#

resources is of the form:

resources[type] = [{"id": str, "slots": int}]

If the test required 2 cpus and 2 gpus, resources would look like:

resources = {
    "cpus": [{"id": "1", "slots": 1}, {"id": "2", "slots": 1}],
    "gpus": [{"id": "1", "slots": 1}, {"id": "2", "slots": 1}],
}
assign_resources(arg: dict[str, list[dict]]) None#
free_resources() dict[str, list[dict]]#
required_resources() list[dict[str, Any]]#
property status: BatchStatus#
set_status(state: str | None = None, category: str | None = None, status: str | None = None, reason: str | None = None, code: int = -1) None#
run(backend: Backend, queue: SimpleQueue) None#
getstate() dict[str, Any]#
setstate(data: dict[str, Any])#

Update my state.

Called by the resource queue executor with the results put into the multiprocessing queue after a run

finish() None#
refresh() None#
times() tuple[float | None, float | None, float | None]#

Return total, running, and time in queue

static loadconfig(workspace: str) dict[str, Any]#
setup() None#
save()#