Intelligence System API Reference
Worker Entrypoint
Intelligence system worker: orchestrates risk calculations and pub/sub messaging.
Manages three concurrent tasks:
- Hourly scheduled risk calculations for all monitored zones
- Instant (ad-hoc) risk requests from the backend via Redis queue
- Event publication to Redis for dashboard and subscription updates
The system listens on Redis for frontend subscription requests and location queries, publishing results back in real time. All calculations are delegated to the zone processor, which handles MET API calls and risk model logic.
job()
async
Fetch weather data and calculate fire risk for all monitored zones (hourly).
Issues concurrency-limited requests to the MET API for weather readings, runs each zone's data through the risk calculation model, and saves the results to the database. Emits a HOURLY_DATA_READY event via Redis when complete.
Note: Uses a semaphore to limit concurrent API requests (configurable via the MAX_CONCURRENT_FETCHES setting) to avoid overloading external APIs or exhausting the database connection pool.
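The concurrency limit described in the note can be sketched with `asyncio.Semaphore`. This is a minimal illustration, not the worker's actual code: `fetch_and_score`, `run_hourly_job`, and the value of `MAX_CONCURRENT_FETCHES` are assumptions, and the sleep stands in for the MET API round trip.

```python
import asyncio

MAX_CONCURRENT_FETCHES = 3  # assumed value; the real one comes from Settings


async def fetch_and_score(zone: str, semaphore: asyncio.Semaphore, active: list) -> str:
    """Hypothetical stand-in for per-zone work (fetch weather, compute risk, save)."""
    async with semaphore:  # at most MAX_CONCURRENT_FETCHES bodies run at once
        active[0] += 1
        active[1] = max(active[1], active[0])  # record peak concurrency
        await asyncio.sleep(0.01)  # simulates the external API call
        active[0] -= 1
    return zone


async def run_hourly_job(zones: list) -> tuple:
    """Start one coroutine per zone; the semaphore throttles them."""
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_FETCHES)
    active = [0, 0]  # [currently running, peak observed]
    results = await asyncio.gather(
        *(fetch_and_score(zone, semaphore, active) for zone in zones)
    )
    return list(results), active[1]
```

All zones are scheduled immediately, but only a bounded number hold the semaphore at any moment, which is what protects the upstream API and the connection pool.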
process_instant_queue()
async
Listen for and process instant (ad-hoc) risk calculation requests from backend.
Runs as a long-lived coroutine that blocks on the Redis list intelligence_tasks, processing high-priority requests for specific locations. Each request includes a geohash/location_id; the worker fetches the zone details, calculates the risk, and publishes the result to a Redis channel for the backend to stream to clients.
Errors (missing zones, API failures) are logged and processing continues. Implements 5-second backoff on failure to avoid tight error loops.
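The log-and-continue behaviour with a backoff can be sketched as follows. To keep the example runnable without a Redis server, the queue is injected as a `pop_request` callable; in the worker this would be a blocking pop on the `intelligence_tasks` list. The function name, its parameters, the JSON request format, and `max_iterations` (which exists only so the loop can terminate in a demo) are all assumptions.

```python
import asyncio
import json
import logging
from typing import Awaitable, Callable, Optional

logger = logging.getLogger("intelligence.worker")


async def consume_instant_requests(
    pop_request: Callable[[], Awaitable[Optional[str]]],
    handle: Callable[[dict], Awaitable[None]],
    backoff_seconds: float = 5.0,
    max_iterations: Optional[int] = None,
) -> None:
    """Pop requests one at a time; log failures and back off before retrying."""
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        iterations += 1
        try:
            raw = await pop_request()  # stand-in for a blocking Redis list pop
            if raw is None:
                continue  # nothing queued within the timeout
            await handle(json.loads(raw))
        except Exception:
            # A bad request or an API failure must not kill the loop.
            logger.exception("Instant request failed; backing off")
            await asyncio.sleep(backoff_seconds)
```

Because `asyncio.CancelledError` does not derive from `Exception`, cancelling the task still stops the loop cleanly.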
process_scheduled_locations()
async
Run hourly background polling loop for all monitored zones.
Repeatedly calls job() on a fixed interval (FETCH_INTERVAL_SECONDS) to ensure all zones receive regular weather updates and risk calculations. Implements 60-second error backoff if job() fails, then resumes normal schedule.
This task forms the core of the intelligence system, keeping historical data fresh for both dashboard display and alerting logic.
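A fixed-interval loop with a separate error backoff might look like the sketch below. The constant values, the `run_on_schedule` name, and the `max_cycles` parameter (added only so the example can finish) are assumptions; only `FETCH_INTERVAL_SECONDS` and the 60-second backoff come from the description above.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional

logger = logging.getLogger("intelligence.scheduler")

FETCH_INTERVAL_SECONDS = 3600.0  # assumed hourly default
ERROR_BACKOFF_SECONDS = 60.0


async def run_on_schedule(
    job: Callable[[], Awaitable[None]],
    interval: float = FETCH_INTERVAL_SECONDS,
    error_backoff: float = ERROR_BACKOFF_SECONDS,
    max_cycles: Optional[int] = None,
) -> int:
    """Call job() repeatedly; return how many runs completed without error."""
    completed = 0
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        cycles += 1
        try:
            await job()
            completed += 1
            await asyncio.sleep(interval)  # normal schedule
        except Exception:
            logger.exception("Scheduled job failed; retrying after backoff")
            await asyncio.sleep(error_backoff)  # shorter wait, then resume
    return completed
```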
main()
async
Initialize database and start concurrent worker task group.
Seeds the initial monitored zones (if empty) and spawns two long-lived tasks:
- process_instant_queue(): Handles on-demand risk calculations
- process_scheduled_locations(): Runs the hourly bulk risk refresh cycle
Both tasks run concurrently, coordinating via Redis messaging and the shared database. The worker runs indefinitely until the process is terminated.
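The startup order (initialize, seed, then run both tasks side by side) can be sketched like this. The text mentions a task group; `asyncio.gather` is used here as a simpler stand-in, and `run_worker` with its four injected callables is a hypothetical shape chosen so the example does not need a database or Redis.

```python
import asyncio
from typing import Awaitable, Callable

AsyncStep = Callable[[], Awaitable[None]]


async def run_worker(
    init_db: AsyncStep,
    seed_zones: AsyncStep,
    instant_worker: AsyncStep,
    scheduled_worker: AsyncStep,
) -> None:
    """Run setup steps in order, then both long-lived tasks concurrently."""
    await init_db()     # e.g. create_db_and_tables()
    await seed_zones()  # e.g. seed_initial_zones()
    # In the real worker both coroutines loop forever, so this never returns
    # until the process is terminated or a task raises.
    await asyncio.gather(instant_worker(), scheduled_worker())
```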
Configuration
Settings
Bases: BaseSettings
Manages application settings and configurations.
Zone Processing
Zone processing pipeline for weather fetch, risk calculation, and persistence.
process_zone(zone, semaphore=None)
async
Fetch weather, calculate risk, and persist readings for one zone.
If a semaphore is provided, processing is executed within that concurrency guard to avoid overloading external APIs and the database.
Returns a lightweight risk payload for Redis publishing when successful, otherwise None.
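The optional-semaphore contract can be illustrated with a small sketch. `_run_pipeline` is a hypothetical stand-in for the fetch, calculate, and persist steps, and the payload fields and score are placeholders rather than the worker's real output.

```python
import asyncio
from typing import Optional


async def _run_pipeline(zone: dict) -> Optional[dict]:
    """Hypothetical stand-in for fetch -> calculate -> persist."""
    if zone.get("weather") is None:
        return None  # e.g. the weather fetch failed
    return {"geohash": zone["geohash"], "risk_score": 42.0}  # placeholder payload


async def process_zone_sketch(
    zone: dict, semaphore: Optional[asyncio.Semaphore] = None
) -> Optional[dict]:
    """Run the pipeline, guarded by the semaphore only when one is supplied."""
    if semaphore is None:
        return await _run_pipeline(zone)  # ad-hoc call, no throttling
    async with semaphore:
        return await _run_pipeline(zone)  # bulk call, throttled
```

Making the guard optional lets the hourly bulk run share one semaphore across zones while a single instant request skips the throttle.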
Database Operations and Models
Database models and persistence helpers for the intelligence worker.
Base
Bases: DeclarativeBase
Base class for SQLAlchemy models.
MonitoredZone
CurrentFireRisk
Bases: Base
Stores the most recent fire risk calculation for each zone. This table is optimized for fast lookups by the backend API.
create_db_and_tables()
async
Creates the database and tables if they do not exist.
seed_initial_zones()
async
Populates the database with initial regional zones if empty.
get_monitored_zones()
async
Returns all active monitored zones.
get_zone_by_geohash(geohash)
async
Returns a single monitored zone by its geohash.
get_db()
async
Dependency for getting an async database session.
save_weather_data(location_name, lat, lon, weather_json)
async
Saves the raw weather data for a given location to the database.
save_risk_data(location_name, lat, lon, risk_result)
async
Saves the fire risk data to the history table AND updates the current risk table.
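The "append to history and overwrite current" pattern can be sketched with the standard-library `sqlite3` module. This is a swapped-in technique: the worker uses async SQLAlchemy sessions, and the table names, column names, and function names here are assumptions made for the illustration.

```python
import sqlite3


def create_tables(conn: sqlite3.Connection) -> None:
    """Create a hypothetical history table and a one-row-per-location table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS fire_risk_history ("
        "id INTEGER PRIMARY KEY, location_name TEXT, lat REAL, lon REAL, "
        "risk_score REAL, computed_at TEXT)"
    )
    conn.execute(
        "CREATE TABLE IF NOT EXISTS current_fire_risk ("
        "location_name TEXT PRIMARY KEY, lat REAL, lon REAL, "
        "risk_score REAL, computed_at TEXT)"
    )


def save_risk(conn, location_name, lat, lon, risk_score, computed_at) -> None:
    """Append a history row and replace the current row in one transaction."""
    row = (location_name, lat, lon, risk_score, computed_at)
    with conn:  # commits on success, rolls back if either statement fails
        conn.execute(
            "INSERT INTO fire_risk_history "
            "(location_name, lat, lon, risk_score, computed_at) VALUES (?, ?, ?, ?, ?)",
            row,
        )
        conn.execute(
            "INSERT OR REPLACE INTO current_fire_risk "
            "(location_name, lat, lon, risk_score, computed_at) VALUES (?, ?, ?, ?, ?)",
            row,
        )
```

Writing both rows in one transaction keeps the fast-lookup table from drifting out of sync with the history.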
get_latest_readings(location_name, limit=1)
async
Return recent weather and risk rows for one location for debugging.
This helper is primarily intended for operational inspection and tests.
Utilities
Adapters between MET weather payloads and the Fire Risk Computation Model.
transform_met_data_to_model(met_json)
Convert MET.no timeseries JSON into FRCM WeatherData input.
The function keeps the full forecast sequence and normalizes timestamps to timezone-aware datetimes expected by the model.
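A rough sketch of this conversion follows. The JSON paths match the MET.no Locationforecast `compact` layout (`properties.timeseries[].data.instant.details`), but `WeatherPoint` is a hypothetical stand-in for the FRCM input type, and treating a timestamp without an offset as UTC is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class WeatherPoint:
    """Hypothetical stand-in for one FRCM weather sample."""
    timestamp: datetime
    temperature: float
    humidity: float
    wind_speed: float


def parse_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 string into a timezone-aware datetime."""
    # Older Python versions reject a trailing "Z", so rewrite it as an offset.
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return parsed


def to_weather_points(met_json: dict) -> list:
    """Keep every forecast step, in order, as a WeatherPoint."""
    points = []
    for entry in met_json["properties"]["timeseries"]:
        details = entry["data"]["instant"]["details"]
        points.append(
            WeatherPoint(
                timestamp=parse_timestamp(entry["time"]),
                temperature=details["air_temperature"],
                humidity=details["relative_humidity"],
                wind_speed=details["wind_speed"],
            )
        )
    return points
```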
calculate_risk(met_json)
Run risk computation end-to-end and return the current prediction snapshot.
The FRCM output contains a forecast sequence; this function returns the second sample, which represents the first computed point after model initialization.
calculate_risk_score(ttf)
Calculates a normalized risk score (0-100) and category based on Time To Flashover (TTF).
TTF is expressed in minutes. Lower TTF means higher risk.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `ttf` | `float` | Time To Flashover in minutes. | required |
Returns:
| Type | Description |
|---|---|
| `Tuple[float, str]` | A tuple containing the normalized risk score (0-100) as a `float` and the risk category as a `str`. |
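The reference does not state the actual formula or category thresholds, so the following is only an illustration of how an inverse TTF-to-score mapping could work. The linear scaling, the `MAX_TTF_MINUTES` cap, the thresholds, and the category labels are all assumptions, not the project's values.

```python
from typing import Tuple

MAX_TTF_MINUTES = 10.0  # assumed cap: a TTF at or above this maps to score 0


def score_from_ttf(ttf: float, max_ttf: float = MAX_TTF_MINUTES) -> Tuple[float, str]:
    """Map TTF (minutes) to a 0-100 score where lower TTF means higher risk."""
    clamped = min(max(ttf, 0.0), max_ttf)
    score = round(100.0 * (1.0 - clamped / max_ttf), 1)
    # Hypothetical thresholds and labels, chosen only for the example.
    if score >= 75.0:
        category = "high"
    elif score >= 40.0:
        category = "moderate"
    else:
        category = "low"
    return score, category
```

Clamping first keeps the score inside 0-100 even for negative or very large TTF inputs.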
Grid/geohash helpers for seeding and analytics-target zone selection.
get_geohash(lat, lon, precision=5)
Converts latitude and longitude to a geohash string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `lat` | `float` | Latitude. | required |
| `lon` | `float` | Longitude. | required |
| `precision` | `int` | Length of the geohash string. 3 chars ~= 156km x 156km (Coarse Regional Tier); 4 chars ~= 39km x 19km (Finer Regional Tier); 5 chars ~= 4.9km x 4.9km (Precise Tier for User Alerts). | `5` |
Returns:
| Type | Description |
|---|---|
| `str` | The geohash string. |
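For readers unfamiliar with the encoding, here is a self-contained sketch of the standard geohash algorithm: longitude and latitude bits are interleaved, starting with longitude, and every five bits become one base-32 character. The project may well rely on a geohash library instead; `encode_geohash` is a hypothetical name.

```python
_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # geohash alphabet (no a, i, l, o)


def encode_geohash(lat: float, lon: float, precision: int = 5) -> str:
    """Encode a coordinate by repeatedly halving the lon/lat ranges."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    chars = []
    value = 0       # bits collected for the current character
    bit_count = 0
    is_lon = True   # bits alternate, longitude first
    while len(chars) < precision:
        if is_lon:
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                value = (value << 1) | 1
                lon_lo = mid
            else:
                value <<= 1
                lon_hi = mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                value = (value << 1) | 1
                lat_lo = mid
            else:
                value <<= 1
                lat_hi = mid
        is_lon = not is_lon
        bit_count += 1
        if bit_count == 5:  # five bits make one base-32 character
            chars.append(_BASE32[value])
            value = 0
            bit_count = 0
    return "".join(chars)
```

For example, `encode_geohash(57.64911, 10.40744, 5)` returns `"u4pru"`, and shorter precisions are prefixes of longer ones, which is why the coarse and precise tiers nest.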
get_geohash_center(geohash)
Decodes a geohash string to its center latitude and longitude.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `geohash` | `str` | The geohash string. | required |
Returns:
| Type | Description |
|---|---|
| `Tuple[float, float]` | A tuple of (latitude, longitude). |
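Decoding reverses the encoding: each character contributes five bits that narrow the longitude and latitude ranges in turn, and the center is the midpoint of the final cell. This is a sketch of the standard algorithm under the hypothetical name `geohash_center`, not necessarily how the project implements it.

```python
from typing import Tuple

_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # geohash alphabet (no a, i, l, o)


def geohash_center(geohash: str) -> Tuple[float, float]:
    """Return (latitude, longitude) of the center of the geohash cell."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    is_lon = True  # bits alternate, longitude first
    for char in geohash:
        value = _BASE32.index(char)  # raises ValueError for an invalid character
        for shift in range(4, -1, -1):  # most significant bit first
            bit = (value >> shift) & 1
            if is_lon:
                mid = (lon_lo + lon_hi) / 2
                if bit:
                    lon_lo = mid
                else:
                    lon_hi = mid
            else:
                mid = (lat_lo + lat_hi) / 2
                if bit:
                    lat_lo = mid
                else:
                    lat_hi = mid
            is_lon = not is_lon
    return (lat_lo + lat_hi) / 2, (lon_lo + lon_hi) / 2
```

Decoding `"u4pru"` gives a center near (57.634, 10.393), within about 0.02 degrees of any point that encodes to that cell.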
mark_analytics_targets(zones)
Mark a subset of zones as analytics targets based on nearest city mapping.
generate_initial_zones()
Generates a list of 'Tier 1' (Regional) zones covering the entire bounding box of Norway. These zones use coarser geohash precision (3 characters) for a broader overview, suitable for a national map.
Returns:
| Type | Description |
|---|---|
| `List[dict]` | A list of dictionaries representing zones. |
MET.no weather API client helpers used by the intelligence worker.
fetch_weather(lat, lon)
async
Asynchronously fetches weather data for a given latitude and longitude from the MET.no API.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `lat` | `float` | The latitude of the location. | required |
| `lon` | `float` | The longitude of the location. | required |
Returns:
| Type | Description |
|---|---|
| `Any \| None` | A dictionary containing the weather data, or None if an error occurs. |
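The "return None on error" contract can be sketched with the standard library alone. This is a swapped-in approach: the worker's actual HTTP client is not documented here, so `urllib` is run in a thread via `asyncio.to_thread` (Python 3.9+). The endpoint is MET.no's Locationforecast 2.0 `compact` URL, `USER_AGENT` is a placeholder (MET.no expects an identifying value), and limiting coordinates to four decimals follows MET.no's usage guidance as I understand it.

```python
import asyncio
import json
import urllib.request
from typing import Any, Optional
from urllib.parse import urlencode

MET_URL = "https://api.met.no/weatherapi/locationforecast/2.0/compact"
USER_AGENT = "example-intelligence-worker/0.1 contact@example.com"  # placeholder


def build_forecast_url(lat: float, lon: float) -> str:
    """Build the request URL with coordinates limited to four decimals."""
    query = urlencode({"lat": f"{lat:.4f}", "lon": f"{lon:.4f}"})
    return f"{MET_URL}?{query}"


def _fetch_blocking(url: str, timeout: float = 10.0) -> Any:
    """Blocking GET that parses the JSON body."""
    request = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


async def fetch_weather_sketch(lat: float, lon: float) -> Optional[Any]:
    """Fetch the forecast without blocking the event loop; None on failure."""
    try:
        return await asyncio.to_thread(_fetch_blocking, build_forecast_url(lat, lon))
    except (OSError, ValueError):
        # OSError covers URLError, HTTPError, and timeouts; ValueError covers bad JSON.
        return None
```

Returning None instead of raising lets the caller skip a zone for this cycle and pick it up again on the next scheduled run.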