hat.event.component
Eventer Component
1"""Eventer Component""" 2 3from collections.abc import Collection 4import asyncio 5import contextlib 6import logging 7import types 8import typing 9 10from hat import aio 11from hat import json 12from hat import util 13from hat.drivers import tcp 14import hat.monitor.component 15 16from hat.event import common 17import hat.event.eventer.client 18 19 20mlog: logging.Logger = logging.getLogger(__name__) 21"""Module logger""" 22 23State: typing.TypeAlias = hat.monitor.component.State 24"""Component state""" 25 26Runner: typing.TypeAlias = aio.Resource 27"""Component runner""" 28 29RunnerCb: typing.TypeAlias = aio.AsyncCallable[ 30 ['Component', 'ServerData', hat.event.eventer.client.Client], 31 Runner] 32"""Runner callback""" 33 34StateCb: typing.TypeAlias = aio.AsyncCallable[['Component', State], None] 35"""State callback""" 36 37CloseReqCb: typing.TypeAlias = aio.AsyncCallable[['Component'], None] 38"""Close request callback""" 39 40StatusCb: typing.TypeAlias = aio.AsyncCallable[ 41 ['Component', hat.event.eventer.client.Client, common.Status], 42 None] 43"""Status callback""" 44 45EventsCb: typing.TypeAlias = aio.AsyncCallable[ 46 ['Component', hat.event.eventer.client.Client, Collection[common.Event]], 47 None] 48"""Events callback""" 49 50 51class ServerData(typing.NamedTuple): 52 """Server data""" 53 server_id: common.ServerId 54 addr: tcp.Address 55 server_token: str | None 56 57 58async def connect(addr: tcp.Address, 59 name: str, 60 group: str, 61 server_group: str, 62 runner_cb: RunnerCb, 63 *, 64 state_cb: StateCb | None = None, 65 close_req_cb: CloseReqCb | None = None, 66 status_cb: StatusCb | None = None, 67 events_cb: EventsCb | None = None, 68 server_data_queue_size: int = 1024, 69 reconnect_delay: float = 0.5, 70 observer_kwargs: dict[str, typing.Any] = {}, 71 eventer_kwargs: dict[str, typing.Any] = {} 72 ) -> 'Component': 73 """Connect to local monitor server and create component 74 75 High-level interface for communication with Event Server, based on 76 information obtained from Monitor Server. 77 78 Component instance tries to establish active connection with 79 Event Server within monitor component group `server_group`. Once this 80 connection is established, `runner_cb` is called with currently active 81 eventer client instance. Result of calling `runner_cb` should be runner 82 representing user defined components activity associated with connection 83 to active Event Server. Once connection to Event Server is closed or new 84 active Event Server is detected, associated runner is closed. If new 85 connection to Event Server is successfully established, 86 `component_cb` will be called again to create new runner associated with 87 new instance of eventer client. 88 89 If runner is closed while connection to Event Server is open, component 90 is closed. 91 92 """ 93 component = Component() 94 component._name = name 95 component._server_group = server_group 96 component._runner_cb = runner_cb 97 component._state_cb = state_cb 98 component._close_req_cb = close_req_cb 99 component._status_cb = status_cb 100 component._events_cb = events_cb 101 component._server_data_queue_size = server_data_queue_size 102 component._reconnect_delay = reconnect_delay 103 component._eventer_kwargs = eventer_kwargs 104 component._server_data = None 105 component._server_data_queue = None 106 107 component._monitor_component = await hat.monitor.component.connect( 108 addr=addr, 109 name=name, 110 group=group, 111 runner_cb=component._create_monitor_runner, 112 **{**observer_kwargs, 113 'state_cb': component._on_state, 114 'close_req_cb': component._on_close_req}) 115 116 return component 117 118 119class Component(aio.Resource): 120 """Eventer Component""" 121 122 @property 123 def async_group(self): 124 return self._monitor_component.async_group 125 126 @property 127 def state(self) -> State: 128 """State""" 129 return self._monitor_component.state 130 131 @property 132 def ready(self) -> bool: 133 """Ready""" 134 return self._monitor_component.ready 135 136 async def set_ready(self, ready: bool): 137 """Set ready""" 138 await self._monitor_component.set_ready(ready) 139 140 async def _on_state(self, monitor_component, state): 141 if self._server_data_queue is not None: 142 data = self._get_active_server_data(state) 143 144 if data != self._server_data: 145 self._server_data = data 146 147 with contextlib.suppress(aio.QueueClosedError): 148 await self._server_data_queue.put(data) 149 150 if self._state_cb: 151 await aio.call(self._state_cb, self, state) 152 153 async def _on_close_req(self, monitor_component): 154 if self._close_req_cb: 155 await aio.call(self._close_req_cb, self) 156 157 async def _on_status(self, eventer_client, status): 158 if self._status_cb: 159 await aio.call(self._status_cb, self, eventer_client, status) 160 161 async def _on_events(self, eventer_client, events): 162 if self._events_cb: 163 await aio.call(self._events_cb, self, eventer_client, events) 164 165 def _create_monitor_runner(self, monitor_component): 166 self._server_data = self._get_active_server_data( 167 monitor_component.state) 168 self._server_data_queue = aio.Queue(self._server_data_queue_size) 169 170 if self._server_data: 171 self._server_data_queue.put_nowait(self._server_data) 172 173 runner = aio.Group() 174 runner.spawn(self._server_data_loop, runner, self._server_data_queue) 175 176 return runner 177 178 def _get_active_server_data(self, state): 179 info = util.first(state.components, self._active_server_filter) 180 if not info: 181 return 182 183 server_id = json.get(info.data, 'server_id') 184 host = json.get(info.data, ['eventer_server', 'host']) 185 port = json.get(info.data, ['eventer_server', 'port']) 186 server_token = json.get(info.data, 'server_token') 187 if (not isinstance(server_id, int) or 188 not isinstance(host, str) or 189 not isinstance(port, int) or 190 not isinstance(server_token, (str, types.NoneType))): 191 return 192 193 client_token = self._eventer_kwargs.get('client_token') 194 if client_token is not None and client_token != server_token: 195 return 196 197 return ServerData(server_id=server_id, 198 addr=tcp.Address(host, port), 199 server_token=server_token) 200 201 def _active_server_filter(self, info): 202 return (info.group == self._server_group and 203 info.blessing_req.token is not None and 204 info.blessing_req.token == info.blessing_res.token) 205 206 async def _server_data_loop(self, async_group, server_data_queue): 207 try: 208 server_data = None 209 while True: 210 while not server_data and not server_data_queue.empty(): 211 server_data = await server_data_queue.get_until_empty() 212 213 async with async_group.create_subgroup() as subgroup: 214 subgroup.spawn(self._client_loop, subgroup, server_data) 215 server_data = await server_data_queue.get_until_empty() 216 217 except Exception as e: 218 mlog.error("address loop error: %s", e, exc_info=e) 219 self.close() 220 221 finally: 222 async_group.close() 223 server_data_queue.close() 224 225 async def _client_loop(self, async_group, server_data): 226 try: 227 while True: 228 try: 229 kwargs = {**self._eventer_kwargs, 230 'status_cb': self._on_status, 231 'events_cb': self._on_events} 232 233 if 'server_id' not in kwargs: 234 kwargs['server_id'] = server_data.server_id 235 236 mlog.debug("connecting to server %s", server_data.addr) 237 client = await hat.event.eventer.client.connect( 238 addr=server_data.addr, 239 client_name=self._name, 240 **kwargs) 241 242 except Exception as e: 243 mlog.warning("error connecting to server: %s", e, 244 exc_info=e) 245 await asyncio.sleep(self._reconnect_delay) 246 continue 247 248 try: 249 mlog.debug("connected to server") 250 runner = await client.async_group.spawn( 251 aio.call, self._runner_cb, self, server_data, client) 252 253 try: 254 async with async_group.create_subgroup() as subgroup: 255 client_closing_task = subgroup.spawn( 256 client.wait_closing) 257 runner_closing_task = subgroup.spawn( 258 runner.wait_closing) 259 260 await asyncio.wait( 261 [client_closing_task, runner_closing_task], 262 return_when=asyncio.FIRST_COMPLETED) 263 264 if (runner_closing_task.done() and 265 not client_closing_task.done()): 266 self.close() 267 return 268 269 finally: 270 await aio.uncancellable(runner.async_close()) 271 272 finally: 273 await aio.uncancellable(client.async_close()) 274 275 mlog.debug("connection to server closed") 276 await asyncio.sleep(self._reconnect_delay) 277 278 except Exception as e: 279 mlog.error("client loop error: %s", e, exc_info=e) 280 self.close() 281 282 finally: 283 async_group.close()
Module logger
26class State(typing.NamedTuple): 27 """Client state""" 28 info: common.ComponentInfo | None 29 components: list[common.ComponentInfo]
Component state
Create new instance of State(info, components)
Inherited Members
- builtins.tuple
- index
- count
Component runner
Runner callback
State callback
Close request callback
Status callback
Events callback
52class ServerData(typing.NamedTuple): 53 """Server data""" 54 server_id: common.ServerId 55 addr: tcp.Address 56 server_token: str | None
Server data
Create new instance of ServerData(server_id, addr, server_token)
Inherited Members
- builtins.tuple
- index
- count
59async def connect(addr: tcp.Address, 60 name: str, 61 group: str, 62 server_group: str, 63 runner_cb: RunnerCb, 64 *, 65 state_cb: StateCb | None = None, 66 close_req_cb: CloseReqCb | None = None, 67 status_cb: StatusCb | None = None, 68 events_cb: EventsCb | None = None, 69 server_data_queue_size: int = 1024, 70 reconnect_delay: float = 0.5, 71 observer_kwargs: dict[str, typing.Any] = {}, 72 eventer_kwargs: dict[str, typing.Any] = {} 73 ) -> 'Component': 74 """Connect to local monitor server and create component 75 76 High-level interface for communication with Event Server, based on 77 information obtained from Monitor Server. 78 79 Component instance tries to establish active connection with 80 Event Server within monitor component group `server_group`. Once this 81 connection is established, `runner_cb` is called with currently active 82 eventer client instance. Result of calling `runner_cb` should be runner 83 representing user defined components activity associated with connection 84 to active Event Server. Once connection to Event Server is closed or new 85 active Event Server is detected, associated runner is closed. If new 86 connection to Event Server is successfully established, 87 `component_cb` will be called again to create new runner associated with 88 new instance of eventer client. 89 90 If runner is closed while connection to Event Server is open, component 91 is closed. 92 93 """ 94 component = Component() 95 component._name = name 96 component._server_group = server_group 97 component._runner_cb = runner_cb 98 component._state_cb = state_cb 99 component._close_req_cb = close_req_cb 100 component._status_cb = status_cb 101 component._events_cb = events_cb 102 component._server_data_queue_size = server_data_queue_size 103 component._reconnect_delay = reconnect_delay 104 component._eventer_kwargs = eventer_kwargs 105 component._server_data = None 106 component._server_data_queue = None 107 108 component._monitor_component = await hat.monitor.component.connect( 109 addr=addr, 110 name=name, 111 group=group, 112 runner_cb=component._create_monitor_runner, 113 **{**observer_kwargs, 114 'state_cb': component._on_state, 115 'close_req_cb': component._on_close_req}) 116 117 return component
Connect to local monitor server and create component
High-level interface for communication with Event Server, based on information obtained from Monitor Server.
Component instance tries to establish active connection with
Event Server within monitor component group server_group
. Once this
connection is established, runner_cb
is called with currently active
eventer client instance. Result of calling runner_cb
should be runner
representing user defined components activity associated with connection
to active Event Server. Once connection to Event Server is closed or new
active Event Server is detected, associated runner is closed. If new
connection to Event Server is successfully established,
component_cb
will be called again to create new runner associated with
new instance of eventer client.
If runner is closed while connection to Event Server is open, component is closed.
120class Component(aio.Resource): 121 """Eventer Component""" 122 123 @property 124 def async_group(self): 125 return self._monitor_component.async_group 126 127 @property 128 def state(self) -> State: 129 """State""" 130 return self._monitor_component.state 131 132 @property 133 def ready(self) -> bool: 134 """Ready""" 135 return self._monitor_component.ready 136 137 async def set_ready(self, ready: bool): 138 """Set ready""" 139 await self._monitor_component.set_ready(ready) 140 141 async def _on_state(self, monitor_component, state): 142 if self._server_data_queue is not None: 143 data = self._get_active_server_data(state) 144 145 if data != self._server_data: 146 self._server_data = data 147 148 with contextlib.suppress(aio.QueueClosedError): 149 await self._server_data_queue.put(data) 150 151 if self._state_cb: 152 await aio.call(self._state_cb, self, state) 153 154 async def _on_close_req(self, monitor_component): 155 if self._close_req_cb: 156 await aio.call(self._close_req_cb, self) 157 158 async def _on_status(self, eventer_client, status): 159 if self._status_cb: 160 await aio.call(self._status_cb, self, eventer_client, status) 161 162 async def _on_events(self, eventer_client, events): 163 if self._events_cb: 164 await aio.call(self._events_cb, self, eventer_client, events) 165 166 def _create_monitor_runner(self, monitor_component): 167 self._server_data = self._get_active_server_data( 168 monitor_component.state) 169 self._server_data_queue = aio.Queue(self._server_data_queue_size) 170 171 if self._server_data: 172 self._server_data_queue.put_nowait(self._server_data) 173 174 runner = aio.Group() 175 runner.spawn(self._server_data_loop, runner, self._server_data_queue) 176 177 return runner 178 179 def _get_active_server_data(self, state): 180 info = util.first(state.components, self._active_server_filter) 181 if not info: 182 return 183 184 server_id = json.get(info.data, 'server_id') 185 host = json.get(info.data, ['eventer_server', 'host']) 186 port = json.get(info.data, ['eventer_server', 'port']) 187 server_token = json.get(info.data, 'server_token') 188 if (not isinstance(server_id, int) or 189 not isinstance(host, str) or 190 not isinstance(port, int) or 191 not isinstance(server_token, (str, types.NoneType))): 192 return 193 194 client_token = self._eventer_kwargs.get('client_token') 195 if client_token is not None and client_token != server_token: 196 return 197 198 return ServerData(server_id=server_id, 199 addr=tcp.Address(host, port), 200 server_token=server_token) 201 202 def _active_server_filter(self, info): 203 return (info.group == self._server_group and 204 info.blessing_req.token is not None and 205 info.blessing_req.token == info.blessing_res.token) 206 207 async def _server_data_loop(self, async_group, server_data_queue): 208 try: 209 server_data = None 210 while True: 211 while not server_data and not server_data_queue.empty(): 212 server_data = await server_data_queue.get_until_empty() 213 214 async with async_group.create_subgroup() as subgroup: 215 subgroup.spawn(self._client_loop, subgroup, server_data) 216 server_data = await server_data_queue.get_until_empty() 217 218 except Exception as e: 219 mlog.error("address loop error: %s", e, exc_info=e) 220 self.close() 221 222 finally: 223 async_group.close() 224 server_data_queue.close() 225 226 async def _client_loop(self, async_group, server_data): 227 try: 228 while True: 229 try: 230 kwargs = {**self._eventer_kwargs, 231 'status_cb': self._on_status, 232 'events_cb': self._on_events} 233 234 if 'server_id' not in kwargs: 235 kwargs['server_id'] = server_data.server_id 236 237 mlog.debug("connecting to server %s", server_data.addr) 238 client = await hat.event.eventer.client.connect( 239 addr=server_data.addr, 240 client_name=self._name, 241 **kwargs) 242 243 except Exception as e: 244 mlog.warning("error connecting to server: %s", e, 245 exc_info=e) 246 await asyncio.sleep(self._reconnect_delay) 247 continue 248 249 try: 250 mlog.debug("connected to server") 251 runner = await client.async_group.spawn( 252 aio.call, self._runner_cb, self, server_data, client) 253 254 try: 255 async with async_group.create_subgroup() as subgroup: 256 client_closing_task = subgroup.spawn( 257 client.wait_closing) 258 runner_closing_task = subgroup.spawn( 259 runner.wait_closing) 260 261 await asyncio.wait( 262 [client_closing_task, runner_closing_task], 263 return_when=asyncio.FIRST_COMPLETED) 264 265 if (runner_closing_task.done() and 266 not client_closing_task.done()): 267 self.close() 268 return 269 270 finally: 271 await aio.uncancellable(runner.async_close()) 272 273 finally: 274 await aio.uncancellable(client.async_close()) 275 276 mlog.debug("connection to server closed") 277 await asyncio.sleep(self._reconnect_delay) 278 279 except Exception as e: 280 mlog.error("client loop error: %s", e, exc_info=e) 281 self.close() 282 283 finally: 284 async_group.close()
Eventer Component
127 @property 128 def state(self) -> State: 129 """State""" 130 return self._monitor_component.state
State
137 async def set_ready(self, ready: bool): 138 """Set ready""" 139 await self._monitor_component.set_ready(ready)
Set ready
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close