database#

class WorkspaceDatabase(root: Path)#

Bases: object

Database wrapper

property connection: Connection#
listener() ResultListener#
close() None#
classmethod create(path: Path) WorkspaceDatabase#
classmethod load(path: Path) WorkspaceDatabase#
connect() None#
put_specs(specs: list[ResolvedSpec]) None#
resolve_spec_id(id: str) str | None#
resolve_spec_ids(ids: list[str])#

Given partial spec IDs in ids, expand each one to its full-length ID

load_specs(ids: list[str] | None = None, include_upstreams: bool = False) list[ResolvedSpec]#
load_specs_by_tagname(tag: str) list[ResolvedSpec]#
get_edges(ids: list[str] | None = None) list[tuple[str, str]]#
static format_single_result(case: TestCase) tuple[Any, ...]#
put_result(case: TestCase) None#
put_results(*cases: TestCase) None#

Store results in the DB.

Since canary uses hierarchical parallelism, this function can be called by many independent processes at once, resulting in some callers hitting a locked database. We guard against a locked database by trying multiple times with an exponential backoff between attempts. If we don’t succeed, we return False and let the caller decide what to do.

If writing to the database results in other types of errors, we re-raise those.

get_results(ids: list[str] | None = None, include_upstreams: bool = False) dict[str, dict[str, Any]]#
get_result_history(id: str) list#
put_selection(tag: str, specs: list[ResolvedSpec], **meta: Any) None#
rename_selection(old: str, new: str) None#
get_selection_metadata(tag: str) dict[str, Any]#
property tags: list[str]#
is_selection(tag: str) bool#
delete_selection(tag: str) bool#
get_updownstream_ids(seeds: list[str] | None = None) tuple[set[str], set[str]]#
get_downstream_ids(seeds: Iterable[str]) set[str]#

Return dependencies in instantiation order.

get_upstream_ids(seeds: Iterable[str]) set[str]#

Return dependents in reverse instantiation order.

get_dependency_graph() dict[str, list[str]]#

Return the entire dependency graph, including disconnected nodes. Every spec appears; standalone nodes have dep_id=None (empty list).

get_partial_specs(*, tag: str | None = None) list[PartialSpec]#
select_from_view(prefixes: list[str]) list[str]#

Return spec IDs whose view matches ANY of the provided glob patterns.

view is stored as a TestResults-relative path, e.g.:

foo/bar/test_case.py

class ResultListener(db: WorkspaceDatabase, poll_interval: float = 0.05)#

Bases: Thread

Watches a spool directory for JSON test results and writes them to SQLite in batches.

run()#

Main thread loop.

stop_and_join()#

Stop listener and wait for thread to finish.

class PartialSpec(id: str, file: pathlib.Path, view: str, result_category: str, result_status: str, started_at: float)#

Bases: object

id: str#
file: Path#
view: str#
result_category: str#
result_status: str#
started_at: float#
increment_hex_prefix(prefix: str) str | None#
is_operation_error(e: BaseException) bool#
exception NotASelection(tag)#

Bases: Exception