hat.event.server.main

Event server main

  1"""Event server main"""
  2
  3from pathlib import Path
  4import argparse
  5import asyncio
  6import contextlib
  7import importlib
  8import logging.config
  9import sys
 10import typing
 11
 12import appdirs
 13
 14from hat import aio
 15from hat import json
 16from hat.event.server import common
 17from hat.event.server.engine import create_engine
 18from hat.event.server.eventer_server import create_eventer_server
 19from hat.event.server.syncer_server import (create_syncer_server,
 20                                            SyncerServer)
 21from hat.event.syncer_client import (create_syncer_client,
 22                                     SyncerClientState,
 23                                     SyncerClient)
 24import hat.monitor.client
 25import hat.monitor.common
 26
 27
# Logger of this module, obtained under the module's dotted name
mlog: logging.Logger = logging.getLogger('hat.event.server.main')
"""Module logger"""

# Per-user configuration directory for application name 'hat'; `main`
# searches it for `event.yaml`, `event.yml` or `event.json` when `--conf`
# is not given
user_conf_dir: Path = Path(appdirs.user_config_dir('hat'))
"""User configuration directory path"""

# NOTE(review): not referenced anywhere in the visible part of this module;
# the unit is not stated here - presumably seconds, confirm against usage
servers_engine_stopped_timeout: int = 3
"""Timeout for waiting engine to be stopped on all servers syncer client is
connected to"""
 37
 38
 39def create_argument_parser() -> argparse.ArgumentParser:
 40    """Create argument parser"""
 41    parser = argparse.ArgumentParser()
 42    parser.add_argument(
 43        '--conf', metavar='PATH', type=Path, default=None,
 44        help="configuration defined by hat-event://main.yaml# "
 45             "(default $XDG_CONFIG_HOME/hat/event.{yaml|yml|json})")
 46    return parser
 47
 48
 49def main():
 50    """Event Server"""
 51    parser = create_argument_parser()
 52    args = parser.parse_args()
 53
 54    conf_path = args.conf
 55    if not conf_path:
 56        for suffix in ('.yaml', '.yml', '.json'):
 57            conf_path = (user_conf_dir / 'event').with_suffix(suffix)
 58            if conf_path.exists():
 59                break
 60
 61    if conf_path == Path('-'):
 62        conf = json.decode_stream(sys.stdin)
 63    else:
 64        conf = json.decode_file(conf_path)
 65
 66    sync_main(conf)
 67
 68
 69def sync_main(conf: json.Data):
 70    """Sync main entry point"""
 71    aio.init_asyncio()
 72
 73    common.json_schema_repo.validate('hat-event://main.yaml#', conf)
 74
 75    sub_confs = [conf['backend'], *conf['engine']['modules']]
 76    for sub_conf in sub_confs:
 77        module = importlib.import_module(sub_conf['module'])
 78        if module.json_schema_repo and module.json_schema_id:
 79            module.json_schema_repo.validate(module.json_schema_id, sub_conf)
 80
 81    logging.config.dictConfig(conf['log'])
 82
 83    with contextlib.suppress(asyncio.CancelledError):
 84        aio.run_asyncio(async_main(conf))
 85
 86
 87async def async_main(conf: json.Data):
 88    """Async main entry point"""
 89    try:
 90        await run_backend(conf)
 91
 92    finally:
 93        await asyncio.sleep(0.1)
 94
 95
 96async def run_backend(conf: json.Data):
 97    py_module = importlib.import_module(conf['backend']['module'])
 98    backend = await aio.call(py_module.create, conf['backend'])
 99
100    async_group = aio.Group(log_exceptions=False)
101    async_group.spawn(aio.call_on_done, backend.wait_closing(),
102                      async_group.close)
103
104    async def cleanup():
105        await async_group.async_close()
106        await backend.async_close()
107
108    try:
109        await async_group.spawn(run_syncer_server, conf, backend)
110
111    finally:
112        await aio.uncancellable(cleanup())
113
114
async def run_syncer_server(conf: json.Data,
                            backend: common.Backend):
    """Create syncer server and run monitor client or engine

    If `conf` contains ``monitor``, `run_monitor_client` is run; otherwise
    `run_engine` is run directly, without component and syncer client.
    On exit, the internal group is closed, the backend is flushed (errors
    ignored) and the syncer server is closed.

    Args:
        conf: event server configuration
        backend: backend passed to the syncer server and further runners

    """
    syncer_server = await create_syncer_server(conf['syncer_server'],
                                               backend)

    group = aio.Group(log_exceptions=False)
    # closing of the syncer server closes the group
    group.spawn(aio.call_on_done, syncer_server.wait_closing(), group.close)

    async def close_all():
        await group.async_close()
        with contextlib.suppress(Exception):
            await backend.flush()
        await syncer_server.async_close()

    try:
        if 'monitor' in conf:
            runner = group.spawn(run_monitor_client, conf, backend,
                                 syncer_server)
        else:
            runner = group.spawn(run_engine, None, conf, backend,
                                 syncer_server, None)
        await runner

    finally:
        # cleanup is shielded from cancellation
        await aio.uncancellable(close_all())
140
141
async def run_monitor_client(conf: json.Data,
                             backend: common.Backend,
                             syncer_server: SyncerServer):
    """Connect to monitor server and run engine as a monitor component

    Creates the monitor client, the syncer client and a monitor component
    that uses `run_engine` as its run function, then waits until the
    internal group starts closing - which happens as soon as any of the
    three starts closing. On exit, the component, syncer client, monitor
    client and group are closed, in that order.

    Args:
        conf: event server configuration (containing ``monitor``)
        backend: backend passed to the syncer client and `run_engine`
        syncer_server: syncer server passed to `run_engine`

    """
    group = aio.Group()
    monitor = None
    syncer_client = None
    component = None

    def close_group_with(resource):
        # once `resource` starts closing, close the group
        group.spawn(aio.call_on_done, resource.wait_closing(), group.close)

    async def cleanup():
        # only resources created before exit are closed
        for resource in (component, syncer_client, monitor):
            if resource:
                await resource.async_close()
        await group.async_close()

    try:
        server_id = conf['engine']['server_id']

        # data announced to the monitor server together with this client
        data = {'server_id': server_id,
                'eventer_server_address': conf['eventer_server']['address'],
                'syncer_server_address': conf['syncer_server']['address']}
        if 'syncer_token' in conf:
            data['syncer_token'] = conf['syncer_token']

        monitor = await hat.monitor.client.connect(conf['monitor'], data)
        close_group_with(monitor)

        syncer_client = await create_syncer_client(
            backend=backend,
            monitor_client=monitor,
            monitor_group=conf['monitor']['group'],
            name=str(server_id),
            syncer_token=conf.get('syncer_token'))
        close_group_with(syncer_client)

        component = hat.monitor.client.Component(
            monitor, run_engine, conf, backend, syncer_server, syncer_client)
        component.set_ready(True)
        close_group_with(component)

        await group.wait_closing()

    finally:
        # cleanup is shielded from cancellation
        await aio.uncancellable(cleanup())
188
189
async def run_engine(component: typing.Optional[hat.monitor.client.Component],
                     conf: json.Data,
                     backend: common.Backend,
                     syncer_server: SyncerServer,
                     syncer_client: typing.Optional[SyncerClient]):
    """Run engine and eventer server

    Creates the engine and the eventer server and waits until the internal
    group starts closing - which happens as soon as the engine or the
    eventer server starts closing. While running, syncer server/client
    notifications are turned into events registered on the engine with a
    source of type ``SYNCER`` and id ``0``.

    On exit, the eventer server and engine are closed, the backend and the
    syncer server are flushed (errors ignored) and the group is closed.

    Args:
        component: not used inside this function (presumably the monitor
            component running this function; ``None`` without monitor)
        conf: event server configuration
        backend: backend used by the engine
        syncer_server: syncer server whose state changes are registered
        syncer_client: optional syncer client whose state changes and
            events notifications are registered

    """
    group = aio.Group()
    last_client_states = {}
    source = common.Source(type=common.SourceType.SYNCER,
                           id=0)
    engine = None
    eventer_server = None

    def register(event_type, data):
        # `engine` is resolved at call time - all callers run after the
        # engine is created
        event = common.RegisterEvent(
            event_type=event_type,
            source_timestamp=None,
            payload=common.EventPayload(
                type=common.EventPayloadType.JSON,
                data=data))
        group.spawn(engine.register, source, [event])

    async def cleanup():
        for closeable in (eventer_server, engine):
            if closeable:
                await closeable.async_close()
        for flushable in (backend, syncer_server):
            with contextlib.suppress(Exception):
                await flushable.flush()
        await group.async_close()

    def on_syncer_server_state(client_infos):
        # publish name and synced flag of each syncer server's client
        clients = [{'client_name': client_info.name,
                    'synced': client_info.synced}
                   for client_info in client_infos]
        register(('event', 'syncer', 'server'), clients)

    def on_syncer_client_state(server_id, state):
        # remember the last state; only SYNCED is published
        last_client_states[server_id] = state
        if state == SyncerClientState.SYNCED:
            register(('event', 'syncer', 'client', str(server_id), 'synced'),
                     True)

    def on_syncer_client_events(server_id, events):
        # the remembered state is dropped on every notification, so
        # synced=False is published at most once per CONNECTED state
        state = last_client_states.pop(server_id, None)
        if state == SyncerClientState.CONNECTED:
            register(('event', 'syncer', 'client', str(server_id), 'synced'),
                     False)

    # TODO wait depending on ???

    try:
        engine = await create_engine(conf['engine'], backend)
        group.spawn(aio.call_on_done, engine.wait_closing(), group.close)

        # callbacks stay registered only while the stack is open
        with contextlib.ExitStack() as stack:
            stack.enter_context(
                syncer_server.register_state_cb(on_syncer_server_state))
            # publish the current syncer server state right away
            on_syncer_server_state(list(syncer_server.state))

            if syncer_client:
                stack.enter_context(
                    syncer_client.register_state_cb(on_syncer_client_state))
                stack.enter_context(
                    syncer_client.register_events_cb(on_syncer_client_events))

            eventer_server = await create_eventer_server(
                conf['eventer_server'], engine)
            group.spawn(aio.call_on_done, eventer_server.wait_closing(),
                        group.close)

            await group.wait_closing()

    finally:
        # cleanup is shielded from cancellation
        await aio.uncancellable(cleanup())
277
278
# Script entry point - runs only when the module is executed directly
if __name__ == '__main__':
    # replace the program name seen by code reading sys.argv[0] (argparse
    # derives its default program name from it)
    sys.argv[0] = 'hat-event'
    # main() contains no return statement, so None is passed to sys.exit
    sys.exit(main())
mlog: logging.Logger = <Logger hat.event.server.main (WARNING)>

Module logger

user_conf_dir: pathlib.Path = PosixPath('/home/runner/.config/hat')

User configuration directory path

servers_engine_stopped_timeout: int = 3

Timeout for waiting for the engine to be stopped on all servers the syncer client is connected to

def create_argument_parser() -> argparse.ArgumentParser:
def create_argument_parser() -> argparse.ArgumentParser:
    """Build the command line parser of the event server

    The parser knows a single optional argument, ``--conf PATH``, whose
    value is converted to `pathlib.Path` and which defaults to ``None``.

    """
    conf_help = ("configuration defined by hat-event://main.yaml# "
                 "(default $XDG_CONFIG_HOME/hat/event.{yaml|yml|json})")

    argument_parser = argparse.ArgumentParser()
    argument_parser.add_argument('--conf',
                                 metavar='PATH',
                                 type=Path,
                                 default=None,
                                 help=conf_help)
    return argument_parser

Create argument parser

def main():
def main():
    """Event Server

    Command line entry point: parses arguments, resolves and decodes the
    configuration and passes it to `sync_main`.

    """
    args = create_argument_parser().parse_args()

    conf_path = args.conf
    if not conf_path:
        # no explicit path - use the first existing default location; when
        # none of them exists, the last candidate is kept and decoding is
        # still attempted on it
        candidates = [(user_conf_dir / 'event').with_suffix(suffix)
                      for suffix in ('.yaml', '.yml', '.json')]
        conf_path = next((path for path in candidates if path.exists()),
                         candidates[-1])

    # path '-' selects standard input as the configuration source
    read_stdin = conf_path == Path('-')
    conf = (json.decode_stream(sys.stdin) if read_stdin
            else json.decode_file(conf_path))

    sync_main(conf)

Event Server

def sync_main(conf: ~Data):
def sync_main(conf: json.Data):
    """Sync main entry point

    Validates `conf` against ``hat-event://main.yaml#``, validates the
    backend and engine module configurations against the schemas exposed by
    their Python modules, applies the logging configuration and runs
    `async_main`.

    Args:
        conf: event server configuration

    """
    aio.init_asyncio()

    common.json_schema_repo.validate('hat-event://main.yaml#', conf)

    module_confs = (conf['backend'], *conf['engine']['modules'])
    for module_conf in module_confs:
        py_module = importlib.import_module(module_conf['module'])
        # a module is validated only if it provides both a schema
        # repository and a schema identifier
        if not py_module.json_schema_repo:
            continue
        if not py_module.json_schema_id:
            continue
        py_module.json_schema_repo.validate(py_module.json_schema_id,
                                            module_conf)

    logging.config.dictConfig(conf['log'])

    try:
        aio.run_asyncio(async_main(conf))
    except asyncio.CancelledError:
        # cancellation of the main coroutine is not propagated to the caller
        pass

Sync main entry point

async def async_main(conf: ~Data):
async def async_main(conf: json.Data):
    """Async main entry point

    Runs `run_backend` and, regardless of how it finishes, sleeps for 0.1
    seconds before returning.

    Args:
        conf: event server configuration

    """
    exit_delay = 0.1

    try:
        await run_backend(conf)

    finally:
        # NOTE(review): presumably gives pending callbacks a chance to run
        # before the loop is stopped - confirm
        await asyncio.sleep(exit_delay)

Async main entry point

async def run_backend(conf: ~Data):
 97async def run_backend(conf: json.Data):
 98    py_module = importlib.import_module(conf['backend']['module'])
 99    backend = await aio.call(py_module.create, conf['backend'])
100
101    async_group = aio.Group(log_exceptions=False)
102    async_group.spawn(aio.call_on_done, backend.wait_closing(),
103                      async_group.close)
104
105    async def cleanup():
106        await async_group.async_close()
107        await backend.async_close()
108
109    try:
110        await async_group.spawn(run_syncer_server, conf, backend)
111
112    finally:
113        await aio.uncancellable(cleanup())
async def run_syncer_server(conf: ~Data, backend: hat.event.server.common.Backend):
async def run_syncer_server(conf: json.Data,
                            backend: common.Backend):
    """Create syncer server and run monitor client or engine

    If `conf` contains ``monitor``, `run_monitor_client` is run; otherwise
    `run_engine` is run directly, without component and syncer client.
    On exit, the internal group is closed, the backend is flushed (errors
    ignored) and the syncer server is closed.

    Args:
        conf: event server configuration
        backend: backend passed to the syncer server and further runners

    """
    syncer_server = await create_syncer_server(conf['syncer_server'],
                                               backend)

    group = aio.Group(log_exceptions=False)
    # closing of the syncer server closes the group
    group.spawn(aio.call_on_done, syncer_server.wait_closing(), group.close)

    async def close_all():
        await group.async_close()
        with contextlib.suppress(Exception):
            await backend.flush()
        await syncer_server.async_close()

    try:
        if 'monitor' in conf:
            runner = group.spawn(run_monitor_client, conf, backend,
                                 syncer_server)
        else:
            runner = group.spawn(run_engine, None, conf, backend,
                                 syncer_server, None)
        await runner

    finally:
        # cleanup is shielded from cancellation
        await aio.uncancellable(close_all())
async def run_monitor_client( conf: ~Data, backend: hat.event.server.common.Backend, syncer_server: hat.event.server.syncer_server.SyncerServer):
async def run_monitor_client(conf: json.Data,
                             backend: common.Backend,
                             syncer_server: SyncerServer):
    """Connect to monitor server and run engine as a monitor component

    Creates the monitor client, the syncer client and a monitor component
    that uses `run_engine` as its run function, then waits until the
    internal group starts closing - which happens as soon as any of the
    three starts closing. On exit, the component, syncer client, monitor
    client and group are closed, in that order.

    Args:
        conf: event server configuration (containing ``monitor``)
        backend: backend passed to the syncer client and `run_engine`
        syncer_server: syncer server passed to `run_engine`

    """
    group = aio.Group()
    monitor = None
    syncer_client = None
    component = None

    def close_group_with(resource):
        # once `resource` starts closing, close the group
        group.spawn(aio.call_on_done, resource.wait_closing(), group.close)

    async def cleanup():
        # only resources created before exit are closed
        for resource in (component, syncer_client, monitor):
            if resource:
                await resource.async_close()
        await group.async_close()

    try:
        server_id = conf['engine']['server_id']

        # data announced to the monitor server together with this client
        data = {'server_id': server_id,
                'eventer_server_address': conf['eventer_server']['address'],
                'syncer_server_address': conf['syncer_server']['address']}
        if 'syncer_token' in conf:
            data['syncer_token'] = conf['syncer_token']

        monitor = await hat.monitor.client.connect(conf['monitor'], data)
        close_group_with(monitor)

        syncer_client = await create_syncer_client(
            backend=backend,
            monitor_client=monitor,
            monitor_group=conf['monitor']['group'],
            name=str(server_id),
            syncer_token=conf.get('syncer_token'))
        close_group_with(syncer_client)

        component = hat.monitor.client.Component(
            monitor, run_engine, conf, backend, syncer_server, syncer_client)
        component.set_ready(True)
        close_group_with(component)

        await group.wait_closing()

    finally:
        # cleanup is shielded from cancellation
        await aio.uncancellable(cleanup())
async def run_engine( component: Optional[hat.monitor.client.Component], conf: ~Data, backend: hat.event.server.common.Backend, syncer_server: hat.event.server.syncer_server.SyncerServer, syncer_client: Optional[hat.event.syncer_client.SyncerClient]):
async def run_engine(component: typing.Optional[hat.monitor.client.Component],
                     conf: json.Data,
                     backend: common.Backend,
                     syncer_server: SyncerServer,
                     syncer_client: typing.Optional[SyncerClient]):
    """Run engine and eventer server

    Creates the engine and the eventer server and waits until the internal
    group starts closing - which happens as soon as the engine or the
    eventer server starts closing. While running, syncer server/client
    notifications are turned into events registered on the engine with a
    source of type ``SYNCER`` and id ``0``.

    On exit, the eventer server and engine are closed, the backend and the
    syncer server are flushed (errors ignored) and the group is closed.

    Args:
        component: not used inside this function (presumably the monitor
            component running this function; ``None`` without monitor)
        conf: event server configuration
        backend: backend used by the engine
        syncer_server: syncer server whose state changes are registered
        syncer_client: optional syncer client whose state changes and
            events notifications are registered

    """
    group = aio.Group()
    last_client_states = {}
    source = common.Source(type=common.SourceType.SYNCER,
                           id=0)
    engine = None
    eventer_server = None

    def register(event_type, data):
        # `engine` is resolved at call time - all callers run after the
        # engine is created
        event = common.RegisterEvent(
            event_type=event_type,
            source_timestamp=None,
            payload=common.EventPayload(
                type=common.EventPayloadType.JSON,
                data=data))
        group.spawn(engine.register, source, [event])

    async def cleanup():
        for closeable in (eventer_server, engine):
            if closeable:
                await closeable.async_close()
        for flushable in (backend, syncer_server):
            with contextlib.suppress(Exception):
                await flushable.flush()
        await group.async_close()

    def on_syncer_server_state(client_infos):
        # publish name and synced flag of each syncer server's client
        clients = [{'client_name': client_info.name,
                    'synced': client_info.synced}
                   for client_info in client_infos]
        register(('event', 'syncer', 'server'), clients)

    def on_syncer_client_state(server_id, state):
        # remember the last state; only SYNCED is published
        last_client_states[server_id] = state
        if state == SyncerClientState.SYNCED:
            register(('event', 'syncer', 'client', str(server_id), 'synced'),
                     True)

    def on_syncer_client_events(server_id, events):
        # the remembered state is dropped on every notification, so
        # synced=False is published at most once per CONNECTED state
        state = last_client_states.pop(server_id, None)
        if state == SyncerClientState.CONNECTED:
            register(('event', 'syncer', 'client', str(server_id), 'synced'),
                     False)

    # TODO wait depending on ???

    try:
        engine = await create_engine(conf['engine'], backend)
        group.spawn(aio.call_on_done, engine.wait_closing(), group.close)

        # callbacks stay registered only while the stack is open
        with contextlib.ExitStack() as stack:
            stack.enter_context(
                syncer_server.register_state_cb(on_syncer_server_state))
            # publish the current syncer server state right away
            on_syncer_server_state(list(syncer_server.state))

            if syncer_client:
                stack.enter_context(
                    syncer_client.register_state_cb(on_syncer_client_state))
                stack.enter_context(
                    syncer_client.register_events_cb(on_syncer_client_events))

            eventer_server = await create_eventer_server(
                conf['eventer_server'], engine)
            group.spawn(aio.call_on_done, eventer_server.wait_closing(),
                        group.close)

            await group.wait_closing()

    finally:
        # cleanup is shielded from cancellation
        await aio.uncancellable(cleanup())