Загрузка проекта barcode

This commit is contained in:
suhotskiy.ne 2025-03-06 16:31:45 +10:00
commit 0cbc52b7da
118 changed files with 1830 additions and 0 deletions

18
Dockerfile Normal file
View File

@ -0,0 +1,18 @@
# Используем официальный образ Python
FROM python:3.10
# Устанавливаем рабочую директорию в контейнере
WORKDIR /app
# Копируем файлы проекта в контейнер
COPY . /app/
# Устанавливаем зависимости
RUN pip install --no-cache-dir --upgrade pip \
&& pip install -r requirements.txt
# Применяем миграции
RUN python manage.py migrate
# Запускаем сервер Django
CMD ["python", "manage.py", "runserver", "0.0.0.0:8000"]

1
__init__.py Normal file
View File

@ -0,0 +1 @@
from .celery_app import app as celery_app

1
barcode/__init__.py Normal file
View File

@ -0,0 +1 @@
from .celery_app import app as celery_app

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

20
barcode/asgi.py Normal file
View File

@ -0,0 +1,20 @@
"""
ASGI config for barcode project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/5.1/howto/deployment/asgi/
"""
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from barcode.routing import websocket_urlpatterns
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "barcode.settings")
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": URLRouter(websocket_urlpatterns),
})

11
barcode/celery_app.py Normal file
View File

@ -0,0 +1,11 @@
import os
from celery import Celery
# Óęŕçűâŕĺě ďóňü ę íŕńňđîéęŕě Django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "barcode.settings")
app = Celery("barcode")
# Çŕăđóćŕĺě íŕńňđîéęč Celery čç Django
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

20
barcode/consumers.py Normal file
View File

@ -0,0 +1,20 @@
import json
import asyncio
from channels.generic.websocket import AsyncWebsocketConsumer
from .data_fetcher import fetch_lines_data # Вынесем логику получения данных в отдельный модуль
class LineDataConsumer(AsyncWebsocketConsumer):
async def connect(self):
await self.accept()
self.running = True
await self.send(text_data=json.dumps({"message": "WebSocket connected"}))
asyncio.create_task(self.send_data_periodically())
async def disconnect(self, close_code):
self.running = False
async def send_data_periodically(self):
while self.running:
data = await fetch_lines_data()
await self.send(text_data=json.dumps(data))
await asyncio.sleep(5) # Ждем 5 секунд перед следующим запросом

63
barcode/data_fetcher.py Normal file
View File

@ -0,0 +1,63 @@
import requests
import logging
import time
LOGIN_URL_ASPU = "http://192.168.254.10:3428/api/login"
API_URLS = {
"Sipa": "http://192.168.254.10:3428/api/Management/",
"Devin": "http://192.168.254.20:3428/api/Management/",
"JR": "http://192.168.254.30:3428/api/Management/",
"5L": "http://192.168.254.40:3428/api/Management/",
"19L": "http://192.168.254.50:3428/api/Management/",
}
USERNAME = "superuser"
PASSWORD = "Superuser1105"
token_cache = {"token": None, "timestamp": 0}
TOKEN_LIFETIME = 1440 * 60 # 24 часа в секундах
def get_new_token():
"""Получение нового токена с кешированием."""
global token_cache
current_time = time.time()
if token_cache["token"] and (current_time - token_cache["timestamp"] < TOKEN_LIFETIME):
return token_cache["token"]
payload = {"UserName": USERNAME, "Password": PASSWORD}
try:
response = requests.post(LOGIN_URL_ASPU, json=payload, timeout=5)
response_data = response.json()
if response.status_code == 200 and response_data.get("IsSuccess"):
token_cache["token"] = response_data["Value"]["Token"]
token_cache["timestamp"] = current_time
return token_cache["token"]
logging.error(f"Ошибка получения токена: {response_data}")
return None
except requests.RequestException as e:
logging.error(f"Ошибка сети при получении токена: {str(e)}")
return None
async def fetch_lines_data():
"""Асинхронное получение данных с линий."""
token = get_new_token()
if not token:
return {"error": "Не удалось получить токен"}
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
results = {}
for key, url in API_URLS.items():
try:
response = requests.get(url, headers=headers, timeout=5)
response.raise_for_status()
results[key] = response.json()
except requests.RequestException as e:
logging.error(f"Ошибка запроса к {key}: {str(e)}")
results[key] = {"error": str(e)}
return results

6
barcode/routing.py Normal file
View File

@ -0,0 +1,6 @@
from django.urls import re_path
from .consumers import LineDataConsumer
websocket_urlpatterns = [
re_path(r"ws/lines/$", LineDataConsumer.as_asgi()),
]

203
barcode/settings.py Normal file
View File

@ -0,0 +1,203 @@
"""
Django settings for barcode project.
Generated by 'django-admin startproject' using Django 5.1.5.
For more information on this file, see
https://docs.djangoproject.com/en/5.1/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/5.1/ref/settings/
"""
from pathlib import Path
# -*- coding: utf-8 -*-
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/5.1/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'django-insecure-i#8aotin&c00-&#v@x!moruf-uimxr#c!8pi9ehltop98-@bzr'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
ALLOWED_HOSTS = []
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_WORKER_POOL = 'solo'
CELERY_WORKER_CONCURRENCY = 1
CELERY_TASK_ALWAYS_EAGER = True
# Application definition
INSTALLED_APPS = [
'inventory',
'batches',
'scan',
'forms',
'users',
"channels",
'rest_framework',
'rest_framework_simplejwt',
'django.contrib.admin',
'corsheaders',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'corsheaders.middleware.CorsMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.common.CommonMiddleware',
]
ROOT_URLCONF = 'barcode.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'barcode.wsgi.application'
# Database
# https://docs.djangoproject.com/en/5.1/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'mydatabase',
'USER': 'myuser',
'PASSWORD': 'mypassword',
'HOST': '31.130.144.182',
'PORT': '5432',
'OPTIONS': {
'client_encoding': 'UTF8',
},
}
}
# Password validation
# https://docs.djangoproject.com/en/5.1/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/5.1/topics/i18n/
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/5.1/howto/static-files/
STATIC_URL = 'static/'
# Default primary key field type
# https://docs.djangoproject.com/en/5.1/ref/settings/#default-auto-field
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
import sys
import os
# <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> UTF-8 <20><> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
os.environ["PYTHONIOENCODING"] = "utf-8"
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
REST_FRAMEWORK = {
'DEFAULT_RENDERER_CLASSES': (
'rest_framework.renderers.JSONRenderer', # <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD> JSON
),
}
CORS_ALLOW_HEADERS = ['Content-Type'] # <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
CORS_ALLOW_ALL_ORIGINS = True # <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><> <20><><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
# <20><><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD> localhost:3000
CORS_ALLOWED_ORIGINS = [
"http://localhost:3000",
"http://127.0.0.1:3000"
]
from datetime import timedelta
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework_simplejwt.authentication.JWTAuthentication',
),
}
SIMPLE_JWT = {
'ACCESS_TOKEN_LIFETIME': timedelta(days=1), # Токен живет 1 день
'REFRESH_TOKEN_LIFETIME': timedelta(days=7), # Refresh-токен живет 7 дней
'AUTH_HEADER_TYPES': ('Bearer',), # Токен передается через заголовок Authorization
}
ASGI_APPLICATION = "your_project.asgi.application"
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer",
"CONFIG": {
"capacity": 1000, # Увеличь лимит соединений
},
},
}

33
barcode/urls.py Normal file
View File

@ -0,0 +1,33 @@
"""
URL configuration for barcode project.
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/5.1/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
# from django.contrib import admin
# from django.urls import path
#
# urlpatterns = [
# path('admin/', admin.site.urls),
# ]
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path("admin/", admin.site.urls),
path("", include("scan.urls")),
path("bad/", include("batches.urls")),
path("inventory/", include("inventory.urls")),
path("forms/", include("forms.urls")),
path("api/users/", include("users.urls")),
]

16
barcode/wsgi.py Normal file
View File

@ -0,0 +1,16 @@
"""
WSGI config for barcode project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/5.1/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'barcode.settings')
application = get_wsgi_application()

0
batches/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

3
batches/admin.py Normal file
View File

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

6
batches/apps.py Normal file
View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class BatchesConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'batches'

View File

3
batches/models.py Normal file
View File

@ -0,0 +1,3 @@
from django.db import models
# Create your models here.

3
batches/tests.py Normal file
View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

14
batches/urls.py Normal file
View File

@ -0,0 +1,14 @@
from django.urls import path
# from .views import ScanAPIView, ScanAspuAPIView, GetManagementAPIView
from .views import BatchListView, BatchListGTIN, BatchListReport, BatchReportView
urlpatterns = [
# path('api/scan/', ScanAPIView.as_view(), name='scan'),
# path('api/scan_aspu/', ScanAspuAPIView.as_view(), name='scan_aspu'),
path('batches/', BatchListView.as_view(), name='batch-list'),
path('gtin/', BatchListGTIN.as_view(), name='batch-gtin'),
path('reports/', BatchListReport.as_view(), name='batch-gtin'),
path('reports-view/', BatchReportView.as_view(), name='batch-gtin')
]

193
batches/views.py Normal file
View File

@ -0,0 +1,193 @@
import requests
from django.http import JsonResponse
from django.views import View
from django.conf import settings
import asyncio
import aiohttp
import json
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
# http://192.168.254.2/szkm/api/batch/load/eb7bb39a-fd68-47c7-aec8-0041a4fe5faf/
# Настройки API
API_BASE_URL = "http://192.168.254.2:8280/api"
API_REPORTS_URL = f"{API_BASE_URL}/productlinereport/list"
LOGIN_URL = f"{API_BASE_URL}/login"
BATCH_LIST_URL = f"{API_BASE_URL}/Batch/list"
# Данные для входа
USERNAME = "user"
PASSWORD = "user"
def get_auth_token():
"""Авторизуется и получает Bearer токен"""
response = requests.post(LOGIN_URL, json={"username": USERNAME, "password": PASSWORD})
if response.status_code == 200:
data = response.json()
token = data.get("Value", {}).get("Token") # Исправленный способ получения токена
if token:
return token
return None
class BatchListView(View):
"""Получает список партий с фильтрацией по CreatedOn"""
def get(self, request):
token = get_auth_token()
if not token:
return JsonResponse({"error": "Не удалось получить токен"}, status=401)
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
payload = {
"Filter": {
"Filters": [
{"Value": "2024-01-1 00:00:00", "Operator": "gte", "Field": "CreatedOn"},
{"Value": "2025.11.22", "Operator": "lte", "Field": "CreatedOn"}
],
"Logic": "and"
}
}
response = requests.post(BATCH_LIST_URL, json=payload, headers=headers)
if response.status_code == 200:
return JsonResponse(response.json(), safe=False)
return JsonResponse({"error": "Ошибка при запросе данных"}, status=response.status_code)
class BatchListGTIN(View):
"""Получает список партий с фильтрацией по CreatedOn"""
def get(self, request):
token = get_auth_token()
if not token:
return JsonResponse({"error": "Не удалось получить токен"}, status=401)
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
payload = {
"Filter": {
"Filters": [
{
"Filters": [
{
"Value": "04607017161371",
"Operator": "eq",
"Field": "ProductType.GTIN"
},
{
"Value": "04607017161371",
"Operator": "linq",
"Field": "CodesForProductLines.Any( a=> a.ProductType.GTIN=='{0}' )"
}
],
"Logic": "or"
}
],
"Logic": "and"
},
"Includes": [
"ProductType",
"CodesForProductLines.ProductType"
]
}
response = requests.post(BATCH_LIST_URL, json=payload, headers=headers)
if response.status_code == 200:
return JsonResponse(response.json(), safe=False)
return JsonResponse({"error": "Ошибка при запросе данных"}, status=response.status_code)
@method_decorator(csrf_exempt, name='dispatch')
class BatchListReport(View):
"""Получает список партий с фильтрацией по CreatedOn и BatchId из тела запроса"""
def post(self, request):
token = get_auth_token()
if not token:
return JsonResponse({"error": "Не удалось получить токен"}, status=401)
try:
body = json.loads(request.body)
batch_id = body.get("id")
except json.JSONDecodeError:
return JsonResponse({"error": "Неверный формат JSON"}, status=400)
if not batch_id:
return JsonResponse({"error": "Отсутствует параметр 'id'"}, status=400)
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
payload = {"skip":0,"take":10,"includes":["ProductLineBatch","ProductLineBatch.ProductType","ProductLine"],"sort":[{"field":"CreatedOn","dir":"desc"}],"filter":{"filters":[{"operator":"eq","field":"ProductLineBatchId","value":batch_id}],"logic":"and"}}
response = requests.post(API_REPORTS_URL, json=payload, headers=headers)
if response.status_code == 200:
return JsonResponse(response.json(), safe=False)
return JsonResponse({"error": "Ошибка при запросе данных"}, status=response.status_code)
class BatchReportView(View):
"""Получает список партий и прикрепляет к ним отчёты асинхронно."""
async def fetch_reports(self, session, headers, batch_id):
"""Асинхронно получает список отчётов для заданной партии."""
if not batch_id:
return []
report_payload = {
"Skip": 0,
"Take": 100000,
"Filter": {
"Field": "BatchId",
"Operator": "eq",
"Value": batch_id
},
"TotalCount": 0,
"Sort": [{"Field": "CreatedOn", "Dir": "desc"}]
}
async with session.post(API_REPORTS_URL, json=report_payload, headers=headers) as response:
if response.status == 200:
data = await response.json()
return data.get("Value", {}).get("Items", [])
return []
async def fetch_all_reports(self, batches, headers):
"""Запрашивает отчёты для всех партий параллельно."""
async with aiohttp.ClientSession() as session:
tasks = [self.fetch_reports(session, headers, batch.get("ProductTypeId")) for batch in batches]
reports = await asyncio.gather(*tasks)
# Присваиваем отчёты соответствующим партиям
for batch, report in zip(batches, reports):
batch["Reports"] = report
def get(self, request):
"""Основной метод обработки GET-запроса."""
token = get_auth_token()
if not token:
return JsonResponse({"error": "Не удалось получить токен"}, status=401)
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
# Запрос списка партий
batch_payload = {
"Filter": {
"Filters": [
{"Value": "2025-01-1 00:00:00", "Operator": "gte", "Field": "CreatedOn"},
{"Value": "2025.11.22", "Operator": "lte", "Field": "CreatedOn"}
],
"Logic": "and"
}
}
batch_response = requests.post(BATCH_LIST_URL, json=batch_payload, headers=headers)
if batch_response.status_code != 200:
return JsonResponse({"error": "Ошибка при запросе списка партий"}, status=batch_response.status_code)
batch_data = batch_response.json()
batches = batch_data.get("Value", {}).get("Items", [])
print(batches)
# Асинхронно получаем отчёты для всех партий
asyncio.run(self.fetch_all_reports(batches, headers))
print(fetch_all_reports)
return JsonResponse({"Batches": batches}, safe=False)

BIN
db.sqlite3 Normal file

Binary file not shown.

32
docker-compose.yml Normal file
View File

@ -0,0 +1,32 @@
version: '3.8'
services:
django:
build: .
container_name: django_app
restart: always
ports:
- "8000:8000"
volumes:
- .:/app
depends_on:
- db
environment:
- DEBUG=True
- DATABASE_URL=postgres://myuser:mypassword@db:5432/mydatabase
db:
image: postgres:15
container_name: postgres_db
restart: always
environment:
POSTGRES_USER: myuser
POSTGRES_PASSWORD: mypassword
POSTGRES_DB: mydatabase
volumes:
- pgdata:/var/lib/postgresql/data
ports:
- "5432:5432"
volumes:
pgdata:

0
forms/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

3
forms/admin.py Normal file
View File

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

6
forms/apps.py Normal file
View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class FormsConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'forms'

16
forms/forms.py Normal file
View File

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
from django import forms
class Step1Form(forms.Form):
линия = forms.CharField(label="Линия", max_length=100)
проблема = forms.CharField(label="Проблема", widget=forms.Textarea)
class Step2Form(forms.Form):
время_начала = forms.DateTimeField(label="Время начала проблемы", widget=forms.DateTimeInput(attrs={'type': 'datetime-local'}))
время_решения = forms.DateTimeField(label="Время решения проблемы", widget=forms.DateTimeInput(attrs={'type': 'datetime-local'}))
class Step3Form(forms.Form):
причина = forms.CharField(label="Причина простоя", widget=forms.Textarea)
устранение = forms.CharField(label="Способ устранения проблемы", widget=forms.Textarea)

View File

@ -0,0 +1,30 @@
# Generated by Django 5.1.5 on 2025-03-06 04:31
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='FormResponse',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('factory', models.TextField()),
('line', models.CharField(max_length=100)),
('operators_name', models.TextField()),
('problem', models.TextField()),
('error_zone', models.TextField()),
('occurred_at', models.DateTimeField()),
('resolved_at', models.DateTimeField(blank=True, null=True)),
('downtime_reason', models.TextField()),
('fix_method', models.TextField()),
('created_at', models.DateTimeField(auto_now_add=True)),
],
),
]

View File

Binary file not shown.

17
forms/models.py Normal file
View File

@ -0,0 +1,17 @@
from django.db import models
class FormResponse(models.Model):
factory = models.TextField()
line = models.CharField(max_length=100) # <20><><EFBFBD><EFBFBD><EFBFBD>
operators_name = models.TextField() # <20><><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
problem = models.TextField() # <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
error_zone = models.TextField() # <20><><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
occurred_at = models.DateTimeField() # <20><><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
resolved_at = models.DateTimeField(null=True, blank=True) # <20><><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
downtime_reason = models.TextField() # <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
fix_method = models.TextField() # <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
created_at = models.DateTimeField(auto_now_add=True) # <20><><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
def __str__(self):
return f"{self.line} - {self.problem} ({self.occurred_at})"

9
forms/serializers.py Normal file
View File

@ -0,0 +1,9 @@
from rest_framework import serializers
from .models import FormResponse
class FormResponseSerializer(serializers.ModelSerializer):
class Meta:
model = FormResponse
fields = '__all__'

3
forms/tests.py Normal file
View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

8
forms/urls.py Normal file
View File

@ -0,0 +1,8 @@
from django.urls import path
from .views import FormResponseCreateView, FormResponseListView, FormResponseUploadView
urlpatterns = [
path('submit/', FormResponseCreateView.as_view(), name='submit_response'), # POST-запрос
path('submit/uploads/', FormResponseUploadView.as_view(), name='form-upload-report'),
path('responses/', FormResponseListView.as_view(), name='list_responses'), # GET-запрос
]

90
forms/views.py Normal file
View File

@ -0,0 +1,90 @@
from .serializers import FormResponseSerializer
import pandas as pd
from rest_framework import generics, status
from rest_framework.response import Response
from rest_framework.parsers import MultiPartParser, FormParser
from rest_framework.views import APIView
from .models import FormResponse
import logging
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
import pandas as pd
from .models import FormResponse # Убедись, что модель импортирована
from .serializers import FormResponseSerializer # Импортируем сериализатор
from django.core.files.storage import default_storage
from django.core.files.base import ContentFile
from datetime import datetime
import json
# API для сохранения ответов
class FormResponseCreateView(generics.CreateAPIView):
queryset = FormResponse.objects.all()
serializer_class = FormResponseSerializer
# API для получения списка ответов
class FormResponseListView(generics.ListAPIView):
queryset = FormResponse.objects.all()
serializer_class = FormResponseSerializer
# API для загрузки Excel-файла и сохранения в БД
logger = logging.getLogger(__name__) # Логирование ошибок
class FormResponseUploadView(APIView):
def post(self, request, *args, **kwargs):
print("\n===== [DEBUG] Данные, полученные на сервере =====")
print(json.dumps(request.data, indent=4, ensure_ascii=False)) # Логируем JSON
if "data" not in request.data:
return Response({"error": "Данные не найдены в запросе"}, status=status.HTTP_400_BAD_REQUEST)
uploaded_data = request.data["data"]
responses = []
errors = []
for item in uploaded_data:
# Обработка значений по умолчанию
operators_name = item.get("operators_name", "Анонимный пользователь") or "Анонимный пользователь"
error_zone = item.get("error_zone", "Не указано") or "Не указано"
downtime_reason = item.get("downtime_reason", "Не указано") or "Не указано"
fix_method = item.get("fix_method", "Не указано") or "Не указано"
# Функция для парсинга дат
def parse_date(date_str, default_now=False):
if not date_str:
return datetime.utcnow().isoformat() if default_now else None # Подставляем текущую дату если default_now=True
try:
return datetime.fromisoformat(date_str.replace("Z", "+00:00")).isoformat()
except ValueError:
errors.append({"occurred_at": f"Неверный формат даты: {date_str}"})
return None
occurred_at = parse_date(item.get("occurred_at"), default_now=True) # Если нет даты — ставим текущее время
resolved_at = parse_date(item.get("resolved_at")) # Если нет, оставляем None
response_data = {
"line": item.get("line", "Неизвестная линия"),
"operators_name": operators_name,
"problem": item.get("problem", "Не указано"),
"error_zone": error_zone,
"occurred_at": occurred_at,
"resolved_at": resolved_at,
"downtime_reason": downtime_reason,
"fix_method": fix_method,
}
serializer = FormResponseSerializer(data=response_data)
if serializer.is_valid():
responses.append(serializer.save())
else:
errors.append(serializer.errors)
if errors:
print("\n===== [DEBUG] Ошибки при сохранении данных =====")
print(json.dumps(errors, indent=4, ensure_ascii=False)) # Логируем ошибки
return Response({"error": "Некоторые записи не были сохранены", "details": errors}, status=status.HTTP_400_BAD_REQUEST)
return Response({"message": "Данные успешно загружены!", "saved": len(responses)}, status=status.HTTP_201_CREATED)

0
inventory/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

3
inventory/admin.py Normal file
View File

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

6
inventory/apps.py Normal file
View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class InventoryConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'inventory'

View File

@ -0,0 +1,47 @@
# Generated by Django 5.1.5 on 2025-03-06 04:31
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Nomenclature',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(max_length=255)),
('gtin', models.CharField(max_length=14, unique=True)),
('unit', models.CharField(default='шт', max_length=50)),
],
),
migrations.CreateModel(
name='Sticker',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('quantity', models.IntegerField()),
('emission_date', models.DateField()),
('last_revision_date', models.DateField(blank=True, null=True)),
('location', models.CharField(max_length=255)),
('status', models.CharField(default='в наличии', max_length=255)),
('nomenclature', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='inventory.nomenclature')),
],
),
migrations.CreateModel(
name='StickerMovement',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('date', models.DateTimeField(auto_now_add=True)),
('from_location', models.CharField(max_length=50)),
('to_location', models.CharField(max_length=50)),
('quantity', models.IntegerField()),
('sticker', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='inventory.sticker')),
],
),
]

View File

59
inventory/models.py Normal file
View File

@ -0,0 +1,59 @@
from django.db import models
from datetime import timedelta, date
class Nomenclature(models.Model):
name = models.CharField(max_length=255)
gtin = models.CharField(max_length=14, unique=True)
unit = models.CharField(max_length=50, default="шт")
def __str__(self):
return self.name
class Sticker(models.Model):
nomenclature = models.ForeignKey("Nomenclature", on_delete=models.CASCADE)
quantity = models.IntegerField()
emission_date = models.DateField()
last_revision_date = models.DateField(null=True, blank=True)
location = models.CharField(max_length=255)
status = models.CharField(max_length=255, default="в наличии")
def is_expired(self):
"""Проверяем, истёк ли срок годности (1 год)"""
if not self.emission_date:
return False
expiration_date = self.emission_date + timedelta(days=365) # 12 месяцев
return date.today() > expiration_date
def save(self, *args, **kwargs):
"""При сохранении автоматически обновляем статус"""
if self.is_expired():
self.status = "просрочены"
elif self.is_expiring_soon():
self.status = "скоро истекает"
else:
self.status = "в наличии"
super().save(*args, **kwargs)
def is_expiring_soon(self):
"""Срок годности истекает через 1 месяц"""
if not self.emission_date:
return False
expiration_date = self.emission_date + timedelta(days=365)
return expiration_date - timedelta(days=30) <= date.today() < expiration_date
class StickerMovement(models.Model):
sticker = models.ForeignKey(Sticker, on_delete=models.CASCADE)
date = models.DateTimeField(auto_now_add=True)
from_location = models.CharField(max_length=50)
to_location = models.CharField(max_length=50)
quantity = models.IntegerField()
def save(self, *args, **kwargs):
if self.from_location == "склад" and self.to_location == "цех":
self.sticker.location = "цех"
elif self.to_location == "склад":
self.sticker.location = "склад"
self.sticker.save()
super().save(*args, **kwargs)

34
inventory/serializers.py Normal file
View File

@ -0,0 +1,34 @@
from rest_framework import serializers
from .models import Nomenclature, Sticker, StickerMovement
from datetime import date
class NomenclatureSerializer(serializers.ModelSerializer):
class Meta:
model = Nomenclature
fields = '__all__'
class StickerSerializer(serializers.ModelSerializer):
is_expired = serializers.SerializerMethodField()
is_expiring_soon = serializers.SerializerMethodField()
nomenclature_name = serializers.CharField(source="nomenclature.name", read_only=True)
nomenclature_gtin = serializers.CharField(source="nomenclature.gtin", read_only=True)
class Meta:
model = Sticker
fields = '__all__'
def get_is_expired(self, obj):
return obj.is_expired()
def get_is_expiring_soon(self, obj):
return obj.is_expiring_soon()
def to_representation(self, instance):
"""Обновляем статус при каждом запросе"""
instance.save()
return super().to_representation(instance)
class StickerMovementSerializer(serializers.ModelSerializer):
class Meta:
model = StickerMovement
fields = '__all__'

3
inventory/tests.py Normal file
View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

13
inventory/urls.py Normal file
View File

@ -0,0 +1,13 @@
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from .views import NomenclatureViewSet, StickerViewSet, StickerMovementViewSet, ExternalNomenclatureView
router = DefaultRouter()
router.register(r'nomenclature', NomenclatureViewSet)
router.register(r'stickers', StickerViewSet)
router.register(r'movement', StickerMovementViewSet)
urlpatterns = [
path('api/', include(router.urls)),
path('api/external-nomenclature/', ExternalNomenclatureView.as_view(), name="external-nomenclature"),
]

122
inventory/views.py Normal file
View File

@ -0,0 +1,122 @@
import time
import logging
import requests
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status, viewsets
from .models import Nomenclature, Sticker, StickerMovement
from .serializers import NomenclatureSerializer, StickerSerializer, StickerMovementSerializer
# Авторизационные данные
USERNAME = "superuser"
PASSWORD = "Superuser1105"
TOKEN_URL = "http://192.168.254.2:8280/api/login" # URL для получения токена
PRODUCT_LIST_URL = "http://192.168.254.2:8280/api/ProductType/list"
# Глобальный кэш для хранения токена
token_cache = {"token": None, "timestamp": 0}
TOKEN_LIFETIME = 1440 * 60 # 24 часа в секундах
def get_new_token():
"""Получение нового токена с кешированием."""
global token_cache
current_time = time.time()
# Если токен есть и он еще не истек, возвращаем его
if token_cache["token"] and (current_time - token_cache["timestamp"] < TOKEN_LIFETIME):
return token_cache["token"]
payload = {"UserName": USERNAME, "Password": PASSWORD}
try:
response = requests.post(TOKEN_URL, json=payload, timeout=5)
response_data = response.json()
if response.status_code == 200 and response_data.get("IsSuccess"):
token_cache["token"] = response_data["Value"]["Token"]
token_cache["timestamp"] = current_time
return token_cache["token"]
logging.error(f"Ошибка получения токена: {response_data}")
return None
except requests.RequestException as e:
logging.error(f"Ошибка сети при получении токена: {str(e)}")
return None
class ExternalNomenclatureView(APIView):
"""Получение списка номенклатур с внешнего API."""
def post(self, request):
product_line_id = request.data.get("product_line_id", "40ba525b-1686-4900-a2a9-443436812b1d")
token = get_new_token()
if not token:
return Response({"error": "Ошибка авторизации"}, status=status.HTTP_401_UNAUTHORIZED)
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
payload = {
"Filter": {
"Filters": [
{
"Logic": "and",
"Operator": "linq",
"Field": "ProductType_ProductLines.Where(i=>i.ProductLineId=='{0}').Any()",
"Value": product_line_id,
}
],
"Logic": "and",
},
"Includes": ["ProductType_ProductLines"],
}
try:
response = requests.post(PRODUCT_LIST_URL, json=payload, headers=headers, timeout=10)
response.raise_for_status()
return Response(response.json(), status=status.HTTP_200_OK)
except requests.RequestException as e:
logging.error(f"Ошибка запроса {PRODUCT_LIST_URL}: {str(e)}")
return Response({"error": "Ошибка запроса к API"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# Основные API ViewSet'ы
class NomenclatureViewSet(viewsets.ModelViewSet):
queryset = Nomenclature.objects.all()
serializer_class = NomenclatureSerializer
class StickerViewSet(viewsets.ModelViewSet):
queryset = Sticker.objects.all().select_related('nomenclature')
serializer_class = StickerSerializer
def create(self, request, *args, **kwargs):
data = request.data
gtin = data.get("nomenclature_gtin") # Берем GTIN из запроса
if not gtin:
return Response({"error": "GTIN обязателен"}, status=status.HTTP_400_BAD_REQUEST)
# Проверяем, есть ли номенклатура с таким GTIN
nomenclature, created = Nomenclature.objects.get_or_create(
gtin=gtin,
defaults={"name": data.get("nomenclature_name", "Неизвестный продукт")}
)
# Обновляем данные запроса, чтобы стикер использовал ID найденной/созданной номенклатуры
data["nomenclature"] = nomenclature.id
serializer = self.get_serializer(data=data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def destroy(self, request, *args, **kwargs):
instance = self.get_object()
self.perform_destroy(instance)
return Response({"message": "Стикер успешно удалён"}, status=status.HTTP_204_NO_CONTENT)
class StickerMovementViewSet(viewsets.ModelViewSet):
queryset = StickerMovement.objects.all()
serializer_class = StickerMovementSerializer

22
manage.py Normal file
View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'barcode.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
main()

BIN
requirements.txt Normal file

Binary file not shown.

0
scan/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

3
scan/admin.py Normal file
View File

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

53
scan/api_client.py Normal file
View File

@ -0,0 +1,53 @@
# api_client.py
import requests
import logging
from rest_framework.response import Response
from .constants import API_URLS, CREDENTIALS
logger = logging.getLogger(__name__)
class APIClient:
@staticmethod
def get_token(url):
"""Получение токена авторизации"""
payload = {
"UserName": CREDENTIALS["USERNAME"],
"Password": CREDENTIALS["PASSWORD"]
}
try:
response = requests.post(url, json=payload)
response.raise_for_status()
data = response.json()
return data["Value"]["Token"] if data.get("IsSuccess") else None
except Exception as e:
logger.error(f"Token request error: {str(e)}")
return None
@staticmethod
def make_request(url, token, payload=None, timeout=10):
"""Выполнение API-запроса"""
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
try:
response = requests.post(
url,
json=payload or {},
headers=headers,
timeout=timeout
)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"API request error: {str(e)}")
return None
@staticmethod
def create_response(status, message=None, data=None, status_code=200):
"""Формирование стандартного ответа API"""
return Response({
"status": status,
"message": message,
"data": data
}, status=status_code)

6
scan/apps.py Normal file
View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class ScanConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'scan'

11
scan/celery_app.py Normal file
View File

@ -0,0 +1,11 @@
import os
from celery import Celery
# Указываем путь к настройкам Django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "barcode.settings")
app = Celery("barcode")
# Загружаем настройки Celery из Django
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

19
scan/constants.py Normal file
View File

@ -0,0 +1,19 @@
# constants.py
API_URLS = {
"LOGIN": "http://192.168.254.2:8280/api/login",
"LOGIN_ASPU": "http://192.168.254.10:3428/api/login",
"STORAGE_INFO": "http://192.168.254.2:8180/api/Storage/getInfo",
"ASPU_CODE_INFO": "http://192.168.254.10:3428/api/Codes/getInfo",
"MANAGEMENT_ENDPOINTS": {
"Sipa": "http://192.168.254.10:3428/api/Management/",
"Devin": "http://192.168.254.20:3428/api/Management/",
"JR": "http://192.168.254.30:3428/api/Management/",
"5L": "http://192.168.254.40:3428/api/Management/",
"19L": "http://192.168.254.50:3428/api/Management/"
}
}
CREDENTIALS = {
"USERNAME": "user",
"PASSWORD": "user"
}

View File

@ -0,0 +1,32 @@
# Generated by Django 5.1.5 on 2025-03-06 04:31
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='MessagesAlert',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('type_message', models.CharField(max_length=255, unique=True)),
('product_line', models.CharField(max_length=255, unique=True)),
('code_lexer', models.IntegerField()),
('time', models.DateTimeField(auto_now_add=True)),
],
),
migrations.CreateModel(
name='ScannedCode',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('code', models.CharField(max_length=255, unique=True)),
('scanned_at', models.DateTimeField(auto_now_add=True)),
],
),
]

View File

Binary file not shown.

15
scan/models.py Normal file
View File

@ -0,0 +1,15 @@
from django.db import models
class ScannedCode(models.Model):
code = models.CharField(max_length=255, unique=True)
scanned_at = models.DateTimeField(auto_now_add=True)
def __str__(self):
return self.code
class MessagesAlert(models.Model):
type_message = models.CharField(max_length=255, unique=True)
product_line = models.CharField(max_length=255, unique=True)
code_lexer = models.IntegerField()
time = models.DateTimeField(auto_now_add=True)

7
scan/serializers.py Normal file
View File

@ -0,0 +1,7 @@
from rest_framework import serializers
from .models import ScannedCode
class ScannedCodeSerializer(serializers.ModelSerializer):
class Meta:
model = ScannedCode
fields = "__all__"

13
scan/tasks.py Normal file
View File

@ -0,0 +1,13 @@
from celery import shared_task
import requests
@shared_task
def fetch_api_data(url, token):
"""Запрос к API в фоновом режиме"""
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
try:
response = requests.post(url, headers=headers, timeout=10)
if response.ok:
return response.json()
except requests.RequestException:
return None

3
scan/tests.py Normal file
View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

10
scan/urls.py Normal file
View File

@ -0,0 +1,10 @@
from django.urls import path
# from .views import ScanAPIView, ScanAspuAPIView, GetManagementAPIView
from .views import GetManagementAPIView
urlpatterns = [
# path('api/scan/', ScanAPIView.as_view(), name='scan'),
# path('api/scan_aspu/', ScanAspuAPIView.as_view(), name='scan_aspu'),
path('api/get_management/', GetManagementAPIView.as_view(), name='get_management'),
]

83
scan/utils.py Normal file
View File

@ -0,0 +1,83 @@
import requests
import logging
from rest_framework.response import Response
from .constants import *
def get_new_token(url):
"""Function to get a new token."""
payload = {"UserName": USERNAME, "Password": PASSWORD}
try:
response = requests.post(url, json=payload)
response_data = response.json()
if response.status_code == 200 and response_data.get("IsSuccess"):
return response_data["Value"]["Token"]
else:
return None
except requests.RequestException as e:
logging.error(f"Network error while getting token: {str(e)}")
return None
def request_to_api(url, token, payload=None, timeout=10):
"""General function to make API requests."""
if payload is None:
payload = {}
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
try:
response = requests.post(url, json=payload, headers=headers, timeout=timeout)
if response.ok:
return response.json()
else:
logging.error(f"Error from API ({url}): {response.status_code} - {response.text}")
return None
except requests.Timeout:
logging.error(f"Timeout while requesting {url}")
return None
except requests.RequestException as e:
logging.error(f"Request error to {url}: {str(e)}")
return None
def create_response(status, message=None, data=None, status_code=200):
"""Function to create consistent API responses."""
return Response({
"status": status,
"message": message,
"data": data
}, status=status_code)

215
scan/views.py Normal file
View File

@ -0,0 +1,215 @@
import requests
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from rest_framework.response import Response
from rest_framework.views import APIView
from collections import defaultdict
from .models import ScannedCode
from .serializers import ScannedCodeSerializer
# Константы API URL и креденшелы
LOGIN_URL_ASPU = "http://192.168.254.10:3428/api/login"
API_URL_ASPU_MANAGEMENT = "http://192.168.254.10:3428/api/Management/"
API_URL_MANAGEMENT_DEVIN = "http://192.168.254.20:3428/api/Management/"
API_URL_MANAGEMENT_JR = "http://192.168.254.30:3428/api/Management/"
API_URL_MANAGEMENT_5L = "http://192.168.254.40:3428/api/Management/"
API_URL_MANAGEMENT_19L = "http://192.168.254.50:3428/api/Management/"
API_URL_ASPU_MASSAGE_LIST_SIPA = "http://192.168.254.10:3428/api/Messages/"
API_URL_ASPU_MASSAGE_LIST_DEVIN = "http://192.168.254.20:3428/api/Messages/"
API_URL_ASPU_MASSAGE_LIST_JR = "http://192.168.254.30:3428/api/Messages/"
API_URL_ASPU_MASSAGE_LIST_5L = "http://192.168.254.40:3428/api/Messages/"
API_URL_ASPU_MASSAGE_LIST_19L = "http://192.168.254.50:3428/api/Messages/"
USERNAME = "superuser"
PASSWORD = "Superuser1105"
# Глобальные переменные для хранения токена
token_cache = {"token": None, "timestamp": 0}
TOKEN_LIFETIME = 1440 * 60 # 24 часа в секундах
def get_new_token(url):
"""Получение нового токена с кешированием."""
global token_cache
current_time = time.time()
if token_cache["token"] and (current_time - token_cache["timestamp"] < TOKEN_LIFETIME):
return token_cache["token"]
payload = {"UserName": USERNAME, "Password": PASSWORD}
try:
response = requests.post(url, json=payload, timeout=5)
response_data = response.json()
if response.status_code == 200 and response_data.get("IsSuccess"):
token_cache["token"] = response_data["Value"]["Token"]
token_cache["timestamp"] = current_time
return token_cache["token"]
logging.error(f"Ошибка получения токена: {response_data}")
return None
except requests.RequestException as e:
logging.error(f"Ошибка сети при получении токена: {str(e)}")
return None
def request_to_api(url, token, payload=None, timeout=10):
"""Отправка запроса к API."""
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
try:
response = requests.post(url, json=payload or {}, headers=headers, timeout=timeout)
response.raise_for_status()
return response.json()
except requests.Timeout:
logging.error(f"Тайм-аут при запросе {url}")
except requests.RequestException as e:
logging.error(f"Ошибка запроса {url}: {str(e)}")
return None
def extract_product_id(text):
"""Функция извлечения идентификатора продукта из текста."""
import re
match = re.search(r'Product\s*ID:\s*(\d+)', text)
return match.group(1) if match else None
def create_response(status, message=None, data=None, status_code=200):
"""Создание стандартного ответа API."""
return Response({
"status": status,
"message": message,
"data": data
}, status=status_code)
class GetManagementAPIView(APIView):
"""Получение данных управления и сообщений с различных источников."""
def get(self, request):
token = get_new_token(LOGIN_URL_ASPU)
if not token:
return create_response("Error", message="Failed to get token", status_code=500)
management_urls = {
"Sipa": API_URL_ASPU_MANAGEMENT,
"Devin": API_URL_MANAGEMENT_DEVIN,
"JR": API_URL_MANAGEMENT_JR,
"5L": API_URL_MANAGEMENT_5L,
"19L": API_URL_MANAGEMENT_19L
}
message_urls = {
"Sipa": API_URL_ASPU_MASSAGE_LIST_SIPA,
"Devin": API_URL_ASPU_MASSAGE_LIST_DEVIN,
"JR": API_URL_ASPU_MASSAGE_LIST_JR,
"5L": API_URL_ASPU_MASSAGE_LIST_5L,
"19L": API_URL_ASPU_MASSAGE_LIST_19L
}
message_payload = {
"skip": 1,
"take": 50,
"includes": [],
"sort": [{"field": "Time", "dir": "desc"}],
"filter": {"filters": [], "logic": "and"}
}
# Используем defaultdict, чтобы избежать KeyError
all_data = defaultdict(lambda: {
"Name": "",
"ShortName": "",
"BatchName": "",
"Stats": [],
"Sources": [],
"Gtin": "",
"GroupType": "",
"Quantity": 0,
"Messages": []
})
def process_management_response(key, response_data):
"""Обработка данных управления."""
if not response_data or "Value" not in response_data:
logging.error(f"Некорректный ответ от {key}: {response_data}")
return
value_data = response_data["Value"]
batch_name = value_data.get("BatchName", "")
for product in value_data.get("ProductTypes", []):
product_id = product.get("Id")
if not product_id:
continue
all_data[product_id].update({
"Name": product.get("Name", ""),
"ShortName": product.get("ShortName", ""),
"BatchName": batch_name,
"Gtin": product.get("Gtin", ""),
"GroupType": product.get("GroupType", ""),
})
all_data[product_id]["Stats"].extend(product.get("Stats", []))
all_data[product_id]["Sources"].append(key)
if product.get("GroupType") == "Pallet":
all_data[product_id]["Quantity"] = sum(
stat["Value"] for stat in product.get("Stats", []) if stat.get("Type") == "Validated"
)
def process_message_response(key, response_data):
"""Обработка сообщений."""
if not response_data or "Value" not in response_data:
logging.error(f"Некорректный ответ от {key}: {response_data}")
return
for msg in response_data["Value"]:
if "Text" not in msg:
continue
message = {
"Id": msg["Id"],
"Time": msg["Time"],
"Type": msg["Type"],
"Text": msg["Text"]
}
# Добавляем в общий список сообщений
for product_id in all_data:
all_data[product_id]["Messages"].append(message)
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {
executor.submit(request_to_api, url, token, timeout=2): (key, "management")
for key, url in management_urls.items()
}
futures.update({
executor.submit(request_to_api, url, token, payload=message_payload, timeout=2): (key, "messages")
for key, url in message_urls.items()
})
for future in as_completed(futures):
key, request_type = futures[future]
try:
response_data = future.result()
if request_type == "management":
process_management_response(key, response_data)
elif request_type == "messages":
process_message_response(key, response_data)
except Exception as e:
logging.error(f"Ошибка обработки {key}: {str(e)}")
logging.info(f"Отправляем ответ с {len(all_data)} объектами управления")
return create_response("OK", data=dict(all_data)) if all_data else create_response(
"Error", message="No data found", status_code=404
)

3
scan/viewss1/__init__.py Normal file
View File

@ -0,0 +1,3 @@
# views/__init__.py
from .scan_views import StorageScanView, AspuScanView
from .management_views import ManagementDataView

View File

@ -0,0 +1,88 @@
# views/management_views.py
from rest_framework.views import APIView
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
from ..api_client import APIClient
from ..constants import API_URLS
logger = logging.getLogger(__name__)
class ManagementDataView(APIView):
"""Получение управленческих данных"""
client = APIClient()
response_handler = APIClient.create_response
MAX_WORKERS = 5
REQUEST_TIMEOUT = 5
def get(self, request):
token = self.client.get_token(API_URLS["LOGIN_ASPU"])
if not token:
return self.response_handler(
"Error",
"Failed to get token",
status_code=500
)
all_data = {}
with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
futures = {
executor.submit(
self.client.make_request,
url,
token,
timeout=self.REQUEST_TIMEOUT
): name
for name, url in API_URLS["MANAGEMENT_ENDPOINTS"].items()
}
for future in as_completed(futures):
name = futures[future]
try:
self.process_response(
name,
future.result(),
all_data
)
except Exception as e:
logger.error(f"Error processing {name}: {str(e)}")
return self.response_handler(
"OK" if all_data else "Error",
data=all_data or None,
status_code=200 if all_data else 404
)
def process_response(self, source_name, response, data_store):
"""Обработка полученных данных"""
if not response or not response.get("Value"):
return
value_data = response["Value"]
batch_name = value_data.get("BatchName")
for product in value_data.get("ProductTypes", []):
product_id = product.get("Id")
if not product_id:
continue
if product_id not in data_store:
data_store[product_id] = {
"Name": product.get("Name"),
"ShortName": product.get("ShortName"),
"BatchName": batch_name,
"Stats": [],
"Sources": [],
"Gtin": product.get("Gtin"),
"GroupType": product.get("GroupType", ""),
"Quantity": 0
}
data_store[product_id]["Stats"].extend(product.get("Stats", []))
data_store[product_id]["Sources"].append(source_name)
if product.get("GroupType") == "Pallet":
data_store[product_id]["Quantity"] = sum(
stat["Value"]
for stat in product.get("Stats", [])
if stat["Type"] == "Validated"
)

Some files were not shown because too many files have changed in this diff Show More