hat.event.backends.lmdb.backend

  1from collections.abc import Collection
  2from pathlib import Path
  3import asyncio
  4import collections
  5import contextlib
  6import logging
  7import typing
  8
  9from hat import aio
 10from hat import json
 11
 12from hat.event.backends.lmdb import common
 13from hat.event.backends.lmdb import environment
 14from hat.event.backends.lmdb import latestdb
 15from hat.event.backends.lmdb import refdb
 16from hat.event.backends.lmdb import systemdb
 17from hat.event.backends.lmdb import timeseriesdb
 18from hat.event.backends.lmdb.conditions import Conditions
 19
 20
mlog = logging.getLogger(__name__)

# maximum number of timeseries entries removed in a single cleanup
# transaction (see _ext_cleanup)
cleanup_max_results = 1024

# capacity of the queue holding pending flush requests (see flush/_flush_loop)
flush_queue_size = 4096

# number of buffered registered events that triggers an early flush request
# (see LmdbBackend.register)
max_registered_count = 256 * 1024

# default for conf['timeseries_max_results'], forwarded to timeseriesdb
default_timeseries_max_results = 4096

# default for conf['timeseries_event_type_cache_size'], forwarded to
# timeseriesdb
default_timeseries_event_type_cache_size = 256 * 1024

# database version identifier written by systemdb.ext_create
version = '0.9'
 34
 35
class Databases(typing.NamedTuple):
    """All databases opened in a single LMDB environment.

    Created by `_ext_create_dbs` inside one write transaction.
    """
    system: systemdb.SystemDb
    latest: latestdb.LatestDb
    timeseries: timeseriesdb.TimeseriesDb
    ref: refdb.RefDb
 41
 42
class Changes(typing.NamedTuple):
    """Snapshot of pending changes for each database.

    Collected by `LmdbBackend._flush` (via each database's
    ``create_changes``) and persisted by `_ext_flush`; fields are
    positionally parallel to `Databases`.
    """
    system: systemdb.Changes
    latest: latestdb.Changes
    timeseries: timeseriesdb.Changes
    ref: refdb.Changes
 48
 49
 50async def create(conf: json.Data,
 51                 registered_events_cb: common.BackendRegisteredEventsCb | None,
 52                 flushed_events_cb: common.BackendFlushedEventsCb | None
 53                 ) -> 'LmdbBackend':
 54    backend = LmdbBackend()
 55    backend._registered_events_cb = registered_events_cb
 56    backend._flushed_events_cb = flushed_events_cb
 57    backend._conditions = Conditions(conf['conditions'])
 58    backend._loop = asyncio.get_running_loop()
 59    backend._flush_queue = aio.Queue(flush_queue_size)
 60    backend._registered_count = 0
 61    backend._registered_queue = collections.deque()
 62    backend._async_group = aio.Group()
 63
 64    backend._env = await environment.create(Path(conf['db_path']))
 65    backend.async_group.spawn(aio.call_on_done, backend._env.wait_closing(),
 66                              backend.close)
 67
 68    try:
 69        latest_subscription = common.create_subscription(
 70            tuple(i) for i in conf['latest']['subscriptions'])
 71
 72        timeseries_partitions = (
 73            timeseriesdb.Partition(
 74                order_by=common.OrderBy[i['order_by']],
 75                subscription=common.create_subscription(
 76                    tuple(event_type) for event_type in i['subscriptions']),
 77                limit=(
 78                    timeseriesdb.Limit(
 79                        min_entries=i['limit'].get('min_entries'),
 80                        max_entries=i['limit'].get('max_entries'),
 81                        duration=i['limit'].get('duration'),
 82                        size=i['limit'].get('size'))
 83                    if 'limit' in i else None))
 84            for i in conf['timeseries'])
 85
 86        timeseries_max_results = conf.get('timeseries_max_results',
 87                                          default_timeseries_max_results)
 88        timeseries_event_type_cache_size = conf.get(
 89            'timeseries_event_type_cache_size',
 90            default_timeseries_event_type_cache_size)
 91
 92        backend._dbs = await backend._env.execute(
 93            _ext_create_dbs, backend._env, conf['identifier'],
 94            backend._conditions, latest_subscription, timeseries_partitions,
 95            timeseries_max_results, timeseries_event_type_cache_size)
 96
 97        backend.async_group.spawn(backend._flush_loop, conf['flush_period'])
 98        backend.async_group.spawn(backend._cleanup_loop,
 99                                  conf['cleanup_period'])
100
101    except BaseException:
102        await aio.uncancellable(backend._env.async_close())
103        raise
104
105    return backend
106
107
def _ext_create_dbs(env, identifier, conditions, latest_subscription,
                    timeseries_partitions, timeseries_max_results,
                    timeseries_event_type_cache_size):
    """Create all databases inside one write transaction.

    Runs on the environment's executor thread (see environment.execute).
    """
    with env.ext_begin(write=True) as txn:
        dbs = Databases(
            system=systemdb.ext_create(env, txn, version, identifier),
            latest=latestdb.ext_create(env, txn, conditions,
                                       latest_subscription),
            timeseries=timeseriesdb.ext_create(
                env, txn, conditions, timeseries_partitions,
                timeseries_max_results, timeseries_event_type_cache_size),
            ref=refdb.RefDb(env))

    return dbs
124
125
class LmdbBackend(common.Backend):
    """Event backend based on LMDB.

    Instances are created exclusively with the `create` coroutine, which
    initializes all private attributes before the instance is used.
    """

    @property
    def async_group(self) -> aio.Group:
        """Group controlling resource's lifetime"""
        return self._async_group

    async def get_last_event_id(self,
                                server_id: int
                                ) -> common.EventId:
        """Get last registered event id associated with server id

        Raises:
            common.BackendClosedError: if backend is closed

        """
        if not self.is_open:
            raise common.BackendClosedError()

        return self._dbs.system.get_last_event_id(server_id)

    async def register(self,
                       events: Collection[common.Event]
                       ) -> Collection[common.Event] | None:
        """Register events

        Events failing validation (non increasing event id, decreasing
        timestamp or unsatisfied conditions) are skipped with a warning.
        Accepted events are added to the databases; changes become durable
        on the next flush.

        Raises:
            common.BackendClosedError: if backend is closed

        """
        if not self.is_open:
            raise common.BackendClosedError()

        for event in events:
            server_id = event.id.server

            last_event_id = self._dbs.system.get_last_event_id(server_id)
            last_timestamp = self._dbs.system.get_last_timestamp(server_id)

            # event ids must be strictly increasing per server
            if last_event_id >= event.id:
                mlog.warning("event registration skipped: invalid event id")
                continue

            # timestamps must be non decreasing per server
            if last_timestamp > event.timestamp:
                mlog.warning("event registration skipped: invalid timestamp")
                continue

            if not self._conditions.matches(event):
                mlog.warning("event registration skipped: invalid conditions")
                continue

            refs = collections.deque()

            latest_result = self._dbs.latest.add(event)

            if latest_result.added_ref:
                refs.append(latest_result.added_ref)

            if latest_result.removed_ref:
                self._dbs.ref.remove(*latest_result.removed_ref)

            refs.extend(self._dbs.timeseries.add(event))

            # event was not stored by any database - don't advance system db
            if not refs:
                continue

            self._dbs.ref.add(event, refs)
            self._dbs.system.set_last_event_id(event.id)
            self._dbs.system.set_last_timestamp(server_id, event.timestamp)

        self._registered_queue.append(events)
        self._registered_count += len(events)

        # request flush (without waiting for completion) when too many
        # events are buffered
        if self._registered_count > max_registered_count:
            await self._flush_queue.put(None)

        if self._registered_events_cb:
            await aio.call(self._registered_events_cb, events)

        return events

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Query events

        Raises:
            common.BackendClosedError: if backend is closed
            ValueError: if params type is not supported

        """
        if not self.is_open:
            raise common.BackendClosedError()

        if isinstance(params, common.QueryLatestParams):
            return self._dbs.latest.query(params)

        if isinstance(params, common.QueryTimeseriesParams):
            return await self._dbs.timeseries.query(params)

        if isinstance(params, common.QueryServerParams):
            return await self._dbs.ref.query(params)

        raise ValueError('unsupported params type')

    async def flush(self):
        """Flush internal buffers and permanently persist events

        Raises:
            common.BackendClosedError: if backend is closed

        """
        try:
            future = self._loop.create_future()
            await self._flush_queue.put(future)
            # future is resolved by _flush_loop once flush completes
            await future

        except aio.QueueClosedError:
            raise common.BackendClosedError()

    async def _flush_loop(self, flush_period):
        # Persists buffered changes periodically (every flush_period) or on
        # explicit request through _flush_queue, then resolves waiting
        # flush futures.
        futures = collections.deque()

        async def cleanup():
            # best-effort final flush before closing the environment
            with contextlib.suppress(Exception):
                await self._flush()

            await self._env.async_close()

        try:
            while True:
                try:
                    future = await aio.wait_for(self._flush_queue.get(),
                                                flush_period)
                    futures.append(future)

                except asyncio.TimeoutError:
                    # no explicit request - periodic flush
                    pass

                except aio.CancelledWithResultError as e:
                    # don't lose the future obtained during cancellation
                    if e.result:
                        futures.append(e.result)

                    raise

                # drain all pending flush requests before flushing
                while not self._flush_queue.empty():
                    futures.append(self._flush_queue.get_nowait())

                await aio.uncancellable(self._flush())

                # notify requesters (entries may be None - see register)
                while futures:
                    future = futures.popleft()
                    if future and not future.done():
                        future.set_result(None)

        except Exception as e:
            mlog.error('backend flush error: %s', e, exc_info=e)

        finally:
            self.close()
            self._flush_queue.close()

            while not self._flush_queue.empty():
                futures.append(self._flush_queue.get_nowait())

            # remaining requesters are notified that backend is closing
            for future in futures:
                if future and not future.done():
                    future.set_exception(common.BackendClosedError())

            await aio.uncancellable(cleanup())

    async def _cleanup_loop(self, cleanup_period):
        # Periodically removes old timeseries entries and associated
        # references.
        try:
            while True:
                await asyncio.sleep(0)

                repeat = await self._env.execute(_ext_cleanup, self._env,
                                                 self._dbs, common.now())
                # cleanup limit reached - more work remains, repeat
                # immediately without sleeping
                if repeat:
                    continue

                await asyncio.sleep(cleanup_period)

        except Exception as e:
            mlog.error('backend cleanup error: %s', e, exc_info=e)

        finally:
            self.close()

    async def _flush(self):
        # environment already closed - nothing can be persisted
        if not self._env.is_open:
            return

        self._registered_count = 0
        registered_queue, self._registered_queue = (self._registered_queue,
                                                    collections.deque())

        changes = Changes(system=self._dbs.system.create_changes(),
                          latest=self._dbs.latest.create_changes(),
                          timeseries=self._dbs.timeseries.create_changes(),
                          ref=self._dbs.ref.create_changes())

        # TODO lock period between create_changes and locking executor
        #      (timeseries and ref must write changes before new queries are
        #      allowed)

        await self._env.execute(_ext_flush, self._env, self._dbs, changes)

        if not self._flushed_events_cb:
            return

        while registered_queue:
            events = registered_queue.popleft()
            await aio.call(self._flushed_events_cb, events)
314
315
316def _ext_flush(env, dbs, changes):
317    with env.ext_begin(write=True) as txn:
318        dbs.system.ext_write(txn, changes.system)
319        dbs.latest.ext_write(txn, changes.latest)
320        dbs.timeseries.ext_write(txn, changes.timeseries)
321        dbs.ref.ext_write(txn, changes.ref)
322
323
def _ext_cleanup(env, dbs, now):
    """Remove old timeseries entries and their references.

    Removes at most `cleanup_max_results` timeseries entries in a single
    write transaction. Returns ``True`` when that limit was reached,
    signalling the caller to run another pass immediately. Runs on the
    environment's executor thread.
    """
    with env.ext_begin(write=True) as txn:
        removed = dbs.timeseries.ext_cleanup(txn, now, cleanup_max_results)
        if not removed:
            return False

        dbs.ref.ext_cleanup(txn, removed)

    return len(removed) >= cleanup_max_results
mlog = <Logger hat.event.backends.lmdb.backend (WARNING)>
cleanup_max_results = 1024
flush_queue_size = 4096
max_registered_count = 262144
default_timeseries_max_results = 4096
default_timeseries_event_type_cache_size = 262144
version = '0.9'
class Databases(typing.NamedTuple):
37class Databases(typing.NamedTuple):
38    system: systemdb.SystemDb
39    latest: latestdb.LatestDb
40    timeseries: timeseriesdb.TimeseriesDb
41    ref: refdb.RefDb

Databases(system, latest, timeseries, ref)

Create new instance of Databases(system, latest, timeseries, ref)

Alias for field number 0

Alias for field number 1

Alias for field number 2

Alias for field number 3

class Changes(typing.NamedTuple):
44class Changes(typing.NamedTuple):
45    system: systemdb.Changes
46    latest: latestdb.Changes
47    timeseries: timeseriesdb.Changes
48    ref: refdb.Changes

Changes(system, latest, timeseries, ref)

Create new instance of Changes(system, latest, timeseries, ref)

Alias for field number 0

Alias for field number 1

timeseries: dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]

Alias for field number 2

Alias for field number 3

async def create( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]], registered_events_cb: Optional[Callable[[Collection[hat.event.common.Event]], None | Awaitable[None]]], flushed_events_cb: Optional[Callable[[Collection[hat.event.common.Event]], None | Awaitable[None]]]) -> LmdbBackend:
 51async def create(conf: json.Data,
 52                 registered_events_cb: common.BackendRegisteredEventsCb | None,
 53                 flushed_events_cb: common.BackendFlushedEventsCb | None
 54                 ) -> 'LmdbBackend':
 55    backend = LmdbBackend()
 56    backend._registered_events_cb = registered_events_cb
 57    backend._flushed_events_cb = flushed_events_cb
 58    backend._conditions = Conditions(conf['conditions'])
 59    backend._loop = asyncio.get_running_loop()
 60    backend._flush_queue = aio.Queue(flush_queue_size)
 61    backend._registered_count = 0
 62    backend._registered_queue = collections.deque()
 63    backend._async_group = aio.Group()
 64
 65    backend._env = await environment.create(Path(conf['db_path']))
 66    backend.async_group.spawn(aio.call_on_done, backend._env.wait_closing(),
 67                              backend.close)
 68
 69    try:
 70        latest_subscription = common.create_subscription(
 71            tuple(i) for i in conf['latest']['subscriptions'])
 72
 73        timeseries_partitions = (
 74            timeseriesdb.Partition(
 75                order_by=common.OrderBy[i['order_by']],
 76                subscription=common.create_subscription(
 77                    tuple(event_type) for event_type in i['subscriptions']),
 78                limit=(
 79                    timeseriesdb.Limit(
 80                        min_entries=i['limit'].get('min_entries'),
 81                        max_entries=i['limit'].get('max_entries'),
 82                        duration=i['limit'].get('duration'),
 83                        size=i['limit'].get('size'))
 84                    if 'limit' in i else None))
 85            for i in conf['timeseries'])
 86
 87        timeseries_max_results = conf.get('timeseries_max_results',
 88                                          default_timeseries_max_results)
 89        timeseries_event_type_cache_size = conf.get(
 90            'timeseries_event_type_cache_size',
 91            default_timeseries_event_type_cache_size)
 92
 93        backend._dbs = await backend._env.execute(
 94            _ext_create_dbs, backend._env, conf['identifier'],
 95            backend._conditions, latest_subscription, timeseries_partitions,
 96            timeseries_max_results, timeseries_event_type_cache_size)
 97
 98        backend.async_group.spawn(backend._flush_loop, conf['flush_period'])
 99        backend.async_group.spawn(backend._cleanup_loop,
100                                  conf['cleanup_period'])
101
102    except BaseException:
103        await aio.uncancellable(backend._env.async_close())
104        raise
105
106    return backend
class LmdbBackend(hat.event.common.backend.Backend):
127class LmdbBackend(common.Backend):
128
129    @property
130    def async_group(self) -> aio.Group:
131        return self._async_group
132
133    async def get_last_event_id(self,
134                                server_id: int
135                                ) -> common.EventId:
136        if not self.is_open:
137            raise common.BackendClosedError()
138
139        return self._dbs.system.get_last_event_id(server_id)
140
141    async def register(self,
142                       events: Collection[common.Event]
143                       ) -> Collection[common.Event] | None:
144        if not self.is_open:
145            raise common.BackendClosedError()
146
147        for event in events:
148            server_id = event.id.server
149
150            last_event_id = self._dbs.system.get_last_event_id(server_id)
151            last_timestamp = self._dbs.system.get_last_timestamp(server_id)
152
153            if last_event_id >= event.id:
154                mlog.warning("event registration skipped: invalid event id")
155                continue
156
157            if last_timestamp > event.timestamp:
158                mlog.warning("event registration skipped: invalid timestamp")
159                continue
160
161            if not self._conditions.matches(event):
162                mlog.warning("event registration skipped: invalid conditions")
163                continue
164
165            refs = collections.deque()
166
167            latest_result = self._dbs.latest.add(event)
168
169            if latest_result.added_ref:
170                refs.append(latest_result.added_ref)
171
172            if latest_result.removed_ref:
173                self._dbs.ref.remove(*latest_result.removed_ref)
174
175            refs.extend(self._dbs.timeseries.add(event))
176
177            if not refs:
178                continue
179
180            self._dbs.ref.add(event, refs)
181            self._dbs.system.set_last_event_id(event.id)
182            self._dbs.system.set_last_timestamp(server_id, event.timestamp)
183
184        self._registered_queue.append(events)
185        self._registered_count += len(events)
186
187        if self._registered_count > max_registered_count:
188            await self._flush_queue.put(None)
189
190        if self._registered_events_cb:
191            await aio.call(self._registered_events_cb, events)
192
193        return events
194
195    async def query(self,
196                    params: common.QueryParams
197                    ) -> common.QueryResult:
198        if not self.is_open:
199            raise common.BackendClosedError()
200
201        if isinstance(params, common.QueryLatestParams):
202            return self._dbs.latest.query(params)
203
204        if isinstance(params, common.QueryTimeseriesParams):
205            return await self._dbs.timeseries.query(params)
206
207        if isinstance(params, common.QueryServerParams):
208            return await self._dbs.ref.query(params)
209
210        raise ValueError('unsupported params type')
211
212    async def flush(self):
213        try:
214            future = self._loop.create_future()
215            await self._flush_queue.put(future)
216            await future
217
218        except aio.QueueClosedError:
219            raise common.BackendClosedError()
220
221    async def _flush_loop(self, flush_period):
222        futures = collections.deque()
223
224        async def cleanup():
225            with contextlib.suppress(Exception):
226                await self._flush()
227
228            await self._env.async_close()
229
230        try:
231            while True:
232                try:
233                    future = await aio.wait_for(self._flush_queue.get(),
234                                                flush_period)
235                    futures.append(future)
236
237                except asyncio.TimeoutError:
238                    pass
239
240                except aio.CancelledWithResultError as e:
241                    if e.result:
242                        futures.append(e.result)
243
244                    raise
245
246                while not self._flush_queue.empty():
247                    futures.append(self._flush_queue.get_nowait())
248
249                await aio.uncancellable(self._flush())
250
251                while futures:
252                    future = futures.popleft()
253                    if future and not future.done():
254                        future.set_result(None)
255
256        except Exception as e:
257            mlog.error('backend flush error: %s', e, exc_info=e)
258
259        finally:
260            self.close()
261            self._flush_queue.close()
262
263            while not self._flush_queue.empty():
264                futures.append(self._flush_queue.get_nowait())
265
266            for future in futures:
267                if future and not future.done():
268                    future.set_exception(common.BackendClosedError())
269
270            await aio.uncancellable(cleanup())
271
272    async def _cleanup_loop(self, cleanup_period):
273        try:
274            while True:
275                await asyncio.sleep(0)
276
277                repeat = await self._env.execute(_ext_cleanup, self._env,
278                                                 self._dbs, common.now())
279                if repeat:
280                    continue
281
282                await asyncio.sleep(cleanup_period)
283
284        except Exception as e:
285            mlog.error('backend cleanup error: %s', e, exc_info=e)
286
287        finally:
288            self.close()
289
290    async def _flush(self):
291        if not self._env.is_open:
292            return
293
294        self._registered_count = 0
295        registered_queue, self._registered_queue = (self._registered_queue,
296                                                    collections.deque())
297
298        changes = Changes(system=self._dbs.system.create_changes(),
299                          latest=self._dbs.latest.create_changes(),
300                          timeseries=self._dbs.timeseries.create_changes(),
301                          ref=self._dbs.ref.create_changes())
302
303        # TODO lock period between create_changes and locking executor
304        #      (timeseries and ref must write changes before new queries are
305        #      allowed)
306
307        await self._env.execute(_ext_flush, self._env, self._dbs, changes)
308
309        if not self._flushed_events_cb:
310            return
311
312        while registered_queue:
313            events = registered_queue.popleft()
314            await aio.call(self._flushed_events_cb, events)

Backend ABC

async_group: hat.aio.group.Group
129    @property
130    def async_group(self) -> aio.Group:
131        return self._async_group

Group controlling resource's lifetime.

async def get_last_event_id(self, server_id: int) -> hat.event.common.EventId:
133    async def get_last_event_id(self,
134                                server_id: int
135                                ) -> common.EventId:
136        if not self.is_open:
137            raise common.BackendClosedError()
138
139        return self._dbs.system.get_last_event_id(server_id)

Get last registered event id associated with server id

async def register( self, events: Collection[hat.event.common.Event]) -> Collection[hat.event.common.Event] | None:
141    async def register(self,
142                       events: Collection[common.Event]
143                       ) -> Collection[common.Event] | None:
144        if not self.is_open:
145            raise common.BackendClosedError()
146
147        for event in events:
148            server_id = event.id.server
149
150            last_event_id = self._dbs.system.get_last_event_id(server_id)
151            last_timestamp = self._dbs.system.get_last_timestamp(server_id)
152
153            if last_event_id >= event.id:
154                mlog.warning("event registration skipped: invalid event id")
155                continue
156
157            if last_timestamp > event.timestamp:
158                mlog.warning("event registration skipped: invalid timestamp")
159                continue
160
161            if not self._conditions.matches(event):
162                mlog.warning("event registration skipped: invalid conditions")
163                continue
164
165            refs = collections.deque()
166
167            latest_result = self._dbs.latest.add(event)
168
169            if latest_result.added_ref:
170                refs.append(latest_result.added_ref)
171
172            if latest_result.removed_ref:
173                self._dbs.ref.remove(*latest_result.removed_ref)
174
175            refs.extend(self._dbs.timeseries.add(event))
176
177            if not refs:
178                continue
179
180            self._dbs.ref.add(event, refs)
181            self._dbs.system.set_last_event_id(event.id)
182            self._dbs.system.set_last_timestamp(server_id, event.timestamp)
183
184        self._registered_queue.append(events)
185        self._registered_count += len(events)
186
187        if self._registered_count > max_registered_count:
188            await self._flush_queue.put(None)
189
190        if self._registered_events_cb:
191            await aio.call(self._registered_events_cb, events)
192
193        return events

Register events

195    async def query(self,
196                    params: common.QueryParams
197                    ) -> common.QueryResult:
198        if not self.is_open:
199            raise common.BackendClosedError()
200
201        if isinstance(params, common.QueryLatestParams):
202            return self._dbs.latest.query(params)
203
204        if isinstance(params, common.QueryTimeseriesParams):
205            return await self._dbs.timeseries.query(params)
206
207        if isinstance(params, common.QueryServerParams):
208            return await self._dbs.ref.query(params)
209
210        raise ValueError('unsupported params type')

Query events

async def flush(self):
212    async def flush(self):
213        try:
214            future = self._loop.create_future()
215            await self._flush_queue.put(future)
216            await future
217
218        except aio.QueueClosedError:
219            raise common.BackendClosedError()

Flush internal buffers and permanently persist events