api_24sea.utils
Utility functions and classes.
Functions
| Function | Description |
|---|---|
| is_executable_ipython | Return True when running under IPython-based shells. |
| run_tqdm | Select the appropriate tqdm variant for the active environment. |
| normalize_http_method | Normalize and validate an HTTP method. |
| build_json_request_payload | Normalize a non-GET payload to the backend JSON contract. |
| build_httpx_request_kwargs | Build httpx request kwargs based on the HTTP method. |
| handle_request | Handle the request to the 24SEA API and manage errors using httpx. |
| handle_request_async | Asynchronously handle the request to the 24SEA API using httpx's AsyncClient. |
| default_to_regular_dict | Convert a defaultdict to a regular dictionary. |
| flatten_api_response | Flatten nested list payloads returned by multi-location endpoints. |
| normalize_group_values | Normalize grouped request values while preserving order. |
| require_auth | Decorator to ensure authentication before executing a method. |
| parse_timestamp | Parse the timestamp column in a DataFrame using multiple format attempts. |
| estimate_chunk_size | Estimate the optimal chunk size for processing tasks based on the expected data volume. |
| fetch_data_sync | Synchronously fetch metrics data for the datasignals API app. |
| fetch_availability_sync | Synchronously fetch availability data for a site/location. |
| fetch_availability_async | Asynchronously fetch availability data for a site/location. |
| fetch_data_async | Asynchronously fetch metrics data for the datasignals API app. |
| fetch_oldest_timestamp_sync | Synchronously fetch oldest timestamp data for a site/location. |
| set_threads_nr | Set the number of threads to use for processing. |
| parse_stats_list | Parse a list of statistics dictionaries into a DataFrame. |
| get_stats_overview_info | Get the overview information for a statistics DataFrame. |
| get_stats_as_dict | Convert the statistics DataFrame to a dictionary format. |
| get_metrics_data_df_as_dict | Convert the metrics DataFrame to a dictionary format. |
| series_to_type | Convert a pandas Series to a specified data type. |
| column_to_type | Convert a column in a DataFrame to a specified data type. |
| calendar_monthly_availability | Convert a daily availability dataframe to a calendar monthly availability dataframe. |
Module Contents
- is_executable_ipython() -> bool
Return True when running under IPython-based shells.
Returns
- bool
True for IPython or Jupyter shells, False for standard Python.
Examples
>>> isinstance(is_executable_ipython(), bool)
True
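One common way to detect an IPython shell is to look for the `__IPYTHON__` flag that IPython injects into `builtins` when an interactive shell starts. The sketch below assumes that approach; the library's actual check is not documented here, and `is_ipython_sketch` is a hypothetical name.

```python
import builtins


def is_ipython_sketch() -> bool:
    """Hypothetical stand-in for is_executable_ipython().

    IPython (terminal or Jupyter kernel) sets ``builtins.__IPYTHON__`` when a
    shell starts; a plain ``python`` interpreter never defines it.
    """
    return bool(getattr(builtins, "__IPYTHON__", False))


print(is_ipython_sketch())  # False under a plain python interpreter
```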
- run_tqdm(iterable: Iterable, *args: Any, exec_env: bool, **kwargs: Any)
Select the appropriate tqdm variant for the active environment.
Parameters
- iterable : Iterable
Items to iterate over.
- args : Any
Positional arguments forwarded to tqdm.
- exec_env : bool
True when the notebook-aware progress bar should be used.
- kwargs : Any
Keyword arguments forwarded to tqdm.
Returns
- tqdm.std.tqdm
Configured progress iterator.
Examples
>>> list(run_tqdm(range(2), exec_env=False, disable=True))
[0, 1]
- normalize_http_method(method: str) -> str
Normalize and validate an HTTP method.
Parameters
- method : str
HTTP verb to normalize.
Returns
- str
Upper-case HTTP method.
Raises
- ValueError
If the provided method is not supported.
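A minimal sketch of this normalize-then-validate step, assuming the supported set is the GET, POST, PUT, PATCH and DELETE verbs listed for handle_request_async and that surrounding whitespace is stripped. `SUPPORTED_METHODS` and `normalize_method` are hypothetical names, not the library's code.

```python
SUPPORTED_METHODS = frozenset({"GET", "POST", "PUT", "PATCH", "DELETE"})


def normalize_method(method: str) -> str:
    """Strip and upper-case ``method``; reject verbs outside SUPPORTED_METHODS."""
    normalized = method.strip().upper()
    if normalized not in SUPPORTED_METHODS:
        raise ValueError(
            f"Unsupported HTTP method {method!r}; "
            f"expected one of {sorted(SUPPORTED_METHODS)}"
        )
    return normalized


print(normalize_method(" post "))  # POST
```

Validating once up front means the request helpers can compare against exact upper-case strings such as "GET" afterwards.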
- build_json_request_payload(params: Dict | None, json: Dict | None = None) -> Dict | None
Normalize a non-GET payload to the backend JSON contract.
Parameters
- params : dict, optional
Request parameters produced by the caller.
- json : dict, optional
Explicit JSON payload fallback.
Returns
- dict or None
Normalized JSON payload.
- build_httpx_request_kwargs(method: str, params: Dict, json: Dict | None = None) -> Tuple[Dict | None, Dict | None]
Build httpx request kwargs based on the HTTP method.
GET requests keep using query-string parameters. Non-GET requests build the request payload from params and only fall back to json when params is empty.
Parameters
- method : str
Normalized HTTP method.
- params : dict
Parameters provided by the caller.
- json : dict, optional
Explicit JSON payload fallback.
Returns
- Tuple[Optional[Dict], Optional[Dict]]
The params and json values to forward to httpx.request.
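The GET/non-GET split described here can be sketched as a pure function. This is a simplified illustration under stated assumptions: `build_request_kwargs` is a hypothetical name, the method is assumed to be already normalized, and the backend-specific normalization done by build_json_request_payload is skipped (the payload is copied as-is).

```python
from typing import Any, Dict, Optional, Tuple


def build_request_kwargs(
    method: str,
    params: Optional[Dict[str, Any]],
    json: Optional[Dict[str, Any]] = None,
) -> Tuple[Optional[Dict[str, Any]], Optional[Dict[str, Any]]]:
    """Return the (params, json) pair to pass to httpx.request."""
    if method == "GET":
        # GET: everything stays in the query string; no request body.
        return params, None
    # Non-GET: the body comes from params, falling back to json when
    # params is empty or None. No query-string parameters are sent.
    body = dict(params) if params else json
    return None, body


print(build_request_kwargs("GET", {"site": "s1"}))      # ({'site': 's1'}, None)
print(build_request_kwargs("POST", {}, {"site": "s1"}))  # (None, {'site': 's1'})
```

The returned pair maps directly onto the `params=` and `json=` keyword arguments of `httpx.request`.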
- handle_request(url: str, params: Dict, auth: httpx.BasicAuth | None, headers: Dict[str, str] | None = {'accept': 'application/json'}, max_retries: int = 10, timeout: int = 3600, method: str = 'GET', json: Dict | None = None) -> httpx.Response
Handle the request to the 24SEA API and manage errors using httpx.
If the request succeeds, the response object is returned; otherwise, an error is raised.
Parameters
- url : str
The URL to which to send the request.
- params : dict
The parameters to send with the request.
- auth : httpx.BasicAuth or None
The authentication object.
- headers : dict, optional
The headers to send with the request. Defaults to {'accept': 'application/json'}.
Returns
- httpx.Response
The response object, if the request succeeds.
- async handle_request_async(url: str, params: Dict, auth: httpx.BasicAuth | None, headers: Dict[str, str] | None = {'accept': 'application/json'}, max_retries: int = 10, timeout: int = 1800, method: str = 'GET', json: Dict | None = None) -> httpx.Response
Asynchronously handle the request to the 24SEA API using httpx's AsyncClient. Supports the GET, POST, PUT, PATCH and DELETE methods.
- default_to_regular_dict(d_: DefaultDict | Dict) -> Dict
Convert a defaultdict to a regular dictionary.
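A minimal sketch of such a conversion, assuming nested defaultdicts should be converted recursively (the docstring does not say so). `to_regular_dict` is a hypothetical name. The practical benefit is that looking up a missing key in the result raises KeyError instead of silently inserting a default.

```python
from collections import defaultdict
from typing import Any


def to_regular_dict(d: Any) -> Any:
    """Recursively convert (nested) defaultdicts into plain dicts."""
    if isinstance(d, dict):  # defaultdict is a dict subclass
        return {key: to_regular_dict(value) for key, value in d.items()}
    return d  # leaves (lists, numbers, ...) are returned unchanged


tree = defaultdict(lambda: defaultdict(list))
tree["site_a"]["loc_1"].append("metric_x")
plain = to_regular_dict(tree)
print(type(plain).__name__, type(plain["site_a"]).__name__)  # dict dict
```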
- flatten_api_response(payload: Any) -> Any
Flatten nested list payloads returned by multi-location endpoints.
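A sketch of one-level flattening, assuming a multi-location endpoint returns one inner list per location and that non-list payloads pass through unchanged. The exact nesting the 24SEA API returns is not documented here, so `flatten_payload` and the sample records are hypothetical.

```python
from typing import Any


def flatten_payload(payload: Any) -> Any:
    """Splice inner lists of a list payload into a single flat list."""
    if not isinstance(payload, list):
        return payload  # e.g. a single dict from a one-location call
    flat = []
    for item in payload:
        if isinstance(item, list):
            flat.extend(item)  # one inner list per location
        else:
            flat.append(item)  # already-flat records are kept as-is
    return flat


per_location = [[{"location": "A01", "v": 1}], [{"location": "A02", "v": 2}]]
print(flatten_payload(per_location))
```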
- normalize_group_values(values: List[str]) -> List[str]
Normalize grouped request values while preserving order.
Site-level grouping can repeat the special all metric once per location. The API only accepts it as a single value.
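An order-preserving de-duplication covers the repeated all case described above. This sketch assumes every repeated value, not only all, is collapsed to its first occurrence; `dedupe_preserving_order` is a hypothetical name.

```python
from typing import List


def dedupe_preserving_order(values: List[str]) -> List[str]:
    # dict keys keep insertion order (Python 3.7+), so the first occurrence
    # of each value wins and later repeats are dropped.
    return list(dict.fromkeys(values))


print(dedupe_preserving_order(["all", "all", "all"]))             # ['all']
print(dedupe_preserving_order(["metric_a", "all", "metric_a"]))   # ['metric_a', 'all']
```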
- require_auth(func)
Decorator to ensure authentication before executing a method.
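A sketch of how such a method decorator is typically built with `functools.wraps`. The `authenticated` attribute, the `authenticate()` method, raising `PermissionError`, and the `Client` class are all hypothetical; the library may instead authenticate lazily or raise a different error.

```python
import functools
from typing import Any, Callable


def require_auth_sketch(func: Callable[..., Any]) -> Callable[..., Any]:
    """Refuse to run ``func`` unless ``self.authenticated`` is truthy."""

    @functools.wraps(func)  # keep the wrapped method's name and docstring
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        if not getattr(self, "authenticated", False):
            raise PermissionError(
                f"{func.__name__}() requires authentication; call authenticate() first"
            )
        return func(self, *args, **kwargs)

    return wrapper


class Client:
    def __init__(self) -> None:
        self.authenticated = False

    def authenticate(self) -> None:
        self.authenticated = True  # a real client would verify credentials here

    @require_auth_sketch
    def get_metrics(self) -> str:
        return "metrics"
```

Calling `Client().get_metrics()` raises before `authenticate()` has been called and returns normally afterwards.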
- parse_timestamp(df: pandas.DataFrame, formats: Iterable[str] = ('ISO8601', 'mixed'), dayfirst: bool = False, keep_index_only: bool = True) -> pandas.DataFrame
Parse the timestamp column in a DataFrame using multiple format attempts.
Parameters
- df : pandas.DataFrame
Input DataFrame containing a timestamp column or index.
- formats : Iterable[str], default ('ISO8601', 'mixed')
Datetime formats to try.
- dayfirst : bool, default False
Whether to interpret dates as day first.
Returns
- pandas.DataFrame
DataFrame with the parsed timestamp column.
Raises
- ValueError
If timestamp parsing fails with all formats.
- estimate_chunk_size(tasks: list, start_timestamp: str | datetime.datetime, end_timestamp: str | datetime.datetime, grouped_metrics: Iterable, selected_metrics: pandas.DataFrame | None = None, target: str = 'metric')
Estimate the optimal chunk size for processing tasks based on the expected data volume.
The function estimates the size of the data request in megabytes (MB) from the number of data points, the number of tasks, and the bytes required per metric. It then picks a chunk size that lets the tasks be processed efficiently.
Parameters
- tasks : list
List of tasks to be processed.
- start_timestamp : str or datetime.datetime
Start of the requested time window.
- end_timestamp : str or datetime.datetime
End of the requested time window.
- grouped_metrics : iterable
Iterable of grouped metrics, where each group is a tuple (key, group) and group is typically a DataFrame.
- selected_metrics : pandas.DataFrame or None
DataFrame containing the selected metrics, with at least a "metric" column and optionally a "data_group" column.
- target : str, default "metric"
The target column name in selected_metrics and grouped_metrics.
Returns
- dict
Dictionary with the following keys:
  - "total_mb": float, estimated total size of the request in MB.
  - "n_tasks": int, number of tasks.
  - "chunk_size": int, recommended chunk size for processing.
Notes
The function assumes each data point is a float64 (8 bytes) unless overridden by the "data_group".
The number of data points is estimated as one every 10 minutes between the start and end timestamps.
The chunk size is determined from the estimated total data size.
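The size arithmetic in the Notes can be sketched as: points = window seconds / 600, bytes = points × metrics × tasks × 8. How the library maps the total to a chunk size is not documented, so the rule below (pack as many tasks per chunk as fit under a hypothetical `target_mb_per_chunk`) is an assumption, as are 1 MB = 1024² bytes and a fixed metric count per task. `estimate_chunks` is a hypothetical name.

```python
import math
from datetime import datetime
from typing import Dict, Union

BYTES_PER_POINT = 8         # float64 per data point, as stated in the Notes
SAMPLING_SECONDS = 10 * 60  # one data point every 10 minutes, as stated in the Notes
BYTES_PER_MB = 1024 ** 2    # assumption: binary megabytes


def estimate_chunks(
    n_tasks: int,
    start: datetime,
    end: datetime,
    n_metrics_per_task: int,
    target_mb_per_chunk: float = 50.0,  # hypothetical threshold, not from the library
) -> Dict[str, Union[float, int]]:
    """Estimate the request size and how many tasks to bundle per chunk."""
    n_points = int((end - start).total_seconds() // SAMPLING_SECONDS)
    total_bytes = n_points * n_metrics_per_task * n_tasks * BYTES_PER_POINT
    total_mb = total_bytes / BYTES_PER_MB
    if total_mb == 0:
        # Nothing to download: process everything in one chunk.
        return {"total_mb": 0.0, "n_tasks": n_tasks, "chunk_size": max(n_tasks, 1)}
    mb_per_task = total_mb / n_tasks
    # Bundle as many tasks as fit under the target size, but at least one.
    chunk_size = max(1, min(n_tasks, math.floor(target_mb_per_chunk / mb_per_task)))
    return {"total_mb": total_mb, "n_tasks": n_tasks, "chunk_size": chunk_size}


# 30 days of 10-minute data (4320 points), 10 metrics per task, 4 tasks.
print(estimate_chunks(4, datetime(2024, 1, 1), datetime(2024, 1, 31), 10))
# {'total_mb': 1.318359375, 'n_tasks': 4, 'chunk_size': 4}
```

Small windows collapse into one chunk, while a year of data for many metrics drops to one task per chunk, which keeps each request's memory footprint bounded.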
- fetch_data_sync(url, site: str, locations: str | List[str], start_timestamp: datetime.datetime | str, end_timestamp: datetime.datetime | str, headers: Dict[str, str] | None, group: pandas.DataFrame, auth: httpx.BasicAuth | None, timeout: int, target: str = 'metric', force_cache_miss: bool = False, method: str = 'GET') -> Any
Synchronously fetch metrics data for the datasignals API app.
- fetch_availability_sync(url, site: str, locations: str | List[str], start_timestamp: datetime.datetime | str, end_timestamp: datetime.datetime | str, granularity: str | int, sampling_interval_seconds: int, headers: Dict[str, str] | None, group: pandas.DataFrame, auth: httpx.BasicAuth | None, timeout: int, method: str = 'GET') -> Any
Synchronously fetch availability data for a site/location.
- async fetch_availability_async(url: str, site: str, locations: str | List[str] | None, start_timestamp: datetime.datetime | str, end_timestamp: datetime.datetime | str, granularity: str | int, sampling_interval_seconds: int, headers: Dict[str, str] | None, group: pandas.DataFrame, auth: httpx.BasicAuth | None, timeout: int, max_retries: int, method: str = 'GET') -> Any
Asynchronously fetch availability data for a site/location.
- async fetch_data_async(url, site: str, locations: str | List[str], start_timestamp: datetime.datetime | str, end_timestamp: datetime.datetime | str, headers: Dict[str, str] | None, group: pandas.DataFrame, auth: httpx.BasicAuth | None, timeout: int, max_retries: int, as_dict: bool = False, target: str = 'metric', force_cache_miss: bool = False, method: str = 'GET') -> pandas.DataFrame | Dict[str, Any]
Asynchronously fetch metrics data for the datasignals API app.
- fetch_oldest_timestamp_sync(url: str, site: str, locations: str | None, headers: Dict[str, str], auth: httpx.BasicAuth | None, timeout: int, as_dict: bool = False, method: str = 'GET') -> pandas.DataFrame | Dict[str, Any]
Synchronously fetch oldest timestamp data for a site/location.
- set_threads_nr(threads: int | None, thread_limit: int = 30) -> int
Set the number of threads to use for processing.
Parameters
- threads : Optional[int]
The number of threads to use. If None, the number of available CPU cores is used.
Returns
- int
The number of threads to use.
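A sketch of the thread-count resolution, assuming `thread_limit` acts as an upper cap and that the result is clamped to at least one thread (neither is stated above). `pick_thread_count` is a hypothetical name; `os.cpu_count()` can return None, hence the fallback.

```python
import os
from typing import Optional


def pick_thread_count(threads: Optional[int], thread_limit: int = 30) -> int:
    """Resolve ``threads`` (None -> CPU count) and clamp to [1, thread_limit]."""
    if threads is None:
        threads = os.cpu_count() or 1  # cpu_count() may return None
    return max(1, min(threads, thread_limit))


print(pick_thread_count(8))    # 8
print(pick_thread_count(100))  # 30
```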
- parse_stats_list(stats_list: List[Dict[str, Any]]) -> pandas.DataFrame
Parse a list of statistics dictionaries into a DataFrame.
Parameters
- stats_list : List[Dict[str, Any]]
List of dictionaries containing statistics data.
Returns
- pd.DataFrame
DataFrame containing the parsed statistics.
- get_stats_overview_info(stats_df: pandas.DataFrame, metrics_overview: pandas.DataFrame | None = None) -> pandas.DataFrame
Get the overview information for a statistics DataFrame.
Parameters
- stats_df : pd.DataFrame
DataFrame containing statistics data.
- metrics_overview : pd.DataFrame, optional
DataFrame containing metrics overview information.
Returns
- pd.DataFrame
DataFrame with the overview information merged with stats_df.
- get_stats_as_dict(stats_df: pandas.DataFrame) -> Dict[str, Dict[str, pandas.DataFrame]]
Convert the statistics DataFrame to a dictionary format.
Parameters
- stats_df : pd.DataFrame
DataFrame containing statistics data.
Returns
- Dict[str, Dict[str, pd.DataFrame]]
Dictionary with site and location as keys and statistics as values.
- get_metrics_data_df_as_dict(metrics_data_df: pandas.DataFrame, selected_metrics: pandas.DataFrame) -> Dict[str, Dict[str, pandas.DataFrame]]
Convert the metrics DataFrame to a dictionary format.
Parameters
- metrics_data_df : pd.DataFrame
DataFrame containing metrics data.
Returns
- Dict[str, Dict[str, pd.DataFrame]]
Dictionary with site and location as keys and metrics data as values.
- series_to_type(series: pandas.Series, dtype: str | type) -> pandas.Series | pandas.Timestamp
Convert a pandas Series to a specified data type.
Parameters
- series : pd.Series
The Series to convert.
- dtype : Union[str, type]
The data type to convert the series to.
Returns
- pd.Series
The Series converted to the specified data type.
Example
>>> import pandas as pd
>>> s = pd.Series([1, 2, 3])
>>> series_to_type(s, float)
0    1.0
1    2.0
2    3.0
dtype: float64
- column_to_type(data: pandas.DataFrame, column: str, dtype: str | type) -> pandas.DataFrame
Convert a column in a DataFrame to a specified data type.
Parameters
- data : pd.DataFrame
The DataFrame containing the column to convert.
- column : str
The column to convert.
- dtype : Union[str, type]
The data type to convert the column to.
Returns
- pd.DataFrame
The DataFrame with the column converted to the specified data type.
Example
>>> import pandas as pd
>>> df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
>>> column_to_type(df, 'A', float)
     A  B
0  1.0  4
1  2.0  5
2  3.0  6
- calendar_monthly_availability(df: pandas.DataFrame, *, start_timestamp: datetime.datetime | str | None = None, end_timestamp: datetime.datetime | str | None = None, sampling_interval_seconds: int = 600) -> pandas.DataFrame
Convert a daily availability dataframe to a calendar monthly availability dataframe.
The columns of the input dataframe are assumed to hold daily availability values between 0 and 1. The output dataframe has the same columns, indexed by year and month, and each value is the mean of the daily availability values in that month.
Parameters
- df : pd.DataFrame
Input dataframe with daily availability values.
Returns
- pd.DataFrame
Output dataframe with calendar monthly availability values.
Example
>>> import pandas as pd
>>> data = {'timestamp': pd.date_range(start='2023-01-01', periods=90,
...                                    freq='D'),
...         'availability': [0.9, 0.8, 0.95] * 30}
>>> df = pd.DataFrame(data).set_index('timestamp')
>>> calendar_monthly_availability(df)
           availability
timestamp
2023-01        0.883871
2023-02        0.880357
2023-03        0.885484
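The per-month averaging described above can be checked with a standard-library sketch on the same 90-day data (January has 31 days, February 28, March 31, so the repeating three-value pattern does not divide evenly into each month). `monthly_mean` is a hypothetical helper that only illustrates the calendar-month mean; it ignores the start/end timestamp and sampling-interval options of the real function.

```python
from collections import defaultdict
from datetime import date, timedelta
from typing import Dict, Iterable, Tuple


def monthly_mean(daily: Iterable[Tuple[date, float]]) -> Dict[Tuple[int, int], float]:
    """Average daily availability values per calendar (year, month)."""
    sums: Dict[Tuple[int, int], float] = defaultdict(float)
    counts: Dict[Tuple[int, int], int] = defaultdict(int)
    for day, value in daily:
        key = (day.year, day.month)
        sums[key] += value
        counts[key] += 1
    return {key: sums[key] / counts[key] for key in sorted(sums)}


# 90 days from 2023-01-01 with the repeating pattern 0.9, 0.8, 0.95.
start = date(2023, 1, 1)
values = [0.9, 0.8, 0.95] * 30
daily = [(start + timedelta(days=i), v) for i, v in enumerate(values)]
for (year, month), mean in monthly_mean(daily).items():
    print(f"{year}-{month:02d} {mean:.6f}")
# prints 2023-01 0.883871, 2023-02 0.880357, 2023-03 0.885484
```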