hat.event.eventer
Eventer communication protocol
This package provides Eventer Client as:

* low-level interface - `Client`
* high-level interface - `Component`

`connect` establishes a single eventer connection with Eventer Server; the connection is represented by `Client`. Once the connection is terminated (signaled with `Client.closed`), it is up to the user to repeat the `connect` call and create a new `Client` instance if additional communication with Event Server is required.
Example of low-level interface usage::

    client = await hat.event.eventer.connect(
        'tcp+sbs://127.0.0.1:23012',
        [['x', 'y', 'z']])

    registered_events = await client.register_with_response([
        hat.event.common.RegisterEvent(
            event_type=['x', 'y', 'z'],
            source_timestamp=hat.event.common.now(),
            payload=hat.event.common.EventPayload(
                type=hat.event.common.EventPayloadType.BINARY,
                data=b'test'))])

    received_events = await client.receive()

    queried_events = await client.query(
        hat.event.common.QueryData(
            event_types=[['x', 'y', 'z']],
            max_results=1))

    assert registered_events == received_events
    assert received_events == queried_events

    await client.async_close()

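Because a closed `Client` is never reused, code built on the low-level interface usually wraps `connect` in its own retry loop. The following is a minimal sketch of such a loop using only the standard library. The names `reconnect_loop`, `handle`, `retry_delay` and `max_connections` are hypothetical, `connect` is injected as a coroutine function rather than imported, and the sketch assumes that both failed connection attempts and lost connections surface as `ConnectionError`.

```python
import asyncio


async def reconnect_loop(connect, handle, retry_delay=0.5, max_connections=3):
    """Create up to `max_connections` clients, one after another.

    `connect` is a coroutine function returning an object with an
    `async_close()` coroutine; `handle` is a coroutine function that uses
    the client until the connection is lost. Both are hypothetical
    stand-ins supplied by the caller.
    """
    connections = 0
    while connections < max_connections:
        try:
            client = await connect()
        except ConnectionError:
            # assumption: a failed attempt raises ConnectionError
            await asyncio.sleep(retry_delay)
            continue

        connections += 1
        try:
            await handle(client)
        except ConnectionError:
            # connection lost; fall through and create a new client
            pass
        finally:
            await client.async_close()

    return connections
```

Bounding the number of connections keeps the sketch easy to test. Note that the loop retries indefinitely while `connect` keeps failing.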
`Component` provides a high-level interface for continuous communication with the currently active Event Server, based on information obtained from Monitor Server. This implementation repeatedly tries to create an active connection with Eventer Server. When this connection is created, the user's code is notified by calling the `component_cb` callback. Once the connection is closed, the user-defined resource resulting from `component_cb` is cancelled and `Component` repeats the connection establishment process.
"""Eventer communication protocol

This package provides Eventer Client as:

    * low-level interface - `Client`
    * high-level interface - `Component`

`connect` is used for establishing single eventer connection
with Eventer Server which is represented by `Client`. Once
connection is terminated (signaled with `Client.closed`),
it is up to user to repeat `connect` call and create new `Client`
instance, if additional communication with Event Server is required.

Example of low-level interface usage::

    client = await hat.event.eventer.connect(
        'tcp+sbs://127.0.0.1:23012',
        [['x', 'y', 'z']])

    registered_events = await client.register_with_response([
        hat.event.common.RegisterEvent(
            event_type=['x', 'y', 'z'],
            source_timestamp=hat.event.common.now(),
            payload=hat.event.common.EventPayload(
                type=hat.event.common.EventPayloadType.BINARY,
                data=b'test'))])

    received_events = await client.receive()

    queried_events = await client.query(
        hat.event.common.QueryData(
            event_types=[['x', 'y', 'z']],
            max_results=1))

    assert registered_events == received_events
    assert received_events == queried_events

    await client.async_close()

`Component` provides high-level interface for continuous communication with
currently active Event Server based on information obtained from Monitor Server.
This implementation repeatedly tries to create active connection
with Eventer Server. When this connection is created, users code is notified by
calling `component_cb` callback. Once connection is closed, user defined
resource, resulting from `component_cb`, is cancelled and `Component` repeats
connection establishment process.

"""

from hat.event.eventer.client import (connect,
                                      Client,
                                      Runner,
                                      ComponentCb,
                                      Component)
from hat.event.eventer.server import (ClientId,
                                      ClientCb,
                                      RegisterCb,
                                      QueryCb,
                                      listen,
                                      Server)


__all__ = ['connect',
           'Client',
           'Runner',
           'ComponentCb',
           'Component',
           'ClientId',
           'ClientCb',
           'RegisterCb',
           'QueryCb',
           'listen',
           'Server']
async def connect(address: str,
                  subscriptions: list[common.EventType] = [],
                  **kwargs
                  ) -> 'Client':
    """Connect to eventer server

    For address format see `hat.chatter.connect` coroutine.

    According to Event Server specification, each subscription is event
    type identifier which can contain special subtypes ``?`` and ``*``.
    Subtype ``?`` can occur at any position inside event type identifier
    and is used as replacement for any single subtype. Subtype ``*`` is valid
    only as last subtype in event type identifier and is used as replacement
    for zero or more arbitrary subtypes.

    If subscription is empty list, client doesn't subscribe for any events and
    will not receive server's notifications.

    Args:
        address: event server's address
        subscriptions: subscriptions
        kwargs: additional arguments passed to `hat.chatter.connect` coroutine

    """
    client = Client()
    client._conv_futures = {}
    client._event_queue = aio.Queue()

    client._conn = await chatter.connect(common.sbs_repo, address, **kwargs)

    if subscriptions:
        client._conn.send(chatter.Data(module='HatEventer',
                                       type='MsgSubscribe',
                                       data=[list(i) for i in subscriptions]))

    client.async_group.spawn(client._receive_loop)
    return client
Connect to eventer server

For address format see `hat.chatter.connect` coroutine.

According to the Event Server specification, each subscription is an event type identifier which can contain the special subtypes `?` and `*`. Subtype `?` can occur at any position inside an event type identifier and is used as a replacement for any single subtype. Subtype `*` is valid only as the last subtype in an event type identifier and is used as a replacement for zero or more arbitrary subtypes.

If `subscriptions` is an empty list, the client doesn't subscribe to any events and will not receive the server's notifications.

Arguments:
- address: event server's address
- subscriptions: subscriptions
- kwargs: additional arguments passed to `hat.chatter.connect` coroutine
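The wildcard rules above can be illustrated with a small matcher. This is a sketch of the rule as described in the docstring, not the Event Server's own implementation; `matches` is a hypothetical helper that treats event types and subscriptions as sequences of subtype strings.

```python
def matches(event_type, subscription):
    """Return True if `event_type` is covered by `subscription`.

    Hypothetical helper: both arguments are sequences of subtype strings.
    """
    for i, subtype in enumerate(subscription):
        if subtype == '*':
            # '*' is valid only as the last subtype and stands for zero or
            # more arbitrary remaining subtypes
            if i != len(subscription) - 1:
                raise ValueError("'*' is valid only as last subtype")
            return True

        if i >= len(event_type):
            # subscription is longer than the event type
            return False

        # '?' stands for exactly one arbitrary subtype
        if subtype != '?' and subtype != event_type[i]:
            return False

    # without a trailing '*', lengths have to agree
    return len(event_type) == len(subscription)


print(matches(['x', 'y', 'z'], ['x', '?', 'z']))
print(matches(['x', 'y', 'z'], ['x', '*']))
print(matches(['x'], ['x', '*']))
print(matches(['x', 'y'], ['x']))
```

The first three calls match (`?` replaces `y`, `*` replaces `y, z`, and `*` replaces nothing); the last one does not, because a subscription without `*` has to have the same length as the event type.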
class Client(aio.Resource):
    """Eventer client

    For creating new client see `connect` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._conn.async_group

    async def receive(self) -> list[common.Event]:
        """Receive subscribed event notifications

        Raises:
            ConnectionError

        """
        try:
            return await self._event_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()

    def register(self, events: list[common.RegisterEvent]):
        """Register events

        Raises:
            ConnectionError

        """
        msg_data = chatter.Data(module='HatEventer',
                                type='MsgRegisterReq',
                                data=[common.register_event_to_sbs(i)
                                      for i in events])
        self._conn.send(msg_data)

    async def register_with_response(self,
                                     events: list[common.RegisterEvent]
                                     ) -> list[common.Event | None]:
        """Register events

        Each `common.RegisterEvent` from `events` is paired with results
        `common.Event` if new event was successfully created or ``None`` if new
        event could not be created.

        Raises:
            ConnectionError

        """
        msg_data = chatter.Data(module='HatEventer',
                                type='MsgRegisterReq',
                                data=[common.register_event_to_sbs(i)
                                      for i in events])
        conv = self._conn.send(msg_data, last=False)
        return await self._wait_conv_res(conv)

    async def query(self,
                    data: common.QueryData
                    ) -> list[common.Event]:
        """Query events from server

        Raises:
            ConnectionError

        """
        msg_data = chatter.Data(module='HatEventer',
                                type='MsgQueryReq',
                                data=common.query_to_sbs(data))
        conv = self._conn.send(msg_data, last=False)
        return await self._wait_conv_res(conv)

    async def _receive_loop(self):
        mlog.debug("starting receive loop")
        try:
            while True:
                mlog.debug("waiting for incoming message")
                msg = await self._conn.receive()
                msg_type = msg.data.module, msg.data.type

                if msg_type == ('HatEventer', 'MsgNotify'):
                    mlog.debug("received event notification")
                    self._process_msg_notify(msg)

                elif msg_type == ('HatEventer', 'MsgQueryRes'):
                    mlog.debug("received query response")
                    self._process_msg_query_res(msg)

                elif msg_type == ('HatEventer', 'MsgRegisterRes'):
                    mlog.debug("received register response")
                    self._process_msg_register_res(msg)

                else:
                    raise Exception("unsupported message type")

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("read loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping receive loop")
            self.close()
            self._event_queue.close()
            for f in self._conv_futures.values():
                if not f.done():
                    f.set_exception(ConnectionError())

    async def _wait_conv_res(self, conv):
        if not self.is_open:
            raise ConnectionError()

        response_future = asyncio.Future()
        self._conv_futures[conv] = response_future
        try:
            return await response_future
        finally:
            self._conv_futures.pop(conv, None)

    def _process_msg_notify(self, msg):
        events = [common.event_from_sbs(e) for e in msg.data.data]
        self._event_queue.put_nowait(events)

    def _process_msg_query_res(self, msg):
        f = self._conv_futures.get(msg.conv)
        if not f or f.done():
            return
        events = [common.event_from_sbs(e) for e in msg.data.data]
        f.set_result(events)

    def _process_msg_register_res(self, msg):
        f = self._conv_futures.get(msg.conv)
        if not f or f.done():
            return
        events = [common.event_from_sbs(e) if t == 'event' else None
                  for t, e in msg.data.data]
        f.set_result(events)
Eventer client

For creating new client see `connect` coroutine.
    async def receive(self) -> list[common.Event]:
        """Receive subscribed event notifications

        Raises:
            ConnectionError

        """
        try:
            return await self._event_queue.get()

        except aio.QueueClosedError:
            raise ConnectionError()
Receive subscribed event notifications
Raises:
- ConnectionError
    def register(self, events: list[common.RegisterEvent]):
        """Register events

        Raises:
            ConnectionError

        """
        msg_data = chatter.Data(module='HatEventer',
                                type='MsgRegisterReq',
                                data=[common.register_event_to_sbs(i)
                                      for i in events])
        self._conn.send(msg_data)
Register events
Raises:
- ConnectionError
    async def register_with_response(self,
                                     events: list[common.RegisterEvent]
                                     ) -> list[common.Event | None]:
        """Register events

        Each `common.RegisterEvent` from `events` is paired with results
        `common.Event` if new event was successfully created or ``None`` if new
        event could not be created.

        Raises:
            ConnectionError

        """
        msg_data = chatter.Data(module='HatEventer',
                                type='MsgRegisterReq',
                                data=[common.register_event_to_sbs(i)
                                      for i in events])
        conv = self._conn.send(msg_data, last=False)
        return await self._wait_conv_res(conv)
Register events

Each `common.RegisterEvent` from `events` is paired with the resulting `common.Event` if a new event was successfully created, or with `None` if a new event could not be created.
Raises:
- ConnectionError
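Since the returned list has one entry per submitted `common.RegisterEvent`, callers can zip requests with results to find out which registrations failed. A minimal sketch, assuming the pairing is positional (as the list comprehension in `_process_msg_register_res` suggests); `split_register_results` is a hypothetical helper and plain strings stand in for the real event objects.

```python
def split_register_results(register_events, results):
    """Split positionally paired results into created and rejected items.

    Hypothetical helper: `results` is the list returned by
    `register_with_response`, where `None` marks an event that could not
    be created.
    """
    if len(register_events) != len(results):
        raise ValueError("results must align one-to-one with register_events")

    created = []
    rejected = []
    for register_event, event in zip(register_events, results):
        if event is None:
            rejected.append(register_event)
        else:
            created.append((register_event, event))
    return created, rejected


# strings stand in for RegisterEvent / Event instances
created, rejected = split_register_results(['req-a', 'req-b'],
                                           ['event-a', None])
print(created)
print(rejected)
```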
    async def query(self,
                    data: common.QueryData
                    ) -> list[common.Event]:
        """Query events from server

        Raises:
            ConnectionError

        """
        msg_data = chatter.Data(module='HatEventer',
                                type='MsgQueryReq',
                                data=common.query_to_sbs(data))
        conv = self._conn.send(msg_data, last=False)
        return await self._wait_conv_res(conv)
Query events from server
Raises:
- ConnectionError
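Both `query` and `register_with_response` send a request and then wait for the response that belongs to the same conversation, while the receive loop resolves the waiting future or fails all pending futures with `ConnectionError` when the connection ends. The sketch below reproduces that pattern with the standard library only. `PendingResponses` and its method names are hypothetical, and plain integers stand in for chatter conversation objects.

```python
import asyncio


class PendingResponses:
    """Correlate responses with requests by conversation key (hypothetical)."""

    def __init__(self):
        self._futures = {}

    async def wait(self, conv):
        # register a future for this conversation and wait for its result
        future = asyncio.get_running_loop().create_future()
        self._futures[conv] = future
        try:
            return await future
        finally:
            self._futures.pop(conv, None)

    def resolve(self, conv, result):
        # called by the receive loop when a response arrives
        future = self._futures.get(conv)
        if future is not None and not future.done():
            future.set_result(result)

    def fail_all(self):
        # called when the receive loop stops: wake every waiter with an error
        for future in self._futures.values():
            if not future.done():
                future.set_exception(ConnectionError())


async def demo():
    pending = PendingResponses()
    waiter = asyncio.ensure_future(pending.wait(1))
    await asyncio.sleep(0)  # let the waiter register its future
    pending.resolve(1, ['event'])
    return await waiter
```

Removing the future in `finally` keeps the mapping from growing when a waiter is cancelled or the connection fails.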
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
class Component(aio.Resource):
    """Eventer component

    High-level interface for communication with Event Server, based on
    information obtained from Monitor Server.

    Instance of this class tries to establish active connection with
    Event Server within monitor component group `server_group`. Once this
    connection is established, `component_cb` is called with currently active
    `Client` instance. Result of calling `component_cb` should be `Runner`
    representing user defined components activity associated with connection
    to active Event Server. Once connection to Event Server is closed or new
    active Event Server is detected, associated `Runner` is closed. If new
    connection to Event Server is successfully established,
    `component_cb` will be called again to create new `Runner` associated with
    new instance of `Client`.

    `component_cb` is called when:
        * new active `Client` is created

    `Runner`, returned by `component_cb`, is closed when:
        * `Component` is closed
        * connection to Event Server is closed
        * different active Event Server is detected from Monitor Server's list
          of components

    `Component` is closed when:
        * connection to Monitor Server is closed
        * `Runner`, returned by `component_cb`, is closed by causes other
          than change of active Event Server

    `reconnect_delay` defines delay in seconds before trying to reconnect to
    Event Server.

    """

    def __init__(self,
                 monitor_client: hat.monitor.client.Client,
                 server_group: str,
                 component_cb: ComponentCb,
                 subscriptions: list[common.EventType] = [],
                 reconnect_delay: float = 0.5):
        self._monitor_client = monitor_client
        self._server_group = server_group
        self._component_cb = component_cb
        self._subscriptions = subscriptions
        self._reconnect_delay = reconnect_delay
        self._async_group = aio.Group()
        self._address_queue = aio.Queue()

        self.async_group.spawn(self._monitor_loop)
        self.async_group.spawn(self._address_loop)

        self.async_group.spawn(aio.call_on_done, monitor_client.wait_closing(),
                               self.close)

    @property
    def async_group(self):
        return self._async_group

    async def _monitor_loop(self):
        last_address = None
        changes = aio.Queue()

        def on_change():
            changes.put_nowait(None)

        def info_filter(info):
            return (info.group == self._server_group and
                    info.blessing_req.token is not None and
                    info.blessing_req.token == info.blessing_res.token)

        try:
            with self._monitor_client.register_change_cb(on_change):
                while True:
                    info = util.first(self._monitor_client.components,
                                      info_filter)
                    address = (info.data.get('eventer_server_address')
                               if info else None)

                    if address and address != last_address:
                        mlog.debug("new server address: %s", address)
                        last_address = address
                        self._address_queue.put_nowait(address)

                    await changes.get()

        except Exception as e:
            mlog.error("component monitor loop error: %s", e, exc_info=e)

        finally:
            self.close()

    async def _address_loop(self):
        try:
            address = None
            while True:
                while not address:
                    address = await self._address_queue.get_until_empty()

                async with self.async_group.create_subgroup() as subgroup:
                    address_future = subgroup.spawn(
                        self._address_queue.get_until_empty)
                    client_future = subgroup.spawn(self._client_loop, address)

                    await asyncio.wait([address_future, client_future],
                                       return_when=asyncio.FIRST_COMPLETED)

                    if address_future.done():
                        address = address_future.result()

                    elif client_future.done():
                        break

        except Exception as e:
            mlog.error("component address loop error: %s", e, exc_info=e)

        finally:
            self.close()

    async def _client_loop(self, address):
        try:
            while True:
                try:
                    mlog.debug("connecting to server %s", address)
                    client = await connect(address, self._subscriptions)

                except Exception as e:
                    mlog.warning("error connecting to server: %s", e,
                                 exc_info=e)
                    await asyncio.sleep(self._reconnect_delay)
                    continue

                try:
                    mlog.debug("connected to server")
                    async with self.async_group.create_subgroup() as subgroup:
                        client_future = subgroup.spawn(client.wait_closing)
                        runner_future = subgroup.spawn(self._runner_loop,
                                                       client)

                        await asyncio.wait([client_future, runner_future],
                                           return_when=asyncio.FIRST_COMPLETED)

                        if client_future.done():
                            pass

                        elif runner_future.done():
                            break

                finally:
                    await aio.uncancellable(client.async_close())

                mlog.debug("connection to server closed")
                await asyncio.sleep(self._reconnect_delay)

        except Exception as e:
            mlog.error("component client loop error: %s", e, exc_info=e)

    async def _runner_loop(self, client):
        try:
            runner = self._component_cb(client)

            try:
                await runner.wait_closing()

            finally:
                await aio.uncancellable(runner.async_close())

        except Exception as e:
            mlog.error("component runner loop error: %s", e, exc_info=e)
Eventer component

High-level interface for communication with Event Server, based on information obtained from Monitor Server.

An instance of this class tries to establish an active connection with the Event Server within monitor component group `server_group`. Once this connection is established, `component_cb` is called with the currently active `Client` instance. The result of calling `component_cb` should be a `Runner` representing the user-defined component's activity associated with the connection to the active Event Server. Once the connection to Event Server is closed or a new active Event Server is detected, the associated `Runner` is closed. If a new connection to Event Server is successfully established, `component_cb` will be called again to create a new `Runner` associated with the new instance of `Client`.

`component_cb` is called when:

* new active `Client` is created

`Runner`, returned by `component_cb`, is closed when:

* `Component` is closed
* connection to Event Server is closed
* different active Event Server is detected from Monitor Server's list of components

`Component` is closed when:

* connection to Monitor Server is closed
* `Runner`, returned by `component_cb`, is closed by causes other than change of active Event Server

`reconnect_delay` defines the delay in seconds before trying to reconnect to Event Server.
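The "active Event Server" is picked from Monitor Server's component list: in the `Component` source, `info_filter` accepts the first component in `server_group` whose blessing request token is set and equal to its blessing response token, and the address is read from the `eventer_server_address` entry of the component's data. The sketch below mirrors that selection with stand-in types. `Blessing`, `ComponentInfo` and `active_server_address` are hypothetical names, and the group name and addresses are made-up sample values.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Blessing:
    """Hypothetical stand-in for a blessing request or response."""
    token: Optional[int] = None


@dataclass
class ComponentInfo:
    """Hypothetical stand-in for one Monitor Server component entry."""
    group: str
    blessing_req: Blessing
    blessing_res: Blessing
    data: dict = field(default_factory=dict)


def active_server_address(components, server_group):
    """Return the eventer address of the first blessed component in a group."""
    for info in components:
        if (info.group == server_group and
                info.blessing_req.token is not None and
                info.blessing_req.token == info.blessing_res.token):
            return info.data.get('eventer_server_address')
    return None


# sample data: only the second component has matching blessing tokens
components = [
    ComponentInfo('event', Blessing(1), Blessing(None),
                  {'eventer_server_address': 'tcp+sbs://127.0.0.1:23012'}),
    ComponentInfo('event', Blessing(2), Blessing(2),
                  {'eventer_server_address': 'tcp+sbs://127.0.0.1:23013'}),
]
print(active_server_address(components, 'event'))
```

With the sample data, the address of the second component is selected, because the first one has no matching response token.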
    def __init__(self,
                 monitor_client: hat.monitor.client.Client,
                 server_group: str,
                 component_cb: ComponentCb,
                 subscriptions: list[common.EventType] = [],
                 reconnect_delay: float = 0.5):
        self._monitor_client = monitor_client
        self._server_group = server_group
        self._component_cb = component_cb
        self._subscriptions = subscriptions
        self._reconnect_delay = reconnect_delay
        self._async_group = aio.Group()
        self._address_queue = aio.Queue()

        self.async_group.spawn(self._monitor_loop)
        self.async_group.spawn(self._address_loop)

        self.async_group.spawn(aio.call_on_done, monitor_client.wait_closing(),
                               self.close)
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
async def listen(address: str,
                 connected_cb: ClientCb | None = None,
                 disconnected_cb: ClientCb | None = None,
                 register_cb: RegisterCb | None = None,
                 query_cb: QueryCb | None = None
                 ) -> 'Server':
    """Create eventer server instance"""
    server = Server()
    server._connected_cb = connected_cb
    server._disconnected_cb = disconnected_cb
    server._register_cb = register_cb
    server._query_cb = query_cb
    server._next_client_ids = itertools.count(1)
    server._conns = {}

    server._srv = await chatter.listen(sbs_repo=common.sbs_repo,
                                       address=address,
                                       connection_cb=server._on_connection)
    mlog.debug("listening on %s", address)

    return server
Create eventer server instance
class Server(aio.Resource):

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    def notify(self, events: list[common.Event]):
        """Notify events to subscribed clients"""
        for conn in self._conns.values():
            conn.notify(events)

    async def _on_connection(self, conn):
        client_id = next(self._next_client_ids)

        try:
            if self._connected_cb:
                await aio.call(self._connected_cb, client_id)

            self._conns[client_id] = _Connection(conn=conn,
                                                 client_id=client_id,
                                                 register_cb=self._register_cb,
                                                 query_cb=self._query_cb)

            await self._conns[client_id].wait_closing()

        except Exception as e:
            mlog.error("on connection error: %s", e, exc_info=e)

        finally:
            conn.close()

            if self._disconnected_cb:
                with contextlib.suppress(Exception):
                    await aio.call(self._disconnected_cb, client_id)
Resource with lifetime control based on `Group`.
    def notify(self, events: list[common.Event]):
        """Notify events to subscribed clients"""
        for conn in self._conns.values():
            conn.notify(events)
Notify events to subscribed clients
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close