hat.event.backends.lmdb.environment
1from pathlib import Path 2import asyncio 3import typing 4 5import lmdb 6 7from hat import aio 8 9from hat.event.backends.lmdb import common 10 11 12async def create(db_path: Path, 13 max_size: int = common.default_max_size 14 ) -> 'Environment': 15 env = Environment() 16 env._loop = asyncio.get_running_loop() 17 env._async_group = aio.Group() 18 env._executor = aio.Executor(1, log_exceptions=False) 19 env._env = None 20 env._dbs = {} 21 22 env.async_group.spawn(aio.call_on_cancel, env._on_close) 23 24 try: 25 await env._executor.spawn(env._ext_init, db_path, max_size) 26 27 except BaseException: 28 await aio.uncancellable(env.async_close()) 29 raise 30 31 return env 32 33 34class Environment(aio.Resource): 35 36 @property 37 def async_group(self) -> aio.Group: 38 return self._async_group 39 40 async def execute(self, 41 ext_fn: typing.Callable, 42 *args: typing.Any) -> typing.Any: 43 if not self.is_open: 44 raise Exception('environment closed') 45 46 return await self._executor.spawn(ext_fn, *args) 47 48 def ext_begin(self, write: bool = False) -> lmdb.Transaction: 49 return self._env.begin(write=write, 50 buffers=True) 51 52 def ext_cursor(self, 53 txn: lmdb.Transaction, 54 db_type: common.DbType 55 ) -> lmdb.Cursor: 56 return txn.cursor(self._dbs[db_type]) 57 58 def ext_stat(self, 59 txn: lmdb.Transaction, 60 db_type: common.DbType 61 ) -> dict[str, int]: 62 return txn.stat(self._dbs[db_type]) 63 64 def ext_read(self, 65 txn: lmdb.Transaction, 66 db_type: common.DbType 67 ) -> typing.Iterable[tuple[common.DbKey, common.DbValue]]: 68 db_def = common.db_defs[db_type] 69 with txn.cursor(self._dbs[db_type]) as cursor: 70 for encoded_key, encoded_value in cursor: 71 key = db_def.decode_key(encoded_key) 72 value = db_def.decode_value(encoded_value) 73 yield key, value 74 75 def ext_write(self, 76 txn: lmdb.Transaction, 77 db_type: common.DbType, 78 data: typing.Iterable[tuple[common.DbKey, common.DbValue]]): 79 db_def = common.db_defs[db_type] 80 with txn.cursor(self._dbs[db_type]) as cursor: 81 for key, value in data: 82 encoded_key = db_def.encode_key(key) 83 encoded_value = db_def.encode_value(value) 84 cursor.put(encoded_key, encoded_value) 85 86 async def _on_close(self): 87 if self._env: 88 await self._executor.spawn(self._env.close) 89 90 await self._executor.async_close() 91 92 def _ext_init(self, db_path, max_size): 93 create = not db_path.exists() 94 95 self._env = common.ext_create_env(path=db_path, 96 max_size=max_size) 97 self._dbs = {db_type: common.ext_open_db(env=self._env, 98 db_type=db_type, 99 create=create) 100 for db_type in common.DbType}
13async def create(db_path: Path, 14 max_size: int = common.default_max_size 15 ) -> 'Environment': 16 env = Environment() 17 env._loop = asyncio.get_running_loop() 18 env._async_group = aio.Group() 19 env._executor = aio.Executor(1, log_exceptions=False) 20 env._env = None 21 env._dbs = {} 22 23 env.async_group.spawn(aio.call_on_cancel, env._on_close) 24 25 try: 26 await env._executor.spawn(env._ext_init, db_path, max_size) 27 28 except BaseException: 29 await aio.uncancellable(env.async_close()) 30 raise 31 32 return env
class
Environment(hat.aio.group.Resource):
35class Environment(aio.Resource): 36 37 @property 38 def async_group(self) -> aio.Group: 39 return self._async_group 40 41 async def execute(self, 42 ext_fn: typing.Callable, 43 *args: typing.Any) -> typing.Any: 44 if not self.is_open: 45 raise Exception('environment closed') 46 47 return await self._executor.spawn(ext_fn, *args) 48 49 def ext_begin(self, write: bool = False) -> lmdb.Transaction: 50 return self._env.begin(write=write, 51 buffers=True) 52 53 def ext_cursor(self, 54 txn: lmdb.Transaction, 55 db_type: common.DbType 56 ) -> lmdb.Cursor: 57 return txn.cursor(self._dbs[db_type]) 58 59 def ext_stat(self, 60 txn: lmdb.Transaction, 61 db_type: common.DbType 62 ) -> dict[str, int]: 63 return txn.stat(self._dbs[db_type]) 64 65 def ext_read(self, 66 txn: lmdb.Transaction, 67 db_type: common.DbType 68 ) -> typing.Iterable[tuple[common.DbKey, common.DbValue]]: 69 db_def = common.db_defs[db_type] 70 with txn.cursor(self._dbs[db_type]) as cursor: 71 for encoded_key, encoded_value in cursor: 72 key = db_def.decode_key(encoded_key) 73 value = db_def.decode_value(encoded_value) 74 yield key, value 75 76 def ext_write(self, 77 txn: lmdb.Transaction, 78 db_type: common.DbType, 79 data: typing.Iterable[tuple[common.DbKey, common.DbValue]]): 80 db_def = common.db_defs[db_type] 81 with txn.cursor(self._dbs[db_type]) as cursor: 82 for key, value in data: 83 encoded_key = db_def.encode_key(key) 84 encoded_value = db_def.encode_value(value) 85 cursor.put(encoded_key, encoded_value) 86 87 async def _on_close(self): 88 if self._env: 89 await self._executor.spawn(self._env.close) 90 91 await self._executor.async_close() 92 93 def _ext_init(self, db_path, max_size): 94 create = not db_path.exists() 95 96 self._env = common.ext_create_env(path=db_path, 97 max_size=max_size) 98 self._dbs = {db_type: common.ext_open_db(env=self._env, 99 db_type=db_type, 100 create=create) 101 for db_type in common.DbType}
Resource with lifetime control based on Group
.
def
ext_stat( self, txn: Transaction, db_type: hat.event.backends.lmdb.common.DbType) -> dict[str, int]:
def
ext_read( self, txn: Transaction, db_type: hat.event.backends.lmdb.common.DbType) -> Iterable[tuple[~DbKey, ~DbValue]]:
65 def ext_read(self, 66 txn: lmdb.Transaction, 67 db_type: common.DbType 68 ) -> typing.Iterable[tuple[common.DbKey, common.DbValue]]: 69 db_def = common.db_defs[db_type] 70 with txn.cursor(self._dbs[db_type]) as cursor: 71 for encoded_key, encoded_value in cursor: 72 key = db_def.decode_key(encoded_key) 73 value = db_def.decode_value(encoded_value) 74 yield key, value
def
ext_write( self, txn: Transaction, db_type: hat.event.backends.lmdb.common.DbType, data: Iterable[tuple[~DbKey, ~DbValue]]):
76 def ext_write(self, 77 txn: lmdb.Transaction, 78 db_type: common.DbType, 79 data: typing.Iterable[tuple[common.DbKey, common.DbValue]]): 80 db_def = common.db_defs[db_type] 81 with txn.cursor(self._dbs[db_type]) as cursor: 82 for key, value in data: 83 encoded_key = db_def.encode_key(key) 84 encoded_value = db_def.encode_value(value) 85 cursor.put(encoded_key, encoded_value)