hat.event.mariner
EventsCb =
typing.Callable[[list[hat.event.common.data.Event]], typing.Optional[typing.Awaitable[NoneType]]]
async def
connect( address: hat.drivers.tcp.Address, client_id: str, client_token: str | None = None, last_event_id: hat.event.common.data.EventId | None = None, subscriptions: list[typing.Tuple[str, ...]] = [], events_cb: Optional[Callable[[list[hat.event.common.data.Event]], Optional[Awaitable[NoneType]]]] = None, ping_delay: int = 30, ping_timeout: int = 10, **kwargs) -> Client:
async def connect(address: tcp.Address,
                  client_id: str,
                  client_token: str | None = None,
                  last_event_id: common.EventId | None = None,
                  subscriptions: list[common.EventType] = [],
                  events_cb: EventsCb | None = None,
                  ping_delay: int = 30,
                  ping_timeout: int = 10,
                  **kwargs
                  ) -> 'Client':
    """Connect to mariner server

    Opens a TCP connection to `address`, sends a `common.InitMsg` built from
    `client_id`, `client_token`, `last_event_id` and `subscriptions`, and
    returns a `Client` wrapping the connection.

    `events_cb`, `ping_delay` and `ping_timeout` are handed to the `Client`
    unchanged.

    Additional arguments are passed directly to `hat.drivers.tcp.connect`.

    If anything fails after the TCP connection is established (including
    cancellation), the connection is closed before the exception is
    re-raised.

    """
    conn = await tcp.connect(address, **kwargs)

    try:
        transport = Transport(conn)

        # Copy the subscriptions so the message never aliases the shared
        # default list (or a list the caller may mutate afterwards).
        msg = common.InitMsg(client_id=client_id,
                             client_token=client_token,
                             last_event_id=last_event_id,
                             subscriptions=list(subscriptions))
        await transport.send(msg)

        return Client(transport, events_cb, ping_delay, ping_timeout)

    except BaseException:
        # BaseException so that cancellation also releases the connection;
        # the close itself is shielded from cancellation.
        await aio.uncancellable(conn.async_close())
        raise
Connect to mariner server
Additional arguments are passed directly to `hat.drivers.tcp.connect`.
class
Client(hat.aio.group.Resource):
class Client(aio.Resource):
    """Mariner client

    For creation of new instance see `connect` coroutine.

    On construction, a receive loop and a ping loop are spawned in the
    transport's async group. When either loop stops, the client is closed.

    """

    def __init__(self,
                 transport: Transport,
                 events_cb: EventsCb | None,
                 ping_delay: int,
                 ping_timeout: int):
        self._transport = transport
        self._events_cb = events_cb
        self._ping_delay = ping_delay
        self._ping_timeout = ping_timeout
        # Set by the receive loop on every incoming message; the ping loop
        # waits on it to detect an idle or dead connection.
        self._ping_event = asyncio.Event()

        self.async_group.spawn(self._receive_loop)
        self.async_group.spawn(self._ping_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._transport.async_group

    async def _receive_loop(self):
        """Receive messages until the connection fails, then close.

        Ping messages are answered with pong, pong messages are ignored and
        event messages are forwarded to `events_cb` (if one was provided).
        Any other message is treated as an error.

        """
        try:
            mlog.debug("starting receive loop")

            while True:
                msg = await self._transport.receive()
                # Any received message counts as proof of a live peer.
                self._ping_event.set()

                if isinstance(msg, common.PingMsg):
                    await self._transport.send(common.PongMsg())

                elif isinstance(msg, common.PongMsg):
                    pass

                elif isinstance(msg, common.EventsMsg):
                    if self._events_cb:
                        await aio.call(self._events_cb, msg.events)

                else:
                    # Exception does not apply %-formatting to its
                    # arguments, so build the message explicitly.
                    raise Exception(f"unsupported msg: {msg}")

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping receive loop")
            self.close()

    async def _ping_loop(self):
        """Send a ping when nothing is received for `ping_delay`.

        If no message arrives within `ping_timeout` after the ping was sent,
        the loop stops and the client is closed.

        """
        try:
            mlog.debug("starting ping loop %s", id(self))

            while True:
                self._ping_event.clear()

                # A message arrived within ping_delay: restart the wait.
                # On timeout the suppress block is left and a ping is sent.
                with contextlib.suppress(asyncio.TimeoutError):
                    await aio.wait_for(self._ping_event.wait(),
                                       self._ping_delay)
                    continue

                await self._transport.send(common.PingMsg())
                # Not suppressed: a timeout here terminates the loop.
                await aio.wait_for(self._ping_event.wait(),
                                   self._ping_timeout)

        except ConnectionError:
            pass

        except asyncio.TimeoutError:
            mlog.debug("ping timeout")

        except Exception as e:
            mlog.error("ping loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping ping loop")
            self.close()
Mariner client
For creation of a new instance, see the `connect` coroutine.
Client( transport: hat.event.mariner.transport.Transport, events_cb: Optional[Callable[[list[hat.event.common.data.Event]], Optional[Awaitable[NoneType]]]], ping_delay: int, ping_timeout: int)
def __init__(self,
             transport: Transport,
             events_cb: EventsCb | None,
             ping_delay: int,
             ping_timeout: int):
    # Signalled whenever a message is received; awaited by the ping loop.
    self._ping_event = asyncio.Event()

    # Keep the configuration; the transport must be stored before the
    # loops are spawned because the async group is taken from it.
    self._ping_timeout = ping_timeout
    self._ping_delay = ping_delay
    self._events_cb = events_cb
    self._transport = transport

    # Start the background loops: receive loop first, then ping loop.
    for loop in (self._receive_loop, self._ping_loop):
        self.async_group.spawn(loop)
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
ServerConnectionCb =
typing.Callable[[ForwardRef('ServerConnection')], typing.Optional[typing.Awaitable[NoneType]]]
async def
listen( address: hat.drivers.tcp.Address, connection_cb: Callable[[ServerConnection], Optional[Awaitable[NoneType]]], ping_delay: int = 30, ping_timeout: int = 10, subscriptions: list[typing.Tuple[str, ...]] = ['*'], *, bind_connections: bool = True, **kwargs) -> Server:
async def listen(address: tcp.Address,
                 connection_cb: ServerConnectionCb,
                 ping_delay: int = 30,
                 ping_timeout: int = 10,
                 subscriptions: list[common.EventType] = [('*',)],
                 *,
                 bind_connections: bool = True,
                 **kwargs
                 ) -> 'Server':
    """Create listening server

    `connection_cb` is called with a `ServerConnection` once a client has
    sent a valid initialization message. `ping_delay` and `ping_timeout` are
    applied to every accepted connection. `subscriptions` limits what any
    client may subscribe to: each connection's subscription is the
    intersection of this value and the client's request. The default is the
    single event type `('*',)`.

    Additional arguments are passed directly to `hat.drivers.tcp.listen`.

    """
    server = Server()
    server._connection_cb = connection_cb
    server._ping_delay = ping_delay
    server._ping_timeout = ping_timeout
    server._subscription = common.Subscription(subscriptions)

    server._server = await tcp.listen(server._on_connection, address,
                                      bind_connections=bind_connections,
                                      **kwargs)

    mlog.debug('listening on %s', address)
    return server
Create listening server
Additional arguments are passed directly to `hat.drivers.tcp.listen`.
class
Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """Mariner server

    Instances are configured and returned by the `listen` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._server.async_group

    async def _on_connection(self, conn):
        # Handshake for a newly accepted connection: the first message must
        # be an InitMsg. On success a ServerConnection is handed to the
        # connection callback; on any error the connection is closed.
        try:
            transport = Transport(conn)
            init_msg = await transport.receive()

            if not isinstance(init_msg, common.InitMsg):
                raise Exception('invalid initialization')

            # A client receives only what both it and the server allow.
            allowed = self._subscription.intersection(
                common.Subscription(init_msg.subscriptions))

            srv_conn = ServerConnection()

            # Connection plumbing
            srv_conn._transport = transport
            srv_conn._ping_event = asyncio.Event()
            srv_conn._ping_delay = self._ping_delay
            srv_conn._ping_timeout = self._ping_timeout

            # Data announced by the client
            srv_conn._client_id = init_msg.client_id
            srv_conn._client_token = init_msg.client_token
            srv_conn._last_event_id = init_msg.last_event_id
            srv_conn._subscription = allowed

            for loop in (srv_conn._receive_loop, srv_conn._ping_loop):
                srv_conn.async_group.spawn(loop)

            await aio.call(self._connection_cb, srv_conn)

        except Exception as e:
            mlog.error("on connection error: %s", e, exc_info=e)
            conn.close()
Mariner server
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
class
ServerConnection(hat.aio.group.Resource):
class ServerConnection(aio.Resource):
    """Mariner server connection

    Represents a single initialized client connection on the server side.
    When the receive loop or the ping loop stops, the connection is closed.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._transport.async_group

    @property
    def client_id(self) -> str:
        """Client id"""
        return self._client_id

    @property
    def client_token(self) -> str | None:
        """Client token"""
        return self._client_token

    @property
    def last_event_id(self) -> common.EventId | None:
        """Last event id"""
        return self._last_event_id

    @property
    def subscription(self) -> common.Subscription:
        """Subscription"""
        return self._subscription

    async def send_events(self, events: list[common.Event]):
        """Send events"""
        await self._transport.send(common.EventsMsg(events=events))

    async def _receive_loop(self):
        """Receive messages until the connection fails, then close.

        Ping messages are answered with pong and pong messages are ignored.
        Any other message is treated as an error.

        """
        try:
            mlog.debug("starting receive loop")

            while True:
                msg = await self._transport.receive()
                # Any received message counts as proof of a live peer.
                self._ping_event.set()

                if isinstance(msg, common.PingMsg):
                    await self._transport.send(common.PongMsg())

                elif isinstance(msg, common.PongMsg):
                    pass

                else:
                    # Exception does not apply %-formatting to its
                    # arguments, so build the message explicitly.
                    raise Exception(f"unsupported msg: {msg}")

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping receive loop")
            self.close()

    async def _ping_loop(self):
        """Send a ping when nothing is received for `ping_delay`.

        If no message arrives within `ping_timeout` after the ping was sent,
        the loop stops and the connection is closed.

        """
        try:
            mlog.debug("starting ping loop")

            while True:
                self._ping_event.clear()

                # A message arrived within ping_delay: restart the wait.
                # On timeout the suppress block is left and a ping is sent.
                with contextlib.suppress(asyncio.TimeoutError):
                    await aio.wait_for(self._ping_event.wait(),
                                       self._ping_delay)
                    continue

                await self._transport.send(common.PingMsg())

                # Not suppressed: a timeout here terminates the loop.
                await aio.wait_for(self._ping_event.wait(),
                                   self._ping_timeout)

        except ConnectionError:
            pass

        except asyncio.TimeoutError:
            mlog.debug("ping timeout")

        except Exception as e:
            mlog.error("ping loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping ping loop")
            self.close()
Mariner server connection
async def
send_events(self, events: list[hat.event.common.data.Event]):
async def send_events(self, events: list[common.Event]):
    """Send events

    Wraps `events` in a single `EventsMsg` and sends it over the transport.

    """
    events_msg = common.EventsMsg(events=events)
    await self._transport.send(events_msg)
Send events
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close