Source code for agent_urban_planning.decisions.ahlfeldt_abm_engine

"""Monte Carlo argmax engine — ABM substitute for softmax in Ahlfeldt.

Replaces the closed-form Fréchet softmax step with an explicit per-agent
argmax over drawn shocks. This lets the same Ahlfeldt FOC machinery
(housing market clearing + firm labor FOC) run under **any** shock
distribution, since the FOCs only read aggregate choice shares and are
distribution-free (budget identity + firm marginal condition).

Design principles
-----------------
1. **Homogeneous preferences across all M agents** — β, κ, ε come from
   ``params``. Heterogeneity emerges from the per-agent shock draw only.
2. **Frozen shocks via deterministic sub-seeds** — each agent's (N, N)
   shock tensor is regenerated from ``rng(master_seed + agent_id)`` on
   every call, guaranteeing bitwise reproducibility without storing the
   full M × N × N tensor.
3. **Batched numpy vectorization** — M agents processed in chunks of
   ``batch_size`` (default 10,000). Peak memory per batch:
   ``BATCH × N² × 4 B`` — ~370 MB at BATCH=10k, N=96.
4. **Empirical P_agg exposed to market unchanged** — ``P_agg_count / M``
   feeds ``self.last_choice_probabilities``, so :class:`AhlfeldtMarket`
   reads it exactly as if it came from the closed-form path.

Supported shock distributions
-----------------------------
- ``"frechet"``  — Gumbel(0, 1) with variance π²/6 ≈ 1.645. Matches the
                   Ahlfeldt closed-form softmax in aggregate.
- ``"normal"``   — Normal(0, σ² = π²/6). Variance-matched to Gumbel so
                   V2 vs V3 is a pure **shape** test (symmetric Gaussian
                   vs asymmetric right-skewed Gumbel), not a scale test.

Variance matching rationale:
After the argmax applies ``shock / ε_structural``, the effective logit-
space noise variance is the same under both distributions. Any
equilibrium deviation between V2 (Gumbel) and V3 (Normal) is thus purely
attributable to distribution *shape*, not noise *amplitude*.
"""
from __future__ import annotations

import math
import logging
from typing import Optional

import numpy as np

from agent_urban_planning.core.agents import Agent
from agent_urban_planning.data.loaders import AhlfeldtParams
from agent_urban_planning.decisions.ahlfeldt_utility import AhlfeldtUtilityEngine
from agent_urban_planning.decisions.base import LocationChoice

SUPPORTED_SHOCKS = ("frechet", "normal")
# Variance of Gumbel(0, 1) distribution.
GUMBEL_VARIANCE = math.pi ** 2 / 6.0  # ≈ 1.6449


[docs] class AhlfeldtABMEngine(AhlfeldtUtilityEngine): """Monte Carlo argmax decision engine with configurable shock distribution. The ABM substitute for V1's closed-form softmax. Drives paper variants V2 (Fréchet shocks) and V3 (variance-matched Gaussian shocks). Replaces the Fréchet softmax step with an explicit per-agent argmax over drawn shocks, while keeping the rest of the Ahlfeldt FOC machinery (housing market clearing, firm labor FOC) unchanged. The exposed empirical choice matrix ``last_choice_probabilities`` lets :class:`AhlfeldtMarket` consume the result as if it came from the closed-form path. Most users should configure this through :class:`UtilityEngine` rather than instantiating it directly. Args: params: Structural parameters. shock_distribution: ``"frechet"`` for V2 or ``"normal"`` for V3 (variance-matched to Gumbel so V2 vs V3 is a pure shape test). num_agents: Number of Monte Carlo replicates ``M``. batch_size: Per-batch chunk size for memory bookkeeping. seed: Optional integer seed. Passed to the parent class. store_agent_samples: Cap on per-agent sample records to retain for diagnostics. ``0`` (default) skips sampling. **kwargs: Forwarded to :class:`AhlfeldtUtilityEngine`. Raises: ValueError: If ``shock_distribution`` is not ``"frechet"`` / ``"normal"``, or if ``num_agents`` / ``batch_size`` is non-positive. Examples: >>> import agent_urban_planning as aup >>> # Prefer the public wrapper: >>> # engine = aup.UtilityEngine(params, mode="argmax", noise="frechet") References: Ahlfeldt, G. M., Redding, S. J., Sturm, D. M., Wolf, N. (2015). The economics of density: Evidence from the Berlin Wall. *Econometrica*, 83(6), 2127-2189. """ def __init__( self, params: AhlfeldtParams, *, shock_distribution: str = "frechet", num_agents: int = 1_000_000, batch_size: int = 10_000, seed: Optional[int] = None, store_agent_samples: int = 0, **kwargs, ): super().__init__(params, seed=seed, **kwargs) if shock_distribution not in SUPPORTED_SHOCKS: raise ValueError( f"shock_distribution must be one of {SUPPORTED_SHOCKS}; " f"got {shock_distribution!r}" ) if num_agents < 1: raise ValueError(f"num_agents must be >= 1; got {num_agents}") if batch_size < 1: raise ValueError(f"batch_size must be >= 1; got {batch_size}") self.shock_distribution = shock_distribution self.num_agents = int(num_agents) self.batch_size = int(batch_size) self.store_agent_samples = int(store_agent_samples) # Public diagnostics populated each decide_batch. self.last_abm_diagnostics: dict = {} # ------------------------------------------------------------------ # Shock draw helpers # ------------------------------------------------------------------ def _fresh_rng(self) -> np.random.Generator: """Create a fresh RNG seeded from the master seed. One RNG per ``decide_batch`` call, shared across all its batches. This guarantees frozen-across-iterations shocks (same seed on every call → same sequence), while avoiding per-batch RNG creation overhead. """ return np.random.default_rng((self.seed * 2_000_003 + 1) & 0xFFFFFFFF) def _draw_shocks_batch( self, rng: np.random.Generator, batch_size: int, N: int, ) -> np.ndarray: """Draw a (batch, N, N) shock tensor from the provided generator. Uses `rng.random(dtype=np.float32)` to get uniforms directly in float32 (avoids a float64 → float32 copy), then transforms to the target distribution via its inverse CDF: Gumbel(0, 1): -ln(-ln(u)) Normal(0, σ²): σ · Φ⁻¹(u) via Box-Muller or sqrt(2)·erfinv(2u-1) Numpy's ``rng.gumbel`` and ``rng.normal`` internally produce float64 and then we'd cast — the direct-uniform path is ~30% faster at this size. """ shape = (batch_size, N, N) u = rng.random(size=shape, dtype=np.float32) if self.shock_distribution == "frechet": # Clamp to avoid log(0). np.maximum(u, 1e-7, out=u) np.minimum(u, 1.0 - 1e-7, out=u) # Gumbel inverse CDF: −log(−log(u)) np.log(u, out=u) np.negative(u, out=u) np.log(u, out=u) np.negative(u, out=u) return u # "normal": variance-matched to Gumbel, σ = π/√6. # scipy-free: use np.sqrt(2) * erfinv(2u - 1) via erfinv approximation. # Easier and also fast: use the standard RNG normal draw (internally # uses ziggurat — very fast). Accept a small float64→float32 copy cost. sigma = math.sqrt(GUMBEL_VARIANCE) return rng.standard_normal(size=shape, dtype=np.float32) * np.float32(sigma) # ------------------------------------------------------------------ # decide_batch override # ------------------------------------------------------------------
[docs] def decide_batch( self, agents: list[Agent], environment, zone_options: list[str], prices: dict, ) -> list[LocationChoice]: """Run ``M`` independent argmax-over-shocks draws and aggregate the results. Builds a shared utility matrix ``V[i, j]`` once, then in batches of ``self.batch_size`` draws independent (N, N) shock tensors, adds them to ``V``, and takes per-agent argmax to produce the empirical choice matrix consumed by :class:`AhlfeldtMarket`. Args: agents: Canonical agent list (typically length 1 — a single representative type — since preferences are homogeneous across all M Monte Carlo replicates). The number of MC agents is controlled by ``self.num_agents``, not ``len(agents)``. environment: The :class:`Environment` carrying zones and travel-time matrix. zone_options: Allowed zones. prices: Mapping ``zone -> Q_i``. Returns: List of :class:`LocationChoice`, one per input agent. ``self.last_choice_probabilities`` is set to the empirical ``M``-sample choice matrix. Examples: >>> import agent_urban_planning as aup >>> # engine = aup.UtilityEngine(params, mode="argmax", noise="frechet", >>> # num_agents=1_000_000) >>> # choices = engine.decide_batch([rep_agent], env, zones, prices) """ if not agents: return [] N = len(zone_options) zones = list(zone_options) # Build the shared utility matrix V once (same for all M agents). amen_source = self._current_amenity def _resolve_B(z): if amen_source and z in amen_source: v = amen_source[z] if v is not None: return float(v) v = environment.get_zone(z).amenity_B return float(v) if v else 1e-12 B = np.array([_resolve_B(z) for z in zones], dtype=self._np_dtype) wages_source = self._current_wages w_vec = np.array( [ float( (wages_source.get(z) if wages_source else None) or environment.get_zone(z).wage_observed or 1.0 ) for z in zones ], dtype=self._np_dtype, ) Q = np.array( [float(prices.get(z, environment.get_zone(z).floor_price_observed)) for z in zones], dtype=self._np_dtype, ) if environment.transport_matrix is not None and environment.transport_matrix_index: tt_idx = [environment._matrix_index_map[z] for z in zones] tau = environment.transport_matrix[np.ix_(tt_idx, tt_idx)].astype( self._np_dtype, copy=False ) else: tau = np.zeros((N, N), dtype=self._np_dtype) for i, zi in enumerate(zones): for j, zj in enumerate(zones): tau[i, j] = environment.travel_time(zi, zj) EPS = 1e-12 log_B = np.log(np.maximum(B, EPS)) log_w = np.log(np.maximum(w_vec, EPS)) log_Q = np.log(np.maximum(Q, EPS)) V = ( log_B[:, None] + log_w[None, :] - (1.0 - self.beta) * log_Q[:, None] - self.kappa * tau ) # shape (N, N) # Cast to float32 for shock-tensor compatibility. V_f32 = V.astype(np.float32, copy=False) # Affordability mask (matches parent class — same structural β). if self.budget_constraint: afford_mask = ((1.0 - self.beta) * w_vec[None, :]) >= (Q[:, None] * 1.0) if not np.any(afford_mask): afford_mask = np.ones_like(afford_mask, dtype=bool) else: afford_mask = np.ones((N, N), dtype=bool) # Where disallowed, subtract a very large number pre-argmax. V_masked = np.where(afford_mask, V_f32, -1e30).astype(np.float32, copy=False) eps_scale = 1.0 / max(float(self.epsilon), EPS) # Aggregates. M = self.num_agents HR_count = np.zeros(N, dtype=np.int64) HM_count = np.zeros(N, dtype=np.int64) P_agg_count = np.zeros((N, N), dtype=np.int64) vv_sum = np.zeros(N, dtype=np.float64) wage_f64 = w_vec.astype(np.float64) # Per-agent sample (optional; capped at store_agent_samples). samples: list[LocationChoice] = [] sample_stride = None if self.store_agent_samples > 0: sample_stride = max(1, M // self.store_agent_samples) # Iterate over batches. # One RNG per decide_batch call — advanced sequentially across batches. # Same seed on every call → same sequence → frozen-across-iteration shocks. rng = self._fresh_rng() n_batches = (M + self.batch_size - 1) // self.batch_size for b in range(n_batches): start = b * self.batch_size stop = min(start + self.batch_size, M) this_batch = stop - start shocks = self._draw_shocks_batch(rng, this_batch, N) # eff = V + shock * (1/ε). Shape (batch, N, N). eff = V_masked[None, :, :] + shocks * np.float32(eps_scale) # Argmax across flattened (i, j) per agent. flat = eff.reshape(eff.shape[0], -1).argmax(axis=1) i_star = flat // N j_star = flat - i_star * N # Accumulate aggregates. # HR: count of each residence. np.add.at(HR_count, i_star, 1) np.add.at(HM_count, j_star, 1) np.add.at(P_agg_count, (i_star, j_star), 1) # vv_i = Σ_k wage[j*_k] * 1[i*_k = i]. # Vectorized: group wages by residence. np.add.at(vv_sum, i_star, wage_f64[j_star]) # Optional agent-record sampling. if sample_stride is not None: for offset in range(0, eff.shape[0], sample_stride): k = offset samples.append( LocationChoice( residence=zones[int(i_star[k])], workplace=zones[int(j_star[k])], utility=float(V_f32[int(i_star[k]), int(j_star[k])]), zone_utilities={}, ) ) # Empirical choice matrix: normalized so sum = 1. P_agg = (P_agg_count.astype(np.float64) / float(M)).astype( self._np_dtype, copy=False ) self.last_choice_probabilities = P_agg # Diagnostics. utilities_mean = float( np.sum(P_agg.astype(np.float64) * V.astype(np.float64)) ) self.last_abm_diagnostics = { "num_agents": M, "batch_size": self.batch_size, "n_batches": n_batches, "shock_distribution": self.shock_distribution, "epsilon": float(self.epsilon), "empty_cells": int(np.sum(P_agg_count == 0)), "nonzero_cells": int(np.sum(P_agg_count > 0)), "total_cells": int(P_agg_count.size), "HR_min": int(HR_count.min()), "HR_max": int(HR_count.max()), "HM_min": int(HM_count.min()), "HM_max": int(HM_count.max()), "expected_utility": utilities_mean, } # Per-type LocationChoice records: one entry per input agent. # Since all M replicates share preferences, we return the population- # modal (i*, j*) for each input agent. For downstream tables that expect # `len(results) == len(agents)`. results: list[LocationChoice] = [] for a in agents: flat_mode = int(P_agg_count.argmax()) i_mode = flat_mode // N j_mode = flat_mode - i_mode * N results.append( LocationChoice( residence=zones[i_mode], workplace=zones[j_mode], utility=float(V[i_mode, j_mode]), zone_utilities={}, ) ) # Also store the samples (if requested) as an attribute; callers # can read last_agent_samples for debugging / distributional reports. self.last_agent_samples = samples return results