hat.event.backends.lmdb.convert.convert_v06_to_v07
1from pathlib import Path 2import itertools 3 4from hat.event.backends.lmdb.convert import v06 5from hat.event.backends.lmdb.convert import v07 6 7 8def convert(src_path: Path, 9 dst_path: Path): 10 with v06.create_env(src_path) as src_env: 11 src_system_db = src_env.open_db(b'system') 12 server_id = _get_server_id(src_env, src_system_db) 13 14 if server_id < 1: 15 server_id = 1 16 17 with v07.create_env(dst_path) as dst_env: 18 dst_system_db = v07.open_db(dst_env, v07.DbType.SYSTEM) 19 dst_ref_db = v07.open_db(dst_env, v07.DbType.REF) 20 21 _convert_latest(src_env=src_env, 22 dst_env=dst_env, 23 dst_system_db=dst_system_db, 24 dst_ref_db=dst_ref_db, 25 server_id=server_id) 26 27 _convert_ordered(src_env=src_env, 28 dst_env=dst_env, 29 dst_system_db=dst_system_db, 30 dst_ref_db=dst_ref_db, 31 server_id=server_id) 32 33 34def _convert_latest(src_env, dst_env, dst_system_db, dst_ref_db, server_id): 35 src_latest_db = src_env.open_db(b'latest') 36 37 dst_latest_data_db = v07.open_db(dst_env, v07.DbType.LATEST_DATA) 38 dst_latest_type_db = v07.open_db(dst_env, v07.DbType.LATEST_TYPE) 39 40 latest_event = None 41 next_event_type_ref = itertools.count(1) 42 43 with src_env.begin(db=src_latest_db, buffers=True) as src_txn: 44 for _, src_encoded_value in src_txn.cursor(): 45 src_event = v06.decode_event(src_encoded_value) 46 dst_event = _convert_event(src_event, server_id) 47 48 if not latest_event or latest_event.event_id < dst_event.event_id: 49 latest_event = dst_event 50 51 event_type_ref = next(next_event_type_ref) 52 53 with dst_env.begin(db=dst_latest_data_db, write=True) as dst_txn: 54 dst_txn.put( 55 v07.encode_latest_data_db_key( 56 event_type_ref), 57 v07.encode_latest_data_db_value( 58 dst_event)) 59 60 with dst_env.begin(db=dst_latest_type_db, write=True) as dst_txn: 61 dst_txn.put( 62 v07.encode_latest_type_db_key( 63 event_type_ref), 64 v07.encode_latest_type_db_value( 65 dst_event.event_type)) 66 67 _update_ref_db(dst_env=dst_env, 68 dst_ref_db=dst_ref_db, 69 dst_event=dst_event, 70 event_ref=v07.LatestEventRef(event_type_ref)) 71 72 if latest_event: 73 _update_system_db(dst_env=dst_env, 74 dst_system_db=dst_system_db, 75 server_id=server_id, 76 latest_event=latest_event) 77 78 79def _convert_ordered(src_env, dst_env, dst_system_db, dst_ref_db, server_id): 80 src_ordered_data_db = src_env.open_db(b'ordered_data') 81 src_ordered_partition_db = src_env.open_db(b'ordered_partition') 82 src_ordered_count_db = src_env.open_db(b'ordered_count') 83 84 dst_ordered_data_db = v07.open_db(dst_env, v07.DbType.ORDERED_DATA) 85 dst_ordered_partition_db = v07.open_db( 86 dst_env, v07.DbType.ORDERED_PARTITION) 87 dst_ordered_count_db = v07.open_db(dst_env, v07.DbType.ORDERED_COUNT) 88 89 latest_event = None 90 91 with src_env.begin(db=src_ordered_data_db, buffers=True) as src_txn: 92 for src_encoded_key, src_encoded_value in src_txn.cursor(): 93 partition_id, src_timestamp, _ = v06.decode_uint_timestamp_uint( 94 src_encoded_key) 95 src_event = v06.decode_event(src_encoded_value) 96 dst_timestamp = _convert_timestamp(src_timestamp) 97 dst_event = _convert_event(src_event, server_id) 98 99 if not latest_event or latest_event.event_id < dst_event.event_id: 100 latest_event = dst_event 101 102 dst_key = partition_id, dst_timestamp, dst_event.event_id 103 104 with dst_env.begin(db=dst_ordered_data_db, write=True) as dst_txn: 105 dst_txn.put( 106 v07.encode_ordered_data_db_key( 107 dst_key), 108 v07.encode_ordered_data_db_value( 109 dst_event)) 110 111 _update_ref_db(dst_env=dst_env, 112 dst_ref_db=dst_ref_db, 113 dst_event=dst_event, 114 event_ref=v07.OrderedEventRef(dst_key)) 115 116 if latest_event: 117 _update_system_db(dst_env=dst_env, 118 dst_system_db=dst_system_db, 119 server_id=server_id, 120 latest_event=latest_event) 121 122 with src_env.begin(db=src_ordered_partition_db, buffers=True) as src_txn: 123 for src_encoded_key, src_encoded_value in src_txn.cursor(): 124 partition_id = v06.decode_uint(src_encoded_key) 125 partition_data = v06.decode_json(src_encoded_value) 126 127 with dst_env.begin(db=dst_ordered_partition_db, 128 write=True) as dst_txn: 129 dst_txn.put( 130 v07.encode_ordered_partition_db_key( 131 partition_id), 132 v07.encode_ordered_partition_db_value( 133 partition_data)) 134 135 with src_env.begin(db=src_ordered_count_db, buffers=True) as src_txn: 136 for src_encoded_key, src_encoded_value in src_txn.cursor(): 137 partition_id = v06.decode_uint(src_encoded_key) 138 count = v06.decode_json(src_encoded_value) 139 140 with dst_env.begin(db=dst_ordered_count_db, 141 write=True) as dst_txn: 142 dst_txn.put( 143 v07.encode_ordered_count_db_key( 144 partition_id), 145 v07.encode_ordered_count_db_value( 146 count)) 147 148 149def _update_ref_db(dst_env, dst_ref_db, dst_event, event_ref): 150 with dst_env.begin(db=dst_ref_db, 151 write=True, 152 buffers=True) as dst_txn: 153 dst_encoded_key = v07.encode_ref_db_key(dst_event.event_id) 154 dst_encoded_value = dst_txn.get(dst_encoded_key) 155 156 event_refs = (v07.decode_ref_db_value(dst_encoded_value) 157 if dst_encoded_value else set()) 158 event_refs.add(event_ref) 159 dst_encoded_value = v07.encode_ref_db_value(event_refs) 160 161 dst_txn.put(dst_encoded_key, dst_encoded_value) 162 163 164def _update_system_db(dst_env, dst_system_db, server_id, latest_event): 165 with dst_env.begin(db=dst_system_db, 166 write=True, 167 buffers=True) as dst_txn: 168 dst_encoded_key = v07.encode_system_db_key(server_id) 169 dst_encoded_value = dst_txn.get(dst_encoded_key) 170 171 event_id, _ = (v07.decode_system_db_value(dst_encoded_value) 172 if dst_encoded_value 173 else (v07.EventId(server_id, 0, 0), None)) 174 175 if latest_event.event_id > event_id: 176 dst_txn.put(dst_encoded_key, 177 v07.encode_system_db_value((latest_event.event_id, 178 latest_event.timestamp))) 179 180 181def _get_server_id(src_env, src_system_db): 182 with src_env.begin(db=src_system_db, buffers=True) as txn: 183 src_encoded_server_id = txn.get(b'server_id') 184 return v06.decode_uint(src_encoded_server_id) 185 186 187def _convert_event(src_event, server_id): 188 return v07.Event( 189 event_id=_convert_event_id(src_event.event_id, server_id), 190 event_type=src_event.event_type, 191 timestamp=_convert_timestamp(src_event.timestamp), 192 source_timestamp=(_convert_timestamp(src_event.source_timestamp) 193 if src_event.source_timestamp else None), 194 payload=(_convert_event_payload(src_event.payload) 195 if src_event.payload else None)) 196 197 198def _convert_event_id(src_event_id, server_id): 199 return v07.EventId(server=(src_event_id.server if src_event_id.server > 0 200 else server_id), 201 session=src_event_id.instance, 202 instance=src_event_id.instance) 203 204 205def _convert_event_payload(src_event_payload): 206 return v07.EventPayload( 207 type=v07.EventPayloadType[src_event_payload.type.name], 208 data=src_event_payload.data) 209 210 211def _convert_timestamp(src_timestamp): 212 return v07.Timestamp(s=src_timestamp.s, 213 us=src_timestamp.us)
def
convert(src_path: pathlib.Path, dst_path: pathlib.Path):
9def convert(src_path: Path, 10 dst_path: Path): 11 with v06.create_env(src_path) as src_env: 12 src_system_db = src_env.open_db(b'system') 13 server_id = _get_server_id(src_env, src_system_db) 14 15 if server_id < 1: 16 server_id = 1 17 18 with v07.create_env(dst_path) as dst_env: 19 dst_system_db = v07.open_db(dst_env, v07.DbType.SYSTEM) 20 dst_ref_db = v07.open_db(dst_env, v07.DbType.REF) 21 22 _convert_latest(src_env=src_env, 23 dst_env=dst_env, 24 dst_system_db=dst_system_db, 25 dst_ref_db=dst_ref_db, 26 server_id=server_id) 27 28 _convert_ordered(src_env=src_env, 29 dst_env=dst_env, 30 dst_system_db=dst_system_db, 31 dst_ref_db=dst_ref_db, 32 server_id=server_id)