hat.event.backends.lmdb.environment

  1from pathlib import Path
  2import asyncio
  3import typing
  4
  5import lmdb
  6
  7from hat import aio
  8
  9from hat.event.backends.lmdb import common
 10
 11
 12async def create(db_path: Path,
 13                 max_size: int = common.default_max_size
 14                 ) -> 'Environment':
 15    env = Environment()
 16    env._loop = asyncio.get_running_loop()
 17    env._async_group = aio.Group()
 18    env._executor = aio.Executor(1, log_exceptions=False)
 19    env._env = None
 20    env._dbs = {}
 21
 22    env.async_group.spawn(aio.call_on_cancel, env._on_close)
 23
 24    try:
 25        await env._executor.spawn(env._ext_init, db_path, max_size)
 26
 27    except BaseException:
 28        await aio.uncancellable(env.async_close())
 29        raise
 30
 31    return env
 32
 33
 34class Environment(aio.Resource):
 35
 36    @property
 37    def async_group(self) -> aio.Group:
 38        return self._async_group
 39
 40    async def execute(self,
 41                      ext_fn: typing.Callable,
 42                      *args: typing.Any) -> typing.Any:
 43        if not self.is_open:
 44            raise Exception('environment closed')
 45
 46        return await self._executor.spawn(ext_fn, *args)
 47
 48    def ext_begin(self, write: bool = False) -> lmdb.Transaction:
 49        return self._env.begin(write=write,
 50                               buffers=True)
 51
 52    def ext_cursor(self,
 53                   txn: lmdb.Transaction,
 54                   db_type: common.DbType
 55                   ) -> lmdb.Cursor:
 56        return txn.cursor(self._dbs[db_type])
 57
 58    def ext_stat(self,
 59                 txn: lmdb.Transaction,
 60                 db_type: common.DbType
 61                 ) -> dict[str, int]:
 62        return txn.stat(self._dbs[db_type])
 63
 64    def ext_read(self,
 65                 txn: lmdb.Transaction,
 66                 db_type: common.DbType
 67                 ) -> typing.Iterable[tuple[common.DbKey, common.DbValue]]:
 68        db_def = common.db_defs[db_type]
 69        with txn.cursor(self._dbs[db_type]) as cursor:
 70            for encoded_key, encoded_value in cursor:
 71                key = db_def.decode_key(encoded_key)
 72                value = db_def.decode_value(encoded_value)
 73                yield key, value
 74
 75    def ext_write(self,
 76                  txn: lmdb.Transaction,
 77                  db_type: common.DbType,
 78                  data: typing.Iterable[tuple[common.DbKey, common.DbValue]]):
 79        db_def = common.db_defs[db_type]
 80        with txn.cursor(self._dbs[db_type]) as cursor:
 81            for key, value in data:
 82                encoded_key = db_def.encode_key(key)
 83                encoded_value = db_def.encode_value(value)
 84                cursor.put(encoded_key, encoded_value)
 85
 86    async def _on_close(self):
 87        if self._env:
 88            await self._executor.spawn(self._env.close)
 89
 90        await self._executor.async_close()
 91
 92    def _ext_init(self, db_path, max_size):
 93        create = not db_path.exists()
 94
 95        self._env = common.ext_create_env(path=db_path,
 96                                          max_size=max_size)
 97        self._dbs = {db_type: common.ext_open_db(env=self._env,
 98                                                 db_type=db_type,
 99                                                 create=create)
100                     for db_type in common.DbType}
async def create( db_path: pathlib.Path, max_size: int = 1099511627776) -> Environment:
13async def create(db_path: Path,
14                 max_size: int = common.default_max_size
15                 ) -> 'Environment':
16    env = Environment()
17    env._loop = asyncio.get_running_loop()
18    env._async_group = aio.Group()
19    env._executor = aio.Executor(1, log_exceptions=False)
20    env._env = None
21    env._dbs = {}
22
23    env.async_group.spawn(aio.call_on_cancel, env._on_close)
24
25    try:
26        await env._executor.spawn(env._ext_init, db_path, max_size)
27
28    except BaseException:
29        await aio.uncancellable(env.async_close())
30        raise
31
32    return env
class Environment(hat.aio.group.Resource):
 35class Environment(aio.Resource):
 36
 37    @property
 38    def async_group(self) -> aio.Group:
 39        return self._async_group
 40
 41    async def execute(self,
 42                      ext_fn: typing.Callable,
 43                      *args: typing.Any) -> typing.Any:
 44        if not self.is_open:
 45            raise Exception('environment closed')
 46
 47        return await self._executor.spawn(ext_fn, *args)
 48
 49    def ext_begin(self, write: bool = False) -> lmdb.Transaction:
 50        return self._env.begin(write=write,
 51                               buffers=True)
 52
 53    def ext_cursor(self,
 54                   txn: lmdb.Transaction,
 55                   db_type: common.DbType
 56                   ) -> lmdb.Cursor:
 57        return txn.cursor(self._dbs[db_type])
 58
 59    def ext_stat(self,
 60                 txn: lmdb.Transaction,
 61                 db_type: common.DbType
 62                 ) -> dict[str, int]:
 63        return txn.stat(self._dbs[db_type])
 64
 65    def ext_read(self,
 66                 txn: lmdb.Transaction,
 67                 db_type: common.DbType
 68                 ) -> typing.Iterable[tuple[common.DbKey, common.DbValue]]:
 69        db_def = common.db_defs[db_type]
 70        with txn.cursor(self._dbs[db_type]) as cursor:
 71            for encoded_key, encoded_value in cursor:
 72                key = db_def.decode_key(encoded_key)
 73                value = db_def.decode_value(encoded_value)
 74                yield key, value
 75
 76    def ext_write(self,
 77                  txn: lmdb.Transaction,
 78                  db_type: common.DbType,
 79                  data: typing.Iterable[tuple[common.DbKey, common.DbValue]]):
 80        db_def = common.db_defs[db_type]
 81        with txn.cursor(self._dbs[db_type]) as cursor:
 82            for key, value in data:
 83                encoded_key = db_def.encode_key(key)
 84                encoded_value = db_def.encode_value(value)
 85                cursor.put(encoded_key, encoded_value)
 86
 87    async def _on_close(self):
 88        if self._env:
 89            await self._executor.spawn(self._env.close)
 90
 91        await self._executor.async_close()
 92
 93    def _ext_init(self, db_path, max_size):
 94        create = not db_path.exists()
 95
 96        self._env = common.ext_create_env(path=db_path,
 97                                          max_size=max_size)
 98        self._dbs = {db_type: common.ext_open_db(env=self._env,
 99                                                 db_type=db_type,
100                                                 create=create)
101                     for db_type in common.DbType}

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group
37    @property
38    def async_group(self) -> aio.Group:
39        return self._async_group

Group controlling resource's lifetime.

async def execute(self, ext_fn: Callable, *args: Any) -> Any:
41    async def execute(self,
42                      ext_fn: typing.Callable,
43                      *args: typing.Any) -> typing.Any:
44        if not self.is_open:
45            raise Exception('environment closed')
46
47        return await self._executor.spawn(ext_fn, *args)
def ext_begin(self, write: bool = False) -> Transaction:
49    def ext_begin(self, write: bool = False) -> lmdb.Transaction:
50        return self._env.begin(write=write,
51                               buffers=True)
def ext_cursor( self, txn: Transaction, db_type: hat.event.backends.lmdb.common.DbType) -> Cursor:
53    def ext_cursor(self,
54                   txn: lmdb.Transaction,
55                   db_type: common.DbType
56                   ) -> lmdb.Cursor:
57        return txn.cursor(self._dbs[db_type])
def ext_stat( self, txn: Transaction, db_type: hat.event.backends.lmdb.common.DbType) -> dict[str, int]:
59    def ext_stat(self,
60                 txn: lmdb.Transaction,
61                 db_type: common.DbType
62                 ) -> dict[str, int]:
63        return txn.stat(self._dbs[db_type])
def ext_read( self, txn: Transaction, db_type: hat.event.backends.lmdb.common.DbType) -> Iterable[tuple[~DbKey, ~DbValue]]:
65    def ext_read(self,
66                 txn: lmdb.Transaction,
67                 db_type: common.DbType
68                 ) -> typing.Iterable[tuple[common.DbKey, common.DbValue]]:
69        db_def = common.db_defs[db_type]
70        with txn.cursor(self._dbs[db_type]) as cursor:
71            for encoded_key, encoded_value in cursor:
72                key = db_def.decode_key(encoded_key)
73                value = db_def.decode_value(encoded_value)
74                yield key, value
def ext_write( self, txn: Transaction, db_type: hat.event.backends.lmdb.common.DbType, data: Iterable[tuple[~DbKey, ~DbValue]]):
76    def ext_write(self,
77                  txn: lmdb.Transaction,
78                  db_type: common.DbType,
79                  data: typing.Iterable[tuple[common.DbKey, common.DbValue]]):
80        db_def = common.db_defs[db_type]
81        with txn.cursor(self._dbs[db_type]) as cursor:
82            for key, value in data:
83                encoded_key = db_def.encode_key(key)
84                encoded_value = db_def.encode_value(value)
85                cursor.put(encoded_key, encoded_value)