Workflow Orchestration

The Data Governance solution uses Prefect for orchestrating data profiling, metadata enrichment, and data quality (DQ) rule generation workflows. Prefect provides task scheduling, concurrency control, observability, and failure handling.

Architecture

| Component | Purpose |
| --- | --- |
| Prefect Server | Workflow API and UI (port 4200) |
| Prefect Worker | Executes flows in Kubernetes pods |
| Work Pool | Kubernetes work pool named kubernetes-pool |
| PostgreSQL | Prefect metadata storage |

All Prefect services are deployed alongside the Data Governance API via Docker Compose (local dev) or Helm (Kubernetes).

Available Workflows

Data Profiling Flow

Analyzes connected data sources to produce statistical profiles:

| Setting | Value |
| --- | --- |
| Flow name | profile-database |
| Concurrency | ConcurrentTaskRunner(max_workers=10) |
| Input | Data connection ID, schema- or table-level targets (database-level targets are not supported), sampling config |
| Output | Column-level and table-level profiling results |

Profiling, Metadata Assessment, and DQ Assessment workflows require schema-level or table-level targets. Database-level targets (an entire database) are not supported; specify the individual schemas or tables to process.
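A caller can check this rule before submitting a run. The sketch below is hypothetical: the `Target` dataclass, its `level` values, and `validate_targets` are illustrative names, not the platform's API.

```python
from dataclasses import dataclass
from typing import List

SUPPORTED_LEVELS = {"schema", "table"}


@dataclass(frozen=True)
class Target:
    """Hypothetical target descriptor for a profiling or assessment run."""
    level: str  # "database", "schema", or "table"
    name: str   # e.g. "sales" or "sales.orders"


def validate_targets(targets: List[Target]) -> List[Target]:
    """Raise if any target is database-level; otherwise return the targets unchanged."""
    unsupported = [t for t in targets if t.level not in SUPPORTED_LEVELS]
    if unsupported:
        names = ", ".join(f"{t.level}:{t.name}" for t in unsupported)
        raise ValueError(
            f"Unsupported target(s) {names}; specify individual schemas or tables"
        )
    return targets
```

Failing early on the client side gives a clearer error than a failed flow run in the Prefect UI.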

Metadata Enrichment Flow

Uses LLMs to generate business descriptions and classifications:

| Setting | Value |
| --- | --- |
| Flow name | enrich-metadata |
| Concurrency | ConcurrentTaskRunner(max_workers=5) |
| Input | Data connection ID, profiling results |
| Output | Business descriptions, PII classifications, DQ rule suggestions |

DQ Rule Generation Flow

Generates data quality rules based on profiling data and LLM analysis:

| Setting | Value |
| --- | --- |
| Flow name | generate-dq-rules |
| Concurrency | ConcurrentTaskRunner with asyncio.Semaphore for LLM calls |
| Input | Schema context, profiling results, enrichment metadata |
| Output | Structured DQ rules per column |
| Configuration | DQ_RULE_GEN_MAX_CONCURRENT_TABLES, DQ_RULE_GEN_MAX_CONCURRENT_RULES |

DQ rule generation does not require a reference pack. Earlier versions passed an industry context (reference pack) into the workflow; that input was removed because it caused crashes under Prefect 3.x. Generation now runs purely from schema context, profiling results, and metadata.

Workflow approval and the scorecard

When you approve (“Save”) a profiling or enrichment workflow run, the platform applies two changes atomically:
  1. Tombstone affected metric snapshots. Affected metric_snapshots rows are written with numerator=0, denominator=0. This invalidates the prior score so the scorecard does not display stale numbers while the next assessment is pending.
  2. Resolve all matching OPEN violations. Every OPEN violation tied to an invalidated metric is closed in the same transaction, not just the most recent. Previously the resolve step closed only the latest violation, leaving older OPEN duplicates on the dashboard.
Net effect for users: scorecards update in real time. After Save, expect to see the affected scorecard cells flip to N/A (“pending next assessment”) and any related violation entries disappear from the open-violations view.
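The two-step approval can be sketched as a single database transaction. This is a minimal illustration using SQLite and a simplified, hypothetical schema (`metric_snapshots.metric_id`, `violations.status` with `OPEN`/`RESOLVED` values); it updates existing snapshot rows in place, whereas the platform may append new tombstone rows instead.

```python
import sqlite3
from typing import List, Optional


def scorecard_value(numerator: int, denominator: int) -> Optional[float]:
    # A tombstoned snapshot (0/0) renders as N/A instead of a stale score
    return None if denominator == 0 else numerator / denominator


def approve_run(conn: sqlite3.Connection, metric_ids: List[int]) -> None:
    """Tombstone snapshots and resolve OPEN violations for metric_ids atomically."""
    if not metric_ids:
        return
    placeholders = ",".join("?" for _ in metric_ids)
    with conn:  # commits if both statements succeed, rolls back if either raises
        # Step 1: tombstone the affected snapshots
        conn.execute(
            "UPDATE metric_snapshots SET numerator = 0, denominator = 0 "
            f"WHERE metric_id IN ({placeholders})",
            metric_ids,
        )
        # Step 2: close every OPEN violation for those metrics, not only the latest one
        conn.execute(
            "UPDATE violations SET status = 'RESOLVED' "
            f"WHERE status = 'OPEN' AND metric_id IN ({placeholders})",
            metric_ids,
        )
```

Running both statements inside one transaction is what prevents a half-applied state, such as a scorecard showing N/A while the old violations remain open.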

FQN scoping for violations

Violations are filtered by asset_fqn using an exact-or-child boundary: a filter for db.schema.card matches db.schema.card itself and any column-level child like db.schema.card.amount, but does not match sibling tables with prefix collisions like db.schema.cardtransaction. Earlier versions used a plain startswith, which produced false matches on sibling tables sharing a name prefix.
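The exact-or-child rule reduces to an equality check plus a prefix check that includes the separator. A minimal sketch, assuming FQN segments are joined with "." and no segment contains a literal dot; `matches_asset_fqn` is a hypothetical helper name.

```python
def matches_asset_fqn(asset_fqn: str, filter_fqn: str) -> bool:
    """Exact-or-child match: the asset itself, or anything nested under it."""
    # Requiring the trailing "." stops sibling tables that merely share a prefix
    return asset_fqn == filter_fqn or asset_fqn.startswith(filter_fqn + ".")
```

`db.schema.cardtransaction` starts with `db.schema.card` but not with `db.schema.card.`, so it no longer matches, while `db.schema.card.amount` still does.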

Ingestion triggered by asset.created

When an ingestion run is triggered by an asset.created event (or kicked off manually with no schemas listed in config), the workflow ingests all schemas in the connection, scoped by config.database. Earlier versions only ingested the first schema returned by the connection, which led to silent data gaps when a connection exposed multiple schemas.
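The schema-selection rule can be expressed as a small resolver. The `IngestionConfig` shape, the `available` mapping (database name to the schemas the connection exposes), and `resolve_schemas` are hypothetical names for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class IngestionConfig:
    """Hypothetical subset of an ingestion run's config."""
    database: str
    schemas: List[str] = field(default_factory=list)


def resolve_schemas(
    config: IngestionConfig, available: Dict[str, List[str]], trigger: str
) -> List[str]:
    """Pick which schemas an ingestion run should cover."""
    if trigger != "asset.created" and config.schemas:
        return list(config.schemas)  # explicit list from a manual run
    # asset.created, or a manual run with no schemas: every schema in config.database.
    # The earlier behavior was equivalent to available[config.database][:1].
    return list(available.get(config.database, []))
```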

Concurrency Model

Workflows use a hybrid concurrency model combining Prefect’s task runner with asyncio semaphores:

Flow-Level Concurrency

The ConcurrentTaskRunner controls how many table-level tasks run simultaneously:
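
```python
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner  # named ThreadPoolTaskRunner in Prefect 3.x

@flow(task_runner=ConcurrentTaskRunner(max_workers=10))
async def profile_database(tables: list[str]):
    # Submit one profile_table task per table; the runner executes at most 10 at a time
    futures = [profile_table.submit(table) for table in tables]
    # future.result() blocks and returns the task's value (not an awaitable),
    # so collect results directly instead of passing them to asyncio.gather
    return [future.result() for future in futures]
```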

Task-Level Concurrency

Within each task, asyncio.Semaphore controls fine-grained concurrency (e.g., LLM API calls):
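
```python
import asyncio

from prefect import task

@task
async def enrich_table(table_info):
    # Created per task run, so the cap applies per table, not across the whole flow
    semaphore = asyncio.Semaphore(20)  # Max 20 concurrent LLM calls

    async def process_column(column):
        async with semaphore:
            return await llm_client.enrich(column)  # llm_client: the service's LLM client, defined elsewhere

    results = await asyncio.gather(
        *[process_column(col) for col in table_info.columns]
    )
    return results
```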

| Level | Mechanism | Visibility |
| --- | --- | --- |
| Flow | ConcurrentTaskRunner | Visible in Prefect UI |
| Task | asyncio.Semaphore | Internal to the task |

Configure concurrency limits via config.yaml rather than hardcoding. This allows tuning based on the source database capacity and LLM API rate limits.
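The interaction between the two levels can be simulated with the standard library alone. In this sketch an outer `asyncio.Semaphore` stands in for `ConcurrentTaskRunner(max_workers=...)` (Prefect is not used), the LLM call is faked with `asyncio.sleep`, and the limits come from a dict shaped like the `enrichment` section of config.yaml; all function and class names are hypothetical.

```python
import asyncio

# Limits shaped like the `enrichment` section of config.yaml (values copied from it)
CONFIG = {"max_concurrent_tables": 5, "max_concurrent_llm_calls": 20}


class Gauge:
    """Counts coroutines currently inside a section and remembers the peak."""

    def __init__(self) -> None:
        self.current = 0
        self.peak = 0

    def enter(self) -> None:
        self.current += 1
        self.peak = max(self.peak, self.current)

    def exit(self) -> None:
        self.current -= 1


async def fake_llm_call(column: str) -> str:
    await asyncio.sleep(0.001)  # stand-in for network latency
    return f"description of {column}"


async def enrich_table(columns, llm_limit, llm_gauge):
    semaphore = asyncio.Semaphore(llm_limit)  # per-table cap, as in the task example

    async def process_column(column):
        async with semaphore:
            llm_gauge.enter()
            try:
                return await fake_llm_call(column)
            finally:
                llm_gauge.exit()

    return await asyncio.gather(*(process_column(c) for c in columns))


async def enrich_all(tables, config=CONFIG):
    """Returns (results per table, peak concurrent tables, peak in-flight LLM calls)."""
    table_slots = asyncio.Semaphore(config["max_concurrent_tables"])  # stands in for the task runner
    table_gauge, llm_gauge = Gauge(), Gauge()

    async def run_table(name, columns):
        async with table_slots:
            table_gauge.enter()
            try:
                limit = config["max_concurrent_llm_calls"]
                return name, await enrich_table(columns, limit, llm_gauge)
            finally:
                table_gauge.exit()

    pairs = await asyncio.gather(*(run_table(n, c) for n, c in tables.items()))
    return dict(pairs), table_gauge.peak, llm_gauge.peak
```

Because each task creates its own semaphore, the two limits multiply: the worst case for in-flight LLM calls across a flow is max_concurrent_tables × max_concurrent_llm_calls (5 × 20 = 100 with the values above). That product, not either limit alone, is the number to compare against the LLM provider's rate limit.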

Workflow Management

Prefect UI

The Prefect UI (port 4200) provides visibility into workflow execution:
  • Flow runs: Status, duration, logs for each execution
  • Task runs: Individual task status within a flow
  • Deployments: Registered flow definitions and schedules
  • Work pool: Worker status and queue depth

Configuration

Workflow configuration is managed via OmegaConf with environment variable overrides:

```yaml
# config.yaml
profiling:
  max_concurrent_tables: 10
  sample_threshold: 1000000  # Sample tables with more than 1M rows
  sample_size: 100000

enrichment:
  max_concurrent_tables: 5
  max_concurrent_llm_calls: 20
  model: "gpt-4o"
  temperature: 0.1

dq_rules:
  max_concurrent_tables: 10
  max_concurrent_rules: 20
```

Precedence: environment variables > config.yaml > defaults. Environment variables can also be referenced inside config.yaml with OmegaConf's oc.env resolver, for example ${oc.env:OPENAI_API_KEY}.
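The precedence order can be illustrated without OmegaConf. This stdlib sketch swaps in a hand-written deep merge for OmegaConf's merge; the mapping from the DQ_RULE_GEN_MAX_CONCURRENT_* variables to dq_rules keys is an assumption, and the `DEFAULTS` values are placeholders.

```python
import copy
import os
from typing import Mapping

# Placeholder defaults; the real defaults live in the service code
DEFAULTS = {"dq_rules": {"max_concurrent_tables": 10, "max_concurrent_rules": 20}}

# Hypothetical mapping from environment variable to (section, key)
ENV_OVERRIDES = {
    "DQ_RULE_GEN_MAX_CONCURRENT_TABLES": ("dq_rules", "max_concurrent_tables"),
    "DQ_RULE_GEN_MAX_CONCURRENT_RULES": ("dq_rules", "max_concurrent_rules"),
}


def deep_merge(base: dict, override: dict) -> dict:
    """Return a copy of base with override's values layered on top, recursively."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def resolve_config(file_config: dict, environ: Mapping[str, str] = os.environ) -> dict:
    config = deep_merge(DEFAULTS, file_config)  # config.yaml beats defaults
    for var, (section, key) in ENV_OVERRIDES.items():
        if var in environ:  # environment variables beat config.yaml
            config.setdefault(section, {})[key] = int(environ[var])
    return config
```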

Error Handling

| Error Type | Handling |
| --- | --- |
| Database connection failure | Flow fails fast with descriptive error |
| LLM API rate limit | Retry with exponential backoff via tenacity |
| LLM API timeout | Retry up to 3 times, then skip column and continue |
| Single table failure | Log error, continue processing remaining tables |
| Infrastructure failure | Prefect marks flow as failed; manual retry available via UI |
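The retry-and-skip behavior in the table can be approximated with asyncio alone. This sketch hand-rolls capped exponential backoff instead of using tenacity, which the platform actually uses; `RateLimitError`, the delay values, the logger name, and the helper names are hypothetical. It reads "retry up to 3 times" as three retries after the first attempt.

```python
import asyncio
import logging
import random

logger = logging.getLogger("governance.workflows")  # hypothetical logger name


class RateLimitError(Exception):
    """Stand-in for the LLM client's rate-limit exception (hypothetical)."""


RETRYABLE = (RateLimitError, asyncio.TimeoutError)


async def call_with_retries(fn, *args, max_retries=3, base_delay=0.5, max_delay=8.0):
    """Call fn, retrying rate limits and timeouts with capped exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return await fn(*args)
        except RETRYABLE:
            if attempt == max_retries:
                raise  # retries exhausted: let the caller decide what to do
            delay = min(max_delay, base_delay * 2**attempt)
            # Scale by a random factor in [0.5, 1.0] so parallel callers don't retry in lockstep
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))


async def enrich_column(llm_call, column, **retry_kwargs):
    """Skip a column (return None) once its LLM call keeps timing out."""
    try:
        return await call_with_retries(llm_call, column, **retry_kwargs)
    except asyncio.TimeoutError:
        logger.warning("Skipping column %s after repeated timeouts", column)
        return None


async def process_tables(table_names, process_table):
    """Run tables one by one; a failing table is logged and the rest still run."""
    results = {}
    for name in table_names:
        try:
            results[name] = await process_table(name)
        except Exception:
            logger.exception("Table %s failed; continuing with remaining tables", name)
    return results
```

In this sketch, a rate limit that persists past the last retry propagates out of the column and fails that table, which the table-level handler then logs before moving on.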

Kubernetes Deployment

In Kubernetes, Prefect workflows run as pods in the kubernetes-pool work pool:

| Component | Deployment |
| --- | --- |
| Prefect Server | StatefulSet with PostgreSQL backend |
| Prefect Worker | Deployment watching the kubernetes-pool |
| Flow runs | Ephemeral pods created per flow execution |
| Secrets | Injected via Helm secretKeyRef (e.g., OPENAI_API_KEY) |

Next Steps

Data Profiling

Learn about the profiling workflow in detail.

Data Enrichment

Understand the LLM-powered enrichment pipeline.

Data Source Setup

Connect a database to use with governance workflows.