hat.event.manager.main
"""Event manager command line interface.

Depending on the selected action, connects to an eventer server and
registers a single event, runs a query or prints newly registered events.
"""

from collections.abc import Collection
from pathlib import Path
import argparse
import asyncio
import contextlib
import sys

from hat import aio
from hat import json
from hat.drivers import tcp

from hat.event import eventer
from hat.event.manager import common


def create_argument_parser() -> argparse.ArgumentParser:
    """Create the command line parser.

    Top level options describe the connection (host, port, client name and
    token).  The mandatory ``action`` sub-command is one of ``register``,
    ``query`` (with ``latest``, ``timeseries`` and ``server`` query types),
    ``subscribe`` or ``server``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--host', type=str, default='127.0.0.1',
        help="server host name (default '127.0.0.1')")
    parser.add_argument(
        '--port', type=int, default=23012,
        help="server TCP port (default 23012)")
    parser.add_argument(
        '--client-name', metavar='NAME', type=str, default='manager',
        help="client name (default 'manager')")
    parser.add_argument(
        '--client-token', metavar='TOKEN', type=str, default=None,
        help="client token")
    subparsers = parser.add_subparsers(
        title='actions', dest='action', required=True)

    register_parser = subparsers.add_parser(
        'register', description="register new event")
    register_parser.add_argument(
        '--source-timestamp', metavar='TIMESTAMP', type=_parse_timestamp,
        default=None,
        help="source timestamp")
    register_parser.add_argument(
        '--payload-type', metavar='TYPE',
        choices=['json', 'yaml', 'toml', 'binary', 'none'],
        default=None,
        help="payload type")
    register_parser.add_argument(
        '--binary-type', metavar='TYPE', type=str, default='',
        help="binary payload type (default '')")
    register_parser.add_argument(
        '--payload-path', metavar='PATH', type=Path, default=Path('-'),
        help="payload file path or '-' for stdin (default '-')")
    register_parser.add_argument(
        'event_type', metavar='EVENT_TYPE', type=_parse_event_type,
        help="event type where segments are delimited by '/'")

    query_parser = subparsers.add_parser(
        'query', description="query events")
    query_subparsers = query_parser.add_subparsers(
        title='query types', dest='query_type', required=True)

    query_latest_parser = query_subparsers.add_parser(
        'latest', description="query latest events")
    query_latest_parser.add_argument(
        '--event-types', metavar='EVENT_TYPE', type=_parse_event_type,
        default=None, nargs='*',
        help='query event types')

    query_timeseries_parser = query_subparsers.add_parser(
        'timeseries', description="query timeseries events")
    query_timeseries_parser.add_argument(
        '--event-types', metavar='EVENT_TYPE', type=_parse_event_type,
        default=None, nargs='*',
        help='query event types')
    query_timeseries_parser.add_argument(
        '--t-from', metavar='TIMESTAMP', type=_parse_timestamp, default=None,
        help="from timestamp")
    query_timeseries_parser.add_argument(
        '--t-to', metavar='TIMESTAMP', type=_parse_timestamp, default=None,
        help="to timestamp")
    query_timeseries_parser.add_argument(
        '--source-t-from', metavar='TIMESTAMP', type=_parse_timestamp,
        default=None,
        help="from source timestamp")
    query_timeseries_parser.add_argument(
        '--source-t-to', metavar='TIMESTAMP', type=_parse_timestamp,
        default=None,
        help="to source timestamp")
    # argparse checks ``choices`` *after* applying ``type``; listing member
    # names as choices while ``type`` returns enum members rejects every
    # value.  Validation is therefore done by the type callables and the
    # accepted names are only advertised through ``metavar``.
    query_timeseries_parser.add_argument(
        '--order', type=_parse_order,
        metavar='{' + ','.join(i.name for i in common.Order) + '}',
        default=common.Order.DESCENDING,
        help="order (default 'DESCENDING')")
    query_timeseries_parser.add_argument(
        '--order-by', type=_parse_order_by,
        metavar='{' + ','.join(i.name for i in common.OrderBy) + '}',
        default=common.OrderBy.TIMESTAMP,
        help="order by (default 'TIMESTAMP')")
    query_timeseries_parser.add_argument(
        '--max-results', metavar='N', type=int, default=None,
        help="maximum number of results")
    query_timeseries_parser.add_argument(
        '--last-event-id', metavar='SERVER_ID,SESSION_ID,INSTANCE_ID',
        type=_parse_event_id, default=None,
        help="last event id")

    query_server_parser = query_subparsers.add_parser(
        'server', description="query server events")
    query_server_parser.add_argument(
        '--persisted', action='store_true',
        help="persisted events")
    query_server_parser.add_argument(
        '--max-results', metavar='N', type=int, default=None,
        help="maximum number of results")
    query_server_parser.add_argument(
        '--last-event-id', metavar='SERVER_ID,SESSION_ID,INSTANCE_ID',
        type=_parse_event_id, default=None,
        help="last event id")
    query_server_parser.add_argument(
        'server_id', metavar='SERVER_ID', type=int,
        help="server id")

    subscribe_parser = subparsers.add_parser(
        'subscribe', description="watch newly registered events")
    subscribe_parser.add_argument(
        '--server-id', type=int, default=None,
        help="server id")
    subscribe_parser.add_argument(
        '--persisted', action='store_true',
        help="persisted events")
    subscribe_parser.add_argument(
        'event_types', metavar='EVENT_TYPE', type=_parse_event_type, nargs='*',
        help='query event type')

    server_parser = subparsers.add_parser(
        'server', description="run manager server with web ui")
    server_parser.add_argument(
        '--server-id', type=int, default=None,
        help="server id")
    server_parser.add_argument(
        '--persisted', action='store_true',
        help="persisted events")
    server_parser.add_argument(
        'event_types', metavar='EVENT_TYPE', type=_parse_event_type, nargs='*',
        help='query event type')

    return parser


def main():
    """Parse command line arguments and run the selected action.

    Returns:
        Result of the executed coroutine (``register`` yields ``1`` when the
        client returns no result) or ``None`` if execution was cancelled.
        The module entry point passes this value to ``sys.exit``.

    Raises:
        ValueError: unsupported action or query type.
    """
    parser = create_argument_parser()
    args = parser.parse_args()

    addr = tcp.Address(args.host, args.port)

    if args.action == 'register':
        register_event = common.RegisterEvent(
            type=args.event_type,
            source_timestamp=args.source_timestamp,
            payload=_read_payload(payload_type=args.payload_type,
                                  binary_type=args.binary_type,
                                  path=args.payload_path))

        co = register(addr=addr,
                      client_name=args.client_name,
                      client_token=args.client_token,
                      register_event=register_event)

    elif args.action == 'query':
        if args.query_type == 'latest':
            params = common.QueryLatestParams(
                event_types=args.event_types)

        elif args.query_type == 'timeseries':
            params = common.QueryTimeseriesParams(
                event_types=args.event_types,
                t_from=args.t_from,
                t_to=args.t_to,
                source_t_from=args.source_t_from,
                source_t_to=args.source_t_to,
                order=args.order,
                order_by=args.order_by,
                max_results=args.max_results,
                last_event_id=args.last_event_id)

        elif args.query_type == 'server':
            # server queries have their own params type; timeseries params
            # are not given server_id/persisted anywhere else in this module
            params = common.QueryServerParams(
                server_id=args.server_id,
                persisted=args.persisted,
                max_results=args.max_results,
                last_event_id=args.last_event_id)

        else:
            raise ValueError('unsupported query type')

        co = query(addr=addr,
                   client_name=args.client_name,
                   client_token=args.client_token,
                   params=params)

    elif args.action == 'subscribe':
        # without explicit event types, subscribe to everything
        subscriptions = args.event_types or [('*', )]

        co = subscribe(addr=addr,
                       client_name=args.client_name,
                       client_token=args.client_token,
                       subscriptions=subscriptions,
                       server_id=args.server_id,
                       persisted=args.persisted)

    elif args.action == 'server':
        subscriptions = args.event_types or [('*', )]

        co = server(addr=addr,
                    client_name=args.client_name,
                    client_token=args.client_token,
                    subscriptions=subscriptions,
                    server_id=args.server_id,
                    persisted=args.persisted)

    else:
        raise ValueError('unsupported action')

    aio.init_asyncio()
    with contextlib.suppress(asyncio.CancelledError):
        return aio.run_asyncio(co)


async def register(addr: tcp.Address,
                   client_name: str,
                   client_token: str | None,
                   register_event: common.RegisterEvent):
    """Connect to the eventer server and register a single event.

    Returns ``1`` if the client's ``register`` call returns ``None``,
    otherwise ``None``.  The connection is always closed before returning.
    """
    client = await eventer.connect(addr=addr,
                                   client_name=client_name,
                                   client_token=client_token)

    try:
        # NOTE(review): confirm that ``client.register`` returns a non-None
        # value on success when called without extra arguments - otherwise
        # this action always reports failure
        result = await client.register([register_event])

        if result is None:
            return 1

    finally:
        await aio.uncancellable(client.async_close())


async def query(addr: tcp.Address,
                client_name: str,
                client_token: str | None,
                params: common.QueryParams):
    """Run a single query and print its JSON encoded result to stdout."""
    client = await eventer.connect(addr=addr,
                                   client_name=client_name,
                                   client_token=client_token)

    try:
        result = await client.query(params)

        result_json = common.query_result_to_json(result)
        print(json.encode(result_json))

    finally:
        await aio.uncancellable(client.async_close())


async def subscribe(addr: tcp.Address,
                    client_name: str,
                    client_token: str | None,
                    subscriptions: Collection[common.EventType],
                    server_id: int | None,
                    persisted: bool):
    """Print received events as JSON until the connection starts closing."""

    def on_events(client, events):
        events_json = [common.event_to_json(event) for event in events]
        print(json.encode(events_json))

    client = await eventer.connect(addr=addr,
                                   client_name=client_name,
                                   client_token=client_token,
                                   subscriptions=subscriptions,
                                   server_id=server_id,
                                   persisted=persisted,
                                   events_cb=on_events)

    try:
        await client.wait_closing()

    finally:
        await aio.uncancellable(client.async_close())


async def server(addr: tcp.Address,
                 client_name: str,
                 client_token: str | None,
                 subscriptions: Collection[common.EventType],
                 server_id: int | None,
                 persisted: bool):
    """Run manager server (not implemented)."""
    raise NotImplementedError()


def _parse_timestamp(t):
    """Parse ``'now'`` or a float literal into a timestamp."""
    if t == 'now':
        return common.now()

    return common.timestamp_from_float(float(t))


def _parse_event_type(event_type):
    """Split a ``/`` delimited event type into a tuple of segments."""
    return tuple(event_type.split('/'))


def _parse_event_id(event_id):
    """Parse ``SERVER_ID,SESSION_ID,INSTANCE_ID`` into an event id.

    Assumes ``common.EventId`` takes three integer fields in that order
    (as advertised by the argument metavar) - TODO confirm.
    """
    parts = event_id.split(',')
    if len(parts) != 3:
        raise argparse.ArgumentTypeError(
            f"invalid event id: {event_id!r} "
            f"(expected SERVER_ID,SESSION_ID,INSTANCE_ID)")

    # int() raises ValueError on bad input, which argparse reports as a
    # regular usage error
    return common.EventId(*(int(part) for part in parts))


def _parse_order(order):
    """Resolve an order name to ``common.Order`` member."""
    return _parse_enum_name(common.Order, order)


def _parse_order_by(order_by):
    """Resolve an order-by name to ``common.OrderBy`` member."""
    return _parse_enum_name(common.OrderBy, order_by)


def _parse_enum_name(enum_type, name):
    """Look up enum member by name, reporting unknown names to argparse."""
    try:
        return enum_type[name]

    except KeyError:
        # KeyError is not handled by argparse and would surface as traceback
        valid = ', '.join(member.name for member in enum_type)
        raise argparse.ArgumentTypeError(
            f"invalid choice: {name!r} (choose from {valid})") from None


def _json_format_from_payload_type(payload_type):
    """Map explicit payload type to JSON format (``None`` for binary)."""
    if payload_type == 'json':
        return json.Format.JSON

    if payload_type == 'yaml':
        return json.Format.YAML

    if payload_type == 'toml':
        return json.Format.TOML

    if payload_type == 'binary':
        return None

    raise ValueError('unsupported payload type')


def _read_payload(payload_type, binary_type, path):
    """Read event payload from file or stdin.

    Args:
        payload_type: ``'json'``, ``'yaml'``, ``'toml'``, ``'binary'``,
            ``'none'`` or ``None``; ``None`` means JSON for stdin and
            detection based on `path` for files (binary if detection fails)
        binary_type: type associated with binary payload
        path: payload file path or ``Path('-')`` for stdin

    Returns:
        JSON or binary event payload, ``None`` for payload type ``'none'``.

    Raises:
        ValueError: unsupported payload type.
    """
    if payload_type == 'none':
        return None

    from_stdin = path == Path('-')

    if payload_type is not None:
        json_format = _json_format_from_payload_type(payload_type)

    elif from_stdin:
        json_format = json.Format.JSON

    else:
        try:
            json_format = json.get_file_format(path)

        except ValueError:
            json_format = None

    if not from_stdin:
        if json_format:
            data = json.decode_file(path, json_format)
            return common.EventPayloadJson(data)

        data = path.read_bytes()
        return common.EventPayloadBinary(type=binary_type,
                                         data=data)

    # binary and TOML payloads are read from the underlying binary stream
    if json_format is None or json_format == json.Format.TOML:
        stdin, sys.stdin = sys.stdin.detach(), None

    else:
        stdin = sys.stdin

    if json_format:
        data = json.decode_stream(stdin, json_format)
        return common.EventPayloadJson(data)

    data = stdin.read()
    return common.EventPayloadBinary(type=binary_type,
                                     data=data)


if __name__ == '__main__':
    sys.argv[0] = 'hat-event-manager'
    sys.exit(main())
def
create_argument_parser() -> argparse.ArgumentParser:
def create_argument_parser() -> argparse.ArgumentParser:
    """Create the command line parser.

    Top level options describe the connection (host, port, client name and
    token).  The mandatory ``action`` sub-command is one of ``register``,
    ``query`` (with ``latest``, ``timeseries`` and ``server`` query types),
    ``subscribe`` or ``server``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--host', type=str, default='127.0.0.1',
        help="server host name (default '127.0.0.1')")
    parser.add_argument(
        '--port', type=int, default=23012,
        help="server TCP port (default 23012)")
    parser.add_argument(
        '--client-name', metavar='NAME', type=str, default='manager',
        help="client name (default 'manager')")
    parser.add_argument(
        '--client-token', metavar='TOKEN', type=str, default=None,
        help="client token")
    subparsers = parser.add_subparsers(
        title='actions', dest='action', required=True)

    register_parser = subparsers.add_parser(
        'register', description="register new event")
    register_parser.add_argument(
        '--source-timestamp', metavar='TIMESTAMP', type=_parse_timestamp,
        default=None,
        help="source timestamp")
    register_parser.add_argument(
        '--payload-type', metavar='TYPE',
        choices=['json', 'yaml', 'toml', 'binary', 'none'],
        default=None,
        help="payload type")
    register_parser.add_argument(
        '--binary-type', metavar='TYPE', type=str, default='',
        help="binary payload type (default '')")
    register_parser.add_argument(
        '--payload-path', metavar='PATH', type=Path, default=Path('-'),
        help="payload file path or '-' for stdin (default '-')")
    register_parser.add_argument(
        'event_type', metavar='EVENT_TYPE', type=_parse_event_type,
        help="event type where segments are delimited by '/'")

    query_parser = subparsers.add_parser(
        'query', description="query events")
    query_subparsers = query_parser.add_subparsers(
        title='query types', dest='query_type', required=True)

    query_latest_parser = query_subparsers.add_parser(
        'latest', description="query latest events")
    query_latest_parser.add_argument(
        '--event-types', metavar='EVENT_TYPE', type=_parse_event_type,
        default=None, nargs='*',
        help='query event types')

    query_timeseries_parser = query_subparsers.add_parser(
        'timeseries', description="query timeseries events")
    query_timeseries_parser.add_argument(
        '--event-types', metavar='EVENT_TYPE', type=_parse_event_type,
        default=None, nargs='*',
        help='query event types')
    query_timeseries_parser.add_argument(
        '--t-from', metavar='TIMESTAMP', type=_parse_timestamp, default=None,
        help="from timestamp")
    query_timeseries_parser.add_argument(
        '--t-to', metavar='TIMESTAMP', type=_parse_timestamp, default=None,
        help="to timestamp")
    query_timeseries_parser.add_argument(
        '--source-t-from', metavar='TIMESTAMP', type=_parse_timestamp,
        default=None,
        help="from source timestamp")
    query_timeseries_parser.add_argument(
        '--source-t-to', metavar='TIMESTAMP', type=_parse_timestamp,
        default=None,
        help="to source timestamp")
    # argparse checks ``choices`` *after* applying ``type``; listing member
    # names as choices while ``type`` returns enum members rejects every
    # value.  Validation is therefore done by the type callable and the
    # accepted names are only advertised through ``metavar``.
    query_timeseries_parser.add_argument(
        '--order', type=_enum_by_name(common.Order),
        metavar='{' + ','.join(i.name for i in common.Order) + '}',
        default=common.Order.DESCENDING,
        help="order (default 'DESCENDING')")
    query_timeseries_parser.add_argument(
        '--order-by', type=_enum_by_name(common.OrderBy),
        metavar='{' + ','.join(i.name for i in common.OrderBy) + '}',
        default=common.OrderBy.TIMESTAMP,
        help="order by (default 'TIMESTAMP')")
    query_timeseries_parser.add_argument(
        '--max-results', metavar='N', type=int, default=None,
        help="maximum number of results")
    query_timeseries_parser.add_argument(
        '--last-event-id', metavar='SERVER_ID,SESSION_ID,INSTANCE_ID',
        type=_parse_event_id, default=None,
        help="last event id")

    query_server_parser = query_subparsers.add_parser(
        'server', description="query server events")
    query_server_parser.add_argument(
        '--persisted', action='store_true',
        help="persisted events")
    query_server_parser.add_argument(
        '--max-results', metavar='N', type=int, default=None,
        help="maximum number of results")
    query_server_parser.add_argument(
        '--last-event-id', metavar='SERVER_ID,SESSION_ID,INSTANCE_ID',
        type=_parse_event_id, default=None,
        help="last event id")
    query_server_parser.add_argument(
        'server_id', metavar='SERVER_ID', type=int,
        help="server id")

    subscribe_parser = subparsers.add_parser(
        'subscribe', description="watch newly registered events")
    subscribe_parser.add_argument(
        '--server-id', type=int, default=None,
        help="server id")
    subscribe_parser.add_argument(
        '--persisted', action='store_true',
        help="persisted events")
    subscribe_parser.add_argument(
        'event_types', metavar='EVENT_TYPE', type=_parse_event_type, nargs='*',
        help='query event type')

    server_parser = subparsers.add_parser(
        'server', description="run manager server with web ui")
    server_parser.add_argument(
        '--server-id', type=int, default=None,
        help="server id")
    server_parser.add_argument(
        '--persisted', action='store_true',
        help="persisted events")
    server_parser.add_argument(
        'event_types', metavar='EVENT_TYPE', type=_parse_event_type, nargs='*',
        help='query event type')

    return parser


def _enum_by_name(enum_type):
    """Create argparse ``type`` callable resolving `enum_type` member names."""

    def convert(name):
        try:
            return enum_type[name]

        except KeyError:
            # KeyError is not handled by argparse and would surface as
            # traceback instead of usage error
            valid = ', '.join(member.name for member in enum_type)
            raise argparse.ArgumentTypeError(
                f"invalid choice: {name!r} (choose from {valid})") from None

    return convert
def
main():
def main():
    """Parse command line arguments and run the selected action.

    Returns:
        Result of the executed coroutine (``register`` yields ``1`` when the
        client returns no result) or ``None`` if execution was cancelled.

    Raises:
        ValueError: unsupported action or query type.
    """
    parser = create_argument_parser()
    args = parser.parse_args()

    addr = tcp.Address(args.host, args.port)

    if args.action == 'register':
        register_event = common.RegisterEvent(
            type=args.event_type,
            source_timestamp=args.source_timestamp,
            payload=_read_payload(payload_type=args.payload_type,
                                  binary_type=args.binary_type,
                                  path=args.payload_path))

        co = register(addr=addr,
                      client_name=args.client_name,
                      client_token=args.client_token,
                      register_event=register_event)

    elif args.action == 'query':
        if args.query_type == 'latest':
            params = common.QueryLatestParams(
                event_types=args.event_types)

        elif args.query_type == 'timeseries':
            params = common.QueryTimeseriesParams(
                event_types=args.event_types,
                t_from=args.t_from,
                t_to=args.t_to,
                source_t_from=args.source_t_from,
                source_t_to=args.source_t_to,
                order=args.order,
                order_by=args.order_by,
                max_results=args.max_results,
                last_event_id=args.last_event_id)

        elif args.query_type == 'server':
            # server queries have their own params type; the timeseries
            # branch above never passes server_id/persisted
            params = common.QueryServerParams(
                server_id=args.server_id,
                persisted=args.persisted,
                max_results=args.max_results,
                last_event_id=args.last_event_id)

        else:
            raise ValueError('unsupported query type')

        co = query(addr=addr,
                   client_name=args.client_name,
                   client_token=args.client_token,
                   params=params)

    elif args.action == 'subscribe':
        # without explicit event types, subscribe to everything
        subscriptions = args.event_types or [('*', )]

        co = subscribe(addr=addr,
                       client_name=args.client_name,
                       client_token=args.client_token,
                       subscriptions=subscriptions,
                       server_id=args.server_id,
                       persisted=args.persisted)

    elif args.action == 'server':
        subscriptions = args.event_types or [('*', )]

        co = server(addr=addr,
                    client_name=args.client_name,
                    client_token=args.client_token,
                    subscriptions=subscriptions,
                    server_id=args.server_id,
                    persisted=args.persisted)

    else:
        raise ValueError('unsupported action')

    aio.init_asyncio()
    # cancellation (e.g. interrupted run) is treated as regular termination
    with contextlib.suppress(asyncio.CancelledError):
        return aio.run_asyncio(co)
async def
register( addr: hat.drivers.tcp.Address, client_name: str, client_token: str | None, register_event: hat.event.common.RegisterEvent):
async def register(addr: tcp.Address,
                   client_name: str,
                   client_token: str | None,
                   register_event: common.RegisterEvent):
    """Connect to the eventer server and register a single event.

    Returns ``1`` if the client's ``register`` call returns ``None``,
    otherwise ``None``.  The connection is closed before returning, even
    when registration raises.
    """
    conn = await eventer.connect(addr=addr,
                                 client_name=client_name,
                                 client_token=client_token)

    try:
        # NOTE(review): confirm what ``register`` returns on success when
        # called without extra arguments - the exit status depends on it
        registered = await conn.register([register_event])
        return 1 if registered is None else None

    finally:
        await aio.uncancellable(conn.async_close())
async def
query( addr: hat.drivers.tcp.Address, client_name: str, client_token: str | None, params: hat.event.common.QueryLatestParams | hat.event.common.QueryTimeseriesParams | hat.event.common.QueryServerParams):
async def query(addr: tcp.Address,
                client_name: str,
                client_token: str | None,
                params: common.QueryParams):
    """Run a single query and print its JSON encoded result to stdout.

    The connection is closed before returning, even when the query raises.
    """
    conn = await eventer.connect(addr=addr,
                                 client_name=client_name,
                                 client_token=client_token)

    try:
        query_result = await conn.query(params)
        encoded = json.encode(common.query_result_to_json(query_result))
        print(encoded)

    finally:
        await aio.uncancellable(conn.async_close())
async def
subscribe( addr: hat.drivers.tcp.Address, client_name: str, client_token: str | None, subscriptions: Collection[tuple[str, ...]], server_id: int | None, persisted: bool):
async def subscribe(addr: tcp.Address,
                    client_name: str,
                    client_token: str | None,
                    subscriptions: Collection[common.EventType],
                    server_id: int | None,
                    persisted: bool):
    """Print received events as JSON until the connection starts closing.

    Each batch of events delivered to the events callback is printed to
    stdout as a single JSON encoded list.
    """

    def print_events(client, events):
        encoded = json.encode(list(map(common.event_to_json, events)))
        print(encoded)

    conn = await eventer.connect(addr=addr,
                                 client_name=client_name,
                                 client_token=client_token,
                                 subscriptions=subscriptions,
                                 server_id=server_id,
                                 persisted=persisted,
                                 events_cb=print_events)

    try:
        await conn.wait_closing()

    finally:
        await aio.uncancellable(conn.async_close())
async def
server( addr: hat.drivers.tcp.Address, client_name: str, client_token: str | None, subscriptions: Collection[tuple[str, ...]], server_id: int | None, persisted: bool):