hat.event.backends.lmdb.backend
from collections.abc import Collection
from pathlib import Path
import asyncio
import collections
import contextlib
import logging
import typing

from hat import aio
from hat import json

from hat.event.backends.lmdb import common
from hat.event.backends.lmdb import environment
from hat.event.backends.lmdb import latestdb
from hat.event.backends.lmdb import refdb
from hat.event.backends.lmdb import systemdb
from hat.event.backends.lmdb import timeseriesdb
from hat.event.backends.lmdb.conditions import Conditions


mlog = logging.getLogger(__name__)

cleanup_max_results = 1024

flush_queue_size = 4096

max_registered_count = 256 * 1024

default_timeseries_max_results = 4096

default_timeseries_event_type_cache_size = 256 * 1024

version = '0.9'


class Databases(typing.NamedTuple):
    system: systemdb.SystemDb
    latest: latestdb.LatestDb
    timeseries: timeseriesdb.TimeseriesDb
    ref: refdb.RefDb


class Changes(typing.NamedTuple):
    system: systemdb.Changes
    latest: latestdb.Changes
    timeseries: timeseriesdb.Changes
    ref: refdb.Changes


async def create(conf: json.Data,
                 registered_events_cb: common.BackendRegisteredEventsCb | None,
                 flushed_events_cb: common.BackendFlushedEventsCb | None
                 ) -> 'LmdbBackend':
    backend = LmdbBackend()
    backend._registered_events_cb = registered_events_cb
    backend._flushed_events_cb = flushed_events_cb
    backend._conditions = Conditions(conf['conditions'])
    backend._loop = asyncio.get_running_loop()
    backend._flush_queue = aio.Queue(flush_queue_size)
    backend._registered_count = 0
    backend._registered_queue = collections.deque()
    backend._async_group = aio.Group()

    backend._env = await environment.create(Path(conf['db_path']))
    backend.async_group.spawn(aio.call_on_done, backend._env.wait_closing(),
                              backend.close)

    try:
        latest_subscription = common.create_subscription(
            tuple(i) for i in conf['latest']['subscriptions'])

        timeseries_partitions = (
            timeseriesdb.Partition(
                order_by=common.OrderBy[i['order_by']],
                subscription=common.create_subscription(
                    tuple(event_type) for event_type in i['subscriptions']),
                limit=(
                    timeseriesdb.Limit(
                        min_entries=i['limit'].get('min_entries'),
                        max_entries=i['limit'].get('max_entries'),
                        duration=i['limit'].get('duration'),
                        size=i['limit'].get('size'))
                    if 'limit' in i else None))
            for i in conf['timeseries'])

        timeseries_max_results = conf.get('timeseries_max_results',
                                          default_timeseries_max_results)
        timeseries_event_type_cache_size = conf.get(
            'timeseries_event_type_cache_size',
            default_timeseries_event_type_cache_size)

        backend._dbs = await backend._env.execute(
            _ext_create_dbs, backend._env, conf['identifier'],
            backend._conditions, latest_subscription, timeseries_partitions,
            timeseries_max_results, timeseries_event_type_cache_size)

        backend.async_group.spawn(backend._flush_loop, conf['flush_period'])
        backend.async_group.spawn(backend._cleanup_loop,
                                  conf['cleanup_period'])

    except BaseException:
        await aio.uncancellable(backend._env.async_close())
        raise

    return backend


def _ext_create_dbs(env, identifier, conditions, latest_subscription,
                    timeseries_partitions, timeseries_max_results,
                    timeseries_event_type_cache_size):
    with env.ext_begin(write=True) as txn:
        system_db = systemdb.ext_create(env, txn, version, identifier)
        latest_db = latestdb.ext_create(env, txn, conditions,
                                        latest_subscription)
        timeseries_db = timeseriesdb.ext_create(
            env, txn, conditions, timeseries_partitions,
            timeseries_max_results, timeseries_event_type_cache_size)
        ref_db = refdb.RefDb(env)

    return Databases(system=system_db,
                     latest=latest_db,
                     timeseries=timeseries_db,
                     ref=ref_db)


class LmdbBackend(common.Backend):

    @property
    def async_group(self) -> aio.Group:
        return self._async_group

    async def get_last_event_id(self,
                                server_id: int
                                ) -> common.EventId:
        if not self.is_open:
            raise common.BackendClosedError()

        return self._dbs.system.get_last_event_id(server_id)

    async def register(self,
                       events: Collection[common.Event]
                       ) -> Collection[common.Event] | None:
        if not self.is_open:
            raise common.BackendClosedError()

        for event in events:
            server_id = event.id.server

            last_event_id = self._dbs.system.get_last_event_id(server_id)
            last_timestamp = self._dbs.system.get_last_timestamp(server_id)

            if last_event_id >= event.id:
                mlog.warning("event registration skipped: invalid event id")
                continue

            if last_timestamp > event.timestamp:
                mlog.warning("event registration skipped: invalid timestamp")
                continue

            if not self._conditions.matches(event):
                mlog.warning("event registration skipped: invalid conditions")
                continue

            refs = collections.deque()

            latest_result = self._dbs.latest.add(event)

            if latest_result.added_ref:
                refs.append(latest_result.added_ref)

            if latest_result.removed_ref:
                self._dbs.ref.remove(*latest_result.removed_ref)

            refs.extend(self._dbs.timeseries.add(event))

            if not refs:
                continue

            self._dbs.ref.add(event, refs)
            self._dbs.system.set_last_event_id(event.id)
            self._dbs.system.set_last_timestamp(server_id, event.timestamp)

        self._registered_queue.append(events)
        self._registered_count += len(events)

        if self._registered_count > max_registered_count:
            await self._flush_queue.put(None)

        if self._registered_events_cb:
            await aio.call(self._registered_events_cb, events)

        return events

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        if not self.is_open:
            raise common.BackendClosedError()

        if isinstance(params, common.QueryLatestParams):
            return self._dbs.latest.query(params)

        if isinstance(params, common.QueryTimeseriesParams):
            return await self._dbs.timeseries.query(params)

        if isinstance(params, common.QueryServerParams):
            return await self._dbs.ref.query(params)

        raise ValueError('unsupported params type')

    async def flush(self):
        try:
            future = self._loop.create_future()
            await self._flush_queue.put(future)
            await future

        except aio.QueueClosedError:
            raise common.BackendClosedError()

    async def _flush_loop(self, flush_period):
        futures = collections.deque()

        async def cleanup():
            with contextlib.suppress(Exception):
                await self._flush()

            await self._env.async_close()

        try:
            while True:
                try:
                    future = await aio.wait_for(self._flush_queue.get(),
                                                flush_period)
                    futures.append(future)

                except asyncio.TimeoutError:
                    pass

                except aio.CancelledWithResultError as e:
                    if e.result:
                        futures.append(e.result)

                    raise

                while not self._flush_queue.empty():
                    futures.append(self._flush_queue.get_nowait())

                await aio.uncancellable(self._flush())

                while futures:
                    future = futures.popleft()
                    if future and not future.done():
                        future.set_result(None)

        except Exception as e:
            mlog.error('backend flush error: %s', e, exc_info=e)

        finally:
            self.close()
            self._flush_queue.close()

            while not self._flush_queue.empty():
                futures.append(self._flush_queue.get_nowait())

            for future in futures:
                if future and not future.done():
                    future.set_exception(common.BackendClosedError())

            await aio.uncancellable(cleanup())

    async def _cleanup_loop(self, cleanup_period):
        try:
            while True:
                await asyncio.sleep(0)

                repeat = await self._env.execute(_ext_cleanup, self._env,
                                                 self._dbs, common.now())
                if repeat:
                    continue

                await asyncio.sleep(cleanup_period)

        except Exception as e:
            mlog.error('backend cleanup error: %s', e, exc_info=e)

        finally:
            self.close()

    async def _flush(self):
        if not self._env.is_open:
            return

        self._registered_count = 0
        registered_queue, self._registered_queue = (self._registered_queue,
                                                    collections.deque())

        changes = Changes(system=self._dbs.system.create_changes(),
                          latest=self._dbs.latest.create_changes(),
                          timeseries=self._dbs.timeseries.create_changes(),
                          ref=self._dbs.ref.create_changes())

        # TODO lock period between create_changes and locking executor
        # (timeseries and ref must write changes before new queries are
        # allowed)

        await self._env.execute(_ext_flush, self._env, self._dbs, changes)

        if not self._flushed_events_cb:
            return

        while registered_queue:
            events = registered_queue.popleft()
            await aio.call(self._flushed_events_cb, events)


def _ext_flush(env, dbs, changes):
    with env.ext_begin(write=True) as txn:
        dbs.system.ext_write(txn, changes.system)
        dbs.latest.ext_write(txn, changes.latest)
        dbs.timeseries.ext_write(txn, changes.timeseries)
        dbs.ref.ext_write(txn, changes.ref)


def _ext_cleanup(env, dbs, now):
    with env.ext_begin(write=True) as txn:
        result = dbs.timeseries.ext_cleanup(txn, now, cleanup_max_results)
        if not result:
            return False

        dbs.ref.ext_cleanup(txn, result)

        return len(result) >= cleanup_max_results
mlog = <Logger hat.event.backends.lmdb.backend (WARNING)>

cleanup_max_results = 1024

flush_queue_size = 4096

max_registered_count = 262144

default_timeseries_max_results = 4096

default_timeseries_event_type_cache_size = 262144

version = '0.9'
class Databases(typing.NamedTuple):
Databases(system: hat.event.backends.lmdb.systemdb.SystemDb,
          latest: hat.event.backends.lmdb.latestdb.LatestDb,
          timeseries: hat.event.backends.lmdb.timeseriesdb.TimeseriesDb,
          ref: hat.event.backends.lmdb.refdb.RefDb)

Create new instance of Databases(system, latest, timeseries, ref)
class Changes(typing.NamedTuple):
Changes(system: hat.event.backends.lmdb.systemdb.Changes,
        latest: hat.event.backends.lmdb.latestdb.Changes,
        timeseries: dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]],
        ref: dict[int, hat.event.backends.lmdb.refdb.ServerChanges])

Create new instance of Changes(system, latest, timeseries, ref)

timeseries: dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]
    Alias for field number 2
async def create(conf: json.Data,
                 registered_events_cb: common.BackendRegisteredEventsCb | None,
                 flushed_events_cb: common.BackendFlushedEventsCb | None
                 ) -> LmdbBackend:
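The expected conf layout can be read directly from the body of create in the source above. A minimal sketch, with illustrative values only (the authoritative schema ships with hat-event, not with this page):

    # inside a coroutine
    conf = {
        'db_path': 'data/event.db',           # LMDB environment location
        'identifier': 'example-backend',      # stored in systemdb on creation
        'conditions': [],                     # forwarded to Conditions(...)
        'latest': {
            'subscriptions': [['*']]},        # event types mirrored in latestdb
        'timeseries': [
            {'order_by': 'TIMESTAMP',         # name of a common.OrderBy member
             'subscriptions': [['*']],
             # 'limit' is optional; min_entries/max_entries/duration/size
             # are each optional within it
             'limit': {'max_entries': 100000}}],
        'flush_period': 1,                    # seconds between flush transactions
        'cleanup_period': 5}                  # seconds between cleanup passes

    backend = await create(conf,
                           registered_events_cb=None,
                           flushed_events_cb=None)

Omitting 'timeseries_max_results' and 'timeseries_event_type_cache_size' falls back to the module defaults (4096 and 262144).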
class LmdbBackend(hat.event.common.backend.Backend):
Backend ABC
async def get_last_event_id(self, server_id: int) -> hat.event.common.EventId:
Get last registered event id associated with server id
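A hypothetical call (server id 1 is arbitrary; the server component of the returned EventId corresponds to event.id.server used in register):

    last_id = await backend.get_last_event_id(1)
    print(last_id.server)  # server component of the returned EventId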
async def register(self,
                   events: Collection[hat.event.common.Event]
                   ) -> Collection[hat.event.common.Event] | None:
Register events

Events are validated per server: an event is skipped with a warning if its id or timestamp does not advance the last registered values for its server, or if it fails the configured conditions. The input collection is returned unchanged; skipped events are simply not persisted.
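A sketch of the registration flow, assuming events is an already-built collection of hat.event.common.Event instances with per-server monotonic ids and timestamps:

    registered = await backend.register(events)

    # Registration is buffered in memory; flush() forces the pending write
    # transaction instead of waiting for the periodic flush_period.
    await backend.flush()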
async def query(self,
                params: hat.event.common.QueryLatestParams |
                        hat.event.common.QueryTimeseriesParams |
                        hat.event.common.QueryServerParams
                ) -> hat.event.common.QueryResult:
Query events

Dispatches on the params type: QueryLatestParams is served synchronously from the in-memory latest state, while QueryTimeseriesParams and QueryServerParams are answered asynchronously from the timeseries and ref databases. Any other params type raises ValueError.
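A minimal query sketch, assuming QueryLatestParams takes an optional event_types filter and QueryResult exposes the matched events (both as defined in hat.event.common):

    from hat.event import common

    result = await backend.query(
        common.QueryLatestParams(event_types=[('example', '*')]))

    for event in result.events:
        print(event.type, event.timestamp)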