hat.event.backends.lmdb.timeseriesdb

  1from collections.abc import Iterable
  2import collections
  3import itertools
  4import typing
  5
  6import lmdb
  7
  8from hat.event.backends.lmdb import common
  9from hat.event.backends.lmdb import environment
 10from hat.event.backends.lmdb.conditions import Conditions
 11
 12
# In-memory buffer of not-yet-persisted events: partition id mapped to a
# deque of (timestamp, event) pairs in insertion order
Changes: typing.TypeAlias = dict[common.PartitionId,
                                 collections.deque[tuple[common.Timestamp,
                                                         common.Event]]]
 16
 17
class Limit(typing.NamedTuple):
    """Partition cleanup limit.

    Cleanup always retains at least `min_entries` entries; beyond that it
    removes oldest entries while the partition exceeds `max_entries`,
    contains entries older than `duration` relative to cleanup time
    (passed to ``Timestamp.add`` — presumably seconds, confirm against
    ``common.Timestamp``), or exceeds approximately `size` bytes (estimated
    from average db entry size).
    """

    min_entries: int | None = None
    max_entries: int | None = None
    duration: float | None = None
    size: int | None = None
 23
 24
class Partition(typing.NamedTuple):
    """Timeseries partition configuration.

    Events matching `subscription` are stored ordered by the timestamp
    selected with `order_by`; `limit`, if set, drives `ext_cleanup`.
    """

    order_by: common.OrderBy
    subscription: common.Subscription
    limit: Limit | None
 29
 30
 31def ext_create(env: environment.Environment,
 32               txn: lmdb.Transaction,
 33               conditions: Conditions,
 34               partitions: Iterable[Partition],
 35               max_results: int = 4096
 36               ) -> 'TimeseriesDb':
 37    db = TimeseriesDb()
 38    db._env = env
 39    db._conditions = conditions
 40    db._max_results = max_results
 41    db._changes = collections.defaultdict(collections.deque)
 42
 43    # depending on dict order
 44    db._partitions = dict(_ext_init_partitions(env, txn, partitions))
 45
 46    return db
 47
 48
class TimeseriesDb:
    """Timeseries event storage.

    Instances are created with `ext_create`. Events are first buffered in
    memory (`add`), then periodically drained (`create_changes`) and
    persisted (`ext_write`); queries consult both the buffer and the db.
    """

    def add(self,
            event: common.Event
            ) -> Iterable[common.EventRef]:
        """Buffer `event` into every matching partition.

        Yields one `TimeseriesEventRef` per partition that accepted the
        event.
        """
        for partition_id, partition in self._partitions.items():
            if not partition.subscription.matches(event.type):
                continue

            if partition.order_by == common.OrderBy.TIMESTAMP:
                timestamp = event.timestamp

            elif partition.order_by == common.OrderBy.SOURCE_TIMESTAMP:
                # events without source timestamp cannot be ordered by it
                if event.source_timestamp is None:
                    continue

                timestamp = event.source_timestamp

            else:
                raise ValueError('unsupported order by')

            self._changes[partition_id].append((timestamp, event))

            yield common.TimeseriesEventRef(
                (partition_id, timestamp, event.id))

    async def query(self,
                    params: common.QueryTimeseriesParams
                    ) -> common.QueryResult:
        """Query events from the first partition compatible with `params`.

        A partition is compatible when its order matches `params.order_by`
        and its subscription overlaps the queried event types. If no
        partition is compatible, an empty result is returned.
        """
        subscription = (common.create_subscription(params.event_types)
                        if params.event_types is not None else None)

        # requested max_results is capped by the configured maximum
        max_results = (params.max_results
                       if params.max_results is not None and
                       params.max_results < self._max_results
                       else self._max_results)

        for partition_id, partition in self._partitions.items():
            if partition.order_by != params.order_by:
                continue

            if (subscription and
                    subscription.isdisjoint(partition.subscription)):
                continue

            # only the first compatible partition is queried
            return await self._query_partition(partition_id, params,
                                               subscription, max_results)

        return common.QueryResult(events=[],
                                  more_follows=False)

    def create_changes(self) -> Changes:
        """Drain and return the in-memory change buffer.

        The returned changes are expected to be passed to `ext_write`.
        """
        changes, self._changes = (self._changes,
                                  collections.defaultdict(collections.deque))
        return changes

    def ext_write(self,
                  txn: lmdb.Transaction,
                  changes: Changes):
        """Persist previously drained `changes` and update counts."""
        data = (((partition_id, timestamp, event.id), event)
                for partition_id, partition_changes in changes.items()
                for timestamp, event in partition_changes)
        self._env.ext_write(txn, common.DbType.TIMESERIES_DATA, data)

        counts = ((partition_id, len(partition_changes))
                  for partition_id, partition_changes in changes.items())
        _ext_inc_partition_count(self._env, txn, counts)

    def ext_cleanup(self,
                    txn: lmdb.Transaction,
                    now: common.Timestamp,
                    max_results: int | None = None,
                    ) -> collections.deque[tuple[common.EventId,
                                                 common.EventRef]]:
        """Remove persisted events according to each partition's limit.

        Returns (event id, event ref) pairs for every removed entry; at
        most `max_results` entries are removed in total (unlimited if
        ``None``).
        """
        result = collections.deque()

        for partition_id, partition in self._partitions.items():
            if not partition.limit:
                continue

            # remaining budget shared across partitions
            partition_max_results = (max_results - len(result)
                                     if max_results is not None else None)
            if partition_max_results is not None and partition_max_results < 1:
                break

            result.extend(_ext_cleanup_partition(self._env, txn, now,
                                                 partition_id, partition.limit,
                                                 partition_max_results))

        return result

    async def _query_partition(self, partition_id, params, subscription,
                               max_results):
        events = collections.deque()
        # snapshot the buffer reference before awaiting so a concurrent
        # create_changes swap does not change which deque is read
        changes = self._changes

        # max_results + 1: an extra matching event signals more_follows
        filter = _Filter(subscription=subscription,
                         t_from=params.t_from,
                         t_to=params.t_to,
                         source_t_from=params.source_t_from,
                         source_t_to=params.source_t_to,
                         max_results=max_results + 1,
                         last_event_id=params.last_event_id)

        if params.order == common.Order.DESCENDING:
            # newest data lives in the in-memory buffer - consult it first
            events.extend(_query_partition_changes(
                changes[partition_id], params, filter))

            if not filter.done:
                events.extend(await self._env.execute(
                    _ext_query_partition_events, self._env, self._conditions,
                    partition_id, params, filter))

        elif params.order == common.Order.ASCENDING:
            # oldest data lives in the db - consult it first
            events.extend(await self._env.execute(
                _ext_query_partition_events, self._env, self._conditions,
                partition_id, params, filter))

            if not filter.done:
                events.extend(_query_partition_changes(
                    changes[partition_id], params, filter))

        else:
            raise ValueError('unsupported order')

        # drop the sentinel extras beyond max_results
        more_follows = len(events) > max_results
        while len(events) > max_results:
            events.pop()

        return common.QueryResult(events=events,
                                  more_follows=more_follows)
180
181
def _ext_init_partitions(env, txn, partitions):
    """Yield (partition id, partition) pairs, persisting new assignments.

    A partition reuses an existing id when an identical configuration is
    already stored in the TIMESERIES_PARTITION db; otherwise a fresh id is
    assigned and written.
    """
    existing = dict(env.ext_read(txn, common.DbType.TIMESERIES_PARTITION))
    id_gen = itertools.count(max(existing.keys(), default=0) + 1)

    for partition in partitions:
        query_types = sorted(partition.subscription.get_query_types())
        data = {'order': partition.order_by.value,
                'subscriptions': [list(i) for i in query_types]}

        partition_id = next((i for i, stored in existing.items()
                             if stored == data),
                            None)

        if partition_id is None:
            partition_id = next(id_gen)
            existing[partition_id] = data
            env.ext_write(txn, common.DbType.TIMESERIES_PARTITION,
                          [(partition_id, data)])

        yield partition_id, partition
202
203
def _ext_query_partition_events(env, conditions, partition_id, params,
                                filter):
    """Query persisted partition events, condition-checked and filtered."""
    if params.order_by == common.OrderBy.TIMESTAMP:
        t_from, t_to = params.t_from, params.t_to

    elif params.order_by == common.OrderBy.SOURCE_TIMESTAMP:
        t_from, t_to = params.source_t_from, params.source_t_to

    else:
        raise ValueError('unsupported order by')

    events = _ext_query_partition_events_range(env, partition_id,
                                               t_from, t_to, params.order)
    matching = (event for event in events if conditions.matches(event))
    return collections.deque(filter.process(matching))
221
222
def _ext_query_partition_events_range(env, partition_id, t_from, t_to, order):
    """Yield decoded events from a partition within [t_from, t_to].

    Generator over TIMESERIES_DATA entries in `order` direction; falsy
    `t_from`/`t_to` default to the minimal/maximal timestamp.
    """
    db_def = common.db_defs[common.DbType.TIMESERIES_DATA]

    if not t_from:
        t_from = common.min_timestamp

    # smallest possible key at t_from within this partition
    from_key = partition_id, t_from, common.EventId(0, 0, 0)
    encoded_from_key = db_def.encode_key(from_key)

    if not t_to:
        t_to = common.max_timestamp

    # largest possible key at t_to within this partition (maximal event id
    # components - (1 << 64) - 1 each)
    to_key = (partition_id,
              t_to,
              common.EventId((1 << 64) - 1, (1 << 64) - 1, (1 << 64) - 1))
    encoded_to_key = db_def.encode_key(to_key)

    with env.ext_begin() as txn:
        with env.ext_cursor(txn, common.DbType.TIMESERIES_DATA) as cursor:
            if order == common.Order.DESCENDING:
                encoded_start_key, encoded_stop_key = (encoded_to_key,
                                                       encoded_from_key)

                # position at last key <= start key: set_range lands on the
                # first key >= start, so step back once; if no such key
                # exists, start from the database end
                if cursor.set_range(encoded_start_key):
                    more = cursor.prev()
                else:
                    more = cursor.last()

                while more and encoded_stop_key <= bytes(cursor.key()):
                    yield db_def.decode_value(cursor.value())
                    more = cursor.prev()

            elif order == common.Order.ASCENDING:
                encoded_start_key, encoded_stop_key = (encoded_from_key,
                                                       encoded_to_key)

                more = cursor.set_range(encoded_start_key)

                while more and bytes(cursor.key()) <= encoded_stop_key:
                    yield db_def.decode_value(cursor.value())
                    more = cursor.next()

            else:
                raise ValueError('unsupported order')
267
268
def _ext_get_partition_count(env, txn, partition_id):
    """Return persisted event count for partition (0 when absent)."""
    db_def = common.db_defs[common.DbType.TIMESERIES_COUNT]

    with env.ext_cursor(txn, common.DbType.TIMESERIES_COUNT) as cursor:
        raw = cursor.get(db_def.encode_key(partition_id))
        return db_def.decode_value(raw) if raw else 0
277
278
def _ext_set_partition_count(env, txn, partition_id, count):
    # persist the cached per-partition event count
    env.ext_write(txn, common.DbType.TIMESERIES_COUNT, [(partition_id, count)])
281
282
def _ext_inc_partition_count(env, txn, partition_counts):
    """Increment persisted per-partition event counts.

    `partition_counts` is an iterable of (partition id, delta) pairs;
    missing counts are treated as 0.
    """
    db_def = common.db_defs[common.DbType.TIMESERIES_COUNT]

    with env.ext_cursor(txn, common.DbType.TIMESERIES_COUNT) as cursor:
        for partition_id, delta in partition_counts:
            key = db_def.encode_key(partition_id)
            raw = cursor.get(key)
            current = db_def.decode_value(raw) if raw else 0

            cursor.put(key, db_def.encode_value(current + delta))
296
297
def _ext_cleanup_partition(env, txn, now, partition_id, limit, max_results):
    """Delete oldest partition entries according to `limit`.

    Generator yielding (event id, event ref) for every deleted entry. At
    most `max_results` entries are deleted (unlimited if ``None``); at
    least `limit.min_entries` entries are always retained. The persisted
    partition count is updated once the generator is exhausted.
    """
    db_def = common.db_defs[common.DbType.TIMESERIES_DATA]

    # key range covering the whole partition
    timestamp = common.min_timestamp
    start_key = (partition_id,
                 timestamp,
                 common.EventId(0, 0, 0))
    stop_key = ((partition_id + 1),
                timestamp,
                common.EventId(0, 0, 0))

    encoded_start_key = db_def.encode_key(start_key)
    encoded_stop_key = db_def.encode_key(stop_key)

    min_entries = limit.min_entries or 0
    max_entries = None
    encoded_duration_key = None

    if limit.size is not None:
        # derive an entry budget from the average entry size across the
        # whole TIMESERIES_DATA db (not just this partition)
        stat = env.ext_stat(txn, common.DbType.TIMESERIES_DATA)

        if stat['entries']:
            total_size = stat['psize'] * (stat['branch_pages'] +
                                          stat['leaf_pages'] +
                                          stat['overflow_pages'])
            entry_size = total_size / stat['entries']
            max_entries = int(limit.size / entry_size)

    if limit.max_entries is not None:
        max_entries = (limit.max_entries if max_entries is None
                       else min(max_entries, limit.max_entries))

    if limit.duration is not None:
        # first key that is still young enough to keep
        duration_key = (partition_id,
                        now.add(-limit.duration),
                        common.EventId(0, 0, 0))
        encoded_duration_key = db_def.encode_key(duration_key)

    result_count = 0
    entries_count = _ext_get_partition_count(env, txn, partition_id)

    with env.ext_cursor(txn, common.DbType.TIMESERIES_DATA) as cursor:
        more = cursor.set_range(encoded_start_key)
        while more:
            if max_results is not None and result_count >= max_results:
                break

            # never drop below the guaranteed minimum
            if entries_count - result_count <= min_entries:
                break

            encoded_key = bytes(cursor.key())
            if encoded_key >= encoded_stop_key:
                break

            # stop once both the entry-count limit and the age limit are
            # satisfied (an unset limit counts as satisfied)
            if ((max_entries is None or
                 entries_count - result_count <= max_entries) and
                (encoded_duration_key is None or
                 encoded_key >= encoded_duration_key)):
                break

            key = db_def.decode_key(encoded_key)
            event_id = key[2]

            # delete positions the cursor on the next entry
            more = cursor.delete()
            result_count += 1

            yield event_id, common.TimeseriesEventRef(key)

    if result_count > 0:
        _ext_set_partition_count(env, txn, partition_id,
                                 entries_count - result_count)
369
370
def _query_partition_changes(changes, params, filter):
    """Filter the in-memory change buffer according to `params`.

    `changes` is a deque of (timestamp, event) pairs in ascending
    insertion order; time bounds are applied lazily with
    dropwhile/takewhile relying on that ordering. The result is passed
    through `filter.process`.
    """
    if params.order == common.Order.DESCENDING:
        events = (event for _, event in reversed(changes))

        if params.order_by == common.OrderBy.TIMESTAMP:
            # iterating newest-first: skip entries above t_to, stop below
            # t_from
            if params.t_to is not None:
                events = itertools.dropwhile(
                    lambda i: params.t_to < i.timestamp,
                    events)
            if params.t_from is not None:
                events = itertools.takewhile(
                    lambda i: params.t_from <= i.timestamp,
                    events)

        elif params.order_by == common.OrderBy.SOURCE_TIMESTAMP:
            if params.source_t_to is not None:
                events = itertools.dropwhile(
                    lambda i: params.source_t_to < i.source_timestamp,
                    events)
            if params.source_t_from is not None:
                events = itertools.takewhile(
                    lambda i: params.source_t_from <= i.source_timestamp,
                    events)

    elif params.order == common.Order.ASCENDING:
        events = (event for _, event in changes)

        if params.order_by == common.OrderBy.TIMESTAMP:
            # iterating oldest-first: skip entries below t_from, stop
            # above t_to
            if params.t_from is not None:
                events = itertools.dropwhile(
                    lambda i: i.timestamp < params.t_from,
                    events)
            if params.t_to is not None:
                events = itertools.takewhile(
                    lambda i: i.timestamp <= params.t_to,
                    events)

        elif params.order_by == common.OrderBy.SOURCE_TIMESTAMP:
            if params.source_t_from is not None:
                events = itertools.dropwhile(
                    lambda i: i.source_timestamp < params.source_t_from,
                    events)
            if params.source_t_to is not None:
                events = itertools.takewhile(
                    lambda i: i.source_timestamp <= params.source_t_to,
                    events)

    else:
        raise ValueError('unsupported order')

    return filter.process(events)
430
431
class _Filter:
    """Stateful event filter shared between buffer and db query passes.

    Tracks the remaining result quota and the "resume after last event id"
    state across multiple `process` calls, so the in-memory and persisted
    passes of a single query compose correctly.
    """

    def __init__(self,
                 subscription: common.Subscription,
                 t_from: common.Timestamp | None,
                 t_to: common.Timestamp | None,
                 source_t_from: common.Timestamp | None,
                 source_t_to: common.Timestamp | None,
                 max_results: int,
                 last_event_id: common.EventId | None):
        self._subscription = subscription
        self._t_from = t_from
        self._t_to = t_to
        self._source_t_from = source_t_from
        self._source_t_to = source_t_to
        self._max_results = max_results
        self._last_event_id = last_event_id

    @property
    def done(self):
        """``True`` when the result quota is exhausted."""
        return self._max_results < 1

    def process(self, events: Iterable[common.Event]):
        """Yield events passing all criteria, up to the remaining quota."""
        for event in events:
            if self.done:
                return

            # until last_event_id is seen, everything (including the
            # matching event itself) is skipped
            if self._last_event_id:
                if event.id == self._last_event_id:
                    self._last_event_id = None

                continue

            if not self._accepts(event):
                continue

            self._max_results -= 1
            yield event

    def _accepts(self, event):
        # timestamp bounds
        if self._t_from is not None and event.timestamp < self._t_from:
            return False

        if self._t_to is not None and self._t_to < event.timestamp:
            return False

        # source timestamp bounds (events without source timestamp are
        # rejected whenever a source bound is set)
        source_timestamp = event.source_timestamp
        if self._source_t_from is not None and (
                source_timestamp is None or
                source_timestamp < self._source_t_from):
            return False

        if self._source_t_to is not None and (
                source_timestamp is None or
                self._source_t_to < source_timestamp):
            return False

        # event type subscription
        if (self._subscription and
                not self._subscription.matches(event.type)):
            return False

        return True
Changes: TypeAlias = dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]
class Limit(typing.NamedTuple):
19class Limit(typing.NamedTuple):
20    min_entries: int | None = None
21    max_entries: int | None = None
22    duration: float | None = None
23    size: int | None = None

Limit(min_entries, max_entries, duration, size)

Limit( min_entries: int | None = None, max_entries: int | None = None, duration: float | None = None, size: int | None = None)

Create new instance of Limit(min_entries, max_entries, duration, size)

min_entries: int | None

Alias for field number 0

max_entries: int | None

Alias for field number 1

duration: float | None

Alias for field number 2

size: int | None

Alias for field number 3

class Partition(typing.NamedTuple):
26class Partition(typing.NamedTuple):
27    order_by: common.OrderBy
28    subscription: common.Subscription
29    limit: Limit | None

Partition(order_by, subscription, limit)

Partition( order_by: hat.event.common.OrderBy, subscription: hat.event.common.Subscription, limit: Limit | None)

Create new instance of Partition(order_by, subscription, limit)

Alias for field number 0

Alias for field number 1

limit: Limit | None

Alias for field number 2

def ext_create( env: hat.event.backends.lmdb.environment.Environment, txn: Transaction, conditions: hat.event.backends.lmdb.conditions.Conditions, partitions: Iterable[Partition], max_results: int = 4096) -> TimeseriesDb:
32def ext_create(env: environment.Environment,
33               txn: lmdb.Transaction,
34               conditions: Conditions,
35               partitions: Iterable[Partition],
36               max_results: int = 4096
37               ) -> 'TimeseriesDb':
38    db = TimeseriesDb()
39    db._env = env
40    db._conditions = conditions
41    db._max_results = max_results
42    db._changes = collections.defaultdict(collections.deque)
43
44    # depending on dict order
45    db._partitions = dict(_ext_init_partitions(env, txn, partitions))
46
47    return db
class TimeseriesDb:
 50class TimeseriesDb:
 51
 52    def add(self,
 53            event: common.Event
 54            ) -> Iterable[common.EventRef]:
 55        for partition_id, partition in self._partitions.items():
 56            if not partition.subscription.matches(event.type):
 57                continue
 58
 59            if partition.order_by == common.OrderBy.TIMESTAMP:
 60                timestamp = event.timestamp
 61
 62            elif partition.order_by == common.OrderBy.SOURCE_TIMESTAMP:
 63                if event.source_timestamp is None:
 64                    continue
 65
 66                timestamp = event.source_timestamp
 67
 68            else:
 69                raise ValueError('unsupported order by')
 70
 71            self._changes[partition_id].append((timestamp, event))
 72
 73            yield common.TimeseriesEventRef(
 74                (partition_id, timestamp, event.id))
 75
 76    async def query(self,
 77                    params: common.QueryTimeseriesParams
 78                    ) -> common.QueryResult:
 79        subscription = (common.create_subscription(params.event_types)
 80                        if params.event_types is not None else None)
 81
 82        max_results = (params.max_results
 83                       if params.max_results is not None and
 84                       params.max_results < self._max_results
 85                       else self._max_results)
 86
 87        for partition_id, partition in self._partitions.items():
 88            if partition.order_by != params.order_by:
 89                continue
 90
 91            if (subscription and
 92                    subscription.isdisjoint(partition.subscription)):
 93                continue
 94
 95            return await self._query_partition(partition_id, params,
 96                                               subscription, max_results)
 97
 98        return common.QueryResult(events=[],
 99                                  more_follows=False)
100
101    def create_changes(self) -> Changes:
102        changes, self._changes = (self._changes,
103                                  collections.defaultdict(collections.deque))
104        return changes
105
106    def ext_write(self,
107                  txn: lmdb.Transaction,
108                  changes: Changes):
109        data = (((partition_id, timestamp, event.id), event)
110                for partition_id, partition_changes in changes.items()
111                for timestamp, event in partition_changes)
112        self._env.ext_write(txn, common.DbType.TIMESERIES_DATA, data)
113
114        counts = ((partition_id, len(partition_changes))
115                  for partition_id, partition_changes in changes.items())
116        _ext_inc_partition_count(self._env, txn, counts)
117
118    def ext_cleanup(self,
119                    txn: lmdb.Transaction,
120                    now: common.Timestamp,
121                    max_results: int | None = None,
122                    ) -> collections.deque[tuple[common.EventId,
123                                                 common.EventRef]]:
124        result = collections.deque()
125
126        for partition_id, partition in self._partitions.items():
127            if not partition.limit:
128                continue
129
130            partition_max_results = (max_results - len(result)
131                                     if max_results is not None else None)
132            if partition_max_results is not None and partition_max_results < 1:
133                break
134
135            result.extend(_ext_cleanup_partition(self._env, txn, now,
136                                                 partition_id, partition.limit,
137                                                 partition_max_results))
138
139        return result
140
141    async def _query_partition(self, partition_id, params, subscription,
142                               max_results):
143        events = collections.deque()
144        changes = self._changes
145
146        filter = _Filter(subscription=subscription,
147                         t_from=params.t_from,
148                         t_to=params.t_to,
149                         source_t_from=params.source_t_from,
150                         source_t_to=params.source_t_to,
151                         max_results=max_results + 1,
152                         last_event_id=params.last_event_id)
153
154        if params.order == common.Order.DESCENDING:
155            events.extend(_query_partition_changes(
156                changes[partition_id], params, filter))
157
158            if not filter.done:
159                events.extend(await self._env.execute(
160                    _ext_query_partition_events, self._env, self._conditions,
161                    partition_id, params, filter))
162
163        elif params.order == common.Order.ASCENDING:
164            events.extend(await self._env.execute(
165                _ext_query_partition_events, self._env, self._conditions,
166                partition_id, params, filter))
167
168            if not filter.done:
169                events.extend(_query_partition_changes(
170                    changes[partition_id], params, filter))
171
172        else:
173            raise ValueError('unsupported order')
174
175        more_follows = len(events) > max_results
176        while len(events) > max_results:
177            events.pop()
178
179        return common.QueryResult(events=events,
180                                  more_follows=more_follows)
52    def add(self,
53            event: common.Event
54            ) -> Iterable[common.EventRef]:
55        for partition_id, partition in self._partitions.items():
56            if not partition.subscription.matches(event.type):
57                continue
58
59            if partition.order_by == common.OrderBy.TIMESTAMP:
60                timestamp = event.timestamp
61
62            elif partition.order_by == common.OrderBy.SOURCE_TIMESTAMP:
63                if event.source_timestamp is None:
64                    continue
65
66                timestamp = event.source_timestamp
67
68            else:
69                raise ValueError('unsupported order by')
70
71            self._changes[partition_id].append((timestamp, event))
72
73            yield common.TimeseriesEventRef(
74                (partition_id, timestamp, event.id))
async def query( self, params: hat.event.common.QueryTimeseriesParams) -> hat.event.common.QueryResult:
76    async def query(self,
77                    params: common.QueryTimeseriesParams
78                    ) -> common.QueryResult:
79        subscription = (common.create_subscription(params.event_types)
80                        if params.event_types is not None else None)
81
82        max_results = (params.max_results
83                       if params.max_results is not None and
84                       params.max_results < self._max_results
85                       else self._max_results)
86
87        for partition_id, partition in self._partitions.items():
88            if partition.order_by != params.order_by:
89                continue
90
91            if (subscription and
92                    subscription.isdisjoint(partition.subscription)):
93                continue
94
95            return await self._query_partition(partition_id, params,
96                                               subscription, max_results)
97
98        return common.QueryResult(events=[],
99                                  more_follows=False)
def create_changes( self) -> dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]:
101    def create_changes(self) -> Changes:
102        changes, self._changes = (self._changes,
103                                  collections.defaultdict(collections.deque))
104        return changes
def ext_write( self, txn: Transaction, changes: dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]):
106    def ext_write(self,
107                  txn: lmdb.Transaction,
108                  changes: Changes):
109        data = (((partition_id, timestamp, event.id), event)
110                for partition_id, partition_changes in changes.items()
111                for timestamp, event in partition_changes)
112        self._env.ext_write(txn, common.DbType.TIMESERIES_DATA, data)
113
114        counts = ((partition_id, len(partition_changes))
115                  for partition_id, partition_changes in changes.items())
116        _ext_inc_partition_count(self._env, txn, counts)
def ext_cleanup( self, txn: Transaction, now: hat.event.common.Timestamp, max_results: int | None = None) -> collections.deque[tuple[hat.event.common.EventId, hat.event.backends.lmdb.common.LatestEventRef | hat.event.backends.lmdb.common.TimeseriesEventRef]]:
118    def ext_cleanup(self,
119                    txn: lmdb.Transaction,
120                    now: common.Timestamp,
121                    max_results: int | None = None,
122                    ) -> collections.deque[tuple[common.EventId,
123                                                 common.EventRef]]:
124        result = collections.deque()
125
126        for partition_id, partition in self._partitions.items():
127            if not partition.limit:
128                continue
129
130            partition_max_results = (max_results - len(result)
131                                     if max_results is not None else None)
132            if partition_max_results is not None and partition_max_results < 1:
133                break
134
135            result.extend(_ext_cleanup_partition(self._env, txn, now,
136                                                 partition_id, partition.limit,
137                                                 partition_max_results))
138
139        return result