hat.event.server.eventer_client

  1import collections
  2import contextlib
  3import enum
  4import logging
  5import typing
  6
  7from hat import aio
  8from hat.drivers import tcp
  9
 10from hat.event import common
 11from hat.event import eventer
 12
 13
# Logger named after this module (`__name__`); the eventer client uses it for
# debug messages about synchronization progress and for reporting
# synchronization errors.
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""
 16
 17
 18class SyncedState(enum.Enum):
 19    """Synced state"""
 20    CONNECTED = 0
 21    SYNCING = 1
 22    SYNCED = 2
 23
 24
# Callback invoked with the new status each time the underlying eventer
# connection reports a status change. Declared as `aio.AsyncCallable`, so
# the callback may be a regular function or a coroutine function.
StatusCb: typing.TypeAlias = aio.AsyncCallable[[common.Status], None]
"""Status callback"""

# Callback invoked each time the client's synced state changes. The second
# argument is the number of events obtained by querying the remote server;
# it is provided only together with `SyncedState.SYNCED` and is `None` for
# the other states. May be a regular function or a coroutine function.
SyncedCb: typing.TypeAlias = aio.AsyncCallable[[SyncedState, int | None], None]
"""Synced callback"""
 30
 31
 32async def create_eventer_client(addr: tcp.Address,
 33                                client_name: str,
 34                                local_server_id: common.ServerId,
 35                                remote_server_id: common.ServerId,
 36                                backend: common.Backend,
 37                                *,
 38                                client_token: str | None = None,
 39                                status_cb: StatusCb | None = None,
 40                                synced_cb: SyncedCb | None = None,
 41                                **kwargs
 42                                ) -> 'EventerClient':
 43    """Create eventer client"""
 44    client = EventerClient()
 45    client._local_server_id = local_server_id
 46    client._remote_server_id = remote_server_id
 47    client._backend = backend
 48    client._status_cb = status_cb
 49    client._synced_cb = synced_cb
 50    client._synced = None
 51    client._events_queue = collections.deque()
 52
 53    client._client = await eventer.connect(addr=addr,
 54                                           client_name=client_name,
 55                                           client_token=client_token,
 56                                           subscriptions=[('*', )],
 57                                           server_id=remote_server_id,
 58                                           persisted=True,
 59                                           status_cb=client._on_status,
 60                                           events_cb=client._on_events,
 61                                           **kwargs)
 62
 63    try:
 64        client.async_group.spawn(client._synchronize)
 65
 66    except BaseException:
 67        await aio.uncancellable(client.async_close())
 68        raise
 69
 70    return client
 71
 72
 73class EventerClient(aio.Resource):
 74    """Eventer client
 75
 76    For creating new client see `create_eventer_client` coroutine.
 77
 78    """
 79
 80    @property
 81    def async_group(self) -> aio.Group:
 82        """Async group"""
 83        return self._client.async_group
 84
 85    @property
 86    def status(self) -> common.Status:
 87        """Status"""
 88        return self._client.status
 89
 90    @property
 91    def synced(self) -> SyncedState | None:
 92        """Synced state"""
 93        return self._synced
 94
 95    async def _on_status(self, client, status):
 96        if status == common.Status.OPERATIONAL and self._synced:
 97            data = {'state': self._synced.name}
 98            if self._synced == SyncedState.SYNCED:
 99                data['count'] = None
100
101            with contextlib.suppress(Exception):
102                await self._client.register([
103                    common.RegisterEvent(
104                        type=('event', str(self._local_server_id), 'synced',
105                              str(self._remote_server_id)),
106                        source_timestamp=None,
107                        payload=common.EventPayloadJson(data))])
108
109        if not self._status_cb:
110            return
111
112        await aio.call(self._status_cb, status)
113
114    async def _on_events(self, client, events):
115        mlog.debug("received %s notify events", len(events))
116
117        if self._events_queue is not None:
118            self._events_queue.append(events)
119            return
120
121        await self._backend.register(events)
122
123    async def _synchronize(self):
124        mlog.debug("starting synchronization")
125
126        try:
127            last_event_id = await self._backend.get_last_event_id(
128                self._remote_server_id)
129            events = collections.deque()
130            result = common.QueryResult([], True)
131            synced_counter = 0
132
133            await self._set_synced(SyncedState.CONNECTED, None)
134
135            while result.more_follows:
136                params = common.QueryServerParams(
137                    server_id=self._remote_server_id,
138                    persisted=True,
139                    last_event_id=last_event_id)
140                result = await self._client.query(params)
141
142                mlog.debug("received %s query events", len(result.events))
143                events.extend(result.events)
144
145                if result.events and synced_counter == 0:
146                    await self._set_synced(SyncedState.SYNCING, None)
147
148                synced_counter += len(result.events)
149                if not events:
150                    continue
151
152                last_event_id = events[-1].id
153
154                while events[0].id.session != events[-1].id.session:
155                    session_id = events[0].id.session
156                    session_events = collections.deque()
157
158                    while events[0].id.session == session_id:
159                        session_events.append(events.popleft())
160
161                    await self._backend.register(session_events)
162
163            if events:
164                await self._backend.register(events)
165
166            mlog.debug("processing cached notify events")
167            while self._events_queue:
168                events = [event for event in self._events_queue.popleft()
169                          if event.id > last_event_id]
170                if not events:
171                    continue
172
173                await self._backend.register(events)
174
175            self._events_queue = None
176
177            mlog.debug("synchronized %s events", synced_counter)
178            await self._set_synced(SyncedState.SYNCED, synced_counter)
179
180        except ConnectionError:
181            mlog.debug("connection closed")
182            self.close()
183
184        except Exception as e:
185            mlog.error("synchronization error: %s", e, exc_info=e)
186            self.close()
187
188    async def _set_synced(self, state, count):
189        self._synced = state
190
191        data = {'state': state.name}
192        if state == SyncedState.SYNCED:
193            data['count'] = count
194
195        await self._client.register([
196            common.RegisterEvent(
197                type=('event', str(self._local_server_id), 'synced',
198                      str(self._remote_server_id)),
199                source_timestamp=None,
200                payload=common.EventPayloadJson(data))])
201
202        if not self._synced_cb:
203            return
204
205        await aio.call(self._synced_cb, state, count)
mlog: logging.Logger = <Logger hat.event.server.eventer_client (WARNING)>

Module logger

class SyncedState(enum.Enum):
19class SyncedState(enum.Enum):
20    """Synced state"""
21    CONNECTED = 0
22    SYNCING = 1
23    SYNCED = 2

Synced state

CONNECTED = <SyncedState.CONNECTED: 0>
SYNCING = <SyncedState.SYNCING: 1>
SYNCED = <SyncedState.SYNCED: 2>
StatusCb: TypeAlias = Callable[[hat.event.common.Status], None | Awaitable[None]]

Status callback

SyncedCb: TypeAlias = Callable[[SyncedState, int | None], None | Awaitable[None]]

Synced callback

async def create_eventer_client( addr: hat.drivers.tcp.Address, client_name: str, local_server_id: int, remote_server_id: int, backend: hat.event.common.Backend, *, client_token: str | None = None, status_cb: Optional[Callable[[hat.event.common.Status], None | Awaitable[None]]] = None, synced_cb: Optional[Callable[[SyncedState, int | None], None | Awaitable[None]]] = None, **kwargs) -> EventerClient:
33async def create_eventer_client(addr: tcp.Address,
34                                client_name: str,
35                                local_server_id: common.ServerId,
36                                remote_server_id: common.ServerId,
37                                backend: common.Backend,
38                                *,
39                                client_token: str | None = None,
40                                status_cb: StatusCb | None = None,
41                                synced_cb: SyncedCb | None = None,
42                                **kwargs
43                                ) -> 'EventerClient':
44    """Create eventer client"""
45    client = EventerClient()
46    client._local_server_id = local_server_id
47    client._remote_server_id = remote_server_id
48    client._backend = backend
49    client._status_cb = status_cb
50    client._synced_cb = synced_cb
51    client._synced = None
52    client._events_queue = collections.deque()
53
54    client._client = await eventer.connect(addr=addr,
55                                           client_name=client_name,
56                                           client_token=client_token,
57                                           subscriptions=[('*', )],
58                                           server_id=remote_server_id,
59                                           persisted=True,
60                                           status_cb=client._on_status,
61                                           events_cb=client._on_events,
62                                           **kwargs)
63
64    try:
65        client.async_group.spawn(client._synchronize)
66
67    except BaseException:
68        await aio.uncancellable(client.async_close())
69        raise
70
71    return client

Create eventer client

class EventerClient(hat.aio.group.Resource):
 74class EventerClient(aio.Resource):
 75    """Eventer client
 76
 77    For creating new client see `create_eventer_client` coroutine.
 78
 79    """
 80
 81    @property
 82    def async_group(self) -> aio.Group:
 83        """Async group"""
 84        return self._client.async_group
 85
 86    @property
 87    def status(self) -> common.Status:
 88        """Status"""
 89        return self._client.status
 90
 91    @property
 92    def synced(self) -> SyncedState | None:
 93        """Synced state"""
 94        return self._synced
 95
 96    async def _on_status(self, client, status):
 97        if status == common.Status.OPERATIONAL and self._synced:
 98            data = {'state': self._synced.name}
 99            if self._synced == SyncedState.SYNCED:
100                data['count'] = None
101
102            with contextlib.suppress(Exception):
103                await self._client.register([
104                    common.RegisterEvent(
105                        type=('event', str(self._local_server_id), 'synced',
106                              str(self._remote_server_id)),
107                        source_timestamp=None,
108                        payload=common.EventPayloadJson(data))])
109
110        if not self._status_cb:
111            return
112
113        await aio.call(self._status_cb, status)
114
115    async def _on_events(self, client, events):
116        mlog.debug("received %s notify events", len(events))
117
118        if self._events_queue is not None:
119            self._events_queue.append(events)
120            return
121
122        await self._backend.register(events)
123
124    async def _synchronize(self):
125        mlog.debug("starting synchronization")
126
127        try:
128            last_event_id = await self._backend.get_last_event_id(
129                self._remote_server_id)
130            events = collections.deque()
131            result = common.QueryResult([], True)
132            synced_counter = 0
133
134            await self._set_synced(SyncedState.CONNECTED, None)
135
136            while result.more_follows:
137                params = common.QueryServerParams(
138                    server_id=self._remote_server_id,
139                    persisted=True,
140                    last_event_id=last_event_id)
141                result = await self._client.query(params)
142
143                mlog.debug("received %s query events", len(result.events))
144                events.extend(result.events)
145
146                if result.events and synced_counter == 0:
147                    await self._set_synced(SyncedState.SYNCING, None)
148
149                synced_counter += len(result.events)
150                if not events:
151                    continue
152
153                last_event_id = events[-1].id
154
155                while events[0].id.session != events[-1].id.session:
156                    session_id = events[0].id.session
157                    session_events = collections.deque()
158
159                    while events[0].id.session == session_id:
160                        session_events.append(events.popleft())
161
162                    await self._backend.register(session_events)
163
164            if events:
165                await self._backend.register(events)
166
167            mlog.debug("processing cached notify events")
168            while self._events_queue:
169                events = [event for event in self._events_queue.popleft()
170                          if event.id > last_event_id]
171                if not events:
172                    continue
173
174                await self._backend.register(events)
175
176            self._events_queue = None
177
178            mlog.debug("synchronized %s events", synced_counter)
179            await self._set_synced(SyncedState.SYNCED, synced_counter)
180
181        except ConnectionError:
182            mlog.debug("connection closed")
183            self.close()
184
185        except Exception as e:
186            mlog.error("synchronization error: %s", e, exc_info=e)
187            self.close()
188
189    async def _set_synced(self, state, count):
190        self._synced = state
191
192        data = {'state': state.name}
193        if state == SyncedState.SYNCED:
194            data['count'] = count
195
196        await self._client.register([
197            common.RegisterEvent(
198                type=('event', str(self._local_server_id), 'synced',
199                      str(self._remote_server_id)),
200                source_timestamp=None,
201                payload=common.EventPayloadJson(data))])
202
203        if not self._synced_cb:
204            return
205
206        await aio.call(self._synced_cb, state, count)

Eventer client

For creating new client see create_eventer_client coroutine.

async_group: hat.aio.group.Group
81    @property
82    def async_group(self) -> aio.Group:
83        """Async group"""
84        return self._client.async_group

Async group

status: hat.event.common.Status
86    @property
87    def status(self) -> common.Status:
88        """Status"""
89        return self._client.status

Status

synced: SyncedState | None
91    @property
92    def synced(self) -> SyncedState | None:
93        """Synced state"""
94        return self._synced

Synced state