hat.event.server.main_runner

  1import asyncio
  2import contextlib
  3import functools
  4import logging
  5
  6from hat import aio
  7from hat import json
  8from hat import util
  9from hat.drivers import tcp
 10import hat.monitor.component
 11
 12from hat.event import common
 13from hat.event.server.adminer_server import create_adminer_server
 14from hat.event.server.engine_runner import EngineRunner
 15from hat.event.server.eventer_client_runner import EventerClientRunner
 16from hat.event.server.eventer_server import create_eventer_server
 17
 18
 19mlog: logging.Logger = logging.getLogger(__name__)
 20"""Module logger"""
 21
 22
 23class MainRunner(aio.Resource):
 24
 25    def __init__(self,
 26                 conf: json.Data,
 27                 reset_monitor_ready_timeout: float = 30):
 28        self._conf = conf
 29        self._reset_monitor_ready_timeout = reset_monitor_ready_timeout
 30        self._loop = asyncio.get_running_loop()
 31        self._async_group = aio.Group()
 32        self._backend = None
 33        self._eventer_server = None
 34        self._adminer_server = None
 35        self._monitor_component = None
 36        self._eventer_client_runner = None
 37        self._engine_runner = None
 38        self._monitor_state_cbs = util.CallbackRegistry()
 39        self._reset_monitor_ready = asyncio.Event()
 40
 41        self.async_group.spawn(self._run)
 42
 43    @property
 44    def async_group(self) -> aio.Group:
 45        return self._async_group
 46
 47    async def _run(self):
 48        try:
 49            mlog.debug("starting main runner loop")
 50            await self._start()
 51
 52            if not self._monitor_component:
 53                await self._loop.create_future()
 54                return
 55
 56            await self._monitor_component.set_ready(True)
 57
 58            while True:
 59                self._reset_monitor_ready.clear()
 60                await self._reset_monitor_ready.wait()
 61
 62                await self._monitor_component.set_ready(False)
 63
 64                with contextlib.suppress(asyncio.TimeoutError):
 65                    await aio.wait_for(
 66                        self._wait_while_monitor_blessing_res_token(),
 67                        self._reset_monitor_ready_timeout)
 68
 69                await self._monitor_component.set_ready(True)
 70
 71        except Exception as e:
 72            mlog.error("main runner loop error: %s", e, exc_info=e)
 73
 74        finally:
 75            mlog.debug("closing main runner loop")
 76            self.close()
 77            await aio.uncancellable(self._stop())
 78
 79    async def _start(self):
 80        mlog.debug("creating backend")
 81        backend_conf = self._conf['backend']
 82        backend_info = common.import_backend_info(backend_conf['module'])
 83        self._backend = await aio.call(
 84            backend_info.create, backend_conf,
 85            functools.partial(self._on_backend_events, False),
 86            functools.partial(self._on_backend_events, True))
 87        _bind_resource(self.async_group, self._backend)
 88
 89        mlog.debug("creating eventer server")
 90        self._eventer_server = await create_eventer_server(
 91            addr=tcp.Address(self._conf['eventer_server']['host'],
 92                             self._conf['eventer_server']['port']),
 93            backend=self._backend,
 94            server_id=self._conf['server_id'],
 95            server_token=self._conf.get('server_token'))
 96        _bind_resource(self.async_group, self._eventer_server)
 97
 98        if 'adminer_server' in self._conf:
 99            mlog.debug("creating adminer server")
100            self._adminer_server = await create_adminer_server(
101                addr=tcp.Address(self._conf['adminer_server']['host'],
102                                 self._conf['adminer_server']['port']),
103                log_conf=self._conf.get('log'))
104            _bind_resource(self.async_group, self._adminer_server)
105
106        if 'monitor_component' in self._conf:
107            mlog.debug("creating eventer client runner")
108            self._eventer_client_runner = EventerClientRunner(
109                conf=self._conf,
110                backend=self._backend,
111                synced_cb=self._on_eventer_client_synced)
112            _bind_resource(self.async_group, self._eventer_client_runner)
113
114            handle = self._monitor_state_cbs.register(
115                self._eventer_client_runner.set_monitor_state)
116            self._eventer_client_runner.async_group.spawn(
117                aio.call_on_cancel, handle.cancel)
118
119            mlog.debug("creating monitor component")
120            self._monitor_component = await hat.monitor.component.connect(
121                addr=tcp.Address(self._conf['monitor_component']['host'],
122                                 self._conf['monitor_component']['port']),
123                name=self._conf['name'],
124                group=self._conf['monitor_component']['group'],
125                runner_cb=self._create_monitor_runner,
126                data={'server_id': self._conf['server_id'],
127                      'eventer_server': self._conf['eventer_server'],
128                      'server_token': self._conf.get('server_token')},
129                state_cb=self._on_monitor_state,
130                close_req_cb=self._on_monitor_close_req)
131            _bind_resource(self.async_group, self._monitor_component)
132
133            self._eventer_client_runner.set_monitor_state(
134                self._monitor_component.state)
135
136        else:
137            mlog.debug("creating engine runner")
138            self._engine_runner = EngineRunner(
139                conf=self._conf,
140                backend=self._backend,
141                eventer_server=self._eventer_server,
142                eventer_client_runner=None,
143                reset_monitor_ready_cb=self._reset_monitor_ready.set)
144            _bind_resource(self.async_group, self._engine_runner)
145
146    async def _stop(self):
147        if self._engine_runner and not self._monitor_component:
148            await self._engine_runner.async_close()
149
150        if self._eventer_client_runner:
151            await self._eventer_client_runner.async_close()
152
153        if self._monitor_component:
154            await self._monitor_component.async_close()
155
156        if self._adminer_server:
157            await self._adminer_server.async_close()
158
159        if self._eventer_server:
160            with contextlib.suppress(Exception):
161                await self._backend.flush()
162
163            await self._eventer_server.async_close()
164
165        if self._backend:
166            await self._backend.async_close()
167
168    def _create_monitor_runner(self, monitor_component):
169        mlog.debug("creating engine runner")
170        self._engine_runner = EngineRunner(
171            conf=self._conf,
172            backend=self._backend,
173            eventer_server=self._eventer_server,
174            eventer_client_runner=self._eventer_client_runner,
175            reset_monitor_ready_cb=self._reset_monitor_ready.set)
176        return self._engine_runner
177
178    def _on_monitor_state(self, monitor_component, state):
179        self._monitor_state_cbs.notify(state)
180
181    async def _on_monitor_close_req(self, monitor_component):
182        if not self._engine_runner:
183            return
184
185        await self._engine_runner.async_close()
186
187    async def _on_backend_events(self, persisted, events):
188        if not self._eventer_server:
189            return
190
191        await self._eventer_server.notify_events(events, persisted)
192
193    async def _on_eventer_client_synced(self, server_id, state, count):
194        if not self._engine_runner:
195            return
196
197        await self._engine_runner.set_synced(server_id, state, count)
198
199    async def _wait_while_monitor_blessing_res_token(self):
200        if not self._monitor_component:
201            return
202
203        while (self._monitor_component.state.info and
204                self._monitor_component.state.info.blessing_res.token):
205            event = asyncio.Event()
206            with self._monitor_state_cbs.register(lambda _: event.set()):
207                await event.wait()
208
209
def _bind_resource(async_group, resource):
    """Bind closing of `async_group` to closing of `resource`.

    Spawns `aio.call_on_done` in `async_group` with `resource.wait_closing()`
    and `async_group.close` as its arguments.

    """
    closing = resource.wait_closing()
    async_group.spawn(aio.call_on_done, closing, async_group.close)
mlog: logging.Logger = <Logger hat.event.server.main_runner (WARNING)>

Module logger

class MainRunner(hat.aio.group.Resource):
 24class MainRunner(aio.Resource):
 25
 26    def __init__(self,
 27                 conf: json.Data,
 28                 reset_monitor_ready_timeout: float = 30):
 29        self._conf = conf
 30        self._reset_monitor_ready_timeout = reset_monitor_ready_timeout
 31        self._loop = asyncio.get_running_loop()
 32        self._async_group = aio.Group()
 33        self._backend = None
 34        self._eventer_server = None
 35        self._adminer_server = None
 36        self._monitor_component = None
 37        self._eventer_client_runner = None
 38        self._engine_runner = None
 39        self._monitor_state_cbs = util.CallbackRegistry()
 40        self._reset_monitor_ready = asyncio.Event()
 41
 42        self.async_group.spawn(self._run)
 43
 44    @property
 45    def async_group(self) -> aio.Group:
 46        return self._async_group
 47
 48    async def _run(self):
 49        try:
 50            mlog.debug("starting main runner loop")
 51            await self._start()
 52
 53            if not self._monitor_component:
 54                await self._loop.create_future()
 55                return
 56
 57            await self._monitor_component.set_ready(True)
 58
 59            while True:
 60                self._reset_monitor_ready.clear()
 61                await self._reset_monitor_ready.wait()
 62
 63                await self._monitor_component.set_ready(False)
 64
 65                with contextlib.suppress(asyncio.TimeoutError):
 66                    await aio.wait_for(
 67                        self._wait_while_monitor_blessing_res_token(),
 68                        self._reset_monitor_ready_timeout)
 69
 70                await self._monitor_component.set_ready(True)
 71
 72        except Exception as e:
 73            mlog.error("main runner loop error: %s", e, exc_info=e)
 74
 75        finally:
 76            mlog.debug("closing main runner loop")
 77            self.close()
 78            await aio.uncancellable(self._stop())
 79
 80    async def _start(self):
 81        mlog.debug("creating backend")
 82        backend_conf = self._conf['backend']
 83        backend_info = common.import_backend_info(backend_conf['module'])
 84        self._backend = await aio.call(
 85            backend_info.create, backend_conf,
 86            functools.partial(self._on_backend_events, False),
 87            functools.partial(self._on_backend_events, True))
 88        _bind_resource(self.async_group, self._backend)
 89
 90        mlog.debug("creating eventer server")
 91        self._eventer_server = await create_eventer_server(
 92            addr=tcp.Address(self._conf['eventer_server']['host'],
 93                             self._conf['eventer_server']['port']),
 94            backend=self._backend,
 95            server_id=self._conf['server_id'],
 96            server_token=self._conf.get('server_token'))
 97        _bind_resource(self.async_group, self._eventer_server)
 98
 99        if 'adminer_server' in self._conf:
100            mlog.debug("creating adminer server")
101            self._adminer_server = await create_adminer_server(
102                addr=tcp.Address(self._conf['adminer_server']['host'],
103                                 self._conf['adminer_server']['port']),
104                log_conf=self._conf.get('log'))
105            _bind_resource(self.async_group, self._adminer_server)
106
107        if 'monitor_component' in self._conf:
108            mlog.debug("creating eventer client runner")
109            self._eventer_client_runner = EventerClientRunner(
110                conf=self._conf,
111                backend=self._backend,
112                synced_cb=self._on_eventer_client_synced)
113            _bind_resource(self.async_group, self._eventer_client_runner)
114
115            handle = self._monitor_state_cbs.register(
116                self._eventer_client_runner.set_monitor_state)
117            self._eventer_client_runner.async_group.spawn(
118                aio.call_on_cancel, handle.cancel)
119
120            mlog.debug("creating monitor component")
121            self._monitor_component = await hat.monitor.component.connect(
122                addr=tcp.Address(self._conf['monitor_component']['host'],
123                                 self._conf['monitor_component']['port']),
124                name=self._conf['name'],
125                group=self._conf['monitor_component']['group'],
126                runner_cb=self._create_monitor_runner,
127                data={'server_id': self._conf['server_id'],
128                      'eventer_server': self._conf['eventer_server'],
129                      'server_token': self._conf.get('server_token')},
130                state_cb=self._on_monitor_state,
131                close_req_cb=self._on_monitor_close_req)
132            _bind_resource(self.async_group, self._monitor_component)
133
134            self._eventer_client_runner.set_monitor_state(
135                self._monitor_component.state)
136
137        else:
138            mlog.debug("creating engine runner")
139            self._engine_runner = EngineRunner(
140                conf=self._conf,
141                backend=self._backend,
142                eventer_server=self._eventer_server,
143                eventer_client_runner=None,
144                reset_monitor_ready_cb=self._reset_monitor_ready.set)
145            _bind_resource(self.async_group, self._engine_runner)
146
147    async def _stop(self):
148        if self._engine_runner and not self._monitor_component:
149            await self._engine_runner.async_close()
150
151        if self._eventer_client_runner:
152            await self._eventer_client_runner.async_close()
153
154        if self._monitor_component:
155            await self._monitor_component.async_close()
156
157        if self._adminer_server:
158            await self._adminer_server.async_close()
159
160        if self._eventer_server:
161            with contextlib.suppress(Exception):
162                await self._backend.flush()
163
164            await self._eventer_server.async_close()
165
166        if self._backend:
167            await self._backend.async_close()
168
169    def _create_monitor_runner(self, monitor_component):
170        mlog.debug("creating engine runner")
171        self._engine_runner = EngineRunner(
172            conf=self._conf,
173            backend=self._backend,
174            eventer_server=self._eventer_server,
175            eventer_client_runner=self._eventer_client_runner,
176            reset_monitor_ready_cb=self._reset_monitor_ready.set)
177        return self._engine_runner
178
179    def _on_monitor_state(self, monitor_component, state):
180        self._monitor_state_cbs.notify(state)
181
182    async def _on_monitor_close_req(self, monitor_component):
183        if not self._engine_runner:
184            return
185
186        await self._engine_runner.async_close()
187
188    async def _on_backend_events(self, persisted, events):
189        if not self._eventer_server:
190            return
191
192        await self._eventer_server.notify_events(events, persisted)
193
194    async def _on_eventer_client_synced(self, server_id, state, count):
195        if not self._engine_runner:
196            return
197
198        await self._engine_runner.set_synced(server_id, state, count)
199
200    async def _wait_while_monitor_blessing_res_token(self):
201        if not self._monitor_component:
202            return
203
204        while (self._monitor_component.state.info and
205                self._monitor_component.state.info.blessing_res.token):
206            event = asyncio.Event()
207            with self._monitor_state_cbs.register(lambda _: event.set()):
208                await event.wait()

Resource with lifetime control based on Group.

MainRunner(conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], reset_monitor_ready_timeout: float = 30)
26    def __init__(self,
27                 conf: json.Data,
28                 reset_monitor_ready_timeout: float = 30):
29        self._conf = conf
30        self._reset_monitor_ready_timeout = reset_monitor_ready_timeout
31        self._loop = asyncio.get_running_loop()
32        self._async_group = aio.Group()
33        self._backend = None
34        self._eventer_server = None
35        self._adminer_server = None
36        self._monitor_component = None
37        self._eventer_client_runner = None
38        self._engine_runner = None
39        self._monitor_state_cbs = util.CallbackRegistry()
40        self._reset_monitor_ready = asyncio.Event()
41
42        self.async_group.spawn(self._run)
async_group: hat.aio.group.Group
44    @property
45    def async_group(self) -> aio.Group:
46        return self._async_group

Group controlling resource's lifetime.