hat.event.server.eventer_client
1from collections.abc import Callable 2import collections 3import logging 4import typing 5 6from hat import aio 7from hat.drivers import tcp 8 9from hat.event import common 10from hat.event import eventer 11 12 13mlog: logging.Logger = logging.getLogger(__name__) 14"""Module logger""" 15 16SyncedCb: typing.TypeAlias = Callable[[common.ServerId, int], None] 17"""Synced callback""" 18 19 20async def create_eventer_client(addr: tcp.Address, 21 client_name: str, 22 server_id: common.ServerId, 23 backend: common.Backend, 24 *, 25 client_token: str | None = None, 26 synced_cb: SyncedCb | None = None, 27 **kwargs 28 ) -> 'EventerClient': 29 """Create eventer client""" 30 client = EventerClient() 31 client._server_id = server_id 32 client._backend = backend 33 client._synced_cb = synced_cb 34 client._synced = False 35 client._events_queue = collections.deque() 36 37 client._client = await eventer.connect(addr=addr, 38 client_name=client_name, 39 client_token=client_token, 40 subscriptions=[('*', )], 41 server_id=server_id, 42 persisted=True, 43 events_cb=client._on_events, 44 **kwargs) 45 46 try: 47 client.async_group.spawn(client._synchronize) 48 49 except BaseException: 50 await aio.uncancellable(client.async_close()) 51 raise 52 53 return client 54 55 56class EventerClient(aio.Resource): 57 """Eventer client 58 59 For creating new client see `create_eventer_client` coroutine. 60 61 """ 62 63 @property 64 def async_group(self) -> aio.Group: 65 """Async group""" 66 return self._client.async_group 67 68 @property 69 def synced(self) -> bool: 70 """Synced state""" 71 return self._synced 72 73 async def _on_events(self, client, events): 74 mlog.debug("received %s notify events", len(events)) 75 76 if self._events_queue is not None: 77 self._events_queue.append(events) 78 return 79 80 await self._backend.register(events) 81 82 async def _synchronize(self): 83 mlog.debug("starting synchronization") 84 85 try: 86 last_event_id = await self._backend.get_last_event_id( 87 self._server_id) 88 events = collections.deque() 89 result = common.QueryResult([], True) 90 synced_counter = 0 91 92 while result.more_follows: 93 params = common.QueryServerParams(server_id=self._server_id, 94 persisted=True, 95 last_event_id=last_event_id) 96 result = await self._client.query(params) 97 98 mlog.debug("received %s query events", len(result.events)) 99 events.extend(result.events) 100 synced_counter += len(result.events) 101 if not events: 102 continue 103 104 last_event_id = events[-1].id 105 106 while events[0].id.session != events[-1].id.session: 107 session_id = events[0].id.session 108 session_events = collections.deque() 109 110 while events[0].id.session == session_id: 111 session_events.append(events.popleft()) 112 113 await self._backend.register(session_events) 114 115 if events: 116 await self._backend.register(events) 117 118 mlog.debug("processing cached notify events") 119 while self._events_queue: 120 events = [event for event in self._events_queue.popleft() 121 if event.id > last_event_id] 122 if not events: 123 continue 124 125 await self._backend.register(events) 126 127 self._events_queue = None 128 self._synced = True 129 130 mlog.debug("synchronized %s events", synced_counter) 131 if self._synced_cb: 132 await aio.call(self._synced_cb, self._server_id, 133 synced_counter) 134 135 except ConnectionError: 136 mlog.debug("connection closed") 137 self.close() 138 139 except Exception as e: 140 mlog.error("synchronization error: %s", e, exc_info=e) 141 self.close()
Module logger
SyncedCb: TypeAlias =
collections.abc.Callable[[int, int], None]
Synced callback
async def
create_eventer_client( addr: hat.drivers.tcp.Address, client_name: str, server_id: int, backend: hat.event.common.backend.Backend, *, client_token: str | None = None, synced_cb: collections.abc.Callable[[int, int], None] | None = None, **kwargs) -> EventerClient:
21async def create_eventer_client(addr: tcp.Address, 22 client_name: str, 23 server_id: common.ServerId, 24 backend: common.Backend, 25 *, 26 client_token: str | None = None, 27 synced_cb: SyncedCb | None = None, 28 **kwargs 29 ) -> 'EventerClient': 30 """Create eventer client""" 31 client = EventerClient() 32 client._server_id = server_id 33 client._backend = backend 34 client._synced_cb = synced_cb 35 client._synced = False 36 client._events_queue = collections.deque() 37 38 client._client = await eventer.connect(addr=addr, 39 client_name=client_name, 40 client_token=client_token, 41 subscriptions=[('*', )], 42 server_id=server_id, 43 persisted=True, 44 events_cb=client._on_events, 45 **kwargs) 46 47 try: 48 client.async_group.spawn(client._synchronize) 49 50 except BaseException: 51 await aio.uncancellable(client.async_close()) 52 raise 53 54 return client
Create eventer client
class
EventerClient(hat.aio.group.Resource):
57class EventerClient(aio.Resource): 58 """Eventer client 59 60 For creating new client see `create_eventer_client` coroutine. 61 62 """ 63 64 @property 65 def async_group(self) -> aio.Group: 66 """Async group""" 67 return self._client.async_group 68 69 @property 70 def synced(self) -> bool: 71 """Synced state""" 72 return self._synced 73 74 async def _on_events(self, client, events): 75 mlog.debug("received %s notify events", len(events)) 76 77 if self._events_queue is not None: 78 self._events_queue.append(events) 79 return 80 81 await self._backend.register(events) 82 83 async def _synchronize(self): 84 mlog.debug("starting synchronization") 85 86 try: 87 last_event_id = await self._backend.get_last_event_id( 88 self._server_id) 89 events = collections.deque() 90 result = common.QueryResult([], True) 91 synced_counter = 0 92 93 while result.more_follows: 94 params = common.QueryServerParams(server_id=self._server_id, 95 persisted=True, 96 last_event_id=last_event_id) 97 result = await self._client.query(params) 98 99 mlog.debug("received %s query events", len(result.events)) 100 events.extend(result.events) 101 synced_counter += len(result.events) 102 if not events: 103 continue 104 105 last_event_id = events[-1].id 106 107 while events[0].id.session != events[-1].id.session: 108 session_id = events[0].id.session 109 session_events = collections.deque() 110 111 while events[0].id.session == session_id: 112 session_events.append(events.popleft()) 113 114 await self._backend.register(session_events) 115 116 if events: 117 await self._backend.register(events) 118 119 mlog.debug("processing cached notify events") 120 while self._events_queue: 121 events = [event for event in self._events_queue.popleft() 122 if event.id > last_event_id] 123 if not events: 124 continue 125 126 await self._backend.register(events) 127 128 self._events_queue = None 129 self._synced = True 130 131 mlog.debug("synchronized %s events", synced_counter) 132 if self._synced_cb: 133 await aio.call(self._synced_cb, self._server_id, 134 synced_counter) 135 136 except ConnectionError: 137 mlog.debug("connection closed") 138 self.close() 139 140 except Exception as e: 141 mlog.error("synchronization error: %s", e, exc_info=e) 142 self.close()
Eventer client
For creating new client see create_eventer_client
coroutine.
async_group: hat.aio.group.Group
64 @property 65 def async_group(self) -> aio.Group: 66 """Async group""" 67 return self._client.async_group
Async group
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close