hat.event.backends.lmdb.convert.v07
from pathlib import Path
import enum
import itertools
import platform
import struct
import typing

import lmdb

from hat import json
from hat import sbs


class OrderBy(enum.Enum):
    """Enumeration with `TIMESTAMP` and `SOURCE_TIMESTAMP` members."""
    TIMESTAMP = 1
    SOURCE_TIMESTAMP = 2


# event type is a tuple of string segments
EventType: typing.TypeAlias = typing.Tuple[str, ...]


class EventId(typing.NamedTuple):
    """Event identifier made of server, session and instance integers."""
    server: int
    session: int
    instance: int


class EventPayloadType(enum.Enum):
    """Discriminator for the kind of data held by `EventPayload`."""
    BINARY = 1
    JSON = 2
    SBS = 3


class SbsData(typing.NamedTuple):
    """SBS payload data: optional module string, type string and bytes."""
    module: str | None
    type: str
    data: bytes


class EventPayload(typing.NamedTuple):
    """Event payload: a type discriminator together with its data."""
    type: EventPayloadType
    data: bytes | json.Data | SbsData


class Timestamp(typing.NamedTuple):
    """Point in time split into integer seconds and microseconds."""
    s: int
    us: int

    def add(self, s: float) -> 'Timestamp':
        """Return a new timestamp shifted by `s` seconds.

        The whole and fractional parts of `s` are applied separately and
        the microsecond field of the result is normalized to the range
        ``[0, 1_000_000)``.
        """
        whole_seconds = int(s)
        total_us = self.us + round((s - whole_seconds) * 1e6)
        # divmod floors, so a negative microsecond total borrows a second
        carry, micros = divmod(total_us, int(1e6))
        return Timestamp(s=self.s + whole_seconds + carry,
                         us=micros)


class Event(typing.NamedTuple):
    """Event record; source timestamp and payload are optional."""
    event_id: EventId
    event_type: EventType
    timestamp: Timestamp
    source_timestamp: Timestamp | None
    payload: EventPayload | None


class DbType(enum.Enum):
    """Databases used by the backend (name is used by `open_db`)."""
    SYSTEM = 0
    LATEST_DATA = 1
    LATEST_TYPE = 2
    ORDERED_DATA = 3
    ORDERED_PARTITION = 4
    ORDERED_COUNT = 5
    REF = 6


ServerId: typing.TypeAlias = int
EventTypeRef: typing.TypeAlias = int
PartitionId: typing.TypeAlias = int


class LatestEventRef(typing.NamedTuple):
    """Reference to an entry of the latest data database."""
    key: 'LatestDataDbKey'


class OrderedEventRef(typing.NamedTuple):
    """Reference to an entry of the ordered data database."""
    key: 'OrderedDataDbKey'


EventRef: typing.TypeAlias = LatestEventRef | OrderedEventRef

# key/value types, one pair per `DbType` member

SystemDbKey = ServerId
SystemDbValue = tuple[EventId, Timestamp]

LatestDataDbKey = EventTypeRef
LatestDataDbValue = Event

LatestTypeDbKey = EventTypeRef
LatestTypeDbValue = EventType

OrderedDataDbKey = tuple[PartitionId, Timestamp, EventId]
OrderedDataDbValue = Event

OrderedPartitionDbKey = PartitionId
OrderedPartitionDbValue = json.Data

OrderedCountDbKey = PartitionId
OrderedCountDbValue = int

RefDbKey = EventId
RefDbValue = set[EventRef]


def encode_system_db_key(key: SystemDbKey) -> bytes:
    """Encode a system database key (server id) as an unsigned integer."""
    return _encode_uint(key)


def decode_system_db_key(key_bytes: bytes) -> SystemDbKey:
    """Decode a system database key; bytes after the integer are ignored."""
    key, _ = _decode_uint(key_bytes)
    return key


def encode_system_db_value(value: SystemDbValue) -> bytes:
    """Encode ``(event_id, timestamp)`` as event id bytes + timestamp bytes."""
    event_id, timestamp = value
    return _encode_event_id(event_id) + _encode_timestamp(timestamp)


def decode_system_db_value(value_bytes: bytes) -> SystemDbValue:
    """Decode ``(event_id, timestamp)``; trailing bytes are ignored."""
    event_id, rest = _decode_event_id(value_bytes)
    timestamp, _ = _decode_timestamp(rest)
    return event_id, timestamp


def encode_latest_data_db_key(key: LatestDataDbKey) -> bytes:
    """Encode a latest data key (event type reference) as an unsigned int."""
    return _encode_uint(key)


def decode_latest_data_db_key(key_bytes: bytes) -> LatestDataDbKey:
    """Decode a latest data key; bytes after the integer are ignored."""
    key, _ = _decode_uint(key_bytes)
    return key


def encode_latest_data_db_value(value: LatestDataDbValue) -> bytes:
    """Encode an event stored in the latest data database."""
    return _encode_event(value)


def decode_latest_data_db_value(value_bytes: bytes) -> LatestDataDbValue:
    """Decode an event stored in the latest data database."""
    return _decode_event(value_bytes)


def encode_latest_type_db_key(key: LatestTypeDbKey) -> bytes:
    """Encode a latest type key (event type reference) as an unsigned int."""
    return _encode_uint(key)


def decode_latest_type_db_key(key_bytes: bytes) -> LatestTypeDbKey:
    """Decode a latest type key; bytes after the integer are ignored."""
    key, _ = _decode_uint(key_bytes)
    return key


def encode_latest_type_db_value(value: LatestTypeDbValue) -> bytes:
    """Encode an event type as a JSON list of its segments.

    The value is an event type (tuple of strings), i.e. `LatestTypeDbValue`,
    not an ordered data key - it is converted to a list and JSON encoded.
    """
    return _encode_json(list(value))


def decode_latest_type_db_value(value_bytes: bytes) -> LatestTypeDbValue:
    """Decode a JSON list back into an event type tuple."""
    return tuple(_decode_json(value_bytes))


def encode_ordered_data_db_key(key: OrderedDataDbKey) -> bytes:
    """Encode ``(partition_id, timestamp, event_id)`` in that byte order."""
    partition_id, timestamp, event_id = key
    return (_encode_uint(partition_id) +
            _encode_timestamp(timestamp) +
            _encode_event_id(event_id))


def decode_ordered_data_db_key(key_bytes: bytes) -> OrderedDataDbKey:
    """Decode ``(partition_id, timestamp, event_id)``; trailing bytes are
    ignored."""
    partition_id, rest = _decode_uint(key_bytes)
    timestamp, rest = _decode_timestamp(rest)
    event_id, _ = _decode_event_id(rest)
    return partition_id, timestamp, event_id


def encode_ordered_data_db_value(value: OrderedDataDbValue) -> bytes:
    """Encode an event stored in the ordered data database."""
    return _encode_event(value)


def decode_ordered_data_db_value(value_bytes: bytes) -> OrderedDataDbValue:
    """Decode an event stored in the ordered data database."""
    return _decode_event(value_bytes)


def encode_ordered_partition_db_key(key: OrderedPartitionDbKey) -> bytes:
    """Encode a partition id as an unsigned integer."""
    return _encode_uint(key)


def decode_ordered_partition_db_key(key_bytes: bytes) -> OrderedPartitionDbKey:
    """Decode a partition id; bytes after the integer are ignored."""
    key, _ = _decode_uint(key_bytes)
    return key


def encode_ordered_partition_db_value(value: OrderedPartitionDbValue) -> bytes:
    """Encode partition JSON data as UTF-8 JSON text."""
    return _encode_json(value)


def decode_ordered_partition_db_value(value_bytes: bytes
                                      ) -> OrderedPartitionDbValue:
    """Decode partition JSON data from UTF-8 JSON text."""
    return _decode_json(value_bytes)


def encode_ordered_count_db_key(key: OrderedCountDbKey) -> bytes:
    """Encode a partition id as an unsigned integer."""
    return _encode_uint(key)


def decode_ordered_count_db_key(key_bytes: bytes) -> OrderedCountDbKey:
    """Decode a partition id; bytes after the integer are ignored."""
    key, _ = _decode_uint(key_bytes)
    return key


def encode_ordered_count_db_value(value: OrderedCountDbValue) -> bytes:
    """Encode a count as an unsigned integer."""
    return _encode_uint(value)


def decode_ordered_count_db_value(value_bytes: bytes) -> OrderedCountDbValue:
    """Decode a count; bytes after the integer are ignored."""
    value, _ = _decode_uint(value_bytes)
    return value


def encode_ref_db_key(key: RefDbKey) -> bytes:
    """Encode a reference database key (event id)."""
    return _encode_event_id(key)


def decode_ref_db_key(key_bytes: bytes) -> RefDbKey:
    """Decode a reference database key; trailing bytes are ignored."""
    event_id, _ = _decode_event_id(key_bytes)
    return event_id


def encode_ref_db_value(value: RefDbValue) -> bytes:
    """Concatenate the encodings of all event references in `value`.

    References are written in the set's iteration order; each one is
    self-delimiting (database type byte followed by a fixed-size key).
    """
    return bytes(itertools.chain.from_iterable(
        _encode_event_ref(ref) for ref in value))


def decode_ref_db_value(value_bytes: bytes) -> RefDbValue:
    """Decode concatenated event references until the input is consumed.

    Raises:
        ValueError: if a reference has an unsupported database type

    """
    refs = set()
    while value_bytes:
        ref, value_bytes = _decode_event_ref(value_bytes)
        refs.add(ref)
    return refs

245def open_db(env: lmdb.Environment, 246 db_type: DbType 247 ) -> lmdb._Database: 248 return env.open_db(db_type.name.encode('utf-8')) 249 250 251def create_env(path: Path) -> lmdb.Environment: 252 max_dbs = len(DbType) 253 max_db_size = (512 * 1024 * 1024 * 1024 254 if platform.architecture()[0] == '64bit' 255 else 1024 * 1024 * 1024) 256 return lmdb.Environment(str(path), 257 map_size=max_db_size, 258 subdir=False, 259 max_dbs=max_dbs) 260 261 262_sbs_repo = sbs.Repository(r""" 263module HatEventer 264 265MsgSubscribe = Array(EventType) 266 267MsgNotify = Array(Event) 268 269MsgRegisterReq = Array(RegisterEvent) 270 271MsgRegisterRes = Array(Choice { 272 event: Event 273 failure: None 274}) 275 276MsgQueryReq = QueryData 277 278MsgQueryRes = Array(Event) 279 280Timestamp = Record { 281 s: Integer 282 us: Integer 283} 284 285EventId = Record { 286 server: Integer 287 session: Integer 288 instance: Integer 289} 290 291Order = Choice { 292 descending: None 293 ascending: None 294} 295 296OrderBy = Choice { 297 timestamp: None 298 sourceTimestamp: None 299} 300 301EventType = Array(String) 302 303EventPayload = Choice { 304 binary: Bytes 305 json: String 306 sbs: Record { 307 module: Optional(String) 308 type: String 309 data: Bytes 310 } 311} 312 313Event = Record { 314 id: EventId 315 type: EventType 316 timestamp: Timestamp 317 sourceTimestamp: Optional(Timestamp) 318 payload: Optional(EventPayload) 319} 320 321RegisterEvent = Record { 322 type: EventType 323 sourceTimestamp: Optional(Timestamp) 324 payload: Optional(EventPayload) 325} 326 327QueryData = Record { 328 serverId: Optional(Integer) 329 ids: Optional(Array(EventId)) 330 types: Optional(Array(EventType)) 331 tFrom: Optional(Timestamp) 332 tTo: Optional(Timestamp) 333 sourceTFrom: Optional(Timestamp) 334 sourceTTo: Optional(Timestamp) 335 payload: Optional(EventPayload) 336 order: Order 337 orderBy: OrderBy 338 uniqueType: Boolean 339 maxResults: Optional(Integer) 340} 341""") 342 343 344def 
_event_to_sbs(event): 345 return { 346 'id': _event_id_to_sbs(event.event_id), 347 'type': list(event.event_type), 348 'timestamp': _timestamp_to_sbs(event.timestamp), 349 'sourceTimestamp': _optional_to_sbs(event.source_timestamp, 350 _timestamp_to_sbs), 351 'payload': _optional_to_sbs(event.payload, _event_payload_to_sbs)} 352 353 354def _event_from_sbs(data): 355 return Event( 356 event_id=_event_id_from_sbs(data['id']), 357 event_type=tuple(data['type']), 358 timestamp=_timestamp_from_sbs(data['timestamp']), 359 source_timestamp=_optional_from_sbs(data['sourceTimestamp'], 360 _timestamp_from_sbs), 361 payload=_optional_from_sbs(data['payload'], _event_payload_from_sbs)) 362 363 364def _event_id_to_sbs(event_id): 365 return {'server': event_id.server, 366 'session': event_id.session, 367 'instance': event_id.instance} 368 369 370def _event_id_from_sbs(data): 371 return EventId(server=data['server'], 372 session=data['session'], 373 instance=data['instance']) 374 375 376def _timestamp_to_sbs(t): 377 return {'s': t.s, 'us': t.us} 378 379 380def _timestamp_from_sbs(data): 381 return Timestamp(s=data['s'], us=data['us']) 382 383 384def _event_payload_to_sbs(payload): 385 if payload.type == EventPayloadType.BINARY: 386 return 'binary', payload.data 387 388 if payload.type == EventPayloadType.JSON: 389 return 'json', json.encode(payload.data) 390 391 if payload.type == EventPayloadType.SBS: 392 return 'sbs', _sbs_data_to_sbs(payload.data) 393 394 raise ValueError('unsupported payload type') 395 396 397def _event_payload_from_sbs(data): 398 data_type, data_data = data 399 400 if data_type == 'binary': 401 return EventPayload(type=EventPayloadType.BINARY, 402 data=data_data) 403 404 if data_type == 'json': 405 return EventPayload(type=EventPayloadType.JSON, 406 data=json.decode(data_data)) 407 408 if data_type == 'sbs': 409 return EventPayload(type=EventPayloadType.SBS, 410 data=_sbs_data_from_sbs(data_data)) 411 412 raise ValueError('unsupported payload type') 413 414 
def _sbs_data_to_sbs(data):
    """Convert `SbsData` to a dict matching the SBS payload record."""
    module = _optional_to_sbs(data.module)
    return {'module': module,
            'type': data.type,
            'data': data.data}


def _sbs_data_from_sbs(data):
    """Build `SbsData` from a dict matching the SBS payload record."""
    module = _optional_from_sbs(data['module'])
    return SbsData(module=module,
                   type=data['type'],
                   data=data['data'])


def _optional_to_sbs(value, fn=lambda i: i):
    """Wrap `value` as ``('value', fn(value))`` or ``('none', None)``."""
    if value is None:
        return 'none', None
    return 'value', fn(value)


def _optional_from_sbs(data, fn=lambda i: i):
    """Unwrap a ``(name, value)`` pair; anything but ``'value'`` is None."""
    if data[0] != 'value':
        return None
    return fn(data[1])


def _encode_uint(value):
    """Pack `value` as an 8 byte big-endian unsigned integer."""
    return struct.pack(">Q", value)


def _decode_uint(value_bytes):
    """Unpack a leading 8 byte unsigned integer.

    Returns the integer together with the remaining bytes.
    """
    (value,) = struct.unpack_from(">Q", value_bytes)
    return value, value_bytes[8:]


def _encode_event_id(event_id):
    """Pack server, session and instance as three 8 byte unsigned ints."""
    fields = (event_id.server, event_id.session, event_id.instance)
    return struct.pack(">QQQ", *fields)


def _decode_event_id(event_id_bytes):
    """Unpack a leading 24 byte event id.

    Returns the `EventId` together with the remaining bytes.
    """
    server, session, instance = struct.unpack_from(">QQQ", event_id_bytes)
    return (EventId(server=server, session=session, instance=instance),
            event_id_bytes[24:])


def _encode_timestamp(timestamp):
    """Pack a timestamp as 8 byte seconds followed by 4 byte microseconds.

    Seconds are offset by ``1 << 63`` before being packed as unsigned.
    """
    # NOTE(review): offset presumably keeps byte-wise key order
    # chronological for negative seconds - confirm
    shifted_s = timestamp.s + (1 << 63)
    return struct.pack(">QI", shifted_s, timestamp.us)


def _decode_timestamp(timestamp_bytes):
    """Unpack a leading 12 byte timestamp, undoing the seconds offset.

    Returns the `Timestamp` together with the remaining bytes.
    """
    shifted_s, us = struct.unpack_from(">QI", timestamp_bytes)
    return Timestamp(shifted_s - (1 << 63), us), timestamp_bytes[12:]


def _encode_event(event):
    """Serialize an event with the SBS ``HatEventer.Event`` type."""
    return _sbs_repo.encode('HatEventer.Event', _event_to_sbs(event))


def _decode_event(event_bytes):
    """Deserialize an event from SBS ``HatEventer.Event`` bytes."""
    return _event_from_sbs(_sbs_repo.decode('HatEventer.Event', event_bytes))


def _encode_json(data):
    """Encode JSON data as UTF-8 bytes."""
    text = json.encode(data)
    return text.encode('utf-8')


def _decode_json(data_bytes):
    """Decode JSON data from UTF-8 bytes."""
    text = str(data_bytes, encoding='utf-8')
    return json.decode(text)


def _encode_event_ref(ref):
    """Yield the byte values of an encoded event reference.

    The first value is the `DbType` value of the referenced database,
    followed by the bytes of the encoded key.

    Raises:
        ValueError: if `ref` is neither `LatestEventRef` nor
            `OrderedEventRef` (raised on first iteration)

    """
    if isinstance(ref, LatestEventRef):
        db_type, encode_key = DbType.LATEST_DATA, encode_latest_data_db_key

    elif isinstance(ref, OrderedEventRef):
        db_type, encode_key = DbType.ORDERED_DATA, encode_ordered_data_db_key

    else:
        raise ValueError('unsupported event reference')

    yield db_type.value
    yield from encode_key(ref.key)


def _decode_event_ref(ref_bytes):
    """Decode a single leading event reference.

    Returns the reference together with the remaining bytes.

    Raises:
        ValueError: if the leading byte is not a supported database type

    """
    db_type = DbType(ref_bytes[0])
    key_and_rest = ref_bytes[1:]

    if db_type == DbType.LATEST_DATA:
        # latest data key is a single 8 byte unsigned integer
        key = decode_latest_data_db_key(key_and_rest[:8])
        return LatestEventRef(key), key_and_rest[8:]

    if db_type == DbType.ORDERED_DATA:
        # 8 (partition id) + 12 (timestamp) + 24 (event id) bytes
        key = decode_ordered_data_db_key(key_and_rest[:44])
        return OrderedEventRef(key), key_and_rest[44:]

    raise ValueError('unsupported database type')
class
OrderBy(enum.Enum):
An enumeration.
TIMESTAMP =
<OrderBy.TIMESTAMP: 1>
SOURCE_TIMESTAMP =
<OrderBy.SOURCE_TIMESTAMP: 2>
EventType: TypeAlias =
Tuple[str, ...]
class
EventId(typing.NamedTuple):
EventId(server, session, instance)
class
EventPayloadType(enum.Enum):
An enumeration.
BINARY =
<EventPayloadType.BINARY: 1>
JSON =
<EventPayloadType.JSON: 2>
SBS =
<EventPayloadType.SBS: 3>
class
SbsData(typing.NamedTuple):
SbsData(module, type, data)
class
EventPayload(typing.NamedTuple):
class EventPayload(typing.NamedTuple):
    """Event payload: a type discriminator together with its data."""
    # discriminator for the kind of data
    type: EventPayloadType
    # raw bytes, JSON data or SBS data
    data: bytes | json.Data | SbsData
EventPayload(type, data)
EventPayload( type: EventPayloadType, data: Union[bytes, NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], SbsData])
Create new instance of EventPayload(type, data)
data: Union[bytes, NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], SbsData]
Alias for field number 1
class
Timestamp(typing.NamedTuple):
class Timestamp(typing.NamedTuple):
    """Point in time split into integer seconds and microseconds."""
    s: int
    us: int

    def add(self, s: float) -> 'Timestamp':
        """Return a new timestamp shifted by `s` seconds.

        The whole and fractional parts of `s` are applied separately and
        the microsecond field of the result is normalized to the range
        ``[0, 1_000_000)``.
        """
        whole_seconds = int(s)
        total_us = self.us + round((s - whole_seconds) * 1e6)
        # divmod floors, so a negative microsecond total borrows a second
        carry, micros = divmod(total_us, int(1e6))
        return Timestamp(s=self.s + whole_seconds + carry,
                         us=micros)
Timestamp(s, us)
class
Event(typing.NamedTuple):
class Event(typing.NamedTuple):
    """Event record; source timestamp and payload are optional."""
    event_id: EventId
    event_type: EventType
    timestamp: Timestamp
    # None when the event has no source timestamp
    source_timestamp: Timestamp | None
    # None when the event has no payload
    payload: EventPayload | None
Event(event_id, event_type, timestamp, source_timestamp, payload)
Event( event_id: EventId, event_type: Tuple[str, ...], timestamp: Timestamp, source_timestamp: Timestamp | None, payload: EventPayload | None)
Create new instance of Event(event_id, event_type, timestamp, source_timestamp, payload)
class
DbType(enum.Enum):
class DbType(enum.Enum):
    """Database kinds, each with a distinct integer value from 0 to 6."""
    SYSTEM = 0
    LATEST_DATA = 1
    LATEST_TYPE = 2
    ORDERED_DATA = 3
    ORDERED_PARTITION = 4
    ORDERED_COUNT = 5
    REF = 6
An enumeration.
SYSTEM =
<DbType.SYSTEM: 0>
LATEST_DATA =
<DbType.LATEST_DATA: 1>
LATEST_TYPE =
<DbType.LATEST_TYPE: 2>
ORDERED_DATA =
<DbType.ORDERED_DATA: 3>
ORDERED_PARTITION =
<DbType.ORDERED_PARTITION: 4>
ORDERED_COUNT =
<DbType.ORDERED_COUNT: 5>
REF =
<DbType.REF: 6>
ServerId: TypeAlias =
int
EventTypeRef: TypeAlias =
int
PartitionId: TypeAlias =
int
class
LatestEventRef(typing.NamedTuple):
LatestEventRef(key,)
class
OrderedEventRef(typing.NamedTuple):
OrderedEventRef(key,)
SystemDbKey =
<class 'int'>
LatestDataDbKey =
<class 'int'>
LatestDataDbValue =
<class 'Event'>
LatestTypeDbKey =
<class 'int'>
LatestTypeDbValue =
typing.Tuple[str, ...]
OrderedDataDbValue =
<class 'Event'>
OrderedPartitionDbKey =
<class 'int'>
OrderedPartitionDbValue =
typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]
OrderedCountDbKey =
<class 'int'>
OrderedCountDbValue =
<class 'int'>
RefDbKey =
<class 'EventId'>
RefDbValue =
set[LatestEventRef | OrderedEventRef]
def
encode_system_db_key(key: int) -> bytes:
def
decode_system_db_key(key_bytes: bytes) -> int:
def
encode_latest_data_db_key(key: int) -> bytes:
def
decode_latest_data_db_key(key_bytes: bytes) -> int:
def
encode_latest_type_db_key(key: int) -> bytes:
def
decode_latest_type_db_key(key_bytes: bytes) -> int:
def
encode_ordered_partition_db_key(key: int) -> bytes:
def
decode_ordered_partition_db_key(key_bytes: bytes) -> int:
def
encode_ordered_partition_db_value( value: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> bytes:
def
decode_ordered_partition_db_value( value_bytes: bytes) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def
encode_ordered_count_db_key(key: int) -> bytes:
def
decode_ordered_count_db_key(key_bytes: bytes) -> int:
def
encode_ordered_count_db_value(value: int) -> bytes:
def
decode_ordered_count_db_value(value_bytes: bytes) -> int:
def
create_env(path: pathlib.Path) -> Environment: