hat.event.adminer

Event adminer communication protocol

 1"""Event adminer communication protocol"""
 2
 3from hat.event.adminer.client import (AdminerError,
 4                                      connect,
 5                                      Client)
 6from hat.event.adminer.server import (GetLogConfCb,
 7                                      SetLogConfCb,
 8                                      listen,
 9                                      Server)
10
11
12__all__ = ['AdminerError',
13           'connect',
14           'Client',
15           'GetLogConfCb',
16           'SetLogConfCb',
17           'listen',
18           'Server']
class AdminerError(builtins.Exception):
17class AdminerError(Exception):
18    """Errors reported by Event Adminer Server"""

Errors reported by Event Adminer Server

async def connect( addr: hat.drivers.tcp.Address, **kwargs) -> Client:
21async def connect(addr: tcp.Address,
22                  **kwargs
23                  ) -> 'Client':
24    """Connect to Event Adminer Server
25
26    Additional arguments are passed to `hat.chatter.connect` coroutine.
27
28    """
29    client = Client()
30    client._loop = asyncio.get_running_loop()
31    client._conv_msg_type_futures = {}
32
33    client._conn = await chatter.connect(addr, **kwargs)
34
35    try:
36        client.async_group.spawn(client._receive_loop)
37
38    except BaseException:
39        await aio.uncancellable(client.async_close())
40        raise
41
42    return client

Connect to Event Adminer Server

Additional arguments are passed to hat.chatter.connect coroutine.

class Client(hat.aio.group.Resource):
 45class Client(aio.Resource):
 46    """Event adminer client
 47
 48    For creating new client see `connect` coroutine.
 49
 50    """
 51
 52    @property
 53    def async_group(self) -> aio.Group:
 54        """Async group"""
 55        return self._conn.async_group
 56
 57    async def get_log_conf(self) -> json.Data:
 58        """Get logging configuration"""
 59        data = await self._send(
 60            req_msg_type='HatEventAdminer.MsgGetLogConfReq',
 61            req_msg_data=None,
 62            res_msg_type='HatEventAdminer.MsgGetLogConfRes')
 63
 64        return json.decode(data)
 65
 66    async def set_log_conf(self, conf: json.Data):
 67        """Set logging configuration"""
 68        await self._send(req_msg_type='HatEventAdminer.MsgSetLogConfReq',
 69                         req_msg_data=json.encode(conf),
 70                         res_msg_type='HatEventAdminer.MsgSetLogConfRes')
 71
 72    async def _send(self, req_msg_type, req_msg_data, res_msg_type):
 73        conv = await common.send_msg(
 74            conn=self._conn,
 75            msg_type=req_msg_type,
 76            msg_data=req_msg_data,
 77            last=False)
 78
 79        if not self.is_open:
 80            raise ConnectionError()
 81
 82        future = self._loop.create_future()
 83        self._conv_msg_type_futures[conv] = res_msg_type, future
 84
 85        try:
 86            return await future
 87
 88        finally:
 89            self._conv_msg_type_futures.pop(conv, None)
 90
 91    async def _receive_loop(self):
 92        mlog.debug("starting receive loop")
 93        try:
 94            while True:
 95                mlog.debug("waiting for incoming message")
 96                msg, msg_type, msg_data = await common.receive_msg(self._conn)
 97
 98                mlog.debug(f"received message {msg_type}")
 99
100                res_msg_type, future = self._conv_msg_type_futures.get(
101                    msg.conv, (None, None))
102                if not future or future.done():
103                    return
104
105                if res_msg_type != msg_type:
106                    raise Exception('invalid response message type')
107
108                if msg_data[0] == 'error':
109                    future.set_exception(AdminerError(msg_data[1]))
110
111                future.set_result(msg_data[1])
112
113        except ConnectionError:
114            pass
115
116        except Exception as e:
117            mlog.error("read loop error: %s", e, exc_info=e)
118
119        finally:
120            mlog.debug("stopping receive loop")
121            self.close()
122
123            for _, future in self._conv_msg_type_futures.values():
124                if not future.done():
125                    future.set_exception(ConnectionError())

Event adminer client

For creating new client see connect coroutine.

async_group: hat.aio.group.Group
52    @property
53    def async_group(self) -> aio.Group:
54        """Async group"""
55        return self._conn.async_group

Async group

async def get_log_conf( self) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
57    async def get_log_conf(self) -> json.Data:
58        """Get logging configuration"""
59        data = await self._send(
60            req_msg_type='HatEventAdminer.MsgGetLogConfReq',
61            req_msg_data=None,
62            res_msg_type='HatEventAdminer.MsgGetLogConfRes')
63
64        return json.decode(data)

Get logging configuration

async def set_log_conf( self, conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]):
66    async def set_log_conf(self, conf: json.Data):
67        """Set logging configuration"""
68        await self._send(req_msg_type='HatEventAdminer.MsgSetLogConfReq',
69                         req_msg_data=json.encode(conf),
70                         res_msg_type='HatEventAdminer.MsgSetLogConfRes')

Set logging configuration

GetLogConfCb = typing.Callable[[NoneType], typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')], collections.abc.Awaitable[typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]]]]
SetLogConfCb = typing.Callable[[typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]], None | collections.abc.Awaitable[None]]
async def listen( addr: hat.drivers.tcp.Address, *, get_log_conf_cb: Optional[Callable[[NoneType], Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Awaitable[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]]] = None, set_log_conf_cb: Optional[Callable[[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], None | Awaitable[None]]] = None, **kwargs) -> Server:
23async def listen(addr: tcp.Address,
24                 *,
25                 get_log_conf_cb: GetLogConfCb | None = None,
26                 set_log_conf_cb: SetLogConfCb | None = None,
27                 **kwargs
28                 ) -> 'Server':
29    """Create listening Event Adminer Server instance"""
30    server = Server()
31    server._get_log_conf_cb = get_log_conf_cb
32    server._set_log_conf_cb = set_log_conf_cb
33
34    server._srv = await chatter.listen(server._connection_loop, addr, **kwargs)
35    mlog.debug("listening on %s", addr)
36
37    return server

Create listening Event Adminer Server instance

class Server(hat.aio.group.Resource):
 40class Server(aio.Resource):
 41
 42    @property
 43    def async_group(self) -> aio.Group:
 44        """Async group"""
 45        return self._srv.async_group
 46
 47    async def _connection_loop(self, conn):
 48        mlog.debug("starting connection loop")
 49        try:
 50            while True:
 51                mlog.debug("waiting for incomming messages")
 52                msg, msg_type, msg_data = await common.receive_msg(conn)
 53
 54                mlog.debug(f"received message {msg_type}")
 55
 56                if msg_type == 'HatEventAdminer.MsgGetLogConfReq':
 57                    await self._process_msg_get_log_conf(
 58                        conn=conn,
 59                        conv=msg.conv,
 60                        req_msg_data=msg_data)
 61
 62                elif msg_type == 'HatEventAdminer.MsgSetLogConfReq':
 63                    await self._process_msg_set_log_conf(
 64                        conn=conn,
 65                        conv=msg.conv,
 66                        req_msg_data=msg_data)
 67
 68                else:
 69                    raise Exception('unsupported message type')
 70
 71        except ConnectionError:
 72            pass
 73
 74        except Exception as e:
 75            mlog.error("on connection error: %s", e, exc_info=e)
 76
 77        finally:
 78            mlog.debug("stopping connection loop")
 79            conn.close()
 80
 81    async def _process_msg_get_log_conf(self, conn, conv, req_msg_data):
 82        try:
 83            if not self._get_log_conf_cb:
 84                raise Exception('not implemented')
 85
 86            result = await aio.call(self._get_log_conf_cb)
 87
 88            res_msg_data = 'success', json.encode(result)
 89
 90        except Exception as e:
 91            res_msg_data = 'error', str(e)
 92
 93        await common.send_msg(
 94            conn, 'HatEventAdminer.MsgGetLogConfRes', res_msg_data,
 95            conv=conv)
 96
 97    async def _process_msg_set_log_conf(self, conn, conv, req_msg_data):
 98        try:
 99            if not self._set_log_conf_cb:
100                raise Exception('not implemented')
101
102            conf = json.decode(req_msg_data)
103            await aio.call(self._set_log_conf_cb, conf)
104
105            res_msg_data = 'success', None
106
107        except Exception as e:
108            res_msg_data = 'error', str(e)
109
110        await common.send_msg(
111            conn, 'HatEventAdminer.MsgSetLogConfRes', res_msg_data,
112            conv=conv)

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group
42    @property
43    def async_group(self) -> aio.Group:
44        """Async group"""
45        return self._srv.async_group

Async group