hat.event.common

Common functionality shared between clients and event server

  1"""Common functionality shared between clients and event server"""
  2
  3import time
  4
  5from hat.event.common.backend import (BackendClosedError,
  6                                      Backend,
  7                                      BackendConf,
  8                                      BackendRegisteredEventsCb,
  9                                      BackendFlushedEventsCb,
 10                                      CreateBackend,
 11                                      BackendInfo,
 12                                      import_backend_info)
 13from hat.event.common.collection import (EventTypeCollection,
 14                                         ListEventTypeCollection,
 15                                         TreeEventTypeCollection)
 16from hat.event.common.common import (json_schema_repo,
 17                                     sbs_repo,
 18                                     ServerId,
 19                                     SessionId,
 20                                     InstanceId,
 21                                     EventTypeSegment,
 22                                     EventType,
 23                                     Timestamp,
 24                                     min_timestamp,
 25                                     max_timestamp,
 26                                     Status,
 27                                     Order,
 28                                     OrderBy,
 29                                     EventId,
 30                                     EventPayloadBinary,
 31                                     EventPayloadJson,
 32                                     EventPayload,
 33                                     Event,
 34                                     RegisterEvent,
 35                                     QueryLatestParams,
 36                                     QueryTimeseriesParams,
 37                                     QueryServerParams,
 38                                     QueryParams,
 39                                     QueryResult)
 40from hat.event.common.encoder import (timestamp_to_bytes,
 41                                      timestamp_from_bytes,
 42                                      timestamp_to_float,
 43                                      timestamp_from_float,
 44                                      timestamp_to_datetime,
 45                                      timestamp_from_datetime,
 46                                      timestamp_to_sbs,
 47                                      timestamp_from_sbs,
 48                                      status_to_sbs,
 49                                      status_from_sbs,
 50                                      event_to_sbs,
 51                                      event_from_sbs,
 52                                      register_event_to_sbs,
 53                                      register_event_from_sbs,
 54                                      query_params_to_sbs,
 55                                      query_params_from_sbs,
 56                                      query_result_to_sbs,
 57                                      query_result_from_sbs,
 58                                      event_payload_to_sbs,
 59                                      event_payload_from_sbs)
 60from hat.event.common.matches import matches_query_type
 61from hat.event.common.module import (SourceType,
 62                                     Source,
 63                                     Engine,
 64                                     Module,
 65                                     ModuleConf,
 66                                     CreateModule,
 67                                     ModuleInfo,
 68                                     import_module_info)
 69from hat.event.common.subscription import (Subscription,
 70                                           create_subscription)
 71
 72
 73__all__ = ['BackendClosedError',
 74           'Backend',
 75           'BackendConf',
 76           'BackendRegisteredEventsCb',
 77           'BackendFlushedEventsCb',
 78           'CreateBackend',
 79           'BackendInfo',
 80           'import_backend_info',
 81           'EventTypeCollection',
 82           'ListEventTypeCollection',
 83           'TreeEventTypeCollection',
 84           'json_schema_repo',
 85           'sbs_repo',
 86           'ServerId',
 87           'SessionId',
 88           'InstanceId',
 89           'EventTypeSegment',
 90           'EventType',
 91           'Timestamp',
 92           'min_timestamp',
 93           'max_timestamp',
 94           'Status',
 95           'Order',
 96           'OrderBy',
 97           'EventId',
 98           'EventPayloadBinary',
 99           'EventPayloadJson',
100           'EventPayload',
101           'Event',
102           'RegisterEvent',
103           'QueryLatestParams',
104           'QueryTimeseriesParams',
105           'QueryServerParams',
106           'QueryParams',
107           'QueryResult',
108           'timestamp_to_bytes',
109           'timestamp_from_bytes',
110           'timestamp_to_float',
111           'timestamp_from_float',
112           'timestamp_to_datetime',
113           'timestamp_from_datetime',
114           'timestamp_to_sbs',
115           'timestamp_from_sbs',
116           'status_to_sbs',
117           'status_from_sbs',
118           'event_to_sbs',
119           'event_from_sbs',
120           'register_event_to_sbs',
121           'register_event_from_sbs',
122           'query_params_to_sbs',
123           'query_params_from_sbs',
124           'query_result_to_sbs',
125           'query_result_from_sbs',
126           'event_payload_to_sbs',
127           'event_payload_from_sbs',
128           'matches_query_type',
129           'SourceType',
130           'Source',
131           'Engine',
132           'Module',
133           'ModuleConf',
134           'CreateModule',
135           'ModuleInfo',
136           'import_module_info',
137           'Subscription',
138           'create_subscription',
139           'now']
140
141
142def now() -> Timestamp:
143    """Create new timestamp representing current time"""
144    return timestamp_from_float(time.time())
class BackendClosedError(builtins.Exception):
16class BackendClosedError(Exception):
17    """Backend closed"""

Backend closed

class Backend(hat.aio.group.Resource):
20class Backend(aio.Resource):
21    """Backend ABC"""
22
23    @abc.abstractmethod
24    async def get_last_event_id(self,
25                                server_id: int
26                                ) -> EventId:
27        """Get last registered event id associated with server id"""
28
29    @abc.abstractmethod
30    async def register(self,
31                       events: Collection[Event]
32                       ) -> Collection[Event] | None:
33        """Register events"""
34
35    @abc.abstractmethod
36    async def query(self,
37                    params: QueryParams
38                    ) -> QueryResult:
39        """Query events"""
40
41    @abc.abstractmethod
42    async def flush(self):
43        """Flush internal buffers and permanently persist events"""

Backend ABC

@abc.abstractmethod
async def get_last_event_id(self, server_id: int) -> EventId:
23    @abc.abstractmethod
24    async def get_last_event_id(self,
25                                server_id: int
26                                ) -> EventId:
27        """Get last registered event id associated with server id"""

Get last registered event id associated with server id

@abc.abstractmethod
async def register( self, events: Collection[Event]) -> Collection[Event] | None:
29    @abc.abstractmethod
30    async def register(self,
31                       events: Collection[Event]
32                       ) -> Collection[Event] | None:
33        """Register events"""

Register events

@abc.abstractmethod
async def query( self, params: QueryLatestParams | QueryTimeseriesParams | QueryServerParams) -> QueryResult:
35    @abc.abstractmethod
36    async def query(self,
37                    params: QueryParams
38                    ) -> QueryResult:
39        """Query events"""

Query events

@abc.abstractmethod
async def flush(self):
41    @abc.abstractmethod
42    async def flush(self):
43        """Flush internal buffers and permanently persist events"""

Flush internal buffers and permanently persist events

BackendConf = json.Data
BackendRegisteredEventsCb = Callable[[Collection[Event]], Awaitable[None] | None]
BackendFlushedEventsCb = Callable[[Collection[Event]], Awaitable[None] | None]
CreateBackend = Callable[[BackendConf, BackendRegisteredEventsCb | None, BackendFlushedEventsCb | None], Backend | Awaitable[Backend]]
class BackendInfo(typing.NamedTuple):
67class BackendInfo(typing.NamedTuple):
68    """Backend info
69
70    Backend is implemented as python module which is dynamically imported.
71    It is expected that this module contains `info` which is instance of
72    `BackendInfo`.
73
74    If backend defines JSON schema repository and JSON schema id, JSON schema
75    repository will be used for additional validation of backend configuration
76    with JSON schema id.
77
78    """
79    create: CreateBackend
80    json_schema_id: str | None = None
81    json_schema_repo: json.SchemaRepository | None = None

Backend info

A backend is implemented as a Python module which is dynamically imported. This module is expected to contain info, an instance of BackendInfo.

If the backend defines a JSON schema repository and a JSON schema id, the repository is used for additional validation of the backend configuration against that schema id.

BackendInfo(create: CreateBackend, json_schema_id: str | None = None, json_schema_repo: json.SchemaRepository | None = None)

Create new instance of BackendInfo(create, json_schema_id, json_schema_repo)

create: CreateBackend

Alias for field number 0

json_schema_id: str | None

Alias for field number 1

json_schema_repo: hat.json.repository.SchemaRepository | None

Alias for field number 2

def import_backend_info(py_module_str: str) -> BackendInfo:
84def import_backend_info(py_module_str: str) -> BackendInfo:
85    """Import backend info"""
86    py_module = importlib.import_module(py_module_str)
87    info = py_module.info
88
89    if not isinstance(info, BackendInfo):
90        raise Exception('invalid backend implementation')
91
92    return info

Import backend info
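
To illustrate the convention described above, here is a minimal, hypothetical in-memory backend module (the module path, MemoryBackend and its naive behaviour are invented for this sketch; it assumes the hat.aio Group/Resource API):

# hypothetical module my_project.memory_backend
from collections.abc import Collection

from hat import aio
from hat.event import common


class MemoryBackend(common.Backend):

    def __init__(self):
        self._async_group = aio.Group()
        self._events = []

    @property
    def async_group(self) -> aio.Group:
        return self._async_group

    async def get_last_event_id(self, server_id: int) -> common.EventId:
        # last registered event id for server_id, or a zero id if none exists
        for event in reversed(self._events):
            if event.id.server == server_id:
                return event.id
        return common.EventId(server=server_id, session=0, instance=0)

    async def register(self,
                       events: Collection[common.Event]
                       ) -> Collection[common.Event] | None:
        self._events.extend(events)
        return events

    async def query(self, params: common.QueryParams) -> common.QueryResult:
        # naive: only QueryLatestParams handled, and it returns every matching
        # event rather than only the latest per type - just to show the shape
        if isinstance(params, common.QueryLatestParams):
            events = [event for event in self._events
                      if params.event_types is None
                      or any(common.matches_query_type(event.type, query_type)
                             for query_type in params.event_types)]
            return common.QueryResult(events=events, more_follows=False)
        raise NotImplementedError()

    async def flush(self):
        pass  # everything already lives in memory


def create(conf, registered_events_cb, flushed_events_cb):
    return MemoryBackend()


# the dynamically imported backend module is expected to expose `info`
info = common.BackendInfo(create=create)

The event server would then resolve such a module with import_backend_info('my_project.memory_backend').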

class EventTypeCollection(abc.ABC, typing.Generic[~T]):
13class EventTypeCollection(abc.ABC, typing.Generic[T]):
14
15    @abc.abstractmethod
16    def __init__(self):
17        pass
18
19    @abc.abstractmethod
20    def add(self, subscription: Subscription, value: T):
21        pass
22
23    @abc.abstractmethod
24    def remove(self, value: T):
25        pass
26
27    @abc.abstractmethod
28    def get(self, event_type: EventType) -> Iterable[T]:
29        pass

Abstract collection of values, each associated with a subscription; get yields the values whose subscription matches a given event type.

@abc.abstractmethod
def add( self, subscription: Subscription, value: ~T):
19    @abc.abstractmethod
20    def add(self, subscription: Subscription, value: T):
21        pass
@abc.abstractmethod
def remove(self, value: ~T):
23    @abc.abstractmethod
24    def remove(self, value: T):
25        pass
@abc.abstractmethod
def get(self, event_type: tuple[str, ...]) -> Iterable[~T]:
27    @abc.abstractmethod
28    def get(self, event_type: EventType) -> Iterable[T]:
29        pass
class ListEventTypeCollection(abc.ABC, typing.Generic[~T]):
 5class ListEventTypeCollection(common.EventTypeCollection):
 6
 7    def __init__(self):
 8        self._values = {}
 9
10    def add(self, subscription, value):
11        self._values[value] = (self._values[value].union(subscription)
12                               if value in self._values else subscription)
13
14    def remove(self, value):
15        self._values.pop(value, None)
16
17    def get(self, event_type):
18        for value, subscription in self._values.items():
19            if subscription.matches(event_type):
20                yield value

Event type collection backed by a flat mapping of values to subscriptions; get checks every stored subscription against the queried event type.

def add(self, subscription, value):
10    def add(self, subscription, value):
11        self._values[value] = (self._values[value].union(subscription)
12                               if value in self._values else subscription)
def remove(self, value):
14    def remove(self, value):
15        self._values.pop(value, None)
def get(self, event_type):
17    def get(self, event_type):
18        for value, subscription in self._values.items():
19            if subscription.matches(event_type):
20                yield value
class TreeEventTypeCollection(abc.ABC, typing.Generic[~T]):
 8class TreeEventTypeCollection(common.EventTypeCollection):
 9
10    def __init__(self):
11        self._root = _create_node()
12        self._value_nodes = collections.defaultdict(collections.deque)
13
14    def add(self, subscription, value):
15        for query_type in subscription.get_query_types():
16            node = self._root
17            rest = query_type
18
19            while rest:
20                head, rest = rest[0], rest[1:]
21                if head == '*' and rest:
22                    raise ValueError('invalid query type')
23
24                node = node.children[head]
25
26            if value in node.values:
27                return
28
29            node.values.add(value)
30            self._value_nodes[value].append(node)
31
32    def remove(self, value):
33        for node in self._value_nodes.pop(value, []):
34            node.values.remove(value)
35
36    def get(self, event_type):
37        return set(_get(self._root, event_type))

Event type collection backed by a tree of query type segments; get walks the tree and returns the set of matching values.

def add(self, subscription, value):
14    def add(self, subscription, value):
15        for query_type in subscription.get_query_types():
16            node = self._root
17            rest = query_type
18
19            while rest:
20                head, rest = rest[0], rest[1:]
21                if head == '*' and rest:
22                    raise ValueError('invalid query type')
23
24                node = node.children[head]
25
26            if value in node.values:
27                return
28
29            node.values.add(value)
30            self._value_nodes[value].append(node)
def remove(self, value):
32    def remove(self, value):
33        for node in self._value_nodes.pop(value, []):
34            node.values.remove(value)
def get(self, event_type):
36    def get(self, event_type):
37        return set(_get(self._root, event_type))
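
A brief usage sketch of the two implementations above; both associate values with subscriptions and return the values whose subscription matches a queried event type (the handler names are arbitrary):

from hat.event import common

collection = common.TreeEventTypeCollection()
collection.add(common.create_subscription([('device', '*')]), 'on_device')
collection.add(common.create_subscription([('system', '?')]), 'on_system')

collection.get(('device', 'temperature', 'raw'))   # {'on_device'}
collection.get(('system', 'monitor'))              # {'on_system'}
collection.get(('other',))                         # set()

collection.remove('on_device')
collection.get(('device', 'temperature'))          # set()

ListEventTypeCollection offers the same interface, but get yields matching values lazily instead of returning a set.
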
json_schema_repo = <hat.json.repository.SchemaRepository object>
sbs_repo = <hat.sbs.repository.Repository object>
ServerId = int
SessionId = int
InstanceId = int
EventTypeSegment = str
EventType = tuple[str, ...]
class Timestamp(typing.NamedTuple):
40class Timestamp(typing.NamedTuple):
41    s: int
42    """seconds since 1970-01-01 (can be negative)"""
43    us: int
44    """microseconds added to timestamp seconds in range [0, 1e6)"""
45
46    def add(self, s: float) -> 'Timestamp':
47        """Create new timestamp by adding seconds to existing timestamp"""
48        us = self.us + round((s - int(s)) * 1e6)
49        s = self.s + int(s)
50        return Timestamp(s=s + us // int(1e6),
51                         us=us % int(1e6))

Timestamp(s, us)

Timestamp(s: int, us: int)

Create new instance of Timestamp(s, us)

s: int

seconds since 1970-01-01 (can be negative)

us: int

microseconds added to timestamp seconds in range [0, 1e6)

def add(self, s: float) -> Timestamp:
46    def add(self, s: float) -> 'Timestamp':
47        """Create new timestamp by adding seconds to existing timestamp"""
48        us = self.us + round((s - int(s)) * 1e6)
49        s = self.s + int(s)
50        return Timestamp(s=s + us // int(1e6),
51                         us=us % int(1e6))

Create new timestamp by adding seconds to existing timestamp
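
A few worked values showing how add carries microseconds into seconds (computed from the definition above):

from hat.event import common

t = common.Timestamp(s=10, us=900000)

t.add(0.25)    # Timestamp(s=11, us=150000) - microseconds carry into seconds
t.add(-0.25)   # Timestamp(s=10, us=650000)
t.add(2)       # Timestamp(s=12, us=900000)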

min_timestamp = Timestamp(s=-9223372036854775808, us=0)
max_timestamp = Timestamp(s=9223372036854775807, us=999999)
class Status(enum.Enum):
61class Status(enum.Enum):
62    STANDBY = 'standby'
63    STARTING = 'starting'
64    OPERATIONAL = 'operational'
65    STOPPING = 'stopping'

An enumeration.

STANDBY = <Status.STANDBY: 'standby'>
STARTING = <Status.STARTING: 'starting'>
OPERATIONAL = <Status.OPERATIONAL: 'operational'>
STOPPING = <Status.STOPPING: 'stopping'>
class Order(enum.Enum):
68class Order(enum.Enum):
69    DESCENDING = 'descending'
70    ASCENDING = 'ascending'

An enumeration.

DESCENDING = <Order.DESCENDING: 'descending'>
ASCENDING = <Order.ASCENDING: 'ascending'>
class OrderBy(enum.Enum):
73class OrderBy(enum.Enum):
74    TIMESTAMP = 'timestamp'
75    SOURCE_TIMESTAMP = 'sourceTimestamp'

An enumeration.

TIMESTAMP = <OrderBy.TIMESTAMP: 'timestamp'>
SOURCE_TIMESTAMP = <OrderBy.SOURCE_TIMESTAMP: 'sourceTimestamp'>
class EventId(typing.NamedTuple):
78class EventId(typing.NamedTuple):
79    server: ServerId
80    session: SessionId
81    instance: InstanceId

EventId(server, session, instance)

EventId(server: int, session: int, instance: int)

Create new instance of EventId(server, session, instance)

server: int

Alias for field number 0

session: int

Alias for field number 1

instance: int

Alias for field number 2

class EventPayloadBinary(typing.NamedTuple):
84class EventPayloadBinary(typing.NamedTuple):
85    type: str
86    data: util.Bytes

EventPayloadBinary(type, data)

EventPayloadBinary(type: str, data: bytes | bytearray | memoryview)

Create new instance of EventPayloadBinary(type, data)

type: str

Alias for field number 0

data: bytes | bytearray | memoryview

Alias for field number 1

class EventPayloadJson(typing.NamedTuple):
89class EventPayloadJson(typing.NamedTuple):
90    data: json.Data

EventPayloadJson(data)

EventPayloadJson(data: json.Data)

Create new instance of EventPayloadJson(data)

data: json.Data

Alias for field number 0

class Event(typing.NamedTuple):
 96class Event(typing.NamedTuple):
 97    """Event
 98
 99    Operators `>` and `<` test for natural order where it is assumed that
100    first operand is registered before second operand.
101
102    """
103
104    id: EventId
105    type: EventType
106    timestamp: Timestamp
107    source_timestamp: Timestamp | None
108    payload: EventPayload | None
109
110    def __lt__(self, other):
111        if not isinstance(other, Event):
112            return NotImplemented
113
114        if self.id == other.id:
115            return False
116
117        if self.id.server == other.id.server:
118            return self.id < other.id
119
120        if self.timestamp != other.timestamp:
121            return self.timestamp < other.timestamp
122
123        return True
124
125    def __gt__(self, other):
126        if not isinstance(other, Event):
127            return NotImplemented
128
129        if self.id == other.id:
130            return False
131
132        if self.id.server == other.id.server:
133            return self.id > other.id
134
135        if self.timestamp != other.timestamp:
136            return self.timestamp > other.timestamp
137
138        return False

Event

Operators > and < test for natural order, where it is assumed that the first operand was registered before the second operand.

Event( id: EventId, type: tuple[str, ...], timestamp: Timestamp, source_timestamp: Timestamp | None, payload: EventPayloadBinary | EventPayloadJson | None)

Create new instance of Event(id, type, timestamp, source_timestamp, payload)

id: EventId

Alias for field number 0

type: tuple[str, ...]

Alias for field number 1

timestamp: Timestamp

Alias for field number 2

source_timestamp: Timestamp | None

Alias for field number 3

payload: EventPayloadBinary | EventPayloadJson | None

Alias for field number 4
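
A short sketch of the ordering rules quoted above: events from the same server compare by event id, events from different servers fall back to timestamps (the results follow directly from the operators' definitions):

from hat.event import common

t = common.now()

e1 = common.Event(id=common.EventId(server=1, session=1, instance=1),
                  type=('a',), timestamp=t,
                  source_timestamp=None, payload=None)
e2 = e1._replace(id=common.EventId(server=1, session=1, instance=2))
e3 = e1._replace(id=common.EventId(server=2, session=1, instance=1),
                 timestamp=t.add(1))

e1 < e2   # True - same server, ordered by event id
e1 < e3   # True - different servers, ordered by timestamp
e3 > e1   # True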

class RegisterEvent(typing.NamedTuple):
141class RegisterEvent(typing.NamedTuple):
142    type: EventType
143    source_timestamp: Timestamp | None
144    payload: EventPayload | None

RegisterEvent(type, source_timestamp, payload)

RegisterEvent( type: tuple[str, ...], source_timestamp: Timestamp | None, payload: EventPayloadBinary | EventPayloadJson | None)

Create new instance of RegisterEvent(type, source_timestamp, payload)

type: tuple[str, ...]

Alias for field number 0

source_timestamp: Timestamp | None

Alias for field number 1

payload: EventPayloadBinary | EventPayloadJson | None

Alias for field number 2

class QueryLatestParams(typing.NamedTuple):
147class QueryLatestParams(typing.NamedTuple):
148    event_types: Collection[EventType] | None = None

QueryLatestParams(event_types)

QueryLatestParams(event_types: Collection[tuple[str, ...]] | None = None)

Create new instance of QueryLatestParams(event_types)

event_types: Collection[tuple[str, ...]] | None

Alias for field number 0

class QueryTimeseriesParams(typing.NamedTuple):
151class QueryTimeseriesParams(typing.NamedTuple):
152    event_types: Collection[EventType] | None = None
153    t_from: Timestamp | None = None
154    t_to: Timestamp | None = None
155    source_t_from: Timestamp | None = None
156    source_t_to: Timestamp | None = None
157    order: Order = Order.DESCENDING
158    order_by: OrderBy = OrderBy.TIMESTAMP
159    max_results: int | None = None
160    last_event_id: EventId | None = None

QueryTimeseriesParams(event_types, t_from, t_to, source_t_from, source_t_to, order, order_by, max_results, last_event_id)

QueryTimeseriesParams( event_types: Collection[tuple[str, ...]] | None = None, t_from: Timestamp | None = None, t_to: Timestamp | None = None, source_t_from: Timestamp | None = None, source_t_to: Timestamp | None = None, order: Order = <Order.DESCENDING: 'descending'>, order_by: OrderBy = <OrderBy.TIMESTAMP: 'timestamp'>, max_results: int | None = None, last_event_id: EventId | None = None)

Create new instance of QueryTimeseriesParams(event_types, t_from, t_to, source_t_from, source_t_to, order, order_by, max_results, last_event_id)

event_types: Collection[tuple[str, ...]] | None

Alias for field number 0

t_from: Timestamp | None

Alias for field number 1

t_to: Timestamp | None

Alias for field number 2

source_t_from: Timestamp | None

Alias for field number 3

source_t_to: Timestamp | None

Alias for field number 4

order: Order

Alias for field number 5

order_by: OrderBy

Alias for field number 6

max_results: int | None

Alias for field number 7

last_event_id: EventId | None

Alias for field number 8
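
As an illustration of these parameters, a hedged sketch that builds an ascending timeseries query and pages through results with last_event_id (the event types are invented and backend stands for any object exposing the query coroutine described in this module):

from hat.event import common


async def query_all(backend: common.Backend,
                    params: common.QueryTimeseriesParams
                    ) -> list[common.Event]:
    """Collect all matching events, following more_follows paging."""
    events = []
    while True:
        result = await backend.query(params)
        events.extend(result.events)
        if not result.more_follows or not result.events:
            return events
        # continue after the last event received so far
        params = params._replace(last_event_id=list(result.events)[-1].id)


params = common.QueryTimeseriesParams(
    event_types=[('gateway', '?', 'gauge', '*')],
    t_from=common.now().add(-3600),
    order=common.Order.ASCENDING,
    max_results=100)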

class QueryServerParams(typing.NamedTuple):
163class QueryServerParams(typing.NamedTuple):
164    server_id: ServerId
165    persisted: bool = False
166    max_results: int | None = None
167    last_event_id: EventId | None = None

QueryServerParams(server_id, persisted, max_results, last_event_id)

QueryServerParams( server_id: int, persisted: bool = False, max_results: int | None = None, last_event_id: EventId | None = None)

Create new instance of QueryServerParams(server_id, persisted, max_results, last_event_id)

server_id: int

Alias for field number 0

persisted: bool

Alias for field number 1

max_results: int | None

Alias for field number 2

last_event_id: EventId | None

Alias for field number 3

class QueryResult(typing.NamedTuple):
175class QueryResult(typing.NamedTuple):
176    events: Collection[Event]
177    more_follows: bool

QueryResult(events, more_follows)

QueryResult( events: Collection[Event], more_follows: bool)

Create new instance of QueryResult(events, more_follows)

events: Collection[Event]

Alias for field number 0

more_follows: bool

Alias for field number 1

def timestamp_to_bytes(t: Timestamp) -> bytes | bytearray | memoryview:
26def timestamp_to_bytes(t: Timestamp) -> util.Bytes:
27    """Convert timestamp to 12 byte representation
28
29    Bytes [0, 8] are big endian unsigned `Timestamp.s` + 2^63 and
30    bytes [9, 12] are big endian unsigned `Timestamp.us`.
31
32    """
33    return struct.pack(">QI", t.s + (1 << 63), t.us)

Convert timestamp to 12 byte representation

The first 8 bytes (indices 0-7) hold big endian unsigned Timestamp.s + 2^63 and the last 4 bytes (indices 8-11) hold big endian unsigned Timestamp.us.

def timestamp_from_bytes( data: bytes | bytearray | memoryview) -> Timestamp:
36def timestamp_from_bytes(data: util.Bytes) -> Timestamp:
37    """Create new timestamp from 12 byte representation
38
39    Bytes representation is same as defined for `timestamp_to_bytes` function.
40
41    """
42    s, us = struct.unpack(">QI", data)
43    return Timestamp(s - (1 << 63), us)

Create new timestamp from 12 byte representation

The byte representation is the same as defined for the timestamp_to_bytes function.
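
A quick round trip through the byte encoding (struct layout as described above); the 2^63 offset and big endian layout mean the 12-byte strings sort in the same order as the timestamps themselves:

import struct

from hat.event import common

t = common.Timestamp(s=1, us=500000)
data = common.timestamp_to_bytes(t)

len(data)                                # 12
struct.unpack(">QI", data)               # (9223372036854775809, 500000)
common.timestamp_from_bytes(data) == t   # True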

def timestamp_to_float(t: Timestamp) -> float:
46def timestamp_to_float(t: Timestamp) -> float:
47    """Convert timestamp to floating number of seconds since 1970-01-01 UTC
48
49    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.
50
51    """
52    return t.s + t.us * 1E-6

Convert timestamp to floating number of seconds since 1970-01-01 UTC

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_from_float(ts: float) -> Timestamp:
55def timestamp_from_float(ts: float) -> Timestamp:
56    """Create timestamp from floating number of seconds since 1970-01-01 UTC
57
58    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.
59
60    """
61    s = int(ts)
62    if ts < 0:
63        s = s - 1
64
65    us = round((ts - s) * 1E6)
66
67    if us == 1_000_000:
68        return Timestamp(s + 1, 0)
69
70    else:
71        return Timestamp(s, us)

Create timestamp from floating number of seconds since 1970-01-01 UTC

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.
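
Worked examples of the flooring behaviour for negative inputs (seconds are floored so that microseconds always stay in [0, 1e6)):

from hat.event import common

common.timestamp_from_float(1.5)      # Timestamp(s=1, us=500000)
common.timestamp_from_float(-0.25)    # Timestamp(s=-1, us=750000)
common.timestamp_from_float(-1.0)     # Timestamp(s=-1, us=0)

common.timestamp_to_float(common.Timestamp(s=-1, us=750000))   # -0.25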

def timestamp_to_datetime(t: Timestamp) -> datetime.datetime:
74def timestamp_to_datetime(t: Timestamp) -> datetime.datetime:
75    """Convert timestamp to datetime (representing utc time)
76
77    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.
78
79    """
80    try:
81        dt_from_s = datetime.datetime.fromtimestamp(t.s, datetime.timezone.utc)
82
83    except OSError:
84        dt_from_s = (
85            datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) +
86            datetime.timedelta(seconds=t.s))
87
88    return datetime.datetime(
89        year=dt_from_s.year,
90        month=dt_from_s.month,
91        day=dt_from_s.day,
92        hour=dt_from_s.hour,
93        minute=dt_from_s.minute,
94        second=dt_from_s.second,
95        microsecond=t.us,
96        tzinfo=datetime.timezone.utc)

Convert timestamp to datetime (representing utc time)

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_from_datetime(dt: datetime.datetime) -> Timestamp:
 99def timestamp_from_datetime(dt: datetime.datetime) -> Timestamp:
100    """Create new timestamp from datetime
101
102    If `tzinfo` is not set, it is assumed that provided datetime represents
103    utc time.
104
105    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.
106
107    """
108    if not dt.tzinfo:
109        dt = dt.replace(tzinfo=datetime.timezone.utc)
110
111    s = int(dt.timestamp())
112
113    if dt.timestamp() < 0:
114        s = s - 1
115
116    return Timestamp(s=s, us=dt.microsecond)

Create new timestamp from datetime

If tzinfo is not set, it is assumed that provided datetime represents utc time.

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_to_sbs(t: Timestamp) -> sbs.Data:
119def timestamp_to_sbs(t: Timestamp) -> sbs.Data:
120    """Convert timestamp to SBS data"""
121    return {'s': t.s, 'us': t.us}

Convert timestamp to SBS data

def timestamp_from_sbs(data: sbs.Data) -> Timestamp:
124def timestamp_from_sbs(data: sbs.Data) -> Timestamp:
125    """Create new timestamp from SBS data"""
126    return Timestamp(s=data['s'], us=data['us'])

Create new timestamp from SBS data

def status_to_sbs(status: Status) -> sbs.Data:
129def status_to_sbs(status: Status) -> sbs.Data:
130    """Convert Status to SBS data"""
131    return status.value, None

Convert Status to SBS data

def status_from_sbs(status: sbs.Data) -> Status:
134def status_from_sbs(status: sbs.Data) -> Status:
135    """Create Status based on SBS data"""
136    return Status(status[0])

Create Status based on SBS data

def event_to_sbs(event: Event) -> sbs.Data:
139def event_to_sbs(event: Event) -> sbs.Data:
140    """Convert Event to SBS data"""
141    return {'id': _event_id_to_sbs(event.id),
142            'type': list(event.type),
143            'timestamp': timestamp_to_sbs(event.timestamp),
144            'sourceTimestamp': _optional_to_sbs(event.source_timestamp,
145                                                timestamp_to_sbs),
146            'payload': _optional_to_sbs(event.payload, event_payload_to_sbs)}

Convert Event to SBS data

def event_from_sbs(data: sbs.Data) -> Event:
149def event_from_sbs(data: sbs.Data) -> Event:
150    """Create Event based on SBS data"""
151    return Event(id=_event_id_from_sbs(data['id']),
152                 type=tuple(data['type']),
153                 timestamp=timestamp_from_sbs(data['timestamp']),
154                 source_timestamp=_optional_from_sbs(data['sourceTimestamp'],
155                                                     timestamp_from_sbs),
156                 payload=_optional_from_sbs(data['payload'],
157                                            event_payload_from_sbs))

Create Event based on SBS data
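
A round trip through the SBS helpers; the comparison holds because every field, including the JSON-encoded payload, is converted losslessly (the event contents are invented):

from hat.event import common

event = common.Event(id=common.EventId(server=1, session=2, instance=3),
                     type=('device', 'temperature'),
                     timestamp=common.now(),
                     source_timestamp=None,
                     payload=common.EventPayloadJson(data={'value': 21.5}))

data = common.event_to_sbs(event)
common.event_from_sbs(data) == event   # True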

def register_event_to_sbs(event: RegisterEvent) -> sbs.Data:
160def register_event_to_sbs(event: RegisterEvent) -> sbs.Data:
161    """Convert RegisterEvent to SBS data"""
162    return {'type': list(event.type),
163            'sourceTimestamp': _optional_to_sbs(event.source_timestamp,
164                                                timestamp_to_sbs),
165            'payload': _optional_to_sbs(event.payload, event_payload_to_sbs)}

Convert RegisterEvent to SBS data

def register_event_from_sbs(data: sbs.Data) -> RegisterEvent:
168def register_event_from_sbs(data: sbs.Data) -> RegisterEvent:
169    """Create RegisterEvent based on SBS data"""
170    return RegisterEvent(
171        type=tuple(data['type']),
172        source_timestamp=_optional_from_sbs(data['sourceTimestamp'],
173                                            timestamp_from_sbs),
174        payload=_optional_from_sbs(data['payload'], event_payload_from_sbs))

Create RegisterEvent based on SBS data

def query_params_to_sbs(params: QueryLatestParams | QueryTimeseriesParams | QueryServerParams) -> sbs.Data:
177def query_params_to_sbs(params: QueryParams) -> sbs.Data:
178    """Convert QueryParams to SBS data"""
179    if isinstance(params, QueryLatestParams):
180        return 'latest', {
181            'eventTypes': _optional_to_sbs(params.event_types,
182                                           _event_types_to_sbs)}
183
184    if isinstance(params, QueryTimeseriesParams):
185        return 'timeseries', {
186            'eventTypes': _optional_to_sbs(params.event_types,
187                                           _event_types_to_sbs),
188            'tFrom': _optional_to_sbs(params.t_from, timestamp_to_sbs),
189            'tTo': _optional_to_sbs(params.t_to, timestamp_to_sbs),
190            'sourceTFrom': _optional_to_sbs(params.source_t_from,
191                                            timestamp_to_sbs),
192            'sourceTTo': _optional_to_sbs(params.source_t_to,
193                                          timestamp_to_sbs),
194            'order': (params.order.value, None),
195            'orderBy': (params.order_by.value, None),
196            'maxResults': _optional_to_sbs(params.max_results),
197            'lastEventId': _optional_to_sbs(params.last_event_id,
198                                            _event_id_to_sbs)}
199
200    if isinstance(params, QueryServerParams):
201        return 'server', {
202            'serverId': params.server_id,
203            'persisted': params.persisted,
204            'maxResults': _optional_to_sbs(params.max_results),
205            'lastEventId': _optional_to_sbs(params.last_event_id,
206                                            _event_id_to_sbs)}
207
208    raise ValueError('unsupported params type')

Convert QueryParams to SBS data

def query_params_from_sbs(data: sbs.Data) -> QueryLatestParams | QueryTimeseriesParams | QueryServerParams:
211def query_params_from_sbs(data: sbs.Data) -> QueryParams:
212    """Create QueryParams based on SBS data"""
213    if data[0] == 'latest':
214        return QueryLatestParams(
215            event_types=_optional_from_sbs(data[1]['eventTypes'],
216                                           _event_types_from_sbs))
217
218    if data[0] == 'timeseries':
219        return QueryTimeseriesParams(
220            event_types=_optional_from_sbs(data[1]['eventTypes'],
221                                           _event_types_from_sbs),
222            t_from=_optional_from_sbs(data[1]['tFrom'], timestamp_from_sbs),
223            t_to=_optional_from_sbs(data[1]['tTo'], timestamp_from_sbs),
224            source_t_from=_optional_from_sbs(data[1]['sourceTFrom'],
225                                             timestamp_from_sbs),
226            source_t_to=_optional_from_sbs(data[1]['sourceTTo'],
227                                           timestamp_from_sbs),
228            order=Order(data[1]['order'][0]),
229            order_by=OrderBy(data[1]['orderBy'][0]),
230            max_results=_optional_from_sbs(data[1]['maxResults']),
231            last_event_id=_optional_from_sbs(data[1]['lastEventId'],
232                                             _event_id_from_sbs))
233
234    if data[0] == 'server':
235        return QueryServerParams(
236            server_id=data[1]['serverId'],
237            persisted=data[1]['persisted'],
238            max_results=_optional_from_sbs(data[1]['maxResults']),
239            last_event_id=_optional_from_sbs(data[1]['lastEventId'],
240                                             _event_id_from_sbs))
241
242    raise ValueError('unsupported params type')

Create QueryParams based on SBS data

def query_result_to_sbs(result: QueryResult) -> sbs.Data:
245def query_result_to_sbs(result: QueryResult) -> sbs.Data:
246    """Convert QueryResult to SBS data"""
247    return {'events': [event_to_sbs(event) for event in result.events],
248            'moreFollows': result.more_follows}

Convert QueryResult to SBS data

def query_result_from_sbs(data: sbs.Data) -> QueryResult:
251def query_result_from_sbs(data: sbs.Data) -> QueryResult:
252    """Create QueryResult based on SBS data"""
253    return QueryResult(events=[event_from_sbs(i) for i in data['events']],
254                       more_follows=data['moreFollows'])

Create QueryResult based on SBS data

def event_payload_to_sbs(payload: EventPayloadBinary | EventPayloadJson) -> sbs.Data:
257def event_payload_to_sbs(payload: EventPayload) -> sbs.Data:
258    """Convert EventPayload to SBS data"""
259    if isinstance(payload, EventPayloadBinary):
260        return 'binary', {'type': payload.type,
261                          'data': payload.data}
262
263    if isinstance(payload, EventPayloadJson):
264        return 'json', json.encode(payload.data)
265
266    raise ValueError('unsupported payload type')

Convert EventPayload to SBS data

def event_payload_from_sbs(data: sbs.Data) -> EventPayloadBinary | EventPayloadJson:
269def event_payload_from_sbs(data: sbs.Data) -> EventPayload:
270    """Create EventPayload based on SBS data"""
271    if data[0] == 'binary':
272        return EventPayloadBinary(type=data[1]['type'],
273                                  data=data[1]['data'])
274
275    if data[0] == 'json':
276        return EventPayloadJson(data=json.decode(data[1]))
277
278    raise ValueError('unsupported payload type')

Create EventPayload based on SBS data

def matches_query_type(event_type: tuple[str, ...], query_type: tuple[str, ...]) -> bool:
 5def matches_query_type(event_type: EventType,
 6                       query_type: EventType
 7                       ) -> bool:
 8    """Determine if event type matches query type
 9
10    Event type is tested if it matches query type according to the following
11    rules:
12
13        * Matching is performed on subtypes in increasing order.
14        * Event type is a match only if all its subtypes are matched by
15          corresponding query subtypes.
16        * Matching is finished when all query subtypes are exhausted.
17        * Query subtype '?' matches exactly one event subtype of any value.
18          The subtype must exist.
19        * Query subtype '*' matches 0 or more event subtypes of any value. It
20          must be the last query subtype.
21        * All other values of query subtype match exactly one event subtype
22          of the same value.
23        * Query type without subtypes is matched only by event type with no
24          subtypes.
25
26    As a consequence of aforementioned matching rules, event subtypes '*' and
27    '?' cannot be directly matched and it is advisable not to use them in event
28    types.
29
30    """
31    is_variable = bool(query_type and query_type[-1] == '*')
32    if is_variable:
33        query_type = query_type[:-1]
34
35    if len(event_type) < len(query_type):
36        return False
37
38    if len(event_type) > len(query_type) and not is_variable:
39        return False
40
41    for i, j in zip(event_type, query_type):
42        if j != '?' and i != j:
43            return False
44
45    return True

Determine if event type matches query type

An event type is tested against a query type according to the following rules:

* Matching is performed on subtypes in increasing order.
* Event type is a match only if all its subtypes are matched by
  corresponding query subtypes.
* Matching is finished when all query subtypes are exhausted.
* Query subtype '?' matches exactly one event subtype of any value.
  The subtype must exist.
* Query subtype '*' matches 0 or more event subtypes of any value. It
  must be the last query subtype.
* All other values of query subtype match exactly one event subtype
  of the same value.
* Query type without subtypes is matched only by event type with no
  subtypes.

As a consequence of the aforementioned matching rules, event subtypes '*' and '?' cannot be directly matched, so it is advisable not to use them in event types.
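
A few concrete cases of the rules above:

from hat.event import common

common.matches_query_type(('a', 'b', 'c'), ('a', 'b', 'c'))   # True
common.matches_query_type(('a', 'b', 'c'), ('a', '?', 'c'))   # True
common.matches_query_type(('a', 'b', 'c'), ('a', '*'))        # True
common.matches_query_type(('a',), ('a', '*'))                 # True  - '*' matches zero or more subtypes
common.matches_query_type(('a',), ('a', '?'))                 # False - '?' requires an existing subtype
common.matches_query_type(('a', 'b'), ('a',))                 # False - unmatched trailing subtype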

class SourceType(enum.Enum):
18class SourceType(enum.Enum):
19    EVENTER = 1
20    MODULE = 2
21    ENGINE = 3
22    SERVER = 4

An enumeration.

EVENTER = <SourceType.EVENTER: 1>
MODULE = <SourceType.MODULE: 2>
ENGINE = <SourceType.ENGINE: 3>
SERVER = <SourceType.SERVER: 4>
class Source(typing.NamedTuple):
25class Source(typing.NamedTuple):
26    type: SourceType
27    id: int

Source(type, id)

Source(type: SourceType, id: int)

Create new instance of Source(type, id)

type: SourceType

Alias for field number 0

id: int

Alias for field number 1

class Engine(hat.aio.group.Resource):
30class Engine(aio.Resource):
31    """Engine ABC"""
32
33    @property
34    @abc.abstractmethod
35    def server_id(self) -> int:
36        """Event server identifier"""
37
38    @abc.abstractmethod
39    async def register(self,
40                       source: Source,
41                       events: Collection[RegisterEvent]
42                       ) -> Collection[Event] | None:
43        """Register events"""
44
45    @abc.abstractmethod
46    async def query(self,
47                    params: QueryParams
48                    ) -> QueryResult:
49        """Query events"""
50
51    @abc.abstractmethod
52    def get_client_names(self) -> Iterable[tuple[Source, str]]:
53        """Get client names connected to local eventer server"""
54
55    @abc.abstractmethod
56    def restart(self):
57        """Schedule engine restart"""
58
59    @abc.abstractmethod
60    def reset_monitor_ready(self):
61        """Schedule reseting of monitor component's ready flag"""

Engine ABC

server_id: int
33    @property
34    @abc.abstractmethod
35    def server_id(self) -> int:
36        """Event server identifier"""

Event server identifier

@abc.abstractmethod
async def register( self, source: Source, events: Collection[RegisterEvent]) -> Collection[Event] | None:
38    @abc.abstractmethod
39    async def register(self,
40                       source: Source,
41                       events: Collection[RegisterEvent]
42                       ) -> Collection[Event] | None:
43        """Register events"""

Register events

@abc.abstractmethod
async def query( self, params: QueryLatestParams | QueryTimeseriesParams | QueryServerParams) -> QueryResult:
45    @abc.abstractmethod
46    async def query(self,
47                    params: QueryParams
48                    ) -> QueryResult:
49        """Query events"""

Query events

@abc.abstractmethod
def get_client_names(self) -> Iterable[tuple[Source, str]]:
51    @abc.abstractmethod
52    def get_client_names(self) -> Iterable[tuple[Source, str]]:
53        """Get client names connected to local eventer server"""

Get client names connected to local eventer server

@abc.abstractmethod
def restart(self):
55    @abc.abstractmethod
56    def restart(self):
57        """Schedule engine restart"""

Schedule engine restart

@abc.abstractmethod
def reset_monitor_ready(self):
59    @abc.abstractmethod
60    def reset_monitor_ready(self):
61        """Schedule reseting of monitor component's ready flag"""

Schedule resetting of the monitor component's ready flag

class Module(hat.aio.group.Resource):
 64class Module(aio.Resource):
 65    """Module ABC"""
 66
 67    @property
 68    @abc.abstractmethod
 69    def subscription(self) -> Subscription:
 70        """Subscribed event types filter.
 71
 72        `subscription` is constant during module's lifetime.
 73
 74        """
 75
 76    async def on_session_start(self, session_id: int):
 77        """Called on start of a session, identified by session_id.
 78
 79        This method can be coroutine or regular function.
 80
 81        """
 82
 83    async def on_session_stop(self, session_id: int):
 84        """Called on stop of a session, identified by session_id.
 85
 86        This method can be coroutine or regular function.
 87
 88        """
 89
 90    @abc.abstractmethod
 91    async def process(self,
 92                      source: Source,
 93                      event: Event
 94                      ) -> Iterable[RegisterEvent] | None:
 95        """Process new session event.
 96
 97        Provided event is matched by modules subscription filter.
 98
 99        Processing of session event can result in registration of
100        new register events.
101
102        Single module session process is always called sequentially.
103
104        This method can be coroutine or regular function.
105
106        """

Module ABC

subscription: Subscription
67    @property
68    @abc.abstractmethod
69    def subscription(self) -> Subscription:
70        """Subscribed event types filter.
71
72        `subscription` is constant during module's lifetime.
73
74        """

Subscribed event types filter.

subscription is constant during module's lifetime.

async def on_session_start(self, session_id: int):
76    async def on_session_start(self, session_id: int):
77        """Called on start of a session, identified by session_id.
78
79        This method can be coroutine or regular function.
80
81        """

Called on start of a session, identified by session_id.

This method can be coroutine or regular function.

async def on_session_stop(self, session_id: int):
83    async def on_session_stop(self, session_id: int):
84        """Called on stop of a session, identified by session_id.
85
86        This method can be coroutine or regular function.
87
88        """

Called on stop of a session, identified by session_id.

This method can be coroutine or regular function.

@abc.abstractmethod
async def process( self, source: Source, event: Event) -> Iterable[RegisterEvent] | None:
 90    @abc.abstractmethod
 91    async def process(self,
 92                      source: Source,
 93                      event: Event
 94                      ) -> Iterable[RegisterEvent] | None:
 95        """Process new session event.
 96
 97        Provided event is matched by modules subscription filter.
 98
 99        Processing of session event can result in registration of
100        new register events.
101
102        Single module session process is always called sequentially.
103
104        This method can be coroutine or regular function.
105
106        """

Process new session event.

The provided event is matched by the module's subscription filter.

Processing of a session event can result in the registration of new register events.

For a single module, process is always called sequentially within a session.

This method can be coroutine or regular function.
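
Putting the Module interface together, a minimal hypothetical module implementation (EchoModule, its subscription and the ('echo', ...) event types are invented for this sketch; it assumes the hat.aio Group/Resource API):

# hypothetical module my_project.echo_module
from collections.abc import Iterable

from hat import aio
from hat.event import common


class EchoModule(common.Module):
    """Re-registers every matched event under an ('echo', ...) event type"""

    def __init__(self, engine: common.Engine):
        self._engine = engine
        self._async_group = aio.Group()
        self._subscription = common.create_subscription([('device', '*')])

    @property
    def async_group(self) -> aio.Group:
        return self._async_group

    @property
    def subscription(self) -> common.Subscription:
        return self._subscription

    def process(self,
                source: common.Source,
                event: common.Event
                ) -> Iterable[common.RegisterEvent] | None:
        # a regular function is fine here - process may also be a coroutine
        return [common.RegisterEvent(type=('echo', *event.type),
                                     source_timestamp=event.source_timestamp,
                                     payload=event.payload)]


def create(conf, engine, source):
    return EchoModule(engine)


# the dynamically imported module is expected to expose `info`
info = common.ModuleInfo(create=create)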

ModuleConf = json.Data
CreateModule = Callable[[ModuleConf, Engine, Source], Module | Awaitable[Module]]
class ModuleInfo(typing.NamedTuple):
118class ModuleInfo(typing.NamedTuple):
119    """Module info
120
121    Module is implemented as python module which is dynamically imported.
122    It is expected that this module contains `info` which is instance of
123    `ModuleInfo`.
124
125    If module defines JSON schema repository and JSON schema id, JSON schema
126    repository will be used for additional validation of module configuration
127    with JSON schema id.
128
129    """
130    create: CreateModule
131    json_schema_id: str | None = None
132    json_schema_repo: json.SchemaRepository | None = None

Module info

A module is implemented as a Python module which is dynamically imported. This module is expected to contain info, an instance of ModuleInfo.

If the module defines a JSON schema repository and a JSON schema id, the repository is used for additional validation of the module configuration against that schema id.

ModuleInfo(create: CreateModule, json_schema_id: str | None = None, json_schema_repo: json.SchemaRepository | None = None)

Create new instance of ModuleInfo(create, json_schema_id, json_schema_repo)

create: CreateModule

Alias for field number 0

json_schema_id: str | None

Alias for field number 1

json_schema_repo: hat.json.repository.SchemaRepository | None

Alias for field number 2

def import_module_info(py_module_str: str) -> ModuleInfo:
135def import_module_info(py_module_str: str) -> ModuleInfo:
136    """Import module info"""
137    py_module = importlib.import_module(py_module_str)
138    info = py_module.info
139
140    if not isinstance(info, ModuleInfo):
141        raise Exception('invalid module implementation')
142
143    return info

Import module info
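
Given a module file like the EchoModule sketch above, loading it could look roughly like this (aio.call is used here because create may be either a regular function or a coroutine):

from hat import aio
from hat.event import common


async def load_module(conf: common.ModuleConf,
                      engine: common.Engine,
                      source: common.Source
                      ) -> common.Module:
    # 'my_project.echo_module' is the hypothetical module sketched earlier
    info = common.import_module_info('my_project.echo_module')
    return await aio.call(info.create, conf, engine, source)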

class Subscription(abc.ABC):
10class Subscription(abc.ABC):
11    """Subscription defined by query event types"""
12
13    @abc.abstractmethod
14    def __init__(self, query_types: Iterable[EventType]):
15        """Create subscription instance"""
16
17    @abc.abstractmethod
18    def get_query_types(self) -> Iterable[EventType]:
19        """Calculate sanitized query event types"""
20
21    @abc.abstractmethod
22    def matches(self, event_type: EventType) -> bool:
23        """Does `event_type` match subscription"""
24
25    @abc.abstractmethod
26    def union(self, *others: 'Subscription') -> 'Subscription':
27        """Create new subscription including event types from this and
28        other subscriptions."""
29
30    @abc.abstractmethod
31    def intersection(self, *others: 'Subscription') -> 'Subscription':
32        """Create new subscription containing event types in common with
33        other subscriptions."""
34
35    @abc.abstractmethod
36    def isdisjoint(self, other: 'Subscription') -> bool:
37        """Return ``True`` if this subscription has no event types in common
38        with other subscription."""

Subscription defined by query event types

@abc.abstractmethod
Subscription(query_types: Iterable[tuple[str, ...]])
13    @abc.abstractmethod
14    def __init__(self, query_types: Iterable[EventType]):
15        """Create subscription instance"""

Create subscription instance

@abc.abstractmethod
def get_query_types(self) -> Iterable[tuple[str, ...]]:
17    @abc.abstractmethod
18    def get_query_types(self) -> Iterable[EventType]:
19        """Calculate sanitized query event types"""

Calculate sanitized query event types

@abc.abstractmethod
def matches(self, event_type: tuple[str, ...]) -> bool:
21    @abc.abstractmethod
22    def matches(self, event_type: EventType) -> bool:
23        """Does `event_type` match subscription"""

Does event_type match subscription

@abc.abstractmethod
def union( self, *others: Subscription) -> Subscription:
25    @abc.abstractmethod
26    def union(self, *others: 'Subscription') -> 'Subscription':
27        """Create new subscription including event types from this and
28        other subscriptions."""

Create new subscription including event types from this and other subscriptions.

@abc.abstractmethod
def intersection( self, *others: Subscription) -> Subscription:
30    @abc.abstractmethod
31    def intersection(self, *others: 'Subscription') -> 'Subscription':
32        """Create new subscription containing event types in common with
33        other subscriptions."""

Create new subscription containing event types in common with other subscriptions.

@abc.abstractmethod
def isdisjoint(self, other: Subscription) -> bool:
35    @abc.abstractmethod
36    def isdisjoint(self, other: 'Subscription') -> bool:
37        """Return ``True`` if this subscription has no event types in common
38        with other subscription."""

Return True if this subscription has no event types in common with other subscription.

def create_subscription( query_types: Iterable[tuple[str, ...]]) -> Subscription:
19def create_subscription(query_types: Iterable[EventType]) -> Subscription:
20    if CSubscription is not None:
21        return CSubscription(query_types)
22
23    return PySubscription(query_types)
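
A short usage sketch of the subscription operations defined above:

from hat.event import common

s1 = common.create_subscription([('device', '*')])
s2 = common.create_subscription([('device', 'status'), ('system', '?')])

s1.matches(('device', 'temperature', 'raw'))   # True
s2.matches(('system', 'monitor'))              # True
s2.matches(('system',))                        # False - '?' requires one subtype

s1.union(s2).matches(('system', 'monitor'))    # True
s1.isdisjoint(s2)                              # False - ('device', 'status') is in both
list(s1.intersection(s2).get_query_types())    # query types equivalent to ('device', 'status')
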
def now() -> Timestamp:
143def now() -> Timestamp:
144    """Create new timestamp representing current time"""
145    return timestamp_from_float(time.time())

Create new timestamp representing current time
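
A typical use of now is stamping a RegisterEvent's source timestamp; the event type and payload below are invented for illustration:

from hat.event import common

register_event = common.RegisterEvent(
    type=('example', 'started'),
    source_timestamp=common.now(),
    payload=common.EventPayloadJson(data={'pid': 1234}))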