batchspec
- class BatchSpec(layout: str, cases: list[_canary.testcase.TestCase])
Bases: object
- layout: str
- id: str
- session: str
- rparameters: dict[str, int]
- exclusive: bool = False
- required_resources() → list[dict[str, Any]]
- class TestBatch(spec: BatchSpec, workspace: ExecutionSpace)
Bases: object
A batch of test cases.
- Parameters:
cases – The list of test cases in this batch
- runtime: float
- jobid: str | None
- display_name(**kwargs: Any) → str
- cost() → float
- property cpus: int
- property gpus: int
- property cpu_ids: list[str]
- property gpu_ids: list[str]
- find_approximate_runtime() → float
- property timeout_multiplier: float
- property timeout: float
- property queue_timeout: float
- total_timeout() → float
- estimated_runtime() → float
- property resources: dict[str, list[dict]]
resources is of the form:
    resources[type] = [{"id": str, "slots": int}]
If the test required 2 cpus and 2 gpus, resources would look like:
    resources = {
        "cpus": [{"id": "1", "slots": 1}, {"id": "2", "slots": 1}],
        "gpus": [{"id": "1", "slots": 1}, {"id": "2", "slots": 1}],
    }
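To illustrate the structure described above, here is a minimal sketch in plain Python. It does not import the library; the helper names `total_slots` and `resource_ids` are hypothetical and exist only for this example.

```python
from typing import Any

# Example value in the documented form:
#   resources[type] = [{"id": str, "slots": int}]
resources: dict[str, list[dict[str, Any]]] = {
    "cpus": [{"id": "1", "slots": 1}, {"id": "2", "slots": 1}],
    "gpus": [{"id": "1", "slots": 1}, {"id": "2", "slots": 1}],
}


def total_slots(resources: dict[str, list[dict[str, Any]]], type: str) -> int:
    """Hypothetical helper: sum the slots assigned for one resource type."""
    return sum(item["slots"] for item in resources.get(type, []))


def resource_ids(resources: dict[str, list[dict[str, Any]]], type: str) -> list[str]:
    """Hypothetical helper: list the ids assigned for one resource type."""
    return [item["id"] for item in resources.get(type, [])]
```

Whether the `cpus`, `gpus`, `cpu_ids`, and `gpu_ids` properties are derived from `resources` in this way is an assumption; the sketch only shows how the documented mapping can be traversed.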
- assign_resources(arg: dict[str, list[dict]]) → None
- free_resources() → dict[str, list[dict]]
- required_resources() → list[dict[str, Any]]
- property status: BatchStatus
- set_status(state: str | None = None, category: str | None = None, status: str | None = None, reason: str | None = None, code: int = -1) → None
- run(backend: Backend, queue: SimpleQueue) → None
- getstate() → dict[str, Any]
- setstate(data: dict[str, Any])
Update my state.
Called by the resource queue executor with the results that were put into the multiprocessing queue after a run.
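The round trip described above (state is put on a multiprocessing queue after a run, then applied with `setstate`) might look roughly like the following sketch. `MiniBatch`, its fields, and the free function `run` are hypothetical stand-ins, not the library's classes. For brevity, both ends of the queue live in one process.

```python
import multiprocessing
from typing import Any, Optional


class MiniBatch:
    """Hypothetical stand-in for TestBatch; only the state round trip is modeled."""

    def __init__(self) -> None:
        self.status = "pending"
        self.runtime: Optional[float] = None

    def getstate(self) -> dict[str, Any]:
        # Return a plain, picklable snapshot of the fields to transfer.
        return {"status": self.status, "runtime": self.runtime}

    def setstate(self, data: dict[str, Any]) -> None:
        # Apply a snapshot produced by getstate().
        self.status = data["status"]
        self.runtime = data["runtime"]


def run(batch: MiniBatch, queue: Any) -> None:
    """Pretend to run the batch, then publish its state on the queue."""
    batch.status = "done"
    batch.runtime = 1.5
    queue.put(batch.getstate())


queue = multiprocessing.SimpleQueue()

worker_copy = MiniBatch()   # the copy that would live in the worker process
run(worker_copy, queue)

parent_copy = MiniBatch()   # the copy held by the executor
parent_copy.setstate(queue.get())
```

Passing a plain dictionary instead of the object itself keeps the payload small and picklable, which is one plausible reason for a `getstate`/`setstate` pair.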
- finish() → None
- refresh() → None
- times() → tuple[float | None, float | None, float | None]
Return the total time, the running time, and the time spent in the queue.
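One way such a triple could be computed is sketched below. The timestamp names `submitted`, `started`, and `finished` are assumptions for illustration, as is the rule that a value is `None` when a timestamp it depends on is missing. The source does not say how `times()` derives its values.

```python
from typing import Optional


def times(
    submitted: Optional[float],
    started: Optional[float],
    finished: Optional[float],
) -> tuple[Optional[float], Optional[float], Optional[float]]:
    """Hypothetical helper: return (total, running, queued) in seconds."""
    # Total: from submission until the batch finished.
    total = finished - submitted if None not in (submitted, finished) else None
    # Running: from start of execution until the batch finished.
    running = finished - started if None not in (started, finished) else None
    # Queued: from submission until execution started.
    queued = started - submitted if None not in (submitted, started) else None
    return total, running, queued
```

For example, with `submitted=0.0`, `started=2.0`, and `finished=5.0`, this helper returns `(5.0, 3.0, 2.0)`.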
- static loadconfig(workspace: str) → dict[str, Any]
- setup() → None
- save()