hat.event.server.main
Event server main
1"""Event server main""" 2 3from pathlib import Path 4import argparse 5import asyncio 6import contextlib 7import logging.config 8import sys 9 10import appdirs 11 12from hat import aio 13from hat import json 14 15from hat.event import common 16from hat.event.server.main_runner import MainRunner 17 18 19mlog: logging.Logger = logging.getLogger('hat.event.server.main') 20"""Module logger""" 21 22user_conf_dir: Path = Path(appdirs.user_config_dir('hat')) 23"""User configuration directory path""" 24 25 26def create_argument_parser() -> argparse.ArgumentParser: 27 """Create argument parser""" 28 parser = argparse.ArgumentParser() 29 parser.add_argument( 30 '--conf', metavar='PATH', type=Path, default=None, 31 help="configuration defined by hat-event://server.yaml " 32 "(default $XDG_CONFIG_HOME/hat/event.{yaml|yml|toml|json})") 33 return parser 34 35 36def main(): 37 """Event Server""" 38 parser = create_argument_parser() 39 args = parser.parse_args() 40 conf = json.read_conf(args.conf, user_conf_dir / 'event') 41 sync_main(conf) 42 43 44def sync_main(conf: json.Data): 45 """Sync main entry point""" 46 aio.init_asyncio() 47 48 validator = json.DefaultSchemaValidator(common.json_schema_repo) 49 validator.validate('hat-event://server.yaml', conf) 50 51 info = common.import_backend_info(conf['backend']['module']) 52 if info.json_schema_repo and info.json_schema_id: 53 validator = json.DefaultSchemaValidator(info.json_schema_repo) 54 validator.validate(info.json_schema_id, conf['backend']) 55 56 for module_conf in conf['modules']: 57 info = common.import_module_info(module_conf['module']) 58 if info.json_schema_repo and info.json_schema_id: 59 validator = json.DefaultSchemaValidator(info.json_schema_repo) 60 validator.validate(info.json_schema_id, module_conf) 61 62 log_conf = conf.get('log') 63 if log_conf: 64 logging.config.dictConfig(log_conf) 65 66 with contextlib.suppress(asyncio.CancelledError): 67 aio.run_asyncio(async_main(conf)) 68 69 70async def async_main(conf: json.Data): 71 """Async main entry point""" 72 main_runner = MainRunner(conf) 73 74 async def cleanup(): 75 await main_runner.async_close() 76 await asyncio.sleep(0.1) 77 78 try: 79 await main_runner.wait_closing() 80 81 finally: 82 await aio.uncancellable(cleanup()) 83 84 85if __name__ == '__main__': 86 sys.argv[0] = 'hat-event-server' 87 sys.exit(main())
Module logger
user_conf_dir: pathlib.Path =
PosixPath('/home/runner/.config/hat')
User configuration directory path
def
create_argument_parser() -> argparse.ArgumentParser:
27def create_argument_parser() -> argparse.ArgumentParser: 28 """Create argument parser""" 29 parser = argparse.ArgumentParser() 30 parser.add_argument( 31 '--conf', metavar='PATH', type=Path, default=None, 32 help="configuration defined by hat-event://server.yaml " 33 "(default $XDG_CONFIG_HOME/hat/event.{yaml|yml|toml|json})") 34 return parser
Create argument parser
def
main():
37def main(): 38 """Event Server""" 39 parser = create_argument_parser() 40 args = parser.parse_args() 41 conf = json.read_conf(args.conf, user_conf_dir / 'event') 42 sync_main(conf)
Event Server
def
sync_main( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]):
45def sync_main(conf: json.Data): 46 """Sync main entry point""" 47 aio.init_asyncio() 48 49 validator = json.DefaultSchemaValidator(common.json_schema_repo) 50 validator.validate('hat-event://server.yaml', conf) 51 52 info = common.import_backend_info(conf['backend']['module']) 53 if info.json_schema_repo and info.json_schema_id: 54 validator = json.DefaultSchemaValidator(info.json_schema_repo) 55 validator.validate(info.json_schema_id, conf['backend']) 56 57 for module_conf in conf['modules']: 58 info = common.import_module_info(module_conf['module']) 59 if info.json_schema_repo and info.json_schema_id: 60 validator = json.DefaultSchemaValidator(info.json_schema_repo) 61 validator.validate(info.json_schema_id, module_conf) 62 63 log_conf = conf.get('log') 64 if log_conf: 65 logging.config.dictConfig(log_conf) 66 67 with contextlib.suppress(asyncio.CancelledError): 68 aio.run_asyncio(async_main(conf))
Sync main entry point
async def
async_main( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]):
71async def async_main(conf: json.Data): 72 """Async main entry point""" 73 main_runner = MainRunner(conf) 74 75 async def cleanup(): 76 await main_runner.async_close() 77 await asyncio.sleep(0.1) 78 79 try: 80 await main_runner.wait_closing() 81 82 finally: 83 await aio.uncancellable(cleanup())
Async main entry point