hat.event.backends.lmdb.manager.query
"""Command line `query` action for inspecting an LMDB event backend.

Each subaction iterates a single database of the environment and prints
one JSON encoded result per line.
"""

from pathlib import Path
import argparse

from hat import json

from hat.event.backends.lmdb.manager import common


def create_argument_parser(subparsers) -> argparse.ArgumentParser:
    """Register the `query` parser and its subactions on `subparsers`.

    Args:
        subparsers: object returned by `ArgumentParser.add_subparsers`

    Returns:
        the newly created `query` parser

    """
    parser = subparsers.add_parser('query')
    parser.add_argument('--db', metavar='PATH', type=Path, required=True)
    subsubparsers = parser.add_subparsers(dest='subaction', required=True)

    subsubparsers.add_parser('settings')

    last_event_id_parser = subsubparsers.add_parser('last_event_id')
    last_event_id_parser.add_argument('--server-id', type=int, default=None)

    last_timestamp_parser = subsubparsers.add_parser('last_timestamp')
    last_timestamp_parser.add_argument('--server-id', type=int, default=None)

    ref_parser = subsubparsers.add_parser('ref')
    ref_parser.add_argument('--server-id', type=int, default=None)

    latest_parser = subsubparsers.add_parser('latest')
    latest_parser.add_argument('--event-types', type=str, default=None,
                               nargs='*')

    partition_parser = subsubparsers.add_parser('partition')
    partition_parser.add_argument('--partition-id', type=int, default=None)

    timeseries_parser = subsubparsers.add_parser('timeseries')
    timeseries_parser.add_argument('--partition-id', type=int, default=None)
    timeseries_parser.add_argument('--server-id', type=int, default=None)
    timeseries_parser.add_argument('--event-types', type=str, default=None,
                                   nargs='*')

    return parser


def query(args):
    """Execute the subaction selected by `args.subaction` and print results.

    The environment at `args.db` is opened with ``readonly=True`` and all
    results are produced and printed while a single transaction is open.

    Args:
        args: namespace produced by the parser from `create_argument_parser`

    Raises:
        ValueError: if `args.subaction` is not a known subaction

    """
    with common.ext_create_env(args.db, readonly=True) as env:
        dbs = {db_type: common.ext_open_db(env, db_type, False)
               for db_type in common.DbType}

        with env.begin(buffers=True) as txn:
            if args.subaction == 'settings':
                results = _query_settings(dbs, txn)

            elif args.subaction == 'last_event_id':
                results = _query_last_event_id(dbs, txn, args.server_id)

            elif args.subaction == 'last_timestamp':
                results = _query_last_timestamp(dbs, txn, args.server_id)

            elif args.subaction == 'ref':
                # server_id is a required positional of _query_ref
                results = _query_ref(dbs, txn, args.server_id)

            elif args.subaction == 'latest':
                event_types = _parse_event_types(args.event_types)
                results = _query_latest(dbs, txn, event_types)

            elif args.subaction == 'partition':
                results = _query_partition(dbs, txn, args.partition_id)

            elif args.subaction == 'timeseries':
                event_types = _parse_event_types(args.event_types)
                results = _query_timeseries(dbs, txn, args.partition_id,
                                            args.server_id, event_types)

            else:
                raise ValueError('unsupported subaction')

            # query functions are generators reading from the cursor, so
            # they have to be consumed while the transaction is still open
            for result in results:
                _print_result(result)


def _parse_event_types(event_types):
    """Split ``a/b/c`` formatted event types into tuples (`None` is kept)."""
    if event_types is None:
        return None

    return [tuple(i.split('/')) for i in event_types]


def _print_result(result):
    """Print a single result as JSON encoded by `json.encode`."""
    print(json.encode(result))


def _query_settings(dbs, txn):
    """Yield ``{'settings_id', 'data'}`` for each SYSTEM_SETTINGS entry."""
    db_type = common.DbType.SYSTEM_SETTINGS
    db = dbs[db_type]
    db_def = common.db_defs[db_type]

    for key, value in txn.cursor(db=db):
        settings_id = db_def.decode_key(key)
        data = db_def.decode_value(value)

        yield {'settings_id': settings_id.name,
               'data': data}


def _query_last_event_id(dbs, txn, server_id):
    """Yield JSON event ids from SYSTEM_LAST_EVENT_ID.

    If `server_id` is not ``None``, only entries whose decoded key equals
    `server_id` are yielded.

    """
    db_type = common.DbType.SYSTEM_LAST_EVENT_ID
    db = dbs[db_type]
    db_def = common.db_defs[db_type]

    for key, value in txn.cursor(db=db):
        decoded_server_id = db_def.decode_key(key)
        if server_id is not None and server_id != decoded_server_id:
            continue

        event_id = db_def.decode_value(value)
        yield common.event_id_to_json(event_id)


def _query_last_timestamp(dbs, txn, server_id):
    """Yield JSON timestamps from SYSTEM_LAST_TIMESTAMP.

    If `server_id` is not ``None``, only entries whose decoded key equals
    `server_id` are yielded.

    """
    db_type = common.DbType.SYSTEM_LAST_TIMESTAMP
    db = dbs[db_type]
    db_def = common.db_defs[db_type]

    for key, value in txn.cursor(db=db):
        decoded_server_id = db_def.decode_key(key)
        if server_id is not None and server_id != decoded_server_id:
            continue

        timestamp = db_def.decode_value(value)
        yield common.timestamp_to_json(timestamp)


def _query_ref(dbs, txn, server_id):
    """Yield one result per event reference stored in the REF database.

    Latest references are resolved to their event type through the
    LATEST_TYPE database; timeseries references report partition id and
    timestamp taken from the reference key. If `server_id` is not ``None``,
    only events with matching ``event_id.server`` are reported.

    """
    ref_db_type = common.DbType.REF
    ref_db = dbs[ref_db_type]
    ref_db_def = common.db_defs[ref_db_type]

    latest_type_db_type = common.DbType.LATEST_TYPE
    latest_type_db = dbs[latest_type_db_type]
    latest_type_db_def = common.db_defs[latest_type_db_type]

    for key, value in txn.cursor(db=ref_db):
        event_id = ref_db_def.decode_key(key)
        if server_id is not None and event_id.server != server_id:
            continue

        event_refs = ref_db_def.decode_value(value)

        for event_ref in event_refs:
            if isinstance(event_ref, common.LatestEventRef):
                latest_type_key = latest_type_db_def.encode_key(
                    event_ref.key)
                latest_type_value = txn.get(latest_type_key,
                                            db=latest_type_db)

                event_type = latest_type_db_def.decode_value(
                    latest_type_value)
                yield {'event_id': common.event_id_to_json(event_id),
                       'ref_type': 'latest',
                       'event_type': list(event_type)}

            elif isinstance(event_ref, common.TimeseriesEventRef):
                partition_id, timestamp, _ = event_ref.key
                yield {'event_id': common.event_id_to_json(event_id),
                       'ref_type': 'timeseries',
                       'partition_id': partition_id,
                       'timestamp': common.timestamp_to_json(timestamp)}


def _query_latest(dbs, txn, event_types):
    """Yield JSON events from LATEST_DATA matching `event_types`.

    `event_types` of ``None`` is treated as the ``('*', )`` subscription.

    """
    db_type = common.DbType.LATEST_DATA
    db = dbs[db_type]
    db_def = common.db_defs[db_type]

    subscription = common.create_subscription([('*', )] if event_types is None
                                              else event_types)

    for _, value in txn.cursor(db=db):
        event = db_def.decode_value(value)
        if not subscription.matches(event.type):
            continue

        # convert the decoded event, not the raw stored value
        yield common.event_to_json(event)


def _query_partition(dbs, txn, partition_id):
    """Yield ``{'partition_id', 'data'}`` for TIMESERIES_PARTITION entries.

    If `partition_id` is not ``None``, only the matching entry is yielded.

    """
    db_type = common.DbType.TIMESERIES_PARTITION
    db = dbs[db_type]
    db_def = common.db_defs[db_type]

    for key, value in txn.cursor(db=db):
        decoded_partition_id = db_def.decode_key(key)
        if partition_id is not None and partition_id != decoded_partition_id:
            continue

        data = db_def.decode_value(value)
        yield {'partition_id': decoded_partition_id,
               'data': data}


def _query_timeseries(dbs, txn, partition_id, server_id, event_types):
    """Yield JSON events from TIMESERIES_DATA passing all provided filters.

    `partition_id` and `server_id` are ignored when ``None``; `event_types`
    of ``None`` is treated as the ``('*', )`` subscription.

    """
    db_type = common.DbType.TIMESERIES_DATA
    db = dbs[db_type]
    db_def = common.db_defs[db_type]

    subscription = common.create_subscription([('*', )] if event_types is None
                                              else event_types)

    for key, value in txn.cursor(db=db):
        decoded_partition_id, _, event_id = db_def.decode_key(key)

        if partition_id is not None and partition_id != decoded_partition_id:
            continue

        if server_id is not None and server_id != event_id.server:
            continue

        event = db_def.decode_value(value)
        if not subscription.matches(event.type):
            continue

        # convert the decoded event, not the raw stored value
        yield common.event_to_json(event)
def create_argument_parser(subparsers) -> argparse.ArgumentParser:
    """Add the `query` command, with all of its subactions, to `subparsers`.

    The `query` parser requires ``--db PATH`` followed by one subaction
    (stored as ``subaction``). Every subaction option is optional and
    defaults to ``None``.

    Args:
        subparsers: object returned by `ArgumentParser.add_subparsers`

    Returns:
        the `query` parser

    """
    parser = subparsers.add_parser('query')
    parser.add_argument('--db', metavar='PATH', type=Path, required=True)
    subactions = parser.add_subparsers(dest='subaction', required=True)

    int_option = {'type': int, 'default': None}
    event_types_option = {'type': str, 'default': None, 'nargs': '*'}

    # subaction name -> options it accepts (registration order is kept)
    layout = {
        'settings': (),
        'last_event_id': (('--server-id', int_option), ),
        'last_timestamp': (('--server-id', int_option), ),
        'ref': (('--server-id', int_option), ),
        'latest': (('--event-types', event_types_option), ),
        'partition': (('--partition-id', int_option), ),
        'timeseries': (('--partition-id', int_option),
                       ('--server-id', int_option),
                       ('--event-types', event_types_option))}

    for subaction, options in layout.items():
        subaction_parser = subactions.add_parser(subaction)
        for flag, kwargs in options:
            subaction_parser.add_argument(flag, **kwargs)

    return parser
def query(args):
    """Execute the subaction selected by `args.subaction` and print results.

    The environment at `args.db` is opened with ``readonly=True``; every
    result is printed while a single transaction is open.

    Args:
        args: parsed command line namespace with `db`, `subaction` and the
            options belonging to the selected subaction

    Raises:
        ValueError: if `args.subaction` is not a known subaction

    """
    with common.ext_create_env(args.db, readonly=True) as env:
        dbs = {db_type: common.ext_open_db(env, db_type, False)
               for db_type in common.DbType}

        with env.begin(buffers=True) as txn:
            if args.subaction == 'settings':
                results = _query_settings(dbs, txn)

            elif args.subaction == 'last_event_id':
                results = _query_last_event_id(dbs, txn, args.server_id)

            elif args.subaction == 'last_timestamp':
                results = _query_last_timestamp(dbs, txn, args.server_id)

            elif args.subaction == 'ref':
                # server_id is a required positional of _query_ref
                results = _query_ref(dbs, txn, args.server_id)

            elif args.subaction == 'latest':
                event_types = ([tuple(i.split('/')) for i in args.event_types]
                               if args.event_types is not None else None)
                # latest events are served by _query_latest, not _query_ref
                results = _query_latest(dbs, txn, event_types)

            elif args.subaction == 'partition':
                results = _query_partition(dbs, txn, args.partition_id)

            elif args.subaction == 'timeseries':
                event_types = ([tuple(i.split('/')) for i in args.event_types]
                               if args.event_types is not None else None)
                results = _query_timeseries(dbs, txn, args.partition_id,
                                            args.server_id, event_types)

            else:
                raise ValueError('unsupported subaction')

            # query functions are generators reading from the cursor, so
            # they have to be consumed while the transaction is still open
            for result in results:
                _print_result(result)