hat.event.server.engine

Engine

  1"""Engine"""
  2
  3from collections.abc import Collection, Iterable
  4import asyncio
  5import collections
  6import logging
  7
  8from hat import aio
  9from hat import json
 10
 11from hat.event import common
 12
 13
 14mlog: logging.Logger = logging.getLogger(__name__)
 15"""Module logger"""
 16
 17
 18async def create_engine(backend: common.Backend,
 19                        module_confs: Iterable[json.Data],
 20                        server_id: int,
 21                        register_queue_size: int = 1024
 22                        ) -> 'Engine':
 23    """Create engine"""
 24    engine = Engine()
 25    engine._backend = backend
 26    engine._server_id = server_id
 27    engine._loop = asyncio.get_running_loop()
 28    engine._async_group = aio.Group()
 29    engine._register_queue = aio.Queue(register_queue_size)
 30    engine._source_modules = collections.deque()
 31
 32    engine._last_event_id = await backend.get_last_event_id(server_id)
 33
 34    future = engine._loop.create_future()
 35    source = common.Source(type=common.SourceType.ENGINE, id=0)
 36    events = [engine._create_status_reg_event('STARTED')]
 37    engine._register_queue.put_nowait((future, source, events))
 38
 39    try:
 40        for source_id, module_conf in enumerate(module_confs):
 41            info = common.import_module_info(module_conf['module'])
 42            source = common.Source(type=common.SourceType.MODULE,
 43                                   id=source_id)
 44
 45            module = await engine.async_group.spawn(
 46                aio.call, info.create, module_conf, engine, source)
 47            engine.async_group.spawn(aio.call_on_cancel, module.async_close)
 48            engine.async_group.spawn(aio.call_on_done, module.wait_closing(),
 49                                     engine.close)
 50
 51            engine._source_modules.append((source, module))
 52
 53        engine.async_group.spawn(engine._register_loop)
 54
 55    except BaseException:
 56        await aio.uncancellable(engine.async_close())
 57        raise
 58
 59    return engine
 60
 61
 62class Engine(common.Engine):
 63
 64    @property
 65    def async_group(self) -> aio.Group:
 66        """Async group"""
 67        return self._async_group
 68
 69    @property
 70    def server_id(self) -> int:
 71        """Event server identifier"""
 72        return self._server_id
 73
 74    async def register(self,
 75                       source: common.Source,
 76                       events: Collection[common.RegisterEvent]
 77                       ) -> Collection[common.Event] | None:
 78        """Register events"""
 79        if not events:
 80            return []
 81
 82        future = self._loop.create_future()
 83
 84        try:
 85            await self._register_queue.put((future, source, events))
 86
 87        except aio.QueueClosedError:
 88            raise Exception('engine closed')
 89
 90        return await future
 91
 92    async def query(self,
 93                    params: common.QueryParams
 94                    ) -> common.QueryResult:
 95        """Query events"""
 96        return await self._backend.query(params)
 97
 98    async def _register_loop(self):
 99        future = None
100        mlog.debug("starting register loop")
101
102        try:
103            while True:
104                mlog.debug("waiting for register requests")
105                future, source, register_events = \
106                    await self._register_queue.get()
107
108                mlog.debug("processing session")
109                events = await self._process_sessions(source, register_events)
110
111                mlog.debug("registering to backend")
112                events = await self._backend.register(events)
113
114                if future.done():
115                    continue
116
117                result = (
118                    list(event for event, _ in zip(events, register_events))
119                    if events is not None else None)
120                future.set_result(result)
121
122        except Exception as e:
123            mlog.error("register loop error: %s", e, exc_info=e)
124
125        finally:
126            mlog.debug("register loop closed")
127            self.close()
128            self._register_queue.close()
129
130            while True:
131                if future and not future.done():
132                    future.set_exception(Exception('engine closed'))
133                if self._register_queue.empty():
134                    break
135                future, _, __ = self._register_queue.get_nowait()
136
137            timestamp = self._create_session()
138            status_reg_event = self._create_status_reg_event('STOPPED')
139            events = [self._create_event(timestamp, status_reg_event)]
140            await self._backend.register(events)
141
142    async def _process_sessions(self, source, register_events):
143        timestamp = self._create_session()
144
145        for _, module in self._source_modules:
146            await aio.call(module.on_session_start,
147                           self._last_event_id.session)
148
149        events = collections.deque(
150            self._create_event(timestamp, register_event)
151            for register_event in register_events)
152
153        input_source_events = [(source, event) for event in events]
154        while input_source_events:
155            output_source_events = collections.deque()
156
157            for output_source, module in self._source_modules:
158                for input_source, input_event in input_source_events:
159                    if not module.subscription.matches(input_event.type):
160                        continue
161
162                    output_register_events = await aio.call(
163                        module.process, input_source, input_event)
164
165                    if not output_register_events:
166                        continue
167
168                    for output_register_event in output_register_events:
169                        output_event = self._create_event(
170                            timestamp, output_register_event)
171                        output_source_events.append(
172                            (output_source, output_event))
173                        events.append(output_event)
174
175            input_source_events = output_source_events
176
177        for _, module in self._source_modules:
178            await aio.call(module.on_session_stop, self._last_event_id.session)
179
180        return events
181
182    def _create_status_reg_event(self, status):
183        return common.RegisterEvent(
184            type=('event', str(self._server_id), 'engine'),
185            source_timestamp=None,
186            payload=common.EventPayloadJson(status))
187
188    def _create_session(self):
189        self._last_event_id = self._last_event_id._replace(
190            session=self._last_event_id.session + 1,
191            instance=0)
192
193        return common.now()
194
195    def _create_event(self, timestamp, register_event):
196        self._last_event_id = self._last_event_id._replace(
197            instance=self._last_event_id.instance + 1)
198
199        return common.Event(id=self._last_event_id,
200                            type=register_event.type,
201                            timestamp=timestamp,
202                            source_timestamp=register_event.source_timestamp,
203                            payload=register_event.payload)
mlog: logging.Logger = <Logger hat.event.server.engine (WARNING)>

Module logger

async def create_engine( backend: hat.event.common.backend.Backend, module_confs: collections.abc.Iterable[None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]], server_id: int, register_queue_size: int = 1024) -> Engine:
async def create_engine(backend: common.Backend,
                        module_confs: Iterable[json.Data],
                        server_id: int,
                        register_queue_size: int = 1024
                        ) -> 'Engine':
    """Create engine

    A new `Engine` is bound to `backend` and `server_id`, and the last event
    id known to the backend for that server is loaded. A ``STARTED`` status
    event is then queued, one module is created for each entry of
    `module_confs` (module source ids follow iteration order) and, as the
    last step, the register loop is spawned.

    `register_queue_size` is passed to the `aio.Queue` that holds pending
    register requests.

    If module creation or loop startup raises (cancellation included), the
    partially built engine is closed before the exception propagates.

    """
    engine = Engine()
    engine._server_id = server_id
    engine._backend = backend
    engine._loop = asyncio.get_running_loop()
    engine._async_group = aio.Group()
    engine._register_queue = aio.Queue(register_queue_size)
    engine._source_modules = collections.deque()

    engine._last_event_id = await backend.get_last_event_id(server_id)

    # first queued request - handled once the register loop starts running
    started_request = (
        engine._loop.create_future(),
        common.Source(type=common.SourceType.ENGINE, id=0),
        [engine._create_status_reg_event('STARTED')])
    engine._register_queue.put_nowait(started_request)

    try:
        group = engine.async_group

        for module_id, conf in enumerate(module_confs):
            module_info = common.import_module_info(conf['module'])
            module_source = common.Source(type=common.SourceType.MODULE,
                                          id=module_id)

            module = await group.spawn(
                aio.call, module_info.create, conf, engine, module_source)

            # tie lifetimes together: cancelling the engine group closes the
            # module, and a closing module closes the engine
            group.spawn(aio.call_on_cancel, module.async_close)
            group.spawn(aio.call_on_done, module.wait_closing(),
                        engine.close)

            engine._source_modules.append((module_source, module))

        group.spawn(engine._register_loop)

    except BaseException:
        await aio.uncancellable(engine.async_close())
        raise

    return engine

Create engine

class Engine(hat.event.common.module.Engine):
 63class Engine(common.Engine):
 64
 65    @property
 66    def async_group(self) -> aio.Group:
 67        """Async group"""
 68        return self._async_group
 69
 70    @property
 71    def server_id(self) -> int:
 72        """Event server identifier"""
 73        return self._server_id
 74
 75    async def register(self,
 76                       source: common.Source,
 77                       events: Collection[common.RegisterEvent]
 78                       ) -> Collection[common.Event] | None:
 79        """Register events"""
 80        if not events:
 81            return []
 82
 83        future = self._loop.create_future()
 84
 85        try:
 86            await self._register_queue.put((future, source, events))
 87
 88        except aio.QueueClosedError:
 89            raise Exception('engine closed')
 90
 91        return await future
 92
 93    async def query(self,
 94                    params: common.QueryParams
 95                    ) -> common.QueryResult:
 96        """Query events"""
 97        return await self._backend.query(params)
 98
 99    async def _register_loop(self):
100        future = None
101        mlog.debug("starting register loop")
102
103        try:
104            while True:
105                mlog.debug("waiting for register requests")
106                future, source, register_events = \
107                    await self._register_queue.get()
108
109                mlog.debug("processing session")
110                events = await self._process_sessions(source, register_events)
111
112                mlog.debug("registering to backend")
113                events = await self._backend.register(events)
114
115                if future.done():
116                    continue
117
118                result = (
119                    list(event for event, _ in zip(events, register_events))
120                    if events is not None else None)
121                future.set_result(result)
122
123        except Exception as e:
124            mlog.error("register loop error: %s", e, exc_info=e)
125
126        finally:
127            mlog.debug("register loop closed")
128            self.close()
129            self._register_queue.close()
130
131            while True:
132                if future and not future.done():
133                    future.set_exception(Exception('engine closed'))
134                if self._register_queue.empty():
135                    break
136                future, _, __ = self._register_queue.get_nowait()
137
138            timestamp = self._create_session()
139            status_reg_event = self._create_status_reg_event('STOPPED')
140            events = [self._create_event(timestamp, status_reg_event)]
141            await self._backend.register(events)
142
143    async def _process_sessions(self, source, register_events):
144        timestamp = self._create_session()
145
146        for _, module in self._source_modules:
147            await aio.call(module.on_session_start,
148                           self._last_event_id.session)
149
150        events = collections.deque(
151            self._create_event(timestamp, register_event)
152            for register_event in register_events)
153
154        input_source_events = [(source, event) for event in events]
155        while input_source_events:
156            output_source_events = collections.deque()
157
158            for output_source, module in self._source_modules:
159                for input_source, input_event in input_source_events:
160                    if not module.subscription.matches(input_event.type):
161                        continue
162
163                    output_register_events = await aio.call(
164                        module.process, input_source, input_event)
165
166                    if not output_register_events:
167                        continue
168
169                    for output_register_event in output_register_events:
170                        output_event = self._create_event(
171                            timestamp, output_register_event)
172                        output_source_events.append(
173                            (output_source, output_event))
174                        events.append(output_event)
175
176            input_source_events = output_source_events
177
178        for _, module in self._source_modules:
179            await aio.call(module.on_session_stop, self._last_event_id.session)
180
181        return events
182
183    def _create_status_reg_event(self, status):
184        return common.RegisterEvent(
185            type=('event', str(self._server_id), 'engine'),
186            source_timestamp=None,
187            payload=common.EventPayloadJson(status))
188
189    def _create_session(self):
190        self._last_event_id = self._last_event_id._replace(
191            session=self._last_event_id.session + 1,
192            instance=0)
193
194        return common.now()
195
196    def _create_event(self, timestamp, register_event):
197        self._last_event_id = self._last_event_id._replace(
198            instance=self._last_event_id.instance + 1)
199
200        return common.Event(id=self._last_event_id,
201                            type=register_event.type,
202                            timestamp=timestamp,
203                            source_timestamp=register_event.source_timestamp,
204                            payload=register_event.payload)

Engine ABC

async_group: hat.aio.group.Group
65    @property
66    def async_group(self) -> aio.Group:
67        """Async group"""
68        return self._async_group

Async group

server_id: int
70    @property
71    def server_id(self) -> int:
72        """Event server identifier"""
73        return self._server_id

Event server identifier

async def register( self, source: hat.event.common.module.Source, events: collections.abc.Collection[hat.event.common.common.RegisterEvent]) -> collections.abc.Collection[hat.event.common.common.Event] | None:
75    async def register(self,
76                       source: common.Source,
77                       events: Collection[common.RegisterEvent]
78                       ) -> Collection[common.Event] | None:
79        """Register events"""
80        if not events:
81            return []
82
83        future = self._loop.create_future()
84
85        try:
86            await self._register_queue.put((future, source, events))
87
88        except aio.QueueClosedError:
89            raise Exception('engine closed')
90
91        return await future

Register events

async def query( self, params: hat.event.common.common.QueryLatestParams | hat.event.common.common.QueryTimeseriesParams | hat.event.common.common.QueryServerParams) -> hat.event.common.common.QueryResult:
93    async def query(self,
94                    params: common.QueryParams
95                    ) -> common.QueryResult:
96        """Query events"""
97        return await self._backend.query(params)

Query events

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close