hat.event.backends.lmdb.convert.v06
from pathlib import Path
import enum
import struct
import platform
import typing

import lmdb

from hat import json
from hat import sbs


# An event type is a tuple of string segments.
EventType: typing.TypeAlias = typing.Tuple[str, ...]


class EventPayloadType(enum.Enum):
    """Kinds of payload an event can carry."""

    BINARY = 1
    JSON = 2
    SBS = 3


class EventId(typing.NamedTuple):
    """Event identifier made of a server and an instance number."""

    server: int
    instance: int


class SbsData(typing.NamedTuple):
    """SBS payload data: optional module name, type name and raw bytes."""

    module: str | None
    type: str
    data: bytes


class EventPayload(typing.NamedTuple):
    """Event payload: a kind tag together with the matching data.

    `data` holds `bytes` for BINARY, decoded JSON data for JSON and
    `SbsData` for SBS (see `_event_payload_from_sbs`).
    """

    type: EventPayloadType
    data: bytes | json.Data | SbsData


class Timestamp(typing.NamedTuple):
    """Point in time expressed as seconds (`s`) and microseconds (`us`)."""

    s: int
    us: int


class Event(typing.NamedTuple):
    """Decoded event record."""

    event_id: EventId
    event_type: EventType
    timestamp: Timestamp
    source_timestamp: Timestamp | None
    payload: EventPayload | None


def decode_uint(x: bytes) -> int:
    """Decode an 8-byte big-endian unsigned integer."""
    (value,) = struct.unpack(">Q", x)
    return value


def decode_timestamp(x: bytes) -> Timestamp:
    """Decode a timestamp packed as big-endian uint64 + uint32.

    The stored seconds are biased by 2**63; the bias is removed here.
    """
    biased_seconds, microseconds = struct.unpack(">QI", x)
    return Timestamp(biased_seconds - (1 << 63), microseconds)


def decode_tuple_str(x: bytes) -> typing.Tuple[str, ...]:
    """Decode UTF-8 encoded JSON into a tuple of its top-level items."""
    return tuple(decode_json(x))


def decode_json(x: bytes) -> json.Data:
    """Decode UTF-8 encoded JSON bytes into JSON data."""
    text = str(x, encoding='utf-8')
    return json.decode(text)


def decode_uint_timestamp_uint(x: bytes
                               ) -> tuple[int, Timestamp, int]:
    """Decode a packed (uint64, timestamp, uint64) triple.

    The layout is big-endian uint64, uint64 seconds (biased by 2**63),
    uint32 microseconds, uint64.
    """
    first, biased_seconds, microseconds, last = struct.unpack(">QQIQ", x)
    timestamp = Timestamp(biased_seconds - (1 << 63), microseconds)
    return first, timestamp, last


def decode_event(event_bytes: bytes) -> Event:
    """Decode an SBS encoded `HatEvent.Event` into an `Event`."""
    return _event_from_sbs(_sbs_repo.decode('HatEvent.Event', event_bytes))


def create_env(path: Path):
    """Open an LMDB environment stored at `path`.

    The map size is 512 GiB when `platform.architecture()` reports a
    64bit interpreter and 1 GiB otherwise. Up to 5 named databases are
    allowed and `subdir=False` is passed to `lmdb.Environment`.
    """
    gib = 1024 * 1024 * 1024
    if platform.architecture()[0] == '64bit':
        map_size = 512 * gib
    else:
        map_size = gib

    return lmdb.Environment(str(path),
                            map_size=map_size,
                            subdir=False,
                            max_dbs=5)


# SBS schema describing the encoded event structures
_sbs_repo = sbs.Repository(r"""
module HatEvent

MsgSubscribe = Array(EventType)

MsgNotify = Array(Event)

MsgRegisterReq = Array(RegisterEvent)

MsgRegisterRes = Array(Choice {
    event: Event
    failure: None
})

MsgQueryReq = QueryData

MsgQueryRes = Array(Event)

Timestamp = Record {
    s: Integer
    us: Integer
}

EventId = Record {
    server: Integer
    instance: Integer
}

Order = Choice {
    descending: None
    ascending: None
}

OrderBy = Choice {
    timestamp: None
    sourceTimestamp: None
}

EventType = Array(String)

EventPayload = Choice {
    binary: Bytes
    json: String
    sbs: Record {
        module: Optional(String)
        type: String
        data: Bytes
    }
}

Event = Record {
    id: EventId
    type: EventType
    timestamp: Timestamp
    sourceTimestamp: Optional(Timestamp)
    payload: Optional(EventPayload)
}

RegisterEvent = Record {
    type: EventType
    sourceTimestamp: Optional(Timestamp)
    payload: Optional(EventPayload)
}

QueryData = Record {
    ids: Optional(Array(EventId))
    types: Optional(Array(EventType))
    tFrom: Optional(Timestamp)
    tTo: Optional(Timestamp)
    sourceTFrom: Optional(Timestamp)
    sourceTTo: Optional(Timestamp)
    payload: Optional(EventPayload)
    order: Order
    orderBy: OrderBy
    uniqueType: Boolean
    maxResults: Optional(Integer)
}
""")


def _event_from_sbs(data: sbs.Data) -> Event:
    """Build an `Event` from a decoded `HatEvent.Event` record."""
    event_id = _event_id_from_sbs(data['id'])
    event_type = tuple(data['type'])
    timestamp = _timestamp_from_sbs(data['timestamp'])
    source_timestamp = _optional_from_sbs(data['sourceTimestamp'],
                                          _timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], _event_payload_from_sbs)
    return Event(event_id=event_id,
                 event_type=event_type,
                 timestamp=timestamp,
                 source_timestamp=source_timestamp,
                 payload=payload)


def _event_payload_from_sbs(data: sbs.Data) -> EventPayload:
    """Build an `EventPayload` from a decoded `EventPayload` choice.

    Raises:
        ValueError: if the choice name is not binary, json or sbs.
    """
    kind, value = data

    if kind == 'binary':
        payload_type, payload_data = EventPayloadType.BINARY, value
    elif kind == 'json':
        # JSON payloads are carried as text and decoded here
        payload_type, payload_data = EventPayloadType.JSON, json.decode(value)
    elif kind == 'sbs':
        payload_type, payload_data = (EventPayloadType.SBS,
                                      _sbs_data_from_sbs(value))
    else:
        raise ValueError('unsupported payload type')

    return EventPayload(type=payload_type, data=payload_data)


def _timestamp_from_sbs(data: sbs.Data) -> Timestamp:
    """Build a `Timestamp` from a decoded `Timestamp` record."""
    seconds = data['s']
    microseconds = data['us']
    return Timestamp(s=seconds, us=microseconds)


def _event_id_from_sbs(data: sbs.Data) -> EventId:
    """Build an `EventId` from a decoded `EventId` record."""
    server = data['server']
    instance = data['instance']
    return EventId(server=server, instance=instance)


def _sbs_data_from_sbs(data: sbs.Data) -> SbsData:
    """Build `SbsData` from the decoded `sbs` payload record."""
    module = _optional_from_sbs(data['module'])
    return SbsData(module=module,
                   type=data['type'],
                   data=data['data'])


def _optional_from_sbs(data: sbs.Data,
                       fn=lambda i: i
                       ) -> typing.Any | None:
    """Unwrap a decoded optional value.

    `data` is indexed as a pair: when its first item is ``'value'`` the
    second item is passed through `fn` and returned; otherwise the result
    is ``None``.
    """
    if data[0] != 'value':
        return None
    return fn(data[1])
EventType: TypeAlias =
Tuple[str, ...]
class
EventPayloadType(enum.Enum):
An enumeration.
BINARY =
<EventPayloadType.BINARY: 1>
JSON =
<EventPayloadType.JSON: 2>
SBS =
<EventPayloadType.SBS: 3>
class
EventId(typing.NamedTuple):
EventId(server, instance)
class
SbsData(typing.NamedTuple):
SbsData(module, type, data)
class
EventPayload(typing.NamedTuple):
class EventPayload(typing.NamedTuple):
    """Event payload: a kind tag together with the matching data."""
    # payload kind (BINARY, JSON or SBS)
    type: EventPayloadType
    # raw bytes, decoded JSON data or an SbsData record
    data: bytes | json.Data | SbsData
EventPayload(type, data)
EventPayload( type: EventPayloadType, data: Union[bytes, NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], SbsData])
Create new instance of EventPayload(type, data)
data: Union[bytes, NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], SbsData]
Alias for field number 1
class
Timestamp(typing.NamedTuple):
Timestamp(s, us)
class
Event(typing.NamedTuple):
class Event(typing.NamedTuple):
    """Decoded event record."""
    # identifier made of server and instance numbers
    event_id: EventId
    # tuple of string segments
    event_type: EventType
    timestamp: Timestamp
    # None when the event has no source timestamp
    source_timestamp: Timestamp | None
    # None when the event has no payload
    payload: EventPayload | None
Event(event_id, event_type, timestamp, source_timestamp, payload)
Event( event_id: EventId, event_type: Tuple[str, ...], timestamp: Timestamp, source_timestamp: Timestamp | None, payload: EventPayload | None)
Create new instance of Event(event_id, event_type, timestamp, source_timestamp, payload)
def
decode_uint(x: bytes) -> int:
def
decode_tuple_str(x: bytes) -> Tuple[str, ...]:
def
decode_json( x: bytes) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def
create_env(path: pathlib.Path):