hat.event.eventer

Eventer communication protocol

 1"""Eventer communication protocol"""
 2
 3from hat.event.eventer.client import (StatusCb,
 4                                      EventsCb,
 5                                      EventerInitError,
 6                                      connect,
 7                                      Client)
 8from hat.event.eventer.server import (ConnectionId,
 9                                      ConnectionInfo,
10                                      ConnectionCb,
11                                      RegisterCb,
12                                      QueryCb,
13                                      listen,
14                                      Server)
15
16
17__all__ = ['StatusCb',
18           'EventsCb',
19           'EventerInitError',
20           'connect',
21           'Client',
22           'ConnectionId',
23           'ConnectionInfo',
24           'ConnectionCb',
25           'RegisterCb',
26           'QueryCb',
27           'listen',
28           'Server']
StatusCb = typing.Callable[[ForwardRef('Client'), hat.event.common.common.Status], typing.Optional[typing.Awaitable[NoneType]]]
EventsCb = typing.Callable[[ForwardRef('Client'), collections.abc.Collection[hat.event.common.common.Event]], typing.Optional[typing.Awaitable[NoneType]]]
class EventerInitError(builtins.Exception):
class EventerInitError(Exception):
    """Error reported by Eventer Server in response to client's init request

    Raised by `connect` when server's init response is ``error``.

    """

Eventer initialization error

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
async def connect( addr: hat.drivers.tcp.Address, client_name: str, *, client_token: str | None = None, subscriptions: collections.abc.Iterable[tuple[str, ...]] = [], server_id: int | None = None, persisted: bool = False, status_cb: Optional[Callable[[Client, hat.event.common.common.Status], Optional[Awaitable[NoneType]]]] = None, events_cb: Optional[Callable[[Client, collections.abc.Collection[hat.event.common.common.Event]], Optional[Awaitable[NoneType]]]] = None, **kwargs) -> Client:
 33async def connect(addr: tcp.Address,
 34                  client_name: str,
 35                  *,
 36                  client_token: str | None = None,
 37                  subscriptions: Iterable[common.EventType] = [],
 38                  server_id: common.ServerId | None = None,
 39                  persisted: bool = False,
 40                  status_cb: StatusCb | None = None,
 41                  events_cb: EventsCb | None = None,
 42                  **kwargs
 43                  ) -> 'Client':
 44    """Connect to Eventer Server
 45
 46    Arguments `client_name` and optional `client_token` identifies eventer
 47    client.
 48
 49    According to Event Server specification, each subscription is event
 50    type identifier which can contain special subtypes ``?`` and ``*``.
 51    Subtype ``?`` can occur at any position inside event type identifier
 52    and is used as replacement for any single subtype. Subtype ``*`` is valid
 53    only as last subtype in event type identifier and is used as replacement
 54    for zero or more arbitrary subtypes.
 55
 56    If `subscriptions` is empty list, client doesn't subscribe for any events
 57    and will not receive server's notifications.
 58
 59    If `server_id` is ``None``, client will receive all event notifications,
 60    in accordance to `subscriptions`, regardless of event's server id. If
 61    `server_id` is set, Eventer Server will only send events notifications
 62    for events with provided server id.
 63
 64    If `persisted` is set to ``True``, Eventer Server will notify events
 65    after they are persisted (flushed to disk). Otherwise, events are
 66    notified immediately after registration.
 67
 68    Additional arguments are passed to `hat.chatter.connect` coroutine.
 69
 70    """
 71    client = Client()
 72    client._status_cb = status_cb
 73    client._events_cb = events_cb
 74    client._loop = asyncio.get_running_loop()
 75    client._conv_futures = {}
 76    client._status = common.Status.STANDBY
 77
 78    client._conn = await chatter.connect(addr, **kwargs)
 79
 80    try:
 81        req_data = {'clientName': client_name,
 82                    'clientToken': _optional_to_sbs(client_token),
 83                    'subscriptions': [list(i) for i in subscriptions],
 84                    'serverId': _optional_to_sbs(server_id),
 85                    'persisted': persisted}
 86        conv = await common.send_msg(conn=client._conn,
 87                                     msg_type='HatEventer.MsgInitReq',
 88                                     msg_data=req_data,
 89                                     last=False)
 90
 91        res, res_type, res_data = await common.receive_msg(client._conn)
 92        if res_type != 'HatEventer.MsgInitRes' or res.conv != conv:
 93            raise Exception('invalid init response')
 94
 95        if res_data[0] == 'success':
 96            client._status = common.Status(common.status_from_sbs(res_data[1]))
 97
 98        elif res_data[0] == 'error':
 99            raise EventerInitError(res_data[1])
100
101        else:
102            raise ValueError('unsupported init response')
103
104        client.async_group.spawn(client._receive_loop)
105
106    except BaseException:
107        await aio.uncancellable(client.async_close())
108        raise
109
110    return client

Connect to Eventer Server

Arguments client_name and optional client_token identify the eventer client.

According to Event Server specification, each subscription is event type identifier which can contain special subtypes ? and *. Subtype ? can occur at any position inside event type identifier and is used as replacement for any single subtype. Subtype * is valid only as last subtype in event type identifier and is used as replacement for zero or more arbitrary subtypes.

If subscriptions is an empty list, the client doesn't subscribe to any events and will not receive the server's notifications.

If server_id is None, the client will receive all event notifications, in accordance with subscriptions, regardless of the event's server id. If server_id is set, Eventer Server will only send event notifications for events with the provided server id.

If persisted is set to True, Eventer Server will notify events after they are persisted (flushed to disk). Otherwise, events are notified immediately after registration.

Additional arguments are passed to hat.chatter.connect coroutine.

class Client(hat.aio.group.Resource):
class Client(aio.Resource):
    """Eventer client

    Instances are not meant to be created directly - `connect` coroutine
    creates the client and initializes its internal state.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._conn.async_group

    @property
    def status(self) -> common.Status:
        """Status"""
        return self._status

    async def register(self,
                       events: Collection[common.RegisterEvent],
                       with_response: bool = False
                       ) -> Collection[common.Event] | None:
        """Register events and optionally wait for response

        If `with_response` is ``True``, this coroutine returns list of events
        or ``None`` if registration failure occurred. Otherwise, request is
        sent as last message of its conversation and ``None`` is returned
        without waiting.

        """
        sbs_events = [common.register_event_to_sbs(event) for event in events]
        conv = await common.send_msg(conn=self._conn,
                                     msg_type='HatEventer.MsgRegisterReq',
                                     msg_data=sbs_events,
                                     last=not with_response)

        if not with_response:
            return None

        return await self._wait_conv_res(conv)

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Query events from server"""
        conv = await common.send_msg(
            conn=self._conn,
            msg_type='HatEventer.MsgQueryReq',
            msg_data=common.query_params_to_sbs(params),
            last=False)

        return await self._wait_conv_res(conv)

    async def _receive_loop(self):
        # message type -> (debug log text, coroutine processing the message)
        handlers = {
            'HatEventer.MsgStatusNotify': (
                "received status notification",
                self._process_msg_status_notify),
            'HatEventer.MsgEventsNotify': (
                "received events notification",
                self._process_msg_events_notify),
            'HatEventer.MsgRegisterRes': (
                "received register response",
                self._process_msg_register_res),
            'HatEventer.MsgQueryRes': (
                "received query response",
                self._process_msg_query_res)}

        mlog.debug("starting receive loop")
        try:
            while True:
                mlog.debug("waiting for incoming message")
                msg, msg_type, msg_data = await common.receive_msg(self._conn)

                handler = handlers.get(msg_type)
                if handler is None:
                    raise Exception("unsupported message type")

                log_text, process = handler
                mlog.debug(log_text)
                await process(msg, msg_data)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("read loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping receive loop")
            self.close()

            # wake up everyone still waiting for a response
            unresolved = [future for future in self._conv_futures.values()
                          if not future.done()]
            for future in unresolved:
                future.set_exception(ConnectionError())

    async def _wait_conv_res(self, conv):
        if not self.is_open:
            raise ConnectionError()

        pending = self._loop.create_future()
        self._conv_futures[conv] = pending

        try:
            result = await pending

        finally:
            self._conv_futures.pop(conv, None)

        return result

    def _set_conv_result(self, conv, result):
        # responses nobody is waiting for are silently dropped
        future = self._conv_futures.get(conv)
        if future and not future.done():
            future.set_result(result)

    async def _process_msg_status_notify(self, msg, msg_data):
        self._status = common.status_from_sbs(msg_data)

        status_cb = self._status_cb
        if status_cb:
            await aio.call(status_cb, self, self._status)

        # ack is sent only after the callback finished
        await common.send_msg(conn=self._conn,
                              msg_type='HatEventer.MsgStatusAck',
                              msg_data=None,
                              conv=msg.conv)

    async def _process_msg_events_notify(self, msg, msg_data):
        events = [common.event_from_sbs(sbs_event) for sbs_event in msg_data]

        events_cb = self._events_cb
        if events_cb:
            await aio.call(events_cb, self, events)

    async def _process_msg_register_res(self, msg, msg_data):
        kind = msg_data[0]

        if kind == 'events':
            result = [common.event_from_sbs(sbs_event)
                      for sbs_event in msg_data[1]]

        elif kind == 'failure':
            result = None

        else:
            raise ValueError('unsupported register response')

        self._set_conv_result(msg.conv, result)

    async def _process_msg_query_res(self, msg, msg_data):
        self._set_conv_result(msg.conv,
                              common.query_result_from_sbs(msg_data))

Eventer client

For creating new client see connect coroutine.

async_group: hat.aio.group.Group
120    @property
121    def async_group(self) -> aio.Group:
122        """Async group"""
123        return self._conn.async_group

Async group

status: hat.event.common.common.Status
125    @property
126    def status(self) -> common.Status:
127        """Status"""
128        return self._status

Status

async def register( self, events: collections.abc.Collection[hat.event.common.common.RegisterEvent], with_response: bool = False) -> collections.abc.Collection[hat.event.common.common.Event] | None:
130    async def register(self,
131                       events: Collection[common.RegisterEvent],
132                       with_response: bool = False
133                       ) -> Collection[common.Event] | None:
134        """Register events and optionally wait for response
135
136        If `with_response` is ``True``, this coroutine returns list of events
137        or ``None`` if registration failure occurred.
138
139        """
140        msg_data = [common.register_event_to_sbs(i) for i in events]
141        conv = await common.send_msg(conn=self._conn,
142                                     msg_type='HatEventer.MsgRegisterReq',
143                                     msg_data=msg_data,
144                                     last=not with_response)
145
146        if with_response:
147            return await self._wait_conv_res(conv)

Register events and optionally wait for response

If with_response is True, this coroutine returns list of events or None if registration failure occurred.

async def query( self, params: hat.event.common.common.QueryLatestParams | hat.event.common.common.QueryTimeseriesParams | hat.event.common.common.QueryServerParams) -> hat.event.common.common.QueryResult:
149    async def query(self,
150                    params: common.QueryParams
151                    ) -> common.QueryResult:
152        """Query events from server"""
153        msg_data = common.query_params_to_sbs(params)
154        conv = await common.send_msg(conn=self._conn,
155                                     msg_type='HatEventer.MsgQueryReq',
156                                     msg_data=msg_data,
157                                     last=False)
158
159        return await self._wait_conv_res(conv)

Query events from server

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
ConnectionId = <class 'int'>
class ConnectionInfo(typing.NamedTuple):
class ConnectionInfo(typing.NamedTuple):
    """Description of a single initialized client connection

    Instance is created by server from client's init request and is passed
    to connection, register and query callbacks.

    """
    # identifier assigned by server to the connection
    id: ConnectionId
    # client name provided in init request
    client_name: str
    # optional client token provided in init request
    client_token: str | None
    # subscription created from event types listed in init request
    subscription: common.Subscription
    # if set, only events with this server id are notified to the client
    server_id: common.ServerId | None
    # selects which kind of notifications (persisted or not) client receives
    persisted: bool

ConnectionInfo(id, client_name, client_token, subscription, server_id, persisted)

ConnectionInfo( id: int, client_name: str, client_token: str | None, subscription: hat.event.common.subscription.common.Subscription, server_id: int | None, persisted: bool)

Create new instance of ConnectionInfo(id, client_name, client_token, subscription, server_id, persisted)

id: int

Alias for field number 0

client_name: str

Alias for field number 1

client_token: str | None

Alias for field number 2

subscription: hat.event.common.subscription.common.Subscription

Alias for field number 3

server_id: int | None

Alias for field number 4

persisted: bool

Alias for field number 5

Inherited Members
builtins.tuple
index
count
ConnectionCb = typing.Callable[[ConnectionInfo], typing.Optional[typing.Awaitable[NoneType]]]
RegisterCb = typing.Callable[[ConnectionInfo, collections.abc.Collection[hat.event.common.common.RegisterEvent]], typing.Union[collections.abc.Collection[hat.event.common.common.Event], NoneType, typing.Awaitable[collections.abc.Collection[hat.event.common.common.Event] | None]]]
QueryCb = typing.Callable[[ConnectionInfo, hat.event.common.common.QueryLatestParams | hat.event.common.common.QueryTimeseriesParams | hat.event.common.common.QueryServerParams], typing.Union[hat.event.common.common.QueryResult, typing.Awaitable[hat.event.common.common.QueryResult]]]
async def listen( addr: hat.drivers.tcp.Address, *, status: hat.event.common.common.Status = <Status.STANDBY: 'standby'>, connected_cb: Optional[Callable[[ConnectionInfo], Optional[Awaitable[NoneType]]]] = None, disconnected_cb: Optional[Callable[[ConnectionInfo], Optional[Awaitable[NoneType]]]] = None, register_cb: Optional[Callable[[ConnectionInfo, collections.abc.Collection[hat.event.common.common.RegisterEvent]], Union[collections.abc.Collection[hat.event.common.common.Event], NoneType, Awaitable[collections.abc.Collection[hat.event.common.common.Event] | None]]]] = None, query_cb: Optional[Callable[[ConnectionInfo, hat.event.common.common.QueryLatestParams | hat.event.common.common.QueryTimeseriesParams | hat.event.common.common.QueryServerParams], Union[hat.event.common.common.QueryResult, Awaitable[hat.event.common.common.QueryResult]]]] = None, close_timeout: float = 0.5, **kwargs) -> Server:
async def listen(addr: tcp.Address,
                 *,
                 status: common.Status = common.Status.STANDBY,
                 connected_cb: ConnectionCb | None = None,
                 disconnected_cb: ConnectionCb | None = None,
                 register_cb: RegisterCb | None = None,
                 query_cb: QueryCb | None = None,
                 close_timeout: float = 0.5,
                 **kwargs
                 ) -> 'Server':
    """Create listening Eventer Server instance

    Argument `status` is initial server status, reported to clients in
    successful init response.

    Optional `connected_cb` is called once client's init request is parsed -
    exception raised by this callback results in error init response.
    Optional `disconnected_cb` is called when initialized connection is
    closed. Optional `register_cb` and `query_cb` provide results for
    client's register and query requests.

    After error init response is sent, server waits up to `close_timeout`
    seconds for client to close the connection.

    Additional arguments are passed to `hat.chatter.listen` coroutine.

    """
    server = Server()

    # runtime state
    server._loop = asyncio.get_running_loop()
    server._next_conn_ids = itertools.count(1)
    server._conn_infos = {}
    server._conn_conv_futures = {}

    # configuration
    server._status = status
    server._close_timeout = close_timeout
    server._connected_cb = connected_cb
    server._disconnected_cb = disconnected_cb
    server._register_cb = register_cb
    server._query_cb = query_cb

    server._srv = await chatter.listen(server._connection_loop, addr, **kwargs)
    mlog.debug("listening on %s", addr)

    return server

Create listening Eventer Server instance

class Server(hat.aio.group.Resource):
 76class Server(aio.Resource):
 77
 78    @property
 79    def async_group(self) -> aio.Group:
 80        """Async group"""
 81        return self._srv.async_group
 82
 83    async def set_status(self, status: common.Status):
 84        """Set status and wait for acks"""
 85        if self._status == status:
 86            return
 87
 88        self._status = status
 89        tasks = [self.async_group.spawn(self._notify_status, conn, status)
 90                 for conn in self._conn_infos.keys()]
 91        if not tasks:
 92            return
 93
 94        await asyncio.wait(tasks)
 95
 96    async def notify_events(self,
 97                            events: Collection[common.Event],
 98                            persisted: bool):
 99        """Notify events to clients"""
100        for conn, info in list(self._conn_infos.items()):
101            if info.persisted != persisted:
102                continue
103
104            filtered_events = collections.deque(
105                event for event in events
106                if (info.subscription.matches(event.type) and
107                    (info.server_id is None or
108                     info.server_id == event.id.server)))
109            if not filtered_events:
110                continue
111
112            await self._notify_events(conn, filtered_events)
113
114    async def _connection_loop(self, conn):
115        mlog.debug("starting connection loop")
116        conn_id = next(self._next_conn_ids)
117        info = None
118
119        try:
120            req, req_type, req_data = await common.receive_msg(conn)
121            if req_type != 'HatEventer.MsgInitReq':
122                raise Exception('invalid init request type')
123
124            try:
125                info = ConnectionInfo(
126                    id=conn_id,
127                    client_name=req_data['clientName'],
128                    client_token=_optional_from_sbs(req_data['clientToken']),
129                    subscription=common.create_subscription(
130                        tuple(i) for i in req_data['subscriptions']),
131                    server_id=_optional_from_sbs(req_data['serverId']),
132                    persisted=req_data['persisted'])
133
134                if self._connected_cb:
135                    await aio.call(self._connected_cb, info)
136
137                res_data = 'success', common.status_to_sbs(self._status)
138                self._conn_infos[conn] = info
139
140            except Exception as e:
141                info = None
142                res_data = 'error', str(e)
143
144            mlog.debug("sending init response %s", res_data[0])
145            await common.send_msg(conn, 'HatEventer.MsgInitRes', res_data,
146                                  conv=req.conv)
147
148            if res_data[0] != 'success':
149                with contextlib.suppress(asyncio.TimeoutError):
150                    await aio.wait_for(conn.wait_closing(),
151                                       self._close_timeout)
152                return
153
154            while True:
155                mlog.debug("waiting for incomming messages")
156                msg, msg_type, msg_data = await common.receive_msg(conn)
157
158                if msg_type == 'HatEventer.MsgStatusAck':
159                    mlog.debug("received status ack")
160                    future = self._conn_conv_futures.get((conn, msg.conv))
161                    if future and not future.done():
162                        future.set_result(None)
163
164                elif msg_type == 'HatEventer.MsgRegisterReq':
165                    mlog.debug("received register request")
166                    await self._process_msg_register(conn, info, msg, msg_data)
167
168                elif msg_type == 'HatEventer.MsgQueryReq':
169                    mlog.debug("received query request")
170                    await self._process_msg_query(conn, info, msg, msg_data)
171
172                else:
173                    raise Exception('unsupported message type')
174
175        except ConnectionError:
176            pass
177
178        except Exception as e:
179            mlog.error("on connection error: %s", e, exc_info=e)
180
181        finally:
182            mlog.debug("stopping connection loop")
183            conn.close()
184            self._conn_infos.pop(conn, None)
185
186            for future in self._conn_conv_futures.values():
187                if not future.done():
188                    future.set_exception(ConnectionError())
189
190            if self._disconnected_cb and info:
191                with contextlib.suppress(Exception):
192                    await aio.call(self._disconnected_cb, info)
193
194    async def _process_msg_register(self, conn, info, req, req_data):
195        register_events = [common.register_event_from_sbs(i)
196                           for i in req_data]
197
198        if self._register_cb:
199            events = await aio.call(self._register_cb, info, register_events)
200
201        else:
202            events = None
203
204        if req.last:
205            return
206
207        if events is not None:
208            res_data = 'events', [common.event_to_sbs(event)
209                                  for event in events]
210
211        else:
212            res_data = 'failure', None
213
214        await common.send_msg(conn, 'HatEventer.MsgRegisterRes', res_data,
215                              conv=req.conv)
216
217    async def _process_msg_query(self, conn, info, req, req_data):
218        params = common.query_params_from_sbs(req_data)
219
220        if self._query_cb:
221            result = await aio.call(self._query_cb, info, params)
222
223        else:
224            result = common.QueryResult(events=[],
225                                        more_follows=False)
226
227        res_data = common.query_result_to_sbs(result)
228        await common.send_msg(conn, 'HatEventer.MsgQueryRes', res_data,
229                              conv=req.conv)
230
231    async def _notify_status(self, conn, status):
232        try:
233            req_data = common.status_to_sbs(self._status)
234            conv = await common.send_msg(conn, 'HatEventer.MsgStatusNotify',
235                                         req_data)
236
237            future = self._loop.create_future()
238            self._conn_conv_futures[(conn, conv)] = future
239
240            try:
241                await future
242
243            finally:
244                self._conn_conv_futures.pop((conn, conv))
245
246        except ConnectionError:
247            pass
248
249        except Exception as e:
250            mlog.error("notify status error: %s", e, exc_info=e)
251
252    async def _notify_events(self, conn, events):
253        try:
254            msg_data = [common.event_to_sbs(event) for event in events]
255            await common.send_msg(conn, 'HatEventer.MsgEventsNotify', msg_data)
256
257        except ConnectionError:
258            pass
259
260        except Exception as e:
261            mlog.error("notify events error: %s", e, exc_info=e)

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group
78    @property
79    def async_group(self) -> aio.Group:
80        """Async group"""
81        return self._srv.async_group

Async group

async def set_status(self, status: hat.event.common.common.Status):
83    async def set_status(self, status: common.Status):
84        """Set status and wait for acks"""
85        if self._status == status:
86            return
87
88        self._status = status
89        tasks = [self.async_group.spawn(self._notify_status, conn, status)
90                 for conn in self._conn_infos.keys()]
91        if not tasks:
92            return
93
94        await asyncio.wait(tasks)

Set status and wait for acks

async def notify_events( self, events: collections.abc.Collection[hat.event.common.common.Event], persisted: bool):
 96    async def notify_events(self,
 97                            events: Collection[common.Event],
 98                            persisted: bool):
 99        """Notify events to clients"""
100        for conn, info in list(self._conn_infos.items()):
101            if info.persisted != persisted:
102                continue
103
104            filtered_events = collections.deque(
105                event for event in events
106                if (info.subscription.matches(event.type) and
107                    (info.server_id is None or
108                     info.server_id == event.id.server)))
109            if not filtered_events:
110                continue
111
112            await self._notify_events(conn, filtered_events)

Notify events to clients

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close