hat.event.server.engine
Engine
"""Engine

The engine takes register requests from a queue one at a time, passes the
resulting events through the configured modules and registers them with the
backend.

"""

import asyncio
import collections
import importlib
import logging
import typing

from hat import aio
from hat import json
from hat import util
from hat.event.server import common


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

EventsCb = typing.Callable[[typing.List[common.Event]], None]
"""Events callback"""


async def create_engine(conf: json.Data,
                        backend: common.Backend
                        ) -> 'Engine':
    """Create engine

    The engine's last event id is read from the backend, a ``STARTED``
    status request is queued, every configured module is created and bound
    to the engine's async group and finally the register loop is spawned.
    If any of this fails, the engine is closed before the error propagates.

    Args:
        conf: configuration defined by
            ``hat-event://main.yaml#/definitions/engine``
        backend: backend

    """
    engine = Engine()
    engine._backend = backend
    engine._async_group = aio.Group()
    engine._register_queue = aio.Queue()
    engine._register_cbs = util.CallbackRegistry()
    engine._source_modules = collections.deque()

    engine._last_event_id = await backend.get_last_event_id(conf['server_id'])

    # the STARTED request is queued before modules exist; it is processed
    # only once the register loop is spawned below
    future = asyncio.Future()
    source = common.Source(type=common.SourceType.ENGINE, id=0)
    events = [engine._create_status_reg_event('STARTED')]
    engine._register_queue.put_nowait((future, source, events))
    try:
        for source_id, module_conf in enumerate(conf['modules']):
            py_module = importlib.import_module(module_conf['module'])

            # position in the configuration list is used as source id
            source = common.Source(type=common.SourceType.MODULE,
                                   id=source_id)

            module = await engine.async_group.spawn(
                aio.call, py_module.create, module_conf, engine, source)
            # tie lifetimes together: closing the group closes the module
            # and a closing module closes the engine
            engine.async_group.spawn(aio.call_on_cancel, module.async_close)
            engine.async_group.spawn(aio.call_on_done, module.wait_closing(),
                                     engine.close)

            engine._source_modules.append((source, module))

        engine.async_group.spawn(engine._register_loop)

    except BaseException:
        await aio.uncancellable(engine.async_close())
        raise

    return engine


class Engine(aio.Resource):
    """Event engine

    Private attributes used by the methods are assigned in
    :func:`create_engine`; the class defines no initializer of its own.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    def register_events_cb(self,
                           cb: EventsCb
                           ) -> util.RegisterCallbackHandle:
        """Register events callback

        The register loop notifies callbacks, after each processed request,
        with the events returned by the backend (falsy entries removed).

        """
        return self._register_cbs.register(cb)

    async def register(self,
                       source: common.Source,
                       events: typing.List[common.RegisterEvent]
                       ) -> typing.List[typing.Optional[common.Event]]:
        """Register events

        An empty `events` list returns an empty list without queuing
        anything. Otherwise the request is queued for the register loop and
        its result awaited: one entry per register event, taken from the
        backend's response. If the register loop stops before the request
        is completed, ``Exception('module engine closed')`` is raised.

        """
        if not events:
            return []

        future = asyncio.Future()
        self._register_queue.put_nowait((future, source, events))
        return await future

    async def query(self,
                    data: common.QueryData
                    ) -> typing.List[common.Event]:
        """Query events

        The query is passed to the backend unchanged.

        """
        return await self._backend.query(data)

    async def _register_loop(self):
        """Process queued register requests until closed or failed

        On exit the engine is closed, all unfinished requests are failed
        and a ``STOPPED`` status event is registered with the backend.

        """
        future = None
        mlog.debug("starting register loop")

        try:
            while True:
                mlog.debug("waiting for register requests")
                future, source, register_events = \
                    await self._register_queue.get()
                mlog.debug("processing session")
                events = await self._process_sessions(source, register_events)

                mlog.debug("registering to backend")
                events = await self._backend.register(events)
                if not future.done():
                    # requested events come first; module generated events
                    # follow and are not part of the caller's result
                    result = events[:len(register_events)]
                    future.set_result(result)

                events = [event for event in events if event]
                if events:
                    self._register_cbs.notify(events)

        except Exception as e:
            mlog.error("register loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("register loop closed")
            self.close()
            self._register_queue.close()

            # fail the request that was in flight (if any) and every
            # request still waiting in the queue
            while True:
                if future and not future.done():
                    future.set_exception(Exception('module engine closed'))
                if self._register_queue.empty():
                    break
                future, _, __ = self._register_queue.get_nowait()

            status_reg_event = self._create_status_reg_event('STOPPED')
            events = [self._create_event(common.now(), status_reg_event)]
            # this cleanup runs while the engine is closing, so the task
            # may be cancelled at this await; shield the final write (as
            # create_engine does for async_close) so STOPPED is not lost
            await aio.uncancellable(self._backend.register(events))

    async def _process_sessions(self, source, register_events):
        """Run a single session and return all events created in it

        The session counter of the last event id is incremented, modules
        are told the session started, events are created for
        `register_events` and then repeatedly offered to modules with a
        matching subscription. Events produced by modules become the input
        of the next round; processing ends when a round produces nothing.
        Modules are then told the session stopped.

        The returned list starts with the events created for
        `register_events`, followed by module generated events in creation
        order.

        """
        timestamp = common.now()
        self._last_event_id = self._last_event_id._replace(
            session=self._last_event_id.session + 1)

        for _, module in self._source_modules:
            await module.on_session_start(self._last_event_id.session)

        events = collections.deque(
            self._create_event(timestamp, register_event)
            for register_event in register_events)

        input_source_events = [(source, event) for event in events]
        while input_source_events:
            output_source_events = collections.deque()

            for output_source, module in self._source_modules:
                for input_source, input_event in input_source_events:
                    if not module.subscription.matches(input_event.event_type):
                        continue

                    async for register_event in module.process(input_source,
                                                               input_event):
                        output_event = self._create_event(timestamp,
                                                          register_event)
                        # the producing module is the source of its output
                        output_source_events.append((output_source,
                                                     output_event))
                        events.append(output_event)

            input_source_events = output_source_events

        for _, module in self._source_modules:
            await module.on_session_stop(self._last_event_id.session)

        return list(events)

    def _create_status_reg_event(self, status):
        """Create engine status register event with `status` as payload"""
        return common.RegisterEvent(
            event_type=('event', 'engine'),
            source_timestamp=None,
            payload=common.EventPayload(
                type=common.EventPayloadType.JSON,
                data=status))

    def _create_event(self, timestamp, register_event):
        """Create event from register event using the next instance id

        The instance counter of the last event id is incremented as a side
        effect.

        """
        self._last_event_id = self._last_event_id._replace(
            instance=self._last_event_id.instance + 1)

        return common.Event(event_id=self._last_event_id,
                            event_type=register_event.event_type,
                            timestamp=timestamp,
                            source_timestamp=register_event.source_timestamp,
                            payload=register_event.payload)
Module logger
EventsCb = typing.Callable[[typing.List[hat.event.common.data.Event]], NoneType]
Events callback
async def
create_engine( conf: ~Data, backend: hat.event.server.common.Backend) -> hat.event.server.engine.Engine:
async def create_engine(conf: json.Data,
                        backend: common.Backend
                        ) -> 'Engine':
    """Create engine

    Reads the last event id from the backend, queues the ``STARTED`` status
    request, creates all configured modules inside the engine's async group
    and spawns the register loop. On any failure the engine is closed
    before the error is re-raised.

    Args:
        conf: configuration defined by
            ``hat-event://main.yaml#/definitions/engine``
        backend: backend

    """
    engine = Engine()
    engine._backend = backend
    engine._async_group = aio.Group()
    engine._register_queue = aio.Queue()
    engine._register_cbs = util.CallbackRegistry()
    engine._source_modules = collections.deque()

    engine._last_event_id = await backend.get_last_event_id(conf['server_id'])

    # queued up front; handled once the register loop is running
    started_request = (
        asyncio.Future(),
        common.Source(type=common.SourceType.ENGINE, id=0),
        [engine._create_status_reg_event('STARTED')])
    engine._register_queue.put_nowait(started_request)

    try:
        group = engine.async_group

        for index, module_conf in enumerate(conf['modules']):
            py_module = importlib.import_module(module_conf['module'])

            # index in the configuration list doubles as module source id
            module_source = common.Source(type=common.SourceType.MODULE,
                                          id=index)

            module = await group.spawn(aio.call, py_module.create,
                                       module_conf, engine, module_source)

            # group closing closes the module; module closing closes engine
            group.spawn(aio.call_on_cancel, module.async_close)
            group.spawn(aio.call_on_done, module.wait_closing(),
                        engine.close)

            engine._source_modules.append((module_source, module))

        group.spawn(engine._register_loop)

    except BaseException:
        await aio.uncancellable(engine.async_close())
        raise

    return engine
Create engine
Arguments:
- conf: configuration defined by hat-event://main.yaml#/definitions/engine
- backend: backend
class
Engine(hat.aio.Resource):
class Engine(aio.Resource):
    """Event engine

    Private attributes used by the methods are assigned in
    :func:`create_engine`; the class defines no initializer of its own.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    def register_events_cb(self,
                           cb: EventsCb
                           ) -> util.RegisterCallbackHandle:
        """Register events callback

        The register loop notifies callbacks, after each processed request,
        with the events returned by the backend (falsy entries removed).

        """
        return self._register_cbs.register(cb)

    async def register(self,
                       source: common.Source,
                       events: typing.List[common.RegisterEvent]
                       ) -> typing.List[typing.Optional[common.Event]]:
        """Register events

        An empty `events` list returns an empty list without queuing
        anything. Otherwise the request is queued for the register loop and
        its result awaited: one entry per register event, taken from the
        backend's response. If the register loop stops before the request
        is completed, ``Exception('module engine closed')`` is raised.

        """
        if not events:
            return []

        future = asyncio.Future()
        self._register_queue.put_nowait((future, source, events))
        return await future

    async def query(self,
                    data: common.QueryData
                    ) -> typing.List[common.Event]:
        """Query events

        The query is passed to the backend unchanged.

        """
        return await self._backend.query(data)

    async def _register_loop(self):
        """Process queued register requests until closed or failed

        On exit the engine is closed, all unfinished requests are failed
        and a ``STOPPED`` status event is registered with the backend.

        """
        future = None
        mlog.debug("starting register loop")

        try:
            while True:
                mlog.debug("waiting for register requests")
                future, source, register_events = \
                    await self._register_queue.get()
                mlog.debug("processing session")
                events = await self._process_sessions(source, register_events)

                mlog.debug("registering to backend")
                events = await self._backend.register(events)
                if not future.done():
                    # requested events come first; module generated events
                    # follow and are not part of the caller's result
                    result = events[:len(register_events)]
                    future.set_result(result)

                events = [event for event in events if event]
                if events:
                    self._register_cbs.notify(events)

        except Exception as e:
            mlog.error("register loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("register loop closed")
            self.close()
            self._register_queue.close()

            # fail the request that was in flight (if any) and every
            # request still waiting in the queue
            while True:
                if future and not future.done():
                    future.set_exception(Exception('module engine closed'))
                if self._register_queue.empty():
                    break
                future, _, __ = self._register_queue.get_nowait()

            status_reg_event = self._create_status_reg_event('STOPPED')
            events = [self._create_event(common.now(), status_reg_event)]
            # this cleanup runs while the engine is closing, so the task
            # may be cancelled at this await; shield the final write (as
            # create_engine does for async_close) so STOPPED is not lost
            await aio.uncancellable(self._backend.register(events))

    async def _process_sessions(self, source, register_events):
        """Run a single session and return all events created in it

        The session counter of the last event id is incremented, modules
        are told the session started, events are created for
        `register_events` and then repeatedly offered to modules with a
        matching subscription. Events produced by modules become the input
        of the next round; processing ends when a round produces nothing.
        Modules are then told the session stopped.

        The returned list starts with the events created for
        `register_events`, followed by module generated events in creation
        order.

        """
        timestamp = common.now()
        self._last_event_id = self._last_event_id._replace(
            session=self._last_event_id.session + 1)

        for _, module in self._source_modules:
            await module.on_session_start(self._last_event_id.session)

        events = collections.deque(
            self._create_event(timestamp, register_event)
            for register_event in register_events)

        input_source_events = [(source, event) for event in events]
        while input_source_events:
            output_source_events = collections.deque()

            for output_source, module in self._source_modules:
                for input_source, input_event in input_source_events:
                    if not module.subscription.matches(input_event.event_type):
                        continue

                    async for register_event in module.process(input_source,
                                                               input_event):
                        output_event = self._create_event(timestamp,
                                                          register_event)
                        # the producing module is the source of its output
                        output_source_events.append((output_source,
                                                     output_event))
                        events.append(output_event)

            input_source_events = output_source_events

        for _, module in self._source_modules:
            await module.on_session_stop(self._last_event_id.session)

        return list(events)

    def _create_status_reg_event(self, status):
        """Create engine status register event with `status` as payload"""
        return common.RegisterEvent(
            event_type=('event', 'engine'),
            source_timestamp=None,
            payload=common.EventPayload(
                type=common.EventPayloadType.JSON,
                data=status))

    def _create_event(self, timestamp, register_event):
        """Create event from register event using the next instance id

        The instance counter of the last event id is incremented as a side
        effect.

        """
        self._last_event_id = self._last_event_id._replace(
            instance=self._last_event_id.instance + 1)

        return common.Event(event_id=self._last_event_id,
                            event_type=register_event.event_type,
                            timestamp=timestamp,
                            source_timestamp=register_event.source_timestamp,
                            payload=register_event.payload)
Resource with lifetime control based on Group.
def
register_events_cb( self, cb: Callable[[List[hat.event.common.data.Event]], NoneType]) -> hat.util.RegisterCallbackHandle:
def register_events_cb(self,
                       cb: EventsCb
                       ) -> util.RegisterCallbackHandle:
    """Register events callback

    Adds `cb` to the engine's callback registry and returns the handle
    produced by the registry.

    """
    handle = self._register_cbs.register(cb)
    return handle
Register events callback
async def
register( self, source: hat.event.server.common.Source, events: List[hat.event.common.data.RegisterEvent]) -> List[Optional[hat.event.common.data.Event]]:
async def register(self,
                   source: common.Source,
                   events: typing.List[common.RegisterEvent]
                   ) -> typing.List[typing.Optional[common.Event]]:
    """Register events

    A non-empty `events` list is queued, together with `source` and a
    future, on the engine's register queue; the future's result is then
    awaited and returned. An empty list yields an empty result without
    queuing anything.

    """
    if events:
        result_future = asyncio.Future()
        request = (result_future, source, events)
        self._register_queue.put_nowait(request)
        return await result_future

    return []
Register events
async def
query( self, data: hat.event.common.data.QueryData) -> List[hat.event.common.data.Event]:
async def query(self,
                data: common.QueryData
                ) -> typing.List[common.Event]:
    """Query events

    Delegates to the backend's ``query`` with `data` unchanged and returns
    its result.

    """
    queried_events = await self._backend.query(data)
    return queried_events
Query events
Inherited Members
- hat.aio.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close