hat.event.server.engine
Engine
"""Engine"""

from collections.abc import Callable, Collection, Iterable
import asyncio
import collections
import logging

from hat import aio
from hat import json

from hat.event import common
import hat.event.server.eventer_server


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


async def create_engine(backend: common.Backend,
                        eventer_server: hat.event.server.eventer_server.EventerServer,  # NOQA
                        module_confs: Iterable[json.Data],
                        server_id: int,
                        restart_cb: Callable[[], None],
                        reset_monitor_ready_cb: Callable[[], None],
                        register_queue_size: int = 1024
                        ) -> 'Engine':
    """Create engine

    Builds an `Engine`, reads the last event id for `server_id` from
    `backend`, queues a ``'STARTED'`` status event, creates one module per
    entry of `module_confs` and finally spawns the register loop.

    If anything raises while modules are being created or the register
    loop is being spawned, the engine is closed and the exception is
    re-raised.

    """
    engine = Engine()

    # collaborators and callbacks handed in by the caller
    engine._backend = backend
    engine._eventer_server = eventer_server
    engine._server_id = server_id
    engine._restart_cb = restart_cb
    engine._reset_monitor_ready_cb = reset_monitor_ready_cb

    # internal runtime state
    engine._loop = asyncio.get_running_loop()
    engine._async_group = aio.Group()
    engine._register_queue = aio.Queue(register_queue_size)
    engine._source_modules = collections.deque()

    engine._last_event_id = await backend.get_last_event_id(server_id)

    # the first queued request announces engine start; it is consumed once
    # the register loop is running
    started_request = (
        engine._loop.create_future(),
        common.Source(type=common.SourceType.ENGINE, id=0),
        [engine._create_status_reg_event('STARTED')])
    engine._register_queue.put_nowait(started_request)

    try:
        for index, conf in enumerate(module_confs):
            module_info = common.import_module_info(conf['module'])
            module_source = common.Source(type=common.SourceType.MODULE,
                                          id=index)

            module = await engine.async_group.spawn(
                aio.call, module_info.create, conf, engine, module_source)

            # tie module and engine lifetimes together
            engine.async_group.spawn(aio.call_on_cancel, module.async_close)
            engine.async_group.spawn(aio.call_on_done, module.wait_closing(),
                                     engine.close)

            engine._source_modules.append((module_source, module))

        engine.async_group.spawn(engine._register_loop)

    except BaseException:
        await aio.uncancellable(engine.async_close())
        raise

    return engine


class Engine(common.Engine):
    """Engine whose private state is assigned by `create_engine`"""

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def server_id(self) -> int:
        """Server id passed to `create_engine`"""
        return self._server_id

    async def register(self,
                       source: common.Source,
                       events: Collection[common.RegisterEvent]
                       ) -> Collection[common.Event] | None:
        """Queue register events and wait for the register loop's result

        An empty `events` collection returns an empty list without touching
        the queue. Raises ``Exception('engine closed')`` if the register
        queue is closed.

        """
        if not events:
            return []

        result_future = self._loop.create_future()
        request = result_future, source, events

        try:
            await self._register_queue.put(request)

        except aio.QueueClosedError:
            raise Exception('engine closed')

        return await result_future

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Forward the query to the backend"""
        result = await self._backend.query(params)
        return result

    def get_client_names(self) -> Iterable[tuple[common.Source, str]]:
        """Return client names reported by the eventer server"""
        return self._eventer_server.get_client_names()

    def restart(self):
        """Invoke the restart callback"""
        self._restart_cb()

    def reset_monitor_ready(self):
        """Invoke the reset monitor ready callback"""
        self._reset_monitor_ready_cb()

    async def _register_loop(self):
        """Process queued register requests one at a time

        On exit the engine and the register queue are closed, every
        unresolved request future gets ``Exception('engine closed')`` and a
        ``'STOPPED'`` status event is registered with the backend.

        """

        def fail(candidate):
            # reject a request future that is still waiting for its result
            if candidate and not candidate.done():
                candidate.set_exception(Exception('engine closed'))

        request_future = None
        mlog.debug("starting register loop")

        try:
            while True:
                mlog.debug("waiting for register requests")
                request = await self._register_queue.get()
                request_future, source, register_events = request

                mlog.debug("processing session")
                session_events = await self._process_sessions(
                    source, register_events)

                mlog.debug("registering to backend")
                stored = await self._backend.register(session_events)

                if request_future.done():
                    continue

                if stored is None:
                    request_future.set_result(None)
                else:
                    # only events matching the caller's own register events
                    # are returned, not those produced by modules
                    request_future.set_result(
                        [event for event, _ in zip(stored, register_events)])

        except Exception as e:
            mlog.error("register loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("register loop closed")
            self.close()
            self._register_queue.close()

            # fail the request in progress, then everything still queued
            fail(request_future)
            while not self._register_queue.empty():
                request_future, _, __ = self._register_queue.get_nowait()
                fail(request_future)

            timestamp = self._create_session()
            stopped = self._create_status_reg_event('STOPPED')
            await self._backend.register(
                [self._create_event(timestamp, stopped)])

    async def _process_sessions(self, source, register_events):
        """Run one session and return all events created in it

        Modules are notified of session start, the register events are
        turned into events, and events are repeatedly offered to modules
        with a matching subscription until no module produces new register
        events. Modules are then notified of session stop.

        """
        timestamp = self._create_session()

        for _, module in self._source_modules:
            await aio.call(module.on_session_start,
                           self._last_event_id.session)

        events = collections.deque()
        current = []
        for register_event in register_events:
            event = self._create_event(timestamp, register_event)
            events.append(event)
            current.append((source, event))

        while current:
            produced = collections.deque()

            for module_source, module in self._source_modules:
                for event_source, event in current:
                    if module.subscription.matches(event.type):
                        new_register_events = await aio.call(
                            module.process, event_source, event)

                        # a falsy result means the module produced nothing
                        for new_register_event in new_register_events or ():
                            new_event = self._create_event(
                                timestamp, new_register_event)
                            produced.append((module_source, new_event))
                            events.append(new_event)

            current = produced

        for _, module in self._source_modules:
            await aio.call(module.on_session_stop,
                           self._last_event_id.session)

        return events

    def _create_status_reg_event(self, status):
        """Create the engine status register event with `status` payload"""
        event_type = 'event', str(self._server_id), 'engine'
        payload = common.EventPayloadJson(status)
        return common.RegisterEvent(type=event_type,
                                    source_timestamp=None,
                                    payload=payload)

    def _create_session(self):
        """Advance to the next session id and return the current time"""
        previous = self._last_event_id
        self._last_event_id = previous._replace(
            session=previous.session + 1,
            instance=0)

        return common.now()

    def _create_event(self, timestamp, register_event):
        """Create an event with the next instance id of the session"""
        event_id = self._last_event_id._replace(
            instance=self._last_event_id.instance + 1)
        self._last_event_id = event_id

        return common.Event(id=event_id,
                            type=register_event.type,
                            timestamp=timestamp,
                            source_timestamp=register_event.source_timestamp,
                            payload=register_event.payload)
Module logger
async def
create_engine( backend: hat.event.common.Backend, eventer_server: hat.event.server.eventer_server.EventerServer, module_confs: Iterable[typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]], server_id: int, restart_cb: Callable[[], None], reset_monitor_ready_cb: Callable[[], None], register_queue_size: int = 1024) -> Engine:
async def create_engine(backend: common.Backend,
                        eventer_server: hat.event.server.eventer_server.EventerServer,  # NOQA
                        module_confs: Iterable[json.Data],
                        server_id: int,
                        restart_cb: Callable[[], None],
                        reset_monitor_ready_cb: Callable[[], None],
                        register_queue_size: int = 1024
                        ) -> 'Engine':
    """Create engine

    Builds an `Engine`, reads the last event id for `server_id` from
    `backend`, queues a ``'STARTED'`` status event, creates one module per
    entry of `module_confs` and finally spawns the register loop.

    If anything raises while modules are being created or the register
    loop is being spawned, the engine is closed and the exception is
    re-raised.

    """
    engine = Engine()

    # collaborators and callbacks handed in by the caller
    engine._backend = backend
    engine._eventer_server = eventer_server
    engine._server_id = server_id
    engine._restart_cb = restart_cb
    engine._reset_monitor_ready_cb = reset_monitor_ready_cb

    # internal runtime state
    engine._loop = asyncio.get_running_loop()
    engine._async_group = aio.Group()
    engine._register_queue = aio.Queue(register_queue_size)
    engine._source_modules = collections.deque()

    engine._last_event_id = await backend.get_last_event_id(server_id)

    # the first queued request announces engine start; it is consumed once
    # the register loop is running
    started_request = (
        engine._loop.create_future(),
        common.Source(type=common.SourceType.ENGINE, id=0),
        [engine._create_status_reg_event('STARTED')])
    engine._register_queue.put_nowait(started_request)

    try:
        for index, conf in enumerate(module_confs):
            module_info = common.import_module_info(conf['module'])
            module_source = common.Source(type=common.SourceType.MODULE,
                                          id=index)

            module = await engine.async_group.spawn(
                aio.call, module_info.create, conf, engine, module_source)

            # tie module and engine lifetimes together
            engine.async_group.spawn(aio.call_on_cancel, module.async_close)
            engine.async_group.spawn(aio.call_on_done, module.wait_closing(),
                                     engine.close)

            engine._source_modules.append((module_source, module))

        engine.async_group.spawn(engine._register_loop)

    except BaseException:
        await aio.uncancellable(engine.async_close())
        raise

    return engine
Create engine
class
Engine(hat.event.common.module.Engine):
class Engine(common.Engine):
    """Engine whose private state is assigned by `create_engine`"""

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def server_id(self) -> int:
        """Server id passed to `create_engine`"""
        return self._server_id

    async def register(self,
                       source: common.Source,
                       events: Collection[common.RegisterEvent]
                       ) -> Collection[common.Event] | None:
        """Queue register events and wait for the register loop's result

        An empty `events` collection returns an empty list without touching
        the queue. Raises ``Exception('engine closed')`` if the register
        queue is closed.

        """
        if not events:
            return []

        result_future = self._loop.create_future()
        request = result_future, source, events

        try:
            await self._register_queue.put(request)

        except aio.QueueClosedError:
            raise Exception('engine closed')

        return await result_future

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Forward the query to the backend"""
        result = await self._backend.query(params)
        return result

    def get_client_names(self) -> Iterable[tuple[common.Source, str]]:
        """Return client names reported by the eventer server"""
        return self._eventer_server.get_client_names()

    def restart(self):
        """Invoke the restart callback"""
        self._restart_cb()

    def reset_monitor_ready(self):
        """Invoke the reset monitor ready callback"""
        self._reset_monitor_ready_cb()

    async def _register_loop(self):
        """Process queued register requests one at a time

        On exit the engine and the register queue are closed, every
        unresolved request future gets ``Exception('engine closed')`` and a
        ``'STOPPED'`` status event is registered with the backend.

        """

        def fail(candidate):
            # reject a request future that is still waiting for its result
            if candidate and not candidate.done():
                candidate.set_exception(Exception('engine closed'))

        request_future = None
        mlog.debug("starting register loop")

        try:
            while True:
                mlog.debug("waiting for register requests")
                request = await self._register_queue.get()
                request_future, source, register_events = request

                mlog.debug("processing session")
                session_events = await self._process_sessions(
                    source, register_events)

                mlog.debug("registering to backend")
                stored = await self._backend.register(session_events)

                if request_future.done():
                    continue

                if stored is None:
                    request_future.set_result(None)
                else:
                    # only events matching the caller's own register events
                    # are returned, not those produced by modules
                    request_future.set_result(
                        [event for event, _ in zip(stored, register_events)])

        except Exception as e:
            mlog.error("register loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("register loop closed")
            self.close()
            self._register_queue.close()

            # fail the request in progress, then everything still queued
            fail(request_future)
            while not self._register_queue.empty():
                request_future, _, __ = self._register_queue.get_nowait()
                fail(request_future)

            timestamp = self._create_session()
            stopped = self._create_status_reg_event('STOPPED')
            await self._backend.register(
                [self._create_event(timestamp, stopped)])

    async def _process_sessions(self, source, register_events):
        """Run one session and return all events created in it

        Modules are notified of session start, the register events are
        turned into events, and events are repeatedly offered to modules
        with a matching subscription until no module produces new register
        events. Modules are then notified of session stop.

        """
        timestamp = self._create_session()

        for _, module in self._source_modules:
            await aio.call(module.on_session_start,
                           self._last_event_id.session)

        events = collections.deque()
        current = []
        for register_event in register_events:
            event = self._create_event(timestamp, register_event)
            events.append(event)
            current.append((source, event))

        while current:
            produced = collections.deque()

            for module_source, module in self._source_modules:
                for event_source, event in current:
                    if module.subscription.matches(event.type):
                        new_register_events = await aio.call(
                            module.process, event_source, event)

                        # a falsy result means the module produced nothing
                        for new_register_event in new_register_events or ():
                            new_event = self._create_event(
                                timestamp, new_register_event)
                            produced.append((module_source, new_event))
                            events.append(new_event)

            current = produced

        for _, module in self._source_modules:
            await aio.call(module.on_session_stop,
                           self._last_event_id.session)

        return events

    def _create_status_reg_event(self, status):
        """Create the engine status register event with `status` payload"""
        event_type = 'event', str(self._server_id), 'engine'
        payload = common.EventPayloadJson(status)
        return common.RegisterEvent(type=event_type,
                                    source_timestamp=None,
                                    payload=payload)

    def _create_session(self):
        """Advance to the next session id and return the current time"""
        previous = self._last_event_id
        self._last_event_id = previous._replace(
            session=previous.session + 1,
            instance=0)

        return common.now()

    def _create_event(self, timestamp, register_event):
        """Create an event with the next instance id of the session"""
        event_id = self._last_event_id._replace(
            instance=self._last_event_id.instance + 1)
        self._last_event_id = event_id

        return common.Event(id=event_id,
                            type=register_event.type,
                            timestamp=timestamp,
                            source_timestamp=register_event.source_timestamp,
                            payload=register_event.payload)
Engine ABC
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group"""
    group = self._async_group
    return group
Async group
async def
register( self, source: hat.event.common.Source, events: Collection[hat.event.common.RegisterEvent]) -> Collection[hat.event.common.Event] | None:
async def register(self,
                   source: common.Source,
                   events: Collection[common.RegisterEvent]
                   ) -> Collection[common.Event] | None:
    """Queue register events and wait for the register loop's result

    An empty `events` collection returns an empty list without touching
    the queue. Raises ``Exception('engine closed')`` if the register queue
    is closed.

    """
    if not events:
        return []

    result_future = self._loop.create_future()
    request = result_future, source, events

    try:
        await self._register_queue.put(request)

    except aio.QueueClosedError:
        raise Exception('engine closed')

    return await result_future
Register events
async def
query( self, params: hat.event.common.QueryLatestParams | hat.event.common.QueryTimeseriesParams | hat.event.common.QueryServerParams) -> hat.event.common.QueryResult:
async def query(self,
                params: common.QueryParams
                ) -> common.QueryResult:
    """Forward the query to the backend"""
    result = await self._backend.query(params)
    return result
Query events