Execute Stage

The execute stage runs queries and retrieves data for compiled datafaces.

CompiledFace + Variables → Executor → Query Results (List[Dict])

Entry Points

Executor

The main class for executing queries:

Executor

Executor(face: CompiledFace, adapter_registry: Any | None = None, query_registry: dict[str, CompiledQuery] | None = None, project_root: Path | None = None, use_cache: bool = True, duckdb_cache: QueryResultCache | None = None)

Executes queries for datafaces.

Stage: EXECUTE (Service Module)

The executor manages query execution for a compiled dataface. It handles:
  • Query lookup from the face
  • Adapter selection based on query type
  • Result caching for efficiency
  • Variable substitution

Does NOT:
  • Compile datafaces (use compile module)
  • Render charts (use render module)

ATTRIBUTE DESCRIPTION
face

The compiled dataface

adapter_registry

Registry of adapters (optional)

query_registry

Complete query registry (for cross-references)

Example

from dataface.core.compile import compile
from dataface.core.execute import Executor

result = compile(yaml_content)
executor = Executor(result.face, query_registry=result.query_registry)

# Execute a query
data = executor.execute_query("sales", {"year": 2024})
print(data)  # [{"date": "2024-01", "amount": 1000}, ...]

Initialize executor.

PARAMETER DESCRIPTION
face

Compiled dataface

TYPE: CompiledFace

adapter_registry

Optional adapter registry (uses default if not provided)

TYPE: Any | None DEFAULT: None

query_registry

Optional query registry for cross-file references

TYPE: dict[str, CompiledQuery] | None DEFAULT: None

project_root

Root directory for resolving relative file paths

TYPE: Path | None DEFAULT: None

use_cache

Default cache behavior for query execution

TYPE: bool DEFAULT: True

duckdb_cache

Optional DuckDBCache for persistent caching (Suite context)

TYPE: QueryResultCache | None DEFAULT: None

Source code in dataface/core/execute/executor.py
def __init__(
    self,
    face: CompiledFace,
    adapter_registry: Any | None = None,
    query_registry: dict[str, CompiledQuery] | None = None,
    project_root: Path | None = None,
    use_cache: bool = True,
    duckdb_cache: "QueryResultCache | None" = None,
):
    """Initialize executor.

    Args:
        face: Compiled dataface
        adapter_registry: Optional adapter registry (uses default if not provided)
        query_registry: Optional query registry for cross-file references
        project_root: Root directory for resolving relative file paths
        use_cache: Default cache behavior for query execution
        duckdb_cache: Optional DuckDBCache for persistent caching (Suite context)
    """
    self.face = face
    self.query_registry = query_registry or {}
    self._cache: dict[str, list[dict[str, Any]]] = {}
    self._descriptions_cache: dict[str, dict[str, tuple]] = {}
    self._provenance_cache: dict[str, list[ResolvedRelation]] = {}
    self._query_errors: dict[str, Exception] = {}
    self._use_cache = use_cache
    self._duckdb_cache = duckdb_cache
    self.project_root = project_root

    # Import adapter registry lazily to avoid circular imports
    if adapter_registry is None:
        from dataface.core.execute.adapters import build_adapter_registry

        self.adapter_registry = build_adapter_registry(project_root or Path.cwd())
    else:
        self.adapter_registry = adapter_registry

execute_query

execute_query(query_name: str, variables: VariableValues | None = None, use_cache: bool | None = None, force_refresh: bool = False) -> list[dict[str, Any]]

Execute a query and return results.

Stage: EXECUTE (Main Entry Point)

Looks up the query by name, resolves the appropriate adapter, executes the query, and returns the data.

PARAMETER DESCRIPTION
query_name

Name of query to execute (may include "queries." prefix)

TYPE: str

variables

Variable values for query resolution

TYPE: VariableValues | None DEFAULT: None

use_cache

Whether to use cached results if available

TYPE: bool | None DEFAULT: None

force_refresh

If True, clear both success and failure caches for this query and re-run it from scratch.

TYPE: bool DEFAULT: False
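
How use_cache and force_refresh interact is easier to see in isolation. The following is a minimal sketch, not the Executor itself: TinyCache and its fetch method are hypothetical names, and only the in-memory layer is modelled (no failure cache, no DuckDB cache). It mirrors the fallback to the instance default when use_cache is None and the "clear, then re-run" behaviour of force_refresh.

```python
from __future__ import annotations

from typing import Callable


class TinyCache:
    """Hypothetical stand-in for the executor's in-memory result cache."""

    def __init__(self, use_cache: bool = True):
        self.default = use_cache
        self.store: dict[str, list[dict]] = {}

    def fetch(
        self,
        key: str,
        run: Callable[[], list[dict]],
        use_cache: bool | None = None,
        force_refresh: bool = False,
    ) -> list[dict]:
        # None means "defer to the instance default".
        should_use_cache = use_cache if use_cache is not None else self.default
        if force_refresh:
            # Drop any stored rows and skip the lookup entirely.
            self.store.pop(key, None)
        elif should_use_cache and key in self.store:
            return self.store[key]
        rows = run()
        if should_use_cache:
            self.store[key] = rows
        return rows


calls: list[int] = []


def run_sales() -> list[dict]:
    calls.append(1)
    return [{"n": len(calls)}]


cache = TinyCache()
first = cache.fetch("sales", run_sales)                       # runs the query
second = cache.fetch("sales", run_sales)                      # served from cache
third = cache.fetch("sales", run_sales, force_refresh=True)   # re-runs and re-caches
fourth = cache.fetch("sales", run_sales, use_cache=False)     # runs, stores nothing
```

Note that a forced refresh still writes the fresh rows back when caching is enabled, so later calls see the refreshed data.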

RETURNS DESCRIPTION
list[dict[str, Any]]

List of dictionaries with query results (each dict is a row)

RAISES DESCRIPTION
ExecutionError

If query not found or execution fails

CachedQueryFailure

If a cached failure exists within TTL

Example

data = executor.execute_query("sales", {"year": 2024})
for row in data:
    print(f"{row['date']}: {row['amount']}")

Source code in dataface/core/execute/executor.py
def execute_query(
    self,
    query_name: str,
    variables: VariableValues | None = None,
    use_cache: bool | None = None,
    force_refresh: bool = False,
) -> list[dict[str, Any]]:
    """Execute a query and return results.

    Stage: EXECUTE (Main Entry Point)

    Looks up the query by name, resolves the appropriate adapter,
    executes the query, and returns the data.

    Args:
        query_name: Name of query to execute (may include "queries." prefix)
        variables: Variable values for query resolution
        use_cache: Whether to use cached results if available
        force_refresh: If True, clear both success and failure caches
            for this query and re-run it from scratch.

    Returns:
        List of dictionaries with query results (each dict is a row)

    Raises:
        ExecutionError: If query not found or execution fails
        CachedQueryFailure: If a cached failure exists within TTL

    Example:
        >>> data = executor.execute_query("sales", {"year": 2024})
        >>> for row in data:
        ...     print(f"{row['date']}: {row['amount']}")
    """
    # ────────────────────────────────────────────────────────────────
    # Step 1: Normalize query name (strip "queries." prefix)
    # ────────────────────────────────────────────────────────────────
    if query_name.startswith("queries."):
        query_name = query_name[8:]

    # ────────────────────────────────────────────────────────────────
    # Step 1b: Merge variables with face defaults
    # ────────────────────────────────────────────────────────────────
    # Trust the normalizer - use pre-computed variable_defaults
    variable_registry = self.face.variable_registry or {}

    # Merge variables: start with None for all vars, then defaults, then user values
    all_variables: dict[str, Any] = dict.fromkeys(variable_registry)
    all_variables.update(self.face.variable_defaults)  # Pre-computed by normalizer
    # Parse JSON strings in variables (from URL parameters) and merge
    parsed_variables = parse_variable_json_strings(variables or {})
    variables = {**all_variables, **parsed_variables}

    # ────────────────────────────────────────────────────────────────
    # Step 2: Look up query
    # ────────────────────────────────────────────────────────────────
    query = self._get_query(query_name)

    # ────────────────────────────────────────────────────────────────
    # Step 3: Check cache (use instance default if not explicitly set)
    # ────────────────────────────────────────────────────────────────
    should_use_cache = use_cache if use_cache is not None else self._use_cache
    cache_key = self._cache_key(query, variables)

    # force_refresh: clear all caches for this key before running
    if force_refresh:
        self._cache.pop(cache_key, None)
        self._query_errors.pop(query_name, None)
        if self._duckdb_cache:
            from dataface.core.execute.duckdb_cache import (
                compute_query_hash,
                compute_source_hash,
                compute_variables_hash,
            )

            relevant_vars = _get_relevant_variables(query, variables)
            variables_hash = compute_variables_hash(relevant_vars)
            if is_sql_query(query):
                pre_sql = query.sql
                if query.setup_sql:
                    pre_sql += "\n" + query.setup_sql
                q_hash = compute_query_hash(pre_sql)
                source_hash = compute_source_hash(
                    query.source, face_sources=self.face.sources
                )
            else:
                q_hash = compute_query_hash(query.source_description)
                source_hash = compute_source_hash(None)
            self._duckdb_cache.clear(source_hash, q_hash, variables_hash)
    else:
        # Check in-memory cache first (CLI context)
        if should_use_cache and cache_key in self._cache:
            return self._cache[cache_key]

        # ────────────────────────────────────────────────────────────
        # Step 3a: Check for stored pre-execution error
        # ────────────────────────────────────────────────────────────
        if query_name in self._query_errors:
            stored = self._query_errors[query_name]
            raise QueryError(str(stored), query_name) from stored

        # ────────────────────────────────────────────────────────────
        # Step 3b: Check DuckDB failure then success cache
        # ────────────────────────────────────────────────────────────
        if should_use_cache and self._duckdb_cache:
            from dataface.core.execute.cache_backend import CachedQueryFailure
            from dataface.core.execute.duckdb_cache import (
                compute_query_hash,
                compute_source_hash,
                compute_variables_hash,
            )

            relevant_vars = _get_relevant_variables(query, variables)
            variables_hash = compute_variables_hash(relevant_vars)
            if is_sql_query(query):
                pre_sql = query.sql
                if query.setup_sql:
                    pre_sql += "\n" + query.setup_sql
                q_hash = compute_query_hash(pre_sql)
                source_hash = compute_source_hash(
                    query.source, face_sources=self.face.sources
                )
            else:
                q_hash = compute_query_hash(query.source_description)
                source_hash = compute_source_hash(None)

            # Step 3b: failure cache check

            cached_failure = self._duckdb_cache.get_failure(
                source_hash, q_hash, variables_hash
            )
            if isinstance(cached_failure, CachedQueryFailure):
                raise cached_failure

            # Step 3c: Check DuckDB success cache.
            # Key: (source_hash, query_hash, variables_hash) — no face/query noise.
            # A different query_hash (changed SQL) is a miss by definition since the
            # key doesn't match.
            cached_rows = self._duckdb_cache.get(
                source_hash, q_hash, variables_hash
            )
            if cached_rows is not None:
                self._cache[cache_key] = cached_rows  # warm in-memory cache
                return cached_rows

    # ────────────────────────────────────────────────────────────────
    # Step 3d: Check for {{ results.X }} references (Suite context)
    # ────────────────────────────────────────────────────────────────
    if self._has_results_refs(query):
        # {{ results.X }} queries bypass _resolve_query_references,
        # so setup_sql is not supported on this path.
        if is_sql_query(query) and query.setup_sql:
            raise QueryError(
                "setup_sql is not supported on queries using {{ results.X }} references",
                query_name,
            )
        result_data = self._execute_results_query(query, variables)
        if should_use_cache:
            self._cache[cache_key] = result_data
        return result_data

    # ────────────────────────────────────────────────────────────────
    # Step 4: Resolve query references in SQL
    # ────────────────────────────────────────────────────────────────
    query = self._resolve_query_references(query, variables, query_name)

    # ────────────────────────────────────────────────────────────────
    # Step 4b: Resolve named source references to full configs
    # ────────────────────────────────────────────────────────────────
    if is_sql_query(query):
        if isinstance(query.source, str) and query.source in self.face.sources:
            resolved_source = self.face.sources[query.source]
            query = query.model_copy(update={"source": resolved_source})

    # ────────────────────────────────────────────────────────────────
    # Step 5: Execute via adapter
    # ────────────────────────────────────────────────────────────────
    try:
        result = self.adapter_registry.execute(query, variables)
    except Exception as e:
        self._cache_failure_to_duckdb(query_name, query, variables, e)
        raise QueryError(str(e), query_name) from e

    if not result.is_success:
        err = QueryError(result.error or "Unknown error", query_name)
        self._cache_failure_to_duckdb(query_name, query, variables, err)
        raise err

    # ────────────────────────────────────────────────────────────────
    # Step 6: Cache and return
    # ────────────────────────────────────────────────────────────────
    if should_use_cache:
        self._cache[cache_key] = result.data

    # Cache column descriptions from cursor.description for chart decisions
    if result.column_descriptions:
        self._descriptions_cache[query_name] = result.column_descriptions

    # Cache dbt relation provenance for chart schema-status badges
    if result.resolved_relations:
        self._provenance_cache[query_name] = result.resolved_relations

    # Also cache to DuckDB if configured (Suite context)
    if self._duckdb_cache:
        self._cache_to_duckdb(query_name, query, result.data, variables)

    return result.data
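
The variable merge in Step 1b is worth seeing with concrete values. This is a small sketch of the precedence only: merge_variables is a hypothetical helper, and the JSON-string parsing done by parse_variable_json_strings is left out. Every declared variable starts as None, face defaults overwrite that, and caller-supplied values win last.

```python
from typing import Any


def merge_variables(
    registry: dict[str, Any],
    defaults: dict[str, Any],
    user_values: dict[str, Any],
) -> dict[str, Any]:
    """Hypothetical helper mirroring the merge order in execute_query."""
    merged: dict[str, Any] = dict.fromkeys(registry)  # all declared vars -> None
    merged.update(defaults)                           # then face defaults
    merged.update(user_values)                        # then caller values
    return merged


registry = {"year": {}, "region": {}, "limit": {}}
defaults = {"year": 2023, "limit": 100}

merged = merge_variables(registry, defaults, {"year": 2024})
print(merged)  # {'year': 2024, 'region': None, 'limit': 100}
```

Because undeclared-but-missing values are explicit None entries rather than absent keys, downstream code can rely on every registered variable name being present.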

execute_chart

execute_chart(chart: CompiledChart | str, variables: VariableValues | None = None, use_cache: bool | None = None) -> list[dict[str, Any]]

Execute the query for a chart.

Convenience method that handles chart → query lookup.

PARAMETER DESCRIPTION
chart

CompiledChart or chart name string

TYPE: CompiledChart | str

variables

Variable values

TYPE: VariableValues | None DEFAULT: None

use_cache

Whether to use cache (defaults to instance setting)

TYPE: bool | None DEFAULT: None

RETURNS DESCRIPTION
list[dict[str, Any]]

Query results for the chart

Example

chart = face.charts["revenue"]
data = executor.execute_chart(chart, {"year": 2024})

Source code in dataface/core/execute/executor.py
def execute_chart(
    self,
    chart: CompiledChart | str,
    variables: VariableValues | None = None,
    use_cache: bool | None = None,
) -> list[dict[str, Any]]:
    """Execute the query for a chart.

    Convenience method that handles chart → query lookup.

    Args:
        chart: CompiledChart or chart name string
        variables: Variable values
        use_cache: Whether to use cache (defaults to instance setting)

    Returns:
        Query results for the chart

    Example:
        >>> chart = face.charts["revenue"]
        >>> data = executor.execute_chart(chart, {"year": 2024})
    """
    if isinstance(chart, str):
        if chart not in self.face.charts:
            raise ExecutionError(f"Chart '{chart}' not found")
        chart = self.face.charts[chart]

    # Handle blank charts (no query) - return empty data
    if chart.query_name is None:
        return []

    # Get data (use_cache will fall back to instance default in execute_query)
    data = self.execute_query(chart.query_name, variables, use_cache)

    # Apply chart-level filters if present
    if chart.filters:
        data = self._apply_filters(data, chart.filters, variables)

    return data
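
The lookup rules in execute_chart (name or object, unknown name is an error, a chart without a query yields no rows) can be sketched with stand-ins. StubChart, rows_for_chart, and the local ExecutionError class are hypothetical and exist only for this illustration; chart-level filters are omitted because their semantics are not shown here.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


class ExecutionError(Exception):
    """Local stand-in for the library's ExecutionError."""


@dataclass
class StubChart:
    """Hypothetical stand-in for CompiledChart."""

    query_name: str | None


def rows_for_chart(
    chart: StubChart | str,
    charts: dict[str, StubChart],
    run_query: Callable[[str], list[dict]],
) -> list[dict]:
    # Accept either a chart object or its name.
    if isinstance(chart, str):
        if chart not in charts:
            raise ExecutionError(f"Chart '{chart}' not found")
        chart = charts[chart]
    # Blank charts have no query, so there is nothing to execute.
    if chart.query_name is None:
        return []
    return run_query(chart.query_name)


charts = {"revenue": StubChart("sales"), "spacer": StubChart(None)}
by_name = rows_for_chart("revenue", charts, lambda name: [{"query": name}])
blank = rows_for_chart("spacer", charts, lambda name: [{"query": name}])
```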

Adapters

Adapters connect to different data sources. Each query type has a corresponding adapter.

Base Adapter

BaseAdapter

Bases: ABC

Base interface for query adapters.

All adapters must implement this interface to execute queries against different backends (MetricFlow, SQL, HTTP, CSV, etc.).

The unified interface pattern uses:
  • supported_types: Property returning set of query types this adapter handles
  • Type guards for type-safe field access in execute methods

Subclasses must implement
  • supported_types: Property returning Set[str] of supported query types
  • _execute(): Perform the actual query execution
Optional override
  • _can_execute(): Override for custom eligibility logic beyond type matching
Example

class MyAdapter(BaseAdapter):
    @property
    def supported_types(self) -> set[str]:
        return {"sql"}

    # execute() forwards (query, variables, params), so accept all three
    def _execute(self, query, variables, params=None):
        if is_sql_query(query):
            # Type checker knows query.sql exists
            return self._run_sql(query.sql)

supported_types abstractmethod property

supported_types: set[str]

Query types this adapter can execute.

RETURNS DESCRIPTION
set[str]

Set of query type strings (e.g., {"sql"}, {"csv", "http"})

Example

adapter.supported_types

can_execute

can_execute(query: CompiledQuery) -> bool

Check if this adapter can execute the given query.

Uses supported_types for type-based routing. Override _can_execute() for additional custom eligibility logic.

PARAMETER DESCRIPTION
query

CompiledQuery object (guaranteed by compiler)

TYPE: CompiledQuery

RETURNS DESCRIPTION
bool

True if this adapter can execute the query, False otherwise

Source code in dataface/core/execute/adapters/base.py
def can_execute(self, query: CompiledQuery) -> bool:
    """Check if this adapter can execute the given query.

    Uses supported_types for type-based routing. Override _can_execute()
    for additional custom eligibility logic.

    Args:
        query: CompiledQuery object (guaranteed by compiler)

    Returns:
        True if this adapter can execute the query, False otherwise
    """
    # First check if type is supported
    if query.query_type not in self.supported_types:
        return False
    # Then check any additional conditions
    return self._can_execute(query)
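
The two-stage check in can_execute (type match first, then the optional hook) can be reproduced in a self-contained sketch. SketchAdapter, StubQuery, and SelectOnlyAdapter are hypothetical names, and the default _can_execute returning True is an assumption based on it being described as an optional override.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class StubQuery:
    """Hypothetical stand-in for CompiledQuery."""

    query_type: str
    sql: str = ""


class SketchAdapter(ABC):
    @property
    @abstractmethod
    def supported_types(self) -> set[str]:
        """Query types this adapter handles."""

    def can_execute(self, query: StubQuery) -> bool:
        # Stage 1: cheap type-based routing.
        if query.query_type not in self.supported_types:
            return False
        # Stage 2: adapter-specific eligibility.
        return self._can_execute(query)

    def _can_execute(self, query: StubQuery) -> bool:
        # Assumed default: no extra conditions beyond the type match.
        return True


class SelectOnlyAdapter(SketchAdapter):
    @property
    def supported_types(self) -> set[str]:
        return {"sql"}

    def _can_execute(self, query: StubQuery) -> bool:
        # Custom rule for the sketch: only accept SELECT statements.
        return query.sql.lstrip().lower().startswith("select")


adapter = SelectOnlyAdapter()
accepts_select = adapter.can_execute(StubQuery("sql", "SELECT 1"))
rejects_delete = adapter.can_execute(StubQuery("sql", "DELETE FROM t"))
rejects_csv = adapter.can_execute(StubQuery("csv"))
```

Keeping the type check in the base class means a subclass hook never sees a query of a type it did not declare.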

execute

execute(query: CompiledQuery, variables: VariableValues | None = None, params: QueryParams = None) -> QueryResult

Execute a query and return results.

PARAMETER DESCRIPTION
query

CompiledQuery object (guaranteed by compiler)

TYPE: CompiledQuery

variables

Optional dictionary of variable values for query resolution

TYPE: VariableValues | None DEFAULT: None

params

Optional pre-computed parameter values for parameterized execution. When provided, the adapter should use these params directly instead of re-processing variables. This is used by batch execution where parameterization happens before adapter execution.

TYPE: QueryParams DEFAULT: None

RETURNS DESCRIPTION
QueryResult

QueryResult containing data or error information

Source code in dataface/core/execute/adapters/base.py
def execute(
    self,
    query: CompiledQuery,
    variables: VariableValues | None = None,
    params: QueryParams = None,
) -> QueryResult:
    """Execute a query and return results.

    Args:
        query: CompiledQuery object (guaranteed by compiler)
        variables: Optional dictionary of variable values for query resolution
        params: Optional pre-computed parameter values for parameterized execution.
            When provided, the adapter should use these params directly instead
            of re-processing variables. This is used by batch execution where
            parameterization happens before adapter execution.

    Returns:
        QueryResult containing data or error information
    """
    return self._execute(query, variables, params)

SQL Adapter

For executing SQL queries against databases (DuckDB, PostgreSQL, etc.):

SqlAdapter

SqlAdapter(dbt_project_path: str | None = None, use_example_db: bool = False, connection_string: str | None = None, project_root: Path | None = None, profile_type: str = 'duckdb', read_only: bool = True, duckdb_config: dict[str, Any] | None = None)

Bases: BaseAdapter

Adapter for executing raw SQL queries.

DuckDB sources use the raw duckdb driver (parameterized, fast). All other warehouses use build_adapter() from dbt_adapter_factory.

Supported query types: sql

Example

adapter = SqlAdapter()
query = SqlQuery(sql="SELECT * FROM users WHERE id = {{ user_id }}")
result = adapter.execute(query, {"user_id": 1})

Initialize SQL adapter.

PARAMETER DESCRIPTION
dbt_project_path

Path to dbt project (default: current directory)

TYPE: str | None DEFAULT: None

use_example_db

If True, use DuckDB example database instead of dbt config

TYPE: bool DEFAULT: False

connection_string

DuckDB connection string (e.g., ":memory:" or file path)

TYPE: str | None DEFAULT: None

project_root

Root directory for resolving relative file paths in read_csv()

TYPE: Path | None DEFAULT: None

profile_type

Database type for dialect selection (e.g., 'postgres', 'duckdb')

TYPE: str DEFAULT: 'duckdb'

read_only

If True (default), open DuckDB connections in read-only mode.
  • File-based DuckDB: opened with read_only=True, so the driver refuses all writes.
  • In-memory DuckDB (:memory:): always opened read-write regardless of this flag, because there is no other way to populate an in-memory database. This is a known exception; enable_external_access=False is still forced for :memory: when read_only=True.

When read_only=True, enable_external_access=False is forced on the DuckDB config regardless of any user-supplied duckdb_config. This blocks httpfs URL fetches and read_csv/read_parquet/read_json against local and remote paths. To allow external access, pass read_only=False explicitly.

Non-DuckDB warehouses (Postgres, Snowflake, BigQuery, etc.) are not affected by this flag — they have no native read-only driver knob. Use SELECT-only credentials/roles at the warehouse level for those.

Callers that must write (cache backends, test fixture setup) should pass read_only=False explicitly.

TYPE: bool DEFAULT: True

duckdb_config

Optional DuckDB config dict for the default connection. When read_only=True, enable_external_access in this dict is overridden to False. To enable external access, use read_only=False.

TYPE: dict[str, Any] | None DEFAULT: None
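
The rules for read_only and duckdb_config described above can be condensed into a small decision function. This is a sketch of the documented behaviour, not the adapter's code: effective_duckdb_settings is a hypothetical helper, and representing the setting as a Python bool is an assumption.

```python
from __future__ import annotations

from typing import Any


def effective_duckdb_settings(
    connection_string: str,
    read_only: bool = True,
    duckdb_config: dict[str, Any] | None = None,
) -> tuple[bool, dict[str, Any]]:
    """Return (open_read_only, config) following the documented rules."""
    config = dict(duckdb_config or {})  # never mutate the caller's dict
    if read_only:
        # Forced regardless of what the caller supplied.
        config["enable_external_access"] = False
    # In-memory databases are always opened read-write.
    open_read_only = read_only and connection_string != ":memory:"
    return open_read_only, config


file_ro = effective_duckdb_settings("data.duckdb", duckdb_config={"enable_external_access": True})
memory_ro = effective_duckdb_settings(":memory:")
file_rw = effective_duckdb_settings("data.duckdb", read_only=False, duckdb_config={"threads": 2})
```

In this sketch the only way to keep external access enabled is to pass read_only=False, which matches the guidance in the parameter description.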

Source code in dataface/core/execute/adapters/sql_adapter.py
def __init__(
    self,
    dbt_project_path: str | None = None,
    use_example_db: bool = False,
    connection_string: str | None = None,
    project_root: Path | None = None,
    profile_type: str = "duckdb",
    read_only: bool = True,
    duckdb_config: dict[str, Any] | None = None,
):
    """Initialize SQL adapter.

    Args:
        dbt_project_path: Path to dbt project (default: current directory)
        use_example_db: If True, use DuckDB example database instead of dbt config
        connection_string: DuckDB connection string (e.g., ":memory:" or file path)
        project_root: Root directory for resolving relative file paths in read_csv()
        profile_type: Database type for dialect selection (e.g., 'postgres', 'duckdb')
        read_only: If True (default), open DuckDB connections in read-only mode.
            File-based DuckDB: opened with read_only=True — the driver refuses all writes.
            In-memory DuckDB (:memory:): always opened read-write regardless of this flag
            because there is no other way to populate an in-memory database; this is a
            known exception. enable_external_access=False is still forced for :memory:
            when read_only=True.

            When read_only=True, enable_external_access=False is forced on the DuckDB
            config regardless of any user-supplied duckdb_config. This blocks httpfs URL
            fetches and read_csv/read_parquet/read_json against local and remote paths.
            To allow external access, pass read_only=False explicitly.

            Non-DuckDB warehouses (Postgres, Snowflake, BigQuery, etc.) are not affected
            by this flag — they have no native read-only driver knob. Use SELECT-only
            credentials/roles at the warehouse level for those.

            Callers that must write (cache backends, test fixture setup) should pass
            read_only=False explicitly.
        duckdb_config: Optional DuckDB config dict for the default connection.
            When read_only=True, enable_external_access in this dict is overridden
            to False. To enable external access, use read_only=False.
    """
    self.dbt_project_path = Path(dbt_project_path) if dbt_project_path else None
    self.use_example_db = use_example_db
    self.connection_string = connection_string or ":memory:"
    self.project_root = project_root
    self.profile_type = profile_type
    self.read_only = read_only
    self._duckdb_config = duckdb_config
    self._connection: Any = None  # Lazy-loaded DuckDB connection (default)
    # Thread-local storage for per-source DuckDB connection caches.
    self._tls = threading.local()
    # Track all connections opened across all threads so close() can clean up.
    self._all_conns: list[Any] = []
    self._all_conns_lock = threading.Lock()
    self._manifest: dict[str, Any] | None = None  # Lazy-loaded dbt manifest
    self._manifest_from_snapshot: bool = False
    self._prod_manifest: dict[str, Any] | None = None
    self._prod_manifest_loaded: bool = False
    self._dialect = get_dialect(profile_type)
    self._project_sources: dict[str, dict[str, Any]] | None = None

supported_types property

supported_types: set[str]

Return supported query types.

list_sources

list_sources() -> dict[str, dict[str, Any]]

Return the project's configured source profiles (lazy-loaded and cached).

Source code in dataface/core/execute/adapters/sql_adapter.py
def list_sources(self) -> dict[str, dict[str, Any]]:
    """Return the project's configured source profiles (lazy-loaded and cached)."""
    if self._project_sources is None:
        self._project_sources = {}
        if self.project_root:
            from dataface.core.compile.compiler import resolve_project_source_paths
            from dataface.core.compile.config import get_project_sources

            try:
                project_sources = get_project_sources(self.project_root)
                resolved = resolve_project_source_paths(
                    project_sources.sources, self.project_root
                )
                self._project_sources.update(resolved)
            except (OSError, TypeError, ValueError) as e:
                logger.warning("Failed to load project sources: %s", e)

    return self._project_sources

get_source_config

get_source_config(profile: str) -> dict[str, Any] | None

Look up source config for a profile name from project sources.

Source code in dataface/core/execute/adapters/sql_adapter.py
def get_source_config(self, profile: str) -> dict[str, Any] | None:
    """Look up source config for a profile name from project sources."""
    project_sources = self.list_sources()
    return project_sources.get(profile)

register_source

register_source(name: str, config: dict[str, Any]) -> None

Register a synthetic source against this adapter.

Used by surfaces that have a single warehouse connection but no _sources.yaml (the dft inspect HTML server is the canonical case). Idempotent on the source name: a project-source by the same name (or a previous synthetic registration) always wins.

Source code in dataface/core/execute/adapters/sql_adapter.py
def register_source(self, name: str, config: dict[str, Any]) -> None:
    """Register a synthetic source against this adapter.

    Used by surfaces that have a single warehouse connection but no
    ``_sources.yaml`` (the ``dft inspect`` HTML server is the canonical
    case). Idempotent on the source name: a project-source by the same
    name (or a previous synthetic registration) always wins.
    """
    sources = self.list_sources()
    sources.setdefault(name, dict(config))
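
The "existing entry always wins" behaviour comes directly from dict.setdefault, and the dict(config) copy isolates the registry from later changes to the caller's dict. A plain-dict sketch (the source names and config values are made up for illustration):

```python
# Plain dict standing in for the adapter's cached project sources.
sources = {"warehouse": {"type": "postgres"}}

synthetic = {"type": "duckdb"}

# Name already present: the existing project source is kept.
sources.setdefault("warehouse", dict(synthetic))

# New name: a shallow copy of the config is stored.
sources.setdefault("inspect", dict(synthetic))

# Mutating the caller's dict afterwards does not affect the stored copy.
synthetic["type"] = "changed"

print(sources)
# {'warehouse': {'type': 'postgres'}, 'inspect': {'type': 'duckdb'}}
```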

close

close() -> None

Close all database connections.

Source code in dataface/core/execute/adapters/sql_adapter.py
def close(self) -> None:
    """Close all database connections."""
    if self._connection:
        self._connection.close()
        self._connection = None

    with self._all_conns_lock:
        conns, self._all_conns = self._all_conns, []
    for conn in conns:
        conn.close()
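
close() swaps the tracked list out while holding the lock and closes the connections afterwards, so the lock is never held during a potentially slow close. A minimal sketch of that pattern, with hypothetical ConnectionTracker and StubConnection classes:

```python
import threading


class StubConnection:
    """Hypothetical connection that only records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class ConnectionTracker:
    def __init__(self) -> None:
        self._conns: list[StubConnection] = []
        self._lock = threading.Lock()

    def track(self, conn: StubConnection) -> None:
        with self._lock:
            self._conns.append(conn)

    def close_all(self) -> int:
        # Take ownership of the current list and leave an empty one behind.
        with self._lock:
            conns, self._conns = self._conns, []
        # Close outside the lock so other threads are not blocked.
        for conn in conns:
            conn.close()
        return len(conns)


tracker = ConnectionTracker()
a, b = StubConnection(), StubConnection()
tracker.track(a)
tracker.track(b)
closed_first = tracker.close_all()
closed_second = tracker.close_all()  # nothing left to close
```

The swap also makes repeated calls harmless, since a second call finds an empty list.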

CSV Adapter

For loading data from CSV files:

CsvAdapter

CsvAdapter(project_root: Path | None = None)

Bases: BaseAdapter

Adapter for executing queries against CSV files.

Supported query types: csv

Loads data from CSV files and supports:
  • Column selection
  • Row filtering (exact match, AND logic)
  • Result limiting

Example

adapter = CsvAdapter()
query = CsvQuery(file="data/sales.csv", columns=["date", "amount"])
result = adapter.execute(query)

Initialize CSV adapter.

PARAMETER DESCRIPTION
project_root

Root directory of the project (for resolving relative paths)

TYPE: Path | None DEFAULT: None

Source code in dataface/core/execute/adapters/csv_adapter.py
def __init__(self, project_root: Path | None = None):
    """Initialize CSV adapter.

    Args:
        project_root: Root directory of the project (for resolving relative paths)
    """
    self.project_root = project_root or Path.cwd()
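
To make the three supported operations concrete (column selection, exact-match AND filtering, limiting), here is a standard-library sketch. It is not the adapter's implementation: load_rows and its filters and limit parameters are hypothetical names, and values are compared as strings because csv.DictReader yields strings.

```python
from __future__ import annotations

import csv
import io


def load_rows(
    text: str,
    columns: list[str] | None = None,
    filters: dict[str, object] | None = None,
    limit: int | None = None,
) -> list[dict[str, str]]:
    rows: list[dict[str, str]] = []
    for row in csv.DictReader(io.StringIO(text)):
        if limit is not None and len(rows) >= limit:
            break
        # AND logic: every filter must match exactly.
        if filters and any(row.get(k) != str(v) for k, v in filters.items()):
            continue
        rows.append({c: row[c] for c in columns} if columns else dict(row))
    return rows


SAMPLE = (
    "date,region,amount\n"
    "2024-01,EU,1000\n"
    "2024-01,US,1500\n"
    "2024-02,EU,1200\n"
)

eu_rows = load_rows(SAMPLE, columns=["date", "amount"], filters={"region": "EU"})
first_eu = load_rows(SAMPLE, columns=["date", "amount"], filters={"region": "EU"}, limit=1)
```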

supported_types property

supported_types: set[str]

Return supported query types.

HTTP Adapter

For fetching data from REST APIs:

HttpAdapter

HttpAdapter(timeout: int = 30)

Bases: BaseAdapter

Adapter for executing HTTP/REST API queries.

Supported query types: http

Fetches data from REST API endpoints with support for:
  • Multiple HTTP methods (GET, POST, PUT, DELETE, PATCH)
  • Custom headers
  • Query parameters
  • Request body (JSON)

Example

adapter = HttpAdapter()
query = HttpQuery(
    url="https://api.example.com/users",
    headers={"Authorization": "Bearer {{ token }}"}
)
result = adapter.execute(query, {"token": "xyz"})

Initialize HTTP adapter.

PARAMETER DESCRIPTION
timeout

Request timeout in seconds

TYPE: int DEFAULT: 30

Source code in dataface/core/execute/adapters/http_adapter.py
def __init__(self, timeout: int = 30):
    """Initialize HTTP adapter.

    Args:
        timeout: Request timeout in seconds
    """
    self.timeout = timeout
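
The example above passes a header containing {{ token }} together with a variables dict. How the adapter renders such placeholders is not shown here, so the following is only a sketch of the idea using a regular expression and the standard library; render and build_request are hypothetical helpers, and no request is actually sent.

```python
from __future__ import annotations

import re
from urllib.parse import urlencode
from urllib.request import Request

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template: str, variables: dict[str, object]) -> str:
    """Replace {{ name }} with the matching variable (KeyError if missing)."""
    return _PLACEHOLDER.sub(lambda m: str(variables[m.group(1)]), template)


def build_request(
    url: str,
    variables: dict[str, object],
    headers: dict[str, str] | None = None,
    params: dict[str, object] | None = None,
    method: str = "GET",
) -> Request:
    rendered = {k: render(v, variables) for k, v in (headers or {}).items()}
    if params:
        url = f"{url}?{urlencode(params)}"
    # Constructing a Request does not perform any network I/O.
    return Request(url, headers=rendered, method=method)


req = build_request(
    "https://api.example.com/users",
    {"token": "xyz"},
    headers={"Authorization": "Bearer {{ token }}"},
    params={"page": 1},
)
```

A timeout such as the adapter's 30-second default would be applied when the request is sent, for example via the timeout argument of urllib.request.urlopen.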

supported_types property

supported_types: set[str]

Return supported query types.

dbt Adapter

For integrating with dbt models:

DbtAdapter

DbtAdapter(dbt_project_path: Path | None = None, profile_name: str | None = None, target_name: str | None = None)

Bases: BaseAdapter

Adapter for executing SQL queries via dbt's adapter system.

Supported query types: sql

Leverages dbt's adapter API to execute SQL queries with support for dbt-specific features like ref(), source(), etc.

This adapter requires dbt-adapters and a warehouse-specific dbt package (e.g. dbt-duckdb, dbt-postgres) — not dbt-core. A valid dbt project with profiles.yml configuration is also required.

Example

adapter = DbtAdapter(dbt_project_path=Path("./my_dbt_project"))
query = SqlQuery(sql="SELECT * FROM {{ ref('customers') }}")
result = adapter.execute(query)

Initialize dbt adapter.

PARAMETER DESCRIPTION
dbt_project_path

Path to dbt project (default: current directory)

TYPE: Path | None DEFAULT: None

profile_name

dbt profile name (default: from dbt_project.yml)

TYPE: str | None DEFAULT: None

target_name

dbt target name (default: DBT_TARGET env var, then 'dev')

TYPE: str | None DEFAULT: None

Source code in dataface/core/execute/adapters/dbt_adapter.py
def __init__(
    self,
    dbt_project_path: Path | None = None,
    profile_name: str | None = None,
    target_name: str | None = None,
):
    """Initialize dbt adapter.

    Args:
        dbt_project_path: Path to dbt project (default: current directory)
        profile_name: dbt profile name (default: from dbt_project.yml)
        target_name: dbt target name (default: DBT_TARGET env var, then 'dev')
    """
    import os

    self.dbt_project_path = (
        Path(dbt_project_path).resolve()
        if dbt_project_path
        else Path.cwd().resolve()
    )
    self.profile_name = profile_name
    self.target_name = target_name or os.environ.get("DBT_TARGET") or "dev"
    self._adapter: Any = None
    self._manifest: dict[str, Any] | None = None
    self._manifest_loaded: bool = False
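
The target fallback in the constructor above can be tried in isolation. This is a minimal sketch: `resolve_target_name` is a hypothetical helper that copies only the `target_name or os.environ.get("DBT_TARGET") or "dev"` expression, and it is not part of dataface.

```python
from __future__ import annotations

import os


def resolve_target_name(target_name: str | None = None) -> str:
    """Hypothetical helper mirroring the fallback chain in DbtAdapter.__init__."""
    # Explicit argument first, then the DBT_TARGET environment variable, then "dev".
    return target_name or os.environ.get("DBT_TARGET") or "dev"


# Exercise the three cases with a controlled environment.
os.environ.pop("DBT_TARGET", None)
default_target = resolve_target_name()         # nothing set, falls through to "dev"

os.environ["DBT_TARGET"] = "ci"
env_target = resolve_target_name()             # taken from the environment
explicit_target = resolve_target_name("prod")  # explicit argument wins
empty_target = resolve_target_name("")         # "" is falsy, so the env var is used

os.environ.pop("DBT_TARGET", None)             # leave the environment clean
```

Because the chain uses `or`, an empty string behaves like `None` and falls through to the next source.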

supported_types property

supported_types: set[str]

Return supported query types.

MetricFlow Adapter

For querying MetricFlow metrics:

MetricFlowAdapter

MetricFlowAdapter(config_path: str | None = None)

Bases: BaseAdapter

Adapter for executing MetricFlow (dbt Semantic Layer) queries.

Supported query types: metricflow

Executes queries against dbt's Semantic Layer using MetricFlow. In a full implementation, this uses the MetricFlow Python API or CLI.

Example

adapter = MetricFlowAdapter()
query = MetricFlowQuery(
    metrics=["revenue", "orders"],
    dimensions=["date", "region"]
)
result = adapter.execute(query)

Initialize MetricFlow adapter.

PARAMETER DESCRIPTION
config_path

Optional path to MetricFlow configuration

TYPE: str | None DEFAULT: None

Source code in dataface/core/execute/adapters/metricflow_adapter.py
def __init__(self, config_path: str | None = None):
    """Initialize MetricFlow adapter.

    Args:
        config_path: Optional path to MetricFlow configuration
    """
    self.config_path = config_path

supported_types property

supported_types: set[str]

Return supported query types.


Adapter Registry

AdapterRegistry

AdapterRegistry(project_root: Path | None = None)

Registry for managing and selecting query adapters.

The registry uses type-based routing from the unified query interface. Adapters declare their supported types via the supported_types property, and the registry routes queries to the appropriate adapter.

Priority order:

1. First registered adapter that supports the query type wins
2. dbt adapter is registered first for SQL (with dbt features)
3. Fallback SQL adapter for plain SQL without dbt

Example

from dataface.core.execute.adapters import build_adapter_registry
registry = build_adapter_registry(Path("."))
result = registry.execute(SqlQuery(sql="SELECT 1"))

Initialize adapter registry.

Use build_adapter_registry() to get a fully configured registry.

PARAMETER DESCRIPTION
project_root

Root directory for resolving relative file paths.

TYPE: Path | None DEFAULT: None

Source code in dataface/core/execute/adapters/adapter_registry.py
def __init__(self, project_root: Path | None = None) -> None:
    """Initialize adapter registry.

    Use ``build_adapter_registry()`` to get a fully configured registry.

    Args:
        project_root: Root directory for resolving relative file paths.
    """
    self._adapters: list[BaseAdapter] = []
    self._type_index: dict[str, list[BaseAdapter]] = {}
    self._project_root = project_root

project_root property

project_root: Path | None

Return the registry's configured project root, if any.

adapters property

adapters: list[BaseAdapter]

Get all registered adapters.

supported_types property

supported_types: set[str]

Get all query types supported by registered adapters.

RETURNS DESCRIPTION
set[str]

Set of query type strings

register

register(adapter: BaseAdapter) -> None

Register an adapter.

Adapters are indexed by their supported types for fast lookup.

PARAMETER DESCRIPTION
adapter

Adapter instance to register

TYPE: BaseAdapter

Source code in dataface/core/execute/adapters/adapter_registry.py
def register(self, adapter: BaseAdapter) -> None:
    """Register an adapter.

    Adapters are indexed by their supported types for fast lookup.

    Args:
        adapter: Adapter instance to register
    """
    self._adapters.append(adapter)

    # Update type index
    for query_type in adapter.supported_types:
        if query_type not in self._type_index:
            self._type_index[query_type] = []
        self._type_index[query_type].append(adapter)
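
The indexing step above is what makes registration order matter. The sketch below reproduces it with stand-in classes: `StubAdapter`, `MiniRegistry`, and the adapter names and types are hypothetical and used only for illustration.

```python
from __future__ import annotations


class StubAdapter:
    """Hypothetical stand-in for a BaseAdapter subclass."""

    def __init__(self, name: str, supported_types: set[str]) -> None:
        self.name = name
        self.supported_types = supported_types


class MiniRegistry:
    """Simplified copy of the indexing logic in AdapterRegistry.register."""

    def __init__(self) -> None:
        self._adapters: list[StubAdapter] = []
        self._type_index: dict[str, list[StubAdapter]] = {}

    def register(self, adapter: StubAdapter) -> None:
        self._adapters.append(adapter)
        for query_type in adapter.supported_types:
            # setdefault does the same job as the explicit "not in" check.
            self._type_index.setdefault(query_type, []).append(adapter)

    def get_adapters_for_type(self, query_type: str) -> list[StubAdapter]:
        return self._type_index.get(query_type, [])


registry = MiniRegistry()
registry.register(StubAdapter("dbt", {"sql"}))
registry.register(StubAdapter("plain_sql", {"sql"}))
registry.register(StubAdapter("files", {"csv", "json"}))

# Adapters sharing a type are kept in registration order.
sql_order = [a.name for a in registry.get_adapters_for_type("sql")]
csv_order = [a.name for a in registry.get_adapters_for_type("csv")]
missing = registry.get_adapters_for_type("http")
```

Here `sql_order` lists the adapter registered first ahead of the later one, which is the property the priority rules rely on.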

get_adapters_for_type

get_adapters_for_type(query_type: str) -> list[BaseAdapter]

Get all adapters that support a given query type.

PARAMETER DESCRIPTION
query_type

Query type string (e.g., "sql", "csv")

TYPE: str

RETURNS DESCRIPTION
list[BaseAdapter]

List of adapters supporting this type (in registration order)

Source code in dataface/core/execute/adapters/adapter_registry.py
def get_adapters_for_type(self, query_type: str) -> list[BaseAdapter]:
    """Get all adapters that support a given query type.

    Args:
        query_type: Query type string (e.g., "sql", "csv")

    Returns:
        List of adapters supporting this type (in registration order)
    """
    return self._type_index.get(query_type, [])

get_adapter

get_adapter(query: CompiledQuery) -> BaseAdapter | None

Get an adapter that can execute the given query.

Uses type-based routing for fast lookup, then checks additional eligibility via can_execute().

PARAMETER DESCRIPTION
query

CompiledQuery object

TYPE: CompiledQuery

RETURNS DESCRIPTION
BaseAdapter | None

Adapter instance or None if no adapter can execute the query

Source code in dataface/core/execute/adapters/adapter_registry.py
def get_adapter(self, query: CompiledQuery) -> BaseAdapter | None:
    """Get an adapter that can execute the given query.

    Uses type-based routing for fast lookup, then checks additional
    eligibility via can_execute().

    Args:
        query: CompiledQuery object

    Returns:
        Adapter instance or None if no adapter can execute the query
    """
    # Fast path: lookup by type
    candidates = self.get_adapters_for_type(query.query_type)

    for adapter in candidates:
        if adapter.can_execute(query):
            return adapter

    # Fallback: check all adapters (for adapters with custom can_execute)
    for adapter in self._adapters:
        if adapter not in candidates and adapter.can_execute(query):
            return adapter

    return None
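
The two-pass lookup above can be sketched with stand-ins. Everything named here (`StubQuery`, `StubAdapter`, `pick_adapter`, and the predicates) is hypothetical; the predicates are invented to show the routing and say nothing about how the real adapters implement `can_execute()`.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass
class StubQuery:
    """Hypothetical stand-in for CompiledQuery; only query_type comes from the docs."""

    query_type: str
    text: str = ""


class StubAdapter:
    """Hypothetical stand-in for BaseAdapter with a pluggable can_execute()."""

    def __init__(
        self,
        name: str,
        supported_types: set[str],
        predicate: Callable[[StubQuery], bool],
    ) -> None:
        self.name = name
        self.supported_types = supported_types
        self._predicate = predicate

    def can_execute(self, query: StubQuery) -> bool:
        return self._predicate(query)


def pick_adapter(adapters: list[StubAdapter], query: StubQuery) -> StubAdapter | None:
    # Pass 1: adapters that declare the query type, in registration order.
    candidates = [a for a in adapters if query.query_type in a.supported_types]
    for adapter in candidates:
        if adapter.can_execute(query):
            return adapter
    # Pass 2: any other adapter whose can_execute() accepts the query.
    for adapter in adapters:
        if adapter not in candidates and adapter.can_execute(query):
            return adapter
    return None


adapters = [
    StubAdapter("picky_sql", {"sql"}, lambda q: q.query_type == "sql" and "ref(" in q.text),
    StubAdapter("plain_sql", {"sql"}, lambda q: q.query_type == "sql"),
    StubAdapter("catch_legacy", set(), lambda q: q.query_type == "legacy"),
]


def chosen(query: StubQuery) -> str | None:
    adapter = pick_adapter(adapters, query)
    return adapter.name if adapter else None


with_ref = chosen(StubQuery("sql", "SELECT * FROM {{ ref('customers') }}"))
without_ref = chosen(StubQuery("sql", "SELECT 1"))
legacy = chosen(StubQuery("legacy"))
unknown = chosen(StubQuery("graphql"))
```

In this sketch each predicate checks the query type itself. A predicate that accepted everything would also be picked up by the second pass for types it never declared.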

execute

execute(query: CompiledQuery, variables: VariableValues | None = None, params: QueryParams = None) -> QueryResult

Execute a query using the appropriate adapter.

PARAMETER DESCRIPTION
query

CompiledQuery object

TYPE: CompiledQuery

variables

Variable values for query resolution

TYPE: VariableValues | None DEFAULT: None

params

Optional pre-computed parameter values for parameterized execution. When provided, the adapter should use these params directly instead of re-processing variables. This is used by batch execution where parameterization happens before adapter execution.

TYPE: QueryParams DEFAULT: None

RETURNS DESCRIPTION
QueryResult

QueryResult with data or error

Source code in dataface/core/execute/adapters/adapter_registry.py
def execute(
    self,
    query: CompiledQuery,
    variables: VariableValues | None = None,
    params: QueryParams = None,
) -> QueryResult:
    """Execute a query using the appropriate adapter.

    Args:
        query: CompiledQuery object
        variables: Variable values for query resolution
        params: Optional pre-computed parameter values for parameterized execution.
            When provided, the adapter should use these params directly instead
            of re-processing variables. This is used by batch execution where
            parameterization happens before adapter execution.

    Returns:
        QueryResult with data or error
    """
    # First, try to use a registered adapter that can handle this query
    # This allows properly configured adapters (e.g., from playground) to take precedence
    adapter = self.get_adapter(query)
    if not adapter:
        return QueryResult(
            data=[],
            error=f"No adapter found for query type: {query.query_type}",
        )

    return adapter.execute(query, variables, params)
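
As the listing shows, a missing adapter is reported through the `error` field of the result rather than by raising. A minimal sketch of handling that on the caller side follows; `StubQueryResult`, `execute`, and `rows_or_raise` are hypothetical stand-ins, and only the `data`/`error` fields and the message format are taken from the code above.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class StubQueryResult:
    """Hypothetical stand-in for QueryResult with the two fields used above."""

    data: list[dict[str, Any]] = field(default_factory=list)
    error: str | None = None


def execute(
    handlers: dict[str, Callable[[str], StubQueryResult]],
    query_type: str,
    text: str,
) -> StubQueryResult:
    # Same shape as AdapterRegistry.execute: no handler means an error result.
    handler = handlers.get(query_type)
    if not handler:
        return StubQueryResult(
            data=[], error=f"No adapter found for query type: {query_type}"
        )
    return handler(text)


def rows_or_raise(result: StubQueryResult) -> list[dict[str, Any]]:
    # Callers must inspect .error themselves; an empty data list alone is ambiguous.
    if result.error:
        raise RuntimeError(result.error)
    return result.data


handlers = {"sql": lambda text: StubQueryResult(data=[{"n": 1}])}
ok_rows = rows_or_raise(execute(handlers, "sql", "SELECT 1 AS n"))
missing = execute(handlers, "graphql", "{ orders }")
```

Checking `error` before reading `data` avoids treating a routing failure as a query that returned no rows.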

list_sql_sources

list_sql_sources() -> list[dict[str, Any]]

Return configured SQL sources from registered SQL adapters.

Source code in dataface/core/execute/adapters/adapter_registry.py
def list_sql_sources(self) -> list[dict[str, Any]]:
    """Return configured SQL sources from registered SQL adapters."""
    from dataface.core.execute.adapters.sql_adapter import SqlAdapter

    sources: dict[str, dict[str, Any]] = {}
    for adapter in self._adapters:
        if not isinstance(adapter, SqlAdapter):
            continue
        for name, config in adapter.list_sources().items():
            source_type = str(config.get("type", "unknown"))
            source_info: dict[str, Any] = {"name": name, "type": source_type}
            db_path = config.get("path")
            if db_path:
                source_info["path"] = str(db_path)
            if source_type == "duckdb" and str(db_path) == ":memory:":
                source_info["in_memory"] = True
            sources[name] = source_info
    return [sources[name] for name in sorted(sources)]
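
To see the shape of the returned list, the loop body above can be run against plain dictionaries. `summarize_sources` and the sample configs are hypothetical; the function only copies the per-source logic and skips the adapter iteration.

```python
from __future__ import annotations

from typing import Any


def summarize_sources(configs: dict[str, dict[str, Any]]) -> list[dict[str, Any]]:
    """Hypothetical helper applying the per-source logic of list_sql_sources."""
    sources: dict[str, dict[str, Any]] = {}
    for name, config in configs.items():
        source_type = str(config.get("type", "unknown"))
        info: dict[str, Any] = {"name": name, "type": source_type}
        db_path = config.get("path")
        if db_path:
            info["path"] = str(db_path)
        if source_type == "duckdb" and str(db_path) == ":memory:":
            info["in_memory"] = True
        sources[name] = info
    # Sorted by source name, as in the original.
    return [sources[name] for name in sorted(sources)]


sample_configs = {
    "warehouse": {"type": "postgres", "host": "db.internal", "password": "secret"},
    "scratch": {"type": "duckdb", "path": ":memory:"},
    "local": {"type": "duckdb", "path": "data/local.duckdb"},
    "untyped": {},
}
summary = summarize_sources(sample_configs)
```

Only `name`, `type`, `path`, and `in_memory` are surfaced, so fields such as host or password in a source config do not appear in the summary.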

resolve_source_config

resolve_source_config(source: str | None = None) -> dict[str, Any]

Resolve a full source_config dict for use with InspectConnection.

Returns the warehouse-specific dict (type + credentials).

PARAMETER DESCRIPTION
source

Named source profile. When None, falls back to the registry's default SqlAdapter — but only if it's a DuckDB adapter (the connection_string is :memory: or a .duckdb path). For non-DuckDB SqlAdapters, raises DatafaceError because reconstructing config from (connection_string, profile_type) drops credentials, host, port, keyfile_json, etc.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

source_config dict with at least a 'type' key.

RAISES DESCRIPTION
DatafaceError

DF-EXECUTE-NO-DEFAULT-SOURCE if source is None and the default adapter is non-DuckDB; DF-EXECUTE-SOURCE-NOT-FOUND if the named source is missing but others are configured; DF-EXECUTE-SOURCE-NOT-FOUND-EMPTY if no sources are configured.

Source code in dataface/core/execute/adapters/adapter_registry.py
def resolve_source_config(self, source: str | None = None) -> dict[str, Any]:
    """Resolve a full source_config dict for use with InspectConnection.

    Returns the warehouse-specific dict (type + credentials).

    Args:
        source: Named source profile. When None, falls back to the
            registry's default SqlAdapter — but only if it's a DuckDB
            adapter (the connection_string is :memory: or a .duckdb path).
            For non-DuckDB SqlAdapters, raises DatafaceError because
            reconstructing config from (connection_string, profile_type)
            drops credentials, host, port, keyfile_json, etc.

    Returns:
        source_config dict with at least a 'type' key.

    Raises:
        DatafaceError: DF-EXECUTE-NO-DEFAULT-SOURCE if source is None and
            the default adapter is non-DuckDB; DF-EXECUTE-SOURCE-NOT-FOUND
            if the named source is missing but others are configured;
            DF-EXECUTE-SOURCE-NOT-FOUND-EMPTY if no sources are configured.
    """
    from dataface.core.execute.adapters.sql_adapter import SqlAdapter
    from dataface.core.inspect.connection import source_config_from_url

    if source is None:
        # No-source fallback only works for DuckDB SqlAdapters — they have
        # no credentials, host, port, keyfile_json to lose.
        for adapter in self._adapters:
            if isinstance(adapter, SqlAdapter) and adapter.profile_type == "duckdb":
                return source_config_from_url(
                    adapter.connection_string, adapter.profile_type
                )
        raise DatafaceError.from_code(DF_EXECUTE_NO_DEFAULT_SOURCE)

    for adapter in self._adapters:
        if isinstance(adapter, SqlAdapter):
            config = adapter.get_source_config(source)
            if config and config.get("type"):
                return config

    available = ", ".join(item["name"] for item in self.list_sql_sources())
    if available:
        raise DatafaceError.from_code(
            DF_EXECUTE_SOURCE_NOT_FOUND, source=source, available=available
        )
    raise DatafaceError.from_code(DF_EXECUTE_SOURCE_NOT_FOUND_EMPTY, source=source)
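
The three error outcomes described above can be summarized as a small decision function. This sketch is an assumption-heavy simplification: `StubSourceError`, `resolve`, and the flat `sources` dictionary are hypothetical, the error codes are carried as plain strings, and the adapter iteration and `source_config_from_url` call are omitted.

```python
from __future__ import annotations

from typing import Any


class StubSourceError(Exception):
    """Hypothetical stand-in for DatafaceError, carrying only the code string."""

    def __init__(self, code: str) -> None:
        super().__init__(code)
        self.code = code


def resolve(
    sources: dict[str, dict[str, Any]],
    default_duckdb: dict[str, Any] | None,
    source: str | None = None,
) -> dict[str, Any]:
    if source is None:
        # Only a DuckDB default can be used without naming a source.
        if default_duckdb is not None:
            return default_duckdb
        raise StubSourceError("DF-EXECUTE-NO-DEFAULT-SOURCE")
    config = sources.get(source)
    if config and config.get("type"):
        return config
    if sources:
        # Other sources exist, so the name is probably misspelled.
        raise StubSourceError("DF-EXECUTE-SOURCE-NOT-FOUND")
    raise StubSourceError("DF-EXECUTE-SOURCE-NOT-FOUND-EMPTY")


def code_for(*args: Any) -> str:
    try:
        resolve(*args)
    except StubSourceError as exc:
        return exc.code
    return "ok"


configured = {"warehouse": {"type": "postgres"}}
found = code_for(configured, None, "warehouse")
no_default = code_for(configured, None, None)
misspelled = code_for(configured, None, "warehous")
nothing_configured = code_for({}, None, "warehouse")
duckdb_default = resolve({}, {"type": "duckdb", "path": ":memory:"})
```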

Errors

errors

Execution error types.

Stage: EXECUTE Purpose: Define error types for query execution failures.

These errors are raised during:

- Query execution (QueryError)
- Adapter resolution/execution (AdapterError)
- Connection failures (ConnectionError)

All errors inherit from ExecutionError → DatafaceError for easy catching.

ExecutionError

ExecutionError(message: str, query_name: str | None = None)

Bases: DatafaceError

Base error for all execution failures.

This is the parent class for all execution-related errors. Catch this to handle any execution error.

ATTRIBUTE DESCRIPTION
message

Human-readable error description

query_name

Name of query that failed (if applicable)

TYPE: str | None

Source code in dataface/core/execute/errors.py
def __init__(self, message: str, query_name: str | None = None):
    self.message = message
    self.query_name = query_name
    self.fields: dict[str, Any] = {}
    super().__init__(self._format_message())
    if self.code is None:
        from dataface.core.errors.codes_unknown import DF_UNKNOWN_INTERNAL

        self.code = DF_UNKNOWN_INTERNAL

QueryError

QueryError(message: str, query_name: str | None = None, sql: str | None = None)

Bases: ExecutionError

Error during query execution.

Raised when:

- SQL syntax is invalid
- Table/column doesn't exist
- Query returns unexpected results

Example

try:
    executor.execute_query("broken_query")
except QueryError as e:
    print(f"Query failed: {e}")

Source code in dataface/core/execute/errors.py
def __init__(
    self, message: str, query_name: str | None = None, sql: str | None = None
):
    self.sql = sql
    super().__init__(f"Query execution failed: {message}", query_name)

AdapterError

AdapterError(message: str, adapter_type: str | None = None)

Bases: ExecutionError

Error with adapter resolution or execution.

Raised when:

- Adapter not found for query type
- Adapter initialization fails
- Adapter-specific error occurs

Example

try:
    executor.execute_query("unknown_type_query")
except AdapterError as e:
    print(f"Adapter error: {e}")

Source code in dataface/core/execute/errors.py
def __init__(self, message: str, adapter_type: str | None = None):
    self.adapter_type = adapter_type
    super().__init__(f"Adapter error: {message}")
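
The hierarchy above lets callers catch narrowly or broadly. The sketch below rebuilds it with stand-in classes so it runs without dataface installed. It is a simplification: the stand-in `ExecutionError` passes the message straight to `Exception` instead of calling `_format_message()`, and it omits the error `code` handling.

```python
from __future__ import annotations


class DatafaceError(Exception):
    """Stand-in for dataface's base error class."""


class ExecutionError(DatafaceError):
    def __init__(self, message: str, query_name: str | None = None) -> None:
        self.message = message
        self.query_name = query_name
        # Simplification: the real class formats its text via _format_message().
        super().__init__(message)


class QueryError(ExecutionError):
    def __init__(
        self, message: str, query_name: str | None = None, sql: str | None = None
    ) -> None:
        self.sql = sql
        super().__init__(f"Query execution failed: {message}", query_name)


class AdapterError(ExecutionError):
    def __init__(self, message: str, adapter_type: str | None = None) -> None:
        self.adapter_type = adapter_type
        super().__init__(f"Adapter error: {message}")


def describe(exc: Exception) -> str:
    """Order handlers from most specific to most general."""
    try:
        raise exc
    except QueryError as e:
        return f"query:{e.query_name}:{e.message}"
    except ExecutionError as e:
        return f"execution:{e.message}"
    except DatafaceError:
        return "other"


query_case = describe(QueryError("no such table", query_name="sales", sql="SELECT 1"))
adapter_case = describe(AdapterError("no adapter", adapter_type="graphql"))
base_case = describe(DatafaceError("unrelated"))
```

Note that `AdapterError` does not forward a query name, so `query_name` is `None` on those instances, and the prefix added by each subclass ends up in `message`.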