hat.event.server.syncer_server

Syncer server

  1"""Syncer server"""
  2
  3import asyncio
  4import contextlib
  5import functools
  6import itertools
  7import logging
  8import typing
  9
 10from hat import aio
 11from hat import chatter
 12from hat import json
 13from hat import util
 14from hat.event.server import common
 15import hat.event.common.data
 16
 17
 18mlog: logging.Logger = logging.getLogger(__name__)
 19"""Module logger"""
 20
 21
 22class ClientInfo(typing.NamedTuple):
 23    """Client connection information"""
 24    name: str
 25    synced: bool
 26
 27
 28StateCb = typing.Callable[[typing.List[ClientInfo]], None]
 29"""Syncer state change callback"""
 30
 31
 32async def create_syncer_server(conf: json.Data,
 33                               backend: common.Backend
 34                               ) -> 'SyncerServer':
 35    """Create syncer server
 36
 37    Args:
 38        conf: configuration defined by
 39            ``hat-event://main.yaml#/definitions/syncer_server``
 40        backend: backend
 41
 42    """
 43    srv = SyncerServer()
 44    srv._backend = backend
 45    srv._state = {}
 46    srv._next_client_ids = itertools.count(1)
 47    srv._state_cbs = util.CallbackRegistry()
 48    srv._clients = {}
 49
 50    srv._server = await chatter.listen(sbs_repo=common.sbs_repo,
 51                                       address=conf['address'],
 52                                       connection_cb=srv._on_connection,
 53                                       bind_connections=False)
 54    mlog.debug("listening on %s", conf['address'])
 55    return srv
 56
 57
 58class SyncerServer(aio.Resource):
 59
 60    @property
 61    def async_group(self) -> aio.Group:
 62        """Async group"""
 63        return self._server.async_group
 64
 65    @property
 66    def state(self) -> typing.Iterable[ClientInfo]:
 67        """State of all active connections"""
 68        return self._state.values()
 69
 70    def register_state_cb(self,
 71                          cb: StateCb
 72                          ) -> util.RegisterCallbackHandle:
 73        """Register state change callback"""
 74        return self._state_cbs.register(cb)
 75
 76    async def flush(self):
 77        """Send flush requests and wait for flush responses"""
 78        if not self.is_open:
 79            await self.wait_closed()
 80            return
 81
 82        await asyncio.wait([self.async_group.spawn(client.flush)
 83                            for client in self._clients.values()])
 84
 85    def _on_connection(self, conn):
 86        self.async_group.spawn(self._connection_loop, conn)
 87
 88    def _update_client_info(self, client_id, name, synced):
 89        self._state[client_id] = ClientInfo(name, synced)
 90        self._state_cbs.notify(list(self._state.values()))
 91
 92    def _remove_client_info(self, client_id):
 93        if self._state.pop(client_id, None):
 94            self._state_cbs.notify(list(self._state.values()))
 95
 96    async def _connection_loop(self, conn):
 97        mlog.debug("starting new connection loop")
 98
 99        client_id = None
100        try:
101            mlog.debug("waiting for incomming message")
102            msg = await conn.receive()
103            msg_type = msg.data.module, msg.data.type
104
105            if msg_type != ('HatSyncer', 'MsgReq'):
106                raise Exception('unsupported message type')
107
108            mlog.debug("received request")
109            msg_req = hat.event.common.data.syncer_req_from_sbs(msg.data.data)
110            client_id = next(self._next_client_ids)
111            name = msg_req.client_name
112            last_event_id = msg_req.last_event_id
113            self._update_client_info(client_id, name, False)
114
115            mlog.debug("creating client")
116            synced_cb = functools.partial(self._update_client_info, client_id,
117                                          name, True)
118            client = _Client(backend=self._backend,
119                             conn=conn,
120                             last_event_id=last_event_id,
121                             synced_cb=synced_cb)
122
123            self._clients[client_id] = client
124            try:
125                await client.wait_closing()
126
127            finally:
128                self._clients.pop(client_id)
129                await aio.uncancellable(client.async_close())
130
131        except ConnectionError:
132            pass
133
134        except Exception as e:
135            mlog.error("connection loop error: %s", e, exc_info=e)
136
137        finally:
138            mlog.debug("closing client connection loop")
139            conn.close()
140            self._remove_client_info(client_id)
141            await aio.uncancellable(conn.async_close())
142
143
144class _Client(aio.Resource):
145
146    def __init__(self, backend, conn, last_event_id, synced_cb):
147        self._backend = backend
148        self._conn = conn
149        self._last_event_id = last_event_id
150        self._async_group = aio.Group()
151
152        self._receiver = _Receiver(conn)
153        self.async_group.spawn(aio.call_on_done, self._receiver.wait_closing(),
154                               self.close)
155
156        self._sender = _Sender(conn, last_event_id, synced_cb, self._receiver)
157        self.async_group.spawn(aio.call_on_done, self._sender.wait_closing(),
158                               self.close)
159
160        self.async_group.spawn(self._client_loop)
161        self.async_group.spawn(aio.call_on_done, self._conn.wait_closing(),
162                               self.close)
163
164    @property
165    def async_group(self):
166        return self._async_group
167
168    async def flush(self):
169        try:
170            await self._sender.send_flush()
171
172        except Exception:
173            await self._sender.wait_closed()
174
175    async def _client_loop(self):
176        mlog.debug("starting new client loop")
177        events_queue = aio.Queue()
178        is_synced = False
179
180        async def cleanup():
181            self.close()
182
183            if is_synced:
184                with contextlib.suppress(ConnectionError):
185                    while not events_queue.empty():
186                        self._sender.send_events(events_queue.get_nowait())
187
188            await self._sender.async_close()
189            await self._receiver.async_close()
190
191        try:
192            with self._backend.register_flushed_events_cb(
193                    events_queue.put_nowait):
194                mlog.debug("query backend")
195                async for events in self._backend.query_flushed(
196                        self._last_event_id):
197                    self._sender.send_events(events)
198
199                mlog.debug("sending synced")
200                self._sender.send_synced()
201                is_synced = True
202
203                while True:
204                    events = await events_queue.get()
205                    self._sender.send_events(events)
206
207        except ConnectionError:
208            pass
209
210        except Exception as e:
211            mlog.error("client loop error: %s", e, exc_info=e)
212
213        finally:
214            mlog.debug("stopping client loop")
215            await aio.uncancellable(cleanup())
216
217
218class _Sender(aio.Resource):
219
220    def __init__(self, conn, last_event_id, synced_cb, receiver):
221        self._conn = conn
222        self._last_event_id = last_event_id
223        self._synced_cb = synced_cb
224        self._receiver = receiver
225        self._send_queue = aio.Queue()
226        self._async_group = aio.Group()
227
228        self.async_group.spawn(self._send_loop)
229
230    @property
231    def async_group(self):
232        return self._async_group
233
234    def send_events(self, events):
235        try:
236            self._send_queue.put_nowait(('events', events))
237
238        except aio.QueueClosedError:
239            raise ConnectionError()
240
241    def send_synced(self):
242        try:
243            self._send_queue.put_nowait(('synced', None))
244
245        except aio.QueueClosedError:
246            raise ConnectionError()
247
248    async def send_flush(self):
249        try:
250            future = asyncio.Future()
251            self._send_queue.put_nowait(('flush', future))
252            await future
253
254        except aio.QueueClosedError:
255            raise ConnectionError()
256
257    async def _send_loop(self):
258        is_synced = False
259
260        async def cleanup():
261            self.close()
262            self._send_queue.close()
263
264            while not self._send_queue.empty():
265                msg_type, msg_data = self._send_queue.get_nowait()
266
267                with contextlib.suppress(Exception):
268                    if msg_type == 'events' and is_synced:
269                        self._send_events(msg_data)
270
271                    elif msg_type == 'flush':
272                        self._send_flush(msg_data)
273
274            if is_synced:
275                with contextlib.suppress(Exception):
276                    future = asyncio.Future()
277                    self._send_flush(future)
278                    await future
279
280            with contextlib.suppress(Exception):
281                await self._conn.drain()
282
283        try:
284            while True:
285                msg_type, msg_data = await self._send_queue.get()
286
287                if msg_type == 'events':
288                    self._send_events(msg_data)
289
290                elif msg_type == 'synced':
291                    self._send_synced()
292                    is_synced = True
293                    self._synced_cb()
294
295                elif msg_type == 'flush':
296                    self._send_flush(msg_data)
297
298                else:
299                    raise ValueError('unsupported message')
300
301        except ConnectionError:
302            pass
303
304        except Exception as e:
305            mlog.error("send loop error: %s", e, exc_info=e)
306
307        finally:
308            await aio.uncancellable(cleanup())
309
310    def _send_events(self, events):
311        if not events:
312            return
313
314        if events[0].event_id.server != self._last_event_id.server:
315            return
316
317        if events[0].event_id.session < self._last_event_id.session:
318            return
319
320        if events[0].event_id.session == self._last_event_id.session:
321            events = [event for event in events
322                      if event.event_id > self._last_event_id]
323            if not events:
324                return
325
326        mlog.debug("sending events")
327        data = chatter.Data(module='HatSyncer',
328                            type='MsgEvents',
329                            data=[common.event_to_sbs(e)
330                                  for e in events])
331        self._conn.send(data)
332        self._last_event_id = events[-1].event_id
333
334    def _send_synced(self):
335        self._conn.send(chatter.Data(module='HatSyncer',
336                                     type='MsgSynced',
337                                     data=None))
338
339    def _send_flush(self, future):
340        try:
341            conv = self._conn.send(chatter.Data(module='HatSyncer',
342                                                type='MsgFlushReq',
343                                                data=None),
344                                   last=False)
345            self._receiver.add_flush_future(conv, future)
346
347        except Exception:
348            future.set_result(None)
349            raise
350
351
352class _Receiver(aio.Resource):
353
354    def __init__(self, conn):
355        self._conn = conn
356        self._flush_futures = {}
357        self._async_group = aio.Group()
358
359        self.async_group.spawn(self._receive_loop)
360
361    @property
362    def async_group(self):
363        return self._async_group
364
365    def add_flush_future(self, conv, future):
366        if not self.is_open:
367            raise ConnectionError()
368
369        self._flush_futures[conv] = future
370
371    async def _receive_loop(self):
372        try:
373            while True:
374                msg = await self._conn.receive()
375                msg_type = msg.data.module, msg.data.type
376
377                if msg_type != ('HatSyncer', 'MsgFlushRes'):
378                    raise Exception("unsupported message type")
379
380                flush_future = self._flush_futures.pop(msg.conv, None)
381                if flush_future and not flush_future.done():
382                    flush_future.set_result(None)
383
384        except ConnectionError:
385            pass
386
387        except Exception as e:
388            mlog.error("receive loop error: %s", e, exc_info=e)
389
390        finally:
391            self.close()
392
393            for flush_future in self._flush_futures.values():
394                if not flush_future.done():
395                    flush_future.set_result(None)
mlog: logging.Logger = <Logger hat.event.server.syncer_server (WARNING)>

Module logger

class ClientInfo(typing.NamedTuple):
23class ClientInfo(typing.NamedTuple):
24    """Client connection information"""
25    name: str
26    synced: bool

Client connection information

ClientInfo(name: str, synced: bool)

Create new instance of ClientInfo(name, synced)

name: str

Alias for field number 0

synced: bool

Alias for field number 1

Inherited Members
builtins.tuple
index
count
StateCb = typing.Callable[[typing.List[hat.event.server.syncer_server.ClientInfo]], NoneType]

Syncer state change callback

async def create_syncer_server( conf: ~Data, backend: hat.event.server.common.Backend) -> hat.event.server.syncer_server.SyncerServer:
33async def create_syncer_server(conf: json.Data,
34                               backend: common.Backend
35                               ) -> 'SyncerServer':
36    """Create syncer server
37
38    Args:
39        conf: configuration defined by
40            ``hat-event://main.yaml#/definitions/syncer_server``
41        backend: backend
42
43    """
44    srv = SyncerServer()
45    srv._backend = backend
46    srv._state = {}
47    srv._next_client_ids = itertools.count(1)
48    srv._state_cbs = util.CallbackRegistry()
49    srv._clients = {}
50
51    srv._server = await chatter.listen(sbs_repo=common.sbs_repo,
52                                       address=conf['address'],
53                                       connection_cb=srv._on_connection,
54                                       bind_connections=False)
55    mlog.debug("listening on %s", conf['address'])
56    return srv

Create syncer server

Arguments:
  • conf: configuration defined by hat-event://main.yaml#/definitions/syncer_server
  • backend: backend
class SyncerServer(hat.aio.Resource):
 59class SyncerServer(aio.Resource):
 60
 61    @property
 62    def async_group(self) -> aio.Group:
 63        """Async group"""
 64        return self._server.async_group
 65
 66    @property
 67    def state(self) -> typing.Iterable[ClientInfo]:
 68        """State of all active connections"""
 69        return self._state.values()
 70
 71    def register_state_cb(self,
 72                          cb: StateCb
 73                          ) -> util.RegisterCallbackHandle:
 74        """Register state change callback"""
 75        return self._state_cbs.register(cb)
 76
 77    async def flush(self):
 78        """Send flush requests and wait for flush responses"""
 79        if not self.is_open:
 80            await self.wait_closed()
 81            return
 82
 83        await asyncio.wait([self.async_group.spawn(client.flush)
 84                            for client in self._clients.values()])
 85
 86    def _on_connection(self, conn):
 87        self.async_group.spawn(self._connection_loop, conn)
 88
 89    def _update_client_info(self, client_id, name, synced):
 90        self._state[client_id] = ClientInfo(name, synced)
 91        self._state_cbs.notify(list(self._state.values()))
 92
 93    def _remove_client_info(self, client_id):
 94        if self._state.pop(client_id, None):
 95            self._state_cbs.notify(list(self._state.values()))
 96
 97    async def _connection_loop(self, conn):
 98        mlog.debug("starting new connection loop")
 99
100        client_id = None
101        try:
102            mlog.debug("waiting for incomming message")
103            msg = await conn.receive()
104            msg_type = msg.data.module, msg.data.type
105
106            if msg_type != ('HatSyncer', 'MsgReq'):
107                raise Exception('unsupported message type')
108
109            mlog.debug("received request")
110            msg_req = hat.event.common.data.syncer_req_from_sbs(msg.data.data)
111            client_id = next(self._next_client_ids)
112            name = msg_req.client_name
113            last_event_id = msg_req.last_event_id
114            self._update_client_info(client_id, name, False)
115
116            mlog.debug("creating client")
117            synced_cb = functools.partial(self._update_client_info, client_id,
118                                          name, True)
119            client = _Client(backend=self._backend,
120                             conn=conn,
121                             last_event_id=last_event_id,
122                             synced_cb=synced_cb)
123
124            self._clients[client_id] = client
125            try:
126                await client.wait_closing()
127
128            finally:
129                self._clients.pop(client_id)
130                await aio.uncancellable(client.async_close())
131
132        except ConnectionError:
133            pass
134
135        except Exception as e:
136            mlog.error("connection loop error: %s", e, exc_info=e)
137
138        finally:
139            mlog.debug("closing client connection loop")
140            conn.close()
141            self._remove_client_info(client_id)
142            await aio.uncancellable(conn.async_close())

Resource with lifetime control based on Group.

SyncerServer()
async_group: hat.aio.Group

Async group

State of all active connections

def register_state_cb( self, cb: Callable[[List[hat.event.server.syncer_server.ClientInfo]], NoneType]) -> hat.util.RegisterCallbackHandle:
71    def register_state_cb(self,
72                          cb: StateCb
73                          ) -> util.RegisterCallbackHandle:
74        """Register state change callback"""
75        return self._state_cbs.register(cb)

Register state change callback

async def flush(self):
77    async def flush(self):
78        """Send flush requests and wait for flush responses"""
79        if not self.is_open:
80            await self.wait_closed()
81            return
82
83        await asyncio.wait([self.async_group.spawn(client.flush)
84                            for client in self._clients.values()])

Send flush requests and wait for flush responses

Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close