hat.event.server.syncer_server
Syncer server
1"""Syncer server""" 2 3import asyncio 4import contextlib 5import functools 6import itertools 7import logging 8import typing 9 10from hat import aio 11from hat import chatter 12from hat import json 13from hat import util 14from hat.event.server import common 15import hat.event.common.data 16 17 18mlog: logging.Logger = logging.getLogger(__name__) 19"""Module logger""" 20 21 22class ClientInfo(typing.NamedTuple): 23 """Client connection information""" 24 name: str 25 synced: bool 26 27 28StateCb = typing.Callable[[typing.List[ClientInfo]], None] 29"""Syncer state change callback""" 30 31 32async def create_syncer_server(conf: json.Data, 33 backend: common.Backend 34 ) -> 'SyncerServer': 35 """Create syncer server 36 37 Args: 38 conf: configuration defined by 39 ``hat-event://main.yaml#/definitions/syncer_server`` 40 backend: backend 41 42 """ 43 srv = SyncerServer() 44 srv._backend = backend 45 srv._state = {} 46 srv._next_client_ids = itertools.count(1) 47 srv._state_cbs = util.CallbackRegistry() 48 srv._clients = {} 49 50 srv._server = await chatter.listen(sbs_repo=common.sbs_repo, 51 address=conf['address'], 52 connection_cb=srv._on_connection, 53 bind_connections=False) 54 mlog.debug("listening on %s", conf['address']) 55 return srv 56 57 58class SyncerServer(aio.Resource): 59 60 @property 61 def async_group(self) -> aio.Group: 62 """Async group""" 63 return self._server.async_group 64 65 @property 66 def state(self) -> typing.Iterable[ClientInfo]: 67 """State of all active connections""" 68 return self._state.values() 69 70 def register_state_cb(self, 71 cb: StateCb 72 ) -> util.RegisterCallbackHandle: 73 """Register state change callback""" 74 return self._state_cbs.register(cb) 75 76 async def flush(self): 77 """Send flush requests and wait for flush responses""" 78 if not self.is_open: 79 await self.wait_closed() 80 return 81 82 await asyncio.wait([self.async_group.spawn(client.flush) 83 for client in self._clients.values()]) 84 85 def _on_connection(self, conn): 86 self.async_group.spawn(self._connection_loop, conn) 87 88 def _update_client_info(self, client_id, name, synced): 89 self._state[client_id] = ClientInfo(name, synced) 90 self._state_cbs.notify(list(self._state.values())) 91 92 def _remove_client_info(self, client_id): 93 if self._state.pop(client_id, None): 94 self._state_cbs.notify(list(self._state.values())) 95 96 async def _connection_loop(self, conn): 97 mlog.debug("starting new connection loop") 98 99 client_id = None 100 try: 101 mlog.debug("waiting for incomming message") 102 msg = await conn.receive() 103 msg_type = msg.data.module, msg.data.type 104 105 if msg_type != ('HatSyncer', 'MsgReq'): 106 raise Exception('unsupported message type') 107 108 mlog.debug("received request") 109 msg_req = hat.event.common.data.syncer_req_from_sbs(msg.data.data) 110 client_id = next(self._next_client_ids) 111 name = msg_req.client_name 112 last_event_id = msg_req.last_event_id 113 self._update_client_info(client_id, name, False) 114 115 mlog.debug("creating client") 116 synced_cb = functools.partial(self._update_client_info, client_id, 117 name, True) 118 client = _Client(backend=self._backend, 119 conn=conn, 120 last_event_id=last_event_id, 121 synced_cb=synced_cb) 122 123 self._clients[client_id] = client 124 try: 125 await client.wait_closing() 126 127 finally: 128 self._clients.pop(client_id) 129 await aio.uncancellable(client.async_close()) 130 131 except ConnectionError: 132 pass 133 134 except Exception as e: 135 mlog.error("connection loop error: %s", e, exc_info=e) 136 137 finally: 138 mlog.debug("closing client connection loop") 139 conn.close() 140 self._remove_client_info(client_id) 141 await aio.uncancellable(conn.async_close()) 142 143 144class _Client(aio.Resource): 145 146 def __init__(self, backend, conn, last_event_id, synced_cb): 147 self._backend = backend 148 self._conn = conn 149 self._last_event_id = last_event_id 150 self._async_group = aio.Group() 151 152 self._receiver = _Receiver(conn) 153 self.async_group.spawn(aio.call_on_done, self._receiver.wait_closing(), 154 self.close) 155 156 self._sender = _Sender(conn, last_event_id, synced_cb, self._receiver) 157 self.async_group.spawn(aio.call_on_done, self._sender.wait_closing(), 158 self.close) 159 160 self.async_group.spawn(self._client_loop) 161 self.async_group.spawn(aio.call_on_done, self._conn.wait_closing(), 162 self.close) 163 164 @property 165 def async_group(self): 166 return self._async_group 167 168 async def flush(self): 169 try: 170 await self._sender.send_flush() 171 172 except Exception: 173 await self._sender.wait_closed() 174 175 async def _client_loop(self): 176 mlog.debug("starting new client loop") 177 events_queue = aio.Queue() 178 is_synced = False 179 180 async def cleanup(): 181 self.close() 182 183 if is_synced: 184 with contextlib.suppress(ConnectionError): 185 while not events_queue.empty(): 186 self._sender.send_events(events_queue.get_nowait()) 187 188 await self._sender.async_close() 189 await self._receiver.async_close() 190 191 try: 192 with self._backend.register_flushed_events_cb( 193 events_queue.put_nowait): 194 mlog.debug("query backend") 195 async for events in self._backend.query_flushed( 196 self._last_event_id): 197 self._sender.send_events(events) 198 199 mlog.debug("sending synced") 200 self._sender.send_synced() 201 is_synced = True 202 203 while True: 204 events = await events_queue.get() 205 self._sender.send_events(events) 206 207 except ConnectionError: 208 pass 209 210 except Exception as e: 211 mlog.error("client loop error: %s", e, exc_info=e) 212 213 finally: 214 mlog.debug("stopping client loop") 215 await aio.uncancellable(cleanup()) 216 217 218class _Sender(aio.Resource): 219 220 def __init__(self, conn, last_event_id, synced_cb, receiver): 221 self._conn = conn 222 self._last_event_id = last_event_id 223 self._synced_cb = synced_cb 224 self._receiver = receiver 225 self._send_queue = aio.Queue() 226 self._async_group = aio.Group() 227 228 self.async_group.spawn(self._send_loop) 229 230 @property 231 def async_group(self): 232 return self._async_group 233 234 def send_events(self, events): 235 try: 236 self._send_queue.put_nowait(('events', events)) 237 238 except aio.QueueClosedError: 239 raise ConnectionError() 240 241 def send_synced(self): 242 try: 243 self._send_queue.put_nowait(('synced', None)) 244 245 except aio.QueueClosedError: 246 raise ConnectionError() 247 248 async def send_flush(self): 249 try: 250 future = asyncio.Future() 251 self._send_queue.put_nowait(('flush', future)) 252 await future 253 254 except aio.QueueClosedError: 255 raise ConnectionError() 256 257 async def _send_loop(self): 258 is_synced = False 259 260 async def cleanup(): 261 self.close() 262 self._send_queue.close() 263 264 while not self._send_queue.empty(): 265 msg_type, msg_data = self._send_queue.get_nowait() 266 267 with contextlib.suppress(Exception): 268 if msg_type == 'events' and is_synced: 269 self._send_events(msg_data) 270 271 elif msg_type == 'flush': 272 self._send_flush(msg_data) 273 274 if is_synced: 275 with contextlib.suppress(Exception): 276 future = asyncio.Future() 277 self._send_flush(future) 278 await future 279 280 with contextlib.suppress(Exception): 281 await self._conn.drain() 282 283 try: 284 while True: 285 msg_type, msg_data = await self._send_queue.get() 286 287 if msg_type == 'events': 288 self._send_events(msg_data) 289 290 elif msg_type == 'synced': 291 self._send_synced() 292 is_synced = True 293 self._synced_cb() 294 295 elif msg_type == 'flush': 296 self._send_flush(msg_data) 297 298 else: 299 raise ValueError('unsupported message') 300 301 except ConnectionError: 302 pass 303 304 except Exception as e: 305 mlog.error("send loop error: %s", e, exc_info=e) 306 307 finally: 308 await aio.uncancellable(cleanup()) 309 310 def _send_events(self, events): 311 if not events: 312 return 313 314 if events[0].event_id.server != self._last_event_id.server: 315 return 316 317 if events[0].event_id.session < self._last_event_id.session: 318 return 319 320 if events[0].event_id.session == self._last_event_id.session: 321 events = [event for event in events 322 if event.event_id > self._last_event_id] 323 if not events: 324 return 325 326 mlog.debug("sending events") 327 data = chatter.Data(module='HatSyncer', 328 type='MsgEvents', 329 data=[common.event_to_sbs(e) 330 for e in events]) 331 self._conn.send(data) 332 self._last_event_id = events[-1].event_id 333 334 def _send_synced(self): 335 self._conn.send(chatter.Data(module='HatSyncer', 336 type='MsgSynced', 337 data=None)) 338 339 def _send_flush(self, future): 340 try: 341 conv = self._conn.send(chatter.Data(module='HatSyncer', 342 type='MsgFlushReq', 343 data=None), 344 last=False) 345 self._receiver.add_flush_future(conv, future) 346 347 except Exception: 348 future.set_result(None) 349 raise 350 351 352class _Receiver(aio.Resource): 353 354 def __init__(self, conn): 355 self._conn = conn 356 self._flush_futures = {} 357 self._async_group = aio.Group() 358 359 self.async_group.spawn(self._receive_loop) 360 361 @property 362 def async_group(self): 363 return self._async_group 364 365 def add_flush_future(self, conv, future): 366 if not self.is_open: 367 raise ConnectionError() 368 369 self._flush_futures[conv] = future 370 371 async def _receive_loop(self): 372 try: 373 while True: 374 msg = await self._conn.receive() 375 msg_type = msg.data.module, msg.data.type 376 377 if msg_type != ('HatSyncer', 'MsgFlushRes'): 378 raise Exception("unsupported message type") 379 380 flush_future = self._flush_futures.pop(msg.conv, None) 381 if flush_future and not flush_future.done(): 382 flush_future.set_result(None) 383 384 except ConnectionError: 385 pass 386 387 except Exception as e: 388 mlog.error("receive loop error: %s", e, exc_info=e) 389 390 finally: 391 self.close() 392 393 for flush_future in self._flush_futures.values(): 394 if not flush_future.done(): 395 flush_future.set_result(None)
Module logger
class
ClientInfo(typing.NamedTuple):
23class ClientInfo(typing.NamedTuple): 24 """Client connection information""" 25 name: str 26 synced: bool
Client connection information
Inherited Members
- builtins.tuple
- index
- count
StateCb = typing.Callable[[typing.List[hat.event.server.syncer_server.ClientInfo]], NoneType]
Syncer state change callback
async def
create_syncer_server( conf: ~Data, backend: hat.event.server.common.Backend) -> hat.event.server.syncer_server.SyncerServer:
33async def create_syncer_server(conf: json.Data, 34 backend: common.Backend 35 ) -> 'SyncerServer': 36 """Create syncer server 37 38 Args: 39 conf: configuration defined by 40 ``hat-event://main.yaml#/definitions/syncer_server`` 41 backend: backend 42 43 """ 44 srv = SyncerServer() 45 srv._backend = backend 46 srv._state = {} 47 srv._next_client_ids = itertools.count(1) 48 srv._state_cbs = util.CallbackRegistry() 49 srv._clients = {} 50 51 srv._server = await chatter.listen(sbs_repo=common.sbs_repo, 52 address=conf['address'], 53 connection_cb=srv._on_connection, 54 bind_connections=False) 55 mlog.debug("listening on %s", conf['address']) 56 return srv
Create syncer server
Arguments:
- conf: configuration defined by
hat-event://main.yaml#/definitions/syncer_server
- backend: backend
class
SyncerServer(hat.aio.Resource):
59class SyncerServer(aio.Resource): 60 61 @property 62 def async_group(self) -> aio.Group: 63 """Async group""" 64 return self._server.async_group 65 66 @property 67 def state(self) -> typing.Iterable[ClientInfo]: 68 """State of all active connections""" 69 return self._state.values() 70 71 def register_state_cb(self, 72 cb: StateCb 73 ) -> util.RegisterCallbackHandle: 74 """Register state change callback""" 75 return self._state_cbs.register(cb) 76 77 async def flush(self): 78 """Send flush requests and wait for flush responses""" 79 if not self.is_open: 80 await self.wait_closed() 81 return 82 83 await asyncio.wait([self.async_group.spawn(client.flush) 84 for client in self._clients.values()]) 85 86 def _on_connection(self, conn): 87 self.async_group.spawn(self._connection_loop, conn) 88 89 def _update_client_info(self, client_id, name, synced): 90 self._state[client_id] = ClientInfo(name, synced) 91 self._state_cbs.notify(list(self._state.values())) 92 93 def _remove_client_info(self, client_id): 94 if self._state.pop(client_id, None): 95 self._state_cbs.notify(list(self._state.values())) 96 97 async def _connection_loop(self, conn): 98 mlog.debug("starting new connection loop") 99 100 client_id = None 101 try: 102 mlog.debug("waiting for incomming message") 103 msg = await conn.receive() 104 msg_type = msg.data.module, msg.data.type 105 106 if msg_type != ('HatSyncer', 'MsgReq'): 107 raise Exception('unsupported message type') 108 109 mlog.debug("received request") 110 msg_req = hat.event.common.data.syncer_req_from_sbs(msg.data.data) 111 client_id = next(self._next_client_ids) 112 name = msg_req.client_name 113 last_event_id = msg_req.last_event_id 114 self._update_client_info(client_id, name, False) 115 116 mlog.debug("creating client") 117 synced_cb = functools.partial(self._update_client_info, client_id, 118 name, True) 119 client = _Client(backend=self._backend, 120 conn=conn, 121 last_event_id=last_event_id, 122 synced_cb=synced_cb) 123 124 self._clients[client_id] = client 125 try: 126 await client.wait_closing() 127 128 finally: 129 self._clients.pop(client_id) 130 await aio.uncancellable(client.async_close()) 131 132 except ConnectionError: 133 pass 134 135 except Exception as e: 136 mlog.error("connection loop error: %s", e, exc_info=e) 137 138 finally: 139 mlog.debug("closing client connection loop") 140 conn.close() 141 self._remove_client_info(client_id) 142 await aio.uncancellable(conn.async_close())
Resource with lifetime control based on Group
.
def
register_state_cb( self, cb: Callable[[List[hat.event.server.syncer_server.ClientInfo]], NoneType]) -> hat.util.RegisterCallbackHandle:
71 def register_state_cb(self, 72 cb: StateCb 73 ) -> util.RegisterCallbackHandle: 74 """Register state change callback""" 75 return self._state_cbs.register(cb)
Register state change callback
async def
flush(self):
77 async def flush(self): 78 """Send flush requests and wait for flush responses""" 79 if not self.is_open: 80 await self.wait_closed() 81 return 82 83 await asyncio.wait([self.async_group.spawn(client.flush) 84 for client in self._clients.values()])
Send flush requests and wait for flush responses
Inherited Members
- hat.aio.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close