Source code for wcs.model

"""
Classes holding information about coverages on a WCS server.
"""

# postpone evaluations of type annotations
# https://stackoverflow.com/a/33533514
from __future__ import annotations

import re
import textwrap
from dataclasses import dataclass
from datetime import datetime
from typing import Union, Optional
from urllib.parse import parse_qs, urlparse

[docs] BoundType = Union[int, float, str, datetime]
"""Type for axis interval bounds.""" @dataclass
[docs] class BasicCoverage: """ Holds basic coverage information extracted from the WCS GetCapabilities document, notably the WGS bounding box if provided. :param name: the coverage name. :param subtype: coverage subtype, e.g. 'ReferenceableGridCoverage' :param bbox: bounding box in native CRS :param lon_lat: a tuple of longitude / latitude axes respresenting the WGS84 bounding box of the coverage :param size_bytes: coverage size in bytes; None if not reported by the server :param additional_params: additional key/value parameters """ def __init__(self, name: str, subtype: str = None, bbox: BoundingBox = None, lon_lat: tuple[Axis, Axis] = None, size_bytes: int = None, additional_params: dict[str, str] = None):
[docs] self.name = name
"""Coverage name"""
[docs] self.subtype = subtype
"""Coverage subtype, e.g. ReferenceableGridCoverage"""
[docs] self.bbox = bbox
"""Bounding box of all coverage axes in native CRS""" self.lon, self.lat = lon_lat or (None, None) """Longitude / Latitude axes describing the WGS84 bounding box of the coverage"""
[docs] self.size_bytes = size_bytes
"""Coverage size in bytes; None if not reported by the server"""
[docs] self.additional_params = additional_params
"""A dictionary of additional key/value parameters if reported by the server""" def __str__(self): ret = self.name + ':' if self.subtype is not None: ret += f'\n subtype: {self.subtype}' if self.bbox is not None: ret += f'\n{self.bbox}' if self.lon is not None: ret += f'\n WGS84 bbox:{self.lon}{self.lat}' if self.size_bytes is not None: ret += f'\n size in bytes: {self.size_bytes}' if self.additional_params is not None and len(self.additional_params) > 0: additional_params = _dict_to_yaml(self.additional_params, 4) ret += f'\n additional params:\n{additional_params}' return ret
[docs] def is_local(self) -> bool: """ :return: True if the coverage is local on the server, False if it's remote. """ return '--' not in self.name
[docs] def is_remote(self) -> bool: """ :return: True if the coverage is not local on the server, False otherwise. """ return not self.is_local()
@dataclass
[docs] class FullCoverage: """ Holds full coverage information extracted from the WCS DescribeCoverage. :param name: the coverage name. :param bbox: bounding box in native CRS :param grid_bbox: grid bounding box :param range_type: coverage range type """ def __init__(self, name: str, bbox: BoundingBox, grid_bbox: BoundingBox, range_type: RangeType, metadata: dict = None):
[docs] self.name = name
[docs] self.bbox = bbox
[docs] self.grid_bbox = grid_bbox
[docs] self.range_type = range_type
[docs] self.metadata = metadata or {}
def __str__(self): ret = self.name + ':' if self.bbox is not None: ret += f'\n{self.bbox}' if self.grid_bbox is not None: ret += f'\n{self.grid_bbox}' if self.range_type is not None: ret += f'\n{self.range_type}' if len(self.metadata) > 0: metadata = _dict_to_yaml(self.metadata) metadata = textwrap.indent(metadata, ' ' * 4) ret += f'\n metadata:\n{metadata}' return ret
[docs] def is_local(self) -> bool: """ :return: True if the coverage is local on the server, False if it's remote. """ return '--' not in self.name
[docs] def is_remote(self) -> bool: """ :return: True if the coverage is not local on the server, False otherwise. """ return not self.is_local()
@dataclass
[docs] class Axis: """ An axis with a name, low/upper bounds, a CRS, uom, resolution, coefficients. A subset of the coefficients (axis coordinates) can be retrieved with the [] operator, e.g. for an irregular temporal axis: axis["2024-01-01" : "2024-01-31"]. See :meth:`__getitem__` for more details. :param name: Name of the axis. :param low: Lower bound of the axis. :param high: Upper bound of the axis. :param crs: Coordinate Reference System, e.g., "EPSG:4326". :param uom: Unit of measure, e.g., "degree". :param resolution: Axis resolution, for regular axes. :param coefficients: Axis coefficients for irregular axes. """
[docs] name: str
[docs] low: BoundType
[docs] high: BoundType
[docs] crs: Optional[str] = None
[docs] uom: Optional[str] = None
[docs] resolution: Optional[BoundType] = None
[docs] coefficients: Optional[list[BoundType]] = None
def __str__(self): indent = '\n ' ret = f'{indent}{self.name}:' indent += ' ' ret += f'{indent}low: {_bound_to_str(self.low)}' ret += f'{indent}high: {_bound_to_str(self.high)}' if self.crs is not None: ret += f'{indent}crs: {Crs.to_short_notation(self.crs)}' if self.uom is not None: ret += f'{indent}uom: {self.uom}' if self.resolution is not None: ret += f'{indent}resolution: {self.resolution}' if self.resolution is not None: ret += f'{indent}type: regular' if self.coefficients is not None: ret += f'{indent}type: irregular' if self.coefficients is not None: coefficients = ', '.join([_bound_to_str(c) for c in self.coefficients]) coefficients = '[' + coefficients + ']' offset = ' ' * len(' coefficients: [') coefficients = textwrap.fill(coefficients, width=120, initial_indent=offset, subsequent_indent=offset, break_long_words=False) # remove the initial_indent which was added only to make sure the table width is consistent coefficients = coefficients.strip() ret += f'{indent}coefficients: {coefficients}' return ret
[docs] def is_temporal(self) -> bool: """ Returns: True if this axis is a temporal axis (e.g. ansi), False otherwise. """ return isinstance(self.low, datetime)
[docs] def is_spatial(self) -> bool: """ Returns: True if this axis is a spatial axis (e.g. Lat, Lon, E, N), False otherwise. """ return not self.is_temporal()
[docs] def is_irregular(self) -> bool: """ Returns: True if this axis is an irregular axis, False otherwise. """ return self.coefficients is not None and len(self.coefficients) > 0
[docs] def is_regular(self) -> bool: """ Returns: True if this axis is a regular axis, False otherwise. """ return self.resolution is not None
def __getitem__(self, item) -> list[BoundType]: """ - If :attr:`coefficients` is not None, then they are subsetted according to ``item`` - Otherwise, a list of coefficients is generated according to the :attr:`resolution`, between the start and stop provided by the item slice. :param item: must be a :class:`slice` object with a start and stop set; the step is ignored. The start and stop must be valid coordinates in the axis :attr:`crs` and within the :attr:`low` / :attr:`high` bounds of this object. :raises WCSClientException: - if :attr:`coefficients` and :attr:`resolution` are both None. - if ``item`` is not a slice object - if the start / stop of ``item`` are invalid coordinates """ if not isinstance(item, slice): raise WCSClientException(f"Invalid coordinates provided for operator [] " f"on axis {self.name}, expected a slice of the form start:stop.") if item.stop is None: raise WCSClientException(f"No upper limit provided for operator [] on axis {self.name}.") temporal = self.is_temporal() regular = self.is_regular() irregular = self.is_irregular() if not regular and not irregular: raise WCSClientException(f"operator [] is inapplicable to axis {self.name} " f"without a resolution or coefficients.") if temporal and not irregular: raise WCSClientException(f"operator [] is inapplicable to regular " f"temporal axis {self.name}.") start = item.start stop = item.stop # parse string datetime to datetimes if needed, and make sure all datetime have the same tzinfo if temporal: tz = self.coefficients[0].tzinfo if isinstance(start, str): start = datetime.fromisoformat(start) elif not isinstance(start, datetime): raise WCSClientException(f"Invalid type of start coordinate provided for operator [] " f"on axis {self.name}, expected either a string or a datetime.") if isinstance(stop, str): stop = datetime.fromisoformat(stop).replace(tzinfo=tz) elif not isinstance(stop, datetime): raise WCSClientException(f"Invalid type of stop coordinate provided for operator [] " f"on axis {self.name}, expected either a string or a datetime.") start = start.replace(tzinfo=tz) stop = stop.replace(tzinfo=tz) coefficients = self.get_coefficients() return [c for c in coefficients if start <= c <= stop]
[docs] def get_coefficients(self) -> list[BoundType]: """ :return: a list of coefficients, automatically generated if this is a regular axis. """ if self.is_irregular(): return self.coefficients if not self.is_regular(): raise WCSClientException(f"{self.name} is not a regular or irregular " f"axis, cannot calculate coefficients.") ret = [] current = self.low while current <= self.high: ret.append(current) current += self.resolution return ret
@dataclass
[docs] class BoundingBox: """ The bounding box of a coverage, containing low/high limits of all its axes. The axes can be accessed through the :attr:`axes` attribute, or through the subscript operator, e.g. .. code:: python bbox[1] # get the second axis bbox['Lat'] # get the axis with name Lat :param crs: native CRS of the axis coordinates :param axes: a list of :class:`Axis` objects """ def __init__(self, axes: list[Axis], crs: Optional[str]):
[docs] self.axes = axes
[docs] self.crs = crs
def __str__(self): bbox_type = 'grid_bbox' ret = '' if self.crs is not None: ret += f' crs: {Crs.to_short_notation(self.crs)}\n' bbox_type = 'bbox' ret += f' {bbox_type}:{_list_to_str(self.axes, "")}' return ret def __getitem__(self, index: Union[int, str]) -> Axis: """ Get the :class:`Axis` object from the :attr:`axes` list according to the specified ``index``. The ``index`` can be an axis name, or an index of the axes list. :raise KeyError: if the axis name is not found, the axis index is out of bounds, or the ``index`` is not an int or string. """ if isinstance(index, int): return self.axes[index] if isinstance(index, str): for axis in self.axes: if axis.name == index: return axis axis_names = ', '.join([axis.name for axis in self.axes]) raise KeyError(f"Axis '{index}' not found in the BoundingBox axes: {axis_names}") raise KeyError(f"Axis index has an invalid type {index.__class__}," f"expected a string or int.") def __getattr__(self, item): """ Get the :class:`Axis` object from the :attr:`axes` list according to the specified ``item``. The ``item`` can be an axis name, or an index of the axes list. :raise KeyError: if the axis name is not found, the axis index is out of bounds, or the ``item`` is not an int or string. """ return self.__getitem__(item)
@dataclass
[docs] class RangeType: """ Represents the range type of a coverage, indicating the structure of the data. The range type consists of a list of field types (:class:`Field`). The fields can be accessed through the :attr:`fields` attribute, or through the subscript operator, e.g. .. code:: python range_type[1] # get the second field range_type['blue'] # get the field with name blue :param fields: A list of :class:`Field` objects describing the fields (also known as bands or channels) of a coverage. """ def __init__(self, fields):
[docs] self.fields: list[Field] = fields
""" A list of :class:`Field` objects corresponding to the bands of the coverage. """ def __str__(self): fields = _list_to_str(self.fields, '') ret = f' range_type:{fields}' return ret def __getitem__(self, index: Union[int, str]) -> Field: """ Get the :class:`Field` object from the :attr:`fields` list according to the specified ``index``. The ``index`` can be a field (band) name, or an index of the fields list. :raise KeyError: if the field name is not found, the fields index is out of bounds, or the ``index`` is not an int or string. """ if isinstance(index, int): return self.fields[index] if isinstance(index, str): for field in self.fields: if field.name == index: return field names = ', '.join([field.name for field in self.fields]) raise KeyError(f"Field '{index}' not found in the RangeType fields: {names}") raise KeyError(f"Field index has an invalid type {index.__class__}," f"expected a string or int.") def __getattr__(self, item): """ Get the :class:`Field` object from the :attr:`fields` list according to the specified ``item``. The ``item`` can be a field (band) name, or an index of the fields list. :raise KeyError: if the field name is not found, the fields index is out of bounds, or the ``item`` is not an int or string. """ return self.__getitem__(item)
@dataclass
[docs] class Field: """ A field (also known as band, or channel) in a coverage range type (:class:`RangeType`) It can be either a quantity or a category. It includes information about the field's name, definition, label, description, codespace (only Category), unit of measure (only Quantity), and any nil values. :param name: The name of the field. This can be used to subset bands in WCS GetCoverage requests or WCPS queries. :param is_quantity: Indicates whether this field is a Quantity (:code:`True`) or a Category (:code:`False`). Defaults to :code:`True`. :param definition: A URI that can be resolved to the complete human-readable definition of the property that is represented by the data component. :param label: Short human-readable information about the data component. :param description: A human-readable description of the data. :param codespace: A URL to an external dictionary, taxonomy, or ontology representing the code space. This attribute is only set for category data, i.e., when :attr:`is_quantity` is :code:`False`. :param uom: The unit of measure for this data. :param nil_values: A list of nil values associated with this field. """
[docs] name: str
"""Field name that can be used to subset bands in WCS GetCoverage or WCPS queries."""
[docs] is_quantity: bool = True
"""True if this field is a Quantity, False if it's a Category."""
[docs] definition: str = None
"""A URI that can be resolved to the complete human readable definition of the property that is represented by the data component."""
[docs] label: str = None
"""Short human readable information about the data component."""
[docs] description: str = None
"""Human-readable description of the data."""
[docs] codespace: str = None
""" URL to an external dictionary, taxonomy or ontology representing the code space. Only set for category data, i.e. :attr:`is_quantity` is False."""
[docs] uom: str = None
"""Unit of measure for this data."""
[docs] nil_values: list[NilValue] = None
"""A list of nil values.""" def __str__(self): indent = '\n ' ret = f'{indent}{self.name}:' indent += ' ' ret += f'{indent}type: ' + ('Quantity' if self.is_quantity else 'Category') if self.label is not None: ret += f'{indent}label: {self.label}' if self.description is not None: ret += f'{indent}description: {self.description}' if self.definition is not None: ret += f'{indent}definition: {self.definition}' if self.nil_values is not None and len(self.nil_values) > 0: ret += f'{indent}nil_values: {_list_to_str(self.nil_values, ",")}' if self.codespace is not None: ret += f'{indent}codespace: {self.codespace}' if self.uom is not None: ret += f'{indent}uom: {self.uom}' return ret
@dataclass
[docs] class NilValue: """ Represents a null value with an optional reason. :param nil_value: The null value itself, represented as a string. :param reason: An optional explanation for why the value is null. This is useful for providing context or documentation about the null value. """
[docs] nil_value: str
[docs] reason: Optional[str]
def __str__(self): ret = self.nil_value if self.reason is not None and len(self.reason) > 0: ret += ': ' + self.reason return ret
[docs] class Crs: """Utility class for handling CRS.""" @staticmethod
[docs] def to_short_notation(url: Optional[str]) -> Optional[str]: """ Parse CRS identifiers in `this notation <https://doc.rasdaman.org/05_geo-services-guide.html#crs-notation>`_. :param url: a CRS identifier, e.g. - http://localhost:8080/rasdaman/def/crs/EPSG/0/4326 - EPSG/0/4326 - EPSG:4326 :return: Short CRS notation, e.g. EPSG:4326; None if input is None or the method fails to parse the url. """ if url is None: return None # handle "EPSG:4326" if not '/' in url: return url parsed_url = urlparse(url) path = parsed_url.path.strip('/') # compound urls, e.g. "https://www.opengis.net/def/crs-compound?1=..." if '/crs-compound' in url: query_params = parse_qs(parsed_url.query) ret = [] for _, value in query_params.items(): for subcrs in value: ret.append(Crs.to_short_notation(subcrs)) return '+'.join(ret) # url == "https://www.opengis.net/def/crs/EPSG/0/4326" parts = path.split('/') if len(parts) > 2: authority = parts[-3] version = parts[-2] code = parts[-1] if version == "0": return f'{authority}:{code}' return f'{authority}:{version}:{code}' return None
[docs] class WCSClientException(Exception): """ An exception thrown by this library. """
def _bound_to_str(bound: BoundType) -> str: """ Convert an interval bound to its string representation. If the bound is a `datetime` object, the function formats it as an ISO 8601 string, potentially simplifying the format if the time components are zero. All other types are converted to strings directly. :param bound: The interval bound to convert. :return: A string representation of the bound. For `datetime` objects, the string is enquoted and formatted according to ISO 8601. :meta private: .. note:: This function is intended for internal use within modules where interval bounds need to be serialized to strings. """ if isinstance(bound, datetime): if bound.hour == 0 and bound.minute == 0 and bound.second == 0: ret = bound.strftime("%Y-%m-%d") else: ret = bound.isoformat() return '"' + ret + '"' return str(bound) def _list_to_str(lst: list, sep: str) -> str: """ Convert a list of items into a single string. Each item is converted to a string and separated by a specified separator in the result. :param lst: The list of items to be joined into a string. Each item in the list will be converted to a string before joining. :param sep: The separator to use between each item in the resulting string. :return: A single string containing all items from the list, separated by the specified separator. """ return sep.join([str(item) for item in lst]) _SPECIAL_CHARS_PATTERN = re.compile(r'[\s:{}\[\]()*&|><#%@,?\\=!]') def _dict_to_yaml(d, indent=0): """ Convert a nested Python dictionary to a YAML-formatted string. This function recursively traverses a Python dictionary, including any nested dictionaries and lists, and formats it into a YAML string. The resulting string maintains the hierarchical structure of the input dictionary, with appropriate indentation for nested elements. :param d: The dictionary to be converted into YAML format. It can contain nested dictionaries and lists. :param indent: The current level of indentation. This parameter is used internally by the function for recursive calls to manage indentation. Default is 0. :return: A string representing the input dictionary in YAML format. """ yaml_str = "" for key, value in d.items(): # quote the key if needed if any(c in key for c in ' \t') or \ _SPECIAL_CHARS_PATTERN.search(key) or \ any(ord(c) < 32 for c in key) or \ key[0].isdigit(): key = '"' + key + '"' if isinstance(value, dict): yaml_str += " " * indent + str(key) + ":\n" + _dict_to_yaml(value, indent + 2) elif isinstance(value, list): yaml_str += " " * indent + str(key) + ":\n" for item in value: yaml_str += " " * (indent + 2) + "- " if isinstance(item, dict): yaml_str += "\n" + _dict_to_yaml(item, indent + 4) else: yaml_str += str(item) + "\n" else: yaml_str += " " * indent + str(key) + ": " + str(value) + "\n" return yaml_str