Source code for agent_urban_planning.decisions.ahlfeldt_hierarchical_llm_engine

"""V5-hierarchical: full-LLM decision engine with clustering + hierarchical prompts.

Replaces the utility function V[i,j] entirely with an LLM call made per cluster
per market iteration. The LLM returns a top-5 ranking with scores for each
stage (stage 1: residence; stage 2: workplace given residence). Scores are
converted to sampling probabilities (via `_scores_to_probs`) and sampled
M = num_agents times to build the empirical `last_choice_probabilities` matrix
consumed by AhlfeldtMarket.

Sampling semantics (prompt_version v5-hierarchical-v2, see softmax-fix change):
  * At `softmax_T >= 1.0` (default): use LLM's validator-normalized top-5
    scores DIRECTLY as sampling probabilities. T=1 is the identity pass-
    through; T>1 offers no additional effect (scores already probabilistic).
  * At `softmax_T < 1.0`: apply `exp(log(scores+ε)/T)` to concentrate mass
    on the LLM's top-1 choice. Useful for ablation.

An earlier iteration composed `normalize → softmax(T=1)` which flattened
[0,1]-scale scores to near-uniform over the top-5 — see the
`_scores_to_probs` docstring for the full bug + fix writeup.

Key design decisions (see `openspec/changes/v5-full-llm-hierarchical/design.md`):
  • Clustering: kmeans on one-hot encoded demographics, K=50 default.
  • Stage 1 issues K LLM calls; stage 2 issues up to K × 5 (per unique
    (cluster, residence) pair observed in stage-1 output).
  • Cache: in-memory dict keyed on
      (cluster_id, stage, residence_name_or_None, prompt_version, price_bucket_tuple)
    backed by JSON files under `cache_dir` for cross-run reuse.
  • Softmax(T=1) over the returned scores; seeded numpy RNG draws M samples.

Cache namespace: `.cache/llm_v5_hierarchical/`.
"""
from __future__ import annotations

import hashlib
import json
import logging
from pathlib import Path
from typing import Any, Callable, Optional

import numpy as np

from agent_urban_planning.core.agents import Agent, persona_summary
from agent_urban_planning.data.loaders import AhlfeldtParams
from agent_urban_planning.decisions.ahlfeldt_abm_engine import AhlfeldtABMEngine
from agent_urban_planning.llm.async_client import AsyncLLMClient
from agent_urban_planning.decisions.base import LocationChoice
from agent_urban_planning.llm.prompts.hierarchical import (
    PROMPT_VERSION,
    build_stage1_prompt,
    build_stage2_prompt,
    validate_top5_response,
)


# Type alias for stage-1 prompt builder signature.
# (persona, zones_info, *, prompt_version) → (sys_msg, user_msg)
Stage1Builder = Callable[..., tuple[str, str]]
# Type alias for response validator signature.
# (raw, allowed_zone_names) → list[(name, score)]
ResponseValidator = Callable[..., list[tuple[str, float]]]


logger = logging.getLogger(__name__)


[docs] class AhlfeldtHierarchicalLLMEngine(AhlfeldtABMEngine): """LLM-as-decision-maker engine with clustering and two-stage prompts. The full V5 / V5 engine. Replaces ``V[i, j]`` entirely with an LLM call made per cluster per market iteration. Stage 1 selects a residence; stage 2 selects a workplace conditional on residence. Returned scores become sampling probabilities (via ``_scores_to_probs``) and are sampled ``M = num_agents`` times to build the empirical ``last_choice_probabilities`` matrix consumed by :class:`AhlfeldtMarket`. Cache namespace: ``.cache/llm_v5_hierarchical/``. Most users should configure this through :class:`LLMDecisionEngine`. Args: params: Structural Ahlfeldt parameters. llm_client: An LLM client object (``.complete(user, system="")`` returning a string). cluster_k: Number of clusters used in stage-1 prompt grouping. clustering_algo: Clustering algorithm; only ``"kmeans"`` is currently supported. zone_name_map: Optional ``synthetic_id -> real_name`` mapping for prompt-side zone naming. cache_dir: Directory where stage-1 / stage-2 LLM call results are persisted. softmax_T: Temperature applied to LLM-returned scores. max_retries: Retry budget on parse errors per LLM call. prompt_version: String identifier baked into cache keys. llm_concurrency: Max parallel LLM calls. progress_callback: Optional ``(stage, done, total, retries)`` callback for progress reporting. seed: Optional integer seed. prompt_builder_stage1: Optional override for the stage-1 prompt builder. response_validator_stage1: Optional override for the stage-1 response validator. stage2_top_k_residences: Optional cap on stage-2 fan-out (used in V5 score-all mode). **parent_kwargs: Forwarded to :class:`AhlfeldtABMEngine`. Examples: >>> import agent_urban_planning as aup >>> # Prefer the public wrapper: >>> # engine = aup.LLMDecisionEngine(params, llm_client=client, >>> # response_format="score_all") """ def __init__( self, params: AhlfeldtParams, llm_client, *, cluster_k: int = 50, clustering_algo: str = "kmeans", zone_name_map: Optional[dict[str, str]] = None, cache_dir: str | Path = ".cache/llm_v5_hierarchical", softmax_T: float = 1.0, max_retries: int = 3, prompt_version: str = PROMPT_VERSION, llm_concurrency: int = 15, progress_callback: Optional[Callable[[str, int, int, int], None]] = None, seed: Optional[int] = None, prompt_builder_stage1: Optional[Stage1Builder] = None, response_validator_stage1: Optional[ResponseValidator] = None, stage2_top_k_residences: Optional[int] = None, **parent_kwargs, ): # Parent: AhlfeldtABMEngine handles num_agents, batch_size, shock_distribution, # seed. We skip the shock path in decide_batch but keep the rest. parent_kwargs.setdefault("shock_distribution", "frechet") super().__init__(params, seed=seed, **parent_kwargs) self.llm_client = llm_client self.cluster_k = int(cluster_k) self.clustering_algo = clustering_algo self.zone_name_map = dict(zone_name_map) if zone_name_map else None # Reverse map: real_name → synthetic_id for LLM-output decoding. self._real_to_synth: dict[str, str] = {} if self.zone_name_map: self._real_to_synth = {v: k for k, v in self.zone_name_map.items()} self.softmax_T = float(softmax_T) self.max_retries = int(max_retries) self.prompt_version = prompt_version self.llm_concurrency = int(llm_concurrency) self.progress_callback = progress_callback # Stage-1 prompt builder + validator. Default to production V5 top-5. # Pass different values for V5 score-all-96 ablation. Stage-2 # always uses top-5 (see design §D6 in v5-score-all-96-ablation). self.prompt_builder_stage1: Stage1Builder = ( prompt_builder_stage1 if prompt_builder_stage1 is not None else build_stage1_prompt ) self.response_validator_stage1: ResponseValidator = ( response_validator_stage1 if response_validator_stage1 is not None else validate_top5_response ) # V5 cost-control knob. `None` = use every residence in the stage-1 # distribution (legacy top-5 behaviour: stage-1 top-5 → 5 # stage-2 calls/cluster). Set to an int K to cap stage-2 fan-out at # the top-K residences (by stage-1 probability) per cluster — used # in score-all-96 mode to prevent 20x LLM-call blowup. Residences # outside top-K get the sampler's uniform-workplace fallback. self.stage2_top_k_residences: Optional[int] = ( int(stage2_top_k_residences) if stage2_top_k_residences is not None else None ) # Async client for batched LLM calls. Lazy-initialized (so unit tests # that never call decide_batch don't spawn a loop). self._async_client: Optional[AsyncLLMClient] = None self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) self._in_memory_cache: dict[tuple, list[tuple[str, float]]] = {} # Clustering state (populated on first decide_batch). self._cluster_labels: Optional[np.ndarray] = None # shape (M_types,) self._cluster_personas: list[str] = [] # length K self._cluster_weights: Optional[np.ndarray] = None # shape (K,) self._cluster_member_counts: Optional[np.ndarray] = None # shape (K,) self._agents_snapshot_ids: set[int] = set() # Per-iteration stats (reset each decide_batch). self._iter_stats = { "n_stage1_calls_made": 0, "n_stage1_cached": 0, "n_stage2_calls_made": 0, "n_stage2_cached": 0, "n_parse_failures": 0, } # ------------------------------------------------------------------ # Clustering # ------------------------------------------------------------------
[docs] def ensure_clustering(self, agents: list[Agent]) -> None: """Build (or reuse) the cluster assignments for the given agent set. Idempotent: re-calling with the same agent set is a no-op. On first call the agents are one-hot encoded and clustered with the configured algorithm (currently only ``kmeans``); ``self._cluster_labels``, ``self._cluster_personas``, and ``self._cluster_weights`` are populated. Args: agents: List of :class:`Agent` instances. Returns: None. Raises: ValueError: If ``self.clustering_algo`` is not supported. RuntimeError: If the cluster weights sum to zero. Examples: >>> import agent_urban_planning as aup >>> # engine = aup.LLMDecisionEngine(params, llm_client=client) >>> # engine.ensure_clustering(list(population)) """ ids = {int(a.agent_id) for a in agents} if self._agents_snapshot_ids == ids and self._cluster_labels is not None: return # One-hot encode categorical/ordinal fields. Use graceful-None handling. feats = _encode_agent_features(agents) # (M_types, n_feat) K = min(self.cluster_k, len(agents)) if self.clustering_algo == "kmeans": labels, centers = _kmeans_simple(feats, K, seed=self.seed) else: raise ValueError( f"clustering_algo={self.clustering_algo!r} not supported; " "this engine ships kmeans only. See docs for ablation knobs." ) self._cluster_labels = labels # Weights per cluster = sum of member agent weights. weights = np.array([float(a.weight) for a in agents], dtype=np.float64) K_actual = int(labels.max()) + 1 cw = np.zeros(K_actual, dtype=np.float64) cc = np.zeros(K_actual, dtype=np.int64) for i, lab in enumerate(labels): cw[int(lab)] += weights[i] cc[int(lab)] += 1 # Normalize to sum to 1 for within-cluster sampling weights. total_w = cw.sum() if total_w <= 0: raise RuntimeError("cluster weights sum to zero") cw_norm = cw / total_w self._cluster_weights = cw_norm self._cluster_member_counts = cc # Generate personas: pick the highest-weight agent in each cluster as the # persona representative, then format with persona_summary. self._cluster_personas = [] for c in range(K_actual): members = [i for i, lab in enumerate(labels) if int(lab) == c] if not members: self._cluster_personas.append("(empty cluster)") continue # Pick representative: highest weight within cluster (first wins on ties). rep_idx = max(members, key=lambda i: float(agents[i].weight)) self._cluster_personas.append(persona_summary(agents[rep_idx])) self._agents_snapshot_ids = ids logger.info( "V5 clustering: K=%d, member counts min/max/mean=%d/%d/%.1f", K_actual, int(cc.min()), int(cc.max()), float(cc.mean()), )
# ------------------------------------------------------------------ # Cache helpers # ------------------------------------------------------------------ # Price-bucket width for cache key. Wider buckets → more cache hits but # less price sensitivity in the prompt. 20% is a good default: late-iter # Q deltas (< 1%) fall into the same bucket; early-iter Q deltas (up to # ~10%) also stay in the same bucket about half the time. Observed in # development-phase experiments at seed 42. _PRICE_BUCKET_WIDTH = 0.20 def _cache_key( self, cluster_id: int, stage: int, residence_name: Optional[str], prices: dict[str, float], wages: dict[str, float], ) -> tuple: w = self._PRICE_BUCKET_WIDTH def bucket(d: dict[str, float]) -> tuple: return tuple(sorted((z, round(float(v) / w) * w) for z, v in d.items())) return ( int(cluster_id), int(stage), residence_name, self.prompt_version, bucket(prices), bucket(wages), ) def _key_hash(self, key: tuple) -> str: s = json.dumps(key, sort_keys=True, default=str) return hashlib.md5(s.encode()).hexdigest() def _cache_get(self, key: tuple) -> Optional[list[tuple[str, float]]]: if key in self._in_memory_cache: return self._in_memory_cache[key] path = self.cache_dir / f"{self._key_hash(key)}.json" if path.exists(): try: data = json.loads(path.read_text()) result = [(str(z), float(s)) for z, s in data["top_5"]] self._in_memory_cache[key] = result return result except Exception: return None return None def _cache_put(self, key: tuple, value: list[tuple[str, float]]) -> None: self._in_memory_cache[key] = value path = self.cache_dir / f"{self._key_hash(key)}.json" try: path.write_text(json.dumps({ "top_5": value, "prompt_version": self.prompt_version, })) except Exception: pass # ------------------------------------------------------------------ # LLM call helpers # ------------------------------------------------------------------ def _issue_prompt(self, system: str, user: str, allowed_zone_names: set[str], validator: Optional[ResponseValidator] = None, ) -> list[tuple[str, float]]: """Call the LLM once (synchronously), validate, retry on parse errors. Returns [(zone_name, normalized_score), ...]. Raises on persistent failure. Used for single-shot paths (unit tests); prefer `_batch_issue_prompts` in the main `decide_batch` codepath. `validator` defaults to `validate_top5_response` for backward compat with existing callers. Pass a different validator (e.g., `validate_all_scores_response`) for V5 score-all mode. """ if validator is None: validator = validate_top5_response last_exc: Optional[Exception] = None for attempt in range(self.max_retries): try: resp = self.llm_client.complete(user, system=system) except Exception as e: last_exc = e continue try: parsed = validator(resp, allowed_zone_names) return parsed except ValueError as e: last_exc = e self._iter_stats["n_parse_failures"] += 1 continue raise RuntimeError( f"LLM call failed after {self.max_retries} retries: {last_exc}" ) def _get_async_client(self) -> AsyncLLMClient: if self._async_client is None: self._async_client = AsyncLLMClient( self.llm_client, concurrency=self.llm_concurrency, ) return self._async_client def _batch_issue_prompts( self, prompts: list[tuple[str, str]], allowed_zone_names: set[str], progress_stage_label: str = "", validator: Optional[ResponseValidator] = None, ) -> list[list[tuple[str, float]]]: """Fire all prompts concurrently via `AsyncLLMClient.complete_many`, validate each response, retry the malformed/failed ones in follow-up batches up to `max_retries`. prompts: list of (system, user) tuples. validator: defaults to `validate_top5_response`. Pass a different validator (e.g., `validate_all_scores_response`) for V5 score-all mode. Stage-2 calls always pass the default. Returns: list of parsed results, same order as input. """ if validator is None: validator = validate_top5_response n = len(prompts) if n == 0: return [] results: list[Optional[list[tuple[str, float]]]] = [None] * n pending_indices = list(range(n)) async_client = self._get_async_client() for attempt in range(self.max_retries): if not pending_indices: break user_prompts = [prompts[i][1] for i in pending_indices] system_prompts = [prompts[i][0] for i in pending_indices] # Per-call progress via on_progress wrapped in our stage label. def _on_progress(done, total, _stage=progress_stage_label, _base=n - len(pending_indices)): if self.progress_callback is not None: # `done` here is relative to the current attempt's batch. # We report absolute completed count for this stage. absolute_done = _base + done # "cached" isn't known at batch time — pass 0 here; the # outer decide_batch updates the iter_stats counters # after the batch returns. self.progress_callback(_stage, absolute_done, n, 0) responses = async_client.complete_many( user_prompts, systems=system_prompts, on_progress=_on_progress, ) # Validate each response; on failure add to next-attempt queue. next_pending: list[int] = [] for local_idx, orig_idx in enumerate(pending_indices): resp = responses[local_idx] try: parsed = validator(resp, allowed_zone_names) results[orig_idx] = parsed except ValueError as e: self._iter_stats["n_parse_failures"] += 1 next_pending.append(orig_idx) except Exception as e: next_pending.append(orig_idx) pending_indices = next_pending if pending_indices: # All retries exhausted for some prompts. raise RuntimeError( f"LLM call failed after {self.max_retries} retries for " f"{len(pending_indices)}/{n} prompts in stage " f"{progress_stage_label!r}" ) return results # type: ignore[return-value] # ------------------------------------------------------------------ # decide_batch — main override # ------------------------------------------------------------------
[docs] def decide_batch( self, agents: list[Agent], environment, zone_options: list[str], prices: dict, ) -> list[LocationChoice]: """Issue stage-1 + stage-2 LLM calls and sample per-agent (R, W) choices. For each cluster, issues one stage-1 LLM call (residence ranking / scoring) plus up to ``stage2_top_k_residences`` stage-2 calls (workplace conditional on residence). Cached results are reused across iterations via the in-memory and on-disk caches keyed by ``(cluster, stage, residence, prompt_version, price_bucket, wage_bucket)``. Returned scores are converted to sampling probabilities and sampled ``num_agents`` times to populate ``self.last_choice_probabilities``. Args: agents: List of :class:`Agent` instances. environment: The :class:`Environment` carrying zones. zone_options: Allowed zone names. prices: Mapping ``zone -> Q_i``. Returns: List of :class:`LocationChoice`, one per input agent and in the same order. Examples: >>> import agent_urban_planning as aup >>> # engine = aup.LLMDecisionEngine(params, llm_client=client) >>> # choices = engine.decide_batch(agents, env, zones, prices) """ if not agents: return [] self.ensure_clustering(agents) # Reset per-iter stats. self._iter_stats = { "n_stage1_calls_made": 0, "n_stage1_cached": 0, "n_stage2_calls_made": 0, "n_stage2_cached": 0, "n_parse_failures": 0, } N = len(zone_options) zones = list(zone_options) synth_to_real = ( dict(self.zone_name_map) if self.zone_name_map else {z: z for z in zones} ) real_names = {synth_to_real[z] for z in zones} # For every synthetic zone, the name used in prompts + validation. prompt_name_of = {z: synth_to_real[z] for z in zones} # Reverse: prompt-name → synthetic id. synth_of_prompt_name = {prompt_name_of[z]: z for z in zones} # Build zone info (prices, wages, fundamentals). wages_source = self._current_wages amen_source = self._current_amenity def _resolve_B(z): if amen_source and z in amen_source and amen_source[z] is not None: return float(amen_source[z]) v = environment.get_zone(z).amenity_B return float(v) if v else 1e-6 def _resolve_A(z): v = environment.get_zone(z).productivity_A return float(v) if v else 1.0 def _resolve_w(z): v = wages_source.get(z) if wages_source else None if v is None: v = environment.get_zone(z).wage_observed return float(v) if v else 1.0 zones_info_by_synth: dict[str, dict] = {} prices_dict: dict[str, float] = {} wages_dict: dict[str, float] = {} for z in zones: Q = float(prices.get(z, environment.get_zone(z).floor_price_observed)) w = _resolve_w(z) B = _resolve_B(z) A = _resolve_A(z) zones_info_by_synth[z] = { "name": prompt_name_of[z], "Q": Q, "w": w, "B": B, "A": A, } prices_dict[z] = Q wages_dict[z] = w # Travel-time matrix (synthetic-id keyed). if environment.transport_matrix is not None and environment.transport_matrix_index: tt_idx = [environment._matrix_index_map[z] for z in zones] tau = environment.transport_matrix[np.ix_(tt_idx, tt_idx)].astype( np.float64, copy=False, ) else: tau = np.zeros((N, N), dtype=np.float64) for i, zi in enumerate(zones): for j, zj in enumerate(zones): tau[i, j] = environment.travel_time(zi, zj) K = int(self._cluster_weights.shape[0]) # --- STAGE 1: residence per cluster (batched) ------------------ stage1_dist_by_cluster: dict[int, list[tuple[str, float]]] = {} stage1_synth_dist: dict[int, list[tuple[str, float]]] = {} zones_info_list = [zones_info_by_synth[z] for z in zones] # Split K clusters into cached vs uncached. s1_uncached_clusters: list[int] = [] s1_uncached_prompts: list[tuple[str, str]] = [] s1_cache_keys: dict[int, tuple] = {} for c in range(K): key = self._cache_key(c, 1, None, prices_dict, wages_dict) s1_cache_keys[c] = key cached = self._cache_get(key) if cached is not None: stage1_dist_by_cluster[c] = cached self._iter_stats["n_stage1_cached"] += 1 else: sys_msg, user_msg = self.prompt_builder_stage1( self._cluster_personas[c], zones_info_list, prompt_version=self.prompt_version, ) s1_uncached_clusters.append(c) s1_uncached_prompts.append((sys_msg, user_msg)) # Batch-dispatch the uncached prompts (stage 1 uses configurable validator). if s1_uncached_prompts: parsed_list = self._batch_issue_prompts( s1_uncached_prompts, real_names, progress_stage_label="stage1", validator=self.response_validator_stage1, ) for c, parsed in zip(s1_uncached_clusters, parsed_list): stage1_dist_by_cluster[c] = parsed self._cache_put(s1_cache_keys[c], parsed) self._iter_stats["n_stage1_calls_made"] += 1 # Decode all stage-1 distributions into synthetic-id space. for c in range(K): stage1_synth_dist[c] = [ (synth_of_prompt_name[z], s) for z, s in stage1_dist_by_cluster[c] ] # --- STAGE 2: workplace given residence (batched) --------------- stage2_dist: dict[tuple[int, str], list[tuple[str, float]]] = {} needed_pairs: list[tuple[int, str]] = [] for c, dist in stage1_synth_dist.items(): # Optional cap: only query stage-2 for the top-K residences. # Used by the V5 score-all-96 variant to prevent stage-2 fan-out # from exploding from 5→96 residences per cluster. Residences # outside top-K fall back to uniform workplace in the sampler. if self.stage2_top_k_residences is not None: topk = sorted(dist, key=lambda x: -x[1])[:self.stage2_top_k_residences] else: topk = dist for synth_res, _s in topk: needed_pairs.append((c, synth_res)) s2_uncached_pairs: list[tuple[int, str]] = [] s2_uncached_prompts: list[tuple[str, str]] = [] s2_cache_keys: dict[tuple[int, str], tuple] = {} for c, synth_res in needed_pairs: res_prompt_name = prompt_name_of[synth_res] key = self._cache_key(c, 2, res_prompt_name, prices_dict, wages_dict) s2_cache_keys[(c, synth_res)] = key cached = self._cache_get(key) if cached is not None: stage2_dist[(c, synth_res)] = cached self._iter_stats["n_stage2_cached"] += 1 else: res_idx = zones.index(synth_res) wpl_info = [ { "name": prompt_name_of[z], "w": zones_info_by_synth[z]["w"], "commute_min": float(tau[res_idx, j]), } for j, z in enumerate(zones) ] sys_msg, user_msg = build_stage2_prompt( self._cluster_personas[c], res_prompt_name, wpl_info, prompt_version=self.prompt_version, ) s2_uncached_pairs.append((c, synth_res)) s2_uncached_prompts.append((sys_msg, user_msg)) if s2_uncached_prompts: parsed_list = self._batch_issue_prompts( s2_uncached_prompts, real_names, progress_stage_label="stage2", ) for (c, synth_res), parsed in zip(s2_uncached_pairs, parsed_list): stage2_dist[(c, synth_res)] = parsed self._cache_put(s2_cache_keys[(c, synth_res)], parsed) self._iter_stats["n_stage2_calls_made"] += 1 # --- Softmax + MC sampling expand K → M -------------------------- P_agg, HR_count, HM_count = self._sample_m_from_distributions( N, zones, synth_of_prompt_name, stage1_synth_dist, stage2_dist, ) # Hand off to the market. self.last_choice_probabilities = P_agg.astype( self._np_dtype, copy=False, ) # Diagnostics. self.last_abm_diagnostics = { "num_agents": int(self.num_agents), "n_clusters": K, "n_stage1_calls": self._iter_stats["n_stage1_calls_made"], "n_stage1_cached": self._iter_stats["n_stage1_cached"], "n_stage2_calls": self._iter_stats["n_stage2_calls_made"], "n_stage2_cached": self._iter_stats["n_stage2_cached"], "n_parse_failures": self._iter_stats["n_parse_failures"], "n_cells_nonzero": int((P_agg > 0).sum()), "total_cells": int(P_agg.size), "n_residence_marginal_zero": int((HR_count == 0).sum()), "n_workplace_marginal_zero": int((HM_count == 0).sum()), "HR_min": int(HR_count.min()), "HR_max": int(HR_count.max()), "HM_min": int(HM_count.min()), "HM_max": int(HM_count.max()), "softmax_T": self.softmax_T, "prompt_version": self.prompt_version, "cluster_k": K, } # Per-input-agent LocationChoice (population-modal cell). flat_mode = int(P_agg.argmax()) i_mode = flat_mode // N j_mode = flat_mode - i_mode * N return [ LocationChoice( residence=zones[i_mode], workplace=zones[j_mode], utility=0.0, zone_utilities={}, ) for _ in agents ]
# ------------------------------------------------------------------ def _sample_m_from_distributions( self, N: int, zones: list[str], synth_of_prompt_name: dict[str, str], stage1_synth_dist: dict[int, list[tuple[str, float]]], stage2_dist: dict[tuple[int, str], list[tuple[str, float]]], ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Expand K-cluster distributions to M MC samples, then aggregate. Score → sampling-probability branching (v5-hierarchical-v2 semantics): The validator pre-normalizes LLM scores to sum to 1. At softmax_T >= 1.0 we use those raw probabilities DIRECTLY (T=1 is the no-op identity). At softmax_T < 1.0 we apply `exp(log(scores+ε)/T)` renormalization to concentrate mass on the LLM's top-ranked zones. Previous (buggy) behavior composed `normalize → softmax(scores/T=1)` which flattened [0,1]-scale scores into near-uniform over the top-5. See openspec/changes/v5-hierarchical-softmax-fix/ for the fix rationale. """ M = int(self.num_agents) T = max(float(self.softmax_T), 1e-6) K = int(self._cluster_weights.shape[0]) zone_to_idx = {z: i for i, z in enumerate(zones)} rng = np.random.default_rng((self.seed * 2_000_003 + 7) & 0xFFFFFFFF) # Per-agent cluster assignment (weighted by cluster_weights). cluster_of_m = rng.choice(K, size=M, p=self._cluster_weights) # For each cluster, pre-compute sampling probs for residence. stage1_probs: dict[int, tuple[np.ndarray, np.ndarray]] = {} # cluster → (synth_id_array, prob_array) for c in range(K): dist = stage1_synth_dist.get(c, []) if not dist: # Empty distribution — fall back to uniform over all zones. stage1_probs[c] = ( np.array(zones), np.ones(N) / N, ) continue names = np.array([z for z, _ in dist]) scores = np.array([s for _, s in dist], dtype=np.float64) probs = _scores_to_probs(scores, T) stage1_probs[c] = (names, probs) # Pre-compute workplace sampling probs per (cluster, residence). stage2_probs: dict[tuple[int, str], tuple[np.ndarray, np.ndarray]] = {} for (c, res_synth), dist in stage2_dist.items(): if not dist: stage2_probs[(c, res_synth)] = ( np.array(zones), np.ones(N) / N, ) continue # Stage-2 distributions use prompt-name (real) keys; decode to synth. names = np.array([synth_of_prompt_name.get(z, z) for z, _ in dist]) scores = np.array([s for _, s in dist], dtype=np.float64) probs = _scores_to_probs(scores, T) stage2_probs[(c, res_synth)] = (names, probs) HR_count = np.zeros(N, dtype=np.int64) HM_count = np.zeros(N, dtype=np.int64) P_count = np.zeros((N, N), dtype=np.int64) # Sample M agents — group by cluster for efficiency. for c in range(K): mask = cluster_of_m == c m_c = int(mask.sum()) if m_c == 0: continue s1_names, s1_probs = stage1_probs[c] # Stage 1: draw residences. res_idx_in_dist = rng.choice(len(s1_probs), size=m_c, p=s1_probs) res_synth_per_agent = s1_names[res_idx_in_dist] # For each unique residence picked, sample that sub-batch's workplace. unique_res, inverse = np.unique(res_synth_per_agent, return_inverse=True) for u_idx, res_synth in enumerate(unique_res): sub_mask = inverse == u_idx sub_n = int(sub_mask.sum()) key = (c, str(res_synth)) if key in stage2_probs: s2_names, s2_probs = stage2_probs[key] else: s2_names, s2_probs = np.array(zones), np.ones(N) / N wp_idx_in_dist = rng.choice( len(s2_probs), size=sub_n, p=s2_probs, ) wp_synth_per_agent = s2_names[wp_idx_in_dist] # Accumulate aggregates. res_i = zone_to_idx[str(res_synth)] for wp in wp_synth_per_agent: wp_i = zone_to_idx[str(wp)] P_count[res_i, wp_i] += 1 HR_count[res_i] += 1 HM_count[wp_i] += 1 P_agg = P_count.astype(np.float64) / float(M) return P_agg, HR_count, HM_count
[docs] def cluster_personas(self) -> list[str]: """Return a copy of the per-cluster persona strings. Each persona is a one-line :func:`persona_summary` of the highest-weight agent in the cluster, used as the persona block in stage-1 prompts. Useful for diagnostic logging. Returns: List of persona strings, one per cluster. Examples: >>> import agent_urban_planning as aup >>> # engine.cluster_personas() # one persona per cluster """ return list(self._cluster_personas)
# --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _softmax(scores: np.ndarray, T: float) -> np.ndarray: """Numerically-stable softmax with temperature T over raw scores as logits. Kept for library use and direct testing; NOT called in the default V5 sampling path (which uses `_scores_to_probs` instead). See that helper for the score-interpretation semantics used by decide_batch. """ x = np.asarray(scores, dtype=np.float64) / max(T, 1e-6) x -= x.max() e = np.exp(x) s = e.sum() if s <= 0: n = e.size return np.ones(n) / n return e / s def _scores_to_probs(scores: np.ndarray, T: float) -> np.ndarray: """Convert LLM-returned top-K scores into a sampling distribution. The LLM returns values in [0, 1] that the validator has already normalized to sum to 1 (so they function as a probability distribution). We therefore branch on `T`: * T >= 1.0 → return the raw probabilities as-is. This is the identity pass-through semantically — we trust the LLM's stated relative preferences. Previously the code composed `normalize → softmax(T=1)` which flattened [0,1] scores into near-uniform; that was a bug. * T < 1.0 → concentrate on the LLM's top choice via a true temperature- scaled softmax over log-probabilities: probs ∝ exp(log(score) / T). This is the well-known language-model "temperature sampling" form; as T → 0 it concentrates on argmax, and T=1 recovers the raw probabilities (continuous with the T>=1 branch above). Numerical guard: scores of exactly 0 are floored with ε=1e-12 before log to avoid -inf. All-zero input falls back to uniform. """ scores = np.asarray(scores, dtype=np.float64) total = float(scores.sum()) if total <= 0: n = scores.size return np.ones(n) / n normalized = scores / total if T >= 1.0 - 1e-9: return normalized # T < 1.0: concentrate via temperature-scaled softmax over log-probs. eps = 1e-12 log_p = np.log(np.maximum(normalized, eps)) x = log_p / max(T, 1e-6) x -= x.max() e = np.exp(x) s = e.sum() if s <= 0: n = e.size return np.ones(n) / n return e / s def _encode_agent_features(agents: list[Agent]) -> np.ndarray: """One-hot encode 10 demographic axes. Missing fields encode as all-zero for that axis (graceful handling of non-richer agents).""" # Category definitions matching the richer joint labels. HH_BUCKETS = (1, 2, 3, 4) AGE_BUCKETS = ((24, 34), (34, 44), (44, 54), (54, 64), (64, 99)) INCOME_BUCKETS = ((0, 1300), (1300, 2400), (2400, 10**6)) EDU = ("low", "mid", "high") MIG = ("none", "EU", "non-EU") EMP = ("employed", "self-employed", "unemployed", "retired_or_student") TEN = ("owner", "renter") def onehot(val, options) -> list[float]: return [1.0 if val == o else 0.0 for o in options] rows: list[list[float]] = [] for a in agents: row: list[float] = [] # hh_size row.extend(onehot(min(int(a.household_size), 4), HH_BUCKETS)) # age brackets age_oh = [0.0] * len(AGE_BUCKETS) for i, (lo, hi) in enumerate(AGE_BUCKETS): if lo < int(a.age_head) <= hi: age_oh[i] = 1.0 break row.extend(age_oh) # income brackets inc_oh = [0.0] * len(INCOME_BUCKETS) for i, (lo, hi) in enumerate(INCOME_BUCKETS): if lo <= float(a.income) < hi: inc_oh[i] = 1.0 break row.extend(inc_oh) # binary flags row.append(1.0 if a.has_children else 0.0) row.append(1.0 if a.has_elderly else 0.0) row.append(1.0 if a.car_owner else 0.0) # richer categoricals (None → all zeros for that field) row.extend(onehot(a.education, EDU) if a.education else [0.0] * len(EDU)) row.extend(onehot(a.migration_background, MIG) if a.migration_background else [0.0] * len(MIG)) row.extend(onehot(a.employment_status, EMP) if a.employment_status else [0.0] * len(EMP)) row.extend(onehot(a.tenure, TEN) if a.tenure else [0.0] * len(TEN)) rows.append(row) return np.asarray(rows, dtype=np.float64) def _kmeans_simple( X: np.ndarray, K: int, seed: Optional[int], max_iter: int = 100, ) -> tuple[np.ndarray, np.ndarray]: """Deterministic k-means with k-means++ initialization. Returns (labels[M], centers[K, D]). """ rng = np.random.default_rng(seed if seed is not None else 0) M, D = X.shape if K >= M: # Degenerate: one cluster per agent. labels = np.arange(M, dtype=np.int64) return labels, X.copy() # k-means++ init. centers = np.empty((K, D), dtype=X.dtype) idx0 = int(rng.integers(0, M)) centers[0] = X[idx0] for c in range(1, K): d = np.min( ((X[:, None, :] - centers[None, :c, :]) ** 2).sum(axis=2), axis=1, ) if d.sum() <= 0: # All points identical to selected centers — pick randomly. idx = int(rng.integers(0, M)) else: probs = d / d.sum() idx = int(rng.choice(M, p=probs)) centers[c] = X[idx] labels = np.zeros(M, dtype=np.int64) for _ in range(max_iter): # Assign. d2 = ((X[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2) new_labels = d2.argmin(axis=1) if np.array_equal(new_labels, labels): labels = new_labels break labels = new_labels # Update. for c in range(K): members = X[labels == c] if len(members) > 0: centers[c] = members.mean(axis=0) return labels, centers