hat.event.mariner

 1from hat.event.mariner.client import (EventsCb,
 2                                      connect,
 3                                      Client)
 4from hat.event.mariner.server import (ServerConnectionCb,
 5                                      listen,
 6                                      Server,
 7                                      ServerConnection)
 8
 9
10__all__ = ['EventsCb',
11           'connect',
12           'Client',
13           'ServerConnectionCb',
14           'listen',
15           'Server',
16           'ServerConnection']
EventsCb = typing.Callable[[list[hat.event.common.data.Event]], typing.Optional[typing.Awaitable[NoneType]]]
async def connect( address: hat.drivers.tcp.Address, client_id: str, client_token: str | None = None, last_event_id: hat.event.common.data.EventId | None = None, subscriptions: list[typing.Tuple[str, ...]] = [], events_cb: Optional[Callable[[list[hat.event.common.data.Event]], Optional[Awaitable[NoneType]]]] = None, ping_delay: int = 30, ping_timeout: int = 10, **kwargs) -> Client:
21async def connect(address: tcp.Address,
22                  client_id: str,
23                  client_token: str | None = None,
24                  last_event_id: common.EventId | None = None,
25                  subscriptions: list[common.EventType] = [],
26                  events_cb: EventsCb | None = None,
27                  ping_delay: int = 30,
28                  ping_timeout: int = 10,
29                  **kwargs
30                  ) -> 'Client':
31    """Connect to mariner server
32
33    Additional arguments are passed directly to `hat.drivers.tcp.connect`.
34
35    """
36    conn = await tcp.connect(address, **kwargs)
37
38    try:
39        transport = Transport(conn)
40
41        msg = common.InitMsg(client_id=client_id,
42                             client_token=client_token,
43                             last_event_id=last_event_id,
44                             subscriptions=subscriptions)
45        await transport.send(msg)
46
47        return Client(transport, events_cb, ping_delay, ping_timeout)
48
49    except BaseException:
50        await aio.uncancellable(conn.async_close())
51        raise

Connect to mariner server

Additional arguments are passed directly to hat.drivers.tcp.connect.

class Client(hat.aio.group.Resource):
 54class Client(aio.Resource):
 55    """Mariner client
 56
 57    For creation of new instance see `connect` coroutine.
 58
 59    """
 60
 61    def __init__(self,
 62                 transport: Transport,
 63                 events_cb: EventsCb | None,
 64                 ping_delay: int,
 65                 ping_timeout: int):
 66        self._transport = transport
 67        self._events_cb = events_cb
 68        self._ping_delay = ping_delay
 69        self._ping_timeout = ping_timeout
 70        self._ping_event = asyncio.Event()
 71
 72        self.async_group.spawn(self._receive_loop)
 73        self.async_group.spawn(self._ping_loop)
 74
 75    @property
 76    def async_group(self) -> aio.Group:
 77        """Async group"""
 78        return self._transport.async_group
 79
 80    async def _receive_loop(self):
 81        try:
 82            mlog.debug("starting receive loop")
 83
 84            while True:
 85                msg = await self._transport.receive()
 86                self._ping_event.set()
 87
 88                if isinstance(msg, common.PingMsg):
 89                    await self._transport.send(common.PongMsg())
 90
 91                elif isinstance(msg, common.PongMsg):
 92                    pass
 93
 94                elif isinstance(msg, common.EventsMsg):
 95                    if self._events_cb:
 96                        await aio.call(self._events_cb, msg.events)
 97
 98                else:
 99                    raise Exception("unsupported msg: %s", msg)
100
101        except ConnectionError:
102            pass
103
104        except Exception as e:
105            mlog.error("receive loop error: %s", e, exc_info=e)
106
107        finally:
108            mlog.debug("stopping receive loop")
109            self.close()
110
111    async def _ping_loop(self):
112        try:
113            mlog.debug("starting ping loop %s", id(self))
114
115            while True:
116                self._ping_event.clear()
117
118                with contextlib.suppress(asyncio.TimeoutError):
119                    await aio.wait_for(self._ping_event.wait(),
120                                       self._ping_delay)
121                    continue
122
123                await self._transport.send(common.PingMsg())
124                await aio.wait_for(self._ping_event.wait(),
125                                   self._ping_timeout)
126
127        except ConnectionError:
128            pass
129
130        except asyncio.TimeoutError:
131            mlog.debug("ping timeout")
132
133        except Exception as e:
134            mlog.error("ping loop error: %s", e, exc_info=e)
135
136        finally:
137            mlog.debug("stopping ping loop")
138            self.close()

Mariner client

For creation of new instance see connect coroutine.

Client( transport: hat.event.mariner.transport.Transport, events_cb: Optional[Callable[[list[hat.event.common.data.Event]], Optional[Awaitable[NoneType]]]], ping_delay: int, ping_timeout: int)
61    def __init__(self,
62                 transport: Transport,
63                 events_cb: EventsCb | None,
64                 ping_delay: int,
65                 ping_timeout: int):
66        self._transport = transport
67        self._events_cb = events_cb
68        self._ping_delay = ping_delay
69        self._ping_timeout = ping_timeout
70        self._ping_event = asyncio.Event()
71
72        self.async_group.spawn(self._receive_loop)
73        self.async_group.spawn(self._ping_loop)
async_group: hat.aio.group.Group

Async group

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
ServerConnectionCb = typing.Callable[[ForwardRef('ServerConnection')], typing.Optional[typing.Awaitable[NoneType]]]
async def listen( address: hat.drivers.tcp.Address, connection_cb: Callable[[ServerConnection], Optional[Awaitable[NoneType]]], ping_delay: int = 30, ping_timeout: int = 10, subscriptions: list[typing.Tuple[str, ...]] = ['*'], *, bind_connections: bool = True, **kwargs) -> Server:
22async def listen(address: tcp.Address,
23                 connection_cb: ServerConnectionCb,
24                 ping_delay: int = 30,
25                 ping_timeout: int = 10,
26                 subscriptions: list[common.EventType] = [('*')],
27                 *,
28                 bind_connections: bool = True,
29                 **kwargs
30                 ) -> 'Server':
31    """Create listening server
32
33    Additional arguments are passed directly to `hat.drivers.tcp.listen`.
34
35    """
36    server = Server()
37    server._connection_cb = connection_cb
38    server._ping_delay = ping_delay
39    server._ping_timeout = ping_timeout
40    server._subscription = common.Subscription(subscriptions)
41
42    server._server = await tcp.listen(server._on_connection, address,
43                                      bind_connections=bind_connections,
44                                      **kwargs)
45
46    mlog.debug('listening on %s', address)
47    return server

Create listening server

Additional arguments are passed directly to hat.drivers.tcp.listen.

class Server(hat.aio.group.Resource):
50class Server(aio.Resource):
51    """Mariner server"""
52
53    @property
54    def async_group(self) -> aio.Group:
55        """Async group"""
56        return self._server.async_group
57
58    async def _on_connection(self, conn):
59        try:
60            transport = Transport(conn)
61
62            msg = await transport.receive()
63
64            if not isinstance(msg, common.InitMsg):
65                raise Exception('invalid initialization')
66
67            subscription = self._subscription.intersection(
68                common.Subscription(msg.subscriptions))
69
70            srv_conn = ServerConnection()
71            srv_conn._transport = transport
72            srv_conn._ping_delay = self._ping_delay
73            srv_conn._ping_timeout = self._ping_timeout
74            srv_conn._client_id = msg.client_id
75            srv_conn._client_token = msg.client_token
76            srv_conn._last_event_id = msg.last_event_id
77            srv_conn._subscription = subscription
78            srv_conn._ping_event = asyncio.Event()
79
80            srv_conn.async_group.spawn(srv_conn._receive_loop)
81            srv_conn.async_group.spawn(srv_conn._ping_loop)
82
83            await aio.call(self._connection_cb, srv_conn)
84
85        except Exception as e:
86            mlog.error("on connection error: %s", e, exc_info=e)
87            conn.close()

Mariner server

async_group: hat.aio.group.Group

Async group

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
class ServerConnection(hat.aio.group.Resource):
 90class ServerConnection(aio.Resource):
 91    """Mariner server connection"""
 92
 93    @property
 94    def async_group(self) -> aio.Group:
 95        """Async group"""
 96        return self._transport.async_group
 97
 98    @property
 99    def client_id(self) -> str:
100        """Client id"""
101        return self._client_id
102
103    @property
104    def client_token(self) -> str | None:
105        """Client token"""
106        return self._client_token
107
108    @property
109    def last_event_id(self) -> common.EventId | None:
110        """Laste event id"""
111        return self._last_event_id
112
113    @property
114    def subscription(self) -> common.Subscription:
115        """Subscription"""
116        return self._subscription
117
118    async def send_events(self, events: list[common.Event]):
119        """Send events"""
120        await self._transport.send(common.EventsMsg(events=events))
121
122    async def _receive_loop(self):
123        try:
124            mlog.debug("starting receive loop")
125
126            while True:
127                msg = await self._transport.receive()
128                self._ping_event.set()
129
130                if isinstance(msg, common.PingMsg):
131                    await self._transport.send(common.PongMsg())
132
133                elif isinstance(msg, common.PongMsg):
134                    pass
135
136                else:
137                    raise Exception("unsupported msg: %s", msg)
138
139        except ConnectionError:
140            pass
141
142        except Exception as e:
143            mlog.error("receive loop error: %s", e, exc_info=e)
144
145        finally:
146            mlog.debug("stopping receive loop")
147            self.close()
148
149    async def _ping_loop(self):
150        try:
151            mlog.debug("starting ping loop")
152
153            while True:
154                self._ping_event.clear()
155
156                with contextlib.suppress(asyncio.TimeoutError):
157                    await aio.wait_for(self._ping_event.wait(),
158                                       self._ping_delay)
159                    continue
160
161                await self._transport.send(common.PingMsg())
162
163                await aio.wait_for(self._ping_event.wait(),
164                                   self._ping_timeout)
165
166        except ConnectionError:
167            pass
168
169        except asyncio.TimeoutError:
170            mlog.debug("ping timeout")
171
172        except Exception as e:
173            mlog.error("ping loop error: %s", e, exc_info=e)
174
175        finally:
176            mlog.debug("stopping ping loop")
177            self.close()

Mariner server connection

async_group: hat.aio.group.Group

Async group

client_id: str

Client id

client_token: str | None

Client token

last_event_id: hat.event.common.data.EventId | None

Laste event id

subscription: hat.event.common.subscription.csubscription.CSubscription

Subscription

async def send_events(self, events: list[hat.event.common.data.Event]):
118    async def send_events(self, events: list[common.Event]):
119        """Send events"""
120        await self._transport.send(common.EventsMsg(events=events))

Send events

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close