hat.event.server.eventer_client_runner
"""Eventer client runner

Keeps one eventer client loop running for each remote event server that is
currently listed in the monitor state.

"""

from collections.abc import Callable
import asyncio
import functools
import logging
import types
import typing

from hat import aio
from hat import json
from hat import util
from hat.drivers import tcp
import hat.monitor.component

from hat.event import common
from hat.event.server.eventer_client import (SyncedState,
                                             create_eventer_client)


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


SyncedCb: typing.TypeAlias = aio.AsyncCallable[
    [common.ServerId, SyncedState, int | None],
    None]
"""Synced callback (first argument is the remote server id)"""


class EventerServerData(typing.NamedTuple):
    """Identifier and eventer server address of a remote event server"""
    server_id: common.ServerId
    addr: tcp.Address


class EventerClientRunner(aio.Resource):
    """Runner of eventer clients connected to remote event servers

    Remote servers are taken from the monitor state passed to
    `set_monitor_state`. For each of them a dedicated loop is spawned in
    the runner's async group. The loop repeatedly creates an eventer client
    and waits for it to close, for as long as the server stays listed in
    the most recent monitor state.

    `remote_active` is ``True`` while at least one client reports a status
    other than ``None`` or `common.Status.STANDBY`.

    Args:
        conf: event server configuration; the keys read here are
            ``monitor_component.group``, ``name``, ``server_id`` and the
            optional ``server_token``
        backend: backend passed to each created eventer client
        synced_cb: callback passed to each eventer client with the remote
            server id bound as its first argument
        reconnect_delay: seconds to wait after a failed client creation
            before trying again

    """

    def __init__(self,
                 conf: json.Data,
                 backend: common.Backend,
                 synced_cb: SyncedCb,
                 reconnect_delay: float = 5):
        self._conf = conf
        self._backend = backend
        self._synced_cb = synced_cb
        self._reconnect_delay = reconnect_delay
        self._async_group = aio.Group()
        self._remote_active = False
        self._remote_active_cbs = util.CallbackRegistry()
        # servers listed in the most recent monitor state
        self._valid_server_data = set()
        # servers which currently have a running client loop
        self._connecting_server_data = set()
        # servers whose last reported status is neither None nor STANDBY
        self._remote_active_server_data = set()

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def remote_active(self) -> bool:
        """``True`` while at least one remote server is tracked as active"""
        return self._remote_active

    def register_remote_active_cb(self,
                                  cb: Callable[[bool], None]
                                  ) -> util.RegisterCallbackHandle:
        """Register callback notified when `remote_active` changes"""
        return self._remote_active_cbs.register(cb)

    def set_monitor_state(self, state: hat.monitor.component.State):
        """Update the set of valid remote servers from monitor state

        A client loop is spawned for each valid server that does not already
        have one. Loops of servers that are no longer valid are not stopped
        here - they exit the next time they evaluate their loop condition.

        """
        self._valid_server_data = set(_get_eventer_server_data(
            group=self._conf['monitor_component']['group'],
            server_token=self._conf.get('server_token'),
            state=state))

        for server_data in self._valid_server_data:
            if server_data in self._connecting_server_data:
                continue

            self.async_group.spawn(self._client_loop, server_data)
            self._connecting_server_data.add(server_data)

    async def _client_loop(self, server_data: EventerServerData):
        """Keep an eventer client to a single remote server alive"""
        try:
            mlog.debug("starting eventer client runner loop")
            while server_data in self._valid_server_data:
                # no client exists at this point
                self._set_client_status(server_data, None)

                try:
                    mlog.debug("creating eventer client")
                    eventer_client = await create_eventer_client(
                        addr=server_data.addr,
                        client_name=f"event/{self._conf['name']}",
                        local_server_id=self._conf['server_id'],
                        remote_server_id=server_data.server_id,
                        backend=self._backend,
                        client_token=self._conf.get('server_token'),
                        status_cb=functools.partial(self._set_client_status,
                                                    server_data),
                        synced_cb=functools.partial(self._synced_cb,
                                                    server_data.server_id))

                except Exception as e:
                    # failure is retried after a delay; log it so that
                    # persistent connection problems can be diagnosed
                    mlog.debug("error creating eventer client: %s",
                               e, exc_info=e)
                    await asyncio.sleep(self._reconnect_delay)
                    continue

                self._set_client_status(server_data, eventer_client.status)

                try:
                    await eventer_client.wait_closing()

                finally:
                    # presumably lets the close finish even when this task
                    # is being cancelled - confirm against aio.uncancellable
                    await aio.uncancellable(eventer_client.async_close())

        except Exception as e:
            mlog.error("eventer client runner loop error: %s", e, exc_info=e)
            # unexpected error in a single loop closes the whole runner
            self.close()

        finally:
            mlog.debug("closing eventer client runner loop")
            self._connecting_server_data.remove(server_data)
            self._set_client_status(server_data, None)

    def _set_client_status(self,
                           server_data: EventerServerData,
                           status: common.Status | None):
        """Record status of a single remote server and update remote_active

        Registered callbacks are notified only when the aggregated
        `remote_active` value changes.

        """
        if status is None or status == common.Status.STANDBY:
            self._remote_active_server_data.discard(server_data)

        else:
            self._remote_active_server_data.add(server_data)

        remote_active = bool(self._remote_active_server_data)
        if remote_active == self._remote_active:
            return

        self._remote_active = remote_active
        self._remote_active_cbs.notify(remote_active)


def _get_eventer_server_data(group, server_token, state):
    """Yield `EventerServerData` for usable components in monitor state

    A component is skipped when it is this component itself (`state.info`),
    when it belongs to a different group, when its data does not contain
    correctly typed ``server_id``, ``eventer_server.host``,
    ``eventer_server.port`` and ``server_token`` values, or when
    `server_token` is set and differs from the component's token.

    """
    for info in state.components:
        if info == state.info or info.group != group:
            continue

        server_id = json.get(info.data, 'server_id')
        host = json.get(info.data, ['eventer_server', 'host'])
        port = json.get(info.data, ['eventer_server', 'port'])
        token = json.get(info.data, 'server_token')
        if (not isinstance(server_id, int) or
                not isinstance(host, str) or
                not isinstance(port, int) or
                not isinstance(token, (str, types.NoneType))):
            continue

        if server_token is not None and token != server_token:
            continue

        yield EventerServerData(server_id=server_id,
                                addr=tcp.Address(host, port))
Module logger
SyncedCb: TypeAlias =
Callable[[int, hat.event.server.eventer_client.SyncedState, int | None], None | Awaitable[None]]
class
EventerServerData(typing.NamedTuple):
EventerServerData(server_id, addr)
class
EventerClientRunner(hat.aio.group.Resource):
class EventerClientRunner(aio.Resource):
    """Runner of eventer clients connected to remote event servers

    Remote servers are taken from the monitor state passed to
    `set_monitor_state`. For each of them a dedicated loop is spawned in
    the runner's async group. The loop repeatedly creates an eventer client
    and waits for it to close, for as long as the server stays listed in
    the most recent monitor state.

    `remote_active` is ``True`` while at least one client reports a status
    other than ``None`` or `common.Status.STANDBY`.

    Args:
        conf: event server configuration; the keys read here are
            ``monitor_component.group``, ``name``, ``server_id`` and the
            optional ``server_token``
        backend: backend passed to each created eventer client
        synced_cb: callback passed to each eventer client with the remote
            server id bound as its first argument
        reconnect_delay: seconds to wait after a failed client creation
            before trying again

    """

    def __init__(self,
                 conf: json.Data,
                 backend: common.Backend,
                 synced_cb: SyncedCb,
                 reconnect_delay: float = 5):
        self._conf = conf
        self._backend = backend
        self._synced_cb = synced_cb
        self._reconnect_delay = reconnect_delay
        self._async_group = aio.Group()
        self._remote_active = False
        self._remote_active_cbs = util.CallbackRegistry()
        # servers listed in the most recent monitor state
        self._valid_server_data = set()
        # servers which currently have a running client loop
        self._connecting_server_data = set()
        # servers whose last reported status is neither None nor STANDBY
        self._remote_active_server_data = set()

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def remote_active(self) -> bool:
        """``True`` while at least one remote server is tracked as active"""
        return self._remote_active

    def register_remote_active_cb(self,
                                  cb: Callable[[bool], None]
                                  ) -> util.RegisterCallbackHandle:
        """Register callback notified when `remote_active` changes"""
        return self._remote_active_cbs.register(cb)

    def set_monitor_state(self, state: hat.monitor.component.State):
        """Update the set of valid remote servers from monitor state

        A client loop is spawned for each valid server that does not already
        have one. Loops of servers that are no longer valid are not stopped
        here - they exit the next time they evaluate their loop condition.

        """
        self._valid_server_data = set(_get_eventer_server_data(
            group=self._conf['monitor_component']['group'],
            server_token=self._conf.get('server_token'),
            state=state))

        for server_data in self._valid_server_data:
            if server_data in self._connecting_server_data:
                continue

            self.async_group.spawn(self._client_loop, server_data)
            self._connecting_server_data.add(server_data)

    async def _client_loop(self, server_data):
        """Keep an eventer client to a single remote server alive"""
        try:
            mlog.debug("starting eventer client runner loop")
            while server_data in self._valid_server_data:
                # no client exists at this point
                self._set_client_status(server_data, None)

                try:
                    mlog.debug("creating eventer client")
                    eventer_client = await create_eventer_client(
                        addr=server_data.addr,
                        client_name=f"event/{self._conf['name']}",
                        local_server_id=self._conf['server_id'],
                        remote_server_id=server_data.server_id,
                        backend=self._backend,
                        client_token=self._conf.get('server_token'),
                        status_cb=functools.partial(self._set_client_status,
                                                    server_data),
                        synced_cb=functools.partial(self._synced_cb,
                                                    server_data.server_id))

                except Exception as e:
                    # failure is retried after a delay; log it so that
                    # persistent connection problems can be diagnosed
                    mlog.debug("error creating eventer client: %s",
                               e, exc_info=e)
                    await asyncio.sleep(self._reconnect_delay)
                    continue

                self._set_client_status(server_data, eventer_client.status)

                try:
                    await eventer_client.wait_closing()

                finally:
                    # presumably lets the close finish even when this task
                    # is being cancelled - confirm against aio.uncancellable
                    await aio.uncancellable(eventer_client.async_close())

        except Exception as e:
            mlog.error("eventer client runner loop error: %s", e, exc_info=e)
            # unexpected error in a single loop closes the whole runner
            self.close()

        finally:
            mlog.debug("closing eventer client runner loop")
            self._connecting_server_data.remove(server_data)
            self._set_client_status(server_data, None)

    def _set_client_status(self, server_data, status):
        """Record status of a single remote server and update remote_active

        Registered callbacks are notified only when the aggregated
        `remote_active` value changes.

        """
        if status is None or status == common.Status.STANDBY:
            self._remote_active_server_data.discard(server_data)

        else:
            self._remote_active_server_data.add(server_data)

        remote_active = bool(self._remote_active_server_data)
        if remote_active == self._remote_active:
            return

        self._remote_active = remote_active
        self._remote_active_cbs.notify(remote_active)
Resource with lifetime control based on `Group`.
EventerClientRunner( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], backend: hat.event.common.Backend, synced_cb: Callable[[int, hat.event.server.eventer_client.SyncedState, int | None], None | Awaitable[None]], reconnect_delay: float = 5)
def __init__(self,
             conf: json.Data,
             backend: common.Backend,
             synced_cb: SyncedCb,
             reconnect_delay: float = 5):
    """Store the arguments and create empty runner state.

    Args:
        conf: configuration kept for later use by the runner
        backend: backend kept for later use by the runner
        synced_cb: synced callback kept for later use by the runner
        reconnect_delay: delay kept for later use by the runner

    """
    # constructor arguments
    self._conf, self._backend = conf, backend
    self._synced_cb, self._reconnect_delay = synced_cb, reconnect_delay

    # lifetime control and remote-active notification
    self._async_group = aio.Group()
    self._remote_active = False
    self._remote_active_cbs = util.CallbackRegistry()

    # server data bookkeeping, all initially empty
    self._valid_server_data = set()
    self._connecting_server_data = set()
    self._remote_active_server_data = set()
def
register_remote_active_cb( self, cb: Callable[[bool], None]) -> hat.util.callback.RegisterCallbackHandle:
def
set_monitor_state(self, state: hat.monitor.observer.client.State):
def set_monitor_state(self, state: hat.monitor.component.State):
    """Replace the valid server data set and spawn missing client loops.

    The valid set is rebuilt from `state` using the configured monitor
    group and optional server token. A client loop is spawned for every
    valid entry that is not yet marked as connecting, and the entry is
    then marked as connecting.

    """
    conf = self._conf
    discovered = _get_eventer_server_data(
        group=conf['monitor_component']['group'],
        server_token=conf.get('server_token'),
        state=state)
    self._valid_server_data = set(discovered)

    connecting = self._connecting_server_data
    for candidate in self._valid_server_data:
        if candidate not in connecting:
            self.async_group.spawn(self._client_loop, candidate)
            connecting.add(candidate)