Client

pingthings.timeseries.client.Client

Class that manages reusable state and client connections.

This class provides synchronous methods to connect to an asynchronous client, retrieve client and event loop information, and perform various client-related operations such as querying and streaming.

Info

For most users, this client is the only connection type needed. Queries are accelerated using asynchronous methods under the hood. If you do need the asynchronous client, refer to pingthings.timeseries.async_client.

Create a synchronous client class that manages state for asynchronous client connections.

PARAMETER DESCRIPTION

loop

The event loop used by the client.

TYPE: ClientEventLoop

client

The asynchronous client instance.

TYPE: AsyncClient

METHOD DESCRIPTION
connect

Establish a synchronous connection to the asynchronous client.

create

Create a new stream with the given uuid and collection name.

create_device

Creates a device with fields matching the passed dictionary. name can not be None or an empty string, and two devices can not share the same name and geo.

create_unit

Create a new representation of a unit definition in the platform.

delete_device

Deletes the device with the specified id

delete_unit

Remove a unit from the platform.

get_async_client

Retrieve the asynchronous client instance.

get_collection_properties

Get properties of a collection.

get_device

Returns a device with a matching id in the device table.

get_event_loop

Retrieve the event loop instance.

get_unit

Returns a unit with a matching id in the unit table.

info

Retrieve information about the server and proxy server the client is connected to.

list_collections

Returns a list of collection paths using the prefix argument for filtering.

list_devices

Returns a list of devices the user has permission to see.

list_units

Returns a list of units generated in the database.

set_collection_retention

Set retention policy on a collection of streams.

sql_query

Performs a SQL query on the database metadata and returns a list of dictionaries from the resulting cursor.

stream_from_uuid

Retrieve a stream based on its UUID.

streams_in_collection

Search for streams matching given parameters

streamset_from_uuids

Return a StreamSet from an iterable of UUIDs.

update_device

Updates a device with matching device["id"] using fields matching the passed dictionary. name can not be None or an empty string.

update_unit

Updates the unit with matching id using fields from the passed dictionary.

ATTRIBUTE DESCRIPTION
concurrency_limit

The concurrency limit for background asynchronous operations.

Attributes

concurrency_limit property writable

concurrency_limit

The concurrency limit for background asynchronous operations.

Setting the value overrides the environment variable

Setting a custom concurrency_limit bypasses the value of the environment variable PINGTHINGS_CONCURRENCY_LIMIT.
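The precedence described above can be sketched locally as follows. This is an illustration of the documented rule, not the library's code: the helper name `resolve_concurrency_limit` is hypothetical, and because this page does not say what happens when neither value is set, the sketch simply returns None in that case.

```python
import os
from typing import Mapping, Optional


def resolve_concurrency_limit(
    explicit: Optional[int] = None,
    environ: Optional[Mapping[str, str]] = None,
) -> Optional[int]:
    """Hypothetical helper: an explicit value wins over the environment variable."""
    env = os.environ if environ is None else environ
    if explicit is not None:
        # A custom limit bypasses PINGTHINGS_CONCURRENCY_LIMIT entirely.
        return explicit
    raw = env.get("PINGTHINGS_CONCURRENCY_LIMIT")
    # Assumption: with neither value set, no limit is known here.
    return int(raw) if raw is not None else None


fake_env = {"PINGTHINGS_CONCURRENCY_LIMIT": "4"}
from_env = resolve_concurrency_limit(environ=fake_env)        # read from the environment
overridden = resolve_concurrency_limit(16, environ=fake_env)  # explicit value takes over
```

With a real connection, the equivalent is passing concurrency_limit to connect or assigning to conn.concurrency_limit, which the attribute description above marks as writable.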

Functions

connect staticmethod

connect(
    profile: Optional[str] = None,
    endpoint: Optional[str] = None,
    apikey: Optional[str] = None,
    concurrency_limit: Optional[int] = None,
) -> Client

Establish a synchronous connection to the asynchronous client.

PARAMETER DESCRIPTION
profile

The name of a profile containing the required connection information as found in the user's predictive grid credentials file ${HOME}/.predictivegrid/credentials.yaml.

TYPE: Optional[str] DEFAULT: None

endpoint

The address and port of the cluster to connect to, e.g. 192.168.1.1:4411. If not set, the client looks for the environment variable $BTRDB_ENDPOINTS.

TYPE: Optional[str] DEFAULT: None

apikey

The API key used to authenticate requests. If not set, the key is looked up from the environment variable $BTRDB_API_KEY.

TYPE: Optional[str] DEFAULT: None

concurrency_limit

The maximum number of concurrent database requests to have in flight at any one time. If not set, the value is inferred from the environment variable $PINGTHINGS_CONCURRENCY_LIMIT.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
Client

An instance of the Client class.

Examples:

Connecting to the timeseries platform as a commercial customer in the PingThings provided JupyterHub/Lab environment. This also works if you have the environment variables set; refer to the parameter descriptions above for more information.

import pingthings as pt
conn = pt.timeseries.connect()

Connecting to the timeseries platform when you know your api key and FQDN endpoint.

import pingthings as pt
my_key = "ABC123"
my_endpoint = "example.com:4411"

conn = pt.timeseries.connect(apikey=my_key, endpoint=my_endpoint)

Connecting to the platform when you have a populated ${HOME}/.predictivegrid/credentials.yaml file with profiles.

import pingthings as pt
conn = pt.timeseries.connect(profile='my_server')

create

create(
    uuid: UUID,
    collection: str,
    tags: dict[str, str],
    annotations: Optional[dict[str, str]] = None,
) -> Stream

Create a new stream with the given uuid and collection name.

PARAMETER DESCRIPTION
uuid

The UUID for the new stream.

TYPE: UUID

collection

The collection to which the stream belongs.

TYPE: str

tags

Tags associated with the stream.

TYPE: dict[str, str]

annotations

Annotations for the stream.

TYPE: Optional[dict[str, str]] DEFAULT: None

RETURNS DESCRIPTION
Stream

The newly created stream.

Required and Allowable Tags/Annotations

The name field in tags must be set. Currently, all tags and annotations must be encoded as strings, and two streams can not share both the same collection and name.

Available tags in the stream table

column_name Description
name name of the stream
source Where did the stream come from
device_id What device is the stream attached to
samples_per_second What is the nominal report rate of the data, in Hz?
time_precision How accurate is each timestep (+- nanoseconds)
value_precision Bit precision
dynamic Can the timeseries stream send us data at varying report rates?
continuous Should we expect a continuous stream of data?
description Additional information about the timeseries stream
alt_name Additional name that the stream may be referenced by
alt_id Alternative ID (most likely SignalID from STTP)
norm_factor A factor to scale the measurement by to normalize. Like “baseKV” for voltage, etc.
scaling_factor The “m” in y=mx + b factor, line-line scaling factor, etc
bias_factor The “b” in y=mx+b
hidden Whether the stream should be presented to non-admin users
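The constraints above (a required name, string-encoded values, tag names from the table) can be checked before calling create. The following is a minimal sketch: `make_stream_tags` and `create_stream` are hypothetical helpers, and treating the table as the complete set of allowed tag names is an assumption based on its heading.

```python
import uuid
from typing import Any, Optional

# Tag names copied from the table above.
ALLOWED_TAGS = {
    "name", "source", "device_id", "samples_per_second", "time_precision",
    "value_precision", "dynamic", "continuous", "description", "alt_name",
    "alt_id", "norm_factor", "scaling_factor", "bias_factor", "hidden",
}


def make_stream_tags(**fields: Any) -> dict[str, str]:
    """Hypothetical helper: validate tag names and encode every value as a string."""
    unknown = set(fields) - ALLOWED_TAGS
    if unknown:
        raise ValueError(f"unknown tags: {sorted(unknown)}")
    if not fields.get("name"):
        raise ValueError("the 'name' tag must be set")
    return {key: str(value) for key, value in fields.items()}


def create_stream(
    conn: Any,
    collection: str,
    tags: dict[str, str],
    annotations: Optional[dict[str, str]] = None,
) -> Any:
    """Call Client.create with a freshly generated UUID."""
    return conn.create(uuid.uuid4(), collection, tags, annotations)


tags = make_stream_tags(name="voltage_a", samples_per_second=30)
```

Here `conn` stands for a connected Client; the numeric report rate is converted to the string "30" before it is sent.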

create_device

create_device(device: dict[str, Any]) -> dict[str, Any]

Creates a device with fields matching the passed dictionary. name can not be None or an empty string, two devices can not share the same name and geo, and enabled defaults to True.

PARAMETER DESCRIPTION
device

dictionary with the possible keys and description of expected values below.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
dict[str, Any]

A dictionary with the same fields passed to create_device, plus device["id"] set.

Available columns in the device table

column_name Description
name Name of device
device_type Type of device (DFR/Relay/etc)
description User readable description
geo Coordinates, a dict {"latitude":float,"longitude":float}
elevation Elevation of the device, float
alt_id Alternative id, int
alt_name Alternative name
enabled Whether the device is expected to be sending data, bool default: True
owner Who owns the device
protocol What protocol the device is using to send data
model_name The model name of the device
vendor The company that creates/sells the device
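A device dictionary can be assembled and checked locally before it is passed to create_device. This is a sketch under assumptions: `make_device` is a hypothetical helper, the field values are invented, and only the name rule and the column names come from the text above.

```python
from typing import Any

# Column names copied from the device table above.
DEVICE_COLUMNS = {
    "name", "device_type", "description", "geo", "elevation", "alt_id",
    "alt_name", "enabled", "owner", "protocol", "model_name", "vendor",
}


def make_device(name: str, **fields: Any) -> dict[str, Any]:
    """Hypothetical helper: build a device dict and reject obvious mistakes early."""
    if not name:
        raise ValueError("name can not be None or an empty string")
    unknown = set(fields) - DEVICE_COLUMNS
    if unknown:
        raise ValueError(f"unknown device columns: {sorted(unknown)}")
    return {"name": name, **fields}


device = make_device("feeder-12-relay", device_type="Relay", elevation=120.5)
# With a connected client (not executed here):
#   created = conn.create_device(device)
#   created["id"] would then hold the id assigned by the platform.
```

The enabled key is left out on purpose, since the platform is documented to default it to True.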

create_unit

create_unit(
    name: str,
    symbol: str,
    base: float,
    canonical: str,
    source_name: list[str],
) -> dict[str, Any]

Create a new representation of a unit definition in the platform.

Units are one of the main ways to assign further meaning to the timeseries stored in the platform. Each cluster usually ships with a handful of default units; additional units can be added as needed to support the wide range of timeseries in the platform.

PARAMETER DESCRIPTION
name

Name of the new unit.

TYPE: str

symbol

Symbol of the new unit.

TYPE: str

base

Multiplicative factor to convert the unit to the canonical representation.

TYPE: float

canonical

The "main" unit that this unit can be ultimately converted to.

TYPE: str

source_name

List of alternative names under which this unit may appear in the platform. For example, if the unit name is degrees the source_name could include the following: ["deg", "VPHA", "IPHA", "VpHA", "Deg", "DEG"]

TYPE: list[str]

RETURNS DESCRIPTION
dict[str, Any]

Dictionary with fields corresponding to the values in the database.

Examples:

>>> unit_def = {"name":"volts", "symbol":"V", "base":1.0, "canonical":"volts", "source_name":["VPHM", "voltage", "VOLTS", "VLTGE"]}
>>> unit = client.create_unit(**unit_def)
>>> print(unit)
{'id': 6,
'name': 'volts',
'symbol': 'V',
'canonical': 'volts',
'base': 1.0,
'source_name': ['VPHM', 'voltage', 'VOLTS', 'VLTGE']}
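To make the role of base and source_name concrete, here is a small local sketch. It assumes that base is applied by multiplying a value to reach the canonical unit, as the parameter description suggests, and that alias matching is exact and case sensitive; the kilovolts unit and both helper names are hypothetical.

```python
from typing import Any


def to_canonical(value: float, unit: dict[str, Any]) -> float:
    """Scale a value into the unit's canonical representation using base."""
    return value * unit["base"]


def matches_unit(label: str, unit: dict[str, Any]) -> bool:
    """Return True if label is the unit's name or one of its source_name aliases."""
    return label == unit["name"] or label in unit["source_name"]


# Hypothetical unit definition, shaped like the dictionary returned by create_unit.
kilovolts = {
    "name": "kilovolts",
    "symbol": "kV",
    "base": 1000.0,
    "canonical": "volts",
    "source_name": ["KV", "kilovolt"],
}

volts_value = to_canonical(2.5, kilovolts)  # 2.5 kV expressed in volts
is_alias = matches_unit("KV", kilovolts)
```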

delete_device

delete_device(device_id: int) -> None

Deletes the device with the specified id

PARAMETER DESCRIPTION
device_id

Corresponding id of the device to be deleted

TYPE: int

RETURNS DESCRIPTION
None

None

delete_unit

delete_unit(unit_id: int) -> None

Remove a unit from the platform.

PARAMETER DESCRIPTION
unit_id

Integer corresponding to the id of the unit in the database.

TYPE: int

RETURNS DESCRIPTION
None

None

Examples:

>>> client.delete_unit(6)

get_async_client

get_async_client() -> AsyncClient

Retrieve the asynchronous client instance.

RETURNS DESCRIPTION
AsyncClient

The asynchronous client instance.

get_collection_properties

get_collection_properties(
    collection: str,
) -> dict[str, Any]

Get properties of a collection.

PARAMETER DESCRIPTION
collection

The name of the collection.

TYPE: str

Examples:

Get the retention policy of a collection.

>>> conn.get_collection_properties("bar")
{'retention': {'remove_older_than': datetime.timedelta(days=7)}}

get_device

get_device(device_id: int) -> dict[str, Any]

Returns a device with a matching id in the device table.

PARAMETER DESCRIPTION
device_id

Integer corresponding to the id of the device in the database.

TYPE: int

RETURNS DESCRIPTION
dict[str, Any]

Dictionary with fields corresponding to the values in the database.

get_event_loop

get_event_loop() -> ClientEventLoop

Retrieve the event loop instance.

RETURNS DESCRIPTION
ClientEventLoop

The event loop instance used by the client.

get_unit

get_unit(unit_id: int) -> dict[str, Any]

Returns a unit with a matching id in the unit table.

PARAMETER DESCRIPTION
unit_id

Integer corresponding to the id of the unit in the database.

TYPE: int

RETURNS DESCRIPTION
dict[str, Any]

Dictionary with fields corresponding to the values in the unit table.

info

info() -> dict[str, Any]

Retrieve information about the server and proxy server the client is connected to.

RETURNS DESCRIPTION
dict[str, Any]

A dictionary containing server and proxy server information.

list_collections

list_collections(prefix: Optional[str] = None) -> list[str]

Returns a list of collection paths using the prefix argument for filtering.

PARAMETER DESCRIPTION
prefix

Filter collections that start with the string provided, if none passed, will list all collections.

DEFAULT: None

RETURNS DESCRIPTION
list[str]

All collections that match the provided prefix.

Examples:

Assuming we have the following collections in the platform: foo, bar, foo/baz, bar/baz

>>> conn = pt.timeseries.connect()
>>> sorted(conn.list_collections())
['bar', 'bar/baz', 'foo', 'foo/baz']
>>> conn.list_collections(prefix="foo")
['foo', 'foo/baz']
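The prefix filter can be modelled locally with a plain string comparison. This sketch assumes the filter is a simple "starts with" test, as the parameter description says; `filter_by_prefix` is a hypothetical helper, not part of the client, and it sorts its result only to make the output deterministic.

```python
from typing import Optional


def filter_by_prefix(collections: list[str], prefix: Optional[str] = None) -> list[str]:
    """Return the collections that start with prefix, or all of them if prefix is None."""
    if prefix is None:
        return sorted(collections)
    return sorted(c for c in collections if c.startswith(prefix))


collections = ["foo", "bar", "foo/baz", "bar/baz"]
foo_only = filter_by_prefix(collections, "foo")
everything = filter_by_prefix(collections)
```

Under this reading, a prefix of "foo" would also match a collection named "foobar", so include the trailing separator (for example "foo/") when only nested collections are wanted.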

list_devices

list_devices() -> list[dict[str, Any]]

Returns a list of devices the user has permission to see.

RETURNS DESCRIPTION
list[dict[str, Any]]

A list of dictionaries, each with keys matching the columns in the device table.

list_units

list_units() -> list[dict[str, Any]]

Returns a list of units generated in the database.

RETURNS DESCRIPTION
list[dict[str, Any]]

A list of dictionaries, each with keys matching the columns in the units table.

set_collection_retention

set_collection_retention(
    collection: str,
    override_per_stream: bool = False,
    remove_older_than: Optional[datetime] = None,
) -> None

Set retention policy on a collection of streams.

PARAMETER DESCRIPTION
collection

The name of the collection.

override_per_stream

Whether stream-specific retention policy should be overridden.

TYPE: bool DEFAULT: False

remove_older_than

The age after which data is removed. Omitting this parameter disables trimming.

TYPE: Optional[datetime] DEFAULT: None

Examples:

Keep the data for only one week.

>>> conn.set_collection_retention("bar", False, datetime.timedelta(days=7))
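Setting a policy and reading it back can be combined into one step. The sketch below follows the examples on this page in passing a timedelta, even though the signature annotates remove_older_than as datetime; `keep_only_recent` is a hypothetical helper and `conn` stands for a connected Client.

```python
from datetime import timedelta
from typing import Any


def keep_only_recent(conn: Any, collection: str, days: int) -> dict[str, Any]:
    """Set a retention window on a collection, then return its stored properties."""
    # override_per_stream=False leaves any stream-specific retention policy untouched.
    conn.set_collection_retention(collection, False, timedelta(days=days))
    return conn.get_collection_properties(collection)
```

Calling keep_only_recent(conn, "bar", 7) mirrors the two documented examples: it sets a one-week window and returns whatever get_collection_properties reports afterwards.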

sql_query

sql_query(
    query: str, params: Optional[list[str]] = None
) -> list[Any]

Performs a SQL query on the database metadata and returns a list of dictionaries from the resulting cursor.

PARAMETER DESCRIPTION
query

A SQL statement to be executed on the BTrDB metadata. Available columns in the stream table are noted below. To sanitize inputs use a $1 style parameter such as select * from streams where name = $1 or name = $2.

TYPE: str

params

A list of parameter values to be sanitized and interpolated into the SQL statement. Using parameters forces value and type checking and is considered best practice.

TYPE: Optional[list[str]] DEFAULT: None

RETURNS DESCRIPTION
list[Any]

The result of the SQL query.

Available columns in the stream table

column_name data_type
uuid uuid
collection character varying
name character varying
unit character varying
ingress character varying
property_version bigint
annotations hstore
distiller character varying
created_at timestamp with time zone
updated_at timestamp with time zone
geo postgis geometry
last_written timestamp with time zone
previous_last_written timestamp with time zone
estimated_count_delta bigint
long_term_autoregressive_average double precision
count_last_updated timestamp with time zone
previous_count_last_updated timestamp with time zone
watched boolean
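A query with `$1` style placeholders can be generated from a dictionary of filters so that values never end up inside the SQL text. This is a minimal sketch: `build_stream_query` is a hypothetical helper, the table name `streams` is taken from the example statement in the query description, and column names are checked against the table above because identifiers can not be passed as parameters.

```python
# Column names copied from the stream table above.
STREAM_COLUMNS = {
    "uuid", "collection", "name", "unit", "ingress", "property_version",
    "annotations", "distiller", "created_at", "updated_at", "geo",
    "last_written", "previous_last_written", "estimated_count_delta",
    "long_term_autoregressive_average", "count_last_updated",
    "previous_count_last_updated", "watched",
}


def build_stream_query(
    filters: dict[str, str],
    columns: tuple[str, ...] = ("uuid", "collection", "name"),
) -> tuple[str, list[str]]:
    """Build a parameterized SELECT on streams plus its list of parameter values."""
    for column in (*columns, *filters):
        if column not in STREAM_COLUMNS:
            raise ValueError(f"unknown column: {column}")
    # Placeholders are numbered from $1 in the order the filters were given.
    clauses = [f"{column} = ${i}" for i, column in enumerate(filters, start=1)]
    query = f"select {', '.join(columns)} from streams"
    if clauses:
        query += " where " + " and ".join(clauses)
    return query, list(filters.values())


query, params = build_stream_query({"collection": "foo/baz", "unit": "volts"})
# With a connected client (not executed here): rows = conn.sql_query(query, params)
```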

stream_from_uuid

stream_from_uuid(uuid: UUID | str) -> Stream

Retrieve a stream based on its UUID.

PARAMETER DESCRIPTION
uuid

The UUID of the stream.

TYPE: UUID | str

RETURNS DESCRIPTION
Stream

The stream associated with the provided UUID.

RAISES DESCRIPTION
TypeError

If the provided uuid is not a valid UUID
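Since the method accepts either a UUID or a string, input can be normalized with the standard library before the call. The sketch below is an assumption about how one might pre-validate identifiers; `to_uuid` is a hypothetical helper and does not reflect how the client performs its own check.

```python
import uuid
from typing import Union


def to_uuid(value: Union[uuid.UUID, str]) -> uuid.UUID:
    """Return value as a uuid.UUID, raising TypeError if it can not be parsed."""
    if isinstance(value, uuid.UUID):
        return value
    try:
        return uuid.UUID(value)
    except (ValueError, AttributeError, TypeError) as exc:
        # Mirror the documented error type so callers handle one exception.
        raise TypeError(f"not a valid UUID: {value!r}") from exc


stream_id = to_uuid("12345678-1234-5678-1234-567812345678")
# With a connected client (not executed here): stream = conn.stream_from_uuid(stream_id)
```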

streams_in_collection

streams_in_collection(
    collection: str = "",
    is_collection_prefix: bool = True,
    tags: Optional[dict[str, str]] = None,
    annotations: Optional[dict[str, Any]] = None,
) -> StreamSet

Search for streams matching given parameters

PARAMETER DESCRIPTION
collection

The collection name (or prefix) to use when searching for streams; case sensitive.

TYPE: str DEFAULT: ''

is_collection_prefix

Whether the collection is a prefix of the whole collection name.

TYPE: bool DEFAULT: True

tags

The tags to identify the stream.

TYPE: Optional[dict[str, str]] DEFAULT: None

annotations

The annotations to identify the stream.

TYPE: Optional[dict[str, Any]] DEFAULT: None

RETURNS DESCRIPTION
StreamSet

The grouping of streams matching given parameters.

streamset_from_uuids

streamset_from_uuids(
    uuids: list[UUID | str], fetch_metadata: bool = True
) -> StreamSet

Return a StreamSet from an iterable of UUIDs.

PARAMETER DESCRIPTION
uuids

List of stream identifiers

TYPE: list[UUID | str]

fetch_metadata

Whether to fetch metadata for the streams in the set. Default is True.

TYPE: bool DEFAULT: True

Advanced user feature

Be cautious about using fetch_metadata=False. Many stream metadata values like collection, name, unit, tags, annotations will not be available, meaning filtering and other operations that require metadata will not work.

RETURNS DESCRIPTION
StreamSet

The StreamSet associated with the provided iterable of UUIDs.

update_device

update_device(
    device: dict[str, Any], replace: bool = False
) -> dict[str, Any]

Updates a device with matching device["id"] using fields matching the passed dictionary. name can not be None or an empty string.

PARAMETER DESCRIPTION
device

dictionary with the possible keys and description of expected values below.

TYPE: dict[str, Any]

replace

Whether to set all unspecified fields to None (except enabled, which defaults to True).

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[str, Any]

Dictionary with the updated fields and original unmodified fields of the device

Update Behavior

This function updates a field if it has already been set; otherwise it inserts the new value. Omitting an existing field does not clear its value; to clear unspecified fields, pass replace=True. With replace=True, name can not be None or an empty string, and geo is never set to None.

Available columns in the device table

column_name Description
id Internal id of device
name Name of device
device_type Type of device (DFR/Relay/etc)
description User readable description
geo Coordinates, a dict {"latitude":float,"longitude":float}
elevation Elevation of the device, float
alt_id Alternative id, int
alt_name Alternative name
enabled Whether the device is expected to be sending data, bool default: True
owner Who owns the device
protocol What protocol the device is using to send data
model_name The model name of the device
vendor The company that creates/sells the device
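The difference between the default merge and replace=True can be previewed locally. This sketch is one reading of the update behavior described above, not the server's implementation: `preview_update` is a hypothetical helper, and the handling of id, geo, and enabled under replace=True follows the prose (id identifies the device, geo is not cleared, enabled defaults to True).

```python
from typing import Any

# Column names copied from the device table above.
DEVICE_COLUMNS = (
    "id", "name", "device_type", "description", "geo", "elevation", "alt_id",
    "alt_name", "enabled", "owner", "protocol", "model_name", "vendor",
)


def preview_update(
    existing: dict[str, Any], changes: dict[str, Any], replace: bool = False
) -> dict[str, Any]:
    """Model the documented result of update_device without contacting the platform."""
    if not replace:
        # Default: fields that are not passed keep their current values.
        return {**existing, **changes}
    if not changes.get("name"):
        raise ValueError("with replace=True, name can not be None or an empty string")
    result = {column: changes.get(column) for column in DEVICE_COLUMNS}
    result["id"] = existing["id"]
    if result["geo"] is None:
        result["geo"] = existing.get("geo")  # geo is never cleared
    result["enabled"] = changes.get("enabled", True)
    return result


existing = {"id": 3, "name": "relay-1", "owner": "ops", "vendor": "ExampleCo"}
changes = {"id": 3, "name": "relay-1", "description": "feeder relay"}
merged = preview_update(existing, changes)
replaced = preview_update(existing, changes, replace=True)
```

In `merged` the owner and vendor survive, while in `replaced` every column that was not passed becomes None apart from the exceptions noted above.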

update_unit

update_unit(unit_dict: dict[str, Any]) -> dict[str, Any]

Updates the unit with matching id using fields from the passed dictionary.

Modify the unit where the unit_dict['id'] is the id number of the unit you want to update.

Update Behavior

You are not required to include fields that you want to keep the same, although including them will not cause issues.

PARAMETER DESCRIPTION
unit_dict

Dictionary with fields corresponding to the values in the database you want to update

TYPE: dict[str, Any]

RETURNS DESCRIPTION
dict[str, Any]

Dictionary with the updated fields and original unmodified fields of the unit

Examples:

>>> original_unit = client.get_unit(6)
>>> print(original_unit)
{'id': 6,
'name': 'volts',
'symbol': 'V',
'canonical': 'volts',
'base': 1.0,
'source_name': ['VPHM', 'voltage', 'VOLTS', 'VLTGE']}
>>> updated_unit = {"id":6, "name":"volt"}
>>> client.update_unit(updated_unit)
{'id': 6,
 'name': 'volt',
 'symbol': 'V',
 'canonical': 'volts',
 'base': 1.0,
 'source_name': ['VPHM', 'voltage', 'VOLTS', 'VLTGE']}

pingthings.timeseries.client.ClientEventLoop

ClientEventLoop()

The object responsible for task running.

This leverages the asyncio event loop.

Are you sure you need to manually use this?

This object is automatically created and leveraged whenever you use the standard connect or async_connect.

METHOD DESCRIPTION
run_coroutine_threadsafe

A wrapper to run coroutines threadsafe.

Functions

run_coroutine_threadsafe

run_coroutine_threadsafe(
    coro: Coroutine[Any, Any, CO_RESULT_TYPE]
) -> Future[CO_RESULT_TYPE]

A wrapper to run coroutines threadsafe.
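The general pattern behind a thread-safe coroutine wrapper can be shown with the standard library alone. This is a sketch of the technique using `asyncio.run_coroutine_threadsafe`, not the library's implementation: `BackgroundLoop` and `add_later` are hypothetical names, and running the loop on a daemon thread is an assumption.

```python
import asyncio
import threading
from concurrent.futures import Future
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")


class BackgroundLoop:
    """Hypothetical stand-in for ClientEventLoop: an asyncio loop on a daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run_coroutine_threadsafe(self, coro: Coroutine[Any, Any, T]) -> "Future[T]":
        """Schedule coro on the background loop and return a concurrent Future."""
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def close(self) -> None:
        """Stop the loop from the calling thread and release its resources."""
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def add_later(a: int, b: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for an asynchronous database request
    return a + b


loop = BackgroundLoop()
future = loop.run_coroutine_threadsafe(add_later(2, 3))
result = future.result(timeout=5)  # block the calling thread until the coroutine finishes
loop.close()
```

Blocking on the returned future is what lets synchronous code call asynchronous operations, which is the role this page assigns to the synchronous Client.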