hat.event.common
Common functionality shared between clients and event server
1"""Common functionality shared between clients and event server""" 2 3import time 4 5from hat.event.common.backend import (BackendClosedError, 6 Backend, 7 BackendConf, 8 BackendRegisteredEventsCb, 9 BackendFlushedEventsCb, 10 CreateBackend, 11 BackendInfo, 12 import_backend_info) 13from hat.event.common.collection import (EventTypeCollection, 14 ListEventTypeCollection, 15 TreeEventTypeCollection) 16from hat.event.common.common import (json_schema_repo, 17 sbs_repo, 18 ServerId, 19 SessionId, 20 InstanceId, 21 EventTypeSegment, 22 EventType, 23 Timestamp, 24 min_timestamp, 25 max_timestamp, 26 Status, 27 Order, 28 OrderBy, 29 EventId, 30 EventPayloadBinary, 31 EventPayloadJson, 32 EventPayload, 33 Event, 34 RegisterEvent, 35 QueryLatestParams, 36 QueryTimeseriesParams, 37 QueryServerParams, 38 QueryParams, 39 QueryResult) 40from hat.event.common.encoder import (timestamp_to_bytes, 41 timestamp_from_bytes, 42 timestamp_to_float, 43 timestamp_from_float, 44 timestamp_to_datetime, 45 timestamp_from_datetime, 46 timestamp_to_sbs, 47 timestamp_from_sbs, 48 status_to_sbs, 49 status_from_sbs, 50 event_to_sbs, 51 event_from_sbs, 52 register_event_to_sbs, 53 register_event_from_sbs, 54 query_params_to_sbs, 55 query_params_from_sbs, 56 query_result_to_sbs, 57 query_result_from_sbs, 58 event_payload_to_sbs, 59 event_payload_from_sbs) 60from hat.event.common.matches import matches_query_type 61from hat.event.common.module import (SourceType, 62 Source, 63 Engine, 64 Module, 65 ModuleConf, 66 CreateModule, 67 ModuleInfo, 68 import_module_info) 69from hat.event.common.subscription import (Subscription, 70 create_subscription) 71 72 73__all__ = ['BackendClosedError', 74 'Backend', 75 'BackendConf', 76 'BackendRegisteredEventsCb', 77 'BackendFlushedEventsCb', 78 'CreateBackend', 79 'BackendInfo', 80 'import_backend_info', 81 'EventTypeCollection', 82 'ListEventTypeCollection', 83 'TreeEventTypeCollection', 84 'json_schema_repo', 85 'sbs_repo', 86 'ServerId', 87 'SessionId', 88 'InstanceId', 89 'EventTypeSegment', 90 'EventType', 91 'Timestamp', 92 'min_timestamp', 93 'max_timestamp', 94 'Status', 95 'Order', 96 'OrderBy', 97 'EventId', 98 'EventPayloadBinary', 99 'EventPayloadJson', 100 'EventPayload', 101 'Event', 102 'RegisterEvent', 103 'QueryLatestParams', 104 'QueryTimeseriesParams', 105 'QueryServerParams', 106 'QueryParams', 107 'QueryResult', 108 'timestamp_to_bytes', 109 'timestamp_from_bytes', 110 'timestamp_to_float', 111 'timestamp_from_float', 112 'timestamp_to_datetime', 113 'timestamp_from_datetime', 114 'timestamp_to_sbs', 115 'timestamp_from_sbs', 116 'status_to_sbs', 117 'status_from_sbs', 118 'event_to_sbs', 119 'event_from_sbs', 120 'register_event_to_sbs', 121 'register_event_from_sbs', 122 'query_params_to_sbs', 123 'query_params_from_sbs', 124 'query_result_to_sbs', 125 'query_result_from_sbs', 126 'event_payload_to_sbs', 127 'event_payload_from_sbs', 128 'matches_query_type', 129 'SourceType', 130 'Source', 131 'Engine', 132 'Module', 133 'ModuleConf', 134 'CreateModule', 135 'ModuleInfo', 136 'import_module_info', 137 'Subscription', 138 'create_subscription', 139 'now'] 140 141 142def now() -> Timestamp: 143 """Create new timestamp representing current time""" 144 return timestamp_from_float(time.time())
Backend closed
20class Backend(aio.Resource): 21 """Backend ABC""" 22 23 @abc.abstractmethod 24 async def get_last_event_id(self, 25 server_id: int 26 ) -> EventId: 27 """Get last registered event id associated with server id""" 28 29 @abc.abstractmethod 30 async def register(self, 31 events: Collection[Event] 32 ) -> Collection[Event] | None: 33 """Register events""" 34 35 @abc.abstractmethod 36 async def query(self, 37 params: QueryParams 38 ) -> QueryResult: 39 """Query events""" 40 41 @abc.abstractmethod 42 async def flush(self): 43 """Flush internal buffers and permanently persist events"""
Backend ABC
23 @abc.abstractmethod 24 async def get_last_event_id(self, 25 server_id: int 26 ) -> EventId: 27 """Get last registered event id associated with server id"""
Get last registered event id associated with server id
29 @abc.abstractmethod 30 async def register(self, 31 events: Collection[Event] 32 ) -> Collection[Event] | None: 33 """Register events"""
Register events
35 @abc.abstractmethod 36 async def query(self, 37 params: QueryParams 38 ) -> QueryResult: 39 """Query events"""
Query events
67class BackendInfo(typing.NamedTuple): 68 """Backend info 69 70 Backend is implemented as python module which is dynamically imported. 71 It is expected that this module contains `info` which is instance of 72 `BackendInfo`. 73 74 If backend defines JSON schema repository and JSON schema id, JSON schema 75 repository will be used for additional validation of backend configuration 76 with JSON schema id. 77 78 """ 79 create: CreateBackend 80 json_schema_id: str | None = None 81 json_schema_repo: json.SchemaRepository | None = None
Backend info
Backend is implemented as python module which is dynamically imported.
It is expected that this module contains info
which is instance of
BackendInfo
.
If backend defines JSON schema repository and JSON schema id, JSON schema repository will be used for additional validation of backend configuration with JSON schema id.
Create new instance of BackendInfo(create, json_schema_id, json_schema_repo)
Alias for field number 0
84def import_backend_info(py_module_str: str) -> BackendInfo: 85 """Import backend info""" 86 py_module = importlib.import_module(py_module_str) 87 info = py_module.info 88 89 if not isinstance(info, BackendInfo): 90 raise Exception('invalid backend implementation') 91 92 return info
Import backend info
13class EventTypeCollection(abc.ABC, typing.Generic[T]): 14 15 @abc.abstractmethod 16 def __init__(self): 17 pass 18 19 @abc.abstractmethod 20 def add(self, subscription: Subscription, value: T): 21 pass 22 23 @abc.abstractmethod 24 def remove(self, value: T): 25 pass 26 27 @abc.abstractmethod 28 def get(self, event_type: EventType) -> Iterable[T]: 29 pass
Helper class that provides a standard way to create an ABC using inheritance.
5class ListEventTypeCollection(common.EventTypeCollection): 6 7 def __init__(self): 8 self._values = {} 9 10 def add(self, subscription, value): 11 self._values[value] = (self._values[value].union(subscription) 12 if value in self._values else subscription) 13 14 def remove(self, value): 15 self._values.pop(value, None) 16 17 def get(self, event_type): 18 for value, subscription in self._values.items(): 19 if subscription.matches(event_type): 20 yield value
Helper class that provides a standard way to create an ABC using inheritance.
8class TreeEventTypeCollection(common.EventTypeCollection): 9 10 def __init__(self): 11 self._root = _create_node() 12 self._value_nodes = collections.defaultdict(collections.deque) 13 14 def add(self, subscription, value): 15 for query_type in subscription.get_query_types(): 16 node = self._root 17 rest = query_type 18 19 while rest: 20 head, rest = rest[0], rest[1:] 21 if head == '*' and rest: 22 raise ValueError('invalid query type') 23 24 node = node.children[head] 25 26 if value in node.values: 27 return 28 29 node.values.add(value) 30 self._value_nodes[value].append(node) 31 32 def remove(self, value): 33 for node in self._value_nodes.pop(value, []): 34 node.values.remove(value) 35 36 def get(self, event_type): 37 return set(_get(self._root, event_type))
Helper class that provides a standard way to create an ABC using inheritance.
14 def add(self, subscription, value): 15 for query_type in subscription.get_query_types(): 16 node = self._root 17 rest = query_type 18 19 while rest: 20 head, rest = rest[0], rest[1:] 21 if head == '*' and rest: 22 raise ValueError('invalid query type') 23 24 node = node.children[head] 25 26 if value in node.values: 27 return 28 29 node.values.add(value) 30 self._value_nodes[value].append(node)
40class Timestamp(typing.NamedTuple): 41 s: int 42 """seconds since 1970-01-01 (can be negative)""" 43 us: int 44 """microseconds added to timestamp seconds in range [0, 1e6)""" 45 46 def add(self, s: float) -> 'Timestamp': 47 """Create new timestamp by adding seconds to existing timestamp""" 48 us = self.us + round((s - int(s)) * 1e6) 49 s = self.s + int(s) 50 return Timestamp(s=s + us // int(1e6), 51 us=us % int(1e6))
Timestamp(s, us)
46 def add(self, s: float) -> 'Timestamp': 47 """Create new timestamp by adding seconds to existing timestamp""" 48 us = self.us + round((s - int(s)) * 1e6) 49 s = self.s + int(s) 50 return Timestamp(s=s + us // int(1e6), 51 us=us % int(1e6))
Create new timestamp by adding seconds to existing timestamp
61class Status(enum.Enum): 62 STANDBY = 'standby' 63 STARTING = 'starting' 64 OPERATIONAL = 'operational' 65 STOPPING = 'stopping'
An enumeration.
An enumeration.
An enumeration.
78class EventId(typing.NamedTuple): 79 server: ServerId 80 session: SessionId 81 instance: InstanceId
EventId(server, session, instance)
EventPayloadBinary(type, data)
EventPayloadJson(data,)
96class Event(typing.NamedTuple): 97 """Event 98 99 Operators `>` and `<` test for natural order where it is assumed that 100 first operand is registered before second operand. 101 102 """ 103 104 id: EventId 105 type: EventType 106 timestamp: Timestamp 107 source_timestamp: Timestamp | None 108 payload: EventPayload | None 109 110 def __lt__(self, other): 111 if not isinstance(other, Event): 112 return NotImplemented 113 114 if self.id == other.id: 115 return False 116 117 if self.id.server == other.id.server: 118 return self.id < other.id 119 120 if self.timestamp != other.timestamp: 121 return self.timestamp < other.timestamp 122 123 return True 124 125 def __gt__(self, other): 126 if not isinstance(other, Event): 127 return NotImplemented 128 129 if self.id == other.id: 130 return False 131 132 if self.id.server == other.id.server: 133 return self.id > other.id 134 135 if self.timestamp != other.timestamp: 136 return self.timestamp > other.timestamp 137 138 return False
Event
Operators >
and <
test for natural order where it is assumed that
first operand is registered before second operand.
Create new instance of Event(id, type, timestamp, source_timestamp, payload)
141class RegisterEvent(typing.NamedTuple): 142 type: EventType 143 source_timestamp: Timestamp | None 144 payload: EventPayload | None
RegisterEvent(type, source_timestamp, payload)
Create new instance of RegisterEvent(type, source_timestamp, payload)
QueryLatestParams(event_types,)
151class QueryTimeseriesParams(typing.NamedTuple): 152 event_types: Collection[EventType] | None = None 153 t_from: Timestamp | None = None 154 t_to: Timestamp | None = None 155 source_t_from: Timestamp | None = None 156 source_t_to: Timestamp | None = None 157 order: Order = Order.DESCENDING 158 order_by: OrderBy = OrderBy.TIMESTAMP 159 max_results: int | None = None 160 last_event_id: EventId | None = None
QueryTimeseriesParams(event_types, t_from, t_to, source_t_from, source_t_to, order, order_by, max_results, last_event_id)
Create new instance of QueryTimeseriesParams(event_types, t_from, t_to, source_t_from, source_t_to, order, order_by, max_results, last_event_id)
163class QueryServerParams(typing.NamedTuple): 164 server_id: ServerId 165 persisted: bool = False 166 max_results: int | None = None 167 last_event_id: EventId | None = None
QueryServerParams(server_id, persisted, max_results, last_event_id)
Create new instance of QueryServerParams(server_id, persisted, max_results, last_event_id)
QueryResult(events, more_follows)
Create new instance of QueryResult(events, more_follows)
26def timestamp_to_bytes(t: Timestamp) -> util.Bytes: 27 """Convert timestamp to 12 byte representation 28 29 Bytes [0, 8] are big endian unsigned `Timestamp.s` + 2^63 and 30 bytes [9, 12] are big endian unsigned `Timestamp.us`. 31 32 """ 33 return struct.pack(">QI", t.s + (1 << 63), t.us)
Convert timestamp to 12 byte representation
Bytes [0, 8] are big endian unsigned Timestamp.s
+ 2^63 and
bytes [9, 12] are big endian unsigned Timestamp.us
.
36def timestamp_from_bytes(data: util.Bytes) -> Timestamp: 37 """Create new timestamp from 12 byte representation 38 39 Bytes representation is same as defined for `timestamp_to_bytes` function. 40 41 """ 42 s, us = struct.unpack(">QI", data) 43 return Timestamp(s - (1 << 63), us)
Create new timestamp from 12 byte representation
Bytes representation is same as defined for timestamp_to_bytes
function.
46def timestamp_to_float(t: Timestamp) -> float: 47 """Convert timestamp to floating number of seconds since 1970-01-01 UTC 48 49 For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`. 50 51 """ 52 return t.s + t.us * 1E-6
Convert timestamp to floating number of seconds since 1970-01-01 UTC
For precise serialization see timestamp_to_bytes
/timestamp_from_bytes
.
55def timestamp_from_float(ts: float) -> Timestamp: 56 """Create timestamp from floating number of seconds since 1970-01-01 UTC 57 58 For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`. 59 60 """ 61 s = int(ts) 62 if ts < 0: 63 s = s - 1 64 65 us = round((ts - s) * 1E6) 66 67 if us == 1_000_000: 68 return Timestamp(s + 1, 0) 69 70 else: 71 return Timestamp(s, us)
Create timestamp from floating number of seconds since 1970-01-01 UTC
For precise serialization see timestamp_to_bytes
/timestamp_from_bytes
.
74def timestamp_to_datetime(t: Timestamp) -> datetime.datetime: 75 """Convert timestamp to datetime (representing utc time) 76 77 For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`. 78 79 """ 80 try: 81 dt_from_s = datetime.datetime.fromtimestamp(t.s, datetime.timezone.utc) 82 83 except OSError: 84 dt_from_s = ( 85 datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + 86 datetime.timedelta(seconds=t.s)) 87 88 return datetime.datetime( 89 year=dt_from_s.year, 90 month=dt_from_s.month, 91 day=dt_from_s.day, 92 hour=dt_from_s.hour, 93 minute=dt_from_s.minute, 94 second=dt_from_s.second, 95 microsecond=t.us, 96 tzinfo=datetime.timezone.utc)
Convert timestamp to datetime (representing utc time)
For precise serialization see timestamp_to_bytes
/timestamp_from_bytes
.
99def timestamp_from_datetime(dt: datetime.datetime) -> Timestamp: 100 """Create new timestamp from datetime 101 102 If `tzinfo` is not set, it is assumed that provided datetime represents 103 utc time. 104 105 For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`. 106 107 """ 108 if not dt.tzinfo: 109 dt = dt.replace(tzinfo=datetime.timezone.utc) 110 111 s = int(dt.timestamp()) 112 113 if dt.timestamp() < 0: 114 s = s - 1 115 116 return Timestamp(s=s, us=dt.microsecond)
Create new timestamp from datetime
If tzinfo
is not set, it is assumed that provided datetime represents
utc time.
For precise serialization see timestamp_to_bytes
/timestamp_from_bytes
.
119def timestamp_to_sbs(t: Timestamp) -> sbs.Data: 120 """Convert timestamp to SBS data""" 121 return {'s': t.s, 'us': t.us}
Convert timestamp to SBS data
124def timestamp_from_sbs(data: sbs.Data) -> Timestamp: 125 """Create new timestamp from SBS data""" 126 return Timestamp(s=data['s'], us=data['us'])
Create new timestamp from SBS data
129def status_to_sbs(status: Status) -> sbs.Data: 130 """Convert Status to SBS data""" 131 return status.value, None
Convert Status to SBS data
134def status_from_sbs(status: sbs.Data) -> Status: 135 """Create Status based on SBS data""" 136 return Status(status[0])
Create Status based on SBS data
139def event_to_sbs(event: Event) -> sbs.Data: 140 """Convert Event to SBS data""" 141 return {'id': _event_id_to_sbs(event.id), 142 'type': list(event.type), 143 'timestamp': timestamp_to_sbs(event.timestamp), 144 'sourceTimestamp': _optional_to_sbs(event.source_timestamp, 145 timestamp_to_sbs), 146 'payload': _optional_to_sbs(event.payload, event_payload_to_sbs)}
Convert Event to SBS data
149def event_from_sbs(data: sbs.Data) -> Event: 150 """Create Event based on SBS data""" 151 return Event(id=_event_id_from_sbs(data['id']), 152 type=tuple(data['type']), 153 timestamp=timestamp_from_sbs(data['timestamp']), 154 source_timestamp=_optional_from_sbs(data['sourceTimestamp'], 155 timestamp_from_sbs), 156 payload=_optional_from_sbs(data['payload'], 157 event_payload_from_sbs))
Create Event based on SBS data
160def register_event_to_sbs(event: RegisterEvent) -> sbs.Data: 161 """Convert RegisterEvent to SBS data""" 162 return {'type': list(event.type), 163 'sourceTimestamp': _optional_to_sbs(event.source_timestamp, 164 timestamp_to_sbs), 165 'payload': _optional_to_sbs(event.payload, event_payload_to_sbs)}
Convert RegisterEvent to SBS data
168def register_event_from_sbs(data: sbs.Data) -> RegisterEvent: 169 """Create RegisterEvent based on SBS data""" 170 return RegisterEvent( 171 type=tuple(data['type']), 172 source_timestamp=_optional_from_sbs(data['sourceTimestamp'], 173 timestamp_from_sbs), 174 payload=_optional_from_sbs(data['payload'], event_payload_from_sbs))
Create RegisterEvent based on SBS data
177def query_params_to_sbs(params: QueryParams) -> sbs.Data: 178 """Convert QueryParams to SBS data""" 179 if isinstance(params, QueryLatestParams): 180 return 'latest', { 181 'eventTypes': _optional_to_sbs(params.event_types, 182 _event_types_to_sbs)} 183 184 if isinstance(params, QueryTimeseriesParams): 185 return 'timeseries', { 186 'eventTypes': _optional_to_sbs(params.event_types, 187 _event_types_to_sbs), 188 'tFrom': _optional_to_sbs(params.t_from, timestamp_to_sbs), 189 'tTo': _optional_to_sbs(params.t_to, timestamp_to_sbs), 190 'sourceTFrom': _optional_to_sbs(params.source_t_from, 191 timestamp_to_sbs), 192 'sourceTTo': _optional_to_sbs(params.source_t_to, 193 timestamp_to_sbs), 194 'order': (params.order.value, None), 195 'orderBy': (params.order_by.value, None), 196 'maxResults': _optional_to_sbs(params.max_results), 197 'lastEventId': _optional_to_sbs(params.last_event_id, 198 _event_id_to_sbs)} 199 200 if isinstance(params, QueryServerParams): 201 return 'server', { 202 'serverId': params.server_id, 203 'persisted': params.persisted, 204 'maxResults': _optional_to_sbs(params.max_results), 205 'lastEventId': _optional_to_sbs(params.last_event_id, 206 _event_id_to_sbs)} 207 208 raise ValueError('unsupported params type')
Convert QueryParams to SBS data
211def query_params_from_sbs(data: sbs.Data) -> QueryParams: 212 """Create QueryParams based on SBS data""" 213 if data[0] == 'latest': 214 return QueryLatestParams( 215 event_types=_optional_from_sbs(data[1]['eventTypes'], 216 _event_types_from_sbs)) 217 218 if data[0] == 'timeseries': 219 return QueryTimeseriesParams( 220 event_types=_optional_from_sbs(data[1]['eventTypes'], 221 _event_types_from_sbs), 222 t_from=_optional_from_sbs(data[1]['tFrom'], timestamp_from_sbs), 223 t_to=_optional_from_sbs(data[1]['tTo'], timestamp_from_sbs), 224 source_t_from=_optional_from_sbs(data[1]['sourceTFrom'], 225 timestamp_from_sbs), 226 source_t_to=_optional_from_sbs(data[1]['sourceTTo'], 227 timestamp_from_sbs), 228 order=Order(data[1]['order'][0]), 229 order_by=OrderBy(data[1]['orderBy'][0]), 230 max_results=_optional_from_sbs(data[1]['maxResults']), 231 last_event_id=_optional_from_sbs(data[1]['lastEventId'], 232 _event_id_from_sbs)) 233 234 if data[0] == 'server': 235 return QueryServerParams( 236 server_id=data[1]['serverId'], 237 persisted=data[1]['persisted'], 238 max_results=_optional_from_sbs(data[1]['maxResults']), 239 last_event_id=_optional_from_sbs(data[1]['lastEventId'], 240 _event_id_from_sbs)) 241 242 raise ValueError('unsupported params type')
Create QueryParams based on SBS data
245def query_result_to_sbs(result: QueryResult) -> sbs.Data: 246 """Convert QueryResult to SBS data""" 247 return {'events': [event_to_sbs(event) for event in result.events], 248 'moreFollows': result.more_follows}
Convert QueryResult to SBS data
251def query_result_from_sbs(data: sbs.Data) -> QueryResult: 252 """Create QueryResult based on SBS data""" 253 return QueryResult(events=[event_from_sbs(i) for i in data['events']], 254 more_follows=data['moreFollows'])
Create QueryResult based on SBS data
257def event_payload_to_sbs(payload: EventPayload) -> sbs.Data: 258 """Convert EventPayload to SBS data""" 259 if isinstance(payload, EventPayloadBinary): 260 return 'binary', {'type': payload.type, 261 'data': payload.data} 262 263 if isinstance(payload, EventPayloadJson): 264 return 'json', json.encode(payload.data) 265 266 raise ValueError('unsupported payload type')
Convert EventPayload to SBS data
269def event_payload_from_sbs(data: sbs.Data) -> EventPayload: 270 """Create EventPayload based on SBS data""" 271 if data[0] == 'binary': 272 return EventPayloadBinary(type=data[1]['type'], 273 data=data[1]['data']) 274 275 if data[0] == 'json': 276 return EventPayloadJson(data=json.decode(data[1])) 277 278 raise ValueError('unsupported payload type')
Create EventPayload based on SBS data
5def matches_query_type(event_type: EventType, 6 query_type: EventType 7 ) -> bool: 8 """Determine if event type matches query type 9 10 Event type is tested if it matches query type according to the following 11 rules: 12 13 * Matching is performed on subtypes in increasing order. 14 * Event type is a match only if all its subtypes are matched by 15 corresponding query subtypes. 16 * Matching is finished when all query subtypes are exhausted. 17 * Query subtype '?' matches exactly one event subtype of any value. 18 The subtype must exist. 19 * Query subtype '*' matches 0 or more event subtypes of any value. It 20 must be the last query subtype. 21 * All other values of query subtype match exactly one event subtype 22 of the same value. 23 * Query type without subtypes is matched only by event type with no 24 subtypes. 25 26 As a consequence of aforementioned matching rules, event subtypes '*' and 27 '?' cannot be directly matched and it is advisable not to use them in event 28 types. 29 30 """ 31 is_variable = bool(query_type and query_type[-1] == '*') 32 if is_variable: 33 query_type = query_type[:-1] 34 35 if len(event_type) < len(query_type): 36 return False 37 38 if len(event_type) > len(query_type) and not is_variable: 39 return False 40 41 for i, j in zip(event_type, query_type): 42 if j != '?' and i != j: 43 return False 44 45 return True
Determine if event type matches query type
Event type is tested if it matches query type according to the following rules:
* Matching is performed on subtypes in increasing order.
* Event type is a match only if all its subtypes are matched by
corresponding query subtypes.
* Matching is finished when all query subtypes are exhausted.
* Query subtype '?' matches exactly one event subtype of any value.
The subtype must exist.
* Query subtype '*' matches 0 or more event subtypes of any value. It
must be the last query subtype.
* All other values of query subtype match exactly one event subtype
of the same value.
* Query type without subtypes is matched only by event type with no
subtypes.
As a consequence of aforementioned matching rules, event subtypes '*' and '?' cannot be directly matched and it is advisable not to use them in event types.
An enumeration.
Source(type, id)
30class Engine(aio.Resource): 31 """Engine ABC""" 32 33 @property 34 @abc.abstractmethod 35 def server_id(self) -> int: 36 """Event server identifier""" 37 38 @abc.abstractmethod 39 async def register(self, 40 source: Source, 41 events: Collection[RegisterEvent] 42 ) -> Collection[Event] | None: 43 """Register events""" 44 45 @abc.abstractmethod 46 async def query(self, 47 params: QueryParams 48 ) -> QueryResult: 49 """Query events""" 50 51 @abc.abstractmethod 52 def get_client_names(self) -> Iterable[tuple[Source, str]]: 53 """Get client names connected to local eventer server""" 54 55 @abc.abstractmethod 56 def restart(self): 57 """Schedule engine restart""" 58 59 @abc.abstractmethod 60 def reset_monitor_ready(self): 61 """Schedule reseting of monitor component's ready flag"""
Engine ABC
38 @abc.abstractmethod 39 async def register(self, 40 source: Source, 41 events: Collection[RegisterEvent] 42 ) -> Collection[Event] | None: 43 """Register events"""
Register events
45 @abc.abstractmethod 46 async def query(self, 47 params: QueryParams 48 ) -> QueryResult: 49 """Query events"""
Query events
51 @abc.abstractmethod 52 def get_client_names(self) -> Iterable[tuple[Source, str]]: 53 """Get client names connected to local eventer server"""
Get client names connected to local eventer server
64class Module(aio.Resource): 65 """Module ABC""" 66 67 @property 68 @abc.abstractmethod 69 def subscription(self) -> Subscription: 70 """Subscribed event types filter. 71 72 `subscription` is constant during module's lifetime. 73 74 """ 75 76 async def on_session_start(self, session_id: int): 77 """Called on start of a session, identified by session_id. 78 79 This method can be coroutine or regular function. 80 81 """ 82 83 async def on_session_stop(self, session_id: int): 84 """Called on stop of a session, identified by session_id. 85 86 This method can be coroutine or regular function. 87 88 """ 89 90 @abc.abstractmethod 91 async def process(self, 92 source: Source, 93 event: Event 94 ) -> Iterable[RegisterEvent] | None: 95 """Process new session event. 96 97 Provided event is matched by modules subscription filter. 98 99 Processing of session event can result in registration of 100 new register events. 101 102 Single module session process is always called sequentially. 103 104 This method can be coroutine or regular function. 105 106 """
Module ABC
67 @property 68 @abc.abstractmethod 69 def subscription(self) -> Subscription: 70 """Subscribed event types filter. 71 72 `subscription` is constant during module's lifetime. 73 74 """
Subscribed event types filter.
subscription
is constant during module's lifetime.
76 async def on_session_start(self, session_id: int): 77 """Called on start of a session, identified by session_id. 78 79 This method can be coroutine or regular function. 80 81 """
Called on start of a session, identified by session_id.
This method can be coroutine or regular function.
83 async def on_session_stop(self, session_id: int): 84 """Called on stop of a session, identified by session_id. 85 86 This method can be coroutine or regular function. 87 88 """
Called on stop of a session, identified by session_id.
This method can be coroutine or regular function.
90 @abc.abstractmethod 91 async def process(self, 92 source: Source, 93 event: Event 94 ) -> Iterable[RegisterEvent] | None: 95 """Process new session event. 96 97 Provided event is matched by modules subscription filter. 98 99 Processing of session event can result in registration of 100 new register events. 101 102 Single module session process is always called sequentially. 103 104 This method can be coroutine or regular function. 105 106 """
Process new session event.
Provided event is matched by modules subscription filter.
Processing of session event can result in registration of new register events.
Single module session process is always called sequentially.
This method can be coroutine or regular function.
118class ModuleInfo(typing.NamedTuple): 119 """Module info 120 121 Module is implemented as python module which is dynamically imported. 122 It is expected that this module contains `info` which is instance of 123 `ModuleInfo`. 124 125 If module defines JSON schema repository and JSON schema id, JSON schema 126 repository will be used for additional validation of module configuration 127 with JSON schema id. 128 129 """ 130 create: CreateModule 131 json_schema_id: str | None = None 132 json_schema_repo: json.SchemaRepository | None = None
Module info
Module is implemented as python module which is dynamically imported.
It is expected that this module contains info
which is instance of
ModuleInfo
.
If module defines JSON schema repository and JSON schema id, JSON schema repository will be used for additional validation of module configuration with JSON schema id.
Create new instance of ModuleInfo(create, json_schema_id, json_schema_repo)
135def import_module_info(py_module_str: str) -> ModuleInfo: 136 """Import module info""" 137 py_module = importlib.import_module(py_module_str) 138 info = py_module.info 139 140 if not isinstance(info, ModuleInfo): 141 raise Exception('invalid module implementation') 142 143 return info
Import module info
10class Subscription(abc.ABC): 11 """Subscription defined by query event types""" 12 13 @abc.abstractmethod 14 def __init__(self, query_types: Iterable[EventType]): 15 """Create subscription instance""" 16 17 @abc.abstractmethod 18 def get_query_types(self) -> Iterable[EventType]: 19 """Calculate sanitized query event types""" 20 21 @abc.abstractmethod 22 def matches(self, event_type: EventType) -> bool: 23 """Does `event_type` match subscription""" 24 25 @abc.abstractmethod 26 def union(self, *others: 'Subscription') -> 'Subscription': 27 """Create new subscription including event types from this and 28 other subscriptions.""" 29 30 @abc.abstractmethod 31 def intersection(self, *others: 'Subscription') -> 'Subscription': 32 """Create new subscription containing event types in common with 33 other subscriptions.""" 34 35 @abc.abstractmethod 36 def isdisjoint(self, other: 'Subscription') -> bool: 37 """Return ``True`` if this subscription has no event types in common 38 with other subscription."""
Subscription defined by query event types
13 @abc.abstractmethod 14 def __init__(self, query_types: Iterable[EventType]): 15 """Create subscription instance"""
Create subscription instance
17 @abc.abstractmethod 18 def get_query_types(self) -> Iterable[EventType]: 19 """Calculate sanitized query event types"""
Calculate sanitized query event types
21 @abc.abstractmethod 22 def matches(self, event_type: EventType) -> bool: 23 """Does `event_type` match subscription"""
Does event_type
match subscription
25 @abc.abstractmethod 26 def union(self, *others: 'Subscription') -> 'Subscription': 27 """Create new subscription including event types from this and 28 other subscriptions."""
Create new subscription including event types from this and other subscriptions.
30 @abc.abstractmethod 31 def intersection(self, *others: 'Subscription') -> 'Subscription': 32 """Create new subscription containing event types in common with 33 other subscriptions."""
Create new subscription containing event types in common with other subscriptions.
143def now() -> Timestamp: 144 """Create new timestamp representing current time""" 145 return timestamp_from_float(time.time())
Create new timestamp representing current time