hat.event.eventer

Eventer communication protocol

 1"""Eventer communication protocol"""
 2
 3from hat.event.eventer.client import (StatusCb,
 4                                      EventsCb,
 5                                      EventerInitError,
 6                                      connect,
 7                                      Client)
 8from hat.event.eventer.server import (ConnectionId,
 9                                      ConnectionInfo,
10                                      ConnectionCb,
11                                      RegisterCb,
12                                      QueryCb,
13                                      listen,
14                                      Server)
15
16
17__all__ = ['StatusCb',
18           'EventsCb',
19           'EventerInitError',
20           'connect',
21           'Client',
22           'ConnectionId',
23           'ConnectionInfo',
24           'ConnectionCb',
25           'RegisterCb',
26           'QueryCb',
27           'listen',
28           'Server']
StatusCb = typing.Callable[[ForwardRef('Client'), hat.event.common.Status], None | collections.abc.Awaitable[None]]
EventsCb = typing.Callable[[ForwardRef('Client'), collections.abc.Collection[hat.event.common.Event]], None | collections.abc.Awaitable[None]]
class EventerInitError(builtins.Exception):
29class EventerInitError(Exception):
30    """Eventer initialization error"""

Eventer initialization error

async def connect( addr: hat.drivers.tcp.Address, client_name: str, *, client_token: str | None = None, subscriptions: Iterable[tuple[str, ...]] = [], server_id: int | None = None, persisted: bool = False, status_cb: Callable[[Client, hat.event.common.Status], None | Awaitable[None]] | None = None, events_cb: Callable[[Client, Collection[hat.event.common.Event]], None | Awaitable[None]] | None = None, **kwargs) -> Client:
 33async def connect(addr: tcp.Address,
 34                  client_name: str,
 35                  *,
 36                  client_token: str | None = None,
 37                  subscriptions: Iterable[common.EventType] = [],
 38                  server_id: common.ServerId | None = None,
 39                  persisted: bool = False,
 40                  status_cb: StatusCb | None = None,
 41                  events_cb: EventsCb | None = None,
 42                  **kwargs
 43                  ) -> 'Client':
 44    """Connect to Eventer Server
 45
 46    Arguments `client_name` and optional `client_token` identifies eventer
 47    client.
 48
 49    According to Event Server specification, each subscription is event
 50    type identifier which can contain special subtypes ``?`` and ``*``.
 51    Subtype ``?`` can occur at any position inside event type identifier
 52    and is used as replacement for any single subtype. Subtype ``*`` is valid
 53    only as last subtype in event type identifier and is used as replacement
 54    for zero or more arbitrary subtypes.
 55
 56    If `subscriptions` is empty list, client doesn't subscribe for any events
 57    and will not receive server's notifications.
 58
 59    If `server_id` is ``None``, client will receive all event notifications,
 60    in accordance to `subscriptions`, regardless of event's server id. If
 61    `server_id` is set, Eventer Server will only send events notifications
 62    for events with provided server id.
 63
 64    If `persisted` is set to ``True``, Eventer Server will notify events
 65    after they are persisted (flushed to disk). Otherwise, events are
 66    notified immediately after registration.
 67
 68    Additional arguments are passed to `hat.chatter.connect` coroutine.
 69
 70    """
 71    client = Client()
 72    client._status_cb = status_cb
 73    client._events_cb = events_cb
 74    client._loop = asyncio.get_running_loop()
 75    client._conv_futures = {}
 76    client._status = common.Status.STANDBY
 77
 78    client._conn = await chatter.connect(addr, **kwargs)
 79
 80    try:
 81        req_data = {'clientName': client_name,
 82                    'clientToken': _optional_to_sbs(client_token),
 83                    'subscriptions': [list(i) for i in subscriptions],
 84                    'serverId': _optional_to_sbs(server_id),
 85                    'persisted': persisted}
 86        conv = await common.send_msg(conn=client._conn,
 87                                     msg_type='HatEventer.MsgInitReq',
 88                                     msg_data=req_data,
 89                                     last=False)
 90
 91        res, res_type, res_data = await common.receive_msg(client._conn)
 92        if res_type != 'HatEventer.MsgInitRes' or res.conv != conv:
 93            raise Exception('invalid init response')
 94
 95        if res_data[0] == 'success':
 96            client._status = common.Status(common.status_from_sbs(res_data[1]))
 97
 98        elif res_data[0] == 'error':
 99            raise EventerInitError(res_data[1])
100
101        else:
102            raise ValueError('unsupported init response')
103
104        client.async_group.spawn(client._receive_loop)
105
106    except BaseException:
107        await aio.uncancellable(client.async_close())
108        raise
109
110    return client

Connect to Eventer Server

Arguments client_name and optional client_token identifies eventer client.

According to Event Server specification, each subscription is event type identifier which can contain special subtypes ? and *. Subtype ? can occur at any position inside event type identifier and is used as replacement for any single subtype. Subtype * is valid only as last subtype in event type identifier and is used as replacement for zero or more arbitrary subtypes.

If subscriptions is empty list, client doesn't subscribe for any events and will not receive server's notifications.

If server_id is None, client will receive all event notifications, in accordance to subscriptions, regardless of event's server id. If server_id is set, Eventer Server will only send events notifications for events with provided server id.

If persisted is set to True, Eventer Server will notify events after they are persisted (flushed to disk). Otherwise, events are notified immediately after registration.

Additional arguments are passed to hat.chatter.connect coroutine.

class Client(hat.aio.group.Resource):
113class Client(aio.Resource):
114    """Eventer client
115
116    For creating new client see `connect` coroutine.
117
118    """
119
120    @property
121    def async_group(self) -> aio.Group:
122        """Async group"""
123        return self._conn.async_group
124
125    @property
126    def status(self) -> common.Status:
127        """Status"""
128        return self._status
129
130    async def register(self,
131                       events: Collection[common.RegisterEvent],
132                       with_response: bool = False
133                       ) -> Collection[common.Event] | None:
134        """Register events and optionally wait for response
135
136        If `with_response` is ``True``, this coroutine returns list of events
137        or ``None`` if registration failure occurred.
138
139        """
140        msg_data = [common.register_event_to_sbs(i) for i in events]
141        conv = await common.send_msg(conn=self._conn,
142                                     msg_type='HatEventer.MsgRegisterReq',
143                                     msg_data=msg_data,
144                                     last=not with_response)
145
146        if with_response:
147            return await self._wait_conv_res(conv)
148
149    async def query(self,
150                    params: common.QueryParams
151                    ) -> common.QueryResult:
152        """Query events from server"""
153        msg_data = common.query_params_to_sbs(params)
154        conv = await common.send_msg(conn=self._conn,
155                                     msg_type='HatEventer.MsgQueryReq',
156                                     msg_data=msg_data,
157                                     last=False)
158
159        return await self._wait_conv_res(conv)
160
161    async def _receive_loop(self):
162        mlog.debug("starting receive loop")
163        try:
164            while True:
165                mlog.debug("waiting for incoming message")
166                msg, msg_type, msg_data = await common.receive_msg(self._conn)
167
168                if msg_type == 'HatEventer.MsgStatusNotify':
169                    mlog.debug("received status notification")
170                    await self._process_msg_status_notify(msg, msg_data)
171
172                elif msg_type == 'HatEventer.MsgEventsNotify':
173                    mlog.debug("received events notification")
174                    await self._process_msg_events_notify(msg, msg_data)
175
176                elif msg_type == 'HatEventer.MsgRegisterRes':
177                    mlog.debug("received register response")
178                    await self._process_msg_register_res(msg, msg_data)
179
180                elif msg_type == 'HatEventer.MsgQueryRes':
181                    mlog.debug("received query response")
182                    await self._process_msg_query_res(msg, msg_data)
183
184                else:
185                    raise Exception("unsupported message type")
186
187        except ConnectionError:
188            pass
189
190        except Exception as e:
191            mlog.error("read loop error: %s", e, exc_info=e)
192
193        finally:
194            mlog.debug("stopping receive loop")
195            self.close()
196
197            for future in self._conv_futures.values():
198                if not future.done():
199                    future.set_exception(ConnectionError())
200
201    async def _wait_conv_res(self, conv):
202        if not self.is_open:
203            raise ConnectionError()
204
205        future = self._loop.create_future()
206        self._conv_futures[conv] = future
207
208        try:
209            return await future
210
211        finally:
212            self._conv_futures.pop(conv, None)
213
214    async def _process_msg_status_notify(self, msg, msg_data):
215        self._status = common.status_from_sbs(msg_data)
216
217        if self._status_cb:
218            await aio.call(self._status_cb, self, self._status)
219
220    async def _process_msg_events_notify(self, msg, msg_data):
221        events = [common.event_from_sbs(event) for event in msg_data]
222
223        if self._events_cb:
224            await aio.call(self._events_cb, self, events)
225
226        if msg.last:
227            return
228
229        await common.send_msg(conn=self._conn,
230                              msg_type='HatEventer.MsgEventsAck',
231                              msg_data=None,
232                              conv=msg.conv)
233
234    async def _process_msg_register_res(self, msg, msg_data):
235        if msg_data[0] == 'events':
236            result = [common.event_from_sbs(event) for event in msg_data[1]]
237
238        elif msg_data[0] == 'failure':
239            result = None
240
241        else:
242            raise ValueError('unsupported register response')
243
244        future = self._conv_futures.get(msg.conv)
245        if not future or future.done():
246            return
247
248        future.set_result(result)
249
250    async def _process_msg_query_res(self, msg, msg_data):
251        result = common.query_result_from_sbs(msg_data)
252
253        future = self._conv_futures.get(msg.conv)
254        if not future or future.done():
255            return
256
257        future.set_result(result)

Eventer client

For creating new client see connect coroutine.

async_group: hat.aio.group.Group
120    @property
121    def async_group(self) -> aio.Group:
122        """Async group"""
123        return self._conn.async_group

Async group

status: hat.event.common.Status
125    @property
126    def status(self) -> common.Status:
127        """Status"""
128        return self._status

Status

async def register( self, events: Collection[hat.event.common.RegisterEvent], with_response: bool = False) -> Collection[hat.event.common.Event] | None:
130    async def register(self,
131                       events: Collection[common.RegisterEvent],
132                       with_response: bool = False
133                       ) -> Collection[common.Event] | None:
134        """Register events and optionally wait for response
135
136        If `with_response` is ``True``, this coroutine returns list of events
137        or ``None`` if registration failure occurred.
138
139        """
140        msg_data = [common.register_event_to_sbs(i) for i in events]
141        conv = await common.send_msg(conn=self._conn,
142                                     msg_type='HatEventer.MsgRegisterReq',
143                                     msg_data=msg_data,
144                                     last=not with_response)
145
146        if with_response:
147            return await self._wait_conv_res(conv)

Register events and optionally wait for response

If with_response is True, this coroutine returns list of events or None if registration failure occurred.

149    async def query(self,
150                    params: common.QueryParams
151                    ) -> common.QueryResult:
152        """Query events from server"""
153        msg_data = common.query_params_to_sbs(params)
154        conv = await common.send_msg(conn=self._conn,
155                                     msg_type='HatEventer.MsgQueryReq',
156                                     msg_data=msg_data,
157                                     last=False)
158
159        return await self._wait_conv_res(conv)

Query events from server

ConnectionId = <class 'int'>
class ConnectionInfo(typing.NamedTuple):
24class ConnectionInfo(typing.NamedTuple):
25    id: ConnectionId
26    client_name: str
27    client_token: str | None
28    subscription: common.Subscription
29    server_id: int | None
30    persisted: bool

ConnectionInfo(id, client_name, client_token, subscription, server_id, persisted)

ConnectionInfo( id: int, client_name: str, client_token: str | None, subscription: hat.event.common.Subscription, server_id: int | None, persisted: bool)

Create new instance of ConnectionInfo(id, client_name, client_token, subscription, server_id, persisted)

id: int

Alias for field number 0

client_name: str

Alias for field number 1

client_token: str | None

Alias for field number 2

Alias for field number 3

server_id: int | None

Alias for field number 4

persisted: bool

Alias for field number 5

ConnectionCb = typing.Callable[[ConnectionInfo], None | collections.abc.Awaitable[None]]
RegisterCb = typing.Callable[[ConnectionInfo, collections.abc.Collection[hat.event.common.RegisterEvent]], collections.abc.Collection[hat.event.common.Event] | None | collections.abc.Awaitable[collections.abc.Collection[hat.event.common.Event] | None]]
async def listen( addr: hat.drivers.tcp.Address, *, status: hat.event.common.Status = <Status.STANDBY: 'standby'>, connected_cb: Callable[[ConnectionInfo], None | Awaitable[None]] | None = None, disconnected_cb: Callable[[ConnectionInfo], None | Awaitable[None]] | None = None, register_cb: Callable[[ConnectionInfo, Collection[hat.event.common.RegisterEvent]], Collection[hat.event.common.Event] | None | Awaitable[Collection[hat.event.common.Event] | None]] | None = None, query_cb: Callable[[ConnectionInfo, hat.event.common.QueryLatestParams | hat.event.common.QueryTimeseriesParams | hat.event.common.QueryServerParams], hat.event.common.QueryResult | Awaitable[hat.event.common.QueryResult]] | None = None, close_timeout: float = 0.5, **kwargs) -> Server:
47async def listen(addr: tcp.Address,
48                 *,
49                 status: common.Status = common.Status.STANDBY,
50                 connected_cb: ConnectionCb | None = None,
51                 disconnected_cb: ConnectionCb | None = None,
52                 register_cb: RegisterCb | None = None,
53                 query_cb: QueryCb | None = None,
54                 close_timeout: float = 0.5,
55                 **kwargs
56                 ) -> 'Server':
57    """Create listening Eventer Server instance"""
58    server = Server()
59    server._status = status
60    server._connected_cb = connected_cb
61    server._disconnected_cb = disconnected_cb
62    server._register_cb = register_cb
63    server._query_cb = query_cb
64    server._close_timeout = close_timeout
65    server._loop = asyncio.get_running_loop()
66    server._next_conn_ids = itertools.count(1)
67    server._conn_infos = {}
68    server._conn_conv_futures = {}
69
70    server._srv = await chatter.listen(server._connection_loop, addr, **kwargs)
71    mlog.debug("listening on %s", addr)
72
73    return server

Create listening Eventer Server instance

class Server(hat.aio.group.Resource):
 76class Server(aio.Resource):
 77
 78    @property
 79    def async_group(self) -> aio.Group:
 80        """Async group"""
 81        return self._srv.async_group
 82
 83    def get_conn_infos(self) -> list[ConnectionInfo]:
 84        """Get connection infos"""
 85        return list(self._conn_infos.values())
 86
 87    async def set_status(self, status: common.Status):
 88        """Set status"""
 89        if self._status == status:
 90            return
 91
 92        self._status = status
 93
 94        for conn in list(self._conn_infos.keys()):
 95            await self._notify_status(conn)
 96
 97    async def notify_events(self,
 98                            events: Collection[common.Event],
 99                            persisted: bool,
100                            with_ack: bool = False):
101        """Notify events to clients"""
102        conn_conn_events = collections.deque()
103
104        for conn, info in self._conn_infos.items():
105            if info.persisted != persisted:
106                continue
107
108            conn_events = collections.deque(
109                event for event in events
110                if (info.subscription.matches(event.type) and
111                    (info.server_id is None or
112                     info.server_id == event.id.server)))
113            if not conn_events:
114                continue
115
116            conn_conn_events.append((conn, conn_events))
117
118        if not conn_conn_events:
119            return
120
121        if with_ack:
122            await asyncio.wait([
123                self.async_group.spawn(self._notify_events, conn, conn_events,
124                                       True)
125                for conn, conn_events in conn_conn_events])
126
127        else:
128            for conn, conn_events in conn_conn_events:
129                await self._notify_events(conn, conn_events, False)
130
131    async def _connection_loop(self, conn):
132        mlog.debug("starting connection loop")
133        conn_id = next(self._next_conn_ids)
134        info = None
135
136        try:
137            req, req_type, req_data = await common.receive_msg(conn)
138            if req_type != 'HatEventer.MsgInitReq':
139                raise Exception('invalid init request type')
140
141            try:
142                info = ConnectionInfo(
143                    id=conn_id,
144                    client_name=req_data['clientName'],
145                    client_token=_optional_from_sbs(req_data['clientToken']),
146                    subscription=common.create_subscription(
147                        tuple(i) for i in req_data['subscriptions']),
148                    server_id=_optional_from_sbs(req_data['serverId']),
149                    persisted=req_data['persisted'])
150
151                if self._connected_cb:
152                    await aio.call(self._connected_cb, info)
153
154                res_data = 'success', common.status_to_sbs(self._status)
155                self._conn_infos[conn] = info
156
157            except Exception as e:
158                mlog.warning("connection initialization error: %s", e,
159                             exc_info=e)
160
161                info = None
162                res_data = 'error', str(e)
163
164            mlog.debug("sending init response %s", res_data[0])
165            await common.send_msg(conn, 'HatEventer.MsgInitRes', res_data,
166                                  conv=req.conv)
167
168            if res_data[0] != 'success':
169                with contextlib.suppress(asyncio.TimeoutError):
170                    await aio.wait_for(conn.wait_closing(),
171                                       self._close_timeout)
172                return
173
174            while True:
175                mlog.debug("waiting for incomming messages")
176                msg, msg_type, msg_data = await common.receive_msg(conn)
177
178                if msg_type == 'HatEventer.MsgEventsAck':
179                    mlog.debug("received events ack")
180                    future = self._conn_conv_futures.get((conn, msg.conv))
181                    if future and not future.done():
182                        future.set_result(None)
183
184                elif msg_type == 'HatEventer.MsgRegisterReq':
185                    mlog.debug("received register request")
186                    await self._process_msg_register(conn, info, msg, msg_data)
187
188                elif msg_type == 'HatEventer.MsgQueryReq':
189                    mlog.debug("received query request")
190                    await self._process_msg_query(conn, info, msg, msg_data)
191
192                else:
193                    raise Exception('unsupported message type')
194
195        except ConnectionError:
196            pass
197
198        except Exception as e:
199            mlog.error("on connection error: %s", e, exc_info=e)
200
201        finally:
202            mlog.debug("stopping connection loop")
203            conn.close()
204            self._conn_infos.pop(conn, None)
205
206            for future in self._conn_conv_futures.values():
207                if not future.done():
208                    future.set_exception(ConnectionError())
209
210            if self._disconnected_cb and info:
211                with contextlib.suppress(Exception):
212                    await aio.call(self._disconnected_cb, info)
213
214    async def _process_msg_register(self, conn, info, req, req_data):
215        register_events = [common.register_event_from_sbs(i)
216                           for i in req_data]
217
218        if self._register_cb:
219            events = await aio.call(self._register_cb, info, register_events)
220
221        else:
222            events = None
223
224        if req.last:
225            return
226
227        if events is not None:
228            res_data = 'events', [common.event_to_sbs(event)
229                                  for event in events]
230
231        else:
232            res_data = 'failure', None
233
234        await common.send_msg(conn, 'HatEventer.MsgRegisterRes', res_data,
235                              conv=req.conv)
236
237    async def _process_msg_query(self, conn, info, req, req_data):
238        params = common.query_params_from_sbs(req_data)
239
240        if self._query_cb:
241            result = await aio.call(self._query_cb, info, params)
242
243        else:
244            result = common.QueryResult(events=[],
245                                        more_follows=False)
246
247        res_data = common.query_result_to_sbs(result)
248        await common.send_msg(conn, 'HatEventer.MsgQueryRes', res_data,
249                              conv=req.conv)
250
251    async def _notify_status(self, conn):
252        try:
253            msg_data = common.status_to_sbs(self._status)
254            await common.send_msg(conn, 'HatEventer.MsgStatusNotify', msg_data)
255
256        except ConnectionError:
257            pass
258
259        except Exception as e:
260            mlog.error("notify status error: %s", e, exc_info=e)
261
262    async def _notify_events(self, conn, events, with_ack):
263        try:
264            msg_data = [common.event_to_sbs(event) for event in events]
265            conv = await common.send_msg(conn,
266                                         'HatEventer.MsgEventsNotify',
267                                         msg_data,
268                                         last=not with_ack)
269
270            if not with_ack:
271                return
272
273            future = self._loop.create_future()
274            self._conn_conv_futures[(conn, conv)] = future
275
276            try:
277                await future
278
279            finally:
280                self._conn_conv_futures.pop((conn, conv))
281
282        except ConnectionError:
283            pass
284
285        except Exception as e:
286            mlog.error("notify events error: %s", e, exc_info=e)

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group
78    @property
79    def async_group(self) -> aio.Group:
80        """Async group"""
81        return self._srv.async_group

Async group

def get_conn_infos(self) -> list[ConnectionInfo]:
83    def get_conn_infos(self) -> list[ConnectionInfo]:
84        """Get connection infos"""
85        return list(self._conn_infos.values())

Get connection infos

async def set_status(self, status: hat.event.common.Status):
87    async def set_status(self, status: common.Status):
88        """Set status"""
89        if self._status == status:
90            return
91
92        self._status = status
93
94        for conn in list(self._conn_infos.keys()):
95            await self._notify_status(conn)

Set status

async def notify_events( self, events: Collection[hat.event.common.Event], persisted: bool, with_ack: bool = False):
 97    async def notify_events(self,
 98                            events: Collection[common.Event],
 99                            persisted: bool,
100                            with_ack: bool = False):
101        """Notify events to clients"""
102        conn_conn_events = collections.deque()
103
104        for conn, info in self._conn_infos.items():
105            if info.persisted != persisted:
106                continue
107
108            conn_events = collections.deque(
109                event for event in events
110                if (info.subscription.matches(event.type) and
111                    (info.server_id is None or
112                     info.server_id == event.id.server)))
113            if not conn_events:
114                continue
115
116            conn_conn_events.append((conn, conn_events))
117
118        if not conn_conn_events:
119            return
120
121        if with_ack:
122            await asyncio.wait([
123                self.async_group.spawn(self._notify_events, conn, conn_events,
124                                       True)
125                for conn, conn_events in conn_conn_events])
126
127        else:
128            for conn, conn_events in conn_conn_events:
129                await self._notify_events(conn, conn_events, False)

Notify events to clients