hat.event.backends.lmdb.timeseriesdb

  1from collections.abc import Iterable
  2import collections
  3import functools
  4import itertools
  5import typing
  6
  7import lmdb
  8
  9from hat.event.backends.lmdb import common
 10from hat.event.backends.lmdb import environment
 11from hat.event.backends.lmdb.conditions import Conditions
 12
 13
 14Changes: typing.TypeAlias = dict[common.PartitionId,
 15                                 collections.deque[tuple[common.Timestamp,
 16                                                         common.Event]]]
 17
 18
 19class Limit(typing.NamedTuple):
 20    min_entries: int | None = None
 21    max_entries: int | None = None
 22    duration: float | None = None
 23    size: int | None = None
 24
 25
 26class Partition(typing.NamedTuple):
 27    order_by: common.OrderBy
 28    subscription: common.Subscription
 29    limit: Limit | None
 30
 31
 32def ext_create(env: environment.Environment,
 33               txn: lmdb.Transaction,
 34               conditions: Conditions,
 35               partitions: Iterable[Partition],
 36               max_results: int,
 37               event_type_cache_size: int
 38               ) -> 'TimeseriesDb':
 39    db = TimeseriesDb()
 40    db._env = env
 41    db._conditions = conditions
 42    db._max_results = max_results
 43    db._changes = collections.defaultdict(collections.deque)
 44    db._event_type_partitions = {}  # type: dict[common.EventType, Collection[tuple[common.PartitionId, Partition]]]  # NOQA
 45
 46    # depending on dict order
 47    db._partitions = dict(_ext_init_partitions(env, txn, partitions))
 48
 49    partitions_collection = common.create_event_type_collection(
 50        (partition.subscription, (partition_id, partition))
 51        for partition_id, partition in db._partitions.items())
 52
 53    db._get_event_type_partitions = functools.lru_cache(
 54        maxsize=event_type_cache_size)(
 55        lambda event_type: list(partitions_collection.get(event_type)))
 56
 57    return db
 58
 59
 60class TimeseriesDb:
 61
 62    def add(self,
 63            event: common.Event
 64            ) -> Iterable[common.EventRef]:
 65        partitions = self._get_event_type_partitions(event.type)
 66
 67        for partition_id, partition in partitions:
 68            if partition.order_by == common.OrderBy.TIMESTAMP:
 69                timestamp = event.timestamp
 70
 71            elif partition.order_by == common.OrderBy.SOURCE_TIMESTAMP:
 72                if event.source_timestamp is None:
 73                    continue
 74
 75                timestamp = event.source_timestamp
 76
 77            else:
 78                raise ValueError('unsupported order by')
 79
 80            self._changes[partition_id].append((timestamp, event))
 81
 82            yield common.TimeseriesEventRef(
 83                (partition_id, timestamp, event.id))
 84
 85    async def query(self,
 86                    params: common.QueryTimeseriesParams
 87                    ) -> common.QueryResult:
 88        subscription = (common.create_subscription(params.event_types)
 89                        if params.event_types is not None else None)
 90
 91        max_results = (params.max_results
 92                       if params.max_results is not None and
 93                       params.max_results < self._max_results
 94                       else self._max_results)
 95
 96        for partition_id, partition in self._partitions.items():
 97            if partition.order_by != params.order_by:
 98                continue
 99
100            if (subscription and
101                    subscription.isdisjoint(partition.subscription)):
102                continue
103
104            return await self._query_partition(partition_id, params,
105                                               subscription, max_results)
106
107        return common.QueryResult(events=[],
108                                  more_follows=False)
109
110    def create_changes(self) -> Changes:
111        changes, self._changes = (self._changes,
112                                  collections.defaultdict(collections.deque))
113        return changes
114
115    def ext_write(self,
116                  txn: lmdb.Transaction,
117                  changes: Changes):
118        data = (((partition_id, timestamp, event.id), event)
119                for partition_id, partition_changes in changes.items()
120                for timestamp, event in partition_changes)
121        self._env.ext_write(txn, common.DbType.TIMESERIES_DATA, data)
122
123        counts = ((partition_id, len(partition_changes))
124                  for partition_id, partition_changes in changes.items())
125        _ext_inc_partition_count(self._env, txn, counts)
126
127    def ext_cleanup(self,
128                    txn: lmdb.Transaction,
129                    now: common.Timestamp,
130                    max_results: int | None = None,
131                    ) -> collections.deque[tuple[common.EventId,
132                                                 common.EventRef]]:
133        result = collections.deque()
134
135        for partition_id, partition in self._partitions.items():
136            if not partition.limit:
137                continue
138
139            partition_max_results = (max_results - len(result)
140                                     if max_results is not None else None)
141            if partition_max_results is not None and partition_max_results < 1:
142                break
143
144            result.extend(_ext_cleanup_partition(self._env, txn, now,
145                                                 partition_id, partition.limit,
146                                                 partition_max_results))
147
148        return result
149
150    async def _query_partition(self, partition_id, params, subscription,
151                               max_results):
152        events = collections.deque()
153        changes = self._changes
154
155        filter = _Filter(subscription=subscription,
156                         t_from=params.t_from,
157                         t_to=params.t_to,
158                         source_t_from=params.source_t_from,
159                         source_t_to=params.source_t_to,
160                         max_results=max_results + 1,
161                         last_event_id=params.last_event_id)
162
163        if params.order == common.Order.DESCENDING:
164            events.extend(_query_partition_changes(
165                changes[partition_id], params, filter))
166
167            if not filter.done:
168                events.extend(await self._env.execute(
169                    _ext_query_partition_events, self._env, self._conditions,
170                    partition_id, params, filter))
171
172        elif params.order == common.Order.ASCENDING:
173            events.extend(await self._env.execute(
174                _ext_query_partition_events, self._env, self._conditions,
175                partition_id, params, filter))
176
177            if not filter.done:
178                events.extend(_query_partition_changes(
179                    changes[partition_id], params, filter))
180
181        else:
182            raise ValueError('unsupported order')
183
184        more_follows = len(events) > max_results
185        while len(events) > max_results:
186            events.pop()
187
188        return common.QueryResult(events=events,
189                                  more_follows=more_follows)
190
191
192def _ext_init_partitions(env, txn, partitions):
193
194    def data_to_cache_key(data):
195        return (data['order'],
196                tuple(tuple(event_type)
197                      for event_type in data['subscriptions']))
198
199    max_partition_id = 0
200    cache = {}
201
202    for partition_id, partition_data in env.ext_read(
203            txn, common.DbType.TIMESERIES_PARTITION):
204        if partition_id > max_partition_id:
205            max_partition_id = partition_id
206
207        cache_key = data_to_cache_key(partition_data)
208        cache[cache_key] = partition_id
209
210    next_partition_ids = itertools.count(max_partition_id + 1)
211
212    for partition in partitions:
213        event_types = sorted(partition.subscription.get_query_types())
214        partition_data = {'order': partition.order_by.value,
215                          'subscriptions': [list(i) for i in event_types]}
216
217        cache_key = data_to_cache_key(partition_data)
218        partition_id = cache.get(cache_key)
219
220        if partition_id is None:
221            partition_id = next(next_partition_ids)
222            cache[cache_key] = partition_id
223            env.ext_write(txn, common.DbType.TIMESERIES_PARTITION,
224                          [(partition_id, partition_data)])
225
226        yield partition_id, partition
227
228
229def _ext_query_partition_events(env, conditions, partition_id, params,
230                                filter):
231    if params.order_by == common.OrderBy.TIMESTAMP:
232        events = _ext_query_partition_events_range(
233            env, partition_id, params.t_from, params.t_to, params.order)
234
235    elif params.order_by == common.OrderBy.SOURCE_TIMESTAMP:
236        events = _ext_query_partition_events_range(
237            env, partition_id, params.source_t_from, params.source_t_to,
238            params.order)
239
240    else:
241        raise ValueError('unsupported order by')
242
243    events = (event for event in events if conditions.matches(event))
244    events = filter.process(events)
245    return collections.deque(events)
246
247
248def _ext_query_partition_events_range(env, partition_id, t_from, t_to, order):
249    db_def = common.db_defs[common.DbType.TIMESERIES_DATA]
250
251    if not t_from:
252        t_from = common.min_timestamp
253
254    from_key = partition_id, t_from, common.EventId(0, 0, 0)
255    encoded_from_key = db_def.encode_key(from_key)
256
257    if not t_to:
258        t_to = common.max_timestamp
259
260    to_key = (partition_id,
261              t_to,
262              common.EventId((1 << 64) - 1, (1 << 64) - 1, (1 << 64) - 1))
263    encoded_to_key = db_def.encode_key(to_key)
264
265    with env.ext_begin() as txn:
266        with env.ext_cursor(txn, common.DbType.TIMESERIES_DATA) as cursor:
267            if order == common.Order.DESCENDING:
268                encoded_start_key, encoded_stop_key = (encoded_to_key,
269                                                       encoded_from_key)
270
271                if cursor.set_range(encoded_start_key):
272                    more = cursor.prev()
273                else:
274                    more = cursor.last()
275
276                while more and encoded_stop_key <= bytes(cursor.key()):
277                    yield db_def.decode_value(cursor.value())
278                    more = cursor.prev()
279
280            elif order == common.Order.ASCENDING:
281                encoded_start_key, encoded_stop_key = (encoded_from_key,
282                                                       encoded_to_key)
283
284                more = cursor.set_range(encoded_start_key)
285
286                while more and bytes(cursor.key()) <= encoded_stop_key:
287                    yield db_def.decode_value(cursor.value())
288                    more = cursor.next()
289
290            else:
291                raise ValueError('unsupported order')
292
293
294def _ext_get_partition_count(env, txn, partition_id):
295    db_def = common.db_defs[common.DbType.TIMESERIES_COUNT]
296
297    with env.ext_cursor(txn, common.DbType.TIMESERIES_COUNT) as cursor:
298        encoded_key = db_def.encode_key(partition_id)
299        encoded_value = cursor.get(encoded_key)
300
301        return db_def.decode_value(encoded_value) if encoded_value else 0
302
303
304def _ext_set_partition_count(env, txn, partition_id, count):
305    env.ext_write(txn, common.DbType.TIMESERIES_COUNT, [(partition_id, count)])
306
307
308def _ext_inc_partition_count(env, txn, partition_counts):
309    db_def = common.db_defs[common.DbType.TIMESERIES_COUNT]
310
311    with env.ext_cursor(txn, common.DbType.TIMESERIES_COUNT) as cursor:
312        for partition_id, count in partition_counts:
313            encoded_key = db_def.encode_key(partition_id)
314            encoded_value = cursor.get(encoded_key)
315
316            value = db_def.decode_value(encoded_value) if encoded_value else 0
317            inc_value = value + count
318
319            encoded_value = db_def.encode_value(inc_value)
320            cursor.put(encoded_key, encoded_value)
321
322
323def _ext_cleanup_partition(env, txn, now, partition_id, limit, max_results):
324    db_def = common.db_defs[common.DbType.TIMESERIES_DATA]
325
326    timestamp = common.min_timestamp
327    start_key = (partition_id,
328                 timestamp,
329                 common.EventId(0, 0, 0))
330    stop_key = ((partition_id + 1),
331                timestamp,
332                common.EventId(0, 0, 0))
333
334    encoded_start_key = db_def.encode_key(start_key)
335    encoded_stop_key = db_def.encode_key(stop_key)
336
337    min_entries = limit.min_entries or 0
338    max_entries = None
339    encoded_duration_key = None
340
341    if limit.size is not None:
342        stat = env.ext_stat(txn, common.DbType.TIMESERIES_DATA)
343
344        if stat['entries']:
345            total_size = stat['psize'] * (stat['branch_pages'] +
346                                          stat['leaf_pages'] +
347                                          stat['overflow_pages'])
348            entry_size = total_size / stat['entries']
349            max_entries = int(limit.size / entry_size)
350
351    if limit.max_entries is not None:
352        max_entries = (limit.max_entries if max_entries is None
353                       else min(max_entries, limit.max_entries))
354
355    if limit.duration is not None:
356        duration_key = (partition_id,
357                        now.add(-limit.duration),
358                        common.EventId(0, 0, 0))
359        encoded_duration_key = db_def.encode_key(duration_key)
360
361    result_count = 0
362    entries_count = _ext_get_partition_count(env, txn, partition_id)
363
364    with env.ext_cursor(txn, common.DbType.TIMESERIES_DATA) as cursor:
365        more = cursor.set_range(encoded_start_key)
366        while more:
367            if max_results is not None and result_count >= max_results:
368                break
369
370            if entries_count - result_count <= min_entries:
371                break
372
373            encoded_key = bytes(cursor.key())
374            if encoded_key >= encoded_stop_key:
375                break
376
377            if ((max_entries is None or
378                 entries_count - result_count <= max_entries) and
379                (encoded_duration_key is None or
380                 encoded_key >= encoded_duration_key)):
381                break
382
383            key = db_def.decode_key(encoded_key)
384            event_id = key[2]
385
386            more = cursor.delete()
387            result_count += 1
388
389            yield event_id, common.TimeseriesEventRef(key)
390
391    if result_count > 0:
392        _ext_set_partition_count(env, txn, partition_id,
393                                 entries_count - result_count)
394
395
396def _query_partition_changes(changes, params, filter):
397    if params.order == common.Order.DESCENDING:
398        events = (event for _, event in reversed(changes))
399
400        if (params.order_by == common.OrderBy.TIMESTAMP and
401                params.t_to is not None):
402            events = itertools.dropwhile(
403                lambda i: params.t_to < i.timestamp,
404                events)
405
406        elif (params.order_by == common.OrderBy.SOURCE_TIMESTAMP and
407                params.source_t_to is not None):
408            events = itertools.dropwhile(
409                lambda i: params.source_t_to < i.source_timestamp,
410                events)
411
412        if (params.order_by == common.OrderBy.TIMESTAMP and
413                params.t_from is not None):
414            events = itertools.takewhile(
415                lambda i: params.t_from <= i.timestamp,
416                events)
417
418        elif (params.order_by == common.OrderBy.SOURCE_TIMESTAMP and
419                params.source_t_from is not None):
420            events = itertools.takewhile(
421                lambda i: params.source_t_from <= i.source_timestamp,
422                events)
423
424    elif params.order == common.Order.ASCENDING:
425        events = (event for _, event in changes)
426
427        if (params.order_by == common.OrderBy.TIMESTAMP and
428                params.t_from is not None):
429            events = itertools.dropwhile(
430                lambda i: i.timestamp < params.t_from,
431                events)
432
433        elif (params.order_by == common.OrderBy.SOURCE_TIMESTAMP and
434                params.source_t_from is not None):
435            events = itertools.dropwhile(
436                lambda i: i.source_timestamp < params.source_t_from,
437                events)
438
439        if (params.order_by == common.OrderBy.TIMESTAMP and
440                params.t_to is not None):
441            events = itertools.takewhile(
442                lambda i: i.timestamp <= params.t_to,
443                events)
444
445        elif (params.order_by == common.OrderBy.SOURCE_TIMESTAMP and
446                params.source_t_to is not None):
447            events = itertools.takewhile(
448                lambda i: i.source_timestamp <= params.source_t_to,
449                events)
450
451    else:
452        raise ValueError('unsupported order')
453
454    return filter.process(events)
455
456
457class _Filter:
458
459    def __init__(self,
460                 subscription: common.Subscription,
461                 t_from: common.Timestamp | None,
462                 t_to: common.Timestamp | None,
463                 source_t_from: common.Timestamp | None,
464                 source_t_to: common.Timestamp | None,
465                 max_results: int,
466                 last_event_id: common.EventId | None):
467        self._subscription = subscription
468        self._t_from = t_from
469        self._t_to = t_to
470        self._source_t_from = source_t_from
471        self._source_t_to = source_t_to
472        self._max_results = max_results
473        self._last_event_id = last_event_id
474
475    @property
476    def done(self):
477        return self._max_results < 1
478
479    def process(self, events: Iterable[common.Event]):
480        for event in events:
481            if self._max_results < 1:
482                return
483
484            if self._last_event_id:
485                if event.id == self._last_event_id:
486                    self._last_event_id = None
487
488                continue
489
490            if self._t_from is not None and event.timestamp < self._t_from:
491                continue
492
493            if self._t_to is not None and self._t_to < event.timestamp:
494                continue
495
496            if self._source_t_from is not None and (
497                    event.source_timestamp is None or
498                    event.source_timestamp < self._source_t_from):
499                continue
500
501            if self._source_t_to is not None and (
502                    event.source_timestamp is None or
503                    self._source_t_to < event.source_timestamp):
504                continue
505
506            if (self._subscription and
507                    not self._subscription.matches(event.type)):
508                continue
509
510            self._max_results -= 1
511            yield event
Changes: TypeAlias = dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]
class Limit(typing.NamedTuple):
20class Limit(typing.NamedTuple):
21    min_entries: int | None = None
22    max_entries: int | None = None
23    duration: float | None = None
24    size: int | None = None

Limit(min_entries, max_entries, duration, size)

Limit( min_entries: int | None = None, max_entries: int | None = None, duration: float | None = None, size: int | None = None)

Create new instance of Limit(min_entries, max_entries, duration, size)

min_entries: int | None

Alias for field number 0

max_entries: int | None

Alias for field number 1

duration: float | None

Alias for field number 2

size: int | None

Alias for field number 3

class Partition(typing.NamedTuple):
27class Partition(typing.NamedTuple):
28    order_by: common.OrderBy
29    subscription: common.Subscription
30    limit: Limit | None

Partition(order_by, subscription, limit)

Partition( order_by: hat.event.common.OrderBy, subscription: hat.event.common.Subscription, limit: Limit | None)

Create new instance of Partition(order_by, subscription, limit)

Alias for field number 0

Alias for field number 1

limit: Limit | None

Alias for field number 2

def ext_create( env: hat.event.backends.lmdb.environment.Environment, txn: Transaction, conditions: hat.event.backends.lmdb.conditions.Conditions, partitions: Iterable[Partition], max_results: int, event_type_cache_size: int) -> TimeseriesDb:
33def ext_create(env: environment.Environment,
34               txn: lmdb.Transaction,
35               conditions: Conditions,
36               partitions: Iterable[Partition],
37               max_results: int,
38               event_type_cache_size: int
39               ) -> 'TimeseriesDb':
40    db = TimeseriesDb()
41    db._env = env
42    db._conditions = conditions
43    db._max_results = max_results
44    db._changes = collections.defaultdict(collections.deque)
45    db._event_type_partitions = {}  # type: dict[common.EventType, Collection[tuple[common.PartitionId, Partition]]]  # NOQA
46
47    # depending on dict order
48    db._partitions = dict(_ext_init_partitions(env, txn, partitions))
49
50    partitions_collection = common.create_event_type_collection(
51        (partition.subscription, (partition_id, partition))
52        for partition_id, partition in db._partitions.items())
53
54    db._get_event_type_partitions = functools.lru_cache(
55        maxsize=event_type_cache_size)(
56        lambda event_type: list(partitions_collection.get(event_type)))
57
58    return db
class TimeseriesDb:
 61class TimeseriesDb:
 62
 63    def add(self,
 64            event: common.Event
 65            ) -> Iterable[common.EventRef]:
 66        partitions = self._get_event_type_partitions(event.type)
 67
 68        for partition_id, partition in partitions:
 69            if partition.order_by == common.OrderBy.TIMESTAMP:
 70                timestamp = event.timestamp
 71
 72            elif partition.order_by == common.OrderBy.SOURCE_TIMESTAMP:
 73                if event.source_timestamp is None:
 74                    continue
 75
 76                timestamp = event.source_timestamp
 77
 78            else:
 79                raise ValueError('unsupported order by')
 80
 81            self._changes[partition_id].append((timestamp, event))
 82
 83            yield common.TimeseriesEventRef(
 84                (partition_id, timestamp, event.id))
 85
 86    async def query(self,
 87                    params: common.QueryTimeseriesParams
 88                    ) -> common.QueryResult:
 89        subscription = (common.create_subscription(params.event_types)
 90                        if params.event_types is not None else None)
 91
 92        max_results = (params.max_results
 93                       if params.max_results is not None and
 94                       params.max_results < self._max_results
 95                       else self._max_results)
 96
 97        for partition_id, partition in self._partitions.items():
 98            if partition.order_by != params.order_by:
 99                continue
100
101            if (subscription and
102                    subscription.isdisjoint(partition.subscription)):
103                continue
104
105            return await self._query_partition(partition_id, params,
106                                               subscription, max_results)
107
108        return common.QueryResult(events=[],
109                                  more_follows=False)
110
111    def create_changes(self) -> Changes:
112        changes, self._changes = (self._changes,
113                                  collections.defaultdict(collections.deque))
114        return changes
115
116    def ext_write(self,
117                  txn: lmdb.Transaction,
118                  changes: Changes):
119        data = (((partition_id, timestamp, event.id), event)
120                for partition_id, partition_changes in changes.items()
121                for timestamp, event in partition_changes)
122        self._env.ext_write(txn, common.DbType.TIMESERIES_DATA, data)
123
124        counts = ((partition_id, len(partition_changes))
125                  for partition_id, partition_changes in changes.items())
126        _ext_inc_partition_count(self._env, txn, counts)
127
128    def ext_cleanup(self,
129                    txn: lmdb.Transaction,
130                    now: common.Timestamp,
131                    max_results: int | None = None,
132                    ) -> collections.deque[tuple[common.EventId,
133                                                 common.EventRef]]:
134        result = collections.deque()
135
136        for partition_id, partition in self._partitions.items():
137            if not partition.limit:
138                continue
139
140            partition_max_results = (max_results - len(result)
141                                     if max_results is not None else None)
142            if partition_max_results is not None and partition_max_results < 1:
143                break
144
145            result.extend(_ext_cleanup_partition(self._env, txn, now,
146                                                 partition_id, partition.limit,
147                                                 partition_max_results))
148
149        return result
150
151    async def _query_partition(self, partition_id, params, subscription,
152                               max_results):
153        events = collections.deque()
154        changes = self._changes
155
156        filter = _Filter(subscription=subscription,
157                         t_from=params.t_from,
158                         t_to=params.t_to,
159                         source_t_from=params.source_t_from,
160                         source_t_to=params.source_t_to,
161                         max_results=max_results + 1,
162                         last_event_id=params.last_event_id)
163
164        if params.order == common.Order.DESCENDING:
165            events.extend(_query_partition_changes(
166                changes[partition_id], params, filter))
167
168            if not filter.done:
169                events.extend(await self._env.execute(
170                    _ext_query_partition_events, self._env, self._conditions,
171                    partition_id, params, filter))
172
173        elif params.order == common.Order.ASCENDING:
174            events.extend(await self._env.execute(
175                _ext_query_partition_events, self._env, self._conditions,
176                partition_id, params, filter))
177
178            if not filter.done:
179                events.extend(_query_partition_changes(
180                    changes[partition_id], params, filter))
181
182        else:
183            raise ValueError('unsupported order')
184
185        more_follows = len(events) > max_results
186        while len(events) > max_results:
187            events.pop()
188
189        return common.QueryResult(events=events,
190                                  more_follows=more_follows)
63    def add(self,
64            event: common.Event
65            ) -> Iterable[common.EventRef]:
66        partitions = self._get_event_type_partitions(event.type)
67
68        for partition_id, partition in partitions:
69            if partition.order_by == common.OrderBy.TIMESTAMP:
70                timestamp = event.timestamp
71
72            elif partition.order_by == common.OrderBy.SOURCE_TIMESTAMP:
73                if event.source_timestamp is None:
74                    continue
75
76                timestamp = event.source_timestamp
77
78            else:
79                raise ValueError('unsupported order by')
80
81            self._changes[partition_id].append((timestamp, event))
82
83            yield common.TimeseriesEventRef(
84                (partition_id, timestamp, event.id))
async def query( self, params: hat.event.common.QueryTimeseriesParams) -> hat.event.common.QueryResult:
 86    async def query(self,
 87                    params: common.QueryTimeseriesParams
 88                    ) -> common.QueryResult:
 89        subscription = (common.create_subscription(params.event_types)
 90                        if params.event_types is not None else None)
 91
 92        max_results = (params.max_results
 93                       if params.max_results is not None and
 94                       params.max_results < self._max_results
 95                       else self._max_results)
 96
 97        for partition_id, partition in self._partitions.items():
 98            if partition.order_by != params.order_by:
 99                continue
100
101            if (subscription and
102                    subscription.isdisjoint(partition.subscription)):
103                continue
104
105            return await self._query_partition(partition_id, params,
106                                               subscription, max_results)
107
108        return common.QueryResult(events=[],
109                                  more_follows=False)
def create_changes( self) -> dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]:
111    def create_changes(self) -> Changes:
112        changes, self._changes = (self._changes,
113                                  collections.defaultdict(collections.deque))
114        return changes
def ext_write( self, txn: Transaction, changes: dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]):
116    def ext_write(self,
117                  txn: lmdb.Transaction,
118                  changes: Changes):
119        data = (((partition_id, timestamp, event.id), event)
120                for partition_id, partition_changes in changes.items()
121                for timestamp, event in partition_changes)
122        self._env.ext_write(txn, common.DbType.TIMESERIES_DATA, data)
123
124        counts = ((partition_id, len(partition_changes))
125                  for partition_id, partition_changes in changes.items())
126        _ext_inc_partition_count(self._env, txn, counts)
def ext_cleanup( self, txn: Transaction, now: hat.event.common.Timestamp, max_results: int | None = None) -> collections.deque[tuple[hat.event.common.EventId, hat.event.backends.lmdb.common.LatestEventRef | hat.event.backends.lmdb.common.TimeseriesEventRef]]:
128    def ext_cleanup(self,
129                    txn: lmdb.Transaction,
130                    now: common.Timestamp,
131                    max_results: int | None = None,
132                    ) -> collections.deque[tuple[common.EventId,
133                                                 common.EventRef]]:
134        result = collections.deque()
135
136        for partition_id, partition in self._partitions.items():
137            if not partition.limit:
138                continue
139
140            partition_max_results = (max_results - len(result)
141                                     if max_results is not None else None)
142            if partition_max_results is not None and partition_max_results < 1:
143                break
144
145            result.extend(_ext_cleanup_partition(self._env, txn, now,
146                                                 partition_id, partition.limit,
147                                                 partition_max_results))
148
149        return result