hat.event.server.runner

  1import asyncio
  2import contextlib
  3import logging
  4import types
  5import typing
  6
  7from hat import aio
  8from hat import json
  9from hat.drivers import tcp
 10import hat.monitor.component
 11
 12from hat.event import common
 13from hat.event.server.engine import create_engine
 14from hat.event.server.eventer_client import create_eventer_client
 15from hat.event.server.eventer_server import (create_eventer_server,
 16                                             EventerServer)
 17
 18
 19mlog: logging.Logger = logging.getLogger(__name__)
 20"""Module logger"""
 21
 22
 23class EventerServerData(typing.NamedTuple):
 24    server_id: common.ServerId
 25    addr: tcp.Address
 26
 27
 28class MainRunner(aio.Resource):
 29
 30    def __init__(self, conf: json.Data):
 31        self._conf = conf
 32        self._loop = asyncio.get_running_loop()
 33        self._async_group = aio.Group()
 34        self._backend = None
 35        self._eventer_server = None
 36        self._monitor_component = None
 37        self._eventer_client_runner = None
 38        self._engine_runner = None
 39
 40        self.async_group.spawn(self._run)
 41
 42    @property
 43    def async_group(self) -> aio.Group:
 44        return self._async_group
 45
 46    async def _run(self):
 47        try:
 48            await self._start()
 49            await self._loop.create_future()
 50
 51        except Exception as e:
 52            mlog.error("main runner loop error: %s", e, exc_info=e)
 53
 54        finally:
 55            self.close()
 56            await aio.uncancellable(self._stop())
 57
 58    async def _start(self):
 59        backend_conf = self._conf['backend']
 60        backend_info = common.import_backend_info(backend_conf['module'])
 61        self._backend = await aio.call(backend_info.create, backend_conf,
 62                                       self._on_backend_registered_events,
 63                                       self._on_backend_flushed_events)
 64        _bind_resource(self.async_group, self._backend)
 65
 66        self._eventer_server = await create_eventer_server(
 67            addr=tcp.Address(self._conf['eventer_server']['host'],
 68                             self._conf['eventer_server']['port']),
 69            backend=self._backend,
 70            server_id=self._conf['server_id'],
 71            server_token=self._conf.get('server_token'))
 72        _bind_resource(self.async_group, self._eventer_server)
 73
 74        if 'monitor_component' in self._conf:
 75            self._monitor_component = await hat.monitor.component.connect(
 76                addr=tcp.Address(self._conf['monitor_component']['host'],
 77                                 self._conf['monitor_component']['port']),
 78                name=self._conf['monitor_component']['name'],
 79                group=self._conf['monitor_component']['group'],
 80                runner_cb=self._create_monitor_runner,
 81                data={'server_id': self._conf['server_id'],
 82                      'eventer_server': self._conf['eventer_server'],
 83                      'server_token': self._conf.get('server_token')},
 84                state_cb=self._on_monitor_state)
 85            _bind_resource(self.async_group, self._monitor_component)
 86
 87            self._eventer_client_runner = EventerClientRunner(
 88                conf=self._conf,
 89                backend=self._backend,
 90                synced_cb=self._on_eventer_client_synced)
 91            _bind_resource(self.async_group, self._eventer_client_runner)
 92
 93            await self._eventer_client_runner.set_monitor_state(
 94                self._monitor_component.state)
 95
 96            await self._monitor_component.set_ready(True)
 97
 98        else:
 99            self._engine_runner = EngineRunner(
100                conf=self._conf,
101                backend=self._backend,
102                eventer_server=self._eventer_server)
103            _bind_resource(self.async_group, self._engine_runner)
104
105    async def _stop(self):
106        if self._engine_runner and not self._monitor_component:
107            await self._engine_runner.async_close()
108
109        if self._eventer_client_runner:
110            await self._eventer_client_runner.async_close()
111
112        if self._monitor_component:
113            await self._monitor_component.async_close()
114
115        if self._eventer_server:
116            with contextlib.suppress(Exception):
117                await self._backend.flush()
118
119            await self._eventer_server.async_close()
120
121        if self._backend:
122            await self._backend.async_close()
123
124    async def _create_monitor_runner(self, monitor_component):
125        self._engine_runner = EngineRunner(conf=self._conf,
126                                           backend=self._backend,
127                                           eventer_server=self._eventer_server)
128        return self._engine_runner
129
130    async def _on_backend_registered_events(self, events):
131        if not self._eventer_server:
132            return
133
134        await self._eventer_server.notify_events(events, False)
135
136    async def _on_backend_flushed_events(self, events):
137        if not self._eventer_server:
138            return
139
140        await self._eventer_server.notify_events(events, True)
141
142    async def _on_monitor_state(self, monitor_component, state):
143        if not self._eventer_client_runner:
144            return
145
146        await self._eventer_client_runner.set_monitor_state(state)
147
148    async def _on_eventer_client_synced(self, server_id, synced, counter):
149        if not self._engine_runner:
150            return
151
152        await self._engine_runner.set_synced(server_id, synced, counter)
153
154
155class EventerClientRunner(aio.Resource):
156
157    def __init__(self,
158                 conf: json.Data,
159                 backend: common.Backend,
160                 synced_cb: aio.AsyncCallable[[common.ServerId, bool, int],
161                                              None],
162                 reconnect_delay: float = 5):
163        self._conf = conf
164        self._backend = backend
165        self._synced_cb = synced_cb
166        self._reconnect_delay = reconnect_delay
167        self._async_group = aio.Group()
168        self._client_subgroups = {}
169
170    @property
171    def async_group(self) -> aio.Group:
172        return self._async_group
173
174    async def set_monitor_state(self, state: hat.monitor.component.State):
175        valid_server_data = set(_get_eventer_server_data(
176            group=self._conf['monitor_component']['group'],
177            server_token=self._conf.get('server_token'),
178            state=state))
179
180        for server_data in list(self._client_subgroups.keys()):
181            if server_data in valid_server_data:
182                continue
183
184            subgroup = self._client_subgroups.pop(server_data)
185            subgroup.close()
186
187        for server_data in valid_server_data:
188            subgroup = self._client_subgroups.get(server_data)
189            if subgroup and subgroup.is_open:
190                continue
191
192            subgroup = self.async_group.create_subgroup()
193            subgroup.spawn(self._client_loop, subgroup, server_data)
194            self._client_subgroups[server_data] = subgroup
195
196    async def _client_loop(self, async_group, server_data):
197        try:
198            while True:
199                try:
200                    eventer_client = await create_eventer_client(
201                        addr=server_data.addr,
202                        client_name=self._conf['monitor_component']['name'],
203                        server_id=server_data.server_id,
204                        backend=self._backend,
205                        client_token=self._conf.get('server_token'),
206                        synced_cb=self._on_synced)
207
208                except Exception:
209                    await asyncio.sleep(self._reconnect_delay)
210                    continue
211
212                try:
213                    await aio.call(self._synced_cb, server_data.server_id,
214                                   False, 0)
215
216                    await eventer_client.wait_closing()
217
218                finally:
219                    await aio.uncancellable(eventer_client.async_close())
220
221        except Exception as e:
222            mlog.error("eventer client runner loop error: %s", e, exc_info=e)
223            self.close()
224
225        finally:
226            async_group.close()
227
228    async def _on_synced(self, server_id, counter):
229        await aio.call(self._synced_cb, server_id, True, counter)
230
231
232class EngineRunner(aio.Resource):
233
234    def __init__(self,
235                 conf: json.Data,
236                 backend: common.Backend,
237                 eventer_server: EventerServer):
238        self._conf = conf
239        self._backend = backend
240        self._eventer_server = eventer_server
241        self._async_group = aio.Group()
242        self._engine = None
243        self._restart = asyncio.Event()
244
245        self.async_group.spawn(self._run)
246
247    @property
248    def async_group(self) -> aio.Group:
249        return self._async_group
250
251    async def set_synced(self,
252                         server_id: common.ServerId,
253                         synced: bool,
254                         counter: int):
255        if self._engine and self._engine.is_open:
256            source = common.Source(type=common.SourceType.SERVER, id=0)
257            event = common.RegisterEvent(
258                type=('event', str(self._conf['server_id']), 'synced',
259                      str(server_id)),
260                source_timestamp=None,
261                payload=common.EventPayloadJson(synced))
262
263            await self._engine.register(source, [event])
264
265        if synced and counter and self._conf.get('synced_restart_engine'):
266            self._restart.set()
267
268    async def _run(self):
269        try:
270            while True:
271                self._restart.clear()
272
273                self._engine = await create_engine(
274                    backend=self._backend,
275                    module_confs=self._conf['modules'],
276                    server_id=self._conf['server_id'])
277                await self._eventer_server.set_engine(self._engine)
278
279                async with self._async_group.create_subgroup() as subgroup:
280                    await asyncio.wait(
281                        [subgroup.spawn(self._engine.wait_closing),
282                         subgroup.spawn(self._restart.wait)],
283                        return_when=asyncio.FIRST_COMPLETED)
284
285                if not self._engine.is_open:
286                    break
287
288                await self._close()
289
290        except Exception as e:
291            mlog.error("engine runner loop error: %s", e, exc_info=e)
292
293        finally:
294            self.close()
295            await aio.uncancellable(self._close())
296
297    async def _close(self):
298        if self._engine:
299            await self._engine.async_close()
300
301        await self._eventer_server.set_engine(None)
302
303
304def _bind_resource(async_group, resource):
305    async_group.spawn(aio.call_on_done, resource.wait_closing(),
306                      async_group.close)
307
308
309def _get_eventer_server_data(group, server_token, state):
310    for info in state.components:
311        if info == state.info or info.group != group:
312            continue
313
314        server_id = json.get(info.data, 'server_id')
315        host = json.get(info.data, ['eventer_server', 'host'])
316        port = json.get(info.data, ['eventer_server', 'port'])
317        token = json.get(info.data, 'server_token')
318        if (not isinstance(server_id, int) or
319                not isinstance(host, str) or
320                not isinstance(port, int) or
321                not isinstance(token, (str, types.NoneType))):
322            continue
323
324        if server_token is not None and token != server_token:
325            continue
326
327        yield EventerServerData(server_id=server_id,
328                                addr=tcp.Address(host, port))
mlog: logging.Logger = <Logger hat.event.server.runner (WARNING)>

Module logger

class EventerServerData(typing.NamedTuple):
24class EventerServerData(typing.NamedTuple):
25    server_id: common.ServerId
26    addr: tcp.Address

EventerServerData(server_id, addr)

EventerServerData(server_id: int, addr: hat.drivers.tcp.Address)

Create new instance of EventerServerData(server_id, addr)

server_id: int

Alias for field number 0

addr: hat.drivers.tcp.Address

Alias for field number 1

Inherited Members
builtins.tuple
index
count
class MainRunner(hat.aio.group.Resource):
 29class MainRunner(aio.Resource):
 30
 31    def __init__(self, conf: json.Data):
 32        self._conf = conf
 33        self._loop = asyncio.get_running_loop()
 34        self._async_group = aio.Group()
 35        self._backend = None
 36        self._eventer_server = None
 37        self._monitor_component = None
 38        self._eventer_client_runner = None
 39        self._engine_runner = None
 40
 41        self.async_group.spawn(self._run)
 42
 43    @property
 44    def async_group(self) -> aio.Group:
 45        return self._async_group
 46
 47    async def _run(self):
 48        try:
 49            await self._start()
 50            await self._loop.create_future()
 51
 52        except Exception as e:
 53            mlog.error("main runner loop error: %s", e, exc_info=e)
 54
 55        finally:
 56            self.close()
 57            await aio.uncancellable(self._stop())
 58
 59    async def _start(self):
 60        backend_conf = self._conf['backend']
 61        backend_info = common.import_backend_info(backend_conf['module'])
 62        self._backend = await aio.call(backend_info.create, backend_conf,
 63                                       self._on_backend_registered_events,
 64                                       self._on_backend_flushed_events)
 65        _bind_resource(self.async_group, self._backend)
 66
 67        self._eventer_server = await create_eventer_server(
 68            addr=tcp.Address(self._conf['eventer_server']['host'],
 69                             self._conf['eventer_server']['port']),
 70            backend=self._backend,
 71            server_id=self._conf['server_id'],
 72            server_token=self._conf.get('server_token'))
 73        _bind_resource(self.async_group, self._eventer_server)
 74
 75        if 'monitor_component' in self._conf:
 76            self._monitor_component = await hat.monitor.component.connect(
 77                addr=tcp.Address(self._conf['monitor_component']['host'],
 78                                 self._conf['monitor_component']['port']),
 79                name=self._conf['monitor_component']['name'],
 80                group=self._conf['monitor_component']['group'],
 81                runner_cb=self._create_monitor_runner,
 82                data={'server_id': self._conf['server_id'],
 83                      'eventer_server': self._conf['eventer_server'],
 84                      'server_token': self._conf.get('server_token')},
 85                state_cb=self._on_monitor_state)
 86            _bind_resource(self.async_group, self._monitor_component)
 87
 88            self._eventer_client_runner = EventerClientRunner(
 89                conf=self._conf,
 90                backend=self._backend,
 91                synced_cb=self._on_eventer_client_synced)
 92            _bind_resource(self.async_group, self._eventer_client_runner)
 93
 94            await self._eventer_client_runner.set_monitor_state(
 95                self._monitor_component.state)
 96
 97            await self._monitor_component.set_ready(True)
 98
 99        else:
100            self._engine_runner = EngineRunner(
101                conf=self._conf,
102                backend=self._backend,
103                eventer_server=self._eventer_server)
104            _bind_resource(self.async_group, self._engine_runner)
105
106    async def _stop(self):
107        if self._engine_runner and not self._monitor_component:
108            await self._engine_runner.async_close()
109
110        if self._eventer_client_runner:
111            await self._eventer_client_runner.async_close()
112
113        if self._monitor_component:
114            await self._monitor_component.async_close()
115
116        if self._eventer_server:
117            with contextlib.suppress(Exception):
118                await self._backend.flush()
119
120            await self._eventer_server.async_close()
121
122        if self._backend:
123            await self._backend.async_close()
124
125    async def _create_monitor_runner(self, monitor_component):
126        self._engine_runner = EngineRunner(conf=self._conf,
127                                           backend=self._backend,
128                                           eventer_server=self._eventer_server)
129        return self._engine_runner
130
131    async def _on_backend_registered_events(self, events):
132        if not self._eventer_server:
133            return
134
135        await self._eventer_server.notify_events(events, False)
136
137    async def _on_backend_flushed_events(self, events):
138        if not self._eventer_server:
139            return
140
141        await self._eventer_server.notify_events(events, True)
142
143    async def _on_monitor_state(self, monitor_component, state):
144        if not self._eventer_client_runner:
145            return
146
147        await self._eventer_client_runner.set_monitor_state(state)
148
149    async def _on_eventer_client_synced(self, server_id, synced, counter):
150        if not self._engine_runner:
151            return
152
153        await self._engine_runner.set_synced(server_id, synced, counter)

Resource with lifetime control based on Group.

MainRunner( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')])
31    def __init__(self, conf: json.Data):
32        self._conf = conf
33        self._loop = asyncio.get_running_loop()
34        self._async_group = aio.Group()
35        self._backend = None
36        self._eventer_server = None
37        self._monitor_component = None
38        self._eventer_client_runner = None
39        self._engine_runner = None
40
41        self.async_group.spawn(self._run)
async_group: hat.aio.group.Group
43    @property
44    def async_group(self) -> aio.Group:
45        return self._async_group

Group controlling resource's lifetime.

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
class EventerClientRunner(hat.aio.group.Resource):
156class EventerClientRunner(aio.Resource):
157
158    def __init__(self,
159                 conf: json.Data,
160                 backend: common.Backend,
161                 synced_cb: aio.AsyncCallable[[common.ServerId, bool, int],
162                                              None],
163                 reconnect_delay: float = 5):
164        self._conf = conf
165        self._backend = backend
166        self._synced_cb = synced_cb
167        self._reconnect_delay = reconnect_delay
168        self._async_group = aio.Group()
169        self._client_subgroups = {}
170
171    @property
172    def async_group(self) -> aio.Group:
173        return self._async_group
174
175    async def set_monitor_state(self, state: hat.monitor.component.State):
176        valid_server_data = set(_get_eventer_server_data(
177            group=self._conf['monitor_component']['group'],
178            server_token=self._conf.get('server_token'),
179            state=state))
180
181        for server_data in list(self._client_subgroups.keys()):
182            if server_data in valid_server_data:
183                continue
184
185            subgroup = self._client_subgroups.pop(server_data)
186            subgroup.close()
187
188        for server_data in valid_server_data:
189            subgroup = self._client_subgroups.get(server_data)
190            if subgroup and subgroup.is_open:
191                continue
192
193            subgroup = self.async_group.create_subgroup()
194            subgroup.spawn(self._client_loop, subgroup, server_data)
195            self._client_subgroups[server_data] = subgroup
196
197    async def _client_loop(self, async_group, server_data):
198        try:
199            while True:
200                try:
201                    eventer_client = await create_eventer_client(
202                        addr=server_data.addr,
203                        client_name=self._conf['monitor_component']['name'],
204                        server_id=server_data.server_id,
205                        backend=self._backend,
206                        client_token=self._conf.get('server_token'),
207                        synced_cb=self._on_synced)
208
209                except Exception:
210                    await asyncio.sleep(self._reconnect_delay)
211                    continue
212
213                try:
214                    await aio.call(self._synced_cb, server_data.server_id,
215                                   False, 0)
216
217                    await eventer_client.wait_closing()
218
219                finally:
220                    await aio.uncancellable(eventer_client.async_close())
221
222        except Exception as e:
223            mlog.error("eventer client runner loop error: %s", e, exc_info=e)
224            self.close()
225
226        finally:
227            async_group.close()
228
229    async def _on_synced(self, server_id, counter):
230        await aio.call(self._synced_cb, server_id, True, counter)

Resource with lifetime control based on Group.

EventerClientRunner( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], backend: hat.event.common.backend.Backend, synced_cb: Callable[[int, bool, int], Optional[Awaitable[NoneType]]], reconnect_delay: float = 5)
158    def __init__(self,
159                 conf: json.Data,
160                 backend: common.Backend,
161                 synced_cb: aio.AsyncCallable[[common.ServerId, bool, int],
162                                              None],
163                 reconnect_delay: float = 5):
164        self._conf = conf
165        self._backend = backend
166        self._synced_cb = synced_cb
167        self._reconnect_delay = reconnect_delay
168        self._async_group = aio.Group()
169        self._client_subgroups = {}
async_group: hat.aio.group.Group
171    @property
172    def async_group(self) -> aio.Group:
173        return self._async_group

Group controlling resource's lifetime.

async def set_monitor_state(self, state: hat.monitor.observer.client.State):
175    async def set_monitor_state(self, state: hat.monitor.component.State):
176        valid_server_data = set(_get_eventer_server_data(
177            group=self._conf['monitor_component']['group'],
178            server_token=self._conf.get('server_token'),
179            state=state))
180
181        for server_data in list(self._client_subgroups.keys()):
182            if server_data in valid_server_data:
183                continue
184
185            subgroup = self._client_subgroups.pop(server_data)
186            subgroup.close()
187
188        for server_data in valid_server_data:
189            subgroup = self._client_subgroups.get(server_data)
190            if subgroup and subgroup.is_open:
191                continue
192
193            subgroup = self.async_group.create_subgroup()
194            subgroup.spawn(self._client_loop, subgroup, server_data)
195            self._client_subgroups[server_data] = subgroup
Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
class EngineRunner(hat.aio.group.Resource):
233class EngineRunner(aio.Resource):
234
235    def __init__(self,
236                 conf: json.Data,
237                 backend: common.Backend,
238                 eventer_server: EventerServer):
239        self._conf = conf
240        self._backend = backend
241        self._eventer_server = eventer_server
242        self._async_group = aio.Group()
243        self._engine = None
244        self._restart = asyncio.Event()
245
246        self.async_group.spawn(self._run)
247
248    @property
249    def async_group(self) -> aio.Group:
250        return self._async_group
251
252    async def set_synced(self,
253                         server_id: common.ServerId,
254                         synced: bool,
255                         counter: int):
256        if self._engine and self._engine.is_open:
257            source = common.Source(type=common.SourceType.SERVER, id=0)
258            event = common.RegisterEvent(
259                type=('event', str(self._conf['server_id']), 'synced',
260                      str(server_id)),
261                source_timestamp=None,
262                payload=common.EventPayloadJson(synced))
263
264            await self._engine.register(source, [event])
265
266        if synced and counter and self._conf.get('synced_restart_engine'):
267            self._restart.set()
268
269    async def _run(self):
270        try:
271            while True:
272                self._restart.clear()
273
274                self._engine = await create_engine(
275                    backend=self._backend,
276                    module_confs=self._conf['modules'],
277                    server_id=self._conf['server_id'])
278                await self._eventer_server.set_engine(self._engine)
279
280                async with self._async_group.create_subgroup() as subgroup:
281                    await asyncio.wait(
282                        [subgroup.spawn(self._engine.wait_closing),
283                         subgroup.spawn(self._restart.wait)],
284                        return_when=asyncio.FIRST_COMPLETED)
285
286                if not self._engine.is_open:
287                    break
288
289                await self._close()
290
291        except Exception as e:
292            mlog.error("engine runner loop error: %s", e, exc_info=e)
293
294        finally:
295            self.close()
296            await aio.uncancellable(self._close())
297
298    async def _close(self):
299        if self._engine:
300            await self._engine.async_close()
301
302        await self._eventer_server.set_engine(None)

Resource with lifetime control based on Group.

EngineRunner( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], backend: hat.event.common.backend.Backend, eventer_server: hat.event.server.eventer_server.EventerServer)
235    def __init__(self,
236                 conf: json.Data,
237                 backend: common.Backend,
238                 eventer_server: EventerServer):
239        self._conf = conf
240        self._backend = backend
241        self._eventer_server = eventer_server
242        self._async_group = aio.Group()
243        self._engine = None
244        self._restart = asyncio.Event()
245
246        self.async_group.spawn(self._run)
async_group: hat.aio.group.Group
248    @property
249    def async_group(self) -> aio.Group:
250        return self._async_group

Group controlling resource's lifetime.

async def set_synced(self, server_id: int, synced: bool, counter: int):
252    async def set_synced(self,
253                         server_id: common.ServerId,
254                         synced: bool,
255                         counter: int):
256        if self._engine and self._engine.is_open:
257            source = common.Source(type=common.SourceType.SERVER, id=0)
258            event = common.RegisterEvent(
259                type=('event', str(self._conf['server_id']), 'synced',
260                      str(server_id)),
261                source_timestamp=None,
262                payload=common.EventPayloadJson(synced))
263
264            await self._engine.register(source, [event])
265
266        if synced and counter and self._conf.get('synced_restart_engine'):
267            self._restart.set()
Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close