hat.event.server.engine

Engine

  1"""Engine"""
  2
  3from collections.abc import Callable, Collection, Iterable
  4import asyncio
  5import collections
  6import logging
  7
  8from hat import aio
  9from hat import json
 10
 11from hat.event import common
 12import hat.event.server.eventer_server
 13
 14
 15mlog: logging.Logger = logging.getLogger(__name__)
 16"""Module logger"""
 17
 18
 19async def create_engine(backend: common.Backend,
 20                        eventer_server: hat.event.server.eventer_server.EventerServer,  # NOQA
 21                        module_confs: Iterable[json.Data],
 22                        server_id: int,
 23                        restart_cb: Callable[[], None],
 24                        reset_monitor_ready_cb: Callable[[], None],
 25                        register_queue_size: int = 1024
 26                        ) -> 'Engine':
 27    """Create engine"""
 28    engine = Engine()
 29    engine._backend = backend
 30    engine._eventer_server = eventer_server
 31    engine._server_id = server_id
 32    engine._restart_cb = restart_cb
 33    engine._reset_monitor_ready_cb = reset_monitor_ready_cb
 34    engine._loop = asyncio.get_running_loop()
 35    engine._async_group = aio.Group()
 36    engine._register_queue = aio.Queue(register_queue_size)
 37    engine._source_modules = collections.deque()
 38
 39    engine._last_event_id = await backend.get_last_event_id(server_id)
 40
 41    future = engine._loop.create_future()
 42    source = common.Source(type=common.SourceType.ENGINE, id=0)
 43    events = [engine._create_status_reg_event('STARTED')]
 44    engine._register_queue.put_nowait((future, source, events))
 45
 46    try:
 47        for source_id, module_conf in enumerate(module_confs):
 48            info = common.import_module_info(module_conf['module'])
 49            source = common.Source(type=common.SourceType.MODULE,
 50                                   id=source_id)
 51
 52            module = await engine.async_group.spawn(
 53                aio.call, info.create, module_conf, engine, source)
 54            engine.async_group.spawn(aio.call_on_cancel, module.async_close)
 55            engine.async_group.spawn(aio.call_on_done, module.wait_closing(),
 56                                     engine.close)
 57
 58            engine._source_modules.append((source, module))
 59
 60        engine.async_group.spawn(engine._register_loop)
 61
 62    except BaseException:
 63        await aio.uncancellable(engine.async_close())
 64        raise
 65
 66    return engine
 67
 68
 69class Engine(common.Engine):
 70
 71    @property
 72    def async_group(self) -> aio.Group:
 73        """Async group"""
 74        return self._async_group
 75
 76    @property
 77    def server_id(self) -> int:
 78        return self._server_id
 79
 80    async def register(self,
 81                       source: common.Source,
 82                       events: Collection[common.RegisterEvent]
 83                       ) -> Collection[common.Event] | None:
 84        if not events:
 85            return []
 86
 87        future = self._loop.create_future()
 88
 89        try:
 90            await self._register_queue.put((future, source, events))
 91
 92        except aio.QueueClosedError:
 93            raise Exception('engine closed')
 94
 95        return await future
 96
 97    async def query(self,
 98                    params: common.QueryParams
 99                    ) -> common.QueryResult:
100        return await self._backend.query(params)
101
102    def get_client_names(self) -> Iterable[tuple[common.Source, str]]:
103        return self._eventer_server.get_client_names()
104
105    def restart(self):
106        self._restart_cb()
107
108    def reset_monitor_ready(self):
109        self._reset_monitor_ready_cb()
110
111    async def _register_loop(self):
112        future = None
113        mlog.debug("starting register loop")
114
115        try:
116            while True:
117                mlog.debug("waiting for register requests")
118                future, source, register_events = \
119                    await self._register_queue.get()
120
121                mlog.debug("processing session")
122                events = await self._process_sessions(source, register_events)
123
124                mlog.debug("registering to backend")
125                events = await self._backend.register(events)
126
127                if future.done():
128                    continue
129
130                result = (
131                    list(event for event, _ in zip(events, register_events))
132                    if events is not None else None)
133                future.set_result(result)
134
135        except Exception as e:
136            mlog.error("register loop error: %s", e, exc_info=e)
137
138        finally:
139            mlog.debug("register loop closed")
140            self.close()
141            self._register_queue.close()
142
143            while True:
144                if future and not future.done():
145                    future.set_exception(Exception('engine closed'))
146                if self._register_queue.empty():
147                    break
148                future, _, __ = self._register_queue.get_nowait()
149
150            timestamp = self._create_session()
151            status_reg_event = self._create_status_reg_event('STOPPED')
152            events = [self._create_event(timestamp, status_reg_event)]
153            await self._backend.register(events)
154
155    async def _process_sessions(self, source, register_events):
156        timestamp = self._create_session()
157
158        for _, module in self._source_modules:
159            await aio.call(module.on_session_start,
160                           self._last_event_id.session)
161
162        events = collections.deque(
163            self._create_event(timestamp, register_event)
164            for register_event in register_events)
165
166        input_source_events = [(source, event) for event in events]
167        while input_source_events:
168            output_source_events = collections.deque()
169
170            for output_source, module in self._source_modules:
171                for input_source, input_event in input_source_events:
172                    if not module.subscription.matches(input_event.type):
173                        continue
174
175                    output_register_events = await aio.call(
176                        module.process, input_source, input_event)
177
178                    if not output_register_events:
179                        continue
180
181                    for output_register_event in output_register_events:
182                        output_event = self._create_event(
183                            timestamp, output_register_event)
184                        output_source_events.append(
185                            (output_source, output_event))
186                        events.append(output_event)
187
188            input_source_events = output_source_events
189
190        for _, module in self._source_modules:
191            await aio.call(module.on_session_stop, self._last_event_id.session)
192
193        return events
194
195    def _create_status_reg_event(self, status):
196        return common.RegisterEvent(
197            type=('event', str(self._server_id), 'engine'),
198            source_timestamp=None,
199            payload=common.EventPayloadJson(status))
200
201    def _create_session(self):
202        self._last_event_id = self._last_event_id._replace(
203            session=self._last_event_id.session + 1,
204            instance=0)
205
206        return common.now()
207
208    def _create_event(self, timestamp, register_event):
209        self._last_event_id = self._last_event_id._replace(
210            instance=self._last_event_id.instance + 1)
211
212        return common.Event(id=self._last_event_id,
213                            type=register_event.type,
214                            timestamp=timestamp,
215                            source_timestamp=register_event.source_timestamp,
216                            payload=register_event.payload)
mlog: logging.Logger = <Logger hat.event.server.engine (WARNING)>

Module logger

async def create_engine( backend: hat.event.common.Backend, eventer_server: hat.event.server.eventer_server.EventerServer, module_confs: Iterable[typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]], server_id: int, restart_cb: Callable[[], None], reset_monitor_ready_cb: Callable[[], None], register_queue_size: int = 1024) -> Engine:
async def create_engine(backend: common.Backend,
                        eventer_server: hat.event.server.eventer_server.EventerServer,  # NOQA
                        module_confs: Iterable[json.Data],
                        server_id: int,
                        restart_cb: Callable[[], None],
                        reset_monitor_ready_cb: Callable[[], None],
                        register_queue_size: int = 1024
                        ) -> 'Engine':
    """Create engine

    Builds an `Engine` instance, reads the last event id for `server_id`
    from `backend` and queues the initial ``STARTED`` status event. One
    module is then created for each entry of `module_confs`, in iteration
    order, with module source ids starting at 0. Each configuration's
    ``'module'`` entry selects the module implementation to import. Once
    all modules exist, the register loop is spawned.

    Args:
        backend: backend used for querying and registering events
        eventer_server: eventer server used for obtaining client names
        module_confs: module configurations
        server_id: event server identifier
        restart_cb: callback invoked by `Engine.restart`
        reset_monitor_ready_cb: callback invoked by
            `Engine.reset_monitor_ready`
        register_queue_size: size passed to the register request queue

    If module creation or spawning of the register loop fails (including
    cancellation), the engine is closed through `aio.uncancellable` and
    the exception is re-raised.

    """
    engine = Engine()
    engine._server_id = server_id
    engine._backend = backend
    engine._eventer_server = eventer_server
    engine._restart_cb = restart_cb
    engine._reset_monitor_ready_cb = reset_monitor_ready_cb
    engine._loop = asyncio.get_running_loop()
    engine._async_group = aio.Group()
    engine._register_queue = aio.Queue(register_queue_size)
    engine._source_modules = collections.deque()

    engine._last_event_id = await backend.get_last_event_id(server_id)

    # first request handled by the register loop; the future paired with
    # it is not awaited here
    started_request = (
        engine._loop.create_future(),
        common.Source(type=common.SourceType.ENGINE, id=0),
        [engine._create_status_reg_event('STARTED')])
    engine._register_queue.put_nowait(started_request)

    group = engine.async_group

    try:
        for module_id, module_conf in enumerate(module_confs):
            module_info = common.import_module_info(module_conf['module'])
            module_source = common.Source(type=common.SourceType.MODULE,
                                          id=module_id)

            module = await group.spawn(
                aio.call, module_info.create, module_conf, engine,
                module_source)

            # close module when the engine's group is cancelled ...
            group.spawn(aio.call_on_cancel, module.async_close)
            # ... and close the engine once the module starts closing
            group.spawn(aio.call_on_done, module.wait_closing(),
                        engine.close)

            engine._source_modules.append((module_source, module))

        group.spawn(engine._register_loop)

    except BaseException:
        await aio.uncancellable(engine.async_close())
        raise

    return engine

Create engine

class Engine(hat.event.common.module.Engine):
 70class Engine(common.Engine):
 71
 72    @property
 73    def async_group(self) -> aio.Group:
 74        """Async group"""
 75        return self._async_group
 76
 77    @property
 78    def server_id(self) -> int:
 79        return self._server_id
 80
 81    async def register(self,
 82                       source: common.Source,
 83                       events: Collection[common.RegisterEvent]
 84                       ) -> Collection[common.Event] | None:
 85        if not events:
 86            return []
 87
 88        future = self._loop.create_future()
 89
 90        try:
 91            await self._register_queue.put((future, source, events))
 92
 93        except aio.QueueClosedError:
 94            raise Exception('engine closed')
 95
 96        return await future
 97
 98    async def query(self,
 99                    params: common.QueryParams
100                    ) -> common.QueryResult:
101        return await self._backend.query(params)
102
103    def get_client_names(self) -> Iterable[tuple[common.Source, str]]:
104        return self._eventer_server.get_client_names()
105
106    def restart(self):
107        self._restart_cb()
108
109    def reset_monitor_ready(self):
110        self._reset_monitor_ready_cb()
111
112    async def _register_loop(self):
113        future = None
114        mlog.debug("starting register loop")
115
116        try:
117            while True:
118                mlog.debug("waiting for register requests")
119                future, source, register_events = \
120                    await self._register_queue.get()
121
122                mlog.debug("processing session")
123                events = await self._process_sessions(source, register_events)
124
125                mlog.debug("registering to backend")
126                events = await self._backend.register(events)
127
128                if future.done():
129                    continue
130
131                result = (
132                    list(event for event, _ in zip(events, register_events))
133                    if events is not None else None)
134                future.set_result(result)
135
136        except Exception as e:
137            mlog.error("register loop error: %s", e, exc_info=e)
138
139        finally:
140            mlog.debug("register loop closed")
141            self.close()
142            self._register_queue.close()
143
144            while True:
145                if future and not future.done():
146                    future.set_exception(Exception('engine closed'))
147                if self._register_queue.empty():
148                    break
149                future, _, __ = self._register_queue.get_nowait()
150
151            timestamp = self._create_session()
152            status_reg_event = self._create_status_reg_event('STOPPED')
153            events = [self._create_event(timestamp, status_reg_event)]
154            await self._backend.register(events)
155
156    async def _process_sessions(self, source, register_events):
157        timestamp = self._create_session()
158
159        for _, module in self._source_modules:
160            await aio.call(module.on_session_start,
161                           self._last_event_id.session)
162
163        events = collections.deque(
164            self._create_event(timestamp, register_event)
165            for register_event in register_events)
166
167        input_source_events = [(source, event) for event in events]
168        while input_source_events:
169            output_source_events = collections.deque()
170
171            for output_source, module in self._source_modules:
172                for input_source, input_event in input_source_events:
173                    if not module.subscription.matches(input_event.type):
174                        continue
175
176                    output_register_events = await aio.call(
177                        module.process, input_source, input_event)
178
179                    if not output_register_events:
180                        continue
181
182                    for output_register_event in output_register_events:
183                        output_event = self._create_event(
184                            timestamp, output_register_event)
185                        output_source_events.append(
186                            (output_source, output_event))
187                        events.append(output_event)
188
189            input_source_events = output_source_events
190
191        for _, module in self._source_modules:
192            await aio.call(module.on_session_stop, self._last_event_id.session)
193
194        return events
195
196    def _create_status_reg_event(self, status):
197        return common.RegisterEvent(
198            type=('event', str(self._server_id), 'engine'),
199            source_timestamp=None,
200            payload=common.EventPayloadJson(status))
201
202    def _create_session(self):
203        self._last_event_id = self._last_event_id._replace(
204            session=self._last_event_id.session + 1,
205            instance=0)
206
207        return common.now()
208
209    def _create_event(self, timestamp, register_event):
210        self._last_event_id = self._last_event_id._replace(
211            instance=self._last_event_id.instance + 1)
212
213        return common.Event(id=self._last_event_id,
214                            type=register_event.type,
215                            timestamp=timestamp,
216                            source_timestamp=register_event.source_timestamp,
217                            payload=register_event.payload)

Engine ABC

async_group: hat.aio.group.Group
72    @property
73    def async_group(self) -> aio.Group:
74        """Async group"""
75        return self._async_group

Async group

server_id: int
77    @property
78    def server_id(self) -> int:
79        return self._server_id

Event server identifier

async def register( self, source: hat.event.common.Source, events: Collection[hat.event.common.RegisterEvent]) -> Collection[hat.event.common.Event] | None:
81    async def register(self,
82                       source: common.Source,
83                       events: Collection[common.RegisterEvent]
84                       ) -> Collection[common.Event] | None:
85        if not events:
86            return []
87
88        future = self._loop.create_future()
89
90        try:
91            await self._register_queue.put((future, source, events))
92
93        except aio.QueueClosedError:
94            raise Exception('engine closed')
95
96        return await future

Register events

 98    async def query(self,
 99                    params: common.QueryParams
100                    ) -> common.QueryResult:
101        return await self._backend.query(params)

Query events

def get_client_names(self) -> Iterable[tuple[hat.event.common.Source, str]]:
103    def get_client_names(self) -> Iterable[tuple[common.Source, str]]:
104        return self._eventer_server.get_client_names()

Get client names connected to local eventer server

def restart(self):
106    def restart(self):
107        self._restart_cb()

Schedule engine restart

def reset_monitor_ready(self):
109    def reset_monitor_ready(self):
110        self._reset_monitor_ready_cb()

Schedule resetting of the monitor component's ready flag