Services

registry

Gateway registry backed by SQLAlchemy.

GatewayRegistry

GatewayRegistry(session_factory: sessionmaker[Session])

CRUD and health tracking for registered gateways.

Parameters:

Name Type Description Default
session_factory sessionmaker[Session]

SQLAlchemy session factory for database access.

required

register

register(
    name: str,
    endpoint: str,
    scheme: str = "https",
    auth_mode: str | None = "mtls",
    *,
    ca_cert: bytes | None = None,
    client_cert: bytes | None = None,
    client_key: bytes | None = None,
    metadata: dict[str, Any] | None = None,
    description: str | None = None,
    labels: dict[str, str] | None = None,
) -> dict[str, Any]

Register a new gateway.

Parameters:

Name Type Description Default
name str

Unique gateway name.

required
endpoint str

Gateway endpoint address.

required
scheme str

Connection scheme (e.g. "https").

'https'
auth_mode str | None

Authentication mode (e.g. "mtls").

'mtls'
ca_cert bytes | None

CA certificate bytes for TLS.

None
client_cert bytes | None

Client certificate bytes for mTLS.

None
client_key bytes | None

Client private key bytes for mTLS.

None
metadata dict[str, Any] | None

Optional metadata dict.

None
description str | None

Optional free-text description.

None
labels dict[str, str] | None

Optional key-value labels for filtering.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The registered gateway record.

Raises:

Type Description
ConflictError

If a gateway with the given name already exists.

unregister

unregister(name: str) -> bool

Remove a gateway.

Parameters:

Name Type Description Default
name str

Gateway name to unregister.

required

Returns:

Name Type Description
bool bool

True if the gateway existed and was removed.

Raises:

Type Description
SQLAlchemyError

If the commit fails.

get

get(name: str) -> dict[str, Any] | None

Return a single gateway or None.

Parameters:

Name Type Description Default
name str

Gateway name.

required

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: Gateway record, or None if not found.

Raises:

Type Description
SQLAlchemyError

If the query fails.

list_all

list_all(
    *, labels_filter: dict[str, str] | None = None
) -> list[dict[str, Any]]

Return all registered gateways, optionally filtered by labels.

Parameters:

Name Type Description Default
labels_filter dict[str, str] | None

If provided, only return gateways whose labels contain all specified key-value pairs.

None

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: Gateway records ordered by name.

Raises:

Type Description
SQLAlchemyError

If the query fails.

update_health

update_health(
    name: str, status: str, last_seen: datetime
) -> None

Update health status and last-seen timestamp.

Parameters:

Name Type Description Default
name str

Gateway name.

required
status str

New health status string.

required
last_seen datetime

Timestamp of the health check.

required

Raises:

Type Description
SQLAlchemyError

If the commit fails.

update_metadata

update_metadata(
    name: str, metadata: dict[str, Any]
) -> None

Replace the metadata JSON blob.

Parameters:

Name Type Description Default
name str

Gateway name.

required
metadata dict[str, Any]

New metadata dict to store.

required

Raises:

Type Description
SQLAlchemyError

If the commit fails.

update_gateway_metadata

update_gateway_metadata(
    name: str,
    *,
    description: str | None | object = _UNSET,
    labels: dict[str, str] | None | object = _UNSET,
) -> dict[str, Any] | None

Update description and/or labels for a gateway.

Parameters:

Name Type Description Default
name str

Gateway name.

required
description str | None | object

New description, None to clear, or _UNSET to skip.

_UNSET
labels dict[str, str] | None | object

New labels dict, None to clear, or _UNSET to skip.

_UNSET

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: Updated gateway dict, or None if not found.

Raises:

Type Description
SQLAlchemyError

If the commit fails.
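
The three-way distinction between a new value, None, and _UNSET can be sketched with a module-level sentinel object. The `_UNSET` below is a local stand-in (the library's own sentinel is private), and `apply_update` is a hypothetical helper, not the registry's code.

```python
from typing import Any

_UNSET: Any = object()  # stand-in sentinel; compared by identity only


def apply_update(
    record: dict[str, Any],
    *,
    description: Any = _UNSET,
    labels: Any = _UNSET,
) -> dict[str, Any]:
    """Return a copy of record with only the explicitly passed fields changed."""
    updated = dict(record)
    if description is not _UNSET:
        updated["description"] = description  # may be None, which clears it
    if labels is not _UNSET:
        updated["labels"] = labels
    return updated


record = {"name": "edge-a", "description": "primary", "labels": {"env": "prod"}}
cleared = apply_update(record, description=None)         # clear description, keep labels
relabeled = apply_update(record, labels={"env": "dev"})  # keep description
```

Using a sentinel rather than None as the default is what lets callers pass None to clear a field.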

get_credentials

get_credentials(
    name: str,
) -> dict[str, str | bytes | None] | None

Return raw cert bytes for a gateway (for connection logic only).

Parameters:

Name Type Description Default
name str

Gateway name.

required

Returns:

Type Description
dict[str, str | bytes | None] | None

dict[str, str | bytes | None] | None: Credential dict, or None if not found.

Raises:

Type Description
SQLAlchemyError

If the query fails.

gateway

Gateway connection management and registry-backed discovery.

GatewayService

GatewayService(registry: GatewayRegistry)

Gateway connection management and registry-backed discovery.

Handles gRPC client connections, backoff, health probing, and gateway registration/unregistration.

Parameters:

Name Type Description Default
registry GatewayRegistry

Gateway registry for persistence.

required

Attributes:

Name Type Description
registry GatewayRegistry

The underlying gateway registry.

registry property

registry: GatewayRegistry

The underlying gateway registry.

get_client

get_client(name: str) -> ShoreGuardClient

Return a client for the given gateway, attempting reconnect with backoff.

Parameters:

Name Type Description Default
name str

Gateway name.

required

Returns:

Name Type Description
ShoreGuardClient ShoreGuardClient

Connected gRPC client.

Raises:

Type Description
GatewayNotConnectedError

If connection fails.

set_client

set_client(
    client: ShoreGuardClient | None, name: str
) -> None

Set or clear a client for the given gateway.

Parameters:

Name Type Description Default
client ShoreGuardClient | None

Client to cache, or None to clear.

required
name str

Gateway name.

required

reset_backoff

reset_backoff(name: str) -> None

Reset connection backoff for a gateway.

Parameters:

Name Type Description Default
name str

Gateway name.

required
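
A minimal sketch of per-gateway backoff with a reset, to show how get_client and reset_backoff might interact. The schedule (doubling delays with a cap) is an assumption; the reference does not state which schedule GatewayService uses. `BackoffTracker` is a hypothetical name.

```python
class BackoffTracker:
    """Per-gateway retry delays; doubling with a cap is an assumption."""

    def __init__(self, base: float = 1.0, cap: float = 60.0) -> None:
        self._base = base
        self._cap = cap
        self._failures: dict[str, int] = {}

    def record_failure(self, name: str) -> float:
        """Count a failed connect and return the delay before the next try."""
        count = self._failures.get(name, 0) + 1
        self._failures[name] = count
        return min(self._cap, self._base * 2 ** (count - 1))

    def reset(self, name: str) -> None:
        """Forget past failures so the next delay starts from the base again."""
        self._failures.pop(name, None)


tracker = BackoffTracker(base=1.0, cap=8.0)
delays = [tracker.record_failure("edge-a") for _ in range(5)]
tracker.reset("edge-a")
after_reset = tracker.record_failure("edge-a")
```

Resetting is useful after an operator fixes an endpoint or certificate and does not want to wait out the accumulated delay.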

register

register(
    name: str,
    endpoint: str,
    scheme: str = "https",
    auth_mode: str | None = "mtls",
    *,
    ca_cert: bytes | None = None,
    client_cert: bytes | None = None,
    client_key: bytes | None = None,
    metadata: dict[str, Any] | None = None,
    description: str | None = None,
    labels: dict[str, str] | None = None,
) -> dict[str, Any]

Register a gateway and attempt initial connection.

Parameters:

Name Type Description Default
name str

Unique gateway name.

required
endpoint str

Gateway endpoint address.

required
scheme str

Connection scheme (e.g. "https").

'https'
auth_mode str | None

Authentication mode (e.g. "mtls").

'mtls'
ca_cert bytes | None

CA certificate bytes for TLS.

None
client_cert bytes | None

Client certificate bytes for mTLS.

None
client_key bytes | None

Client private key bytes for mTLS.

None
metadata dict[str, Any] | None

Optional metadata dict.

None
description str | None

Optional free-text description.

None
labels dict[str, str] | None

Optional key-value labels for filtering.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Gateway record with connection status.

unregister

unregister(name: str) -> bool

Unregister a gateway and close its connection.

Parameters:

Name Type Description Default
name str

Gateway name.

required

Returns:

Name Type Description
bool bool

True if the gateway existed and was removed.

test_connection

test_connection(name: str) -> dict[str, Any]

Explicitly test connectivity to a registered gateway.

Parameters:

Name Type Description Default
name str

Gateway name.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Connection test result.

Raises:

Type Description
NotFoundError

If the gateway is not registered.

update_gateway_metadata

update_gateway_metadata(
    name: str,
    *,
    description: str | None | object = _UNSET,
    labels: dict[str, str] | None | object = _UNSET,
) -> dict[str, Any]

Update description and/or labels for a gateway.

Parameters:

Name Type Description Default
name str

Gateway name.

required
description str | None | object

New description, None to clear, or _UNSET to skip.

_UNSET
labels dict[str, str] | None | object

New labels dict, None to clear, or _UNSET to skip.

_UNSET

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Updated gateway record.

Raises:

Type Description
NotFoundError

If the gateway does not exist.

list_all

list_all(
    *, labels_filter: dict[str, str] | None = None
) -> list[dict[str, Any]]

List all registered gateways with cached connection status.

Uses the cached client state instead of live health probes to avoid N+1 blocking gRPC calls. The background health monitor keeps last_status up-to-date.

Parameters:

Name Type Description Default
labels_filter dict[str, str] | None

If provided, only return gateways matching all specified label key-value pairs.

None

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: Gateway records with connection status.

get_info

get_info(name: str) -> dict[str, Any]

Get detailed info for a gateway.

Parameters:

Name Type Description Default
name str

Gateway name.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Detailed gateway information.

get_config

get_config(name: str) -> dict[str, Any]

Fetch the gateway configuration via gRPC.

Parameters:

Name Type Description Default
name str

Gateway name.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Gateway configuration.

check_all_health

check_all_health() -> None

Probe all registered gateways and update their health in the registry.

get_cached_client

get_cached_client(name: str) -> ShoreGuardClient | None

Return the cached client for a gateway, or None if not connected.

Parameters:

Name Type Description Default
name str

Gateway name.

required

Returns:

Type Description
ShoreGuardClient | None

ShoreGuardClient | None: Cached client, or None.

sandbox

Sandbox lifecycle operations including create-with-presets workflow.

SandboxService

SandboxService(
    client: ShoreGuardClient,
    meta_store: SandboxMetaStore | None = None,
)

Sandbox operations shared by Web UI and TUI.

Provides higher-level workflows like create-with-presets that were previously implemented in browser JS.

Parameters:

Name Type Description Default
client ShoreGuardClient

OpenShell gRPC client instance.

required
meta_store SandboxMetaStore | None

Optional metadata store for labels/description.

None

list

list(
    *,
    limit: int = 100,
    offset: int = 0,
    gateway_name: str | None = None,
    labels_filter: dict[str, str] | None = None,
) -> list[dict[str, Any]]

List all sandboxes with merged metadata.

Parameters:

Name Type Description Default
limit int

Maximum number of sandboxes to return.

100
offset int

Number of sandboxes to skip.

0
gateway_name str | None

Gateway name for metadata lookup.

None
labels_filter dict[str, str] | None

Filter sandboxes by label key-value pairs.

None

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: Sandbox records with metadata.

get

get(
    name: str, *, gateway_name: str | None = None
) -> dict[str, Any]

Get a sandbox by name with merged metadata.

Parameters:

Name Type Description Default
name str

Sandbox name.

required
gateway_name str | None

Gateway name for metadata lookup.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Sandbox record with metadata.

delete

delete(
    name: str, *, gateway_name: str | None = None
) -> bool

Delete a sandbox by name and clean up metadata.

Parameters:

Name Type Description Default
name str

Sandbox name.

required
gateway_name str | None

Gateway name for metadata cleanup.

None

Returns:

Name Type Description
bool bool

True if the sandbox was deleted.

exec

exec(
    name: str,
    command: str | list[str],
    *,
    workdir: str = "",
    env: dict[str, str] | None = None,
    timeout_seconds: int = 0,
) -> dict[str, Any]

Execute a command inside a sandbox.

Accepts command as a raw string (parsed with shlex) or list.

Parameters:

Name Type Description Default
name str

Sandbox name.

required
command str | list[str]

Command as a string or list of arguments.

required
workdir str

Working directory inside the sandbox.

''
env dict[str, str] | None

Environment variables to set.

None
timeout_seconds int

Execution timeout (0 for no timeout).

0

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Execution result with stdout, stderr, exit code.

Raises:

Type Description
ValidationError

If the command string has invalid syntax.

get_logs

get_logs(
    name: str,
    *,
    lines: int = 200,
    since_ms: int = 0,
    sources: list[str] | None = None,
    min_level: str = "",
) -> list[dict[str, Any]]

Fetch recent logs from a sandbox.

Parameters:

Name Type Description Default
name str

Sandbox name.

required
lines int

Maximum number of log lines to return.

200
since_ms int

Only return logs after this epoch millisecond timestamp.

0
sources list[str] | None

Filter by log source names.

None
min_level str

Minimum log level filter.

''

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: Log entries.
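
Since since_ms is an epoch timestamp in milliseconds, a caller typically derives it from a datetime. A small sketch, where `epoch_ms` is a hypothetical helper and the commented call assumes a SandboxService instance named `sandboxes`:

```python
from datetime import datetime, timedelta, timezone


def epoch_ms(moment: datetime) -> int:
    """Convert a timezone-aware datetime to epoch milliseconds."""
    return int(moment.timestamp() * 1000)


now = datetime.now(timezone.utc)
since_ms = epoch_ms(now - timedelta(minutes=5))  # logs from the last five minutes

# Hypothetical call; `sandboxes` would be a SandboxService instance:
# entries = sandboxes.get_logs("demo", lines=50, since_ms=since_ms)
```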

create_ssh_session

create_ssh_session(name: str) -> dict[str, Any]

Create an SSH session for a sandbox, resolving name to ID.

Parameters:

Name Type Description Default
name str

Sandbox name.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: SSH session details including token.

revoke_ssh_session

revoke_ssh_session(token: str) -> bool

Revoke an active SSH session by token.

Parameters:

Name Type Description Default
token str

Session token to revoke.

required

Returns:

Name Type Description
bool bool

True if the session was revoked.

update_metadata

update_metadata(
    gateway_name: str,
    name: str,
    *,
    description: str | None | object = _UNSET,
    labels: dict[str, str] | None | object = _UNSET,
) -> dict[str, Any]

Update labels and/or description for a sandbox.

Parameters:

Name Type Description Default
gateway_name str

Name of the gateway.

required
name str

Sandbox name.

required
description str | None | object

New description (or _UNSET to skip).

_UNSET
labels dict[str, str] | None | object

New labels (or _UNSET to skip).

_UNSET

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Updated sandbox record with metadata.

Raises:

Type Description
RuntimeError

If no meta store is configured.

create

create(
    *,
    name: str = "",
    image: str = "",
    gpu: bool = False,
    providers: list[str] | None = None,
    environment: dict[str, str] | None = None,
    presets: list[str] | None = None,
    gateway_name: str | None = None,
    description: str | None = None,
    labels: dict[str, str] | None = None,
) -> dict[str, Any]

Create a sandbox and optionally apply presets.

This replaces the multi-step wizard polling loop that was in the browser JS. The entire workflow happens server-side:

1. Create sandbox
2. Wait for ready state
3. Wait for initial policy
4. Apply presets sequentially

Parameters:

Name Type Description Default
name str

Sandbox name (empty for auto-generated).

''
image str

Container image to use.

''
gpu bool

Whether to enable GPU support.

False
providers list[str] | None

Provider names to attach.

None
environment dict[str, str] | None

Environment variables to set.

None
presets list[str] | None

Policy presets to apply after creation.

None
gateway_name str | None

Gateway name for metadata storage.

None
description str | None

Optional sandbox description.

None
labels dict[str, str] | None

Optional sandbox labels.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Created sandbox record with preset status.
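
The server-side workflow described for create (create, wait for ready, wait for initial policy, apply presets in order) can be sketched as a polling loop. Everything here is hypothetical: `FakeBackend`, `wait_until`, the timeouts, and the shape of the result dict are illustrations, not the service's implementation.

```python
import time
from typing import Any, Callable


def wait_until(check: Callable[[], bool], *, timeout: float = 5.0, interval: float = 0.01) -> bool:
    """Poll check() until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if check():
            return True
        time.sleep(interval)
    return False


class FakeBackend:
    """Hypothetical backend that reports ready on the third poll."""

    def __init__(self) -> None:
        self.polls = 0
        self.applied: list[str] = []

    def is_ready(self) -> bool:
        self.polls += 1
        return self.polls >= 3

    def has_policy(self) -> bool:
        return True

    def apply_preset(self, preset: str) -> None:
        self.applied.append(preset)


def create_with_presets(backend: FakeBackend, presets: list[str]) -> dict[str, Any]:
    """Run the documented steps in order and report what happened."""
    # Step 1 (create) is represented by the caller constructing the backend.
    if not wait_until(backend.is_ready):    # step 2: wait for ready state
        return {"ready": False, "presets_applied": []}
    if not wait_until(backend.has_policy):  # step 3: wait for initial policy
        return {"ready": True, "presets_applied": []}
    for preset in presets:                  # step 4: apply presets one at a time
        backend.apply_preset(preset)
    return {"ready": True, "presets_applied": list(backend.applied)}


result = create_with_presets(FakeBackend(), ["pypi", "github"])
```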

policy

Policy operations including atomic network rule CRUD.

PolicyService

PolicyService(client: ShoreGuardClient)

Policy management shared by Web UI and TUI.

Wraps client.policies with higher-level operations like atomic add/delete of individual network rules.

Parameters:

Name Type Description Default
client ShoreGuardClient

OpenShell gRPC client instance.

required

get

get(sandbox_name: str) -> dict[str, Any]

Get the current active policy for a sandbox.

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Policy data.

update

update(
    sandbox_name: str, policy_dict: dict
) -> dict[str, Any]

Push a new policy version and return the full PolicyResponse.

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
policy_dict dict

Policy content as a dict.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Updated policy response.

get_version

get_version(
    sandbox_name: str, version: int
) -> dict[str, Any]

Get a specific policy revision by version number.

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
version int

Revision version number.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Policy revision data.

diff_revisions

diff_revisions(
    sandbox_name: str, version_a: int, version_b: int
) -> dict[str, Any]

Fetch two revisions and return both for client-side diffing.

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
version_a int

First revision version number.

required
version_b int

Second revision version number.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Both policy revisions for comparison.
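
Because diff_revisions returns both revisions rather than a computed diff, the caller does the comparison. One possible approach, using the standard difflib module on sorted JSON; the revision payloads below are invented for illustration and the real response shape may differ.

```python
import difflib
import json
from typing import Any


def diff_policies(old: dict[str, Any], new: dict[str, Any]) -> list[str]:
    """Return unified-diff lines between two policy dicts."""
    old_text = json.dumps(old, indent=2, sort_keys=True).splitlines()
    new_text = json.dumps(new, indent=2, sort_keys=True).splitlines()
    return list(
        difflib.unified_diff(old_text, new_text, "version_a", "version_b", lineterm="")
    )


# Hypothetical revision payloads.
revision_a = {"network_policies": {"pypi": {"host": "pypi.org"}}}
revision_b = {
    "network_policies": {
        "pypi": {"host": "pypi.org"},
        "npm": {"host": "registry.npmjs.org"},
    }
}
changes = diff_policies(revision_a, revision_b)
```

Sorting keys before diffing keeps the output stable when two revisions differ only in key order.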

list_revisions

list_revisions(
    sandbox_name: str, *, limit: int = 20, offset: int = 0
) -> list[dict[str, Any]]

List policy revision history.

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
limit int

Maximum number of revisions to return.

20
offset int

Number of revisions to skip.

0

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: Revision history entries.

apply_preset

apply_preset(
    sandbox_name: str, preset_name: str
) -> dict[str, Any]

Apply a policy preset to a sandbox (merges network_policies).

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
preset_name str

Name of the preset to apply.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Updated policy response.

Raises:

Type Description
NotFoundError

If the preset name is not found.

add_network_rule

add_network_rule(
    sandbox_name: str, key: str, rule: dict[str, Any]
) -> dict[str, Any]

Add or update a single network rule (read-modify-write).

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
key str

Network rule key.

required
rule dict[str, Any]

Rule definition dict.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Updated policy response.
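
The read-modify-write pattern named above can be sketched against an in-memory stand-in. `FakePolicyStore` and the response shape are hypothetical; only the `network_policies` key is taken from this reference. The sketch does not attempt to handle concurrent writers.

```python
import copy
from typing import Any


class FakePolicyStore:
    """In-memory stand-in for the policy backend (hypothetical)."""

    def __init__(self) -> None:
        self.version = 1
        self.policy: dict[str, Any] = {"network_policies": {}}

    def get(self) -> dict[str, Any]:
        return copy.deepcopy(self.policy)

    def update(self, policy: dict[str, Any]) -> dict[str, Any]:
        self.policy = copy.deepcopy(policy)
        self.version += 1
        return {"version": self.version, "policy": self.get()}


def add_network_rule(store: FakePolicyStore, key: str, rule: dict[str, Any]) -> dict[str, Any]:
    """Read the current policy, set one rule, and push the whole policy back."""
    policy = store.get()                                   # read
    policy.setdefault("network_policies", {})[key] = rule  # modify (add or overwrite)
    return store.update(policy)                            # write


store = FakePolicyStore()
add_network_rule(store, "pypi", {"host": "pypi.org"})
response = add_network_rule(store, "pypi", {"host": "files.pythonhosted.org"})
```

Calling it twice with the same key overwrites the rule, which matches the "add or update" wording.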

delete_network_rule

delete_network_rule(
    sandbox_name: str, key: str
) -> dict[str, Any]

Delete a single network rule (read-modify-write).

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
key str

Network rule key to remove.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Updated policy response.

add_filesystem_path

add_filesystem_path(
    sandbox_name: str, path: str, access: str
) -> dict[str, Any]

Add a filesystem path (read-modify-write).

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
path str

Filesystem path to add.

required
access str

Access mode ("ro" or "rw").

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Updated policy response.

delete_filesystem_path

delete_filesystem_path(
    sandbox_name: str, path: str
) -> dict[str, Any]

Delete a filesystem path (read-modify-write).

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
path str

Filesystem path to remove.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Updated policy response.

update_process_policy

update_process_policy(
    sandbox_name: str,
    *,
    run_as_user: str | None = None,
    run_as_group: str | None = None,
    landlock_compatibility: str | None = None,
) -> dict[str, Any]

Update process and landlock settings (read-modify-write).

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
run_as_user str | None

User to run processes as.

None
run_as_group str | None

Group to run processes as.

None
landlock_compatibility str | None

Landlock compatibility level.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Updated policy response.

approvals

Draft policy approval flow service.

ApprovalService

ApprovalService(client: ShoreGuardClient)

Draft policy approval operations shared by Web UI and TUI.

Parameters:

Name Type Description Default
client ShoreGuardClient

OpenShell gRPC client instance.

required

get_draft

get_draft(
    sandbox_name: str, *, status_filter: str = ""
) -> dict[str, Any]

Get draft policy recommendations for a sandbox.

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
status_filter str

Optional status to filter by.

''

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Draft policy data.

get_pending

get_pending(sandbox_name: str) -> list[dict[str, Any]]

Get only pending (unapproved) draft chunks.

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: Pending draft chunks.

approve

approve(sandbox_name: str, chunk_id: str) -> dict[str, Any]

Approve a single draft policy chunk.

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
chunk_id str

Identifier of the chunk to approve.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Updated chunk data.

reject

reject(
    sandbox_name: str, chunk_id: str, *, reason: str = ""
) -> None

Reject a single draft policy chunk.

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
chunk_id str

Identifier of the chunk to reject.

required
reason str

Optional reason for rejection.

''

approve_all

approve_all(
    sandbox_name: str,
    *,
    include_security_flagged: bool = False,
) -> dict[str, Any]

Approve all pending draft chunks.

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
include_security_flagged bool

Whether to include security-flagged chunks.

False

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Summary of approved chunks.

edit

edit(
    sandbox_name: str, chunk_id: str, proposed_rule: dict
) -> None

Edit a pending draft chunk's proposed rule.

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
chunk_id str

Identifier of the chunk to edit.

required
proposed_rule dict

New proposed rule content.

required

undo

undo(sandbox_name: str, chunk_id: str) -> dict[str, Any]

Reverse an approval decision.

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required
chunk_id str

Identifier of the chunk to undo.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Updated chunk data.

clear

clear(sandbox_name: str) -> dict[str, int]

Clear all pending draft chunks for a sandbox.

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required

Returns:

Type Description
dict[str, int]

dict[str, int]: Count of cleared chunks.

get_history

get_history(sandbox_name: str) -> list[dict[str, Any]]

Get decision history for a sandbox's draft policy.

Parameters:

Name Type Description Default
sandbox_name str

Name of the sandbox.

required

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: Decision history entries.

operations

DB-backed service for tracking long-running operations.

OperationService

OperationService(
    session_factory: sessionmaker[Session],
    running_ttl: float = 600.0,
    retention_days: int = 30,
)

DB-backed operation tracking with in-memory task registry for cancellation.

Parameters:

Name Type Description Default
session_factory sessionmaker[Session]

SQLAlchemy session factory for database access.

required
running_ttl float

Seconds before a stuck running operation is timed out.

600.0
retention_days int

Days to keep completed operations before cleanup.

30

create

create(
    resource_type: str,
    resource_key: str,
    *,
    actor: str | None = None,
    gateway_name: str | None = None,
    idempotency_key: str | None = None,
) -> OperationRecord

Create a new operation in pending state.

Parameters:

Name Type Description Default
resource_type str

Type of resource being operated on.

required
resource_key str

Name of the resource.

required
actor str | None

Identity of the user starting the operation.

None
gateway_name str | None

Gateway the operation targets.

None
idempotency_key str | None

Optional client-provided idempotency key.

None

Returns:

Name Type Description
OperationRecord OperationRecord

The newly created operation record.

create_if_not_running

create_if_not_running(
    resource_type: str,
    resource_key: str,
    *,
    actor: str | None = None,
    gateway_name: str | None = None,
    idempotency_key: str | None = None,
) -> OperationRecord | None

Atomically check for an active operation and create one if none exists.

An operation is considered active if its status is pending, running, or cancelling.

Parameters:

Name Type Description Default
resource_type str

Type of resource being operated on.

required
resource_key str

Name of the resource.

required
actor str | None

Identity of the user starting the operation.

None
gateway_name str | None

Gateway the operation targets.

None
idempotency_key str | None

Optional client-provided idempotency key.

None

Returns:

Type Description
OperationRecord | None

OperationRecord | None: The new operation, or None if one is already active.

start

start(op_id: str) -> None

Transition an operation from pending to running.

Silently ignored if the operation is not in pending state.

Parameters:

Name Type Description Default
op_id str

The operation ID to start.

required

complete

complete(op_id: str, result: dict[str, Any]) -> None

Mark an operation as succeeded with its result.

If the operation is no longer active (that is, it has already reached a terminal state), the call is silently ignored.

Parameters:

Name Type Description Default
op_id str

The operation ID to complete.

required
result dict[str, Any]

Result payload to store.

required

fail

fail(
    op_id: str,
    error: str,
    error_code: str = ErrorCode.internal,
) -> None

Mark an operation as failed with an error message.

Accepts operations in running or cancelling state. If the operation is already in a terminal state, the call is silently ignored.

Parameters:

Name Type Description Default
op_id str

The operation ID to mark as failed.

required
error str

Human-readable error message.

required
error_code str

Machine-readable error code.

ErrorCode.internal
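
The status transitions described for start, complete, and fail can be summarised as a small table-driven sketch. The status names `succeeded` and `failed` follow the wording above. Whether complete accepts a pending operation, and how fail treats one, are assumptions of this sketch.

```python
ACTIVE = {"pending", "running", "cancelling"}

# Allowed source states and target state per call.
# complete() accepting every active state is an assumption.
ALLOWED: dict[str, tuple[set[str], str]] = {
    "start": ({"pending"}, "running"),
    "complete": (ACTIVE, "succeeded"),
    "fail": ({"running", "cancelling"}, "failed"),
}


def transition(status: str, action: str) -> str:
    """Return the new status, or the old one if the call is ignored."""
    sources, target = ALLOWED[action]
    return target if status in sources else status


status = "pending"
status = transition(status, "start")     # pending -> running
status = transition(status, "complete")  # running -> succeeded
ignored = transition(status, "fail")     # already terminal: unchanged
```

Ignoring invalid transitions instead of raising keeps late callbacks, such as a task finishing after a cancel, from overwriting a terminal state.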

update_progress

update_progress(
    op_id: str, pct: int, message: str | None = None
) -> None

Update progress for an active operation.

Parameters:

Name Type Description Default
op_id str

The operation ID to update.

required
pct int

Progress percentage (0-100).

required
message str | None

Optional progress message.

None

get

get(op_id: str) -> OperationRecord | None

Get an operation by ID.

Parameters:

Name Type Description Default
op_id str

The operation ID to look up.

required

Returns:

Type Description
OperationRecord | None

OperationRecord | None: The operation record, or None if not found.

get_by_idempotency_key

get_by_idempotency_key(key: str) -> OperationRecord | None

Look up an operation by its idempotency key.

Parameters:

Name Type Description Default
key str

The idempotency key to search for.

required

Returns:

Type Description
OperationRecord | None

OperationRecord | None: The matching operation record, or None.

list_ops

list_ops(
    *,
    status: str | None = None,
    resource_type: str | None = None,
    limit: int = 50,
    offset: int = 0,
) -> tuple[list[OperationRecord], int]

List operations with optional filtering.

Parameters:

Name Type Description Default
status str | None

Filter by status.

None
resource_type str | None

Filter by resource type.

None
limit int

Maximum number of results (max 200).

50
offset int

Number of results to skip.

0

Returns:

Type Description
tuple[list[OperationRecord], int]

tuple[list[OperationRecord], int]: Tuple of (operations, total_count).
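
The (operations, total_count) return shape supports paging: the list holds one page and the count covers all matches. A sketch over a plain list, where clamping an oversized limit to 200 is an assumption (the reference states the maximum but not how larger values are handled):

```python
from typing import Sequence, TypeVar

T = TypeVar("T")
MAX_LIMIT = 200  # upper bound stated in the parameter table


def paginate(items: Sequence[T], *, limit: int = 50, offset: int = 0) -> tuple[list[T], int]:
    """Return one page plus the total count of matching items."""
    limit = max(0, min(limit, MAX_LIMIT))  # clamping is an assumption
    offset = max(0, offset)
    return list(items[offset : offset + limit]), len(items)


all_ops = [f"op-{i}" for i in range(450)]  # stand-ins for OperationRecord rows
page, total = paginate(all_ops, limit=500, offset=400)
```

A client can keep requesting pages until `offset + len(page)` reaches `total`.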

is_running

is_running(resource_type: str, resource_key: str) -> bool

Check if there is an active operation for the given resource.

Parameters:

Name Type Description Default
resource_type str

Type of resource to check.

required
resource_key str

Name of the resource to check.

required

Returns:

Name Type Description
bool bool

True if an active (pending/running) operation exists.

status_counts

status_counts() -> dict[str, int]

Return counts of operations grouped by status.

Returns:

Type Description
dict[str, int]

dict[str, int]: Mapping of status to count.

register_task

register_task(op_id: str, task: Task[None]) -> None

Register an asyncio task for an operation (enables cancellation).

Parameters:

Name Type Description Default
op_id str

The operation ID.

required
task Task[None]

The asyncio task running the operation.

required

cancel

cancel(op_id: str) -> OperationRecord | None

Cancel an active operation.

Immediately transitions the operation to cancelling state, then attempts to cancel the associated asyncio task. If no task is registered (e.g. after a server restart), the operation is marked as failed directly.

Parameters:

Name Type Description Default
op_id str

The operation ID to cancel.

required

Returns:

Type Description
OperationRecord | None

OperationRecord | None: The updated operation record, or None if not found or not active.
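
The interplay of register_task and cancel can be sketched with plain asyncio. The dictionaries stand in for the task registry and the database. Ending in `failed` after the task is cancelled is an assumption; the reference only states that outcome for the no-task case.

```python
import asyncio

tasks: dict[str, asyncio.Task[None]] = {}  # op_id -> task, as register_task would keep
statuses: dict[str, str] = {}              # op_id -> status (toy database)


async def work(op_id: str) -> None:
    """Long-running job that records its own outcome."""
    try:
        await asyncio.sleep(60)
        statuses[op_id] = "succeeded"
    except asyncio.CancelledError:
        statuses[op_id] = "failed"  # assumed terminal state after cancellation
        raise


def cancel(op_id: str) -> bool:
    """Flag the operation as cancelling, then cancel its task if one exists."""
    if statuses.get(op_id) not in {"pending", "running"}:
        return False
    statuses[op_id] = "cancelling"
    task = tasks.get(op_id)
    if task is None:
        statuses[op_id] = "failed"  # no task registered, e.g. after a restart
    else:
        task.cancel()
    return True


async def main() -> str:
    statuses["op-1"] = "running"
    tasks["op-1"] = asyncio.create_task(work("op-1"))
    await asyncio.sleep(0)  # let the task start
    cancel("op-1")
    await asyncio.gather(tasks["op-1"], return_exceptions=True)
    return statuses["op-1"]


final_status = asyncio.run(main())
```

Setting `cancelling` before calling `task.cancel()` means observers see the intent immediately, even if the task takes time to unwind.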

recover_orphans

recover_orphans() -> int

Mark all active operations as failed (startup recovery).

Called on server startup to handle operations that were in progress when the server was shut down or crashed.

Returns:

Name Type Description
int int

Number of orphaned operations recovered.

cleanup

cleanup() -> int

Expire stuck active operations and remove old completed ones.

Returns:

Name Type Description
int int

Number of operations cleaned up.
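
The two constructor settings, running_ttl and retention_days, translate into two cutoff timestamps for a cleanup pass. A sketch of that arithmetic; `cleanup_cutoffs` is a hypothetical helper, and which record timestamps the service compares against these cutoffs is not stated in this reference.

```python
from datetime import datetime, timedelta, timezone


def cleanup_cutoffs(
    now: datetime, running_ttl: float = 600.0, retention_days: int = 30
) -> tuple[datetime, datetime]:
    """Return (stuck_before, purge_before) timestamps for one cleanup pass."""
    stuck_before = now - timedelta(seconds=running_ttl)  # older active ops are expired
    purge_before = now - timedelta(days=retention_days)  # older completed ops are removed
    return stuck_before, purge_before


now = datetime(2024, 3, 31, 12, 0, tzinfo=timezone.utc)
stuck_before, purge_before = cleanup_cutoffs(now)
```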

to_dict staticmethod

to_dict(op: OperationRecord) -> dict[str, Any]

Convert an operation record to a JSON-serializable dict.

Parameters:

Name Type Description Default
op OperationRecord

The operation record to convert.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: JSON-serializable representation for the API.

AsyncOperationService

AsyncOperationService(
    session_factory: async_sessionmaker[AsyncSession],
    running_ttl: float = 600.0,
    retention_days: int = 30,
)

Async variant of OperationService using async SQLAlchemy.

Shares to_dict and register_task behaviour with the sync version. All DB-accessing methods are async.

Attributes:

Name Type Description
to_dict

Static serialisation helper re-used from OperationService.

Parameters:

Name Type Description Default
session_factory async_sessionmaker[AsyncSession]

Async SQLAlchemy session factory.

required
running_ttl float

Seconds before a stuck running operation is timed out.

600.0
retention_days int

Days to keep completed operations before cleanup.

30

create async

create(
    resource_type: str,
    resource_key: str,
    *,
    actor: str | None = None,
    gateway_name: str | None = None,
    idempotency_key: str | None = None,
) -> OperationRecord

Create a new operation in pending state.

Parameters:

Name Type Description Default
resource_type str

Type of resource being operated on.

required
resource_key str

Name of the resource.

required
actor str | None

Identity of the user starting the operation.

None
gateway_name str | None

Gateway the operation targets.

None
idempotency_key str | None

Optional client-provided idempotency key.

None

Returns:

Name Type Description
OperationRecord OperationRecord

The newly created operation record.

create_if_not_running async

create_if_not_running(
    resource_type: str,
    resource_key: str,
    *,
    actor: str | None = None,
    gateway_name: str | None = None,
    idempotency_key: str | None = None,
) -> OperationRecord | None

Create a new operation if none is active for this resource.

Parameters:

Name Type Description Default
resource_type str

Type of resource being operated on.

required
resource_key str

Name of the resource.

required
actor str | None

Identity of the user starting the operation.

None
gateway_name str | None

Gateway the operation targets.

None
idempotency_key str | None

Optional client-provided idempotency key.

None

Returns:

Type Description
OperationRecord | None

OperationRecord | None: The new operation, or None if one is already active.
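
The "at most one active operation per resource" rule can be sketched with an in-memory store. `InMemoryOps`, `Op`, and the `"sandbox"` resource type used in the note below are hypothetical; the sketch is synchronous and ignores concurrency, whereas a database-backed version would need the check and the insert to happen atomically.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass, field


@dataclass
class Op:
    """Hypothetical stand-in for OperationRecord."""

    resource_type: str
    resource_key: str
    status: str = "pending"
    id: str = field(default_factory=lambda: uuid.uuid4().hex)


class InMemoryOps:
    def __init__(self) -> None:
        self._ops: list[Op] = []

    def is_running(self, resource_type: str, resource_key: str) -> bool:
        return any(
            op.resource_type == resource_type
            and op.resource_key == resource_key
            and op.status in ("pending", "running")
            for op in self._ops
        )

    def create_if_not_running(self, resource_type: str, resource_key: str) -> Op | None:
        if self.is_running(resource_type, resource_key):
            return None  # an active operation already owns this resource
        op = Op(resource_type, resource_key)
        self._ops.append(op)
        return op
```

A second call for `("sandbox", "dev")` returns None until the first operation leaves the pending/running states.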

start async

start(op_id: str) -> None

Transition pending → running.

Parameters:

Name Type Description Default
op_id str

The operation ID to start.

required

complete async

complete(op_id: str, result: dict[str, Any]) -> None

Mark an operation as succeeded.

Parameters:

Name Type Description Default
op_id str

The operation ID to complete.

required
result dict[str, Any]

Result payload to store.

required

fail async

fail(
    op_id: str,
    error: str,
    error_code: str = ErrorCode.internal,
) -> None

Mark an operation as failed.

Parameters:

Name Type Description Default
op_id str

The operation ID to mark as failed.

required
error str

Human-readable error message.

required
error_code str

Machine-readable error code.

internal

update_progress async

update_progress(
    op_id: str, pct: int, message: str | None = None
) -> None

Update progress for an active operation.

Parameters:

Name Type Description Default
op_id str

The operation ID to update.

required
pct int

Progress percentage (0-100).

required
message str | None

Optional progress message.

None

get async

get(op_id: str) -> OperationRecord | None

Get an operation by ID.

Parameters:

Name Type Description Default
op_id str

The operation ID to look up.

required

Returns:

Type Description
OperationRecord | None

OperationRecord | None: The operation record, or None if not found.

get_by_idempotency_key async

get_by_idempotency_key(key: str) -> OperationRecord | None

Look up an operation by its idempotency key.

Parameters:

Name Type Description Default
key str

The idempotency key to search for.

required

Returns:

Type Description
OperationRecord | None

OperationRecord | None: The matching operation record, or None.
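
A typical use of an idempotency-key lookup is to return the earlier operation when a client retries a request. The sketch below assumes that pattern; `submit`, the `by_key` dict, and the record layout are hypothetical and stand in for a call to get_by_idempotency_key followed by create.

```python
from __future__ import annotations

import itertools

_ids = itertools.count(1)  # hypothetical ID source for the example


def submit(by_key: dict[str, dict], idempotency_key: str | None, resource_key: str) -> dict:
    if idempotency_key is not None:
        existing = by_key.get(idempotency_key)  # plays the role of the key lookup
        if existing is not None:
            return existing  # retry: hand back the original operation
    op = {"id": f"op-{next(_ids)}", "resource_key": resource_key, "status": "pending"}
    if idempotency_key is not None:
        by_key[idempotency_key] = op
    return op
```

Requests without a key always create a new operation.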

list_ops async

list_ops(
    *,
    status: str | None = None,
    resource_type: str | None = None,
    limit: int = 50,
    offset: int = 0,
) -> tuple[list[OperationRecord], int]

List operations with optional filtering.

Parameters:

Name Type Description Default
status str | None

Filter by status.

None
resource_type str | None

Filter by resource type.

None
limit int

Maximum number of results (max 200).

50
offset int

Number of results to skip.

0

Returns:

Type Description
tuple[list[OperationRecord], int]

tuple[list[OperationRecord], int]: Tuple of (operations, total_count).
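
The (operations, total_count) return shape supports page-based listings: the total reflects all matches, while the list holds only the requested page. The sketch below works on plain dicts and assumes that a limit above 200 is clamped rather than rejected, which is not stated above.

```python
from __future__ import annotations

MAX_LIMIT = 200  # documented upper bound for limit


def list_ops(
    ops: list[dict],
    *,
    status: str | None = None,
    resource_type: str | None = None,
    limit: int = 50,
    offset: int = 0,
) -> tuple[list[dict], int]:
    limit = max(0, min(limit, MAX_LIMIT))  # assumption: clamp instead of raising
    matches = [
        op
        for op in ops
        if (status is None or op["status"] == status)
        and (resource_type is None or op["resource_type"] == resource_type)
    ]
    # The total counts every match; the slice is just the current page.
    return matches[offset : offset + limit], len(matches)
```

A caller can compare `offset + len(page)` with the total to decide whether another page exists.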

is_running async

is_running(resource_type: str, resource_key: str) -> bool

Check if there is an active operation for the given resource.

Parameters:

Name Type Description Default
resource_type str

Type of resource to check.

required
resource_key str

Name of the resource to check.

required

Returns:

Name Type Description
bool bool

True if an active (pending/running) operation exists.

status_counts async

status_counts() -> dict[str, int]

Return counts of operations grouped by status.

Returns:

Type Description
dict[str, int]

dict[str, int]: Mapping of status to count.
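
The result has the shape of a GROUP BY over the status column. A minimal in-memory equivalent, assuming statuses are plain strings:

```python
from collections import Counter


def status_counts(statuses: list[str]) -> dict[str, int]:
    """Count how many operations are in each status."""
    return dict(Counter(statuses))
```

In this sketch a status with no operations is simply absent from the mapping; whether the service reports zero counts is not specified here.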

register_task

register_task(op_id: str, task: Task[None]) -> None

Register an asyncio task for cancellation support.

Parameters:

Name Type Description Default
op_id str

The operation ID.

required
task Task[None]

The asyncio task running the operation.

required

cancel async

cancel(op_id: str) -> OperationRecord | None

Cancel an active operation.

Parameters:

Name Type Description Default
op_id str

The operation ID to cancel.

required

Returns:

Type Description
OperationRecord | None

OperationRecord | None: The updated operation record, or None if not found or not active.

recover_orphans async

recover_orphans() -> int

Mark all active operations as failed (startup recovery).

Returns:

Name Type Description
int int

Number of orphaned operations recovered.

cleanup async

cleanup() -> int

Expire stuck active operations and remove old completed ones.

Returns:

Name Type Description
int int

Number of operations cleaned up.
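
How the running_ttl and retention_days constructor parameters might drive cleanup can be sketched as below. The `Op` dataclass, its `updated_at` field, the terminal status names, and the choice to count both expired and removed operations in the result are assumptions for the example.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class Op:
    """Hypothetical stand-in for OperationRecord."""

    id: str
    status: str
    updated_at: datetime


def cleanup(
    ops: list[Op],
    now: datetime,
    running_ttl: float = 600.0,
    retention_days: int = 30,
) -> tuple[list[Op], int]:
    stuck_before = now - timedelta(seconds=running_ttl)
    purge_before = now - timedelta(days=retention_days)
    kept: list[Op] = []
    affected = 0
    for op in ops:
        if op.status in ("pending", "running") and op.updated_at < stuck_before:
            op.status = "failed"  # expire a stuck active operation, keep the row
            affected += 1
            kept.append(op)
        elif op.status in ("succeeded", "failed") and op.updated_at < purge_before:
            affected += 1  # old completed operation: drop it
        else:
            kept.append(op)
    return kept, affected
```

With the defaults, an operation stuck for more than ten minutes is failed, and completed operations older than thirty days are removed.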

providers

Provider management with metadata from openshell.yaml.

ProviderService

ProviderService(client: ShoreGuardClient)

Provider management shared by Web UI and TUI.

Parameters:

Name Type Description Default
client ShoreGuardClient

OpenShell gRPC client instance.

required

list

list(
    *, limit: int = 100, offset: int = 0
) -> list[dict[str, Any]]

List all providers.

Parameters:

Name Type Description Default
limit int

Maximum number of providers to return.

100
offset int

Number of providers to skip.

0

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: Provider records.

get

get(name: str) -> dict[str, Any]

Get a provider by name.

Parameters:

Name Type Description Default
name str

Provider name.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Provider record.

create

create(
    *,
    name: str,
    provider_type: str,
    api_key: str,
    extra_credentials: dict[str, str] | None = None,
    config: dict[str, str] | None = None,
) -> dict[str, Any]

Create a provider with automatic credential key mapping.

Looks up the correct environment variable name from openshell.yaml. Falls back to API_KEY for unknown types.

Parameters:

Name Type Description Default
name str

Provider name.

required
provider_type str

Provider type identifier.

required
api_key str

Primary API key value.

required
extra_credentials dict[str, str] | None

Additional credential key-value pairs.

None
config dict[str, str] | None

Optional provider configuration.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The created provider record.
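
The credential key mapping can be pictured as a dictionary lookup with a default. In this sketch `KNOWN_TYPES` is a made-up placeholder for whatever openshell.yaml actually contains, `build_credentials` is a hypothetical helper, and letting extra credentials override the primary key is an assumption.

```python
from __future__ import annotations

# Placeholder mapping; the real provider types and variable names
# come from openshell.yaml and may differ.
KNOWN_TYPES = {"example-llm": "EXAMPLE_LLM_API_KEY"}


def build_credentials(
    provider_type: str,
    api_key: str,
    extra_credentials: dict[str, str] | None = None,
) -> dict[str, str]:
    # Unknown provider types fall back to the generic API_KEY name.
    env_name = KNOWN_TYPES.get(provider_type, "API_KEY")
    credentials = {env_name: api_key}
    credentials.update(extra_credentials or {})
    return credentials
```

For an unrecognised type, `build_credentials("unknown", "sk-2")` yields `{"API_KEY": "sk-2"}`.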

update

update(
    *,
    name: str,
    provider_type: str = "",
    credentials: dict[str, str] | None = None,
    config: dict[str, str] | None = None,
) -> dict[str, Any]

Update an existing provider.

Parameters:

Name Type Description Default
name str

Provider name.

required
provider_type str

New provider type (empty string to keep current).

''
credentials dict[str, str] | None

New credential key-value pairs.

None
config dict[str, str] | None

New provider configuration.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The updated provider record.

delete

delete(name: str) -> bool

Delete a provider.

Parameters:

Name Type Description Default
name str

Provider name.

required

Returns:

Name Type Description
bool bool

True if the provider was deleted.

list_known_types staticmethod

list_known_types() -> list[dict[str, str]]

Return metadata about known provider types from openshell.yaml.

Returns:

Type Description
list[dict[str, str]]

list[dict[str, str]]: Provider type metadata records.

list_inference_providers staticmethod

list_inference_providers() -> list[dict[str, str]]

Return known inference provider options from openshell.yaml.

Returns:

Type Description
list[dict[str, str]]

list[dict[str, str]]: Inference provider option records.

list_community_sandboxes staticmethod

list_community_sandboxes() -> list[dict[str, Any]]

Return community sandbox templates from openshell.yaml.

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: Community sandbox template records.

audit

Persistent audit log for state-changing operations.

AuditIntegrityError

Bases: RuntimeError

Raised when code tries to mutate or delete an AuditEntry illegally.

AuditService

AuditService(session_factory: sessionmaker)

DB-backed audit trail for all state-changing operations.

Parameters:

Name Type Description Default
session_factory sessionmaker

SQLAlchemy session factory for database access.

required

log

log(
    *,
    actor: str,
    actor_role: str,
    action: str,
    resource_type: str,
    resource_id: str = "",
    gateway: str | None = None,
    detail: dict[str, Any] | None = None,
    client_ip: str | None = None,
) -> None

Write an audit entry. Never raises -- failures are logged and swallowed.

Parameters:

Name Type Description Default
actor str

Identity of the user performing the action.

required
actor_role str

Role of the actor (e.g. "admin", "viewer").

required
action str

Action being performed (e.g. "create", "delete").

required
resource_type str

Type of resource affected.

required
resource_id str

Identifier of the affected resource.

''
gateway str | None

Optional gateway name for scoping.

None
detail dict[str, Any] | None

Optional structured detail payload.

None
client_ip str | None

IP address of the client.

None
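
The "never raises" contract amounts to wrapping the write in a catch-all handler so that an audit failure cannot break the request being audited. A minimal sketch, where `safe_log`, the `write` callable, and `broken_writer` are hypothetical names:

```python
from __future__ import annotations

import logging
from typing import Any, Callable

logger = logging.getLogger("audit_sketch")


def safe_log(write: Callable[[dict[str, Any]], None], **entry: Any) -> None:
    try:
        write(entry)
    except Exception:
        # Record the failure with its traceback, then swallow it.
        logger.exception("audit write failed; entry dropped")


def broken_writer(entry: dict[str, Any]) -> None:
    """Simulates a storage failure."""
    raise RuntimeError("database unavailable")
```

The trade-off is that a failed write loses the entry silently from the caller's point of view, so the application log is the only place such failures show up.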

list

list(
    *,
    limit: int = 100,
    offset: int = 0,
    actor: str | None = None,
    action: str | None = None,
    resource_type: str | None = None,
    since: str | None = None,
    until: str | None = None,
) -> list[dict[str, Any]]

Query audit entries with optional filters and pagination.

Parameters:

Name Type Description Default
limit int

Maximum number of entries to return.

100
offset int

Number of entries to skip.

0
actor str | None

Filter by actor identity.

None
action str | None

Filter by action type.

None
resource_type str | None

Filter by resource type.

None
since str | None

ISO-format start timestamp filter.

None
until str | None

ISO-format end timestamp filter.

None

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: Matching audit entries.

list_with_count

list_with_count(
    *,
    limit: int = 100,
    offset: int = 0,
    actor: str | None = None,
    action: str | None = None,
    resource_type: str | None = None,
    since: str | None = None,
    until: str | None = None,
) -> tuple[list[dict[str, Any]], int]

Query audit entries with total count for pagination.

Parameters:

Name Type Description Default
limit int

Maximum number of entries to return.

100
offset int

Number of entries to skip.

0
actor str | None

Filter by actor identity.

None
action str | None

Filter by action type.

None
resource_type str | None

Filter by resource type.

None
since str | None

ISO-format start timestamp filter.

None
until str | None

ISO-format end timestamp filter.

None

Returns:

Type Description
tuple[list[dict[str, Any]], int]

tuple[list[dict[str, Any]], int]: Entries and total matching count.

export_csv

export_csv(
    *,
    actor: str | None = None,
    action: str | None = None,
    resource_type: str | None = None,
    since: str | None = None,
    until: str | None = None,
) -> str

Export audit entries as a CSV string.

Parameters:

Name Type Description Default
actor str | None

Filter by actor identity.

None
action str | None

Filter by action type.

None
resource_type str | None

Filter by resource type.

None
since str | None

ISO-format start timestamp filter.

None
until str | None

ISO-format end timestamp filter.

None

Returns:

Name Type Description
str str

CSV-formatted string of matching entries.
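
Building a CSV string in memory is commonly done with `csv.DictWriter` over an `io.StringIO` buffer. The sketch below assumes entries are dicts; the column list is passed in because the actual export columns are not listed here.

```python
from __future__ import annotations

import csv
import io
from typing import Any


def export_csv(entries: list[dict[str, Any]], fields: list[str]) -> str:
    buf = io.StringIO()
    # extrasaction="ignore" skips keys that are not in the chosen columns.
    writer = csv.DictWriter(buf, fieldnames=fields, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(entries)
    return buf.getvalue()
```

The csv module quotes values containing commas or quotes, so free-text fields do not break the column layout.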

cleanup

cleanup(older_than_days: int | None = None) -> int

Delete audit entries older than the given number of days.

Uses row-by-row deletion (not Query.delete()) so that the before_delete listener fires and the ContextVar bypass is honoured. Retention cleanup runs once per cleanup_interval so the slight extra cost is negligible.

Parameters:

Name Type Description Default
older_than_days int | None

Age threshold in days.

None

Returns:

Name Type Description
int int

Number of entries deleted.

export_json

export_json(
    *,
    actor: str | None = None,
    action: str | None = None,
    resource_type: str | None = None,
    since: str | None = None,
    until: str | None = None,
) -> str

Export audit entries as a JSON array string.

Parameters:

Name Type Description Default
actor str | None

Filter by actor identity.

None
action str | None

Filter by action type.

None
resource_type str | None

Filter by resource type.

None
since str | None

ISO-format start timestamp filter.

None
until str | None

ISO-format end timestamp filter.

None

Returns:

Name Type Description
str str

JSON-formatted list of matching entries.

audit_log async

audit_log(
    request: Request,
    action: str,
    resource_type: str,
    resource_id: str = "",
    *,
    gateway: str | None = None,
    detail: dict[str, Any] | None = None,
) -> None

Route-handler helper that extracts actor/role/IP from the request.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required
action str

Action being performed.

required
resource_type str

Type of resource affected.

required
resource_id str

Identifier of the affected resource.

''
gateway str | None

Optional gateway name for scoping.

None
detail dict[str, Any] | None

Optional structured detail payload.

None

local_gateway

Local gateway lifecycle — Docker containers, openshell CLI, port management.

Only used when SHOREGUARD_LOCAL_MODE=1 or for backward-compatible v0.2 workflows.

LocalGatewayManager

LocalGatewayManager(gateway_service: GatewayService)

Docker and openshell CLI lifecycle for locally-managed gateways.

Parameters:

Name Type Description Default
gateway_service GatewayService

Gateway service for connection management.

required

diagnostics

diagnostics() -> dict[str, Any]

Check Docker availability, daemon status, and permissions.

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Diagnostic information about Docker and openshell.

start

start(name: str) -> dict[str, Any]

Start a gateway by name.

Parameters:

Name Type Description Default
name str

Gateway name.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Result with success status and output or error.

stop

stop(name: str) -> dict[str, Any]

Stop a gateway by name.

Parameters:

Name Type Description Default
name str

Gateway name.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Result with success status and output or error.

restart

restart(name: str) -> dict[str, Any]

Restart a gateway (stop + start).

Parameters:

Name Type Description Default
name str

Gateway name.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Result from the start step.

create

create(
    name: str,
    port: int | None = None,
    *,
    remote_host: str | None = None,
    gpu: bool = False,
) -> dict[str, Any]

Create a new gateway via openshell CLI.

Parameters:

Name Type Description Default
name str

Gateway name.

required
port int | None

Port number, or None for auto-selection.

None
remote_host str | None

Remote host for the gateway.

None
gpu bool

Whether to enable GPU support.

False

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Gateway info or error result.

destroy

destroy(
    name: str, *, force: bool = False
) -> dict[str, Any]

Destroy a gateway and remove its configuration.

Parameters:

Name Type Description Default
name str

Gateway name.

required
force bool

Force destruction even if resources still exist.

False

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Result with success status.