hat.event.server.eventer_client
import collections
import contextlib
import enum
import logging
import typing

from hat import aio
from hat.drivers import tcp

from hat.event import common
from hat.event import eventer


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


class SyncedState(enum.Enum):
    """Synchronization progress reported by `EventerClient`

    `_synchronize` sets ``CONNECTED`` before the first query, ``SYNCING``
    once the first non-empty query result arrives and ``SYNCED`` after all
    queried and cached events were handed to the backend.

    """
    CONNECTED = 0
    SYNCING = 1
    SYNCED = 2


StatusCb: typing.TypeAlias = aio.AsyncCallable[[common.Status], None]
"""Status callback"""

SyncedCb: typing.TypeAlias = aio.AsyncCallable[[SyncedState, int | None], None]
"""Synced callback"""


async def create_eventer_client(addr: tcp.Address,
                                client_name: str,
                                local_server_id: common.ServerId,
                                remote_server_id: common.ServerId,
                                backend: common.Backend,
                                *,
                                client_token: str | None = None,
                                status_cb: StatusCb | None = None,
                                synced_cb: SyncedCb | None = None,
                                **kwargs
                                ) -> 'EventerClient':
    """Create eventer client

    Builds an `EventerClient`, stores the server ids, backend and callbacks
    on it, opens the eventer connection with `eventer.connect` and spawns
    the synchronization task in the connection's async group.

    Args:
        addr: address passed to `eventer.connect`
        client_name: client name passed to `eventer.connect`
        local_server_id: used in the type of registered ``synced`` events
        remote_server_id: server id passed to `eventer.connect` and used
            for the backend lookup and queries during synchronization
        backend: backend on which received events are registered
        client_token: optional token passed to `eventer.connect`
        status_cb: optional callback called with each received status
        synced_cb: optional callback called with each new synced state
            and event count
        kwargs: additional arguments passed to `eventer.connect`

    """
    instance = EventerClient()
    instance._backend = backend
    instance._local_server_id = local_server_id
    instance._remote_server_id = remote_server_id
    instance._status_cb = status_cb
    instance._synced_cb = synced_cb
    instance._synced = None
    # notifications are cached here until synchronization finishes
    instance._events_queue = collections.deque()

    instance._client = await eventer.connect(
        addr=addr,
        client_name=client_name,
        client_token=client_token,
        subscriptions=[('*', )],
        server_id=remote_server_id,
        persisted=True,
        status_cb=instance._on_status,
        events_cb=instance._on_events,
        **kwargs)

    try:
        instance.async_group.spawn(instance._synchronize)

    except BaseException:
        # do not leak the opened connection if the task can not be spawned
        await aio.uncancellable(instance.async_close())
        raise

    return instance


class EventerClient(aio.Resource):
    """Eventer client

    Wraps the connection opened by `create_eventer_client`, registers
    events received from the remote server on the local backend and
    reports synchronization progress as `SyncedState`.

    To create a new client, use the `create_eventer_client` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the wrapped eventer connection"""
        return self._client.async_group

    @property
    def status(self) -> common.Status:
        """Status of the wrapped eventer connection"""
        return self._client.status

    @property
    def synced(self) -> SyncedState | None:
        """Most recent synced state (``None`` before any state is set)"""
        return self._synced

    async def _on_status(self, client, status):
        # on OPERATIONAL status the current synced state is registered
        # again (with count None); errors of that registration are ignored
        state = self._synced
        if status == common.Status.OPERATIONAL and state:
            with contextlib.suppress(Exception):
                await self._register_synced(state, None)

        if self._status_cb:
            await aio.call(self._status_cb, status)

    async def _on_events(self, client, events):
        mlog.debug("received %s notify events", len(events))

        queue = self._events_queue
        if queue is None:
            # synchronization finished - register directly
            await self._backend.register(events)

        else:
            # synchronization in progress - keep for later processing
            queue.append(events)

    async def _synchronize(self):
        mlog.debug("starting synchronization")

        try:
            last_event_id = await self._backend.get_last_event_id(
                self._remote_server_id)
            pending = collections.deque()
            total = 0
            more_follows = True

            await self._set_synced(SyncedState.CONNECTED, None)

            while more_follows:
                result = await self._client.query(
                    common.QueryServerParams(
                        server_id=self._remote_server_id,
                        persisted=True,
                        last_event_id=last_event_id))
                more_follows = result.more_follows

                received = len(result.events)
                mlog.debug("received %s query events", received)
                pending.extend(result.events)

                # SYNCING is announced once, on the first non-empty result
                if received and total == 0:
                    await self._set_synced(SyncedState.SYNCING, None)

                total += received
                if not pending:
                    continue

                last_event_id = pending[-1].id

                # register each complete session with a single backend
                # call; events of the newest session stay pending
                # (presumably because the next query result can continue
                # that session - TODO confirm)
                while pending[0].id.session != pending[-1].id.session:
                    session = pending[0].id.session
                    batch = collections.deque()

                    while pending[0].id.session == session:
                        batch.append(pending.popleft())

                    await self._backend.register(batch)

            if pending:
                await self._backend.register(pending)

            mlog.debug("processing cached notify events")
            while self._events_queue:
                # skip cached events already covered by the query results
                fresh = [event for event in self._events_queue.popleft()
                         if event.id > last_event_id]
                if fresh:
                    await self._backend.register(fresh)

            # from now on `_on_events` registers notifications directly
            self._events_queue = None

            mlog.debug("synchronized %s events", total)
            await self._set_synced(SyncedState.SYNCED, total)

        except ConnectionError:
            mlog.debug("connection closed")
            self.close()

        except Exception as e:
            mlog.error("synchronization error: %s", e, exc_info=e)
            self.close()

    async def _set_synced(self, state, count):
        self._synced = state

        await self._register_synced(state, count)

        if self._synced_cb:
            await aio.call(self._synced_cb, state, count)

    async def _register_synced(self, state, count):
        # register the ``synced`` event through the eventer connection;
        # ``count`` is part of the payload only for the SYNCED state
        payload = {'state': state.name}
        if state == SyncedState.SYNCED:
            payload['count'] = count

        event_type = ('event', str(self._local_server_id), 'synced',
                      str(self._remote_server_id))

        await self._client.register([
            common.RegisterEvent(
                type=event_type,
                source_timestamp=None,
                payload=common.EventPayloadJson(payload))])
Module logger
class
SyncedState(enum.Enum):
Synced state
CONNECTED =
<SyncedState.CONNECTED: 0>
SYNCING =
<SyncedState.SYNCING: 1>
SYNCED =
<SyncedState.SYNCED: 2>
Status callback
Synced callback
async def
create_eventer_client( addr: hat.drivers.tcp.Address, client_name: str, local_server_id: int, remote_server_id: int, backend: hat.event.common.Backend, *, client_token: str | None = None, status_cb: Optional[Callable[[hat.event.common.Status], None | Awaitable[None]]] = None, synced_cb: Optional[Callable[[SyncedState, int | None], None | Awaitable[None]]] = None, **kwargs) -> EventerClient:
async def create_eventer_client(addr: tcp.Address,
                                client_name: str,
                                local_server_id: common.ServerId,
                                remote_server_id: common.ServerId,
                                backend: common.Backend,
                                *,
                                client_token: str | None = None,
                                status_cb: StatusCb | None = None,
                                synced_cb: SyncedCb | None = None,
                                **kwargs
                                ) -> 'EventerClient':
    """Create eventer client

    Builds an `EventerClient`, stores the server ids, backend and callbacks
    on it, opens the eventer connection with `eventer.connect` and spawns
    the synchronization task in the connection's async group.

    Args:
        addr: address passed to `eventer.connect`
        client_name: client name passed to `eventer.connect`
        local_server_id: used in the type of registered ``synced`` events
        remote_server_id: server id passed to `eventer.connect` and used
            for the backend lookup and queries during synchronization
        backend: backend on which received events are registered
        client_token: optional token passed to `eventer.connect`
        status_cb: optional callback called with each received status
        synced_cb: optional callback called with each new synced state
            and event count
        kwargs: additional arguments passed to `eventer.connect`

    """
    instance = EventerClient()
    instance._backend = backend
    instance._local_server_id = local_server_id
    instance._remote_server_id = remote_server_id
    instance._status_cb = status_cb
    instance._synced_cb = synced_cb
    instance._synced = None
    # notifications are cached here until synchronization finishes
    instance._events_queue = collections.deque()

    instance._client = await eventer.connect(
        addr=addr,
        client_name=client_name,
        client_token=client_token,
        subscriptions=[('*', )],
        server_id=remote_server_id,
        persisted=True,
        status_cb=instance._on_status,
        events_cb=instance._on_events,
        **kwargs)

    try:
        instance.async_group.spawn(instance._synchronize)

    except BaseException:
        # do not leak the opened connection if the task can not be spawned
        await aio.uncancellable(instance.async_close())
        raise

    return instance
Create eventer client
class
EventerClient(hat.aio.group.Resource):
class EventerClient(aio.Resource):
    """Eventer client

    Wraps the connection opened by `create_eventer_client`, registers
    events received from the remote server on the local backend and
    reports synchronization progress as `SyncedState`.

    To create a new client, use the `create_eventer_client` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the wrapped eventer connection"""
        return self._client.async_group

    @property
    def status(self) -> common.Status:
        """Status of the wrapped eventer connection"""
        return self._client.status

    @property
    def synced(self) -> SyncedState | None:
        """Most recent synced state (``None`` before any state is set)"""
        return self._synced

    async def _on_status(self, client, status):
        # on OPERATIONAL status the current synced state is registered
        # again (with count None); errors of that registration are ignored
        state = self._synced
        if status == common.Status.OPERATIONAL and state:
            with contextlib.suppress(Exception):
                await self._register_synced(state, None)

        if self._status_cb:
            await aio.call(self._status_cb, status)

    async def _on_events(self, client, events):
        mlog.debug("received %s notify events", len(events))

        queue = self._events_queue
        if queue is None:
            # synchronization finished - register directly
            await self._backend.register(events)

        else:
            # synchronization in progress - keep for later processing
            queue.append(events)

    async def _synchronize(self):
        mlog.debug("starting synchronization")

        try:
            last_event_id = await self._backend.get_last_event_id(
                self._remote_server_id)
            pending = collections.deque()
            total = 0
            more_follows = True

            await self._set_synced(SyncedState.CONNECTED, None)

            while more_follows:
                result = await self._client.query(
                    common.QueryServerParams(
                        server_id=self._remote_server_id,
                        persisted=True,
                        last_event_id=last_event_id))
                more_follows = result.more_follows

                received = len(result.events)
                mlog.debug("received %s query events", received)
                pending.extend(result.events)

                # SYNCING is announced once, on the first non-empty result
                if received and total == 0:
                    await self._set_synced(SyncedState.SYNCING, None)

                total += received
                if not pending:
                    continue

                last_event_id = pending[-1].id

                # register each complete session with a single backend
                # call; events of the newest session stay pending
                # (presumably because the next query result can continue
                # that session - TODO confirm)
                while pending[0].id.session != pending[-1].id.session:
                    session = pending[0].id.session
                    batch = collections.deque()

                    while pending[0].id.session == session:
                        batch.append(pending.popleft())

                    await self._backend.register(batch)

            if pending:
                await self._backend.register(pending)

            mlog.debug("processing cached notify events")
            while self._events_queue:
                # skip cached events already covered by the query results
                fresh = [event for event in self._events_queue.popleft()
                         if event.id > last_event_id]
                if fresh:
                    await self._backend.register(fresh)

            # from now on `_on_events` registers notifications directly
            self._events_queue = None

            mlog.debug("synchronized %s events", total)
            await self._set_synced(SyncedState.SYNCED, total)

        except ConnectionError:
            mlog.debug("connection closed")
            self.close()

        except Exception as e:
            mlog.error("synchronization error: %s", e, exc_info=e)
            self.close()

    async def _set_synced(self, state, count):
        self._synced = state

        await self._register_synced(state, count)

        if self._synced_cb:
            await aio.call(self._synced_cb, state, count)

    async def _register_synced(self, state, count):
        # register the ``synced`` event through the eventer connection;
        # ``count`` is part of the payload only for the SYNCED state
        payload = {'state': state.name}
        if state == SyncedState.SYNCED:
            payload['count'] = count

        event_type = ('event', str(self._local_server_id), 'synced',
                      str(self._remote_server_id))

        await self._client.register([
            common.RegisterEvent(
                type=event_type,
                source_timestamp=None,
                payload=common.EventPayloadJson(payload))])
Eventer client
To create a new client, use the create_eventer_client
coroutine.
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group of the wrapped eventer connection"""
    connection = self._client
    return connection.async_group
Async group
synced: SyncedState | None
@property
def synced(self) -> SyncedState | None:
    """Most recent synced state (``None`` before any state is set)"""
    state = self._synced
    return state
Synced state