job#

class JobPhase(*values)#

Bases: str, Enum

PENDING = 'PENDING'#
SUBMITTED = 'SUBMITTED'#
RUNNING = 'RUNNING'#
DONE = 'DONE'#
class AnyMatcher(choices: set[str])#

Bases: object

choices: set[str]#
class Dependency(job: 'Job', when: str | None)#

Bases: object

job: Job#
when: str | None#
is_satisfied() bool#
is_done() bool#
class JobState(phase: _canary.job.JobPhase = <JobPhase.PENDING: 'PENDING'>)#

Bases: object

phase: JobPhase#
reset() None#
is_pending() bool#
is_submitted() bool#
is_running() bool#
is_done() bool#
class Measurements(data: dict[str, typing.Any]=<factory>)#

Bases: object

data: dict[str, Any]#
add_measurement(name: str, value: Any) None#
update(m: dict | Measurements) None#
reset() None#
items() Generator[tuple[str, Any], None, None]#
class BaseJob#

Bases: ABC

abstract property id: str#
abstractmethod cost() float#
property exclusive: bool#
abstractmethod required_resources() list[dict[str, Any]]#
abstractmethod assign_resources(arg: dict[str, list[dict]]) None#
abstractmethod free_resources() dict[str, list[dict]]#
abstractmethod refresh_readiness() None#

Has side effects: may mark the job as DONE and BLOCKED, etc.

abstractmethod is_runnable() bool#

True if it could still run in the future.

abstractmethod is_ready() bool#

True if it can be dispatched now.

validate_enqueuable() None#
on_submitted() None#
on_started() None#
on_finished() None#
abstractmethod total_timeout() float#
abstractmethod refresh() None#
abstractmethod save() None#
set_status(category: str | None = None, outcome: str | None = None, reason: str | None = None, code: int = -1) None#
abstractmethod display_name(**kwargs: Any) str#
add_measurement(name: str, value: Any) None#
class Job(spec: JobSpec, workspace: ExecutionSpace, dependencies: list[Dependency] | None = None)#

Bases: BaseJob

launcher: Launcher#
variables: dict[str, str | None]#
dependencies: list[Dependency]#
property id: str#
property exclusive: bool#
get_artifacts() list[str]#
property upstreams: list[Job]#
property stdout: str#
property stderr: str | None#
property file: Path#
property name: str#
property timeout: float#
total_timeout() float#
property attributes: dict[str, Any]#
property mask: Mask#
property view_path: Path#
display_name(**kwargs) str#
set_status(category: str | None = None, outcome: str | None = None, reason: str | None = None, code: int = -1) None#
add_variables(**kwds: str) None#
statline(style: Literal['none', 'rich', 'html'] = 'none') str#
set_attribute(name: str, value: Any) None#
set_attributes(**kwds: Any) None#
get_attribute(name: str, default: None = None, /) None | Any#
property cpus: int#
property gpus: int#
property nodes: int#
property cpu_ids: list[str]#
property gpu_ids: list[str]#
cost() float#
property runtime: float#
size() float#
property resources: dict[str, list[dict]]#

resources is of the form:

resources[type] = [{"id": str, "slots": int}]

If the test required 2 cpus and 2 gpus, resources would look like:

resources = {
    "cpus": [{"id": "1", "slots": 1}, {"id": "2", "slots": 1}],
    "gpus": [{"id": "1", "slots": 1}, {"id": "2", "slots": 1}],
}
assign_resources(arg: dict[str, list[dict]]) None#
free_resources() dict[str, list[dict]]#
required_resources() list[dict[str, Any]]#
is_done() bool#
dropped() bool#
is_runnable() bool#

True if this job could still run in the future.

refresh_readiness() None#

Has side effects: may mark the job as DONE and BLOCKED, etc.

is_ready() bool#

True if it can be dispatched now.

property lockfile: Path#
create_workspace() None#
restore_workspace() None#
setup() None#
run() None#
getstate() dict[str, Any]#
setstate(data: dict[str, Any]) None#

The companion of getstate: saves, in the parent process, the results from the test that ran in a child process.

do_baseline() None#
update_status_from_exit_code(*, code: int | str) None#
refresh() None#
set_runtime_env(env: MutableMapping[str, str]) None#
get_environ_from_spec() dict[str, str | None]#
get_resource_parameters_from_spec() dict[str, int]#

Default parameters used to set up the resources required by a test job.

teardown() None#
finish() None#
save() None#
read_output(compress: bool = False) str#
load_cached_runs() dict[str, Any] | None#
cache_last_run() None#

Store relevant information for this run.

load_job_from_file(arg: Path | str | None) Job#
load_job_from_state(lock_data: dict) Job#
find_cache_dir(start: Path) Path | None#
ceil_div(a: int, b: int) int#
exception MissingSourceError#

Bases: Exception

exception InvalidTypeError(name, value)#

Bases: Exception