Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 153 additions & 14 deletions mapillary_tools/geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,43 @@ def gps_distance(latlon_1: tuple[float, float], latlon_2: tuple[float, float]) -


def avg_speed(sequence: T.Sequence[PointLike]) -> float:
"""
Calculate average speed over a sequence of points.
Uses GPS epoch time when available (from GPSPoint.epoch_time or CAMMGPSPoint.time_gps_epoch),
otherwise falls back to the time field.
"""
# Import here to avoid circular dependency
from . import telemetry

total_distance = 0.0
for cur, nxt in pairwise(sequence):
total_distance += gps_distance((cur.lat, cur.lon), (nxt.lat, nxt.lon))

if sequence:
time_diff = sequence[-1].time - sequence[0].time
first = sequence[0]
last = sequence[-1]

# Try to use GPS epoch time if available
time_diff = 0.0
if isinstance(first, telemetry.GPSPoint) and isinstance(
last, telemetry.GPSPoint
):
if (
first.epoch_time is not None
and last.epoch_time is not None
and first.epoch_time > 0
and last.epoch_time > 0
):
time_diff = last.epoch_time - first.epoch_time
elif isinstance(first, telemetry.CAMMGPSPoint) and isinstance(
last, telemetry.CAMMGPSPoint
):
if first.time_gps_epoch > 0 and last.time_gps_epoch > 0:
time_diff = last.time_gps_epoch - first.time_gps_epoch

# Fall back to time field if GPS epoch time not available
if time_diff == 0.0:
time_diff = last.time - first.time
else:
time_diff = 0.0

Expand Down Expand Up @@ -141,7 +172,7 @@ def as_unix_time(dt: datetime.datetime | int | float) -> float:

if sys.version_info < (3, 10):

def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point:
def interpolate(points: T.Sequence[PointLike], t: float, lo: int = 0) -> PointLike:
"""
Interpolate or extrapolate the point at time t along the sequence of points (sorted by time).
"""
Expand All @@ -152,12 +183,14 @@ def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point:
# for cur, nex in pairwise(points):
# assert cur.time <= nex.time, "Points not sorted"

p = Point(time=t, lat=float("-inf"), lon=float("-inf"), alt=None, angle=None)
idx = bisect.bisect_left(points, p, lo=lo)
times = [p.time for p in points]
# Use bisect_left on the times list
idx = bisect.bisect_left(times, t, lo=lo)

return _interpolate_at_segment_idx(points, t, idx)
else:

def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point:
def interpolate(points: T.Sequence[PointLike], t: float, lo: int = 0) -> PointLike:
"""
Interpolate or extrapolate the point at time t along the sequence of points (sorted by time).
"""
Expand All @@ -172,18 +205,19 @@ def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point:
return _interpolate_at_segment_idx(points, t, idx)


class Interpolator:
class Interpolator(T.Generic[PointLike]):
"""
Interpolator for interpolating a sequence of timestamps incrementally.
Preserves the type of input points (Point, GPSPoint, or CAMMGPSPoint).
"""

tracks: T.Sequence[T.Sequence[Point]]
tracks: T.Sequence[T.Sequence[PointLike]]
track_idx: int
# interpolation starts from the lower bound point index in the current track
lo: int
prev_time: float | None

def __init__(self, tracks: T.Sequence[T.Sequence[Point]]):
def __init__(self, tracks: T.Sequence[T.Sequence[PointLike]]):
# Remove empty tracks
self.tracks = [track for track in tracks if track]

Expand All @@ -204,7 +238,7 @@ def __init__(self, tracks: T.Sequence[T.Sequence[Point]]):

@staticmethod
def _lsearch_left(
track: T.Sequence[Point], t: float, lo: int = 0, hi: int | None = None
track: T.Sequence[PointLike], t: float, lo: int = 0, hi: int | None = None
) -> int:
"""
similar to bisect.bisect_left, but faster in the incremental search case
Expand All @@ -221,14 +255,14 @@ def _lsearch_left(
# assert track[lo - 1].time < t <= track[lo].time
return lo

def interpolate(self, t: float) -> Point:
def interpolate(self, t: float) -> PointLike:
if self.prev_time is not None:
if not (self.prev_time <= t):
raise ValueError(
f"Require times to be monotonically increasing, but got {self.prev_time} then {t}"
)

interpolated: Point | None = None
interpolated: PointLike | None = None

while self.track_idx < len(self.tracks):
track = self.tracks[self.track_idx]
Expand Down Expand Up @@ -318,7 +352,14 @@ def _ecef_from_lla2(lat: float, lon: float) -> tuple[float, float, float]:
return x, y, z


def _interpolate_segment(start: Point, end: Point, t: float) -> Point:
def _interpolate_segment(start: PointLike, end: PointLike, t: float) -> PointLike:
"""
Interpolate between two points at time t, preserving the type of the input points.
Supports Point, GPSPoint, and CAMMGPSPoint types.
"""
# Import here to avoid circular dependency
from . import telemetry

try:
weight = (t - start.time) / (end.time - start.time)
except ZeroDivisionError:
Expand All @@ -333,10 +374,108 @@ def _interpolate_segment(start: Point, end: Point, t: float) -> Point:
else:
alt = None

return Point(time=t, lat=lat, lon=lon, alt=alt, angle=angle)
# Determine the type and create the appropriate point
if isinstance(start, telemetry.GPSPoint) and isinstance(end, telemetry.GPSPoint):
# Interpolate GPSPoint-specific fields
epoch_time: float | None
if (
start.epoch_time is not None
and end.epoch_time is not None
and start.epoch_time > 0
and end.epoch_time > 0
):
epoch_time = start.epoch_time + (end.epoch_time - start.epoch_time) * weight
else:
epoch_time = None

precision: float | None
if start.precision is not None and end.precision is not None:
precision = start.precision + (end.precision - start.precision) * weight
else:
precision = None

ground_speed: float | None
if start.ground_speed is not None and end.ground_speed is not None:
ground_speed = (
start.ground_speed + (end.ground_speed - start.ground_speed) * weight
)
else:
ground_speed = None

# For fix, use the start point's value (or could use majority voting logic)
fix = start.fix

return T.cast(
PointLike,
telemetry.GPSPoint(
time=t,
lat=lat,
lon=lon,
alt=alt,
angle=angle,
epoch_time=epoch_time,
fix=fix,
precision=precision,
ground_speed=ground_speed,
),
)

elif isinstance(start, telemetry.CAMMGPSPoint) and isinstance(
end, telemetry.CAMMGPSPoint
):
# Interpolate CAMMGPSPoint-specific fields
time_gps_epoch = (
start.time_gps_epoch + (end.time_gps_epoch - start.time_gps_epoch) * weight
)
horizontal_accuracy = (
start.horizontal_accuracy
+ (end.horizontal_accuracy - start.horizontal_accuracy) * weight
)
vertical_accuracy = (
start.vertical_accuracy
+ (end.vertical_accuracy - start.vertical_accuracy) * weight
)
velocity_east = (
start.velocity_east + (end.velocity_east - start.velocity_east) * weight
)
velocity_north = (
start.velocity_north + (end.velocity_north - start.velocity_north) * weight
)
velocity_up = start.velocity_up + (end.velocity_up - start.velocity_up) * weight
speed_accuracy = (
start.speed_accuracy + (end.speed_accuracy - start.speed_accuracy) * weight
)

# For gps_fix_type, use the start point's value
gps_fix_type = start.gps_fix_type

return T.cast(
PointLike,
telemetry.CAMMGPSPoint(
time=t,
lat=lat,
lon=lon,
alt=alt,
angle=angle,
time_gps_epoch=time_gps_epoch,
gps_fix_type=gps_fix_type,
horizontal_accuracy=horizontal_accuracy,
vertical_accuracy=vertical_accuracy,
velocity_east=velocity_east,
velocity_north=velocity_north,
velocity_up=velocity_up,
speed_accuracy=speed_accuracy,
),
)

else:
# Return base Point type
return T.cast(PointLike, Point(time=t, lat=lat, lon=lon, alt=alt, angle=angle))


def _interpolate_at_segment_idx(points: T.Sequence[Point], t: float, idx: int) -> Point:
def _interpolate_at_segment_idx(
points: T.Sequence[PointLike], t: float, idx: int
) -> PointLike:
"""
Interpolate time t along the segment between idx - 1 and idx.
If idx is out of range, extrapolate it to the nearest segment (first or last).
Expand Down