Skip to content

Commit

Permalink
Add pagination functionality to FHIRClient and related unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Lana committed Aug 12, 2024
1 parent 796ffa6 commit 030ca54
Show file tree
Hide file tree
Showing 2 changed files with 296 additions and 0 deletions.
120 changes: 120 additions & 0 deletions fhirclient/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import logging
import urllib
from typing import Optional, Iterable

import requests
from fhirclient.models.bundle import Bundle

from .server import FHIRServer, FHIRUnauthorizedException, FHIRNotFoundException

Expand Down Expand Up @@ -240,3 +245,118 @@ def from_state(self, state):
def save_state (self):
self._save_func(self.state)

# MARK: Pagination
def _fetch_next_page(self, bundle: Bundle) -> Optional[Bundle]:
"""
Fetch the next page of results using the `next` link provided in the bundle.
Args:
bundle (Bundle): The FHIR Bundle containing the `next` link.
Returns:
Optional[Bundle]: The next page of results as a FHIR Bundle, or None if no "next" link is found.
"""
next_link = self._get_next_link(bundle)
if next_link:
sanitized_next_link = self._sanitize_next_link(next_link)
return self._execute_pagination_request(sanitized_next_link)
return None

def _get_next_link(self, bundle: Bundle) -> Optional[str]:
"""
Extract the `next` link from the Bundle's links.
Args:
bundle (Bundle): The FHIR Bundle containing pagination links.
Returns:
Optional[str]: The URL of the next page if available, None otherwise.
"""
if not bundle.link:
return None

for link in bundle.link:
if link.relation == "next":
return link.url
return None

def _sanitize_next_link(self, next_link: str) -> str:
"""
Sanitize the `next` link to ensure it is safe to use.
Args:
next_link (str): The raw `next` link URL.
Returns:
str: The sanitized URL.
Raises:
ValueError: If the URL scheme or domain is invalid.
"""
parsed_url = urllib.parse.urlparse(next_link)

# Validate scheme and netloc (domain)
if parsed_url.scheme not in ["http", "https"]:
raise ValueError("Invalid URL scheme in `next` link.")
if not parsed_url.netloc:
raise ValueError("Invalid URL domain in `next` link.")

# Additional sanitization if necessary, e.g., removing dangerous query parameters
query_params = urllib.parse.parse_qs(parsed_url.query)
sanitized_query = {k: v for k, v in query_params.items()}

# Rebuild the sanitized URL
sanitized_url = urllib.parse.urlunparse(
(
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
urllib.parse.urlencode(sanitized_query, doseq=True),
parsed_url.fragment,
)
)

return sanitized_url

def _execute_pagination_request(self, sanitized_url: str) -> Optional[Bundle]:
"""
Execute the request to retrieve the next page using the sanitized URL.
Args:
sanitized_url (str): The sanitized URL to fetch the next page.
Returns:
Optional[Bundle]: The next page of results as a FHIR Bundle, or None.
Raises:
HTTPError: If the request fails due to network issues or server errors.
"""
try:
# Use requests.get directly to make the HTTP request
response = requests.get(sanitized_url)
response.raise_for_status()
next_bundle_data = response.json()
next_bundle = Bundle(next_bundle_data)

return next_bundle

except requests.exceptions.HTTPError as e:
# Handle specific HTTP errors as needed, possibly including retry logic
raise e

def iter_pages(self, first_bundle: Bundle) -> Iterable[Bundle]:
"""
Iterator that yields each page of results as a FHIR Bundle.
Args:
first_bundle (Bundle): The first Bundle to start pagination.
Yields:
Bundle: Each page of results as a FHIR Bundle.
"""
bundle = first_bundle
while bundle:
yield bundle
bundle = self._fetch_next_page(bundle)

176 changes: 176 additions & 0 deletions tests/client_pagination_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import unittest
from unittest.mock import patch, MagicMock

import requests
from fhirclient.models.bundle import Bundle

from fhirclient.client import FHIRClient


class TestFHIRClientPagination(unittest.TestCase):
def setUp(self) -> None:
state = {
"app_id": "AppID",
"app_secret": "AppSecret",
"scope": "user/*.read",
"redirect": "http://test.invalid/redirect",
"patient_id": "PatientID",
"server": {
"base_uri": "http://test.invalid/",
"auth_type": "none",
"auth": {
"app_id": "AppId",
},
},
"launch_token": "LaunchToken",
"launch_context": {
"encounter": "EncounterID",
"patient": "PatientID",
},
"jwt_token": "JwtToken",
}

# Confirm round trip
self.client = FHIRClient(state=state)

self.bundle = {
"resourceType": "Bundle",
"type": "searchset",
"link": [
{"relation": "self", "url": "http://example.com/fhir/Bundle/1"},
{"relation": "next", "url": "http://example.com/fhir/Bundle/2"},
],
"entry": [
{
"fullUrl": "http://example.com/fhir/Patient/1",
"resource": {
"resourceType": "Patient",
"id": "1",
"name": [{"family": "Doe", "given": ["John"]}],
"gender": "male",
"birthDate": "1980-01-01",
},
},
{
"fullUrl": "http://example.com/fhir/Patient/2",
"resource": {
"resourceType": "Patient",
"id": "2",
"name": [{"family": "Smith", "given": ["Jane"]}],
"gender": "female",
"birthDate": "1990-05-15",
},
},
],
}

def test_get_next_link(self):
next_link = self.client._get_next_link(Bundle(self.bundle))
self.assertEqual(next_link, "http://example.com/fhir/Bundle/2")

def test_get_next_link_no_next(self):
bundle_without_next = {
"resourceType": "Bundle",
"type": "searchset",
"link": [{"relation": "self", "url": "http://example.com/fhir/Bundle/1"}],
}
bundle = Bundle(bundle_without_next)
next_link = self.client._get_next_link(bundle)
self.assertIsNone(next_link)

def test_sanitize_next_link_valid(self):
next_link = "http://example.com/fhir/Bundle/2?page=2&size=10"
sanitized_link = self.client._sanitize_next_link(next_link)
self.assertEqual(sanitized_link, next_link)

def test_sanitize_next_link_invalid_scheme(self):
next_link = "ftp://example.com/fhir/Bundle/2?page=2&size=10"
with self.assertRaises(ValueError):
self.client._sanitize_next_link(next_link)

def test_sanitize_next_link_invalid_domain(self):
next_link = "http:///fhir/Bundle/2?page=2&size=10"
with self.assertRaises(ValueError):
self.client._sanitize_next_link(next_link)

@patch("requests.get")
def test_execute_pagination_request_success(self, mock_get):
mock_response = MagicMock()
# Set up the mock to return a specific JSON payload when its json() method is called
mock_response.json.return_value = self.bundle
mock_response.raise_for_status = MagicMock()
mock_get.return_value = mock_response

next_link = "http://example.com/fhir/Bundle/2"
sanitized_link = self.client._sanitize_next_link(next_link)
result = self.client._execute_pagination_request(sanitized_link)
self.assertIsInstance(result, Bundle)
self.assertIn("entry", result.as_json())
mock_get.assert_called_once_with(sanitized_link)

@patch("requests.get")
def test_execute_pagination_request_http_error(self, mock_get):
mock_get.side_effect = requests.exceptions.HTTPError("HTTP Error")

next_link = "http://example.com/fhir/Bundle/2"
sanitized_link = self.client._sanitize_next_link(next_link)

with self.assertRaises(requests.exceptions.HTTPError):
self.client._execute_pagination_request(sanitized_link)

@patch("requests.get")
def test_execute_pagination_request_returns_last_bundle_if_no_next_link(self, mock_get):
# Mock the response to simulate a Bundle with no next link
mock_response = MagicMock()
mock_response.json.return_value = {
"resourceType": "Bundle",
"type": "searchset",
"link": [{"relation": "self", "url": "http://example.com/fhir/Bundle/1"}],
"entry": [{"resource": {"resourceType": "Patient", "id": "1"}}],
}
mock_response.raise_for_status = MagicMock()
mock_get.return_value = mock_response

sanitized_link = "http://example.com/fhir/Bundle/1"
result = self.client._execute_pagination_request(sanitized_link)

# Check that the result is the Bundle itself, not None
self.assertIsInstance(result, Bundle)
self.assertTrue(hasattr(result, 'entry'))
mock_get.assert_called_once_with(sanitized_link)

@patch("fhirclient.client.FHIRClient._execute_pagination_request")
def test_fetch_next_page(self, mock_execute_request):
mock_execute_request.return_value = Bundle(self.bundle)
result = self.client._fetch_next_page(Bundle(self.bundle))
self.assertIsInstance(result, Bundle)
self.assertTrue(hasattr(result, "entry"))
mock_execute_request.assert_called_once()

def test_fetch_next_page_no_next_link(self):
bundle_without_next = {
"resourceType": "Bundle",
"type": "searchset",
"link": [{"relation": "self", "url": "http://example.com/fhir/Bundle/1"}],
}
bundle = Bundle(bundle_without_next)
result = self.client._fetch_next_page(bundle)
self.assertIsNone(result)

@patch("fhirclient.client.FHIRClient._fetch_next_page")
def test_iter_pages(self, mock_fetch_next_page):
# Set up the mock to return a new bundle, then None to stop iteration
mock_fetch_next_page.side_effect = [Bundle(self.bundle), None]
pages = list(self.client.iter_pages(Bundle(self.bundle)))

# Check that two bundles were returned (the first bundle and the one from mock)
self.assertEqual(len(pages), 2)
self.assertIsInstance(pages[0], Bundle)
self.assertIsInstance(pages[1], Bundle)

# Compare JSON representations instead of object instances
self.assertEqual(pages[0].as_json(), self.bundle)
self.assertEqual(pages[1].as_json(), self.bundle)

# Ensure that _fetch_next_page was called twice
self.assertEqual(mock_fetch_next_page.call_count, 2)

0 comments on commit 030ca54

Please sign in to comment.