Skip to content

Intelligence System API Reference

Worker Entrypoint

Intelligence system worker: orchestrates risk calculations and pub/sub messaging.

Manages three concurrent tasks
  1. Hourly scheduled risk calculations for all monitored zones
  2. Instant (ad-hoc) risk requests from the backend via Redis queue
  3. Event publication to Redis for dashboard and subscription updates

The system listens on Redis for frontend subscription requests and location queries, publishing results back in real-time. All calculations are delegated to the zone processor which handles MET API calls and risk model logic.

job() async

Fetch weather data and calculate fire risk for all monitored zones (hourly).

Executes concurrency-limited requests to MET API for weather readings, processes each zone's data through risk calculation model, and saves results to database. Emits HOURLY_DATA_READY event via Redis when complete.

Note: Uses Semaphore to limit concurrent API requests (configurable via MAX_CONCURRENT_FETCHES setting) to avoid overloading external APIs or exhausting database connection pool.

process_instant_queue() async

Listen for and process instant (ad-hoc) risk calculation requests from backend.

Runs as long-lived coroutine that blocks on Redis (intelligence_tasks list), processing high-priority requests for specific locations. Each request includes a geohash/location_id; the worker fetches zone details, calculates risk, and publishes result back to Redis channel for the backend to stream to clients.

Errors (missing zones, API failures) are logged and processing continues. Implements 5-second backoff on failure to avoid tight error loops.

process_scheduled_locations() async

Run hourly background polling loop for all monitored zones.

Repeatedly calls job() on a fixed interval (FETCH_INTERVAL_SECONDS) to ensure all zones receive regular weather updates and risk calculations. Implements 60-second error backoff if job() fails, then resumes normal schedule.

This task forms the core of the intelligence system, keeping historical data fresh for both dashboard display and alerting logic.

main() async

Initialize database and start concurrent worker task group.

Seeds initial monitored zones (if empty) and spawns two long-lived tasks: - process_instant_queue(): Handles on-demand risk calculations - process_scheduled_locations(): Runs hourly bulk risk refresh cycle

Both tasks run concurrently, coordinating via Redis messaging and shared database. Worker continues indefinitely until process is terminated.

Configuration

Settings

Bases: BaseSettings

Manages application settings and configurations.

Zone Processing

Zone processing pipeline for weather fetch, risk calculation, and persistence.

process_zone(zone, semaphore=None) async

Fetch weather, calculate risk, and persist readings for one zone.

If a semaphore is provided, processing is executed within that concurrency guard to avoid overloading external APIs and the database.

Returns a lightweight risk payload for Redis publishing when successful, otherwise None.

Database Operations and Models

Database models and persistence helpers for the intelligence worker.

Base

Bases: DeclarativeBase

Base class for SQLAlchemy models.

MonitoredZone

Bases: Base

Represents a geographic zone (grid cell) that we are actively monitoring.

WeatherDataReading

Bases: Base

SQLAlchemy model for raw weather data readings.

FireRiskReading

Bases: Base

SQLAlchemy model for fire risk readings (History).

CurrentFireRisk

Bases: Base

Stores the MOST RECENT fire risk calculation for each zone. This table is optimized for fast lookups by the backend API.

create_db_and_tables() async

Creates the database and tables if they do not exist.

seed_initial_zones() async

Populates the database with initial regional zones if empty.

get_monitored_zones() async

Returns all active monitored zones.

get_zone_by_geohash(geohash) async

Returns a single monitored zone by its geohash.

get_db() async

Dependency for getting an async database session.

save_weather_data(location_name, lat, lon, weather_json) async

Saves the raw weather data for a given location to the database.

save_risk_data(location_name, lat, lon, risk_result) async

Saves the fire risk data to the history table AND updates the current risk table.

get_latest_readings(location_name, limit=1) async

Return recent weather and risk rows for one location for debugging.

This helper is primarily intended for operational inspection and tests.

Base

Bases: DeclarativeBase

Base class for all database models, providing a common metadata store.

MonitoredZone

Bases: Base

Represents a geographic zone (grid cell) that we are actively monitoring.

WeatherDataReading

Bases: Base

Represents a single reading of raw weather data from an external API.

FireRiskReading

Bases: Base

Represents the calculated fire risk for a specific location (History).

CurrentFireRisk

Bases: Base

Stores the MOST RECENT fire risk calculation for each zone. This table is optimized for fast lookups by the backend API.

Utilities

Adapters between MET weather payloads and the Fire Risk Computation Model.

transform_met_data_to_model(met_json)

Convert MET.no timeseries JSON into FRCM WeatherData input.

The function keeps the full forecast sequence and normalizes timestamps to timezone-aware datetimes expected by the model.

calculate_risk(met_json)

Run risk computation end-to-end and return the current prediction snapshot.

The FRCM output contains a forecast sequence; this function returns the second sample, which represents the first computed point after model initialization.

calculate_risk_score(ttf)

Calculates a normalized risk score (0-100) and category based on Time To Flashover (TTF).

TTF is expressed in minutes. Lower TTF means higher risk.

Parameters:

Name Type Description Default
ttf float

Time To Flashover in minutes.

required

Returns:

Type Description
float

A tuple containing:

str
  • risk_score: A float between 0 and 100 (100 = Extreme Risk).
Tuple[float, str]
  • risk_category: A string description (Low, Moderate, High, Extreme).

Grid/geohash helpers for seeding and analytics-target zone selection.

get_geohash(lat, lon, precision=5)

Converts latitude and longitude to a geohash string.

Parameters:

Name Type Description Default
lat float

Latitude.

required
lon float

Longitude.

required
precision int

Length of the geohash string. 3 chars ~= 156km x 156km (Coarse Regional Tier) 4 chars ~= 39km x 19km (Finer Regional Tier) 5 chars ~= 4.9km x 4.9km (Precise Tier for User Alerts)

5

Returns:

Type Description
str

The geohash string.

get_geohash_center(geohash)

Decodes a geohash string to its center latitude and longitude.

Parameters:

Name Type Description Default
geohash str

The geohash string.

required

Returns:

Type Description
Tuple[float, float]

A tuple of (latitude, longitude).

mark_analytics_targets(zones)

Mark a subset of zones as analytics targets based on nearest city mapping.

generate_initial_zones()

Generates a list of 'Tier 1' (Regional) zones covering the entire bounding box of Norway. These zones use coarser geohash precision (3 characters) for a broader overview, suitable for a national map.

Returns:

Type Description
List[dict]

A list of dictionaries representing zones.

MET.no weather API client helpers used by the intelligence worker.

fetch_weather(lat, lon) async

Asynchronously fetches weather data for a given latitude and longitude from the MET.no API.

Parameters:

Name Type Description Default
lat float

The latitude of the location.

required
lon float

The longitude of the location.

required

Returns:

Type Description
Any | None

A dictionary containing the weather data, or None if an error occurs.