hat.event.server.eventer_client_runner

  1from collections.abc import Callable
  2import asyncio
  3import functools
  4import logging
  5import types
  6import typing
  7
  8from hat import aio
  9from hat import json
 10from hat import util
 11from hat.drivers import tcp
 12import hat.monitor.component
 13
 14from hat.event import common
 15from hat.event.server.eventer_client import (SyncedState,
 16                                             create_eventer_client)
 17
 18
 19mlog: logging.Logger = logging.getLogger(__name__)
 20"""Module logger"""
 21
 22
 23SyncedCb: typing.TypeAlias = aio.AsyncCallable[
 24    [common.ServerId, SyncedState, int | None],
 25    None]
 26
 27
 28class EventerServerData(typing.NamedTuple):
 29    server_id: common.ServerId
 30    addr: tcp.Address
 31
 32
 33class EventerClientRunner(aio.Resource):
 34
 35    def __init__(self,
 36                 conf: json.Data,
 37                 backend: common.Backend,
 38                 synced_cb: SyncedCb,
 39                 reconnect_delay: float = 5):
 40        self._conf = conf
 41        self._backend = backend
 42        self._synced_cb = synced_cb
 43        self._reconnect_delay = reconnect_delay
 44        self._async_group = aio.Group()
 45        self._remote_active = False
 46        self._remote_active_cbs = util.CallbackRegistry()
 47        self._valid_server_data = set()
 48        self._connecting_server_data = set()
 49        self._remote_active_server_data = set()
 50
 51    @property
 52    def async_group(self) -> aio.Group:
 53        return self._async_group
 54
 55    @property
 56    def remote_active(self) -> bool:
 57        return self._remote_active
 58
 59    def register_remote_active_cb(self,
 60                                  cb: Callable[[bool], None]
 61                                  ) -> util.RegisterCallbackHandle:
 62        return self._remote_active_cbs.register(cb)
 63
 64    def set_monitor_state(self, state: hat.monitor.component.State):
 65        self._valid_server_data = set(_get_eventer_server_data(
 66            group=self._conf['monitor_component']['group'],
 67            server_token=self._conf.get('server_token'),
 68            state=state))
 69
 70        for server_data in self._valid_server_data:
 71            if server_data in self._connecting_server_data:
 72                continue
 73
 74            self.async_group.spawn(self._client_loop, server_data)
 75            self._connecting_server_data.add(server_data)
 76
 77    async def _client_loop(self, server_data):
 78        try:
 79            mlog.debug("staring eventer client runner loop")
 80            while server_data in self._valid_server_data:
 81                self._set_client_status(server_data, None)
 82
 83                try:
 84                    mlog.debug("creating eventer client")
 85                    eventer_client = await create_eventer_client(
 86                        addr=server_data.addr,
 87                        client_name=f"event/{self._conf['name']}",
 88                        local_server_id=self._conf['server_id'],
 89                        remote_server_id=server_data.server_id,
 90                        backend=self._backend,
 91                        client_token=self._conf.get('server_token'),
 92                        status_cb=functools.partial(self._set_client_status,
 93                                                    server_data),
 94                        synced_cb=functools.partial(self._synced_cb,
 95                                                    server_data.server_id))
 96
 97                except Exception:
 98                    await asyncio.sleep(self._reconnect_delay)
 99                    continue
100
101                self._set_client_status(server_data, eventer_client.status)
102
103                try:
104                    await eventer_client.wait_closing()
105
106                finally:
107                    await aio.uncancellable(eventer_client.async_close())
108
109        except Exception as e:
110            mlog.error("eventer client runner loop error: %s", e, exc_info=e)
111            self.close()
112
113        finally:
114            mlog.debug("closing eventer client runner loop")
115            self._connecting_server_data.remove(server_data)
116            self._set_client_status(server_data, None)
117
118    def _set_client_status(self, server_data, status):
119        if status is None or status == common.Status.STANDBY:
120            self._remote_active_server_data.discard(server_data)
121
122        else:
123            self._remote_active_server_data.add(server_data)
124
125        remote_active = bool(self._remote_active_server_data)
126        if remote_active == self._remote_active:
127            return
128
129        self._remote_active = remote_active
130        self._remote_active_cbs.notify(remote_active)
131
132
133def _get_eventer_server_data(group, server_token, state):
134    for info in state.components:
135        if info == state.info or info.group != group:
136            continue
137
138        server_id = json.get(info.data, 'server_id')
139        host = json.get(info.data, ['eventer_server', 'host'])
140        port = json.get(info.data, ['eventer_server', 'port'])
141        token = json.get(info.data, 'server_token')
142        if (not isinstance(server_id, int) or
143                not isinstance(host, str) or
144                not isinstance(port, int) or
145                not isinstance(token, (str, types.NoneType))):
146            continue
147
148        if server_token is not None and token != server_token:
149            continue
150
151        yield EventerServerData(server_id=server_id,
152                                addr=tcp.Address(host, port))
mlog: logging.Logger = <Logger hat.event.server.eventer_client_runner (WARNING)>

Module logger

SyncedCb: TypeAlias = Callable[[int, hat.event.server.eventer_client.SyncedState, int | None], None | Awaitable[None]]
class EventerServerData(typing.NamedTuple):
29class EventerServerData(typing.NamedTuple):
30    server_id: common.ServerId
31    addr: tcp.Address

EventerServerData(server_id, addr)

EventerServerData(server_id: int, addr: hat.drivers.tcp.Address)

Create new instance of EventerServerData(server_id, addr)

server_id: int

Alias for field number 0

addr: hat.drivers.tcp.Address

Alias for field number 1

class EventerClientRunner(hat.aio.group.Resource):
 34class EventerClientRunner(aio.Resource):
 35
 36    def __init__(self,
 37                 conf: json.Data,
 38                 backend: common.Backend,
 39                 synced_cb: SyncedCb,
 40                 reconnect_delay: float = 5):
 41        self._conf = conf
 42        self._backend = backend
 43        self._synced_cb = synced_cb
 44        self._reconnect_delay = reconnect_delay
 45        self._async_group = aio.Group()
 46        self._remote_active = False
 47        self._remote_active_cbs = util.CallbackRegistry()
 48        self._valid_server_data = set()
 49        self._connecting_server_data = set()
 50        self._remote_active_server_data = set()
 51
 52    @property
 53    def async_group(self) -> aio.Group:
 54        return self._async_group
 55
 56    @property
 57    def remote_active(self) -> bool:
 58        return self._remote_active
 59
 60    def register_remote_active_cb(self,
 61                                  cb: Callable[[bool], None]
 62                                  ) -> util.RegisterCallbackHandle:
 63        return self._remote_active_cbs.register(cb)
 64
 65    def set_monitor_state(self, state: hat.monitor.component.State):
 66        self._valid_server_data = set(_get_eventer_server_data(
 67            group=self._conf['monitor_component']['group'],
 68            server_token=self._conf.get('server_token'),
 69            state=state))
 70
 71        for server_data in self._valid_server_data:
 72            if server_data in self._connecting_server_data:
 73                continue
 74
 75            self.async_group.spawn(self._client_loop, server_data)
 76            self._connecting_server_data.add(server_data)
 77
 78    async def _client_loop(self, server_data):
 79        try:
 80            mlog.debug("staring eventer client runner loop")
 81            while server_data in self._valid_server_data:
 82                self._set_client_status(server_data, None)
 83
 84                try:
 85                    mlog.debug("creating eventer client")
 86                    eventer_client = await create_eventer_client(
 87                        addr=server_data.addr,
 88                        client_name=f"event/{self._conf['name']}",
 89                        local_server_id=self._conf['server_id'],
 90                        remote_server_id=server_data.server_id,
 91                        backend=self._backend,
 92                        client_token=self._conf.get('server_token'),
 93                        status_cb=functools.partial(self._set_client_status,
 94                                                    server_data),
 95                        synced_cb=functools.partial(self._synced_cb,
 96                                                    server_data.server_id))
 97
 98                except Exception:
 99                    await asyncio.sleep(self._reconnect_delay)
100                    continue
101
102                self._set_client_status(server_data, eventer_client.status)
103
104                try:
105                    await eventer_client.wait_closing()
106
107                finally:
108                    await aio.uncancellable(eventer_client.async_close())
109
110        except Exception as e:
111            mlog.error("eventer client runner loop error: %s", e, exc_info=e)
112            self.close()
113
114        finally:
115            mlog.debug("closing eventer client runner loop")
116            self._connecting_server_data.remove(server_data)
117            self._set_client_status(server_data, None)
118
119    def _set_client_status(self, server_data, status):
120        if status is None or status == common.Status.STANDBY:
121            self._remote_active_server_data.discard(server_data)
122
123        else:
124            self._remote_active_server_data.add(server_data)
125
126        remote_active = bool(self._remote_active_server_data)
127        if remote_active == self._remote_active:
128            return
129
130        self._remote_active = remote_active
131        self._remote_active_cbs.notify(remote_active)

Resource with lifetime control based on Group.

EventerClientRunner( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], backend: hat.event.common.Backend, synced_cb: Callable[[int, hat.event.server.eventer_client.SyncedState, int | None], None | Awaitable[None]], reconnect_delay: float = 5)
36    def __init__(self,
37                 conf: json.Data,
38                 backend: common.Backend,
39                 synced_cb: SyncedCb,
40                 reconnect_delay: float = 5):
41        self._conf = conf
42        self._backend = backend
43        self._synced_cb = synced_cb
44        self._reconnect_delay = reconnect_delay
45        self._async_group = aio.Group()
46        self._remote_active = False
47        self._remote_active_cbs = util.CallbackRegistry()
48        self._valid_server_data = set()
49        self._connecting_server_data = set()
50        self._remote_active_server_data = set()
async_group: hat.aio.group.Group
52    @property
53    def async_group(self) -> aio.Group:
54        return self._async_group

Group controlling resource's lifetime.

remote_active: bool
56    @property
57    def remote_active(self) -> bool:
58        return self._remote_active
def register_remote_active_cb( self, cb: Callable[[bool], None]) -> hat.util.callback.RegisterCallbackHandle:
60    def register_remote_active_cb(self,
61                                  cb: Callable[[bool], None]
62                                  ) -> util.RegisterCallbackHandle:
63        return self._remote_active_cbs.register(cb)
def set_monitor_state(self, state: hat.monitor.observer.client.State):
65    def set_monitor_state(self, state: hat.monitor.component.State):
66        self._valid_server_data = set(_get_eventer_server_data(
67            group=self._conf['monitor_component']['group'],
68            server_token=self._conf.get('server_token'),
69            state=state))
70
71        for server_data in self._valid_server_data:
72            if server_data in self._connecting_server_data:
73                continue
74
75            self.async_group.spawn(self._client_loop, server_data)
76            self._connecting_server_data.add(server_data)