hat.event.component
Eventer Component
1"""Eventer Component""" 2 3from collections.abc import Collection 4import asyncio 5import contextlib 6import logging 7import types 8import typing 9 10from hat import aio 11from hat import json 12from hat import util 13from hat.drivers import tcp 14import hat.monitor.component 15 16from hat.event import common 17import hat.event.eventer.client 18 19 20mlog: logging.Logger = logging.getLogger(__name__) 21"""Module logger""" 22 23State: typing.TypeAlias = hat.monitor.component.State 24"""Component state""" 25 26Runner: typing.TypeAlias = aio.Resource 27"""Component runner""" 28 29RunnerCb: typing.TypeAlias = aio.AsyncCallable[ 30 ['Component', 'ServerData', hat.event.eventer.client.Client], 31 Runner] 32"""Runner callback""" 33 34StateCb: typing.TypeAlias = aio.AsyncCallable[['Component', State], None] 35"""State callback""" 36 37CloseReqCb: typing.TypeAlias = aio.AsyncCallable[['Component'], None] 38"""Close request callback""" 39 40StatusCb: typing.TypeAlias = aio.AsyncCallable[ 41 ['Component', hat.event.eventer.client.Client, common.Status], 42 None] 43"""Status callback""" 44 45EventsCb: typing.TypeAlias = aio.AsyncCallable[ 46 ['Component', hat.event.eventer.client.Client, Collection[common.Event]], 47 None] 48"""Events callback""" 49 50 51class ServerData(typing.NamedTuple): 52 """Server data""" 53 server_id: common.ServerId 54 addr: tcp.Address 55 server_token: str | None 56 57 58async def connect(addr: tcp.Address, 59 name: str, 60 group: str, 61 server_group: str, 62 client_name: str, 63 runner_cb: RunnerCb, 64 *, 65 state_cb: StateCb | None = None, 66 close_req_cb: CloseReqCb | None = None, 67 status_cb: StatusCb | None = None, 68 events_cb: EventsCb | None = None, 69 server_data_queue_size: int = 1024, 70 reconnect_delay: float = 0.5, 71 observer_kwargs: dict[str, typing.Any] = {}, 72 eventer_kwargs: dict[str, typing.Any] = {} 73 ) -> 'Component': 74 """Connect to local monitor server and create component 75 76 High-level interface for communication with Event Server, based on 77 information obtained from Monitor Server. 78 79 Component instance tries to establish active connection with 80 Event Server within monitor component group `server_group`. Once this 81 connection is established, `runner_cb` is called with currently active 82 eventer client instance. Result of calling `runner_cb` should be runner 83 representing user defined components activity associated with connection 84 to active Event Server. Once connection to Event Server is closed or new 85 active Event Server is detected, associated runner is closed. If new 86 connection to Event Server is successfully established, 87 `component_cb` will be called again to create new runner associated with 88 new instance of eventer client. 89 90 If runner is closed while connection to Event Server is open, component 91 is closed. 92 93 """ 94 component = Component() 95 component._server_group = server_group 96 component._client_name = client_name 97 component._runner_cb = runner_cb 98 component._state_cb = state_cb 99 component._close_req_cb = close_req_cb 100 component._status_cb = status_cb 101 component._events_cb = events_cb 102 component._server_data_queue_size = server_data_queue_size 103 component._reconnect_delay = reconnect_delay 104 component._eventer_kwargs = eventer_kwargs 105 component._server_data = None 106 component._server_data_queue = None 107 108 component._monitor_component = await hat.monitor.component.connect( 109 addr=addr, 110 name=name, 111 group=group, 112 runner_cb=component._create_monitor_runner, 113 **{**observer_kwargs, 114 'state_cb': component._on_state, 115 'close_req_cb': component._on_close_req}) 116 117 return component 118 119 120class Component(aio.Resource): 121 """Eventer Component""" 122 123 @property 124 def async_group(self): 125 return self._monitor_component.async_group 126 127 @property 128 def state(self) -> State: 129 """State""" 130 return self._monitor_component.state 131 132 @property 133 def ready(self) -> bool: 134 """Ready""" 135 return self._monitor_component.ready 136 137 async def set_ready(self, ready: bool): 138 """Set ready""" 139 await self._monitor_component.set_ready(ready) 140 141 async def _on_state(self, monitor_component, state): 142 if self._server_data_queue is not None: 143 data = self._get_active_server_data(state) 144 145 if data != self._server_data: 146 self._server_data = data 147 148 with contextlib.suppress(aio.QueueClosedError): 149 await self._server_data_queue.put(data) 150 151 if self._state_cb: 152 await aio.call(self._state_cb, self, state) 153 154 async def _on_close_req(self, monitor_component): 155 if self._close_req_cb: 156 await aio.call(self._close_req_cb, self) 157 158 async def _on_status(self, eventer_client, status): 159 if self._status_cb: 160 await aio.call(self._status_cb, self, eventer_client, status) 161 162 async def _on_events(self, eventer_client, events): 163 if self._events_cb: 164 await aio.call(self._events_cb, self, eventer_client, events) 165 166 def _create_monitor_runner(self, monitor_component): 167 self._server_data = self._get_active_server_data( 168 monitor_component.state) 169 self._server_data_queue = aio.Queue(self._server_data_queue_size) 170 171 if self._server_data: 172 self._server_data_queue.put_nowait(self._server_data) 173 174 runner = aio.Group() 175 runner.spawn(self._server_data_loop, runner, self._server_data_queue) 176 177 return runner 178 179 def _get_active_server_data(self, state): 180 info = util.first(state.components, self._active_server_filter) 181 if not info: 182 return 183 184 server_id = json.get(info.data, 'server_id') 185 host = json.get(info.data, ['eventer_server', 'host']) 186 port = json.get(info.data, ['eventer_server', 'port']) 187 server_token = json.get(info.data, 'server_token') 188 if (not isinstance(server_id, int) or 189 not isinstance(host, str) or 190 not isinstance(port, int) or 191 not isinstance(server_token, (str, types.NoneType))): 192 return 193 194 client_token = self._eventer_kwargs.get('client_token') 195 if client_token is not None and client_token != server_token: 196 return 197 198 return ServerData(server_id=server_id, 199 addr=tcp.Address(host, port), 200 server_token=server_token) 201 202 def _active_server_filter(self, info): 203 return (info.group == self._server_group and 204 info.blessing_req.token is not None and 205 info.blessing_req.token == info.blessing_res.token) 206 207 async def _server_data_loop(self, async_group, server_data_queue): 208 try: 209 server_data = None 210 while True: 211 while not server_data or not server_data_queue.empty(): 212 server_data = await server_data_queue.get_until_empty() 213 214 async with async_group.create_subgroup() as subgroup: 215 subgroup.spawn(self._client_loop, subgroup, server_data) 216 server_data = await server_data_queue.get_until_empty() 217 218 except Exception as e: 219 mlog.error("address loop error: %s", e, exc_info=e) 220 self.close() 221 222 finally: 223 async_group.close() 224 server_data_queue.close() 225 226 async def _client_loop(self, async_group, server_data): 227 try: 228 while True: 229 try: 230 kwargs = {**self._eventer_kwargs, 231 'status_cb': self._on_status, 232 'events_cb': self._on_events} 233 234 if 'server_id' not in kwargs: 235 kwargs['server_id'] = server_data.server_id 236 237 mlog.debug("connecting to server %s", server_data.addr) 238 client = await hat.event.eventer.client.connect( 239 addr=server_data.addr, 240 client_name=self._client_name, 241 **kwargs) 242 243 except Exception as e: 244 mlog.warning("error connecting to server: %s", e, 245 exc_info=e) 246 await asyncio.sleep(self._reconnect_delay) 247 continue 248 249 try: 250 mlog.debug("connected to server") 251 runner = await client.async_group.spawn( 252 aio.call, self._runner_cb, self, server_data, client) 253 254 try: 255 async with async_group.create_subgroup() as subgroup: 256 client_closing_task = subgroup.spawn( 257 client.wait_closing) 258 runner_closing_task = subgroup.spawn( 259 runner.wait_closing) 260 261 await asyncio.wait( 262 [client_closing_task, runner_closing_task], 263 return_when=asyncio.FIRST_COMPLETED) 264 265 if (runner_closing_task.done() and 266 not client_closing_task.done()): 267 self.close() 268 return 269 270 finally: 271 await aio.uncancellable(runner.async_close()) 272 273 finally: 274 await aio.uncancellable(client.async_close()) 275 276 mlog.debug("connection to server closed") 277 await asyncio.sleep(self._reconnect_delay) 278 279 except Exception as e: 280 mlog.error("client loop error: %s", e, exc_info=e) 281 self.close() 282 283 finally: 284 async_group.close()
Module logger
26class State(typing.NamedTuple): 27 """Client state""" 28 info: common.ComponentInfo | None 29 components: list[common.ComponentInfo]
Component state
Component runner
Runner callback
State callback
Close request callback
Status callback
Events callback
52class ServerData(typing.NamedTuple): 53 """Server data""" 54 server_id: common.ServerId 55 addr: tcp.Address 56 server_token: str | None
Server data
59async def connect(addr: tcp.Address, 60 name: str, 61 group: str, 62 server_group: str, 63 client_name: str, 64 runner_cb: RunnerCb, 65 *, 66 state_cb: StateCb | None = None, 67 close_req_cb: CloseReqCb | None = None, 68 status_cb: StatusCb | None = None, 69 events_cb: EventsCb | None = None, 70 server_data_queue_size: int = 1024, 71 reconnect_delay: float = 0.5, 72 observer_kwargs: dict[str, typing.Any] = {}, 73 eventer_kwargs: dict[str, typing.Any] = {} 74 ) -> 'Component': 75 """Connect to local monitor server and create component 76 77 High-level interface for communication with Event Server, based on 78 information obtained from Monitor Server. 79 80 Component instance tries to establish active connection with 81 Event Server within monitor component group `server_group`. Once this 82 connection is established, `runner_cb` is called with currently active 83 eventer client instance. Result of calling `runner_cb` should be runner 84 representing user defined components activity associated with connection 85 to active Event Server. Once connection to Event Server is closed or new 86 active Event Server is detected, associated runner is closed. If new 87 connection to Event Server is successfully established, 88 `component_cb` will be called again to create new runner associated with 89 new instance of eventer client. 90 91 If runner is closed while connection to Event Server is open, component 92 is closed. 93 94 """ 95 component = Component() 96 component._server_group = server_group 97 component._client_name = client_name 98 component._runner_cb = runner_cb 99 component._state_cb = state_cb 100 component._close_req_cb = close_req_cb 101 component._status_cb = status_cb 102 component._events_cb = events_cb 103 component._server_data_queue_size = server_data_queue_size 104 component._reconnect_delay = reconnect_delay 105 component._eventer_kwargs = eventer_kwargs 106 component._server_data = None 107 component._server_data_queue = None 108 109 component._monitor_component = await hat.monitor.component.connect( 110 addr=addr, 111 name=name, 112 group=group, 113 runner_cb=component._create_monitor_runner, 114 **{**observer_kwargs, 115 'state_cb': component._on_state, 116 'close_req_cb': component._on_close_req}) 117 118 return component
Connect to local monitor server and create component
High-level interface for communication with Event Server, based on information obtained from Monitor Server.
Component instance tries to establish active connection with
Event Server within monitor component group server_group
. Once this
connection is established, runner_cb
is called with currently active
eventer client instance. Result of calling runner_cb
should be runner
representing user defined components activity associated with connection
to active Event Server. Once connection to Event Server is closed or new
active Event Server is detected, associated runner is closed. If new
connection to Event Server is successfully established,
component_cb
will be called again to create new runner associated with
new instance of eventer client.
If runner is closed while connection to Event Server is open, component is closed.
121class Component(aio.Resource): 122 """Eventer Component""" 123 124 @property 125 def async_group(self): 126 return self._monitor_component.async_group 127 128 @property 129 def state(self) -> State: 130 """State""" 131 return self._monitor_component.state 132 133 @property 134 def ready(self) -> bool: 135 """Ready""" 136 return self._monitor_component.ready 137 138 async def set_ready(self, ready: bool): 139 """Set ready""" 140 await self._monitor_component.set_ready(ready) 141 142 async def _on_state(self, monitor_component, state): 143 if self._server_data_queue is not None: 144 data = self._get_active_server_data(state) 145 146 if data != self._server_data: 147 self._server_data = data 148 149 with contextlib.suppress(aio.QueueClosedError): 150 await self._server_data_queue.put(data) 151 152 if self._state_cb: 153 await aio.call(self._state_cb, self, state) 154 155 async def _on_close_req(self, monitor_component): 156 if self._close_req_cb: 157 await aio.call(self._close_req_cb, self) 158 159 async def _on_status(self, eventer_client, status): 160 if self._status_cb: 161 await aio.call(self._status_cb, self, eventer_client, status) 162 163 async def _on_events(self, eventer_client, events): 164 if self._events_cb: 165 await aio.call(self._events_cb, self, eventer_client, events) 166 167 def _create_monitor_runner(self, monitor_component): 168 self._server_data = self._get_active_server_data( 169 monitor_component.state) 170 self._server_data_queue = aio.Queue(self._server_data_queue_size) 171 172 if self._server_data: 173 self._server_data_queue.put_nowait(self._server_data) 174 175 runner = aio.Group() 176 runner.spawn(self._server_data_loop, runner, self._server_data_queue) 177 178 return runner 179 180 def _get_active_server_data(self, state): 181 info = util.first(state.components, self._active_server_filter) 182 if not info: 183 return 184 185 server_id = json.get(info.data, 'server_id') 186 host = json.get(info.data, ['eventer_server', 'host']) 187 port = json.get(info.data, ['eventer_server', 'port']) 188 server_token = json.get(info.data, 'server_token') 189 if (not isinstance(server_id, int) or 190 not isinstance(host, str) or 191 not isinstance(port, int) or 192 not isinstance(server_token, (str, types.NoneType))): 193 return 194 195 client_token = self._eventer_kwargs.get('client_token') 196 if client_token is not None and client_token != server_token: 197 return 198 199 return ServerData(server_id=server_id, 200 addr=tcp.Address(host, port), 201 server_token=server_token) 202 203 def _active_server_filter(self, info): 204 return (info.group == self._server_group and 205 info.blessing_req.token is not None and 206 info.blessing_req.token == info.blessing_res.token) 207 208 async def _server_data_loop(self, async_group, server_data_queue): 209 try: 210 server_data = None 211 while True: 212 while not server_data or not server_data_queue.empty(): 213 server_data = await server_data_queue.get_until_empty() 214 215 async with async_group.create_subgroup() as subgroup: 216 subgroup.spawn(self._client_loop, subgroup, server_data) 217 server_data = await server_data_queue.get_until_empty() 218 219 except Exception as e: 220 mlog.error("address loop error: %s", e, exc_info=e) 221 self.close() 222 223 finally: 224 async_group.close() 225 server_data_queue.close() 226 227 async def _client_loop(self, async_group, server_data): 228 try: 229 while True: 230 try: 231 kwargs = {**self._eventer_kwargs, 232 'status_cb': self._on_status, 233 'events_cb': self._on_events} 234 235 if 'server_id' not in kwargs: 236 kwargs['server_id'] = server_data.server_id 237 238 mlog.debug("connecting to server %s", server_data.addr) 239 client = await hat.event.eventer.client.connect( 240 addr=server_data.addr, 241 client_name=self._client_name, 242 **kwargs) 243 244 except Exception as e: 245 mlog.warning("error connecting to server: %s", e, 246 exc_info=e) 247 await asyncio.sleep(self._reconnect_delay) 248 continue 249 250 try: 251 mlog.debug("connected to server") 252 runner = await client.async_group.spawn( 253 aio.call, self._runner_cb, self, server_data, client) 254 255 try: 256 async with async_group.create_subgroup() as subgroup: 257 client_closing_task = subgroup.spawn( 258 client.wait_closing) 259 runner_closing_task = subgroup.spawn( 260 runner.wait_closing) 261 262 await asyncio.wait( 263 [client_closing_task, runner_closing_task], 264 return_when=asyncio.FIRST_COMPLETED) 265 266 if (runner_closing_task.done() and 267 not client_closing_task.done()): 268 self.close() 269 return 270 271 finally: 272 await aio.uncancellable(runner.async_close()) 273 274 finally: 275 await aio.uncancellable(client.async_close()) 276 277 mlog.debug("connection to server closed") 278 await asyncio.sleep(self._reconnect_delay) 279 280 except Exception as e: 281 mlog.error("client loop error: %s", e, exc_info=e) 282 self.close() 283 284 finally: 285 async_group.close()
Eventer Component
128 @property 129 def state(self) -> State: 130 """State""" 131 return self._monitor_component.state
State