hat.event.server.eventer_server

Eventer server

  1"""Eventer server"""
  2
  3from collections.abc import Collection
  4import logging
  5
  6from hat import aio
  7from hat.drivers import tcp
  8
  9from hat.event import common
 10from hat.event import eventer
 11
 12
 13mlog: logging.Logger = logging.getLogger(__name__)
 14"""Module logger"""
 15
 16
 17async def create_eventer_server(addr: tcp.Address,
 18                                backend: common.Backend,
 19                                server_id: int,
 20                                *,
 21                                server_token: str | None = None,
 22                                **kwargs
 23                                ) -> 'EventerServer':
 24    """Create eventer server"""
 25    server = EventerServer()
 26    server._backend = backend
 27    server._server_id = server_id
 28    server._server_token = server_token
 29    server._engine = None
 30
 31    server._srv = await eventer.listen(addr,
 32                                       connected_cb=server._on_connected,
 33                                       disconnected_cb=server._on_disconnected,
 34                                       register_cb=server._on_register,
 35                                       query_cb=server._on_query,
 36                                       **kwargs)
 37
 38    return server
 39
 40
 41class EventerServer(aio.Resource):
 42    """Eventer server
 43
 44    For creating new server see `create_eventer_server` coroutine.
 45
 46    """
 47
 48    @property
 49    def async_group(self) -> aio.Group:
 50        """Async group"""
 51        return self._srv.async_group
 52
 53    async def set_engine(self, engine: common.Engine | None):
 54        """Set engine"""
 55        self._engine = engine
 56
 57        status = common.Status.OPERATIONAL if engine else common.Status.STANDBY
 58        await self._srv.set_status(status)
 59
 60    async def notify_events(self,
 61                            events: Collection[common.Event],
 62                            persisted: bool):
 63        """Notify events"""
 64        await self._srv.notify_events(events, persisted)
 65
 66    async def _on_connected(self, info):
 67        if (info.client_token is not None and
 68                info.client_token != self._server_token):
 69            raise Exception('invalid client token')
 70
 71        if not self._engine or not self._engine.is_open:
 72            return
 73
 74        source = _get_source(info.id)
 75        register_event = self._create_eventer_event(info, 'CONNECTED')
 76        await self._engine.register(source, [register_event])
 77
 78    async def _on_disconnected(self, info):
 79        if not self._engine or not self._engine.is_open:
 80            return
 81
 82        source = _get_source(info.id)
 83        register_event = self._create_eventer_event(info, 'DISCONNECTED')
 84        await self._engine.register(source, [register_event])
 85
 86    async def _on_register(self, info, register_events):
 87        if not self._engine:
 88            return
 89
 90        source = _get_source(info.id)
 91        return await self._engine.register(source, register_events)
 92
 93    async def _on_query(self, info, params):
 94        return await self._backend.query(params)
 95
 96    def _create_eventer_event(self, info, status):
 97        return common.RegisterEvent(
 98            type=('event', str(self._server_id), 'eventer', info.client_name),
 99            source_timestamp=None,
100            payload=common.EventPayloadJson(status))
101
102
def _get_source(source_id):
    """Create eventer source with provided identifier"""
    source_type = common.SourceType.EVENTER
    return common.Source(type=source_type, id=source_id)
mlog: logging.Logger = <Logger hat.event.server.eventer_server (WARNING)>

Module logger

async def create_eventer_server( addr: hat.drivers.tcp.Address, backend: hat.event.common.backend.Backend, server_id: int, *, server_token: str | None = None, **kwargs) -> EventerServer:
async def create_eventer_server(addr: tcp.Address,
                                backend: common.Backend,
                                server_id: int,
                                *,
                                server_token: str | None = None,
                                **kwargs
                                ) -> 'EventerServer':
    """Create eventer server

    A new `EventerServer` is populated with `backend`, `server_id` and
    `server_token`, starts without an engine, and begins listening on
    `addr` through `eventer.listen`. Additional keyword arguments are
    passed to `eventer.listen` unchanged.

    """
    srv = EventerServer()
    srv._engine = None
    srv._server_token = server_token
    srv._server_id = server_id
    srv._backend = backend

    # callbacks through which the listening server reports client activity
    callbacks = {'connected_cb': srv._on_connected,
                 'disconnected_cb': srv._on_disconnected,
                 'register_cb': srv._on_register,
                 'query_cb': srv._on_query}
    srv._srv = await eventer.listen(addr, **callbacks, **kwargs)

    return srv

Create eventer server

class EventerServer(hat.aio.group.Resource):
 42class EventerServer(aio.Resource):
 43    """Eventer server
 44
 45    For creating new server see `create_eventer_server` coroutine.
 46
 47    """
 48
 49    @property
 50    def async_group(self) -> aio.Group:
 51        """Async group"""
 52        return self._srv.async_group
 53
 54    async def set_engine(self, engine: common.Engine | None):
 55        """Set engine"""
 56        self._engine = engine
 57
 58        status = common.Status.OPERATIONAL if engine else common.Status.STANDBY
 59        await self._srv.set_status(status)
 60
 61    async def notify_events(self,
 62                            events: Collection[common.Event],
 63                            persisted: bool):
 64        """Notify events"""
 65        await self._srv.notify_events(events, persisted)
 66
 67    async def _on_connected(self, info):
 68        if (info.client_token is not None and
 69                info.client_token != self._server_token):
 70            raise Exception('invalid client token')
 71
 72        if not self._engine or not self._engine.is_open:
 73            return
 74
 75        source = _get_source(info.id)
 76        register_event = self._create_eventer_event(info, 'CONNECTED')
 77        await self._engine.register(source, [register_event])
 78
 79    async def _on_disconnected(self, info):
 80        if not self._engine or not self._engine.is_open:
 81            return
 82
 83        source = _get_source(info.id)
 84        register_event = self._create_eventer_event(info, 'DISCONNECTED')
 85        await self._engine.register(source, [register_event])
 86
 87    async def _on_register(self, info, register_events):
 88        if not self._engine:
 89            return
 90
 91        source = _get_source(info.id)
 92        return await self._engine.register(source, register_events)
 93
 94    async def _on_query(self, info, params):
 95        return await self._backend.query(params)
 96
 97    def _create_eventer_event(self, info, status):
 98        return common.RegisterEvent(
 99            type=('event', str(self._server_id), 'eventer', info.client_name),
100            source_timestamp=None,
101            payload=common.EventPayloadJson(status))

Eventer server

To create a new server, see the `create_eventer_server` coroutine.

async_group: hat.aio.group.Group
49    @property
50    def async_group(self) -> aio.Group:
51        """Async group"""
52        return self._srv.async_group

Async group

async def set_engine(self, engine: hat.event.common.module.Engine | None):
54    async def set_engine(self, engine: common.Engine | None):
55        """Set engine"""
56        self._engine = engine
57
58        status = common.Status.OPERATIONAL if engine else common.Status.STANDBY
59        await self._srv.set_status(status)

Set engine

async def notify_events( self, events: collections.abc.Collection[hat.event.common.common.Event], persisted: bool):
61    async def notify_events(self,
62                            events: Collection[common.Event],
63                            persisted: bool):
64        """Notify events"""
65        await self._srv.notify_events(events, persisted)

Notify events

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close