hat.event.backends.lmdb.systemdb
1import typing 2 3import lmdb 4 5from hat.event.backends.lmdb import common 6from hat.event.backends.lmdb import environment 7 8 9class Changes(typing.NamedTuple): 10 last_event_ids: dict[common.ServerId, common.EventId] 11 last_timestamps: dict[common.ServerId, common.Timestamp] 12 13 14def ext_create(env: environment.Environment, 15 txn: lmdb.Transaction, 16 version: str, 17 identifier: str | None 18 ) -> 'SystemDb': 19 db = SystemDb() 20 db._env = env 21 db._changes = Changes({}, {}) 22 23 _ext_validate_settings(env, txn, version, identifier) 24 25 db._last_event_ids = dict( 26 env.ext_read(txn, common.DbType.SYSTEM_LAST_EVENT_ID)) 27 28 db._last_timestamps = dict( 29 env.ext_read(txn, common.DbType.SYSTEM_LAST_TIMESTAMP)) 30 31 return db 32 33 34class SystemDb: 35 36 def get_last_event_id(self, 37 server_id: common.ServerId 38 ) -> common.EventId: 39 if server_id in self._last_event_ids: 40 return self._last_event_ids[server_id] 41 42 return common.EventId(server=server_id, 43 session=0, 44 instance=0) 45 46 def set_last_event_id(self, event_id: common.EventId): 47 self._changes.last_event_ids[event_id.server] = event_id 48 self._last_event_ids[event_id.server] = event_id 49 50 def get_last_timestamp(self, 51 server_id: common.ServerId 52 ) -> common.Timestamp: 53 if server_id in self._last_timestamps: 54 return self._last_timestamps[server_id] 55 56 return common.min_timestamp 57 58 def set_last_timestamp(self, 59 server_id: common.ServerId, 60 timestamp: common.Timestamp): 61 self._changes.last_timestamps[server_id] = timestamp 62 self._last_timestamps[server_id] = timestamp 63 64 def create_changes(self) -> Changes: 65 changes, self._changes = self._changes, Changes({}, {}) 66 return changes 67 68 def ext_write(self, 69 txn: lmdb.Transaction, 70 changes: Changes): 71 self._env.ext_write(txn, common.DbType.SYSTEM_LAST_EVENT_ID, 72 changes.last_event_ids.items()) 73 74 self._env.ext_write(txn, common.DbType.SYSTEM_LAST_TIMESTAMP, 75 changes.last_timestamps.items()) 76 77 78def _ext_validate_settings(env, txn, version, identifier): 79 settings = dict(env.ext_read(txn, common.DbType.SYSTEM_SETTINGS)) 80 81 if common.SettingsId.VERSION not in settings: 82 settings[common.SettingsId.VERSION] = version 83 84 elif settings[common.SettingsId.VERSION] != version: 85 raise Exception('invalid version') 86 87 else: 88 settings.pop(common.SettingsId.VERSION) 89 90 if identifier is None: 91 settings.pop(common.SettingsId.IDENTIFIER, None) 92 93 elif common.SettingsId.IDENTIFIER not in settings: 94 settings[common.SettingsId.IDENTIFIER] = identifier 95 96 elif settings[common.SettingsId.IDENTIFIER] != identifier: 97 raise Exception('invalid identifier') 98 99 else: 100 settings.pop(common.SettingsId.IDENTIFIER) 101 102 env.ext_write(txn, common.DbType.SYSTEM_SETTINGS, settings.items())
class
Changes(typing.NamedTuple):
10class Changes(typing.NamedTuple): 11 last_event_ids: dict[common.ServerId, common.EventId] 12 last_timestamps: dict[common.ServerId, common.Timestamp]
Changes(last_event_ids, last_timestamps)
Changes( last_event_ids: dict[int, hat.event.common.EventId], last_timestamps: dict[int, hat.event.common.Timestamp])
Create new instance of Changes(last_event_ids, last_timestamps)
def
ext_create( env: hat.event.backends.lmdb.environment.Environment, txn: Transaction, version: str, identifier: str | None) -> SystemDb:
15def ext_create(env: environment.Environment, 16 txn: lmdb.Transaction, 17 version: str, 18 identifier: str | None 19 ) -> 'SystemDb': 20 db = SystemDb() 21 db._env = env 22 db._changes = Changes({}, {}) 23 24 _ext_validate_settings(env, txn, version, identifier) 25 26 db._last_event_ids = dict( 27 env.ext_read(txn, common.DbType.SYSTEM_LAST_EVENT_ID)) 28 29 db._last_timestamps = dict( 30 env.ext_read(txn, common.DbType.SYSTEM_LAST_TIMESTAMP)) 31 32 return db
class
SystemDb:
35class SystemDb: 36 37 def get_last_event_id(self, 38 server_id: common.ServerId 39 ) -> common.EventId: 40 if server_id in self._last_event_ids: 41 return self._last_event_ids[server_id] 42 43 return common.EventId(server=server_id, 44 session=0, 45 instance=0) 46 47 def set_last_event_id(self, event_id: common.EventId): 48 self._changes.last_event_ids[event_id.server] = event_id 49 self._last_event_ids[event_id.server] = event_id 50 51 def get_last_timestamp(self, 52 server_id: common.ServerId 53 ) -> common.Timestamp: 54 if server_id in self._last_timestamps: 55 return self._last_timestamps[server_id] 56 57 return common.min_timestamp 58 59 def set_last_timestamp(self, 60 server_id: common.ServerId, 61 timestamp: common.Timestamp): 62 self._changes.last_timestamps[server_id] = timestamp 63 self._last_timestamps[server_id] = timestamp 64 65 def create_changes(self) -> Changes: 66 changes, self._changes = self._changes, Changes({}, {}) 67 return changes 68 69 def ext_write(self, 70 txn: lmdb.Transaction, 71 changes: Changes): 72 self._env.ext_write(txn, common.DbType.SYSTEM_LAST_EVENT_ID, 73 changes.last_event_ids.items()) 74 75 self._env.ext_write(txn, common.DbType.SYSTEM_LAST_TIMESTAMP, 76 changes.last_timestamps.items())