hat.event.backends.lmdb.systemdb

  1import typing
  2
  3import lmdb
  4
  5from hat.event.backends.lmdb import common
  6from hat.event.backends.lmdb import environment
  7
  8
  9class Changes(typing.NamedTuple):
 10    last_event_ids: dict[common.ServerId, common.EventId]
 11    last_timestamps: dict[common.ServerId, common.Timestamp]
 12
 13
 14def ext_create(env: environment.Environment,
 15               txn: lmdb.Transaction,
 16               version: str,
 17               identifier: str | None
 18               ) -> 'SystemDb':
 19    db = SystemDb()
 20    db._env = env
 21    db._changes = Changes({}, {})
 22
 23    _ext_validate_settings(env, txn, version, identifier)
 24
 25    db._last_event_ids = dict(
 26        env.ext_read(txn, common.DbType.SYSTEM_LAST_EVENT_ID))
 27
 28    db._last_timestamps = dict(
 29        env.ext_read(txn, common.DbType.SYSTEM_LAST_TIMESTAMP))
 30
 31    return db
 32
 33
 34class SystemDb:
 35
 36    def get_last_event_id(self,
 37                          server_id: common.ServerId
 38                          ) -> common.EventId:
 39        if server_id in self._last_event_ids:
 40            return self._last_event_ids[server_id]
 41
 42        return common.EventId(server=server_id,
 43                              session=0,
 44                              instance=0)
 45
 46    def set_last_event_id(self, event_id: common.EventId):
 47        self._changes.last_event_ids[event_id.server] = event_id
 48        self._last_event_ids[event_id.server] = event_id
 49
 50    def get_last_timestamp(self,
 51                           server_id: common.ServerId
 52                           ) -> common.Timestamp:
 53        if server_id in self._last_timestamps:
 54            return self._last_timestamps[server_id]
 55
 56        return common.min_timestamp
 57
 58    def set_last_timestamp(self,
 59                           server_id: common.ServerId,
 60                           timestamp: common.Timestamp):
 61        self._changes.last_timestamps[server_id] = timestamp
 62        self._last_timestamps[server_id] = timestamp
 63
 64    def create_changes(self) -> Changes:
 65        changes, self._changes = self._changes, Changes({}, {})
 66        return changes
 67
 68    def ext_write(self,
 69                  txn: lmdb.Transaction,
 70                  changes: Changes):
 71        self._env.ext_write(txn, common.DbType.SYSTEM_LAST_EVENT_ID,
 72                            changes.last_event_ids.items())
 73
 74        self._env.ext_write(txn, common.DbType.SYSTEM_LAST_TIMESTAMP,
 75                            changes.last_timestamps.items())
 76
 77
 78def _ext_validate_settings(env, txn, version, identifier):
 79    settings = dict(env.ext_read(txn, common.DbType.SYSTEM_SETTINGS))
 80
 81    if common.SettingsId.VERSION not in settings:
 82        settings[common.SettingsId.VERSION] = version
 83
 84    elif settings[common.SettingsId.VERSION] != version:
 85        raise Exception('invalid version')
 86
 87    else:
 88        settings.pop(common.SettingsId.VERSION)
 89
 90    if identifier is None:
 91        settings.pop(common.SettingsId.IDENTIFIER, None)
 92
 93    elif common.SettingsId.IDENTIFIER not in settings:
 94        settings[common.SettingsId.IDENTIFIER] = identifier
 95
 96    elif settings[common.SettingsId.IDENTIFIER] != identifier:
 97        raise Exception('invalid identifier')
 98
 99    else:
100        settings.pop(common.SettingsId.IDENTIFIER)
101
102    env.ext_write(txn, common.DbType.SYSTEM_SETTINGS, settings.items())
class Changes(typing.NamedTuple):
10class Changes(typing.NamedTuple):
11    last_event_ids: dict[common.ServerId, common.EventId]
12    last_timestamps: dict[common.ServerId, common.Timestamp]

Changes(last_event_ids, last_timestamps)

Changes( last_event_ids: dict[int, hat.event.common.EventId], last_timestamps: dict[int, hat.event.common.Timestamp])

Create new instance of Changes(last_event_ids, last_timestamps)

last_event_ids: dict[int, hat.event.common.EventId]

Alias for field number 0

last_timestamps: dict[int, hat.event.common.Timestamp]

Alias for field number 1

def ext_create( env: hat.event.backends.lmdb.environment.Environment, txn: Transaction, version: str, identifier: str | None) -> SystemDb:
15def ext_create(env: environment.Environment,
16               txn: lmdb.Transaction,
17               version: str,
18               identifier: str | None
19               ) -> 'SystemDb':
20    db = SystemDb()
21    db._env = env
22    db._changes = Changes({}, {})
23
24    _ext_validate_settings(env, txn, version, identifier)
25
26    db._last_event_ids = dict(
27        env.ext_read(txn, common.DbType.SYSTEM_LAST_EVENT_ID))
28
29    db._last_timestamps = dict(
30        env.ext_read(txn, common.DbType.SYSTEM_LAST_TIMESTAMP))
31
32    return db
class SystemDb:
35class SystemDb:
36
37    def get_last_event_id(self,
38                          server_id: common.ServerId
39                          ) -> common.EventId:
40        if server_id in self._last_event_ids:
41            return self._last_event_ids[server_id]
42
43        return common.EventId(server=server_id,
44                              session=0,
45                              instance=0)
46
47    def set_last_event_id(self, event_id: common.EventId):
48        self._changes.last_event_ids[event_id.server] = event_id
49        self._last_event_ids[event_id.server] = event_id
50
51    def get_last_timestamp(self,
52                           server_id: common.ServerId
53                           ) -> common.Timestamp:
54        if server_id in self._last_timestamps:
55            return self._last_timestamps[server_id]
56
57        return common.min_timestamp
58
59    def set_last_timestamp(self,
60                           server_id: common.ServerId,
61                           timestamp: common.Timestamp):
62        self._changes.last_timestamps[server_id] = timestamp
63        self._last_timestamps[server_id] = timestamp
64
65    def create_changes(self) -> Changes:
66        changes, self._changes = self._changes, Changes({}, {})
67        return changes
68
69    def ext_write(self,
70                  txn: lmdb.Transaction,
71                  changes: Changes):
72        self._env.ext_write(txn, common.DbType.SYSTEM_LAST_EVENT_ID,
73                            changes.last_event_ids.items())
74
75        self._env.ext_write(txn, common.DbType.SYSTEM_LAST_TIMESTAMP,
76                            changes.last_timestamps.items())
def get_last_event_id(self, server_id: int) -> hat.event.common.EventId:
37    def get_last_event_id(self,
38                          server_id: common.ServerId
39                          ) -> common.EventId:
40        if server_id in self._last_event_ids:
41            return self._last_event_ids[server_id]
42
43        return common.EventId(server=server_id,
44                              session=0,
45                              instance=0)
def set_last_event_id(self, event_id: hat.event.common.EventId):
47    def set_last_event_id(self, event_id: common.EventId):
48        self._changes.last_event_ids[event_id.server] = event_id
49        self._last_event_ids[event_id.server] = event_id
def get_last_timestamp(self, server_id: int) -> hat.event.common.Timestamp:
51    def get_last_timestamp(self,
52                           server_id: common.ServerId
53                           ) -> common.Timestamp:
54        if server_id in self._last_timestamps:
55            return self._last_timestamps[server_id]
56
57        return common.min_timestamp
def set_last_timestamp(self, server_id: int, timestamp: hat.event.common.Timestamp):
59    def set_last_timestamp(self,
60                           server_id: common.ServerId,
61                           timestamp: common.Timestamp):
62        self._changes.last_timestamps[server_id] = timestamp
63        self._last_timestamps[server_id] = timestamp
def create_changes(self) -> Changes:
65    def create_changes(self) -> Changes:
66        changes, self._changes = self._changes, Changes({}, {})
67        return changes
def ext_write( self, txn: Transaction, changes: Changes):
69    def ext_write(self,
70                  txn: lmdb.Transaction,
71                  changes: Changes):
72        self._env.ext_write(txn, common.DbType.SYSTEM_LAST_EVENT_ID,
73                            changes.last_event_ids.items())
74
75        self._env.ext_write(txn, common.DbType.SYSTEM_LAST_TIMESTAMP,
76                            changes.last_timestamps.items())