hat.event.manager.common

 1from hat.event.common import *  # NOQA
 2
 3from hat import json
 4
 5from hat.event.common import (Event,
 6                              QueryResult,
 7                              EventPayloadBinary,
 8                              EventPayloadJson)
 9
10
def event_to_json(event: Event) -> json.Data:
    """Convert an event to its JSON-serializable representation.

    The result is a dictionary with the keys ``id``, ``type``,
    ``timestamp``, ``source_timestamp`` and ``payload``.  The event type
    is emitted as a list.  ``source_timestamp`` and ``payload`` are
    optional: each is emitted as ``None`` when the corresponding event
    attribute is ``None``.

    Args:
        event: event to convert

    Returns:
        JSON-serializable dictionary describing the event

    """
    # Absence of the optional fields is signalled by None, so test for None
    # explicitly instead of relying on the truthiness of the objects.
    return {
        'id': _event_id_to_json(event.id),
        'type': list(event.type),
        'timestamp': _timestamp_to_json(event.timestamp),
        'source_timestamp': (_timestamp_to_json(event.source_timestamp)
                             if event.source_timestamp is not None
                             else None),
        'payload': (_event_payload_to_json(event.payload)
                    if event.payload is not None
                    else None)}
20
21
def query_result_to_json(result: QueryResult) -> json.Data:
    """Convert a query result to its JSON-serializable representation.

    Every event in ``result.events`` is converted with `event_to_json`
    (order preserved) and stored under ``events``; ``result.more_follows``
    is copied unchanged under ``more_follows``.

    Args:
        result: query result to convert

    Returns:
        JSON-serializable dictionary describing the query result

    """
    converted = []
    for item in result.events:
        converted.append(event_to_json(item))

    return {'events': converted,
            'more_follows': result.more_follows}
25
26
def _event_id_to_json(event_id):
    """Convert an event identifier to a JSON-serializable dictionary.

    The ``server``, ``session`` and ``instance`` attributes of `event_id`
    are copied, in that order, into a dictionary under the same names.

    """
    field_names = ('server', 'session', 'instance')
    return {name: getattr(event_id, name) for name in field_names}
31
32
def _timestamp_to_json(timestamp):
    """Convert a timestamp to a JSON-serializable dictionary.

    The ``s`` and ``us`` attributes of `timestamp` are copied into a
    dictionary under the same names.

    """
    seconds = timestamp.s
    microseconds = timestamp.us
    return {'s': seconds, 'us': microseconds}
36
37
def _event_payload_to_json(payload):
    """Convert an event payload to a JSON-serializable dictionary.

    A binary payload is represented as ``data_type`` ``'binary'``, with the
    payload's ``type`` under ``binary_type`` and its data encoded as a
    hexadecimal string.  A JSON payload is represented as ``data_type``
    ``'json'`` with its data passed through unchanged.

    Raises:
        ValueError: payload is neither `EventPayloadBinary` nor
            `EventPayloadJson`

    """
    if isinstance(payload, EventPayloadBinary):
        return {'data_type': 'binary',
                'binary_type': payload.type,
                # bytes are not JSON-serializable - emit them as hex text
                'data': bytes(payload.data).hex()}

    if isinstance(payload, EventPayloadJson):
        return {'data_type': 'json',
                'data': payload.data}

    raise ValueError('unsupported payload type')
def event_to_json( event: hat.event.common.Event) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def event_to_json(event: Event) -> json.Data:
    """Convert an event to its JSON-serializable representation.

    The result is a dictionary with the keys ``id``, ``type``,
    ``timestamp``, ``source_timestamp`` and ``payload``.  The event type
    is emitted as a list.  ``source_timestamp`` and ``payload`` are
    optional: each is emitted as ``None`` when the corresponding event
    attribute is ``None``.

    Args:
        event: event to convert

    Returns:
        JSON-serializable dictionary describing the event

    """
    # Absence of the optional fields is signalled by None, so test for None
    # explicitly instead of relying on the truthiness of the objects.
    return {
        'id': _event_id_to_json(event.id),
        'type': list(event.type),
        'timestamp': _timestamp_to_json(event.timestamp),
        'source_timestamp': (_timestamp_to_json(event.source_timestamp)
                             if event.source_timestamp is not None
                             else None),
        'payload': (_event_payload_to_json(event.payload)
                    if event.payload is not None
                    else None)}
def query_result_to_json( result: hat.event.common.QueryResult) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
def query_result_to_json(result: QueryResult) -> json.Data:
    """Convert a query result to its JSON-serializable representation.

    Every event in ``result.events`` is converted with `event_to_json`
    (order preserved) and stored under ``events``; ``result.more_follows``
    is copied unchanged under ``more_follows``.

    Args:
        result: query result to convert

    Returns:
        JSON-serializable dictionary describing the query result

    """
    converted = []
    for item in result.events:
        converted.append(event_to_json(item))

    return {'events': converted,
            'more_follows': result.more_follows}