224 lines
7.8 KiB
Python
224 lines
7.8 KiB
Python
"""
Integration Tests for EMRIntegration API

End-to-end tests for API endpoints
"""
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
from main import app
|
|
from src.models._model import EMRIntegration
|
|
from src.config.database import SessionLocal
|
|
|
|
# Shared test client wrapping the ``app`` imported from ``main``; the tests
# in this module issue all of their HTTP requests through it.
client = TestClient(app)
|
|
|
|
@pytest.fixture(autouse=True)
def setup_db():
    """Provide a database session to each test and wipe EMRIntegration rows afterwards.

    Yields:
        The session created from ``SessionLocal``.

    After the test finishes (pass or fail) all ``EMRIntegration`` rows are
    deleted and the deletion is committed, then the session is closed.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        try:
            # A test may leave the session in a failed or half-finished
            # transaction; roll back first so the cleanup query can run.
            db.rollback()
            db.query(EMRIntegration).delete()
            db.commit()
        finally:
            # Always release the connection, even when the cleanup itself fails.
            db.close()
|
|
|
|
class TestEMRIntegrationAPI:
    """End-to-end CRUD tests that drive the API through the module-level ``client``.

    All requests target the collection route stored in ``_BASE_URL``; the
    create payload shared by the tests is built by ``_payload``.
    """

    # NOTE(review): the route looks like a template with the resource name
    # left blank ("/api/v1/<resource>s") -- confirm the real path.
    _BASE_URL = "/api/v1/s"

    @staticmethod
    def _payload():
        """Return a fresh copy of the creation payload used by the tests."""
        return {
            "id": True,
            "organization_id": True,
            "emr_system": "testemr_system",
            "emr_version": "testemr_version",
            "integration_type": "testintegration_type",
            "fhir_base_url": "testfhir_base_url",
            "api_endpoint": "testapi_endpoint",
            "auth_type": "testauth_type",
            "client_id": "testclient_id",
            "client_secret_encrypted": True,
            "api_key_encrypted": True,
            "token_url": "testtoken_url",
            "scopes": True,
            "connection_status": "testconnection_status",
            "approval_status": "testapproval_status",
            "approval_date": True,
            "epic_approval_months_estimate": True,
            "data_mappings": True,
            "supported_resources": True,
            "sync_frequency_minutes": True,
            "last_sync_at": True,
            "last_sync_status": "testlast_sync_status",
            "last_error_message": True,
            "retry_count": True,
            "max_retries": True,
            "timeout_seconds": True,
            "rate_limit_per_minute": True,
            "use_mock_data": True,
            "configuration_notes": True,
            "created_by_id": True,
            # NOTE(review): this empty-string key was previously listed twice
            # in each literal (the second entry silently overrode the first).
            # It looks like an unfilled template placeholder -- confirm whether
            # it should be renamed or dropped.
            "": True,
        }

    @classmethod
    def _create(cls):
        """POST the shared payload and return the ``id`` from the response body."""
        create_response = client.post(cls._BASE_URL, json=cls._payload())
        return create_response.json()["id"]

    def test_create__success(self, setup_db):
        """Creation returns 201, a non-null id, and echoes every submitted field."""
        data = self._payload()

        response = client.post(self._BASE_URL, json=data)

        assert response.status_code == 201
        # Parse the body once instead of on every loop iteration.
        body = response.json()
        assert body["id"] is not None
        for key, value in data.items():
            assert body[key] == value

    def test_create__validation_error(self):
        """An empty payload is rejected with 400 and an ``errors`` entry."""
        invalid_data = {}

        response = client.post(self._BASE_URL, json=invalid_data)

        # NOTE(review): FastAPI's built-in request validation answers 422 with
        # a "detail" body; this expectation presumably relies on a custom
        # exception handler in the app -- confirm.
        assert response.status_code == 400
        assert "errors" in response.json()

    def test_get__by_id_success(self, setup_db):
        """A created record can be fetched by id and the same id is returned."""
        _id = self._create()

        response = client.get(f"{self._BASE_URL}/{_id}")

        assert response.status_code == 200
        assert response.json()["id"] == _id

    def test_get__not_found(self):
        """Fetching id 999 yields 404 with a ``message`` entry in the body."""
        response = client.get(f"{self._BASE_URL}/999")

        assert response.status_code == 404
        assert "message" in response.json()

    def test_update__success(self, setup_db):
        """Updating a created record returns 200 and echoes the changed field."""
        _id = self._create()

        # NOTE(review): "name" is not among the fields sent on creation --
        # confirm it is a real attribute of the resource.
        update_data = {"name": "Updated"}
        response = client.put(f"{self._BASE_URL}/{_id}", json=update_data)

        assert response.status_code == 200
        assert response.json()["name"] == update_data["name"]

    def test_delete__success(self, setup_db):
        """Deleting a created record returns 204 and a later GET returns 404."""
        _id = self._create()

        response = client.delete(f"{self._BASE_URL}/{_id}")

        assert response.status_code == 204

        # Verify deletion
        get_response = client.get(f"{self._BASE_URL}/{_id}")
        assert get_response.status_code == 404
|
|
|