hat.event.common
Common functionality shared between clients and event server
1"""Common functionality shared between clients and event server""" 2 3from hat.event.common.data import (json_schema_repo, 4 sbs_repo, 5 EventType, 6 Order, 7 OrderBy, 8 EventPayloadType, 9 EventId, 10 EventPayload, 11 SbsData, 12 Event, 13 RegisterEvent, 14 QueryData, 15 event_to_sbs, 16 event_from_sbs, 17 register_event_to_sbs, 18 register_event_from_sbs, 19 query_to_sbs, 20 query_from_sbs, 21 event_payload_to_sbs, 22 event_payload_from_sbs) 23from hat.event.common.timestamp import (Timestamp, 24 now, 25 timestamp_to_bytes, 26 timestamp_from_bytes, 27 timestamp_to_float, 28 timestamp_from_float, 29 timestamp_to_datetime, 30 timestamp_from_datetime, 31 timestamp_to_sbs, 32 timestamp_from_sbs) 33from hat.event.common.subscription import (matches_query_type, 34 Subscription) 35 36 37__all__ = ['json_schema_repo', 38 'sbs_repo', 39 'EventType', 40 'Order', 41 'OrderBy', 42 'EventPayloadType', 43 'EventId', 44 'EventPayload', 45 'SbsData', 46 'Event', 47 'RegisterEvent', 48 'QueryData', 49 'event_to_sbs', 50 'event_from_sbs', 51 'register_event_to_sbs', 52 'register_event_from_sbs', 53 'query_to_sbs', 54 'query_from_sbs', 55 'event_payload_to_sbs', 56 'event_payload_from_sbs', 57 'Timestamp', 58 'now', 59 'timestamp_to_bytes', 60 'timestamp_from_bytes', 61 'timestamp_to_float', 62 'timestamp_from_float', 63 'timestamp_to_datetime', 64 'timestamp_from_datetime', 65 'timestamp_to_sbs', 66 'timestamp_from_sbs', 67 'matches_query_type', 68 'Subscription']
An enumeration.
Inherited Members
- enum.Enum
- name
- value
An enumeration.
Inherited Members
- enum.Enum
- name
- value
An enumeration.
Inherited Members
- enum.Enum
- name
- value
class EventId(typing.NamedTuple):
    """Event identifier

    Immutable triple of integers: server, session and instance identifiers.

    """
    server: int
    """server identifier"""
    session: int
    """session identifier"""
    instance: int
    """event instance identifier"""
EventId(server, session, instance)
Create new instance of EventId(server, session, instance)
Inherited Members
- builtins.tuple
- index
- count
class EventPayload(typing.NamedTuple):
    """Event payload: a payload type tag together with the payload data"""
    # tag selecting how `data` is handled; `event_payload_to_sbs` supports
    # BINARY, JSON and SBS and raises ValueError for anything else
    type: EventPayloadType
    # payload content; `event_payload_to_sbs` passes it through unchanged for
    # BINARY, JSON-encodes it for JSON and converts it with `_sbs_data_to_sbs`
    # for SBS
    data: typing.Union[bytes, json.Data, 'SbsData']
EventPayload(type, data)
Create new instance of EventPayload(type, data)
Inherited Members
- builtins.tuple
- index
- count
class SbsData(typing.NamedTuple):
    """SBS payload description: optional module name, type name and bytes"""
    module: typing.Optional[str]
    """SBS module name"""
    type: str
    """SBS type name"""
    # NOTE(review): presumably the value already encoded as SBS bytes for the
    # given module/type - confirm against the encoder
    data: bytes
SbsData(module, type, data)
Create new instance of SbsData(module, type, data)
Inherited Members
- builtins.tuple
- index
- count
class Event(typing.NamedTuple):
    """Event structure

    Holds the same fields as `RegisterEvent` plus `event_id` and `timestamp`.

    """
    # unique identifier (server, session, instance)
    event_id: EventId
    # event type, handled as a sequence of subtypes by the SBS converters
    event_type: EventType
    # NOTE(review): presumably the time assigned by the event server when the
    # event was registered - confirm
    timestamp: 'Timestamp'
    # optional timestamp supplied together with the event; may be None
    source_timestamp: typing.Optional['Timestamp']
    # optional payload; may be None
    payload: typing.Optional[EventPayload]
Event(event_id, event_type, timestamp, source_timestamp, payload)
Create new instance of Event(event_id, event_type, timestamp, source_timestamp, payload)
Inherited Members
- builtins.tuple
- index
- count
class RegisterEvent(typing.NamedTuple):
    """Event registration request

    Carries the fields of `Event` without `event_id` and `timestamp`.

    """
    # event type, handled as a sequence of subtypes by the SBS converters
    event_type: EventType
    # optional timestamp supplied by the registering party; may be None
    source_timestamp: typing.Optional['Timestamp']
    # optional payload; may be None
    payload: typing.Optional[EventPayload]
RegisterEvent(event_type, source_timestamp, payload)
Create new instance of RegisterEvent(event_type, source_timestamp, payload)
Inherited Members
- builtins.tuple
- index
- count
class QueryData(typing.NamedTuple):
    """Event query parameters

    Every field has a default, so `QueryData()` is a valid query. All
    optional fields default to ``None``.
    NOTE(review): ``None`` presumably means "do not filter on this field" -
    confirm against the query implementation.

    """
    # server identifier to match
    server_id: typing.Optional[int] = None
    # event identifiers to match
    event_ids: typing.Optional[typing.List[EventId]] = None
    # event types to match
    # NOTE(review): presumably query types as accepted by
    # `matches_query_type` ('?' and '*' wildcards) - confirm
    event_types: typing.Optional[typing.List[EventType]] = None
    # bounds on `Event.timestamp`
    # NOTE(review): inclusive/exclusive semantics are not visible here
    t_from: typing.Optional['Timestamp'] = None
    t_to: typing.Optional['Timestamp'] = None
    # bounds on `Event.source_timestamp`
    source_t_from: typing.Optional['Timestamp'] = None
    source_t_to: typing.Optional['Timestamp'] = None
    # payload to match
    payload: typing.Optional[EventPayload] = None
    # result ordering direction; defaults to descending
    order: Order = Order.DESCENDING
    # which timestamp is used for ordering; defaults to `timestamp`
    order_by: OrderBy = OrderBy.TIMESTAMP
    # NOTE(review): presumably limits the result to one event per event
    # type - confirm
    unique_type: bool = False
    # upper limit on the number of returned events
    max_results: typing.Optional[int] = None
QueryData(server_id, event_ids, event_types, t_from, t_to, source_t_from, source_t_to, payload, order, order_by, unique_type, max_results)
Create new instance of QueryData(server_id, event_ids, event_types, t_from, t_to, source_t_from, source_t_to, payload, order, order_by, unique_type, max_results)
Inherited Members
- builtins.tuple
- index
- count
def event_to_sbs(event: Event) -> sbs.Data:
    """Convert Event to SBS data

    Result is a dictionary with keys ``id``, ``type``, ``timestamp``,
    ``sourceTimestamp`` and ``payload``. The two optional fields are passed
    through `_optional_to_sbs` together with their converter.

    """
    sbs_id = _event_id_to_sbs(event.event_id)
    sbs_type = list(event.event_type)
    sbs_timestamp = timestamp_to_sbs(event.timestamp)
    sbs_source_timestamp = _optional_to_sbs(event.source_timestamp,
                                            timestamp_to_sbs)
    sbs_payload = _optional_to_sbs(event.payload, event_payload_to_sbs)
    return {'id': sbs_id,
            'type': sbs_type,
            'timestamp': sbs_timestamp,
            'sourceTimestamp': sbs_source_timestamp,
            'payload': sbs_payload}
Convert Event to SBS data
def event_from_sbs(data: sbs.Data) -> Event:
    """Create new Event based on SBS data

    Reads keys ``id``, ``type``, ``timestamp``, ``sourceTimestamp`` and
    ``payload`` from `data`; the event type is stored as a tuple.

    """
    event_id = _event_id_from_sbs(data['id'])
    event_type = tuple(data['type'])
    timestamp = timestamp_from_sbs(data['timestamp'])
    source_timestamp = _optional_from_sbs(data['sourceTimestamp'],
                                          timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)
    return Event(event_id=event_id,
                 event_type=event_type,
                 timestamp=timestamp,
                 source_timestamp=source_timestamp,
                 payload=payload)
Create new Event based on SBS data
def register_event_to_sbs(event: RegisterEvent) -> sbs.Data:
    """Convert RegisterEvent to SBS data

    Result is a dictionary with keys ``type``, ``sourceTimestamp`` and
    ``payload``.

    """
    sbs_type = list(event.event_type)
    sbs_source_timestamp = _optional_to_sbs(event.source_timestamp,
                                            timestamp_to_sbs)
    sbs_payload = _optional_to_sbs(event.payload, event_payload_to_sbs)
    return {'type': sbs_type,
            'sourceTimestamp': sbs_source_timestamp,
            'payload': sbs_payload}
Convert RegisterEvent to SBS data
def register_event_from_sbs(data: sbs.Data) -> RegisterEvent:
    """Create new RegisterEvent based on SBS data

    Reads keys ``type``, ``sourceTimestamp`` and ``payload`` from `data`;
    the event type is stored as a tuple.

    """
    event_type = tuple(data['type'])
    source_timestamp = _optional_from_sbs(data['sourceTimestamp'],
                                          timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)
    return RegisterEvent(event_type=event_type,
                         source_timestamp=source_timestamp,
                         payload=payload)
Create new RegisterEvent based on SBS data
def query_to_sbs(query: QueryData) -> sbs.Data:
    """Convert QueryData to SBS data

    Optional fields are passed through `_optional_to_sbs`. `order` and
    `order_by` are encoded as ``(name, None)`` pairs; a value missing from
    the lookup tables raises `KeyError`.

    """
    order_names = {Order.DESCENDING: ('descending', None),
                   Order.ASCENDING: ('ascending', None)}
    order_by_names = {OrderBy.TIMESTAMP: ('timestamp', None),
                      OrderBy.SOURCE_TIMESTAMP: ('sourceTimestamp', None)}

    def ids_to_sbs(event_ids):
        return [_event_id_to_sbs(event_id) for event_id in event_ids]

    def types_to_sbs(event_types):
        return [list(event_type) for event_type in event_types]

    def optional_timestamp(value):
        return _optional_to_sbs(value, timestamp_to_sbs)

    # entries are evaluated in the same order as the resulting keys
    return {
        'serverId': _optional_to_sbs(query.server_id),
        'ids': _optional_to_sbs(query.event_ids, ids_to_sbs),
        'types': _optional_to_sbs(query.event_types, types_to_sbs),
        'tFrom': optional_timestamp(query.t_from),
        'tTo': optional_timestamp(query.t_to),
        'sourceTFrom': optional_timestamp(query.source_t_from),
        'sourceTTo': optional_timestamp(query.source_t_to),
        'payload': _optional_to_sbs(query.payload, event_payload_to_sbs),
        'order': order_names[query.order],
        'orderBy': order_by_names[query.order_by],
        'uniqueType': query.unique_type,
        'maxResults': _optional_to_sbs(query.max_results)}
Convert QueryData to SBS data
def query_from_sbs(data: sbs.Data) -> QueryData:
    """Create new QueryData based on SBS data

    Optional fields are passed through `_optional_from_sbs`. ``order`` and
    ``orderBy`` are decoded from the first element of their value; an
    unknown name raises `KeyError`.

    """
    orders = {'descending': Order.DESCENDING,
              'ascending': Order.ASCENDING}
    order_bys = {'timestamp': OrderBy.TIMESTAMP,
                 'sourceTimestamp': OrderBy.SOURCE_TIMESTAMP}

    def ids_from_sbs(sbs_ids):
        return [_event_id_from_sbs(sbs_id) for sbs_id in sbs_ids]

    def types_from_sbs(sbs_types):
        return [tuple(sbs_type) for sbs_type in sbs_types]

    def optional_timestamp(value):
        return _optional_from_sbs(value, timestamp_from_sbs)

    # arguments are evaluated in the same order as the QueryData fields
    return QueryData(
        server_id=_optional_from_sbs(data['serverId']),
        event_ids=_optional_from_sbs(data['ids'], ids_from_sbs),
        event_types=_optional_from_sbs(data['types'], types_from_sbs),
        t_from=optional_timestamp(data['tFrom']),
        t_to=optional_timestamp(data['tTo']),
        source_t_from=optional_timestamp(data['sourceTFrom']),
        source_t_to=optional_timestamp(data['sourceTTo']),
        payload=_optional_from_sbs(data['payload'], event_payload_from_sbs),
        order=orders[data['order'][0]],
        order_by=order_bys[data['orderBy'][0]],
        unique_type=data['uniqueType'],
        max_results=_optional_from_sbs(data['maxResults']))
Create new QueryData based on SBS data
def event_payload_to_sbs(payload: EventPayload) -> sbs.Data:
    """Convert EventPayload to SBS data

    Result is a ``(name, data)`` pair where name is ``'binary'``, ``'json'``
    or ``'sbs'``. Binary data is passed unchanged, JSON data is encoded with
    `json.encode` and SBS data is converted with `_sbs_data_to_sbs`.

    Raises `ValueError` if payload type is not supported.

    """
    payload_type = payload.type

    if payload_type == EventPayloadType.BINARY:
        result = 'binary', payload.data

    elif payload_type == EventPayloadType.JSON:
        result = 'json', json.encode(payload.data)

    elif payload_type == EventPayloadType.SBS:
        result = 'sbs', _sbs_data_to_sbs(payload.data)

    else:
        raise ValueError('unsupported payload type')

    return result
Convert EventPayload to SBS data
def event_payload_from_sbs(data: sbs.Data) -> EventPayload:
    """Create new EventPayload based on SBS data

    `data` is unpacked as a ``(name, content)`` pair. Content is used
    unchanged for ``'binary'``, decoded with `json.decode` for ``'json'`` and
    converted with `_sbs_data_from_sbs` for ``'sbs'``.

    Raises `ValueError` if name is not one of the supported payload types.

    """
    name, content = data

    if name == 'binary':
        payload_type = EventPayloadType.BINARY
        payload_data = content

    elif name == 'json':
        payload_type = EventPayloadType.JSON
        payload_data = json.decode(content)

    elif name == 'sbs':
        payload_type = EventPayloadType.SBS
        payload_data = _sbs_data_from_sbs(content)

    else:
        raise ValueError('unsupported payload type')

    return EventPayload(type=payload_type, data=payload_data)
Create new EventPayload based on SBS data
class Timestamp(typing.NamedTuple):
    """Point in time as whole seconds plus microseconds

    Comparison and hashing use the total number of microseconds
    (``s * 1000000 + us``), so instances describing the same instant are
    equal even if their fields differ. Ordering against objects that are not
    `Timestamp` instances yields ``NotImplemented``.

    """
    s: int
    """seconds since 1970-01-01 (can be negative)"""
    us: int
    """microseconds added to timestamp seconds in range [0, 1e6)"""

    def __lt__(self, other):
        if isinstance(other, Timestamp):
            mine = self.s * 1000000 + self.us
            theirs = other.s * 1000000 + other.us
            return mine < theirs
        return NotImplemented

    def __gt__(self, other):
        if isinstance(other, Timestamp):
            mine = self.s * 1000000 + self.us
            theirs = other.s * 1000000 + other.us
            return mine > theirs
        return NotImplemented

    def __eq__(self, other):
        if isinstance(other, Timestamp):
            mine = self.s * 1000000 + self.us
            theirs = other.s * 1000000 + other.us
            return mine == theirs
        return NotImplemented

    def __ne__(self, other):
        return not self == other

    def __le__(self, other):
        return self < other or self == other

    def __ge__(self, other):
        return self > other or self == other

    def __hash__(self):
        # same quantity as used by __eq__, so equal instances hash equally
        return self.s * 1000000 + self.us

    def add(self, s: float) -> 'Timestamp':
        """Create new timestamp by adding seconds to existing timestamp"""
        whole_seconds = int(s)
        total_us = self.us + round((s - whole_seconds) * 1e6)
        # floor division keeps resulting microseconds in range [0, 1e6)
        carry, remaining_us = divmod(total_us, int(1e6))
        return Timestamp(s=self.s + whole_seconds + carry,
                         us=remaining_us)
Timestamp(s, us)
def add(self, s: float) -> 'Timestamp':
    """Create new timestamp by adding seconds to existing timestamp"""
    whole_seconds = int(s)
    total_us = self.us + round((s - whole_seconds) * 1e6)
    # floor division keeps resulting microseconds in range [0, 1e6)
    carry, remaining_us = divmod(total_us, int(1e6))
    return Timestamp(s=self.s + whole_seconds + carry,
                     us=remaining_us)
Create new timestamp by adding seconds to existing timestamp
Inherited Members
- builtins.tuple
- index
- count
def now() -> Timestamp:
    """Create new timestamp representing current time"""
    current_utc = datetime.datetime.now(tz=datetime.timezone.utc)
    return timestamp_from_datetime(current_utc)
Create new timestamp representing current time
def timestamp_to_bytes(t: Timestamp) -> bytes:
    """Convert timestamp to 12 byte representation

    Bytes 0-7 are big endian unsigned `Timestamp.s` + 2^63 and
    bytes 8-11 are big endian unsigned `Timestamp.us`.

    """
    # shifting by 2^63 maps signed seconds onto the unsigned 64 bit range
    shifted_seconds = t.s + (1 << 63)
    return struct.pack(">QI", shifted_seconds, t.us)
Convert timestamp to 12 byte representation
Bytes 0–7 are big endian unsigned Timestamp.s + 2^63 and bytes 8–11 are big endian unsigned Timestamp.us.
def timestamp_from_bytes(data: bytes) -> Timestamp:
    """Create new timestamp from 12 byte representation

    Bytes representation is same as defined for `timestamp_to_bytes` function.

    """
    shifted_seconds, microseconds = struct.unpack(">QI", data)
    # undo the 2^63 offset applied when the seconds were packed
    return Timestamp(s=shifted_seconds - (1 << 63),
                     us=microseconds)
Create new timestamp from 12 byte representation
The byte representation is the same as defined for the timestamp_to_bytes function.
def timestamp_to_float(t: Timestamp) -> float:
    """Convert timestamp to floating number of seconds since 1970-01-01 UTC

    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    fraction = t.us * 1E-6
    return t.s + fraction
Convert timestamp to floating number of seconds since 1970-01-01 UTC
For precise serialization see timestamp_to_bytes/timestamp_from_bytes.
def timestamp_from_float(ts: float) -> Timestamp:
    """Create timestamp from floating number of seconds since 1970-01-01 UTC

    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    # int() truncates toward zero, so negative values are moved one second
    # down to keep the microsecond part non-negative
    seconds = int(ts) - 1 if ts < 0 else int(ts)
    microseconds = round((ts - seconds) * 1E6)
    # rounding can produce a full second which is carried into seconds
    if microseconds != 1000000:
        return Timestamp(seconds, microseconds)
    return Timestamp(seconds + 1, 0)
Create timestamp from floating number of seconds since 1970-01-01 UTC
For precise serialization see timestamp_to_bytes/timestamp_from_bytes.
def timestamp_to_datetime(t: Timestamp) -> datetime.datetime:
    """Convert timestamp to datetime (representing utc time)

    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    utc = datetime.timezone.utc
    try:
        whole_seconds = datetime.datetime.fromtimestamp(t.s, utc)
    except OSError:
        # fallback used when fromtimestamp fails with OSError: offset the
        # epoch by the number of seconds instead
        epoch = datetime.datetime(1970, 1, 1, tzinfo=utc)
        whole_seconds = epoch + datetime.timedelta(seconds=t.s)
    # microseconds are taken from the timestamp itself
    return whole_seconds.replace(microsecond=t.us)
Convert timestamp to datetime (representing utc time)
For precise serialization see timestamp_to_bytes/timestamp_from_bytes.
def timestamp_from_datetime(dt: datetime.datetime) -> Timestamp:
    """Create new timestamp from datetime

    If `tzinfo` is not set, it is assumed that provided datetime represents
    utc time.

    Seconds and microseconds are derived from the exact difference between
    `dt` and 1970-01-01 UTC, without going through a floating point value.
    Datetimes before 1970-01-01 therefore get the floor of their second
    count, including those that fall exactly on a whole second
    (1969-12-31 23:59:59 UTC is ``Timestamp(-1, 0)``).

    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    if not dt.tzinfo:
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    # timedelta is normalized so that 0 <= seconds < 86400 and
    # 0 <= microseconds < 1000000; only days can be negative, which makes
    # the expression below a floor for datetimes before the epoch
    delta = dt - epoch
    return Timestamp(s=delta.days * 86400 + delta.seconds,
                     us=delta.microseconds)
Create new timestamp from datetime
If tzinfo is not set, it is assumed that the provided datetime represents UTC time.
For precise serialization see timestamp_to_bytes/timestamp_from_bytes.
def timestamp_to_sbs(t: Timestamp) -> sbs.Data:
    """Convert timestamp to SBS data

    Result is a dictionary with keys ``s`` and ``us``.

    """
    return dict(s=t.s, us=t.us)
Convert timestamp to SBS data
def timestamp_from_sbs(data: sbs.Data) -> Timestamp:
    """Create new timestamp from SBS data

    Reads keys ``s`` and ``us`` from `data`.

    """
    seconds = data['s']
    microseconds = data['us']
    return Timestamp(seconds, microseconds)
Create new timestamp from SBS data
def matches_query_type(event_type: EventType,
                       query_type: EventType
                       ) -> bool:
    """Determine if event type matches query type

    Event type is checked against query type according to the following
    rules:

        * Matching is performed on subtypes in increasing order.
        * Event type is a match only if all its subtypes are matched by
          corresponding query subtypes.
        * Matching is finished when all query subtypes are exhausted.
        * Query subtype '?' matches exactly one event subtype of any value.
          The subtype must exist.
        * Query subtype '*' matches 0 or more event subtypes of any value. It
          must be the last query subtype.
        * All other values of query subtype match exactly one event subtype
          of the same value.
        * Query type without subtypes is matched only by event type with no
          subtypes.

    As a consequence of aforementioned matching rules, event subtypes '*' and
    '?' cannot be directly matched and it is advisable not to use them in event
    types.

    """
    open_ended = bool(query_type) and query_type[-1] == '*'
    # trailing '*' is not compared itself, only subtypes preceding it are
    fixed_part = query_type[:-1] if open_ended else query_type

    event_len = len(event_type)
    fixed_len = len(fixed_part)

    # every fixed query subtype needs a corresponding event subtype
    if event_len < fixed_len:
        return False

    # additional event subtypes are accepted only by trailing '*'
    if not open_ended and event_len > fixed_len:
        return False

    return all(query_subtype == '?' or event_subtype == query_subtype
               for event_subtype, query_subtype in zip(event_type,
                                                       fixed_part))
Determine if event type matches query type
Event type is tested if it matches query type according to the following rules:
* Matching is performed on subtypes in increasing order.
* Event type is a match only if all its subtypes are matched by
corresponding query subtypes.
* Matching is finished when all query subtypes are exhausted.
* Query subtype '?' matches exactly one event subtype of any value.
The subtype must exist.
* Query subtype '*' matches 0 or more event subtypes of any value. It
must be the last query subtype.
* All other values of query subtype match exactly one event subtype
of the same value.
* Query type without subtypes is matched only by event type with no
subtypes.
As a consequence of aforementioned matching rules, event subtypes '*' and '?' cannot be directly matched and it is advisable not to use them in event types.
Subscription defined by query event types