64 lines
2.2 KiB
Python
64 lines
2.2 KiB
Python
|
|
import requests
|
|||
|
|
import logging
|
|||
|
|
import time
|
|||
|
|
|
|||
|
|
LOGIN_URL_ASPU = "http://192.168.254.10:3428/api/login"
|
|||
|
|
|
|||
|
|
API_URLS = {
|
|||
|
|
"Sipa": "http://192.168.254.10:3428/api/Management/",
|
|||
|
|
"Devin": "http://192.168.254.20:3428/api/Management/",
|
|||
|
|
"JR": "http://192.168.254.30:3428/api/Management/",
|
|||
|
|
"5L": "http://192.168.254.40:3428/api/Management/",
|
|||
|
|
"19L": "http://192.168.254.50:3428/api/Management/",
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
USERNAME = "superuser"
|
|||
|
|
PASSWORD = "Superuser1105"
|
|||
|
|
|
|||
|
|
token_cache = {"token": None, "timestamp": 0}
|
|||
|
|
TOKEN_LIFETIME = 1440 * 60 # 24 часа в секундах
|
|||
|
|
|
|||
|
|
def get_new_token():
|
|||
|
|
"""Получение нового токена с кешированием."""
|
|||
|
|
global token_cache
|
|||
|
|
current_time = time.time()
|
|||
|
|
|
|||
|
|
if token_cache["token"] and (current_time - token_cache["timestamp"] < TOKEN_LIFETIME):
|
|||
|
|
return token_cache["token"]
|
|||
|
|
|
|||
|
|
payload = {"UserName": USERNAME, "Password": PASSWORD}
|
|||
|
|
try:
|
|||
|
|
response = requests.post(LOGIN_URL_ASPU, json=payload, timeout=5)
|
|||
|
|
response_data = response.json()
|
|||
|
|
|
|||
|
|
if response.status_code == 200 and response_data.get("IsSuccess"):
|
|||
|
|
token_cache["token"] = response_data["Value"]["Token"]
|
|||
|
|
token_cache["timestamp"] = current_time
|
|||
|
|
return token_cache["token"]
|
|||
|
|
|
|||
|
|
logging.error(f"Ошибка получения токена: {response_data}")
|
|||
|
|
return None
|
|||
|
|
except requests.RequestException as e:
|
|||
|
|
logging.error(f"Ошибка сети при получении токена: {str(e)}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
async def fetch_lines_data():
|
|||
|
|
"""Асинхронное получение данных с линий."""
|
|||
|
|
token = get_new_token()
|
|||
|
|
if not token:
|
|||
|
|
return {"error": "Не удалось получить токен"}
|
|||
|
|
|
|||
|
|
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|||
|
|
results = {}
|
|||
|
|
|
|||
|
|
for key, url in API_URLS.items():
|
|||
|
|
try:
|
|||
|
|
response = requests.get(url, headers=headers, timeout=5)
|
|||
|
|
response.raise_for_status()
|
|||
|
|
results[key] = response.json()
|
|||
|
|
except requests.RequestException as e:
|
|||
|
|
logging.error(f"Ошибка запроса к {key}: {str(e)}")
|
|||
|
|
results[key] = {"error": str(e)}
|
|||
|
|
|
|||
|
|
return results
|