hat.event.server.runner
"""Runners that wire together the components of an event server.

`MainRunner` starts the backend and the eventer server and then, depending on
whether the configuration contains a ``monitor_component`` entry, either
connects to a monitor server (using `EventerClientRunner` and a
monitor-controlled `EngineRunner`) or starts a standalone `EngineRunner`.
"""

import asyncio
import contextlib
import logging
import types
import typing

from hat import aio
from hat import json
from hat.drivers import tcp
import hat.monitor.component

from hat.event import common
from hat.event.server.engine import create_engine
from hat.event.server.eventer_client import create_eventer_client
from hat.event.server.eventer_server import (create_eventer_server,
                                             EventerServer)


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


class EventerServerData(typing.NamedTuple):
    """Identifier and address of a remote eventer server.

    Instances are hashable and are used as dictionary keys by
    `EventerClientRunner`.
    """
    server_id: common.ServerId
    addr: tcp.Address


class MainRunner(aio.Resource):
    """Top-level resource owning all other event server components.

    Args:
        conf: configuration; this class reads the keys ``backend``,
            ``eventer_server``, ``server_id`` and the optional keys
            ``server_token`` and ``monitor_component``

    Must be created while an asyncio event loop is running.
    """

    def __init__(self, conf: json.Data):
        self._conf = conf
        self._loop = asyncio.get_running_loop()
        self._async_group = aio.Group()
        # components are created lazily in `_start`; `None` means "not
        # created (yet)" and is what `_stop` and the callbacks check for
        self._backend = None
        self._eventer_server = None
        self._monitor_component = None
        self._eventer_client_runner = None
        self._engine_runner = None

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling this resource's lifetime"""
        return self._async_group

    async def _run(self):
        """Start all components, wait, and stop them on exit."""
        try:
            await self._start()
            # nothing in this module resolves this future, so this waits
            # until the task is cancelled
            await self._loop.create_future()

        except Exception as e:
            mlog.error("main runner loop error: %s", e, exc_info=e)

        finally:
            self.close()
            # NOTE(review): presumably shields the cleanup from cancellation
            # so that `_stop` runs to completion - confirm in hat.aio
            await aio.uncancellable(self._stop())

    async def _start(self):
        """Create the components in dependency order.

        The backend is created first, then the eventer server, and then
        either the monitor component with the eventer client runner or a
        standalone engine runner.
        """
        backend_conf = self._conf['backend']
        backend_info = common.import_backend_info(backend_conf['module'])
        self._backend = await aio.call(backend_info.create, backend_conf,
                                       self._on_backend_registered_events,
                                       self._on_backend_flushed_events)
        _bind_resource(self.async_group, self._backend)

        self._eventer_server = await create_eventer_server(
            addr=tcp.Address(self._conf['eventer_server']['host'],
                             self._conf['eventer_server']['port']),
            backend=self._backend,
            server_id=self._conf['server_id'],
            server_token=self._conf.get('server_token'))
        _bind_resource(self.async_group, self._eventer_server)

        if 'monitor_component' in self._conf:
            # the `data` published here uses the same keys that
            # `_get_eventer_server_data` reads from other components
            self._monitor_component = await hat.monitor.component.connect(
                addr=tcp.Address(self._conf['monitor_component']['host'],
                                 self._conf['monitor_component']['port']),
                name=self._conf['monitor_component']['name'],
                group=self._conf['monitor_component']['group'],
                runner_cb=self._create_monitor_runner,
                data={'server_id': self._conf['server_id'],
                      'eventer_server': self._conf['eventer_server'],
                      'server_token': self._conf.get('server_token')},
                state_cb=self._on_monitor_state)
            _bind_resource(self.async_group, self._monitor_component)

            self._eventer_client_runner = EventerClientRunner(
                conf=self._conf,
                backend=self._backend,
                synced_cb=self._on_eventer_client_synced)
            _bind_resource(self.async_group, self._eventer_client_runner)

            # apply the state already known to the component before
            # declaring this component ready
            await self._eventer_client_runner.set_monitor_state(
                self._monitor_component.state)

            await self._monitor_component.set_ready(True)

        else:
            # no monitor configured - the engine is run unconditionally
            self._engine_runner = EngineRunner(
                conf=self._conf,
                backend=self._backend,
                eventer_server=self._eventer_server)
            _bind_resource(self.async_group, self._engine_runner)

    async def _stop(self):
        """Close the created components in reverse dependency order."""
        # with a monitor component the engine runner is handed to it through
        # `runner_cb`, so it is closed here only in the standalone case
        # NOTE(review): presumably the monitor component closes the runner it
        # obtained from `runner_cb` - confirm in hat.monitor.component
        if self._engine_runner and not self._monitor_component:
            await self._engine_runner.async_close()

        if self._eventer_client_runner:
            await self._eventer_client_runner.async_close()

        if self._monitor_component:
            await self._monitor_component.async_close()

        if self._eventer_server:
            # best-effort flush while the eventer server still exists;
            # flush errors must not prevent the remaining cleanup
            with contextlib.suppress(Exception):
                await self._backend.flush()

            await self._eventer_server.async_close()

        if self._backend:
            await self._backend.async_close()

    async def _create_monitor_runner(self, monitor_component):
        """Create the engine runner; passed to the monitor as `runner_cb`."""
        self._engine_runner = EngineRunner(conf=self._conf,
                                           backend=self._backend,
                                           eventer_server=self._eventer_server)
        return self._engine_runner

    async def _on_backend_registered_events(self, events):
        """Forward events registered by the backend to the eventer server."""
        if not self._eventer_server:
            return

        # second argument is `False` for registered and `True` for flushed
        # events
        await self._eventer_server.notify_events(events, False)

    async def _on_backend_flushed_events(self, events):
        """Forward events flushed by the backend to the eventer server."""
        if not self._eventer_server:
            return

        await self._eventer_server.notify_events(events, True)

    async def _on_monitor_state(self, monitor_component, state):
        """Forward a monitor state change to the eventer client runner."""
        if not self._eventer_client_runner:
            return

        await self._eventer_client_runner.set_monitor_state(state)

    async def _on_eventer_client_synced(self, server_id, synced, counter):
        """Forward an eventer client synced notification to the engine
        runner (ignored while no engine runner exists)."""
        if not self._engine_runner:
            return

        await self._engine_runner.set_synced(server_id, synced, counter)


class EventerClientRunner(aio.Resource):
    """Maintains one eventer client per remote eventer server.

    Args:
        conf: configuration; this class reads ``monitor_component`` (keys
            ``group`` and ``name``) and the optional ``server_token``
        backend: backend passed to each created eventer client
        synced_cb: called with ``(server_id, synced, counter)``
        reconnect_delay: seconds to wait after a failed connection attempt
    """

    def __init__(self,
                 conf: json.Data,
                 backend: common.Backend,
                 synced_cb: aio.AsyncCallable[[common.ServerId, bool, int],
                                              None],
                 reconnect_delay: float = 5):
        self._conf = conf
        self._backend = backend
        self._synced_cb = synced_cb
        self._reconnect_delay = reconnect_delay
        self._async_group = aio.Group()
        # EventerServerData -> subgroup running that server's client loop
        self._client_subgroups = {}

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling this resource's lifetime"""
        return self._async_group

    async def set_monitor_state(self, state: hat.monitor.component.State):
        """Reconcile running client loops with the given monitor state.

        Loops for servers no longer present in `state` are closed; a loop
        is started for every present server that has no open subgroup.
        """
        valid_server_data = set(_get_eventer_server_data(
            group=self._conf['monitor_component']['group'],
            server_token=self._conf.get('server_token'),
            state=state))

        # iterate over a copy of the keys because entries are removed
        for server_data in list(self._client_subgroups.keys()):
            if server_data in valid_server_data:
                continue

            subgroup = self._client_subgroups.pop(server_data)
            subgroup.close()

        for server_data in valid_server_data:
            subgroup = self._client_subgroups.get(server_data)
            if subgroup and subgroup.is_open:
                continue

            subgroup = self.async_group.create_subgroup()
            subgroup.spawn(self._client_loop, subgroup, server_data)
            self._client_subgroups[server_data] = subgroup

    async def _client_loop(self, async_group, server_data):
        """Keep an eventer client connected to a single remote server.

        Args:
            async_group: subgroup this loop runs in; closed when the loop
                exits
            server_data: remote server to connect to
        """
        try:
            while True:
                try:
                    eventer_client = await create_eventer_client(
                        addr=server_data.addr,
                        client_name=self._conf['monitor_component']['name'],
                        server_id=server_data.server_id,
                        backend=self._backend,
                        client_token=self._conf.get('server_token'),
                        synced_cb=self._on_synced)

                except Exception:
                    # connection failed - retry after the configured delay
                    await asyncio.sleep(self._reconnect_delay)
                    continue

                try:
                    # a fresh connection is reported as not synced; synced
                    # is reported later through `_on_synced`
                    await aio.call(self._synced_cb, server_data.server_id,
                                   False, 0)

                    await eventer_client.wait_closing()

                finally:
                    await aio.uncancellable(eventer_client.async_close())

        except Exception as e:
            # unexpected error (not a connection failure) - close the
            # whole runner
            mlog.error("eventer client runner loop error: %s", e, exc_info=e)
            self.close()

        finally:
            async_group.close()

    async def _on_synced(self, server_id, counter):
        """Report a server as synced through `synced_cb`."""
        await aio.call(self._synced_cb, server_id, True, counter)


class EngineRunner(aio.Resource):
    """Runs an engine and recreates it when a restart is requested.

    Args:
        conf: configuration; this class reads ``modules``, ``server_id``
            and the optional ``synced_restart_engine``
        backend: backend passed to the created engine
        eventer_server: eventer server that is given the running engine
    """

    def __init__(self,
                 conf: json.Data,
                 backend: common.Backend,
                 eventer_server: EventerServer):
        self._conf = conf
        self._backend = backend
        self._eventer_server = eventer_server
        self._async_group = aio.Group()
        self._engine = None
        # set by `set_synced` to request an engine restart
        self._restart = asyncio.Event()

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling this resource's lifetime"""
        return self._async_group

    async def set_synced(self,
                         server_id: common.ServerId,
                         synced: bool,
                         counter: int):
        """Register a synced event and optionally request engine restart.

        If an open engine exists, an event of type
        ``('event', <own server_id>, 'synced', <server_id>)`` with payload
        `synced` is registered. A restart is requested when `synced` and
        `counter` are truthy and ``synced_restart_engine`` is enabled in
        the configuration.
        """
        if self._engine and self._engine.is_open:
            source = common.Source(type=common.SourceType.SERVER, id=0)
            event = common.RegisterEvent(
                type=('event', str(self._conf['server_id']), 'synced',
                      str(server_id)),
                source_timestamp=None,
                payload=common.EventPayloadJson(synced))

            await self._engine.register(source, [event])

        if synced and counter and self._conf.get('synced_restart_engine'):
            self._restart.set()

    async def _run(self):
        """Create the engine and recreate it on each restart request.

        The loop ends when the engine is no longer open after the wait.
        """
        try:
            while True:
                self._restart.clear()

                self._engine = await create_engine(
                    backend=self._backend,
                    module_confs=self._conf['modules'],
                    server_id=self._conf['server_id'])
                await self._eventer_server.set_engine(self._engine)

                # wait until either the engine starts closing or a restart
                # is requested
                async with self._async_group.create_subgroup() as subgroup:
                    await asyncio.wait(
                        [subgroup.spawn(self._engine.wait_closing),
                         subgroup.spawn(self._restart.wait)],
                        return_when=asyncio.FIRST_COMPLETED)

                if not self._engine.is_open:
                    break

                # restart requested - close the current engine and loop
                await self._close()

        except Exception as e:
            mlog.error("engine runner loop error: %s", e, exc_info=e)

        finally:
            self.close()
            await aio.uncancellable(self._close())

    async def _close(self):
        """Close the current engine and detach it from the eventer server."""
        if self._engine:
            await self._engine.async_close()

        await self._eventer_server.set_engine(None)


def _bind_resource(async_group, resource):
    """Tie the lifetime of `async_group` to `resource`.

    Spawns `aio.call_on_done` with `resource.wait_closing()` and
    `async_group.close`.
    """
    # NOTE(review): presumably `call_on_done` invokes the callback once the
    # awaitable completes, i.e. the group closes when the resource starts
    # closing - confirm in hat.aio
    async_group.spawn(aio.call_on_done, resource.wait_closing(),
                      async_group.close)


def _get_eventer_server_data(group, server_token, state):
    """Yield `EventerServerData` for eligible components in `state`.

    A component is skipped when it equals `state.info`, belongs to a
    different `group`, has data of unexpected types, or - if `server_token`
    is not ``None`` - publishes a different token.
    """
    for info in state.components:
        if info == state.info or info.group != group:
            continue

        server_id = json.get(info.data, 'server_id')
        host = json.get(info.data, ['eventer_server', 'host'])
        port = json.get(info.data, ['eventer_server', 'port'])
        token = json.get(info.data, 'server_token')
        # component data is published by other processes - validate types
        # before using it
        if (not isinstance(server_id, int) or
                not isinstance(host, str) or
                not isinstance(port, int) or
                not isinstance(token, (str, types.NoneType))):
            continue

        if server_token is not None and token != server_token:
            continue

        yield EventerServerData(server_id=server_id,
                                addr=tcp.Address(host, port))
Module logger
class
EventerServerData(typing.NamedTuple):
EventerServerData(server_id, addr)
EventerServerData(server_id: int, addr: hat.drivers.tcp.Address)
Create new instance of EventerServerData(server_id, addr)
Inherited Members
- builtins.tuple
- index
- count
class
MainRunner(hat.aio.group.Resource):
class MainRunner(aio.Resource):
    """Top-level resource owning all other event server components.

    The backend and the eventer server are always created. When the
    configuration contains ``monitor_component``, a monitor component and an
    eventer client runner are created as well; otherwise a standalone engine
    runner is created.

    Args:
        conf: configuration; this class reads the keys ``backend``,
            ``eventer_server``, ``server_id`` and the optional keys
            ``server_token`` and ``monitor_component``
    """

    def __init__(self, conf: json.Data):
        self._conf = conf

        # components created in `_start`; `None` means "not created"
        self._backend = None
        self._eventer_server = None
        self._monitor_component = None
        self._eventer_client_runner = None
        self._engine_runner = None

        self._loop = asyncio.get_running_loop()
        self._async_group = aio.Group()

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling this resource's lifetime"""
        return self._async_group

    async def _run(self):
        """Start all components, wait, and stop them on exit."""
        try:
            await self._start()

            # nothing resolves this future - wait here until cancelled
            forever = self._loop.create_future()
            await forever

        except Exception as exc:
            mlog.error("main runner loop error: %s", exc, exc_info=exc)

        finally:
            self.close()
            await aio.uncancellable(self._stop())

    async def _start(self):
        """Create the components in dependency order."""
        await self._start_backend()
        await self._start_eventer_server()

        if 'monitor_component' not in self._conf:
            self._start_engine_runner()
            return

        await self._start_monitor_component()
        await self._start_eventer_client_runner()
        await self._monitor_component.set_ready(True)

    async def _start_backend(self):
        """Import and create the configured backend."""
        backend_conf = self._conf['backend']
        backend_info = common.import_backend_info(backend_conf['module'])
        self._backend = await aio.call(backend_info.create, backend_conf,
                                       self._on_backend_registered_events,
                                       self._on_backend_flushed_events)
        _bind_resource(self.async_group, self._backend)

    async def _start_eventer_server(self):
        """Create the eventer server listening on the configured address."""
        self._eventer_server = await create_eventer_server(
            addr=tcp.Address(self._conf['eventer_server']['host'],
                             self._conf['eventer_server']['port']),
            backend=self._backend,
            server_id=self._conf['server_id'],
            server_token=self._conf.get('server_token'))
        _bind_resource(self.async_group, self._eventer_server)

    async def _start_monitor_component(self):
        """Connect to the monitor server as a component."""
        monitor_conf = self._conf['monitor_component']
        self._monitor_component = await hat.monitor.component.connect(
            addr=tcp.Address(monitor_conf['host'], monitor_conf['port']),
            name=monitor_conf['name'],
            group=monitor_conf['group'],
            runner_cb=self._create_monitor_runner,
            data={'server_id': self._conf['server_id'],
                  'eventer_server': self._conf['eventer_server'],
                  'server_token': self._conf.get('server_token')},
            state_cb=self._on_monitor_state)
        _bind_resource(self.async_group, self._monitor_component)

    async def _start_eventer_client_runner(self):
        """Create the eventer client runner and apply the current monitor
        state to it."""
        self._eventer_client_runner = EventerClientRunner(
            conf=self._conf,
            backend=self._backend,
            synced_cb=self._on_eventer_client_synced)
        _bind_resource(self.async_group, self._eventer_client_runner)

        await self._eventer_client_runner.set_monitor_state(
            self._monitor_component.state)

    def _start_engine_runner(self):
        """Create the standalone engine runner (no monitor configured)."""
        self._engine_runner = EngineRunner(
            conf=self._conf,
            backend=self._backend,
            eventer_server=self._eventer_server)
        _bind_resource(self.async_group, self._engine_runner)

    async def _stop(self):
        """Close the created components in reverse dependency order."""
        # the engine runner is closed here only in the standalone case
        standalone_engine = (self._engine_runner and
                             not self._monitor_component)
        if standalone_engine:
            await self._engine_runner.async_close()

        if self._eventer_client_runner:
            await self._eventer_client_runner.async_close()

        if self._monitor_component:
            await self._monitor_component.async_close()

        if self._eventer_server:
            # best-effort flush; errors must not prevent remaining cleanup
            with contextlib.suppress(Exception):
                await self._backend.flush()

            await self._eventer_server.async_close()

        if self._backend:
            await self._backend.async_close()

    async def _create_monitor_runner(self, monitor_component):
        """Create the engine runner; passed to the monitor as `runner_cb`."""
        runner = EngineRunner(conf=self._conf,
                              backend=self._backend,
                              eventer_server=self._eventer_server)
        self._engine_runner = runner
        return runner

    async def _notify_events(self, events, flushed):
        """Pass events to the eventer server, if it exists."""
        if self._eventer_server:
            await self._eventer_server.notify_events(events, flushed)

    async def _on_backend_registered_events(self, events):
        """Forward events registered by the backend to the eventer server."""
        await self._notify_events(events, False)

    async def _on_backend_flushed_events(self, events):
        """Forward events flushed by the backend to the eventer server."""
        await self._notify_events(events, True)

    async def _on_monitor_state(self, monitor_component, state):
        """Forward a monitor state change to the eventer client runner."""
        if self._eventer_client_runner:
            await self._eventer_client_runner.set_monitor_state(state)

    async def _on_eventer_client_synced(self, server_id, synced, counter):
        """Forward a synced notification to the engine runner, if any."""
        if self._engine_runner:
            await self._engine_runner.set_synced(server_id, synced, counter)
Resource with lifetime control based on Group.
MainRunner( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')])
def __init__(self, conf: json.Data):
    """Store the configuration and spawn the main run loop.

    Must be called while an asyncio event loop is running.
    """
    self._conf = conf

    # components are created later; `None` means "not created"
    self._backend = None
    self._eventer_server = None
    self._monitor_component = None
    self._eventer_client_runner = None
    self._engine_runner = None

    self._loop = asyncio.get_running_loop()
    self._async_group = aio.Group()

    # spawned last, once every attribute is initialized
    self.async_group.spawn(self._run)
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
class
EventerClientRunner(hat.aio.group.Resource):
class EventerClientRunner(aio.Resource):
    """Maintains one eventer client per remote eventer server.

    Args:
        conf: configuration; this class reads ``monitor_component`` (keys
            ``group`` and ``name``) and the optional ``server_token``
        backend: backend passed to each created eventer client
        synced_cb: called with ``(server_id, synced, counter)``; may return
            an awaitable
        reconnect_delay: seconds to wait after a failed connection attempt
    """

    def __init__(self,
                 conf: json.Data,
                 backend: common.Backend,
                 synced_cb: aio.AsyncCallable[[common.ServerId, bool, int],
                                              None],
                 reconnect_delay: float = 5):
        self._conf = conf
        self._backend = backend
        self._synced_cb = synced_cb
        self._reconnect_delay = reconnect_delay
        self._async_group = aio.Group()
        # EventerServerData -> subgroup running that server's client loop
        self._client_subgroups = {}

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling this resource's lifetime"""
        return self._async_group

    async def set_monitor_state(self, state: hat.monitor.component.State):
        """Reconcile running client loops with the given monitor state.

        Loops for servers no longer present in `state` are closed; a loop
        is started for every present server that has no open subgroup.
        """
        valid_server_data = set(_get_eventer_server_data(
            group=self._conf['monitor_component']['group'],
            server_token=self._conf.get('server_token'),
            state=state))

        # iterate over a copy of the keys because entries are removed
        for server_data in list(self._client_subgroups):
            if server_data in valid_server_data:
                continue

            subgroup = self._client_subgroups.pop(server_data)
            subgroup.close()

        for server_data in valid_server_data:
            subgroup = self._client_subgroups.get(server_data)
            if subgroup and subgroup.is_open:
                continue

            subgroup = self.async_group.create_subgroup()
            subgroup.spawn(self._client_loop, subgroup, server_data)
            self._client_subgroups[server_data] = subgroup

    async def _client_loop(self, async_group, server_data):
        """Keep an eventer client connected to a single remote server.

        Failed connection attempts are logged and retried after
        `reconnect_delay`. Any other error closes the whole runner.

        Args:
            async_group: subgroup this loop runs in; closed when the loop
                exits
            server_data: remote server to connect to
        """
        try:
            while True:
                try:
                    eventer_client = await create_eventer_client(
                        addr=server_data.addr,
                        client_name=self._conf['monitor_component']['name'],
                        server_id=server_data.server_id,
                        backend=self._backend,
                        client_token=self._conf.get('server_token'),
                        synced_cb=self._on_synced)

                except Exception as e:
                    # a peer being unreachable is expected at times, so keep
                    # retrying - but leave a trace so that a misconfigured
                    # address or token can be diagnosed
                    mlog.warning("eventer client connection to %s failed "
                                 "(retrying in %s s): %s",
                                 server_data.addr, self._reconnect_delay, e)
                    await asyncio.sleep(self._reconnect_delay)
                    continue

                try:
                    # a fresh connection is reported as not synced; synced
                    # is reported later through `_on_synced`
                    await aio.call(self._synced_cb, server_data.server_id,
                                   False, 0)

                    await eventer_client.wait_closing()

                finally:
                    await aio.uncancellable(eventer_client.async_close())

        except Exception as e:
            mlog.error("eventer client runner loop error: %s", e, exc_info=e)
            self.close()

        finally:
            async_group.close()

    async def _on_synced(self, server_id, counter):
        """Report a server as synced through `synced_cb`."""
        await aio.call(self._synced_cb, server_id, True, counter)
Resource with lifetime control based on Group.
EventerClientRunner( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], backend: hat.event.common.backend.Backend, synced_cb: Callable[[int, bool, int], Optional[Awaitable[NoneType]]], reconnect_delay: float = 5)
def __init__(self,
             conf: json.Data,
             backend: common.Backend,
             synced_cb: aio.AsyncCallable[[common.ServerId, bool, int],
                                          None],
             reconnect_delay: float = 5):
    """Store the collaborators; no client loop is started here.

    Args:
        conf: configuration
        backend: backend instance
        synced_cb: callback taking ``(server_id, synced, counter)``
        reconnect_delay: reconnect delay (stored as given)
    """
    # lifetime control
    self._async_group = aio.Group()

    # EventerServerData -> subgroup; starts out empty
    self._client_subgroups = {}

    # constructor arguments
    self._conf = conf
    self._backend = backend
    self._synced_cb = synced_cb
    self._reconnect_delay = reconnect_delay
async def
set_monitor_state(self, state: hat.monitor.observer.client.State):
async def set_monitor_state(self, state: hat.monitor.component.State):
    """Reconcile running client loops with the given monitor state.

    Subgroups of servers that are no longer reported are closed and
    forgotten; every reported server without an open subgroup gets a new
    subgroup running `_client_loop`.
    """
    active = set(_get_eventer_server_data(
        group=self._conf['monitor_component']['group'],
        server_token=self._conf.get('server_token'),
        state=state))

    # collect first, then remove - the dict must not change while iterated
    stale = [known for known in self._client_subgroups
             if known not in active]
    for server_data in stale:
        self._client_subgroups.pop(server_data).close()

    for server_data in active:
        current = self._client_subgroups.get(server_data)
        if not (current and current.is_open):
            fresh = self.async_group.create_subgroup()
            fresh.spawn(self._client_loop, fresh, server_data)
            self._client_subgroups[server_data] = fresh
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
class
EngineRunner(hat.aio.group.Resource):
class EngineRunner(aio.Resource):
    """Runs an engine and recreates it when a restart is requested.

    Args:
        conf: configuration; this class reads ``modules``, ``server_id``
            and the optional ``synced_restart_engine``
        backend: backend passed to the created engine
        eventer_server: eventer server that is given the running engine
    """

    def __init__(self,
                 conf: json.Data,
                 backend: common.Backend,
                 eventer_server: EventerServer):
        self._async_group = aio.Group()
        self._conf = conf
        self._backend = backend
        self._eventer_server = eventer_server

        # no engine until `_run` creates one
        self._engine = None
        # set by `set_synced` to request an engine restart
        self._restart = asyncio.Event()

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling this resource's lifetime"""
        return self._async_group

    async def set_synced(self,
                         server_id: common.ServerId,
                         synced: bool,
                         counter: int):
        """Register a synced event and optionally request engine restart.

        The event is registered only while an open engine exists. A restart
        is requested when `synced` and `counter` are truthy and
        ``synced_restart_engine`` is enabled in the configuration.
        """
        engine = self._engine
        if engine and engine.is_open:
            server_source = common.Source(type=common.SourceType.SERVER,
                                          id=0)
            event_type = ('event', str(self._conf['server_id']), 'synced',
                          str(server_id))
            synced_event = common.RegisterEvent(
                type=event_type,
                source_timestamp=None,
                payload=common.EventPayloadJson(synced))

            await engine.register(server_source, [synced_event])

        if not (synced and counter):
            return

        if self._conf.get('synced_restart_engine'):
            self._restart.set()

    async def _run(self):
        """Create the engine and recreate it on each restart request."""
        try:
            while True:
                self._restart.clear()

                self._engine = await create_engine(
                    backend=self._backend,
                    module_confs=self._conf['modules'],
                    server_id=self._conf['server_id'])
                await self._eventer_server.set_engine(self._engine)

                # block until the engine starts closing or a restart is
                # requested, whichever happens first
                async with self._async_group.create_subgroup() as waiters:
                    engine_closing = waiters.spawn(self._engine.wait_closing)
                    restart_requested = waiters.spawn(self._restart.wait)
                    await asyncio.wait(
                        [engine_closing, restart_requested],
                        return_when=asyncio.FIRST_COMPLETED)

                if not self._engine.is_open:
                    break

                # restart requested - drop the current engine and loop
                await self._close()

        except Exception as exc:
            mlog.error("engine runner loop error: %s", exc, exc_info=exc)

        finally:
            self.close()
            await aio.uncancellable(self._close())

    async def _close(self):
        """Close the current engine and detach it from the eventer server."""
        engine = self._engine
        if engine:
            await engine.async_close()

        await self._eventer_server.set_engine(None)
Resource with lifetime control based on Group.
EngineRunner( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], backend: hat.event.common.backend.Backend, eventer_server: hat.event.server.eventer_server.EventerServer)
def __init__(self,
             conf: json.Data,
             backend: common.Backend,
             eventer_server: EventerServer):
    """Store the collaborators and spawn the engine run loop.

    Args:
        conf: configuration
        backend: backend instance
        eventer_server: eventer server instance
    """
    # lifetime control
    self._async_group = aio.Group()

    # constructor arguments
    self._conf = conf
    self._backend = backend
    self._eventer_server = eventer_server

    # run-loop state: no engine yet, no restart requested
    self._engine = None
    self._restart = asyncio.Event()

    # spawned last, once every attribute is initialized
    self.async_group.spawn(self._run)
async def
set_synced(self, server_id: int, synced: bool, counter: int):
async def set_synced(self,
                     server_id: common.ServerId,
                     synced: bool,
                     counter: int):
    """Register a synced event and optionally request engine restart.

    While an open engine exists, an event of type
    ``('event', <own server_id>, 'synced', <server_id>)`` carrying `synced`
    as its payload is registered. Independently of that, the restart flag
    is set when `synced` and `counter` are truthy and the configuration
    enables ``synced_restart_engine``.
    """
    engine = self._engine
    if engine and engine.is_open:
        server_source = common.Source(type=common.SourceType.SERVER, id=0)
        event_type = ('event', str(self._conf['server_id']), 'synced',
                      str(server_id))
        synced_event = common.RegisterEvent(
            type=event_type,
            source_timestamp=None,
            payload=common.EventPayloadJson(synced))

        await engine.register(server_source, [synced_event])

    if not (synced and counter):
        return

    if self._conf.get('synced_restart_engine'):
        self._restart.set()
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close