hat.event.server.eventer_server

Eventer server

  1"""Eventer server"""
  2
  3from collections.abc import Collection, Iterable
  4import logging
  5
  6from hat import aio
  7from hat.drivers import tcp
  8
  9from hat.event import common
 10from hat.event import eventer
 11
 12
# Logger named after this module (`__name__`); the bare string literal that
# follows serves as the attribute's documentation text.
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""
 15
 16
 17async def create_eventer_server(addr: tcp.Address,
 18                                backend: common.Backend,
 19                                server_id: int,
 20                                *,
 21                                server_token: str | None = None,
 22                                **kwargs
 23                                ) -> 'EventerServer':
 24    """Create eventer server"""
 25    server = EventerServer()
 26    server._backend = backend
 27    server._server_id = server_id
 28    server._server_token = server_token
 29    server._engine = None
 30
 31    server._srv = await eventer.listen(addr,
 32                                       connected_cb=server._on_connected,
 33                                       disconnected_cb=server._on_disconnected,
 34                                       register_cb=server._on_register,
 35                                       query_cb=server._on_query,
 36                                       **kwargs)
 37
 38    return server
 39
 40
 41class EventerServer(aio.Resource):
 42    """Eventer server
 43
 44    For creating new server see `create_eventer_server` coroutine.
 45
 46    """
 47
 48    @property
 49    def async_group(self) -> aio.Group:
 50        """Async group"""
 51        return self._srv.async_group
 52
 53    def get_client_names(self) -> Iterable[tuple[common.Source, str]]:
 54        """Get client names"""
 55        for info in self._srv.get_conn_infos():
 56            yield _get_source(info.id), info.client_name
 57
 58    async def set_status(self,
 59                         status: common.Status,
 60                         engine: common.Engine | None):
 61        """Set status"""
 62        if status == common.Status.OPERATIONAL:
 63            if not engine:
 64                raise ValueError('invalid status/engine')
 65
 66        else:
 67            if engine:
 68                raise ValueError('invalid status/engine')
 69
 70        self._engine = engine
 71        await self._srv.set_status(status)
 72
 73    async def notify_events(self,
 74                            events: Collection[common.Event],
 75                            persisted: bool,
 76                            with_ack: bool = False):
 77        """Notify events"""
 78        await self._srv.notify_events(events, persisted, with_ack)
 79
 80    async def _on_connected(self, info):
 81        if (info.client_token is not None and
 82                info.client_token != self._server_token):
 83            raise Exception('invalid client token')
 84
 85        if not self._engine or not self._engine.is_open:
 86            return
 87
 88        source = _get_source(info.id)
 89        register_event = self._create_eventer_event(info, 'CONNECTED')
 90        await self._engine.register(source, [register_event])
 91
 92    async def _on_disconnected(self, info):
 93        if not self._engine or not self._engine.is_open:
 94            return
 95
 96        source = _get_source(info.id)
 97        register_event = self._create_eventer_event(info, 'DISCONNECTED')
 98        await self._engine.register(source, [register_event])
 99
100    async def _on_register(self, info, register_events):
101        if not self._engine:
102            return
103
104        source = _get_source(info.id)
105        return await self._engine.register(source, register_events)
106
107    async def _on_query(self, info, params):
108        return await self._backend.query(params)
109
110    def _create_eventer_event(self, info, status):
111        return common.RegisterEvent(
112            type=('event', str(self._server_id), 'eventer', info.client_name),
113            source_timestamp=None,
114            payload=common.EventPayloadJson(status))
115
116
def _get_source(source_id):
    """Create eventer-typed source with the provided identifier"""
    source_type = common.SourceType.EVENTER
    return common.Source(type=source_type, id=source_id)
mlog: logging.Logger = <Logger hat.event.server.eventer_server (WARNING)>

Module logger

async def create_eventer_server( addr: hat.drivers.tcp.Address, backend: hat.event.common.Backend, server_id: int, *, server_token: str | None = None, **kwargs) -> EventerServer:
async def create_eventer_server(addr: tcp.Address,
                                backend: common.Backend,
                                server_id: int,
                                *,
                                server_token: str | None = None,
                                **kwargs
                                ) -> 'EventerServer':
    """Create eventer server listening on `addr`

    `backend` answers client queries and `server_id` becomes part of the
    type of events describing client connection state. When a connecting
    client provides a token, it has to be equal to `server_token`.
    Additional keyword arguments are passed to `eventer.listen`.

    """
    srv = EventerServer()
    srv._engine = None
    srv._server_token = server_token
    srv._server_id = server_id
    srv._backend = backend

    # callbacks are bound before listening starts, so the instance state
    # assigned above is already in place when the first one is invoked
    callbacks = {'connected_cb': srv._on_connected,
                 'disconnected_cb': srv._on_disconnected,
                 'register_cb': srv._on_register,
                 'query_cb': srv._on_query}
    srv._srv = await eventer.listen(addr, **callbacks, **kwargs)

    return srv

Create eventer server

class EventerServer(hat.aio.group.Resource):
 42class EventerServer(aio.Resource):
 43    """Eventer server
 44
 45    For creating new server see `create_eventer_server` coroutine.
 46
 47    """
 48
 49    @property
 50    def async_group(self) -> aio.Group:
 51        """Async group"""
 52        return self._srv.async_group
 53
 54    def get_client_names(self) -> Iterable[tuple[common.Source, str]]:
 55        """Get client names"""
 56        for info in self._srv.get_conn_infos():
 57            yield _get_source(info.id), info.client_name
 58
 59    async def set_status(self,
 60                         status: common.Status,
 61                         engine: common.Engine | None):
 62        """Set status"""
 63        if status == common.Status.OPERATIONAL:
 64            if not engine:
 65                raise ValueError('invalid status/engine')
 66
 67        else:
 68            if engine:
 69                raise ValueError('invalid status/engine')
 70
 71        self._engine = engine
 72        await self._srv.set_status(status)
 73
 74    async def notify_events(self,
 75                            events: Collection[common.Event],
 76                            persisted: bool,
 77                            with_ack: bool = False):
 78        """Notify events"""
 79        await self._srv.notify_events(events, persisted, with_ack)
 80
 81    async def _on_connected(self, info):
 82        if (info.client_token is not None and
 83                info.client_token != self._server_token):
 84            raise Exception('invalid client token')
 85
 86        if not self._engine or not self._engine.is_open:
 87            return
 88
 89        source = _get_source(info.id)
 90        register_event = self._create_eventer_event(info, 'CONNECTED')
 91        await self._engine.register(source, [register_event])
 92
 93    async def _on_disconnected(self, info):
 94        if not self._engine or not self._engine.is_open:
 95            return
 96
 97        source = _get_source(info.id)
 98        register_event = self._create_eventer_event(info, 'DISCONNECTED')
 99        await self._engine.register(source, [register_event])
100
101    async def _on_register(self, info, register_events):
102        if not self._engine:
103            return
104
105        source = _get_source(info.id)
106        return await self._engine.register(source, register_events)
107
108    async def _on_query(self, info, params):
109        return await self._backend.query(params)
110
111    def _create_eventer_event(self, info, status):
112        return common.RegisterEvent(
113            type=('event', str(self._server_id), 'eventer', info.client_name),
114            source_timestamp=None,
115            payload=common.EventPayloadJson(status))

Eventer server

To create a new server, see the create_eventer_server coroutine.

async_group: hat.aio.group.Group
49    @property
50    def async_group(self) -> aio.Group:
51        """Async group"""
52        return self._srv.async_group

Async group

def get_client_names(self) -> Iterable[tuple[hat.event.common.Source, str]]:
54    def get_client_names(self) -> Iterable[tuple[common.Source, str]]:
55        """Get client names"""
56        for info in self._srv.get_conn_infos():
57            yield _get_source(info.id), info.client_name

Get client names

async def set_status( self, status: hat.event.common.Status, engine: hat.event.common.Engine | None):
59    async def set_status(self,
60                         status: common.Status,
61                         engine: common.Engine | None):
62        """Set status"""
63        if status == common.Status.OPERATIONAL:
64            if not engine:
65                raise ValueError('invalid status/engine')
66
67        else:
68            if engine:
69                raise ValueError('invalid status/engine')
70
71        self._engine = engine
72        await self._srv.set_status(status)

Set status

async def notify_events( self, events: Collection[hat.event.common.Event], persisted: bool, with_ack: bool = False):
74    async def notify_events(self,
75                            events: Collection[common.Event],
76                            persisted: bool,
77                            with_ack: bool = False):
78        """Notify events"""
79        await self._srv.notify_events(events, persisted, with_ack)

Notify events