hat.event.component

Eventer Component

  1"""Eventer Component"""
  2
  3from collections.abc import Collection
  4import asyncio
  5import contextlib
  6import logging
  7import types
  8import typing
  9
 10from hat import aio
 11from hat import json
 12from hat import util
 13from hat.drivers import tcp
 14import hat.monitor.component
 15
 16from hat.event import common
 17import hat.event.eventer.client
 18
 19
 20mlog: logging.Logger = logging.getLogger(__name__)
 21"""Module logger"""
 22
 23State: typing.TypeAlias = hat.monitor.component.State
 24"""Component state"""
 25
 26Runner: typing.TypeAlias = aio.Resource
 27"""Component runner"""
 28
 29RunnerCb: typing.TypeAlias = aio.AsyncCallable[
 30    ['Component', 'ServerData', hat.event.eventer.client.Client],
 31    Runner]
 32"""Runner callback"""
 33
 34StateCb: typing.TypeAlias = aio.AsyncCallable[['Component', State], None]
 35"""State callback"""
 36
 37CloseReqCb: typing.TypeAlias = aio.AsyncCallable[['Component'], None]
 38"""Close request callback"""
 39
 40StatusCb: typing.TypeAlias = aio.AsyncCallable[
 41    ['Component', hat.event.eventer.client.Client, common.Status],
 42    None]
 43"""Status callback"""
 44
 45EventsCb: typing.TypeAlias = aio.AsyncCallable[
 46    ['Component', hat.event.eventer.client.Client, Collection[common.Event]],
 47    None]
 48"""Events callback"""
 49
 50
 51class ServerData(typing.NamedTuple):
 52    """Server data"""
 53    server_id: common.ServerId
 54    addr: tcp.Address
 55    server_token: str | None
 56
 57
 58async def connect(addr: tcp.Address,
 59                  name: str,
 60                  group: str,
 61                  server_group: str,
 62                  client_name: str,
 63                  runner_cb: RunnerCb,
 64                  *,
 65                  state_cb: StateCb | None = None,
 66                  close_req_cb: CloseReqCb | None = None,
 67                  status_cb: StatusCb | None = None,
 68                  events_cb: EventsCb | None = None,
 69                  server_data_queue_size: int = 1024,
 70                  reconnect_delay: float = 0.5,
 71                  observer_kwargs: dict[str, typing.Any] = {},
 72                  eventer_kwargs: dict[str, typing.Any] = {}
 73                  ) -> 'Component':
 74    """Connect to local monitor server and create component
 75
 76    High-level interface for communication with Event Server, based on
 77    information obtained from Monitor Server.
 78
 79    Component instance tries to establish active connection with
 80    Event Server within monitor component group `server_group`. Once this
 81    connection is established, `runner_cb` is called with currently active
 82    eventer client instance. Result of calling `runner_cb` should be runner
 83    representing user defined components activity associated with connection
 84    to active Event Server. Once connection to Event Server is closed or new
 85    active Event Server is detected, associated runner is closed. If new
 86    connection to Event Server is successfully established,
 87    `component_cb` will be called again to create new runner associated with
 88    new instance of eventer client.
 89
 90    If runner is closed while connection to Event Server is open, component
 91    is closed.
 92
 93    """
 94    component = Component()
 95    component._server_group = server_group
 96    component._client_name = client_name
 97    component._runner_cb = runner_cb
 98    component._state_cb = state_cb
 99    component._close_req_cb = close_req_cb
100    component._status_cb = status_cb
101    component._events_cb = events_cb
102    component._server_data_queue_size = server_data_queue_size
103    component._reconnect_delay = reconnect_delay
104    component._eventer_kwargs = eventer_kwargs
105    component._server_data = None
106    component._server_data_queue = None
107
108    component._monitor_component = await hat.monitor.component.connect(
109        addr=addr,
110        name=name,
111        group=group,
112        runner_cb=component._create_monitor_runner,
113        **{**observer_kwargs,
114           'state_cb': component._on_state,
115           'close_req_cb': component._on_close_req})
116
117    return component
118
119
120class Component(aio.Resource):
121    """Eventer Component"""
122
123    @property
124    def async_group(self):
125        return self._monitor_component.async_group
126
127    @property
128    def state(self) -> State:
129        """State"""
130        return self._monitor_component.state
131
132    @property
133    def ready(self) -> bool:
134        """Ready"""
135        return self._monitor_component.ready
136
137    async def set_ready(self, ready: bool):
138        """Set ready"""
139        await self._monitor_component.set_ready(ready)
140
141    async def _on_state(self, monitor_component, state):
142        if self._server_data_queue is not None:
143            data = self._get_active_server_data(state)
144
145            if data != self._server_data:
146                self._server_data = data
147
148                with contextlib.suppress(aio.QueueClosedError):
149                    await self._server_data_queue.put(data)
150
151        if self._state_cb:
152            await aio.call(self._state_cb, self, state)
153
154    async def _on_close_req(self, monitor_component):
155        if self._close_req_cb:
156            await aio.call(self._close_req_cb, self)
157
158    async def _on_status(self, eventer_client, status):
159        if self._status_cb:
160            await aio.call(self._status_cb, self, eventer_client, status)
161
162    async def _on_events(self, eventer_client, events):
163        if self._events_cb:
164            await aio.call(self._events_cb, self, eventer_client, events)
165
166    def _create_monitor_runner(self, monitor_component):
167        self._server_data = self._get_active_server_data(
168            monitor_component.state)
169        self._server_data_queue = aio.Queue(self._server_data_queue_size)
170
171        if self._server_data:
172            self._server_data_queue.put_nowait(self._server_data)
173
174        runner = aio.Group()
175        runner.spawn(self._server_data_loop, runner, self._server_data_queue)
176
177        return runner
178
179    def _get_active_server_data(self, state):
180        info = util.first(state.components, self._active_server_filter)
181        if not info:
182            return
183
184        server_id = json.get(info.data, 'server_id')
185        host = json.get(info.data, ['eventer_server', 'host'])
186        port = json.get(info.data, ['eventer_server', 'port'])
187        server_token = json.get(info.data, 'server_token')
188        if (not isinstance(server_id, int) or
189                not isinstance(host, str) or
190                not isinstance(port, int) or
191                not isinstance(server_token, (str, types.NoneType))):
192            return
193
194        client_token = self._eventer_kwargs.get('client_token')
195        if client_token is not None and client_token != server_token:
196            return
197
198        return ServerData(server_id=server_id,
199                          addr=tcp.Address(host, port),
200                          server_token=server_token)
201
202    def _active_server_filter(self, info):
203        return (info.group == self._server_group and
204                info.blessing_req.token is not None and
205                info.blessing_req.token == info.blessing_res.token)
206
207    async def _server_data_loop(self, async_group, server_data_queue):
208        try:
209            server_data = None
210            while True:
211                while not server_data or not server_data_queue.empty():
212                    server_data = await server_data_queue.get_until_empty()
213
214                async with async_group.create_subgroup() as subgroup:
215                    subgroup.spawn(self._client_loop, subgroup, server_data)
216                    server_data = await server_data_queue.get_until_empty()
217
218        except Exception as e:
219            mlog.error("address loop error: %s", e, exc_info=e)
220            self.close()
221
222        finally:
223            async_group.close()
224            server_data_queue.close()
225
226    async def _client_loop(self, async_group, server_data):
227        try:
228            while True:
229                try:
230                    kwargs = {**self._eventer_kwargs,
231                              'status_cb': self._on_status,
232                              'events_cb': self._on_events}
233
234                    if 'server_id' not in kwargs:
235                        kwargs['server_id'] = server_data.server_id
236
237                    mlog.debug("connecting to server %s", server_data.addr)
238                    client = await hat.event.eventer.client.connect(
239                        addr=server_data.addr,
240                        client_name=self._client_name,
241                        **kwargs)
242
243                except Exception as e:
244                    mlog.warning("error connecting to server: %s", e,
245                                 exc_info=e)
246                    await asyncio.sleep(self._reconnect_delay)
247                    continue
248
249                try:
250                    mlog.debug("connected to server")
251                    runner = await client.async_group.spawn(
252                        aio.call, self._runner_cb, self, server_data, client)
253
254                    try:
255                        async with async_group.create_subgroup() as subgroup:
256                            client_closing_task = subgroup.spawn(
257                                client.wait_closing)
258                            runner_closing_task = subgroup.spawn(
259                                runner.wait_closing)
260
261                            await asyncio.wait(
262                                [client_closing_task, runner_closing_task],
263                                return_when=asyncio.FIRST_COMPLETED)
264
265                            if (runner_closing_task.done() and
266                                    not client_closing_task.done()):
267                                self.close()
268                                return
269
270                    finally:
271                        await aio.uncancellable(runner.async_close())
272
273                finally:
274                    await aio.uncancellable(client.async_close())
275
276                mlog.debug("connection to server closed")
277                await asyncio.sleep(self._reconnect_delay)
278
279        except Exception as e:
280            mlog.error("client loop error: %s", e, exc_info=e)
281            self.close()
282
283        finally:
284            async_group.close()
mlog: logging.Logger = <Logger hat.event.component (WARNING)>

Module logger

class State(typing.NamedTuple):
26class State(typing.NamedTuple):
27    """Client state"""
28    info: common.ComponentInfo | None
29    components: list[common.ComponentInfo]

Component state

State( info: hat.monitor.common.ComponentInfo | None, components: list[hat.monitor.common.ComponentInfo])

Create new instance of State(info, components)

info: hat.monitor.common.ComponentInfo | None

Alias for field number 0

components: list[hat.monitor.common.ComponentInfo]

Alias for field number 1

Runner: TypeAlias = hat.aio.group.Resource

Component runner

RunnerCb: TypeAlias = Callable[[ForwardRef('Component'), ForwardRef('ServerData'), hat.event.eventer.Client], Union[hat.aio.group.Resource, Awaitable[hat.aio.group.Resource]]]

Runner callback

StateCb: TypeAlias = Callable[[ForwardRef('Component'), hat.monitor.observer.client.State], Optional[Awaitable[NoneType]]]

State callback

CloseReqCb: TypeAlias = Callable[[ForwardRef('Component')], Optional[Awaitable[NoneType]]]

Close request callback

StatusCb: TypeAlias = Callable[[ForwardRef('Component'), hat.event.eventer.Client, hat.event.common.Status], Optional[Awaitable[NoneType]]]

Status callback

EventsCb: TypeAlias = Callable[[ForwardRef('Component'), hat.event.eventer.Client, Collection[hat.event.common.Event]], Optional[Awaitable[NoneType]]]

Events callback

class ServerData(typing.NamedTuple):
52class ServerData(typing.NamedTuple):
53    """Server data"""
54    server_id: common.ServerId
55    addr: tcp.Address
56    server_token: str | None

Server data

ServerData( server_id: int, addr: hat.drivers.tcp.Address, server_token: str | None)

Create new instance of ServerData(server_id, addr, server_token)

server_id: int

Alias for field number 0

addr: hat.drivers.tcp.Address

Alias for field number 1

server_token: str | None

Alias for field number 2

async def connect( addr: hat.drivers.tcp.Address, name: str, group: str, server_group: str, client_name: str, runner_cb: Callable[[Component, ServerData, hat.event.eventer.Client], Union[hat.aio.group.Resource, Awaitable[hat.aio.group.Resource]]], *, state_cb: Optional[Callable[[Component, hat.monitor.observer.client.State], Optional[Awaitable[NoneType]]]] = None, close_req_cb: Optional[Callable[[Component], Optional[Awaitable[NoneType]]]] = None, status_cb: Optional[Callable[[Component, hat.event.eventer.Client, hat.event.common.Status], Optional[Awaitable[NoneType]]]] = None, events_cb: Optional[Callable[[Component, hat.event.eventer.Client, Collection[hat.event.common.Event]], Optional[Awaitable[NoneType]]]] = None, server_data_queue_size: int = 1024, reconnect_delay: float = 0.5, observer_kwargs: dict[str, typing.Any] = {}, eventer_kwargs: dict[str, typing.Any] = {}) -> Component:
 59async def connect(addr: tcp.Address,
 60                  name: str,
 61                  group: str,
 62                  server_group: str,
 63                  client_name: str,
 64                  runner_cb: RunnerCb,
 65                  *,
 66                  state_cb: StateCb | None = None,
 67                  close_req_cb: CloseReqCb | None = None,
 68                  status_cb: StatusCb | None = None,
 69                  events_cb: EventsCb | None = None,
 70                  server_data_queue_size: int = 1024,
 71                  reconnect_delay: float = 0.5,
 72                  observer_kwargs: dict[str, typing.Any] = {},
 73                  eventer_kwargs: dict[str, typing.Any] = {}
 74                  ) -> 'Component':
 75    """Connect to local monitor server and create component
 76
 77    High-level interface for communication with Event Server, based on
 78    information obtained from Monitor Server.
 79
 80    Component instance tries to establish active connection with
 81    Event Server within monitor component group `server_group`. Once this
 82    connection is established, `runner_cb` is called with currently active
 83    eventer client instance. Result of calling `runner_cb` should be runner
 84    representing user defined components activity associated with connection
 85    to active Event Server. Once connection to Event Server is closed or new
 86    active Event Server is detected, associated runner is closed. If new
 87    connection to Event Server is successfully established,
 88    `component_cb` will be called again to create new runner associated with
 89    new instance of eventer client.
 90
 91    If runner is closed while connection to Event Server is open, component
 92    is closed.
 93
 94    """
 95    component = Component()
 96    component._server_group = server_group
 97    component._client_name = client_name
 98    component._runner_cb = runner_cb
 99    component._state_cb = state_cb
100    component._close_req_cb = close_req_cb
101    component._status_cb = status_cb
102    component._events_cb = events_cb
103    component._server_data_queue_size = server_data_queue_size
104    component._reconnect_delay = reconnect_delay
105    component._eventer_kwargs = eventer_kwargs
106    component._server_data = None
107    component._server_data_queue = None
108
109    component._monitor_component = await hat.monitor.component.connect(
110        addr=addr,
111        name=name,
112        group=group,
113        runner_cb=component._create_monitor_runner,
114        **{**observer_kwargs,
115           'state_cb': component._on_state,
116           'close_req_cb': component._on_close_req})
117
118    return component

Connect to local monitor server and create component

High-level interface for communication with Event Server, based on information obtained from Monitor Server.

Component instance tries to establish active connection with Event Server within monitor component group server_group. Once this connection is established, runner_cb is called with currently active eventer client instance. Result of calling runner_cb should be runner representing user defined components activity associated with connection to active Event Server. Once connection to Event Server is closed or new active Event Server is detected, associated runner is closed. If new connection to Event Server is successfully established, component_cb will be called again to create new runner associated with new instance of eventer client.

If runner is closed while connection to Event Server is open, component is closed.

class Component(hat.aio.group.Resource):
121class Component(aio.Resource):
122    """Eventer Component"""
123
124    @property
125    def async_group(self):
126        return self._monitor_component.async_group
127
128    @property
129    def state(self) -> State:
130        """State"""
131        return self._monitor_component.state
132
133    @property
134    def ready(self) -> bool:
135        """Ready"""
136        return self._monitor_component.ready
137
138    async def set_ready(self, ready: bool):
139        """Set ready"""
140        await self._monitor_component.set_ready(ready)
141
142    async def _on_state(self, monitor_component, state):
143        if self._server_data_queue is not None:
144            data = self._get_active_server_data(state)
145
146            if data != self._server_data:
147                self._server_data = data
148
149                with contextlib.suppress(aio.QueueClosedError):
150                    await self._server_data_queue.put(data)
151
152        if self._state_cb:
153            await aio.call(self._state_cb, self, state)
154
155    async def _on_close_req(self, monitor_component):
156        if self._close_req_cb:
157            await aio.call(self._close_req_cb, self)
158
159    async def _on_status(self, eventer_client, status):
160        if self._status_cb:
161            await aio.call(self._status_cb, self, eventer_client, status)
162
163    async def _on_events(self, eventer_client, events):
164        if self._events_cb:
165            await aio.call(self._events_cb, self, eventer_client, events)
166
167    def _create_monitor_runner(self, monitor_component):
168        self._server_data = self._get_active_server_data(
169            monitor_component.state)
170        self._server_data_queue = aio.Queue(self._server_data_queue_size)
171
172        if self._server_data:
173            self._server_data_queue.put_nowait(self._server_data)
174
175        runner = aio.Group()
176        runner.spawn(self._server_data_loop, runner, self._server_data_queue)
177
178        return runner
179
180    def _get_active_server_data(self, state):
181        info = util.first(state.components, self._active_server_filter)
182        if not info:
183            return
184
185        server_id = json.get(info.data, 'server_id')
186        host = json.get(info.data, ['eventer_server', 'host'])
187        port = json.get(info.data, ['eventer_server', 'port'])
188        server_token = json.get(info.data, 'server_token')
189        if (not isinstance(server_id, int) or
190                not isinstance(host, str) or
191                not isinstance(port, int) or
192                not isinstance(server_token, (str, types.NoneType))):
193            return
194
195        client_token = self._eventer_kwargs.get('client_token')
196        if client_token is not None and client_token != server_token:
197            return
198
199        return ServerData(server_id=server_id,
200                          addr=tcp.Address(host, port),
201                          server_token=server_token)
202
203    def _active_server_filter(self, info):
204        return (info.group == self._server_group and
205                info.blessing_req.token is not None and
206                info.blessing_req.token == info.blessing_res.token)
207
208    async def _server_data_loop(self, async_group, server_data_queue):
209        try:
210            server_data = None
211            while True:
212                while not server_data or not server_data_queue.empty():
213                    server_data = await server_data_queue.get_until_empty()
214
215                async with async_group.create_subgroup() as subgroup:
216                    subgroup.spawn(self._client_loop, subgroup, server_data)
217                    server_data = await server_data_queue.get_until_empty()
218
219        except Exception as e:
220            mlog.error("address loop error: %s", e, exc_info=e)
221            self.close()
222
223        finally:
224            async_group.close()
225            server_data_queue.close()
226
227    async def _client_loop(self, async_group, server_data):
228        try:
229            while True:
230                try:
231                    kwargs = {**self._eventer_kwargs,
232                              'status_cb': self._on_status,
233                              'events_cb': self._on_events}
234
235                    if 'server_id' not in kwargs:
236                        kwargs['server_id'] = server_data.server_id
237
238                    mlog.debug("connecting to server %s", server_data.addr)
239                    client = await hat.event.eventer.client.connect(
240                        addr=server_data.addr,
241                        client_name=self._client_name,
242                        **kwargs)
243
244                except Exception as e:
245                    mlog.warning("error connecting to server: %s", e,
246                                 exc_info=e)
247                    await asyncio.sleep(self._reconnect_delay)
248                    continue
249
250                try:
251                    mlog.debug("connected to server")
252                    runner = await client.async_group.spawn(
253                        aio.call, self._runner_cb, self, server_data, client)
254
255                    try:
256                        async with async_group.create_subgroup() as subgroup:
257                            client_closing_task = subgroup.spawn(
258                                client.wait_closing)
259                            runner_closing_task = subgroup.spawn(
260                                runner.wait_closing)
261
262                            await asyncio.wait(
263                                [client_closing_task, runner_closing_task],
264                                return_when=asyncio.FIRST_COMPLETED)
265
266                            if (runner_closing_task.done() and
267                                    not client_closing_task.done()):
268                                self.close()
269                                return
270
271                    finally:
272                        await aio.uncancellable(runner.async_close())
273
274                finally:
275                    await aio.uncancellable(client.async_close())
276
277                mlog.debug("connection to server closed")
278                await asyncio.sleep(self._reconnect_delay)
279
280        except Exception as e:
281            mlog.error("client loop error: %s", e, exc_info=e)
282            self.close()
283
284        finally:
285            async_group.close()

Eventer Component

async_group
124    @property
125    def async_group(self):
126        return self._monitor_component.async_group

Group controlling resource's lifetime.

state: hat.monitor.observer.client.State
128    @property
129    def state(self) -> State:
130        """State"""
131        return self._monitor_component.state

State

ready: bool
133    @property
134    def ready(self) -> bool:
135        """Ready"""
136        return self._monitor_component.ready

Ready

async def set_ready(self, ready: bool):
138    async def set_ready(self, ready: bool):
139        """Set ready"""
140        await self._monitor_component.set_ready(ready)

Set ready