backend/scan/views.py
2025-03-06 16:31:45 +10:00

215 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from rest_framework.response import Response
from rest_framework.views import APIView
from collections import defaultdict
from .models import ScannedCode
from .serializers import ScannedCodeSerializer
# Константы API URL и креденшелы
LOGIN_URL_ASPU = "http://192.168.254.10:3428/api/login"
API_URL_ASPU_MANAGEMENT = "http://192.168.254.10:3428/api/Management/"
API_URL_MANAGEMENT_DEVIN = "http://192.168.254.20:3428/api/Management/"
API_URL_MANAGEMENT_JR = "http://192.168.254.30:3428/api/Management/"
API_URL_MANAGEMENT_5L = "http://192.168.254.40:3428/api/Management/"
API_URL_MANAGEMENT_19L = "http://192.168.254.50:3428/api/Management/"
API_URL_ASPU_MASSAGE_LIST_SIPA = "http://192.168.254.10:3428/api/Messages/"
API_URL_ASPU_MASSAGE_LIST_DEVIN = "http://192.168.254.20:3428/api/Messages/"
API_URL_ASPU_MASSAGE_LIST_JR = "http://192.168.254.30:3428/api/Messages/"
API_URL_ASPU_MASSAGE_LIST_5L = "http://192.168.254.40:3428/api/Messages/"
API_URL_ASPU_MASSAGE_LIST_19L = "http://192.168.254.50:3428/api/Messages/"
USERNAME = "superuser"
PASSWORD = "Superuser1105"
# Глобальные переменные для хранения токена
token_cache = {"token": None, "timestamp": 0}
TOKEN_LIFETIME = 1440 * 60 # 24 часа в секундах
def get_new_token(url):
"""Получение нового токена с кешированием."""
global token_cache
current_time = time.time()
if token_cache["token"] and (current_time - token_cache["timestamp"] < TOKEN_LIFETIME):
return token_cache["token"]
payload = {"UserName": USERNAME, "Password": PASSWORD}
try:
response = requests.post(url, json=payload, timeout=5)
response_data = response.json()
if response.status_code == 200 and response_data.get("IsSuccess"):
token_cache["token"] = response_data["Value"]["Token"]
token_cache["timestamp"] = current_time
return token_cache["token"]
logging.error(f"Ошибка получения токена: {response_data}")
return None
except requests.RequestException as e:
logging.error(f"Ошибка сети при получении токена: {str(e)}")
return None
def request_to_api(url, token, payload=None, timeout=10):
"""Отправка запроса к API."""
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
try:
response = requests.post(url, json=payload or {}, headers=headers, timeout=timeout)
response.raise_for_status()
return response.json()
except requests.Timeout:
logging.error(f"Тайм-аут при запросе {url}")
except requests.RequestException as e:
logging.error(f"Ошибка запроса {url}: {str(e)}")
return None
def extract_product_id(text):
"""Функция извлечения идентификатора продукта из текста."""
import re
match = re.search(r'Product\s*ID:\s*(\d+)', text)
return match.group(1) if match else None
def create_response(status, message=None, data=None, status_code=200):
"""Создание стандартного ответа API."""
return Response({
"status": status,
"message": message,
"data": data
}, status=status_code)
class GetManagementAPIView(APIView):
"""Получение данных управления и сообщений с различных источников."""
def get(self, request):
token = get_new_token(LOGIN_URL_ASPU)
if not token:
return create_response("Error", message="Failed to get token", status_code=500)
management_urls = {
"Sipa": API_URL_ASPU_MANAGEMENT,
"Devin": API_URL_MANAGEMENT_DEVIN,
"JR": API_URL_MANAGEMENT_JR,
"5L": API_URL_MANAGEMENT_5L,
"19L": API_URL_MANAGEMENT_19L
}
message_urls = {
"Sipa": API_URL_ASPU_MASSAGE_LIST_SIPA,
"Devin": API_URL_ASPU_MASSAGE_LIST_DEVIN,
"JR": API_URL_ASPU_MASSAGE_LIST_JR,
"5L": API_URL_ASPU_MASSAGE_LIST_5L,
"19L": API_URL_ASPU_MASSAGE_LIST_19L
}
message_payload = {
"skip": 1,
"take": 50,
"includes": [],
"sort": [{"field": "Time", "dir": "desc"}],
"filter": {"filters": [], "logic": "and"}
}
# Используем defaultdict, чтобы избежать KeyError
all_data = defaultdict(lambda: {
"Name": "",
"ShortName": "",
"BatchName": "",
"Stats": [],
"Sources": [],
"Gtin": "",
"GroupType": "",
"Quantity": 0,
"Messages": []
})
def process_management_response(key, response_data):
"""Обработка данных управления."""
if not response_data or "Value" not in response_data:
logging.error(f"Некорректный ответ от {key}: {response_data}")
return
value_data = response_data["Value"]
batch_name = value_data.get("BatchName", "")
for product in value_data.get("ProductTypes", []):
product_id = product.get("Id")
if not product_id:
continue
all_data[product_id].update({
"Name": product.get("Name", ""),
"ShortName": product.get("ShortName", ""),
"BatchName": batch_name,
"Gtin": product.get("Gtin", ""),
"GroupType": product.get("GroupType", ""),
})
all_data[product_id]["Stats"].extend(product.get("Stats", []))
all_data[product_id]["Sources"].append(key)
if product.get("GroupType") == "Pallet":
all_data[product_id]["Quantity"] = sum(
stat["Value"] for stat in product.get("Stats", []) if stat.get("Type") == "Validated"
)
def process_message_response(key, response_data):
"""Обработка сообщений."""
if not response_data or "Value" not in response_data:
logging.error(f"Некорректный ответ от {key}: {response_data}")
return
for msg in response_data["Value"]:
if "Text" not in msg:
continue
message = {
"Id": msg["Id"],
"Time": msg["Time"],
"Type": msg["Type"],
"Text": msg["Text"]
}
# Добавляем в общий список сообщений
for product_id in all_data:
all_data[product_id]["Messages"].append(message)
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {
executor.submit(request_to_api, url, token, timeout=2): (key, "management")
for key, url in management_urls.items()
}
futures.update({
executor.submit(request_to_api, url, token, payload=message_payload, timeout=2): (key, "messages")
for key, url in message_urls.items()
})
for future in as_completed(futures):
key, request_type = futures[future]
try:
response_data = future.result()
if request_type == "management":
process_management_response(key, response_data)
elif request_type == "messages":
process_message_response(key, response_data)
except Exception as e:
logging.error(f"Ошибка обработки {key}: {str(e)}")
logging.info(f"Отправляем ответ с {len(all_data)} объектами управления")
return create_response("OK", data=dict(all_data)) if all_data else create_response(
"Error", message="No data found", status_code=404
)