Skip to content

Commit

Permalink
feat(pagination): Refactor FHIRSearch with iterable Bundle, add iter …
Browse files Browse the repository at this point in the history
…to Bundle, move pagination logic to _utils.py, add tests

Simplify url sanitization, adjust testse

Replace redundant code on perform()

Add tests perform-iter and perform_resources_iter via Response

Revert fhir-parser submodule to original commit
  • Loading branch information
LanaNYC committed Nov 4, 2024
1 parent e19a871 commit 3ec3ef3
Show file tree
Hide file tree
Showing 9 changed files with 656 additions and 316 deletions.
111 changes: 111 additions & 0 deletions fhirclient/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import urllib
from typing import Optional

import requests

from typing import TYPE_CHECKING, Iterable

if TYPE_CHECKING:
from fhirclient.server import FHIRServer
from fhirclient.models.bundle import Bundle


# Use forward references to avoid circular imports
def _fetch_next_page(bundle: 'Bundle', server: 'FHIRServer') -> Optional['Bundle']:
"""
Fetch the next page of results using the `next` link provided in the bundle.
Args:
bundle (Bundle): The FHIR Bundle containing the `next` link.
server (FHIRServer): The FHIR server instance for handling requests and authentication.
Returns:
Optional[Bundle]: The next page of results as a FHIR Bundle, or None if no "next" link is found.
"""
if next_link := _get_next_link(bundle):
return _execute_pagination_request(next_link, server)
return None


def _get_next_link(bundle: 'Bundle') -> Optional[str]:
"""
Extract the `next` link from the Bundle's links.
Args:
bundle (Bundle): The FHIR Bundle containing pagination links.
Returns:
Optional[str]: The URL of the next page if available, None otherwise.
"""
if not bundle.link:
return None

for link in bundle.link:
if link.relation == "next":
return _sanitize_next_link(link.url)
return None


def _sanitize_next_link(next_link: str) -> str:
"""
Sanitize the `next` link by validating its scheme and hostname against the origin server.
This function ensures the `next` link URL uses a valid scheme (`http` or `https`) and that it contains a
hostname. This provides a basic safeguard against malformed URLs without overly restricting flexibility.
Args:
next_link (str): The raw `next` link URL.
Returns:
str: The validated URL.
Raises:
ValueError: If the URL's scheme is not `http` or `https`, or if the hostname does not match the origin server.
"""

parsed_url = urllib.parse.urlparse(next_link)

# Validate scheme and netloc (domain)
if parsed_url.scheme not in ["http", "https"]:
raise ValueError("Invalid URL scheme in `next` link.")
if not parsed_url.netloc:
raise ValueError("Invalid URL domain in `next` link.")

return next_link


def _execute_pagination_request(sanitized_url: str, server: 'FHIRServer') -> 'Bundle':
"""
Execute the request to retrieve the next page using the sanitized URL via Bundle.read_from.
Args:
sanitized_url (str): The sanitized URL to fetch the next page.
server (FHIRServer): The FHIR server instance to perform the request.
Returns:
Bundle: The next page of results as a FHIR Bundle.
Raises:
HTTPError: If the request fails due to network issues or server errors.
"""
from fhirclient.models.bundle import Bundle
return Bundle.read_from(sanitized_url, server)


def iter_pages(first_bundle: 'Bundle', server: 'FHIRServer') -> Iterable['Bundle']:
"""
Iterator that yields each page of results as a FHIR Bundle.
Args:
first_bundle (Optional[Bundle]): The first Bundle to start pagination.
server (FHIRServer): The FHIR server instance to perform the request.
Yields:
Bundle: Each page of results as a FHIR Bundle.
"""
# Since _fetch_next_page can return None
bundle: Optional[Bundle] = first_bundle
while bundle:
yield bundle
bundle = _fetch_next_page(bundle, server)

122 changes: 0 additions & 122 deletions fhirclient/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
import logging
import urllib
from typing import Optional, Iterable

import requests
from .models.bundle import Bundle

from .server import FHIRServer, FHIRUnauthorizedException, FHIRNotFoundException

__version__ = '4.3.0'
Expand Down Expand Up @@ -244,119 +238,3 @@ def from_state(self, state):

def save_state (self):
self._save_func(self.state)

# MARK: Pagination
def _fetch_next_page(self, bundle: Bundle) -> Optional[Bundle]:
"""
Fetch the next page of results using the `next` link provided in the bundle.
Args:
bundle (Bundle): The FHIR Bundle containing the `next` link.
Returns:
Optional[Bundle]: The next page of results as a FHIR Bundle, or None if no "next" link is found.
"""
next_link = self._get_next_link(bundle)
if next_link:
sanitized_next_link = self._sanitize_next_link(next_link)
return self._execute_pagination_request(sanitized_next_link)
return None

def _get_next_link(self, bundle: Bundle) -> Optional[str]:
"""
Extract the `next` link from the Bundle's links.
Args:
bundle (Bundle): The FHIR Bundle containing pagination links.
Returns:
Optional[str]: The URL of the next page if available, None otherwise.
"""
if not bundle.link:
return None

for link in bundle.link:
if link.relation == "next":
return link.url
return None

def _sanitize_next_link(self, next_link: str) -> str:
"""
Sanitize the `next` link to ensure it is safe to use.
Args:
next_link (str): The raw `next` link URL.
Returns:
str: The sanitized URL.
Raises:
ValueError: If the URL scheme or domain is invalid.
"""
parsed_url = urllib.parse.urlparse(next_link)

# Validate scheme and netloc (domain)
if parsed_url.scheme not in ["http", "https"]:
raise ValueError("Invalid URL scheme in `next` link.")
if not parsed_url.netloc:
raise ValueError("Invalid URL domain in `next` link.")

# Additional sanitization if necessary, e.g., removing dangerous query parameters
query_params = urllib.parse.parse_qs(parsed_url.query)
sanitized_query = {k: v for k, v in query_params.items()}

# Rebuild the sanitized URL
sanitized_url = urllib.parse.urlunparse(
(
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
urllib.parse.urlencode(sanitized_query, doseq=True),
parsed_url.fragment,
)
)

return sanitized_url

def _execute_pagination_request(self, sanitized_url: str) -> Optional[Bundle]:
"""
Execute the request to retrieve the next page using the sanitized URL.
Args:
sanitized_url (str): The sanitized URL to fetch the next page.
Returns:
Optional[Bundle]: The next page of results as a FHIR Bundle, or None.
Raises:
HTTPError: If the request fails due to network issues or server errors.
"""
try:
# Use requests.get directly to make the HTTP request
response = requests.get(sanitized_url)
response.raise_for_status()
next_bundle_data = response.json()
next_bundle = Bundle(next_bundle_data)

return next_bundle

except requests.exceptions.HTTPError as e:
# Handle specific HTTP errors as needed, possibly including retry logic
raise e

def iter_pages(self, first_bundle: Bundle) -> Iterable[Bundle]:
"""
Iterator that yields each page of results as a FHIR Bundle.
Args:
first_bundle (Bundle): The first Bundle to start pagination.
Yields:
Bundle: Each page of results as a FHIR Bundle.
"""
bundle = first_bundle
while bundle:
yield bundle
bundle = self._fetch_next_page(bundle)

2 changes: 1 addition & 1 deletion fhirclient/models/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, jsondict=None, strict=True):
Type `str`. """

super(Bundle, self).__init__(jsondict=jsondict, strict=strict)

def elementProperties(self):
js = super(Bundle, self).elementProperties()
js.extend([
Expand Down
68 changes: 51 additions & 17 deletions fhirclient/models/fhirsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,22 @@
# 2014, SMART Health IT.

import logging
import warnings
from typing import Iterator, TYPE_CHECKING


from . import fhirreference
from .._utils import iter_pages

try:
from urllib import quote_plus
except Exception as e:
from urllib.parse import quote_plus

if TYPE_CHECKING:
from fhirclient.models.resource import Resource
from fhirclient.models.bundle import Bundle

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -110,35 +118,61 @@ def include(self, reference_field, reference_model=None, reverse=False):
self.includes.append((reference_model, reference_field, reverse))
return self

def perform(self, server):
def perform(self, server) -> 'Bundle':
""" Construct the search URL and execute it against the given server.
:param server: The server against which to perform the search
:returns: A Bundle resource
"""
# Old method with deprecation warning
warnings.warn(
"perform() is deprecated and will be removed in a future release. "
"Please use perform_iter() instead.",
DeprecationWarning,
)

if server is None:
raise Exception("Need a server to perform search")

from . import bundle
res = server.request_json(self.construct())
bundle = bundle.Bundle(res)
bundle.origin_server = server
return bundle

def perform_resources(self, server):
""" Performs the search by calling `perform`, then extracts all Bundle
entries and returns a list of Resource instances.

from .bundle import Bundle
return Bundle.read_from(self.construct(), server)

# Use forward references to avoid circular imports
def perform_iter(self, server) -> Iterator['Bundle']:
""" Perform the search by calling `perform` and return an iterator that yields
Bundle instances.
:param server: The server against which to perform the search
:returns: An iterator of Bundle instances
"""
return iter_pages(self.perform(server), server)

def perform_resources(self, server) -> 'list[Resource]':
""" Performs the search by calling `perform_resources_iter` and returns a list of Resource instances.
:param server: The server against which to perform the search
:returns: A list of Resource instances
"""
bundle = self.perform(server)
resources = []
if bundle is not None and bundle.entry is not None:
# Old method with deprecation warning
warnings.warn(
"perform_resources() is deprecated and will be removed in a future release. "
"Please use perform_resources_iter() instead.",
DeprecationWarning,
)

return list(self.perform_resources_iter(server))

# Use forward references to avoid circular imports
def perform_resources_iter(self, server) -> Iterator['Resource']:
""" Performs the search by calling `perform_iter` and yields Resource instances
from each Bundle returned by the search.
:param server: The server against which to perform the search
:returns: An iterator of Resource instances
"""
for bundle in self.perform_iter(server):
for entry in bundle.entry:
resources.append(entry.resource)

return resources
yield entry.resource


class FHIRSearchParam(object):
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ testpaths = "tests"
tests = [
"pytest >= 2.5",
"pytest-cov",
"responses",
]
Loading

0 comments on commit 3ec3ef3

Please sign in to comment.