hat.event.server.main_runner
1import asyncio 2import contextlib 3import functools 4import logging 5 6from hat import aio 7from hat import json 8from hat import util 9from hat.drivers import tcp 10import hat.monitor.component 11 12from hat.event import common 13from hat.event.server.adminer_server import create_adminer_server 14from hat.event.server.engine_runner import EngineRunner 15from hat.event.server.eventer_client_runner import EventerClientRunner 16from hat.event.server.eventer_server import create_eventer_server 17 18 19mlog: logging.Logger = logging.getLogger(__name__) 20"""Module logger""" 21 22 23class MainRunner(aio.Resource): 24 25 def __init__(self, 26 conf: json.Data, 27 reset_monitor_ready_timeout: float = 30): 28 self._conf = conf 29 self._reset_monitor_ready_timeout = reset_monitor_ready_timeout 30 self._loop = asyncio.get_running_loop() 31 self._async_group = aio.Group() 32 self._backend = None 33 self._eventer_server = None 34 self._adminer_server = None 35 self._monitor_component = None 36 self._eventer_client_runner = None 37 self._engine_runner = None 38 self._monitor_state_cbs = util.CallbackRegistry() 39 self._reset_monitor_ready = asyncio.Event() 40 41 self.async_group.spawn(self._run) 42 43 @property 44 def async_group(self) -> aio.Group: 45 return self._async_group 46 47 async def _run(self): 48 try: 49 mlog.debug("starting main runner loop") 50 await self._start() 51 52 if not self._monitor_component: 53 await self._loop.create_future() 54 return 55 56 await self._monitor_component.set_ready(True) 57 58 while True: 59 self._reset_monitor_ready.clear() 60 await self._reset_monitor_ready.wait() 61 62 await self._monitor_component.set_ready(False) 63 64 with contextlib.suppress(asyncio.TimeoutError): 65 await aio.wait_for( 66 self._wait_while_monitor_blessing_res_token(), 67 self._reset_monitor_ready_timeout) 68 69 await self._monitor_component.set_ready(True) 70 71 except Exception as e: 72 mlog.error("main runner loop error: %s", e, exc_info=e) 73 74 finally: 75 mlog.debug("closing main runner loop") 76 self.close() 77 await aio.uncancellable(self._stop()) 78 79 async def _start(self): 80 mlog.debug("creating backend") 81 backend_conf = self._conf['backend'] 82 backend_info = common.import_backend_info(backend_conf['module']) 83 self._backend = await aio.call( 84 backend_info.create, backend_conf, 85 functools.partial(self._on_backend_events, False), 86 functools.partial(self._on_backend_events, True)) 87 _bind_resource(self.async_group, self._backend) 88 89 mlog.debug("creating eventer server") 90 self._eventer_server = await create_eventer_server( 91 addr=tcp.Address(self._conf['eventer_server']['host'], 92 self._conf['eventer_server']['port']), 93 backend=self._backend, 94 server_id=self._conf['server_id'], 95 server_token=self._conf.get('server_token')) 96 _bind_resource(self.async_group, self._eventer_server) 97 98 if 'adminer_server' in self._conf: 99 mlog.debug("creating adminer server") 100 self._adminer_server = await create_adminer_server( 101 addr=tcp.Address(self._conf['adminer_server']['host'], 102 self._conf['adminer_server']['port']), 103 log_conf=self._conf.get('log')) 104 _bind_resource(self.async_group, self._adminer_server) 105 106 if 'monitor_component' in self._conf: 107 mlog.debug("creating eventer client runner") 108 self._eventer_client_runner = EventerClientRunner( 109 conf=self._conf, 110 backend=self._backend, 111 synced_cb=self._on_eventer_client_synced) 112 _bind_resource(self.async_group, self._eventer_client_runner) 113 114 handle = self._monitor_state_cbs.register( 115 self._eventer_client_runner.set_monitor_state) 116 self._eventer_client_runner.async_group.spawn( 117 aio.call_on_cancel, handle.cancel) 118 119 mlog.debug("creating monitor component") 120 self._monitor_component = await hat.monitor.component.connect( 121 addr=tcp.Address(self._conf['monitor_component']['host'], 122 self._conf['monitor_component']['port']), 123 name=self._conf['name'], 124 group=self._conf['monitor_component']['group'], 125 runner_cb=self._create_monitor_runner, 126 data={'server_id': self._conf['server_id'], 127 'eventer_server': self._conf['eventer_server'], 128 'server_token': self._conf.get('server_token')}, 129 state_cb=self._on_monitor_state, 130 close_req_cb=self._on_monitor_close_req) 131 _bind_resource(self.async_group, self._monitor_component) 132 133 self._eventer_client_runner.set_monitor_state( 134 self._monitor_component.state) 135 136 else: 137 mlog.debug("creating engine runner") 138 self._engine_runner = EngineRunner( 139 conf=self._conf, 140 backend=self._backend, 141 eventer_server=self._eventer_server, 142 eventer_client_runner=None, 143 reset_monitor_ready_cb=self._reset_monitor_ready.set) 144 _bind_resource(self.async_group, self._engine_runner) 145 146 async def _stop(self): 147 if self._engine_runner and not self._monitor_component: 148 await self._engine_runner.async_close() 149 150 if self._eventer_client_runner: 151 await self._eventer_client_runner.async_close() 152 153 if self._monitor_component: 154 await self._monitor_component.async_close() 155 156 if self._adminer_server: 157 await self._adminer_server.async_close() 158 159 if self._eventer_server: 160 with contextlib.suppress(Exception): 161 await self._backend.flush() 162 163 await self._eventer_server.async_close() 164 165 if self._backend: 166 await self._backend.async_close() 167 168 def _create_monitor_runner(self, monitor_component): 169 mlog.debug("creating engine runner") 170 self._engine_runner = EngineRunner( 171 conf=self._conf, 172 backend=self._backend, 173 eventer_server=self._eventer_server, 174 eventer_client_runner=self._eventer_client_runner, 175 reset_monitor_ready_cb=self._reset_monitor_ready.set) 176 return self._engine_runner 177 178 def _on_monitor_state(self, monitor_component, state): 179 self._monitor_state_cbs.notify(state) 180 181 async def _on_monitor_close_req(self, monitor_component): 182 if not self._engine_runner: 183 return 184 185 await self._engine_runner.async_close() 186 187 async def _on_backend_events(self, persisted, events): 188 if not self._eventer_server: 189 return 190 191 await self._eventer_server.notify_events(events, persisted) 192 193 async def _on_eventer_client_synced(self, server_id, state, count): 194 if not self._engine_runner: 195 return 196 197 await self._engine_runner.set_synced(server_id, state, count) 198 199 async def _wait_while_monitor_blessing_res_token(self): 200 if not self._monitor_component: 201 return 202 203 while (self._monitor_component.state.info and 204 self._monitor_component.state.info.blessing_res.token): 205 event = asyncio.Event() 206 with self._monitor_state_cbs.register(lambda _: event.set()): 207 await event.wait() 208 209 210def _bind_resource(async_group, resource): 211 async_group.spawn(aio.call_on_done, resource.wait_closing(), 212 async_group.close)
Module logger
class
MainRunner(hat.aio.group.Resource):
24class MainRunner(aio.Resource): 25 26 def __init__(self, 27 conf: json.Data, 28 reset_monitor_ready_timeout: float = 30): 29 self._conf = conf 30 self._reset_monitor_ready_timeout = reset_monitor_ready_timeout 31 self._loop = asyncio.get_running_loop() 32 self._async_group = aio.Group() 33 self._backend = None 34 self._eventer_server = None 35 self._adminer_server = None 36 self._monitor_component = None 37 self._eventer_client_runner = None 38 self._engine_runner = None 39 self._monitor_state_cbs = util.CallbackRegistry() 40 self._reset_monitor_ready = asyncio.Event() 41 42 self.async_group.spawn(self._run) 43 44 @property 45 def async_group(self) -> aio.Group: 46 return self._async_group 47 48 async def _run(self): 49 try: 50 mlog.debug("starting main runner loop") 51 await self._start() 52 53 if not self._monitor_component: 54 await self._loop.create_future() 55 return 56 57 await self._monitor_component.set_ready(True) 58 59 while True: 60 self._reset_monitor_ready.clear() 61 await self._reset_monitor_ready.wait() 62 63 await self._monitor_component.set_ready(False) 64 65 with contextlib.suppress(asyncio.TimeoutError): 66 await aio.wait_for( 67 self._wait_while_monitor_blessing_res_token(), 68 self._reset_monitor_ready_timeout) 69 70 await self._monitor_component.set_ready(True) 71 72 except Exception as e: 73 mlog.error("main runner loop error: %s", e, exc_info=e) 74 75 finally: 76 mlog.debug("closing main runner loop") 77 self.close() 78 await aio.uncancellable(self._stop()) 79 80 async def _start(self): 81 mlog.debug("creating backend") 82 backend_conf = self._conf['backend'] 83 backend_info = common.import_backend_info(backend_conf['module']) 84 self._backend = await aio.call( 85 backend_info.create, backend_conf, 86 functools.partial(self._on_backend_events, False), 87 functools.partial(self._on_backend_events, True)) 88 _bind_resource(self.async_group, self._backend) 89 90 mlog.debug("creating eventer server") 91 self._eventer_server = await create_eventer_server( 92 addr=tcp.Address(self._conf['eventer_server']['host'], 93 self._conf['eventer_server']['port']), 94 backend=self._backend, 95 server_id=self._conf['server_id'], 96 server_token=self._conf.get('server_token')) 97 _bind_resource(self.async_group, self._eventer_server) 98 99 if 'adminer_server' in self._conf: 100 mlog.debug("creating adminer server") 101 self._adminer_server = await create_adminer_server( 102 addr=tcp.Address(self._conf['adminer_server']['host'], 103 self._conf['adminer_server']['port']), 104 log_conf=self._conf.get('log')) 105 _bind_resource(self.async_group, self._adminer_server) 106 107 if 'monitor_component' in self._conf: 108 mlog.debug("creating eventer client runner") 109 self._eventer_client_runner = EventerClientRunner( 110 conf=self._conf, 111 backend=self._backend, 112 synced_cb=self._on_eventer_client_synced) 113 _bind_resource(self.async_group, self._eventer_client_runner) 114 115 handle = self._monitor_state_cbs.register( 116 self._eventer_client_runner.set_monitor_state) 117 self._eventer_client_runner.async_group.spawn( 118 aio.call_on_cancel, handle.cancel) 119 120 mlog.debug("creating monitor component") 121 self._monitor_component = await hat.monitor.component.connect( 122 addr=tcp.Address(self._conf['monitor_component']['host'], 123 self._conf['monitor_component']['port']), 124 name=self._conf['name'], 125 group=self._conf['monitor_component']['group'], 126 runner_cb=self._create_monitor_runner, 127 data={'server_id': self._conf['server_id'], 128 'eventer_server': self._conf['eventer_server'], 129 'server_token': self._conf.get('server_token')}, 130 state_cb=self._on_monitor_state, 131 close_req_cb=self._on_monitor_close_req) 132 _bind_resource(self.async_group, self._monitor_component) 133 134 self._eventer_client_runner.set_monitor_state( 135 self._monitor_component.state) 136 137 else: 138 mlog.debug("creating engine runner") 139 self._engine_runner = EngineRunner( 140 conf=self._conf, 141 backend=self._backend, 142 eventer_server=self._eventer_server, 143 eventer_client_runner=None, 144 reset_monitor_ready_cb=self._reset_monitor_ready.set) 145 _bind_resource(self.async_group, self._engine_runner) 146 147 async def _stop(self): 148 if self._engine_runner and not self._monitor_component: 149 await self._engine_runner.async_close() 150 151 if self._eventer_client_runner: 152 await self._eventer_client_runner.async_close() 153 154 if self._monitor_component: 155 await self._monitor_component.async_close() 156 157 if self._adminer_server: 158 await self._adminer_server.async_close() 159 160 if self._eventer_server: 161 with contextlib.suppress(Exception): 162 await self._backend.flush() 163 164 await self._eventer_server.async_close() 165 166 if self._backend: 167 await self._backend.async_close() 168 169 def _create_monitor_runner(self, monitor_component): 170 mlog.debug("creating engine runner") 171 self._engine_runner = EngineRunner( 172 conf=self._conf, 173 backend=self._backend, 174 eventer_server=self._eventer_server, 175 eventer_client_runner=self._eventer_client_runner, 176 reset_monitor_ready_cb=self._reset_monitor_ready.set) 177 return self._engine_runner 178 179 def _on_monitor_state(self, monitor_component, state): 180 self._monitor_state_cbs.notify(state) 181 182 async def _on_monitor_close_req(self, monitor_component): 183 if not self._engine_runner: 184 return 185 186 await self._engine_runner.async_close() 187 188 async def _on_backend_events(self, persisted, events): 189 if not self._eventer_server: 190 return 191 192 await self._eventer_server.notify_events(events, persisted) 193 194 async def _on_eventer_client_synced(self, server_id, state, count): 195 if not self._engine_runner: 196 return 197 198 await self._engine_runner.set_synced(server_id, state, count) 199 200 async def _wait_while_monitor_blessing_res_token(self): 201 if not self._monitor_component: 202 return 203 204 while (self._monitor_component.state.info and 205 self._monitor_component.state.info.blessing_res.token): 206 event = asyncio.Event() 207 with self._monitor_state_cbs.register(lambda _: event.set()): 208 await event.wait()
Resource with lifetime control based on Group
.
MainRunner( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], reset_monitor_ready_timeout: float = 30)
26 def __init__(self, 27 conf: json.Data, 28 reset_monitor_ready_timeout: float = 30): 29 self._conf = conf 30 self._reset_monitor_ready_timeout = reset_monitor_ready_timeout 31 self._loop = asyncio.get_running_loop() 32 self._async_group = aio.Group() 33 self._backend = None 34 self._eventer_server = None 35 self._adminer_server = None 36 self._monitor_component = None 37 self._eventer_client_runner = None 38 self._engine_runner = None 39 self._monitor_state_cbs = util.CallbackRegistry() 40 self._reset_monitor_ready = asyncio.Event() 41 42 self.async_group.spawn(self._run)