hat.event.server.engine
Engine
"""Engine"""

import asyncio
import collections
import importlib
import logging

from hat import aio
from hat import json
from hat import util

from hat.event.server import common


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


async def create_engine(conf: json.Data,
                        backend: common.Backend
                        ) -> 'Engine':
    """Create engine

    An engine status event with payload ``'STARTED'`` is queued as the first
    register request, all configured modules are created and then the
    register loop is started.

    Args:
        conf: configuration defined by
            ``hat-event://main.yaml#/definitions/engine``
        backend: backend used for obtaining the last event id and for
            registering and querying events

    Returns:
        created engine

    Raises:
        BaseException: any error raised while importing or creating modules
            is re-raised after the engine is closed

    """
    engine = Engine()
    engine._backend = backend
    engine._async_group = aio.Group()
    engine._register_queue = aio.Queue()
    engine._register_cbs = util.CallbackRegistry()
    engine._source_modules = collections.deque()

    engine._last_event_id = await backend.get_last_event_id(conf['server_id'])

    # nobody awaits the future of the initial STARTED request - consume its
    # outcome so that an exception set by the register loop on shutdown is
    # not reported by asyncio as "Future exception was never retrieved"
    future = asyncio.Future()
    future.add_done_callback(lambda f: f.cancelled() or f.exception())

    source = common.Source(type=common.SourceType.ENGINE, id=0)
    events = [engine._create_status_reg_event('STARTED')]
    engine._register_queue.put_nowait((future, source, events))

    try:
        for source_id, module_conf in enumerate(conf['modules']):
            py_module = importlib.import_module(module_conf['module'])

            source = common.Source(type=common.SourceType.MODULE,
                                   id=source_id)

            module = await engine.async_group.spawn(
                aio.call, py_module.create, module_conf, engine, source)

            # closing the engine closes the module and vice versa
            engine.async_group.spawn(aio.call_on_cancel, module.async_close)
            engine.async_group.spawn(aio.call_on_done, module.wait_closing(),
                                     engine.close)

            engine._source_modules.append((source, module))

        engine.async_group.spawn(engine._register_loop)

    except BaseException:
        await aio.uncancellable(engine.async_close())
        raise

    return engine


class Engine(common.Engine):
    # instances are created by `create_engine`, which assigns all private
    # attributes used by the methods below

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    def register_events_cb(self,
                           cb: common.EventsCb
                           ) -> util.RegisterCallbackHandle:
        """Register events callback

        Registered callback is notified by the register loop with events
        returned by backend.

        """
        handle = self._register_cbs.register(cb)
        return handle

    async def register(self,
                       source: common.Source,
                       events: list[common.RegisterEvent]
                       ) -> list[common.Event | None]:
        """Register events

        Request is queued and processed by the register loop. Empty `events`
        result in an empty list without queuing a request.

        """
        if events:
            result_future = asyncio.Future()
            request = result_future, source, events
            self._register_queue.put_nowait(request)
            return await result_future

        return []

    async def query(self,
                    data: common.QueryData
                    ) -> list[common.Event]:
        """Query events"""
        result = await self._backend.query(data)
        return result

    async def _register_loop(self):
        # future of the request currently being processed (`None` until the
        # first request is taken from the queue)
        future = None
        mlog.debug("starting register loop")

        try:
            while True:
                mlog.debug("waiting for register requests")
                request = await self._register_queue.get()
                future, source, register_events = request

                mlog.debug("processing session")
                session_events = await self._process_sessions(
                    source, register_events)

                mlog.debug("registering to backend")
                registered = await self._backend.register(session_events)

                # requester receives only results of its own events, not of
                # the events produced by modules
                if not future.done():
                    future.set_result(registered[:len(register_events)])

                notify_events = [event for event in registered if event]
                if notify_events:
                    self._register_cbs.notify(notify_events)

        except Exception as e:
            mlog.error("register loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("register loop closed")
            self.close()
            self._register_queue.close()

            # fail the request in progress and every request still queued
            pending = future
            while True:
                if pending is not None and not pending.done():
                    pending.set_exception(Exception('module engine closed'))
                if self._register_queue.empty():
                    break
                pending, _, _ = self._register_queue.get_nowait()

            stopped_reg_event = self._create_status_reg_event('STOPPED')
            stopped_event = self._create_event(common.now(),
                                               stopped_reg_event)
            await self._backend.register([stopped_event])

    async def _process_sessions(self, source, register_events):
        timestamp = common.now()

        # each register request is processed as a new session
        session = self._last_event_id.session + 1
        self._last_event_id = self._last_event_id._replace(session=session)

        for _, module in self._source_modules:
            await module.on_session_start(session)

        events = collections.deque()
        for register_event in register_events:
            events.append(self._create_event(timestamp, register_event))

        # events produced by modules are fed back to modules until no new
        # events are produced
        pending = [(source, event) for event in events]
        while pending:
            produced = collections.deque()

            for module_source, module in self._source_modules:
                for input_source, input_event in pending:
                    if module.subscription.matches(input_event.event_type):
                        async for register_event in module.process(
                                input_source, input_event):
                            output_event = self._create_event(
                                timestamp, register_event)
                            produced.append((module_source, output_event))
                            events.append(output_event)

            pending = produced

        for _, module in self._source_modules:
            await module.on_session_stop(session)

        return list(events)

    def _create_status_reg_event(self, status):
        payload = common.EventPayload(type=common.EventPayloadType.JSON,
                                      data=status)
        return common.RegisterEvent(event_type=('event', 'engine'),
                                    source_timestamp=None,
                                    payload=payload)

    def _create_event(self, timestamp, register_event):
        # every created event gets the next instance of the last event id
        next_instance = self._last_event_id.instance + 1
        event_id = self._last_event_id._replace(instance=next_instance)
        self._last_event_id = event_id

        return common.Event(
            event_id=event_id,
            event_type=register_event.event_type,
            timestamp=timestamp,
            source_timestamp=register_event.source_timestamp,
            payload=register_event.payload)
Module logger
async def
create_engine( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], backend: hat.event.server.common.Backend) -> Engine:
async def create_engine(conf: json.Data,
                        backend: common.Backend
                        ) -> 'Engine':
    """Create engine

    An engine status event with payload ``'STARTED'`` is queued as the first
    register request, all configured modules are created and then the
    register loop is started.

    Args:
        conf: configuration defined by
            ``hat-event://main.yaml#/definitions/engine``
        backend: backend used for obtaining the last event id and for
            registering and querying events

    Returns:
        created engine

    Raises:
        BaseException: any error raised while importing or creating modules
            is re-raised after the engine is closed

    """
    engine = Engine()
    engine._backend = backend
    engine._async_group = aio.Group()
    engine._register_queue = aio.Queue()
    engine._register_cbs = util.CallbackRegistry()
    engine._source_modules = collections.deque()

    engine._last_event_id = await backend.get_last_event_id(conf['server_id'])

    # nobody awaits the future of the initial STARTED request - consume its
    # outcome so that an exception set by the register loop on shutdown is
    # not reported by asyncio as "Future exception was never retrieved"
    future = asyncio.Future()
    future.add_done_callback(lambda f: f.cancelled() or f.exception())

    source = common.Source(type=common.SourceType.ENGINE, id=0)
    events = [engine._create_status_reg_event('STARTED')]
    engine._register_queue.put_nowait((future, source, events))

    try:
        for source_id, module_conf in enumerate(conf['modules']):
            py_module = importlib.import_module(module_conf['module'])

            source = common.Source(type=common.SourceType.MODULE,
                                   id=source_id)

            module = await engine.async_group.spawn(
                aio.call, py_module.create, module_conf, engine, source)

            # closing the engine closes the module and vice versa
            engine.async_group.spawn(aio.call_on_cancel, module.async_close)
            engine.async_group.spawn(aio.call_on_done, module.wait_closing(),
                                     engine.close)

            engine._source_modules.append((source, module))

        engine.async_group.spawn(engine._register_loop)

    except BaseException:
        await aio.uncancellable(engine.async_close())
        raise

    return engine
Create engine
Arguments:
- conf: configuration defined by hat-event://main.yaml#/definitions/engine
- backend: backend
class Engine(common.Engine):
    # instances are created by `create_engine`, which assigns all private
    # attributes used by the methods below

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    def register_events_cb(self,
                           cb: common.EventsCb
                           ) -> util.RegisterCallbackHandle:
        """Register events callback

        Registered callback is notified by the register loop with events
        returned by backend.

        """
        handle = self._register_cbs.register(cb)
        return handle

    async def register(self,
                       source: common.Source,
                       events: list[common.RegisterEvent]
                       ) -> list[common.Event | None]:
        """Register events

        Request is queued and processed by the register loop. Empty `events`
        result in an empty list without queuing a request.

        """
        if events:
            result_future = asyncio.Future()
            request = result_future, source, events
            self._register_queue.put_nowait(request)
            return await result_future

        return []

    async def query(self,
                    data: common.QueryData
                    ) -> list[common.Event]:
        """Query events"""
        result = await self._backend.query(data)
        return result

    async def _register_loop(self):
        # future of the request currently being processed (`None` until the
        # first request is taken from the queue)
        future = None
        mlog.debug("starting register loop")

        try:
            while True:
                mlog.debug("waiting for register requests")
                request = await self._register_queue.get()
                future, source, register_events = request

                mlog.debug("processing session")
                session_events = await self._process_sessions(
                    source, register_events)

                mlog.debug("registering to backend")
                registered = await self._backend.register(session_events)

                # requester receives only results of its own events, not of
                # the events produced by modules
                if not future.done():
                    future.set_result(registered[:len(register_events)])

                notify_events = [event for event in registered if event]
                if notify_events:
                    self._register_cbs.notify(notify_events)

        except Exception as e:
            mlog.error("register loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("register loop closed")
            self.close()
            self._register_queue.close()

            # fail the request in progress and every request still queued
            pending = future
            while True:
                if pending is not None and not pending.done():
                    pending.set_exception(Exception('module engine closed'))
                if self._register_queue.empty():
                    break
                pending, _, _ = self._register_queue.get_nowait()

            stopped_reg_event = self._create_status_reg_event('STOPPED')
            stopped_event = self._create_event(common.now(),
                                               stopped_reg_event)
            await self._backend.register([stopped_event])

    async def _process_sessions(self, source, register_events):
        timestamp = common.now()

        # each register request is processed as a new session
        session = self._last_event_id.session + 1
        self._last_event_id = self._last_event_id._replace(session=session)

        for _, module in self._source_modules:
            await module.on_session_start(session)

        events = collections.deque()
        for register_event in register_events:
            events.append(self._create_event(timestamp, register_event))

        # events produced by modules are fed back to modules until no new
        # events are produced
        pending = [(source, event) for event in events]
        while pending:
            produced = collections.deque()

            for module_source, module in self._source_modules:
                for input_source, input_event in pending:
                    if module.subscription.matches(input_event.event_type):
                        async for register_event in module.process(
                                input_source, input_event):
                            output_event = self._create_event(
                                timestamp, register_event)
                            produced.append((module_source, output_event))
                            events.append(output_event)

            pending = produced

        for _, module in self._source_modules:
            await module.on_session_stop(session)

        return list(events)

    def _create_status_reg_event(self, status):
        payload = common.EventPayload(type=common.EventPayloadType.JSON,
                                      data=status)
        return common.RegisterEvent(event_type=('event', 'engine'),
                                    source_timestamp=None,
                                    payload=payload)

    def _create_event(self, timestamp, register_event):
        # every created event gets the next instance of the last event id
        next_instance = self._last_event_id.instance + 1
        event_id = self._last_event_id._replace(instance=next_instance)
        self._last_event_id = event_id

        return common.Event(
            event_id=event_id,
            event_type=register_event.event_type,
            timestamp=timestamp,
            source_timestamp=register_event.source_timestamp,
            payload=register_event.payload)
Engine ABC
def
register_events_cb( self, cb: Callable[[list[hat.event.common.data.Event]], NoneType]) -> hat.util.RegisterCallbackHandle:
def register_events_cb(self,
                       cb: common.EventsCb
                       ) -> util.RegisterCallbackHandle:
    """Register events callback

    Callback is added to the engine's callback registry and the
    registration handle is returned.

    """
    handle = self._register_cbs.register(cb)
    return handle
Register events callback
async def
register( self, source: hat.event.server.common.Source, events: list[hat.event.common.data.RegisterEvent]) -> list[hat.event.common.data.Event | None]:
async def register(self,
                   source: common.Source,
                   events: list[common.RegisterEvent]
                   ) -> list[common.Event | None]:
    """Register events

    Request is put on the engine's register queue and the result of the
    queued future is awaited. Empty `events` result in an empty list
    without queuing a request.

    """
    if events:
        result_future = asyncio.Future()
        request = result_future, source, events
        self._register_queue.put_nowait(request)
        return await result_future

    return []
Register events
async def
query( self, data: hat.event.common.data.QueryData) -> list[hat.event.common.data.Event]:
async def query(self,
                data: common.QueryData
                ) -> list[common.Event]:
    """Query events

    Query is delegated to the backend.

    """
    result = await self._backend.query(data)
    return result
Query events
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close