hat.event.syncer

 1from hat.event.syncer.client import (SyncedCb,
 2                                     EventsCb,
 3                                     SyncerInitError,
 4                                     connect,
 5                                     Client)
 6from hat.event.syncer.server import (ClientInfo,
 7                                     StateCb,
 8                                     QueryCb,
 9                                     listen,
10                                     Server)
11
12
13__all__ = ['SyncedCb',
14           'EventsCb',
15           'SyncerInitError',
16           'connect',
17           'Client',
18           'ClientInfo',
19           'StateCb',
20           'QueryCb',
21           'listen',
22           'Server']
SyncedCb = typing.Callable[[], typing.Optional[typing.Awaitable[NoneType]]]
EventsCb = typing.Callable[[list[hat.event.common.data.Event]], typing.Optional[typing.Awaitable[NoneType]]]
class SyncerInitError(builtins.Exception):
class SyncerInitError(Exception):
    """Syncer initialization error

    Raised by `connect` when the decoded init response received from the
    server is not ``None``; the decoded response is passed as the
    exception argument.
    """

Syncer initialization error

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
async def connect( address: str, client_name: str, last_event_id: hat.event.common.data.EventId, synced_cb: Optional[Callable[[], Optional[Awaitable[NoneType]]]] = None, events_cb: Optional[Callable[[list[hat.event.common.data.Event]], Optional[Awaitable[NoneType]]]] = None, client_token: str | None = None, subscriptions: list[typing.Tuple[str, ...]] = [('*',)]) -> Client:
async def connect(address: str,
                  client_name: str,
                  last_event_id: common.EventId,
                  synced_cb: SyncedCb | None = None,
                  events_cb: EventsCb | None = None,
                  client_token: str | None = None,
                  subscriptions: list[common.EventType] = [('*',)]
                  ) -> 'Client':
    """Connect to remote syncer server

    A chatter connection is opened to `address` and a ``MsgInitReq``
    carrying `last_event_id`, `client_name`, `client_token` and
    `subscriptions` is sent. The first received message must be a
    ``MsgInitRes``; any other message type raises `Exception`, and a
    decoded response other than ``None`` raises `SyncerInitError`.

    After a successful handshake the client's receive loop is spawned in
    the connection's async group. `synced_cb` and `events_cb` are stored
    on the client and called from that loop.

    If anything fails after the connection is established (including
    cancellation), the client is closed before the error propagates.

    """
    client = Client()
    client._synced_cb = synced_cb
    client._events_cb = events_cb

    client._conn = await chatter.connect(common.sbs_repo, address)
    mlog.debug("connected to %s", address)

    try:
        init_req = common.SyncerInitReq(last_event_id=last_event_id,
                                        client_name=client_name,
                                        client_token=client_token,
                                        subscriptions=subscriptions)

        mlog.debug("sending %s", init_req)
        init_req_data = chatter.Data(
            module='HatSyncer',
            type='MsgInitReq',
            data=common.syncer_init_req_to_sbs(init_req))
        # last=False: the init response is still expected for this message
        client._conn.send(init_req_data, last=False)

        reply = await client._conn.receive()
        reply_type = (reply.data.module, reply.data.type)
        expected_type = ('HatSyncer', 'MsgInitRes')

        if reply_type != expected_type:
            raise Exception('unsupported message type')

        mlog.debug("received init response")
        init_res = common.syncer_init_res_from_sbs(reply.data.data)

        # a non-None init result is reported as initialization failure
        if init_res is not None:
            raise SyncerInitError(init_res)

        client.async_group.spawn(client._receive_loop)

    except BaseException:
        # BaseException so that cancellation also releases the connection
        await aio.uncancellable(client.async_close())
        raise

    return client

Connect to remote syncer server

class Client(hat.aio.group.Resource):
 74class Client(aio.Resource):
 75    """Syncer client"""
 76
 77    @property
 78    def async_group(self) -> aio.Group:
 79        """Async group"""
 80        return self._conn.async_group
 81
 82    async def _receive_loop(self):
 83        mlog.debug("staring receive loop")
 84        try:
 85            while True:
 86                mlog.debug("waiting for incoming message")
 87                msg = await self._conn.receive()
 88                msg_type = msg.data.module, msg.data.type
 89
 90                if msg_type == ('HatSyncer', 'MsgEvents'):
 91                    mlog.debug("received events")
 92                    events = [common.event_from_sbs(i)
 93                              for i in msg.data.data]
 94
 95                    if self._events_cb:
 96                        await aio.call(self._events_cb, events)
 97
 98                elif msg_type == ('HatSyncer', 'MsgSynced'):
 99                    mlog.debug("received synced")
100
101                    if self._synced_cb:
102                        await aio.call(self._synced_cb)
103
104                elif msg_type == ('HatSyncer', 'MsgFlushReq'):
105                    mlog.debug("received flush request")
106                    self._conn.send(chatter.Data(module='HatSyncer',
107                                                 type='MsgFlushRes',
108                                                 data=None),
109                                    conv=msg.conv)
110
111                else:
112                    raise Exception("unsupported message type")
113
114        except ConnectionError:
115            pass
116
117        except Exception as e:
118            mlog.error("receive loop error: %s", e, exc_info=e)
119
120        finally:
121            mlog.debug("stopping receive loop")
122            self.close()

Syncer client

async_group: hat.aio.group.Group

Async group

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
class ClientInfo(typing.NamedTuple):
class ClientInfo(typing.NamedTuple):
    """Client connection information

    One instance per active client connection, as exposed through
    `Server.state` and passed to state callbacks.
    """
    # client name taken from the client's init request
    name: str
    # False when the connection is registered; replaced by an instance
    # with True through the synced callback given to the client handler
    synced: bool

Client connection information

ClientInfo(name: str, synced: bool)

Create new instance of ClientInfo(name, synced)

name: str

Alias for field number 0

synced: bool

Alias for field number 1

Inherited Members
builtins.tuple
index
count
StateCb = typing.Callable[[list[ClientInfo]], NoneType]
QueryCb = typing.Callable[[hat.event.common.data.EventId], typing.AsyncIterable[list[hat.event.common.data.Event]]]
async def listen( address: str, query_cb: Optional[Callable[[hat.event.common.data.EventId], AsyncIterable[list[hat.event.common.data.Event]]]] = None, subscriptions: list[typing.Tuple[str, ...]] = [('*',)], token: str | None = None) -> Server:
async def listen(address: str,
                 query_cb: QueryCb | None = None,
                 subscriptions: list[common.EventType] = [('*',)],
                 token: str | None = None
                 ) -> 'Server':
    """Create listening syncer server

    The returned server accepts chatter connections on `address`. Each
    client's requested subscriptions are intersected with `subscriptions`.
    If `token` is not ``None``, clients whose init request carries a
    different token are rejected. `query_cb` is handed to every client
    handler created for an accepted connection.

    """
    server = Server()

    # configuration supplied by the caller
    server._query_cb = query_cb
    server._token = token
    server._subscription = common.Subscription(subscriptions)

    # bookkeeping of connected clients
    server._next_client_ids = itertools.count(1)
    server._clients = {}
    server._state = {}

    # callback registries for state changes and event notifications
    server._state_cbs = util.CallbackRegistry()
    server._notify_cbs = util.CallbackRegistry()

    server._server = await chatter.listen(
        sbs_repo=common.sbs_repo,
        address=address,
        connection_cb=server._on_connection,
        bind_connections=False)

    mlog.debug("listening on %s", address)
    return server

Create listening syncer server

class Server(hat.aio.group.Resource):
 59class Server(aio.Resource):
 60    """Syncer server"""
 61
 62    @property
 63    def async_group(self):
 64        """Async group"""
 65        return self._server.async_group
 66
 67    @property
 68    def state(self) -> list[ClientInfo]:
 69        """State of all active connections"""
 70        return list(self._state.values())
 71
 72    def register_state_cb(self,
 73                          cb: StateCb
 74                          ) -> util.RegisterCallbackHandle:
 75        """Register state change callback"""
 76        return self._state_cbs.register(cb)
 77
 78    def notify(self, events: list[common.Event]):
 79        """Notify clients of new events"""
 80        self._notify_cbs.notify(events)
 81
 82    async def flush(self):
 83        """Send flush requests and wait for flush responses"""
 84        if not self.is_open:
 85            await self.wait_closed()
 86            return
 87
 88        if not self._clients:
 89            return
 90
 91        await asyncio.wait([self.async_group.spawn(client.flush)
 92                            for client in self._clients.values()])
 93
 94    def _on_connection(self, conn):
 95        self.async_group.spawn(self._connection_loop, conn)
 96
 97    def _update_client_info(self, client_id, client_info):
 98        self._state[client_id] = client_info
 99        self._state_cbs.notify(list(self._state.values()))
100
101    def _remove_client_info(self, client_id):
102        if self._state.pop(client_id, None):
103            self._state_cbs.notify(list(self._state.values()))
104
105    async def _connection_loop(self, conn):
106        mlog.debug("starting new connection loop")
107
108        client_id = None
109        try:
110            mlog.debug("waiting for incomming message")
111            req_msg = await conn.receive()
112            req_msg_type = req_msg.data.module, req_msg.data.type
113
114            if req_msg_type != ('HatSyncer', 'MsgInitReq'):
115                raise Exception('unsupported message type')
116
117            mlog.debug("received init request")
118            req = common.syncer_init_req_from_sbs(req_msg.data.data)
119
120            if self._token is not None and req.client_token != self._token:
121                res = 'invalid client token'
122
123            else:
124                res = None
125
126            mlog.debug("sending init response")
127            res_msg_data = chatter.Data(
128                module='HatSyncer',
129                type='MsgInitRes',
130                data=common.syncer_init_res_to_sbs(res))
131            conn.send(res_msg_data, conv=req_msg.conv)
132
133            if res is not None:
134                await conn.drain()
135                raise Exception(res)
136
137            client_id = next(self._next_client_ids)
138            last_event_id = req.last_event_id
139            subscription = self._subscription.intersection(
140                common.Subscription(req.subscriptions))
141            client_info = ClientInfo(name=req.client_name,
142                                     synced=False)
143
144            self._update_client_info(client_id, client_info)
145
146            mlog.debug("creating client")
147            synced_cb = functools.partial(self._update_client_info, client_id,
148                                          client_info._replace(synced=True))
149            client = _Client(query_cb=self._query_cb,
150                             notify_cbs=self._notify_cbs,
151                             conn=conn,
152                             last_event_id=last_event_id,
153                             subscription=subscription,
154                             synced_cb=synced_cb)
155
156            self._clients[client_id] = client
157            try:
158                await client.wait_closing()
159
160            finally:
161                self._clients.pop(client_id)
162                await aio.uncancellable(client.async_close())
163
164        except ConnectionError:
165            pass
166
167        except Exception as e:
168            mlog.error("connection loop error: %s", e, exc_info=e)
169
170        finally:
171            mlog.debug("closing client connection loop")
172            conn.close()
173            self._remove_client_info(client_id)
174            await aio.uncancellable(conn.async_close())

Syncer server

async_group

Async group

state: list[ClientInfo]

State of all active connections

def register_state_cb( self, cb: Callable[[list[ClientInfo]], NoneType]) -> hat.util.RegisterCallbackHandle:
72    def register_state_cb(self,
73                          cb: StateCb
74                          ) -> util.RegisterCallbackHandle:
75        """Register state change callback"""
76        return self._state_cbs.register(cb)

Register state change callback

def notify(self, events: list[hat.event.common.data.Event]):
78    def notify(self, events: list[common.Event]):
79        """Notify clients of new events"""
80        self._notify_cbs.notify(events)

Notify clients of new events

async def flush(self):
82    async def flush(self):
83        """Send flush requests and wait for flush responses"""
84        if not self.is_open:
85            await self.wait_closed()
86            return
87
88        if not self._clients:
89            return
90
91        await asyncio.wait([self.async_group.spawn(client.flush)
92                            for client in self._clients.values()])

Send flush requests and wait for flush responses

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close