hat.event.backends.lmdb.manager.copy

 1from pathlib import Path
 2import argparse
 3
 4from hat.event.backends.lmdb.manager import common
 5
 6
 7def create_argument_parser(subparsers) -> argparse.ArgumentParser:
 8    parser = subparsers.add_parser('copy')
 9    parser.add_argument('--skip-latest', action='store_true')
10    parser.add_argument('--skip-timeseries', action='store_true')
11    parser.add_argument('src_path', type=Path)
12    parser.add_argument('dst_path', type=Path)
13    return parser
14
15
def copy(args):
    """Copy the database at ``args.src_path`` into a new one at
    ``args.dst_path``.

    The source environment is opened read-only and every `common.DbType`
    database is opened in both environments. All writes go through a single
    destination write transaction. System databases and the reference
    database are always copied (references are filtered according to the
    skip flags); latest and timeseries databases are copied unless
    ``args.skip_latest`` / ``args.skip_timeseries`` is set.

    Args:
        args: parsed arguments providing ``src_path``, ``dst_path``,
            ``skip_latest`` and ``skip_timeseries``

    Raises:
        FileExistsError: if ``args.dst_path`` already exists

    """
    if args.dst_path.exists():
        # FileExistsError is a subclass of Exception, so callers that caught
        # the previously raised generic Exception keep working
        raise FileExistsError('destination db already exists')

    with common.ext_create_env(args.src_path, readonly=True) as src_env:
        with common.ext_create_env(args.dst_path) as dst_env:
            # NOTE(review): the third argument of ext_open_db is presumably
            # a "create" flag (False for source, True for destination) -
            # confirm against common.ext_open_db
            src_dbs = {db_type: common.ext_open_db(src_env, db_type, False)
                       for db_type in common.DbType}
            dst_dbs = {db_type: common.ext_open_db(dst_env, db_type, True)
                       for db_type in common.DbType}

            with src_env.begin(buffers=True) as src_txn, \
                    dst_env.begin(write=True) as dst_txn:
                _copy_system(src_dbs, src_txn, dst_dbs, dst_txn)
                _copy_ref(src_dbs, src_txn, dst_dbs, dst_txn,
                          args.skip_latest, args.skip_timeseries)

                if not args.skip_latest:
                    _copy_latest(src_dbs, src_txn, dst_dbs, dst_txn)

                if not args.skip_timeseries:
                    _copy_timeseries(src_dbs, src_txn, dst_dbs, dst_txn)
38
39
def _copy_system(src_dbs, src_txn, dst_dbs, dst_txn):
    """Copy the system databases (settings, last event id and last
    timestamp) from source to destination without modification.

    Args:
        src_dbs: mapping of `common.DbType` to source database handle
        src_txn: source transaction used for reading
        dst_dbs: mapping of `common.DbType` to destination database handle
        dst_txn: destination transaction used for writing

    """
    system_db_types = (common.DbType.SYSTEM_SETTINGS,
                       common.DbType.SYSTEM_LAST_EVENT_ID,
                       common.DbType.SYSTEM_LAST_TIMESTAMP)

    for system_db_type in system_db_types:
        src_db = src_dbs[system_db_type]
        dst_db = dst_dbs[system_db_type]
        _copy_db(src_db, src_txn, dst_db, dst_txn)
45
46
def _copy_ref(src_dbs, src_txn, dst_dbs, dst_txn, skip_latest,
              skip_timeseries):
    """Copy the reference database, dropping references to skipped data.

    When neither skip flag is set, every record is written unchanged.
    Otherwise each value is decoded, instances of `common.LatestEventRef`
    (if `skip_latest`) and `common.TimeseriesEventRef` (if
    `skip_timeseries`) are removed, and the remaining references are
    re-encoded. Records left without any reference are not written.

    Args:
        src_dbs: mapping of `common.DbType` to source database handle
        src_txn: source transaction used for reading
        dst_dbs: mapping of `common.DbType` to destination database handle
        dst_txn: destination transaction used for writing
        skip_latest: drop `common.LatestEventRef` references
        skip_timeseries: drop `common.TimeseriesEventRef` references

    """
    ref_type = common.DbType.REF
    ref_def = common.db_defs[ref_type]
    filtering = skip_latest or skip_timeseries

    for key, raw_value in src_txn.cursor(db=src_dbs[ref_type]):
        if filtering:
            remaining_refs = set()
            for event_ref in ref_def.decode_value(raw_value):
                if skip_latest and isinstance(event_ref,
                                              common.LatestEventRef):
                    continue

                if skip_timeseries and isinstance(event_ref,
                                                  common.TimeseriesEventRef):
                    continue

                remaining_refs.add(event_ref)

            # nothing left to reference - omit the record entirely
            if not remaining_refs:
                continue

            raw_value = ref_def.encode_value(remaining_refs)

        dst_txn.put(key, raw_value, db=dst_dbs[ref_type])
75
76
def _copy_latest(src_dbs, src_txn, dst_dbs, dst_txn):
    """Copy the latest databases (data and type) from source to destination
    without modification.

    Args:
        src_dbs: mapping of `common.DbType` to source database handle
        src_txn: source transaction used for reading
        dst_dbs: mapping of `common.DbType` to destination database handle
        dst_txn: destination transaction used for writing

    """
    latest_db_types = (common.DbType.LATEST_DATA,
                       common.DbType.LATEST_TYPE)

    for latest_db_type in latest_db_types:
        src_db = src_dbs[latest_db_type]
        dst_db = dst_dbs[latest_db_type]
        _copy_db(src_db, src_txn, dst_db, dst_txn)
81
82
def _copy_timeseries(src_dbs, src_txn, dst_dbs, dst_txn):
    """Copy the timeseries databases (data, partition and count) from source
    to destination without modification.

    Args:
        src_dbs: mapping of `common.DbType` to source database handle
        src_txn: source transaction used for reading
        dst_dbs: mapping of `common.DbType` to destination database handle
        dst_txn: destination transaction used for writing

    """
    timeseries_db_types = (common.DbType.TIMESERIES_DATA,
                           common.DbType.TIMESERIES_PARTITION,
                           common.DbType.TIMESERIES_COUNT)

    for timeseries_db_type in timeseries_db_types:
        src_db = src_dbs[timeseries_db_type]
        dst_db = dst_dbs[timeseries_db_type]
        _copy_db(src_db, src_txn, dst_db, dst_txn)
88
89
def _copy_db(src_db, src_txn, dst_db, dst_txn):
    """Write every key/value pair of `src_db` into `dst_db` unchanged.

    Args:
        src_db: source database handle
        src_txn: source transaction providing ``cursor(db=...)``
        dst_db: destination database handle
        dst_txn: destination transaction providing ``put(key, value, db=...)``

    """
    records = src_txn.cursor(db=src_db)

    for record in records:
        key, value = record
        dst_txn.put(key, value, db=dst_db)
def create_argument_parser(subparsers) -> argparse.ArgumentParser:
    """Register the ``copy`` sub-command on `subparsers`.

    The sub-command accepts two optional boolean flags (``--skip-latest``
    and ``--skip-timeseries``) followed by two positional paths
    (``src_path`` and ``dst_path``), both converted to `pathlib.Path`.

    Args:
        subparsers: object providing ``add_parser`` (as returned by
            `argparse.ArgumentParser.add_subparsers`)

    Returns:
        parser created for the ``copy`` sub-command

    """
    copy_parser = subparsers.add_parser('copy')

    # optional switches, both default to False
    for flag in ('--skip-latest', '--skip-timeseries'):
        copy_parser.add_argument(flag, action='store_true')

    # positional arguments, in command line order
    for positional in ('src_path', 'dst_path'):
        copy_parser.add_argument(positional, type=Path)

    return copy_parser
def copy(args):
    """Copy the database at ``args.src_path`` into a new one at
    ``args.dst_path``.

    The source environment is opened read-only and every `common.DbType`
    database is opened in both environments. All writes go through a single
    destination write transaction. System databases and the reference
    database are always copied (references are filtered according to the
    skip flags); latest and timeseries databases are copied unless
    ``args.skip_latest`` / ``args.skip_timeseries`` is set.

    Args:
        args: parsed arguments providing ``src_path``, ``dst_path``,
            ``skip_latest`` and ``skip_timeseries``

    Raises:
        FileExistsError: if ``args.dst_path`` already exists

    """
    if args.dst_path.exists():
        # FileExistsError is a subclass of Exception, so callers that caught
        # the previously raised generic Exception keep working
        raise FileExistsError('destination db already exists')

    with common.ext_create_env(args.src_path, readonly=True) as src_env:
        with common.ext_create_env(args.dst_path) as dst_env:
            # NOTE(review): the third argument of ext_open_db is presumably
            # a "create" flag (False for source, True for destination) -
            # confirm against common.ext_open_db
            src_dbs = {db_type: common.ext_open_db(src_env, db_type, False)
                       for db_type in common.DbType}
            dst_dbs = {db_type: common.ext_open_db(dst_env, db_type, True)
                       for db_type in common.DbType}

            with src_env.begin(buffers=True) as src_txn, \
                    dst_env.begin(write=True) as dst_txn:
                _copy_system(src_dbs, src_txn, dst_dbs, dst_txn)
                _copy_ref(src_dbs, src_txn, dst_dbs, dst_txn,
                          args.skip_latest, args.skip_timeseries)

                if not args.skip_latest:
                    _copy_latest(src_dbs, src_txn, dst_dbs, dst_txn)

                if not args.skip_timeseries:
                    _copy_timeseries(src_dbs, src_txn, dst_dbs, dst_txn)