hat.event.eventer

Eventer communication protocol

 1"""Eventer communication protocol"""
 2
 3from hat.event.eventer.client import (StatusCb,
 4                                      EventsCb,
 5                                      EventerInitError,
 6                                      connect,
 7                                      Client)
 8from hat.event.eventer.server import (ConnectionId,
 9                                      ConnectionInfo,
10                                      ConnectionCb,
11                                      RegisterCb,
12                                      QueryCb,
13                                      listen,
14                                      Server)
15
16
17__all__ = ['StatusCb',
18           'EventsCb',
19           'EventerInitError',
20           'connect',
21           'Client',
22           'ConnectionId',
23           'ConnectionInfo',
24           'ConnectionCb',
25           'RegisterCb',
26           'QueryCb',
27           'listen',
28           'Server']
StatusCb = typing.Callable[[ForwardRef('Client'), hat.event.common.Status], None | collections.abc.Awaitable[None]]
EventsCb = typing.Callable[[ForwardRef('Client'), collections.abc.Collection[hat.event.common.Event]], None | collections.abc.Awaitable[None]]
class EventerInitError(builtins.Exception):
class EventerInitError(Exception):
    """Error raised when eventer initialization fails"""

Eventer initialization error

async def connect( addr: hat.drivers.tcp.Address, client_name: str, *, client_token: str | None = None, subscriptions: Iterable[tuple[str, ...]] = [], server_id: int | None = None, persisted: bool = False, status_cb: Optional[Callable[[Client, hat.event.common.Status], None | Awaitable[None]]] = None, events_cb: Optional[Callable[[Client, Collection[hat.event.common.Event]], None | Awaitable[None]]] = None, **kwargs) -> Client:
 33async def connect(addr: tcp.Address,
 34                  client_name: str,
 35                  *,
 36                  client_token: str | None = None,
 37                  subscriptions: Iterable[common.EventType] = [],
 38                  server_id: common.ServerId | None = None,
 39                  persisted: bool = False,
 40                  status_cb: StatusCb | None = None,
 41                  events_cb: EventsCb | None = None,
 42                  **kwargs
 43                  ) -> 'Client':
 44    """Connect to Eventer Server
 45
 46    Arguments `client_name` and optional `client_token` identifies eventer
 47    client.
 48
 49    According to Event Server specification, each subscription is event
 50    type identifier which can contain special subtypes ``?`` and ``*``.
 51    Subtype ``?`` can occur at any position inside event type identifier
 52    and is used as replacement for any single subtype. Subtype ``*`` is valid
 53    only as last subtype in event type identifier and is used as replacement
 54    for zero or more arbitrary subtypes.
 55
 56    If `subscriptions` is empty list, client doesn't subscribe for any events
 57    and will not receive server's notifications.
 58
 59    If `server_id` is ``None``, client will receive all event notifications,
 60    in accordance to `subscriptions`, regardless of event's server id. If
 61    `server_id` is set, Eventer Server will only send events notifications
 62    for events with provided server id.
 63
 64    If `persisted` is set to ``True``, Eventer Server will notify events
 65    after they are persisted (flushed to disk). Otherwise, events are
 66    notified immediately after registration.
 67
 68    Additional arguments are passed to `hat.chatter.connect` coroutine.
 69
 70    """
 71    client = Client()
 72    client._status_cb = status_cb
 73    client._events_cb = events_cb
 74    client._loop = asyncio.get_running_loop()
 75    client._conv_futures = {}
 76    client._status = common.Status.STANDBY
 77
 78    client._conn = await chatter.connect(addr, **kwargs)
 79
 80    try:
 81        req_data = {'clientName': client_name,
 82                    'clientToken': _optional_to_sbs(client_token),
 83                    'subscriptions': [list(i) for i in subscriptions],
 84                    'serverId': _optional_to_sbs(server_id),
 85                    'persisted': persisted}
 86        conv = await common.send_msg(conn=client._conn,
 87                                     msg_type='HatEventer.MsgInitReq',
 88                                     msg_data=req_data,
 89                                     last=False)
 90
 91        res, res_type, res_data = await common.receive_msg(client._conn)
 92        if res_type != 'HatEventer.MsgInitRes' or res.conv != conv:
 93            raise Exception('invalid init response')
 94
 95        if res_data[0] == 'success':
 96            client._status = common.Status(common.status_from_sbs(res_data[1]))
 97
 98        elif res_data[0] == 'error':
 99            raise EventerInitError(res_data[1])
100
101        else:
102            raise ValueError('unsupported init response')
103
104        client.async_group.spawn(client._receive_loop)
105
106    except BaseException:
107        await aio.uncancellable(client.async_close())
108        raise
109
110    return client

Connect to Eventer Server

Arguments client_name and optional client_token identify the eventer client.

According to Event Server specification, each subscription is event type identifier which can contain special subtypes ? and *. Subtype ? can occur at any position inside event type identifier and is used as replacement for any single subtype. Subtype * is valid only as last subtype in event type identifier and is used as replacement for zero or more arbitrary subtypes.

If subscriptions is an empty list, the client doesn't subscribe to any events and will not receive the server's notifications.

If server_id is None, the client will receive all event notifications, in accordance with subscriptions, regardless of the event's server id. If server_id is set, Eventer Server will only send event notifications for events with the provided server id.

If persisted is set to True, Eventer Server will notify events after they are persisted (flushed to disk). Otherwise, events are notified immediately after registration.

Additional arguments are passed to hat.chatter.connect coroutine.

class Client(hat.aio.group.Resource):
class Client(aio.Resource):
    """Eventer client

    Instances are not created directly - see `connect` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        conn = self._conn
        return conn.async_group

    @property
    def status(self) -> common.Status:
        """Status"""
        return self._status

    async def register(self,
                       events: Collection[common.RegisterEvent],
                       with_response: bool = False
                       ) -> Collection[common.Event] | None:
        """Register events and optionally wait for response

        If `with_response` is ``True``, this coroutine returns list of events
        or ``None`` if registration failure occurred. Otherwise, ``None`` is
        returned as soon as the request is sent.

        """
        sbs_events = [common.register_event_to_sbs(event)
                      for event in events]
        conv = await common.send_msg(conn=self._conn,
                                     msg_type='HatEventer.MsgRegisterReq',
                                     msg_data=sbs_events,
                                     last=not with_response)

        if not with_response:
            return None

        return await self._wait_conv_res(conv)

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Query events from server"""
        conv = await common.send_msg(
            conn=self._conn,
            msg_type='HatEventer.MsgQueryReq',
            msg_data=common.query_params_to_sbs(params),
            last=False)

        return await self._wait_conv_res(conv)

    async def _receive_loop(self):
        # message type -> (debug log text, coroutine processing the message)
        handlers = {
            'HatEventer.MsgStatusNotify': (
                "received status notification",
                self._process_msg_status_notify),
            'HatEventer.MsgEventsNotify': (
                "received events notification",
                self._process_msg_events_notify),
            'HatEventer.MsgRegisterRes': (
                "received register response",
                self._process_msg_register_res),
            'HatEventer.MsgQueryRes': (
                "received query response",
                self._process_msg_query_res)}

        mlog.debug("starting receive loop")
        try:
            while True:
                mlog.debug("waiting for incoming message")
                msg, msg_type, msg_data = await common.receive_msg(self._conn)

                entry = handlers.get(msg_type)
                if entry is None:
                    raise Exception("unsupported message type")

                log_text, handler = entry
                mlog.debug(log_text)
                await handler(msg, msg_data)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("read loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping receive loop")
            self.close()

            # no response can arrive anymore - wake up all waiting callers
            pending = [future for future in self._conv_futures.values()
                       if not future.done()]
            for future in pending:
                future.set_exception(ConnectionError())

    async def _wait_conv_res(self, conv):
        if not self.is_open:
            raise ConnectionError()

        res_future = self._loop.create_future()
        self._conv_futures[conv] = res_future

        try:
            return await res_future

        finally:
            self._conv_futures.pop(conv, None)

    def _set_conv_result(self, conv, result):
        # responses nobody is waiting for are silently dropped
        future = self._conv_futures.get(conv)
        if future and not future.done():
            future.set_result(result)

    async def _process_msg_status_notify(self, msg, msg_data):
        self._status = common.status_from_sbs(msg_data)

        if not self._status_cb:
            return

        await aio.call(self._status_cb, self, self._status)

    async def _process_msg_events_notify(self, msg, msg_data):
        events = [common.event_from_sbs(sbs_event) for sbs_event in msg_data]

        if self._events_cb:
            await aio.call(self._events_cb, self, events)

        # acknowledgment is sent only if the conversation is left open
        if not msg.last:
            await common.send_msg(conn=self._conn,
                                  msg_type='HatEventer.MsgEventsAck',
                                  msg_data=None,
                                  conv=msg.conv)

    async def _process_msg_register_res(self, msg, msg_data):
        res_kind = msg_data[0]

        if res_kind == 'events':
            result = [common.event_from_sbs(sbs_event)
                      for sbs_event in msg_data[1]]

        elif res_kind == 'failure':
            result = None

        else:
            raise ValueError('unsupported register response')

        self._set_conv_result(msg.conv, result)

    async def _process_msg_query_res(self, msg, msg_data):
        result = common.query_result_from_sbs(msg_data)
        self._set_conv_result(msg.conv, result)

Eventer client

For creating new client see connect coroutine.

async_group: hat.aio.group.Group
120    @property
121    def async_group(self) -> aio.Group:
122        """Async group"""
123        return self._conn.async_group

Async group

status: hat.event.common.Status
125    @property
126    def status(self) -> common.Status:
127        """Status"""
128        return self._status

Status

async def register( self, events: Collection[hat.event.common.RegisterEvent], with_response: bool = False) -> Collection[hat.event.common.Event] | None:
130    async def register(self,
131                       events: Collection[common.RegisterEvent],
132                       with_response: bool = False
133                       ) -> Collection[common.Event] | None:
134        """Register events and optionally wait for response
135
136        If `with_response` is ``True``, this coroutine returns list of events
137        or ``None`` if registration failure occurred.
138
139        """
140        msg_data = [common.register_event_to_sbs(i) for i in events]
141        conv = await common.send_msg(conn=self._conn,
142                                     msg_type='HatEventer.MsgRegisterReq',
143                                     msg_data=msg_data,
144                                     last=not with_response)
145
146        if with_response:
147            return await self._wait_conv_res(conv)

Register events and optionally wait for response

If with_response is True, this coroutine returns list of events or None if registration failure occurred.

149    async def query(self,
150                    params: common.QueryParams
151                    ) -> common.QueryResult:
152        """Query events from server"""
153        msg_data = common.query_params_to_sbs(params)
154        conv = await common.send_msg(conn=self._conn,
155                                     msg_type='HatEventer.MsgQueryReq',
156                                     msg_data=msg_data,
157                                     last=False)
158
159        return await self._wait_conv_res(conv)

Query events from server

ConnectionId = <class 'int'>
class ConnectionInfo(typing.NamedTuple):
class ConnectionInfo(typing.NamedTuple):
    """Description of a single initialized client connection"""

    # identifier assigned by the server to the connection
    id: ConnectionId
    # client identification received in the init request
    client_name: str
    client_token: str | None
    # subscription created from event types received in the init request
    subscription: common.Subscription
    # if set, only events with this server id are notified
    server_id: int | None
    # selects persisted or non-persisted event notifications
    persisted: bool

ConnectionInfo(id, client_name, client_token, subscription, server_id, persisted)

ConnectionInfo( id: int, client_name: str, client_token: str | None, subscription: hat.event.common.Subscription, server_id: int | None, persisted: bool)

Create new instance of ConnectionInfo(id, client_name, client_token, subscription, server_id, persisted)

id: int

Alias for field number 0

client_name: str

Alias for field number 1

client_token: str | None

Alias for field number 2

subscription: hat.event.common.Subscription

Alias for field number 3

server_id: int | None

Alias for field number 4

persisted: bool

Alias for field number 5

ConnectionCb = typing.Callable[[ConnectionInfo], None | collections.abc.Awaitable[None]]
RegisterCb = typing.Callable[[ConnectionInfo, collections.abc.Collection[hat.event.common.RegisterEvent]], collections.abc.Collection[hat.event.common.Event] | None | collections.abc.Awaitable[collections.abc.Collection[hat.event.common.Event] | None]]
async def listen( addr: hat.drivers.tcp.Address, *, status: hat.event.common.Status = <Status.STANDBY: 'standby'>, connected_cb: Optional[Callable[[ConnectionInfo], None | Awaitable[None]]] = None, disconnected_cb: Optional[Callable[[ConnectionInfo], None | Awaitable[None]]] = None, register_cb: Optional[Callable[[ConnectionInfo, Collection[hat.event.common.RegisterEvent]], Collection[hat.event.common.Event] | None | Awaitable[Collection[hat.event.common.Event] | None]]] = None, query_cb: Optional[Callable[[ConnectionInfo, hat.event.common.QueryLatestParams | hat.event.common.QueryTimeseriesParams | hat.event.common.QueryServerParams], hat.event.common.QueryResult | Awaitable[hat.event.common.QueryResult]]] = None, close_timeout: float = 0.5, **kwargs) -> Server:
async def listen(addr: tcp.Address,
                 *,
                 status: common.Status = common.Status.STANDBY,
                 connected_cb: ConnectionCb | None = None,
                 disconnected_cb: ConnectionCb | None = None,
                 register_cb: RegisterCb | None = None,
                 query_cb: QueryCb | None = None,
                 close_timeout: float = 0.5,
                 **kwargs
                 ) -> 'Server':
    """Create listening Eventer Server instance

    `status` is the initial server status. Callbacks, when provided, are
    called on successful client initialization (`connected_cb`), on closing
    of an initialized connection (`disconnected_cb`), on register requests
    (`register_cb`) and on query requests (`query_cb`).

    After an unsuccessful init response is sent, server waits up to
    `close_timeout` seconds for the connection to start closing.

    Additional arguments are passed to `chatter.listen` coroutine.

    """
    server = Server()

    # configuration provided by caller
    server._status = status
    server._close_timeout = close_timeout
    server._connected_cb = connected_cb
    server._disconnected_cb = disconnected_cb
    server._register_cb = register_cb
    server._query_cb = query_cb

    # internal state
    server._loop = asyncio.get_running_loop()
    server._next_conn_ids = itertools.count(1)
    server._conn_infos = {}
    server._conn_conv_futures = {}

    server._srv = await chatter.listen(server._connection_loop, addr, **kwargs)
    mlog.debug("listening on %s", addr)

    return server

Create listening Eventer Server instance

class Server(hat.aio.group.Resource):
 76class Server(aio.Resource):
 77
 78    @property
 79    def async_group(self) -> aio.Group:
 80        """Async group"""
 81        return self._srv.async_group
 82
 83    def get_conn_infos(self) -> list[ConnectionInfo]:
 84        """Get connection infos"""
 85        return list(self._conn_infos.values())
 86
 87    async def set_status(self, status: common.Status):
 88        """Set status"""
 89        if self._status == status:
 90            return
 91
 92        self._status = status
 93
 94        for conn in list(self._conn_infos.keys()):
 95            await self._notify_status(conn)
 96
 97    async def notify_events(self,
 98                            events: Collection[common.Event],
 99                            persisted: bool,
100                            with_ack: bool = False):
101        """Notify events to clients"""
102        conn_conn_events = collections.deque()
103
104        for conn, info in self._conn_infos.items():
105            if info.persisted != persisted:
106                continue
107
108            conn_events = collections.deque(
109                event for event in events
110                if (info.subscription.matches(event.type) and
111                    (info.server_id is None or
112                     info.server_id == event.id.server)))
113            if not conn_events:
114                continue
115
116            conn_conn_events.append((conn, conn_events))
117
118        if not conn_conn_events:
119            return
120
121        if with_ack:
122            await asyncio.wait([
123                self.async_group.spawn(self._notify_events, conn, conn_events,
124                                       True)
125                for conn, conn_events in conn_conn_events])
126
127        else:
128            for conn, conn_events in conn_conn_events:
129                await self._notify_events(conn, conn_events, False)
130
131    async def _connection_loop(self, conn):
132        mlog.debug("starting connection loop")
133        conn_id = next(self._next_conn_ids)
134        info = None
135
136        try:
137            req, req_type, req_data = await common.receive_msg(conn)
138            if req_type != 'HatEventer.MsgInitReq':
139                raise Exception('invalid init request type')
140
141            try:
142                info = ConnectionInfo(
143                    id=conn_id,
144                    client_name=req_data['clientName'],
145                    client_token=_optional_from_sbs(req_data['clientToken']),
146                    subscription=common.create_subscription(
147                        tuple(i) for i in req_data['subscriptions']),
148                    server_id=_optional_from_sbs(req_data['serverId']),
149                    persisted=req_data['persisted'])
150
151                if self._connected_cb:
152                    await aio.call(self._connected_cb, info)
153
154                res_data = 'success', common.status_to_sbs(self._status)
155                self._conn_infos[conn] = info
156
157            except Exception as e:
158                info = None
159                res_data = 'error', str(e)
160
161            mlog.debug("sending init response %s", res_data[0])
162            await common.send_msg(conn, 'HatEventer.MsgInitRes', res_data,
163                                  conv=req.conv)
164
165            if res_data[0] != 'success':
166                with contextlib.suppress(asyncio.TimeoutError):
167                    await aio.wait_for(conn.wait_closing(),
168                                       self._close_timeout)
169                return
170
171            while True:
172                mlog.debug("waiting for incomming messages")
173                msg, msg_type, msg_data = await common.receive_msg(conn)
174
175                if msg_type == 'HatEventer.MsgEventsAck':
176                    mlog.debug("received events ack")
177                    future = self._conn_conv_futures.get((conn, msg.conv))
178                    if future and not future.done():
179                        future.set_result(None)
180
181                elif msg_type == 'HatEventer.MsgRegisterReq':
182                    mlog.debug("received register request")
183                    await self._process_msg_register(conn, info, msg, msg_data)
184
185                elif msg_type == 'HatEventer.MsgQueryReq':
186                    mlog.debug("received query request")
187                    await self._process_msg_query(conn, info, msg, msg_data)
188
189                else:
190                    raise Exception('unsupported message type')
191
192        except ConnectionError:
193            pass
194
195        except Exception as e:
196            mlog.error("on connection error: %s", e, exc_info=e)
197
198        finally:
199            mlog.debug("stopping connection loop")
200            conn.close()
201            self._conn_infos.pop(conn, None)
202
203            for future in self._conn_conv_futures.values():
204                if not future.done():
205                    future.set_exception(ConnectionError())
206
207            if self._disconnected_cb and info:
208                with contextlib.suppress(Exception):
209                    await aio.call(self._disconnected_cb, info)
210
211    async def _process_msg_register(self, conn, info, req, req_data):
212        register_events = [common.register_event_from_sbs(i)
213                           for i in req_data]
214
215        if self._register_cb:
216            events = await aio.call(self._register_cb, info, register_events)
217
218        else:
219            events = None
220
221        if req.last:
222            return
223
224        if events is not None:
225            res_data = 'events', [common.event_to_sbs(event)
226                                  for event in events]
227
228        else:
229            res_data = 'failure', None
230
231        await common.send_msg(conn, 'HatEventer.MsgRegisterRes', res_data,
232                              conv=req.conv)
233
234    async def _process_msg_query(self, conn, info, req, req_data):
235        params = common.query_params_from_sbs(req_data)
236
237        if self._query_cb:
238            result = await aio.call(self._query_cb, info, params)
239
240        else:
241            result = common.QueryResult(events=[],
242                                        more_follows=False)
243
244        res_data = common.query_result_to_sbs(result)
245        await common.send_msg(conn, 'HatEventer.MsgQueryRes', res_data,
246                              conv=req.conv)
247
248    async def _notify_status(self, conn):
249        try:
250            msg_data = common.status_to_sbs(self._status)
251            await common.send_msg(conn, 'HatEventer.MsgStatusNotify', msg_data)
252
253        except ConnectionError:
254            pass
255
256        except Exception as e:
257            mlog.error("notify status error: %s", e, exc_info=e)
258
259    async def _notify_events(self, conn, events, with_ack):
260        try:
261            msg_data = [common.event_to_sbs(event) for event in events]
262            conv = await common.send_msg(conn,
263                                         'HatEventer.MsgEventsNotify',
264                                         msg_data,
265                                         last=not with_ack)
266
267            if not with_ack:
268                return
269
270            future = self._loop.create_future()
271            self._conn_conv_futures[(conn, conv)] = future
272
273            try:
274                await future
275
276            finally:
277                self._conn_conv_futures.pop((conn, conv))
278
279        except ConnectionError:
280            pass
281
282        except Exception as e:
283            mlog.error("notify events error: %s", e, exc_info=e)

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group
78    @property
79    def async_group(self) -> aio.Group:
80        """Async group"""
81        return self._srv.async_group

Async group

def get_conn_infos(self) -> list[ConnectionInfo]:
83    def get_conn_infos(self) -> list[ConnectionInfo]:
84        """Get connection infos"""
85        return list(self._conn_infos.values())

Get connection infos

async def set_status(self, status: hat.event.common.Status):
87    async def set_status(self, status: common.Status):
88        """Set status"""
89        if self._status == status:
90            return
91
92        self._status = status
93
94        for conn in list(self._conn_infos.keys()):
95            await self._notify_status(conn)

Set status

async def notify_events( self, events: Collection[hat.event.common.Event], persisted: bool, with_ack: bool = False):
 97    async def notify_events(self,
 98                            events: Collection[common.Event],
 99                            persisted: bool,
100                            with_ack: bool = False):
101        """Notify events to clients"""
102        conn_conn_events = collections.deque()
103
104        for conn, info in self._conn_infos.items():
105            if info.persisted != persisted:
106                continue
107
108            conn_events = collections.deque(
109                event for event in events
110                if (info.subscription.matches(event.type) and
111                    (info.server_id is None or
112                     info.server_id == event.id.server)))
113            if not conn_events:
114                continue
115
116            conn_conn_events.append((conn, conn_events))
117
118        if not conn_conn_events:
119            return
120
121        if with_ack:
122            await asyncio.wait([
123                self.async_group.spawn(self._notify_events, conn, conn_events,
124                                       True)
125                for conn, conn_events in conn_conn_events])
126
127        else:
128            for conn, conn_events in conn_conn_events:
129                await self._notify_events(conn, conn_events, False)

Notify events to clients