hat.event.eventer
Eventer communication protocol
1"""Eventer communication protocol""" 2 3from hat.event.eventer.client import (StatusCb, 4 EventsCb, 5 EventerInitError, 6 connect, 7 Client) 8from hat.event.eventer.server import (ConnectionId, 9 ConnectionInfo, 10 ConnectionCb, 11 RegisterCb, 12 QueryCb, 13 listen, 14 Server) 15 16 17__all__ = ['StatusCb', 18 'EventsCb', 19 'EventerInitError', 20 'connect', 21 'Client', 22 'ConnectionId', 23 'ConnectionInfo', 24 'ConnectionCb', 25 'RegisterCb', 26 'QueryCb', 27 'listen', 28 'Server']
Eventer initialization error
33async def connect(addr: tcp.Address, 34 client_name: str, 35 *, 36 client_token: str | None = None, 37 subscriptions: Iterable[common.EventType] = [], 38 server_id: common.ServerId | None = None, 39 persisted: bool = False, 40 status_cb: StatusCb | None = None, 41 events_cb: EventsCb | None = None, 42 **kwargs 43 ) -> 'Client': 44 """Connect to Eventer Server 45 46 Arguments `client_name` and optional `client_token` identifies eventer 47 client. 48 49 According to Event Server specification, each subscription is event 50 type identifier which can contain special subtypes ``?`` and ``*``. 51 Subtype ``?`` can occur at any position inside event type identifier 52 and is used as replacement for any single subtype. Subtype ``*`` is valid 53 only as last subtype in event type identifier and is used as replacement 54 for zero or more arbitrary subtypes. 55 56 If `subscriptions` is empty list, client doesn't subscribe for any events 57 and will not receive server's notifications. 58 59 If `server_id` is ``None``, client will receive all event notifications, 60 in accordance to `subscriptions`, regardless of event's server id. If 61 `server_id` is set, Eventer Server will only send events notifications 62 for events with provided server id. 63 64 If `persisted` is set to ``True``, Eventer Server will notify events 65 after they are persisted (flushed to disk). Otherwise, events are 66 notified immediately after registration. 67 68 Additional arguments are passed to `hat.chatter.connect` coroutine. 
69 70 """ 71 client = Client() 72 client._status_cb = status_cb 73 client._events_cb = events_cb 74 client._loop = asyncio.get_running_loop() 75 client._conv_futures = {} 76 client._status = common.Status.STANDBY 77 78 client._conn = await chatter.connect(addr, **kwargs) 79 80 try: 81 req_data = {'clientName': client_name, 82 'clientToken': _optional_to_sbs(client_token), 83 'subscriptions': [list(i) for i in subscriptions], 84 'serverId': _optional_to_sbs(server_id), 85 'persisted': persisted} 86 conv = await common.send_msg(conn=client._conn, 87 msg_type='HatEventer.MsgInitReq', 88 msg_data=req_data, 89 last=False) 90 91 res, res_type, res_data = await common.receive_msg(client._conn) 92 if res_type != 'HatEventer.MsgInitRes' or res.conv != conv: 93 raise Exception('invalid init response') 94 95 if res_data[0] == 'success': 96 client._status = common.Status(common.status_from_sbs(res_data[1])) 97 98 elif res_data[0] == 'error': 99 raise EventerInitError(res_data[1]) 100 101 else: 102 raise ValueError('unsupported init response') 103 104 client.async_group.spawn(client._receive_loop) 105 106 except BaseException: 107 await aio.uncancellable(client.async_close()) 108 raise 109 110 return client
Connect to Eventer Server
Arguments client_name and the optional client_token identify the eventer
client.
According to the Event Server specification, each subscription is an event
type identifier which can contain the special subtypes ? and *.
Subtype ? can occur at any position inside an event type identifier
and is used as a replacement for any single subtype. Subtype * is valid
only as the last subtype in an event type identifier and is used as a replacement
for zero or more arbitrary subtypes.
If subscriptions is an empty list, the client doesn't subscribe to any events
and will not receive the server's notifications.
If server_id is None, the client will receive all event notifications,
in accordance with subscriptions, regardless of the event's server id. If
server_id is set, the Eventer Server will only send event notifications
for events with the provided server id.
If persisted is set to True, Eventer Server will notify events
after they are persisted (flushed to disk). Otherwise, events are
notified immediately after registration.
Additional arguments are passed to hat.chatter.connect coroutine.
class Client(aio.Resource):
    """Eventer client

    New clients are created with the `connect` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._conn.async_group

    @property
    def status(self) -> common.Status:
        """Status"""
        return self._status

    async def register(self,
                       events: Collection[common.RegisterEvent],
                       with_response: bool = False
                       ) -> Collection[common.Event] | None:
        """Register events and optionally wait for response

        With `with_response` set to ``True``, the result is the list of
        registered events, or ``None`` if registration failure occurred.
        Without it, the request is sent as the last message of its
        conversation and ``None`` is returned immediately.

        """
        request = [common.register_event_to_sbs(i) for i in events]
        conv = await common.send_msg(conn=self._conn,
                                     msg_type='HatEventer.MsgRegisterReq',
                                     msg_data=request,
                                     last=not with_response)

        if not with_response:
            return None

        return await self._wait_conv_res(conv)

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Query events from server"""
        conv = await common.send_msg(
            conn=self._conn,
            msg_type='HatEventer.MsgQueryReq',
            msg_data=common.query_params_to_sbs(params),
            last=False)

        return await self._wait_conv_res(conv)

    async def _receive_loop(self):
        # Dispatches incoming messages until the connection fails, then
        # closes the client and fails every still pending conversation.
        mlog.debug("starting receive loop")
        try:
            # message type -> (debug log text, handler coroutine)
            dispatch = {
                'HatEventer.MsgStatusNotify': (
                    "received status notification",
                    self._process_msg_status_notify),
                'HatEventer.MsgEventsNotify': (
                    "received events notification",
                    self._process_msg_events_notify),
                'HatEventer.MsgRegisterRes': (
                    "received register response",
                    self._process_msg_register_res),
                'HatEventer.MsgQueryRes': (
                    "received query response",
                    self._process_msg_query_res),
            }

            while True:
                mlog.debug("waiting for incoming message")
                msg, msg_type, msg_data = await common.receive_msg(self._conn)

                entry = dispatch.get(msg_type)
                if entry is None:
                    raise Exception("unsupported message type")

                log_text, handler = entry
                mlog.debug(log_text)
                await handler(msg, msg_data)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("read loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping receive loop")
            self.close()

            # wake up everyone still waiting for a response
            for pending in self._conv_futures.values():
                if pending.done():
                    continue
                pending.set_exception(ConnectionError())

    async def _wait_conv_res(self, conv):
        # Waits for the result of conversation `conv`; the bookkeeping
        # entry is removed regardless of how the wait ends.
        if not self.is_open:
            raise ConnectionError()

        result_future = self._loop.create_future()
        self._conv_futures[conv] = result_future

        try:
            return await result_future

        finally:
            self._conv_futures.pop(conv, None)

    def _resolve_conv(self, conv, result):
        # Delivers `result` to the waiter of `conv`, if one is still waiting.
        waiter = self._conv_futures.get(conv)
        if waiter is not None and not waiter.done():
            waiter.set_result(result)

    async def _process_msg_status_notify(self, msg, msg_data):
        self._status = common.status_from_sbs(msg_data)

        if not self._status_cb:
            return

        await aio.call(self._status_cb, self, self._status)

    async def _process_msg_events_notify(self, msg, msg_data):
        events = [common.event_from_sbs(i) for i in msg_data]

        if self._events_cb:
            await aio.call(self._events_cb, self, events)

        # acknowledgement is sent only when the server keeps the
        # conversation open
        if not msg.last:
            await common.send_msg(conn=self._conn,
                                  msg_type='HatEventer.MsgEventsAck',
                                  msg_data=None,
                                  conv=msg.conv)

    async def _process_msg_register_res(self, msg, msg_data):
        kind = msg_data[0]

        if kind == 'events':
            result = [common.event_from_sbs(i) for i in msg_data[1]]

        elif kind == 'failure':
            result = None

        else:
            raise ValueError('unsupported register response')

        self._resolve_conv(msg.conv, result)

    async def _process_msg_query_res(self, msg, msg_data):
        self._resolve_conv(msg.conv, common.query_result_from_sbs(msg_data))
Eventer client
For creating new client see connect coroutine.
@property
def async_group(self) -> aio.Group:
    """Async group of the underlying connection"""
    connection = self._conn
    return connection.async_group
Async group
async def register(self,
                   events: Collection[common.RegisterEvent],
                   with_response: bool = False
                   ) -> Collection[common.Event] | None:
    """Register events and optionally wait for response

    With `with_response` set to ``True``, the result is the list of
    registered events, or ``None`` if registration failure occurred.
    Without it, the request is sent as the last message of its
    conversation and ``None`` is returned immediately.

    """
    request = [common.register_event_to_sbs(i) for i in events]
    conv = await common.send_msg(conn=self._conn,
                                 msg_type='HatEventer.MsgRegisterReq',
                                 msg_data=request,
                                 last=not with_response)

    if not with_response:
        return None

    return await self._wait_conv_res(conv)
Register events and optionally wait for response
If with_response is True, this coroutine returns a list of events,
or None if a registration failure occurred.
async def query(self,
                params: common.QueryParams
                ) -> common.QueryResult:
    """Query events from server"""
    conv = await common.send_msg(
        conn=self._conn,
        msg_type='HatEventer.MsgQueryReq',
        msg_data=common.query_params_to_sbs(params),
        last=False)

    return await self._wait_conv_res(conv)
Query events from server
class ConnectionInfo(typing.NamedTuple):
    """Description of a single initialized client connection"""
    # identifier assigned by the server to this connection
    id: ConnectionId
    # client name taken from the init request
    client_name: str
    # optional client token taken from the init request
    client_token: str | None
    # subscription built from the event types requested by the client
    subscription: common.Subscription
    # when set, only events with this server id are notified to the client
    server_id: int | None
    # whether the client is notified of persisted or non-persisted events
    persisted: bool
ConnectionInfo(id, client_name, client_token, subscription, server_id, persisted)
Create new instance of ConnectionInfo(id, client_name, client_token, subscription, server_id, persisted)
async def listen(addr: tcp.Address,
                 *,
                 status: common.Status = common.Status.STANDBY,
                 connected_cb: ConnectionCb | None = None,
                 disconnected_cb: ConnectionCb | None = None,
                 register_cb: RegisterCb | None = None,
                 query_cb: QueryCb | None = None,
                 close_timeout: float = 0.5,
                 **kwargs
                 ) -> 'Server':
    """Create listening Eventer Server instance

    `status` is the initial server status reported to connecting clients.
    The optional callbacks are invoked when a client connection is
    initialized or terminated and when a client sends register or query
    requests. `close_timeout` limits how long the server waits for a client
    to close the connection after a failed initialization.

    Additional arguments are passed to `hat.chatter.listen` coroutine.

    """
    server = Server()

    # caller supplied configuration
    server._close_timeout = close_timeout
    server._query_cb = query_cb
    server._register_cb = register_cb
    server._disconnected_cb = disconnected_cb
    server._connected_cb = connected_cb
    server._status = status

    # internal bookkeeping
    server._conn_conv_futures = {}
    server._conn_infos = {}
    server._next_conn_ids = itertools.count(1)
    server._loop = asyncio.get_running_loop()

    on_connection = server._connection_loop
    server._srv = await chatter.listen(on_connection, addr, **kwargs)
    mlog.debug("listening on %s", addr)

    return server
Create listening Eventer Server instance
class Server(aio.Resource):
    """Eventer server

    For creating new server see `listen` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    def get_conn_infos(self) -> list[ConnectionInfo]:
        """Get connection infos"""
        return list(self._conn_infos.values())

    async def set_status(self, status: common.Status):
        """Set status

        If the status changed, all initialized connections are notified.

        """
        if self._status == status:
            return

        self._status = status

        # iterate over a snapshot - connections can come and go while
        # notifications are being sent
        for conn in list(self._conn_infos.keys()):
            await self._notify_status(conn)

    async def notify_events(self,
                            events: Collection[common.Event],
                            persisted: bool,
                            with_ack: bool = False):
        """Notify events to clients

        Only connections whose `persisted` flag equals `persisted` are
        considered. Each of them receives the events matching its
        subscription and (if set) its server id. Connections without any
        matching event are skipped.

        If `with_ack` is ``True``, notifications are sent concurrently and
        this coroutine returns once every notification task has finished.

        """
        conn_conn_events = collections.deque()

        for conn, info in self._conn_infos.items():
            if info.persisted != persisted:
                continue

            conn_events = collections.deque(
                event for event in events
                if (info.subscription.matches(event.type) and
                    (info.server_id is None or
                     info.server_id == event.id.server)))
            if not conn_events:
                continue

            conn_conn_events.append((conn, conn_events))

        if not conn_conn_events:
            return

        if with_ack:
            await asyncio.wait([
                self.async_group.spawn(self._notify_events, conn, conn_events,
                                       True)
                for conn, conn_events in conn_conn_events])

        else:
            for conn, conn_events in conn_conn_events:
                await self._notify_events(conn, conn_events, False)

    async def _connection_loop(self, conn):
        # Serves a single client connection: handles the init request and
        # then dispatches incoming messages until the connection fails.
        mlog.debug("starting connection loop")
        conn_id = next(self._next_conn_ids)
        info = None

        try:
            req, req_type, req_data = await common.receive_msg(conn)
            if req_type != 'HatEventer.MsgInitReq':
                raise Exception('invalid init request type')

            try:
                info = ConnectionInfo(
                    id=conn_id,
                    client_name=req_data['clientName'],
                    client_token=_optional_from_sbs(req_data['clientToken']),
                    subscription=common.create_subscription(
                        tuple(i) for i in req_data['subscriptions']),
                    server_id=_optional_from_sbs(req_data['serverId']),
                    persisted=req_data['persisted'])

                if self._connected_cb:
                    await aio.call(self._connected_cb, info)

                res_data = 'success', common.status_to_sbs(self._status)
                self._conn_infos[conn] = info

            except Exception as e:
                mlog.warning("connection initialization error: %s", e,
                             exc_info=e)

                # initialization failed - disconnected callback must not
                # be called for this connection
                info = None
                res_data = 'error', str(e)

            mlog.debug("sending init response %s", res_data[0])
            await common.send_msg(conn, 'HatEventer.MsgInitRes', res_data,
                                  conv=req.conv)

            if res_data[0] != 'success':
                # give the client a chance to close the connection itself
                with contextlib.suppress(asyncio.TimeoutError):
                    await aio.wait_for(conn.wait_closing(),
                                       self._close_timeout)
                return

            while True:
                mlog.debug("waiting for incomming messages")
                msg, msg_type, msg_data = await common.receive_msg(conn)

                if msg_type == 'HatEventer.MsgEventsAck':
                    mlog.debug("received events ack")
                    future = self._conn_conv_futures.get((conn, msg.conv))
                    if future and not future.done():
                        future.set_result(None)

                elif msg_type == 'HatEventer.MsgRegisterReq':
                    mlog.debug("received register request")
                    await self._process_msg_register(conn, info, msg, msg_data)

                elif msg_type == 'HatEventer.MsgQueryReq':
                    mlog.debug("received query request")
                    await self._process_msg_query(conn, info, msg, msg_data)

                else:
                    raise Exception('unsupported message type')

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("on connection error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping connection loop")
            conn.close()
            self._conn_infos.pop(conn, None)

            # Fail only the acknowledgement waits of THIS connection.
            # Futures are keyed by (connection, conversation); waits that
            # belong to other, still open connections must stay pending.
            for (ack_conn, _), future in list(
                    self._conn_conv_futures.items()):
                if ack_conn is conn and not future.done():
                    future.set_exception(ConnectionError())

            if self._disconnected_cb and info:
                # best effort - a failing callback must not break cleanup,
                # but the failure should be visible in the log
                try:
                    await aio.call(self._disconnected_cb, info)

                except Exception as e:
                    mlog.error("disconnected callback error: %s", e,
                               exc_info=e)

    async def _process_msg_register(self, conn, info, req, req_data):
        # Passes register events to the register callback and, unless the
        # request closed the conversation, sends back the response.
        register_events = [common.register_event_from_sbs(i)
                           for i in req_data]

        if self._register_cb:
            events = await aio.call(self._register_cb, info, register_events)

        else:
            events = None

        # client doesn't expect a response
        if req.last:
            return

        if events is not None:
            res_data = 'events', [common.event_to_sbs(event)
                                  for event in events]

        else:
            res_data = 'failure', None

        await common.send_msg(conn, 'HatEventer.MsgRegisterRes', res_data,
                              conv=req.conv)

    async def _process_msg_query(self, conn, info, req, req_data):
        # Answers a query request; without a query callback the result is
        # empty.
        params = common.query_params_from_sbs(req_data)

        if self._query_cb:
            result = await aio.call(self._query_cb, info, params)

        else:
            result = common.QueryResult(events=[],
                                        more_follows=False)

        res_data = common.query_result_to_sbs(result)
        await common.send_msg(conn, 'HatEventer.MsgQueryRes', res_data,
                              conv=req.conv)

    async def _notify_status(self, conn):
        # Sends current status to a single connection; errors are logged
        # and never propagated.
        try:
            msg_data = common.status_to_sbs(self._status)
            await common.send_msg(conn, 'HatEventer.MsgStatusNotify', msg_data)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("notify status error: %s", e, exc_info=e)

    async def _notify_events(self, conn, events, with_ack):
        # Sends events to a single connection and, if `with_ack` is set,
        # waits for the acknowledgement; errors are logged and never
        # propagated.
        try:
            msg_data = [common.event_to_sbs(event) for event in events]
            conv = await common.send_msg(conn,
                                         'HatEventer.MsgEventsNotify',
                                         msg_data,
                                         last=not with_ack)

            if not with_ack:
                return

            future = self._loop.create_future()
            self._conn_conv_futures[(conn, conv)] = future

            try:
                await future

            finally:
                self._conn_conv_futures.pop((conn, conv))

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("notify events error: %s", e, exc_info=e)
Resource with lifetime control based on Group.
@property
def async_group(self) -> aio.Group:
    """Async group of the underlying listening server"""
    listener = self._srv
    return listener.async_group
Async group
def get_conn_infos(self) -> list[ConnectionInfo]:
    """Return a new list with infos of all initialized connections"""
    return [*self._conn_infos.values()]
Get connection infos
async def set_status(self, status: common.Status):
    """Set status

    Nothing happens when `status` equals the current status. Otherwise,
    the new status is stored and notified to each connection known at
    that moment, one after another.

    """
    unchanged = self._status == status
    if unchanged:
        return

    self._status = status

    # snapshot of connections - the mapping can change while awaiting
    for conn in tuple(self._conn_infos):
        await self._notify_status(conn)
Set status
async def notify_events(self,
                        events: Collection[common.Event],
                        persisted: bool,
                        with_ack: bool = False):
    """Notify events to clients

    Only connections whose `persisted` flag equals `persisted` are
    considered. Each of them receives the events matching its subscription
    and (if set) its server id. Connections without any matching event are
    skipped.

    If `with_ack` is ``True``, notifications are sent concurrently and this
    coroutine returns once every notification task has finished. Otherwise,
    notifications are sent one connection after another.

    """

    def is_wanted(info, event):
        # subscription must match; server id is checked only when set
        if not info.subscription.matches(event.type):
            return False
        return info.server_id is None or info.server_id == event.id.server

    targets = []
    for conn, info in self._conn_infos.items():
        if info.persisted != persisted:
            continue

        selected = collections.deque(
            event for event in events if is_wanted(info, event))
        if selected:
            targets.append((conn, selected))

    if not targets:
        return

    if not with_ack:
        for conn, selected in targets:
            await self._notify_events(conn, selected, False)
        return

    tasks = [self.async_group.spawn(self._notify_events, conn, selected, True)
             for conn, selected in targets]
    await asyncio.wait(tasks)
Notify events to clients