hat.event.manager.common
1from hat.event.common import * # NOQA 2 3from hat import json 4 5from hat.event.common import (Event, 6 QueryResult, 7 EventPayloadBinary, 8 EventPayloadJson) 9 10 11def event_to_json(event: Event) -> json.Data: 12 return { 13 'id': _event_id_to_json(event.id), 14 'type': list(event.type), 15 'timestamp': _timestamp_to_json(event.timestamp), 16 'source_timestamp': (_timestamp_to_json(event.source_timestamp) 17 if event.source_timestamp else None), 18 'payload': (_event_payload_to_json(event.payload) 19 if event.payload else None)} 20 21 22def query_result_to_json(result: QueryResult) -> json.Data: 23 return {'events': [event_to_json(event) for event in result.events], 24 'more_follows': result.more_follows} 25 26 27def _event_id_to_json(event_id): 28 return {'server': event_id.server, 29 'session': event_id.session, 30 'instance': event_id.instance} 31 32 33def _timestamp_to_json(timestamp): 34 return {'s': timestamp.s, 35 'us': timestamp.us} 36 37 38def _event_payload_to_json(payload): 39 if isinstance(payload, EventPayloadBinary): 40 return {'data_type': 'binary', 41 'binary_type': payload.type, 42 'data': bytes(payload.data).hex()} 43 44 if isinstance(payload, EventPayloadJson): 45 return {'data_type': 'json', 46 'data': payload.data} 47 48 raise ValueError('unuspported payload type')
def
event_to_json( event: hat.event.common.Event) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
12def event_to_json(event: Event) -> json.Data: 13 return { 14 'id': _event_id_to_json(event.id), 15 'type': list(event.type), 16 'timestamp': _timestamp_to_json(event.timestamp), 17 'source_timestamp': (_timestamp_to_json(event.source_timestamp) 18 if event.source_timestamp else None), 19 'payload': (_event_payload_to_json(event.payload) 20 if event.payload else None)}
def
query_result_to_json( result: hat.event.common.QueryResult) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]: