# hat.event.backends.lmdb.convert.convert_v07_to_v09
"""Copy the content of a v07 LMDB event database into the v09 layout."""

from pathlib import Path

from hat.event.backends.lmdb.convert import v07
from hat.event.backends.lmdb.convert import v09


def convert(src_path: Path,
            dst_path: Path):
    """Convert the v07 database at `src_path` into a v09 database.

    Opens every database declared by ``v07.DbType`` in the source
    environment and every database declared by ``v09.DbType`` in the
    destination environment, then copies all entries inside a single
    source transaction and a single destination write transaction.

    Args:
        src_path: location passed to ``v07.create_env``
        dst_path: location passed to ``v09.create_env``

    """
    with v07.create_env(src_path) as src_env:
        with v09.create_env(dst_path) as dst_env:
            src_dbs = {db_type: v07.open_db(src_env, db_type)
                       for db_type in v07.DbType}
            dst_dbs = {db_type: v09.open_db(dst_env, db_type)
                       for db_type in v09.DbType}

            with src_env.begin(buffers=True) as src_txn:
                with dst_env.begin(write=True) as dst_txn:
                    _convert_system_db(src_txn, src_dbs, dst_txn, dst_dbs)
                    _convert_ref_db(src_txn, src_dbs, dst_txn, dst_dbs)
                    _convert_latest_db(src_txn, src_dbs, dst_txn, dst_dbs)
                    _convert_timeseries_db(src_txn, src_dbs, dst_txn, dst_dbs)


def _convert_system_db(src_txn, src_dbs, dst_txn, dst_dbs):
    """Write the v09 version setting and copy per-server system entries.

    Each v07 ``SYSTEM`` entry (keyed by server id, holding the last event
    id and the last timestamp) is split into one
    ``SYSTEM_LAST_EVENT_ID`` and one ``SYSTEM_LAST_TIMESTAMP`` entry.

    """
    v09.write(dst_dbs, dst_txn, v09.DbType.SYSTEM_SETTINGS,
              v09.SettingsId.VERSION, v09.version)

    with src_txn.cursor(db=src_dbs[v07.DbType.SYSTEM]) as src_cursor:
        for src_key, src_value in src_cursor:
            server_id = v07.decode_system_db_key(src_key)
            src_event_id, src_timestamp = v07.decode_system_db_value(src_value)

            dst_event_id = _convert_event_id(src_event_id)
            dst_timestamp = _convert_timestamp(src_timestamp)

            v09.write(dst_dbs, dst_txn, v09.DbType.SYSTEM_LAST_EVENT_ID,
                      server_id, dst_event_id)

            v09.write(dst_dbs, dst_txn, v09.DbType.SYSTEM_LAST_TIMESTAMP,
                      server_id, dst_timestamp)


def _convert_ref_db(src_txn, src_dbs, dst_txn, dst_dbs):
    """Copy the v07 ``REF`` database, converting ids and references."""
    with src_txn.cursor(db=src_dbs[v07.DbType.REF]) as src_cursor:
        for src_key, src_value in src_cursor:
            src_event_id = v07.decode_ref_db_key(src_key)
            src_event_refs = v07.decode_ref_db_value(src_value)

            dst_event_id = _convert_event_id(src_event_id)
            dst_event_refs = {_convert_event_ref(src_event_ref)
                              for src_event_ref in src_event_refs}

            v09.write(dst_dbs, dst_txn, v09.DbType.REF,
                      dst_event_id, dst_event_refs)


def _convert_latest_db(src_txn, src_dbs, dst_txn, dst_dbs):
    """Copy the ``LATEST_DATA`` and ``LATEST_TYPE`` databases.

    Events stored in ``LATEST_DATA`` are converted; ``LATEST_TYPE`` keys
    and values are written as decoded.

    """
    with src_txn.cursor(db=src_dbs[v07.DbType.LATEST_DATA]) as src_cursor:
        for src_key, src_value in src_cursor:
            event_type_ref = v07.decode_latest_data_db_key(src_key)
            src_event = v07.decode_latest_data_db_value(src_value)

            dst_event = _convert_event(src_event)

            v09.write(dst_dbs, dst_txn, v09.DbType.LATEST_DATA,
                      event_type_ref, dst_event)

    with src_txn.cursor(db=src_dbs[v07.DbType.LATEST_TYPE]) as src_cursor:
        for src_key, src_value in src_cursor:
            event_type_ref = v07.decode_latest_type_db_key(src_key)
            event_type = v07.decode_latest_type_db_value(src_value)

            v09.write(dst_dbs, dst_txn, v09.DbType.LATEST_TYPE,
                      event_type_ref, event_type)


def _convert_timeseries_db(src_txn, src_dbs, dst_txn, dst_dbs):
    """Copy the v07 ``ORDERED_*`` databases into v09 ``TIMESERIES_*``.

    ``ORDERED_DATA`` maps to ``TIMESERIES_DATA``, ``ORDERED_PARTITION`` to
    ``TIMESERIES_PARTITION`` and ``ORDERED_COUNT`` to ``TIMESERIES_COUNT``.

    """
    with src_txn.cursor(db=src_dbs[v07.DbType.ORDERED_DATA]) as src_cursor:
        for src_key, src_value in src_cursor:
            partition_id, src_timestamp, src_event_id = \
                v07.decode_ordered_data_db_key(src_key)
            src_event = v07.decode_ordered_data_db_value(src_value)

            dst_timestamp = _convert_timestamp(src_timestamp)
            dst_event_id = _convert_event_id(src_event_id)
            dst_event = _convert_event(src_event)

            v09.write(dst_dbs, dst_txn, v09.DbType.TIMESERIES_DATA,
                      (partition_id, dst_timestamp, dst_event_id), dst_event)

    src_partition_db = src_dbs[v07.DbType.ORDERED_PARTITION]
    with src_txn.cursor(db=src_partition_db) as src_cursor:
        for src_key, src_value in src_cursor:
            partition_id = v07.decode_ordered_partition_db_key(src_key)
            src_partition_data = v07.decode_ordered_partition_db_value(
                src_value)

            dst_partition_data = _convert_partition_data(src_partition_data)

            v09.write(dst_dbs, dst_txn, v09.DbType.TIMESERIES_PARTITION,
                      partition_id, dst_partition_data)

    with src_txn.cursor(db=src_dbs[v07.DbType.ORDERED_COUNT]) as src_cursor:
        for src_key, src_value in src_cursor:
            partition_id = v07.decode_ordered_count_db_key(src_key)
            count = v07.decode_ordered_count_db_value(src_value)

            v09.write(dst_dbs, dst_txn, v09.DbType.TIMESERIES_COUNT,
                      partition_id, count)


def _convert_event_id(src_event_id):
    """Build a ``v09.EventId`` from the unpacked v07 event id."""
    return v09.EventId(*src_event_id)


def _convert_timestamp(src_timestamp):
    """Build a ``v09.Timestamp`` from the unpacked v07 timestamp."""
    return v09.Timestamp(*src_timestamp)


def _convert_event_ref(src_event_ref):
    """Convert a v07 event reference to its v09 counterpart.

    Raises:
        ValueError: reference is neither ``v07.LatestEventRef`` nor
            ``v07.OrderedEventRef``

    """
    if isinstance(src_event_ref, v07.LatestEventRef):
        return v09.LatestEventRef(src_event_ref.key)

    if isinstance(src_event_ref, v07.OrderedEventRef):
        partition_id, src_timestamp, src_event_id = src_event_ref.key

        dst_timestamp = _convert_timestamp(src_timestamp)
        dst_event_id = _convert_event_id(src_event_id)

        dst_key = partition_id, dst_timestamp, dst_event_id
        return v09.TimeseriesEventRef(dst_key)

    raise ValueError('unsupported event reference')


def _convert_event(src_event):
    """Build a ``v09.Event`` from a v07 event."""
    # explicit None check - only a missing source timestamp stays None
    source_timestamp = (_convert_timestamp(src_event.source_timestamp)
                        if src_event.source_timestamp is not None else None)

    return v09.Event(
        id=_convert_event_id(src_event.event_id),
        type=src_event.event_type,
        timestamp=_convert_timestamp(src_event.timestamp),
        source_timestamp=source_timestamp,
        payload=_convert_payload(src_event.payload))


def _convert_payload(src_payload):
    """Convert a v07 event payload to a v09 payload.

    ``None`` is passed through. Binary payloads become
    ``v09.EventPayloadBinary`` with an empty type, JSON payloads become
    ``v09.EventPayloadJson`` and SBS payloads become
    ``v09.EventPayloadBinary`` typed as ``<module>.<type>`` (or only
    ``<type>`` when the module is ``None``).

    Raises:
        ValueError: payload type is not one of the handled types

    """
    if src_payload is None:
        return None

    if src_payload.type == v07.EventPayloadType.BINARY:
        return v09.EventPayloadBinary(type='',
                                      data=src_payload.data)

    if src_payload.type == v07.EventPayloadType.JSON:
        return v09.EventPayloadJson(data=src_payload.data)

    if src_payload.type == v07.EventPayloadType.SBS:
        binary_type = (f'{src_payload.data.module}.{src_payload.data.type}'
                       if src_payload.data.module is not None else
                       src_payload.data.type)
        return v09.EventPayloadBinary(type=binary_type,
                                      data=src_payload.data.data)

    raise ValueError('unsupported payload type')


def _convert_partition_data(src_data):
    """Convert partition data, remapping ``'order'`` to the v09 value.

    ``'subscriptions'`` is copied unchanged.

    """
    src_order_by = v07.OrderBy(src_data['order'])
    dst_order_by = _convert_order_by(src_order_by)

    return {'order': dst_order_by.value,
            'subscriptions': src_data['subscriptions']}


def _convert_order_by(src_order_by):
    """Look up the ``v09.OrderBy`` member with the same name."""
    return v09.OrderBy[src_order_by.name]
def convert(src_path: Path,
            dst_path: Path):
    """Convert the v07 database at `src_path` into a v09 database.

    Opens every database declared by ``v07.DbType`` in the source
    environment and every database declared by ``v09.DbType`` in the
    destination environment, then copies all entries inside a single
    source transaction and a single destination write transaction.

    Args:
        src_path: location passed to ``v07.create_env``
        dst_path: location passed to ``v09.create_env``

    """
    with v07.create_env(src_path) as src_env:
        with v09.create_env(dst_path) as dst_env:
            src_dbs = {db_type: v07.open_db(src_env, db_type)
                       for db_type in v07.DbType}
            dst_dbs = {db_type: v09.open_db(dst_env, db_type)
                       for db_type in v09.DbType}

            with src_env.begin(buffers=True) as src_txn:
                with dst_env.begin(write=True) as dst_txn:
                    _convert_system_db(src_txn, src_dbs, dst_txn, dst_dbs)
                    _convert_ref_db(src_txn, src_dbs, dst_txn, dst_dbs)
                    _convert_latest_db(src_txn, src_dbs, dst_txn, dst_dbs)
                    _convert_timeseries_db(src_txn, src_dbs, dst_txn, dst_dbs)