hat.event.common

Common functionality shared between clients and event server

  1"""Common functionality shared between clients and event server"""
  2
  3import time
  4
  5from hat.event.common.backend import (BackendClosedError,
  6                                      Backend,
  7                                      BackendConf,
  8                                      BackendRegisteredEventsCb,
  9                                      BackendFlushedEventsCb,
 10                                      CreateBackend,
 11                                      BackendInfo,
 12                                      import_backend_info)
 13from hat.event.common.collection import (EventTypeCollection,
 14                                         ListEventTypeCollection,
 15                                         TreeEventTypeCollection)
 16from hat.event.common.common import (json_schema_repo,
 17                                     sbs_repo,
 18                                     ServerId,
 19                                     SessionId,
 20                                     InstanceId,
 21                                     EventTypeSegment,
 22                                     EventType,
 23                                     Timestamp,
 24                                     min_timestamp,
 25                                     max_timestamp,
 26                                     Status,
 27                                     Order,
 28                                     OrderBy,
 29                                     EventId,
 30                                     EventPayloadBinary,
 31                                     EventPayloadJson,
 32                                     EventPayload,
 33                                     Event,
 34                                     RegisterEvent,
 35                                     QueryLatestParams,
 36                                     QueryTimeseriesParams,
 37                                     QueryServerParams,
 38                                     QueryParams,
 39                                     QueryResult)
 40from hat.event.common.encoder import (timestamp_to_bytes,
 41                                      timestamp_from_bytes,
 42                                      timestamp_to_float,
 43                                      timestamp_from_float,
 44                                      timestamp_to_datetime,
 45                                      timestamp_from_datetime,
 46                                      timestamp_to_sbs,
 47                                      timestamp_from_sbs,
 48                                      status_to_sbs,
 49                                      status_from_sbs,
 50                                      event_to_sbs,
 51                                      event_from_sbs,
 52                                      register_event_to_sbs,
 53                                      register_event_from_sbs,
 54                                      query_params_to_sbs,
 55                                      query_params_from_sbs,
 56                                      query_result_to_sbs,
 57                                      query_result_from_sbs,
 58                                      event_payload_to_sbs,
 59                                      event_payload_from_sbs)
 60from hat.event.common.matches import matches_query_type
 61from hat.event.common.module import (SourceType,
 62                                     Source,
 63                                     Engine,
 64                                     Module,
 65                                     ModuleConf,
 66                                     CreateModule,
 67                                     ModuleInfo,
 68                                     import_module_info)
 69from hat.event.common.subscription import (Subscription,
 70                                           create_subscription)
 71
 72
 73__all__ = ['BackendClosedError',
 74           'Backend',
 75           'BackendConf',
 76           'BackendRegisteredEventsCb',
 77           'BackendFlushedEventsCb',
 78           'CreateBackend',
 79           'BackendInfo',
 80           'import_backend_info',
 81           'EventTypeCollection',
 82           'ListEventTypeCollection',
 83           'TreeEventTypeCollection',
 84           'json_schema_repo',
 85           'sbs_repo',
 86           'ServerId',
 87           'SessionId',
 88           'InstanceId',
 89           'EventTypeSegment',
 90           'EventType',
 91           'Timestamp',
 92           'min_timestamp',
 93           'max_timestamp',
 94           'Status',
 95           'Order',
 96           'OrderBy',
 97           'EventId',
 98           'EventPayloadBinary',
 99           'EventPayloadJson',
100           'EventPayload',
101           'Event',
102           'RegisterEvent',
103           'QueryLatestParams',
104           'QueryTimeseriesParams',
105           'QueryServerParams',
106           'QueryParams',
107           'QueryResult',
108           'timestamp_to_bytes',
109           'timestamp_from_bytes',
110           'timestamp_to_float',
111           'timestamp_from_float',
112           'timestamp_to_datetime',
113           'timestamp_from_datetime',
114           'timestamp_to_sbs',
115           'timestamp_from_sbs',
116           'status_to_sbs',
117           'status_from_sbs',
118           'event_to_sbs',
119           'event_from_sbs',
120           'register_event_to_sbs',
121           'register_event_from_sbs',
122           'query_params_to_sbs',
123           'query_params_from_sbs',
124           'query_result_to_sbs',
125           'query_result_from_sbs',
126           'event_payload_to_sbs',
127           'event_payload_from_sbs',
128           'matches_query_type',
129           'SourceType',
130           'Source',
131           'Engine',
132           'Module',
133           'ModuleConf',
134           'CreateModule',
135           'ModuleInfo',
136           'import_module_info',
137           'Subscription',
138           'create_subscription',
139           'now']
140
141
142def now() -> Timestamp:
143    """Create new timestamp representing current time"""
144    return timestamp_from_float(time.time())
class BackendClosedError(builtins.Exception):
16class BackendClosedError(Exception):
17    """Backend closed"""

Backend closed

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class Backend(hat.aio.group.Resource):
20class Backend(aio.Resource):
21    """Backend ABC"""
22
23    @abc.abstractmethod
24    async def get_last_event_id(self,
25                                server_id: int
26                                ) -> EventId:
27        """Get last registered event id associated with server id"""
28
29    @abc.abstractmethod
30    async def register(self,
31                       events: Collection[Event]
32                       ) -> Collection[Event] | None:
33        """Register events"""
34
35    @abc.abstractmethod
36    async def query(self,
37                    params: QueryParams
38                    ) -> QueryResult:
39        """Query events"""
40
41    @abc.abstractmethod
42    async def flush(self):
43        """Flush internal buffers and permanently persist events"""

Backend ABC

@abc.abstractmethod
async def get_last_event_id(self, server_id: int) -> EventId:
23    @abc.abstractmethod
24    async def get_last_event_id(self,
25                                server_id: int
26                                ) -> EventId:
27        """Get last registered event id associated with server id"""

Get last registered event id associated with server id

@abc.abstractmethod
async def register( self, events: collections.abc.Collection[Event]) -> collections.abc.Collection[Event] | None:
29    @abc.abstractmethod
30    async def register(self,
31                       events: Collection[Event]
32                       ) -> Collection[Event] | None:
33        """Register events"""

Register events

@abc.abstractmethod
async def query( self, params: QueryLatestParams | QueryTimeseriesParams | QueryServerParams) -> QueryResult:
35    @abc.abstractmethod
36    async def query(self,
37                    params: QueryParams
38                    ) -> QueryResult:
39        """Query events"""

Query events

@abc.abstractmethod
async def flush(self):
41    @abc.abstractmethod
42    async def flush(self):
43        """Flush internal buffers and permanently persist events"""

Flush internal buffers and permanently persist events

Inherited Members
hat.aio.group.Resource
async_group
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
BackendConf = None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]
BackendRegisteredEventsCb = typing.Callable[[collections.abc.Collection[Event]], typing.Optional[typing.Awaitable[NoneType]]]
BackendFlushedEventsCb = typing.Callable[[collections.abc.Collection[Event]], typing.Optional[typing.Awaitable[NoneType]]]
CreateBackend = typing.Callable[[None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], typing.Optional[typing.Callable[[collections.abc.Collection[Event]], typing.Optional[typing.Awaitable[NoneType]]]], typing.Optional[typing.Callable[[collections.abc.Collection[Event]], typing.Optional[typing.Awaitable[NoneType]]]]], typing.Union[Backend, typing.Awaitable[Backend]]]
class BackendInfo(typing.NamedTuple):
67class BackendInfo(typing.NamedTuple):
68    """Backend info
69
70    Backend is implemented as python module which is dynamically imported.
71    It is expected that this module contains `info` which is instance of
72    `BackendInfo`.
73
74    If backend defines JSON schema repository and JSON schema id, JSON schema
75    repository will be used for additional validation of backend configuration
76    with JSON schema id.
77
78    """
79    create: CreateBackend
80    json_schema_id: str | None = None
81    json_schema_repo: json.SchemaRepository | None = None

Backend info

Backend is implemented as a Python module which is dynamically imported. It is expected that this module contains `info`, which is an instance of `BackendInfo`.

If the backend defines both a JSON schema repository and a JSON schema id, the JSON schema repository will be used for additional validation of the backend configuration against that JSON schema id.

BackendInfo( create: Callable[[None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], Optional[Callable[[collections.abc.Collection[Event]], Optional[Awaitable[NoneType]]]], Optional[Callable[[collections.abc.Collection[Event]], Optional[Awaitable[NoneType]]]]], Union[Backend, Awaitable[Backend]]], json_schema_id: str | None = None, json_schema_repo: hat.json.repository.SchemaRepository | None = None)

Create new instance of BackendInfo(create, json_schema_id, json_schema_repo)

create: Callable[[None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], Optional[Callable[[collections.abc.Collection[Event]], Optional[Awaitable[NoneType]]]], Optional[Callable[[collections.abc.Collection[Event]], Optional[Awaitable[NoneType]]]]], Union[Backend, Awaitable[Backend]]]

Alias for field number 0

json_schema_id: str | None

Alias for field number 1

json_schema_repo: hat.json.repository.SchemaRepository | None

Alias for field number 2

Inherited Members
builtins.tuple
index
count
def import_backend_info(py_module_str: str) -> BackendInfo:
84def import_backend_info(py_module_str: str) -> BackendInfo:
85    """Import backend info"""
86    py_module = importlib.import_module(py_module_str)
87    info = py_module.info
88
89    if not isinstance(info, BackendInfo):
90        raise Exception('invalid backend implementation')
91
92    return info

Import backend info

class EventTypeCollection(abc.ABC, typing.Generic[~T]):
13class EventTypeCollection(abc.ABC, typing.Generic[T]):
14
15    @abc.abstractmethod
16    def __init__(self):
17        pass
18
19    @abc.abstractmethod
20    def add(self, subscription: Subscription, value: T):
21        pass
22
23    @abc.abstractmethod
24    def remove(self, value: T):
25        pass
26
27    @abc.abstractmethod
28    def get(self, event_type: EventType) -> Iterable[T]:
29        pass

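Abstract base class for generic collections of values: a value is registered together with a subscription (`add`), can be removed by value (`remove`), and values are retrieved by event type (`get`).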

@abc.abstractmethod
def add( self, subscription: Subscription, value: ~T):
19    @abc.abstractmethod
20    def add(self, subscription: Subscription, value: T):
21        pass
@abc.abstractmethod
def remove(self, value: ~T):
23    @abc.abstractmethod
24    def remove(self, value: T):
25        pass
@abc.abstractmethod
def get(self, event_type: tuple[str, ...]) -> collections.abc.Iterable[~T]:
27    @abc.abstractmethod
28    def get(self, event_type: EventType) -> Iterable[T]:
29        pass
class ListEventTypeCollection(abc.ABC, typing.Generic[~T]):
 5class ListEventTypeCollection(common.EventTypeCollection):
 6
 7    def __init__(self):
 8        self._values = {}
 9
10    def add(self, subscription, value):
11        self._values[value] = (self._values[value].union(subscription)
12                               if value in self._values else subscription)
13
14    def remove(self, value):
15        self._values.pop(value, None)
16
17    def get(self, event_type):
18        for value, subscription in self._values.items():
19            if subscription.matches(event_type):
20                yield value

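Event type collection that keeps a single subscription per value (a repeated `add` for the same value merges the subscriptions with `union`). `get` iterates over all stored values and yields those whose subscription matches the event type.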

def add(self, subscription, value):
10    def add(self, subscription, value):
11        self._values[value] = (self._values[value].union(subscription)
12                               if value in self._values else subscription)
def remove(self, value):
14    def remove(self, value):
15        self._values.pop(value, None)
def get(self, event_type):
17    def get(self, event_type):
18        for value, subscription in self._values.items():
19            if subscription.matches(event_type):
20                yield value
class TreeEventTypeCollection(abc.ABC, typing.Generic[~T]):
 8class TreeEventTypeCollection(common.EventTypeCollection):
 9
10    def __init__(self):
11        self._root = _create_node()
12        self._value_nodes = collections.defaultdict(collections.deque)
13
14    def add(self, subscription, value):
15        for query_type in subscription.get_query_types():
16            node = self._root
17            rest = query_type
18
19            while rest:
20                head, rest = rest[0], rest[1:]
21                if head == '*' and rest:
22                    raise ValueError('invalid query type')
23
24                node = node.children[head]
25
26            if value in node.values:
27                return
28
29            node.values.add(value)
30            self._value_nodes[value].append(node)
31
32    def remove(self, value):
33        for node in self._value_nodes.pop(value, []):
34            node.values.remove(value)
35
36    def get(self, event_type):
37        return set(_get(self._root, event_type))

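Event type collection that stores values in a tree of nodes, descending one node per segment of each query type of the subscription. `*` is accepted only as the last segment of a query type; otherwise `add` raises `ValueError`. `get` returns a set of values.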

def add(self, subscription, value):
14    def add(self, subscription, value):
15        for query_type in subscription.get_query_types():
16            node = self._root
17            rest = query_type
18
19            while rest:
20                head, rest = rest[0], rest[1:]
21                if head == '*' and rest:
22                    raise ValueError('invalid query type')
23
24                node = node.children[head]
25
26            if value in node.values:
27                return
28
29            node.values.add(value)
30            self._value_nodes[value].append(node)
def remove(self, value):
32    def remove(self, value):
33        for node in self._value_nodes.pop(value, []):
34            node.values.remove(value)
def get(self, event_type):
36    def get(self, event_type):
37        return set(_get(self._root, event_type))
json_schema_repo = <hat.json.repository.SchemaRepository object>
sbs_repo = <hat.sbs.repository.Repository object>
ServerId = <class 'int'>
SessionId = <class 'int'>
InstanceId = <class 'int'>
EventTypeSegment = <class 'str'>
EventType = tuple[str, ...]
class Timestamp(typing.NamedTuple):
40class Timestamp(typing.NamedTuple):
41    s: int
42    """seconds since 1970-01-01 (can be negative)"""
43    us: int
44    """microseconds added to timestamp seconds in range [0, 1e6)"""
45
46    def add(self, s: float) -> 'Timestamp':
47        """Create new timestamp by adding seconds to existing timestamp"""
48        us = self.us + round((s - int(s)) * 1e6)
49        s = self.s + int(s)
50        return Timestamp(s=s + us // int(1e6),
51                         us=us % int(1e6))

Timestamp(s, us)

Timestamp(s: int, us: int)

Create new instance of Timestamp(s, us)

s: int

seconds since 1970-01-01 (can be negative)

us: int

microseconds added to timestamp seconds in range [0, 1e6)

def add(self, s: float) -> Timestamp:
46    def add(self, s: float) -> 'Timestamp':
47        """Create new timestamp by adding seconds to existing timestamp"""
48        us = self.us + round((s - int(s)) * 1e6)
49        s = self.s + int(s)
50        return Timestamp(s=s + us // int(1e6),
51                         us=us % int(1e6))

Create new timestamp by adding seconds to existing timestamp

Inherited Members
builtins.tuple
index
count
min_timestamp = Timestamp(s=-9223372036854775808, us=0)
max_timestamp = Timestamp(s=9223372036854775807, us=999999)
class Status(enum.Enum):
61class Status(enum.Enum):
62    STANDBY = 'standby'
63    OPERATIONAL = 'operational'

An enumeration.

STANDBY = <Status.STANDBY: 'standby'>
OPERATIONAL = <Status.OPERATIONAL: 'operational'>
Inherited Members
enum.Enum
name
value
class Order(enum.Enum):
66class Order(enum.Enum):
67    DESCENDING = 'descending'
68    ASCENDING = 'ascending'

An enumeration.

DESCENDING = <Order.DESCENDING: 'descending'>
ASCENDING = <Order.ASCENDING: 'ascending'>
Inherited Members
enum.Enum
name
value
class OrderBy(enum.Enum):
71class OrderBy(enum.Enum):
72    TIMESTAMP = 'timestamp'
73    SOURCE_TIMESTAMP = 'sourceTimestamp'

An enumeration.

TIMESTAMP = <OrderBy.TIMESTAMP: 'timestamp'>
SOURCE_TIMESTAMP = <OrderBy.SOURCE_TIMESTAMP: 'sourceTimestamp'>
Inherited Members
enum.Enum
name
value
class EventId(typing.NamedTuple):
76class EventId(typing.NamedTuple):
77    server: ServerId
78    session: SessionId
79    instance: InstanceId

EventId(server, session, instance)

EventId(server: int, session: int, instance: int)

Create new instance of EventId(server, session, instance)

server: int

Alias for field number 0

session: int

Alias for field number 1

instance: int

Alias for field number 2

Inherited Members
builtins.tuple
index
count
class EventPayloadBinary(typing.NamedTuple):
82class EventPayloadBinary(typing.NamedTuple):
83    type: str
84    data: util.Bytes

EventPayloadBinary(type, data)

EventPayloadBinary(type: str, data: bytes | bytearray | memoryview)

Create new instance of EventPayloadBinary(type, data)

type: str

Alias for field number 0

data: bytes | bytearray | memoryview

Alias for field number 1

Inherited Members
builtins.tuple
index
count
class EventPayloadJson(typing.NamedTuple):
87class EventPayloadJson(typing.NamedTuple):
88    data: json.Data

EventPayloadJson(data,)

EventPayloadJson( data: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')])

Create new instance of EventPayloadJson(data,)

data: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]

Alias for field number 0

Inherited Members
builtins.tuple
index
count
class Event(typing.NamedTuple):
 94class Event(typing.NamedTuple):
 95    """Event
 96
 97    Operators `>` and `<` test for natural order where it is assumed that
 98    first operand is registered before second operand.
 99
100    """
101
102    id: EventId
103    type: EventType
104    timestamp: Timestamp
105    source_timestamp: Timestamp | None
106    payload: EventPayload | None
107
108    def __lt__(self, other):
109        if not isinstance(other, Event):
110            return NotImplemented
111
112        if self.id == other.id:
113            return False
114
115        if self.id.server == other.id.server:
116            return self.id < other.id
117
118        if self.timestamp != other.timestamp:
119            return self.timestamp < other.timestamp
120
121        return True
122
123    def __gt__(self, other):
124        if not isinstance(other, Event):
125            return NotImplemented
126
127        if self.id == other.id:
128            return False
129
130        if self.id.server == other.id.server:
131            return self.id > other.id
132
133        if self.timestamp != other.timestamp:
134            return self.timestamp > other.timestamp
135
136        return False

Event

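Operators > and < test for natural order. Events with the same server id are ordered by event id; events from different servers are ordered by timestamp. When events from different servers have equal timestamps, it is assumed that the first operand was registered before the second operand. An event is never less than or greater than an event with the same id.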

Event( id: EventId, type: tuple[str, ...], timestamp: Timestamp, source_timestamp: Timestamp | None, payload: EventPayloadBinary | EventPayloadJson | None)

Create new instance of Event(id, type, timestamp, source_timestamp, payload)

id: EventId

Alias for field number 0

type: tuple[str, ...]

Alias for field number 1

timestamp: Timestamp

Alias for field number 2

source_timestamp: Timestamp | None

Alias for field number 3

payload: EventPayloadBinary | EventPayloadJson | None

Alias for field number 4

Inherited Members
builtins.tuple
index
count
class RegisterEvent(typing.NamedTuple):
139class RegisterEvent(typing.NamedTuple):
140    type: EventType
141    source_timestamp: Timestamp | None
142    payload: EventPayload | None

RegisterEvent(type, source_timestamp, payload)

RegisterEvent( type: tuple[str, ...], source_timestamp: Timestamp | None, payload: EventPayloadBinary | EventPayloadJson | None)

Create new instance of RegisterEvent(type, source_timestamp, payload)

type: tuple[str, ...]

Alias for field number 0

source_timestamp: Timestamp | None

Alias for field number 1

payload: EventPayloadBinary | EventPayloadJson | None

Alias for field number 2

Inherited Members
builtins.tuple
index
count
class QueryLatestParams(typing.NamedTuple):
145class QueryLatestParams(typing.NamedTuple):
146    event_types: Collection[EventType] | None = None

QueryLatestParams(event_types,)

QueryLatestParams( event_types: collections.abc.Collection[tuple[str, ...]] | None = None)

Create new instance of QueryLatestParams(event_types,)

event_types: collections.abc.Collection[tuple[str, ...]] | None

Alias for field number 0

Inherited Members
builtins.tuple
index
count
class QueryTimeseriesParams(typing.NamedTuple):
149class QueryTimeseriesParams(typing.NamedTuple):
150    event_types: Collection[EventType] | None = None
151    t_from: Timestamp | None = None
152    t_to: Timestamp | None = None
153    source_t_from: Timestamp | None = None
154    source_t_to: Timestamp | None = None
155    order: Order = Order.DESCENDING
156    order_by: OrderBy = OrderBy.TIMESTAMP
157    max_results: int | None = None
158    last_event_id: EventId | None = None

QueryTimeseriesParams(event_types, t_from, t_to, source_t_from, source_t_to, order, order_by, max_results, last_event_id)

QueryTimeseriesParams( event_types: collections.abc.Collection[tuple[str, ...]] | None = None, t_from: Timestamp | None = None, t_to: Timestamp | None = None, source_t_from: Timestamp | None = None, source_t_to: Timestamp | None = None, order: Order = <Order.DESCENDING: 'descending'>, order_by: OrderBy = <OrderBy.TIMESTAMP: 'timestamp'>, max_results: int | None = None, last_event_id: EventId | None = None)

Create new instance of QueryTimeseriesParams(event_types, t_from, t_to, source_t_from, source_t_to, order, order_by, max_results, last_event_id)

event_types: collections.abc.Collection[tuple[str, ...]] | None

Alias for field number 0

t_from: Timestamp | None

Alias for field number 1

t_to: Timestamp | None

Alias for field number 2

source_t_from: Timestamp | None

Alias for field number 3

source_t_to: Timestamp | None

Alias for field number 4

order: Order

Alias for field number 5

order_by: OrderBy

Alias for field number 6

max_results: int | None

Alias for field number 7

last_event_id: EventId | None

Alias for field number 8

Inherited Members
builtins.tuple
index
count
class QueryServerParams(typing.NamedTuple):
161class QueryServerParams(typing.NamedTuple):
162    server_id: ServerId
163    persisted: bool = False
164    max_results: int | None = None
165    last_event_id: EventId | None = None

QueryServerParams(server_id, persisted, max_results, last_event_id)

QueryServerParams( server_id: int, persisted: bool = False, max_results: int | None = None, last_event_id: EventId | None = None)

Create new instance of QueryServerParams(server_id, persisted, max_results, last_event_id)

server_id: int

Alias for field number 0

persisted: bool

Alias for field number 1

max_results: int | None

Alias for field number 2

last_event_id: EventId | None

Alias for field number 3

Inherited Members
builtins.tuple
index
count
class QueryResult(typing.NamedTuple):
173class QueryResult(typing.NamedTuple):
174    events: Collection[Event]
175    more_follows: bool

QueryResult(events, more_follows)

QueryResult( events: collections.abc.Collection[Event], more_follows: bool)

Create new instance of QueryResult(events, more_follows)

events: collections.abc.Collection[Event]

Alias for field number 0

more_follows: bool

Alias for field number 1

Inherited Members
builtins.tuple
index
count
def timestamp_to_bytes(t: Timestamp) -> bytes | bytearray | memoryview:
26def timestamp_to_bytes(t: Timestamp) -> util.Bytes:
27    """Convert timestamp to 12 byte representation
28
29    Bytes [0, 8) are big endian unsigned `Timestamp.s` + 2^63 and
30    bytes [8, 12) are big endian unsigned `Timestamp.us`.
31
32    """
33    return struct.pack(">QI", t.s + (1 << 63), t.us)

Convert timestamp to 12 byte representation

Bytes [0, 8) are big endian unsigned Timestamp.s + 2^63 (64-bit) and bytes [8, 12) are big endian unsigned Timestamp.us (32-bit).

def timestamp_from_bytes( data: bytes | bytearray | memoryview) -> Timestamp:
36def timestamp_from_bytes(data: util.Bytes) -> Timestamp:
37    """Create new timestamp from 12 byte representation
38
39    Bytes representation is same as defined for `timestamp_to_bytes` function.
40
41    """
42    s, us = struct.unpack(">QI", data)
43    return Timestamp(s - (1 << 63), us)

Create new timestamp from 12 byte representation

Bytes representation is same as defined for timestamp_to_bytes function.

def timestamp_to_float(t: Timestamp) -> float:
46def timestamp_to_float(t: Timestamp) -> float:
47    """Convert timestamp to floating number of seconds since 1970-01-01 UTC
48
49    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.
50
51    """
52    return t.s + t.us * 1E-6

Convert timestamp to floating number of seconds since 1970-01-01 UTC

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_from_float(ts: float) -> Timestamp:
55def timestamp_from_float(ts: float) -> Timestamp:
56    """Create timestamp from floating number of seconds since 1970-01-01 UTC
57
58    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.
59
60    """
61    s = int(ts)
62    if ts < 0:
63        s = s - 1
64
65    us = round((ts - s) * 1E6)
66
67    if us == 1_000_000:
68        return Timestamp(s + 1, 0)
69
70    else:
71        return Timestamp(s, us)

Create timestamp from floating number of seconds since 1970-01-01 UTC

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_to_datetime(t: Timestamp) -> datetime.datetime:
74def timestamp_to_datetime(t: Timestamp) -> datetime.datetime:
75    """Convert timestamp to datetime (representing utc time)
76
77    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.
78
79    """
80    try:
81        dt_from_s = datetime.datetime.fromtimestamp(t.s, datetime.timezone.utc)
82
83    except OSError:
84        dt_from_s = (
85            datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) +
86            datetime.timedelta(seconds=t.s))
87
88    return datetime.datetime(
89        year=dt_from_s.year,
90        month=dt_from_s.month,
91        day=dt_from_s.day,
92        hour=dt_from_s.hour,
93        minute=dt_from_s.minute,
94        second=dt_from_s.second,
95        microsecond=t.us,
96        tzinfo=datetime.timezone.utc)

Convert timestamp to datetime (representing utc time)

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_from_datetime(dt: datetime.datetime) -> Timestamp:
 99def timestamp_from_datetime(dt: datetime.datetime) -> Timestamp:
100    """Create new timestamp from datetime
101
102    If `tzinfo` is not set, it is assumed that provided datetime represents
103    utc time.
104
105    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.
106
107    """
108    if not dt.tzinfo:
109        dt = dt.replace(tzinfo=datetime.timezone.utc)
110
111    s = int(dt.timestamp())
112
113    if dt.timestamp() < 0:
114        s = s - 1
115
116    return Timestamp(s=s, us=dt.microsecond)

Create new timestamp from datetime

If tzinfo is not set, it is assumed that provided datetime represents utc time.

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_to_sbs( t: Timestamp) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
119def timestamp_to_sbs(t: Timestamp) -> sbs.Data:
120    """Convert timestamp to SBS data"""
121    return {'s': t.s, 'us': t.us}

Convert timestamp to SBS data

def timestamp_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> Timestamp:
124def timestamp_from_sbs(data: sbs.Data) -> Timestamp:
125    """Create new timestamp from SBS data"""
126    return Timestamp(s=data['s'], us=data['us'])

Create new timestamp from SBS data

def status_to_sbs( status: Status) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
129def status_to_sbs(status: Status) -> sbs.Data:
130    """Convert Status to SBS data"""
131    return status.value, None

Convert Status to SBS data

def status_from_sbs( status: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> Status:
134def status_from_sbs(status: sbs.Data) -> Status:
135    """Create Status based on SBS data"""
136    return Status(status[0])

Create Status based on SBS data

def event_to_sbs( event: Event) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
139def event_to_sbs(event: Event) -> sbs.Data:
140    """Convert Event to SBS data"""
141    return {'id': _event_id_to_sbs(event.id),
142            'type': list(event.type),
143            'timestamp': timestamp_to_sbs(event.timestamp),
144            'sourceTimestamp': _optional_to_sbs(event.source_timestamp,
145                                                timestamp_to_sbs),
146            'payload': _optional_to_sbs(event.payload, event_payload_to_sbs)}

Convert Event to SBS data

def event_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> Event:
def event_from_sbs(data: sbs.Data) -> Event:
    """Build Event from SBS dictionary

    Optional fields (source timestamp and payload) are passed through
    `_optional_from_sbs` together with their dedicated decoder.

    """
    event_id = _event_id_from_sbs(data['id'])
    event_type = tuple(data['type'])
    timestamp = timestamp_from_sbs(data['timestamp'])
    source_timestamp = _optional_from_sbs(data['sourceTimestamp'],
                                          timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)

    return Event(id=event_id,
                 type=event_type,
                 timestamp=timestamp,
                 source_timestamp=source_timestamp,
                 payload=payload)

Create Event based on SBS data

def register_event_to_sbs( event: RegisterEvent) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def register_event_to_sbs(event: RegisterEvent) -> sbs.Data:
    """Encode RegisterEvent as SBS dictionary

    Optional fields (source timestamp and payload) are passed through
    `_optional_to_sbs` together with their dedicated encoder.

    """
    event_type = list(event.type)
    source_timestamp = _optional_to_sbs(event.source_timestamp,
                                        timestamp_to_sbs)
    payload = _optional_to_sbs(event.payload, event_payload_to_sbs)

    return {'type': event_type,
            'sourceTimestamp': source_timestamp,
            'payload': payload}

Convert RegisterEvent to SBS data

def register_event_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> RegisterEvent:
def register_event_from_sbs(data: sbs.Data) -> RegisterEvent:
    """Build RegisterEvent from SBS dictionary

    Optional fields (source timestamp and payload) are passed through
    `_optional_from_sbs` together with their dedicated decoder.

    """
    event_type = tuple(data['type'])
    source_timestamp = _optional_from_sbs(data['sourceTimestamp'],
                                          timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)

    return RegisterEvent(type=event_type,
                         source_timestamp=source_timestamp,
                         payload=payload)

Create RegisterEvent based on SBS data

def query_params_to_sbs( params: QueryLatestParams | QueryTimeseriesParams | QueryServerParams) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def query_params_to_sbs(params: QueryParams) -> sbs.Data:
    """Encode query parameters as SBS data

    The result is a ``(tag, body)`` tuple where ``tag`` is ``'latest'``,
    ``'timeseries'`` or ``'server'``, selected by the concrete type of
    `params`.

    Raises:
        ValueError: `params` is not one of the supported params types

    """
    if isinstance(params, QueryLatestParams):
        event_types = _optional_to_sbs(params.event_types,
                                       _event_types_to_sbs)
        return 'latest', {'eventTypes': event_types}

    elif isinstance(params, QueryTimeseriesParams):
        event_types = _optional_to_sbs(params.event_types,
                                       _event_types_to_sbs)
        t_from = _optional_to_sbs(params.t_from, timestamp_to_sbs)
        t_to = _optional_to_sbs(params.t_to, timestamp_to_sbs)
        source_t_from = _optional_to_sbs(params.source_t_from,
                                         timestamp_to_sbs)
        source_t_to = _optional_to_sbs(params.source_t_to,
                                       timestamp_to_sbs)
        order = (params.order.value, None)
        order_by = (params.order_by.value, None)
        max_results = _optional_to_sbs(params.max_results)
        last_event_id = _optional_to_sbs(params.last_event_id,
                                         _event_id_to_sbs)
        return 'timeseries', {'eventTypes': event_types,
                              'tFrom': t_from,
                              'tTo': t_to,
                              'sourceTFrom': source_t_from,
                              'sourceTTo': source_t_to,
                              'order': order,
                              'orderBy': order_by,
                              'maxResults': max_results,
                              'lastEventId': last_event_id}

    elif isinstance(params, QueryServerParams):
        server_id = params.server_id
        persisted = params.persisted
        max_results = _optional_to_sbs(params.max_results)
        last_event_id = _optional_to_sbs(params.last_event_id,
                                         _event_id_to_sbs)
        return 'server', {'serverId': server_id,
                          'persisted': persisted,
                          'maxResults': max_results,
                          'lastEventId': last_event_id}

    raise ValueError('unsupported params type')

Convert QueryParams to SBS data

def query_params_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> QueryLatestParams | QueryTimeseriesParams | QueryServerParams:
def query_params_from_sbs(data: sbs.Data) -> QueryParams:
    """Build query parameters from SBS data

    `data` is read as a ``(tag, body)`` pair; ``tag`` (``'latest'``,
    ``'timeseries'`` or ``'server'``) selects which params type is created
    from ``body``.

    Raises:
        ValueError: tag is not one of the supported values

    """
    tag = data[0]

    if tag == 'latest':
        body = data[1]
        event_types = _optional_from_sbs(body['eventTypes'],
                                         _event_types_from_sbs)
        return QueryLatestParams(event_types=event_types)

    if tag == 'timeseries':
        body = data[1]
        event_types = _optional_from_sbs(body['eventTypes'],
                                         _event_types_from_sbs)
        t_from = _optional_from_sbs(body['tFrom'], timestamp_from_sbs)
        t_to = _optional_from_sbs(body['tTo'], timestamp_from_sbs)
        source_t_from = _optional_from_sbs(body['sourceTFrom'],
                                           timestamp_from_sbs)
        source_t_to = _optional_from_sbs(body['sourceTTo'],
                                         timestamp_from_sbs)
        order = Order(body['order'][0])
        order_by = OrderBy(body['orderBy'][0])
        max_results = _optional_from_sbs(body['maxResults'])
        last_event_id = _optional_from_sbs(body['lastEventId'],
                                           _event_id_from_sbs)
        return QueryTimeseriesParams(event_types=event_types,
                                     t_from=t_from,
                                     t_to=t_to,
                                     source_t_from=source_t_from,
                                     source_t_to=source_t_to,
                                     order=order,
                                     order_by=order_by,
                                     max_results=max_results,
                                     last_event_id=last_event_id)

    if tag == 'server':
        body = data[1]
        server_id = body['serverId']
        persisted = body['persisted']
        max_results = _optional_from_sbs(body['maxResults'])
        last_event_id = _optional_from_sbs(body['lastEventId'],
                                           _event_id_from_sbs)
        return QueryServerParams(server_id=server_id,
                                 persisted=persisted,
                                 max_results=max_results,
                                 last_event_id=last_event_id)

    raise ValueError('unsupported params type')

Create QueryParams based on SBS data

def query_result_to_sbs( result: QueryResult) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def query_result_to_sbs(result: QueryResult) -> sbs.Data:
    """Encode QueryResult as SBS dictionary

    Each event is encoded with `event_to_sbs`; the ``more_follows`` flag is
    copied as is.

    """
    events = list(map(event_to_sbs, result.events))
    more_follows = result.more_follows
    return {'events': events,
            'moreFollows': more_follows}

Convert QueryResult to SBS data

def query_result_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> QueryResult:
def query_result_from_sbs(data: sbs.Data) -> QueryResult:
    """Build QueryResult from SBS dictionary

    Each entry of ``data['events']`` is decoded with `event_from_sbs`;
    ``data['moreFollows']`` is copied as is.

    """
    events = list(map(event_from_sbs, data['events']))
    more_follows = data['moreFollows']
    return QueryResult(events=events,
                       more_follows=more_follows)

Create QueryResult based on SBS data

def event_payload_to_sbs( payload: EventPayloadBinary | EventPayloadJson) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def event_payload_to_sbs(payload: EventPayload) -> sbs.Data:
    """Encode EventPayload as SBS data

    The result is a ``(tag, body)`` tuple: ``'binary'`` with a dictionary of
    ``type`` and ``data``, or ``'json'`` with the JSON encoded payload data.

    Raises:
        ValueError: `payload` is neither binary nor JSON payload

    """
    if isinstance(payload, EventPayloadBinary):
        body = {'type': payload.type,
                'data': payload.data}
        return 'binary', body

    if isinstance(payload, EventPayloadJson):
        encoded = json.encode(payload.data)
        return 'json', encoded

    raise ValueError('unsupported payload type')

Convert EventPayload to SBS data

def event_payload_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> EventPayloadBinary | EventPayloadJson:
def event_payload_from_sbs(data: sbs.Data) -> EventPayload:
    """Build EventPayload from SBS data

    `data` is read as a ``(tag, body)`` pair where ``tag`` is ``'binary'``
    or ``'json'``.

    Raises:
        ValueError: tag is not one of the supported values

    """
    tag = data[0]

    if tag == 'binary':
        body = data[1]
        return EventPayloadBinary(type=body['type'],
                                  data=body['data'])

    if tag == 'json':
        decoded = json.decode(data[1])
        return EventPayloadJson(data=decoded)

    raise ValueError('unsupported payload type')

Create EventPayload based on SBS data

def matches_query_type(event_type: tuple[str, ...], query_type: tuple[str, ...]) -> bool:
 5def matches_query_type(event_type: EventType,
 6                       query_type: EventType
 7                       ) -> bool:
 8    """Determine if event type matches query type
 9
10    Event type is tested if it matches query type according to the following
11    rules:
12
13        * Matching is performed on subtypes in increasing order.
14        * Event type is a match only if all its subtypes are matched by
15          corresponding query subtypes.
16        * Matching is finished when all query subtypes are exhausted.
17        * Query subtype '?' matches exactly one event subtype of any value.
18          The subtype must exist.
19        * Query subtype '*' matches 0 or more event subtypes of any value. It
20          must be the last query subtype.
21        * All other values of query subtype match exactly one event subtype
22          of the same value.
23        * Query type without subtypes is matched only by event type with no
24          subtypes.
25
26    As a consequence of aforementioned matching rules, event subtypes '*' and
27    '?' cannot be directly matched and it is advisable not to use them in event
28    types.
29
30    """
31    is_variable = bool(query_type and query_type[-1] == '*')
32    if is_variable:
33        query_type = query_type[:-1]
34
35    if len(event_type) < len(query_type):
36        return False
37
38    if len(event_type) > len(query_type) and not is_variable:
39        return False
40
41    for i, j in zip(event_type, query_type):
42        if j != '?' and i != j:
43            return False
44
45    return True

Determine if event type matches query type

The event type is tested against the query type according to the following rules:

* Matching is performed on subtypes in increasing order.
* Event type is a match only if all its subtypes are matched by
  corresponding query subtypes.
* Matching is finished when all query subtypes are exhausted.
* Query subtype '?' matches exactly one event subtype of any value.
  The subtype must exist.
* Query subtype '*' matches 0 or more event subtypes of any value. It
  must be the last query subtype.
* All other values of query subtype match exactly one event subtype
  of the same value.
* Query type without subtypes is matched only by event type with no
  subtypes.

As a consequence of aforementioned matching rules, event subtypes '*' and '?' cannot be directly matched and it is advisable not to use them in event types.

class SourceType(enum.Enum):
class SourceType(enum.Enum):
    """Type of a `Source`

    Members: ``EVENTER``, ``MODULE``, ``ENGINE`` and ``SERVER``.

    """
    EVENTER = 1
    MODULE = 2
    ENGINE = 3
    SERVER = 4

An enumeration.

EVENTER = <SourceType.EVENTER: 1>
MODULE = <SourceType.MODULE: 2>
ENGINE = <SourceType.ENGINE: 3>
SERVER = <SourceType.SERVER: 4>
Inherited Members
enum.Enum
name
value
class Source(typing.NamedTuple):
class Source(typing.NamedTuple):
    """Source described by its type and integer identifier"""
    # kind of the source (one of `SourceType` members)
    type: SourceType
    # integer identifier of the source
    # NOTE(review): uniqueness scope of the id is not visible here
    id: int

Source(type, id)

Source(type: SourceType, id: int)

Create new instance of Source(type, id)

type: SourceType

Alias for field number 0

id: int

Alias for field number 1

Inherited Members
builtins.tuple
index
count
class Engine(hat.aio.group.Resource):
class Engine(aio.Resource):
    """Engine ABC

    Interface used for registering and querying events.

    """

    @abc.abstractmethod
    async def register(self,
                       source: Source,
                       events: Collection[RegisterEvent]
                       ) -> Collection[Event] | None:
        """Register events

        Args:
            source: source associated with the registration
            events: register events to be registered

        Returns:
            collection of events or ``None``
            (NOTE(review): meaning of ``None`` is defined by implementations)

        """

    @abc.abstractmethod
    async def query(self,
                    params: QueryParams
                    ) -> QueryResult:
        """Query events

        Args:
            params: query parameters

        Returns:
            query result

        """
Engine ABC

@abc.abstractmethod
async def register( self, source: Source, events: collections.abc.Collection[RegisterEvent]) -> collections.abc.Collection[Event] | None:
    @abc.abstractmethod
    async def register(self,
                       source: Source,
                       events: Collection[RegisterEvent]
                       ) -> Collection[Event] | None:
        """Register events

        Args:
            source: source associated with the registration
            events: register events to be registered

        Returns:
            collection of events or ``None``
            (NOTE(review): meaning of ``None`` is defined by implementations)

        """

Register events

@abc.abstractmethod
async def query( self, params: QueryLatestParams | QueryTimeseriesParams | QueryServerParams) -> QueryResult:
    @abc.abstractmethod
    async def query(self,
                    params: QueryParams
                    ) -> QueryResult:
        """Query events

        Args:
            params: query parameters

        Returns:
            query result

        """

Query events

Inherited Members
hat.aio.group.Resource
async_group
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
class Module(hat.aio.group.Resource):
class Module(aio.Resource):
    """Module ABC

    A module exposes its `subscription`, is notified about session start and
    stop, and processes events passed to `process`.

    """

    @property
    @abc.abstractmethod
    def subscription(self) -> Subscription:
        """Subscribed event types filter.

        `subscription` is constant during module's lifetime.

        """

    async def on_session_start(self, session_id: int):
        """Called on start of a session, identified by session_id.

        This method can be coroutine or regular function.

        The default implementation does nothing.

        """

    async def on_session_stop(self, session_id: int):
        """Called on stop of a session, identified by session_id.

        This method can be coroutine or regular function.

        The default implementation does nothing.

        """

    @abc.abstractmethod
    async def process(self,
                      source: Source,
                      event: Event
                      ) -> Iterable[RegisterEvent] | None:
        """Process new session event.

        Provided event is matched by the module's subscription filter.

        Processing of session event can result in registration of
        new register events.

        Single module session process is always called sequentially.

        This method can be coroutine or regular function.

        Args:
            source: source associated with the event
            event: event to be processed

        Returns:
            register events to be registered or ``None``

        """

Module ABC

subscription: Subscription
    @property
    @abc.abstractmethod
    def subscription(self) -> Subscription:
        """Subscribed event types filter.

        `subscription` is constant during module's lifetime.

        """

Subscribed event types filter.

subscription is constant during module's lifetime.

async def on_session_start(self, session_id: int):
    async def on_session_start(self, session_id: int):
        """Called on start of a session, identified by session_id.

        This method can be coroutine or regular function.

        The default implementation does nothing.

        """

Called on start of a session, identified by session_id.

This method can be coroutine or regular function.

async def on_session_stop(self, session_id: int):
    async def on_session_stop(self, session_id: int):
        """Called on stop of a session, identified by session_id.

        This method can be coroutine or regular function.

        The default implementation does nothing.

        """

Called on stop of a session, identified by session_id.

This method can be coroutine or regular function.

@abc.abstractmethod
async def process( self, source: Source, event: Event) -> collections.abc.Iterable[RegisterEvent] | None:
    @abc.abstractmethod
    async def process(self,
                      source: Source,
                      event: Event
                      ) -> Iterable[RegisterEvent] | None:
        """Process new session event.

        Provided event is matched by the module's subscription filter.

        Processing of session event can result in registration of
        new register events.

        Single module session process is always called sequentially.

        This method can be coroutine or regular function.

        Args:
            source: source associated with the event
            event: event to be processed

        Returns:
            register events to be registered or ``None``

        """

Process new session event.

The provided event is matched by the module's subscription filter.

Processing of session event can result in registration of new register events.

Single module session process is always called sequentially.

This method can be coroutine or regular function.

Inherited Members
hat.aio.group.Resource
async_group
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
ModuleConf = None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]
CreateModule = typing.Callable[[None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], Engine, Source], typing.Union[Module, typing.Awaitable[Module]]]
class ModuleInfo(typing.NamedTuple):
class ModuleInfo(typing.NamedTuple):
    """Module info

    Module is implemented as python module which is dynamically imported.
    It is expected that this module contains `info` which is instance of
    `ModuleInfo`.

    If module defines JSON schema repository and JSON schema id, JSON schema
    repository will be used for additional validation of module configuration
    with JSON schema id.

    """
    # callable used for creating the module instance (see `CreateModule`)
    create: CreateModule
    # optional JSON schema id used for validating module configuration
    json_schema_id: str | None = None
    # optional JSON schema repository used together with `json_schema_id`
    json_schema_repo: json.SchemaRepository | None = None

Module info

Module is implemented as python module which is dynamically imported. It is expected that this module contains info which is instance of ModuleInfo.

If module defines JSON schema repository and JSON schema id, JSON schema repository will be used for additional validation of module configuration with JSON schema id.

ModuleInfo( create: Callable[[None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], Engine, Source], Union[Module, Awaitable[Module]]], json_schema_id: str | None = None, json_schema_repo: hat.json.repository.SchemaRepository | None = None)

Create new instance of ModuleInfo(create, json_schema_id, json_schema_repo)

create: Callable[[None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], Engine, Source], Union[Module, Awaitable[Module]]]

Alias for field number 0

json_schema_id: str | None

Alias for field number 1

json_schema_repo: hat.json.repository.SchemaRepository | None

Alias for field number 2

Inherited Members
builtins.tuple
index
count
def import_module_info(py_module_str: str) -> ModuleInfo:
def import_module_info(py_module_str: str) -> ModuleInfo:
    """Import python module and return its `info` attribute

    Args:
        py_module_str: name of the python module passed to
            `importlib.import_module`

    Raises:
        Exception: imported module's `info` is not instance of `ModuleInfo`

    """
    info = importlib.import_module(py_module_str).info

    if isinstance(info, ModuleInfo):
        return info

    raise Exception('invalid module implementation')

Import module info

class Subscription(abc.ABC):
class Subscription(abc.ABC):
    """Subscription defined by query event types

    Abstract interface; concrete instances are obtained from
    implementations of this class.

    """

    @abc.abstractmethod
    def __init__(self, query_types: Iterable[EventType]):
        """Create subscription instance

        Args:
            query_types: query event types defining the subscription

        """

    @abc.abstractmethod
    def get_query_types(self) -> Iterable[EventType]:
        """Calculate sanitized query event types"""

    @abc.abstractmethod
    def matches(self, event_type: EventType) -> bool:
        """Does `event_type` match subscription

        Args:
            event_type: event type tested against this subscription

        """

    @abc.abstractmethod
    def union(self, *others: 'Subscription') -> 'Subscription':
        """Create new subscription including event types from this and
        other subscriptions.

        Args:
            others: subscriptions combined with this one

        """

    @abc.abstractmethod
    def intersection(self, *others: 'Subscription') -> 'Subscription':
        """Create new subscription containing event types in common with
        other subscriptions.

        Args:
            others: subscriptions intersected with this one

        """

    @abc.abstractmethod
    def isdisjoint(self, other: 'Subscription') -> bool:
        """Return ``True`` if this subscription has no event types in common
        with other subscription.

        Args:
            other: subscription compared with this one

        """

Subscription defined by query event types

@abc.abstractmethod
Subscription(query_types: collections.abc.Iterable[tuple[str, ...]])
    @abc.abstractmethod
    def __init__(self, query_types: Iterable[EventType]):
        """Create subscription instance

        Args:
            query_types: query event types defining the subscription

        """

Create subscription instance

@abc.abstractmethod
def get_query_types(self) -> collections.abc.Iterable[tuple[str, ...]]:
    @abc.abstractmethod
    def get_query_types(self) -> Iterable[EventType]:
        """Calculate sanitized query event types

        Returns:
            iterable of query event types

        """

Calculate sanitized query event types

@abc.abstractmethod
def matches(self, event_type: tuple[str, ...]) -> bool:
    @abc.abstractmethod
    def matches(self, event_type: EventType) -> bool:
        """Does `event_type` match subscription

        Args:
            event_type: event type tested against this subscription

        """

Does event_type match subscription

@abc.abstractmethod
def union( self, *others: Subscription) -> Subscription:
    @abc.abstractmethod
    def union(self, *others: 'Subscription') -> 'Subscription':
        """Create new subscription including event types from this and
        other subscriptions.

        Args:
            others: subscriptions combined with this one

        """

Create new subscription including event types from this and other subscriptions.

@abc.abstractmethod
def intersection( self, *others: Subscription) -> Subscription:
    @abc.abstractmethod
    def intersection(self, *others: 'Subscription') -> 'Subscription':
        """Create new subscription containing event types in common with
        other subscriptions.

        Args:
            others: subscriptions intersected with this one

        """

Create new subscription containing event types in common with other subscriptions.

@abc.abstractmethod
def isdisjoint(self, other: Subscription) -> bool:
    @abc.abstractmethod
    def isdisjoint(self, other: 'Subscription') -> bool:
        """Return ``True`` if this subscription has no event types in common
        with other subscription.

        Args:
            other: subscription compared with this one

        """

Return True if this subscription has no event types in common with other subscription.

def create_subscription( query_types: collections.abc.Iterable[tuple[str, ...]]) -> Subscription:
def create_subscription(query_types: Iterable[EventType]) -> Subscription:
    """Create subscription for provided query event types

    `CSubscription` is used when it is available (not ``None``); otherwise
    `PySubscription` is used.

    """
    factory = PySubscription if CSubscription is None else CSubscription
    return factory(query_types)
def now() -> Timestamp:
def now() -> Timestamp:
    """Return timestamp created from the current `time.time()` value"""
    current = time.time()
    return timestamp_from_float(current)

Create new timestamp representing current time