hat.event.server.eventer_server
Eventer server
"""Eventer server"""

from collections.abc import Collection
import logging

from hat import aio
from hat.drivers import tcp

from hat.event import common
from hat.event import eventer


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


async def create_eventer_server(addr: tcp.Address,
                                backend: common.Backend,
                                server_id: int,
                                *,
                                server_token: str | None = None,
                                **kwargs
                                ) -> 'EventerServer':
    """Create eventer server

    A new `EventerServer` is populated with `backend`, `server_id` and
    `server_token`, starts without an engine, and is bound to the listener
    returned by `eventer.listen` for `addr`. Additional keyword arguments
    are passed through to `eventer.listen` unchanged.

    """
    srv = EventerServer()
    srv._engine = None
    srv._server_token = server_token
    srv._server_id = server_id
    srv._backend = backend

    # callbacks are bound methods of the instance created above
    callbacks = {'connected_cb': srv._on_connected,
                 'disconnected_cb': srv._on_disconnected,
                 'register_cb': srv._on_register,
                 'query_cb': srv._on_query}
    srv._srv = await eventer.listen(addr, **callbacks, **kwargs)

    return srv


class EventerServer(aio.Resource):
    """Eventer server

    Instances are not meant to be constructed directly - use the
    `create_eventer_server` coroutine, which sets all required state.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the underlying listener"""
        listener = self._srv
        return listener.async_group

    async def set_engine(self, engine: common.Engine | None):
        """Set engine

        Stores `engine` and updates the listener status: `OPERATIONAL` when
        `engine` is truthy, `STANDBY` otherwise.

        """
        self._engine = engine

        if engine:
            status = common.Status.OPERATIONAL
        else:
            status = common.Status.STANDBY

        await self._srv.set_status(status)

    async def notify_events(self,
                            events: Collection[common.Event],
                            persisted: bool):
        """Notify events

        Forwards `events` and the `persisted` flag to the underlying
        listener.

        """
        listener = self._srv
        await listener.notify_events(events, persisted)

    async def _on_connected(self, info):
        # a client that supplies no token (None) is not rejected here
        token = info.client_token
        if token is not None and token != self._server_token:
            raise Exception('invalid client token')

        await self._register_status(info, 'CONNECTED')

    async def _on_disconnected(self, info):
        await self._register_status(info, 'DISCONNECTED')

    async def _on_register(self, info, register_events):
        # without an engine nothing is registered and None is returned
        engine = self._engine
        if not engine:
            return

        source = _get_source(info.id)
        return await engine.register(source, register_events)

    async def _on_query(self, info, params):
        # queries are answered by the backend; `info` is not used
        backend = self._backend
        return await backend.query(params)

    async def _register_status(self, info, status):
        # register a client status event, but only while an engine is set
        # and still open
        engine = self._engine
        if not (engine and engine.is_open):
            return

        source = _get_source(info.id)
        status_event = self._create_eventer_event(info, status)
        await engine.register(source, [status_event])

    def _create_eventer_event(self, info, status):
        # event type: ('event', <server id>, 'eventer', <client name>)
        event_type = ('event', str(self._server_id), 'eventer',
                      info.client_name)
        payload = common.EventPayloadJson(status)
        return common.RegisterEvent(type=event_type,
                                    source_timestamp=None,
                                    payload=payload)


def _get_source(source_id):
    # all sources created by this module are of type EVENTER
    source_type = common.SourceType.EVENTER
    return common.Source(type=source_type,
                         id=source_id)
Module logger
async def
create_eventer_server( addr: hat.drivers.tcp.Address, backend: hat.event.common.backend.Backend, server_id: int, *, server_token: str | None = None, **kwargs) -> EventerServer:
async def create_eventer_server(addr: tcp.Address,
                                backend: common.Backend,
                                server_id: int,
                                *,
                                server_token: str | None = None,
                                **kwargs
                                ) -> 'EventerServer':
    """Create eventer server

    A new `EventerServer` is populated with `backend`, `server_id` and
    `server_token`, starts without an engine, and is bound to the listener
    returned by `eventer.listen` for `addr`. Additional keyword arguments
    are passed through to `eventer.listen` unchanged.

    """
    srv = EventerServer()
    srv._engine = None
    srv._server_token = server_token
    srv._server_id = server_id
    srv._backend = backend

    # callbacks are bound methods of the instance created above
    callbacks = {'connected_cb': srv._on_connected,
                 'disconnected_cb': srv._on_disconnected,
                 'register_cb': srv._on_register,
                 'query_cb': srv._on_query}
    srv._srv = await eventer.listen(addr, **callbacks, **kwargs)

    return srv
Create eventer server
class
EventerServer(hat.aio.group.Resource):
class EventerServer(aio.Resource):
    """Eventer server

    Instances are not meant to be constructed directly - use the
    `create_eventer_server` coroutine, which sets all required state.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the underlying listener"""
        listener = self._srv
        return listener.async_group

    async def set_engine(self, engine: common.Engine | None):
        """Set engine

        Stores `engine` and updates the listener status: `OPERATIONAL` when
        `engine` is truthy, `STANDBY` otherwise.

        """
        self._engine = engine

        if engine:
            status = common.Status.OPERATIONAL
        else:
            status = common.Status.STANDBY

        await self._srv.set_status(status)

    async def notify_events(self,
                            events: Collection[common.Event],
                            persisted: bool):
        """Notify events

        Forwards `events` and the `persisted` flag to the underlying
        listener.

        """
        listener = self._srv
        await listener.notify_events(events, persisted)

    async def _on_connected(self, info):
        # a client that supplies no token (None) is not rejected here
        token = info.client_token
        if token is not None and token != self._server_token:
            raise Exception('invalid client token')

        await self._register_status(info, 'CONNECTED')

    async def _on_disconnected(self, info):
        await self._register_status(info, 'DISCONNECTED')

    async def _on_register(self, info, register_events):
        # without an engine nothing is registered and None is returned
        engine = self._engine
        if not engine:
            return

        source = _get_source(info.id)
        return await engine.register(source, register_events)

    async def _on_query(self, info, params):
        # queries are answered by the backend; `info` is not used
        backend = self._backend
        return await backend.query(params)

    async def _register_status(self, info, status):
        # register a client status event, but only while an engine is set
        # and still open
        engine = self._engine
        if not (engine and engine.is_open):
            return

        source = _get_source(info.id)
        status_event = self._create_eventer_event(info, status)
        await engine.register(source, [status_event])

    def _create_eventer_event(self, info, status):
        # event type: ('event', <server id>, 'eventer', <client name>)
        event_type = ('event', str(self._server_id), 'eventer',
                      info.client_name)
        payload = common.EventPayloadJson(status)
        return common.RegisterEvent(type=event_type,
                                    source_timestamp=None,
                                    payload=payload)
Eventer server
For creating a new server, see the create_eventer_server coroutine.
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group of the underlying listener"""
    listener = self._srv
    return listener.async_group
Async group
async def
set_engine(self, engine: hat.event.common.module.Engine | None):
async def set_engine(self, engine: common.Engine | None):
    """Set engine

    Stores `engine` and updates the listener status: `OPERATIONAL` when
    `engine` is truthy, `STANDBY` otherwise.

    """
    self._engine = engine

    if engine:
        status = common.Status.OPERATIONAL
    else:
        status = common.Status.STANDBY

    await self._srv.set_status(status)
Set engine
async def
notify_events( self, events: collections.abc.Collection[hat.event.common.common.Event], persisted: bool):
async def notify_events(self,
                        events: Collection[common.Event],
                        persisted: bool):
    """Notify events

    Forwards `events` and the `persisted` flag to the underlying listener.

    """
    listener = self._srv
    await listener.notify_events(events, persisted)
Notify events
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close