hat.event.eventer_client

Library used by components for communication with Event Server based on eventer protocol.

This module provides low-level interface (connect/EventerClient) and high-level interface (run_eventer_client).

connect() is used for establishing single eventer connection with Event Server which is represented by EventerClient. Once connection is terminated (signaled with EventerClient.closed()), it is up to user to repeat connect() call and create new EventerClient instance, if additional communication with Event Server is required.

Example of low-level interface usage::

client = await hat.event.eventer_client.connect(
    'tcp+sbs://127.0.0.1:23012',
    [['x', 'y', 'z']])

registered_events = await client.register_with_response([
    hat.event.common.RegisterEvent(
        event_type=['x', 'y', 'z'],
        source_timestamp=hat.event.common.now(),
        payload=hat.event.common.EventPayload(
            type=hat.event.common.EventPayloadType.BINARY,
            data=b'test'))])

received_events = await client.receive()

queried_events = await client.query(
    hat.event.common.QueryData(
        event_types=[['x', 'y', 'z']],
        max_results=1))

assert registered_events == received_events
assert received_events == queried_events

await client.async_close()

run_eventer_client() provides high-level interface for continuous communication with currenty active Event Server based on information obtained from Monitor Server. This function repeatedly tries to create active connection with Event Server. When this connection is created, users code is notified by calling run_cb callback. Once connection is closed, execution of run_cb is cancelled and run_eventer_client() repeats connection estabishment process.

Example of high-level interface usage::

async def monitor_run(component):
    await hat.event.eventer_client.run_eventer_client(
        monitor_client=monitor,
        server_group='event servers',
        run_cb=event_run])

async def event_run(client):
    while True:
        assert not client.is_closed
        await asyncio.sleep(10)

monitor = await hat.monitor.client.connect({
    'name': 'client',
    'group': 'test clients',
    'monitor_address': 'tcp+sbs://127.0.0.1:23010'})
component = hat.monitor.client.Component(monitor, monitor_run)
component.set_enabled(True)
await monitor.async_close()
  1"""Library used by components for communication with Event Server based on
  2eventer protocol.
  3
  4This module provides low-level interface (connect/EventerClient) and high-level
  5interface (run_eventer_client).
  6
  7:func:`connect` is used for establishing single eventer connection
  8with Event Server which is represented by :class:`EventerClient`. Once
  9connection is terminated (signaled with :meth:`EventerClient.closed`),
 10it is up to user to repeat :func:`connect` call and create new
 11:class:`EventerClient` instance, if additional communication with Event Server
 12is required.
 13
 14Example of low-level interface usage::
 15
 16    client = await hat.event.eventer_client.connect(
 17        'tcp+sbs://127.0.0.1:23012',
 18        [['x', 'y', 'z']])
 19
 20    registered_events = await client.register_with_response([
 21        hat.event.common.RegisterEvent(
 22            event_type=['x', 'y', 'z'],
 23            source_timestamp=hat.event.common.now(),
 24            payload=hat.event.common.EventPayload(
 25                type=hat.event.common.EventPayloadType.BINARY,
 26                data=b'test'))])
 27
 28    received_events = await client.receive()
 29
 30    queried_events = await client.query(
 31        hat.event.common.QueryData(
 32            event_types=[['x', 'y', 'z']],
 33            max_results=1))
 34
 35    assert registered_events == received_events
 36    assert received_events == queried_events
 37
 38    await client.async_close()
 39
 40:func:`run_eventer_client` provides high-level interface for continuous
 41communication with currenty active Event Server based on information obtained
 42from Monitor Server.
 43This function repeatedly tries to create active connection with
 44Event Server. When this connection is created, users code is notified by
 45calling `run_cb` callback. Once connection is closed, execution of
 46`run_cb` is cancelled and :func:`run_eventer_client` repeats connection
 47estabishment process.
 48
 49Example of high-level interface usage::
 50
 51    async def monitor_run(component):
 52        await hat.event.eventer_client.run_eventer_client(
 53            monitor_client=monitor,
 54            server_group='event servers',
 55            run_cb=event_run])
 56
 57    async def event_run(client):
 58        while True:
 59            assert not client.is_closed
 60            await asyncio.sleep(10)
 61
 62    monitor = await hat.monitor.client.connect({
 63        'name': 'client',
 64        'group': 'test clients',
 65        'monitor_address': 'tcp+sbs://127.0.0.1:23010'})
 66    component = hat.monitor.client.Component(monitor, monitor_run)
 67    component.set_enabled(True)
 68    await monitor.async_close()
 69
 70"""
 71
 72import asyncio
 73import contextlib
 74import logging
 75import typing
 76
 77from hat import aio
 78from hat import chatter
 79from hat import util
 80from hat.event import common
 81import hat.monitor.client
 82
 83
 84mlog: logging.Logger = logging.getLogger(__name__)
 85"""Module logger"""
 86
 87reconnect_delay: float = 0.5
 88"""Delay in seconds before trying to reconnect to event server
 89(used in high-level interface)"""
 90
 91RunCb = typing.Callable[['EventerClient'], typing.Awaitable]
 92"""Event client run callback coroutine"""
 93
 94
 95async def connect(address: str,
 96                  subscriptions: typing.List[common.EventType] = [],
 97                  **kwargs
 98                  ) -> 'EventerClient':
 99    """Connect to eventer server
100
101    For address format see `hat.chatter.connect` coroutine.
102
103    According to Event Server specification, each subscription is event
104    type identifier which can contain special subtypes ``?`` and ``*``.
105    Subtype ``?`` can occure at any position inside event type identifier
106    and is used as replacement for any single subtype. Subtype ``*`` is valid
107    only as last subtype in event type identifier and is used as replacement
108    for zero or more arbitrary subtypes.
109
110    If subscription is empty list, client doesn't subscribe for any events and
111    will not receive server's notifications.
112
113    Args:
114        address: event server's address
115        subscriptions: subscriptions
116        kwargs: additional arguments passed to `hat.chatter.connect` coroutine
117
118    """
119    client = EventerClient()
120    client._conv_futures = {}
121    client._event_queue = aio.Queue()
122
123    client._conn = await chatter.connect(common.sbs_repo, address, **kwargs)
124
125    if subscriptions:
126        client._conn.send(chatter.Data(module='HatEventer',
127                                       type='MsgSubscribe',
128                                       data=[list(i) for i in subscriptions]))
129
130    client.async_group.spawn(client._receive_loop)
131    return client
132
133
134class EventerClient(aio.Resource):
135    """Eventer client
136
137    For creating new client see `connect` coroutine.
138
139    """
140
141    @property
142    def async_group(self) -> aio.Group:
143        """Async group"""
144        return self._conn.async_group
145
146    async def receive(self) -> typing.List[common.Event]:
147        """Receive subscribed event notifications
148
149        Raises:
150            ConnectionError
151
152        """
153        try:
154            return await self._event_queue.get()
155
156        except aio.QueueClosedError:
157            raise ConnectionError()
158
159    def register(self, events: typing.List[common.RegisterEvent]):
160        """Register events
161
162        Raises:
163            ConnectionError
164
165        """
166        msg_data = chatter.Data(module='HatEventer',
167                                type='MsgRegisterReq',
168                                data=[common.register_event_to_sbs(i)
169                                      for i in events])
170        self._conn.send(msg_data)
171
172    async def register_with_response(self,
173                                     events: typing.List[common.RegisterEvent]
174                                     ) -> typing.List[typing.Optional[common.Event]]:  # NOQA
175        """Register events
176
177        Each `common.RegisterEvent` from `events` is paired with results
178        `common.Event` if new event was successfuly created or ``None`` is new
179        event could not be created.
180
181        Raises:
182            ConnectionError
183
184        """
185        msg_data = chatter.Data(module='HatEventer',
186                                type='MsgRegisterReq',
187                                data=[common.register_event_to_sbs(i)
188                                      for i in events])
189        conv = self._conn.send(msg_data, last=False)
190        return await self._wait_conv_res(conv)
191
192    async def query(self,
193                    data: common.QueryData
194                    ) -> typing.List[common.Event]:
195        """Query events from server
196
197        Raises:
198            ConnectionError
199
200        """
201        msg_data = chatter.Data(module='HatEventer',
202                                type='MsgQueryReq',
203                                data=common.query_to_sbs(data))
204        conv = self._conn.send(msg_data, last=False)
205        return await self._wait_conv_res(conv)
206
207    async def _receive_loop(self):
208        mlog.debug("starting receive loop")
209        try:
210            while True:
211                mlog.debug("waiting for incoming message")
212                msg = await self._conn.receive()
213                msg_type = msg.data.module, msg.data.type
214
215                if msg_type == ('HatEventer', 'MsgNotify'):
216                    mlog.debug("received event notification")
217                    self._process_msg_notify(msg)
218
219                elif msg_type == ('HatEventer', 'MsgQueryRes'):
220                    mlog.debug("received query response")
221                    self._process_msg_query_res(msg)
222
223                elif msg_type == ('HatEventer', 'MsgRegisterRes'):
224                    mlog.debug("received register response")
225                    self._process_msg_register_res(msg)
226
227                else:
228                    raise Exception("unsupported message type")
229
230        except ConnectionError:
231            pass
232
233        except Exception as e:
234            mlog.error("read loop error: %s", e, exc_info=e)
235
236        finally:
237            mlog.debug("stopping receive loop")
238            self.close()
239            self._event_queue.close()
240            for f in self._conv_futures.values():
241                if not f.done():
242                    f.set_exception(ConnectionError())
243
244    async def _wait_conv_res(self, conv):
245        if not self.is_open:
246            raise ConnectionError()
247
248        response_future = asyncio.Future()
249        self._conv_futures[conv] = response_future
250        try:
251            return await response_future
252        finally:
253            self._conv_futures.pop(conv, None)
254
255    def _process_msg_notify(self, msg):
256        events = [common.event_from_sbs(e) for e in msg.data.data]
257        self._event_queue.put_nowait(events)
258
259    def _process_msg_query_res(self, msg):
260        f = self._conv_futures.get(msg.conv)
261        if not f or f.done():
262            return
263        events = [common.event_from_sbs(e) for e in msg.data.data]
264        f.set_result(events)
265
266    def _process_msg_register_res(self, msg):
267        f = self._conv_futures.get(msg.conv)
268        if not f or f.done():
269            return
270        events = [common.event_from_sbs(e) if t == 'event' else None
271                  for t, e in msg.data.data]
272        f.set_result(events)
273
274
275async def run_eventer_client(monitor_client: hat.monitor.client.Client,
276                             server_group: str,
277                             run_cb: RunCb,
278                             subscriptions: typing.List[common.EventType] = []
279                             ) -> typing.Any:
280    """Continuously communicate with currently active Event Server
281
282    This function tries to establish active connection with Event Server
283    within monitor component group `server_group`. Once this connection is
284    established, `run_cb` is called with currently active `EventerClient`
285    instance. Once connection to Event Server is closed or new active Event
286    Server is detected, execution of `run_cb` is canceled. If new
287    connection to Event Server is successfully established,
288    `run_cb` is called with new instance of `EventerClient`.
289
290    `run_cb` is called when:
291        * new active `EventerClient` is created
292
293    `run_cb` execution is cancelled when:
294        * `run_eventer_client` finishes execution
295        * connection to Event Server is closed
296        * different active Event Server is detected from Monitor Server's list
297          of components
298
299    `run_eventer_client` finishes execution when:
300        * connection to Monitor Server is closed
301        * `run_cb` finishes execution (by returning value or raising
302          exception, other than `asyncio.CancelledError`)
303
304    Return value of this function is the same as return value of
305    `run_cb`. If `run_cb` finishes by raising exception or if
306    connection to Monitor Server is closed, ConnectionError is raised.
307
308    If execution of `run_eventer_client` is canceled while `run_cb` is
309    running, connection to Event Server is closed after `run_cb`
310    cancellation finishes.
311
312    """
313    async_group = aio.Group()
314    address_queue = aio.Queue()
315    async_group.spawn(aio.call_on_done, monitor_client.wait_closing(),
316                      address_queue.close)
317    async_group.spawn(_address_loop, monitor_client, server_group,
318                      address_queue)
319
320    address = None
321    try:
322        while True:
323            while not address:
324                address = await address_queue.get_until_empty()
325
326            async with async_group.create_subgroup() as subgroup:
327                address_future = subgroup.spawn(address_queue.get_until_empty)
328                client_future = subgroup.spawn(_client_loop, address,
329                                               subscriptions, run_cb)
330
331                await asyncio.wait([address_future, client_future],
332                                   return_when=asyncio.FIRST_COMPLETED)
333
334                if address_future.done():
335                    address = address_future.result()
336                else:
337                    return client_future.result()
338
339    except aio.QueueClosedError:
340        raise ConnectionError()
341
342    finally:
343        await aio.uncancellable(async_group.async_close())
344
345
346async def _address_loop(monitor_client, server_group, address_queue):
347    last_address = None
348    changes = aio.Queue()
349
350    with monitor_client.register_change_cb(lambda: changes.put_nowait(None)):
351        while True:
352            info = util.first(monitor_client.components, lambda c: (
353                c.group == server_group and
354                c.blessing_req.token is not None and
355                c.blessing_req.token == c.blessing_res.token))
356            address = info.data['eventer_server_address'] if info else None
357
358            if (address and address != last_address and
359                    not address_queue.is_closed):
360                mlog.debug("new server address: %s", address)
361                last_address = address
362                address_queue.put_nowait(address)
363
364            await changes.get()
365
366
367async def _client_loop(address, subscriptions, run_cb):
368    while True:
369        client = None
370        try:
371            mlog.debug("connecting to server %s", address)
372            try:
373                client = await connect(address, subscriptions)
374            except Exception as e:
375                mlog.warning("error connecting to server: %s", e, exc_info=e)
376                await asyncio.sleep(reconnect_delay)
377                continue
378
379            mlog.debug("connected to server - running run_cb")
380
381            subgroup = client.async_group.create_subgroup()
382            try:
383                run_future = subgroup.spawn(run_cb, client)
384                await asyncio.wait([run_future])
385
386                with contextlib.suppress(asyncio.CancelledError):
387                    return run_future.result()
388
389            finally:
390                await aio.uncancellable(subgroup.async_close())
391
392        finally:
393            if client:
394                await aio.uncancellable(client.async_close())
395
396        mlog.debug("connection to server closed")
397        await asyncio.sleep(reconnect_delay)
mlog: logging.Logger = <Logger hat.event.eventer_client (WARNING)>

Module logger

reconnect_delay: float = 0.5

Delay in seconds before trying to reconnect to event server (used in high-level interface)

RunCb = typing.Callable[[ForwardRef('EventerClient')], typing.Awaitable]

Event client run callback coroutine

async def connect( address: str, subscriptions: List[Tuple[str, ...]] = [], **kwargs) -> hat.event.eventer_client.EventerClient:
 96async def connect(address: str,
 97                  subscriptions: typing.List[common.EventType] = [],
 98                  **kwargs
 99                  ) -> 'EventerClient':
100    """Connect to eventer server
101
102    For address format see `hat.chatter.connect` coroutine.
103
104    According to Event Server specification, each subscription is event
105    type identifier which can contain special subtypes ``?`` and ``*``.
106    Subtype ``?`` can occure at any position inside event type identifier
107    and is used as replacement for any single subtype. Subtype ``*`` is valid
108    only as last subtype in event type identifier and is used as replacement
109    for zero or more arbitrary subtypes.
110
111    If subscription is empty list, client doesn't subscribe for any events and
112    will not receive server's notifications.
113
114    Args:
115        address: event server's address
116        subscriptions: subscriptions
117        kwargs: additional arguments passed to `hat.chatter.connect` coroutine
118
119    """
120    client = EventerClient()
121    client._conv_futures = {}
122    client._event_queue = aio.Queue()
123
124    client._conn = await chatter.connect(common.sbs_repo, address, **kwargs)
125
126    if subscriptions:
127        client._conn.send(chatter.Data(module='HatEventer',
128                                       type='MsgSubscribe',
129                                       data=[list(i) for i in subscriptions]))
130
131    client.async_group.spawn(client._receive_loop)
132    return client

Connect to eventer server

For address format see hat.chatter.connect coroutine.

According to Event Server specification, each subscription is event type identifier which can contain special subtypes ? and *. Subtype ? can occure at any position inside event type identifier and is used as replacement for any single subtype. Subtype * is valid only as last subtype in event type identifier and is used as replacement for zero or more arbitrary subtypes.

If subscription is empty list, client doesn't subscribe for any events and will not receive server's notifications.

Arguments:
  • address: event server's address
  • subscriptions: subscriptions
  • kwargs: additional arguments passed to hat.chatter.connect coroutine
class EventerClient(hat.aio.Resource):
135class EventerClient(aio.Resource):
136    """Eventer client
137
138    For creating new client see `connect` coroutine.
139
140    """
141
142    @property
143    def async_group(self) -> aio.Group:
144        """Async group"""
145        return self._conn.async_group
146
147    async def receive(self) -> typing.List[common.Event]:
148        """Receive subscribed event notifications
149
150        Raises:
151            ConnectionError
152
153        """
154        try:
155            return await self._event_queue.get()
156
157        except aio.QueueClosedError:
158            raise ConnectionError()
159
160    def register(self, events: typing.List[common.RegisterEvent]):
161        """Register events
162
163        Raises:
164            ConnectionError
165
166        """
167        msg_data = chatter.Data(module='HatEventer',
168                                type='MsgRegisterReq',
169                                data=[common.register_event_to_sbs(i)
170                                      for i in events])
171        self._conn.send(msg_data)
172
173    async def register_with_response(self,
174                                     events: typing.List[common.RegisterEvent]
175                                     ) -> typing.List[typing.Optional[common.Event]]:  # NOQA
176        """Register events
177
178        Each `common.RegisterEvent` from `events` is paired with results
179        `common.Event` if new event was successfuly created or ``None`` is new
180        event could not be created.
181
182        Raises:
183            ConnectionError
184
185        """
186        msg_data = chatter.Data(module='HatEventer',
187                                type='MsgRegisterReq',
188                                data=[common.register_event_to_sbs(i)
189                                      for i in events])
190        conv = self._conn.send(msg_data, last=False)
191        return await self._wait_conv_res(conv)
192
193    async def query(self,
194                    data: common.QueryData
195                    ) -> typing.List[common.Event]:
196        """Query events from server
197
198        Raises:
199            ConnectionError
200
201        """
202        msg_data = chatter.Data(module='HatEventer',
203                                type='MsgQueryReq',
204                                data=common.query_to_sbs(data))
205        conv = self._conn.send(msg_data, last=False)
206        return await self._wait_conv_res(conv)
207
208    async def _receive_loop(self):
209        mlog.debug("starting receive loop")
210        try:
211            while True:
212                mlog.debug("waiting for incoming message")
213                msg = await self._conn.receive()
214                msg_type = msg.data.module, msg.data.type
215
216                if msg_type == ('HatEventer', 'MsgNotify'):
217                    mlog.debug("received event notification")
218                    self._process_msg_notify(msg)
219
220                elif msg_type == ('HatEventer', 'MsgQueryRes'):
221                    mlog.debug("received query response")
222                    self._process_msg_query_res(msg)
223
224                elif msg_type == ('HatEventer', 'MsgRegisterRes'):
225                    mlog.debug("received register response")
226                    self._process_msg_register_res(msg)
227
228                else:
229                    raise Exception("unsupported message type")
230
231        except ConnectionError:
232            pass
233
234        except Exception as e:
235            mlog.error("read loop error: %s", e, exc_info=e)
236
237        finally:
238            mlog.debug("stopping receive loop")
239            self.close()
240            self._event_queue.close()
241            for f in self._conv_futures.values():
242                if not f.done():
243                    f.set_exception(ConnectionError())
244
245    async def _wait_conv_res(self, conv):
246        if not self.is_open:
247            raise ConnectionError()
248
249        response_future = asyncio.Future()
250        self._conv_futures[conv] = response_future
251        try:
252            return await response_future
253        finally:
254            self._conv_futures.pop(conv, None)
255
256    def _process_msg_notify(self, msg):
257        events = [common.event_from_sbs(e) for e in msg.data.data]
258        self._event_queue.put_nowait(events)
259
260    def _process_msg_query_res(self, msg):
261        f = self._conv_futures.get(msg.conv)
262        if not f or f.done():
263            return
264        events = [common.event_from_sbs(e) for e in msg.data.data]
265        f.set_result(events)
266
267    def _process_msg_register_res(self, msg):
268        f = self._conv_futures.get(msg.conv)
269        if not f or f.done():
270            return
271        events = [common.event_from_sbs(e) if t == 'event' else None
272                  for t, e in msg.data.data]
273        f.set_result(events)

Eventer client

For creating new client see connect coroutine.

EventerClient()
async_group: hat.aio.Group

Async group

async def receive(self) -> List[hat.event.common.data.Event]:
147    async def receive(self) -> typing.List[common.Event]:
148        """Receive subscribed event notifications
149
150        Raises:
151            ConnectionError
152
153        """
154        try:
155            return await self._event_queue.get()
156
157        except aio.QueueClosedError:
158            raise ConnectionError()

Receive subscribed event notifications

Raises:
  • ConnectionError
def register(self, events: List[hat.event.common.data.RegisterEvent]):
160    def register(self, events: typing.List[common.RegisterEvent]):
161        """Register events
162
163        Raises:
164            ConnectionError
165
166        """
167        msg_data = chatter.Data(module='HatEventer',
168                                type='MsgRegisterReq',
169                                data=[common.register_event_to_sbs(i)
170                                      for i in events])
171        self._conn.send(msg_data)

Register events

Raises:
  • ConnectionError
async def register_with_response( self, events: List[hat.event.common.data.RegisterEvent]) -> List[Optional[hat.event.common.data.Event]]:
173    async def register_with_response(self,
174                                     events: typing.List[common.RegisterEvent]
175                                     ) -> typing.List[typing.Optional[common.Event]]:  # NOQA
176        """Register events
177
178        Each `common.RegisterEvent` from `events` is paired with results
179        `common.Event` if new event was successfuly created or ``None`` is new
180        event could not be created.
181
182        Raises:
183            ConnectionError
184
185        """
186        msg_data = chatter.Data(module='HatEventer',
187                                type='MsgRegisterReq',
188                                data=[common.register_event_to_sbs(i)
189                                      for i in events])
190        conv = self._conn.send(msg_data, last=False)
191        return await self._wait_conv_res(conv)

Register events

Each common.RegisterEvent from events is paired with results common.Event if new event was successfuly created or None is new event could not be created.

Raises:
  • ConnectionError
async def query( self, data: hat.event.common.data.QueryData) -> List[hat.event.common.data.Event]:
193    async def query(self,
194                    data: common.QueryData
195                    ) -> typing.List[common.Event]:
196        """Query events from server
197
198        Raises:
199            ConnectionError
200
201        """
202        msg_data = chatter.Data(module='HatEventer',
203                                type='MsgQueryReq',
204                                data=common.query_to_sbs(data))
205        conv = self._conn.send(msg_data, last=False)
206        return await self._wait_conv_res(conv)

Query events from server

Raises:
  • ConnectionError
Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
async def run_eventer_client( monitor_client: hat.monitor.client.Client, server_group: str, run_cb: Callable[[hat.event.eventer_client.EventerClient], Awaitable], subscriptions: List[Tuple[str, ...]] = []) -> Any:
276async def run_eventer_client(monitor_client: hat.monitor.client.Client,
277                             server_group: str,
278                             run_cb: RunCb,
279                             subscriptions: typing.List[common.EventType] = []
280                             ) -> typing.Any:
281    """Continuously communicate with currently active Event Server
282
283    This function tries to establish active connection with Event Server
284    within monitor component group `server_group`. Once this connection is
285    established, `run_cb` is called with currently active `EventerClient`
286    instance. Once connection to Event Server is closed or new active Event
287    Server is detected, execution of `run_cb` is canceled. If new
288    connection to Event Server is successfully established,
289    `run_cb` is called with new instance of `EventerClient`.
290
291    `run_cb` is called when:
292        * new active `EventerClient` is created
293
294    `run_cb` execution is cancelled when:
295        * `run_eventer_client` finishes execution
296        * connection to Event Server is closed
297        * different active Event Server is detected from Monitor Server's list
298          of components
299
300    `run_eventer_client` finishes execution when:
301        * connection to Monitor Server is closed
302        * `run_cb` finishes execution (by returning value or raising
303          exception, other than `asyncio.CancelledError`)
304
305    Return value of this function is the same as return value of
306    `run_cb`. If `run_cb` finishes by raising exception or if
307    connection to Monitor Server is closed, ConnectionError is raised.
308
309    If execution of `run_eventer_client` is canceled while `run_cb` is
310    running, connection to Event Server is closed after `run_cb`
311    cancellation finishes.
312
313    """
314    async_group = aio.Group()
315    address_queue = aio.Queue()
316    async_group.spawn(aio.call_on_done, monitor_client.wait_closing(),
317                      address_queue.close)
318    async_group.spawn(_address_loop, monitor_client, server_group,
319                      address_queue)
320
321    address = None
322    try:
323        while True:
324            while not address:
325                address = await address_queue.get_until_empty()
326
327            async with async_group.create_subgroup() as subgroup:
328                address_future = subgroup.spawn(address_queue.get_until_empty)
329                client_future = subgroup.spawn(_client_loop, address,
330                                               subscriptions, run_cb)
331
332                await asyncio.wait([address_future, client_future],
333                                   return_when=asyncio.FIRST_COMPLETED)
334
335                if address_future.done():
336                    address = address_future.result()
337                else:
338                    return client_future.result()
339
340    except aio.QueueClosedError:
341        raise ConnectionError()
342
343    finally:
344        await aio.uncancellable(async_group.async_close())

Continuously communicate with currently active Event Server

This function tries to establish active connection with Event Server within monitor component group server_group. Once this connection is established, run_cb is called with currently active EventerClient instance. Once connection to Event Server is closed or new active Event Server is detected, execution of run_cb is canceled. If new connection to Event Server is successfully established, run_cb is called with new instance of EventerClient.

run_cb is called when: * new active EventerClient is created

run_cb execution is cancelled when: * run_eventer_client finishes execution * connection to Event Server is closed * different active Event Server is detected from Monitor Server's list of components

run_eventer_client finishes execution when: * connection to Monitor Server is closed * run_cb finishes execution (by returning value or raising exception, other than asyncio.CancelledError)

Return value of this function is the same as return value of run_cb. If run_cb finishes by raising exception or if connection to Monitor Server is closed, ConnectionError is raised.

If execution of run_eventer_client is canceled while run_cb is running, connection to Event Server is closed after run_cb cancellation finishes.