hat.event.server.engine
Engine
"""Engine"""

from collections.abc import Collection, Iterable
import asyncio
import collections
import logging

from hat import aio
from hat import json

from hat.event import common


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


async def create_engine(backend: common.Backend,
                        module_confs: Iterable[json.Data],
                        server_id: int,
                        register_queue_size: int = 1024
                        ) -> 'Engine':
    """Create engine

    Reads the last event id for `server_id` from `backend`, queues the
    engine ``STARTED`` status event, creates one module per entry of
    `module_confs` (a module's source id is the position of its
    configuration) and finally spawns the register loop.

    Args:
        backend: backend used for last event id lookup, event registration
            and queries
        module_confs: module configurations; each entry is indexed with
            ``'module'`` to find the module implementation to import
        server_id: event server identifier
        register_queue_size: size passed to the register request queue

    Returns:
        engine with all modules created and the register loop spawned

    Raises:
        BaseException: whatever is raised while creating modules or
            spawning the register loop; it is re-raised after the partially
            created engine has been closed

    """
    engine = Engine()
    engine._backend = backend
    engine._server_id = server_id
    engine._loop = asyncio.get_running_loop()
    engine._async_group = aio.Group()
    engine._register_queue = aio.Queue(register_queue_size)
    engine._source_modules = collections.deque()

    engine._last_event_id = await backend.get_last_event_id(server_id)

    # STARTED is queued before any module is created, ahead of anything
    # modules may enqueue during their own creation
    started_future = engine._loop.create_future()
    engine_source = common.Source(type=common.SourceType.ENGINE, id=0)
    started_events = [engine._create_status_reg_event('STARTED')]
    engine._register_queue.put_nowait(
        (started_future, engine_source, started_events))

    try:
        for source_id, module_conf in enumerate(module_confs):
            info = common.import_module_info(module_conf['module'])
            module_source = common.Source(type=common.SourceType.MODULE,
                                          id=source_id)

            module = await engine.async_group.spawn(
                aio.call, info.create, module_conf, engine, module_source)
            # closing the engine closes the module and vice versa
            engine.async_group.spawn(aio.call_on_cancel, module.async_close)
            engine.async_group.spawn(aio.call_on_done, module.wait_closing(),
                                     engine.close)

            engine._source_modules.append((module_source, module))

        engine.async_group.spawn(engine._register_loop)

    except BaseException:
        # The register loop never started, so its cleanup will never run:
        # close the queue here so later `register` calls fail instead of
        # waiting forever, and fail every request that is already queued.
        engine._register_queue.close()
        # nobody awaits the STARTED request - cancel it instead of leaving
        # an unretrieved exception behind
        started_future.cancel()
        while not engine._register_queue.empty():
            pending_future, _, __ = engine._register_queue.get_nowait()
            if not pending_future.done():
                pending_future.set_exception(Exception('engine closed'))

        await aio.uncancellable(engine.async_close())
        raise

    return engine


class Engine(common.Engine):
    """Engine created by `create_engine`

    Register requests are queued and handled one at a time by the register
    loop: each request is processed as a single session by all modules and
    the resulting events are handed to the backend.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def server_id(self) -> int:
        """Event server identifier"""
        return self._server_id

    async def register(self,
                       source: common.Source,
                       events: Collection[common.RegisterEvent]
                       ) -> Collection[common.Event] | None:
        """Register events

        Queues the request for the register loop and waits for its result.
        Empty `events` returns an empty list without touching the queue.

        Returns:
            the leading events of the backend's register result, one per
            requested register event, or ``None`` if the backend's register
            returned ``None``

        Raises:
            Exception: if the register queue is closed, or if the register
                loop stops before this request is resolved

        """
        if not events:
            return []

        request_future = self._loop.create_future()
        request = request_future, source, events

        try:
            await self._register_queue.put(request)

        except aio.QueueClosedError:
            raise Exception('engine closed')

        return await request_future

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Query events

        Delegates directly to the backend.

        """
        result = await self._backend.query(params)
        return result

    async def _register_loop(self):
        """Handle queued register requests until an error or cancellation

        On exit the engine and the register queue are closed, every
        unresolved request is failed with ``'engine closed'`` and a
        ``STOPPED`` status event is registered with the backend in a new
        session.

        """
        mlog.debug("starting register loop")
        future = None

        try:
            while True:
                mlog.debug("waiting for register requests")
                request = await self._register_queue.get()
                future, source, register_events = request

                mlog.debug("processing session")
                session_events = await self._process_sessions(
                    source, register_events)

                mlog.debug("registering to backend")
                backend_events = await self._backend.register(session_events)

                # the request future may already be done (e.g. cancelled)
                if not future.done():
                    if backend_events is None:
                        result = None
                    else:
                        # zip truncates the result to one event per
                        # requested register event
                        result = [event for event, _ in zip(backend_events,
                                                            register_events)]
                    future.set_result(result)

        except Exception as e:
            mlog.error("register loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("register loop closed")
            self.close()
            self._register_queue.close()

            # fail the request in flight (if any), then everything queued
            if future and not future.done():
                future.set_exception(Exception('engine closed'))
            while not self._register_queue.empty():
                future, _, __ = self._register_queue.get_nowait()
                if future and not future.done():
                    future.set_exception(Exception('engine closed'))

            timestamp = self._create_session()
            stopped_reg_event = self._create_status_reg_event('STOPPED')
            await self._backend.register(
                [self._create_event(timestamp, stopped_reg_event)])

    async def _process_sessions(self, source, register_events):
        """Run one session for `register_events` and return all its events

        Events are created from `register_events` and offered to every
        module whose subscription matches the event type. Register events
        returned by modules become new events of the same session and are
        offered to modules in the next round, until a round produces
        nothing.

        """
        timestamp = self._create_session()

        for _, module in self._source_modules:
            await aio.call(module.on_session_start,
                           self._last_event_id.session)

        session_events = collections.deque(
            self._create_event(timestamp, reg_event)
            for reg_event in register_events)

        current_round = [(source, event) for event in session_events]
        while current_round:
            next_round = collections.deque()

            for module_source, module in self._source_modules:
                for event_source, event in current_round:
                    if not module.subscription.matches(event.type):
                        continue

                    produced = await aio.call(
                        module.process, event_source, event)
                    if not produced:
                        continue

                    for produced_reg_event in produced:
                        produced_event = self._create_event(
                            timestamp, produced_reg_event)
                        # module output is attributed to the module's source
                        next_round.append((module_source, produced_event))
                        session_events.append(produced_event)

            current_round = next_round

        for _, module in self._source_modules:
            await aio.call(module.on_session_stop,
                           self._last_event_id.session)

        return session_events

    def _create_status_reg_event(self, status):
        """Create engine status register event with `status` as payload"""
        event_type = ('event', str(self._server_id), 'engine')
        return common.RegisterEvent(
            type=event_type,
            source_timestamp=None,
            payload=common.EventPayloadJson(status))

    def _create_session(self):
        """Advance to the next session and return the current time

        Session counter of the last event id is incremented and its
        instance counter is reset to zero.

        """
        previous_id = self._last_event_id
        self._last_event_id = previous_id._replace(
            session=previous_id.session + 1,
            instance=0)

        return common.now()

    def _create_event(self, timestamp, register_event):
        """Create event from `register_event` with the next instance id"""
        event_id = self._last_event_id._replace(
            instance=self._last_event_id.instance + 1)
        self._last_event_id = event_id

        return common.Event(id=event_id,
                            type=register_event.type,
                            timestamp=timestamp,
                            source_timestamp=register_event.source_timestamp,
                            payload=register_event.payload)
Module logger
async def
create_engine( backend: hat.event.common.backend.Backend, module_confs: collections.abc.Iterable[None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]], server_id: int, register_queue_size: int = 1024) -> Engine:
async def create_engine(backend: common.Backend,
                        module_confs: Iterable[json.Data],
                        server_id: int,
                        register_queue_size: int = 1024
                        ) -> 'Engine':
    """Create engine

    Reads the last event id for `server_id` from `backend`, queues the
    engine ``STARTED`` status event, creates one module per entry of
    `module_confs` (a module's source id is the position of its
    configuration) and finally spawns the register loop.

    Args:
        backend: backend used for last event id lookup, event registration
            and queries
        module_confs: module configurations; each entry is indexed with
            ``'module'`` to find the module implementation to import
        server_id: event server identifier
        register_queue_size: size passed to the register request queue

    Returns:
        engine with all modules created and the register loop spawned

    Raises:
        BaseException: whatever is raised while creating modules or
            spawning the register loop; it is re-raised after the partially
            created engine has been closed

    """
    engine = Engine()
    engine._backend = backend
    engine._server_id = server_id
    engine._loop = asyncio.get_running_loop()
    engine._async_group = aio.Group()
    engine._register_queue = aio.Queue(register_queue_size)
    engine._source_modules = collections.deque()

    engine._last_event_id = await backend.get_last_event_id(server_id)

    # STARTED is queued before any module is created, ahead of anything
    # modules may enqueue during their own creation
    started_future = engine._loop.create_future()
    engine_source = common.Source(type=common.SourceType.ENGINE, id=0)
    started_events = [engine._create_status_reg_event('STARTED')]
    engine._register_queue.put_nowait(
        (started_future, engine_source, started_events))

    try:
        for source_id, module_conf in enumerate(module_confs):
            info = common.import_module_info(module_conf['module'])
            module_source = common.Source(type=common.SourceType.MODULE,
                                          id=source_id)

            module = await engine.async_group.spawn(
                aio.call, info.create, module_conf, engine, module_source)
            # closing the engine closes the module and vice versa
            engine.async_group.spawn(aio.call_on_cancel, module.async_close)
            engine.async_group.spawn(aio.call_on_done, module.wait_closing(),
                                     engine.close)

            engine._source_modules.append((module_source, module))

        engine.async_group.spawn(engine._register_loop)

    except BaseException:
        # The register loop never started, so its cleanup will never run:
        # close the queue here so later `register` calls fail instead of
        # waiting forever, and fail every request that is already queued.
        engine._register_queue.close()
        # nobody awaits the STARTED request - cancel it instead of leaving
        # an unretrieved exception behind
        started_future.cancel()
        while not engine._register_queue.empty():
            pending_future, _, __ = engine._register_queue.get_nowait()
            if not pending_future.done():
                pending_future.set_exception(Exception('engine closed'))

        await aio.uncancellable(engine.async_close())
        raise

    return engine
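Create engine: reads the last event id from the backend, queues the engine STARTED status event, creates the configured modules and starts the register loop.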
class
Engine(hat.event.common.module.Engine):
class Engine(common.Engine):
    """Engine created by `create_engine`

    Register requests are queued and handled one at a time by the register
    loop: each request is processed as a single session by all modules and
    the resulting events are handed to the backend.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def server_id(self) -> int:
        """Event server identifier"""
        return self._server_id

    async def register(self,
                       source: common.Source,
                       events: Collection[common.RegisterEvent]
                       ) -> Collection[common.Event] | None:
        """Register events

        Queues the request for the register loop and waits for its result.
        Empty `events` returns an empty list without touching the queue.

        Returns:
            the leading events of the backend's register result, one per
            requested register event, or ``None`` if the backend's register
            returned ``None``

        Raises:
            Exception: if the register queue is closed, or if the register
                loop stops before this request is resolved

        """
        if not events:
            return []

        request_future = self._loop.create_future()
        request = request_future, source, events

        try:
            await self._register_queue.put(request)

        except aio.QueueClosedError:
            raise Exception('engine closed')

        return await request_future

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Query events

        Delegates directly to the backend.

        """
        result = await self._backend.query(params)
        return result

    async def _register_loop(self):
        """Handle queued register requests until an error or cancellation

        On exit the engine and the register queue are closed, every
        unresolved request is failed with ``'engine closed'`` and a
        ``STOPPED`` status event is registered with the backend in a new
        session.

        """
        mlog.debug("starting register loop")
        future = None

        try:
            while True:
                mlog.debug("waiting for register requests")
                request = await self._register_queue.get()
                future, source, register_events = request

                mlog.debug("processing session")
                session_events = await self._process_sessions(
                    source, register_events)

                mlog.debug("registering to backend")
                backend_events = await self._backend.register(session_events)

                # the request future may already be done (e.g. cancelled)
                if not future.done():
                    if backend_events is None:
                        result = None
                    else:
                        # zip truncates the result to one event per
                        # requested register event
                        result = [event for event, _ in zip(backend_events,
                                                            register_events)]
                    future.set_result(result)

        except Exception as e:
            mlog.error("register loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("register loop closed")
            self.close()
            self._register_queue.close()

            # fail the request in flight (if any), then everything queued
            if future and not future.done():
                future.set_exception(Exception('engine closed'))
            while not self._register_queue.empty():
                future, _, __ = self._register_queue.get_nowait()
                if future and not future.done():
                    future.set_exception(Exception('engine closed'))

            timestamp = self._create_session()
            stopped_reg_event = self._create_status_reg_event('STOPPED')
            await self._backend.register(
                [self._create_event(timestamp, stopped_reg_event)])

    async def _process_sessions(self, source, register_events):
        """Run one session for `register_events` and return all its events

        Events are created from `register_events` and offered to every
        module whose subscription matches the event type. Register events
        returned by modules become new events of the same session and are
        offered to modules in the next round, until a round produces
        nothing.

        """
        timestamp = self._create_session()

        for _, module in self._source_modules:
            await aio.call(module.on_session_start,
                           self._last_event_id.session)

        session_events = collections.deque(
            self._create_event(timestamp, reg_event)
            for reg_event in register_events)

        current_round = [(source, event) for event in session_events]
        while current_round:
            next_round = collections.deque()

            for module_source, module in self._source_modules:
                for event_source, event in current_round:
                    if not module.subscription.matches(event.type):
                        continue

                    produced = await aio.call(
                        module.process, event_source, event)
                    if not produced:
                        continue

                    for produced_reg_event in produced:
                        produced_event = self._create_event(
                            timestamp, produced_reg_event)
                        # module output is attributed to the module's source
                        next_round.append((module_source, produced_event))
                        session_events.append(produced_event)

            current_round = next_round

        for _, module in self._source_modules:
            await aio.call(module.on_session_stop,
                           self._last_event_id.session)

        return session_events

    def _create_status_reg_event(self, status):
        """Create engine status register event with `status` as payload"""
        event_type = ('event', str(self._server_id), 'engine')
        return common.RegisterEvent(
            type=event_type,
            source_timestamp=None,
            payload=common.EventPayloadJson(status))

    def _create_session(self):
        """Advance to the next session and return the current time

        Session counter of the last event id is incremented and its
        instance counter is reset to zero.

        """
        previous_id = self._last_event_id
        self._last_event_id = previous_id._replace(
            session=previous_id.session + 1,
            instance=0)

        return common.now()

    def _create_event(self, timestamp, register_event):
        """Create event from `register_event` with the next instance id"""
        event_id = self._last_event_id._replace(
            instance=self._last_event_id.instance + 1)
        self._last_event_id = event_id

        return common.Event(id=event_id,
                            type=register_event.type,
                            timestamp=timestamp,
                            source_timestamp=register_event.source_timestamp,
                            payload=register_event.payload)
Engine ABC
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group held by the engine"""
    group = self._async_group
    return group
Async group
server_id: int
@property
def server_id(self) -> int:
    """Identifier of the event server this engine belongs to"""
    identifier = self._server_id
    return identifier
Event server identifier
async def
register( self, source: hat.event.common.module.Source, events: collections.abc.Collection[hat.event.common.common.RegisterEvent]) -> collections.abc.Collection[hat.event.common.common.Event] | None:
async def register(self,
                   source: common.Source,
                   events: Collection[common.RegisterEvent]
                   ) -> Collection[common.Event] | None:
    """Register events

    Queues the request on the register queue and waits for the future
    attached to it. Empty `events` returns an empty list without touching
    the queue.

    Raises:
        Exception: if the register queue is closed

    """
    if not events:
        return []

    request_future = self._loop.create_future()
    request = request_future, source, events

    try:
        await self._register_queue.put(request)

    except aio.QueueClosedError:
        raise Exception('engine closed')

    return await request_future
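Register events: queues the request for the register loop and waits for its result; an empty `events` collection returns an empty list immediately.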
async def
query( self, params: hat.event.common.common.QueryLatestParams | hat.event.common.common.QueryTimeseriesParams | hat.event.common.common.QueryServerParams) -> hat.event.common.common.QueryResult:
async def query(self,
                params: common.QueryParams
                ) -> common.QueryResult:
    """Query events

    Delegates directly to the backend.

    """
    result = await self._backend.query(params)
    return result
Query events
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close