hat.event.backends.lmdb.manager.common

 1from hat.event.backends.lmdb.common import *  # NOQA
 2
 3from hat import json
 4
 5from hat.event.backends.lmdb.common import (EventId,
 6                                            Timestamp,
 7                                            Event,
 8                                            EventPayloadJson,
 9                                            EventPayloadBinary)
10
11
def event_id_to_json(event_id: EventId) -> json.Data:
    """Convert an event identifier to JSON data.

    The result is a dictionary holding the identifier's ``server``,
    ``session`` and ``instance`` attributes under keys of the same names.
    """
    return {field: getattr(event_id, field)
            for field in ('server', 'session', 'instance')}
16
17
def timestamp_to_json(timestamp: Timestamp) -> json.Data:
    """Convert a timestamp to JSON data.

    The result maps ``'s'`` and ``'us'`` to the timestamp's attributes of
    the same names (presumably seconds and microseconds - confirm against
    the ``Timestamp`` definition).
    """
    s_part = timestamp.s
    us_part = timestamp.us
    return dict(s=s_part, us=us_part)
21
22
def event_to_json(event: Event) -> json.Data:
    """Convert an event to JSON data.

    The result is a dictionary with the keys ``id``, ``type``,
    ``timestamp``, ``source_timestamp`` and ``payload``.  The event type is
    emitted as a list.  ``source_timestamp`` and ``payload`` are ``None``
    when the corresponding event attribute is ``None``.

    Raises:
        ValueError: if the event payload is of an unsupported type
            (raised by ``_event_payload_to_json``).
    """
    return {'id': event_id_to_json(event.id),
            'type': list(event.type),
            'timestamp': timestamp_to_json(event.timestamp),
            # explicit ``is not None`` check so that a present but falsy
            # timestamp value is still serialized instead of being dropped
            'source_timestamp': (timestamp_to_json(event.source_timestamp)
                                 if event.source_timestamp is not None
                                 else None),
            'payload': _event_payload_to_json(event.payload)}
30
31
def _event_payload_to_json(payload):
    """Convert an event payload to JSON data.

    ``None`` is passed through unchanged.  A binary payload becomes a
    dictionary tagged ``'BINARY'`` that carries the payload's ``type`` as
    ``subtype`` and its data as a hexadecimal string.  A JSON payload
    becomes a dictionary tagged ``'JSON'`` carrying the data as is.

    Raises:
        ValueError: if the payload is none of the supported kinds.
    """
    if payload is None:
        converted = None

    elif isinstance(payload, EventPayloadBinary):
        subtype = payload.type
        # ``bytes(...)`` accepts any bytes-like data before hex encoding
        hex_data = bytes(payload.data).hex()
        converted = {'type': 'BINARY',
                     'subtype': subtype,
                     'data': hex_data}

    elif isinstance(payload, EventPayloadJson):
        converted = {'type': 'JSON',
                     'data': payload.data}

    else:
        raise ValueError('unsupported payload type')

    return converted
def event_id_to_json( event_id: hat.event.common.EventId) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def event_id_to_json(event_id: EventId) -> json.Data:
    """Convert an event identifier to JSON data.

    The result is a dictionary holding the identifier's ``server``,
    ``session`` and ``instance`` attributes under keys of the same names.
    """
    return {field: getattr(event_id, field)
            for field in ('server', 'session', 'instance')}
def timestamp_to_json( timestamp: hat.event.common.Timestamp) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def timestamp_to_json(timestamp: Timestamp) -> json.Data:
    """Convert a timestamp to JSON data.

    The result maps ``'s'`` and ``'us'`` to the timestamp's attributes of
    the same names (presumably seconds and microseconds - confirm against
    the ``Timestamp`` definition).
    """
    s_part = timestamp.s
    us_part = timestamp.us
    return dict(s=s_part, us=us_part)
def event_to_json( event: hat.event.common.Event) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def event_to_json(event: Event) -> json.Data:
    """Convert an event to JSON data.

    The result is a dictionary with the keys ``id``, ``type``,
    ``timestamp``, ``source_timestamp`` and ``payload``.  The event type is
    emitted as a list.  ``source_timestamp`` and ``payload`` are ``None``
    when the corresponding event attribute is ``None``.

    Raises:
        ValueError: if the event payload is of an unsupported type
            (raised by ``_event_payload_to_json``).
    """
    return {'id': event_id_to_json(event.id),
            'type': list(event.type),
            'timestamp': timestamp_to_json(event.timestamp),
            # explicit ``is not None`` check so that a present but falsy
            # timestamp value is still serialized instead of being dropped
            'source_timestamp': (timestamp_to_json(event.source_timestamp)
                                 if event.source_timestamp is not None
                                 else None),
            'payload': _event_payload_to_json(event.payload)}