Source code for agent_urban_planning.core.agents

"""Heterogeneous household agent model."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Optional

import numpy as np
from scipy import stats

from agent_urban_planning.data.loaders import AgentDistributionalConfig


@dataclass
class PreferenceWeights:
    """Four-axis preference weights for an agent or archetype.

    Records how much relative importance an agent places on housing
    affordability, commute, services/facilities, and amenities. The values
    are stored exactly as given (unnormalized); use :meth:`normalized` to
    obtain a copy whose components sum to 1.

    Attributes:
        alpha: Weight on housing affordability.
        beta: Weight on commute disutility.
        gamma: Weight on services / facilities accessibility.
        delta: Weight on neighborhood amenities.

    Examples:
        >>> from agent_urban_planning.core.agents import PreferenceWeights
        >>> w = PreferenceWeights(alpha=2.0, beta=1.0, gamma=1.0, delta=1.0)
        >>> w.normalized().alpha
        0.4
    """

    alpha: float  # housing affordability
    beta: float  # commute
    gamma: float  # services/facilities
    delta: float  # amenities

    def normalized(self) -> "PreferenceWeights":
        """Return a copy whose four weights are rescaled to sum to one.

        Returns:
            A new :class:`PreferenceWeights` with every component divided by
            ``alpha + beta + gamma + delta``. When that sum is zero, equal
            weights (0.25 each) are returned instead so that no division by
            zero occurs.

        Examples:
            >>> from agent_urban_planning.core.agents import PreferenceWeights
            >>> PreferenceWeights(1.0, 1.0, 1.0, 1.0).normalized().alpha
            0.25
        """
        total = self.alpha + self.beta + self.gamma + self.delta
        if total != 0:
            components = (self.alpha, self.beta, self.gamma, self.delta)
            return PreferenceWeights(*(w / total for w in components))
        # Zero total: equal weights are a safe fallback, not a model
        # parameter -- they only exist to avoid dividing by zero.
        return PreferenceWeights(0.25, 0.25, 0.25, 0.25)
@dataclass
class Agent:
    """One representative household type (a weighted demographic record).

    Every agent stands for a slice of the population. Its demographic fields
    feed the closed-form utility computation (via income and location) and
    the LLM-elicited persona used by V5 engines (through
    :func:`persona_summary`).

    The richer demographic fields (education, migration, employment, tenure)
    are optional and only populated when the agent is sampled from the 10D
    Berlin joint distribution; legacy engines (V1..V4) ignore them.

    Attributes:
        agent_id: Stable integer identifier across runs.
        household_size: Number of persons in the household.
        age_head: Age of the household head (constrained 21-85 in sampling).
        has_children: Whether the household has any members aged < 18.
        has_elderly: Whether the household has any members aged >= 65.
        income: Monthly household income in scenario-currency units.
        savings: Liquid savings. Required by the dataclass itself; the
            population builders in this module fill in ``income * 6``.
        job_location: Zone name where the agent works (for Singapore
            scenarios; ignored when the engine optimizes workplace jointly
            with residence).
        car_owner: Whether the household owns a car.
        weight: Population share assigned to this type. Across the full
            :class:`AgentPopulation` these weights must sum to 1.
        preferences: Optional :class:`PreferenceWeights` recorded in the JSON
            output. Most engines ignore this -- it is preserved for
            diagnostic output.
        home_zone: Planning area where the agent resides (used by engines
            that need an outside-option reference).
        education: ``"low"``, ``"mid"``, ``"high"``, or ``None``.
        migration_background: ``"none"``, ``"EU"``, ``"non-EU"``, or ``None``.
        employment_status: ``"employed"``, ``"self-employed"``,
            ``"unemployed"``, ``"retired_or_student"``, or ``None``.
        tenure: ``"owner"``, ``"renter"``, or ``None``.

    Examples:
        >>> from agent_urban_planning.core.agents import Agent
        >>> a = Agent(
        ...     agent_id=0, household_size=3, age_head=42, has_children=True,
        ...     has_elderly=False, income=5000.0, savings=30000.0,
        ...     job_location="CBD", car_owner=False, weight=0.001,
        ... )
        >>> a.income
        5000.0
    """

    # ---- Required core demographics (positional order is part of the API) --
    agent_id: int
    household_size: int
    age_head: int
    has_children: bool
    has_elderly: bool
    income: float
    savings: float
    job_location: str
    car_owner: bool
    weight: float  # population share, all weights sum to 1.0

    # Preference weights (α,β,γ,δ). Historical field — NOT read by any
    # decision engine (UtilityEngine and EstimatedUtilityEngine use the
    # linear β model with literature coefficients). Kept for recording
    # in simulation results JSON. Default equal weights.
    preferences: PreferenceWeights = field(
        default_factory=lambda: PreferenceWeights(0.25, 0.25, 0.25, 0.25)
    )

    # Planning area where the agent resides (for outside option).
    home_zone: str = ""

    # ---- Richer demographic fields (zensus-richer-demographics) -----------
    # Optional. Populated when agent is sampled from the richer 10D joint
    # (joint_2011_richer.npz) via sample_ortsteile_agent_types.py
    # --joint-version richer. All legacy engines (V1..V4) ignore these.
    # V5-hierarchical consumes them through persona_summary().
    education: Optional[str] = None  # "low" | "mid" | "high" | None
    migration_background: Optional[str] = None  # "none" | "EU" | "non-EU" | None
    # "employed" | "self-employed" | "unemployed" | "retired_or_student" | None
    employment_status: Optional[str] = None
    tenure: Optional[str] = None  # "owner" | "renter" | None
# ---------------------------------------------------------------------------
# Persona summary helper (used by V5-hierarchical prompts + diagnostics).
# ---------------------------------------------------------------------------

# The order below is fixed so that a given agent always renders to the same
# string. Fields whose value is None are left out of the rendered persona, so
# pre-richer agents (V1..V4) yield a shorter but still-valid persona.
_PERSONA_FIELD_ORDER: tuple[str, ...] = (
    # core demographics
    "age_head",
    "household_size",
    "has_children",
    "has_elderly",
    "income",
    # richer (optional) demographics
    "education",
    "migration_background",
    "employment_status",
    "tenure",
    # mobility
    "car_owner",
)
def persona_summary(agent: "Agent") -> str:
    """Render an agent's demographics as a stable one-line persona string.

    Fields are visited in the order given by ``_PERSONA_FIELD_ORDER``. A field
    whose value is ``None`` (or that the object lacks) is omitted, and
    ``has_elderly`` contributes text only when it is truthy. The same agent
    therefore always yields the same string, which makes the output usable as
    a cluster-label diagnostic or as the persona block in an LLM prompt.

    Args:
        agent: An :class:`Agent` instance whose demographics are rendered.

    Returns:
        Comma-separated persona string. Example: ``"38y, 3-person household,
        has children, mid income, high education, EU background, employed,
        renter, no car"``.

    Examples:
        >>> from agent_urban_planning.core.agents import Agent, persona_summary
        >>> a = Agent(
        ...     agent_id=0, household_size=2, age_head=30, has_children=False,
        ...     has_elderly=False, income=5000.0, savings=30000.0,
        ...     job_location="x", car_owner=True, weight=1.0,
        ... )
        >>> persona_summary(a)  # doctest: +ELLIPSIS
        '30y, 2-person household, no children, ... car owner'
    """
    # One renderer per known field; names missing from this table add nothing.
    renderers = {
        "age_head": lambda v: f"{int(v)}y",
        "household_size": lambda v: f"{int(v)}-person household",
        "has_children": lambda v: "has children" if v else "no children",
        "has_elderly": lambda v: "has elderly",
        "income": lambda v: _income_bucket_label(float(v)),
        "education": lambda v: f"{v} education",
        "migration_background": (
            lambda v: "native" if v == "none" else f"{v} background"
        ),
        "employment_status": lambda v: v.replace("_", " "),
        "tenure": str,
        "car_owner": lambda v: "car owner" if v else "no car",
    }

    fragments: list[str] = []
    for name in _PERSONA_FIELD_ORDER:
        value = getattr(agent, name, None)
        if value is None:
            continue
        if name == "has_elderly" and not value:
            # Absence of elderly members is not mentioned in the persona.
            continue
        render = renderers.get(name)
        if render is not None:
            fragments.append(render(value))
    return ", ".join(fragments)
def _income_bucket_label(inc: float) -> str:
    """Map a numeric income onto a low / mid / high income label.

    Incomes below 1300 are "low income", incomes below 2400 are "mid income",
    and everything else is "high income".
    """
    # Thresholds between low/mid/high tertiles; midpoints are 900 / 1700 / 3200.
    upper_bounds = ((1300, "low income"), (2400, "mid income"))
    for bound, label in upper_bounds:
        if inc < bound:
            return label
    return "high income"
class AgentPopulation:
    """Collection of weighted representative agent types.

    Holds the full list of :class:`Agent` instances comprising the
    simulation's population, indexed by integer position. Each agent carries
    a ``weight`` that represents its population share; weights must sum to
    1.0 across the population.

    Construct via :meth:`from_config` from a parsed agent YAML, which handles
    distributional sampling (per-zone Census or single national) or explicit
    per-agent records.

    Args:
        agents: List of :class:`Agent` instances. The constructor validates
            that their weights sum to 1.0 within tolerance.

    Raises:
        ValueError: If ``agents`` weights do not sum to 1 within ``1e-6``.

    Examples:
        >>> import agent_urban_planning as aup
        >>> # config = aup.data.builtin.load_agents("singapore_real_v2")
        >>> # pop = aup.AgentPopulation.from_config(config)
        >>> # len(pop)  # number of representative types
        >>> # for agent in pop:
        >>> #     ...  # iterate over the population
    """

    def __init__(self, agents: "list[Agent]"):
        self.agents = agents
        self._validate_weights()

    def _validate_weights(self) -> None:
        """Raise ``ValueError`` unless the weights sum to 1 within ``1e-6``."""
        total = sum(a.weight for a in self.agents)
        if abs(total - 1.0) > 1e-6:
            raise ValueError(
                f"Agent weights must sum to 1.0, got {total:.6f}"
            )

    def __len__(self) -> int:
        return len(self.agents)

    def __iter__(self):
        return iter(self.agents)

    def __getitem__(self, idx: int) -> "Agent":
        return self.agents[idx]

    @classmethod
    def from_config(
        cls,
        config: "AgentDistributionalConfig",
        rng: Optional[np.random.RandomState] = None,
        strict: bool = True,
    ) -> "AgentPopulation":
        """Generate an agent population from a configuration object.

        Dispatches on ``config.mode``: ``"distributional"`` samples agents
        from per-zone Census distributions (or, with ``strict=False``, from a
        single national distribution); any other mode loads the explicitly
        declared agent records.

        Args:
            config: Parsed :class:`AgentDistributionalConfig` from a YAML file.
            rng: Optional ``numpy.random.RandomState`` for reproducible
                sampling. If ``None``, a fresh state is created.
            strict: If ``True`` (default), reject configs without per-zone
                Census data. Set to ``False`` only in unit tests that use
                small synthetic configs.

        Returns:
            A new :class:`AgentPopulation`. Sampled populations give every
            agent the equal weight ``1 / config.num_types``; explicit records
            keep the weights they declare (which must already sum to 1).

        Raises:
            RuntimeError: When ``strict=True`` and the config lacks per-zone
                ``zone_distributions``.
            ValueError: If the resulting weights do not sum to 1 within
                ``1e-6`` (raised by the constructor).

        Examples:
            >>> import agent_urban_planning as aup
            >>> # config = aup.data.builtin.load_agents("singapore_real_v2")
            >>> # pop = aup.AgentPopulation.from_config(config, strict=True)
        """
        if config.mode != "distributional":
            return cls._load_explicit(config)

        # Per-zone Census sampling if zone_distributions is present.
        if config.zone_distributions:
            return cls._generate_from_zone_distributions(config, rng)

        if strict:
            raise RuntimeError(
                "Agent config has no zone_distributions. Cannot generate "
                "agents without per-zone Census data — a single national "
                "distribution produces spatially uniform demographics with "
                "no inter-zone income variation, which is not acceptable. "
                "Use config/agents/singapore_real_v2.yaml (has per-zone "
                "Census 2020 data) or run the data fetcher to generate it."
            )

        # strict=False: allow old single-distribution path (unit tests only).
        return cls._generate_from_distributions(config, rng)

    @classmethod
    def _generate_from_distributions(
        cls,
        config: "AgentDistributionalConfig",
        rng: Optional[np.random.RandomState] = None,
    ) -> "AgentPopulation":
        """Sample ``config.num_types`` equally weighted agents.

        Every feature is drawn from the single distribution declared for it
        in ``config.distributions``. The features are sampled in a fixed
        order so that a seeded ``rng`` reproduces the same population.
        """
        if rng is None:
            rng = np.random.RandomState()

        n = config.num_types
        dists = config.distributions

        # Sample each feature (order matters for RNG reproducibility).
        income = _sample_distribution(dists["income"], n, rng)
        age_head = _sample_distribution(dists["age_head"], n, rng).astype(int)
        household_size = _sample_distribution(
            dists["household_size"], n, rng
        ).astype(int)
        has_children = _sample_distribution(
            dists["has_children"], n, rng
        ).astype(bool)
        has_elderly = _sample_distribution(
            dists["has_elderly"], n, rng
        ).astype(bool)
        car_owner = _sample_distribution(dists["car_owner"], n, rng).astype(bool)
        job_location = _sample_distribution(dists["job_location"], n, rng)

        # Equal weights for sampled types.
        weight = 1.0 / n

        agents = [
            Agent(
                agent_id=i,
                household_size=int(household_size[i]),
                age_head=int(age_head[i]),
                has_children=bool(has_children[i]),
                has_elderly=bool(has_elderly[i]),
                income=float(income[i]),
                savings=float(income[i]) * 6,  # rough default: 6 months income
                job_location=str(job_location[i]),
                car_owner=bool(car_owner[i]),
                weight=weight,
            )
            for i in range(n)
        ]
        return cls(agents)

    @staticmethod
    def _allocate_zone_counts(zd: dict, n: int) -> "dict[str, int]":
        """Split ``n`` agents across zones proportionally to zone population.

        Zones are visited in sorted-name order. Each zone except the last
        gets ``max(1, round(n * share))`` agents, capped by what is still
        unassigned; the last zone receives the remainder, so the counts
        always add up to ``n``.
        """
        total_pop = sum(z.get("population", 0) for z in zd.values()) or 1
        zone_names = sorted(zd.keys())
        counts: "dict[str, int]" = {}
        remaining = n
        last_index = len(zone_names) - 1
        for i, zone_name in enumerate(zone_names):
            if i == last_index:
                counts[zone_name] = remaining  # last zone gets remainder
                continue
            share = zd[zone_name].get("population", 0) / total_pop
            count = min(max(1, round(n * share)), remaining)
            counts[zone_name] = count
            remaining -= count
        return counts

    @staticmethod
    def _median_positive_population(zd: dict) -> float:
        """Return the median of the strictly positive zone populations.

        Falls back to ``1.0`` when no zone reports a positive population.
        Without this guard ``np.median`` of an empty list yields NaN, which
        later breaks the ``int(...)`` conversion of the job-zone population.
        """
        positive = [
            pop
            for pop in (z.get("population", 0) for z in zd.values())
            if pop > 0
        ]
        return float(np.median(positive)) if positive else 1.0

    @classmethod
    def _generate_from_zone_distributions(
        cls,
        config: "AgentDistributionalConfig",
        rng: Optional[np.random.RandomState] = None,
    ) -> "AgentPopulation":
        """Generate agents from per-zone Census 2020 distributions.

        Each agent's demographics are sampled from the histogram of their
        home zone, producing a population that statistically matches each
        zone's real Census profile. Every agent gets the equal weight
        ``1 / config.num_types``.
        """
        # Kept as a function-local import, as in the original code
        # (presumably to avoid an import cycle with core.constraints —
        # TODO confirm). Hoisted out of the per-agent loop.
        from agent_urban_planning.core.constraints import compute_effective_income

        if rng is None:
            rng = np.random.RandomState()

        n = config.num_types
        zd = config.zone_distributions
        dists = config.distributions

        # 1. Compute population-proportional agent counts per zone.
        zone_names = sorted(zd.keys())
        zone_agent_counts = cls._allocate_zone_counts(zd, n)

        # 2. Job-location sampling weights from the employment-density
        #    distribution; uniform over zones when none is configured.
        job_dist = dists.get("job_location")
        if job_dist:
            job_values = job_dist.params["values"]
            job_weights = np.array(job_dist.params["weights"], dtype=float)
            job_weights /= job_weights.sum()
        else:
            job_values = zone_names
            job_weights = np.ones(len(zone_names)) / len(zone_names)

        # 3. Get car_owner rate (national — no per-zone data).
        car_pct = dists["car_owner"].params["p"] if "car_owner" in dists else 0.28

        # 3b. Median population for the agglomeration wage premium.
        # Population is used as a proxy for per-zone employment.
        median_pop = cls._median_positive_population(zd)
        # Lookup: job_location → zone population (proxy for employment).
        zone_pop_lookup = {zone: zd[zone].get("population", 0) for zone in zd}

        # 4. Generate agents per zone.
        agents = []
        agent_id = 0
        weight = 1.0 / n
        for zone_name in zone_names:
            zone_n = zone_agent_counts[zone_name]
            if zone_n <= 0:
                continue

            z = zd[zone_name]
            income_brackets = z.get("income_brackets", [])
            hh_hist = z.get("household_size_hist", [])
            age_brackets = z.get("age_brackets", [])

            # These depend only on the zone's age histogram (no RNG use), so
            # they are computed once per zone instead of once per agent.
            children_pct = _derive_children_pct(age_brackets)
            elderly_pct = _derive_elderly_pct(age_brackets)

            for _ in range(zone_n):
                # The order of RNG draws below is preserved from the original
                # implementation so seeded runs stay reproducible.
                sampled_income = _sample_census_income(income_brackets, rng)
                sampled_hh = _sample_census_hh_size(hh_hist, rng)
                sampled_age = _sample_census_age(age_brackets, rng)

                has_children = bool(rng.random() < children_pct)
                has_elderly = bool(rng.random() < elderly_pct)

                # Sample job_location from employment weights.
                job_idx = rng.choice(len(job_values), p=job_weights)
                job_loc = str(job_values[job_idx])

                # Sample car_owner from national rate.
                car_owner = bool(rng.random() < car_pct)

                # Apply agglomeration wage premium based on job_location;
                # unknown job zones are treated as median-sized.
                job_zone_pop = zone_pop_lookup.get(job_loc, median_pop)
                effective_income = compute_effective_income(
                    sampled_income, int(job_zone_pop), median_pop
                )

                agents.append(Agent(
                    agent_id=agent_id,
                    household_size=sampled_hh,
                    age_head=sampled_age,
                    has_children=has_children,
                    has_elderly=has_elderly,
                    income=effective_income,
                    savings=effective_income * 6,
                    job_location=job_loc,
                    car_owner=car_owner,
                    weight=weight,
                    home_zone=zone_name,
                ))
                agent_id += 1

        return cls(agents)

    @classmethod
    def _load_explicit(
        cls, config: "AgentDistributionalConfig"
    ) -> "AgentPopulation":
        """Build the population from ``config.explicit_agents`` records.

        Weights are taken verbatim from each record. ``savings`` defaults to
        six times the record's income when absent.
        """
        # NOTE(review): ``home_zone`` and ``preferences`` are not read from
        # explicit records, so they keep their dataclass defaults — confirm
        # this is intended.
        agents = [
            Agent(
                agent_id=i,
                household_size=ad["household_size"],
                age_head=ad["age_head"],
                has_children=ad["has_children"],
                has_elderly=ad["has_elderly"],
                income=ad["income"],
                savings=ad.get("savings", ad["income"] * 6),
                job_location=ad["job_location"],
                car_owner=ad["car_owner"],
                weight=ad["weight"],
                # Richer demographic fields (zensus-richer-demographics).
                # Default to None so pre-richer YAMLs still load.
                education=ad.get("education"),
                migration_background=ad.get("migration_background"),
                employment_status=ad.get("employment_status"),
                tenure=ad.get("tenure"),
            )
            for i, ad in enumerate(config.explicit_agents)
        ]
        return cls(agents)
def _sample_distribution(dist_config, n: int, rng: np.random.RandomState) -> np.ndarray:
    """Sample ``n`` values from a distribution config.

    Args:
        dist_config: Object exposing ``type`` (one of ``"lognormal"``,
            ``"truncated_normal"``, ``"categorical"``, ``"bernoulli"``) and a
            ``params`` mapping with the parameters of that distribution.
        n: Number of samples to draw.
        rng: Random state used for all draws.

    Returns:
        Array of ``n`` samples (boolean for ``"bernoulli"``).

    Raises:
        ValueError: If ``dist_config.type`` is not a supported distribution.
    """
    dtype = dist_config.type
    params = dist_config.params

    if dtype == "lognormal":
        # ``mean`` is the mean of the lognormal variable itself, so convert
        # mean/sigma to the mean of the underlying normal.
        mean = params["mean"]
        sigma = params["sigma"]
        mu_ln = np.log(mean) - 0.5 * sigma**2
        samples = rng.lognormal(mu_ln, sigma, n)
        return np.maximum(samples, 100)  # floor at 100

    if dtype == "truncated_normal":
        mean = params["mean"]
        std = params["std"]
        lo = params.get("min", mean - 4 * std)
        hi = params.get("max", mean + 4 * std)
        # truncnorm expects its bounds in standard-deviation units.
        a = (lo - mean) / std
        b = (hi - mean) / std
        return stats.truncnorm.rvs(
            a, b, loc=mean, scale=std, size=n, random_state=rng
        )

    if dtype == "categorical":
        values = params["values"]
        weights = np.array(params["weights"], dtype=float)
        weights /= weights.sum()
        indices = rng.choice(len(values), size=n, p=weights)
        return np.array([values[i] for i in indices])

    if dtype == "bernoulli":
        return rng.random(n) < params["p"]

    raise ValueError(f"Unknown distribution type: {dtype}")


# ------------------------------------------------------------------
# Census histogram sampling helpers
# ------------------------------------------------------------------

# Income bracket label → (lower_bound, upper_bound) in SGD
_INCOME_BRACKET_RANGES: dict[str, tuple[float, float]] = {
    "NoEmployedPerson": (0, 0),
    "Below_1_000": (200, 1000),
    "1_000_1_999": (1000, 2000),
    "2_000_2_999": (2000, 3000),
    "3_000_3_999": (3000, 4000),
    "4_000_4_999": (4000, 5000),
    "5_000_5_999": (5000, 6000),
    "6_000_6_999": (6000, 7000),
    "7_000_7_999": (7000, 8000),
    "8_000_8_999": (8000, 9000),
    "9_000_9_999": (9000, 10000),
    "10_000_10_999": (10000, 11000),
    "11_000_11_999": (11000, 12000),
    "12_000_12_999": (12000, 13000),
    "13_000_13_999": (13000, 14000),
    "14_000_14_999": (14000, 15000),
    "15_000_17_499": (15000, 17500),
    "17_500_19_999": (17500, 20000),
    "20_000andOver": (20000, 35000),
}


def _split_entry(entry):
    """Split a histogram entry into ``(label, count)``.

    ``[label, count]`` lists/tuples are unpacked by position; any other value
    is treated as a bare label carrying a count of 0.
    """
    if isinstance(entry, (list, tuple)):
        return entry[0], entry[1]
    return entry, 0


def _sample_census_income(
    brackets: list, rng: np.random.RandomState
) -> float:
    """Sample one income from a Census income bracket histogram.

    Each bracket is ``[label, count]``. A bracket is picked with probability
    proportional to its count, then the income is drawn uniformly within the
    bracket's range. Brackets with a non-positive count, an unknown label, or
    the ``(0, 0)`` "NoEmployedPerson" range are ignored. Returns ``5000.0``
    when nothing usable remains.
    """
    if not brackets:
        return 5000.0  # national median fallback

    valid = []
    weights = []
    for entry in brackets:
        label, count = _split_entry(entry)
        if count > 0 and label in _INCOME_BRACKET_RANGES:
            lo, hi = _INCOME_BRACKET_RANGES[label]
            if lo == 0 and hi == 0:
                continue  # skip NoEmployedPerson
            valid.append((lo, hi))
            weights.append(count)

    if not valid:
        return 5000.0

    weights_arr = np.array(weights, dtype=float)
    weights_arr /= weights_arr.sum()
    idx = rng.choice(len(valid), p=weights_arr)
    lo, hi = valid[idx]
    return float(rng.uniform(lo, hi))


def _sample_census_hh_size(
    hist: list, rng: np.random.RandomState
) -> int:
    """Sample one household size from a Census histogram.

    Each entry is ``[size, count]``. The last bucket (8) represents 8+.
    Entries with a non-positive count are skipped, matching the other Census
    samplers; a stray negative count therefore no longer reaches
    ``rng.choice`` as a negative probability. Returns ``3`` when no entry has
    a positive count, and never returns less than 1.
    """
    if not hist:
        return 3

    sizes = []
    counts = []
    for entry in hist:
        raw_size, raw_count = _split_entry(entry)
        count = int(raw_count)
        if count <= 0:
            continue
        sizes.append(int(raw_size))
        counts.append(count)

    if not sizes:
        return 3

    weights = np.array(counts, dtype=float) / sum(counts)
    idx = rng.choice(len(sizes), p=weights)
    return max(1, sizes[idx])


def _sample_census_age(
    brackets: list, rng: np.random.RandomState
) -> int:
    """Sample one age from a Census age bracket histogram.

    Each bracket is ``[label, count]`` where label is like ``"25_29"`` or
    ``"90+Over"``. The sampled value is the head-of-household age: every
    bracket is clipped to 21-85 and dropped if nothing of it remains.
    Returns ``42`` when no bracket is usable.
    """
    if not brackets:
        return 42

    valid = []
    weights = []
    for entry in brackets:
        raw_label, raw_count = _split_entry(entry)
        count = int(raw_count)
        if count <= 0:
            continue
        # Parse age range from label like "25_29", "0_4", "90+Over".
        lo, hi = _parse_age_bracket(str(raw_label))
        lo = max(lo, 21)  # head of household minimum
        hi = min(hi, 85)
        if lo > hi:
            continue
        valid.append((lo, hi))
        weights.append(count)

    if not valid:
        return 42

    weights_arr = np.array(weights, dtype=float)
    weights_arr /= weights_arr.sum()
    idx = rng.choice(len(valid), p=weights_arr)
    lo, hi = valid[idx]
    return int(rng.randint(lo, hi + 1))  # randint's upper bound is exclusive


def _parse_age_bracket(label: str) -> tuple[int, int]:
    """Parse a Census age bracket label to ``(lo, hi)`` ages.

    ``"25_29"`` becomes ``(25, 29)``; an open-ended label such as
    ``"90+Over"`` becomes ``(90, 94)``. Labels that cannot be parsed yield
    the ``(21, 85)`` fallback.
    """
    label = label.strip().replace("+Over", "").replace("+", "")
    parts = label.split("_")
    try:
        if len(parts) == 2:
            return int(parts[0]), int(parts[1])
        if len(parts) == 1:
            age = int(parts[0])
            return age, age + 4  # e.g., "90" → 90-94
    except ValueError:
        pass
    return 21, 85  # safe fallback


def _age_group_share(age_brackets: list, in_group, fallback: float) -> float:
    """Return the share of counted population whose bracket is in the group.

    ``in_group`` receives each bracket's lower age bound. ``fallback`` is
    returned for an empty histogram or when the counts do not sum to a
    positive total.
    """
    if not age_brackets:
        return fallback

    total = 0
    matched = 0
    for entry in age_brackets:
        raw_label, raw_count = _split_entry(entry)
        count = int(raw_count)
        lo, _ = _parse_age_bracket(str(raw_label))
        total += count
        if in_group(lo):
            matched += count
    return matched / total if total > 0 else fallback


def _derive_children_pct(age_brackets: list) -> float:
    """Estimate has_children probability from zone's age distribution.

    Uses the share of population in brackets starting below age 18 as a
    proxy: if a zone has many children, its households are more likely to
    have children. Falls back to ``0.37`` (national default) when the
    histogram is empty or sums to zero.
    """
    return _age_group_share(age_brackets, lambda lo: lo < 18, 0.37)


def _derive_elderly_pct(age_brackets: list) -> float:
    """Estimate has_elderly probability from zone's age distribution.

    Uses the share of population in brackets starting at age 65 or above as
    a proxy. Falls back to ``0.22`` (national default) when the histogram is
    empty or sums to zero.
    """
    return _age_group_share(age_brackets, lambda lo: lo >= 65, 0.22)