Skip to main content

Integration patterns

Simulacrum fits into batch jobs, streaming workers, and web applications. Use these patterns as a starting point.

Airflow DAG example

from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime
from simulacrum import Simulacrum

@dag(schedule="0 * * * *", start_date=datetime(2024, 1, 1), catchup=False)
def simulacrum_forecasts():
    @task
    def produce_forecast(segment: str) -> list[float]:
        client = Simulacrum(api_key=Variable.get("SIMULACRUM_API_KEY"))
        series = load_series(segment)  # retrieve from your warehouse
        forecast = client.forecast(series=series, horizon=12, model="tempo")
        persist_forecast(segment, forecast.tolist())  # write back to storage
        return forecast.tolist()

    produce_forecast.expand(segment=["enterprise", "smb", "consumer"])

doc = simulacrum_forecasts()

FastAPI service

from fastapi import FastAPI, HTTPException
from simulacrum import Simulacrum
from simulacrum.exceptions import SimulacrumError

app = FastAPI()
client = Simulacrum(api_key=get_secret("SIMULACRUM_API_KEY"))

@app.post("/forecast")
async def forecast(payload: list[float], horizon: int = 5):
    try:
        return {"forecast": client.forecast(series=payload, horizon=horizon, model="tempo").tolist()}
    except SimulacrumError as exc:
        raise HTTPException(status_code=400, detail=str(exc))

CI smoke test

import pytest
from simulacrum import Simulacrum

@pytest.mark.integration
def test_validate_key():
    client = Simulacrum(api_key=get_secret("SIMULACRUM_API_KEY"))
    validation = client.validate()
    assert validation.valid, "API key must be active for production deploys"
Add these checks to the top of your deployments to catch credential drift before it hits production.