hat.event.server.engine_runner

  1from collections.abc import Callable
  2import asyncio
  3import logging
  4
  5from hat import aio
  6from hat import json
  7
  8from hat.event import common
  9from hat.event.server.engine import create_engine
 10from hat.event.server.eventer_client import SyncedState
 11from hat.event.server.eventer_client_runner import EventerClientRunner
 12from hat.event.server.eventer_server import EventerServer
 13
 14
# Logger is obtained with ``__name__`` so that it is named after this module
# and can be configured through the standard ``logging`` hierarchy.
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""
 17
 18
 19class EngineRunner(aio.Resource):
 20
 21    def __init__(self,
 22                 conf: json.Data,
 23                 backend: common.Backend,
 24                 eventer_server: EventerServer,
 25                 eventer_client_runner: EventerClientRunner | None,
 26                 reset_monitor_ready_cb: Callable[[], None]):
 27        self._conf = conf
 28        self._backend = backend
 29        self._eventer_server = eventer_server
 30        self._eventer_client_runner = eventer_client_runner
 31        self._reset_monitor_ready_cb = reset_monitor_ready_cb
 32        self._async_group = aio.Group()
 33        self._engine = None
 34        self._restart = asyncio.Event()
 35
 36        self.async_group.spawn(self._run)
 37
 38    @property
 39    def async_group(self) -> aio.Group:
 40        return self._async_group
 41
 42    async def set_synced(self,
 43                         server_id: common.ServerId,
 44                         state: SyncedState,
 45                         count: int | None):
 46        if not self._engine or not self._engine.is_open:
 47            return
 48
 49        data = {'state': state.name}
 50        if state == SyncedState.SYNCED:
 51            data['count'] = count
 52
 53        source = common.Source(type=common.SourceType.SERVER, id=0)
 54        event = common.RegisterEvent(
 55            type=('event', str(self._conf['server_id']), 'synced',
 56                  str(server_id)),
 57            source_timestamp=None,
 58            payload=common.EventPayloadJson(data))
 59
 60        await self._engine.register(source, [event])
 61
 62    async def _run(self):
 63        try:
 64            mlog.debug("staring engine runner loop")
 65            while True:
 66                await self._wait_while_remote_active()
 67
 68                self._restart.clear()
 69
 70                await self._eventer_server.set_status(common.Status.STARTING,
 71                                                      None)
 72
 73                mlog.debug("creating engine")
 74                self._engine = await create_engine(
 75                    backend=self._backend,
 76                    eventer_server=self._eventer_server,
 77                    module_confs=self._conf['modules'],
 78                    server_id=self._conf['server_id'],
 79                    restart_cb=self._restart.set,
 80                    reset_monitor_ready_cb=self._reset_monitor_ready_cb)
 81                await self._eventer_server.set_status(
 82                    common.Status.OPERATIONAL, self._engine)
 83
 84                async with self._async_group.create_subgroup() as subgroup:
 85                    await asyncio.wait(
 86                        [subgroup.spawn(self._engine.wait_closing),
 87                         subgroup.spawn(self._restart.wait)],
 88                        return_when=asyncio.FIRST_COMPLETED)
 89
 90                if not self._engine.is_open:
 91                    break
 92
 93                await self._close()
 94
 95        except Exception as e:
 96            mlog.error("engine runner loop error: %s", e, exc_info=e)
 97
 98        finally:
 99            mlog.debug("closing engine runner loop")
100            self.close()
101            await aio.uncancellable(self._close())
102
103    async def _close(self):
104        if self._engine:
105            self._engine.close()
106
107        await self._eventer_server.set_status(common.Status.STOPPING, None)
108
109        if self._engine:
110            await self._engine.async_close()
111
112        await self._backend.flush()
113
114        # TODO not needed with _wait_while_remote_active
115        # await self._eventer_server.notify_events([], True, True)
116
117        await self._eventer_server.set_status(common.Status.STANDBY, None)
118
119    async def _wait_while_remote_active(self):
120        if not self._eventer_client_runner:
121            return
122
123        while self._eventer_client_runner.remote_active:
124            event = asyncio.Event()
125            with self._eventer_client_runner.register_remote_active_cb(
126                    lambda _: event.set()):
127                await event.wait()
mlog: logging.Logger = <Logger hat.event.server.engine_runner (WARNING)>

Module logger

class EngineRunner(hat.aio.group.Resource):
 20class EngineRunner(aio.Resource):
 21
 22    def __init__(self,
 23                 conf: json.Data,
 24                 backend: common.Backend,
 25                 eventer_server: EventerServer,
 26                 eventer_client_runner: EventerClientRunner | None,
 27                 reset_monitor_ready_cb: Callable[[], None]):
 28        self._conf = conf
 29        self._backend = backend
 30        self._eventer_server = eventer_server
 31        self._eventer_client_runner = eventer_client_runner
 32        self._reset_monitor_ready_cb = reset_monitor_ready_cb
 33        self._async_group = aio.Group()
 34        self._engine = None
 35        self._restart = asyncio.Event()
 36
 37        self.async_group.spawn(self._run)
 38
 39    @property
 40    def async_group(self) -> aio.Group:
 41        return self._async_group
 42
 43    async def set_synced(self,
 44                         server_id: common.ServerId,
 45                         state: SyncedState,
 46                         count: int | None):
 47        if not self._engine or not self._engine.is_open:
 48            return
 49
 50        data = {'state': state.name}
 51        if state == SyncedState.SYNCED:
 52            data['count'] = count
 53
 54        source = common.Source(type=common.SourceType.SERVER, id=0)
 55        event = common.RegisterEvent(
 56            type=('event', str(self._conf['server_id']), 'synced',
 57                  str(server_id)),
 58            source_timestamp=None,
 59            payload=common.EventPayloadJson(data))
 60
 61        await self._engine.register(source, [event])
 62
 63    async def _run(self):
 64        try:
 65            mlog.debug("staring engine runner loop")
 66            while True:
 67                await self._wait_while_remote_active()
 68
 69                self._restart.clear()
 70
 71                await self._eventer_server.set_status(common.Status.STARTING,
 72                                                      None)
 73
 74                mlog.debug("creating engine")
 75                self._engine = await create_engine(
 76                    backend=self._backend,
 77                    eventer_server=self._eventer_server,
 78                    module_confs=self._conf['modules'],
 79                    server_id=self._conf['server_id'],
 80                    restart_cb=self._restart.set,
 81                    reset_monitor_ready_cb=self._reset_monitor_ready_cb)
 82                await self._eventer_server.set_status(
 83                    common.Status.OPERATIONAL, self._engine)
 84
 85                async with self._async_group.create_subgroup() as subgroup:
 86                    await asyncio.wait(
 87                        [subgroup.spawn(self._engine.wait_closing),
 88                         subgroup.spawn(self._restart.wait)],
 89                        return_when=asyncio.FIRST_COMPLETED)
 90
 91                if not self._engine.is_open:
 92                    break
 93
 94                await self._close()
 95
 96        except Exception as e:
 97            mlog.error("engine runner loop error: %s", e, exc_info=e)
 98
 99        finally:
100            mlog.debug("closing engine runner loop")
101            self.close()
102            await aio.uncancellable(self._close())
103
104    async def _close(self):
105        if self._engine:
106            self._engine.close()
107
108        await self._eventer_server.set_status(common.Status.STOPPING, None)
109
110        if self._engine:
111            await self._engine.async_close()
112
113        await self._backend.flush()
114
115        # TODO not needed with _wait_while_remote_active
116        # await self._eventer_server.notify_events([], True, True)
117
118        await self._eventer_server.set_status(common.Status.STANDBY, None)
119
120    async def _wait_while_remote_active(self):
121        if not self._eventer_client_runner:
122            return
123
124        while self._eventer_client_runner.remote_active:
125            event = asyncio.Event()
126            with self._eventer_client_runner.register_remote_active_cb(
127                    lambda _: event.set()):
128                await event.wait()

Resource with lifetime control based on Group.

EngineRunner( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], backend: hat.event.common.Backend, eventer_server: hat.event.server.eventer_server.EventerServer, eventer_client_runner: hat.event.server.eventer_client_runner.EventerClientRunner | None, reset_monitor_ready_cb: Callable[[], None])
22    def __init__(self,
23                 conf: json.Data,
24                 backend: common.Backend,
25                 eventer_server: EventerServer,
26                 eventer_client_runner: EventerClientRunner | None,
27                 reset_monitor_ready_cb: Callable[[], None]):
28        self._conf = conf
29        self._backend = backend
30        self._eventer_server = eventer_server
31        self._eventer_client_runner = eventer_client_runner
32        self._reset_monitor_ready_cb = reset_monitor_ready_cb
33        self._async_group = aio.Group()
34        self._engine = None
35        self._restart = asyncio.Event()
36
37        self.async_group.spawn(self._run)
async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Group controlling resource's lifetime."""
        return self._async_group

Group controlling resource's lifetime.

async def set_synced( self, server_id: int, state: hat.event.server.eventer_client.SyncedState, count: int | None):
43    async def set_synced(self,
44                         server_id: common.ServerId,
45                         state: SyncedState,
46                         count: int | None):
47        if not self._engine or not self._engine.is_open:
48            return
49
50        data = {'state': state.name}
51        if state == SyncedState.SYNCED:
52            data['count'] = count
53
54        source = common.Source(type=common.SourceType.SERVER, id=0)
55        event = common.RegisterEvent(
56            type=('event', str(self._conf['server_id']), 'synced',
57                  str(server_id)),
58            source_timestamp=None,
59            payload=common.EventPayloadJson(data))
60
61        await self._engine.register(source, [event])