hat.event.syncer_client

  1import asyncio
  2import enum
  3import functools
  4import logging
  5import typing
  6
  7from hat import aio
  8from hat import chatter
  9from hat import util
 10from hat.event.server import common
 11import hat.event.common.data
 12import hat.monitor.client
 13
 14
 15mlog: logging.Logger = logging.getLogger(__name__)
 16"""Module logger"""
 17
 18
 19reconnect_delay: float = 10
 20"""Reconnect delay"""
 21
 22
 23class SyncerClientState(enum.Enum):
 24    """Connection state"""
 25    CONNECTED = 0
 26    SYNCED = 1
 27    DISCONNECTED = 2
 28
 29
 30ServerId = int
 31"""Server identifier"""
 32
 33StateCb = typing.Callable[[ServerId, SyncerClientState], None]
 34"""Connection state callback"""
 35
 36EventsCb = typing.Callable[[ServerId, typing.List[common.Event]], None]
 37"""Events callback"""
 38
 39
 40async def create_syncer_client(backend: common.Backend,
 41                               monitor_client: hat.monitor.client.Client,
 42                               monitor_group: str,
 43                               name: str,
 44                               syncer_token: typing.Optional[str] = None,
 45                               **kwargs
 46                               ) -> 'SyncerClient':
 47    """Create syncer client
 48
 49    Args:
 50        backend: backend
 51        monitor_client: monitor client
 52        monitor_group: monitor group name
 53        name: client name
 54        syncer_token: syncer token
 55        kwargs: additional arguments passed to `hat.chatter.connect` coroutine
 56
 57    """
 58    cli = SyncerClient()
 59    cli._backend = backend
 60    cli._monitor_client = monitor_client
 61    cli._monitor_group = monitor_group
 62    cli._name = name
 63    cli._syncer_token = syncer_token
 64    cli._kwargs = kwargs
 65    cli._conns = {}
 66    cli._state_cbs = util.CallbackRegistry()
 67    cli._events_cbs = util.CallbackRegistry()
 68    cli._async_group = aio.Group()
 69
 70    cli.async_group.spawn(cli._monitor_client_loop)
 71
 72    return cli
 73
 74
 75class SyncerClient(aio.Resource):
 76
 77    @property
 78    def async_group(self) -> aio.Group:
 79        """Async group"""
 80        return self._async_group
 81
 82    @property
 83    def servers_synced(self) -> typing.List[int]:
 84        """server_ids of all servers that client is synced with"""
 85        return [srv_id for srv_id, conn in self._conns.items() if conn.synced]
 86
 87    def register_state_cb(self,
 88                          cb: StateCb
 89                          ) -> util.RegisterCallbackHandle:
 90        """Register client state callback"""
 91        return self._state_cbs.register(cb)
 92
 93    def register_events_cb(self,
 94                           cb: EventsCb
 95                           ) -> util.RegisterCallbackHandle:
 96        """Register events callback"""
 97        return self._events_cbs.register(cb)
 98
 99    async def _monitor_client_loop(self):
100        try:
101            changes = aio.Queue()
102            change_cb = functools.partial(changes.put_nowait, None)
103            with self._monitor_client.register_change_cb(change_cb):
104
105                while True:
106                    mlog.debug("filtering syncer server addresses")
107                    server_id_addresses = {}
108                    for info in self._monitor_client.components:
109                        if not (info.group == self._monitor_group and
110                                info != self._monitor_client.info and
111                                info.data and
112                                'server_id' in info.data and
113                                'syncer_server_address' in info.data):
114                            continue
115                        if (self._syncer_token is not None and
116                                self._syncer_token != info.data.get('syncer_token')):  # NOQA
117                            mlog.warning("syncer tokens not equal, server %s "
118                                         "ignored for syncer connection to %s",
119                                         info.data['server_id'],
120                                         info.data['syncer_server_address'])
121                            continue
122                        server_id_addresses[info.data['server_id']] = info.data['syncer_server_address']  # NOQA
123
124                    for server_id, address in server_id_addresses.items():
125                        conn = self._conns.get(server_id)
126                        if conn:
127                            if conn.is_open and conn.address == address:
128                                continue
129
130                            mlog.debug("closing existing connection")
131                            await conn.async_close()
132                            self._conns.pop(server_id)
133
134                        mlog.debug("creating new connection")
135                        state_cb = functools.partial(self._state_cbs.notify,
136                                                     server_id)
137                        events_cb = functools.partial(self._events_cbs.notify,
138                                                      server_id)
139                        conn = _Connection(
140                            async_group=self.async_group.create_subgroup(),
141                            backend=self._backend,
142                            server_id=server_id,
143                            address=address,
144                            client_name=self._name,
145                            kwargs=self._kwargs,
146                            state_cb=state_cb,
147                            events_cb=events_cb)
148
149                        self._conns[server_id] = conn
150
151                    await changes.get_until_empty()
152
153        except Exception as e:
154            mlog.error("monitor client loop error: %s", e, exc_info=e)
155
156        finally:
157            self.close()
158
159
160class _Connection(aio.Resource):
161
162    def __init__(self, async_group, backend, server_id, address, client_name,
163                 kwargs, state_cb, events_cb):
164        self._async_group = async_group
165        self._backend = backend
166        self._server_id = server_id
167        self._address = address
168        self._client_name = client_name
169        self._kwargs = kwargs
170        self._state_cb = state_cb
171        self._events_cb = events_cb
172        self._synced = False
173
174        self.async_group.spawn(self._connection_loop)
175
176    @property
177    def async_group(self):
178        return self._async_group
179
180    @property
181    def address(self):
182        return self._address
183
184    @property
185    def synced(self):
186        return self._synced
187
188    async def _connection_loop(self):
189        while True:
190            try:
191                mlog.debug("connecting to syncer server")
192                conn = await chatter.connect(common.sbs_repo, self._address,
193                                             **self._kwargs)
194
195            except Exception:
196                mlog.debug("can not connect to syncer server")
197                await asyncio.sleep(reconnect_delay)
198                continue
199
200            try:
201                last_event_id = await self._backend.get_last_event_id(
202                    self._server_id)
203                msg_data = chatter.Data(
204                    module='HatSyncer',
205                    type='MsgReq',
206                    data=hat.event.common.data.syncer_req_to_sbs(
207                         hat.event.common.data.SyncerReq(last_event_id,
208                                                         self._client_name)))
209                conn.send(msg_data)
210
211                self._state_cb(SyncerClientState.CONNECTED)
212                self._synced = False
213                try:
214                    while True:
215                        mlog.debug("waiting for incoming message")
216                        msg = await conn.receive()
217                        msg_type = msg.data.module, msg.data.type
218
219                        if msg_type == ('HatSyncer', 'MsgEvents'):
220                            mlog.debug("received events")
221                            events = [common.event_from_sbs(i)
222                                      for i in msg.data.data]
223                            await self._backend.register(events)
224                            self._events_cb(events)
225
226                        elif msg_type == ('HatSyncer', 'MsgSynced'):
227                            mlog.debug("received synced")
228                            self._state_cb(SyncerClientState.SYNCED)
229                            self._synced = True
230
231                        elif msg_type == ('HatSyncer', 'MsgFlushReq'):
232                            mlog.debug("received flush request")
233                            conn.send(chatter.Data(module='HatSyncer',
234                                                   type='MsgFlushRes',
235                                                   data=None),
236                                      conv=msg.conv)
237
238                        else:
239                            raise Exception("unsupported message type")
240
241                finally:
242                    self._state_cb(SyncerClientState.DISCONNECTED)
243                    self._synced = False
244
245            except ConnectionError:
246                pass
247
248            except Exception as e:
249                mlog.error("connection loop error: %s", e, exc_info=e)
250
251            finally:
252                await aio.uncancellable(conn.async_close())
mlog: logging.Logger = <Logger hat.event.syncer_client (WARNING)>

Module logger

reconnect_delay: float = 10

Reconnect delay

class SyncerClientState(enum.Enum):
24class SyncerClientState(enum.Enum):
25    """Connection state"""
26    CONNECTED = 0
27    SYNCED = 1
28    DISCONNECTED = 2

Connection state

CONNECTED = <SyncerClientState.CONNECTED: 0>
SYNCED = <SyncerClientState.SYNCED: 1>
DISCONNECTED = <SyncerClientState.DISCONNECTED: 2>
Inherited Members
enum.Enum
name
value
ServerId = <class 'int'>

Server identifier

StateCb = typing.Callable[[int, hat.event.syncer_client.SyncerClientState], NoneType]

Connection state callback

EventsCb = typing.Callable[[int, typing.List[hat.event.common.data.Event]], NoneType]

Events callback

async def create_syncer_client( backend: hat.event.server.common.Backend, monitor_client: hat.monitor.client.Client, monitor_group: str, name: str, syncer_token: Optional[str] = None, **kwargs) -> hat.event.syncer_client.SyncerClient:
41async def create_syncer_client(backend: common.Backend,
42                               monitor_client: hat.monitor.client.Client,
43                               monitor_group: str,
44                               name: str,
45                               syncer_token: typing.Optional[str] = None,
46                               **kwargs
47                               ) -> 'SyncerClient':
48    """Create syncer client
49
50    Args:
51        backend: backend
52        monitor_client: monitor client
53        monitor_group: monitor group name
54        name: client name
55        syncer_token: syncer token
56        kwargs: additional arguments passed to `hat.chatter.connect` coroutine
57
58    """
59    cli = SyncerClient()
60    cli._backend = backend
61    cli._monitor_client = monitor_client
62    cli._monitor_group = monitor_group
63    cli._name = name
64    cli._syncer_token = syncer_token
65    cli._kwargs = kwargs
66    cli._conns = {}
67    cli._state_cbs = util.CallbackRegistry()
68    cli._events_cbs = util.CallbackRegistry()
69    cli._async_group = aio.Group()
70
71    cli.async_group.spawn(cli._monitor_client_loop)
72
73    return cli

Create syncer client

Arguments:
  • backend: backend
  • monitor_client: monitor client
  • monitor_group: monitor group name
  • name: client name
  • syncer_token: syncer token
  • kwargs: additional arguments passed to hat.chatter.connect coroutine
class SyncerClient(hat.aio.Resource):
 76class SyncerClient(aio.Resource):
 77
 78    @property
 79    def async_group(self) -> aio.Group:
 80        """Async group"""
 81        return self._async_group
 82
 83    @property
 84    def servers_synced(self) -> typing.List[int]:
 85        """server_ids of all servers that client is synced with"""
 86        return [srv_id for srv_id, conn in self._conns.items() if conn.synced]
 87
 88    def register_state_cb(self,
 89                          cb: StateCb
 90                          ) -> util.RegisterCallbackHandle:
 91        """Register client state callback"""
 92        return self._state_cbs.register(cb)
 93
 94    def register_events_cb(self,
 95                           cb: EventsCb
 96                           ) -> util.RegisterCallbackHandle:
 97        """Register events callback"""
 98        return self._events_cbs.register(cb)
 99
100    async def _monitor_client_loop(self):
101        try:
102            changes = aio.Queue()
103            change_cb = functools.partial(changes.put_nowait, None)
104            with self._monitor_client.register_change_cb(change_cb):
105
106                while True:
107                    mlog.debug("filtering syncer server addresses")
108                    server_id_addresses = {}
109                    for info in self._monitor_client.components:
110                        if not (info.group == self._monitor_group and
111                                info != self._monitor_client.info and
112                                info.data and
113                                'server_id' in info.data and
114                                'syncer_server_address' in info.data):
115                            continue
116                        if (self._syncer_token is not None and
117                                self._syncer_token != info.data.get('syncer_token')):  # NOQA
118                            mlog.warning("syncer tokens not equal, server %s "
119                                         "ignored for syncer connection to %s",
120                                         info.data['server_id'],
121                                         info.data['syncer_server_address'])
122                            continue
123                        server_id_addresses[info.data['server_id']] = info.data['syncer_server_address']  # NOQA
124
125                    for server_id, address in server_id_addresses.items():
126                        conn = self._conns.get(server_id)
127                        if conn:
128                            if conn.is_open and conn.address == address:
129                                continue
130
131                            mlog.debug("closing existing connection")
132                            await conn.async_close()
133                            self._conns.pop(server_id)
134
135                        mlog.debug("creating new connection")
136                        state_cb = functools.partial(self._state_cbs.notify,
137                                                     server_id)
138                        events_cb = functools.partial(self._events_cbs.notify,
139                                                      server_id)
140                        conn = _Connection(
141                            async_group=self.async_group.create_subgroup(),
142                            backend=self._backend,
143                            server_id=server_id,
144                            address=address,
145                            client_name=self._name,
146                            kwargs=self._kwargs,
147                            state_cb=state_cb,
148                            events_cb=events_cb)
149
150                        self._conns[server_id] = conn
151
152                    await changes.get_until_empty()
153
154        except Exception as e:
155            mlog.error("monitor client loop error: %s", e, exc_info=e)
156
157        finally:
158            self.close()

Resource with lifetime control based on Group.

SyncerClient()
async_group: hat.aio.Group

Async group

servers_synced: List[int]

server_ids of all servers that client is synced with

def register_state_cb( self, cb: Callable[[int, hat.event.syncer_client.SyncerClientState], NoneType]) -> hat.util.RegisterCallbackHandle:
88    def register_state_cb(self,
89                          cb: StateCb
90                          ) -> util.RegisterCallbackHandle:
91        """Register client state callback"""
92        return self._state_cbs.register(cb)

Register client state callback

def register_events_cb( self, cb: Callable[[int, List[hat.event.common.data.Event]], NoneType]) -> hat.util.RegisterCallbackHandle:
94    def register_events_cb(self,
95                           cb: EventsCb
96                           ) -> util.RegisterCallbackHandle:
97        """Register events callback"""
98        return self._events_cbs.register(cb)

Register events callback

Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close