hat.event.eventer

Eventer communication protocol

This package provides Eventer Client as:

connect is used for establishing single eventer connection with Eventer Server which is represented by Client. Once connection is terminated (signaled with Client.closed), it is up to user to repeat connect call and create new Client instance, if additional communication with Event Server is required.

Example of low-level interface usage::

client = await hat.event.eventer.connect(
    'tcp+sbs://127.0.0.1:23012',
    [['x', 'y', 'z']])

registered_events = await client.register_with_response([
    hat.event.common.RegisterEvent(
        event_type=['x', 'y', 'z'],
        source_timestamp=hat.event.common.now(),
        payload=hat.event.common.EventPayload(
            type=hat.event.common.EventPayloadType.BINARY,
            data=b'test'))])

received_events = await client.receive()

queried_events = await client.query(
    hat.event.common.QueryData(
        event_types=[['x', 'y', 'z']],
        max_results=1))

assert registered_events == received_events
assert received_events == queried_events

await client.async_close()

Component provides high-level interface for continuous communication with currenty active Event Server based on information obtained from Monitor Server. This implementation repeatedly tries to create active connection with Eventer Server. When this connection is created, users code is notified by calling component_cb callback. Once connection is closed, user defined resource, resulting from component_cb, is cancelled and Component repeats connection estabishment process.

 1"""Eventer communication protocol
 2
 3This package provides Eventer Client as:
 4    * low-level interface - `Client`
 5    * high-level interface - `Component`
 6
 7`connect` is used for establishing single eventer connection
 8with Eventer Server which is represented by `Client`. Once
 9connection is terminated (signaled with `Client.closed`),
10it is up to user to repeat `connect` call and create new `Client`
11instance, if additional communication with Event Server is required.
12
13Example of low-level interface usage::
14
15    client = await hat.event.eventer.connect(
16        'tcp+sbs://127.0.0.1:23012',
17        [['x', 'y', 'z']])
18
19    registered_events = await client.register_with_response([
20        hat.event.common.RegisterEvent(
21            event_type=['x', 'y', 'z'],
22            source_timestamp=hat.event.common.now(),
23            payload=hat.event.common.EventPayload(
24                type=hat.event.common.EventPayloadType.BINARY,
25                data=b'test'))])
26
27    received_events = await client.receive()
28
29    queried_events = await client.query(
30        hat.event.common.QueryData(
31            event_types=[['x', 'y', 'z']],
32            max_results=1))
33
34    assert registered_events == received_events
35    assert received_events == queried_events
36
37    await client.async_close()
38
39`Component` provides high-level interface for continuous communication with
40currenty active Event Server based on information obtained from Monitor Server.
41This implementation repeatedly tries to create active connection
42with Eventer Server. When this connection is created, users code is notified by
43calling `component_cb` callback. Once connection is closed, user defined
44resource, resulting from `component_cb`, is cancelled and `Component` repeats
45connection estabishment process.
46
47"""
48
49from hat.event.eventer.client import (connect,
50                                      Client,
51                                      Runner,
52                                      ComponentCb,
53                                      Component)
54from hat.event.eventer.server import (ClientId,
55                                      ClientCb,
56                                      RegisterCb,
57                                      QueryCb,
58                                      listen,
59                                      Server)
60
61
62__all__ = ['connect',
63           'Client',
64           'Runner',
65           'ComponentCb',
66           'Component',
67           'ClientId',
68           'ClientCb',
69           'RegisterCb',
70           'QueryCb',
71           'listen',
72           'Server']
async def connect( address: str, subscriptions: list[typing.Tuple[str, ...]] = [], **kwargs) -> Client:
18async def connect(address: str,
19                  subscriptions: list[common.EventType] = [],
20                  **kwargs
21                  ) -> 'Client':
22    """Connect to eventer server
23
24    For address format see `hat.chatter.connect` coroutine.
25
26    According to Event Server specification, each subscription is event
27    type identifier which can contain special subtypes ``?`` and ``*``.
28    Subtype ``?`` can occure at any position inside event type identifier
29    and is used as replacement for any single subtype. Subtype ``*`` is valid
30    only as last subtype in event type identifier and is used as replacement
31    for zero or more arbitrary subtypes.
32
33    If subscription is empty list, client doesn't subscribe for any events and
34    will not receive server's notifications.
35
36    Args:
37        address: event server's address
38        subscriptions: subscriptions
39        kwargs: additional arguments passed to `hat.chatter.connect` coroutine
40
41    """
42    client = Client()
43    client._conv_futures = {}
44    client._event_queue = aio.Queue()
45
46    client._conn = await chatter.connect(common.sbs_repo, address, **kwargs)
47
48    if subscriptions:
49        client._conn.send(chatter.Data(module='HatEventer',
50                                       type='MsgSubscribe',
51                                       data=[list(i) for i in subscriptions]))
52
53    client.async_group.spawn(client._receive_loop)
54    return client

Connect to eventer server

For address format see hat.chatter.connect coroutine.

According to Event Server specification, each subscription is event type identifier which can contain special subtypes ? and *. Subtype ? can occure at any position inside event type identifier and is used as replacement for any single subtype. Subtype * is valid only as last subtype in event type identifier and is used as replacement for zero or more arbitrary subtypes.

If subscription is empty list, client doesn't subscribe for any events and will not receive server's notifications.

Arguments:
  • address: event server's address
  • subscriptions: subscriptions
  • kwargs: additional arguments passed to hat.chatter.connect coroutine
class Client(hat.aio.group.Resource):
 57class Client(aio.Resource):
 58    """Eventer client
 59
 60    For creating new client see `connect` coroutine.
 61
 62    """
 63
 64    @property
 65    def async_group(self) -> aio.Group:
 66        """Async group"""
 67        return self._conn.async_group
 68
 69    async def receive(self) -> list[common.Event]:
 70        """Receive subscribed event notifications
 71
 72        Raises:
 73            ConnectionError
 74
 75        """
 76        try:
 77            return await self._event_queue.get()
 78
 79        except aio.QueueClosedError:
 80            raise ConnectionError()
 81
 82    def register(self, events: list[common.RegisterEvent]):
 83        """Register events
 84
 85        Raises:
 86            ConnectionError
 87
 88        """
 89        msg_data = chatter.Data(module='HatEventer',
 90                                type='MsgRegisterReq',
 91                                data=[common.register_event_to_sbs(i)
 92                                      for i in events])
 93        self._conn.send(msg_data)
 94
 95    async def register_with_response(self,
 96                                     events: list[common.RegisterEvent]
 97                                     ) -> list[common.Event | None]:
 98        """Register events
 99
100        Each `common.RegisterEvent` from `events` is paired with results
101        `common.Event` if new event was successfuly created or ``None`` is new
102        event could not be created.
103
104        Raises:
105            ConnectionError
106
107        """
108        msg_data = chatter.Data(module='HatEventer',
109                                type='MsgRegisterReq',
110                                data=[common.register_event_to_sbs(i)
111                                      for i in events])
112        conv = self._conn.send(msg_data, last=False)
113        return await self._wait_conv_res(conv)
114
115    async def query(self,
116                    data: common.QueryData
117                    ) -> list[common.Event]:
118        """Query events from server
119
120        Raises:
121            ConnectionError
122
123        """
124        msg_data = chatter.Data(module='HatEventer',
125                                type='MsgQueryReq',
126                                data=common.query_to_sbs(data))
127        conv = self._conn.send(msg_data, last=False)
128        return await self._wait_conv_res(conv)
129
130    async def _receive_loop(self):
131        mlog.debug("starting receive loop")
132        try:
133            while True:
134                mlog.debug("waiting for incoming message")
135                msg = await self._conn.receive()
136                msg_type = msg.data.module, msg.data.type
137
138                if msg_type == ('HatEventer', 'MsgNotify'):
139                    mlog.debug("received event notification")
140                    self._process_msg_notify(msg)
141
142                elif msg_type == ('HatEventer', 'MsgQueryRes'):
143                    mlog.debug("received query response")
144                    self._process_msg_query_res(msg)
145
146                elif msg_type == ('HatEventer', 'MsgRegisterRes'):
147                    mlog.debug("received register response")
148                    self._process_msg_register_res(msg)
149
150                else:
151                    raise Exception("unsupported message type")
152
153        except ConnectionError:
154            pass
155
156        except Exception as e:
157            mlog.error("read loop error: %s", e, exc_info=e)
158
159        finally:
160            mlog.debug("stopping receive loop")
161            self.close()
162            self._event_queue.close()
163            for f in self._conv_futures.values():
164                if not f.done():
165                    f.set_exception(ConnectionError())
166
167    async def _wait_conv_res(self, conv):
168        if not self.is_open:
169            raise ConnectionError()
170
171        response_future = asyncio.Future()
172        self._conv_futures[conv] = response_future
173        try:
174            return await response_future
175        finally:
176            self._conv_futures.pop(conv, None)
177
178    def _process_msg_notify(self, msg):
179        events = [common.event_from_sbs(e) for e in msg.data.data]
180        self._event_queue.put_nowait(events)
181
182    def _process_msg_query_res(self, msg):
183        f = self._conv_futures.get(msg.conv)
184        if not f or f.done():
185            return
186        events = [common.event_from_sbs(e) for e in msg.data.data]
187        f.set_result(events)
188
189    def _process_msg_register_res(self, msg):
190        f = self._conv_futures.get(msg.conv)
191        if not f or f.done():
192            return
193        events = [common.event_from_sbs(e) if t == 'event' else None
194                  for t, e in msg.data.data]
195        f.set_result(events)

Eventer client

For creating new client see connect coroutine.

async_group: hat.aio.group.Group

Async group

async def receive(self) -> list[hat.event.common.data.Event]:
69    async def receive(self) -> list[common.Event]:
70        """Receive subscribed event notifications
71
72        Raises:
73            ConnectionError
74
75        """
76        try:
77            return await self._event_queue.get()
78
79        except aio.QueueClosedError:
80            raise ConnectionError()

Receive subscribed event notifications

Raises:
  • ConnectionError
def register(self, events: list[hat.event.common.data.RegisterEvent]):
82    def register(self, events: list[common.RegisterEvent]):
83        """Register events
84
85        Raises:
86            ConnectionError
87
88        """
89        msg_data = chatter.Data(module='HatEventer',
90                                type='MsgRegisterReq',
91                                data=[common.register_event_to_sbs(i)
92                                      for i in events])
93        self._conn.send(msg_data)

Register events

Raises:
  • ConnectionError
async def register_with_response( self, events: list[hat.event.common.data.RegisterEvent]) -> list[hat.event.common.data.Event | None]:
 95    async def register_with_response(self,
 96                                     events: list[common.RegisterEvent]
 97                                     ) -> list[common.Event | None]:
 98        """Register events
 99
100        Each `common.RegisterEvent` from `events` is paired with results
101        `common.Event` if new event was successfuly created or ``None`` is new
102        event could not be created.
103
104        Raises:
105            ConnectionError
106
107        """
108        msg_data = chatter.Data(module='HatEventer',
109                                type='MsgRegisterReq',
110                                data=[common.register_event_to_sbs(i)
111                                      for i in events])
112        conv = self._conn.send(msg_data, last=False)
113        return await self._wait_conv_res(conv)

Register events

Each common.RegisterEvent from events is paired with results common.Event if new event was successfuly created or None is new event could not be created.

Raises:
  • ConnectionError
async def query( self, data: hat.event.common.data.QueryData) -> list[hat.event.common.data.Event]:
115    async def query(self,
116                    data: common.QueryData
117                    ) -> list[common.Event]:
118        """Query events from server
119
120        Raises:
121            ConnectionError
122
123        """
124        msg_data = chatter.Data(module='HatEventer',
125                                type='MsgQueryReq',
126                                data=common.query_to_sbs(data))
127        conv = self._conn.send(msg_data, last=False)
128        return await self._wait_conv_res(conv)

Query events from server

Raises:
  • ConnectionError
Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
Runner = <class 'hat.aio.group.Resource'>
ComponentCb = typing.Callable[[Client], hat.aio.group.Resource]
class Component(hat.aio.group.Resource):
205class Component(aio.Resource):
206    """Eventer component
207
208    High-level interface for communication with Event Server, based on
209    information obtained from Monitor Server.
210
211    Instance of this class tries to establish active connection with
212    Event Server within monitor component group `server_group`. Once this
213    connection is established, `component_cb` is called with currently active
214    `Client` instance. Result of calling `component_cb` should be `Runner`
215    representing user defined components activity associated with connection
216    to active Event Server. Once connection to Event Server is closed or new
217    active Event Server is detected, associated `Runner` is closed. If new
218    connection to Event Server is successfully established,
219    `component_cb` will be called again to create new `Runner` associated with
220    new instance of `Client`.
221
222    `component_cb` is called when:
223        * new active `Client` is created
224
225    `Runner`, returned by `component_cb`, is closed when:
226        * `Component` is closed
227        * connection to Event Server is closed
228        * different active Event Server is detected from Monitor Server's list
229          of components
230
231    `Component` is closed when:
232        * connection to Monitor Server is closed
233        * `Runner`, returned by `component_cb`, is closed by causes other
234          than change of active Event Server
235
236    `reconnect_delay` defines delay in seconds before trying to reconnect to
237    Event Server.
238
239    """
240
241    def __init__(self,
242                 monitor_client: hat.monitor.client.Client,
243                 server_group: str,
244                 component_cb: ComponentCb,
245                 subscriptions: list[common.EventType] = [],
246                 reconnect_delay: float = 0.5):
247        self._monitor_client = monitor_client
248        self._server_group = server_group
249        self._component_cb = component_cb
250        self._subscriptions = subscriptions
251        self._reconnect_delay = reconnect_delay
252        self._async_group = aio.Group()
253        self._address_queue = aio.Queue()
254
255        self.async_group.spawn(self._monitor_loop)
256        self.async_group.spawn(self._address_loop)
257
258        self.async_group.spawn(aio.call_on_done, monitor_client.wait_closing(),
259                               self.close)
260
261    @property
262    def async_group(self):
263        return self._async_group
264
265    async def _monitor_loop(self):
266        last_address = None
267        changes = aio.Queue()
268
269        def on_change():
270            changes.put_nowait(None)
271
272        def info_filter(info):
273            return (info.group == self._server_group and
274                    info.blessing_req.token is not None and
275                    info.blessing_req.token == info.blessing_res.token)
276
277        try:
278            with self._monitor_client.register_change_cb(on_change):
279                while True:
280                    info = util.first(self._monitor_client.components,
281                                      info_filter)
282                    address = (info.data.get('eventer_server_address')
283                               if info else None)
284
285                    if address and address != last_address:
286                        mlog.debug("new server address: %s", address)
287                        last_address = address
288                        self._address_queue.put_nowait(address)
289
290                    await changes.get()
291
292        except Exception as e:
293            mlog.error("component monitor loop error: %s", e, exc_info=e)
294
295        finally:
296            self.close()
297
298    async def _address_loop(self):
299        try:
300            address = None
301            while True:
302                while not address:
303                    address = await self._address_queue.get_until_empty()
304
305                async with self.async_group.create_subgroup() as subgroup:
306                    address_future = subgroup.spawn(
307                        self._address_queue.get_until_empty)
308                    client_future = subgroup.spawn(self._client_loop, address)
309
310                    await asyncio.wait([address_future, client_future],
311                                       return_when=asyncio.FIRST_COMPLETED)
312
313                    if address_future.done():
314                        address = address_future.result()
315
316                    elif client_future.done():
317                        break
318
319        except Exception as e:
320            mlog.error("component address loop error: %s", e, exc_info=e)
321
322        finally:
323            self.close()
324
325    async def _client_loop(self, address):
326        try:
327            while True:
328                try:
329                    mlog.debug("connecting to server %s", address)
330                    client = await connect(address, self._subscriptions)
331
332                except Exception as e:
333                    mlog.warning("error connecting to server: %s", e,
334                                 exc_info=e)
335                    await asyncio.sleep(self._reconnect_delay)
336                    continue
337
338                try:
339                    mlog.debug("connected to server")
340                    async with self.async_group.create_subgroup() as subgroup:
341                        client_future = subgroup.spawn(client.wait_closing)
342                        runner_future = subgroup.spawn(self._runner_loop,
343                                                       client)
344
345                        await asyncio.wait([client_future, runner_future],
346                                           return_when=asyncio.FIRST_COMPLETED)
347
348                        if client_future.done():
349                            pass
350
351                        elif runner_future.done():
352                            break
353
354                finally:
355                    await aio.uncancellable(client.async_close())
356
357                mlog.debug("connection to server closed")
358                await asyncio.sleep(self._reconnect_delay)
359
360        except Exception as e:
361            mlog.error("component client loop error: %s", e, exc_info=e)
362
363    async def _runner_loop(self, client):
364        try:
365            runner = self._component_cb(client)
366
367            try:
368                await runner.wait_closing()
369
370            finally:
371                await aio.uncancellable(runner.async_close())
372
373        except Exception as e:
374            mlog.error("component runner loop error: %s", e, exc_info=e)

Eventer component

High-level interface for communication with Event Server, based on information obtained from Monitor Server.

Instance of this class tries to establish active connection with Event Server within monitor component group server_group. Once this connection is established, component_cb is called with currently active Client instance. Result of calling component_cb should be Runner representing user defined components activity associated with connection to active Event Server. Once connection to Event Server is closed or new active Event Server is detected, associated Runner is closed. If new connection to Event Server is successfully established, component_cb will be called again to create new Runner associated with new instance of Client.

component_cb is called when: * new active Client is created

Runner, returned by component_cb, is closed when: * Component is closed * connection to Event Server is closed * different active Event Server is detected from Monitor Server's list of components

Component is closed when: * connection to Monitor Server is closed * Runner, returned by component_cb, is closed by causes other than change of active Event Server

reconnect_delay defines delay in seconds before trying to reconnect to Event Server.

Component( monitor_client: hat.monitor.client.Client, server_group: str, component_cb: Callable[[Client], hat.aio.group.Resource], subscriptions: list[typing.Tuple[str, ...]] = [], reconnect_delay: float = 0.5)
241    def __init__(self,
242                 monitor_client: hat.monitor.client.Client,
243                 server_group: str,
244                 component_cb: ComponentCb,
245                 subscriptions: list[common.EventType] = [],
246                 reconnect_delay: float = 0.5):
247        self._monitor_client = monitor_client
248        self._server_group = server_group
249        self._component_cb = component_cb
250        self._subscriptions = subscriptions
251        self._reconnect_delay = reconnect_delay
252        self._async_group = aio.Group()
253        self._address_queue = aio.Queue()
254
255        self.async_group.spawn(self._monitor_loop)
256        self.async_group.spawn(self._address_loop)
257
258        self.async_group.spawn(aio.call_on_done, monitor_client.wait_closing(),
259                               self.close)
async_group

Group controlling resource's lifetime.

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
ClientId = <class 'int'>
ClientCb = typing.Callable[[int], typing.Optional[typing.Awaitable[NoneType]]]
RegisterCb = typing.Callable[[int, list[hat.event.common.data.RegisterEvent]], typing.Union[list[hat.event.common.data.Event], typing.Awaitable[list[hat.event.common.data.Event]]]]
QueryCb = typing.Callable[[int, list[hat.event.common.data.QueryData]], typing.Union[list[hat.event.common.data.Event], typing.Awaitable[list[hat.event.common.data.Event]]]]
async def listen( address: str, connected_cb: Optional[Callable[[int], Optional[Awaitable[NoneType]]]] = None, disconnected_cb: Optional[Callable[[int], Optional[Awaitable[NoneType]]]] = None, register_cb: Optional[Callable[[int, list[hat.event.common.data.RegisterEvent]], Union[list[hat.event.common.data.Event], Awaitable[list[hat.event.common.data.Event]]]]] = None, query_cb: Optional[Callable[[int, list[hat.event.common.data.QueryData]], Union[list[hat.event.common.data.Event], Awaitable[list[hat.event.common.data.Event]]]]] = None) -> Server:
33async def listen(address: str,
34                 connected_cb: ClientCb | None = None,
35                 disconnected_cb: ClientCb | None = None,
36                 register_cb: RegisterCb | None = None,
37                 query_cb: QueryCb | None = None
38                 ) -> 'Server':
39    """Create eventer server instance"""
40    server = Server()
41    server._connected_cb = connected_cb
42    server._disconnected_cb = disconnected_cb
43    server._register_cb = register_cb
44    server._query_cb = query_cb
45    server._next_client_ids = itertools.count(1)
46    server._conns = {}
47
48    server._srv = await chatter.listen(sbs_repo=common.sbs_repo,
49                                       address=address,
50                                       connection_cb=server._on_connection)
51    mlog.debug("listening on %s", address)
52
53    return server

Create eventer server instance

class Server(hat.aio.group.Resource):
56class Server(aio.Resource):
57
58    @property
59    def async_group(self) -> aio.Group:
60        """Async group"""
61        return self._srv.async_group
62
63    def notify(self, events: list[common.Event]):
64        """Notify events to subscribed clients"""
65        for conn in self._conns.values():
66            conn.notify(events)
67
68    async def _on_connection(self, conn):
69        client_id = next(self._next_client_ids)
70
71        try:
72            if self._connected_cb:
73                await aio.call(self._connected_cb, client_id)
74
75            self._conns[client_id] = _Connection(conn=conn,
76                                                 client_id=client_id,
77                                                 register_cb=self._register_cb,
78                                                 query_cb=self._query_cb)
79
80            await self._conns[client_id].wait_closing()
81
82        except Exception as e:
83            mlog.error("on connection error: %s", e, exc_info=e)
84
85        finally:
86            conn.close()
87
88            if self._disconnected_cb:
89                with contextlib.suppress(Exception):
90                    await aio.call(self._disconnected_cb, client_id)

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group

Async group

def notify(self, events: list[hat.event.common.data.Event]):
63    def notify(self, events: list[common.Event]):
64        """Notify events to subscribed clients"""
65        for conn in self._conns.values():
66            conn.notify(events)

Notify events to subscribed clients

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close