hat.event.server.eventer_server
Eventer server
1"""Eventer server""" 2 3import contextlib 4import itertools 5import logging 6 7from hat import aio 8from hat import chatter 9from hat import json 10from hat.event.server import common 11import hat.event.server.engine 12 13 14mlog: logging.Logger = logging.getLogger(__name__) 15"""Module logger""" 16 17 18async def create_eventer_server(conf: json.Data, 19 engine: hat.event.server.engine.Engine 20 ) -> 'EventerServer': 21 """Create eventer server 22 23 Args: 24 conf: configuration defined by 25 ``hat-event://main.yaml#/definitions/eventer_server`` 26 engine: engine 27 28 """ 29 comm = EventerServer() 30 comm._engine = engine 31 comm._next_source_id = itertools.count(1) 32 33 comm._server = await chatter.listen(sbs_repo=common.sbs_repo, 34 address=conf['address'], 35 connection_cb=comm._on_connection) 36 mlog.debug("listening on %s", conf['address']) 37 38 return comm 39 40 41class EventerServer(aio.Resource): 42 43 @property 44 def async_group(self) -> aio.Group: 45 """Async group""" 46 return self._server.async_group 47 48 def _on_connection(self, conn): 49 source_id = next(self._next_source_id) 50 _Connection(conn, self._engine, source_id) 51 52 53class _Connection(aio.Resource): 54 55 def __init__(self, conn, engine, source_id): 56 self._conn = conn 57 self._engine = engine 58 self._subscription = None 59 self._source = common.Source(type=common.SourceType.EVENTER, 60 id=source_id) 61 62 self.async_group.spawn(self._connection_loop) 63 64 @property 65 def async_group(self): 66 return self._conn.async_group 67 68 def _on_events(self, events): 69 if not self._subscription: 70 return 71 events = [e for e in events 72 if self._subscription.matches(e.event_type)] 73 if not events: 74 return 75 76 data = chatter.Data('HatEventer', 'MsgNotify', 77 [common.event_to_sbs(e) for e in events]) 78 with contextlib.suppress(ConnectionError): 79 self._conn.send(data) 80 81 async def _connection_loop(self): 82 mlog.debug("starting new client connection loop") 83 try: 84 with self._engine.register_events_cb(self._on_events): 85 await self._register_eventer_event('CONNECTED') 86 87 while True: 88 mlog.debug("waiting for incomming messages") 89 msg = await self._conn.receive() 90 msg_type = msg.data.module, msg.data.type 91 92 if msg_type == ('HatEventer', 'MsgSubscribe'): 93 mlog.debug("received subscribe message") 94 await self._process_msg_subscribe(msg) 95 96 elif msg_type == ('HatEventer', 'MsgRegisterReq'): 97 mlog.debug("received register request") 98 await self._process_msg_register(msg) 99 100 elif msg_type == ('HatEventer', 'MsgQueryReq'): 101 mlog.debug("received query request") 102 await self._process_msg_query(msg) 103 104 else: 105 raise Exception('unsupported message type') 106 107 except ConnectionError: 108 pass 109 110 except Exception as e: 111 mlog.error("connection loop error: %s", e, exc_info=e) 112 113 finally: 114 mlog.debug("closing client connection loop") 115 self.close() 116 with contextlib.suppress(Exception): 117 await self._register_eventer_event('DISCONNECTED') 118 119 async def _process_msg_subscribe(self, msg): 120 self._subscription = common.Subscription([tuple(i) 121 for i in msg.data.data]) 122 123 async def _process_msg_register(self, msg): 124 register_events = [common.register_event_from_sbs(i) 125 for i in msg.data.data] 126 events = await self._engine.register(self._source, register_events) 127 if msg.last: 128 return 129 130 data = chatter.Data(module='HatEventer', 131 type='MsgRegisterRes', 132 data=[(('event', common.event_to_sbs(e)) 133 if e is not None else ('failure', None)) 134 for e in events]) 135 self._conn.send(data, conv=msg.conv) 136 137 async def _process_msg_query(self, msg): 138 query_data = common.query_from_sbs(msg.data.data) 139 events = await self._engine.query(query_data) 140 data = chatter.Data(module='HatEventer', 141 type='MsgQueryRes', 142 data=[common.event_to_sbs(e) for e in events]) 143 self._conn.send(data, conv=msg.conv) 144 145 async def _register_eventer_event(self, status): 146 register_event = common.RegisterEvent( 147 event_type=('event', 'eventer'), 148 source_timestamp=None, 149 payload=common.EventPayload(type=common.EventPayloadType.JSON, 150 data=status)) 151 await self._engine.register(self._source, [register_event])
Module logger
async def
create_eventer_server( conf: ~Data, engine: hat.event.server.engine.Engine) -> hat.event.server.eventer_server.EventerServer:
19async def create_eventer_server(conf: json.Data, 20 engine: hat.event.server.engine.Engine 21 ) -> 'EventerServer': 22 """Create eventer server 23 24 Args: 25 conf: configuration defined by 26 ``hat-event://main.yaml#/definitions/eventer_server`` 27 engine: engine 28 29 """ 30 comm = EventerServer() 31 comm._engine = engine 32 comm._next_source_id = itertools.count(1) 33 34 comm._server = await chatter.listen(sbs_repo=common.sbs_repo, 35 address=conf['address'], 36 connection_cb=comm._on_connection) 37 mlog.debug("listening on %s", conf['address']) 38 39 return comm
Create eventer server
Arguments:
- conf: configuration defined by
hat-event://main.yaml#/definitions/eventer_server
- engine: engine
class
EventerServer(hat.aio.Resource):
42class EventerServer(aio.Resource): 43 44 @property 45 def async_group(self) -> aio.Group: 46 """Async group""" 47 return self._server.async_group 48 49 def _on_connection(self, conn): 50 source_id = next(self._next_source_id) 51 _Connection(conn, self._engine, source_id)
Resource with lifetime control based on Group
.
Inherited Members
- hat.aio.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close