Segmenting Vessel Routes by Behavior

Segmenting Vessel Routes by Behavior is a deterministic, production-grade operation that transforms continuous Automatic Identification System (AIS) telemetry into discrete, kinematically homogeneous track segments. Raw positional streams inherently mix transit, loitering, anchoring, and port maneuvering states, which corrupt downstream spatial indexing, regulatory compliance reporting, and habitat impact modeling. Within the AIS Vessel Tracking & Route Automation framework, this segmentation step serves as the critical boundary between raw ingestion and behavioral analytics. The workflow detailed here executes a memory-constrained, cloud-native pipeline that normalizes coordinate reference systems, derives vectorized kinematic derivatives, applies a finite state machine for behavioral classification, and outputs standardized geospatial segments ready for spatial clustering and archival.

Behavioral State Machine

stateDiagram-v2
    [*] --> UNKNOWN
    UNKNOWN --> TRANSIT: SOG above 8 kn sustained
    TRANSIT --> LOITER: SOG below 6 kn for 180 s
    TRANSIT --> MANEUVERING: heading variance high
    LOITER --> TRANSIT: SOG above 8 kn for 120 s
    LOITER --> ANCHORED: SOG near zero, low dispersion
    ANCHORED --> MANEUVERING: vessel gets underway
    MANEUVERING --> TRANSIT: steady course, SOG above 8 kn

Memory-Constrained Ingestion and CRS Normalization

Production AIS ingestion must handle fragmented payloads, inconsistent timestamp resolutions, and missing kinematic fields. Raw data typically arrives as partitioned Parquet files or GeoJSON dumps with implicit EPSG:4326 coordinates, but distance-based behavioral thresholds require metric projections. The pipeline normalizes all geometries to EPSG:4326 for initial validation, then projects to a locally appropriate metric CRS (e.g., UTM zone or EPSG:3857) before computing inter-point distances. Memory constraints dictate out-of-core processing; loading multi-terabyte AIS archives into a single GeoDataFrame triggers OOM failures on standard compute instances. Instead, the ingestion layer streams data via pyarrow.parquet.ParquetFile, processes MMSI-grouped chunks sequentially, and flushes results to disk before advancing. This streaming architecture aligns with upstream Real-Time AIS Stream Ingestion Pipelines to guarantee temporal continuity, deduplicate retransmitted pings, and maintain strict ordering by BaseDateTime.

Coordinate transformation must be executed lazily and cached per spatial partition. Using pyproj with CRS.from_epsg() and Transformer.from_crs() ensures thread-safe, repeatable projections. All distance calculations, buffer operations, and spatial joins must occur in the projected metric space to prevent geodesic distortion artifacts.

Kinematic Derivation and Angular Unwrapping

Behavioral classification cannot rely on raw coordinates alone. The segmentation engine derives speed over ground (SOG), course over ground (COG), and their temporal derivatives (ΔSOG, ΔCOG, acceleration) using rolling windows calibrated to vessel class metadata. COG discontinuities (e.g., 359° to 1°) require angular unwrapping before differentiation to prevent false acceleration spikes. Threshold calibration follows established maritime kinematic profiles, as documented in Speed and Heading Profiling for Maritime Analytics, where transit states are defined by sustained SOG above 8 knots with heading variance below 15°, while loiter states exhibit SOG < 3 knots and positional dispersion exceeding 0.25 NM over a 30-minute window.

Derivative computation must account for irregular AIS transmission intervals. Instead of fixed-window rolling operations, the pipeline uses time-weighted differencing: ΔSOG=(SOGtSOGt1)/Δt\Delta\text{SOG} = (\text{SOG}_t - \text{SOG}_{t-1}) / \Delta t, where Δt\Delta t is normalized to seconds. Missing SOG/COG values are forward-filled only when the gap is < 120 seconds; larger gaps trigger segment termination and UNKNOWN state assignment. Vectorized implementation via pandas or polars with explicit groupby('mmsi').rolling(window='300s') ensures sub-second execution on billion-row datasets.

Deterministic Segmentation via Finite State Machines

State transitions are governed by a deterministic finite state machine (FSM) that evaluates kinematic thresholds against hysteresis buffers to prevent state flapping. The FSM maintains four primary states: TRANSIT, LOITER, ANCHORED, and MANEUVERING. Transitions require sustained threshold breaches across N consecutive valid pings (default N=3 for transit, N=5 for anchoring). Hysteresis is applied asymmetrically: exiting TRANSIT requires SOG < 6 knots for > 180 seconds, while entering TRANSIT requires SOG > 8 knots for > 120 seconds.

The FSM evaluates each MMSI stream sequentially. When a state change is confirmed, the pipeline closes the active segment, computes aggregate metrics (duration, track length, mean SOG, COG variance), and initializes a new segment with the current ping. Port maneuvering is isolated by intersecting projected coordinates with official port boundary polygons (e.g., IHO S-57 ENC layers). If a vessel remains within port limits and exhibits high COG variance (> 45°/min) with SOG < 5 knots, the FSM forces MANEUVERING state regardless of kinematic thresholds.

Output Standardization and Downstream Integration

Segmented outputs are serialized to GeoParquet with explicit CRS metadata, schema validation, and partitioning by mmsi and segment_date. Each segment record contains: segment_id, mmsi, start_ts, end_ts, behavior_state, track_length_m, duration_s, mean_sog_kts, cog_variance_deg, and geometry (LineString). The schema enforces strict typing and rejects null geometries or malformed timestamps.

Standardized segments feed directly into spatial clustering and route pattern extraction workflows. The deterministic segmentation guarantees that downstream algorithms operate on kinematically coherent trajectories rather than mixed-state noise. This architecture directly supports Clustering Vessel Tracks with DBSCAN for automated shipping lane delineation, habitat corridor mapping, and regulatory compliance auditing. All outputs are versioned and checksummed to enable pipeline rollback and audit trail reconstruction.

Production-Grade Python Implementation

The following implementation demonstrates memory-constrained streaming, CRS projection, angular unwrapping, and FSM state assignment. It is optimized for cloud execution with explicit chunking, vectorized operations, and strict error handling.

import numpy as np
import pandas as pd
import pyarrow.parquet as pq
from pyproj import Transformer, CRS
from typing import Generator, Dict, Any, List

# Production constants
TRANSIT_SOG_MIN = 8.0
LOITER_SOG_MAX = 3.0
ANCHOR_RADIUS_M = 185.2  # 0.1 NM
COG_HYSTERESIS_TRANSIT = 15.0
STATE_TRANSITION_WINDOW = 3
EPSG_WGS84 = 4326
EPSG_UTM_LOCAL = 32618  # Example: Zone 18N

def unwrap_cog(degrees: np.ndarray) -> np.ndarray:
    """Angular unwrapping to prevent 359->1 discontinuities."""
    return np.unwrap(np.deg2rad(degrees)) * (180 / np.pi)

def compute_kinematics(df: pd.DataFrame) -> pd.DataFrame:
    """Vectorized SOG/COG derivative computation with time-weighted differencing."""
    df = df.sort_values("BaseDateTime").reset_index(drop=True)
    df["dt_s"] = df["BaseDateTime"].diff().dt.total_seconds().fillna(0)
    df["sog_diff"] = df["SOG"].diff()
    df["cog_rad"] = unwrap_cog(df["COG"].values)
    df["cog_diff"] = np.degrees(np.diff(df["cog_rad"], prepend=df["cog_rad"].iloc[0]))
    df["accel"] = df["sog_diff"] / df["dt_s"].replace(0, np.nan)
    df["heading_var"] = df["cog_diff"].rolling(window=STATE_TRANSITION_WINDOW, min_periods=1).std()
    return df

def assign_fsm_states(df: pd.DataFrame) -> pd.DataFrame:
    """Deterministic FSM state assignment with hysteresis."""
    states = []
    current_state = "UNKNOWN"
    sustained_count = 0
    
    for _, row in df.iterrows():
        if row["SOG"] >= TRANSIT_SOG_MIN and row.get("heading_var", 0) < COG_HYSTERESIS_TRANSIT:
            target = "TRANSIT"
        elif row["SOG"] <= LOITER_SOG_MAX:
            target = "LOITER"
        elif row["SOG"] < 1.0 and row.get("heading_var", 0) < 5.0:
            target = "ANCHORED"
        else:
            target = "MANEUVERING"
            
        if target == current_state:
            sustained_count += 1
        else:
            if sustained_count >= STATE_TRANSITION_WINDOW:
                current_state = target
            sustained_count = 1
        states.append(current_state)
        
    df["behavior_state"] = states
    return df

def process_ais_chunk(chunk: pd.DataFrame, transformer: Transformer) -> Generator[Dict[str, Any], None, None]:
    """Stream processing pipeline for a single MMSI chunk."""
    if chunk.empty or len(chunk) < 2:
        return
        
    # Project to metric CRS
    coords = np.column_stack((chunk["Longitude"].values, chunk["Latitude"].values))
    projected = np.array([transformer.transform(x, y) for x, y in coords])
    chunk["x_m"], chunk["y_m"] = projected[:, 0], projected[:, 1]
    
    chunk = compute_kinematics(chunk)
    chunk = assign_fsm_states(chunk)
    
    # Segment boundaries
    state_changes = chunk["behavior_state"].ne(chunk["behavior_state"].shift())
    segment_ids = state_changes.cumsum()
    chunk["segment_id"] = segment_ids
    
    for sid, group in chunk.groupby("segment_id"):
        yield {
            "segment_id": f"{group['MMSI'].iloc[0]}_{sid}",
            "mmsi": group["MMSI"].iloc[0],
            "start_ts": group["BaseDateTime"].min(),
            "end_ts": group["BaseDateTime"].max(),
            "behavior_state": group["behavior_state"].mode().iloc[0],
            "track_length_m": np.sum(np.hypot(group["x_m"].diff(), group["y_m"].diff())),
            "duration_s": (group["BaseDateTime"].max() - group["BaseDateTime"].min()).total_seconds(),
            "mean_sog_kts": group["SOG"].mean(),
            "cog_variance_deg": group["heading_var"].mean(),
            "geometry": f"LINESTRING ({' '.join(f'{x} {y}' for x, y in zip(group['Longitude'], group['Latitude']))})"
        }

# Execution pattern
def run_segmentation_pipeline(input_path: str, output_path: str) -> None:
    transformer = Transformer.from_crs(CRS.from_epsg(EPSG_WGS84), CRS.from_epsg(EPSG_UTM_LOCAL), always_xy=True)
    parquet_file = pq.ParquetFile(input_path)
    batch_iter = parquet_file.iter_batches(batch_size=500_000)
    
    results: List[Dict[str, Any]] = []
    for batch in batch_iter:
        df = batch.to_pandas()
        for mmsi, group in df.groupby("MMSI"):
            results.extend(process_ais_chunk(group.copy(), transformer))
            
    pd.DataFrame(results).to_parquet(output_path, index=False, engine="pyarrow")

Implementation notes:

  • Chunking Strategy: batch_size=500_000 balances I/O throughput and RAM footprint. Adjust based on available heap.
  • CRS Handling: always_xy=True enforces longitude/latitude ordering per OGC standards. Projection is executed per-chunk to avoid coordinate array duplication.
  • State Persistence: The FSM uses a simple counter-based hysteresis. For production deployments with high-frequency pings, replace the loop with numpy vectorized state machine evaluation or polars group_by_dynamic windows.
  • Validation: All outputs must pass schema validation against OGC GeoParquet specifications before archival. Implement checksum verification and pipeline rollback triggers on schema drift.