hat.event.manager.main

  1from collections.abc import Collection
  2from pathlib import Path
  3import argparse
  4import asyncio
  5import contextlib
  6import sys
  7
  8from hat import aio
  9from hat import json
 10from hat.drivers import tcp
 11
 12from hat.event import eventer
 13from hat.event.manager import common
 14
 15
 16def create_argument_parser() -> argparse.ArgumentParser:
 17    parser = argparse.ArgumentParser()
 18    parser.add_argument(
 19        '--host', type=str, default='127.0.0.1',
 20        help="server host name (default '127.0.0.1')")
 21    parser.add_argument(
 22        '--port', type=int, default='23012',
 23        help="server TCP port (default 23012)")
 24    parser.add_argument(
 25        '--client-name', metavar='NAME', type=str, default='manager',
 26        help="client name (default 'manager')")
 27    parser.add_argument(
 28        '--client-token', metavar='TOKEN', type=str, default=None,
 29        help="client token")
 30    subparsers = parser.add_subparsers(
 31        title='actions', dest='action', required=True)
 32
 33    register_parser = subparsers.add_parser(
 34        'register', description="register new event")
 35    register_parser.add_argument(
 36        '--source-timestamp', metavar='TIMESTAMP', type=_parse_timestamp,
 37        default=None,
 38        help="source timestamp")
 39    register_parser.add_argument(
 40        '--payload-type', metavar='TYPE',
 41        choices=['json', 'yaml', 'toml', 'binary', 'none'],
 42        default=None,
 43        help="payload type")
 44    register_parser.add_argument(
 45        '--binary-type', metavar='TYPE', type=str, default='',
 46        help="binary payload type (default '')")
 47    register_parser.add_argument(
 48        '--payload-path', metavar='PATH', type=Path, default=Path('-'),
 49        help="payload file path or '-' for stdin (default '-')")
 50    register_parser.add_argument(
 51        'event_type', metavar='EVENT_TYPE', type=_parse_event_type,
 52        help="event type where segments are delimited by '/'")
 53
 54    query_parser = subparsers.add_parser(
 55        'query', description="query events")
 56    query_subparsers = query_parser.add_subparsers(
 57        title='query types', dest='query_type', required=True)
 58
 59    query_latest_parser = query_subparsers.add_parser(
 60        'latest', description="query latest events")
 61    query_latest_parser.add_argument(
 62        '--event-types', metavar='EVENT_TYPE', type=_parse_event_type,
 63        default=None, nargs='*',
 64        help='query event types')
 65
 66    query_timeseries_parser = query_subparsers.add_parser(
 67        'timeseries', description="query timeseries events")
 68    query_timeseries_parser.add_argument(
 69        '--event-types', metavar='EVENT_TYPE', type=_parse_event_type,
 70        default=None, nargs='*',
 71        help='query event types')
 72    query_timeseries_parser.add_argument(
 73        '--t-from', metavar='TIMESTAMP', type=_parse_timestamp, default=None,
 74        help="from timestamp")
 75    query_timeseries_parser.add_argument(
 76        '--t-to', metavar='TIMESTAMP', type=_parse_timestamp, default=None,
 77        help="to timestamp")
 78    query_timeseries_parser.add_argument(
 79        '--source-t-from', metavar='TIMESTAMP', type=_parse_timestamp,
 80        default=None,
 81        help="from source timestamp")
 82    query_timeseries_parser.add_argument(
 83        '--source-t-to', metavar='TIMESTAMP', type=_parse_timestamp,
 84        default=None,
 85        help="to source timestamp")
 86    query_timeseries_parser.add_argument(
 87        '--order', type=_parse_order, choices=[i.name for i in common.Order],
 88        default=common.Order.DESCENDING,
 89        help="order (default 'DESCENDING')")
 90    query_timeseries_parser.add_argument(
 91        '--order-by', type=_parse_order_by,
 92        choices=[i.name for i in common.OrderBy],
 93        default=common.OrderBy.TIMESTAMP,
 94        help="order (default 'TIMESTAMP')")
 95    query_timeseries_parser.add_argument(
 96        '--max-results', metavar='N', type=int, default=None,
 97        help="maximum number of results")
 98    query_timeseries_parser.add_argument(
 99        '--last-event-id', metavar='SERVER_ID,SESSION_ID,INSTANCE_ID',
100        type=_parse_event_id, default=None,
101        help="last event id")
102
103    query_server_parser = query_subparsers.add_parser(
104        'server', description="query server events")
105    query_server_parser.add_argument(
106        '--persisted', action='store_true',
107        help="persisted events")
108    query_server_parser.add_argument(
109        '--max-results', metavar='N', type=int, default=None,
110        help="maximum number of results")
111    query_server_parser.add_argument(
112        '--last-event-id', metavar='SERVER_ID,SESSION_ID,INSTANCE_ID',
113        type=_parse_event_id, default=None,
114        help="last event id")
115    query_server_parser.add_argument(
116        'server_id', metavar='SERVER_ID', type=int,
117        help="server id")
118
119    subscribe_parser = subparsers.add_parser(
120        'subscribe', description="watch newly registered events")
121    subscribe_parser.add_argument(
122        '--server-id', type=int, default=None,
123        help="server id")
124    subscribe_parser.add_argument(
125        '--persisted', action='store_true',
126        help="persisted events")
127    subscribe_parser.add_argument(
128        'event_types', metavar='EVENT_TYPE', type=_parse_event_type, nargs='*',
129        help='query event type')
130
131    server_parser = subparsers.add_parser(
132        'server', description="run manager server with web ui")
133    server_parser.add_argument(
134        '--server-id', type=int, default=None,
135        help="server id")
136    server_parser.add_argument(
137        '--persisted', action='store_true',
138        help="persisted events")
139    server_parser.add_argument(
140        'event_types', metavar='EVENT_TYPE', type=_parse_event_type, nargs='*',
141        help='query event type')
142
143    return parser
144
145
146def main():
147    parser = create_argument_parser()
148    args = parser.parse_args()
149
150    addr = tcp.Address(args.host, args.port)
151
152    if args.action == 'register':
153        register_event = common.RegisterEvent(
154            type=args.event_type,
155            source_timestamp=args.source_timestamp,
156            payload=_read_payload(payload_type=args.payload_type,
157                                  binary_type=args.binary_type,
158                                  path=args.payload_path))
159
160        co = register(addr=addr,
161                      client_name=args.client_name,
162                      client_token=args.client_token,
163                      register_event=register_event)
164
165    elif args.action == 'query':
166        if args.query_type == 'latest':
167            params = common.QueryLatestParams(
168                event_types=args.event_types)
169
170        elif args.query_type == 'timeseries':
171            params = common.QueryTimeseriesParams(
172                event_types=args.event_types,
173                t_from=args.t_from,
174                t_to=args.t_to,
175                source_t_from=args.source_t_from,
176                source_t_to=args.source_t_to,
177                order=args.order,
178                order_by=args.order_by,
179                max_results=args.max_results,
180                last_event_id=args.last_event_id)
181
182        elif args.query_type == 'server':
183            params = common.QueryTimeseriesParams(
184                server_id=args.server_id,
185                persisted=args.persisted,
186                max_results=args.max_results,
187                last_event_id=args.last_event_id)
188
189        else:
190            raise ValueError('unsupported query type')
191
192        co = query(addr=addr,
193                   client_name=args.client_name,
194                   client_token=args.client_token,
195                   params=params)
196
197    elif args.action == 'subscribe':
198        subscriptions = args.event_types or [('*', )]
199
200        co = subscribe(addr=addr,
201                       client_name=args.client_name,
202                       client_token=args.client_token,
203                       subscriptions=subscriptions,
204                       server_id=args.server_id,
205                       persisted=args.persisted)
206
207    elif args.action == 'server':
208        subscriptions = args.event_types or [('*', )]
209
210        co = server(addr=addr,
211                    client_name=args.client_name,
212                    client_token=args.client_token,
213                    subscriptions=subscriptions,
214                    server_id=args.server_id,
215                    persisted=args.persisted)
216
217    else:
218        raise ValueError('unsupported action')
219
220    aio.init_asyncio()
221    with contextlib.suppress(asyncio.CancelledError):
222        return aio.run_asyncio(co)
223
224
225async def register(addr: tcp.Address,
226                   client_name: str,
227                   client_token: str | None,
228                   register_event: common.RegisterEvent):
229    client = await eventer.connect(addr=addr,
230                                   client_name=client_name,
231                                   client_token=client_token)
232
233    try:
234        result = await client.register([register_event])
235
236        if result is None:
237            return 1
238
239    finally:
240        await aio.uncancellable(client.async_close())
241
242
243async def query(addr: tcp.Address,
244                client_name: str,
245                client_token: str | None,
246                params: common.QueryParams):
247    client = await eventer.connect(addr=addr,
248                                   client_name=client_name,
249                                   client_token=client_token)
250
251    try:
252        result = await client.query(params)
253
254        result_json = common.query_result_to_json(result)
255        print(json.encode(result_json))
256
257    finally:
258        await aio.uncancellable(client.async_close())
259
260
261async def subscribe(addr: tcp.Address,
262                    client_name: str,
263                    client_token: str | None,
264                    subscriptions: Collection[common.EventType],
265                    server_id: int | None,
266                    persisted: bool):
267
268    def on_events(client, events):
269        events_json = [common.event_to_json(event) for event in events]
270        print(json.encode(events_json))
271
272    client = await eventer.connect(addr=addr,
273                                   client_name=client_name,
274                                   client_token=client_token,
275                                   subscriptions=subscriptions,
276                                   server_id=server_id,
277                                   persisted=persisted,
278                                   events_cb=on_events)
279
280    try:
281        await client.wait_closing()
282
283    finally:
284        await aio.uncancellable(client.async_close())
285
286
287async def server(addr: tcp.Address,
288                 client_name: str,
289                 client_token: str | None,
290                 subscriptions: Collection[common.EventType],
291                 server_id: int | None,
292                 persisted: bool):
293    raise NotImplementedError()
294
295
296def _parse_timestamp(t):
297    if t == 'now':
298        return common.now()
299
300    return common.timestamp_from_float(float(t))
301
302
303def _parse_event_type(event_type):
304    return tuple(event_type.split('/'))
305
306
307def _parse_event_id(event_id):
308    return common.EventId(event_id.split(','))
309
310
311def _parse_order(order):
312    return common.Order[order]
313
314
315def _parse_order_by(order_by):
316    return common.OrderBy[order_by]
317
318
319def _read_payload(payload_type, binary_type, path):
320    if payload_type == 'none':
321        return
322
323    if path == Path('-'):
324        if payload_type == 'json' or payload_type is None:
325            json_format = json.Format.JSON
326
327        elif payload_type == 'yaml':
328            json_format = json.Format.YAML
329
330        elif payload_type == 'toml':
331            json_format = json.Format.TOML
332
333        elif payload_type == 'binary':
334            json_format = None
335
336        else:
337            raise ValueError('unsupported payload type')
338
339        if json_format is None or json_format == json.Format.TOML:
340            stdin, sys.stdin = sys.stdin.detach(), None
341
342        else:
343            stdin = sys.stdin
344
345        if json_format:
346            data = json.decode_stream(stdin, json_format)
347            return common.EventPayloadJson(data)
348
349        else:
350            data = stdin.read()
351            return common.EventPayloadBinary(type=binary_type,
352                                             data=data)
353
354    if payload_type is None:
355        try:
356            json_format = json.get_file_format(path)
357
358        except ValueError:
359            json_format = None
360
361    elif payload_type == 'json':
362        json_format = json.Format.JSON
363
364    elif payload_type == 'yaml':
365        json_format = json.Format.YAML
366
367    elif payload_type == 'toml':
368        json_format = json.Format.TOML
369
370    elif payload_type == 'binary':
371        json_format = None
372
373    else:
374        raise ValueError('unsupported payload type')
375
376    if json_format:
377        data = json.decode_file(path, json_format)
378        return common.EventPayloadJson(data)
379
380    data = path.read_bytes()
381    return common.EventPayloadBinary(type=binary_type,
382                                     data=data)
383
384
385if __name__ == '__main__':
386    sys.argv[0] = 'hat-event-manager'
387    sys.exit(main())
def create_argument_parser() -> argparse.ArgumentParser:
 17def create_argument_parser() -> argparse.ArgumentParser:
 18    parser = argparse.ArgumentParser()
 19    parser.add_argument(
 20        '--host', type=str, default='127.0.0.1',
 21        help="server host name (default '127.0.0.1')")
 22    parser.add_argument(
 23        '--port', type=int, default='23012',
 24        help="server TCP port (default 23012)")
 25    parser.add_argument(
 26        '--client-name', metavar='NAME', type=str, default='manager',
 27        help="client name (default 'manager')")
 28    parser.add_argument(
 29        '--client-token', metavar='TOKEN', type=str, default=None,
 30        help="client token")
 31    subparsers = parser.add_subparsers(
 32        title='actions', dest='action', required=True)
 33
 34    register_parser = subparsers.add_parser(
 35        'register', description="register new event")
 36    register_parser.add_argument(
 37        '--source-timestamp', metavar='TIMESTAMP', type=_parse_timestamp,
 38        default=None,
 39        help="source timestamp")
 40    register_parser.add_argument(
 41        '--payload-type', metavar='TYPE',
 42        choices=['json', 'yaml', 'toml', 'binary', 'none'],
 43        default=None,
 44        help="payload type")
 45    register_parser.add_argument(
 46        '--binary-type', metavar='TYPE', type=str, default='',
 47        help="binary payload type (default '')")
 48    register_parser.add_argument(
 49        '--payload-path', metavar='PATH', type=Path, default=Path('-'),
 50        help="payload file path or '-' for stdin (default '-')")
 51    register_parser.add_argument(
 52        'event_type', metavar='EVENT_TYPE', type=_parse_event_type,
 53        help="event type where segments are delimited by '/'")
 54
 55    query_parser = subparsers.add_parser(
 56        'query', description="query events")
 57    query_subparsers = query_parser.add_subparsers(
 58        title='query types', dest='query_type', required=True)
 59
 60    query_latest_parser = query_subparsers.add_parser(
 61        'latest', description="query latest events")
 62    query_latest_parser.add_argument(
 63        '--event-types', metavar='EVENT_TYPE', type=_parse_event_type,
 64        default=None, nargs='*',
 65        help='query event types')
 66
 67    query_timeseries_parser = query_subparsers.add_parser(
 68        'timeseries', description="query timeseries events")
 69    query_timeseries_parser.add_argument(
 70        '--event-types', metavar='EVENT_TYPE', type=_parse_event_type,
 71        default=None, nargs='*',
 72        help='query event types')
 73    query_timeseries_parser.add_argument(
 74        '--t-from', metavar='TIMESTAMP', type=_parse_timestamp, default=None,
 75        help="from timestamp")
 76    query_timeseries_parser.add_argument(
 77        '--t-to', metavar='TIMESTAMP', type=_parse_timestamp, default=None,
 78        help="to timestamp")
 79    query_timeseries_parser.add_argument(
 80        '--source-t-from', metavar='TIMESTAMP', type=_parse_timestamp,
 81        default=None,
 82        help="from source timestamp")
 83    query_timeseries_parser.add_argument(
 84        '--source-t-to', metavar='TIMESTAMP', type=_parse_timestamp,
 85        default=None,
 86        help="to source timestamp")
 87    query_timeseries_parser.add_argument(
 88        '--order', type=_parse_order, choices=[i.name for i in common.Order],
 89        default=common.Order.DESCENDING,
 90        help="order (default 'DESCENDING')")
 91    query_timeseries_parser.add_argument(
 92        '--order-by', type=_parse_order_by,
 93        choices=[i.name for i in common.OrderBy],
 94        default=common.OrderBy.TIMESTAMP,
 95        help="order (default 'TIMESTAMP')")
 96    query_timeseries_parser.add_argument(
 97        '--max-results', metavar='N', type=int, default=None,
 98        help="maximum number of results")
 99    query_timeseries_parser.add_argument(
100        '--last-event-id', metavar='SERVER_ID,SESSION_ID,INSTANCE_ID',
101        type=_parse_event_id, default=None,
102        help="last event id")
103
104    query_server_parser = query_subparsers.add_parser(
105        'server', description="query server events")
106    query_server_parser.add_argument(
107        '--persisted', action='store_true',
108        help="persisted events")
109    query_server_parser.add_argument(
110        '--max-results', metavar='N', type=int, default=None,
111        help="maximum number of results")
112    query_server_parser.add_argument(
113        '--last-event-id', metavar='SERVER_ID,SESSION_ID,INSTANCE_ID',
114        type=_parse_event_id, default=None,
115        help="last event id")
116    query_server_parser.add_argument(
117        'server_id', metavar='SERVER_ID', type=int,
118        help="server id")
119
120    subscribe_parser = subparsers.add_parser(
121        'subscribe', description="watch newly registered events")
122    subscribe_parser.add_argument(
123        '--server-id', type=int, default=None,
124        help="server id")
125    subscribe_parser.add_argument(
126        '--persisted', action='store_true',
127        help="persisted events")
128    subscribe_parser.add_argument(
129        'event_types', metavar='EVENT_TYPE', type=_parse_event_type, nargs='*',
130        help='query event type')
131
132    server_parser = subparsers.add_parser(
133        'server', description="run manager server with web ui")
134    server_parser.add_argument(
135        '--server-id', type=int, default=None,
136        help="server id")
137    server_parser.add_argument(
138        '--persisted', action='store_true',
139        help="persisted events")
140    server_parser.add_argument(
141        'event_types', metavar='EVENT_TYPE', type=_parse_event_type, nargs='*',
142        help='query event type')
143
144    return parser
def main():
147def main():
148    parser = create_argument_parser()
149    args = parser.parse_args()
150
151    addr = tcp.Address(args.host, args.port)
152
153    if args.action == 'register':
154        register_event = common.RegisterEvent(
155            type=args.event_type,
156            source_timestamp=args.source_timestamp,
157            payload=_read_payload(payload_type=args.payload_type,
158                                  binary_type=args.binary_type,
159                                  path=args.payload_path))
160
161        co = register(addr=addr,
162                      client_name=args.client_name,
163                      client_token=args.client_token,
164                      register_event=register_event)
165
166    elif args.action == 'query':
167        if args.query_type == 'latest':
168            params = common.QueryLatestParams(
169                event_types=args.event_types)
170
171        elif args.query_type == 'timeseries':
172            params = common.QueryTimeseriesParams(
173                event_types=args.event_types,
174                t_from=args.t_from,
175                t_to=args.t_to,
176                source_t_from=args.source_t_from,
177                source_t_to=args.source_t_to,
178                order=args.order,
179                order_by=args.order_by,
180                max_results=args.max_results,
181                last_event_id=args.last_event_id)
182
183        elif args.query_type == 'server':
184            params = common.QueryTimeseriesParams(
185                server_id=args.server_id,
186                persisted=args.persisted,
187                max_results=args.max_results,
188                last_event_id=args.last_event_id)
189
190        else:
191            raise ValueError('unsupported query type')
192
193        co = query(addr=addr,
194                   client_name=args.client_name,
195                   client_token=args.client_token,
196                   params=params)
197
198    elif args.action == 'subscribe':
199        subscriptions = args.event_types or [('*', )]
200
201        co = subscribe(addr=addr,
202                       client_name=args.client_name,
203                       client_token=args.client_token,
204                       subscriptions=subscriptions,
205                       server_id=args.server_id,
206                       persisted=args.persisted)
207
208    elif args.action == 'server':
209        subscriptions = args.event_types or [('*', )]
210
211        co = server(addr=addr,
212                    client_name=args.client_name,
213                    client_token=args.client_token,
214                    subscriptions=subscriptions,
215                    server_id=args.server_id,
216                    persisted=args.persisted)
217
218    else:
219        raise ValueError('unsupported action')
220
221    aio.init_asyncio()
222    with contextlib.suppress(asyncio.CancelledError):
223        return aio.run_asyncio(co)
async def register( addr: hat.drivers.tcp.Address, client_name: str, client_token: str | None, register_event: hat.event.common.RegisterEvent):
226async def register(addr: tcp.Address,
227                   client_name: str,
228                   client_token: str | None,
229                   register_event: common.RegisterEvent):
230    client = await eventer.connect(addr=addr,
231                                   client_name=client_name,
232                                   client_token=client_token)
233
234    try:
235        result = await client.register([register_event])
236
237        if result is None:
238            return 1
239
240    finally:
241        await aio.uncancellable(client.async_close())
async def query( addr: hat.drivers.tcp.Address, client_name: str, client_token: str | None, params: hat.event.common.QueryLatestParams | hat.event.common.QueryTimeseriesParams | hat.event.common.QueryServerParams):
244async def query(addr: tcp.Address,
245                client_name: str,
246                client_token: str | None,
247                params: common.QueryParams):
248    client = await eventer.connect(addr=addr,
249                                   client_name=client_name,
250                                   client_token=client_token)
251
252    try:
253        result = await client.query(params)
254
255        result_json = common.query_result_to_json(result)
256        print(json.encode(result_json))
257
258    finally:
259        await aio.uncancellable(client.async_close())
async def subscribe( addr: hat.drivers.tcp.Address, client_name: str, client_token: str | None, subscriptions: Collection[tuple[str, ...]], server_id: int | None, persisted: bool):
262async def subscribe(addr: tcp.Address,
263                    client_name: str,
264                    client_token: str | None,
265                    subscriptions: Collection[common.EventType],
266                    server_id: int | None,
267                    persisted: bool):
268
269    def on_events(client, events):
270        events_json = [common.event_to_json(event) for event in events]
271        print(json.encode(events_json))
272
273    client = await eventer.connect(addr=addr,
274                                   client_name=client_name,
275                                   client_token=client_token,
276                                   subscriptions=subscriptions,
277                                   server_id=server_id,
278                                   persisted=persisted,
279                                   events_cb=on_events)
280
281    try:
282        await client.wait_closing()
283
284    finally:
285        await aio.uncancellable(client.async_close())
async def server( addr: hat.drivers.tcp.Address, client_name: str, client_token: str | None, subscriptions: Collection[tuple[str, ...]], server_id: int | None, persisted: bool):
288async def server(addr: tcp.Address,
289                 client_name: str,
290                 client_token: str | None,
291                 subscriptions: Collection[common.EventType],
292                 server_id: int | None,
293                 persisted: bool):
294    raise NotImplementedError()