hat.event.server.engine

Engine

  1"""Engine"""
  2
  3import asyncio
  4import collections
  5import importlib
  6import logging
  7import typing
  8
  9from hat import aio
 10from hat import json
 11from hat import util
 12from hat.event.server import common
 13
 14
 15mlog: logging.Logger = logging.getLogger(__name__)
 16"""Module logger"""
 17
 18EventsCb = typing.Callable[[typing.List[common.Event]], None]
 19"""Events callback"""
 20
 21
 22async def create_engine(conf: json.Data,
 23                        backend: common.Backend
 24                        ) -> 'Engine':
 25    """Create engine
 26
 27    Args:
 28        conf: configuration defined by
 29            ``hat-event://main.yaml#/definitions/engine``
 30        backend: backend
 31
 32    """
 33    engine = Engine()
 34    engine._backend = backend
 35    engine._async_group = aio.Group()
 36    engine._register_queue = aio.Queue()
 37    engine._register_cbs = util.CallbackRegistry()
 38    engine._source_modules = collections.deque()
 39
 40    engine._last_event_id = await backend.get_last_event_id(conf['server_id'])
 41
 42    future = asyncio.Future()
 43    source = common.Source(type=common.SourceType.ENGINE, id=0)
 44    events = [engine._create_status_reg_event('STARTED')]
 45    engine._register_queue.put_nowait((future, source, events))
 46    try:
 47        for source_id, module_conf in enumerate(conf['modules']):
 48            py_module = importlib.import_module(module_conf['module'])
 49
 50            source = common.Source(type=common.SourceType.MODULE,
 51                                   id=source_id)
 52
 53            module = await engine.async_group.spawn(
 54                aio.call, py_module.create, module_conf, engine, source)
 55            engine.async_group.spawn(aio.call_on_cancel, module.async_close)
 56            engine.async_group.spawn(aio.call_on_done, module.wait_closing(),
 57                                     engine.close)
 58
 59            engine._source_modules.append((source, module))
 60
 61        engine.async_group.spawn(engine._register_loop)
 62
 63    except BaseException:
 64        await aio.uncancellable(engine.async_close())
 65        raise
 66
 67    return engine
 68
 69
 70class Engine(aio.Resource):
 71
 72    @property
 73    def async_group(self) -> aio.Group:
 74        """Async group"""
 75        return self._async_group
 76
 77    def register_events_cb(self,
 78                           cb: EventsCb
 79                           ) -> util.RegisterCallbackHandle:
 80        """Register events callback"""
 81        return self._register_cbs.register(cb)
 82
 83    async def register(self,
 84                       source: common.Source,
 85                       events: typing.List[common.RegisterEvent]
 86                       ) -> typing.List[typing.Optional[common.Event]]:
 87        """Register events"""
 88        if not events:
 89            return []
 90
 91        future = asyncio.Future()
 92        self._register_queue.put_nowait((future, source, events))
 93        return await future
 94
 95    async def query(self,
 96                    data: common.QueryData
 97                    ) -> typing.List[common.Event]:
 98        """Query events"""
 99        return await self._backend.query(data)
100
101    async def _register_loop(self):
102        future = None
103        mlog.debug("starting register loop")
104
105        try:
106            while True:
107                mlog.debug("waiting for register requests")
108                future, source, register_events = \
109                    await self._register_queue.get()
110                mlog.debug("processing session")
111                events = await self._process_sessions(source, register_events)
112
113                mlog.debug("registering to backend")
114                events = await self._backend.register(events)
115                if not future.done():
116                    result = events[:len(register_events)]
117                    future.set_result(result)
118
119                events = [event for event in events if event]
120                if events:
121                    self._register_cbs.notify(events)
122
123        except Exception as e:
124            mlog.error("register loop error: %s", e, exc_info=e)
125
126        finally:
127            mlog.debug("register loop closed")
128            self.close()
129            self._register_queue.close()
130
131            while True:
132                if future and not future.done():
133                    future.set_exception(Exception('module engine closed'))
134                if self._register_queue.empty():
135                    break
136                future, _, __ = self._register_queue.get_nowait()
137
138            status_reg_event = self._create_status_reg_event('STOPPED')
139            events = [self._create_event(common.now(), status_reg_event)]
140            await self._backend.register(events)
141
142    async def _process_sessions(self, source, register_events):
143        timestamp = common.now()
144        self._last_event_id = self._last_event_id._replace(
145            session=self._last_event_id.session + 1)
146
147        for _, module in self._source_modules:
148            await module.on_session_start(self._last_event_id.session)
149
150        events = collections.deque(
151            self._create_event(timestamp, register_event)
152            for register_event in register_events)
153
154        input_source_events = [(source, event) for event in events]
155        while input_source_events:
156            output_source_events = collections.deque()
157
158            for output_source, module in self._source_modules:
159                for input_source, input_event in input_source_events:
160                    if not module.subscription.matches(input_event.event_type):
161                        continue
162
163                    async for register_event in module.process(input_source,
164                                                               input_event):
165                        output_event = self._create_event(timestamp,
166                                                          register_event)
167                        output_source_events.append((output_source,
168                                                     output_event))
169                        events.append(output_event)
170
171            input_source_events = output_source_events
172
173        for _, module in self._source_modules:
174            await module.on_session_stop(self._last_event_id.session)
175
176        return list(events)
177
178    def _create_status_reg_event(self, status):
179        return common.RegisterEvent(
180            event_type=('event', 'engine'),
181            source_timestamp=None,
182            payload=common.EventPayload(
183                type=common.EventPayloadType.JSON,
184                data=status))
185
186    def _create_event(self, timestamp, register_event):
187        self._last_event_id = self._last_event_id._replace(
188            instance=self._last_event_id.instance + 1)
189
190        return common.Event(event_id=self._last_event_id,
191                            event_type=register_event.event_type,
192                            timestamp=timestamp,
193                            source_timestamp=register_event.source_timestamp,
194                            payload=register_event.payload)
mlog: logging.Logger = <Logger hat.event.server.engine (WARNING)>

Module logger

EventsCb = typing.Callable[[typing.List[hat.event.common.data.Event]], NoneType]

Events callback

async def create_engine( conf: ~Data, backend: hat.event.server.common.Backend) -> hat.event.server.engine.Engine:
23async def create_engine(conf: json.Data,
24                        backend: common.Backend
25                        ) -> 'Engine':
26    """Create engine
27
28    Args:
29        conf: configuration defined by
30            ``hat-event://main.yaml#/definitions/engine``
31        backend: backend
32
33    """
34    engine = Engine()
35    engine._backend = backend
36    engine._async_group = aio.Group()
37    engine._register_queue = aio.Queue()
38    engine._register_cbs = util.CallbackRegistry()
39    engine._source_modules = collections.deque()
40
41    engine._last_event_id = await backend.get_last_event_id(conf['server_id'])
42
43    future = asyncio.Future()
44    source = common.Source(type=common.SourceType.ENGINE, id=0)
45    events = [engine._create_status_reg_event('STARTED')]
46    engine._register_queue.put_nowait((future, source, events))
47    try:
48        for source_id, module_conf in enumerate(conf['modules']):
49            py_module = importlib.import_module(module_conf['module'])
50
51            source = common.Source(type=common.SourceType.MODULE,
52                                   id=source_id)
53
54            module = await engine.async_group.spawn(
55                aio.call, py_module.create, module_conf, engine, source)
56            engine.async_group.spawn(aio.call_on_cancel, module.async_close)
57            engine.async_group.spawn(aio.call_on_done, module.wait_closing(),
58                                     engine.close)
59
60            engine._source_modules.append((source, module))
61
62        engine.async_group.spawn(engine._register_loop)
63
64    except BaseException:
65        await aio.uncancellable(engine.async_close())
66        raise
67
68    return engine

Create engine

Arguments:
  • conf: configuration defined by hat-event://main.yaml#/definitions/engine
  • backend: backend
class Engine(hat.aio.Resource):
 71class Engine(aio.Resource):
 72
 73    @property
 74    def async_group(self) -> aio.Group:
 75        """Async group"""
 76        return self._async_group
 77
 78    def register_events_cb(self,
 79                           cb: EventsCb
 80                           ) -> util.RegisterCallbackHandle:
 81        """Register events callback"""
 82        return self._register_cbs.register(cb)
 83
 84    async def register(self,
 85                       source: common.Source,
 86                       events: typing.List[common.RegisterEvent]
 87                       ) -> typing.List[typing.Optional[common.Event]]:
 88        """Register events"""
 89        if not events:
 90            return []
 91
 92        future = asyncio.Future()
 93        self._register_queue.put_nowait((future, source, events))
 94        return await future
 95
 96    async def query(self,
 97                    data: common.QueryData
 98                    ) -> typing.List[common.Event]:
 99        """Query events"""
100        return await self._backend.query(data)
101
102    async def _register_loop(self):
103        future = None
104        mlog.debug("starting register loop")
105
106        try:
107            while True:
108                mlog.debug("waiting for register requests")
109                future, source, register_events = \
110                    await self._register_queue.get()
111                mlog.debug("processing session")
112                events = await self._process_sessions(source, register_events)
113
114                mlog.debug("registering to backend")
115                events = await self._backend.register(events)
116                if not future.done():
117                    result = events[:len(register_events)]
118                    future.set_result(result)
119
120                events = [event for event in events if event]
121                if events:
122                    self._register_cbs.notify(events)
123
124        except Exception as e:
125            mlog.error("register loop error: %s", e, exc_info=e)
126
127        finally:
128            mlog.debug("register loop closed")
129            self.close()
130            self._register_queue.close()
131
132            while True:
133                if future and not future.done():
134                    future.set_exception(Exception('module engine closed'))
135                if self._register_queue.empty():
136                    break
137                future, _, __ = self._register_queue.get_nowait()
138
139            status_reg_event = self._create_status_reg_event('STOPPED')
140            events = [self._create_event(common.now(), status_reg_event)]
141            await self._backend.register(events)
142
143    async def _process_sessions(self, source, register_events):
144        timestamp = common.now()
145        self._last_event_id = self._last_event_id._replace(
146            session=self._last_event_id.session + 1)
147
148        for _, module in self._source_modules:
149            await module.on_session_start(self._last_event_id.session)
150
151        events = collections.deque(
152            self._create_event(timestamp, register_event)
153            for register_event in register_events)
154
155        input_source_events = [(source, event) for event in events]
156        while input_source_events:
157            output_source_events = collections.deque()
158
159            for output_source, module in self._source_modules:
160                for input_source, input_event in input_source_events:
161                    if not module.subscription.matches(input_event.event_type):
162                        continue
163
164                    async for register_event in module.process(input_source,
165                                                               input_event):
166                        output_event = self._create_event(timestamp,
167                                                          register_event)
168                        output_source_events.append((output_source,
169                                                     output_event))
170                        events.append(output_event)
171
172            input_source_events = output_source_events
173
174        for _, module in self._source_modules:
175            await module.on_session_stop(self._last_event_id.session)
176
177        return list(events)
178
179    def _create_status_reg_event(self, status):
180        return common.RegisterEvent(
181            event_type=('event', 'engine'),
182            source_timestamp=None,
183            payload=common.EventPayload(
184                type=common.EventPayloadType.JSON,
185                data=status))
186
187    def _create_event(self, timestamp, register_event):
188        self._last_event_id = self._last_event_id._replace(
189            instance=self._last_event_id.instance + 1)
190
191        return common.Event(event_id=self._last_event_id,
192                            event_type=register_event.event_type,
193                            timestamp=timestamp,
194                            source_timestamp=register_event.source_timestamp,
195                            payload=register_event.payload)

Resource with lifetime control based on Group.

Engine()
async_group: hat.aio.Group

Async group

def register_events_cb( self, cb: Callable[[List[hat.event.common.data.Event]], NoneType]) -> hat.util.RegisterCallbackHandle:
78    def register_events_cb(self,
79                           cb: EventsCb
80                           ) -> util.RegisterCallbackHandle:
81        """Register events callback"""
82        return self._register_cbs.register(cb)

Register events callback

async def register( self, source: hat.event.server.common.Source, events: List[hat.event.common.data.RegisterEvent]) -> List[Optional[hat.event.common.data.Event]]:
84    async def register(self,
85                       source: common.Source,
86                       events: typing.List[common.RegisterEvent]
87                       ) -> typing.List[typing.Optional[common.Event]]:
88        """Register events"""
89        if not events:
90            return []
91
92        future = asyncio.Future()
93        self._register_queue.put_nowait((future, source, events))
94        return await future

Register events

async def query( self, data: hat.event.common.data.QueryData) -> List[hat.event.common.data.Event]:
 96    async def query(self,
 97                    data: common.QueryData
 98                    ) -> typing.List[common.Event]:
 99        """Query events"""
100        return await self._backend.query(data)

Query events

Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close