hat.event.adminer
Event adminer communication protocol
1"""Event adminer communication protocol""" 2 3from hat.event.adminer.client import (AdminerError, 4 connect, 5 Client) 6from hat.event.adminer.server import (GetLogConfCb, 7 SetLogConfCb, 8 listen, 9 Server) 10 11 12__all__ = ['AdminerError', 13 'connect', 14 'Client', 15 'GetLogConfCb', 16 'SetLogConfCb', 17 'listen', 18 'Server']
class
AdminerError(builtins.Exception):
Errors reported by Event Adminer Server
async def connect(addr: tcp.Address, **kwargs) -> 'Client':
    """Open a connection to an Event Adminer Server

    Any additional keyword arguments are forwarded unchanged to the
    `hat.chatter.connect` coroutine.

    The returned client already has its receive loop spawned in the
    connection's async group.

    """
    adminer = Client()
    adminer._conv_msg_type_futures = {}
    adminer._loop = asyncio.get_running_loop()
    adminer._conn = await chatter.connect(addr, **kwargs)

    try:
        group = adminer.async_group
        group.spawn(adminer._receive_loop)
    except BaseException:
        # spawning failed - close the client (wrapped in aio.uncancellable)
        # before propagating the original error
        await aio.uncancellable(adminer.async_close())
        raise

    return adminer
Connect to Event Adminer Server
Additional arguments are passed to the hat.chatter.connect
coroutine.
class
Client(hat.aio.group.Resource):
class Client(aio.Resource):
    """Event adminer client

    For creating new client see `connect` coroutine.

    Each request is sent in its own conversation. `_send` registers a future
    keyed by that conversation and `_receive_loop` resolves it once the
    matching response arrives.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._conn.async_group

    async def get_log_conf(self) -> json.Data:
        """Get logging configuration

        Raises:
            AdminerError: server responded with an error
            ConnectionError: connection is closed before a response arrives

        """
        data = await self._send(
            req_msg_type='HatEventAdminer.MsgGetLogConfReq',
            req_msg_data=None,
            res_msg_type='HatEventAdminer.MsgGetLogConfRes')

        return json.decode(data)

    async def set_log_conf(self, conf: json.Data):
        """Set logging configuration

        Raises:
            AdminerError: server responded with an error
            ConnectionError: connection is closed before a response arrives

        """
        await self._send(req_msg_type='HatEventAdminer.MsgSetLogConfReq',
                         req_msg_data=json.encode(conf),
                         res_msg_type='HatEventAdminer.MsgSetLogConfRes')

    async def _send(self, req_msg_type, req_msg_data, res_msg_type):
        """Send a request and wait for the response payload

        Returns the second element of the response message data. Raises
        whatever exception the receive loop sets on the pending future.

        """
        conv = await common.send_msg(
            conn=self._conn,
            msg_type=req_msg_type,
            msg_data=req_msg_data,
            last=False)

        # the receive loop may have stopped while the request was being sent;
        # in that case nobody would ever resolve the future
        if not self.is_open:
            raise ConnectionError()

        future = self._loop.create_future()
        self._conv_msg_type_futures[conv] = res_msg_type, future

        try:
            return await future

        finally:
            # always unregister, also when the waiting caller is cancelled
            self._conv_msg_type_futures.pop(conv, None)

    async def _receive_loop(self):
        """Receive responses and resolve the futures registered by `_send`

        The loop stops (and the client is closed) on connection error, on a
        message that belongs to no pending conversation, or on a response of
        unexpected type.

        """
        mlog.debug("starting receive loop")
        try:
            while True:
                mlog.debug("waiting for incoming message")
                msg, msg_type, msg_data = await common.receive_msg(self._conn)

                mlog.debug(f"received message {msg_type}")

                res_msg_type, future = self._conv_msg_type_futures.get(
                    msg.conv, (None, None))
                if not future or future.done():
                    # message without a pending request - stop the loop
                    return

                if res_msg_type != msg_type:
                    raise Exception('invalid response message type')

                # msg_data is a (tag, payload) pair; a future can be resolved
                # only once, so the two outcomes must be mutually exclusive
                # (setting a result after an exception raises
                # asyncio.InvalidStateError and would tear down the loop)
                if msg_data[0] == 'error':
                    future.set_exception(AdminerError(msg_data[1]))

                else:
                    future.set_result(msg_data[1])

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("read loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping receive loop")
            self.close()

            # wake up every caller still waiting for a response
            for _, future in self._conv_msg_type_futures.values():
                if not future.done():
                    future.set_exception(ConnectionError())
Event adminer client
To create a new client, see the connect
coroutine.
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group of the underlying connection"""
    connection = self._conn
    return connection.async_group
Async group
async def
get_log_conf( self) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
async def get_log_conf(self) -> json.Data:
    """Request the logging configuration and return it decoded"""
    request = {'req_msg_type': 'HatEventAdminer.MsgGetLogConfReq',
               'req_msg_data': None,
               'res_msg_type': 'HatEventAdminer.MsgGetLogConfRes'}
    encoded = await self._send(**request)
    return json.decode(encoded)
Get logging configuration
async def
set_log_conf( self, conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]):
async def set_log_conf(self, conf: json.Data):
    """Encode `conf` and send it as the new logging configuration"""
    encoded = json.encode(conf)
    await self._send(
        req_msg_type='HatEventAdminer.MsgSetLogConfReq',
        req_msg_data=encoded,
        res_msg_type='HatEventAdminer.MsgSetLogConfRes')
Set logging configuration
GetLogConfCb =
typing.Callable[[NoneType], typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')], collections.abc.Awaitable[typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]]]]
SetLogConfCb =
typing.Callable[[typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]], None | collections.abc.Awaitable[None]]
async def
listen( addr: hat.drivers.tcp.Address, *, get_log_conf_cb: Optional[Callable[[NoneType], Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Awaitable[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]]] = None, set_log_conf_cb: Optional[Callable[[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], None | Awaitable[None]]] = None, **kwargs) -> Server:
async def listen(addr: tcp.Address,
                 *,
                 get_log_conf_cb: GetLogConfCb | None = None,
                 set_log_conf_cb: SetLogConfCb | None = None,
                 **kwargs
                 ) -> 'Server':
    """Start an Event Adminer Server listening on `addr`

    The optional callbacks are stored on the server and used when handling
    get/set logging configuration requests. Additional keyword arguments are
    forwarded to `chatter.listen`.

    """
    srv = Server()
    srv._set_log_conf_cb = set_log_conf_cb
    srv._get_log_conf_cb = get_log_conf_cb

    # every accepted connection is served by the server's connection loop
    srv._srv = await chatter.listen(srv._connection_loop, addr, **kwargs)
    mlog.debug("listening on %s", addr)

    return srv
Create listening Event Adminer Server instance
class
Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """Event adminer server

    Serves get/set logging configuration requests by delegating to the
    `_get_log_conf_cb` / `_set_log_conf_cb` callbacks stored on the instance.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        listener = self._srv
        return listener.async_group

    async def _connection_loop(self, conn):
        """Receive requests from `conn` and dispatch them until it fails"""
        mlog.debug("starting connection loop")
        try:
            while True:
                mlog.debug("waiting for incomming messages")
                msg, msg_type, msg_data = await common.receive_msg(conn)

                mlog.debug(f"received message {msg_type}")

                # pick the handler first, then invoke it in a single place
                if msg_type == 'HatEventAdminer.MsgGetLogConfReq':
                    handler = self._process_msg_get_log_conf
                elif msg_type == 'HatEventAdminer.MsgSetLogConfReq':
                    handler = self._process_msg_set_log_conf
                else:
                    raise Exception('unsupported message type')

                await handler(conn=conn,
                              conv=msg.conv,
                              req_msg_data=msg_data)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("on connection error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping connection loop")
            conn.close()

    async def _process_msg_get_log_conf(self, conn, conv, req_msg_data):
        """Answer a get request with ('success', conf) or ('error', text)"""
        try:
            callback = self._get_log_conf_cb
            if not callback:
                raise Exception('not implemented')

            conf = await aio.call(callback)
            reply = ('success', json.encode(conf))

        except Exception as err:
            # failures are reported to the client instead of being raised
            reply = ('error', str(err))

        await common.send_msg(
            conn, 'HatEventAdminer.MsgGetLogConfRes', reply,
            conv=conv)

    async def _process_msg_set_log_conf(self, conn, conv, req_msg_data):
        """Answer a set request with ('success', None) or ('error', text)"""
        try:
            callback = self._set_log_conf_cb
            if not callback:
                raise Exception('not implemented')

            await aio.call(callback, json.decode(req_msg_data))
            reply = ('success', None)

        except Exception as err:
            # failures are reported to the client instead of being raised
            reply = ('error', str(err))

        await common.send_msg(
            conn, 'HatEventAdminer.MsgSetLogConfRes', reply,
            conv=conv)
Resource with lifetime control based on Group.