hat.event.server.engine_runner
"""Engine lifecycle management for the event server"""

from collections.abc import Callable
import asyncio
import logging

from hat import aio
from hat import json

from hat.event import common
from hat.event.server.engine import create_engine
from hat.event.server.eventer_client import SyncedState
from hat.event.server.eventer_client_runner import EventerClientRunner
from hat.event.server.eventer_server import EventerServer


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


class EngineRunner(aio.Resource):
    """Engine lifecycle controller

    On construction, a background loop is spawned in the runner's async
    group. Each loop iteration waits while
    `eventer_client_runner.remote_active` is set, creates a new engine with
    `create_engine` and reports status changes to the eventer server. The
    engine is recreated once the `restart_cb` passed to `create_engine` is
    called. The loop ends (and the runner closes) when the engine is found
    closed or when the loop raises.

    Args:
        conf: configuration; this class reads `conf['modules']` and
            `conf['server_id']`
        backend: backend passed to `create_engine` and flushed whenever the
            engine is closed
        eventer_server: eventer server receiving status updates
        eventer_client_runner: optional runner whose `remote_active`
            property postpones engine creation (``None`` disables waiting)
        reset_monitor_ready_cb: callback forwarded to `create_engine`

    """

    def __init__(self,
                 conf: json.Data,
                 backend: common.Backend,
                 eventer_server: EventerServer,
                 eventer_client_runner: EventerClientRunner | None,
                 reset_monitor_ready_cb: Callable[[], None]):
        self._conf = conf
        self._backend = backend
        self._eventer_server = eventer_server
        self._eventer_client_runner = eventer_client_runner
        self._reset_monitor_ready_cb = reset_monitor_ready_cb
        self._async_group = aio.Group()
        # engine created by the runner loop; None until first creation
        self._engine = None
        # set through `restart_cb` handed to `create_engine`
        self._restart = asyncio.Event()

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling the runner's lifetime"""
        return self._async_group

    async def set_synced(self,
                         server_id: common.ServerId,
                         state: SyncedState,
                         count: int | None):
        """Register event describing synchronization state of a server

        Nothing is registered when the engine is not yet created or is not
        open.

        Registered event has type
        ``('event', <conf server_id>, 'synced', <server_id>)`` and JSON
        payload ``{'state': <state name>}``, extended with ``'count'`` only
        when `state` is `SyncedState.SYNCED`.

        Args:
            server_id: identifier of the server whose state is reported
            state: synchronization state
            count: value stored as ``'count'`` for the `SYNCED` state

        """
        if not self._engine or not self._engine.is_open:
            return

        data = {'state': state.name}
        if state == SyncedState.SYNCED:
            data['count'] = count

        source = common.Source(type=common.SourceType.SERVER, id=0)
        event = common.RegisterEvent(
            type=('event', str(self._conf['server_id']), 'synced',
                  str(server_id)),
            source_timestamp=None,
            payload=common.EventPayloadJson(data))

        await self._engine.register(source, [event])

    async def _run(self):
        """Runner loop creating, supervising and closing engines"""
        try:
            mlog.debug("starting engine runner loop")
            while True:
                await self._wait_while_remote_active()

                # forget restart requests issued for a previous engine
                self._restart.clear()

                await self._eventer_server.set_status(common.Status.STARTING,
                                                      None)

                mlog.debug("creating engine")
                self._engine = await create_engine(
                    backend=self._backend,
                    eventer_server=self._eventer_server,
                    module_confs=self._conf['modules'],
                    server_id=self._conf['server_id'],
                    restart_cb=self._restart.set,
                    reset_monitor_ready_cb=self._reset_monitor_ready_cb)
                await self._eventer_server.set_status(
                    common.Status.OPERATIONAL, self._engine)

                # wait until the engine starts closing or a restart is
                # requested, whichever happens first
                # NOTE(review): leaving the subgroup context presumably
                # cancels the remaining waiter - confirm against hat.aio
                async with self._async_group.create_subgroup() as subgroup:
                    await asyncio.wait(
                        [subgroup.spawn(self._engine.wait_closing),
                         subgroup.spawn(self._restart.wait)],
                        return_when=asyncio.FIRST_COMPLETED)

                # engine is no longer open - stop the runner instead of
                # restarting
                if not self._engine.is_open:
                    break

                # restart requested - close current engine and start over
                await self._close()

        except Exception as e:
            mlog.error("engine runner loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("closing engine runner loop")
            self.close()
            # NOTE(review): `aio.uncancellable` presumably lets the cleanup
            # finish even if this task is cancelled - confirm against hat.aio
            await aio.uncancellable(self._close())

    async def _close(self):
        """Close current engine (if any) and flush the backend

        Eventer server status is set to `STOPPING` before the engine's
        `async_close` is awaited and to `STANDBY` after the backend flush.

        """
        if self._engine:
            self._engine.close()

        await self._eventer_server.set_status(common.Status.STOPPING, None)

        if self._engine:
            await self._engine.async_close()

        await self._backend.flush()

        # TODO not needed with _wait_while_remote_active
        # await self._eventer_server.notify_events([], True, True)

        await self._eventer_server.set_status(common.Status.STANDBY, None)

    async def _wait_while_remote_active(self):
        """Wait while the eventer client runner reports `remote_active`

        Returns immediately when no eventer client runner is configured.

        """
        if not self._eventer_client_runner:
            return

        while self._eventer_client_runner.remote_active:
            # callback registration is held only while waiting; every
            # notification wakes the loop to re-check `remote_active`
            event = asyncio.Event()
            with self._eventer_client_runner.register_remote_active_cb(
                    lambda _: event.set()):
                await event.wait()
Module logger
class EngineRunner(hat.aio.group.Resource):
class EngineRunner(aio.Resource):
    """Engine lifecycle controller

    On construction, a background loop is spawned in the runner's async
    group. Each loop iteration waits while
    `eventer_client_runner.remote_active` is set, creates a new engine with
    `create_engine` and reports status changes to the eventer server. The
    engine is recreated once the `restart_cb` passed to `create_engine` is
    called. The loop ends (and the runner closes) when the engine is found
    closed or when the loop raises.

    Args:
        conf: configuration; this class reads `conf['modules']` and
            `conf['server_id']`
        backend: backend passed to `create_engine` and flushed whenever the
            engine is closed
        eventer_server: eventer server receiving status updates
        eventer_client_runner: optional runner whose `remote_active`
            property postpones engine creation (``None`` disables waiting)
        reset_monitor_ready_cb: callback forwarded to `create_engine`

    """

    def __init__(self,
                 conf: json.Data,
                 backend: common.Backend,
                 eventer_server: EventerServer,
                 eventer_client_runner: EventerClientRunner | None,
                 reset_monitor_ready_cb: Callable[[], None]):
        self._conf = conf
        self._backend = backend
        self._eventer_server = eventer_server
        self._eventer_client_runner = eventer_client_runner
        self._reset_monitor_ready_cb = reset_monitor_ready_cb
        self._async_group = aio.Group()
        # engine created by the runner loop; None until first creation
        self._engine = None
        # set through `restart_cb` handed to `create_engine`
        self._restart = asyncio.Event()

        self.async_group.spawn(self._run)

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling the runner's lifetime"""
        return self._async_group

    async def set_synced(self,
                         server_id: common.ServerId,
                         state: SyncedState,
                         count: int | None):
        """Register event describing synchronization state of a server

        Nothing is registered when the engine is not yet created or is not
        open.

        Registered event has type
        ``('event', <conf server_id>, 'synced', <server_id>)`` and JSON
        payload ``{'state': <state name>}``, extended with ``'count'`` only
        when `state` is `SyncedState.SYNCED`.

        Args:
            server_id: identifier of the server whose state is reported
            state: synchronization state
            count: value stored as ``'count'`` for the `SYNCED` state

        """
        if not self._engine or not self._engine.is_open:
            return

        data = {'state': state.name}
        if state == SyncedState.SYNCED:
            data['count'] = count

        source = common.Source(type=common.SourceType.SERVER, id=0)
        event = common.RegisterEvent(
            type=('event', str(self._conf['server_id']), 'synced',
                  str(server_id)),
            source_timestamp=None,
            payload=common.EventPayloadJson(data))

        await self._engine.register(source, [event])

    async def _run(self):
        """Runner loop creating, supervising and closing engines"""
        try:
            mlog.debug("starting engine runner loop")
            while True:
                await self._wait_while_remote_active()

                # forget restart requests issued for a previous engine
                self._restart.clear()

                await self._eventer_server.set_status(common.Status.STARTING,
                                                      None)

                mlog.debug("creating engine")
                self._engine = await create_engine(
                    backend=self._backend,
                    eventer_server=self._eventer_server,
                    module_confs=self._conf['modules'],
                    server_id=self._conf['server_id'],
                    restart_cb=self._restart.set,
                    reset_monitor_ready_cb=self._reset_monitor_ready_cb)
                await self._eventer_server.set_status(
                    common.Status.OPERATIONAL, self._engine)

                # wait until the engine starts closing or a restart is
                # requested, whichever happens first
                # NOTE(review): leaving the subgroup context presumably
                # cancels the remaining waiter - confirm against hat.aio
                async with self._async_group.create_subgroup() as subgroup:
                    await asyncio.wait(
                        [subgroup.spawn(self._engine.wait_closing),
                         subgroup.spawn(self._restart.wait)],
                        return_when=asyncio.FIRST_COMPLETED)

                # engine is no longer open - stop the runner instead of
                # restarting
                if not self._engine.is_open:
                    break

                # restart requested - close current engine and start over
                await self._close()

        except Exception as e:
            mlog.error("engine runner loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("closing engine runner loop")
            self.close()
            # NOTE(review): `aio.uncancellable` presumably lets the cleanup
            # finish even if this task is cancelled - confirm against hat.aio
            await aio.uncancellable(self._close())

    async def _close(self):
        """Close current engine (if any) and flush the backend

        Eventer server status is set to `STOPPING` before the engine's
        `async_close` is awaited and to `STANDBY` after the backend flush.

        """
        if self._engine:
            self._engine.close()

        await self._eventer_server.set_status(common.Status.STOPPING, None)

        if self._engine:
            await self._engine.async_close()

        await self._backend.flush()

        # TODO not needed with _wait_while_remote_active
        # await self._eventer_server.notify_events([], True, True)

        await self._eventer_server.set_status(common.Status.STANDBY, None)

    async def _wait_while_remote_active(self):
        """Wait while the eventer client runner reports `remote_active`

        Returns immediately when no eventer client runner is configured.

        """
        if not self._eventer_client_runner:
            return

        while self._eventer_client_runner.remote_active:
            # callback registration is held only while waiting; every
            # notification wakes the loop to re-check `remote_active`
            event = asyncio.Event()
            with self._eventer_client_runner.register_remote_active_cb(
                    lambda _: event.set()):
                await event.wait()
Resource with lifetime control based on `Group`.
EngineRunner( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], backend: hat.event.common.Backend, eventer_server: hat.event.server.eventer_server.EventerServer, eventer_client_runner: hat.event.server.eventer_client_runner.EventerClientRunner | None, reset_monitor_ready_cb: Callable[[], None])
def __init__(self,
             conf: json.Data,
             backend: common.Backend,
             eventer_server: EventerServer,
             eventer_client_runner: EventerClientRunner | None,
             reset_monitor_ready_cb: Callable[[], None]):
    """Store collaborators and spawn the runner loop

    Args:
        conf: configuration kept for later use by the runner
        backend: backend kept for later use by the runner
        eventer_server: eventer server kept for later use by the runner
        eventer_client_runner: optional eventer client runner
        reset_monitor_ready_cb: callback kept for later use by the runner

    """
    # collaborators supplied by the caller
    self._conf = conf
    self._backend = backend
    self._eventer_server = eventer_server
    self._eventer_client_runner = eventer_client_runner
    self._reset_monitor_ready_cb = reset_monitor_ready_cb

    # runner-owned state: no engine yet, no restart requested
    self._engine = None
    self._restart = asyncio.Event()
    self._async_group = aio.Group()

    # runner loop lives in the runner's own async group
    self.async_group.spawn(self._run)
async def set_synced( self, server_id: int, state: hat.event.server.eventer_client.SyncedState, count: int | None):
async def set_synced(self,
                     server_id: common.ServerId,
                     state: SyncedState,
                     count: int | None):
    """Register event describing synchronization state of a server

    Nothing is registered when the engine is not yet created or is not
    open.

    Registered event has type
    ``('event', <conf server_id>, 'synced', <server_id>)`` and JSON payload
    ``{'state': <state name>}``, extended with ``'count'`` only when
    `state` is `SyncedState.SYNCED`.

    Args:
        server_id: identifier of the server whose state is reported
        state: synchronization state
        count: value stored as ``'count'`` for the `SYNCED` state

    """
    engine = self._engine
    if not (engine and engine.is_open):
        return

    # 'count' is reported only for the SYNCED state
    payload_data = {'state': state.name}
    if state == SyncedState.SYNCED:
        payload_data['count'] = count

    event_source = common.Source(type=common.SourceType.SERVER, id=0)
    synced_event = common.RegisterEvent(
        type=('event', str(self._conf['server_id']), 'synced',
              str(server_id)),
        source_timestamp=None,
        payload=common.EventPayloadJson(payload_data))

    await engine.register(event_source, [synced_event])