xray/orthanc/tasks.py
The unwasted guests 8ec69c5d16 back-end 1.0
2025-01-28 21:43:26 +10:00

109 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
from urllib3.exceptions import InsecureRequestWarning
from .models import Patient, Series, Instance
# Отключение предупреждений о небезопасном соединении
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
ORTHANC_BASE_URL = "http://192.168.2.60:8042"
ORTHANC_USERNAME = "writehost"
ORTHANC_PASSWORD = "writehost"
# 1. Получение всех пациентов
def fetch_patients():
url = f"{ORTHANC_BASE_URL}/patients"
response = requests.get(url, auth=(ORTHANC_USERNAME, ORTHANC_PASSWORD))
response.raise_for_status()
return response.json()
# 2. Получение информации о конкретном пациенте
def fetch_patient_details(orthanc_id):
url = f"{ORTHANC_BASE_URL}/patients/{orthanc_id}"
response = requests.get(url, auth=(ORTHANC_USERNAME, ORTHANC_PASSWORD))
response.raise_for_status()
return response.json()
# 3. Получение данных об исследовании
def fetch_study_details(study_id):
url = f"{ORTHANC_BASE_URL}/studies/{study_id}"
response = requests.get(url, auth=(ORTHANC_USERNAME, ORTHANC_PASSWORD))
response.raise_for_status()
return response.json()
# 4. Получение данных о серии
def fetch_series_details(series_id):
url = f"{ORTHANC_BASE_URL}/series/{series_id}"
response = requests.get(url, auth=(ORTHANC_USERNAME, ORTHANC_PASSWORD))
response.raise_for_status()
return response.json()
# 5. Получение данных об изображении (инстансе)
def fetch_instance_details(instance_id):
url = f"{ORTHANC_BASE_URL}/instances/{instance_id}"
response = requests.get(url, auth=(ORTHANC_USERNAME, ORTHANC_PASSWORD))
response.raise_for_status()
return response.json()
# Основная синхронизация
def sync_patients():
patient_ids = fetch_patients()
for patient_id in patient_ids:
patient_data = fetch_patient_details(patient_id)
# Синхронизация пациента
patient, created = Patient.objects.update_or_create(
orthanc_id=patient_id,
defaults={
"patient_id": patient_data["MainDicomTags"].get("PatientID"),
"patient_name": patient_data["MainDicomTags"].get("PatientName"),
"patient_sex": patient_data["MainDicomTags"].get("PatientSex"),
"patient_birth_date": patient_data["MainDicomTags"].get("PatientBirthDate"),
"studies": patient_data.get("Studies", []),
"patient_metadata": patient_data.get("Labels", []),
},
)
# Синхронизация исследований (Studies)
for study_id in patient_data.get("Studies", []):
sync_study(study_id, patient)
# Синхронизация исследования
def sync_study(study_id, patient):
study_data = fetch_study_details(study_id)
for series_id in study_data.get("Series", []):
sync_series(series_id, patient)
# Синхронизация серии
def sync_series(series_id, patient):
series_data = fetch_series_details(series_id)
series, created = Series.objects.update_or_create(
orthanc_id=series_id,
defaults={
"parent_study": series_data["ParentStudy"],
"patient": patient,
"main_dicom_tags": series_data.get("MainDicomTags", {}),
"status": series_data.get("Status"),
"is_stable": series_data.get("IsStable", False),
"last_update": series_data.get("LastUpdate"),
},
)
# Синхронизация снимков (Instances)
for instance_id in series_data.get("Instances", []):
sync_instance(instance_id, series)
# Синхронизация снимка
def sync_instance(instance_id, series):
instance_data = fetch_instance_details(instance_id)
Instance.objects.update_or_create(
orthanc_id=instance_id,
defaults={
"parent_series": series,
"file_size": instance_data.get("FileSize"),
"file_uuid": instance_data.get("FileUuid"),
"main_dicom_tags": instance_data.get("MainDicomTags", {}),
"index_in_series": instance_data.get("IndexInSeries"),
},
)