job#
- class JobPhase(*values)#
Bases: str, Enum
- PENDING = 'PENDING'#
- SUBMITTED = 'SUBMITTED'#
- RUNNING = 'RUNNING'#
- DONE = 'DONE'#
- class Dependency(job: 'Job', when: str | None)#
Bases: object
- when: str | None#
- is_satisfied() bool#
- is_done() bool#
- class JobState(phase: _canary.job.JobPhase = <JobPhase.PENDING: 'PENDING'>)#
Bases: object
- reset() None#
- is_pending() bool#
- is_submitted() bool#
- is_running() bool#
- is_done() bool#
- class Measurements(data: dict[str, typing.Any]=<factory>)#
Bases: object
- data: dict[str, Any]#
- add_measurement(name: str, value: Any) None#
- update(m: dict | Measurements) None#
- reset() None#
- items() Generator[tuple[str, Any], None, None]#
- class BaseJob#
Bases: ABC
- abstract property id: str#
- abstractmethod cost() float#
- property exclusive: bool#
- abstractmethod required_resources() list[dict[str, Any]]#
- abstractmethod assign_resources(arg: dict[str, list[dict]]) None#
- abstractmethod free_resources() dict[str, list[dict]]#
- abstractmethod refresh_readiness() None#
Has side effects: may, for example, mark the job DONE and BLOCKED.
- abstractmethod is_runnable() bool#
True if it could still run in the future.
- abstractmethod is_ready() bool#
True if it can be dispatched now.
- validate_enqueuable() None#
- on_submitted() None#
- on_started() None#
- on_finished() None#
- abstractmethod total_timeout() float#
- abstractmethod refresh() None#
- abstractmethod save() None#
- set_status(category: str | None = None, outcome: str | None = None, reason: str | None = None, code: int = -1) None#
- abstractmethod display_name(**kwargs: Any) str#
- add_measurement(name: str, value: Any) None#
- class Job(spec: JobSpec, workspace: ExecutionSpace, dependencies: list[Dependency] | None = None)#
Bases: BaseJob
- variables: dict[str, str | None]#
- dependencies: list[Dependency]#
- property id: str#
- property exclusive: bool#
- get_artifacts() list[str]#
- property stdout: str#
- property stderr: str | None#
- property file: Path#
- property name: str#
- property timeout: float#
- total_timeout() float#
- property attributes: dict[str, Any]#
- property view_path: Path#
- display_name(**kwargs) str#
- set_status(category: str | None = None, outcome: str | None = None, reason: str | None = None, code: int = -1) None#
- add_variables(**kwds: str) None#
- statline(style: Literal['none', 'rich', 'html'] = 'none') str#
- set_attribute(name: str, value: Any) None#
- set_attributes(**kwds: Any) None#
- get_attribute(name: str, default: None = None, /) None | Any#
- property cpus: int#
- property gpus: int#
- property nodes: int#
- property cpu_ids: list[str]#
- property gpu_ids: list[str]#
- cost() float#
- property runtime: float#
- size() float#
- property resources: dict[str, list[dict]]#
resources is of the form:
resources[type] = [{"id": str, "slots": int}]
If the test required 2 cpus and 2 gpus, resources would look like:
resources = {
    "cpus": [{"id": "1", "slots": 1}, {"id": "2", "slots": 1}],
    "gpus": [{"id": "1", "slots": 1}, {"id": "2", "slots": 1}],
}
- assign_resources(arg: dict[str, list[dict]]) None#
- free_resources() dict[str, list[dict]]#
- required_resources() list[dict[str, Any]]#
- is_done() bool#
- dropped() bool#
- is_runnable() bool#
True if this job could still run in the future.
- refresh_readiness() None#
Has side effects: may, for example, mark the job DONE and BLOCKED.
- is_ready() bool#
True if it can be dispatched now.
- property lockfile: Path#
- create_workspace() None#
- restore_workspace() None#
- setup() None#
- run() None#
- getstate() dict[str, Any]#
- setstate(data: dict[str, Any]) None#
The companion of getstate: saves the results of the test that was run in a child process into the parent process.
- do_baseline() None#
- update_status_from_exit_code(*, code: int | str) None#
- refresh() None#
- set_runtime_env(env: MutableMapping[str, str]) None#
- get_environ_from_spec() dict[str, str | None]#
- get_resource_parameters_from_spec() dict[str, int]#
Default parameters used to set up the resources required by the test job.
- teardown() None#
- finish() None#
- save() None#
- read_output(compress: bool = False) str#
- load_cached_runs() dict[str, Any] | None#
- cache_last_run() None#
Store relevant information for this run.
- find_cache_dir(start: Path) Path | None#
- ceil_div(a: int, b: int) int#
- exception MissingSourceError#
Bases:
Exception
- exception InvalidTypeError(name, value)#
Bases:
Exception