hat.event.server.eventer_server

Eventer server

  1"""Eventer server"""
  2
  3import contextlib
  4import itertools
  5import logging
  6
  7from hat import aio
  8from hat import chatter
  9from hat import json
 10from hat.event.server import common
 11import hat.event.server.engine
 12
 13
 14mlog: logging.Logger = logging.getLogger(__name__)
 15"""Module logger"""
 16
 17
 18async def create_eventer_server(conf: json.Data,
 19                                engine: hat.event.server.engine.Engine
 20                                ) -> 'EventerServer':
 21    """Create eventer server
 22
 23    Args:
 24        conf: configuration defined by
 25            ``hat-event://main.yaml#/definitions/eventer_server``
 26        engine: engine
 27
 28    """
 29    comm = EventerServer()
 30    comm._engine = engine
 31    comm._next_source_id = itertools.count(1)
 32
 33    comm._server = await chatter.listen(sbs_repo=common.sbs_repo,
 34                                        address=conf['address'],
 35                                        connection_cb=comm._on_connection)
 36    mlog.debug("listening on %s", conf['address'])
 37
 38    return comm
 39
 40
 41class EventerServer(aio.Resource):
 42
 43    @property
 44    def async_group(self) -> aio.Group:
 45        """Async group"""
 46        return self._server.async_group
 47
 48    def _on_connection(self, conn):
 49        source_id = next(self._next_source_id)
 50        _Connection(conn, self._engine, source_id)
 51
 52
 53class _Connection(aio.Resource):
 54
 55    def __init__(self, conn, engine, source_id):
 56        self._conn = conn
 57        self._engine = engine
 58        self._subscription = None
 59        self._source = common.Source(type=common.SourceType.EVENTER,
 60                                     id=source_id)
 61
 62        self.async_group.spawn(self._connection_loop)
 63
 64    @property
 65    def async_group(self):
 66        return self._conn.async_group
 67
 68    def _on_events(self, events):
 69        if not self._subscription:
 70            return
 71        events = [e for e in events
 72                  if self._subscription.matches(e.event_type)]
 73        if not events:
 74            return
 75
 76        data = chatter.Data('HatEventer', 'MsgNotify',
 77                            [common.event_to_sbs(e) for e in events])
 78        with contextlib.suppress(ConnectionError):
 79            self._conn.send(data)
 80
 81    async def _connection_loop(self):
 82        mlog.debug("starting new client connection loop")
 83        try:
 84            with self._engine.register_events_cb(self._on_events):
 85                await self._register_eventer_event('CONNECTED')
 86
 87                while True:
 88                    mlog.debug("waiting for incomming messages")
 89                    msg = await self._conn.receive()
 90                    msg_type = msg.data.module, msg.data.type
 91
 92                    if msg_type == ('HatEventer', 'MsgSubscribe'):
 93                        mlog.debug("received subscribe message")
 94                        await self._process_msg_subscribe(msg)
 95
 96                    elif msg_type == ('HatEventer', 'MsgRegisterReq'):
 97                        mlog.debug("received register request")
 98                        await self._process_msg_register(msg)
 99
100                    elif msg_type == ('HatEventer', 'MsgQueryReq'):
101                        mlog.debug("received query request")
102                        await self._process_msg_query(msg)
103
104                    else:
105                        raise Exception('unsupported message type')
106
107        except ConnectionError:
108            pass
109
110        except Exception as e:
111            mlog.error("connection loop error: %s", e, exc_info=e)
112
113        finally:
114            mlog.debug("closing client connection loop")
115            self.close()
116            with contextlib.suppress(Exception):
117                await self._register_eventer_event('DISCONNECTED')
118
119    async def _process_msg_subscribe(self, msg):
120        self._subscription = common.Subscription([tuple(i)
121                                                  for i in msg.data.data])
122
123    async def _process_msg_register(self, msg):
124        register_events = [common.register_event_from_sbs(i)
125                           for i in msg.data.data]
126        events = await self._engine.register(self._source, register_events)
127        if msg.last:
128            return
129
130        data = chatter.Data(module='HatEventer',
131                            type='MsgRegisterRes',
132                            data=[(('event', common.event_to_sbs(e))
133                                   if e is not None else ('failure', None))
134                                  for e in events])
135        self._conn.send(data, conv=msg.conv)
136
137    async def _process_msg_query(self, msg):
138        query_data = common.query_from_sbs(msg.data.data)
139        events = await self._engine.query(query_data)
140        data = chatter.Data(module='HatEventer',
141                            type='MsgQueryRes',
142                            data=[common.event_to_sbs(e) for e in events])
143        self._conn.send(data, conv=msg.conv)
144
145    async def _register_eventer_event(self, status):
146        register_event = common.RegisterEvent(
147            event_type=('event', 'eventer'),
148            source_timestamp=None,
149            payload=common.EventPayload(type=common.EventPayloadType.JSON,
150                                        data=status))
151        await self._engine.register(self._source, [register_event])
mlog: logging.Logger = <Logger hat.event.server.eventer_server (WARNING)>

Module logger

async def create_eventer_server( conf: ~Data, engine: hat.event.server.engine.Engine) -> hat.event.server.eventer_server.EventerServer:
async def create_eventer_server(conf: json.Data,
                                engine: hat.event.server.engine.Engine
                                ) -> 'EventerServer':
    """Create eventer server

    Instantiates an `EventerServer`, attaches the engine and a source
    identifier counter (starting at 1) to it and starts a chatter listener
    on the configured address. Each accepted connection is passed to the
    server's ``_on_connection`` callback.

    Args:
        conf: configuration defined by
            ``hat-event://main.yaml#/definitions/eventer_server``
        engine: engine

    Returns:
        eventer server instance, returned once ``chatter.listen`` completes

    """
    server = EventerServer()
    server._engine = engine
    server._next_source_id = itertools.count(1)

    address = conf['address']
    server._server = await chatter.listen(
        sbs_repo=common.sbs_repo,
        address=address,
        connection_cb=server._on_connection)
    mlog.debug("listening on %s", address)

    return server

Create eventer server

Arguments:
  • conf: configuration defined by hat-event://main.yaml#/definitions/eventer_server
  • engine: engine
class EventerServer(hat.aio.Resource):
class EventerServer(aio.Resource):
    """Eventer server

    The methods below rely on the ``_engine``, ``_next_source_id`` and
    ``_server`` attributes, which are assigned by `create_eventer_server`.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        listener = self._server
        return listener.async_group

    def _on_connection(self, conn):
        # Draw the next source identifier first, then wrap the connection.
        # The wrapper is not stored here; its constructor spawns the
        # connection loop on the connection's own async group.
        new_source_id = next(self._next_source_id)
        _Connection(conn=conn,
                    engine=self._engine,
                    source_id=new_source_id)

Resource with lifetime control based on Group.

EventerServer()
async_group: hat.aio.Group

Async group

Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close