hat.event.backends.lmdb.manager.common
"""Helpers converting lmdb backend event structures to JSON data."""

# Wildcard import is kept as in the original (lint suppressed by NOQA);
# presumably it re-exports the shared lmdb definitions - TODO confirm.
from hat.event.backends.lmdb.common import *  # NOQA

from hat import json

from hat.event.backends.lmdb.common import (EventId,
                                            Timestamp,
                                            Event,
                                            EventPayloadJson,
                                            EventPayloadBinary)


def event_id_to_json(event_id: EventId) -> json.Data:
    """Convert an event id to a JSON-serializable dictionary.

    Args:
        event_id: identifier providing ``server``, ``session`` and
            ``instance`` attributes

    Returns:
        Dictionary with keys ``server``, ``session`` and ``instance``.

    """
    return {'server': event_id.server,
            'session': event_id.session,
            'instance': event_id.instance}


def timestamp_to_json(timestamp: Timestamp) -> json.Data:
    """Convert a timestamp to a JSON-serializable dictionary.

    Args:
        timestamp: timestamp providing ``s`` and ``us`` attributes

    Returns:
        Dictionary with keys ``s`` and ``us``.

    """
    return {'s': timestamp.s,
            'us': timestamp.us}


def event_to_json(event: Event) -> json.Data:
    """Convert an event to a JSON-serializable dictionary.

    Args:
        event: event providing ``id``, ``type``, ``timestamp``,
            ``source_timestamp`` and ``payload`` attributes

    Returns:
        Dictionary with keys ``id``, ``type``, ``timestamp``,
        ``source_timestamp`` and ``payload``. ``source_timestamp`` is
        ``None`` when the event has no source timestamp.

    Raises:
        ValueError: if the payload is of an unsupported type

    """
    # The source timestamp is optional; test explicitly for None instead
    # of relying on the truthiness of the timestamp object.
    source_timestamp = (timestamp_to_json(event.source_timestamp)
                        if event.source_timestamp is not None else None)

    return {'id': event_id_to_json(event.id),
            'type': list(event.type),
            'timestamp': timestamp_to_json(event.timestamp),
            'source_timestamp': source_timestamp,
            'payload': _event_payload_to_json(event.payload)}


def _event_payload_to_json(payload):
    """Convert an optional event payload to JSON-serializable data.

    Returns ``None`` for a missing payload, a ``BINARY`` dictionary (data
    encoded as a hexadecimal string) for binary payloads and a ``JSON``
    dictionary for JSON payloads.

    Raises:
        ValueError: if the payload is of an unsupported type

    """
    if payload is None:
        return None

    if isinstance(payload, EventPayloadBinary):
        return {'type': 'BINARY',
                'subtype': payload.type,
                # bytes(...) accepts any bytes-like data before hex encoding
                'data': bytes(payload.data).hex()}

    if isinstance(payload, EventPayloadJson):
        return {'type': 'JSON',
                'data': payload.data}

    raise ValueError('unsupported payload type')
def event_id_to_json(event_id: hat.event.common.EventId) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def timestamp_to_json(timestamp: hat.event.common.Timestamp) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def event_to_json(event: hat.event.common.Event) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def event_to_json(event: Event) -> json.Data:
    """Convert an event to a JSON-serializable dictionary.

    Args:
        event: event providing ``id``, ``type``, ``timestamp``,
            ``source_timestamp`` and ``payload`` attributes

    Returns:
        Dictionary with keys ``id``, ``type``, ``timestamp``,
        ``source_timestamp`` and ``payload``. ``source_timestamp`` is
        ``None`` when the event has no source timestamp.

    """
    # The source timestamp is optional; test explicitly for None instead
    # of relying on the truthiness of the timestamp object.
    source_timestamp = (timestamp_to_json(event.source_timestamp)
                        if event.source_timestamp is not None else None)

    return {'id': event_id_to_json(event.id),
            'type': list(event.type),
            'timestamp': timestamp_to_json(event.timestamp),
            'source_timestamp': source_timestamp,
            'payload': _event_payload_to_json(event.payload)}