hat.event.syncer
1from hat.event.syncer.client import (SyncedCb, 2 EventsCb, 3 SyncerInitError, 4 connect, 5 Client) 6from hat.event.syncer.server import (ClientInfo, 7 StateCb, 8 QueryCb, 9 listen, 10 Server) 11 12 13__all__ = ['SyncedCb', 14 'EventsCb', 15 'SyncerInitError', 16 'connect', 17 'Client', 18 'ClientInfo', 19 'StateCb', 20 'QueryCb', 21 'listen', 22 'Server']
SyncedCb =
typing.Callable[[], typing.Optional[typing.Awaitable[NoneType]]]
EventsCb =
typing.Callable[[list[hat.event.common.data.Event]], typing.Optional[typing.Awaitable[NoneType]]]
class
SyncerInitError(builtins.Exception):
Syncer initialization error
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
async def
connect( address: str, client_name: str, last_event_id: hat.event.common.data.EventId, synced_cb: Optional[Callable[[], Optional[Awaitable[NoneType]]]] = None, events_cb: Optional[Callable[[list[hat.event.common.data.Event]], Optional[Awaitable[NoneType]]]] = None, client_token: str | None = None, subscriptions: list[typing.Tuple[str, ...]] = [('*',)]) -> Client:
async def connect(address: str,
                  client_name: str,
                  last_event_id: common.EventId,
                  synced_cb: SyncedCb | None = None,
                  events_cb: EventsCb | None = None,
                  client_token: str | None = None,
                  subscriptions: list[common.EventType] = [('*',)]
                  ) -> 'Client':
    """Connect to remote syncer server

    A chatter connection to `address` is opened and a single init request
    (carrying `last_event_id`, `client_name`, `client_token` and
    `subscriptions`) is sent. The first received message must be the init
    response; once it is accepted, the client's receive loop is spawned.

    `synced_cb` and `events_cb` are stored on the client and, when set,
    are called by the receive loop.

    Raises:
        SyncerInitError: the decoded init response is not ``None``
        Exception: the first received message is not an init response

    """
    # NOTE(review): the default `subscriptions` list is a single shared
    # object; it is only passed on here, never mutated.
    client = Client()
    client._synced_cb = synced_cb
    client._events_cb = events_cb

    client._conn = await chatter.connect(common.sbs_repo, address)
    mlog.debug("connected to %s", address)

    try:
        init_req = common.SyncerInitReq(last_event_id=last_event_id,
                                        client_name=client_name,
                                        client_token=client_token,
                                        subscriptions=subscriptions)
        mlog.debug("sending %s", init_req)

        client._conn.send(
            chatter.Data(module='HatSyncer',
                         type='MsgInitReq',
                         data=common.syncer_init_req_to_sbs(init_req)),
            last=False)

        reply = await client._conn.receive()
        reply_type = reply.data.module, reply.data.type
        if reply_type != ('HatSyncer', 'MsgInitRes'):
            raise Exception('unsupported message type')

        mlog.debug("received init response")
        init_error = common.syncer_init_res_from_sbs(reply.data.data)
        if init_error is not None:
            raise SyncerInitError(init_error)

        client.async_group.spawn(client._receive_loop)

    except BaseException:
        # any failure (including cancellation) closes the half-initialized
        # client before the exception propagates
        await aio.uncancellable(client.async_close())
        raise

    return client
Connect to remote syncer server
class
Client(hat.aio.group.Resource):
class Client(aio.Resource):
    """Syncer client

    The connection (`_conn`) and the optional callbacks (`_synced_cb`,
    `_events_cb`) are assigned by `connect`, which also spawns the
    receive loop.
    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._conn.async_group

    async def _receive_loop(self):
        """Receive and dispatch messages until the connection fails.

        Handles three message types: events (decoded and passed to the
        events callback), synced (passed to the synced callback) and flush
        requests (answered with a flush response in the same conversation).
        Any other message type ends the loop with a logged error. The
        client is closed when the loop exits for any reason.
        """
        mlog.debug("starting receive loop")
        try:
            while True:
                mlog.debug("waiting for incoming message")
                msg = await self._conn.receive()
                msg_type = msg.data.module, msg.data.type

                if msg_type == ('HatSyncer', 'MsgEvents'):
                    mlog.debug("received events")
                    events = [common.event_from_sbs(i)
                              for i in msg.data.data]

                    if self._events_cb:
                        await aio.call(self._events_cb, events)

                elif msg_type == ('HatSyncer', 'MsgSynced'):
                    mlog.debug("received synced")

                    if self._synced_cb:
                        await aio.call(self._synced_cb)

                elif msg_type == ('HatSyncer', 'MsgFlushReq'):
                    mlog.debug("received flush request")
                    # reply within the conversation of the request
                    self._conn.send(chatter.Data(module='HatSyncer',
                                                 type='MsgFlushRes',
                                                 data=None),
                                    conv=msg.conv)

                else:
                    raise Exception("unsupported message type")

        except ConnectionError:
            # connection errors end the loop without being logged
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping receive loop")
            self.close()
Syncer client
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
class
ClientInfo(typing.NamedTuple):
class ClientInfo(typing.NamedTuple):
    """Information describing a single client connection"""

    # client name
    name: str
    # synchronization flag
    synced: bool
Client connection information
Inherited Members
- builtins.tuple
- index
- count
StateCb =
typing.Callable[[list[ClientInfo]], NoneType]
QueryCb =
typing.Callable[[hat.event.common.data.EventId], typing.AsyncIterable[list[hat.event.common.data.Event]]]
async def
listen( address: str, query_cb: Optional[Callable[[hat.event.common.data.EventId], AsyncIterable[list[hat.event.common.data.Event]]]] = None, subscriptions: list[typing.Tuple[str, ...]] = [('*',)], token: str | None = None) -> Server:
async def listen(address: str,
                 query_cb: QueryCb | None = None,
                 subscriptions: list[common.EventType] = [('*',)],
                 token: str | None = None
                 ) -> 'Server':
    """Create listening syncer server

    All private server state is initialized first; only then is the
    chatter listener on `address` started, with the server's connection
    handler registered as its connection callback.

    `query_cb`, `subscriptions` (wrapped in `common.Subscription`) and
    `token` are stored on the returned server.

    """
    # NOTE(review): the default `subscriptions` list is a single shared
    # object; it is only passed on here, never mutated.
    srv = Server()

    # configuration supplied by the caller
    srv._query_cb = query_cb
    srv._subscription = common.Subscription(subscriptions)
    srv._token = token

    # per-connection bookkeeping
    srv._state = {}
    srv._next_client_ids = itertools.count(1)
    srv._state_cbs = util.CallbackRegistry()
    srv._notify_cbs = util.CallbackRegistry()
    srv._clients = {}

    srv._server = await chatter.listen(sbs_repo=common.sbs_repo,
                                       address=address,
                                       connection_cb=srv._on_connection,
                                       bind_connections=False)
    mlog.debug("listening on %s", address)

    return srv
Create listening syncer server
class
Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """Syncer server

    Private state (`_query_cb`, `_subscription`, `_token`, `_state`,
    `_next_client_ids`, `_state_cbs`, `_notify_cbs`, `_clients` and
    `_server`) is assigned by `listen`.
    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._server.async_group

    @property
    def state(self) -> list[ClientInfo]:
        """State of all active connections"""
        return list(self._state.values())

    def register_state_cb(self,
                          cb: StateCb
                          ) -> util.RegisterCallbackHandle:
        """Register state change callback"""
        return self._state_cbs.register(cb)

    def notify(self, events: list[common.Event]):
        """Notify clients of new events"""
        self._notify_cbs.notify(events)

    async def flush(self):
        """Send flush requests and wait for flush responses

        If the server is not open, this waits until it is closed instead.
        With no registered clients it returns immediately.
        """
        if not self.is_open:
            await self.wait_closed()
            return

        if not self._clients:
            return

        # one flush task per currently registered client
        await asyncio.wait([self.async_group.spawn(client.flush)
                            for client in self._clients.values()])

    def _on_connection(self, conn):
        """Spawn a connection loop for a newly accepted connection."""
        self.async_group.spawn(self._connection_loop, conn)

    def _update_client_info(self, client_id, client_info):
        """Store `client_info` under `client_id` and notify state callbacks."""
        self._state[client_id] = client_info
        self._state_cbs.notify(list(self._state.values()))

    def _remove_client_info(self, client_id):
        """Drop `client_id` from state; notify only if an entry was removed."""
        if self._state.pop(client_id, None):
            self._state_cbs.notify(list(self._state.values()))

    async def _connection_loop(self, conn):
        """Handle a single client connection from init request to close.

        The first message must be an init request. If a server token is
        configured and the request's token differs, an error response is
        sent and the connection is dropped. Otherwise the client is
        registered and served until it starts closing. The connection is
        always closed and the client's state entry removed on exit.
        """
        mlog.debug("starting new connection loop")

        # stays None until the init request has been accepted
        client_id = None
        try:
            mlog.debug("waiting for incoming message")
            req_msg = await conn.receive()
            req_msg_type = req_msg.data.module, req_msg.data.type

            if req_msg_type != ('HatSyncer', 'MsgInitReq'):
                raise Exception('unsupported message type')

            mlog.debug("received init request")
            req = common.syncer_init_req_from_sbs(req_msg.data.data)

            # token is checked only when the server has one configured
            if self._token is not None and req.client_token != self._token:
                res = 'invalid client token'

            else:
                res = None

            mlog.debug("sending init response")
            res_msg_data = chatter.Data(
                module='HatSyncer',
                type='MsgInitRes',
                data=common.syncer_init_res_to_sbs(res))
            conn.send(res_msg_data, conv=req_msg.conv)

            if res is not None:
                # drain before raising, which leads to closing the connection
                await conn.drain()
                raise Exception(res)

            client_id = next(self._next_client_ids)
            last_event_id = req.last_event_id
            # client receives only what both sides subscribe to
            subscription = self._subscription.intersection(
                common.Subscription(req.subscriptions))
            client_info = ClientInfo(name=req.client_name,
                                     synced=False)

            self._update_client_info(client_id, client_info)

            mlog.debug("creating client")
            # calling synced_cb republishes this client's info as synced
            synced_cb = functools.partial(self._update_client_info, client_id,
                                          client_info._replace(synced=True))
            client = _Client(query_cb=self._query_cb,
                             notify_cbs=self._notify_cbs,
                             conn=conn,
                             last_event_id=last_event_id,
                             subscription=subscription,
                             synced_cb=synced_cb)

            self._clients[client_id] = client
            try:
                await client.wait_closing()

            finally:
                self._clients.pop(client_id)
                await aio.uncancellable(client.async_close())

        except ConnectionError:
            # connection errors end the loop without being logged
            pass

        except Exception as e:
            mlog.error("connection loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("closing client connection loop")
            conn.close()
            self._remove_client_info(client_id)
            await aio.uncancellable(conn.async_close())
Syncer server
def
register_state_cb( self, cb: Callable[[list[ClientInfo]], NoneType]) -> hat.util.RegisterCallbackHandle:
def register_state_cb(self, cb: StateCb) -> util.RegisterCallbackHandle:
    """Register state change callback

    The callback is added to the server's state callback registry and the
    handle produced by that registration is returned.

    """
    handle = self._state_cbs.register(cb)
    return handle
Register state change callback
def
notify(self, events: list[hat.event.common.data.Event]):
def notify(self, events: list[common.Event]):
    """Notify clients of new events

    The events are passed to every callback in the server's notify
    callback registry.

    """
    registry = self._notify_cbs
    registry.notify(events)
Notify clients of new events
async def
flush(self):
async def flush(self):
    """Send flush requests and wait for flush responses

    If the server is not open, this waits until it is closed instead.
    With no registered clients nothing is awaited.

    """
    if self.is_open:
        # one flush task per currently registered client
        flush_tasks = [self.async_group.spawn(peer.flush)
                       for peer in self._clients.values()]
        if flush_tasks:
            await asyncio.wait(flush_tasks)

    else:
        await self.wait_closed()
Send flush requests and wait for flush responses
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close