hat.event.backends.lmdb.backend
"""LMDB based event backend.

Persists events into a single LMDB environment split into four logical
databases (system, latest, timeseries, ref). Registration mutates only
in-memory change sets; a background flush loop periodically writes the
accumulated changes in one LMDB write transaction, and a cleanup loop
enforces timeseries retention limits.
"""

from collections.abc import Collection
from pathlib import Path
import asyncio
import collections
import contextlib
import logging
import typing

from hat import aio
from hat import json

from hat.event.backends.lmdb import common
from hat.event.backends.lmdb import environment
from hat.event.backends.lmdb import latestdb
from hat.event.backends.lmdb import refdb
from hat.event.backends.lmdb import systemdb
from hat.event.backends.lmdb import timeseriesdb
from hat.event.backends.lmdb.conditions import Conditions


mlog: logging.Logger = logging.getLogger(__name__)

# maximum number of timeseries entries removed per cleanup transaction;
# _ext_cleanup reports "repeat" when this many were removed
cleanup_max_results: int = 1024

# maximum number of pending flush requests in the flush queue
flush_queue_size: int = 4096

# number of registered-but-unflushed events that triggers an implicit
# flush request during register()
max_registered_count: int = 1024 * 256

# database format version written by systemdb.ext_create
version: str = '0.9'


class Databases(typing.NamedTuple):
    """All databases backed by the shared LMDB environment."""

    system: systemdb.SystemDb
    latest: latestdb.LatestDb
    timeseries: timeseriesdb.TimeseriesDb
    ref: refdb.RefDb


class Changes(typing.NamedTuple):
    """Per-database in-memory change sets collected between flushes."""

    system: systemdb.Changes
    latest: latestdb.Changes
    timeseries: timeseriesdb.Changes
    ref: refdb.Changes


async def create(conf: json.Data,
                 registered_events_cb: common.BackendRegisteredEventsCb | None,
                 flushed_events_cb: common.BackendFlushedEventsCb | None
                 ) -> 'LmdbBackend':
    """Create LMDB backend.

    Opens the LMDB environment at ``conf['db_path']``, creates/opens all
    databases inside a single write transaction on the environment's
    executor, and spawns the flush and cleanup background loops.

    Raises whatever the environment/database creation raises; on failure
    the environment is closed before the exception propagates.
    """
    backend = LmdbBackend()
    backend._registered_events_cb = registered_events_cb
    backend._flushed_events_cb = flushed_events_cb
    backend._conditions = Conditions(conf['conditions'])
    backend._loop = asyncio.get_running_loop()
    backend._flush_queue = aio.Queue(flush_queue_size)
    backend._registered_count = 0
    backend._registered_queue = collections.deque()
    backend._async_group = aio.Group()

    backend._env = await environment.create(Path(conf['db_path']))
    # if the environment starts closing on its own, close the backend too
    backend.async_group.spawn(aio.call_on_done, backend._env.wait_closing(),
                              backend.close)

    try:
        latest_subscription = common.create_subscription(
            tuple(i) for i in conf['latest']['subscriptions'])

        timeseries_partitions = (
            timeseriesdb.Partition(
                order_by=common.OrderBy[i['order_by']],
                subscription=common.create_subscription(
                    tuple(event_type) for event_type in i['subscriptions']),
                limit=(
                    timeseriesdb.Limit(
                        min_entries=i['limit'].get('min_entries'),
                        max_entries=i['limit'].get('max_entries'),
                        duration=i['limit'].get('duration'),
                        size=i['limit'].get('size'))
                    if 'limit' in i else None))
            for i in conf['timeseries'])

        # database creation runs on the environment's executor inside one
        # write transaction
        backend._dbs = await backend._env.execute(
            _ext_create_dbs, backend._env, conf['identifier'],
            backend._conditions, latest_subscription, timeseries_partitions)

        backend.async_group.spawn(backend._flush_loop, conf['flush_period'])
        backend.async_group.spawn(backend._cleanup_loop,
                                  conf['cleanup_period'])

    except BaseException:
        await aio.uncancellable(backend._env.async_close())
        raise

    return backend


def _ext_create_dbs(env, identifier, conditions, latest_subscription,
                    timeseries_partitions):
    # executor-side helper: create/open all databases in one write txn
    with env.ext_begin(write=True) as txn:
        system_db = systemdb.ext_create(env, txn, version, identifier)
        latest_db = latestdb.ext_create(env, txn, conditions,
                                        latest_subscription)
        timeseries_db = timeseriesdb.ext_create(env, txn, conditions,
                                                timeseries_partitions)
        ref_db = refdb.RefDb(env)

    return Databases(system=system_db,
                     latest=latest_db,
                     timeseries=timeseries_db,
                     ref=ref_db)


class LmdbBackend(common.Backend):
    """LMDB implementation of `hat.event.common.Backend`."""

    @property
    def async_group(self) -> aio.Group:
        return self._async_group

    async def get_last_event_id(self,
                                server_id: int
                                ) -> common.EventId:
        """Get last registered event id associated with server id."""
        if not self.is_open:
            raise common.BackendClosedError()

        return self._dbs.system.get_last_event_id(server_id)

    async def register(self,
                       events: Collection[common.Event]
                       ) -> Collection[common.Event] | None:
        """Register events.

        Events with non-increasing ids/timestamps or failing the
        configured conditions are skipped (with a warning) but still
        included in the returned collection. Registration only mutates
        in-memory change sets; durability comes from a later flush.
        """
        if not self.is_open:
            raise common.BackendClosedError()

        for event in events:
            server_id = event.id.server

            last_event_id = self._dbs.system.get_last_event_id(server_id)
            last_timestamp = self._dbs.system.get_last_timestamp(server_id)

            # event ids must be strictly increasing per server
            if last_event_id >= event.id:
                mlog.warning("event registration skipped: invalid event id")
                continue

            # timestamps must be non-decreasing per server
            if last_timestamp > event.timestamp:
                mlog.warning("event registration skipped: invalid timestamp")
                continue

            if not self._conditions.matches(event):
                mlog.warning("event registration skipped: invalid conditions")
                continue

            refs = collections.deque()

            latest_result = self._dbs.latest.add(event)

            if latest_result.added_ref:
                refs.append(latest_result.added_ref)

            # event replaced in latest db loses its ref entry
            if latest_result.removed_ref:
                self._dbs.ref.remove(*latest_result.removed_ref)

            refs.extend(self._dbs.timeseries.add(event))

            # event stored in no database - nothing to track
            if not refs:
                continue

            self._dbs.ref.add(event, refs)
            self._dbs.system.set_last_event_id(event.id)
            self._dbs.system.set_last_timestamp(server_id, event.timestamp)

        self._registered_queue.append(events)
        self._registered_count += len(events)

        # too many unflushed events - request a flush but don't wait for
        # it to complete (the enqueued future is resolved by _flush_loop)
        if self._registered_count > max_registered_count:
            await self._flush_queue.put(self._loop.create_future())

        if self._registered_events_cb:
            await aio.call(self._registered_events_cb, events)

        return events

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Query events, dispatching on the params type."""
        if not self.is_open:
            raise common.BackendClosedError()

        if isinstance(params, common.QueryLatestParams):
            return self._dbs.latest.query(params)

        if isinstance(params, common.QueryTimeseriesParams):
            return await self._dbs.timeseries.query(params)

        if isinstance(params, common.QueryServerParams):
            return await self._dbs.ref.query(params)

        raise ValueError('unsupported params type')

    async def flush(self):
        """Request a flush and wait until it is performed.

        Raises `common.BackendClosedError` if the backend is closing and
        the flush queue is already closed.
        """
        try:
            future = self._loop.create_future()
            await self._flush_queue.put(future)
            await future

        except aio.QueueClosedError:
            raise common.BackendClosedError()

    async def _flush_loop(self, flush_period):
        # resolves each queued future once the flush covering it is done
        futures = collections.deque()

        async def cleanup():
            # best-effort final flush, then close the environment
            with contextlib.suppress(Exception):
                await self._flush()

            await self._env.async_close()

        try:
            while True:
                try:
                    # wait for an explicit flush request or the periodic
                    # timeout - both paths fall through to a flush
                    future = await aio.wait_for(self._flush_queue.get(),
                                                flush_period)
                    futures.append(future)

                except asyncio.TimeoutError:
                    pass

                except aio.CancelledWithResultError as e:
                    # cancellation may carry an item already taken from
                    # the queue - keep it so it still gets resolved
                    if e.result:
                        futures.append(e.result)

                    raise

                # batch up all pending requests into this flush
                while not self._flush_queue.empty():
                    futures.append(self._flush_queue.get_nowait())

                # flush must run to completion even if we get cancelled
                await aio.uncancellable(self._flush())

                while futures:
                    future = futures.popleft()
                    if not future.done():
                        future.set_result(None)

        except Exception as e:
            mlog.error('backend flush error: %s', e, exc_info=e)

        finally:
            self.close()
            self._flush_queue.close()

            # fail all outstanding flush waiters
            while not self._flush_queue.empty():
                futures.append(self._flush_queue.get_nowait())

            for future in futures:
                if not future.done():
                    future.set_exception(common.BackendClosedError())

            await aio.uncancellable(cleanup())

    async def _cleanup_loop(self, cleanup_period):
        try:
            while True:
                # yield to the event loop between back-to-back cleanups
                await asyncio.sleep(0)

                # repeat immediately while cleanup keeps hitting the
                # per-transaction result limit
                repeat = await self._env.execute(_ext_cleanup, self._env,
                                                 self._dbs, common.now())
                if repeat:
                    continue

                await asyncio.sleep(cleanup_period)

        except Exception as e:
            mlog.error('backend cleanup error: %s', e, exc_info=e)

        finally:
            self.close()

    async def _flush(self):
        if not self._env.is_open:
            return

        # take ownership of everything registered so far
        self._registered_count = 0
        registered_queue, self._registered_queue = (self._registered_queue,
                                                    collections.deque())

        changes = Changes(system=self._dbs.system.create_changes(),
                          latest=self._dbs.latest.create_changes(),
                          timeseries=self._dbs.timeseries.create_changes(),
                          ref=self._dbs.ref.create_changes())

        # TODO lock period between create_changes and locking executor
        #      (timeseries and ref must write changes before new queries are
        #      allowed)

        await self._env.execute(_ext_flush, self._env, self._dbs, changes)

        if not self._flushed_events_cb:
            return

        # notify in registration order, one registered batch at a time
        while registered_queue:
            events = registered_queue.popleft()
            await aio.call(self._flushed_events_cb, events)


def _ext_flush(env, dbs, changes):
    # executor-side helper: persist all change sets in one write txn
    with env.ext_begin(write=True) as txn:
        dbs.system.ext_write(txn, changes.system)
        dbs.latest.ext_write(txn, changes.latest)
        dbs.timeseries.ext_write(txn, changes.timeseries)
        dbs.ref.ext_write(txn, changes.ref)


def _ext_cleanup(env, dbs, now):
    # executor-side helper: remove expired timeseries entries (bounded by
    # cleanup_max_results) and their refs; returns True when the limit was
    # hit, i.e. another cleanup pass should run immediately
    with env.ext_begin(write=True) as txn:
        result = dbs.timeseries.ext_cleanup(txn, now, cleanup_max_results)
        if not result:
            return False

        dbs.ref.ext_cleanup(txn, result)

        return len(result) >= cleanup_max_results
mlog =
<Logger hat.event.backends.lmdb.backend (WARNING)>
cleanup_max_results =
1024
flush_queue_size =
4096
max_registered_count =
262144
version =
'0.9'
class
Databases(typing.NamedTuple):
33class Databases(typing.NamedTuple): 34 system: systemdb.SystemDb 35 latest: latestdb.LatestDb 36 timeseries: timeseriesdb.TimeseriesDb 37 ref: refdb.RefDb
Databases(system, latest, timeseries, ref)
Databases( system: hat.event.backends.lmdb.systemdb.SystemDb, latest: hat.event.backends.lmdb.latestdb.LatestDb, timeseries: hat.event.backends.lmdb.timeseriesdb.TimeseriesDb, ref: hat.event.backends.lmdb.refdb.RefDb)
Create new instance of Databases(system, latest, timeseries, ref)
class
Changes(typing.NamedTuple):
40class Changes(typing.NamedTuple): 41 system: systemdb.Changes 42 latest: latestdb.Changes 43 timeseries: timeseriesdb.Changes 44 ref: refdb.Changes
Changes(system, latest, timeseries, ref)
Changes( system: hat.event.backends.lmdb.systemdb.Changes, latest: hat.event.backends.lmdb.latestdb.Changes, timeseries: dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]], ref: dict[int, hat.event.backends.lmdb.refdb.ServerChanges])
Create new instance of Changes(system, latest, timeseries, ref)
timeseries: dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]
Alias for field number 2
async def
create( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], registered_events_cb: Optional[Callable[[Collection[hat.event.common.Event]], Optional[Awaitable[NoneType]]]], flushed_events_cb: Optional[Callable[[Collection[hat.event.common.Event]], Optional[Awaitable[NoneType]]]]) -> LmdbBackend:
47async def create(conf: json.Data, 48 registered_events_cb: common.BackendRegisteredEventsCb | None, 49 flushed_events_cb: common.BackendFlushedEventsCb | None 50 ) -> 'LmdbBackend': 51 backend = LmdbBackend() 52 backend._registered_events_cb = registered_events_cb 53 backend._flushed_events_cb = flushed_events_cb 54 backend._conditions = Conditions(conf['conditions']) 55 backend._loop = asyncio.get_running_loop() 56 backend._flush_queue = aio.Queue(flush_queue_size) 57 backend._registered_count = 0 58 backend._registered_queue = collections.deque() 59 backend._async_group = aio.Group() 60 61 backend._env = await environment.create(Path(conf['db_path'])) 62 backend.async_group.spawn(aio.call_on_done, backend._env.wait_closing(), 63 backend.close) 64 65 try: 66 latest_subscription = common.create_subscription( 67 tuple(i) for i in conf['latest']['subscriptions']) 68 69 timeseries_partitions = ( 70 timeseriesdb.Partition( 71 order_by=common.OrderBy[i['order_by']], 72 subscription=common.create_subscription( 73 tuple(event_type) for event_type in i['subscriptions']), 74 limit=( 75 timeseriesdb.Limit( 76 min_entries=i['limit'].get('min_entries'), 77 max_entries=i['limit'].get('max_entries'), 78 duration=i['limit'].get('duration'), 79 size=i['limit'].get('size')) 80 if 'limit' in i else None)) 81 for i in conf['timeseries']) 82 83 backend._dbs = await backend._env.execute( 84 _ext_create_dbs, backend._env, conf['identifier'], 85 backend._conditions, latest_subscription, timeseries_partitions) 86 87 backend.async_group.spawn(backend._flush_loop, conf['flush_period']) 88 backend.async_group.spawn(backend._cleanup_loop, 89 conf['cleanup_period']) 90 91 except BaseException: 92 await aio.uncancellable(backend._env.async_close()) 93 raise 94 95 return backend
class
LmdbBackend(hat.event.common.backend.Backend):
114class LmdbBackend(common.Backend): 115 116 @property 117 def async_group(self) -> aio.Group: 118 return self._async_group 119 120 async def get_last_event_id(self, 121 server_id: int 122 ) -> common.EventId: 123 if not self.is_open: 124 raise common.BackendClosedError() 125 126 return self._dbs.system.get_last_event_id(server_id) 127 128 async def register(self, 129 events: Collection[common.Event] 130 ) -> Collection[common.Event] | None: 131 if not self.is_open: 132 raise common.BackendClosedError() 133 134 for event in events: 135 server_id = event.id.server 136 137 last_event_id = self._dbs.system.get_last_event_id(server_id) 138 last_timestamp = self._dbs.system.get_last_timestamp(server_id) 139 140 if last_event_id >= event.id: 141 mlog.warning("event registration skipped: invalid event id") 142 continue 143 144 if last_timestamp > event.timestamp: 145 mlog.warning("event registration skipped: invalid timestamp") 146 continue 147 148 if not self._conditions.matches(event): 149 mlog.warning("event registration skipped: invalid conditions") 150 continue 151 152 refs = collections.deque() 153 154 latest_result = self._dbs.latest.add(event) 155 156 if latest_result.added_ref: 157 refs.append(latest_result.added_ref) 158 159 if latest_result.removed_ref: 160 self._dbs.ref.remove(*latest_result.removed_ref) 161 162 refs.extend(self._dbs.timeseries.add(event)) 163 164 if not refs: 165 continue 166 167 self._dbs.ref.add(event, refs) 168 self._dbs.system.set_last_event_id(event.id) 169 self._dbs.system.set_last_timestamp(server_id, event.timestamp) 170 171 self._registered_queue.append(events) 172 self._registered_count += len(events) 173 174 if self._registered_count > max_registered_count: 175 await self._flush_queue.put(self._loop.create_future()) 176 177 if self._registered_events_cb: 178 await aio.call(self._registered_events_cb, events) 179 180 return events 181 182 async def query(self, 183 params: common.QueryParams 184 ) -> common.QueryResult: 185 if not 
self.is_open: 186 raise common.BackendClosedError() 187 188 if isinstance(params, common.QueryLatestParams): 189 return self._dbs.latest.query(params) 190 191 if isinstance(params, common.QueryTimeseriesParams): 192 return await self._dbs.timeseries.query(params) 193 194 if isinstance(params, common.QueryServerParams): 195 return await self._dbs.ref.query(params) 196 197 raise ValueError('unsupported params type') 198 199 async def flush(self): 200 try: 201 future = self._loop.create_future() 202 await self._flush_queue.put(future) 203 await future 204 205 except aio.QueueClosedError: 206 raise common.BackendClosedError() 207 208 async def _flush_loop(self, flush_period): 209 futures = collections.deque() 210 211 async def cleanup(): 212 with contextlib.suppress(Exception): 213 await self._flush() 214 215 await self._env.async_close() 216 217 try: 218 while True: 219 try: 220 future = await aio.wait_for(self._flush_queue.get(), 221 flush_period) 222 futures.append(future) 223 224 except asyncio.TimeoutError: 225 pass 226 227 except aio.CancelledWithResultError as e: 228 if e.result: 229 futures.append(e.result) 230 231 raise 232 233 while not self._flush_queue.empty(): 234 futures.append(self._flush_queue.get_nowait()) 235 236 await aio.uncancellable(self._flush()) 237 238 while futures: 239 future = futures.popleft() 240 if not future.done(): 241 future.set_result(None) 242 243 except Exception as e: 244 mlog.error('backend flush error: %s', e, exc_info=e) 245 246 finally: 247 self.close() 248 self._flush_queue.close() 249 250 while not self._flush_queue.empty(): 251 futures.append(self._flush_queue.get_nowait()) 252 253 for future in futures: 254 if not future.done(): 255 future.set_exception(common.BackendClosedError()) 256 257 await aio.uncancellable(cleanup()) 258 259 async def _cleanup_loop(self, cleanup_period): 260 try: 261 while True: 262 await asyncio.sleep(0) 263 264 repeat = await self._env.execute(_ext_cleanup, self._env, 265 self._dbs, common.now()) 
266 if repeat: 267 continue 268 269 await asyncio.sleep(cleanup_period) 270 271 except Exception as e: 272 mlog.error('backend cleanup error: %s', e, exc_info=e) 273 274 finally: 275 self.close() 276 277 async def _flush(self): 278 if not self._env.is_open: 279 return 280 281 self._registered_count = 0 282 registered_queue, self._registered_queue = (self._registered_queue, 283 collections.deque()) 284 285 changes = Changes(system=self._dbs.system.create_changes(), 286 latest=self._dbs.latest.create_changes(), 287 timeseries=self._dbs.timeseries.create_changes(), 288 ref=self._dbs.ref.create_changes()) 289 290 # TODO lock period between create_changes and locking executor 291 # (timeseries and ref must write changes before new queries are 292 # allowed) 293 294 await self._env.execute(_ext_flush, self._env, self._dbs, changes) 295 296 if not self._flushed_events_cb: 297 return 298 299 while registered_queue: 300 events = registered_queue.popleft() 301 await aio.call(self._flushed_events_cb, events)
Backend ABC
120 async def get_last_event_id(self, 121 server_id: int 122 ) -> common.EventId: 123 if not self.is_open: 124 raise common.BackendClosedError() 125 126 return self._dbs.system.get_last_event_id(server_id)
Get last registered event id associated with server id
async def
register( self, events: Collection[hat.event.common.Event]) -> Collection[hat.event.common.Event] | None:
128 async def register(self, 129 events: Collection[common.Event] 130 ) -> Collection[common.Event] | None: 131 if not self.is_open: 132 raise common.BackendClosedError() 133 134 for event in events: 135 server_id = event.id.server 136 137 last_event_id = self._dbs.system.get_last_event_id(server_id) 138 last_timestamp = self._dbs.system.get_last_timestamp(server_id) 139 140 if last_event_id >= event.id: 141 mlog.warning("event registration skipped: invalid event id") 142 continue 143 144 if last_timestamp > event.timestamp: 145 mlog.warning("event registration skipped: invalid timestamp") 146 continue 147 148 if not self._conditions.matches(event): 149 mlog.warning("event registration skipped: invalid conditions") 150 continue 151 152 refs = collections.deque() 153 154 latest_result = self._dbs.latest.add(event) 155 156 if latest_result.added_ref: 157 refs.append(latest_result.added_ref) 158 159 if latest_result.removed_ref: 160 self._dbs.ref.remove(*latest_result.removed_ref) 161 162 refs.extend(self._dbs.timeseries.add(event)) 163 164 if not refs: 165 continue 166 167 self._dbs.ref.add(event, refs) 168 self._dbs.system.set_last_event_id(event.id) 169 self._dbs.system.set_last_timestamp(server_id, event.timestamp) 170 171 self._registered_queue.append(events) 172 self._registered_count += len(events) 173 174 if self._registered_count > max_registered_count: 175 await self._flush_queue.put(self._loop.create_future()) 176 177 if self._registered_events_cb: 178 await aio.call(self._registered_events_cb, events) 179 180 return events
Register events
async def
query( self, params: hat.event.common.QueryLatestParams | hat.event.common.QueryTimeseriesParams | hat.event.common.QueryServerParams) -> hat.event.common.QueryResult:
182 async def query(self, 183 params: common.QueryParams 184 ) -> common.QueryResult: 185 if not self.is_open: 186 raise common.BackendClosedError() 187 188 if isinstance(params, common.QueryLatestParams): 189 return self._dbs.latest.query(params) 190 191 if isinstance(params, common.QueryTimeseriesParams): 192 return await self._dbs.timeseries.query(params) 193 194 if isinstance(params, common.QueryServerParams): 195 return await self._dbs.ref.query(params) 196 197 raise ValueError('unsupported params type')
Query events