hat.event.common
Common functionality shared between clients and event server
1"""Common functionality shared between clients and event server""" 2 3from hat.event.common.data import (json_schema_repo, 4 sbs_repo, 5 EventType, 6 Order, 7 OrderBy, 8 EventPayloadType, 9 EventId, 10 EventPayload, 11 SbsData, 12 Event, 13 RegisterEvent, 14 QueryData, 15 event_to_sbs, 16 event_from_sbs, 17 register_event_to_sbs, 18 register_event_from_sbs, 19 query_to_sbs, 20 query_from_sbs, 21 event_payload_to_sbs, 22 event_payload_from_sbs) 23from hat.event.common.timestamp import (Timestamp, 24 now, 25 timestamp_to_bytes, 26 timestamp_from_bytes, 27 timestamp_to_float, 28 timestamp_from_float, 29 timestamp_to_datetime, 30 timestamp_from_datetime, 31 timestamp_to_sbs, 32 timestamp_from_sbs) 33from hat.event.common.subscription import (matches_query_type, 34 Subscription) 35 36 37__all__ = ['json_schema_repo', 38 'sbs_repo', 39 'EventType', 40 'Order', 41 'OrderBy', 42 'EventPayloadType', 43 'EventId', 44 'EventPayload', 45 'SbsData', 46 'Event', 47 'RegisterEvent', 48 'QueryData', 49 'event_to_sbs', 50 'event_from_sbs', 51 'register_event_to_sbs', 52 'register_event_from_sbs', 53 'query_to_sbs', 54 'query_from_sbs', 55 'event_payload_to_sbs', 56 'event_payload_from_sbs', 57 'Timestamp', 58 'now', 59 'timestamp_to_bytes', 60 'timestamp_from_bytes', 61 'timestamp_to_float', 62 'timestamp_from_float', 63 'timestamp_to_datetime', 64 'timestamp_from_datetime', 65 'timestamp_to_sbs', 66 'timestamp_from_sbs', 67 'matches_query_type', 68 'Subscription']
An enumeration.
Inherited Members
- enum.Enum
- name
- value
An enumeration.
Inherited Members
- enum.Enum
- name
- value
An enumeration.
Inherited Members
- enum.Enum
- name
- value
55class EventId(typing.NamedTuple): 56 server: int 57 """server identifier""" 58 session: int 59 """session identifier""" 60 instance: int 61 """event instance identifier"""
EventId(server, session, instance)
Create new instance of EventId(server, session, instance)
Inherited Members
- builtins.tuple
- index
- count
64class EventPayload(typing.NamedTuple): 65 type: EventPayloadType 66 data: typing.Union[bytes, json.Data, 'SbsData']
EventPayload(type, data)
Create new instance of EventPayload(type, data)
Alias for field number 1
Inherited Members
- builtins.tuple
- index
- count
69class SbsData(typing.NamedTuple): 70 module: str | None 71 """SBS module name""" 72 type: str 73 """SBS type name""" 74 data: bytes
SbsData(module, type, data)
Create new instance of SbsData(module, type, data)
Inherited Members
- builtins.tuple
- index
- count
77class Event(typing.NamedTuple): 78 event_id: EventId 79 event_type: EventType 80 timestamp: Timestamp 81 source_timestamp: Timestamp | None 82 payload: EventPayload | None
Event(event_id, event_type, timestamp, source_timestamp, payload)
Create new instance of Event(event_id, event_type, timestamp, source_timestamp, payload)
Inherited Members
- builtins.tuple
- index
- count
85class RegisterEvent(typing.NamedTuple): 86 event_type: EventType 87 source_timestamp: Timestamp | None 88 payload: EventPayload | None
RegisterEvent(event_type, source_timestamp, payload)
Create new instance of RegisterEvent(event_type, source_timestamp, payload)
Inherited Members
- builtins.tuple
- index
- count
91class QueryData(typing.NamedTuple): 92 server_id: int | None = None 93 event_ids: list[EventId] | None = None 94 event_types: list[EventType] | None = None 95 t_from: Timestamp | None = None 96 t_to: Timestamp | None = None 97 source_t_from: Timestamp | None = None 98 source_t_to: Timestamp | None = None 99 payload: EventPayload | None = None 100 order: Order = Order.DESCENDING 101 order_by: OrderBy = OrderBy.TIMESTAMP 102 unique_type: bool = False 103 max_results: int | None = None
QueryData(server_id, event_ids, event_types, t_from, t_to, source_t_from, source_t_to, payload, order, order_by, unique_type, max_results)
Create new instance of QueryData(server_id, event_ids, event_types, t_from, t_to, source_t_from, source_t_to, payload, order, order_by, unique_type, max_results)
Inherited Members
- builtins.tuple
- index
- count
106def event_to_sbs(event: Event) -> sbs.Data: 107 """Convert Event to SBS data""" 108 return { 109 'id': _event_id_to_sbs(event.event_id), 110 'type': list(event.event_type), 111 'timestamp': timestamp_to_sbs(event.timestamp), 112 'sourceTimestamp': _optional_to_sbs(event.source_timestamp, 113 timestamp_to_sbs), 114 'payload': _optional_to_sbs(event.payload, event_payload_to_sbs)}
Convert Event to SBS data
117def event_from_sbs(data: sbs.Data) -> Event: 118 """Create new Event based on SBS data""" 119 return Event( 120 event_id=_event_id_from_sbs(data['id']), 121 event_type=tuple(data['type']), 122 timestamp=timestamp_from_sbs(data['timestamp']), 123 source_timestamp=_optional_from_sbs(data['sourceTimestamp'], 124 timestamp_from_sbs), 125 payload=_optional_from_sbs(data['payload'], event_payload_from_sbs))
Create new Event based on SBS data
128def register_event_to_sbs(event: RegisterEvent) -> sbs.Data: 129 """Convert RegisterEvent to SBS data""" 130 return { 131 'type': list(event.event_type), 132 'sourceTimestamp': _optional_to_sbs(event.source_timestamp, 133 timestamp_to_sbs), 134 'payload': _optional_to_sbs(event.payload, event_payload_to_sbs)}
Convert RegisterEvent to SBS data
137def register_event_from_sbs(data: sbs.Data) -> RegisterEvent: 138 """Create new RegisterEvent based on SBS data""" 139 return RegisterEvent( 140 event_type=tuple(data['type']), 141 source_timestamp=_optional_from_sbs(data['sourceTimestamp'], 142 timestamp_from_sbs), 143 payload=_optional_from_sbs(data['payload'], event_payload_from_sbs))
Create new RegisterEvent based on SBS data
146def query_to_sbs(query: QueryData) -> sbs.Data: 147 """Convert QueryData to SBS data""" 148 return { 149 'serverId': _optional_to_sbs(query.server_id), 150 'ids': _optional_to_sbs(query.event_ids, lambda ids: [ 151 _event_id_to_sbs(i) for i in ids]), 152 'types': _optional_to_sbs(query.event_types, lambda ets: [ 153 list(et) for et in ets]), 154 'tFrom': _optional_to_sbs(query.t_from, timestamp_to_sbs), 155 'tTo': _optional_to_sbs(query.t_to, timestamp_to_sbs), 156 'sourceTFrom': _optional_to_sbs(query.source_t_from, timestamp_to_sbs), 157 'sourceTTo': _optional_to_sbs(query.source_t_to, timestamp_to_sbs), 158 'payload': _optional_to_sbs(query.payload, event_payload_to_sbs), 159 'order': {Order.DESCENDING: ('descending', None), 160 Order.ASCENDING: ('ascending', None)}[query.order], 161 'orderBy': {OrderBy.TIMESTAMP: ('timestamp', None), 162 OrderBy.SOURCE_TIMESTAMP: ('sourceTimestamp', None) 163 }[query.order_by], 164 'uniqueType': query.unique_type, 165 'maxResults': _optional_to_sbs(query.max_results)}
Convert QueryData to SBS data
168def query_from_sbs(data: sbs.Data) -> QueryData: 169 """Create new QueryData based on SBS data""" 170 return QueryData( 171 server_id=_optional_from_sbs(data['serverId']), 172 event_ids=_optional_from_sbs(data['ids'], lambda ids: [ 173 _event_id_from_sbs(i) for i in ids]), 174 event_types=_optional_from_sbs(data['types'], lambda ets: [ 175 tuple(et) for et in ets]), 176 t_from=_optional_from_sbs(data['tFrom'], timestamp_from_sbs), 177 t_to=_optional_from_sbs(data['tTo'], timestamp_from_sbs), 178 source_t_from=_optional_from_sbs(data['sourceTFrom'], 179 timestamp_from_sbs), 180 source_t_to=_optional_from_sbs(data['sourceTTo'], timestamp_from_sbs), 181 payload=_optional_from_sbs(data['payload'], event_payload_from_sbs), 182 order={'descending': Order.DESCENDING, 183 'ascending': Order.ASCENDING}[data['order'][0]], 184 order_by={'timestamp': OrderBy.TIMESTAMP, 185 'sourceTimestamp': OrderBy.SOURCE_TIMESTAMP 186 }[data['orderBy'][0]], 187 unique_type=data['uniqueType'], 188 max_results=_optional_from_sbs(data['maxResults']))
Create new QueryData based on SBS data
191def event_payload_to_sbs(payload: EventPayload) -> sbs.Data: 192 """Convert EventPayload to SBS data""" 193 if payload.type == EventPayloadType.BINARY: 194 return 'binary', payload.data 195 196 if payload.type == EventPayloadType.JSON: 197 return 'json', json.encode(payload.data) 198 199 if payload.type == EventPayloadType.SBS: 200 return 'sbs', _sbs_data_to_sbs(payload.data) 201 202 raise ValueError('unsupported payload type')
Convert EventPayload to SBS data
205def event_payload_from_sbs(data: sbs.Data) -> EventPayload: 206 """Create new EventPayload based on SBS data""" 207 data_type, data_data = data 208 209 if data_type == 'binary': 210 return EventPayload(type=EventPayloadType.BINARY, 211 data=data_data) 212 213 if data_type == 'json': 214 return EventPayload(type=EventPayloadType.JSON, 215 data=json.decode(data_data)) 216 217 if data_type == 'sbs': 218 return EventPayload(type=EventPayloadType.SBS, 219 data=_sbs_data_from_sbs(data_data)) 220 221 raise ValueError('unsupported payload type')
Create new EventPayload based on SBS data
9class Timestamp(typing.NamedTuple): 10 s: int 11 """seconds since 1970-01-01 (can be negative)""" 12 us: int 13 """microseconds added to timestamp seconds in range [0, 1e6)""" 14 15 def __lt__(self, other): 16 if not isinstance(other, Timestamp): 17 return NotImplemented 18 return self.s * 1000000 + self.us < other.s * 1000000 + other.us 19 20 def __gt__(self, other): 21 if not isinstance(other, Timestamp): 22 return NotImplemented 23 return self.s * 1000000 + self.us > other.s * 1000000 + other.us 24 25 def __eq__(self, other): 26 if not isinstance(other, Timestamp): 27 return NotImplemented 28 return self.s * 1000000 + self.us == other.s * 1000000 + other.us 29 30 def __ne__(self, other): 31 return not self == other 32 33 def __le__(self, other): 34 return self < other or self == other 35 36 def __ge__(self, other): 37 return self > other or self == other 38 39 def __hash__(self): 40 return self.s * 1000000 + self.us 41 42 def add(self, s: float) -> 'Timestamp': 43 """Create new timestamp by adding seconds to existing timestamp""" 44 us = self.us + round((s - int(s)) * 1e6) 45 s = self.s + int(s) 46 return Timestamp(s=s + us // int(1e6), 47 us=us % int(1e6))
Timestamp(s, us)
42 def add(self, s: float) -> 'Timestamp': 43 """Create new timestamp by adding seconds to existing timestamp""" 44 us = self.us + round((s - int(s)) * 1e6) 45 s = self.s + int(s) 46 return Timestamp(s=s + us // int(1e6), 47 us=us % int(1e6))
Create new timestamp by adding seconds to existing timestamp
Inherited Members
- builtins.tuple
- index
- count
50def now() -> Timestamp: 51 """Create new timestamp representing current time""" 52 return timestamp_from_datetime( 53 datetime.datetime.now(datetime.timezone.utc))
Create new timestamp representing current time
56def timestamp_to_bytes(t: Timestamp) -> bytes: 57 """Convert timestamp to 12 byte representation 58 59 Bytes [0, 8] are big endian unsigned `Timestamp.s` + 2^63 and 60 bytes [9, 12] are big endian unsigned `Timestamp.us`. 61 62 """ 63 return struct.pack(">QI", t.s + (1 << 63), t.us)
Convert timestamp to 12 byte representation
Bytes [0, 8] are big endian unsigned Timestamp.s
+ 2^63 and
bytes [9, 12] are big endian unsigned Timestamp.us
.
66def timestamp_from_bytes(data: bytes) -> Timestamp: 67 """Create new timestamp from 12 byte representation 68 69 Bytes representation is same as defined for `timestamp_to_bytes` function. 70 71 """ 72 s, us = struct.unpack(">QI", data) 73 return Timestamp(s - (1 << 63), us)
Create new timestamp from 12 byte representation
Bytes representation is same as defined for timestamp_to_bytes
function.
76def timestamp_to_float(t: Timestamp) -> float: 77 """Convert timestamp to floating number of seconds since 1970-01-01 UTC 78 79 For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`. 80 81 """ 82 return t.s + t.us * 1E-6
Convert timestamp to floating number of seconds since 1970-01-01 UTC
For precise serialization see timestamp_to_bytes
/timestamp_from_bytes
.
85def timestamp_from_float(ts: float) -> Timestamp: 86 """Create timestamp from floating number of seconds since 1970-01-01 UTC 87 88 For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`. 89 90 """ 91 s = int(ts) 92 if ts < 0: 93 s = s - 1 94 us = round((ts - s) * 1E6) 95 if us == 1000000: 96 return Timestamp(s + 1, 0) 97 else: 98 return Timestamp(s, us)
Create timestamp from floating number of seconds since 1970-01-01 UTC
For precise serialization see timestamp_to_bytes
/timestamp_from_bytes
.
101def timestamp_to_datetime(t: Timestamp) -> datetime.datetime: 102 """Convert timestamp to datetime (representing utc time) 103 104 For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`. 105 106 """ 107 try: 108 dt_from_s = datetime.datetime.fromtimestamp(t.s, datetime.timezone.utc) 109 except OSError: 110 dt_from_s = ( 111 datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + 112 datetime.timedelta(seconds=t.s)) 113 return datetime.datetime( 114 year=dt_from_s.year, 115 month=dt_from_s.month, 116 day=dt_from_s.day, 117 hour=dt_from_s.hour, 118 minute=dt_from_s.minute, 119 second=dt_from_s.second, 120 microsecond=t.us, 121 tzinfo=datetime.timezone.utc)
Convert timestamp to datetime (representing utc time)
For precise serialization see timestamp_to_bytes
/timestamp_from_bytes
.
124def timestamp_from_datetime(dt: datetime.datetime) -> Timestamp: 125 """Create new timestamp from datetime 126 127 If `tzinfo` is not set, it is assumed that provided datetime represents 128 utc time. 129 130 For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`. 131 132 """ 133 if not dt.tzinfo: 134 dt = dt.replace(tzinfo=datetime.timezone.utc) 135 s = int(dt.timestamp()) 136 if dt.timestamp() < 0: 137 s = s - 1 138 return Timestamp(s=s, us=dt.microsecond)
Create new timestamp from datetime
If tzinfo
is not set, it is assumed that provided datetime represents
utc time.
For precise serialization see timestamp_to_bytes
/timestamp_from_bytes
.
141def timestamp_to_sbs(t: Timestamp) -> sbs.Data: 142 """Convert timestamp to SBS data""" 143 return {'s': t.s, 'us': t.us}
Convert timestamp to SBS data
146def timestamp_from_sbs(data: sbs.Data) -> Timestamp: 147 """Create new timestamp from SBS data""" 148 return Timestamp(s=data['s'], us=data['us'])
Create new timestamp from SBS data
9def matches_query_type(event_type: EventType, 10 query_type: EventType 11 ) -> bool: 12 """Determine if event type matches query type 13 14 Event type is tested if it matches query type according to the following 15 rules: 16 17 * Matching is performed on subtypes in increasing order. 18 * Event type is a match only if all its subtypes are matched by 19 corresponding query subtypes. 20 * Matching is finished when all query subtypes are exhausted. 21 * Query subtype '?' matches exactly one event subtype of any value. 22 The subtype must exist. 23 * Query subtype '*' matches 0 or more event subtypes of any value. It 24 must be the last query subtype. 25 * All other values of query subtype match exactly one event subtype 26 of the same value. 27 * Query type without subtypes is matched only by event type with no 28 subtypes. 29 30 As a consequence of aforementioned matching rules, event subtypes '*' and 31 '?' cannot be directly matched and it is advisable not to use them in event 32 types. 33 34 """ 35 is_variable = bool(query_type and query_type[-1] == '*') 36 if is_variable: 37 query_type = query_type[:-1] 38 39 if len(event_type) < len(query_type): 40 return False 41 42 if len(event_type) > len(query_type) and not is_variable: 43 return False 44 45 for i, j in zip(event_type, query_type): 46 if j != '?' and i != j: 47 return False 48 49 return True
Determine if event type matches query type
Event type is tested if it matches query type according to the following rules:
* Matching is performed on subtypes in increasing order.
* Event type is a match only if all its subtypes are matched by
corresponding query subtypes.
* Matching is finished when all query subtypes are exhausted.
* Query subtype '?' matches exactly one event subtype of any value.
The subtype must exist.
* Query subtype '*' matches 0 or more event subtypes of any value. It
must be the last query subtype.
* All other values of query subtype match exactly one event subtype
of the same value.
* Query type without subtypes is matched only by event type with no
subtypes.
As a consequence of aforementioned matching rules, event subtypes '*' and '?' cannot be directly matched and it is advisable not to use them in event types.