hat.event.backends.lmdb.convert.main
1from pathlib import Path 2import sys 3import typing 4 5from hat import util 6 7from hat.event.backends.lmdb.convert import convert_v06_to_v07 8from hat.event.backends.lmdb.convert import convert_v07_to_v09 9from hat.event.backends.lmdb.convert.version import Version, get_version 10 11 12class Conversion(typing.NamedTuple): 13 src_version: Version 14 dst_version: Version 15 convert: typing.Callable[[Path, Path], None] 16 17 18target_version = Version.v09 19 20conversions = [Conversion(src_version=Version.v06, 21 dst_version=Version.v07, 22 convert=convert_v06_to_v07.convert), 23 Conversion(src_version=Version.v07, 24 dst_version=Version.v09, 25 convert=convert_v07_to_v09.convert)] 26 27 28def main(): 29 if len(sys.argv) != 3: 30 print(f"Usage: {sys.argv[0]} SRC_DB_PATH DST_DB_PATH", file=sys.stderr) 31 sys.exit(1) 32 33 src_path = Path(sys.argv[1]) 34 dst_path = Path(sys.argv[2]) 35 36 if not src_path.exists(): 37 print(f"invalid SRC_DB_PATH: {src_path}", file=sys.stderr) 38 sys.exit(1) 39 40 if dst_path.exists(): 41 print(f"existing DST_DB_PATH: {dst_path}", file=sys.stderr) 42 sys.exit(1) 43 44 try: 45 convert(src_path, dst_path) 46 47 except Exception as e: 48 print(f"conversion error: {e}", file=sys.stderr) 49 sys.exit(1) 50 51 52def convert(src_path: Path, 53 dst_path: Path): 54 version = get_version(src_path) 55 56 if version == target_version: 57 raise Exception(f"{src_path} already up to date") 58 59 tmp_src_path = src_path 60 61 while version != target_version: 62 conversion = util.first(conversions, 63 lambda i: i.src_version == version) 64 if not conversion: 65 raise Exception(f"unsupported version {version.value}") 66 67 if conversion.dst_version == target_version: 68 tmp_dst_path = dst_path 69 70 else: 71 tmp_dst_path = dst_path.with_suffix( 72 f"{dst_path.suffix}.{conversion.dst_version.name}") 73 74 if tmp_dst_path.exists(): 75 tmp_dst_path.unlink() 76 77 conversion.convert(tmp_src_path, tmp_dst_path) 78 version = conversion.dst_version 79 80 if tmp_src_path != src_path: 81 tmp_src_path.unlink() 82 83 tmp_src_path = tmp_dst_path 84 85 86if __name__ == '__main__': 87 sys.argv[0] = 'hat-event-lmdb-convert' 88 main()
class
Conversion(typing.NamedTuple):
13class Conversion(typing.NamedTuple): 14 src_version: Version 15 dst_version: Version 16 convert: typing.Callable[[Path, Path], None]
Conversion(src_version, dst_version, convert)
Conversion( src_version: hat.event.backends.lmdb.convert.version.Version, dst_version: hat.event.backends.lmdb.convert.version.Version, convert: Callable[[pathlib.Path, pathlib.Path], NoneType])
Create new instance of Conversion(src_version, dst_version, convert)
target_version =
<Version.v09: '0.9'>
conversions =
[Conversion(src_version=<Version.v06: '0.6'>, dst_version=<Version.v07: '0.7'>, convert=<function convert>), Conversion(src_version=<Version.v07: '0.7'>, dst_version=<Version.v09: '0.9'>, convert=<function convert>)]
def
main():
29def main(): 30 if len(sys.argv) != 3: 31 print(f"Usage: {sys.argv[0]} SRC_DB_PATH DST_DB_PATH", file=sys.stderr) 32 sys.exit(1) 33 34 src_path = Path(sys.argv[1]) 35 dst_path = Path(sys.argv[2]) 36 37 if not src_path.exists(): 38 print(f"invalid SRC_DB_PATH: {src_path}", file=sys.stderr) 39 sys.exit(1) 40 41 if dst_path.exists(): 42 print(f"existing DST_DB_PATH: {dst_path}", file=sys.stderr) 43 sys.exit(1) 44 45 try: 46 convert(src_path, dst_path) 47 48 except Exception as e: 49 print(f"conversion error: {e}", file=sys.stderr) 50 sys.exit(1)
def
convert(src_path: pathlib.Path, dst_path: pathlib.Path):
53def convert(src_path: Path, 54 dst_path: Path): 55 version = get_version(src_path) 56 57 if version == target_version: 58 raise Exception(f"{src_path} already up to date") 59 60 tmp_src_path = src_path 61 62 while version != target_version: 63 conversion = util.first(conversions, 64 lambda i: i.src_version == version) 65 if not conversion: 66 raise Exception(f"unsupported version {version.value}") 67 68 if conversion.dst_version == target_version: 69 tmp_dst_path = dst_path 70 71 else: 72 tmp_dst_path = dst_path.with_suffix( 73 f"{dst_path.suffix}.{conversion.dst_version.name}") 74 75 if tmp_dst_path.exists(): 76 tmp_dst_path.unlink() 77 78 conversion.convert(tmp_src_path, tmp_dst_path) 79 version = conversion.dst_version 80 81 if tmp_src_path != src_path: 82 tmp_src_path.unlink() 83 84 tmp_src_path = tmp_dst_path