hat.event.backends.lmdb.manager.copy
from pathlib import Path
import argparse

from hat.event.backends.lmdb.manager import common


def create_argument_parser(subparsers) -> argparse.ArgumentParser:
    """Register the ``copy`` sub-command on `subparsers`.

    The sub-command takes two positional paths (``src_path`` and
    ``dst_path``, both converted to `Path`) and two optional flags
    (``--skip-latest`` and ``--skip-timeseries``).

    Args:
        subparsers: object providing ``add_parser`` (as returned by
            `argparse.ArgumentParser.add_subparsers`)

    Returns:
        parser created for the ``copy`` sub-command

    """
    parser = subparsers.add_parser('copy')
    parser.add_argument('--skip-latest', action='store_true')
    parser.add_argument('--skip-timeseries', action='store_true')
    parser.add_argument('src_path', type=Path)
    parser.add_argument('dst_path', type=Path)
    return parser


def copy(args):
    """Copy the database at ``args.src_path`` into ``args.dst_path``.

    System and reference databases are always copied. Latest and
    timeseries databases are copied unless ``args.skip_latest`` /
    ``args.skip_timeseries`` is set; in that case the matching event
    references are dropped from the reference database as well.

    Args:
        args: parsed arguments with attributes ``src_path``, ``dst_path``,
            ``skip_latest`` and ``skip_timeseries``

    Raises:
        Exception: if ``args.dst_path`` already exists

    """
    if args.dst_path.exists():
        raise Exception('destination db already exists')

    # source environment is opened read-only; destination is newly created
    with common.ext_create_env(args.src_path, readonly=True) as src_env, \
            common.ext_create_env(args.dst_path) as dst_env:
        # NOTE(review): third argument of ext_open_db is presumably a
        # "create" flag (False for source, True for destination) - confirm
        src_dbs = {db_type: common.ext_open_db(src_env, db_type, False)
                   for db_type in common.DbType}
        dst_dbs = {db_type: common.ext_open_db(dst_env, db_type, True)
                   for db_type in common.DbType}

        # everything is written inside a single destination write transaction
        with src_env.begin(buffers=True) as src_txn, \
                dst_env.begin(write=True) as dst_txn:
            _copy_system(src_dbs, src_txn, dst_dbs, dst_txn)
            _copy_ref(src_dbs, src_txn, dst_dbs, dst_txn,
                      args.skip_latest, args.skip_timeseries)

            if not args.skip_latest:
                _copy_latest(src_dbs, src_txn, dst_dbs, dst_txn)

            if not args.skip_timeseries:
                _copy_timeseries(src_dbs, src_txn, dst_dbs, dst_txn)


def _copy_system(src_dbs, src_txn, dst_dbs, dst_txn):
    """Copy the system databases (settings, last event id, last timestamp)."""
    for db_type in [common.DbType.SYSTEM_SETTINGS,
                    common.DbType.SYSTEM_LAST_EVENT_ID,
                    common.DbType.SYSTEM_LAST_TIMESTAMP]:
        _copy_db(src_dbs[db_type], src_txn, dst_dbs[db_type], dst_txn)


def _copy_ref(src_dbs, src_txn, dst_dbs, dst_txn, skip_latest,
              skip_timeseries):
    """Copy the reference database, dropping references to skipped data.

    When nothing is skipped, records are copied verbatim. Otherwise each
    value is decoded, references of the skipped kinds are removed, and the
    remainder is re-encoded; records left without any reference are not
    written to the destination.

    """
    db_type = common.DbType.REF
    db_def = common.db_defs[db_type]
    src_db = src_dbs[db_type]
    dst_db = dst_dbs[db_type]

    # decide once which reference types have to be filtered out
    excluded_types = []
    if skip_latest:
        excluded_types.append(common.LatestEventRef)
    if skip_timeseries:
        excluded_types.append(common.TimeseriesEventRef)

    if not excluded_types:
        # nothing to filter - plain record-by-record copy
        _copy_db(src_db, src_txn, dst_db, dst_txn)
        return

    excluded_types = tuple(excluded_types)

    for key, value in src_txn.cursor(db=src_db):
        event_refs = {event_ref
                      for event_ref in db_def.decode_value(value)
                      if not isinstance(event_ref, excluded_types)}
        if not event_refs:
            continue

        dst_txn.put(key, db_def.encode_value(event_refs), db=dst_db)


def _copy_latest(src_dbs, src_txn, dst_dbs, dst_txn):
    """Copy the latest databases (data and type)."""
    for db_type in [common.DbType.LATEST_DATA,
                    common.DbType.LATEST_TYPE]:
        _copy_db(src_dbs[db_type], src_txn, dst_dbs[db_type], dst_txn)


def _copy_timeseries(src_dbs, src_txn, dst_dbs, dst_txn):
    """Copy the timeseries databases (data, partition and count)."""
    for db_type in [common.DbType.TIMESERIES_DATA,
                    common.DbType.TIMESERIES_PARTITION,
                    common.DbType.TIMESERIES_COUNT]:
        _copy_db(src_dbs[db_type], src_txn, dst_dbs[db_type], dst_txn)


def _copy_db(src_db, src_txn, dst_db, dst_txn):
    """Write every key/value pair iterated from `src_db` into `dst_db`."""
    for key, value in src_txn.cursor(db=src_db):
        dst_txn.put(key, value, db=dst_db)
def create_argument_parser(subparsers) -> argparse.ArgumentParser:
    """Register the ``copy`` sub-command on `subparsers`.

    The sub-command takes two positional paths (``src_path`` and
    ``dst_path``, both converted to `Path`) and two optional flags
    (``--skip-latest`` and ``--skip-timeseries``).

    Args:
        subparsers: object providing ``add_parser`` (as returned by
            `argparse.ArgumentParser.add_subparsers`)

    Returns:
        parser created for the ``copy`` sub-command

    """
    parser = subparsers.add_parser('copy')
    parser.add_argument('--skip-latest', action='store_true')
    parser.add_argument('--skip-timeseries', action='store_true')
    parser.add_argument('src_path', type=Path)
    parser.add_argument('dst_path', type=Path)
    return parser
def copy(args):
    """Copy the database at ``args.src_path`` into ``args.dst_path``.

    System and reference databases are always copied. Latest and
    timeseries databases are copied unless ``args.skip_latest`` /
    ``args.skip_timeseries`` is set; both flags are also forwarded to the
    reference-database copy step.

    Args:
        args: parsed arguments with attributes ``src_path``, ``dst_path``,
            ``skip_latest`` and ``skip_timeseries``

    Raises:
        Exception: if ``args.dst_path`` already exists

    """
    if args.dst_path.exists():
        raise Exception('destination db already exists')

    # source environment is opened read-only; destination is newly created
    with common.ext_create_env(args.src_path, readonly=True) as src_env, \
            common.ext_create_env(args.dst_path) as dst_env:
        # NOTE(review): third argument of ext_open_db is presumably a
        # "create" flag (False for source, True for destination) - confirm
        src_dbs = {db_type: common.ext_open_db(src_env, db_type, False)
                   for db_type in common.DbType}
        dst_dbs = {db_type: common.ext_open_db(dst_env, db_type, True)
                   for db_type in common.DbType}

        # everything is written inside a single destination write transaction
        with src_env.begin(buffers=True) as src_txn, \
                dst_env.begin(write=True) as dst_txn:
            _copy_system(src_dbs, src_txn, dst_dbs, dst_txn)
            _copy_ref(src_dbs, src_txn, dst_dbs, dst_txn,
                      args.skip_latest, args.skip_timeseries)

            if not args.skip_latest:
                _copy_latest(src_dbs, src_txn, dst_dbs, dst_txn)

            if not args.skip_timeseries:
                _copy_timeseries(src_dbs, src_txn, dst_dbs, dst_txn)