hat.event.backends.lmdb.backend
"""LMDB-based event backend.

Persists events into an LMDB environment split across four sub-databases
(system / latest / timeseries / ref).  Registration happens in memory and
is periodically flushed to disk by a background flush loop; a separate
cleanup loop trims expired timeseries entries.
"""

from collections.abc import Collection
from pathlib import Path
import asyncio
import collections
import contextlib
import logging
import typing

from hat import aio
from hat import json

from hat.event.backends.lmdb import common
from hat.event.backends.lmdb import environment
from hat.event.backends.lmdb import latestdb
from hat.event.backends.lmdb import refdb
from hat.event.backends.lmdb import systemdb
from hat.event.backends.lmdb import timeseriesdb
from hat.event.backends.lmdb.conditions import Conditions


mlog = logging.getLogger(__name__)

# maximum number of timeseries entries removed per cleanup transaction
cleanup_max_results = 1024

# capacity of the queue carrying flush requests to the flush loop
flush_queue_size = 4096

# number of registered-but-unflushed events that triggers an early flush
max_registered_count = 256 * 1024

default_timeseries_max_results = 4096

default_timeseries_event_type_cache_size = 256 * 1024

# on-disk schema version written by systemdb.ext_create
version = '0.9'


class Databases(typing.NamedTuple):
    """Handles to the four LMDB sub-databases."""

    system: systemdb.SystemDb
    latest: latestdb.LatestDb
    timeseries: timeseriesdb.TimeseriesDb
    ref: refdb.RefDb


class Changes(typing.NamedTuple):
    """Pending in-memory changes for each sub-database, collected by
    `LmdbBackend._flush` and written in a single transaction."""

    system: systemdb.Changes
    latest: latestdb.Changes
    timeseries: timeseriesdb.Changes
    ref: refdb.Changes


async def create(conf: json.Data,
                 registered_events_cb: common.BackendRegisteredEventsCb | None,
                 flushed_events_cb: common.BackendFlushedEventsCb | None
                 ) -> 'LmdbBackend':
    """Create LMDB backend.

    Args:
        conf: backend configuration (expects keys ``conditions``,
            ``db_path``, ``identifier``, ``latest``, ``timeseries``,
            ``flush_period``, ``cleanup_period`` and optional
            ``timeseries_max_results`` /
            ``timeseries_event_type_cache_size``)
        registered_events_cb: optional callback invoked after each
            successful in-memory registration
        flushed_events_cb: optional callback invoked after events are
            flushed to disk

    On any error during initialization the already-created LMDB
    environment is closed before the exception propagates.
    """
    backend = LmdbBackend()
    backend._registered_events_cb = registered_events_cb
    backend._flushed_events_cb = flushed_events_cb
    backend._conditions = Conditions(conf['conditions'])
    backend._loop = asyncio.get_running_loop()
    backend._flush_queue = aio.Queue(flush_queue_size)
    backend._registered_count = 0
    backend._registered_queue = collections.deque()
    backend._async_group = aio.Group()

    backend._env = await environment.create(Path(conf['db_path']))
    # if the environment starts closing on its own, close the backend too
    backend.async_group.spawn(aio.call_on_done, backend._env.wait_closing(),
                              backend.close)

    try:
        latest_subscription = common.create_subscription(
            tuple(i) for i in conf['latest']['subscriptions'])

        timeseries_partitions = (
            timeseriesdb.Partition(
                order_by=common.OrderBy[i['order_by']],
                subscription=common.create_subscription(
                    tuple(event_type) for event_type in i['subscriptions']),
                limit=(
                    timeseriesdb.Limit(
                        min_entries=i['limit'].get('min_entries'),
                        max_entries=i['limit'].get('max_entries'),
                        duration=i['limit'].get('duration'),
                        size=i['limit'].get('size'))
                    if 'limit' in i else None))
            for i in conf['timeseries'])

        timeseries_max_results = conf.get('timeseries_max_results',
                                          default_timeseries_max_results)
        timeseries_event_type_cache_size = conf.get(
            'timeseries_event_type_cache_size',
            default_timeseries_event_type_cache_size)

        # sub-database creation runs on the environment's executor thread
        backend._dbs = await backend._env.execute(
            _ext_create_dbs, backend._env, conf['identifier'],
            backend._conditions, latest_subscription, timeseries_partitions,
            timeseries_max_results, timeseries_event_type_cache_size)

        backend.async_group.spawn(backend._flush_loop, conf['flush_period'])
        backend.async_group.spawn(backend._cleanup_loop,
                                  conf['cleanup_period'])

    except BaseException:
        await aio.uncancellable(backend._env.async_close())
        raise

    return backend


def _ext_create_dbs(env, identifier, conditions, latest_subscription,
                    timeseries_partitions, timeseries_max_results,
                    timeseries_event_type_cache_size):
    """Create all sub-databases inside a single write transaction.

    Runs on the environment's executor thread (not the event loop).
    """
    with env.ext_begin(write=True) as txn:
        system_db = systemdb.ext_create(env, txn, version, identifier)
        latest_db = latestdb.ext_create(env, txn, conditions,
                                        latest_subscription)
        timeseries_db = timeseriesdb.ext_create(
            env, txn, conditions, timeseries_partitions,
            timeseries_max_results, timeseries_event_type_cache_size)
        ref_db = refdb.RefDb(env)

    return Databases(system=system_db,
                     latest=latest_db,
                     timeseries=timeseries_db,
                     ref=ref_db)


class LmdbBackend(common.Backend):
    """Backend storing events in LMDB (created via `create`)."""

    @property
    def async_group(self) -> aio.Group:
        return self._async_group

    async def get_last_event_id(self,
                                server_id: int
                                ) -> common.EventId:
        """Get last registered event id associated with server id.

        Raises:
            common.BackendClosedError: if the backend is closed
        """
        if not self.is_open:
            raise common.BackendClosedError()

        return self._dbs.system.get_last_event_id(server_id)

    async def register(self,
                       events: Collection[common.Event]
                       ) -> Collection[common.Event] | None:
        """Register events in memory.

        Events with a non-monotonic event id or timestamp, or not matching
        configured conditions, are skipped with a warning; the remaining
        events are added to the in-memory databases and queued for the
        next flush.  An early flush is requested once the number of
        unflushed events exceeds ``max_registered_count``.

        Returns the input collection unchanged.

        Raises:
            common.BackendClosedError: if the backend is closed
        """
        if not self.is_open:
            raise common.BackendClosedError()

        for event in events:
            server_id = event.id.server

            last_event_id = self._dbs.system.get_last_event_id(server_id)
            last_timestamp = self._dbs.system.get_last_timestamp(server_id)

            if last_event_id >= event.id:
                mlog.warning("event registration skipped: invalid event id")
                continue

            if last_timestamp > event.timestamp:
                mlog.warning("event registration skipped: invalid timestamp")
                continue

            if not self._conditions.matches(event):
                mlog.warning(
                    "event %s registration skipped: invalid conditions",
                    event.type)
                continue

            refs = collections.deque()

            latest_result = self._dbs.latest.add(event)

            if latest_result.added_ref:
                refs.append(latest_result.added_ref)

            if latest_result.removed_ref:
                self._dbs.ref.remove(*latest_result.removed_ref)

            refs.extend(self._dbs.timeseries.add(event))

            # event not stored anywhere - don't advance last id/timestamp
            if not refs:
                continue

            self._dbs.ref.add(event, refs)
            self._dbs.system.set_last_event_id(event.id)
            self._dbs.system.set_last_timestamp(server_id, event.timestamp)

        self._registered_queue.append(events)
        self._registered_count += len(events)

        if self._registered_count > max_registered_count:
            # None signals the flush loop without awaiting completion
            await self._flush_queue.put(None)

        if self._registered_events_cb:
            await aio.call(self._registered_events_cb, events)

        return events

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Query events, dispatching on the concrete params type.

        Raises:
            common.BackendClosedError: if the backend is closed
            ValueError: if params is of an unsupported type
        """
        if not self.is_open:
            raise common.BackendClosedError()

        if isinstance(params, common.QueryLatestParams):
            return self._dbs.latest.query(params)

        if isinstance(params, common.QueryTimeseriesParams):
            return await self._dbs.timeseries.query(params)

        if isinstance(params, common.QueryServerParams):
            return await self._dbs.ref.query(params)

        raise ValueError('unsupported params type')

    async def flush(self):
        """Request a flush and wait until it completes.

        Raises:
            common.BackendClosedError: if the backend closes before the
                flush is performed
        """
        try:
            future = self._loop.create_future()
            await self._flush_queue.put(future)
            await future

        except aio.QueueClosedError:
            raise common.BackendClosedError()

    async def _flush_loop(self, flush_period):
        """Flush periodically and on demand.

        Waits up to `flush_period` for explicit flush requests (futures
        put by `flush`, or None markers), flushes, then resolves all
        collected futures.  On shutdown, pending futures are failed with
        BackendClosedError, a final best-effort flush is attempted and
        the environment is closed.
        """
        futures = collections.deque()

        async def cleanup():
            # final flush is best-effort; environment must close regardless
            with contextlib.suppress(Exception):
                await self._flush()

            await self._env.async_close()

        try:
            while True:
                try:
                    future = await aio.wait_for(self._flush_queue.get(),
                                                flush_period)
                    futures.append(future)

                except asyncio.TimeoutError:
                    # periodic flush - no explicit request arrived
                    pass

                except aio.CancelledWithResultError as e:
                    # keep a request obtained during cancellation so it
                    # is resolved/failed in the finally block
                    if e.result:
                        futures.append(e.result)

                    raise

                # batch all queued requests into this flush
                while not self._flush_queue.empty():
                    futures.append(self._flush_queue.get_nowait())

                await aio.uncancellable(self._flush())

                while futures:
                    future = futures.popleft()
                    if future and not future.done():
                        future.set_result(None)

        except Exception as e:
            mlog.error('backend flush error: %s', e, exc_info=e)

        finally:
            self.close()
            self._flush_queue.close()

            while not self._flush_queue.empty():
                futures.append(self._flush_queue.get_nowait())

            for future in futures:
                if future and not future.done():
                    future.set_exception(common.BackendClosedError())

            await aio.uncancellable(cleanup())

    async def _cleanup_loop(self, cleanup_period):
        """Periodically remove expired timeseries entries.

        Repeats cleanup immediately (without sleeping) while a single
        pass hits the `cleanup_max_results` limit.
        """
        try:
            while True:
                # yield to the event loop before each executor round-trip
                await asyncio.sleep(0)

                repeat = await self._env.execute(_ext_cleanup, self._env,
                                                 self._dbs, common.now())
                if repeat:
                    continue

                await asyncio.sleep(cleanup_period)

        except Exception as e:
            mlog.error('backend cleanup error: %s', e, exc_info=e)

        finally:
            self.close()

    async def _flush(self):
        """Write all pending in-memory changes to disk and notify
        `flushed_events_cb` for each flushed registration batch."""
        if not self._env.is_open:
            return

        self._registered_count = 0
        registered_queue, self._registered_queue = (self._registered_queue,
                                                    collections.deque())

        changes = Changes(system=self._dbs.system.create_changes(),
                          latest=self._dbs.latest.create_changes(),
                          timeseries=self._dbs.timeseries.create_changes(),
                          ref=self._dbs.ref.create_changes())

        # TODO lock period between create_changes and locking executor
        #      (timeseries and ref must write changes before new queries are
        #      allowed)

        await self._env.execute(_ext_flush, self._env, self._dbs, changes)

        if not self._flushed_events_cb:
            return

        while registered_queue:
            events = registered_queue.popleft()
            await aio.call(self._flushed_events_cb, events)


def _ext_flush(env, dbs, changes):
    """Write collected changes in a single write transaction.

    Runs on the environment's executor thread (not the event loop).
    """
    with env.ext_begin(write=True) as txn:
        dbs.system.ext_write(txn, changes.system)
        dbs.latest.ext_write(txn, changes.latest)
        dbs.timeseries.ext_write(txn, changes.timeseries)
        dbs.ref.ext_write(txn, changes.ref)


def _ext_cleanup(env, dbs, now):
    """Remove up to `cleanup_max_results` expired timeseries entries and
    their refs; return True if another pass may find more.

    Runs on the environment's executor thread (not the event loop).
    """
    with env.ext_begin(write=True) as txn:
        result = dbs.timeseries.ext_cleanup(txn, now, cleanup_max_results)
        if not result:
            return False

        dbs.ref.ext_cleanup(txn, result)

    return len(result) >= cleanup_max_results
mlog =
<Logger hat.event.backends.lmdb.backend (WARNING)>
cleanup_max_results =
1024
flush_queue_size =
4096
max_registered_count =
262144
default_timeseries_max_results =
4096
default_timeseries_event_type_cache_size =
262144
version =
'0.9'
class
Databases(typing.NamedTuple):
37class Databases(typing.NamedTuple): 38 system: systemdb.SystemDb 39 latest: latestdb.LatestDb 40 timeseries: timeseriesdb.TimeseriesDb 41 ref: refdb.RefDb
Databases(system, latest, timeseries, ref)
Databases( system: hat.event.backends.lmdb.systemdb.SystemDb, latest: hat.event.backends.lmdb.latestdb.LatestDb, timeseries: hat.event.backends.lmdb.timeseriesdb.TimeseriesDb, ref: hat.event.backends.lmdb.refdb.RefDb)
Create new instance of Databases(system, latest, timeseries, ref)
class
Changes(typing.NamedTuple):
44class Changes(typing.NamedTuple): 45 system: systemdb.Changes 46 latest: latestdb.Changes 47 timeseries: timeseriesdb.Changes 48 ref: refdb.Changes
Changes(system, latest, timeseries, ref)
Changes( system: hat.event.backends.lmdb.systemdb.Changes, latest: hat.event.backends.lmdb.latestdb.Changes, timeseries: dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]], ref: dict[int, hat.event.backends.lmdb.refdb.ServerChanges])
Create new instance of Changes(system, latest, timeseries, ref)
timeseries: dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]
Alias for field number 2
async def
create( conf: None | bool | int | float | str | List[ForwardRef('Data')] | Dict[str, ForwardRef('Data')], registered_events_cb: Callable[[Collection[hat.event.common.Event]], None | Awaitable[None]] | None, flushed_events_cb: Callable[[Collection[hat.event.common.Event]], None | Awaitable[None]] | None) -> LmdbBackend:
51async def create(conf: json.Data, 52 registered_events_cb: common.BackendRegisteredEventsCb | None, 53 flushed_events_cb: common.BackendFlushedEventsCb | None 54 ) -> 'LmdbBackend': 55 backend = LmdbBackend() 56 backend._registered_events_cb = registered_events_cb 57 backend._flushed_events_cb = flushed_events_cb 58 backend._conditions = Conditions(conf['conditions']) 59 backend._loop = asyncio.get_running_loop() 60 backend._flush_queue = aio.Queue(flush_queue_size) 61 backend._registered_count = 0 62 backend._registered_queue = collections.deque() 63 backend._async_group = aio.Group() 64 65 backend._env = await environment.create(Path(conf['db_path'])) 66 backend.async_group.spawn(aio.call_on_done, backend._env.wait_closing(), 67 backend.close) 68 69 try: 70 latest_subscription = common.create_subscription( 71 tuple(i) for i in conf['latest']['subscriptions']) 72 73 timeseries_partitions = ( 74 timeseriesdb.Partition( 75 order_by=common.OrderBy[i['order_by']], 76 subscription=common.create_subscription( 77 tuple(event_type) for event_type in i['subscriptions']), 78 limit=( 79 timeseriesdb.Limit( 80 min_entries=i['limit'].get('min_entries'), 81 max_entries=i['limit'].get('max_entries'), 82 duration=i['limit'].get('duration'), 83 size=i['limit'].get('size')) 84 if 'limit' in i else None)) 85 for i in conf['timeseries']) 86 87 timeseries_max_results = conf.get('timeseries_max_results', 88 default_timeseries_max_results) 89 timeseries_event_type_cache_size = conf.get( 90 'timeseries_event_type_cache_size', 91 default_timeseries_event_type_cache_size) 92 93 backend._dbs = await backend._env.execute( 94 _ext_create_dbs, backend._env, conf['identifier'], 95 backend._conditions, latest_subscription, timeseries_partitions, 96 timeseries_max_results, timeseries_event_type_cache_size) 97 98 backend.async_group.spawn(backend._flush_loop, conf['flush_period']) 99 backend.async_group.spawn(backend._cleanup_loop, 100 conf['cleanup_period']) 101 102 except BaseException: 103 
await aio.uncancellable(backend._env.async_close()) 104 raise 105 106 return backend
class
LmdbBackend(hat.event.common.backend.Backend):
127class LmdbBackend(common.Backend): 128 129 @property 130 def async_group(self) -> aio.Group: 131 return self._async_group 132 133 async def get_last_event_id(self, 134 server_id: int 135 ) -> common.EventId: 136 if not self.is_open: 137 raise common.BackendClosedError() 138 139 return self._dbs.system.get_last_event_id(server_id) 140 141 async def register(self, 142 events: Collection[common.Event] 143 ) -> Collection[common.Event] | None: 144 if not self.is_open: 145 raise common.BackendClosedError() 146 147 for event in events: 148 server_id = event.id.server 149 150 last_event_id = self._dbs.system.get_last_event_id(server_id) 151 last_timestamp = self._dbs.system.get_last_timestamp(server_id) 152 153 if last_event_id >= event.id: 154 mlog.warning("event registration skipped: invalid event id") 155 continue 156 157 if last_timestamp > event.timestamp: 158 mlog.warning("event registration skipped: invalid timestamp") 159 continue 160 161 if not self._conditions.matches(event): 162 mlog.warning( 163 "event %s registration skipped: invalid conditions", 164 event.type) 165 continue 166 167 refs = collections.deque() 168 169 latest_result = self._dbs.latest.add(event) 170 171 if latest_result.added_ref: 172 refs.append(latest_result.added_ref) 173 174 if latest_result.removed_ref: 175 self._dbs.ref.remove(*latest_result.removed_ref) 176 177 refs.extend(self._dbs.timeseries.add(event)) 178 179 if not refs: 180 continue 181 182 self._dbs.ref.add(event, refs) 183 self._dbs.system.set_last_event_id(event.id) 184 self._dbs.system.set_last_timestamp(server_id, event.timestamp) 185 186 self._registered_queue.append(events) 187 self._registered_count += len(events) 188 189 if self._registered_count > max_registered_count: 190 await self._flush_queue.put(None) 191 192 if self._registered_events_cb: 193 await aio.call(self._registered_events_cb, events) 194 195 return events 196 197 async def query(self, 198 params: common.QueryParams 199 ) -> common.QueryResult: 200 if not 
self.is_open: 201 raise common.BackendClosedError() 202 203 if isinstance(params, common.QueryLatestParams): 204 return self._dbs.latest.query(params) 205 206 if isinstance(params, common.QueryTimeseriesParams): 207 return await self._dbs.timeseries.query(params) 208 209 if isinstance(params, common.QueryServerParams): 210 return await self._dbs.ref.query(params) 211 212 raise ValueError('unsupported params type') 213 214 async def flush(self): 215 try: 216 future = self._loop.create_future() 217 await self._flush_queue.put(future) 218 await future 219 220 except aio.QueueClosedError: 221 raise common.BackendClosedError() 222 223 async def _flush_loop(self, flush_period): 224 futures = collections.deque() 225 226 async def cleanup(): 227 with contextlib.suppress(Exception): 228 await self._flush() 229 230 await self._env.async_close() 231 232 try: 233 while True: 234 try: 235 future = await aio.wait_for(self._flush_queue.get(), 236 flush_period) 237 futures.append(future) 238 239 except asyncio.TimeoutError: 240 pass 241 242 except aio.CancelledWithResultError as e: 243 if e.result: 244 futures.append(e.result) 245 246 raise 247 248 while not self._flush_queue.empty(): 249 futures.append(self._flush_queue.get_nowait()) 250 251 await aio.uncancellable(self._flush()) 252 253 while futures: 254 future = futures.popleft() 255 if future and not future.done(): 256 future.set_result(None) 257 258 except Exception as e: 259 mlog.error('backend flush error: %s', e, exc_info=e) 260 261 finally: 262 self.close() 263 self._flush_queue.close() 264 265 while not self._flush_queue.empty(): 266 futures.append(self._flush_queue.get_nowait()) 267 268 for future in futures: 269 if future and not future.done(): 270 future.set_exception(common.BackendClosedError()) 271 272 await aio.uncancellable(cleanup()) 273 274 async def _cleanup_loop(self, cleanup_period): 275 try: 276 while True: 277 await asyncio.sleep(0) 278 279 repeat = await self._env.execute(_ext_cleanup, self._env, 280 
self._dbs, common.now()) 281 if repeat: 282 continue 283 284 await asyncio.sleep(cleanup_period) 285 286 except Exception as e: 287 mlog.error('backend cleanup error: %s', e, exc_info=e) 288 289 finally: 290 self.close() 291 292 async def _flush(self): 293 if not self._env.is_open: 294 return 295 296 self._registered_count = 0 297 registered_queue, self._registered_queue = (self._registered_queue, 298 collections.deque()) 299 300 changes = Changes(system=self._dbs.system.create_changes(), 301 latest=self._dbs.latest.create_changes(), 302 timeseries=self._dbs.timeseries.create_changes(), 303 ref=self._dbs.ref.create_changes()) 304 305 # TODO lock period between create_changes and locking executor 306 # (timeseries and ref must write changes before new queries are 307 # allowed) 308 309 await self._env.execute(_ext_flush, self._env, self._dbs, changes) 310 311 if not self._flushed_events_cb: 312 return 313 314 while registered_queue: 315 events = registered_queue.popleft() 316 await aio.call(self._flushed_events_cb, events)
Backend ABC
133 async def get_last_event_id(self, 134 server_id: int 135 ) -> common.EventId: 136 if not self.is_open: 137 raise common.BackendClosedError() 138 139 return self._dbs.system.get_last_event_id(server_id)
Get last registered event id associated with server id
async def
register( self, events: Collection[hat.event.common.Event]) -> Collection[hat.event.common.Event] | None:
141 async def register(self, 142 events: Collection[common.Event] 143 ) -> Collection[common.Event] | None: 144 if not self.is_open: 145 raise common.BackendClosedError() 146 147 for event in events: 148 server_id = event.id.server 149 150 last_event_id = self._dbs.system.get_last_event_id(server_id) 151 last_timestamp = self._dbs.system.get_last_timestamp(server_id) 152 153 if last_event_id >= event.id: 154 mlog.warning("event registration skipped: invalid event id") 155 continue 156 157 if last_timestamp > event.timestamp: 158 mlog.warning("event registration skipped: invalid timestamp") 159 continue 160 161 if not self._conditions.matches(event): 162 mlog.warning( 163 "event %s registration skipped: invalid conditions", 164 event.type) 165 continue 166 167 refs = collections.deque() 168 169 latest_result = self._dbs.latest.add(event) 170 171 if latest_result.added_ref: 172 refs.append(latest_result.added_ref) 173 174 if latest_result.removed_ref: 175 self._dbs.ref.remove(*latest_result.removed_ref) 176 177 refs.extend(self._dbs.timeseries.add(event)) 178 179 if not refs: 180 continue 181 182 self._dbs.ref.add(event, refs) 183 self._dbs.system.set_last_event_id(event.id) 184 self._dbs.system.set_last_timestamp(server_id, event.timestamp) 185 186 self._registered_queue.append(events) 187 self._registered_count += len(events) 188 189 if self._registered_count > max_registered_count: 190 await self._flush_queue.put(None) 191 192 if self._registered_events_cb: 193 await aio.call(self._registered_events_cb, events) 194 195 return events
Register events
async def
query( self, params: hat.event.common.QueryLatestParams | hat.event.common.QueryTimeseriesParams | hat.event.common.QueryServerParams) -> hat.event.common.QueryResult:
197 async def query(self, 198 params: common.QueryParams 199 ) -> common.QueryResult: 200 if not self.is_open: 201 raise common.BackendClosedError() 202 203 if isinstance(params, common.QueryLatestParams): 204 return self._dbs.latest.query(params) 205 206 if isinstance(params, common.QueryTimeseriesParams): 207 return await self._dbs.timeseries.query(params) 208 209 if isinstance(params, common.QueryServerParams): 210 return await self._dbs.ref.query(params) 211 212 raise ValueError('unsupported params type')
Query events