OK, since Aleksander "Spinka" could not be reached, and I had downloaded the then-current version of the program back in 2019, I took it on in earnest. After many failed attempts I fixed it, extended it, and merged everything into a single script that runs on both Python 2 and Python 3, on Linux and Windows.
The full code is pasted below. MLAT messages are not handled yet, but I will get to that as well. To run the script you need to install a few modules, as described in the comments in the script. ChatGPT is very helpful here: it will suggest what to do about a particular error if you run into one. The script generally runs stably, so there should be no major problems.
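If you are not sure which of those modules are already present, a small check like the one below may help before the first run. It is only a sketch and not part of the script: the helper name `missing_modules` is mine, and the module list simply mirrors the imports that carry install notes in the script's comments (on Python 2 the GUI module is called `Tkinter` rather than `tkinter`).

```python
# Sketch only: report which of the modules the script imports cannot be loaded.
# `missing_modules` is a hypothetical helper, not part of the script itself.

def missing_modules(names):
    """Return the names from `names` that fail to import."""
    missing = []
    for name in names:
        try:
            __import__(name)
        except ImportError:
            missing.append(name)
    return missing


if __name__ == "__main__":
    # Modules flagged in the script's comments as needing installation.
    required = ["ephem", "requests", "tkinter"]
    for name in missing_modules(required):
        print("Missing module: " + name)
```

Anything it prints can then be installed with the `pip` or `apt-get` command given next to the matching import.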
You only need to enter your own coordinates, your elevation, and the airport whose METAR you want to download in order to keep the QNH pressure up to date.
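For reference, the QNH update boils down to pulling the `Qxxxx` group out of the METAR text, which is what the `get_metar_press()` function in the script does. The snippet below is a simplified, offline sketch of that one step. `qnh_from_metar` is a name I made up, the sample METAR string is invented for illustration, and the word-boundary anchors are my own small addition to the script's `Q(\d{4})` pattern.

```python
import re

DEFAULT_QNH_HPA = 1013  # same fallback value the script uses


def qnh_from_metar(metar_text, default=DEFAULT_QNH_HPA):
    """Extract QNH in hPa from a METAR string, or return `default`."""
    match = re.search(r'\bQ(\d{4})\b', metar_text)
    if not match:
        return default  # no Qxxxx group in the report
    qnh = int(match.group(1))
    # Same plausibility window as the script: reject unrealistic readings.
    if 800 < qnh < 1100:
        return qnh
    return default


if __name__ == "__main__":
    # Invented sample report, for illustration only.
    sample = "METAR EPRA 121200Z 27008KT 9999 SCT030 18/09 Q1021"
    print(qnh_from_metar(sample))
```

Airports that report the altimeter setting in inches of mercury (an `Axxxx` group) will not match this pattern, so the function falls back to the default there.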
# -*- coding: utf-8 -*-
"""
Original idea: https://github.com/darethehair/flight-warning
version 1.06
This program will send a Google mail message when an ADS-B data feed from
a dump1090 stream detects an aircraft within a set distance of a geographic point.
It will also send an email when the aircraft leaves that detection area.
As well, it will send a warning email if the trajectory of the plane is likely to
intersect the detection zone.
The appearance of the records sent to stdout looks like this:
The format is as follows:
The units of elevation and distance depend on settings within the code below (i.e. meters/kilometers
or feet/miles).
Copyright (C) 2015 Darren Enns <darethehair@gmail.com>
v0.3 Grzegorz Tuszyński <grztus@wp.pl> (May 2024)
- automatic pressure check from METAR (for the Poland region; you need to find a METAR URL for your country and change the relevant lines below in the script)
- minor bug fixes (including some calculations)
- added support for Python 2 and Python 3 in one script
- adapted to work on both Windows (10 Pro) and Debian Linux (10)
- more comments added as user instructions, to make the script easier to understand
1. add vertical speed and pilot selected altitude to transit calculation
2. add MLAT messages handling
3. solve the problem with function to get directly entered user lat, lon, alt and pressure at script startup (now the lines are disabled)
4. add some logging functions
- try/except for plane lat/lon in MSG 3
- Color console realtime display Az/Alt
- Sun/Moon transits prediction
/home/User/# nc 30003 | python name_of_the_script_file.py
(in Windows system use ncat instead of nc)
"""
# import required libraries, most of them should be preinstalled with Your python
from __future__ import print_function
import os
import subprocess
import sys
import datetime
import time
import math
import ephem # You need to install this module for Your python: pip install ephem
import re
import requests # You need to install this module for Your python: pip install requests
import threading
from math import atan2, sin, cos, acos, radians, degrees, atan, asin, sqrt, isnan
try:
    # Python 3
    import tkinter as tk # You need to install this module for your Python: sudo apt-get install python3-tk
    from tkinter import simpledialog
except ImportError:
    # Python 2
    import Tkinter as tk # You need to install this module for your Python: sudo apt-get install python-tk
    import tkSimpleDialog as simpledialog
# Path to the file that stores the user's data
USER_DATA_FILE = "user_data.txt"
# Global settings
MAX_AGE_SECONDS = 60 # Maximum lifetime of an entry after the last received message (seconds)
# Ask the user for location, elevation and pressure using dialog windows
def get_user_input():
    # Try to load previously saved user data from the file
    try:
        with open(USER_DATA_FILE, "r") as file:
            data = file.readlines()
        latitude, longitude, elevation, pressure = map(str.strip, data)
    except (IOError, ValueError):
        # IOError also covers a missing file on Python 2, where FileNotFoundError does not exist;
        # ValueError covers a file that does not hold exactly four lines
        latitude, longitude, elevation, pressure = "", "", "", ""
    root = tk.Tk()
    root.withdraw() # Hide the main application window
    # Ask the user for the location (latitude and longitude)
    latitude = simpledialog.askstring("Input", "Enter observer's latitude (e.g., 51 23 35 N):", initialvalue=latitude)
    longitude = simpledialog.askstring("Input", "Enter observer's longitude (e.g., 21 11 20 E):", initialvalue=longitude)
    # Ask the user for the elevation
    elevation = simpledialog.askinteger("Input", "Enter observer's elevation (meters above sea level):", initialvalue=int(elevation) if elevation else None)
    # Ask the user for the atmospheric pressure
    pressure = simpledialog.askfloat("Input", "Enter atmospheric pressure (QNH hPa):", initialvalue=float(pressure) if pressure else None)
    # Save the new user data to the file, one value per line (the layout read back above)
    with open(USER_DATA_FILE, "w") as file:
        file.write("{}\n{}\n{}\n{}\n".format(latitude, longitude, elevation, pressure))
    return latitude, longitude, elevation, pressure
# Call the function to get the data from the user
user_latitude, user_longitude, user_elevation, user_pressure = get_user_input()
if os.name == 'nt':
print (os.name)
os.system('mode con: cols=170 lines=23')
# metar_path = 'C:\metar.txt'
print (os.name)
if sys.version_info[0] == 2: # Python 2
print (os.name)
deg = u'\xb0'
earth_R = 6371
REDALERT = '\x1b[1;37;41m'
PURPLE = '\x1b[1;35;40m'
PURPLEDARK = '\x1b[0;35;40m'
RED = '\x1b[0;31;40m'
GREEN = '\x1b[0;30;42m'
GREENALERT = '\x1b[0;30;42m'
GREENFG = '\x1b[1;32;40m'
BLUE = '\x1b[1;34;40m'
YELLOW = '\x1b[1;33;40m'
CYAN = '\x1b[1;36;40m'
RESET = '\x1b[0m'
# Clear the screen before displaying information
def clear_screen():
    if os.name == 'nt':
        # Windows
        subprocess.call('cls', shell=True)
    else:
        # Unix/Linux/Mac
        subprocess.call('clear', shell=True)
def clean_dict():
    current_time = datetime.datetime.now()
    to_delete = []
    for icao, entry in plane_dict.items():
        entry_time = entry[0] # Assumes entry[0] is the datetime of the last update for this ICAO
        if (current_time - entry_time).total_seconds() > MAX_AGE_SECONDS:
            to_delete.append(icao)
    # Delete in a second pass so the dictionary is not modified while iterating over it
    for icao in to_delete:
        del plane_dict[icao]
# initialize empty dictionaries
plane_dict = {}
# set desired units
metric_units = True
aktual_t = datetime.datetime.now()
last_t = datetime.datetime.now() - datetime.timedelta(seconds=10)
gong_t = datetime.datetime.now()
# set desired distance and time limits
warning_distance = 250
alert_duplicate_minutes = 20
alert_distance = 15 ## Warning radius 1 Sound alert
xtd_tst = 20 ## Warning radius 2 Sound alert
transit_separation_sound_alert = 3.2
transit_separation_REDALERT_FG = 10
transit_separation_GREENALERT_FG = 3
transit_separation_notignored = 20
# set geographic location and elevation
my_lat = 51.1111 #yourlatitude # (positive = north, negative = south)
my_lon = 21.1111 #yourlongitude # (positive = east, negative = west)
my_elevation = 111
my_elevation_const = 114 #your antenna elevation = site elevation + 3 metres
near_airport_elevation = 111 #nearest airport elevation
# Module-level state used by get_metar_press()
metar_t = datetime.datetime.now() - datetime.timedelta(seconds=900) # Initial timestamp
pressure = 1013 # Default pressure (hPa)
metar_url = 'https://awiacja.imgw.pl/metar00.php?airport=EPRA' # URL of the METAR data
gatech = ephem.Observer()
gatech.lat, gatech.lon = str(my_lat), str(my_lon)
gatech.elevation = my_elevation_const
# calculate time zone for ISO date/timestamp
timezone_hours = time.altzone/60/60
last_update_time = datetime.datetime.now() # Initialised at the start of the script
# define haversine great-circle-distance routine
# credit: http://www.movable-type.co.uk/scripts/latlong.html
def haversine(origin, destination):
    lat1, lon1 = origin
    lat2, lon2 = destination
    if (metric_units):
        radius = 6371 # km
    else:
        radius = 3959 # miles
    dlat = radians(lat2-lat1)
    dlon = radians(lon2-lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(radians(lat1)) * math.cos(radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    d = radius * c
    return d
# define cross-track error routine
# credit: http://www.movable-type.co.uk/scripts/latlong.html
def crosstrack(distance, azimuth, track):
    if (metric_units):
        radius = 6371 # km
    else:
        radius = 3959 # miles
    xtd = round(abs(math.asin(math.sin(float(distance)/radius) * math.sin(radians(float(azimuth) - float(track)))) * radius),1)
    return xtd
def log_transits(icao, flight, transit_info, celestial_body):
    filename = "transits_log.txt"
    with open(filename, "a") as file:
        # Format the data to be written
        date_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        icao_code = icao
        flight_code = flight
        min_distance = transit_info['min_distance']
        plane_az = transit_info['plane_az']
        plane_alt = transit_info['plane_alt']
        celestial_az = transit_info['celestial_az']
        celestial_alt = transit_info['celestial_alt']
        # Build the line to write to the file
        line = "{},{},{},{},{},{},{},{},{}\n".format(
            date_time, icao_code, flight_code, min_distance, plane_az, plane_alt, celestial_az, celestial_alt, celestial_body
        )
        file.write(line)
# intersection of the Sun/Moon azimuth with the flight path
# credit: http://www.movable-type.co.uk/scripts/latlong.html
def transit_pred(obs2moon, plane_pos, track, velocity, elevation, moon_alt, moon_az):
if moon_alt < 0.1:
return 0
lat1, lon1 = obs2moon
lat2, lon2 = plane_pos
lat1 = radians(lat1)
lat2 = radians(lat2)
lon1 = radians(lon1)
lon2 = radians(lon2)
theta_13 = radians(float(moon_az))
theta_23 = radians(float(track))
Dlat = lat1-lat2
Dlon = lon1-lon2
delta_12 = 2*asin( sqrt( sin(Dlat/2)*sin(Dlat/2) + \
cos(lat1)*cos(lat2)*sin(Dlon/2)*sin(Dlon/2) ) )
if (float(delta_12 == 0)): # Prevent division by zero
return 0
# Adjusting acos input to be within the range [-1, 1]
x = (sin(lat2) - sin(lat1) * cos(delta_12)) / (sin(delta_12) * cos(lat1))
x = min(1, max(-1, x)) # Clamp x to the range [-1, 1] to avoid math domain error
theta_a = acos(x)
if (isnan(theta_a)):
theta_a = 0
# Adjust acos input for theta_b to be within the range [-1, 1]
y = (sin(lat1) - sin(lat2) * cos(delta_12)) / (sin(delta_12) * cos(lat2))
y = min(1, max(-1, y)) # Clamp y to the range [-1, 1] to avoid math domain error
theta_b = acos(y)
if sin(lon2-lon1) > 0:
theta_12 = theta_a
theta_21 = 2*math.pi - theta_b
else:
theta_12 = 2*math.pi - theta_a
theta_21 = theta_b
alfa_1 = theta_13 - theta_12
alfa_2 = theta_21 - theta_23
if (sin(alfa_1) == 0) and (sin(alfa_2) == 0):
return 0 #null
if ((sin(alfa_1))*(sin(alfa_2)) < 0):
return 0 #null
alfa_3 = acos( -cos(alfa_1)*cos(alfa_2) + sin(alfa_1)*sin(alfa_2)*cos(delta_12) )
delta_13 = atan2( sin(delta_12)*sin(alfa_1)*sin(alfa_2), cos(alfa_2)+cos(alfa_1)*cos(alfa_3) )
lat3 = asin( sin(lat1)*cos(delta_13) + cos(lat1)*sin(delta_13)*cos(theta_13) )
Dlon_13 = atan2( sin(theta_13)*sin(delta_13)*cos(lat1), cos(delta_13)-sin(lat1)*sin(lat3) )
lon3 = lon1 + Dlon_13;
lat3 = degrees(lat3)
lon3 = (degrees(lon3)+540)%360-180
## the distance from the observer's position to the calculated position of the aircraft when it crosses the azimuth of the moon
dst_h2x = round(haversine((my_lat, my_lon), (lat3, lon3)),1)
if dst_h2x > 500:
return 0
if dst_h2x == 0:
dst_h2x = 0.001
## the elevation test here is in metres, not ft
# if elevation < 2166:
# my_elevation = 0 ## same observer elevation above sea level as the runway at EPPO
# else:
# my_elevation = my_elevation_const
if not is_int_try(elevation):
return 0
altitude1 = degrees(atan(int(elevation - my_elevation)/(dst_h2x*1000)))
azimuth1 = atan2(sin(radians(lon3-my_lon)) * cos(radians(lat3)), cos(radians(my_lat)) * sin(radians(lat3)) - sin(radians(my_lat))*cos(radians(lat3))*cos(radians(lon3-my_lon)))
azimuth1 = round(((degrees(azimuth1) + 360) % 360),1)
## the distance from the CURRENT position of the airplane to the calculated position of the aircraft when it CROSSES the azimuth of the moon
dst_p2x = round(haversine((plane_pos[0], plane_pos[1]), (lat3, lon3)),1)
velocity = int(velocity)
delta_time = (dst_p2x / velocity)*3600
moon_alt_B = 90.00 - moon_alt
ideal_dist = (sin(radians(moon_alt_B))*elevation) / sin(radians(moon_alt)) / 1000
ideal_lat = asin((sin(radians(my_lat)) * cos(ideal_dist/earth_R)) + (cos(radians(my_lat)) * sin(ideal_dist/earth_R) * cos(radians(moon_az))))
ideal_lon = radians(my_lon) + atan2((sin(radians(moon_az))*sin(ideal_dist/earth_R)*cos(radians(my_lat))), (cos(ideal_dist/earth_R)-((sin(radians(my_lat))) * sin(ideal_lat))))
ideal_lat = degrees(ideal_lat)
ideal_lon = degrees(ideal_lon)
ideal_lon = (ideal_lon+540)%360-180
return lat3, lon3, azimuth1, altitude1, dst_h2x, dst_p2x, delta_time, 0, moon_az, moon_alt
## 0 1 2 3 4 5 6 7 8 9
def dist_col(distance):
if (distance <= 300 and distance > 100):
return PURPLE
elif (distance <= 100 and distance > 50):
return CYAN
elif (distance <= 50 and distance > 30):
return YELLOW
elif (distance <= 30 and distance > 15):
elif (distance <= 15 and distance >0):
def alt_col(altitude):
if (altitude >= 5 and altitude < 15):
return PURPLE
elif (altitude >= 15 and altitude < 25):
return CYAN
elif (altitude >= 25 and altitude < 30):
return YELLOW
elif (altitude >= 30 and altitude < 45):
elif (altitude >=45 and altitude <= 90):
return GREEN
def elev_col(elevation):
    if (elevation >= 4000 and elevation <= 8000):
        return PURPLE
    elif (elevation >= 2000 and elevation < 4000):
        return GREEN
    elif (elevation > 0 and elevation < 2000):
        return YELLOW
    else:
        return RESET
def wind_deg_to_str1(deg):
    if deg >= 11.25 and deg < 33.75: return 'NNE'
    elif deg >= 33.75 and deg < 56.25: return 'NE'
    elif deg >= 56.25 and deg < 78.75: return 'ENE'
    elif deg >= 78.75 and deg < 101.25: return 'E'
    elif deg >= 101.25 and deg < 123.75: return 'ESE'
    elif deg >= 123.75 and deg < 146.25: return 'SE'
    elif deg >= 146.25 and deg < 168.75: return 'SSE'
    elif deg >= 168.75 and deg < 191.25: return 'S'
    elif deg >= 191.25 and deg < 213.75: return 'SSW'
    elif deg >= 213.75 and deg < 236.25: return 'SW'
    elif deg >= 236.25 and deg < 258.75: return 'WSW'
    elif deg >= 258.75 and deg < 281.25: return 'W'
    elif deg >= 281.25 and deg < 303.75: return 'WNW'
    elif deg >= 303.75 and deg < 326.25: return 'NW'
    elif deg >= 326.25 and deg < 348.75: return 'NNW'
    else: return 'N'
def gong():
    global gong_t
    aktual_gong_t = datetime.datetime.now()
    diff_gong_t = (aktual_gong_t - gong_t).total_seconds()
    if (diff_gong_t > 2):
        gong_t = aktual_gong_t
        print('\a') ## TERMINAL GONG!
def is_float_try(value):
    try:
        float(value)
        return True
    except ValueError:
        return False
def is_int_try(value):
    try:
        int(value)
        return True
    except ValueError:
        return False
def get_metar_press():
    global metar_t
    global pressure
    aktual_metar_t = datetime.datetime.now()
    diff_metar_t = (aktual_metar_t - metar_t).total_seconds()
    if diff_metar_t > 900:
        metar_t = aktual_metar_t
        try:
            response = requests.get(metar_url, timeout=10) # timeout so a stalled server cannot block the main loop
            if response.status_code == 200:
                metar_data = response.text
                pressure_match = re.search(r'Q(\d{4})', metar_data)
                if pressure_match:
                    pressure = int(pressure_match.group(1))
                    if 800 < pressure < 1100:
                        return pressure
                    else:
                        return 1013 # Default value for an unrealistic reading
                else:
                    return 1013 # Default value if the data contains no pressure
            else:
                return 1013 # Default value if the server response is not 200 OK
        except requests.exceptions.RequestException as e:
            print("Error retrieving METAR data: ", e)
            return pressure # Return the last known pressure if an error occurs
    else:
        return pressure # Return the last known pressure if an update is not due yet
def tabela():
global last_t
gatech.date = ephem.now()
vs = ephem.Moon(gatech)
vm = ephem.Sun(gatech)
moon_alt, moon_az= round(math.degrees(vm.alt), 1), round(math.degrees(vm.az), 1)
sun_alt, sun_az= round(math.degrees(vs.alt), 1), round(math.degrees(vs.az), 1)
diff_t = (aktual_t - last_t).total_seconds()
## Update freq 1=1s, 0=realtime
if (diff_t > 1):
last_t = aktual_t
## Column legend:
## Age of last received message (s): -------------------------------------------------------
## Time to cross azimuth of Sun/Moon ------------------------------------------ |
## Distance from observer pos to predicted cross: --------------------------- | |
## Distance from plane pos to crossing: ------------------------------- | | |
## Predicted angle between crossing and obj: ------------------- | | | |
## Closest distance to observer: ------------------------ | | | | |
## Airplane visible at deg above horizon: --------- | | | | | |
## Airplane visible at azimuth: ------------ | | | | | | |
## | | | | | | | |
## flight elev dist trck news azmth alt warn Sep p2x h2x time2X age
print ("Flight info ----------|-------|Pred. closest |- Current Az/Alt ----|--- Transits:", vm.name, moon_az, moon_alt,' & ', vs.name, sun_az, sun_alt )
print ('{:9} {:>6} {:>6} {} {:>5} {} {:>6} {:>7} {} {:>5} {:>6} {:>5} {} {:>7} {:>7} {:>7} {:>8} {} {:>7} {:>7} {:>7} {:>7} {} {:>5}'.format(\
' icao or', ' (m)', '(d)', '|', '(km)', '|', '(km)', '(d)', '|', '(d)', '(d)', '(l)', ' |', '(d)', '(km)', '(km)', ' (s)', '|', '(d)', '(km)', '(km)', ' (s)', ' |', '(s)'))
print ('{:9} {:>6} {:>6} {} {:>5} {} {:>6} {:>7} {} {:>5} {:>6} {:>5} {} {:>7} {:>7} {:>7} {:>8} {} {:>7} {:>7} {:>7} {:>7} {} {:>5}'.format(\
' flight', 'elev', 'trck', '|', 'dist', '|', '[warn]', '[Alt]', '|', 'Alt', 'Azim', 'Azim', ' |', 'Sep', 'p2x', 'h2x', 'time2X', '|', 'Sep', 'p2x', 'h2x', 'time2X', ' |', 'age'))
print ("------------------------|-------|----------------|---------------------|----------------------------------|----------------------------------|-------")
## Subloop through all entries
for pentry in plane_dict:
distance = plane_dict[pentry][5]
try:
distance = float(distance) # Convert the distance value to float
except ValueError:
continue # Skip this entry if the conversion fails
if (plane_dict[pentry][5] <= 250):
if plane_dict[pentry][17] != "":
then = plane_dict[pentry][17]
now = datetime.datetime.now()
diff_seconds = (now - then).total_seconds()
else:
diff_seconds = 999
then1 = plane_dict[pentry][0]
now1 = datetime.datetime.now()
diff_minutes = (now1 - then1).total_seconds() / 60.
wiersz = ''
if plane_dict[pentry][1] != "":
wiersz += '{} {:7} {}'.format(YELLOW, str(plane_dict[pentry][1]), RESET) ##flight
else:
wiersz += '{} {:7} {}'.format(RESET, str(pentry), RESET) ##flight
if is_float_try(plane_dict[pentry][4]):
wiersz += '{} {:>6} {}'.format(elev_col(elevation), str(elevation),RESET) ##elev
wiersz += '{:>5}'.format(str(plane_dict[pentry][11])) ## trck
wiersz += ' |'
wiersz += '{} {:>5} {}'.format(dist_col(plane_dict[pentry][5]), str(plane_dict[pentry][5]),RESET) ## dist
wiersz += '|'
if (plane_dict[pentry][12] == 'WARNING' and plane_dict[pentry][9] != "RECEDING"):
wiersz += '{}{}{:>5}{}{}'.format(str('['),REDALERT, str( plane_dict[pentry][13]),RESET, str(']')) ## warn
elif (plane_dict[pentry][12] == 'WARNING' and plane_dict[pentry][9] == "RECEDING"):
wiersz += '{}{}{:>5}{}{}'.format(str('['),RED, str( plane_dict[pentry][13]),RESET, str(']')) ## warn
elif (plane_dict[pentry][12] != 'WARNING' and plane_dict[pentry][9] == "RECEDING"):
wiersz += '{}{}{:>5}{}{}'.format(str('['),PURPLEDARK, str( plane_dict[pentry][13]),RESET, str(']')) ## warn
else:
wiersz += '{}{}{:>5}{}{}'.format(str('['),PURPLE, str(plane_dict[pentry][13]),RESET, str(']')) ## warn
if is_float_try(plane_dict[pentry][13]):
if plane_dict[pentry][13] == 0:
altitudeX = round(degrees(atan((elevation - my_elevation)/(float(0.01)*1000))) ,1)
else:
altitudeX = round(degrees(atan((elevation - my_elevation)/(float(plane_dict[pentry][13])*1000))) ,1)
else:
altitudeX = '0'
wiersz += '{}{}{:>5}{}{}'.format(str(' ['), str(alt_col(float(altitudeX))), str(altitudeX),RESET, str(']'))
wiersz += ' |'
wiersz += '{} {:>5} {}'.format(alt_col(plane_dict[pentry][7]), str(plane_dict[pentry][7]),RESET) ## Alt
if diff_seconds >= 999:
wiersz += '{}'.format(RED+str("x") +RESET)
elif diff_seconds > 30:
wiersz += '{}'.format(RED+str("!") +RESET)
elif diff_seconds > 15:
wiersz += '{}'.format(YELLOW+ str("!")+ RESET)
elif diff_seconds > 10:
wiersz += '{}'.format(GREENFG+ str("!")+ RESET)
else:
wiersz += '{}'.format(GREENFG+str("o") +RESET)
wiersz += '{:>6}'.format(str(plane_dict[pentry][6])) ## Az
wiersz += '{:>6}'.format(str(wind_deg_to_str1(plane_dict[pentry][6]))) ## news
wiersz += ' |'
thenx = plane_dict[pentry][0]
nowx = datetime.datetime.now()
diff_secx = (nowx - thenx).total_seconds()
if is_float_try(plane_dict[pentry][24]) and is_float_try(plane_dict[pentry][23]):
separation_deg = float(plane_dict[pentry][24]-plane_dict[pentry][23])
else:
separation_deg = 90.0
if (-transit_separation_GREENALERT_FG < separation_deg < transit_separation_GREENALERT_FG):
wiersz += '{} {:>6} {}'.format(GREENALERT, "{:.2f}".format(plane_dict[pentry][24]-plane_dict[pentry][23]),RESET) ## SEPARACJA
wiersz += '{:>8}'.format(str(plane_dict[pentry][27])) ## DISTANCE: AIRPLANE POS TO AIRPLANE PATH CROSS
wiersz += '{:>7}'.format(str(round(plane_dict[pentry][25],1))) ## DISTANCE MY_POS TO CROSS POINT
wiersz += '{:>10}'.format(str(plane_dict[pentry][26])) ## delta_tim ## TIME UNTIL PLANE ARRIVE AT CROS
# wiersz += '{} {:>6} {}'.format(RED, "{:.2f}".format(plane_dict[pentry][19]-plane_dict[pentry][18]), RESET) ## SEPARACJA
elif (-transit_separation_REDALERT_FG < separation_deg < transit_separation_REDALERT_FG):
wiersz += '{} {:>6} {}'.format(REDALERT, "{:.2f}".format(plane_dict[pentry][24]-plane_dict[pentry][23]),RESET) ## SEPARACJA
wiersz += '{:>8}'.format(str(plane_dict[pentry][27])) ## DISTANCE: AIRPLANE POS TO AIRPLANE PATH CROSS
wiersz += '{:>7}'.format(str(round(plane_dict[pentry][25],1))) ## DISTANCE MY_POS TO CROSS POINT
wiersz += '{:>10}'.format(str(plane_dict[pentry][26])) ## delta_tim ## TIME UNTIL PLANE ARRIVE AT CROSS POINT
elif (-transit_separation_notignored < separation_deg < transit_separation_notignored):
wiersz += '{} {:>6} {}'.format(RED, str(plane_dict[pentry][24]-plane_dict[pentry][23]),RESET) ## SEPARACJA
wiersz += '{:>8}'.format(str(plane_dict[pentry][27])) ## DISTANCE: AIRPLANE POS TO AIRPLANE PATH CROSS
wiersz += '{:>7}'.format(str(round(plane_dict[pentry][25],1))) ## DISTANCE MY_POS TO CROSS POINT
wiersz += '{:>10}'.format(str(plane_dict[pentry][26])) ## delta_tim ## TIME UNTIL PLANE ARRIVE AT CROSS POINT
else:
wiersz += '{:>8}'.format(str("---")) ## SEPARACJA
wiersz += '{:>8}'.format(str("---")) ## DISTANCE: AIRPLANE POS TO AIRPLANE PATH CROSS
wiersz += '{:>7}'.format(str("---")) ## DISTANCE MY_POS TO CROSS POINT
wiersz += '{:>10}'.format(str("---")) ## delta_tim ## TIME UNTIL PLANE ARRIVE AT CROSS POINT
wiersz += ' |'
if is_float_try(plane_dict[pentry][19]) and is_float_try(plane_dict[pentry][18]):
separation_deg2 = float(plane_dict[pentry][19]-plane_dict[pentry][18])
else:
separation_deg2 = 90.0
if (-transit_separation_GREENALERT_FG < separation_deg2 < transit_separation_GREENALERT_FG):
wiersz += '{} {:>6} {}'.format(GREENALERT, "{:.2f}".format(plane_dict[pentry][19]-plane_dict[pentry][18]),RESET) ## SEPARACJA
wiersz += '{:>8}'.format(str(plane_dict[pentry][21])) ## DISTANCE: AIRPLANE POS TO AIRPLANE PATH CROSS
wiersz += '{:>7}'.format(str(round(plane_dict[pentry][20],1))) ## DISTANCE MY_POS TO CROSS POINT
wiersz += '{:>10}'.format(str(plane_dict[pentry][22])) ## delta_tim ## TIME UNTIL PLANE ARRIVE AT CROS
elif (-transit_separation_REDALERT_FG < separation_deg2 < transit_separation_REDALERT_FG):
wiersz += '{} {:>6} {}'.format(REDALERT, "{:.2f}".format(plane_dict[pentry][19]-plane_dict[pentry][18]),RESET) ## SEPARACJA
wiersz += '{:>8}'.format(str(plane_dict[pentry][21])) ## DISTANCE: AIRPLANE POS TO AIRPLANE PATH CROSS
wiersz += '{:>7}'.format(str(round(plane_dict[pentry][20],1))) ## DISTANCE MY_POS TO CROSS POINT
wiersz += '{:>10}'.format(str(plane_dict[pentry][22])) ## delta_tim ## TIME UNTIL PLANE ARRIVE AT CROSS POINT
elif (-transit_separation_notignored < separation_deg2 < transit_separation_notignored):
wiersz += '{} {:>6} {}'.format(RED, "{:.2f}".format(plane_dict[pentry][19]-plane_dict[pentry][18]),RESET) ## SEPARACJA
wiersz += '{:>8}'.format(str(plane_dict[pentry][21])) ## DISTANCE: AIRPLANE POS TO AIRPLANE PATH CROSS
wiersz += '{:>7}'.format(str(round(plane_dict[pentry][20],1))) ## DISTANCE MY_POS TO CROSS POINT
wiersz += '{:>10}'.format(str(plane_dict[pentry][22])) ## delta_tim ## TIME UNTIL PLANE ARRIVE AT CROSS POINT
else:
wiersz += '{:>8}'.format(str("---")) ## SEPARACJA
wiersz += '{:>8}'.format(str("---")) ## DISTANCE: AIRPLANE POS TO AIRPLANE PATH CROSS
wiersz += '{:>7}'.format(str("---")) ## DISTANCE MY_POS TO CROSS POINT
wiersz += '{:>10}'.format(str("---")) ## delta_tim ## TIME UNTIL PLANE ARRIVE AT CROSS POINT
wiersz += ' |'
wiersz += '{:>6}'.format(str(round(diff_secx, 1)))
wiersz += " "+str(len(plane_dict[pentry][15]))+" "+str(len(plane_dict[pentry][16]))+" "
wiersz +=str(diff_seconds)
print (wiersz)
if plane_dict[pentry][17] != "":
then = plane_dict[pentry][17]
now = datetime.datetime.now()
diff_seconds = (now - then).total_seconds()
else:
diff_seconds = 999
if plane_dict[pentry][1] != "":
wiersz = ''
wiersz += '{} {:7} {}'.format(YELLOW, str(plane_dict[pentry][1]), RESET) ##flight
else:
wiersz = ''
wiersz += '{} {:7} {}'.format(RESET, str(pentry),RESET) ##icao
if plane_dict[pentry][4] != "":
if is_float_try(plane_dict[pentry][4]):
wiersz += '{} {:>6} {}'.format(elev_col(elevation), str(elevation),RESET) ##elev
else:
wiersz += '{} {:>6} {}'.format(RESET,str('---'),RESET) ##elev
if plane_dict[pentry][11] != "":
wiersz += '{:>5}'.format(str(plane_dict[pentry][11])) ##track
else:
wiersz += '{:>5}'.format(str('---'))
wiersz += ' |'
if plane_dict[pentry][5] != "":
wiersz = ''
wiersz += '{} {:>5} {}'.format(RESET, str(plane_dict[pentry][5]), RESET) ##flight
else:
wiersz += '{} {:>5} {}'.format(RESET, str('---'),RESET)
wiersz += '|'
wiersz += '{} {:>5} {}'.format(RESET, str('---'),RESET) ## warn
wiersz += '{} {:>5} {}'.format(RESET, str('---'),RESET) ## alt
wiersz += ' |'
wiersz += '{} {:>5} {}'.format(RESET, str('---'),RESET) ## alt
if diff_seconds > 5:
wiersz += '{}'.format(str("!"))
else:
wiersz += '{}'.format(str(" "))
wiersz += '{:>7}'.format(str('---')) ## az1 news
wiersz += '{:>5}'.format(str('---')) ## az2
wiersz += ' |'
wiersz += '{} {:>7} {}'.format(RESET, str('---'),RESET) ## Sep
wiersz += '{:>7}'.format(str('---'))
wiersz += '{:>7}'.format(str('---'))
wiersz += '{:>10}'.format(str('---'))
wiersz += ' |'
wiersz += '{} {:>7} {}'.format(RESET, str('---'),RESET) ## Sep
wiersz += '{:>7}'.format(str('---'))
wiersz += '{:>7}'.format(str('---'))
wiersz += '{:>10}'.format(str('---'))
wiersz += ' |'
thenx = plane_dict[pentry][0]
nowx = datetime.datetime.now()
diff_secx = (nowx - thenx).total_seconds()
wiersz += '{:>6}'.format(str(round(diff_secx, 1)))
wiersz += " "+str(len(plane_dict[pentry][15]))+" "+str(len(plane_dict[pentry][16]))+" "
wiersz +=str(diff_seconds)
print (wiersz)
lastline+= " --- "
lastline+= str(len (plane_dict))
lastline+= " --- "
lastline+= str(int(diff_t))
lastline+= " --- "+ str(pressure)+"hPa"
print (lastline)
return moon_alt, moon_az, sun_alt, sun_az
moon_alt, moon_az, sun_alt, sun_az = tabela()
# loop through all records from dump1090 port 30003 input stream on stdin
while True:
line = sys.stdin.readline()
if not line: continue # Skip empty lines
aktual_t = datetime.datetime.now()
if line in ['\n', '\r\n']:
if (datetime.datetime.now() - last_update_time).total_seconds() > 120: # 120 seconds = 2 minutes
last_update_time = datetime.datetime.now() # Reset the timer after clearing the dictionary
continue # Skip clearing the dictionary if 2 minutes have not passed yet
# divide input line into parts and extract desired values
parts = line.split(",")
type = parts[1].strip()
icao = parts[4].strip()
date = parts[6].strip()
time = parts[7].strip()
date_time_local = datetime.datetime.strptime(date + " " + time, '%Y/%m/%d %H:%M:%S.%f')
date_time_iso = datetime.datetime.strftime(date_time_local, '%Y-%m-%dT%H:%M:%S.%f') + str("%+d" % (-timezone_hours)).zfill(3)
# if type 1 record then extract datetime/flight and create or update dictionary
if (type == "1"): # this record type contains the aircraft 'flight' identifier
flight = parts[10].strip()
if (icao not in plane_dict):
plane_dict[icao] = [date_time_local, flight, "", "", "", "", "", "", "", "", "", "", "", "", "", [], [], "", "", "", "", "", "", "", "", "", "", "", ""]
else:
plane_dict[icao][0] = date_time_local
plane_dict[icao][1] = flight
last_update_time = datetime.datetime.now() # Update the timestamp after modifying the dictionary
# if type 5 flight elevation
if (type == "5"):
flight = parts[10].strip()
elevation = parts[11].strip()
if is_int_try(elevation):
elevation = int(elevation)
# print elevation
if elevation > 6500:
pressure = int(get_metar_press())
elevation = elevation + ((1013 - pressure)*26)
my_elevation = my_elevation_const
else:
my_elevation = near_airport_elevation # -90 90 0 ???## same observer elevation above sea level as the runway at EPPO
## the above does nothing here
if (metric_units):
elevation = float((elevation * 0.3048)) # convert elevation feet to meters
else:
elevation = ""
if (icao not in plane_dict):
plane_dict[icao] = [date_time_local, flight, "", "", elevation, "", "", "", "", "", "", "", "", "", "", [], [], "", "", "", "", "", "", "", "", "", "", "", ""]
else:
plane_dict[icao][4] = elevation
plane_dict[icao][0] = date_time_local
last_update_time = datetime.datetime.now() # Update the timestamp after modifying the dictionary
if flight != '':
plane_dict[icao][1] = flight
# if type 4 record then extract speed/track
if (type == "4"): # this record type contains the aircraft 'track' & 'velocity' identifier
velocity = parts[12].strip()
track = parts[13].strip()
if is_int_try(velocity):
velocity_kmh = round(int(velocity)*1.852)
velocity = velocity_kmh
else:
velocity = 900
if (icao not in plane_dict):
plane_dict[icao] = [date_time_local, "", "", "", "", "", "", "", "", "", "", track, "", "", velocity, [], [], "", "", "", "", "", "", "", "", "", "", "", ""]
else:
plane_dict[icao][0] = date_time_local
plane_dict[icao][11] = track
plane_dict[icao][14] = velocity
last_update_time = datetime.datetime.now() # Update the timestamp after modifying the dictionary
# if type 3 record then extract datetime/elevation/lat/lon, calculate distance/azimuth/altitude, and create or update dictionary
if (type == "3"): # this record type contains the aircraft 'ICAO' identifier, lat/lon, and elevation
elevation = parts[11].strip() # assumes dump1090 is outputting elevation in feet
# Try to convert elevation to an integer
try:
elevation = int(elevation)
except ValueError:
elevation = 0 # Default to 0 if conversion fails
if is_int_try(elevation):
if elevation > 6500:
pressure = int(get_metar_press())
elevation = elevation + ((1013 - pressure)*26)
my_elevation = my_elevation_const
else:
my_elevation = near_airport_elevation # -90 90 0 ???## same observer elevation above sea level as the runway at EPxx
if (metric_units):
elevation = float((elevation * 0.3048)) # convert elevation feet to meters
elevation = ""
elevation_units = "ft"
distance_units = "mi"
try:
plane_lat = float(parts[14])
except ValueError:
plane_lat = 0.0
try:
plane_lon = float(parts[15])
except ValueError:
plane_lon = 0.0
if not plane_lat == '':
if not plane_lon == '':
distance = round(haversine((my_lat, my_lon), (plane_lat, plane_lon)),1)
azimuth = atan2(sin(radians(plane_lon-my_lon))*cos(radians(plane_lat)), cos(radians(my_lat))*sin(radians(plane_lat))-sin(radians(my_lat))*cos(radians(plane_lat))*cos(radians(plane_lon-my_lon)))
azimuth = round(((degrees(azimuth) + 360) % 360),1)
if distance == 0:
distance = 0.01
altitude = degrees(atan((elevation - my_elevation)/(distance*1000))) # distance converted from kilometers to meters to match elevation
altitude = round(altitude,1)
if (icao not in plane_dict):
plane_dict[icao] = [date_time_local, "", plane_lat, plane_lon, elevation, distance, azimuth, altitude, "", "", distance, "", "", "", "", [], [], "", "", "", "", "", "", "", "", "", "", "", ""]
plane_dict[icao][15] = []
plane_dict[icao][16] = []
last_update_time = datetime.datetime.now() # Update the timestamp after modifying the dictionary
# figure out if plane is approaching/holding/receding
min_distance = plane_dict[icao][10]
try:
min_distance = float(min_distance) # Try converting to float
except ValueError:
min_distance = float('inf') # Fall back to infinity if the conversion fails
if (distance < min_distance):
plane_dict[icao][9] = "APPROACHING"
plane_dict[icao][10] = distance
elif (distance > min_distance):
plane_dict[icao][9] = "RECEDING"
else:
plane_dict[icao][9] = "HOLDING"
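# A minimal sketch of the approaching/holding/receding decision above as a
# pure function. The name classify_motion is hypothetical. It returns the
# state together with the minimum distance to store back in the dictionary.

```python
def classify_motion(distance, min_distance):
    """Compare the current distance with the smallest distance seen so far."""
    try:
        min_distance = float(min_distance)
    except ValueError:
        min_distance = float('inf')  # no usable minimum yet (e.g. an empty string)
    if distance < min_distance:
        return "APPROACHING", distance  # new minimum
    elif distance > min_distance:
        return "RECEDING", min_distance
    return "HOLDING", min_distance
```

# Example: classify_motion(10.0, 12.0) gives ("APPROACHING", 10.0).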
plane_dict[icao][0] = date_time_local
plane_dict[icao][2] = plane_lat
plane_dict[icao][3] = plane_lon
plane_dict[icao][4] = elevation
plane_dict[icao][5] = distance
plane_dict[icao][6] = azimuth
plane_dict[icao][7] = altitude
last_update_time = datetime.datetime.now() # refresh the timestamp after modifying the dictionary
if plane_dict[icao][17] == '':
plane_dict[icao][17] = date_time_local
last_update_time = datetime.datetime.now() # refresh the timestamp after modifying the dictionary
then = plane_dict[icao][17]
now = datetime.datetime.now()
diff_seconds = (now - then).total_seconds()
if (diff_seconds > 6):
plane_dict[icao][17] = date_time_local
poz_az = str(plane_dict[icao][6])
poz_alt = str(plane_dict[icao][7])
# if a matched record of type 1/3/4 occurs, log stats to stdout and also email if entering/leaving detection zone
# if ((type == "1" or type == "3" or type == "4") and (icao in plane_dict and plane_dict[icao][1] != "" and plane_dict[icao][2] != "" and plane_dict[icao][11] != "")):
if ((type == "1" or type == "3" or type == "4") and (icao in plane_dict and plane_dict[icao][2] != "" and plane_dict[icao][11] != "")):
flight = plane_dict[icao][1]
plane_lat = plane_dict[icao][2]
plane_lon = plane_dict[icao][3]
elevation = plane_dict[icao][4]
distance = plane_dict[icao][5]
azimuth = plane_dict[icao][6]
altitude = plane_dict[icao][7]
track = plane_dict[icao][11]
warning = plane_dict[icao][12]
direction = plane_dict[icao][9]
velocity = plane_dict[icao][14]
xtd = crosstrack(distance, (180 + azimuth) % 360, track)
plane_dict[icao][13] = xtd
if (xtd <= xtd_tst and distance < warning_distance and warning == "" and direction != "RECEDING"):
plane_dict[icao][12] = "WARNING"
plane_dict[icao][13] = xtd
if (xtd > xtd_tst and distance < warning_distance and warning == "WARNING" and direction != "RECEDING"):
plane_dict[icao][12] = ""
plane_dict[icao][13] = xtd
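# A minimal sketch of the warning toggle above as a pure function. The name
# update_warning is hypothetical. xtd is assumed to be the cross-track
# distance already returned by crosstrack(), whose implementation is not
# shown in this part of the script.

```python
def update_warning(warning, xtd, distance, direction, xtd_tst, warning_distance):
    """Return the new warning flag: "WARNING" or an empty string."""
    nearby = distance < warning_distance and direction != "RECEDING"
    if nearby and warning == "" and xtd <= xtd_tst:
        return "WARNING"  # track passes close enough to the observer
    if nearby and warning == "WARNING" and xtd > xtd_tst:
        return ""         # track has moved away again, clear the flag
    return warning        # otherwise keep the previous flag
```

# A receding plane never changes the flag, whatever its cross-track distance.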
if (plane_dict[icao][8] == ""):
plane_dict[icao][8] = "LINKED!"
# if plane enters detection zone, send email and begin history capture
# if ((elevation <= 8000) and distance <= 30):
# gong()
if (plane_dict[icao][5] <= alert_distance and plane_dict[icao][8] != "ENTERING"):
plane_dict[icao][8] = "ENTERING"
# if plane leaves detection zone, generate email and include history capture
if (plane_dict[icao][5] > alert_distance and plane_dict[icao][8] == "ENTERING"):
plane_dict[icao][8] = "LEAVING"
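# A minimal sketch of the detection-zone bookkeeping above as a pure
# function. The name update_zone_state is hypothetical; the state strings
# are the ones the script stores in plane_dict[icao][8].

```python
def update_zone_state(state, distance, alert_distance):
    """Return the new zone state for a plane at the given distance."""
    if distance <= alert_distance and state != "ENTERING":
        return "ENTERING"  # plane has just come inside the zone
    if distance > alert_distance and state == "ENTERING":
        return "LEAVING"   # plane was inside and is now outside
    return state
```

# Example: update_zone_state("LINKED!", 5, 10) gives "ENTERING", and a later
# call update_zone_state("ENTERING", 15, 10) gives "LEAVING".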
## Transit check
tst_int1 = transit_pred((my_lat, my_lon), (plane_lat, plane_lon), track, velocity, elevation, moon_alt, moon_az)
tst_int2 = transit_pred((my_lat, my_lon), (plane_lat, plane_lon), track, velocity, elevation, sun_alt, sun_az)
if tst_int1 != 0:
    # prepare the data for logging
    transit_info = {
        'min_distance': tst_int1[4],
        'plane_az': tst_int1[2],
        'plane_alt': tst_int1[3],
        'celestial_az': tst_int1[8],
        'celestial_alt': tst_int1[9]
    }
    log_transits(icao, flight, transit_info, 'Moon')
if tst_int2 != 0:
    # prepare the data for logging
    transit_info = {
        'min_distance': tst_int2[4],
        'plane_az': tst_int2[2],
        'plane_alt': tst_int2[3],
        'celestial_az': tst_int2[8],
        'celestial_alt': tst_int2[9]
    }
    log_transits(icao, flight, transit_info, 'Sun')
if tst_int1 == 0:
    #if (plane_dict[icao][23] == ''):
    alt_a = 90.0 # moon_alt
    dst_p2x = 996
    delta_time = 0
    dst_h2x = 998
    plane_dict[icao][23] = 999.0 ## MOON ALT
    plane_dict[icao][24] = alt_a
    plane_dict[icao][25] = dst_h2x ## dst_h2x ## DISTANCE MY_POS TO CROSS POINT
    plane_dict[icao][26] = delta_time
    plane_dict[icao][27] = dst_p2x ## dst_p2x ## DISTANCE PLANE TO MOON AZIMUTH (CROSS)
else:
    alt_a = round(tst_int1[3],2) ## altitude1 ## ALT OF CROSS POINT
    dst_h2x = round(tst_int1[4],2) ## dst_h2x ## DISTANCE MY_POS TO CROSS POINT
    dst_p2x = round(tst_int1[5],2) ## dst_p2x ## DISTANCE PLANE TO MOON AZIMUTH (CROSS)
    delta_time = int(tst_int1[6]) ## delta_tim ## TIME UNTIL PLANE ARRIVE AT CROSS POINT
    plane_dict[icao][25] = dst_h2x ## dst_h2x ## DISTANCE MY_POS TO CROSS POINT
    plane_dict[icao][23] = moon_alt
    plane_dict[icao][24] = alt_a
    plane_dict[icao][26] = delta_time
    plane_dict[icao][27] = dst_p2x ## dst_p2x ## DISTANCE PLANE TO MOON AZIMUTH (CROSS)
if is_float_try(plane_dict[icao][24]) and is_float_try(plane_dict[icao][23]):
    separation_deg = float(plane_dict[icao][24]-plane_dict[icao][23])
else:
    separation_deg = 90.0
if (-transit_separation_sound_alert < separation_deg < transit_separation_sound_alert):
    # moon
    gong() # MOON!!!
    # pass
if tst_int2 == 0:
    alt_a = 90.0 # sun_alt
    dst_p2x = 996
    delta_time = 0
    dst_h2x = 998
    plane_dict[icao][18] = 999.0 ## SUN ALT
    plane_dict[icao][19] = alt_a
    plane_dict[icao][20] = dst_h2x ## dst_h2x ## DISTANCE MY_POS TO CROSS POINT
    plane_dict[icao][22] = delta_time
    plane_dict[icao][21] = dst_p2x ## dst_p2x ## DISTANCE PLANE TO SUN AZIMUTH (CROSS)
else:
    alt_a = round(tst_int2[3],2) ## altitude1 ## ALT OF CROSS POINT
    dst_h2x = round(tst_int2[4],2) ## dst_h2x ## DISTANCE MY_POS TO CROSS POINT
    dst_p2x = round(tst_int2[5],2) ## dst_p2x ## DISTANCE PLANE TO SUN AZIMUTH (CROSS)
    delta_time = int(tst_int2[6]) ## delta_tim ## TIME UNTIL PLANE ARRIVE AT CROSS POINT
    plane_dict[icao][20] = dst_h2x ## dst_h2x ## DISTANCE MY_POS TO CROSS POINT
    plane_dict[icao][18] = sun_alt
    plane_dict[icao][19] = alt_a
    plane_dict[icao][22] = delta_time
    plane_dict[icao][21] = dst_p2x ## dst_p2x ## DISTANCE PLANE TO SUN AZIMUTH (CROSS)
if is_float_try(plane_dict[icao][19]) and is_float_try(plane_dict[icao][18]):
    separation_deg2 = float(plane_dict[icao][19]-plane_dict[icao][18])
else:
    separation_deg2 = 90.0
if (-transit_separation_sound_alert < separation_deg2 < transit_separation_sound_alert):
    # sun
    gong() # SUN!!!
    # pass
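# A minimal sketch of the separation test used above for both the Moon and
# the Sun. The name transit_alert is hypothetical; threshold_deg stands in
# for transit_separation_sound_alert. Values that cannot be converted to
# float fall back to a separation of 90 degrees, as in the script.

```python
def transit_alert(crossing_alt_deg, body_alt_deg, threshold_deg):
    """True when the plane's altitude at the crossing point is within threshold_deg of the body's altitude."""
    try:
        separation = float(crossing_alt_deg) - float(body_alt_deg)
    except (TypeError, ValueError):
        separation = 90.0  # no usable data, treat as far apart
    return -threshold_deg < separation < threshold_deg
```

# With the placeholder values stored when no transit is predicted (90.0 and
# 999.0), the separation is far outside any sensible threshold, so no sound
# alert is raised.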
moon_alt, moon_az, sun_alt, sun_az = tabela()
clean_dict() # removes stale entries from `plane_dict`