AsyncClient
pingthings.timeseries.async_client.AsyncClient
¶
Asynchronous representation of the client connection to the timeseries platform.
Advanced user feature
For most customers, using the synchronous client (which will leverage the asynchronous client under the hood) will be sufficient. If you need to leverage the asynchronous functions yourself, this client can be used.
| METHOD | DESCRIPTION |
|---|---|
connect |
Connect to the timeseries platform and return an asynchronous client. |
create |
Create a new stream with the given uuid and collection name. |
create_device |
Creates a device with fields matching the passed dictionary. |
create_unit |
Create a new representation of a unit definition in the platform. |
delete_device |
Deletes the device with the specified id |
delete_unit |
Remove a unit from the platform. |
get_collection_properties |
Get properties of a collection. |
get_device |
Returns a device with a matching id in the device table. |
get_unit |
Returns a unit with a matching id in the unit table. |
info |
Retrieve information about the server and proxy server the client is connected to. |
list_collections |
Returns a list of collection paths using the |
list_devices |
Returns a list of devices the user has authorization to view. |
list_units |
Returns the list of units in the system. |
set_collection_retention |
Set retention policy on a collection of streams. |
sql_query |
Performs a SQL query on the database metadata and returns a list of |
stream_from_uuid |
Retrieve a stream based on its UUID. |
streams_in_collection |
Search for streams matching given parameters |
streamset_from_uuids |
Return an |
update_device |
Updates a |
update_unit |
Updates a |
| ATTRIBUTE | DESCRIPTION |
|---|---|
concurrency_limit |
The concurrency limit for background asynchronous operations.
|
Attributes¶
concurrency_limit
property
writable
¶
concurrency_limit
The concurrency limit for background asynchronous operations.
Setting the value overrides the environment variable
If you choose to set a custom concurrency_limit, this will bypass the environment variable PINGTHINGS_CONCURRENCY_LIMIT value.
Functions¶
connect
async
staticmethod
¶
connect(
profile: Optional[str] = None,
endpoint: Optional[str] = None,
apikey: Optional[str] = None,
concurrency_limit: Optional[int] = None,
) -> AsyncClient
Connect to the timeseries platform and return an asynchronous client.
Advanced user feature
For most customers, using the synchronous client (which will leverage the asynchronous client under the hood) will be sufficient. If you need to leverage the asynchronous functions yourself, this function can be useful.
Connecting to the platform for commercial customers
If you are a commercial customer and are using the PingThings JupyterHub/Lab environment to work with the timeseries platform, the relevant connection information has already been added to your session in the form of environment variables. All you need to do to connect to the platform is the following:
import pingthings as pt
async_conn = await pt.timeseries.AsyncClient.connect()
Choosing a concurrency limit
The default concurrency limit is defined in pingthings.timeseries.constants. If the environment variable is not set, it will use this default value.
Using a large concurrency limit has the ability to cause large memory consumption, and if the job gets killed, it can also lead to a hard to kill zombie process.
Most users will probably be fine with the default concurrency limit, but feel free to explore larger values.
| PARAMETER | DESCRIPTION |
|---|---|
|
The name of a profile containing the required connection information as found in the user's predictive grid credentials file |
|
The address and port of the cluster to connect to, e.g. |
|
The API key used to authenticate requests, if not set, the key is looked up from the environment variable |
|
The maximum number of concurrent database requests to have in flight at any one time, if not set, will be inferred from environment variable |
| RETURNS | DESCRIPTION |
|---|---|
AsyncClient
|
A timeseries client. |
Examples:
Connecting to the timeseries platform as a commercial customer in the PingThings provided JupyterHub/Lab environment This behavior also works if you have the environment variables set, refer to the above docstring for more information.
>>> import pingthings as pt
>>> conn = await pt.timeseries.AsyncClient.connect()
Connecting to the timeseries platform when you know your api key and FQDN endpoint.
>>> import pingthings as pt
>>> my_key = "ABC123"
>>> my_endpoint = "example.com:4411"
>>> conn = await pt.timeseries.AsyncClient.connect(apikey=my_key, endpoint=my_endpoint)
Connecting to the platform when you have a populated ${HOME}/.predictivegrid/credentials.yaml file with profiles.
>>> import pingthings as pt
>>> conn = await pt.timeseries.AsyncClient.connect(profile='my_server')
create
async
¶
create(
uuid: UUID,
collection: str,
tags: dict[str, str] = {},
annotations: dict[str, str] = {},
) -> AsyncStream
Create a new stream with the given uuid and collection name.
| PARAMETER | DESCRIPTION |
|---|---|
|
The UUID for the new stream.
TYPE:
|
|
The collection to which the stream belongs.
TYPE:
|
|
Tags associated with the stream. |
|
Annotations for the stream. |
| RETURNS | DESCRIPTION |
|---|---|
AsyncStream
|
The newly created stream. |
Required and Allowable Tags/Annotations
The name field in tags must be set and currently all tags/annotations must be encoded as strings and two streams can not share both the same collection and name.
Available tags in the stream table
| tag_name | Description |
|---|---|
| name | name of the stream |
| source | Where did the stream come from |
| device_id | What device is the stream attached to |
| samples_per_second | What is the nominal report rate of the data, in Hz? |
| time_precision | How accurate is each timestep (+- nanoseconds) |
| value_precision | Bit precision |
| dynamic | Can the timeseries stream send us data at varying report rates? |
| continuous | Should we expect a continuous stream of data? |
| description | Additional information about the timeseries stream |
| alt_name | Additional name that the stream may be referenced by |
| alt_id | Alternative ID (most likely SignalID from STTP) |
| norm_factor | A factor to scale the measurement by to normalize. Like “baseKV” for voltage, etc. |
| scaling_factor | The “m” in y=mx + b factor, line-line scaling factor, etc |
| bias_factor | The “b” in y=mx+b |
| hidden | Whether the stream should be presented to non-admin users |
create_device
async
¶
Creates a device with fields matching the passed dictionary. name can not be None or an empty string, two
devices can not share the same name and geo, and enabled=True by default.
| PARAMETER | DESCRIPTION |
|---|---|
|
dictionary with the possible keys and description of expected values below. |
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
A dictionary with the same fields passed with create plus the |
Available columns in the device table
| column_name | Description |
|---|---|
| name | Name of device |
| device_type | Type of device (DFR/Relay/etc) |
| description | User readable description |
| geo | Coordinates, a dict {"latitude":float,"logitude":float} |
| elevation | Elevation of the device, float |
| alt_id | Alternative id, int |
| alt_name | Alternative name |
| enabled | Whether the device is expected to be sending data, bool default: True |
| owner | Who owns the device |
| protocol | What protocol the device is using to send data |
| model_name | The model name of the device |
| vendor | The company that creates/sells the device |
create_unit
async
¶
create_unit(
name: str,
symbol: str,
base: float,
canonical: str,
source_name: list[str],
) -> dict[str, Any]
Create a new representation of a unit definition in the platform.
Units are a main way to assign further meaning to the timeseries stored in the platform.
There are usually a handful of default units provided with each cluster, however, based on
user needs, additional units will need to be included to support the wide range of timeseries added to the platform.
| PARAMETER | DESCRIPTION |
|---|---|
|
Name of the new unit.
TYPE:
|
|
Symbol of the new unit.
TYPE:
|
|
Multiplicative factor to convert the unit to the canonical representation.
TYPE:
|
|
The "main" unit that this unit can be ultimately converted to.
TYPE:
|
|
List of other potential |
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
Dictionary with fields corresponding to the values in the database. |
Examples:
>>> unit_def = {"name":"volts", "symbol":"V", "base":1.0, "canonical":"volts", source_name:["VPHM", "voltage", "VOLTS", "VLTGE"]}
>>> unit = await async_client.create_unit(**unit_def)
>>> print(unit)
{'id': 6,
'name': 'volts',
'symbol': 'V',
'canonical': 'volts',
'base': 1.0,
'source_name': ['VPHM', 'voltage', 'VOLTS', 'VLTGE']}
delete_device
async
¶
delete_unit
async
¶
get_device
async
¶
get_unit
async
¶
info
async
¶
list_collections
async
¶
Returns a list of collection paths using the prefix argument for
filtering.
| PARAMETER | DESCRIPTION |
|---|---|
|
Filter collections that start with the string provided, if none passed, will list all collections.
DEFAULT:
|
| RETURNS | DESCRIPTION |
|---|---|
list[str]
|
All collections that match the provided prefix. |
Examples:
Assuming we have the following collections in the platform:
foo, bar, foo/baz, bar/baz
>>> conn = pt.connect()
>>> conn.list_collections().sort()
["bar", "bar/baz", "foo", "foo/bar"]
>>> conn.list_collections(prefix="foo")
["foo", "foo/bar"]
>>> conn.list_collections(prefix="moo")
[]
list_devices
async
¶
list_units
async
¶
set_collection_retention
async
¶
set_collection_retention(
collection: str,
override_per_stream=False,
remove_older_than: Optional[datetime] = None,
) -> None
Set retention policy on a collection of streams.
| PARAMETER | DESCRIPTION |
|---|---|
|
The name of the collection.
|
|
Whether stream-specific retention policy should be overridden.
DEFAULT:
|
|
Trim time period - after which the data will get removed. Not specifying this parameter disables the trimming. |
Examples:
Keep the data for only one week.
>>> conn = pt.connect()
>>> conn.list_collections()
["bar", "foo"]
>>> conn.set_collection_retention("bar", false, datetime.timedelta(days=7))
sql_query
async
¶
Performs a SQL query on the database metadata and returns a list of dictionaries from the resulting cursor.
| PARAMETER | DESCRIPTION |
|---|---|
|
A SQL statement to be executed on the BTrDB metadata. Available columns in the
TYPE:
|
|
A list of parameter values to be sanitized and interpolated into the SQL statement. Using parameters forces value/type checking and is considered a best practice at the very least. |
| RETURNS | DESCRIPTION |
|---|---|
list[Any]
|
The result of the SQL query. |
Available columns in the stream table
| column_name | data_type |
|---|---|
| uuid | uuid |
| collection | character varying |
| name | character varying |
| unit | character varying |
| ingress | character varying |
| property_version | bigint |
| annotations | hstore |
| distiller | character varying |
| created_at | timestamp with time zone |
| updated_at | timestamp with time zone |
| geo | postgis geometry |
| last_written | timestamp with time zone |
| previous_last_written | timestamp with time zone |
| estimated_count_delta | bigint |
| long_term_autoregressive_average | double precision |
| count_last_updated | timestamp with time zone |
| previous_count_last_updated | timestamp with time zone |
| watched | boolean |
stream_from_uuid
¶
stream_from_uuid(uuid: UUID | str) -> AsyncStream
Retrieve a stream based on its UUID.
| PARAMETER | DESCRIPTION |
|---|---|
|
The UUID of the stream. |
| RETURNS | DESCRIPTION |
|---|---|
AsyncStream
|
The stream associated with the provided UUID. |
| RAISES | DESCRIPTION |
|---|---|
TypeError
|
If the provided |
streams_in_collection
async
¶
streams_in_collection(
collection: str = "",
is_collection_prefix: bool = True,
tags: Optional[dict[str, str]] = None,
annotations: Optional[dict[str, str]] = None,
) -> AsyncStreamSet
Search for streams matching given parameters
| PARAMETER | DESCRIPTION |
|---|---|
|
collections to use when searching for streams, case sensitive.
TYPE:
|
|
Whether the collection is a prefix of the whole collection name.
TYPE:
|
|
The tags to identify the stream. |
|
The annotations to identify the stream. |
| RETURNS | DESCRIPTION |
|---|---|
AsyncStreamSet
|
The grouping of streams matching given parameters. |
streamset_from_uuids
async
¶
streamset_from_uuids(
uuids: list[UUID | str], fetch_metadata: bool = True
) -> AsyncStreamSet
Return an AsyncStreamSet from an iterable of UUIDs.
| PARAMETER | DESCRIPTION |
|---|---|
|
List of stream identifiers |
|
Whether to fetch metadata for the streams in the set. Default is True.
TYPE:
|
Advanced user feature
Be cautious about using fetch_metadata=False. Many stream metadata values like collection, name, unit,
tags, annotations will not be available, meaning filtering and other operations that require metadata will
not work.
| RETURNS | DESCRIPTION |
|---|---|
AsyncStreamSet
|
The |
update_device
async
¶
Updates a device with matching device["id"] using fields matching the passed dictionary.
| PARAMETER | DESCRIPTION |
|---|---|
|
dictionary with the possible keys and description of expected values below. |
|
whether to change all unspecified values to
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
Dictionary with the updated fields and original unmodified fields of the device |
Update Behavior
This function updates a field if it has been set, otherwise it inserts a new value. Not passing a value for an existing field will not clear the value from that field. For that you must choose replace=True. If replace=True name can not be None or an empty string. Even with replace=True geo will not be set to None.
Available columns in the device table
| column_name | Description |
|---|---|
| id | Internal id of device |
| name | Name of device |
| device_type | Type of device (DFR/Relay/etc) |
| description | User readable description |
| geo | Coordinates, a dict {"latitude":float,"logitude":float} |
| elevation | Elevation of the device, float |
| alt_id | Alternative id, int |
| alt_name | Alternative name |
| enabled | Whether the device is expected to be sending data, bool default: True |
| owner | Who owns the device |
| protocol | What protocol the device is using to send data |
| model_name | The model name of the device |
| vendor | The company that creates/sells the device |
update_unit
async
¶
Updates a unit_dict with matching id using fields from the passed dictionary.
Modify the unit where the unit_dict['id'] is the id number of the unit you want to update.
Update Behavior
You are not requried to include fields that you want to keep the same. Although including them will not cause issues.
| PARAMETER | DESCRIPTION |
|---|---|
|
Dictionary with fields corresponding to the values in the database you want to update |
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
Dictionary with the updated fields and original unmodified fields of the unit |
Examples:
>>> original_unit = await async_client.get_unit(6)
>>> print(original_unit)
{'id': 6,
'name': 'volts',
'symbol': 'V',
'canonical': 'volts',
'base': 1.0,
'source_name': ['VPHM', 'voltage', 'VOLTS', 'VLTGE']}
>>> updated_unit = {"id":6, "name":"volt"}
>>> await async_client.update_unit(updated_unit)
{'id': 6,
'name': 'volt',
'symbol': 'V',
'canonical': 'volts',
'base': 1.0,
'source_name': ['VPHM', 'voltage', 'VOLTS', 'VLTGE']}