hat.event.backends.lmdb.timeseriesdb
from collections.abc import Iterable
import collections
import functools
import itertools
import typing

import lmdb

from hat.event.backends.lmdb import common
from hat.event.backends.lmdb import environment
from hat.event.backends.lmdb.conditions import Conditions


Changes: typing.TypeAlias = dict[common.PartitionId,
                                 collections.deque[tuple[common.Timestamp,
                                                         common.Event]]]


class Limit(typing.NamedTuple):
    min_entries: int | None = None
    max_entries: int | None = None
    duration: float | None = None
    size: int | None = None


class Partition(typing.NamedTuple):
    order_by: common.OrderBy
    subscription: common.Subscription
    limit: Limit | None


def ext_create(env: environment.Environment,
               txn: lmdb.Transaction,
               conditions: Conditions,
               partitions: Iterable[Partition],
               max_results: int,
               event_type_cache_size: int
               ) -> 'TimeseriesDb':
    db = TimeseriesDb()
    db._env = env
    db._conditions = conditions
    db._max_results = max_results
    db._changes = collections.defaultdict(collections.deque)
    db._event_type_partitions = {}  # type: dict[common.EventType, Collection[tuple[common.PartitionId, Partition]]]  # NOQA

    # depending on dict order
    db._partitions = dict(_ext_init_partitions(env, txn, partitions))

    partitions_collection = common.create_event_type_collection(
        (partition.subscription, (partition_id, partition))
        for partition_id, partition in db._partitions.items())

    db._get_event_type_partitions = functools.lru_cache(
        maxsize=event_type_cache_size)(
        lambda event_type: list(partitions_collection.get(event_type)))

    return db


class TimeseriesDb:

    def add(self,
            event: common.Event
            ) -> Iterable[common.EventRef]:
        partitions = self._get_event_type_partitions(event.type)

        for partition_id, partition in partitions:
            if partition.order_by == common.OrderBy.TIMESTAMP:
                timestamp = event.timestamp

            elif partition.order_by == common.OrderBy.SOURCE_TIMESTAMP:
                if event.source_timestamp is None:
                    continue

                timestamp = event.source_timestamp

            else:
                raise ValueError('unsupported order by')

            self._changes[partition_id].append((timestamp, event))

            yield common.TimeseriesEventRef(
                (partition_id, timestamp, event.id))

    async def query(self,
                    params: common.QueryTimeseriesParams
                    ) -> common.QueryResult:
        subscription = (common.create_subscription(params.event_types)
                        if params.event_types is not None else None)

        max_results = (params.max_results
                       if params.max_results is not None and
                       params.max_results < self._max_results
                       else self._max_results)

        for partition_id, partition in self._partitions.items():
            if partition.order_by != params.order_by:
                continue

            if (subscription and
                    subscription.isdisjoint(partition.subscription)):
                continue

            return await self._query_partition(partition_id, params,
                                               subscription, max_results)

        return common.QueryResult(events=[],
                                  more_follows=False)

    def create_changes(self) -> Changes:
        changes, self._changes = (self._changes,
                                  collections.defaultdict(collections.deque))
        return changes

    def ext_write(self,
                  txn: lmdb.Transaction,
                  changes: Changes):
        data = (((partition_id, timestamp, event.id), event)
                for partition_id, partition_changes in changes.items()
                for timestamp, event in partition_changes)
        self._env.ext_write(txn, common.DbType.TIMESERIES_DATA, data)

        counts = ((partition_id, len(partition_changes))
                  for partition_id, partition_changes in changes.items())
        _ext_inc_partition_count(self._env, txn, counts)

    def ext_cleanup(self,
                    txn: lmdb.Transaction,
                    now: common.Timestamp,
                    max_results: int | None = None,
                    ) -> collections.deque[tuple[common.EventId,
                                                 common.EventRef]]:
        result = collections.deque()

        for partition_id, partition in self._partitions.items():
            if not partition.limit:
                continue

            partition_max_results = (max_results - len(result)
                                     if max_results is not None else None)
            if partition_max_results is not None and partition_max_results < 1:
                break

            result.extend(_ext_cleanup_partition(self._env, txn, now,
                                                 partition_id, partition.limit,
                                                 partition_max_results))

        return result

    async def _query_partition(self, partition_id, params, subscription,
                               max_results):
        events = collections.deque()
        changes = self._changes

        filter = _Filter(subscription=subscription,
                         t_from=params.t_from,
                         t_to=params.t_to,
                         source_t_from=params.source_t_from,
                         source_t_to=params.source_t_to,
                         max_results=max_results + 1,
                         last_event_id=params.last_event_id)

        if params.order == common.Order.DESCENDING:
            events.extend(_query_partition_changes(
                changes[partition_id], params, filter))

            if not filter.done:
                events.extend(await self._env.execute(
                    _ext_query_partition_events, self._env, self._conditions,
                    partition_id, params, filter))

        elif params.order == common.Order.ASCENDING:
            events.extend(await self._env.execute(
                _ext_query_partition_events, self._env, self._conditions,
                partition_id, params, filter))

            if not filter.done:
                events.extend(_query_partition_changes(
                    changes[partition_id], params, filter))

        else:
            raise ValueError('unsupported order')

        more_follows = len(events) > max_results
        while len(events) > max_results:
            events.pop()

        return common.QueryResult(events=events,
                                  more_follows=more_follows)


def _ext_init_partitions(env, txn, partitions):

    def data_to_cache_key(data):
        return (data['order'],
                tuple(tuple(event_type)
                      for event_type in data['subscriptions']))

    max_partition_id = 0
    cache = {}

    for partition_id, partition_data in env.ext_read(
            txn, common.DbType.TIMESERIES_PARTITION):
        if partition_id > max_partition_id:
            max_partition_id = partition_id

        cache_key = data_to_cache_key(partition_data)
        cache[cache_key] = partition_id

    next_partition_ids = itertools.count(max_partition_id + 1)

    for partition in partitions:
        event_types = sorted(partition.subscription.get_query_types())
        partition_data = {'order': partition.order_by.value,
                          'subscriptions': [list(i) for i in event_types]}

        cache_key = data_to_cache_key(partition_data)
        partition_id = cache.get(cache_key)

        if partition_id is None:
            partition_id = next(next_partition_ids)
            cache[cache_key] = partition_id
            env.ext_write(txn, common.DbType.TIMESERIES_PARTITION,
                          [(partition_id, partition_data)])

        yield partition_id, partition


def _ext_query_partition_events(env, conditions, partition_id, params,
                                filter):
    if params.order_by == common.OrderBy.TIMESTAMP:
        events = _ext_query_partition_events_range(
            env, partition_id, params.t_from, params.t_to, params.order)

    elif params.order_by == common.OrderBy.SOURCE_TIMESTAMP:
        events = _ext_query_partition_events_range(
            env, partition_id, params.source_t_from, params.source_t_to,
            params.order)

    else:
        raise ValueError('unsupported order by')

    events = (event for event in events if conditions.matches(event))
    events = filter.process(events)
    return collections.deque(events)


def _ext_query_partition_events_range(env, partition_id, t_from, t_to, order):
    db_def = common.db_defs[common.DbType.TIMESERIES_DATA]

    if not t_from:
        t_from = common.min_timestamp

    from_key = partition_id, t_from, common.EventId(0, 0, 0)
    encoded_from_key = db_def.encode_key(from_key)

    if not t_to:
        t_to = common.max_timestamp

    to_key = (partition_id,
              t_to,
              common.EventId((1 << 64) - 1, (1 << 64) - 1, (1 << 64) - 1))
    encoded_to_key = db_def.encode_key(to_key)

    with env.ext_begin() as txn:
        with env.ext_cursor(txn, common.DbType.TIMESERIES_DATA) as cursor:
            if order == common.Order.DESCENDING:
                encoded_start_key, encoded_stop_key = (encoded_to_key,
                                                       encoded_from_key)

                if cursor.set_range(encoded_start_key):
                    more = cursor.prev()
                else:
                    more = cursor.last()

                while more and encoded_stop_key <= bytes(cursor.key()):
                    yield db_def.decode_value(cursor.value())
                    more = cursor.prev()

            elif order == common.Order.ASCENDING:
                encoded_start_key, encoded_stop_key = (encoded_from_key,
                                                       encoded_to_key)

                more = cursor.set_range(encoded_start_key)

                while more and bytes(cursor.key()) <= encoded_stop_key:
                    yield db_def.decode_value(cursor.value())
                    more = cursor.next()

            else:
                raise ValueError('unsupported order')


def _ext_get_partition_count(env, txn, partition_id):
    db_def = common.db_defs[common.DbType.TIMESERIES_COUNT]

    with env.ext_cursor(txn, common.DbType.TIMESERIES_COUNT) as cursor:
        encoded_key = db_def.encode_key(partition_id)
        encoded_value = cursor.get(encoded_key)

        return db_def.decode_value(encoded_value) if encoded_value else 0


def _ext_set_partition_count(env, txn, partition_id, count):
    env.ext_write(txn, common.DbType.TIMESERIES_COUNT, [(partition_id, count)])


def _ext_inc_partition_count(env, txn, partition_counts):
    db_def = common.db_defs[common.DbType.TIMESERIES_COUNT]

    with env.ext_cursor(txn, common.DbType.TIMESERIES_COUNT) as cursor:
        for partition_id, count in partition_counts:
            encoded_key = db_def.encode_key(partition_id)
            encoded_value = cursor.get(encoded_key)

            value = db_def.decode_value(encoded_value) if encoded_value else 0
            inc_value = value + count

            encoded_value = db_def.encode_value(inc_value)
            cursor.put(encoded_key, encoded_value)


def _ext_cleanup_partition(env, txn, now, partition_id, limit, max_results):
    db_def = common.db_defs[common.DbType.TIMESERIES_DATA]

    timestamp = common.min_timestamp
    start_key = (partition_id,
                 timestamp,
                 common.EventId(0, 0, 0))
    stop_key = ((partition_id + 1),
                timestamp,
                common.EventId(0, 0, 0))

    encoded_start_key = db_def.encode_key(start_key)
    encoded_stop_key = db_def.encode_key(stop_key)

    min_entries = limit.min_entries or 0
    max_entries = None
    encoded_duration_key = None

    if limit.size is not None:
        stat = env.ext_stat(txn, common.DbType.TIMESERIES_DATA)

        if stat['entries']:
            total_size = stat['psize'] * (stat['branch_pages'] +
                                          stat['leaf_pages'] +
                                          stat['overflow_pages'])
            entry_size = total_size / stat['entries']
            max_entries = int(limit.size / entry_size)

    if limit.max_entries is not None:
        max_entries = (limit.max_entries if max_entries is None
                       else min(max_entries, limit.max_entries))

    if limit.duration is not None:
        duration_key = (partition_id,
                        now.add(-limit.duration),
                        common.EventId(0, 0, 0))
        encoded_duration_key = db_def.encode_key(duration_key)

    result_count = 0
    entries_count = _ext_get_partition_count(env, txn, partition_id)

    with env.ext_cursor(txn, common.DbType.TIMESERIES_DATA) as cursor:
        more = cursor.set_range(encoded_start_key)
        while more:
            if max_results is not None and result_count >= max_results:
                break

            if entries_count - result_count <= min_entries:
                break

            encoded_key = bytes(cursor.key())
            if encoded_key >= encoded_stop_key:
                break

            if ((max_entries is None or
                 entries_count - result_count <= max_entries) and
                    (encoded_duration_key is None or
                     encoded_key >= encoded_duration_key)):
                break

            key = db_def.decode_key(encoded_key)
            event_id = key[2]

            more = cursor.delete()
            result_count += 1

            yield event_id, common.TimeseriesEventRef(key)

    if result_count > 0:
        _ext_set_partition_count(env, txn, partition_id,
                                 entries_count - result_count)


def _query_partition_changes(changes, params, filter):
    if params.order == common.Order.DESCENDING:
        events = (event for _, event in reversed(changes))

        if (params.order_by == common.OrderBy.TIMESTAMP and
                params.t_to is not None):
            events = itertools.dropwhile(
                lambda i: params.t_to < i.timestamp,
                events)

        elif (params.order_by == common.OrderBy.SOURCE_TIMESTAMP and
              params.source_t_to is not None):
            events = itertools.dropwhile(
                lambda i: params.source_t_to < i.source_timestamp,
                events)

        if (params.order_by == common.OrderBy.TIMESTAMP and
                params.t_from is not None):
            events = itertools.takewhile(
                lambda i: params.t_from <= i.timestamp,
                events)

        elif (params.order_by == common.OrderBy.SOURCE_TIMESTAMP and
              params.source_t_from is not None):
            events = itertools.takewhile(
                lambda i: params.source_t_from <= i.source_timestamp,
                events)

    elif params.order == common.Order.ASCENDING:
        events = (event for _, event in changes)

        if (params.order_by == common.OrderBy.TIMESTAMP and
                params.t_from is not None):
            events = itertools.dropwhile(
                lambda i: i.timestamp < params.t_from,
                events)

        elif (params.order_by == common.OrderBy.SOURCE_TIMESTAMP and
              params.source_t_from is not None):
            events = itertools.dropwhile(
                lambda i: i.source_timestamp < params.source_t_from,
                events)

        if (params.order_by == common.OrderBy.TIMESTAMP and
                params.t_to is not None):
            events = itertools.takewhile(
                lambda i: i.timestamp <= params.t_to,
                events)

        elif (params.order_by == common.OrderBy.SOURCE_TIMESTAMP and
              params.source_t_to is not None):
            events = itertools.takewhile(
                lambda i: i.source_timestamp <= params.source_t_to,
                events)

    else:
        raise ValueError('unsupported order')

    return filter.process(events)


class _Filter:

    def __init__(self,
                 subscription: common.Subscription,
                 t_from: common.Timestamp | None,
                 t_to: common.Timestamp | None,
                 source_t_from: common.Timestamp | None,
                 source_t_to: common.Timestamp | None,
                 max_results: int,
                 last_event_id: common.EventId | None):
        self._subscription = subscription
        self._t_from = t_from
        self._t_to = t_to
        self._source_t_from = source_t_from
        self._source_t_to = source_t_to
        self._max_results = max_results
        self._last_event_id = last_event_id

    @property
    def done(self):
        return self._max_results < 1

    def process(self, events: Iterable[common.Event]):
        for event in events:
            if self._max_results < 1:
                return

            if self._last_event_id:
                if event.id == self._last_event_id:
                    self._last_event_id = None

                continue

            if self._t_from is not None and event.timestamp < self._t_from:
                continue

            if self._t_to is not None and self._t_to < event.timestamp:
                continue

            if self._source_t_from is not None and (
                    event.source_timestamp is None or
                    event.source_timestamp < self._source_t_from):
                continue

            if self._source_t_to is not None and (
                    self._source_t_to < event.source_timestamp):
                continue

            if (self._subscription and
                    not self._subscription.matches(event.type)):
                continue

            self._max_results -= 1
            yield event
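Note on the descending scan in _ext_query_partition_events_range: LMDB's set_range positions the cursor at the first key greater than or equal to the requested key, so a descending walk starts with set_range followed by prev (or last, when set_range finds nothing). The module can call prev unconditionally because its start key ends with a sentinel maximal EventId that never matches a stored key. A self-contained sketch of the same idiom against plain py-lmdb (database path and keys are illustrative):

    import lmdb

    env = lmdb.open('/tmp/timeseries-example', map_size=1024 * 1024)

    with env.begin(write=True) as txn:
        for i in range(10):
            txn.put(i.to_bytes(8, 'big'), f'value-{i}'.encode())

    stop_key = (6).to_bytes(8, 'big')

    with env.begin() as txn:
        cursor = txn.cursor()

        # position at the first key >= stop_key, stepping back on overshoot;
        # if every key is smaller, start from the last key in the database
        if cursor.set_range(stop_key):
            more = True if bytes(cursor.key()) == stop_key else cursor.prev()
        else:
            more = cursor.last()

        # walk keys in descending order: 6, 5, 4, ..., 0
        while more:
            print(bytes(cursor.key()), bytes(cursor.value()))
            more = cursor.prev()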
Changes: typing.TypeAlias = dict[int,
                                 collections.deque[tuple[
                                     hat.event.common.Timestamp,
                                     hat.event.common.Event]]]
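Changes is the shape of the in-memory write buffer: one deque of (timestamp, event) pairs per partition, in insertion order. TimeseriesDb.add fills it and create_changes drains it. A toy sketch of the shape, with strings standing in for real Timestamp and Event values:

    import collections

    changes = collections.defaultdict(collections.deque)

    # TimeseriesDb.add appends one entry per matching partition
    changes[0].append(('t0', 'event-a'))  # placeholders, not real common types
    changes[0].append(('t1', 'event-b'))

    assert len(changes[0]) == 2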
class Limit(typing.NamedTuple)
    min_entries: int | None = None
    max_entries: int | None = None
    duration: float | None = None
    size: int | None = None
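The four fields combine in _ext_cleanup_partition: min_entries is a floor that cleanup never deletes below, max_entries caps the entry count, size caps the partition in bytes (converted to an entry count from LMDB page statistics), and duration drops entries older than now - duration. Judging from the now.add(-limit.duration) call, duration is expressed in seconds, though that ultimately depends on common.Timestamp.add. An illustrative instance:

    # keep at least 1000 entries, cap at 100000, drop anything older than ~30 days
    limit = Limit(min_entries=1_000,
                  max_entries=100_000,
                  duration=30 * 24 * 3600,  # assumed seconds (see Timestamp.add)
                  size=None)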
class Partition(typing.NamedTuple)
    order_by: common.OrderBy
    subscription: common.Subscription
    limit: Limit | None
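A partition pairs an ordering with a subscription: only events whose type matches the subscription are stored in it, and one event may land in several partitions at once. A hedged construction sketch (the ('measurements', '*') pattern assumes hat.event's usual event-type query syntax):

    partition = Partition(
        order_by=common.OrderBy.SOURCE_TIMESTAMP,
        subscription=common.create_subscription([('measurements', '*')]),
        limit=Limit(min_entries=1_000, duration=7 * 24 * 3600))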
def ext_create(env: hat.event.backends.lmdb.environment.Environment,
               txn: lmdb.Transaction,
               conditions: hat.event.backends.lmdb.conditions.Conditions,
               partitions: Iterable[Partition],
               max_results: int,
               event_type_cache_size: int
               ) -> TimeseriesDb
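ext_create runs on the executor side: it loads previously registered partitions from the TIMESERIES_PARTITION table, assigns fresh ids to new ones, and builds an LRU-cached event-type to partitions lookup. A sketch of a caller, assuming an already-open environment and write transaction (the wrapper function and its argument values are illustrative, not part of the backend):

    def create_timeseries_db(env, txn, conditions):
        partitions = [Partition(order_by=common.OrderBy.TIMESTAMP,
                                subscription=common.create_subscription([('*',)]),
                                limit=Limit(max_entries=1_000_000))]

        return ext_create(env, txn,
                          conditions=conditions,
                          partitions=partitions,
                          max_results=4096,
                          event_type_cache_size=256)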
class TimeseriesDb
def add(self,
        event: hat.event.common.Event
        ) -> Iterable[hat.event.backends.lmdb.common.LatestEventRef |
                      hat.event.backends.lmdb.common.TimeseriesEventRef]
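add is synchronous and purely in-memory: it buffers the event into every matching partition's deque and yields one TimeseriesEventRef per buffered copy (events without a source timestamp are skipped by SOURCE_TIMESTAMP partitions). Because it is a generator, the buffering happens lazily, so the caller must consume it:

    refs = list(db.add(event))  # consume fully so every partition is buffered

    for ref in refs:
        # each ref wraps the persistent key (partition_id, timestamp, event.id)
        print(ref)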
async def query(self,
                params: hat.event.common.QueryTimeseriesParams
                ) -> hat.event.common.QueryResult
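query is async because persisted rows are read through env.execute on the environment's executor, while still-buffered changes are merged in on the event-loop side at the position the requested order demands. It answers from the first partition whose order_by matches and whose subscription overlaps the queried event types. A hedged call sketch (the QueryTimeseriesParams field names follow their usage in this module's source):

    params = common.QueryTimeseriesParams(
        event_types=[('measurements', '*')],
        t_from=None,
        t_to=None,
        source_t_from=None,
        source_t_to=None,
        order=common.Order.DESCENDING,
        order_by=common.OrderBy.TIMESTAMP,
        max_results=100,
        last_event_id=None)

    result = await db.query(params)
    print(result.more_follows, len(result.events))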
def create_changes(self
                   ) -> dict[int,
                             collections.deque[tuple[
                                 hat.event.common.Timestamp,
                                 hat.event.common.Event]]]
def ext_write(self,
              txn: lmdb.Transaction,
              changes: dict[int,
                            collections.deque[tuple[
                                hat.event.common.Timestamp,
                                hat.event.common.Event]]])
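create_changes and ext_write together form the flush cycle: create_changes atomically swaps the buffer out on the event-loop side, and ext_write persists the drained buffer and increments the per-partition TIMESERIES_COUNT entries inside an LMDB write transaction on the executor side. A sketch of the split, with the write transaction assumed to come from the surrounding backend:

    # event-loop side: detach everything buffered so far
    changes = db.create_changes()

    # executor side, inside a write transaction obtained by the backend
    def ext_flush(txn):
        db.ext_write(txn, changes)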
def ext_cleanup(self,
                txn: lmdb.Transaction,
                now: hat.event.common.Timestamp,
                max_results: int | None = None
                ) -> collections.deque[tuple[
                    hat.event.common.EventId,
                    hat.event.backends.lmdb.common.LatestEventRef |
                    hat.event.backends.lmdb.common.TimeseriesEventRef]]
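ext_cleanup walks the partitions in order, enforcing each partition's Limit and stopping once max_results deletions have accumulated, so large backlogs can be trimmed incrementally over several passes. It returns the refs of deleted events so the caller can purge them from other tables. A hedged sketch (how now is obtained is up to the caller; the backend passes a common.Timestamp):

    def ext_cleanup_pass(txn, now):
        # delete at most 1024 expired entries in this pass
        deleted = db.ext_cleanup(txn, now, max_results=1024)

        for event_id, ref in deleted:
            ...  # caller removes matching refs from its other tables

        return len(deleted)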