# Source code for agent_urban_planning.core.metrics

"""Welfare metrics computation."""

from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field
from typing import Optional

import numpy as np

from agent_urban_planning.core.agents import Agent, AgentPopulation
from agent_urban_planning.decisions.base import LocationChoice, ZoneChoice
from agent_urban_planning.core.environment import Environment


@dataclass
class FacilityUtilization:
    """Utilization record for one facility located in one zone.

    Records of this type are created by ``compute_metrics`` (one per facility
    per zone) and re-created from plain dicts by ``WelfareMetrics.from_dict``.
    """

    zone: str  # name of the zone hosting the facility
    facility_type: str  # facility category label (``facility.type`` in compute_metrics)
    capacity: int  # facility capacity as read from the environment
    demand: float  # demand attributed to the facility (compute_metrics uses the zone's population weight)
    # Utilization ratio, nominally demand / capacity. NOTE(review):
    # compute_metrics divides the zone population weight by
    # (capacity / total housing supply), not by raw capacity.
    utilization: float  # demand / capacity


@dataclass
class WelfareMetrics:
    """Aggregate welfare metrics summarizing one simulation run.

    Captures both classical welfare measures (average utility, weighted Gini,
    Rawlsian min/max) and the planning-relevant indicators (commute times,
    housing affordability, facility utilization, zone-level population,
    employment, wages, prices). Populated by :func:`compute_metrics` from
    market output.

    Attributes:
        avg_utility: Population-weighted mean utility.
        gini_coefficient: Weighted Gini coefficient of utility values
            (0 = perfect equality, 1 = perfect inequality). Negative
            utilities are shifted to non-negative before computation.
        min_utility: Smallest realized utility (Rawlsian floor).
        max_utility: Largest realized utility.
        avg_commute_minutes: Weighted mean commute time.
        long_commute_share: Share of population with commute exceeding
            ``long_commute_threshold`` minutes.
        housing_unaffordable_share: Share of population whose price/income
            ratio exceeds ``affordability_threshold``.
        zone_populations: Mapping ``zone -> population weight`` keyed by
            residence zone (a share of total population when agent weights
            sum to 1).
        zone_prices: Mapping ``zone -> equilibrium price``.
        facility_utilization: Per-facility utilization records.
        market_converged: Whether the market clearer reached convergence.
        market_convergence_metric: Final residual at termination.
        zone_employment: Mapping ``zone -> population weight`` keyed by
            workplace.
        zone_wages: Mapping ``zone -> wage`` (Ahlfeldt scenarios only).

    Examples:
        >>> import agent_urban_planning as aup
        >>> # Typically created by aup.compute_metrics(); see the quickstart tutorial.
        >>> # results.metrics.avg_utility
    """

    avg_utility: float
    gini_coefficient: float
    min_utility: float
    max_utility: float
    avg_commute_minutes: float
    long_commute_share: float  # share with commute > threshold
    housing_unaffordable_share: float  # share spending > 30% income
    zone_populations: dict[str, float]  # zone → population weight (by residence)
    zone_prices: dict[str, float]  # zone → equilibrium price
    facility_utilization: list[FacilityUtilization] = field(default_factory=list)
    market_converged: bool = True
    market_convergence_metric: float = 0.0
    # Added in berlin-replication-abm: zone → population weight keyed by workplace.
    # Populated from LocationChoice.workplace. For Singapore scenarios this
    # equals the aggregate of agent.job_location across the population;
    # for Berlin scenarios it reflects the engine's joint R×W choice.
    zone_employment: dict[str, float] = field(default_factory=dict)
    # Added in berlin-replication-abm: zone → wage (only populated in Berlin).
    zone_wages: dict[str, float] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return a JSON-serializable dict copy of every field.

        Returns:
            Plain ``dict`` produced by :func:`dataclasses.asdict`; nested
            ``FacilityUtilization`` records are converted to dicts as well.

        Examples:
            >>> from agent_urban_planning.core.metrics import WelfareMetrics
            >>> m = WelfareMetrics(
            ...     avg_utility=1.0, gini_coefficient=0.2, min_utility=0.0,
            ...     max_utility=2.0, avg_commute_minutes=20.0,
            ...     long_commute_share=0.1, housing_unaffordable_share=0.05,
            ...     zone_populations={}, zone_prices={},
            ... )
            >>> m.to_dict()["avg_utility"]
            1.0
        """
        return asdict(self)

    def to_json(self) -> str:
        """Serialize to a pretty-printed JSON string.

        Values the JSON encoder cannot handle natively are stringified via
        ``default=str``.

        Returns:
            Indented JSON string suitable for writing to disk.

        Examples:
            >>> from agent_urban_planning.core.metrics import WelfareMetrics
            >>> m = WelfareMetrics(
            ...     avg_utility=1.0, gini_coefficient=0.2, min_utility=0.0,
            ...     max_utility=2.0, avg_commute_minutes=20.0,
            ...     long_commute_share=0.1, housing_unaffordable_share=0.05,
            ...     zone_populations={}, zone_prices={},
            ... )
            >>> "avg_utility" in m.to_json()
            True
        """
        return json.dumps(self.to_dict(), indent=2, default=str)

    @classmethod
    def from_dict(cls, data: dict) -> "WelfareMetrics":
        """Reconstruct a :class:`WelfareMetrics` from its serialized dict.

        The input mapping is not modified: the method works on a shallow
        copy, so the same dict can be reused (e.g. written to disk) after
        the call.

        Args:
            data: Dict shaped like the output of :meth:`to_dict`.

        Returns:
            A :class:`WelfareMetrics` instance with ``facility_utilization``
            records re-hydrated from their dict form.

        Examples:
            >>> from agent_urban_planning.core.metrics import WelfareMetrics
            >>> m = WelfareMetrics(
            ...     avg_utility=1.0, gini_coefficient=0.2, min_utility=0.0,
            ...     max_utility=2.0, avg_commute_minutes=20.0,
            ...     long_commute_share=0.1, housing_unaffordable_share=0.05,
            ...     zone_populations={}, zone_prices={},
            ... )
            >>> WelfareMetrics.from_dict(m.to_dict()).avg_utility
            1.0
        """
        # Shallow copy so popping the facility list does not strip the key
        # from the caller's dict.
        payload = dict(data)
        # A missing key or an explicit None both mean "no facility records".
        fu_data = payload.pop("facility_utilization", None) or []
        facility_utilization = [FacilityUtilization(**f) for f in fu_data]
        return cls(facility_utilization=facility_utilization, **payload)
def compute_weighted_gini(values: np.ndarray, weights: np.ndarray) -> float:
    """Compute the weighted Gini coefficient of ``values``.

    The coefficient is defined as

        Gini = sum_i sum_j w_i w_j |x_i - x_j| / (2 * W^2 * mean)

    where ``W`` is the total weight and ``mean`` the weighted mean. The
    double sum is evaluated on the sorted data with prefix sums, so the cost
    is O(n log n) rather than O(n^2).

    If values contain negatives (common for utility), they are shifted so
    the minimum becomes zero before computing the Gini. This is the standard
    approach for welfare Gini with utility values that can be negative
    (see Atkinson 1970, Sen 1997).

    Args:
        values: One value per observation (array or any sequence accepted by
            ``np.asarray``).
        weights: One weight per observation, same length as ``values``.

    Returns:
        The Gini coefficient clamped to ``[0.0, 1.0]``. Returns ``0.0`` when
        the input is empty, the total weight is zero, or the weighted mean
        of the (shifted) values is zero.
    """
    vals = np.asarray(values, dtype=float)
    wts = np.asarray(weights, dtype=float)
    if vals.size == 0 or wts.sum() == 0:
        return 0.0

    # Shift to non-negative if needed (Gini is undefined for negative values).
    lowest = vals.min()
    if lowest < 0:
        vals = vals - lowest

    order = np.argsort(vals)
    x = vals[order]
    w = wts[order]

    total_w = w.sum()
    weighted_mean = np.average(x, weights=w)
    if weighted_mean == 0:
        return 0.0

    # With x ascending, |x_j - x_i| = x_j - x_i for every i < j, hence
    #   sum_i sum_j w_i w_j |x_i - x_j|
    #       = 2 * sum_j w_j * (x_j * sum_{i<j} w_i - sum_{i<j} w_i x_i).
    # The two inner sums are exclusive prefix sums of w and w * x.
    wx = w * x
    weight_below = np.cumsum(w) - w
    mass_below = np.cumsum(wx) - wx
    half_pairwise = float(np.sum(w * (x * weight_below - mass_below)))

    # The factor 2 in the pairwise sum cancels the 2 in the denominator.
    gini = half_pairwise / (total_w * total_w * weighted_mean)
    return float(max(0.0, min(1.0, gini)))
def compute_metrics(
    population: AgentPopulation,
    environment: Environment,
    allocations: dict[int, ZoneChoice],
    prices: dict[str, float],
    long_commute_threshold: float = 60.0,
    affordability_threshold: float = 0.30,
    market_converged: bool = True,
    market_convergence_metric: float = 0.0,
    wages: Optional[dict[str, float]] = None,
) -> WelfareMetrics:
    """Compute aggregate welfare metrics from per-agent allocations.

    Walks the population, looks up each agent's chosen zone and route, and
    aggregates weighted means, the Gini coefficient, Rawlsian extremes,
    commute statistics, affordability share, facility utilization, and
    per-zone population/employment/wage tables.

    Args:
        population: The simulated :class:`AgentPopulation`.
        environment: The :class:`Environment` used by the simulation.
        allocations: Dict mapping ``agent_id`` to the :class:`ZoneChoice`
            produced by the market clearer.
        prices: Mapping ``zone -> equilibrium price``. Zones missing from
            the mapping are priced at ``0.0``.
        long_commute_threshold: Minutes threshold for the
            ``long_commute_share`` indicator. Defaults to ``60.0``.
        affordability_threshold: Price/income ratio above which an agent's
            housing counts as unaffordable. Defaults to ``0.30``.
        market_converged: Whether the upstream market clearer reached
            convergence. Recorded in the result.
        market_convergence_metric: Final residual to record.
        wages: Optional ``zone -> wage`` mapping (Ahlfeldt scenarios).

    Returns:
        A :class:`WelfareMetrics` describing the run's welfare outcome.

    Raises:
        KeyError: If an agent has no entry in ``allocations`` or its chosen
            zone is not one of ``environment.zone_names``.
        ZeroDivisionError: Raised by ``np.average`` when the agent weights
            sum to zero (including an empty population).

    Examples:
        >>> import agent_urban_planning as aup
        >>> # metrics = aup.compute_metrics(population, env, allocations, prices)
        >>> # metrics.gini_coefficient
    """
    utilities = []
    weights = []
    commute_times = []
    housing_ratios = []
    zone_pops: dict[str, float] = {name: 0.0 for name in environment.zone_names}
    zone_emp: dict[str, float] = {name: 0.0 for name in environment.zone_names}

    for agent in population:
        choice = allocations[agent.agent_id]
        zone_name = choice.zone_name  # backward-compat alias for residence
        utilities.append(choice.utility)
        weights.append(agent.weight)
        zone_pops[zone_name] += agent.weight

        # Workplace resolution priority:
        #   1. agent.job_location if present (Singapore-style — agent
        #      attribute is authoritative)
        #   2. LocationChoice.workplace if agent has no job_location
        #      (Berlin-style joint choice)
        #   3. fall back to residence zone
        workplace = agent.job_location or getattr(choice, "workplace", "") or zone_name
        # Workplaces outside the known zone list are not counted.
        if workplace in zone_emp:
            zone_emp[workplace] += agent.weight

        # Commute time based on resolved workplace; 120 minutes is the
        # fallback when the transport network returns no route.
        route = environment.transport.get_best_route(zone_name, workplace)
        commute_times.append(route.time_minutes if route else 120.0)

        # Housing ratio; agents without positive income are assigned 1.0.
        price = prices.get(zone_name, 0.0)
        housing_ratios.append(price / agent.income if agent.income > 0 else 1.0)

    utilities_arr = np.array(utilities)
    weights_arr = np.array(weights)
    commute_arr = np.array(commute_times)
    ratio_arr = np.array(housing_ratios)

    # Weighted averages
    avg_utility = float(np.average(utilities_arr, weights=weights_arr))
    avg_commute = float(np.average(commute_arr, weights=weights_arr))
    long_commute_share = float(
        np.average(commute_arr > long_commute_threshold, weights=weights_arr)
    )
    unaffordable_share = float(
        np.average(ratio_arr > affordability_threshold, weights=weights_arr)
    )

    # Gini
    gini = compute_weighted_gini(utilities_arr, weights_arr)

    # Facility utilization. Population share is used as a proxy for demand,
    # and capacity is expressed relative to the total housing supply. The
    # total does not depend on the facility, so it is computed once.
    total_housing_supply = sum(
        z.housing_supply for z in environment.zones.values()
    )
    facility_util = []
    for zone_name, zone in environment.zones.items():
        pop_in_zone = zone_pops[zone_name]
        for facility in zone.facilities:
            if facility.capacity > 0 and total_housing_supply > 0:
                util_rate = pop_in_zone / (facility.capacity / total_housing_supply)
            else:
                # No capacity (or no housing at all): report zero instead of
                # dividing by zero.
                util_rate = 0.0
            facility_util.append(FacilityUtilization(
                zone=zone_name,
                facility_type=facility.type,
                capacity=facility.capacity,
                demand=pop_in_zone,
                utilization=util_rate,
            ))

    return WelfareMetrics(
        avg_utility=avg_utility,
        gini_coefficient=gini,
        min_utility=float(utilities_arr.min()),
        max_utility=float(utilities_arr.max()),
        avg_commute_minutes=avg_commute,
        long_commute_share=long_commute_share,
        housing_unaffordable_share=unaffordable_share,
        zone_populations=zone_pops,
        zone_prices=dict(prices),
        facility_utilization=facility_util,
        market_converged=market_converged,
        market_convergence_metric=market_convergence_metric,
        zone_employment=zone_emp,
        zone_wages=dict(wages) if wages else {},
    )