hat.event.backends.lmdb.backend
from collections.abc import Collection
from pathlib import Path
import asyncio
import collections
import contextlib
import logging
import typing

from hat import aio
from hat import json

from hat.event.backends.lmdb import common
from hat.event.backends.lmdb import environment
from hat.event.backends.lmdb import latestdb
from hat.event.backends.lmdb import refdb
from hat.event.backends.lmdb import systemdb
from hat.event.backends.lmdb import timeseriesdb
from hat.event.backends.lmdb.conditions import Conditions


mlog = logging.getLogger(__name__)

cleanup_max_results = 1024

flush_queue_size = 4096

max_registered_count = 1024 * 256

default_timeseries_max_results = 4096

version = '0.9'


class Databases(typing.NamedTuple):
    system: systemdb.SystemDb
    latest: latestdb.LatestDb
    timeseries: timeseriesdb.TimeseriesDb
    ref: refdb.RefDb


class Changes(typing.NamedTuple):
    system: systemdb.Changes
    latest: latestdb.Changes
    timeseries: timeseriesdb.Changes
    ref: refdb.Changes


async def create(conf: json.Data,
                 registered_events_cb: common.BackendRegisteredEventsCb | None,
                 flushed_events_cb: common.BackendFlushedEventsCb | None
                 ) -> 'LmdbBackend':
    backend = LmdbBackend()
    backend._registered_events_cb = registered_events_cb
    backend._flushed_events_cb = flushed_events_cb
    backend._conditions = Conditions(conf['conditions'])
    backend._loop = asyncio.get_running_loop()
    backend._flush_queue = aio.Queue(flush_queue_size)
    backend._registered_count = 0
    backend._registered_queue = collections.deque()
    backend._async_group = aio.Group()

    backend._env = await environment.create(Path(conf['db_path']))
    backend.async_group.spawn(aio.call_on_done, backend._env.wait_closing(),
                              backend.close)

    try:
        latest_subscription = common.create_subscription(
            tuple(i) for i in conf['latest']['subscriptions'])

        timeseries_partitions = (
            timeseriesdb.Partition(
                order_by=common.OrderBy[i['order_by']],
                subscription=common.create_subscription(
                    tuple(event_type) for event_type in i['subscriptions']),
                limit=(
                    timeseriesdb.Limit(
                        min_entries=i['limit'].get('min_entries'),
                        max_entries=i['limit'].get('max_entries'),
                        duration=i['limit'].get('duration'),
                        size=i['limit'].get('size'))
                    if 'limit' in i else None))
            for i in conf['timeseries'])

        timeseries_max_results = conf.get('timeseries_max_results',
                                          default_timeseries_max_results)

        backend._dbs = await backend._env.execute(
            _ext_create_dbs, backend._env, conf['identifier'],
            backend._conditions, latest_subscription, timeseries_partitions,
            timeseries_max_results)

        backend.async_group.spawn(backend._flush_loop, conf['flush_period'])
        backend.async_group.spawn(backend._cleanup_loop,
                                  conf['cleanup_period'])

    except BaseException:
        await aio.uncancellable(backend._env.async_close())
        raise

    return backend


def _ext_create_dbs(env, identifier, conditions, latest_subscription,
                    timeseries_partitions, timeseries_max_results):
    with env.ext_begin(write=True) as txn:
        system_db = systemdb.ext_create(env, txn, version, identifier)
        latest_db = latestdb.ext_create(env, txn, conditions,
                                        latest_subscription)
        timeseries_db = timeseriesdb.ext_create(env, txn, conditions,
                                                timeseries_partitions,
                                                timeseries_max_results)
        ref_db = refdb.RefDb(env)

    return Databases(system=system_db,
                     latest=latest_db,
                     timeseries=timeseries_db,
                     ref=ref_db)


class LmdbBackend(common.Backend):

    @property
    def async_group(self) -> aio.Group:
        return self._async_group

    async def get_last_event_id(self,
                                server_id: int
                                ) -> common.EventId:
        if not self.is_open:
            raise common.BackendClosedError()

        return self._dbs.system.get_last_event_id(server_id)

    async def register(self,
                       events: Collection[common.Event]
                       ) -> Collection[common.Event] | None:
        if not self.is_open:
            raise common.BackendClosedError()

        for event in events:
            server_id = event.id.server

            last_event_id = self._dbs.system.get_last_event_id(server_id)
            last_timestamp = self._dbs.system.get_last_timestamp(server_id)

            if last_event_id >= event.id:
                mlog.warning("event registration skipped: invalid event id")
                continue

            if last_timestamp > event.timestamp:
                mlog.warning("event registration skipped: invalid timestamp")
                continue

            if not self._conditions.matches(event):
                mlog.warning("event registration skipped: invalid conditions")
                continue

            refs = collections.deque()

            latest_result = self._dbs.latest.add(event)

            if latest_result.added_ref:
                refs.append(latest_result.added_ref)

            if latest_result.removed_ref:
                self._dbs.ref.remove(*latest_result.removed_ref)

            refs.extend(self._dbs.timeseries.add(event))

            if not refs:
                continue

            self._dbs.ref.add(event, refs)
            self._dbs.system.set_last_event_id(event.id)
            self._dbs.system.set_last_timestamp(server_id, event.timestamp)

        self._registered_queue.append(events)
        self._registered_count += len(events)

        if self._registered_count > max_registered_count:
            await self._flush_queue.put(None)

        if self._registered_events_cb:
            await aio.call(self._registered_events_cb, events)

        return events

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        if not self.is_open:
            raise common.BackendClosedError()

        if isinstance(params, common.QueryLatestParams):
            return self._dbs.latest.query(params)

        if isinstance(params, common.QueryTimeseriesParams):
            return await self._dbs.timeseries.query(params)

        if isinstance(params, common.QueryServerParams):
            return await self._dbs.ref.query(params)

        raise ValueError('unsupported params type')

    async def flush(self):
        try:
            future = self._loop.create_future()
            await self._flush_queue.put(future)
            await future

        except aio.QueueClosedError:
            raise common.BackendClosedError()

    async def _flush_loop(self, flush_period):
        futures = collections.deque()

        async def cleanup():
            with contextlib.suppress(Exception):
                await self._flush()

            await self._env.async_close()

        try:
            while True:
                try:
                    future = await aio.wait_for(self._flush_queue.get(),
                                                flush_period)
                    futures.append(future)

                except asyncio.TimeoutError:
                    pass

                except aio.CancelledWithResultError as e:
                    if e.result:
                        futures.append(e.result)

                    raise

                while not self._flush_queue.empty():
                    futures.append(self._flush_queue.get_nowait())

                await aio.uncancellable(self._flush())

                while futures:
                    future = futures.popleft()
                    if future and not future.done():
                        future.set_result(None)

        except Exception as e:
            mlog.error('backend flush error: %s', e, exc_info=e)

        finally:
            self.close()
            self._flush_queue.close()

            while not self._flush_queue.empty():
                futures.append(self._flush_queue.get_nowait())

            for future in futures:
                if future and not future.done():
                    future.set_exception(common.BackendClosedError())

            await aio.uncancellable(cleanup())

    async def _cleanup_loop(self, cleanup_period):
        try:
            while True:
                await asyncio.sleep(0)

                repeat = await self._env.execute(_ext_cleanup, self._env,
                                                 self._dbs, common.now())
                if repeat:
                    continue

                await asyncio.sleep(cleanup_period)

        except Exception as e:
            mlog.error('backend cleanup error: %s', e, exc_info=e)

        finally:
            self.close()

    async def _flush(self):
        if not self._env.is_open:
            return

        self._registered_count = 0
        registered_queue, self._registered_queue = (self._registered_queue,
                                                    collections.deque())

        changes = Changes(system=self._dbs.system.create_changes(),
                          latest=self._dbs.latest.create_changes(),
                          timeseries=self._dbs.timeseries.create_changes(),
                          ref=self._dbs.ref.create_changes())

        # TODO lock period between create_changes and locking executor
        # (timeseries and ref must write changes before new queries are
        # allowed)

        await self._env.execute(_ext_flush, self._env, self._dbs, changes)

        if not self._flushed_events_cb:
            return

        while registered_queue:
            events = registered_queue.popleft()
            await aio.call(self._flushed_events_cb, events)


def _ext_flush(env, dbs, changes):
    with env.ext_begin(write=True) as txn:
        dbs.system.ext_write(txn, changes.system)
        dbs.latest.ext_write(txn, changes.latest)
        dbs.timeseries.ext_write(txn, changes.timeseries)
        dbs.ref.ext_write(txn, changes.ref)


def _ext_cleanup(env, dbs, now):
    with env.ext_begin(write=True) as txn:
        result = dbs.timeseries.ext_cleanup(txn, now, cleanup_max_results)
        if not result:
            return False

        dbs.ref.ext_cleanup(txn, result)

        return len(result) >= cleanup_max_results
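
For orientation, the configuration keys that create reads from conf can be summarized in a sketch like the following. The key names are taken from the source above; the concrete values, and the empty conditions list, are illustrative assumptions:

conf = {
    'db_path': 'data/event.db',      # path to the LMDB environment (illustrative)
    'identifier': 'backend-1',       # passed to systemdb.ext_create
    'conditions': [],                # structure defined by Conditions
    'latest': {'subscriptions': [['*']]},
    'timeseries': [{'order_by': 'TIMESTAMP',   # a common.OrderBy member name
                    'subscriptions': [['*']],
                    'limit': {'max_entries': 100_000}}],  # 'limit' is optional
    'flush_period': 1,               # seconds between background flushes
    'cleanup_period': 5,             # seconds between cleanup passes
    'timeseries_max_results': 4096}  # optional, default shown below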
mlog = <Logger hat.event.backends.lmdb.backend (WARNING)>

cleanup_max_results = 1024

flush_queue_size = 4096

max_registered_count = 262144

default_timeseries_max_results = 4096

version = '0.9'
class Databases(typing.NamedTuple):
Databases(
    system: hat.event.backends.lmdb.systemdb.SystemDb,
    latest: hat.event.backends.lmdb.latestdb.LatestDb,
    timeseries: hat.event.backends.lmdb.timeseriesdb.TimeseriesDb,
    ref: hat.event.backends.lmdb.refdb.RefDb)

Create new instance of Databases(system, latest, timeseries, ref)
class Changes(typing.NamedTuple):
Changes(
    system: hat.event.backends.lmdb.systemdb.Changes,
    latest: hat.event.backends.lmdb.latestdb.Changes,
    timeseries: dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]],
    ref: dict[int, hat.event.backends.lmdb.refdb.ServerChanges])

Create new instance of Changes(system, latest, timeseries, ref)
timeseries: dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]
Alias for field number 2
async def create(
    conf: hat.json.Data,
    registered_events_cb: Optional[Callable[[Collection[hat.event.common.Event]], None | Awaitable[None]]],
    flushed_events_cb: Optional[Callable[[Collection[hat.event.common.Event]], None | Awaitable[None]]]) -> LmdbBackend:
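A minimal usage sketch, assuming a conf dict shaped as in the module overview above and no callbacks (both callback parameters accept None per the signature); async_close comes from the aio.Resource base:

import asyncio

from hat.event.backends.lmdb import backend as lmdb_backend

async def main():
    backend = await lmdb_backend.create(conf,  # conf as sketched earlier
                                        registered_events_cb=None,
                                        flushed_events_cb=None)
    try:
        ...  # register and query events
    finally:
        await backend.async_close()

asyncio.run(main())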
class LmdbBackend(hat.event.common.backend.Backend):
Backend ABC
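LmdbBackend is an aio.Resource whose lifetime is managed through async_group; a brief lifecycle sketch, assuming backend was obtained from create:

await backend.flush()        # waits until queued changes are written to LMDB
await backend.async_close()  # final flush runs in _flush_loop's cleanup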
async def get_last_event_id(self, server_id: int) -> hat.event.common.EventId:
Get last registered event id associated with server id
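For example (server id 1 is arbitrary; the EventId field names are assumptions based on hat.event.common):

event_id = await backend.get_last_event_id(1)
# EventId fields: server, session, instance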
async def register(self, events: Collection[hat.event.common.Event]) -> Collection[hat.event.common.Event] | None:
Register events
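A hedged registration sketch; the Event and EventId field names (id, type, timestamp, source_timestamp, payload) are taken from hat.event.common, and the concrete values are illustrative. Per the source, an event is skipped with a warning if its id or timestamp does not exceed the last registered one for its server, or if it fails the configured conditions:

from hat.event import common

event = common.Event(
    id=common.EventId(server=1, session=1, instance=1),  # must increase per server
    type=('example', 'event'),
    timestamp=common.now(),
    source_timestamp=None,
    payload=None)

registered = await backend.register([event])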
async def query(self, params: hat.event.common.QueryLatestParams | hat.event.common.QueryTimeseriesParams | hat.event.common.QueryServerParams) -> hat.event.common.QueryResult:
Query events
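A query sketch for the latest-events case, which the source answers synchronously from the latest db; the QueryLatestParams and QueryResult field names used here (event_types, events, more_follows) are assumptions based on hat.event.common:

from hat.event import common

result = await backend.query(
    common.QueryLatestParams(event_types=[('example', '*')]))

for event in result.events:  # result.more_follows signals truncation
    print(event.type)

QueryTimeseriesParams and QueryServerParams are dispatched the same way, to the timeseries and ref databases respectively.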