hat.event.server.eventer_client

  1from collections.abc import Callable
  2import collections
  3import logging
  4import typing
  5
  6from hat import aio
  7from hat.drivers import tcp
  8
  9from hat.event import common
 10from hat.event import eventer
 11
 12
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

# Called by `EventerClient._synchronize` as `synced_cb(server_id, counter)`
# where `counter` is the number of events received through queries.
SyncedCb: typing.TypeAlias = Callable[[common.ServerId, int], None]
"""Synced callback"""
 18
 19
 20async def create_eventer_client(addr: tcp.Address,
 21                                client_name: str,
 22                                server_id: common.ServerId,
 23                                backend: common.Backend,
 24                                *,
 25                                client_token: str | None = None,
 26                                synced_cb: SyncedCb | None = None,
 27                                **kwargs
 28                                ) -> 'EventerClient':
 29    """Create eventer client"""
 30    client = EventerClient()
 31    client._server_id = server_id
 32    client._backend = backend
 33    client._synced_cb = synced_cb
 34    client._synced = False
 35    client._events_queue = collections.deque()
 36
 37    client._client = await eventer.connect(addr=addr,
 38                                           client_name=client_name,
 39                                           client_token=client_token,
 40                                           subscriptions=[('*', )],
 41                                           server_id=server_id,
 42                                           persisted=True,
 43                                           events_cb=client._on_events,
 44                                           **kwargs)
 45
 46    try:
 47        client.async_group.spawn(client._synchronize)
 48
 49    except BaseException:
 50        await aio.uncancellable(client.async_close())
 51        raise
 52
 53    return client
 54
 55
 56class EventerClient(aio.Resource):
 57    """Eventer client
 58
 59    For creating new client see `create_eventer_client` coroutine.
 60
 61    """
 62
 63    @property
 64    def async_group(self) -> aio.Group:
 65        """Async group"""
 66        return self._client.async_group
 67
 68    @property
 69    def synced(self) -> bool:
 70        """Synced state"""
 71        return self._synced
 72
 73    async def _on_events(self, client, events):
 74        mlog.debug("received %s notify events", len(events))
 75
 76        if self._events_queue is not None:
 77            self._events_queue.append(events)
 78            return
 79
 80        await self._backend.register(events)
 81
 82    async def _synchronize(self):
 83        mlog.debug("starting synchronization")
 84
 85        try:
 86            last_event_id = await self._backend.get_last_event_id(
 87                self._server_id)
 88            events = collections.deque()
 89            result = common.QueryResult([], True)
 90            synced_counter = 0
 91
 92            while result.more_follows:
 93                params = common.QueryServerParams(server_id=self._server_id,
 94                                                  persisted=True,
 95                                                  last_event_id=last_event_id)
 96                result = await self._client.query(params)
 97
 98                mlog.debug("received %s query events", len(result.events))
 99                events.extend(result.events)
100                synced_counter += len(result.events)
101                if not events:
102                    continue
103
104                last_event_id = events[-1].id
105
106                while events[0].id.session != events[-1].id.session:
107                    session_id = events[0].id.session
108                    session_events = collections.deque()
109
110                    while events[0].id.session == session_id:
111                        session_events.append(events.popleft())
112
113                    await self._backend.register(session_events)
114
115            if events:
116                await self._backend.register(events)
117
118            mlog.debug("processing cached notify events")
119            while self._events_queue:
120                events = [event for event in self._events_queue.popleft()
121                          if event.id > last_event_id]
122                if not events:
123                    continue
124
125                await self._backend.register(events)
126
127            self._events_queue = None
128            self._synced = True
129
130            mlog.debug("synchronized %s events", synced_counter)
131            if self._synced_cb:
132                await aio.call(self._synced_cb, self._server_id,
133                               synced_counter)
134
135        except ConnectionError:
136            mlog.debug("connection closed")
137            self.close()
138
139        except Exception as e:
140            mlog.error("synchronization error: %s", e, exc_info=e)
141            self.close()
mlog: logging.Logger = <Logger hat.event.server.eventer_client (WARNING)>

Module logger

SyncedCb: TypeAlias = collections.abc.Callable[[int, int], None]

Synced callback

async def create_eventer_client( addr: hat.drivers.tcp.Address, client_name: str, server_id: int, backend: hat.event.common.backend.Backend, *, client_token: str | None = None, synced_cb: collections.abc.Callable[[int, int], None] | None = None, **kwargs) -> EventerClient:
async def create_eventer_client(addr: tcp.Address,
                                client_name: str,
                                server_id: common.ServerId,
                                backend: common.Backend,
                                *,
                                client_token: str | None = None,
                                synced_cb: SyncedCb | None = None,
                                **kwargs
                                ) -> 'EventerClient':
    """Create eventer client

    Connects to the eventer server at `addr` and spawns background
    synchronization of events into `backend`. The returned client is
    usually not yet synchronized (see `EventerClient.synced`).

    Args:
        addr: address passed to `eventer.connect`
        client_name: client name passed to `eventer.connect`
        server_id: server identifier used for connecting, for reading the
            last known event id and for querying events
        backend: backend providing the last known event id and receiving
            all registered events
        client_token: optional client token passed to `eventer.connect`
        synced_cb: optional callback called with `server_id` and the number
            of queried events once synchronization finishes
        kwargs: additional arguments passed to `eventer.connect`

    """
    instance = EventerClient()

    # notify events received while synchronization is running are cached
    # in this queue (see `EventerClient._on_events`)
    instance._events_queue = collections.deque()
    instance._synced = False
    instance._synced_cb = synced_cb
    instance._backend = backend
    instance._server_id = server_id

    instance._client = await eventer.connect(
        addr=addr,
        client_name=client_name,
        client_token=client_token,
        subscriptions=[('*', )],
        server_id=server_id,
        persisted=True,
        events_cb=instance._on_events,
        **kwargs)

    try:
        instance.async_group.spawn(instance._synchronize)

    except BaseException:
        # close the established connection before propagating the error;
        # closing is shielded so that cancellation can not interrupt it
        await aio.uncancellable(instance.async_close())
        raise

    return instance

Create eventer client

class EventerClient(hat.aio.group.Resource):
 57class EventerClient(aio.Resource):
 58    """Eventer client
 59
 60    For creating new client see `create_eventer_client` coroutine.
 61
 62    """
 63
 64    @property
 65    def async_group(self) -> aio.Group:
 66        """Async group"""
 67        return self._client.async_group
 68
 69    @property
 70    def synced(self) -> bool:
 71        """Synced state"""
 72        return self._synced
 73
 74    async def _on_events(self, client, events):
 75        mlog.debug("received %s notify events", len(events))
 76
 77        if self._events_queue is not None:
 78            self._events_queue.append(events)
 79            return
 80
 81        await self._backend.register(events)
 82
 83    async def _synchronize(self):
 84        mlog.debug("starting synchronization")
 85
 86        try:
 87            last_event_id = await self._backend.get_last_event_id(
 88                self._server_id)
 89            events = collections.deque()
 90            result = common.QueryResult([], True)
 91            synced_counter = 0
 92
 93            while result.more_follows:
 94                params = common.QueryServerParams(server_id=self._server_id,
 95                                                  persisted=True,
 96                                                  last_event_id=last_event_id)
 97                result = await self._client.query(params)
 98
 99                mlog.debug("received %s query events", len(result.events))
100                events.extend(result.events)
101                synced_counter += len(result.events)
102                if not events:
103                    continue
104
105                last_event_id = events[-1].id
106
107                while events[0].id.session != events[-1].id.session:
108                    session_id = events[0].id.session
109                    session_events = collections.deque()
110
111                    while events[0].id.session == session_id:
112                        session_events.append(events.popleft())
113
114                    await self._backend.register(session_events)
115
116            if events:
117                await self._backend.register(events)
118
119            mlog.debug("processing cached notify events")
120            while self._events_queue:
121                events = [event for event in self._events_queue.popleft()
122                          if event.id > last_event_id]
123                if not events:
124                    continue
125
126                await self._backend.register(events)
127
128            self._events_queue = None
129            self._synced = True
130
131            mlog.debug("synchronized %s events", synced_counter)
132            if self._synced_cb:
133                await aio.call(self._synced_cb, self._server_id,
134                               synced_counter)
135
136        except ConnectionError:
137            mlog.debug("connection closed")
138            self.close()
139
140        except Exception as e:
141            mlog.error("synchronization error: %s", e, exc_info=e)
142            self.close()

Eventer client

For creating new client see create_eventer_client coroutine.

async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Async group (the one owned by the wrapped connection)"""
        return self._client.async_group

Async group

synced: bool
    @property
    def synced(self) -> bool:
        """Synced state (``True`` once synchronization has finished)"""
        return self._synced

Synced state

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close