ayatfakhry/SecureNav-AI
GitHub: ayatfakhry/SecureNav-AI
SecureNav AI is an AI-based system for detecting spoofing and jamming attacks on global navigation satellite systems (GNSS).
Stars: 0 | Forks: 0
```python
"""
drift_simulator.py
──────────────────
Simulates GNSS position drift anomalies caused by:
  1. Multipath interference: signal reflections off buildings/terrain
  2. Ionospheric delay buildup: TEC (Total Electron Content) storms
  3. Tropospheric delay: humidity/pressure fronts
  4. Receiver clock drift: oscillator aging/temperature effects
  5. Urban canyon effect: severe multipath in dense environments

Drift differs from a spoofing attack: the receiver is not under attack.
Errors grow slowly and organically from environmental physics.
Label is "DRIFT".
"""
import copy
import numpy as np
from typing import List, Optional
from src.gnss_simulator import GNSSEpoch, EARTH_RADIUS_M


# Configuration
class DriftConfig:
    # Multipath
    MULTIPATH_REFL_STD_M = 3.0        # std dev of pseudorange error per reflection (m)
    MULTIPATH_MAX_REFLECTIONS = 3

    # Ionospheric
    IONO_INITIAL_DELAY_M = 2.0
    IONO_GROWTH_RATE_M_EPOCH = 0.15   # m per epoch (storm buildup)
    IONO_MAX_DELAY_M = 40.0

    # Tropospheric
    TROPO_INITIAL_DELAY_M = 1.0
    TROPO_GROWTH_RATE = 0.05
    TROPO_MAX_DELAY_M = 15.0

    # Clock drift
    CLOCK_DRIFT_RATE_M_EPOCH = 0.08   # m/epoch (slow oscillator aging)
    CLOCK_DRIFT_NOISE_STD = 0.02

    # Urban canyon
    URBAN_SNR_PENALTY_DB = 8.0        # dB-Hz loss
    URBAN_SAT_LOSS_PROB = 0.25        # probability a satellite is blocked

    # Position noise
    POS_NOISE_SCALE = 4.0             # multiplier on normal position noise
```
```python
# Stateful drift injector
class DriftSimulator:
    """
    Stateful simulator that accumulates physical drift errors over time.

    Parameters
    ----------
    mode : str
        'multipath' | 'ionospheric' | 'tropospheric' | 'clock' | 'urban' | 'mixed'
    seed : int
    cfg  : DriftConfig (optional override)
    """

    MODES = ("multipath", "ionospheric", "tropospheric", "clock", "urban", "mixed")

    def __init__(
        self,
        mode: str = "mixed",
        seed: int = 99,
        cfg: Optional[DriftConfig] = None,
    ):
        if mode not in self.MODES:
            raise ValueError(f"mode must be one of {self.MODES}")
        self.mode = mode
        self.rng = np.random.default_rng(seed)
        self.cfg = cfg or DriftConfig()

        # Accumulated state
        self._iono_delay_m = self.cfg.IONO_INITIAL_DELAY_M
        self._tropo_delay_m = self.cfg.TROPO_INITIAL_DELAY_M
        self._clock_accum_m = 0.0
        self._epoch_count = 0

    def inject(self, epoch: GNSSEpoch) -> GNSSEpoch:
        """Apply the configured drift to a single epoch and return it."""
        self._epoch_count += 1
        ep = copy.deepcopy(epoch)

        if self.mode == "multipath":
            ep = self._apply_multipath(ep)
        elif self.mode == "ionospheric":
            ep = self._apply_ionospheric(ep)
        elif self.mode == "tropospheric":
            ep = self._apply_tropospheric(ep)
        elif self.mode == "clock":
            ep = self._apply_clock_drift(ep)
        elif self.mode == "urban":
            ep = self._apply_urban_canyon(ep)
        else:  # mixed: layer multiple effects
            ep = self._apply_multipath(ep)
            if self._epoch_count % 3 == 0:
                ep = self._apply_ionospheric(ep)
            if self._epoch_count % 5 == 0:
                ep = self._apply_tropospheric(ep)
            ep = self._apply_clock_drift(ep)

        # Add amplified position noise
        pos_noise = float(self.rng.normal(0.0, 1e-5 * self.cfg.POS_NOISE_SCALE))
        ep.lat += pos_noise
        ep.lon += pos_noise * 0.7
        ep.alt += float(self.rng.normal(0.0, 1.5 * self.cfg.POS_NOISE_SCALE))
        ep.label = "DRIFT"
        return ep

    def _apply_multipath(self, ep: GNSSEpoch) -> GNSSEpoch:
        """Add pseudorange errors from signal reflections."""
        n_reflections = int(self.rng.integers(1, self.cfg.MULTIPATH_MAX_REFLECTIONS + 1))
        for sat in ep.satellites:
            mp_error = sum(
                float(self.rng.normal(0.0, self.cfg.MULTIPATH_REFL_STD_M))
                for _ in range(n_reflections)
            )
            sat.pseudorange_m += mp_error
            sat.carrier_phase_cycles += mp_error / 0.19029  # ~L1 wavelength
            sat.snr_db_hz = max(15.0, sat.snr_db_hz - abs(mp_error) * 0.3)
        # Multipath degrades PDOP slightly
        ep.pdop = min(20.0, ep.pdop * float(self.rng.uniform(1.0, 1.3)))
        return ep

    def _apply_ionospheric(self, ep: GNSSEpoch) -> GNSSEpoch:
        """Build up ionospheric delay storm."""
        self._iono_delay_m = min(
            self.cfg.IONO_MAX_DELAY_M,
            self._iono_delay_m + self.cfg.IONO_GROWTH_RATE_M_EPOCH
            + float(self.rng.normal(0.0, 0.05))
        )
        delay = self._iono_delay_m
        for sat in ep.satellites:
            # Iono delay is elevation-dependent: larger at low elevation
            el_factor = 1.0 / max(0.1, np.sin(np.deg2rad(sat.elevation_deg)))
            el_factor = np.clip(el_factor, 1.0, 4.0)
            sat.pseudorange_m += delay * el_factor + float(self.rng.normal(0.0, 0.5))
            # Phase advance (opposite sign to pseudorange for iono)
            sat.carrier_phase_cycles -= (delay * el_factor) / 0.19029
        # Position computation biased by iono
        iono_pos_bias = delay * 1e-7  # converts to ~degrees
        ep.lat += float(self.rng.normal(iono_pos_bias, iono_pos_bias * 0.3))
        ep.lon += float(self.rng.normal(iono_pos_bias, iono_pos_bias * 0.3))
        return ep

    def _apply_tropospheric(self, ep: GNSSEpoch) -> GNSSEpoch:
        """Add tropospheric wet/dry delay."""
        self._tropo_delay_m = min(
            self.cfg.TROPO_MAX_DELAY_M,
            self._tropo_delay_m + self.cfg.TROPO_GROWTH_RATE
            + float(self.rng.normal(0.0, 0.02))
        )
        delay = self._tropo_delay_m
        for sat in ep.satellites:
            el_factor = 1.0 / max(0.1, np.sin(np.deg2rad(sat.elevation_deg)))
            el_factor = np.clip(el_factor, 1.0, 3.0)
            sat.pseudorange_m += delay * el_factor * float(self.rng.uniform(0.9, 1.1))
        ep.alt -= delay * 0.6  # altitude biased downward
        return ep

    def _apply_clock_drift(self, ep: GNSSEpoch) -> GNSSEpoch:
        """Accumulate receiver clock drift."""
        self._clock_accum_m += (
            self.cfg.CLOCK_DRIFT_RATE_M_EPOCH
            + float(self.rng.normal(0.0, self.cfg.CLOCK_DRIFT_NOISE_STD))
        )
        ep.clock_bias_m += self._clock_accum_m
        # Clock drift affects all pseudoranges
        for sat in ep.satellites:
            sat.pseudorange_m += self._clock_accum_m * float(self.rng.uniform(0.98, 1.02))
        return ep

    def _apply_urban_canyon(self, ep: GNSSEpoch) -> GNSSEpoch:
        """Model urban canyon: lost satellites + SNR penalty."""
        surviving = []
        for sat in ep.satellites:
            if float(self.rng.random()) < self.cfg.URBAN_SAT_LOSS_PROB:
                continue  # satellite blocked by building
            sat.snr_db_hz = max(
                15.0,
                sat.snr_db_hz - self.cfg.URBAN_SNR_PENALTY_DB
                - float(self.rng.uniform(0.0, 4.0))
            )
            sat.pseudorange_m += float(self.rng.normal(0.0, 5.0))
            surviving.append(sat)
        ep.satellites = surviving if len(surviving) >= 2 else ep.satellites[:3]
        ep.pdop = min(25.0, ep.pdop * float(self.rng.uniform(1.5, 3.0)))
        return ep

    def generate_dataset(self, base_epochs: List[GNSSEpoch]) -> List[GNSSEpoch]:
        """Inject drift into a list of clean epochs."""
        return [self.inject(ep) for ep in base_epochs]

    def reset(self) -> None:
        """Reset accumulated state (call between independent runs)."""
        self._iono_delay_m = self.cfg.IONO_INITIAL_DELAY_M
        self._tropo_delay_m = self.cfg.TROPO_INITIAL_DELAY_M
        self._clock_accum_m = 0.0
        self._epoch_count = 0
```
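The stateful ionospheric model above combines two ideas: a delay that grows each epoch until it hits a cap, and an elevation factor that scales that delay for low satellites. The sketch below isolates both so they can be inspected without the rest of the repository. It is a minimal illustration, not the project's code: the helper names `elevation_factor` and `iono_delay_series` are hypothetical, the constants are copied from `DriftConfig`, and the standard-library `random.Random` stands in for NumPy's generator, so the exact noise values will differ from the real simulator.

```python
import math
import random

# Values copied from DriftConfig above.
IONO_INITIAL_DELAY_M = 2.0
IONO_GROWTH_RATE_M_EPOCH = 0.15
IONO_MAX_DELAY_M = 40.0


def elevation_factor(elevation_deg: float, cap: float = 4.0) -> float:
    """Return 1/sin(elevation), clamped to [1, cap] as in _apply_ionospheric."""
    raw = 1.0 / max(0.1, math.sin(math.radians(elevation_deg)))
    return min(cap, max(1.0, raw))


def iono_delay_series(n_epochs: int, seed: int = 99) -> list:
    """Return the accumulated zenith delay (m) after each of n_epochs updates."""
    rng = random.Random(seed)  # assumption: stand-in for np.random.default_rng
    delay = IONO_INITIAL_DELAY_M
    series = []
    for _ in range(n_epochs):
        # Grow by the configured rate plus small noise, never exceeding the cap.
        delay = min(
            IONO_MAX_DELAY_M,
            delay + IONO_GROWTH_RATE_M_EPOCH + rng.gauss(0.0, 0.05),
        )
        series.append(delay)
    return series


if __name__ == "__main__":
    series = iono_delay_series(400)
    print(f"epoch 1: {series[0]:.2f} m, epoch 400: {series[-1]:.2f} m")
    for el in (90.0, 30.0, 10.0):
        print(f"elevation {el:>4.0f} deg -> factor {elevation_factor(el):.2f}")
```

With the default rate of 0.15 m per epoch, the delay needs roughly 250 updates to climb from 2 m to the 40 m cap. In `mixed` mode the ionospheric step runs only on every third epoch, so the buildup there is correspondingly slower.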
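Tags: AI navigation security, AI applications, Apex, GNSS jamming detection, GNSS spoofing detection, Python programming, signal processing, signal simulation, urban canyon effect, multipath interference, tropospheric delay, navigation system security, data generation, clock drift, machine learning, simulator development, environmental physics simulation, ionospheric delay, cybersecurity, error modeling, reverse-engineering tools, privacy protection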