hat.event.server.eventer_server
Eventer server
"""Eventer server"""

from collections.abc import Collection, Iterable
import logging

from hat import aio
from hat.drivers import tcp

from hat.event import common
from hat.event import eventer


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


async def create_eventer_server(addr: tcp.Address,
                                backend: common.Backend,
                                server_id: int,
                                *,
                                server_token: str | None = None,
                                **kwargs
                                ) -> 'EventerServer':
    """Create eventer server

    Instantiates `EventerServer`, stores `backend`, `server_id` and
    `server_token` on it and starts listening on `addr` through
    `eventer.listen`. Additional keyword arguments are forwarded to
    `eventer.listen` unchanged.

    The server starts without an engine; one is assigned later through
    `EventerServer.set_status`.

    """
    instance = EventerServer()
    instance._engine = None
    instance._server_token = server_token
    instance._server_id = server_id
    instance._backend = backend

    # callbacks are bound methods of the new instance, so all private state
    # is assigned before listening starts
    instance._srv = await eventer.listen(
        addr,
        connected_cb=instance._on_connected,
        disconnected_cb=instance._on_disconnected,
        register_cb=instance._on_register,
        query_cb=instance._on_query,
        **kwargs)

    return instance


class EventerServer(aio.Resource):
    """Eventer server

    For creating new server see `create_eventer_server` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        listener = self._srv
        return listener.async_group

    def get_client_names(self) -> Iterable[tuple[common.Source, str]]:
        """Get client names

        Lazily yields a `(source, client_name)` pair for each connection
        info reported by the underlying server.

        """
        yield from ((_get_source(conn_info.id), conn_info.client_name)
                    for conn_info in self._srv.get_conn_infos())

    async def set_status(self,
                         status: common.Status,
                         engine: common.Engine | None):
        """Set status

        `engine` has to be provided together with
        `common.Status.OPERATIONAL` and has to be omitted for any other
        status - otherwise `ValueError` is raised and nothing is changed.

        """
        mismatch = ((not engine)
                    if status == common.Status.OPERATIONAL
                    else bool(engine))
        if mismatch:
            raise ValueError('invalid status/engine')

        self._engine = engine
        await self._srv.set_status(status)

    async def notify_events(self,
                            events: Collection[common.Event],
                            persisted: bool,
                            with_ack: bool = False):
        """Notify events

        Arguments are passed to the underlying server's `notify_events`
        unchanged.

        """
        listener = self._srv
        await listener.notify_events(events, persisted, with_ack)

    async def _on_connected(self, info):
        # token is verified only when the client provides one
        token = info.client_token
        if token is not None and token != self._server_token:
            raise Exception('invalid client token')

        await self._register_connection_status(info, 'CONNECTED')

    async def _on_disconnected(self, info):
        await self._register_connection_status(info, 'DISCONNECTED')

    async def _on_register(self, info, register_events):
        engine = self._engine
        if not engine:
            return

        return await engine.register(_get_source(info.id), register_events)

    async def _on_query(self, info, params):
        backend = self._backend
        return await backend.query(params)

    async def _register_connection_status(self, info, status):
        # connection status is registered only while an open engine is set
        engine = self._engine
        if not engine or not engine.is_open:
            return

        status_event = self._create_eventer_event(info, status)
        await engine.register(_get_source(info.id), [status_event])

    def _create_eventer_event(self, info, status):
        event_type = ('event', str(self._server_id), 'eventer',
                      info.client_name)
        return common.RegisterEvent(type=event_type,
                                    source_timestamp=None,
                                    payload=common.EventPayloadJson(status))


def _get_source(source_id):
    # every eventer connection is represented as an EVENTER source
    source_type = common.SourceType.EVENTER
    return common.Source(type=source_type,
                         id=source_id)
Module logger
async def
create_eventer_server( addr: hat.drivers.tcp.Address, backend: hat.event.common.Backend, server_id: int, *, server_token: str | None = None, **kwargs) -> EventerServer:
async def create_eventer_server(addr: tcp.Address,
                                backend: common.Backend,
                                server_id: int,
                                *,
                                server_token: str | None = None,
                                **kwargs
                                ) -> 'EventerServer':
    """Create eventer server

    Instantiates `EventerServer`, stores `backend`, `server_id` and
    `server_token` on it and starts listening on `addr` through
    `eventer.listen`. Additional keyword arguments are forwarded to
    `eventer.listen` unchanged.

    The server starts without an engine (`_engine` is `None`).

    """
    instance = EventerServer()
    instance._engine = None
    instance._server_token = server_token
    instance._server_id = server_id
    instance._backend = backend

    # callbacks are bound methods of the new instance, so all private state
    # is assigned before listening starts
    instance._srv = await eventer.listen(
        addr,
        connected_cb=instance._on_connected,
        disconnected_cb=instance._on_disconnected,
        register_cb=instance._on_register,
        query_cb=instance._on_query,
        **kwargs)

    return instance
Create eventer server
class
EventerServer(hat.aio.group.Resource):
class EventerServer(aio.Resource):
    """Eventer server

    For creating new server see `create_eventer_server` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        listener = self._srv
        return listener.async_group

    def get_client_names(self) -> Iterable[tuple[common.Source, str]]:
        """Get client names

        Lazily yields a `(source, client_name)` pair for each connection
        info reported by the underlying server.

        """
        yield from ((_get_source(conn_info.id), conn_info.client_name)
                    for conn_info in self._srv.get_conn_infos())

    async def set_status(self,
                         status: common.Status,
                         engine: common.Engine | None):
        """Set status

        `engine` has to be provided together with
        `common.Status.OPERATIONAL` and has to be omitted for any other
        status - otherwise `ValueError` is raised and nothing is changed.

        """
        mismatch = ((not engine)
                    if status == common.Status.OPERATIONAL
                    else bool(engine))
        if mismatch:
            raise ValueError('invalid status/engine')

        self._engine = engine
        await self._srv.set_status(status)

    async def notify_events(self,
                            events: Collection[common.Event],
                            persisted: bool,
                            with_ack: bool = False):
        """Notify events

        Arguments are passed to the underlying server's `notify_events`
        unchanged.

        """
        listener = self._srv
        await listener.notify_events(events, persisted, with_ack)

    async def _on_connected(self, info):
        # token is verified only when the client provides one
        token = info.client_token
        if token is not None and token != self._server_token:
            raise Exception('invalid client token')

        await self._register_connection_status(info, 'CONNECTED')

    async def _on_disconnected(self, info):
        await self._register_connection_status(info, 'DISCONNECTED')

    async def _on_register(self, info, register_events):
        engine = self._engine
        if not engine:
            return

        return await engine.register(_get_source(info.id), register_events)

    async def _on_query(self, info, params):
        backend = self._backend
        return await backend.query(params)

    async def _register_connection_status(self, info, status):
        # connection status is registered only while an open engine is set
        engine = self._engine
        if not engine or not engine.is_open:
            return

        status_event = self._create_eventer_event(info, status)
        await engine.register(_get_source(info.id), [status_event])

    def _create_eventer_event(self, info, status):
        event_type = ('event', str(self._server_id), 'eventer',
                      info.client_name)
        return common.RegisterEvent(type=event_type,
                                    source_timestamp=None,
                                    payload=common.EventPayloadJson(status))
Eventer server
For creating new server see create_eventer_server
coroutine.
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group

    Taken from the wrapped `_srv` object.

    """
    listener = self._srv
    return listener.async_group
Async group
def get_client_names(self) -> Iterable[tuple[common.Source, str]]:
    """Get client names

    Lazily yields a `(source, client_name)` pair for each connection info
    returned by `self._srv.get_conn_infos()`.

    """
    yield from ((_get_source(conn_info.id), conn_info.client_name)
                for conn_info in self._srv.get_conn_infos())
Get client names
async def
set_status( self, status: hat.event.common.Status, engine: hat.event.common.Engine | None):
async def set_status(self,
                     status: common.Status,
                     engine: common.Engine | None):
    """Set status

    `engine` has to be provided together with `common.Status.OPERATIONAL`
    and has to be omitted for any other status - otherwise `ValueError` is
    raised before any state is changed.

    """
    mismatch = ((not engine)
                if status == common.Status.OPERATIONAL
                else bool(engine))
    if mismatch:
        raise ValueError('invalid status/engine')

    # engine is stored before the new status is passed to `_srv`
    self._engine = engine
    await self._srv.set_status(status)
Set status
async def
notify_events( self, events: Collection[hat.event.common.Event], persisted: bool, with_ack: bool = False):
async def notify_events(self,
                        events: Collection[common.Event],
                        persisted: bool,
                        with_ack: bool = False):
    """Notify events

    Arguments are passed to `self._srv.notify_events` unchanged.

    """
    listener = self._srv
    await listener.notify_events(events, persisted, with_ack)
Notify events