hat.event.component

Eventer Component

  1"""Eventer Component"""
  2
  3from collections.abc import Collection
  4import asyncio
  5import contextlib
  6import logging
  7import types
  8import typing
  9
 10from hat import aio
 11from hat import json
 12from hat import util
 13from hat.drivers import tcp
 14import hat.monitor.component
 15
 16from hat.event import common
 17import hat.event.eventer.client
 18
 19
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

State: typing.TypeAlias = hat.monitor.component.State
"""Component state"""

# Resource created by `RunnerCb`; it is closed when the associated eventer
# client connection is closed.
Runner: typing.TypeAlias = aio.Resource
"""Component runner"""

# Called as `runner_cb(component, server_data, client)` once connection to the
# active Event Server is established.
RunnerCb: typing.TypeAlias = aio.AsyncCallable[
    ['Component', 'ServerData', hat.event.eventer.client.Client],
    Runner]
"""Runner callback"""

# Called as `state_cb(component, state)` on monitor state notification.
StateCb: typing.TypeAlias = aio.AsyncCallable[['Component', State], None]
"""State callback"""

# Called as `close_req_cb(component)` on monitor close request notification.
CloseReqCb: typing.TypeAlias = aio.AsyncCallable[['Component'], None]
"""Close request callback"""

# Called as `status_cb(component, client, status)` on eventer client status
# notification.
StatusCb: typing.TypeAlias = aio.AsyncCallable[
    ['Component', hat.event.eventer.client.Client, common.Status],
    None]
"""Status callback"""

# Called as `events_cb(component, client, events)` on eventer client events
# notification.
EventsCb: typing.TypeAlias = aio.AsyncCallable[
    ['Component', hat.event.eventer.client.Client, Collection[common.Event]],
    None]
"""Events callback"""
 49
 50
class ServerData(typing.NamedTuple):
    """Server data

    Description of the active Event Server, extracted from the data of the
    matching monitor component info (see
    `Component._get_active_server_data`).

    """
    # value of the `server_id` entry of the component info data
    server_id: common.ServerId
    # address built from the `eventer_server` `host` and `port` entries
    addr: tcp.Address
    # value of the `server_token` entry (``None`` when not available)
    server_token: str | None
 56
 57
 58async def connect(addr: tcp.Address,
 59                  name: str,
 60                  group: str,
 61                  server_group: str,
 62                  runner_cb: RunnerCb,
 63                  *,
 64                  state_cb: StateCb | None = None,
 65                  close_req_cb: CloseReqCb | None = None,
 66                  status_cb: StatusCb | None = None,
 67                  events_cb: EventsCb | None = None,
 68                  server_data_queue_size: int = 1024,
 69                  reconnect_delay: float = 0.5,
 70                  observer_kwargs: dict[str, typing.Any] = {},
 71                  eventer_kwargs: dict[str, typing.Any] = {}
 72                  ) -> 'Component':
 73    """Connect to local monitor server and create component
 74
 75    High-level interface for communication with Event Server, based on
 76    information obtained from Monitor Server.
 77
 78    Component instance tries to establish active connection with
 79    Event Server within monitor component group `server_group`. Once this
 80    connection is established, `runner_cb` is called with currently active
 81    eventer client instance. Result of calling `runner_cb` should be runner
 82    representing user defined components activity associated with connection
 83    to active Event Server. Once connection to Event Server is closed or new
 84    active Event Server is detected, associated runner is closed. If new
 85    connection to Event Server is successfully established,
 86    `component_cb` will be called again to create new runner associated with
 87    new instance of eventer client.
 88
 89    If runner is closed while connection to Event Server is open, component
 90    is closed.
 91
 92    """
 93    component = Component()
 94    component._name = name
 95    component._server_group = server_group
 96    component._runner_cb = runner_cb
 97    component._state_cb = state_cb
 98    component._close_req_cb = close_req_cb
 99    component._status_cb = status_cb
100    component._events_cb = events_cb
101    component._server_data_queue_size = server_data_queue_size
102    component._reconnect_delay = reconnect_delay
103    component._eventer_kwargs = eventer_kwargs
104    component._server_data = None
105    component._server_data_queue = None
106
107    component._monitor_component = await hat.monitor.component.connect(
108        addr=addr,
109        name=name,
110        group=group,
111        runner_cb=component._create_monitor_runner,
112        **{**observer_kwargs,
113           'state_cb': component._on_state,
114           'close_req_cb': component._on_close_req})
115
116    return component
117
118
class Component(aio.Resource):
    """Eventer Component

    Wrapper around monitor component which tracks the active Event Server
    (based on monitor state) and maintains eventer client connection to it.

    Instances should be created by calling `connect`, which initializes all
    private attributes used by this class.

    """

    @property
    def async_group(self):
        # lifetime of this component is bound to the monitor component
        return self._monitor_component.async_group

    @property
    def state(self) -> State:
        """State"""
        return self._monitor_component.state

    @property
    def ready(self) -> bool:
        """Ready"""
        return self._monitor_component.ready

    async def set_ready(self, ready: bool):
        """Set ready"""
        await self._monitor_component.set_ready(ready)

    async def _on_state(self, monitor_component, state):
        # monitor state callback - forwards active server changes to the
        # server data loop (only once the queue is created) and notifies user
        if self._server_data_queue is not None:
            data = self._get_active_server_data(state)

            if data != self._server_data:
                self._server_data = data

                # queue is closed once the server data loop finishes
                with contextlib.suppress(aio.QueueClosedError):
                    await self._server_data_queue.put(data)

        if self._state_cb:
            await aio.call(self._state_cb, self, state)

    async def _on_close_req(self, monitor_component):
        # monitor close request callback
        if self._close_req_cb:
            await aio.call(self._close_req_cb, self)

    async def _on_status(self, eventer_client, status):
        # eventer client status callback
        if self._status_cb:
            await aio.call(self._status_cb, self, eventer_client, status)

    async def _on_events(self, eventer_client, events):
        # eventer client events callback
        if self._events_cb:
            await aio.call(self._events_cb, self, eventer_client, events)

    def _create_monitor_runner(self, monitor_component):
        # monitor runner callback - creates new server data queue (seeded
        # with currently active server, if any) and runner group running
        # the server data loop
        self._server_data = self._get_active_server_data(
            monitor_component.state)
        self._server_data_queue = aio.Queue(self._server_data_queue_size)

        if self._server_data:
            self._server_data_queue.put_nowait(self._server_data)

        runner = aio.Group()
        runner.spawn(self._server_data_loop, runner, self._server_data_queue)

        return runner

    def _get_active_server_data(self, state):
        # returns `ServerData` of the first active server found in `state`
        # or ``None`` if there is no active server, its data is malformed or
        # its token doesn't match configured client token
        info = util.first(state.components, self._active_server_filter)
        if not info:
            return

        server_id = json.get(info.data, 'server_id')
        host = json.get(info.data, ['eventer_server', 'host'])
        port = json.get(info.data, ['eventer_server', 'port'])
        server_token = json.get(info.data, 'server_token')
        if (not isinstance(server_id, int) or
                not isinstance(host, str) or
                not isinstance(port, int) or
                not isinstance(server_token, (str, types.NoneType))):
            return

        # server is ignored if user required specific token which differs
        # from the one published by server
        client_token = self._eventer_kwargs.get('client_token')
        if client_token is not None and client_token != server_token:
            return

        return ServerData(server_id=server_id,
                          addr=tcp.Address(host, port),
                          server_token=server_token)

    def _active_server_filter(self, info):
        # component is considered active server if it is member of server
        # group and its blessing request token matches blessing response
        return (info.group == self._server_group and
                info.blessing_req.token is not None and
                info.blessing_req.token == info.blessing_res.token)

    async def _server_data_loop(self, async_group, server_data_queue):
        # for each change of active server, closes previous client loop and
        # starts new one associated with new server data
        try:
            server_data = None
            while True:
                # block until active server is known - client loop must
                # never be started without server data (it would only
                # repeatedly fail while accessing server id/address)
                while not server_data:
                    server_data = await server_data_queue.get_until_empty()

                async with async_group.create_subgroup() as subgroup:
                    subgroup.spawn(self._client_loop, subgroup, server_data)

                    # leaving subgroup context on active server change
                    # closes running client loop
                    server_data = await server_data_queue.get_until_empty()

        except Exception as e:
            mlog.error("address loop error: %s", e, exc_info=e)
            self.close()

        finally:
            async_group.close()
            server_data_queue.close()

    async def _client_loop(self, async_group, server_data):
        # repeatedly connects to server described by `server_data` and, for
        # each established connection, runs user defined runner
        try:
            while True:
                try:
                    # component's own callbacks take precedence over user
                    # provided entries
                    kwargs = {**self._eventer_kwargs,
                              'status_cb': self._on_status,
                              'events_cb': self._on_events}

                    if 'server_id' not in kwargs:
                        kwargs['server_id'] = server_data.server_id

                    mlog.debug("connecting to server %s", server_data.addr)
                    client = await hat.event.eventer.client.connect(
                        addr=server_data.addr,
                        client_name=self._name,
                        **kwargs)

                except Exception as e:
                    mlog.warning("error connecting to server: %s", e,
                                 exc_info=e)
                    await asyncio.sleep(self._reconnect_delay)
                    continue

                try:
                    mlog.debug("connected to server")

                    # runner callback is executed as part of client's async
                    # group - presumably so that pending callback is
                    # cancelled once client is closed
                    runner = await client.async_group.spawn(
                        aio.call, self._runner_cb, self, server_data, client)

                    try:
                        async with async_group.create_subgroup() as subgroup:
                            client_closing_task = subgroup.spawn(
                                client.wait_closing)
                            runner_closing_task = subgroup.spawn(
                                runner.wait_closing)

                            await asyncio.wait(
                                [client_closing_task, runner_closing_task],
                                return_when=asyncio.FIRST_COMPLETED)

                            # runner closed while connection is still open -
                            # close the whole component
                            if (runner_closing_task.done() and
                                    not client_closing_task.done()):
                                self.close()
                                return

                    finally:
                        await aio.uncancellable(runner.async_close())

                finally:
                    await aio.uncancellable(client.async_close())

                mlog.debug("connection to server closed")
                await asyncio.sleep(self._reconnect_delay)

        except Exception as e:
            mlog.error("client loop error: %s", e, exc_info=e)
            self.close()

        finally:
            async_group.close()
mlog: logging.Logger = <Logger hat.event.component (WARNING)>

Module logger

class State(typing.NamedTuple):
26class State(typing.NamedTuple):
27    """Client state"""
28    info: common.ComponentInfo | None
29    components: list[common.ComponentInfo]

Component state

State( info: hat.monitor.common.ComponentInfo | None, components: list[hat.monitor.common.ComponentInfo])

Create new instance of State(info, components)

info: hat.monitor.common.ComponentInfo | None

Alias for field number 0

components: list[hat.monitor.common.ComponentInfo]

Alias for field number 1

Inherited Members
builtins.tuple
index
count
Runner: TypeAlias = hat.aio.group.Resource

Component runner

RunnerCb: TypeAlias = Callable[[ForwardRef('Component'), ForwardRef('ServerData'), hat.event.eventer.client.Client], Union[hat.aio.group.Resource, Awaitable[hat.aio.group.Resource]]]

Runner callback

StateCb: TypeAlias = Callable[[ForwardRef('Component'), State], Optional[Awaitable[NoneType]]]

State callback

CloseReqCb: TypeAlias = Callable[[ForwardRef('Component')], Optional[Awaitable[NoneType]]]

Close request callback

StatusCb: TypeAlias = Callable[[ForwardRef('Component'), hat.event.eventer.client.Client, hat.event.common.common.Status], Optional[Awaitable[NoneType]]]

Status callback

EventsCb: TypeAlias = Callable[[ForwardRef('Component'), hat.event.eventer.client.Client, collections.abc.Collection[hat.event.common.common.Event]], Optional[Awaitable[NoneType]]]

Events callback

class ServerData(typing.NamedTuple):
52class ServerData(typing.NamedTuple):
53    """Server data"""
54    server_id: common.ServerId
55    addr: tcp.Address
56    server_token: str | None

Server data

ServerData( server_id: int, addr: hat.drivers.tcp.Address, server_token: str | None)

Create new instance of ServerData(server_id, addr, server_token)

server_id: int

Alias for field number 0

addr: hat.drivers.tcp.Address

Alias for field number 1

server_token: str | None

Alias for field number 2

Inherited Members
builtins.tuple
index
count
async def connect( addr: hat.drivers.tcp.Address, name: str, group: str, server_group: str, runner_cb: Callable[[Component, ServerData, hat.event.eventer.client.Client], Union[hat.aio.group.Resource, Awaitable[hat.aio.group.Resource]]], *, state_cb: Optional[Callable[[Component, State], Optional[Awaitable[NoneType]]]] = None, close_req_cb: Optional[Callable[[Component], Optional[Awaitable[NoneType]]]] = None, status_cb: Optional[Callable[[Component, hat.event.eventer.client.Client, hat.event.common.common.Status], Optional[Awaitable[NoneType]]]] = None, events_cb: Optional[Callable[[Component, hat.event.eventer.client.Client, collections.abc.Collection[hat.event.common.common.Event]], Optional[Awaitable[NoneType]]]] = None, server_data_queue_size: int = 1024, reconnect_delay: float = 0.5, observer_kwargs: dict[str, typing.Any] = {}, eventer_kwargs: dict[str, typing.Any] = {}) -> Component:
 59async def connect(addr: tcp.Address,
 60                  name: str,
 61                  group: str,
 62                  server_group: str,
 63                  runner_cb: RunnerCb,
 64                  *,
 65                  state_cb: StateCb | None = None,
 66                  close_req_cb: CloseReqCb | None = None,
 67                  status_cb: StatusCb | None = None,
 68                  events_cb: EventsCb | None = None,
 69                  server_data_queue_size: int = 1024,
 70                  reconnect_delay: float = 0.5,
 71                  observer_kwargs: dict[str, typing.Any] = {},
 72                  eventer_kwargs: dict[str, typing.Any] = {}
 73                  ) -> 'Component':
 74    """Connect to local monitor server and create component
 75
 76    High-level interface for communication with Event Server, based on
 77    information obtained from Monitor Server.
 78
 79    Component instance tries to establish active connection with
 80    Event Server within monitor component group `server_group`. Once this
 81    connection is established, `runner_cb` is called with currently active
 82    eventer client instance. Result of calling `runner_cb` should be runner
 83    representing user defined components activity associated with connection
 84    to active Event Server. Once connection to Event Server is closed or new
 85    active Event Server is detected, associated runner is closed. If new
 86    connection to Event Server is successfully established,
 87    `runner_cb` will be called again to create new runner associated with
 88    new instance of eventer client.
 89
 90    If runner is closed while connection to Event Server is open, component
 91    is closed.
 92
 93    """
 94    component = Component()
 95    component._name = name
 96    component._server_group = server_group
 97    component._runner_cb = runner_cb
 98    component._state_cb = state_cb
 99    component._close_req_cb = close_req_cb
100    component._status_cb = status_cb
101    component._events_cb = events_cb
102    component._server_data_queue_size = server_data_queue_size
103    component._reconnect_delay = reconnect_delay
104    component._eventer_kwargs = eventer_kwargs
105    component._server_data = None
106    component._server_data_queue = None
107
108    component._monitor_component = await hat.monitor.component.connect(
109        addr=addr,
110        name=name,
111        group=group,
112        runner_cb=component._create_monitor_runner,
113        **{**observer_kwargs,
114           'state_cb': component._on_state,
115           'close_req_cb': component._on_close_req})
116
117    return component

Connect to local monitor server and create component

High-level interface for communication with Event Server, based on information obtained from Monitor Server.

Component instance tries to establish active connection with Event Server within monitor component group server_group. Once this connection is established, runner_cb is called with currently active eventer client instance. Result of calling runner_cb should be runner representing user-defined component activity associated with connection to active Event Server. Once connection to Event Server is closed or new active Event Server is detected, associated runner is closed. If new connection to Event Server is successfully established, runner_cb will be called again to create new runner associated with new instance of eventer client.

If runner is closed while connection to Event Server is open, component is closed.

class Component(hat.aio.group.Resource):
120class Component(aio.Resource):
121    """Eventer Component"""
122
123    @property
124    def async_group(self):
125        return self._monitor_component.async_group
126
127    @property
128    def state(self) -> State:
129        """State"""
130        return self._monitor_component.state
131
132    @property
133    def ready(self) -> bool:
134        """Ready"""
135        return self._monitor_component.ready
136
137    async def set_ready(self, ready: bool):
138        """Set ready"""
139        await self._monitor_component.set_ready(ready)
140
141    async def _on_state(self, monitor_component, state):
142        if self._server_data_queue is not None:
143            data = self._get_active_server_data(state)
144
145            if data != self._server_data:
146                self._server_data = data
147
148                with contextlib.suppress(aio.QueueClosedError):
149                    await self._server_data_queue.put(data)
150
151        if self._state_cb:
152            await aio.call(self._state_cb, self, state)
153
154    async def _on_close_req(self, monitor_component):
155        if self._close_req_cb:
156            await aio.call(self._close_req_cb, self)
157
158    async def _on_status(self, eventer_client, status):
159        if self._status_cb:
160            await aio.call(self._status_cb, self, eventer_client, status)
161
162    async def _on_events(self, eventer_client, events):
163        if self._events_cb:
164            await aio.call(self._events_cb, self, eventer_client, events)
165
166    def _create_monitor_runner(self, monitor_component):
167        self._server_data = self._get_active_server_data(
168            monitor_component.state)
169        self._server_data_queue = aio.Queue(self._server_data_queue_size)
170
171        if self._server_data:
172            self._server_data_queue.put_nowait(self._server_data)
173
174        runner = aio.Group()
175        runner.spawn(self._server_data_loop, runner, self._server_data_queue)
176
177        return runner
178
179    def _get_active_server_data(self, state):
180        info = util.first(state.components, self._active_server_filter)
181        if not info:
182            return
183
184        server_id = json.get(info.data, 'server_id')
185        host = json.get(info.data, ['eventer_server', 'host'])
186        port = json.get(info.data, ['eventer_server', 'port'])
187        server_token = json.get(info.data, 'server_token')
188        if (not isinstance(server_id, int) or
189                not isinstance(host, str) or
190                not isinstance(port, int) or
191                not isinstance(server_token, (str, types.NoneType))):
192            return
193
194        client_token = self._eventer_kwargs.get('client_token')
195        if client_token is not None and client_token != server_token:
196            return
197
198        return ServerData(server_id=server_id,
199                          addr=tcp.Address(host, port),
200                          server_token=server_token)
201
202    def _active_server_filter(self, info):
203        return (info.group == self._server_group and
204                info.blessing_req.token is not None and
205                info.blessing_req.token == info.blessing_res.token)
206
207    async def _server_data_loop(self, async_group, server_data_queue):
208        try:
209            server_data = None
210            while True:
211                while not server_data and not server_data_queue.empty():
212                    server_data = await server_data_queue.get_until_empty()
213
214                async with async_group.create_subgroup() as subgroup:
215                    subgroup.spawn(self._client_loop, subgroup, server_data)
216                    server_data = await server_data_queue.get_until_empty()
217
218        except Exception as e:
219            mlog.error("address loop error: %s", e, exc_info=e)
220            self.close()
221
222        finally:
223            async_group.close()
224            server_data_queue.close()
225
226    async def _client_loop(self, async_group, server_data):
227        try:
228            while True:
229                try:
230                    kwargs = {**self._eventer_kwargs,
231                              'status_cb': self._on_status,
232                              'events_cb': self._on_events}
233
234                    if 'server_id' not in kwargs:
235                        kwargs['server_id'] = server_data.server_id
236
237                    mlog.debug("connecting to server %s", server_data.addr)
238                    client = await hat.event.eventer.client.connect(
239                        addr=server_data.addr,
240                        client_name=self._name,
241                        **kwargs)
242
243                except Exception as e:
244                    mlog.warning("error connecting to server: %s", e,
245                                 exc_info=e)
246                    await asyncio.sleep(self._reconnect_delay)
247                    continue
248
249                try:
250                    mlog.debug("connected to server")
251                    runner = await client.async_group.spawn(
252                        aio.call, self._runner_cb, self, server_data, client)
253
254                    try:
255                        async with async_group.create_subgroup() as subgroup:
256                            client_closing_task = subgroup.spawn(
257                                client.wait_closing)
258                            runner_closing_task = subgroup.spawn(
259                                runner.wait_closing)
260
261                            await asyncio.wait(
262                                [client_closing_task, runner_closing_task],
263                                return_when=asyncio.FIRST_COMPLETED)
264
265                            if (runner_closing_task.done() and
266                                    not client_closing_task.done()):
267                                self.close()
268                                return
269
270                    finally:
271                        await aio.uncancellable(runner.async_close())
272
273                finally:
274                    await aio.uncancellable(client.async_close())
275
276                mlog.debug("connection to server closed")
277                await asyncio.sleep(self._reconnect_delay)
278
279        except Exception as e:
280            mlog.error("client loop error: %s", e, exc_info=e)
281            self.close()
282
283        finally:
284            async_group.close()

Eventer Component

async_group
123    @property
124    def async_group(self):
125        return self._monitor_component.async_group

Group controlling resource's lifetime.

state: State
127    @property
128    def state(self) -> State:
129        """State"""
130        return self._monitor_component.state

State

ready: bool
132    @property
133    def ready(self) -> bool:
134        """Ready"""
135        return self._monitor_component.ready

Ready

async def set_ready(self, ready: bool):
137    async def set_ready(self, ready: bool):
138        """Set ready"""
139        await self._monitor_component.set_ready(ready)

Set ready

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close