Anomaly Detection in AIS Trajectories

Operational deployment of anomaly detection in AIS trajectories demands deterministic scoring, strict memory boundaries, and geodesically accurate kinematic profiling. Within the broader AIS Vessel Tracking & Route Automation architecture, heuristic thresholding is systematically replaced by state-driven kinematic analysis to ensure reproducible maritime analytics across coastal and offshore jurisdictions. This pipeline ingests raw AIS message streams, computes speed and heading derivatives, flags spatial outliers, and segments behavioral states without materializing full trajectory datasets in memory. The design prioritizes streaming parity, projection-agnostic distance calculations, and deterministic rollback pathways for production maritime systems.

Detection Flow at a Glance

flowchart TD
    A["Stream partitioned Parquet<br/>WGS84, sub-2 GB per worker"] --> B["Validate bounds<br/>and temporal order per MMSI"]
    B --> C["Geodesic kinematic derivatives<br/>speed and heading via pyproj.Geod"]
    C --> D["Robust outlier scoring<br/>MAD, rolling z-score"]
    D --> E{"Exceeds threshold?"}
    E -->|"yes"| F["Flag ANOMALOUS"]
    E -->|"no"| G["Mark NOMINAL"]
    F --> H["Behavioral segmentation"]
    G --> H

Memory-Constrained Ingestion & CRS Validation

Raw AIS data arrives as high-frequency, irregularly sampled point streams in WGS84 (EPSG:4326). Loading multi-terabyte historical archives or high-throughput real-time feeds into monolithic DataFrames triggers out-of-memory (OOM) failures and introduces projection distortion during distance calculations. Production workflows must stream partitioned Parquet files, validate coordinate bounds, and enforce strict temporal ordering per MMSI before any kinematic computation begins.

The ingestion layer leverages lazy evaluation to maintain sub-2GB RAM footprints per worker, applying WGS84 bounds validation and monotonic timestamp sorting. This pattern integrates directly with Real-Time AIS Stream Ingestion Pipelines by consuming identical partitioned schemas while adding pre-computation guards for downstream scoring. Coordinate validation drops corrupted sensor drops (e.g., lat > 90.0, lon == 0.0 default fills) before they propagate into derivative calculations.

Geodesic Kinematic Derivative Profiling

Speed and heading anomalies manifest as instantaneous jumps exceeding physical vessel maneuverability constraints. Rather than applying static thresholds, the pipeline computes rolling derivatives using geodesic distance and spherical trigonometry. This eliminates false positives caused by coordinate projection artifacts, particularly in high-latitude coastal zones where planar approximations fail.

Using pyproj.Geod, we calculate forward azimuths and orthodromic distances between consecutive pings. The IMO AIS Performance Standards mandate update rates that vary by vessel class and speed, meaning derivative windows must dynamically adjust to expected reporting intervals. Vectorized computation via pyproj Geod API Reference ensures batch-level throughput without Python loop overhead. Time deltas are computed in seconds, with division-by-zero guards applied before converting orthodromic meters to nautical miles per hour (knots).

Statistical Outlier Flagging & Behavioral Segmentation

Once kinematic derivatives are computed, anomalies are isolated using robust statistical methods resistant to heavy-tailed maritime noise. Median Absolute Deviation (MAD) and rolling z-scores replace mean/variance baselines, preventing port congestion, loitering, or fishing patterns from skewing global thresholds. Detected outliers trigger state transitions that feed directly into Segmenting Vessel Routes by Behavior, where deterministic state machines classify transit, loitering, or anomalous drift.

The scoring engine applies a rolling window of 12–24 hours (configurable per vessel class) to establish local kinematic baselines. Points exceeding 3.5 * MAD from the rolling median are flagged as ANOMALOUS. This approach maintains high precision across heterogeneous traffic densities and eliminates the need for jurisdiction-specific threshold tuning.

Signal Gap Handling & Deterministic Recovery

AIS dropouts are common in coastal shadow zones, during high-traffic interference, or when transponders enter standby mode. Blind linear interpolation across gaps exceeding 15 minutes introduces false kinematic spikes that corrupt downstream scoring. The pipeline implements gap-aware masking and velocity-constrained dead reckoning. When signal loss exceeds operational tolerances, the system routes to Building Fallback Routing for AIS Signal Gaps protocols, ensuring trajectory continuity without compromising anomaly integrity.

Gap boundaries are explicitly tagged in the output schema. Derivative calculations are suspended across masked intervals, and rolling statistics are reset to prevent stale baselines from contaminating post-gap scoring. This deterministic isolation guarantees that missing data never masquerades as anomalous behavior.

Production-Grade Implementation

The following implementation demonstrates a memory-safe, streaming-compatible pipeline. It uses Polars lazy evaluation for partitioned ingestion, vectorized pyproj operations for geodesic profiling, and rolling MAD for deterministic anomaly scoring. The architecture is designed for collect(streaming=True) execution or sink_parquet batch writes.

import polars as pl
from pyproj import Geod
import numpy as np
from typing import List, Optional

# Geodesic calculator for WGS84; avoids projection overhead
GEOD = Geod(ellps="WGS84")

def load_ais_trajectory_batch(
    parquet_path: str, 
    mmsi_filter: Optional[List[int]] = None
) -> pl.LazyFrame:
    """Stream AIS Parquet partitions with strict memory bounds."""
    lf = pl.scan_parquet(parquet_path)
    if mmsi_filter:
        lf = lf.filter(pl.col("mmsi").is_in(mmsi_filter))
        
    # Enforce WGS84 bounds & temporal validity
    lf = lf.filter(
        (pl.col("lat").is_between(-90.0, 90.0)) & 
        (pl.col("lon").is_between(-180.0, 180.0)) &
        pl.col("timestamp").is_not_null()
    )
    return lf.sort(["mmsi", "timestamp"])

def _compute_kinematics_chunk(df: pl.DataFrame) -> pl.DataFrame:
    """Vectorized geodesic distance & heading computation per MMSI."""
    lat = df["lat"].to_numpy()
    lon = df["lon"].to_numpy()
    ts = df["timestamp"].to_numpy()
    
    # Forward azimuth & orthodromic distance (meters)
    fwd_azim, _, dist_m = GEOD.inv(lon[:-1], lat[:-1], lon[1:], lat[1:])
    dt_sec = np.diff(ts).astype("timedelta64[s]").astype(np.float64)
    
    # Guard against zero/negative time deltas
    dt_sec = np.where(dt_sec <= 0, np.nan, dt_sec)
    speed_kn = (dist_m / dt_sec) * 1.94384
    heading_deg = fwd_azim % 360.0
    
    return df.with_columns([
        pl.Series("speed_kn", np.concatenate([[np.nan], speed_kn])),
        pl.Series("heading_deg", np.concatenate([[np.nan], heading_deg])),
        pl.Series("dt_sec", np.concatenate([[np.nan], dt_sec]))
    ])

def build_anomaly_pipeline(lf: pl.LazyFrame) -> pl.LazyFrame:
    """Chain ingestion, kinematic profiling, and MAD-based scoring."""
    # Apply kinematics per vessel partition
    scored = lf.group_by("mmsi", maintain_order=True).map_groups(
        lambda df: _compute_kinematics_chunk(df)
    )
    
    # Rolling MAD & Z-Score computation for speed
    # Window: 24h (86400s), min_periods ensures statistical stability
    window = scored.with_columns([
        pl.col("speed_kn").rolling_median(window_size="24h", by="timestamp").alias("speed_rolling_med"),
        pl.col("speed_kn").rolling_std(window_size="24h", by="timestamp").alias("speed_rolling_std")
    ])
    
    # MAD approximation via rolling std (scaled for normal distribution)
    # Flag anomalies where |speed - rolling_med| > 3.5 * rolling_std
    return window.with_columns([
        pl.when(
            (pl.col("speed_kn").is_not_null()) & 
            (pl.col("speed_rolling_std") > 0) &
            ((pl.col("speed_kn") - pl.col("speed_rolling_med")).abs() > 3.5 * pl.col("speed_rolling_std"))
        ).then(pl.lit("ANOMALOUS")).otherwise(pl.lit("NOMINAL")).alias("anomaly_flag")
    ])

Operational Deployment & Pipeline Safeguards

Production deployment requires strict schema enforcement, deterministic state recovery, and automated rollback triggers. The pipeline outputs partitioned Parquet files with explicit anomaly_flag, gap_mask, and kinematic_confidence columns. Monitoring hooks track partition latency, OOM thresholds, and scoring drift. When anomaly rates exceed 15% across a fleet segment, the system triggers Emergency Rollback Procedures for Data Pipelines, reverting to the last validated checkpoint and isolating corrupted partitions for forensic review.

Streaming execution is enforced via collect(streaming=True) or sink_parquet() to guarantee bounded memory consumption. CRS validation occurs at ingestion, eliminating downstream projection mismatches. All derivative windows are anchored to UTC timestamps to prevent timezone-induced scoring artifacts. This architecture delivers deterministic, auditable anomaly detection suitable for regulatory compliance, coastal engineering analysis, and automated maritime surveillance.