hat.event.server.eventer_client_runner
1from collections.abc import Callable 2import asyncio 3import functools 4import logging 5import types 6import typing 7 8from hat import aio 9from hat import json 10from hat import util 11from hat.drivers import tcp 12import hat.monitor.component 13 14from hat.event import common 15from hat.event.server.eventer_client import (SyncedState, 16 create_eventer_client) 17 18 19mlog: logging.Logger = logging.getLogger(__name__) 20"""Module logger""" 21 22 23SyncedCb: typing.TypeAlias = aio.AsyncCallable[ 24 [common.ServerId, SyncedState, int | None], 25 None] 26 27 28class EventerServerData(typing.NamedTuple): 29 server_id: common.ServerId 30 addr: tcp.Address 31 32 33class EventerClientRunner(aio.Resource): 34 35 def __init__(self, 36 conf: json.Data, 37 backend: common.Backend, 38 synced_cb: SyncedCb, 39 reconnect_delay: float = 5): 40 self._conf = conf 41 self._backend = backend 42 self._synced_cb = synced_cb 43 self._reconnect_delay = reconnect_delay 44 self._async_group = aio.Group() 45 self._remote_active = False 46 self._remote_active_cbs = util.CallbackRegistry() 47 self._valid_server_data = set() 48 self._connecting_server_data = set() 49 self._remote_active_server_data = set() 50 51 @property 52 def async_group(self) -> aio.Group: 53 return self._async_group 54 55 @property 56 def remote_active(self) -> bool: 57 return self._remote_active 58 59 def register_remote_active_cb(self, 60 cb: Callable[[bool], None] 61 ) -> util.RegisterCallbackHandle: 62 return self._remote_active_cbs.register(cb) 63 64 def set_monitor_state(self, state: hat.monitor.component.State): 65 self._valid_server_data = set(_get_eventer_server_data( 66 group=self._conf['monitor_component']['group'], 67 server_token=self._conf.get('server_token'), 68 state=state)) 69 70 for server_data in self._valid_server_data: 71 if server_data in self._connecting_server_data: 72 continue 73 74 self.async_group.spawn(self._client_loop, server_data) 75 self._connecting_server_data.add(server_data) 76 77 async def _client_loop(self, server_data): 78 try: 79 mlog.debug("staring eventer client runner loop") 80 while server_data in self._valid_server_data: 81 self._set_client_status(server_data, None) 82 83 try: 84 mlog.debug("creating eventer client") 85 eventer_client = await create_eventer_client( 86 addr=server_data.addr, 87 client_name=f"event/{self._conf['name']}", 88 local_server_id=self._conf['server_id'], 89 remote_server_id=server_data.server_id, 90 backend=self._backend, 91 client_token=self._conf.get('server_token'), 92 status_cb=functools.partial(self._set_client_status, 93 server_data), 94 synced_cb=functools.partial(self._synced_cb, 95 server_data.server_id)) 96 97 except Exception as e: 98 mlog.warning("create eventer client error: %s", e, 99 exc_info=e) 100 101 await asyncio.sleep(self._reconnect_delay) 102 continue 103 104 self._set_client_status(server_data, eventer_client.status) 105 106 try: 107 await eventer_client.wait_closing() 108 109 finally: 110 await aio.uncancellable(eventer_client.async_close()) 111 112 except Exception as e: 113 mlog.error("eventer client runner loop error: %s", e, exc_info=e) 114 self.close() 115 116 finally: 117 mlog.debug("closing eventer client runner loop") 118 self._connecting_server_data.remove(server_data) 119 self._set_client_status(server_data, None) 120 121 def _set_client_status(self, server_data, status): 122 if status is None or status == common.Status.STANDBY: 123 self._remote_active_server_data.discard(server_data) 124 125 else: 126 self._remote_active_server_data.add(server_data) 127 128 remote_active = bool(self._remote_active_server_data) 129 if remote_active == self._remote_active: 130 return 131 132 self._remote_active = remote_active 133 self._remote_active_cbs.notify(remote_active) 134 135 136def _get_eventer_server_data(group, server_token, state): 137 for info in state.components: 138 if info == state.info or info.group != group: 139 continue 140 141 server_id = json.get(info.data, 'server_id') 142 host = json.get(info.data, ['eventer_server', 'host']) 143 port = json.get(info.data, ['eventer_server', 'port']) 144 token = json.get(info.data, 'server_token') 145 if (not isinstance(server_id, int) or 146 not isinstance(host, str) or 147 not isinstance(port, int) or 148 not isinstance(token, (str, types.NoneType))): 149 continue 150 151 if server_token is not None and token != server_token: 152 continue 153 154 yield EventerServerData(server_id=server_id, 155 addr=tcp.Address(host, port))
Module logger
SyncedCb: TypeAlias =
Callable[[int, hat.event.server.eventer_client.SyncedState, int | None], None | Awaitable[None]]
class
EventerServerData(typing.NamedTuple):
EventerServerData(server_id, addr)
class
EventerClientRunner(hat.aio.group.Resource):
34class EventerClientRunner(aio.Resource): 35 36 def __init__(self, 37 conf: json.Data, 38 backend: common.Backend, 39 synced_cb: SyncedCb, 40 reconnect_delay: float = 5): 41 self._conf = conf 42 self._backend = backend 43 self._synced_cb = synced_cb 44 self._reconnect_delay = reconnect_delay 45 self._async_group = aio.Group() 46 self._remote_active = False 47 self._remote_active_cbs = util.CallbackRegistry() 48 self._valid_server_data = set() 49 self._connecting_server_data = set() 50 self._remote_active_server_data = set() 51 52 @property 53 def async_group(self) -> aio.Group: 54 return self._async_group 55 56 @property 57 def remote_active(self) -> bool: 58 return self._remote_active 59 60 def register_remote_active_cb(self, 61 cb: Callable[[bool], None] 62 ) -> util.RegisterCallbackHandle: 63 return self._remote_active_cbs.register(cb) 64 65 def set_monitor_state(self, state: hat.monitor.component.State): 66 self._valid_server_data = set(_get_eventer_server_data( 67 group=self._conf['monitor_component']['group'], 68 server_token=self._conf.get('server_token'), 69 state=state)) 70 71 for server_data in self._valid_server_data: 72 if server_data in self._connecting_server_data: 73 continue 74 75 self.async_group.spawn(self._client_loop, server_data) 76 self._connecting_server_data.add(server_data) 77 78 async def _client_loop(self, server_data): 79 try: 80 mlog.debug("staring eventer client runner loop") 81 while server_data in self._valid_server_data: 82 self._set_client_status(server_data, None) 83 84 try: 85 mlog.debug("creating eventer client") 86 eventer_client = await create_eventer_client( 87 addr=server_data.addr, 88 client_name=f"event/{self._conf['name']}", 89 local_server_id=self._conf['server_id'], 90 remote_server_id=server_data.server_id, 91 backend=self._backend, 92 client_token=self._conf.get('server_token'), 93 status_cb=functools.partial(self._set_client_status, 94 server_data), 95 synced_cb=functools.partial(self._synced_cb, 96 server_data.server_id)) 97 98 except Exception as e: 99 mlog.warning("create eventer client error: %s", e, 100 exc_info=e) 101 102 await asyncio.sleep(self._reconnect_delay) 103 continue 104 105 self._set_client_status(server_data, eventer_client.status) 106 107 try: 108 await eventer_client.wait_closing() 109 110 finally: 111 await aio.uncancellable(eventer_client.async_close()) 112 113 except Exception as e: 114 mlog.error("eventer client runner loop error: %s", e, exc_info=e) 115 self.close() 116 117 finally: 118 mlog.debug("closing eventer client runner loop") 119 self._connecting_server_data.remove(server_data) 120 self._set_client_status(server_data, None) 121 122 def _set_client_status(self, server_data, status): 123 if status is None or status == common.Status.STANDBY: 124 self._remote_active_server_data.discard(server_data) 125 126 else: 127 self._remote_active_server_data.add(server_data) 128 129 remote_active = bool(self._remote_active_server_data) 130 if remote_active == self._remote_active: 131 return 132 133 self._remote_active = remote_active 134 self._remote_active_cbs.notify(remote_active)
Resource with lifetime control based on Group.
EventerClientRunner( conf: None | bool | int | float | str | List[ForwardRef('Data')] | Dict[str, ForwardRef('Data')], backend: hat.event.common.Backend, synced_cb: Callable[[int, hat.event.server.eventer_client.SyncedState, int | None], None | Awaitable[None]], reconnect_delay: float = 5)
36 def __init__(self, 37 conf: json.Data, 38 backend: common.Backend, 39 synced_cb: SyncedCb, 40 reconnect_delay: float = 5): 41 self._conf = conf 42 self._backend = backend 43 self._synced_cb = synced_cb 44 self._reconnect_delay = reconnect_delay 45 self._async_group = aio.Group() 46 self._remote_active = False 47 self._remote_active_cbs = util.CallbackRegistry() 48 self._valid_server_data = set() 49 self._connecting_server_data = set() 50 self._remote_active_server_data = set()
def
register_remote_active_cb( self, cb: Callable[[bool], None]) -> hat.util.callback.RegisterCallbackHandle:
def
set_monitor_state(self, state: hat.monitor.observer.client.State):
65 def set_monitor_state(self, state: hat.monitor.component.State): 66 self._valid_server_data = set(_get_eventer_server_data( 67 group=self._conf['monitor_component']['group'], 68 server_token=self._conf.get('server_token'), 69 state=state)) 70 71 for server_data in self._valid_server_data: 72 if server_data in self._connecting_server_data: 73 continue 74 75 self.async_group.spawn(self._client_loop, server_data) 76 self._connecting_server_data.add(server_data)