hat.event.eventer
Eventer communication protocol
1"""Eventer communication protocol""" 2 3from hat.event.eventer.client import (StatusCb, 4 EventsCb, 5 EventerInitError, 6 connect, 7 Client) 8from hat.event.eventer.server import (ConnectionId, 9 ConnectionInfo, 10 ConnectionCb, 11 RegisterCb, 12 QueryCb, 13 listen, 14 Server) 15 16 17__all__ = ['StatusCb', 18 'EventsCb', 19 'EventerInitError', 20 'connect', 21 'Client', 22 'ConnectionId', 23 'ConnectionInfo', 24 'ConnectionCb', 25 'RegisterCb', 26 'QueryCb', 27 'listen', 28 'Server']
Eventer initialization error
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
async def connect(addr: tcp.Address,
                  client_name: str,
                  *,
                  client_token: str | None = None,
                  subscriptions: Iterable[common.EventType] = (),
                  server_id: common.ServerId | None = None,
                  persisted: bool = False,
                  status_cb: StatusCb | None = None,
                  events_cb: EventsCb | None = None,
                  **kwargs
                  ) -> 'Client':
    """Connect to Eventer Server

    Arguments `client_name` and optional `client_token` identify the eventer
    client.

    According to the Event Server specification, each subscription is an
    event type identifier which can contain special subtypes ``?`` and ``*``.
    Subtype ``?`` can occur at any position inside an event type identifier
    and is used as a replacement for any single subtype. Subtype ``*`` is
    valid only as the last subtype in an event type identifier and is used as
    a replacement for zero or more arbitrary subtypes.

    If `subscriptions` is empty, the client doesn't subscribe to any events
    and will not receive the server's notifications.

    If `server_id` is ``None``, the client will receive all event
    notifications, in accordance with `subscriptions`, regardless of the
    event's server id. If `server_id` is set, Eventer Server will only send
    event notifications for events with the provided server id.

    If `persisted` is set to ``True``, Eventer Server will notify events
    after they are persisted (flushed to disk). Otherwise, events are
    notified immediately after registration.

    Additional arguments are passed to `hat.chatter.connect` coroutine.

    If anything fails after the chatter connection is established, the client
    is closed before the exception is propagated.

    Raises:
        EventerInitError: server answered the init request with an error
        ValueError: init response carries an unsupported result tag
        Exception: first received message is not the init response belonging
            to the init request conversation

    """
    client = Client()
    client._status_cb = status_cb
    client._events_cb = events_cb
    client._loop = asyncio.get_running_loop()
    client._conv_futures = {}
    client._status = common.Status.STANDBY

    client._conn = await chatter.connect(addr, **kwargs)

    try:
        req_data = {'clientName': client_name,
                    'clientToken': _optional_to_sbs(client_token),
                    'subscriptions': [list(i) for i in subscriptions],
                    'serverId': _optional_to_sbs(server_id),
                    'persisted': persisted}
        conv = await common.send_msg(conn=client._conn,
                                     msg_type='HatEventer.MsgInitReq',
                                     msg_data=req_data,
                                     last=False)

        # the init response must be the first message and must belong to the
        # conversation opened by the init request
        res, res_type, res_data = await common.receive_msg(client._conn)
        if res_type != 'HatEventer.MsgInitRes' or res.conv != conv:
            raise Exception('invalid init response')

        if res_data[0] == 'success':
            client._status = common.Status(common.status_from_sbs(res_data[1]))

        elif res_data[0] == 'error':
            raise EventerInitError(res_data[1])

        else:
            raise ValueError('unsupported init response')

        # regular message processing starts only after successful init
        client.async_group.spawn(client._receive_loop)

    except BaseException:
        # BaseException also covers cancellation; closing is shielded so the
        # connection is released even when the caller is being cancelled
        await aio.uncancellable(client.async_close())
        raise

    return client
Connect to Eventer Server
Arguments `client_name` and optional `client_token` identify the eventer client.
According to the Event Server specification, each subscription is an event
type identifier which can contain the special subtypes `?` and `*`.
Subtype `?` can occur at any position inside an event type identifier
and is used as a replacement for any single subtype. Subtype `*` is valid
only as the last subtype in an event type identifier and is used as a
replacement for zero or more arbitrary subtypes.
If `subscriptions` is empty, the client doesn't subscribe to any events
and will not receive the server's notifications.
If `server_id` is `None`, the client will receive all event notifications,
in accordance with `subscriptions`, regardless of the event's server id. If
`server_id` is set, Eventer Server will only send event notifications
for events with the provided server id.
If `persisted` is set to `True`, Eventer Server will notify events
after they are persisted (flushed to disk). Otherwise, events are
notified immediately after registration.
Additional arguments are passed to the `hat.chatter.connect` coroutine.
class Client(aio.Resource):
    """Eventer client

    For creating new client see `connect` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        connection = self._conn
        return connection.async_group

    @property
    def status(self) -> common.Status:
        """Status"""
        return self._status

    async def register(self,
                       events: Collection[common.RegisterEvent],
                       with_response: bool = False
                       ) -> Collection[common.Event] | None:
        """Register events and optionally wait for response

        If `with_response` is ``True``, this coroutine returns list of events
        or ``None`` if registration failure occurred.

        """
        sbs_events = list(map(common.register_event_to_sbs, events))

        # when no response is requested the request is flagged as `last`
        conv = await common.send_msg(conn=self._conn,
                                     msg_type='HatEventer.MsgRegisterReq',
                                     msg_data=sbs_events,
                                     last=not with_response)

        if not with_response:
            return None

        return await self._wait_conv_res(conv)

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Query events from server"""
        conv = await common.send_msg(
            conn=self._conn,
            msg_type='HatEventer.MsgQueryReq',
            msg_data=common.query_params_to_sbs(params),
            last=False)

        response = await self._wait_conv_res(conv)
        return response

    async def _receive_loop(self):
        """Dispatch incoming messages until the connection fails

        On exit the client is closed and every pending response future is
        failed with `ConnectionError`.

        """
        mlog.debug("starting receive loop")
        try:
            # message type -> (debug log text, processing coroutine)
            handlers = {
                'HatEventer.MsgStatusNotify': (
                    "received status notification",
                    self._process_msg_status_notify),
                'HatEventer.MsgEventsNotify': (
                    "received events notification",
                    self._process_msg_events_notify),
                'HatEventer.MsgRegisterRes': (
                    "received register response",
                    self._process_msg_register_res),
                'HatEventer.MsgQueryRes': (
                    "received query response",
                    self._process_msg_query_res)}

            while True:
                mlog.debug("waiting for incoming message")
                msg, msg_type, msg_data = await common.receive_msg(self._conn)

                handler = handlers.get(msg_type)
                if handler is None:
                    raise Exception("unsupported message type")

                log_text, process = handler
                mlog.debug(log_text)
                await process(msg, msg_data)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("read loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping receive loop")
            self.close()

            # wake up everybody still waiting for a response
            pending = [future for future in self._conv_futures.values()
                       if not future.done()]
            for future in pending:
                future.set_exception(ConnectionError())

    async def _wait_conv_res(self, conv):
        """Wait for the result delivered for conversation `conv`"""
        if not self.is_open:
            raise ConnectionError()

        response_future = self._loop.create_future()
        self._conv_futures[conv] = response_future

        try:
            return await response_future

        finally:
            # entry is removed regardless of result, error or cancellation
            self._conv_futures.pop(conv, None)

    def _set_conv_result(self, conv, result):
        """Deliver `result` to the waiter of `conv`, if one is still pending"""
        future = self._conv_futures.get(conv)
        if future and not future.done():
            future.set_result(result)

    async def _process_msg_status_notify(self, msg, msg_data):
        """Store the notified status and report it to the status callback"""
        status = common.status_from_sbs(msg_data)
        self._status = status

        if not self._status_cb:
            return

        await aio.call(self._status_cb, self, self._status)

    async def _process_msg_events_notify(self, msg, msg_data):
        """Report notified events and acknowledge them when requested"""
        events = list(map(common.event_from_sbs, msg_data))

        if self._events_cb:
            await aio.call(self._events_cb, self, events)

        # acknowledgment is sent only when the notification is not `last`
        if not msg.last:
            await common.send_msg(conn=self._conn,
                                  msg_type='HatEventer.MsgEventsAck',
                                  msg_data=None,
                                  conv=msg.conv)

    async def _process_msg_register_res(self, msg, msg_data):
        """Decode register response and hand it to the waiting caller"""
        tag = msg_data[0]

        if tag == 'events':
            result = [common.event_from_sbs(event) for event in msg_data[1]]

        elif tag == 'failure':
            result = None

        else:
            raise ValueError('unsupported register response')

        self._set_conv_result(msg.conv, result)

    async def _process_msg_query_res(self, msg, msg_data):
        """Decode query response and hand it to the waiting caller"""
        result = common.query_result_from_sbs(msg_data)
        self._set_conv_result(msg.conv, result)
Eventer client
For creating a new client, see the `connect` coroutine.
@property
def async_group(self) -> aio.Group:
    """Async group"""
    # the client exposes the group of its connection as its own group
    connection = self._conn
    return connection.async_group
Async group
async def register(self,
                   events: Collection[common.RegisterEvent],
                   with_response: bool = False
                   ) -> Collection[common.Event] | None:
    """Register events and optionally wait for response

    If `with_response` is ``True``, this coroutine returns list of events
    or ``None`` if registration failure occurred.

    """
    sbs_events = list(map(common.register_event_to_sbs, events))

    # when no response is requested the request is flagged as `last`
    conv = await common.send_msg(conn=self._conn,
                                 msg_type='HatEventer.MsgRegisterReq',
                                 msg_data=sbs_events,
                                 last=not with_response)

    if not with_response:
        return None

    return await self._wait_conv_res(conv)
Register events and optionally wait for response
If `with_response` is `True`, this coroutine returns a list of events,
or `None` if a registration failure occurred.
async def query(self,
                params: common.QueryParams
                ) -> common.QueryResult:
    """Query events from server"""
    # a query always expects a response, so the request is never `last`
    conv = await common.send_msg(
        conn=self._conn,
        msg_type='HatEventer.MsgQueryReq',
        msg_data=common.query_params_to_sbs(params),
        last=False)

    response = await self._wait_conv_res(conv)
    return response
Query events from server
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
class ConnectionInfo(typing.NamedTuple):
    # identifier taken from the server's connection counter
    id: ConnectionId
    # client identification as received in the init request
    client_name: str
    client_token: str | None
    # subscription built from the event types listed in the init request
    subscription: common.Subscription
    # ``None`` disables filtering of notified events by server id
    server_id: int | None
    # compared with the `persisted` argument of `Server.notify_events`
    persisted: bool
ConnectionInfo(id, client_name, client_token, subscription, server_id, persisted)
Create new instance of ConnectionInfo(id, client_name, client_token, subscription, server_id, persisted)
Inherited Members
- builtins.tuple
- index
- count
async def listen(addr: tcp.Address,
                 *,
                 status: common.Status = common.Status.STANDBY,
                 connected_cb: ConnectionCb | None = None,
                 disconnected_cb: ConnectionCb | None = None,
                 register_cb: RegisterCb | None = None,
                 query_cb: QueryCb | None = None,
                 close_timeout: float = 0.5,
                 **kwargs
                 ) -> 'Server':
    """Create listening Eventer Server instance"""
    server = Server()

    # internal bookkeeping
    server._loop = asyncio.get_running_loop()
    server._next_conn_ids = itertools.count(1)
    server._conn_infos = {}
    server._conn_conv_futures = {}

    # caller supplied state, callbacks and timing
    server._status = status
    server._close_timeout = close_timeout
    server._connected_cb = connected_cb
    server._disconnected_cb = disconnected_cb
    server._register_cb = register_cb
    server._query_cb = query_cb

    # every accepted connection is served by `Server._connection_loop`;
    # additional keyword arguments go to `chatter.listen`
    server._srv = await chatter.listen(server._connection_loop, addr, **kwargs)
    mlog.debug("listening on %s", addr)

    return server
Create listening Eventer Server instance
class Server(aio.Resource):
    """Eventer server

    For creating new server see `listen` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    def get_conn_infos(self) -> list[ConnectionInfo]:
        """Get connection infos"""
        return list(self._conn_infos.values())

    async def set_status(self, status: common.Status):
        """Set status

        Connected clients are notified only if the status changed.

        """
        if self._status == status:
            return

        self._status = status

        # iterate over a snapshot - connections may come and go while
        # notifications are being sent
        for conn in list(self._conn_infos.keys()):
            await self._notify_status(conn)

    async def notify_events(self,
                            events: Collection[common.Event],
                            persisted: bool,
                            with_ack: bool = False):
        """Notify events to clients

        Events are sent only to connections whose `persisted` flag equals
        `persisted`, filtered by connection's subscription and server id.

        If `with_ack` is ``True``, notifications are sent concurrently and
        this coroutine waits until each notified connection acknowledges
        them or is closed.

        """
        conn_conn_events = collections.deque()

        for conn, info in self._conn_infos.items():
            if info.persisted != persisted:
                continue

            conn_events = collections.deque(
                event for event in events
                if (info.subscription.matches(event.type) and
                    (info.server_id is None or
                     info.server_id == event.id.server)))
            if not conn_events:
                continue

            conn_conn_events.append((conn, conn_events))

        if not conn_conn_events:
            return

        if with_ack:
            await asyncio.wait([
                self.async_group.spawn(self._notify_events, conn, conn_events,
                                       True)
                for conn, conn_events in conn_conn_events])

        else:
            for conn, conn_events in conn_conn_events:
                await self._notify_events(conn, conn_events, False)

    async def _connection_loop(self, conn):
        """Serve single client connection

        Processes the init request and then dispatches incoming messages
        until the connection fails.

        """
        mlog.debug("starting connection loop")
        conn_id = next(self._next_conn_ids)
        info = None

        try:
            req, req_type, req_data = await common.receive_msg(conn)
            if req_type != 'HatEventer.MsgInitReq':
                raise Exception('invalid init request type')

            try:
                info = ConnectionInfo(
                    id=conn_id,
                    client_name=req_data['clientName'],
                    client_token=_optional_from_sbs(req_data['clientToken']),
                    subscription=common.create_subscription(
                        tuple(i) for i in req_data['subscriptions']),
                    server_id=_optional_from_sbs(req_data['serverId']),
                    persisted=req_data['persisted'])

                if self._connected_cb:
                    await aio.call(self._connected_cb, info)

                res_data = 'success', common.status_to_sbs(self._status)
                self._conn_infos[conn] = info

            except Exception as e:
                # invalid request or rejection by connected callback is
                # reported to the client as init error
                info = None
                res_data = 'error', str(e)

            mlog.debug("sending init response %s", res_data[0])
            await common.send_msg(conn, 'HatEventer.MsgInitRes', res_data,
                                  conv=req.conv)

            if res_data[0] != 'success':
                # give the client a chance to close the connection itself
                with contextlib.suppress(asyncio.TimeoutError):
                    await aio.wait_for(conn.wait_closing(),
                                       self._close_timeout)
                return

            while True:
                mlog.debug("waiting for incomming messages")
                msg, msg_type, msg_data = await common.receive_msg(conn)

                if msg_type == 'HatEventer.MsgEventsAck':
                    mlog.debug("received events ack")
                    future = self._conn_conv_futures.get((conn, msg.conv))
                    if future and not future.done():
                        future.set_result(None)

                elif msg_type == 'HatEventer.MsgRegisterReq':
                    mlog.debug("received register request")
                    await self._process_msg_register(conn, info, msg, msg_data)

                elif msg_type == 'HatEventer.MsgQueryReq':
                    mlog.debug("received query request")
                    await self._process_msg_query(conn, info, msg, msg_data)

                else:
                    raise Exception('unsupported message type')

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("on connection error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping connection loop")
            conn.close()
            self._conn_infos.pop(conn, None)

            # fail only the ack futures that belong to this connection;
            # waiters of other connections must keep waiting for their acks
            for (future_conn, _), future in list(
                    self._conn_conv_futures.items()):
                if future_conn is conn and not future.done():
                    future.set_exception(ConnectionError())

            # `info` is set only for successfully initialized connections
            if self._disconnected_cb and info:
                with contextlib.suppress(Exception):
                    await aio.call(self._disconnected_cb, info)

    async def _process_msg_register(self, conn, info, req, req_data):
        """Process register request and respond unless request is last"""
        register_events = [common.register_event_from_sbs(i)
                           for i in req_data]

        if self._register_cb:
            events = await aio.call(self._register_cb, info, register_events)

        else:
            events = None

        if req.last:
            return

        if events is not None:
            res_data = 'events', [common.event_to_sbs(event)
                                  for event in events]

        else:
            res_data = 'failure', None

        await common.send_msg(conn, 'HatEventer.MsgRegisterRes', res_data,
                              conv=req.conv)

    async def _process_msg_query(self, conn, info, req, req_data):
        """Process query request and send query result"""
        params = common.query_params_from_sbs(req_data)

        if self._query_cb:
            result = await aio.call(self._query_cb, info, params)

        else:
            # without query callback, empty result is reported
            result = common.QueryResult(events=[],
                                        more_follows=False)

        res_data = common.query_result_to_sbs(result)
        await common.send_msg(conn, 'HatEventer.MsgQueryRes', res_data,
                              conv=req.conv)

    async def _notify_status(self, conn):
        """Send current status to connection (errors are not propagated)"""
        try:
            msg_data = common.status_to_sbs(self._status)
            await common.send_msg(conn, 'HatEventer.MsgStatusNotify', msg_data)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("notify status error: %s", e, exc_info=e)

    async def _notify_events(self, conn, events, with_ack):
        """Send events to connection and optionally wait for acknowledgment

        Errors are not propagated.

        """
        try:
            msg_data = [common.event_to_sbs(event) for event in events]
            conv = await common.send_msg(conn,
                                         'HatEventer.MsgEventsNotify',
                                         msg_data,
                                         last=not with_ack)

            if not with_ack:
                return

            # future is resolved by connection loop once ack is received or
            # failed once the connection loop stops
            future = self._loop.create_future()
            self._conn_conv_futures[(conn, conv)] = future

            try:
                await future

            finally:
                self._conn_conv_futures.pop((conn, conv))

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("notify events error: %s", e, exc_info=e)
Resource with lifetime control based on `Group`.
@property
def async_group(self) -> aio.Group:
    """Async group"""
    # the server exposes the group of its chatter listener as its own group
    listener = self._srv
    return listener.async_group
Async group
def get_conn_infos(self) -> list[ConnectionInfo]:
    """Get connection infos"""
    # a new list is returned, so callers never hold the internal mapping view
    return [*self._conn_infos.values()]
Get connection infos
async def set_status(self, status: common.Status):
    """Set status"""
    if self._status != status:
        self._status = status

        # notify over a snapshot - the set of connections may change while
        # notifications are awaited
        for conn in tuple(self._conn_infos):
            await self._notify_status(conn)
Set status
async def notify_events(self,
                        events: Collection[common.Event],
                        persisted: bool,
                        with_ack: bool = False):
    """Notify events to clients"""

    def is_wanted(info, event):
        # event has to match the subscription and, if set, the server id
        if not info.subscription.matches(event.type):
            return False
        return info.server_id is None or info.server_id == event.id.server

    # (connection, events for that connection) pairs
    targets = collections.deque()

    for conn, info in self._conn_infos.items():
        if info.persisted != persisted:
            continue

        selected = collections.deque(
            event for event in events if is_wanted(info, event))
        if selected:
            targets.append((conn, selected))

    if not targets:
        return

    if not with_ack:
        # without acknowledgment, connections are notified one by one
        for conn, selected in targets:
            await self._notify_events(conn, selected, False)
        return

    # with acknowledgment, all connections are notified concurrently
    tasks = [self.async_group.spawn(self._notify_events, conn, selected, True)
             for conn, selected in targets]
    await asyncio.wait(tasks)
Notify events to clients
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close