hat.event.eventer
Eventer communication protocol
1"""Eventer communication protocol""" 2 3from hat.event.eventer.client import (StatusCb, 4 EventsCb, 5 EventerInitError, 6 connect, 7 Client) 8from hat.event.eventer.server import (ConnectionId, 9 ConnectionInfo, 10 ConnectionCb, 11 RegisterCb, 12 QueryCb, 13 listen, 14 Server) 15 16 17__all__ = ['StatusCb', 18 'EventsCb', 19 'EventerInitError', 20 'connect', 21 'Client', 22 'ConnectionId', 23 'ConnectionInfo', 24 'ConnectionCb', 25 'RegisterCb', 26 'QueryCb', 27 'listen', 28 'Server']
Eventer initialization error
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
async def connect(addr: tcp.Address,
                  client_name: str,
                  *,
                  client_token: str | None = None,
                  subscriptions: Iterable[common.EventType] = (),
                  server_id: common.ServerId | None = None,
                  persisted: bool = False,
                  status_cb: StatusCb | None = None,
                  events_cb: EventsCb | None = None,
                  **kwargs
                  ) -> 'Client':
    """Connect to Eventer Server

    Arguments `client_name` and optional `client_token` identify the eventer
    client.

    According to Event Server specification, each subscription is an event
    type identifier which can contain special subtypes ``?`` and ``*``.
    Subtype ``?`` can occur at any position inside event type identifier
    and is used as replacement for any single subtype. Subtype ``*`` is valid
    only as last subtype in event type identifier and is used as replacement
    for zero or more arbitrary subtypes.

    If `subscriptions` is empty, client doesn't subscribe for any events
    and will not receive server's notifications.

    If `server_id` is ``None``, client will receive all event notifications,
    in accordance with `subscriptions`, regardless of event's server id. If
    `server_id` is set, Eventer Server will only send event notifications
    for events with provided server id.

    If `persisted` is set to ``True``, Eventer Server will notify events
    after they are persisted (flushed to disk). Otherwise, events are
    notified immediately after registration.

    Additional arguments are passed to `hat.chatter.connect` coroutine.

    Raises:
        EventerInitError: server answered the init request with ``error``
        Exception: init response has unexpected type or conversation
        ValueError: init response is neither ``success`` nor ``error``

    If initialization fails after the connection is established, the
    connection is closed before the exception is propagated.

    """
    client = Client()
    client._status_cb = status_cb
    client._events_cb = events_cb
    client._loop = asyncio.get_running_loop()
    client._conv_futures = {}
    client._status = common.Status.STANDBY

    client._conn = await chatter.connect(addr, **kwargs)

    try:
        req_data = {'clientName': client_name,
                    'clientToken': _optional_to_sbs(client_token),
                    'subscriptions': [list(i) for i in subscriptions],
                    'serverId': _optional_to_sbs(server_id),
                    'persisted': persisted}
        conv = await common.send_msg(conn=client._conn,
                                     msg_type='HatEventer.MsgInitReq',
                                     msg_data=req_data,
                                     last=False)

        # first received message must be the response to our init request
        res, res_type, res_data = await common.receive_msg(client._conn)
        if res_type != 'HatEventer.MsgInitRes' or res.conv != conv:
            raise Exception('invalid init response')

        if res_data[0] == 'success':
            client._status = common.Status(common.status_from_sbs(res_data[1]))

        elif res_data[0] == 'error':
            raise EventerInitError(res_data[1])

        else:
            raise ValueError('unsupported init response')

        client.async_group.spawn(client._receive_loop)

    except BaseException:
        # BaseException so that cancellation also closes the connection;
        # closing is shielded from further cancellation
        await aio.uncancellable(client.async_close())
        raise

    return client
Connect to Eventer Server
Arguments client_name and optional client_token identify the eventer client.
According to the Event Server specification, each subscription is an event
type identifier which can contain the special subtypes ? and *.
Subtype ? can occur at any position inside an event type identifier
and is used as a replacement for any single subtype. Subtype * is valid
only as the last subtype in an event type identifier and is used as a replacement
for zero or more arbitrary subtypes.
If subscriptions
is empty list, client doesn't subscribe for any events
and will not receive server's notifications.
If server_id is None, client will receive all event notifications,
in accordance with subscriptions, regardless of event's server id. If
server_id is set, Eventer Server will only send event notifications
for events with the provided server id.
If persisted
is set to True
, Eventer Server will notify events
after they are persisted (flushed to disk). Otherwise, events are
notified immediately after registration.
Additional arguments are passed to hat.chatter.connect
coroutine.
class Client(aio.Resource):
    """Eventer client

    Instances are not constructed directly - use the `connect` coroutine,
    which establishes the connection and initializes client state.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the underlying connection"""
        connection = self._conn
        return connection.async_group

    @property
    def status(self) -> common.Status:
        """Last status received from the server"""
        return self._status

    async def register(self,
                       events: Collection[common.RegisterEvent],
                       with_response: bool = False
                       ) -> Collection[common.Event] | None:
        """Register events and optionally wait for response

        When `with_response` is ``False``, the request is sent and ``None``
        is returned without waiting. When it is ``True``, the result is the
        list of events, or ``None`` if registration failure occurred.

        """
        encoded_events = list(map(common.register_event_to_sbs, events))
        conv = await common.send_msg(conn=self._conn,
                                     msg_type='HatEventer.MsgRegisterReq',
                                     msg_data=encoded_events,
                                     last=not with_response)

        if not with_response:
            return None

        return await self._wait_conv_res(conv)

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Query events from server"""
        encoded_params = common.query_params_to_sbs(params)
        conv = await common.send_msg(conn=self._conn,
                                     msg_type='HatEventer.MsgQueryReq',
                                     msg_data=encoded_params,
                                     last=False)

        response = await self._wait_conv_res(conv)
        return response

    async def _receive_loop(self):
        # message type -> (debug log text, coroutine processing the message)
        handlers = {
            'HatEventer.MsgStatusNotify': (
                "received status notification",
                self._process_msg_status_notify),
            'HatEventer.MsgEventsNotify': (
                "received events notification",
                self._process_msg_events_notify),
            'HatEventer.MsgRegisterRes': (
                "received register response",
                self._process_msg_register_res),
            'HatEventer.MsgQueryRes': (
                "received query response",
                self._process_msg_query_res)}

        mlog.debug("starting receive loop")
        try:
            while True:
                mlog.debug("waiting for incoming message")
                msg, msg_type, msg_data = await common.receive_msg(self._conn)

                handler_entry = handlers.get(msg_type)
                if handler_entry is None:
                    raise Exception("unsupported message type")

                log_text, handler = handler_entry
                mlog.debug(log_text)
                await handler(msg, msg_data)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("read loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping receive loop")
            self.close()

            # wake up everybody still waiting for a response
            for pending in self._conv_futures.values():
                if pending.done():
                    continue
                pending.set_exception(ConnectionError())

    async def _wait_conv_res(self, conv):
        # wait for the response belonging to conversation `conv`
        if not self.is_open:
            raise ConnectionError()

        response_future = self._loop.create_future()
        self._conv_futures[conv] = response_future

        try:
            return await response_future

        finally:
            self._conv_futures.pop(conv, None)

    async def _process_msg_status_notify(self, msg, msg_data):
        new_status = common.status_from_sbs(msg_data)
        self._status = new_status

        # status callback is called before the ack is sent
        if self._status_cb:
            await aio.call(self._status_cb, self, self._status)

        await common.send_msg(conn=self._conn,
                              msg_type='HatEventer.MsgStatusAck',
                              msg_data=None,
                              conv=msg.conv)

    async def _process_msg_events_notify(self, msg, msg_data):
        events = list(map(common.event_from_sbs, msg_data))

        if not self._events_cb:
            return

        await aio.call(self._events_cb, self, events)

    async def _process_msg_register_res(self, msg, msg_data):
        kind = msg_data[0]

        if kind == 'events':
            result = [common.event_from_sbs(event) for event in msg_data[1]]

        elif kind == 'failure':
            result = None

        else:
            raise ValueError('unsupported register response')

        # response without a waiting caller is silently dropped
        waiter = self._conv_futures.get(msg.conv)
        if waiter and not waiter.done():
            waiter.set_result(result)

    async def _process_msg_query_res(self, msg, msg_data):
        result = common.query_result_from_sbs(msg_data)

        # response without a waiting caller is silently dropped
        waiter = self._conv_futures.get(msg.conv)
        if waiter and not waiter.done():
            waiter.set_result(result)
Eventer client
For creating new client see connect
coroutine.
@property
def async_group(self) -> aio.Group:
    """Async group of the underlying connection"""
    connection = self._conn
    return connection.async_group
Async group
async def register(self,
                   events: Collection[common.RegisterEvent],
                   with_response: bool = False
                   ) -> Collection[common.Event] | None:
    """Register events and optionally wait for response

    When `with_response` is ``False``, the request is sent and ``None`` is
    returned without waiting. When it is ``True``, the result is the list
    of events, or ``None`` if registration failure occurred.

    """
    encoded_events = list(map(common.register_event_to_sbs, events))
    conv = await common.send_msg(conn=self._conn,
                                 msg_type='HatEventer.MsgRegisterReq',
                                 msg_data=encoded_events,
                                 last=not with_response)

    if not with_response:
        return None

    return await self._wait_conv_res(conv)
Register events and optionally wait for response
If with_response is True, this coroutine returns the list of events,
or None if a registration failure occurred.
async def query(self,
                params: common.QueryParams
                ) -> common.QueryResult:
    """Query events from server"""
    encoded_params = common.query_params_to_sbs(params)
    conv = await common.send_msg(conn=self._conn,
                                 msg_type='HatEventer.MsgQueryReq',
                                 msg_data=encoded_params,
                                 last=False)

    response = await self._wait_conv_res(conv)
    return response
Query events from server
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
class ConnectionInfo(typing.NamedTuple):
    """Description of a single initialized client connection

    Built by the server from the client's init request and passed to the
    connection, register and query callbacks.

    """
    # server-assigned connection identifier
    id: ConnectionId
    # taken from the init request's 'clientName'
    client_name: str
    # taken from the init request's optional 'clientToken'
    client_token: str | None
    # subscription built from the init request's 'subscriptions'
    subscription: common.Subscription
    # if not None, only events with this server id are notified
    server_id: int | None
    # matched against the `persisted` flag of event notifications
    persisted: bool
ConnectionInfo(id, client_name, client_token, subscription, server_id, persisted)
Create new instance of ConnectionInfo(id, client_name, client_token, subscription, server_id, persisted)
Inherited Members
- builtins.tuple
- index
- count
async def listen(addr: tcp.Address,
                 *,
                 status: common.Status = common.Status.STANDBY,
                 connected_cb: ConnectionCb | None = None,
                 disconnected_cb: ConnectionCb | None = None,
                 register_cb: RegisterCb | None = None,
                 query_cb: QueryCb | None = None,
                 close_timeout: float = 0.5,
                 **kwargs
                 ) -> 'Server':
    """Create listening Eventer Server instance

    `status` is the initial server status reported to connecting clients.
    The optional callbacks are invoked on client connect, client disconnect,
    register request and query request. `close_timeout` limits how long the
    server waits for a client to close its connection after a failed
    initialization. Additional arguments are passed to `chatter.listen`.

    """
    srv = Server()

    # per-connection bookkeeping
    srv._conn_infos = {}
    srv._conn_conv_futures = {}
    srv._next_conn_ids = itertools.count(1)
    srv._loop = asyncio.get_running_loop()

    # configuration provided by the caller
    srv._status = status
    srv._close_timeout = close_timeout
    srv._connected_cb = connected_cb
    srv._disconnected_cb = disconnected_cb
    srv._register_cb = register_cb
    srv._query_cb = query_cb

    srv._srv = await chatter.listen(srv._connection_loop, addr, **kwargs)
    mlog.debug("listening on %s", addr)

    return srv
Create listening Eventer Server instance
class Server(aio.Resource):
    """Eventer server

    For creating new server see `listen` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    async def set_status(self, status: common.Status):
        """Set status and wait for acks

        If `status` equals the current status, nothing is done. Otherwise
        every initialized connection is notified and this coroutine waits
        until each notification task finishes (ack received or connection
        failed).

        """
        if self._status == status:
            return

        self._status = status
        tasks = [self.async_group.spawn(self._notify_status, conn, status)
                 for conn in self._conn_infos]
        if not tasks:
            return

        await asyncio.wait(tasks)

    async def notify_events(self,
                            events: Collection[common.Event],
                            persisted: bool):
        """Notify events to clients

        Only connections whose `persisted` setting equals `persisted` are
        considered. Each of them receives the events matching its
        subscription and, if set, its server id. Connections without any
        matching event are skipped.

        """
        # iterate over a copy - connections may come and go while awaiting
        for conn, info in list(self._conn_infos.items()):
            if info.persisted != persisted:
                continue

            filtered_events = collections.deque(
                event for event in events
                if (info.subscription.matches(event.type) and
                    (info.server_id is None or
                     info.server_id == event.id.server)))
            if not filtered_events:
                continue

            await self._notify_events(conn, filtered_events)

    async def _connection_loop(self, conn):
        mlog.debug("starting connection loop")
        conn_id = next(self._next_conn_ids)
        info = None

        try:
            # first message must be the init request
            req, req_type, req_data = await common.receive_msg(conn)
            if req_type != 'HatEventer.MsgInitReq':
                raise Exception('invalid init request type')

            try:
                info = ConnectionInfo(
                    id=conn_id,
                    client_name=req_data['clientName'],
                    client_token=_optional_from_sbs(req_data['clientToken']),
                    subscription=common.create_subscription(
                        tuple(i) for i in req_data['subscriptions']),
                    server_id=_optional_from_sbs(req_data['serverId']),
                    persisted=req_data['persisted'])

                if self._connected_cb:
                    await aio.call(self._connected_cb, info)

                res_data = 'success', common.status_to_sbs(self._status)
                self._conn_infos[conn] = info

            except Exception as e:
                # info is reset so that disconnected_cb is not called for a
                # connection that was never successfully initialized
                info = None
                res_data = 'error', str(e)

            mlog.debug("sending init response %s", res_data[0])
            await common.send_msg(conn, 'HatEventer.MsgInitRes', res_data,
                                  conv=req.conv)

            if res_data[0] != 'success':
                # give the client a chance to close the connection itself
                with contextlib.suppress(asyncio.TimeoutError):
                    await aio.wait_for(conn.wait_closing(),
                                       self._close_timeout)
                return

            while True:
                mlog.debug("waiting for incomming messages")
                msg, msg_type, msg_data = await common.receive_msg(conn)

                if msg_type == 'HatEventer.MsgStatusAck':
                    mlog.debug("received status ack")
                    future = self._conn_conv_futures.get((conn, msg.conv))
                    if future and not future.done():
                        future.set_result(None)

                elif msg_type == 'HatEventer.MsgRegisterReq':
                    mlog.debug("received register request")
                    await self._process_msg_register(conn, info, msg, msg_data)

                elif msg_type == 'HatEventer.MsgQueryReq':
                    mlog.debug("received query request")
                    await self._process_msg_query(conn, info, msg, msg_data)

                else:
                    raise Exception('unsupported message type')

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("on connection error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping connection loop")
            conn.close()
            self._conn_infos.pop(conn, None)

            # fail only status acks pending on THIS connection - futures of
            # other, still open, connections must stay pending so that
            # set_status keeps waiting for their acks
            pending = list(self._conn_conv_futures.items())
            for (future_conn, _), future in pending:
                if future_conn is conn and not future.done():
                    future.set_exception(ConnectionError())

            if self._disconnected_cb and info:
                with contextlib.suppress(Exception):
                    await aio.call(self._disconnected_cb, info)

    async def _process_msg_register(self, conn, info, req, req_data):
        register_events = [common.register_event_from_sbs(i)
                           for i in req_data]

        if self._register_cb:
            events = await aio.call(self._register_cb, info, register_events)

        else:
            events = None

        # client closed the conversation - it does not expect a response
        if req.last:
            return

        if events is not None:
            res_data = 'events', [common.event_to_sbs(event)
                                  for event in events]

        else:
            res_data = 'failure', None

        await common.send_msg(conn, 'HatEventer.MsgRegisterRes', res_data,
                              conv=req.conv)

    async def _process_msg_query(self, conn, info, req, req_data):
        params = common.query_params_from_sbs(req_data)

        if self._query_cb:
            result = await aio.call(self._query_cb, info, params)

        else:
            # without query callback, empty result is returned
            result = common.QueryResult(events=[],
                                        more_follows=False)

        res_data = common.query_result_to_sbs(result)
        await common.send_msg(conn, 'HatEventer.MsgQueryRes', res_data,
                              conv=req.conv)

    async def _notify_status(self, conn, status):
        try:
            # current server status is sent (`status` argument is not used)
            req_data = common.status_to_sbs(self._status)
            conv = await common.send_msg(conn, 'HatEventer.MsgStatusNotify',
                                         req_data)

            # NOTE(review): future is registered only after send_msg
            # returns - presumably ack can not be processed earlier; confirm
            future = self._loop.create_future()
            self._conn_conv_futures[(conn, conv)] = future

            try:
                await future

            finally:
                self._conn_conv_futures.pop((conn, conv))

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("notify status error: %s", e, exc_info=e)

    async def _notify_events(self, conn, events):
        try:
            msg_data = [common.event_to_sbs(event) for event in events]
            await common.send_msg(conn, 'HatEventer.MsgEventsNotify', msg_data)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("notify events error: %s", e, exc_info=e)
Resource with lifetime control based on Group.
@property
def async_group(self) -> aio.Group:
    """Async group of the underlying listening server"""
    listening_server = self._srv
    return listening_server.async_group
Async group
async def set_status(self, status: common.Status):
    """Set status and wait for acks

    Nothing is done if `status` equals the current status. Otherwise a
    notification task is spawned for every initialized connection and this
    coroutine waits until all of them finish.

    """
    if self._status == status:
        return

    self._status = status

    ack_tasks = []
    for conn in self._conn_infos.keys():
        ack_task = self.async_group.spawn(self._notify_status, conn, status)
        ack_tasks.append(ack_task)

    if ack_tasks:
        await asyncio.wait(ack_tasks)
Set status and wait for acks
async def notify_events(self,
                        events: Collection[common.Event],
                        persisted: bool):
    """Notify events to clients

    Only connections whose `persisted` setting equals `persisted` are
    considered. Each of them is sent the events matching its subscription
    and, if set, its server id. Nothing is sent when no event matches.

    """
    # snapshot - connection registry can change while awaiting
    connections = list(self._conn_infos.items())

    for conn, info in connections:
        if info.persisted != persisted:
            continue

        def is_relevant(event, info=info):
            if not info.subscription.matches(event.type):
                return False
            return (info.server_id is None or
                    info.server_id == event.id.server)

        selected = collections.deque(filter(is_relevant, events))
        if selected:
            await self._notify_events(conn, selected)
Notify events to clients
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close