How do you like my code?
Imports
import gpxpy
import geopandas as gpd
import contextily as ctx
from geopy.distance import geodesic
from PIL import Image, ImageEnhance
from shapely.geometry import box, LineString
import requests
import pandas as pd
import os
import time
import matplotlib.pyplot as plt
import seaborn as sns
from random import randint, uniform
Downloads GPX files from provided URLs and saves them locally.

def download_gpx(links):
    os.makedirs("data/gpx", exist_ok=True)  # make sure the target directory exists
    for num, url in enumerate(links):
        try:
            response = requests.get(url)
            response.raise_for_status()
            filename = f"track{num}.gpx"
            with open(f"data/gpx/{filename}", mode="wb") as f:
                f.write(response.content)
        except Exception as e:
            print(f"Error downloading {url}: {e}")

df = pd.DataFrame(columns=["track_id", "track_time", "latitude", "longitude", "altitude"])
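A minimal usage sketch (the URLs here are placeholders, not from the original project):

links = [
    "https://example.com/tracks/track_a.gpx",  # placeholder URL
    "https://example.com/tracks/track_b.gpx",  # placeholder URL
]
download_gpx(links)  # writes data/gpx/track0.gpx and data/gpx/track1.gpx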
Converts GPX files to PNG images with maps and tracks visualized.

def gpx_to_png(df):
    margin = 0.02  # padding around the track, in degrees
    img_path = "data/image"
    os.makedirs(img_path, exist_ok=True)
    for i in os.listdir("data/gpx"):
        png_name = f"{i[:-4]}.png"
        with open(f"data/gpx/{i}", encoding="UTF-8") as f:
            gpx = gpxpy.parse(f)
        lats, lons = [], []
        for track in gpx.tracks:
            for segment in track.segments:
                for point in segment.points:
                    lats.append(point.latitude)
                    lons.append(point.longitude)
                    df.loc[len(df)] = [i, point.time, point.latitude, point.longitude, point.elevation]
        # bounding box and track geometry, both in lon/lat (EPSG:4326)
        bbox = box(min(lons) - margin, min(lats) - margin, max(lons) + margin, max(lats) + margin)
        track_line = LineString(zip(lons, lats))
        gdf_bbox = gpd.GeoDataFrame(geometry=[bbox], crs="EPSG:4326")
        gdf_bbox_web = gdf_bbox.to_crs(epsg=3857)  # reproject to Web Mercator for the tile basemap
        gdf_track = gpd.GeoDataFrame(geometry=[track_line], crs="EPSG:4326")
        gdf_track_web = gdf_track.to_crs(epsg=3857)
        fig, ax = plt.subplots(figsize=(10, 8))
        gdf_bbox_web.plot(ax=ax, alpha=0)  # invisible box, used only to set the map extent
        gdf_track_web.plot(ax=ax, color="red", linewidth=2)
        ctx.add_basemap(ax, crs=gdf_bbox_web.crs, source=ctx.providers.OpenStreetMap.Mapnik)
        ax.set_axis_off()
        plt.savefig(f"{img_path}/{png_name}", dpi=150, bbox_inches="tight", pad_inches=0)
        plt.close(fig)
    return df
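The key step above is the reprojection: GPS points arrive in EPSG:4326 (degrees), while contextily's tiles use Web Mercator (EPSG:3857, meters), so both layers are converted before plotting. A tiny illustration of what to_crs does (the point is arbitrary):

from shapely.geometry import Point
import geopandas as gpd

pt = gpd.GeoDataFrame(geometry=[Point(16.37, 48.21)], crs="EPSG:4326")  # lon/lat in degrees
print(pt.to_crs(epsg=3857).geometry.iloc[0])  # the same location, expressed in meters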
Fetches temperature data from Open-Meteo API for a specific location and date.

df["track_time"] = pd.to_datetime(df["track_time"]).dt.strftime("%Y-%m-%d")  # keep only the date part

def temp(lat, lon, date):
    url = "https://archive-api.open-meteo.com/v1/archive"
    params = {
        "latitude": lat,
        "longitude": lon,
        "start_date": date,
        "end_date": date,
        "hourly": "temperature_2m",
        "timezone": "auto",
    }
    headers = {"User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0"}
    response = requests.get(url, params=params, headers=headers)
    response.raise_for_status()  # raises for any non-200 status, so no extra status check is needed
    data = response.json()
    return data["hourly"]["temperature_2m"][12]  # the reading at 12:00 local time
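Because timezone=auto is passed, index 12 of the hourly series corresponds to 12:00 local time at the queried point. A sample call (coordinates and date are illustrative):

midday = temp(48.21, 16.37, "2023-06-01")  # hypothetical location and past date
print(midday)  # a single float, in degrees Celsius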
Interpolates temperatures between key points for all track points.

def analysis_weather(df):
    n = len(df)
    key_indexes = [0, n // 4, n // 2, 3 * n // 4, n - 1]  # five sample points along the track
    temperatures_at_key_points = {}
    for idx in key_indexes:
        lat = df.iloc[idx]["latitude"]
        lon = df.iloc[idx]["longitude"]
        date = df.iloc[idx]["track_time"]
        temperatures_at_key_points[idx] = temp(lat, lon, date)
    if len(temperatures_at_key_points) < 2:
        print("Not enough data for interpolation, please check the API functionality")
        df["temperature"] = None
        return df
    all_temperatures = []
    left_idx, right_idx = 0, 1
    for i in range(n):
        # advance the interpolation window once i passes the current right key point
        if right_idx < len(key_indexes) - 1 and i >= key_indexes[right_idx]:
            left_idx += 1
            right_idx += 1
        left_key = key_indexes[left_idx]
        right_key = key_indexes[right_idx]
        left_temp = temperatures_at_key_points[left_key]
        right_temp = temperatures_at_key_points[right_key]
        if i in temperatures_at_key_points:
            temperature = temperatures_at_key_points[i]
        elif left_temp is not None and right_temp is not None:
            # linear interpolation between the two surrounding key points
            temperature = left_temp + (right_temp - left_temp) * (i - left_key) / (right_key - left_key)
        else:
            temperature = left_temp if left_temp is not None else right_temp
        all_temperatures.append(temperature)
    df = df.copy()
    df["temperature"] = all_temperatures
    return df
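The blending itself is ordinary linear interpolation between the two nearest key points. For instance, with 10 °C at index 0 and 14 °C at index 100, the point at index 25 sits a quarter of the way along:

left_temp, right_temp = 10.0, 14.0   # example key-point temperatures
left_key, right_key, i = 0, 100, 25
t = left_temp + (right_temp - left_temp) * (i - left_key) / (right_key - left_key)
print(t)  # 11.0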
Applies weather analysis to multiple tracks and combines results.

def get_temp(df):
    try:
        df_temp = pd.DataFrame()
        for i in range(0, 3):  # assumes three downloaded tracks: track0..track2
            track_data = df[df["track_id"] == f"track{i}.gpx"]
            track_data_weather = analysis_weather(track_data)
            df_temp = pd.concat([df_temp, track_data_weather])
            print(f"track{i} added")
        df = df_temp.copy()
        return df
    except Exception as e:
        print(f"Error {e}")
Extracts geographical region information using OpenStreetMap Nominatim API.

def extract_map_region(lat: float, lon: float):
    try:
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/536.36 (KHTML, like Gecko) Chrome/58.0.3029.100 Safari/536.3"}
        response = requests.get(
            f"https://nominatim.openstreetmap.org/reverse?lat={lat}&lon={lon}&format=json",
            headers=headers,
        )
        data = response.json()  # renamed from "json" to avoid shadowing the json module
        time.sleep(1.5)  # respect Nominatim's rate limit
        # prefer the most specific available region: county, then state, then country
        if "county" in data["address"]:
            return data["address"]["county"]
        if "state" in data["address"]:
            return data["address"]["state"]
        return data["address"]["country"]
    except Exception as e:
        print(f"Error {e}")
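An illustrative call (the coordinates are arbitrary; the result depends on what Nominatim returns for that point):

region = extract_map_region(48.21, 16.37)
print(region)  # a county, state, or country name, whichever is most specific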
Determines geographical region for track points.

def analysis_region(df):
    try:
        lat = df.iloc[0]["latitude"]
        lon = df.iloc[0]["longitude"]
        df = df.copy()
        df["region"] = extract_map_region(lat, lon)  # one lookup for the whole track
        return df
    except Exception as e:
        print(f"Error {e}")

Calculates step frequency based on distance between consecutive track points. Assumes an average step length of 0.75 meters to convert distance to steps.

def step_frequency(df):
    points = list(zip(df["latitude"], df["longitude"]))
    step = [0]  # the first point has no preceding distance
    for p1, p2 in zip(points, points[1:]):
        dist = geodesic(p1, p2).meters
        step.append(dist / 0.75)
    df["steps"] = step
    return df
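As a quick sanity check of the 0.75 m assumption (the numbers are not from the original article): two points one millidegree of latitude apart lie roughly 111 m from each other, which converts to about 148 steps:

from geopy.distance import geodesic

p1, p2 = (48.210, 16.370), (48.211, 16.370)  # ~111 m apart along a meridian
dist = geodesic(p1, p2).meters
print(round(dist, 1), round(dist / 0.75))  # ≈ 111.2 → ≈ 148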
Analyzes terrain type and key geographical objects around track points.

def terrain_type(df):
    overpass_endpoints = [
        "https://overpass-api.de/api/interpreter",
        "https://overpass.kumi.systems/api/interpreter",
        "https://overpass.openstreetmap.ru/cgi/interpreter"
    ]
    try:
        points = list(zip(df["latitude"], df["longitude"]))
        rep_points = [points[0], points[len(points) // 2], points[-1]]  # 3 representative points
        all_landuse, all_natural, all_key_objects = [], [], set()
        for lat, lon in rep_points:
            overpass_query = f"""
            [out:json][timeout:45];
            (
              way(around:500,{lat},{lon})["landuse"];
              way(around:500,{lat},{lon})["natural"];
              way(around:500,{lat},{lon})["leisure"];
              way(around:500,{lat},{lon})["waterway"="river"];
              way(around:500,{lat},{lon})["waterway"="stream"];
              way(around:500,{lat},{lon})["natural"="water"];
              node(around:500,{lat},{lon})["place"="city"];
              node(around:500,{lat},{lon})["place"="town"];
              node(around:500,{lat},{lon})["place"="village"];
              node(around:500,{lat},{lon})["natural"="peak"];
              node(around:500,{lat},{lon})["natural"="mountain"];
            );
            out tags center;
            """
            data = None
            for endpoint in overpass_endpoints:  # fall back to mirror endpoints on failure
                try:
                    response = requests.get(endpoint, params={"data": overpass_query}, timeout=60)
                    if response.status_code == 200:
                        data = response.json()
                        break
                except requests.RequestException:
                    continue
            if data is None:
                continue  # every endpoint failed for this point
            for element in data.get("elements", []):
                tags = element.get("tags", {})
                if "landuse" in tags:
                    all_landuse.append(tags["landuse"])
                elif "natural" in tags:
                    all_natural.append(tags["natural"])
                    if tags["natural"] in ["peak", "mountain"] and "name" in tags:
                        all_key_objects.add(f"Mountain: {tags['name']}")
                if "waterway" in tags and tags["waterway"] in ["river", "stream"] and "name" in tags:
                    all_key_objects.add(f"River: {tags['name']}")
                if "place" in tags and tags["place"] in ["city", "town", "village"] and "name" in tags:
                    all_key_objects.add(f"Settlement: {tags['name']} ({tags['place']})")
                if element.get("type") == "way" and tags.get("natural") == "water" and "name" in tags:
                    all_key_objects.add(f"Lake: {tags['name']}")
            time.sleep(1.5)  # be polite to the Overpass servers
        terrain_type = "unknown"
        if all_landuse:
            terrain_type = max(set(all_landuse), key=all_landuse.count)  # most frequent landuse tag
        elif all_natural:
            terrain_type = max(set(all_natural), key=all_natural.count)
        key_objects_str = "; ".join(sorted(all_key_objects)) if all_key_objects else None
        df["terrain_type"] = terrain_type
        df["key_objects_str"] = key_objects_str
    except Exception as e:
        print(f"Error {e}")
    return df
Creates kernel density estimation plots for all numerical columns in DataFrame.

def norm_or_not(df):
    num_cols = df.select_dtypes(include="number").columns  # kdeplot only makes sense for numeric data
    n = len(num_cols) // 5 + bool(len(num_cols) % 5)
    _, axes = plt.subplots(nrows=n, ncols=5, figsize=(15, 20), squeeze=False)
    for idx, col in enumerate(num_cols):
        ax = axes[idx // 5][idx % 5]
        sns.kdeplot(data=df, x=col, common_norm=False, ax=ax)
        ax.set_title(col)  # title the subplot itself, not the last active axes
    plt.show()
Performs data augmentation on map images to create training dataset variations.

def data_augmentation():
    images_path = "data/image"
    os.makedirs(images_path, exist_ok=True)
    for filename in os.listdir(images_path):
        # skip files that are already augmented outputs
        if filename.lower().endswith(".png") and not any(word in filename for word in ["rotated", "contrasted", "brightness"]):
            img_path = os.path.join(images_path, filename)
            img = Image.open(img_path)
            base_name = os.path.splitext(filename)[0]
            rotated_img = img.rotate(randint(10, 60))  # random rotation between 10 and 60 degrees
            rotated_img.save(os.path.join(images_path, f"{base_name}_rotated.png"))
            contrasted_img = ImageEnhance.Contrast(img).enhance(randint(2, 4))
            contrasted_img.save(os.path.join(images_path, f"{base_name}_contrasted.png"))
            brightness_img = ImageEnhance.Brightness(img).enhance(uniform(1.2, 1.6))
            brightness_img.save(os.path.join(images_path, f"{base_name}_brightness.png"))
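Putting the pieces together, a plausible driver script could look like this. The link list is a placeholder, the per-track groupby mirrors the split already done inside get_temp, and this is a sketch rather than the article's original main block:

if __name__ == "__main__":
    download_gpx(["https://example.com/tracks/track_a.gpx"])  # placeholder URL
    df = gpx_to_png(df)  # render maps and collect track points into the module-level df
    df["track_time"] = pd.to_datetime(df["track_time"]).dt.strftime("%Y-%m-%d")
    df = get_temp(df)  # interpolated temperatures per track
    df = pd.concat(analysis_region(g) for _, g in df.groupby("track_id"))
    df = pd.concat(step_frequency(g.copy()) for _, g in df.groupby("track_id"))
    df = pd.concat(terrain_type(g.copy()) for _, g in df.groupby("track_id"))
    norm_or_not(df)  # eyeball the distributions of the numeric columns
    data_augmentation()  # expand the rendered image set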