hat.event.server.engine

Engine

  1"""Engine"""
  2
  3import asyncio
  4import collections
  5import importlib
  6import logging
  7
  8from hat import aio
  9from hat import json
 10from hat import util
 11
 12from hat.event.server import common
 13
 14
 15mlog: logging.Logger = logging.getLogger(__name__)
 16"""Module logger"""
 17
 18
 19async def create_engine(conf: json.Data,
 20                        backend: common.Backend
 21                        ) -> 'Engine':
 22    """Create engine
 23
 24    Args:
 25        conf: configuration defined by
 26            ``hat-event://main.yaml#/definitions/engine``
 27        backend: backend
 28
 29    """
 30    engine = Engine()
 31    engine._backend = backend
 32    engine._async_group = aio.Group()
 33    engine._register_queue = aio.Queue()
 34    engine._register_cbs = util.CallbackRegistry()
 35    engine._source_modules = collections.deque()
 36
 37    engine._last_event_id = await backend.get_last_event_id(conf['server_id'])
 38
 39    future = asyncio.Future()
 40    source = common.Source(type=common.SourceType.ENGINE, id=0)
 41    events = [engine._create_status_reg_event('STARTED')]
 42    engine._register_queue.put_nowait((future, source, events))
 43    try:
 44        for source_id, module_conf in enumerate(conf['modules']):
 45            py_module = importlib.import_module(module_conf['module'])
 46
 47            source = common.Source(type=common.SourceType.MODULE,
 48                                   id=source_id)
 49
 50            module = await engine.async_group.spawn(
 51                aio.call, py_module.create, module_conf, engine, source)
 52            engine.async_group.spawn(aio.call_on_cancel, module.async_close)
 53            engine.async_group.spawn(aio.call_on_done, module.wait_closing(),
 54                                     engine.close)
 55
 56            engine._source_modules.append((source, module))
 57
 58        engine.async_group.spawn(engine._register_loop)
 59
 60    except BaseException:
 61        await aio.uncancellable(engine.async_close())
 62        raise
 63
 64    return engine
 65
 66
 67class Engine(common.Engine):
 68
 69    @property
 70    def async_group(self) -> aio.Group:
 71        """Async group"""
 72        return self._async_group
 73
 74    def register_events_cb(self,
 75                           cb: common.EventsCb
 76                           ) -> util.RegisterCallbackHandle:
 77        """Register events callback"""
 78        return self._register_cbs.register(cb)
 79
 80    async def register(self,
 81                       source: common.Source,
 82                       events: list[common.RegisterEvent]
 83                       ) -> list[common.Event | None]:
 84        """Register events"""
 85        if not events:
 86            return []
 87
 88        future = asyncio.Future()
 89        self._register_queue.put_nowait((future, source, events))
 90        return await future
 91
 92    async def query(self,
 93                    data: common.QueryData
 94                    ) -> list[common.Event]:
 95        """Query events"""
 96        return await self._backend.query(data)
 97
 98    async def _register_loop(self):
 99        future = None
100        mlog.debug("starting register loop")
101
102        try:
103            while True:
104                mlog.debug("waiting for register requests")
105                future, source, register_events = \
106                    await self._register_queue.get()
107                mlog.debug("processing session")
108                events = await self._process_sessions(source, register_events)
109
110                mlog.debug("registering to backend")
111                events = await self._backend.register(events)
112                if not future.done():
113                    result = events[:len(register_events)]
114                    future.set_result(result)
115
116                events = [event for event in events if event]
117                if events:
118                    self._register_cbs.notify(events)
119
120        except Exception as e:
121            mlog.error("register loop error: %s", e, exc_info=e)
122
123        finally:
124            mlog.debug("register loop closed")
125            self.close()
126            self._register_queue.close()
127
128            while True:
129                if future and not future.done():
130                    future.set_exception(Exception('module engine closed'))
131                if self._register_queue.empty():
132                    break
133                future, _, __ = self._register_queue.get_nowait()
134
135            status_reg_event = self._create_status_reg_event('STOPPED')
136            events = [self._create_event(common.now(), status_reg_event)]
137            await self._backend.register(events)
138
139    async def _process_sessions(self, source, register_events):
140        timestamp = common.now()
141        self._last_event_id = self._last_event_id._replace(
142            session=self._last_event_id.session + 1)
143
144        for _, module in self._source_modules:
145            await module.on_session_start(self._last_event_id.session)
146
147        events = collections.deque(
148            self._create_event(timestamp, register_event)
149            for register_event in register_events)
150
151        input_source_events = [(source, event) for event in events]
152        while input_source_events:
153            output_source_events = collections.deque()
154
155            for output_source, module in self._source_modules:
156                for input_source, input_event in input_source_events:
157                    if not module.subscription.matches(input_event.event_type):
158                        continue
159
160                    async for register_event in module.process(input_source,
161                                                               input_event):
162                        output_event = self._create_event(timestamp,
163                                                          register_event)
164                        output_source_events.append((output_source,
165                                                     output_event))
166                        events.append(output_event)
167
168            input_source_events = output_source_events
169
170        for _, module in self._source_modules:
171            await module.on_session_stop(self._last_event_id.session)
172
173        return list(events)
174
175    def _create_status_reg_event(self, status):
176        return common.RegisterEvent(
177            event_type=('event', 'engine'),
178            source_timestamp=None,
179            payload=common.EventPayload(
180                type=common.EventPayloadType.JSON,
181                data=status))
182
183    def _create_event(self, timestamp, register_event):
184        self._last_event_id = self._last_event_id._replace(
185            instance=self._last_event_id.instance + 1)
186
187        return common.Event(event_id=self._last_event_id,
188                            event_type=register_event.event_type,
189                            timestamp=timestamp,
190                            source_timestamp=register_event.source_timestamp,
191                            payload=register_event.payload)
mlog: logging.Logger = <Logger hat.event.server.engine (WARNING)>

Module logger

async def create_engine( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], backend: hat.event.server.common.Backend) -> Engine:
async def create_engine(conf: json.Data,
                        backend: common.Backend
                        ) -> 'Engine':
    """Create engine

    New `Engine` instance is initialized, engine's ``STARTED`` status
    event is queued for registration, modules listed in
    ``conf['modules']`` are created in configuration order and the
    register loop is spawned. If module creation or spawning of the
    register loop fails, engine is closed and the exception is re-raised.

    Args:
        conf: configuration defined by
            ``hat-event://main.yaml#/definitions/engine``
        backend: backend

    """
    engine = Engine()
    engine._backend = backend
    engine._async_group = aio.Group()
    engine._register_queue = aio.Queue()
    engine._register_cbs = util.CallbackRegistry()
    engine._source_modules = collections.deque()

    engine._last_event_id = await backend.get_last_event_id(conf['server_id'])

    # engine's own status event is queued before the register loop is
    # spawned; future associated with this request is not awaited here
    started_future = asyncio.Future()
    engine_source = common.Source(type=common.SourceType.ENGINE, id=0)
    started_events = [engine._create_status_reg_event('STARTED')]
    request = started_future, engine_source, started_events
    engine._register_queue.put_nowait(request)

    try:
        # position of module configuration is used as module's source id
        for index, module_conf in enumerate(conf['modules']):
            py_module = importlib.import_module(module_conf['module'])
            module_source = common.Source(type=common.SourceType.MODULE,
                                          id=index)

            # module is created by a task spawned in engine's async group
            group = engine.async_group
            module = await group.spawn(aio.call, py_module.create,
                                       module_conf, engine, module_source)

            # NOTE(review): presumably closes the module when the group is
            # cancelled and closes the engine once the module starts
            # closing - confirm against hat.aio helper semantics
            group.spawn(aio.call_on_cancel, module.async_close)
            group.spawn(aio.call_on_done, module.wait_closing(),
                        engine.close)

            engine._source_modules.append((module_source, module))

        engine.async_group.spawn(engine._register_loop)

    except BaseException:
        await aio.uncancellable(engine.async_close())
        raise

    return engine

Create engine

Arguments:
  • conf: configuration defined by hat-event://main.yaml#/definitions/engine
  • backend: backend
class Engine(hat.event.server.common.Engine):
 68class Engine(common.Engine):
 69
 70    @property
 71    def async_group(self) -> aio.Group:
 72        """Async group"""
 73        return self._async_group
 74
 75    def register_events_cb(self,
 76                           cb: common.EventsCb
 77                           ) -> util.RegisterCallbackHandle:
 78        """Register events callback"""
 79        return self._register_cbs.register(cb)
 80
 81    async def register(self,
 82                       source: common.Source,
 83                       events: list[common.RegisterEvent]
 84                       ) -> list[common.Event | None]:
 85        """Register events"""
 86        if not events:
 87            return []
 88
 89        future = asyncio.Future()
 90        self._register_queue.put_nowait((future, source, events))
 91        return await future
 92
 93    async def query(self,
 94                    data: common.QueryData
 95                    ) -> list[common.Event]:
 96        """Query events"""
 97        return await self._backend.query(data)
 98
 99    async def _register_loop(self):
100        future = None
101        mlog.debug("starting register loop")
102
103        try:
104            while True:
105                mlog.debug("waiting for register requests")
106                future, source, register_events = \
107                    await self._register_queue.get()
108                mlog.debug("processing session")
109                events = await self._process_sessions(source, register_events)
110
111                mlog.debug("registering to backend")
112                events = await self._backend.register(events)
113                if not future.done():
114                    result = events[:len(register_events)]
115                    future.set_result(result)
116
117                events = [event for event in events if event]
118                if events:
119                    self._register_cbs.notify(events)
120
121        except Exception as e:
122            mlog.error("register loop error: %s", e, exc_info=e)
123
124        finally:
125            mlog.debug("register loop closed")
126            self.close()
127            self._register_queue.close()
128
129            while True:
130                if future and not future.done():
131                    future.set_exception(Exception('module engine closed'))
132                if self._register_queue.empty():
133                    break
134                future, _, __ = self._register_queue.get_nowait()
135
136            status_reg_event = self._create_status_reg_event('STOPPED')
137            events = [self._create_event(common.now(), status_reg_event)]
138            await self._backend.register(events)
139
140    async def _process_sessions(self, source, register_events):
141        timestamp = common.now()
142        self._last_event_id = self._last_event_id._replace(
143            session=self._last_event_id.session + 1)
144
145        for _, module in self._source_modules:
146            await module.on_session_start(self._last_event_id.session)
147
148        events = collections.deque(
149            self._create_event(timestamp, register_event)
150            for register_event in register_events)
151
152        input_source_events = [(source, event) for event in events]
153        while input_source_events:
154            output_source_events = collections.deque()
155
156            for output_source, module in self._source_modules:
157                for input_source, input_event in input_source_events:
158                    if not module.subscription.matches(input_event.event_type):
159                        continue
160
161                    async for register_event in module.process(input_source,
162                                                               input_event):
163                        output_event = self._create_event(timestamp,
164                                                          register_event)
165                        output_source_events.append((output_source,
166                                                     output_event))
167                        events.append(output_event)
168
169            input_source_events = output_source_events
170
171        for _, module in self._source_modules:
172            await module.on_session_stop(self._last_event_id.session)
173
174        return list(events)
175
176    def _create_status_reg_event(self, status):
177        return common.RegisterEvent(
178            event_type=('event', 'engine'),
179            source_timestamp=None,
180            payload=common.EventPayload(
181                type=common.EventPayloadType.JSON,
182                data=status))
183
184    def _create_event(self, timestamp, register_event):
185        self._last_event_id = self._last_event_id._replace(
186            instance=self._last_event_id.instance + 1)
187
188        return common.Event(event_id=self._last_event_id,
189                            event_type=register_event.event_type,
190                            timestamp=timestamp,
191                            source_timestamp=register_event.source_timestamp,
192                            payload=register_event.payload)

Engine ABC

async_group: hat.aio.group.Group

Async group

def register_events_cb( self, cb: Callable[[list[hat.event.common.data.Event]], NoneType]) -> hat.util.RegisterCallbackHandle:
75    def register_events_cb(self,
76                           cb: common.EventsCb
77                           ) -> util.RegisterCallbackHandle:
78        """Register events callback"""
79        return self._register_cbs.register(cb)

Register events callback

async def register( self, source: hat.event.server.common.Source, events: list[hat.event.common.data.RegisterEvent]) -> list[hat.event.common.data.Event | None]:
81    async def register(self,
82                       source: common.Source,
83                       events: list[common.RegisterEvent]
84                       ) -> list[common.Event | None]:
85        """Register events"""
86        if not events:
87            return []
88
89        future = asyncio.Future()
90        self._register_queue.put_nowait((future, source, events))
91        return await future

Register events

async def query( self, data: hat.event.common.data.QueryData) -> list[hat.event.common.data.Event]:
93    async def query(self,
94                    data: common.QueryData
95                    ) -> list[common.Event]:
96        """Query events"""
97        return await self._backend.query(data)

Query events

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close