Source code for agent_urban_planning.core.engine

"""Simulation engine orchestrating the full simulation loop."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Optional

import numpy as np

from agent_urban_planning.core.agents import AgentPopulation
from agent_urban_planning.data.loaders import (
    AgentDistributionalConfig,
    PolicyConfig,
    ScenarioConfig,
    load_agents,
    load_policy,
    load_scenario,
)
from agent_urban_planning.decisions.base import DecisionEngine
from agent_urban_planning.decisions.clustering import (
    ClusteringConfig,
    ClusterizedDecisionEngine,
)
from agent_urban_planning.decisions._legacy_singapore_utility import UtilityEngine
from agent_urban_planning.core.environment import Environment
from agent_urban_planning.core.market import AhlfeldtMarket, HousingMarket, MarketResult
from agent_urban_planning.core.metrics import WelfareMetrics, compute_metrics
from agent_urban_planning.core.results import AgentResult, SimulationResults
from agent_urban_planning.core.run_metadata import RunMetadata, WallClock


[docs] class SimulationEngine: """Orchestrate one end-to-end simulation: config to environment to market to metrics. Top-level entry point of the library. Wires together a scenario, an agent population, a :class:`DecisionEngine`, and a market clearer (:class:`HousingMarket` for Singapore-style scenarios, :class:`AhlfeldtMarket` for Berlin Cobb-Douglas + Fréchet spatial-equilibrium scenarios). Calling :meth:`run` produces a :class:`SimulationResults` object containing welfare metrics, per-agent allocations, the price-history trajectory, and run metadata. Args: scenario: A loaded ``ScenarioConfig`` describing zones, transport network, and (for Berlin) Ahlfeldt structural parameters. agent_config: A ``AgentDistributionalConfig`` describing the agent population (per-zone Census distributions or explicit agent records). engine: An optional :class:`DecisionEngine`. When ``None`` the engine is auto-selected based on ``scenario`` (Berlin scenarios → :class:`AhlfeldtUtilityEngine`; everything else → the legacy Singapore ``UtilityEngine``). seed: Optional integer seed for the agent-sampling RNG. Defaults to ``scenario.simulation.random_seed``. verbose: When ``True``, prints progress and intermediate results. clustering: Optional clustering configuration for wrapping the inner engine in a :class:`ClusterizedDecisionEngine` (used with full-LLM mode to amortize calls across archetypes). llm_provider: Provider name to record in run metadata (``"codex-cli"``, ``"claude-code"``, ``"anthropic"``, ...). llm_model: Model name to record (e.g. ``"haiku"``). llm_temperature: Sampling temperature to record. llm_concurrency: Concurrency setting for the async LLM client. price_elasticity: Override of the floor-price elasticity used by the tatonnement step. Falls back to engine default. initial_damping: Initial Walrasian damping ``lambda``. market_convergence_threshold: Maximum absolute excess demand at which clearing is declared converged. max_market_iterations: Iteration cap on the tatonnement loop. Examples: >>> import agent_urban_planning as aup >>> scenario = aup.data.builtin.load("singapore_real_v2") >>> agents = aup.data.builtin.load_agents("singapore_real_v2") >>> sim = aup.SimulationEngine(scenario=scenario, agent_config=agents) >>> results = sim.run(policy=None) >>> sorted(results.metrics.zone_populations.values()) # doctest: +SKIP [0.05, 0.07, 0.13, ...] See Also: :class:`agent_urban_planning.UtilityEngine` — the closed-form decision engine used by default for Berlin scenarios. :class:`agent_urban_planning.AhlfeldtMarket` — the two-market tatonnement clearer used for Berlin scenarios. References: Ahlfeldt, G. M., Redding, S. J., Sturm, D. M., Wolf, N. (2015). The economics of density: Evidence from the Berlin Wall. *Econometrica*, 83(6), 2127-2189. """ def __init__( self, scenario: ScenarioConfig, agent_config: AgentDistributionalConfig, engine: Optional[DecisionEngine] = None, seed: Optional[int] = None, verbose: bool = False, clustering: Optional[ClusteringConfig] = None, llm_provider: Optional[str] = None, llm_model: Optional[str] = None, llm_temperature: Optional[float] = None, llm_concurrency: Optional[int] = None, price_elasticity: Optional[float] = None, initial_damping: Optional[float] = None, market_convergence_threshold: Optional[float] = None, max_market_iterations: Optional[int] = None, ): self.scenario = scenario self.agent_config = agent_config self.seed = seed or scenario.simulation.random_seed self.verbose = verbose # Provider/model info for metadata self.llm_provider = llm_provider self.llm_model = llm_model self.llm_temperature = llm_temperature self.llm_concurrency = llm_concurrency # Market clearing parameters self._price_elasticity = price_elasticity self._initial_damping = initial_damping self._market_convergence_threshold = market_convergence_threshold self._max_market_iterations = max_market_iterations # Build the inner engine, then optionally wrap with clustering. # For Ahlfeldt scenarios, default to AhlfeldtUtilityEngine when no # explicit engine is provided. if engine is None: if scenario.ahlfeldt_params is not None: from agent_urban_planning.decisions.ahlfeldt_utility import AhlfeldtUtilityEngine inner_engine = AhlfeldtUtilityEngine( params=scenario.ahlfeldt_params, seed=seed, dtype=getattr(scenario.ahlfeldt_params, "dtype", "float64"), deterministic=getattr( scenario.ahlfeldt_params, "deterministic", False ), ) else: inner_engine = UtilityEngine() else: inner_engine = engine self.inner_engine = inner_engine self.clustering_config = clustering if self.verbose: print(f"Loading scenario: {scenario.name} ({len(scenario.zones)} zones)") print(f"Generating {agent_config.num_types} agent types (seed={self.seed})...") rng = np.random.RandomState(self.seed) if self.seed is not None else None self.rng = rng # strict=True for real Singapore scenarios (>5 zones, no ahlfeldt_params). # Ahlfeldt replication scenarios use homogeneous agents by design # (matching the closed-form continuum-of-identical-workers assumption) # and therefore do not require per-zone Census distributions. _strict = ( len(scenario.zones) > 5 and scenario.ahlfeldt_params is None ) self.population = AgentPopulation.from_config(agent_config, rng=rng, strict=_strict) self.base_env = Environment.from_config(scenario) # Wrap with clustering if configured (and not "none") if clustering is not None and clustering.algo != "none": self.engine = ClusterizedDecisionEngine( inner=inner_engine, config=clustering, environment=self.base_env, rng=rng, ) else: self.engine = inner_engine if self.verbose: print(f" Zones: {', '.join(self.base_env.zone_names)}") print(f" Agents: {len(self.population)} types") if clustering is not None and clustering.algo != "none": print( f" Clustering: {clustering.algo} k={clustering.k} " f"samples={clustering.samples_per_archetype} " f"({clustering.within_cluster_assignment})" ) print()
[docs] @classmethod def from_paths( cls, scenario_path: str, agents_path: str, engine: Optional[DecisionEngine] = None, seed: Optional[int] = None, ) -> "SimulationEngine": """Build a ``SimulationEngine`` directly from YAML file paths. Convenience constructor that loads both the scenario and agent configurations from disk before delegating to the regular constructor. Useful for command-line scripts and notebooks. Args: scenario_path: Path to a scenario YAML file (passed to :func:`load_scenario`). agents_path: Path to an agent-population YAML file (passed to :func:`load_agents`). engine: Optional :class:`DecisionEngine` instance. ``None`` selects the default engine for the scenario. seed: Optional integer seed. Returns: A configured :class:`SimulationEngine` ready to run. Examples: >>> import agent_urban_planning as aup >>> sim = aup.SimulationEngine.from_paths( # doctest: +SKIP ... "config/scenarios/singapore_real_v2.yaml", ... "config/agents/singapore_real_v2.yaml", ... seed=42, ... ) """ return cls( scenario=load_scenario(scenario_path), agent_config=load_agents(agents_path), engine=engine, seed=seed, )
[docs] def run( self, policy: Optional[PolicyConfig] = None, baseline: Optional[SimulationResults] = None, market_resume: Optional[dict] = None, market_checkpoint_callback=None, llm_cache_path: Optional[str] = None, ) -> SimulationResults: """Run one simulation under the given policy and return full results. Applies the policy to the base environment, runs the scenario-appropriate market clearer (HDB/private tatonnement for Singapore, Q + w joint tatonnement for Berlin), assembles per-agent results, and computes welfare metrics. When ``policy`` is ``None``, the scenario's built-in environment is used unchanged — no transit or facility investments are applied. This supports both Ahlfeldt replication runs (which have no notion of government investment) and baseline observational runs that report the pre-intervention equilibrium. Args: policy: Optional ``PolicyConfig`` describing transit and facility investments to apply before clearing. ``None`` runs the scenario as observed. baseline: Optional pre-computed :class:`SimulationResults` from a prior run. When provided, each agent's ``utility_vs_baseline`` field is filled in with the difference between their realized utility here and in ``baseline``. market_resume: Optional checkpoint state from a prior interrupted run, returned by ``market_checkpoint_callback`` in a previous invocation. Lets long LLM runs survive process restarts. market_checkpoint_callback: Optional callable invoked with a JSON-serializable checkpoint dict at every market iteration. Receives a ``dict`` payload that can be passed back as ``market_resume`` to resume. llm_cache_path: Optional path to a disk-backed LLM cache. Reuses cached LLM completions across invocations. Returns: A :class:`SimulationResults` object aggregating welfare metrics, per-agent allocations, market price history, and run metadata. Examples: >>> import agent_urban_planning as aup # doctest: +SKIP >>> sim = aup.SimulationEngine(scenario, agent_config) # doctest: +SKIP >>> baseline = sim.run() # doctest: +SKIP >>> with_policy = sim.run(policy, baseline=baseline) # doctest: +SKIP >>> with_policy.metrics.avg_utility - baseline.metrics.avg_utility # doctest: +SKIP 0.04 """ if self.verbose: if policy is None: print(f"Running scenario: {self.scenario.name} (no policy applied)") else: n_transit = len(policy.transit_investments) n_facility = len(policy.facility_investments) print(f"Running policy: {policy.name}") print(f" Investments: {n_transit} transit, {n_facility} facilities") print(f" Market clearing...") wall_clock = WallClock() wall_clock.__enter__() # 1. Apply policy to environment (skip when policy is None) env = self.base_env.apply_policy(policy) if policy is not None else self.base_env # 2. Run market clearing. Dispatch on scenario.ahlfeldt_params: # - Berlin / Ahlfeldt scenarios → AhlfeldtMarket (two-market) # - Everything else → HousingMarket (single-segment or HDB/private) if self.scenario.ahlfeldt_params is not None: # External override (e.g. OpenCityAhlfeldtMarket for Run 2 CF). # Scripts can attach an already-constructed market instance to # `engine._market_override`; when present, we use it unchanged. if getattr(self, "_market_override", None) is not None: market = self._market_override else: market = AhlfeldtMarket( ahlfeldt_params=self.scenario.ahlfeldt_params, initial_damping=self._initial_damping or 0.3, convergence_threshold=( self._market_convergence_threshold if self._market_convergence_threshold is not None else self.scenario.simulation.market_convergence_threshold ), max_iterations=( self._max_market_iterations or self.scenario.simulation.market_max_iterations ), verbose=self.verbose, ) else: # Resolve price elasticity: CLI override > engine property > default eta = self._price_elasticity if eta is None and hasattr(self.inner_engine, 'price_elasticity'): eta = abs(self.inner_engine.price_elasticity) if eta is None: eta = 0.5 market = HousingMarket( max_iterations=self._max_market_iterations or self.scenario.simulation.market_max_iterations, convergence_threshold=( self._market_convergence_threshold if self._market_convergence_threshold is not None else self.scenario.simulation.market_convergence_threshold ), price_elasticity=eta, initial_damping=self._initial_damping or 0.3, verbose=self.verbose, ) market_result = market.clear( self.population, env, self.engine, resume_state=market_resume, checkpoint_callback=market_checkpoint_callback, cache_path=llm_cache_path, ) # 3. Build per-agent results agent_results = [] for agent in self.population: choice = market_result.allocations[agent.agent_id] route = env.transport.get_best_route(choice.zone_name, agent.job_location) commute = route.time_minutes if route else 120.0 price = market_result.prices.get(choice.zone_name, 0.0) # Compute utility change vs baseline utility_vs_baseline = None if baseline: try: base_result = baseline.get_agent(agent.agent_id) utility_vs_baseline = choice.utility - base_result.realized_utility except KeyError: pass agent_results.append(AgentResult.from_agent( agent=agent, zone_utilities=choice.zone_utilities, zone_choice=choice.zone_name, equilibrium_price=price, commute_minutes=commute, realized_utility=choice.utility, utility_vs_baseline=utility_vs_baseline, workplace_zone=getattr(choice, "workplace", "") or agent.job_location, )) # 4. Compute aggregate metrics if self.verbose: print(f" Computing welfare metrics...") metrics = compute_metrics( self.population, env, market_result.allocations, market_result.prices, market_converged=market_result.converged, market_convergence_metric=market_result.convergence_metric, ) if self.verbose: pops = ", ".join(f"{z}: {p:.1%}" for z, p in sorted(metrics.zone_populations.items())) print(f" Results:") print(f" Avg utility: {metrics.avg_utility:.4f}") print(f" Gini: {metrics.gini_coefficient:.4f}") print(f" Min utility: {metrics.min_utility:.4f}") print(f" Avg commute: {metrics.avg_commute_minutes:.1f} min") print(f" Unaffordable: {metrics.housing_unaffordable_share:.1%}") print(f" Zone pops: {pops}") print() # Convert market history to serializable dicts. A/B / wages only # populated when the scenario is Ahlfeldt + endogenous flag. price_history = [] for snap in market_result.history: entry = { "iteration": snap.iteration, "prices": snap.prices, "demand": snap.demand, "excess_demand": snap.excess_demand, "excess_demand_by_zone": snap.excess_demand_by_zone, } if getattr(snap, "productivity_A", None): entry["productivity_A"] = snap.productivity_A entry["amenity_B"] = snap.amenity_B entry["max_delta_A_rel"] = snap.max_delta_A_rel entry["max_delta_B_rel"] = snap.max_delta_B_rel if getattr(snap, "wages", None): entry["wages"] = snap.wages price_history.append(entry) # Stop the clock and assemble run metadata wall_clock.__exit__(None, None, None) metadata = self._build_metadata(policy, market_result, wall_clock.elapsed) return SimulationResults( metrics=metrics, agent_results=agent_results, policy_name=policy.name if policy is not None else "", scenario_name=self.scenario.name, price_history=price_history, metadata=metadata, )
# ------------------------------------------------------------------ # Metadata construction # ------------------------------------------------------------------ def _build_metadata( self, policy: Optional[PolicyConfig], market_result: MarketResult, wall_clock_seconds: float, ) -> RunMetadata: """Assemble RunMetadata from market result and engine state.""" clustering_algo = "none" num_archetypes = None samples_per_archetype = 1 within_cluster_assignment = "deterministic" cluster_features: list[str] = [] cluster_assignments: Optional[dict[int, int]] = None if self.clustering_config is not None: clustering_algo = self.clustering_config.algo if clustering_algo != "none": num_archetypes = self.clustering_config.k samples_per_archetype = self.clustering_config.samples_per_archetype within_cluster_assignment = self.clustering_config.within_cluster_assignment cluster_features = list(self.clustering_config.features or []) if isinstance(self.engine, ClusterizedDecisionEngine): cluster_assignments = ( dict(self.engine.cluster_assignments) if self.engine.cluster_assignments is not None else None ) engine_name = self.inner_engine.__class__.__name__ # Market clearing info from the result price_elasticity_used = getattr(market_result, 'price_elasticity_used', None) damping_final = getattr(market_result, 'damping_final', None) meta = RunMetadata( scenario_name=self.scenario.name, policy_name=policy.name if policy is not None else "", seed=self.seed, llm_provider=self.llm_provider, llm_model=self.llm_model, llm_temperature=self.llm_temperature, llm_concurrency=self.llm_concurrency, total_llm_calls=int(market_result.cache_misses), cached_llm_calls=int(market_result.cache_hits), total_input_tokens=int(market_result.total_input_tokens), total_output_tokens=int(market_result.total_output_tokens), wall_clock_seconds=wall_clock_seconds, clustering_algo=clustering_algo, num_archetypes=num_archetypes, samples_per_archetype=samples_per_archetype, within_cluster_assignment=within_cluster_assignment, cluster_features=cluster_features, cluster_assignments=cluster_assignments, decision_engine_name=engine_name, price_elasticity_used=price_elasticity_used, damping_final=damping_final, market_iterations_actual=market_result.iterations, convergence_achieved=market_result.converged, ) meta.update_cache_hit_rate() meta.update_cost() return meta
[docs] def compare_policies( self, policies: list[PolicyConfig], ) -> dict[str, SimulationResults]: """Run one simulation per policy and return results keyed by policy name. Iterates over a list of policies, running :meth:`run` for each and threading the first policy's result through as the baseline for ``utility_vs_baseline`` computations on subsequent policies. Useful for cross-policy welfare comparisons in a single call. Args: policies: List of ``PolicyConfig`` objects. The first policy in the list is treated as the comparison baseline. Returns: A dict mapping each ``policy.name`` to its :class:`SimulationResults`. Examples: >>> import agent_urban_planning as aup # doctest: +SKIP >>> sim = aup.SimulationEngine(scenario, agent_config) # doctest: +SKIP >>> results = sim.compare_policies([baseline_policy, alt_policy]) # doctest: +SKIP >>> results["alt"].metrics.avg_utility # doctest: +SKIP 2.13 """ results = {} baseline = None for policy in policies: result = self.run(policy, baseline=baseline) results[policy.name] = result if baseline is None: baseline = result return results
[docs] def budget_sweep( self, base_policy: PolicyConfig, transit_shares: list[float], ) -> list[tuple[float, SimulationResults]]: """Sweep over transit budget share and return results for each share level. Generates a series of policies that interpolate between facility-only (``share=0``) and transit-only (``share=1``) allocations of the policy's total budget, and runs the simulation for each. At every share level transit investments scale travel time improvements proportionally (50% budget yields halfway between old and new time), while facility investments scale capacity and quality proportionally. Args: base_policy: A baseline ``PolicyConfig`` whose ``transit_investments`` and ``facility_investments`` are used as the upper-budget reference. The policy's ``total_budget`` defines the budget envelope swept. transit_shares: Iterable of fractions in ``[0, 1]`` indicating what share of the total budget goes to transit at each point of the sweep. The complementary ``1 - share`` goes to facilities. Returns: List of ``(share, SimulationResults)`` tuples in the order of ``transit_shares``. Use these to build budget-allocation tradeoff plots. Examples: >>> import agent_urban_planning as aup >>> # sim = aup.SimulationEngine(scenario, agent_config) >>> # sweep = sim.budget_sweep(policy, [0.0, 0.5, 1.0]) >>> # shares = [s for s, _ in sweep] >>> # avg_utilities = [r.metrics.avg_utility for _, r in sweep] """ from copy import deepcopy sweep_results = [] budget = base_policy.total_budget base_transit_cost = sum(t.cost for t in base_policy.transit_investments) or 1.0 base_facility_cost = sum(f.cost for f in base_policy.facility_investments) or 1.0 # Get baseline travel times from current environment for interpolation base_times = {} for t in base_policy.transit_investments: route = self.base_env.transport.get_best_route(t.route[0], t.route[1]) old_time = route.time_minutes if route else 120.0 base_times[tuple(t.route)] = old_time for share in transit_shares: policy = deepcopy(base_policy) policy.name = f"sweep_{share:.2f}" transit_budget = budget * share facility_budget = budget * (1.0 - share) # Scale transit: interpolate travel time based on budget fraction if share > 0 and base_policy.transit_investments: transit_scale = min(transit_budget / base_transit_cost, 1.0) for t in policy.transit_investments: old_time = base_times.get(tuple(t.route), 120.0) full_new_time = t.new_time_minutes # Interpolate: more budget = closer to full improvement t.new_time_minutes = old_time - transit_scale * (old_time - full_new_time) t.cost = t.cost * (transit_budget / base_transit_cost) else: policy.transit_investments = [] # Scale facilities: scale capacity and quality with budget if (1.0 - share) > 0 and base_policy.facility_investments: facility_scale = min(facility_budget / base_facility_cost, 1.0) for f in policy.facility_investments: f.capacity = max(1, int(f.capacity * facility_scale)) f.quality = f.quality * (0.5 + 0.5 * facility_scale) # floor at 50% f.cost = f.cost * (facility_budget / base_facility_cost) else: policy.facility_investments = [] result = self.run(policy) sweep_results.append((share, result)) return sweep_results