hat.event.backends.lmdb.backend

  1from collections.abc import Collection
  2from pathlib import Path
  3import asyncio
  4import collections
  5import contextlib
  6import logging
  7import typing
  8
  9from hat import aio
 10from hat import json
 11
 12from hat.event.backends.lmdb import common
 13from hat.event.backends.lmdb import environment
 14from hat.event.backends.lmdb import latestdb
 15from hat.event.backends.lmdb import refdb
 16from hat.event.backends.lmdb import systemdb
 17from hat.event.backends.lmdb import timeseriesdb
 18from hat.event.backends.lmdb.conditions import Conditions
 19
 20
# module logger
mlog = logging.getLogger(__name__)

# maximum number of timeseries entries removed in a single cleanup
# transaction (see `_ext_cleanup`)
cleanup_max_results = 1024

# maximum number of pending flush requests in `LmdbBackend._flush_queue`
flush_queue_size = 4096

# once more than this many events are registered since the last flush,
# `register` enqueues an additional (futureless) flush request
max_registered_count = 256 * 1024

# default for the optional 'timeseries_max_results' configuration key
default_timeseries_max_results = 4096

# default for the optional 'timeseries_event_type_cache_size'
# configuration key
default_timeseries_event_type_cache_size = 256 * 1024

# backend version passed to systemdb on creation
version = '0.9'
 34
 35
 36class Databases(typing.NamedTuple):
 37    system: systemdb.SystemDb
 38    latest: latestdb.LatestDb
 39    timeseries: timeseriesdb.TimeseriesDb
 40    ref: refdb.RefDb
 41
 42
 43class Changes(typing.NamedTuple):
 44    system: systemdb.Changes
 45    latest: latestdb.Changes
 46    timeseries: timeseriesdb.Changes
 47    ref: refdb.Changes
 48
 49
 50async def create(conf: json.Data,
 51                 registered_events_cb: common.BackendRegisteredEventsCb | None,
 52                 flushed_events_cb: common.BackendFlushedEventsCb | None
 53                 ) -> 'LmdbBackend':
 54    backend = LmdbBackend()
 55    backend._registered_events_cb = registered_events_cb
 56    backend._flushed_events_cb = flushed_events_cb
 57    backend._conditions = Conditions(conf['conditions'])
 58    backend._loop = asyncio.get_running_loop()
 59    backend._flush_queue = aio.Queue(flush_queue_size)
 60    backend._registered_count = 0
 61    backend._registered_queue = collections.deque()
 62    backend._async_group = aio.Group()
 63
 64    backend._env = await environment.create(Path(conf['db_path']))
 65    backend.async_group.spawn(aio.call_on_done, backend._env.wait_closing(),
 66                              backend.close)
 67
 68    try:
 69        latest_subscription = common.create_subscription(
 70            tuple(i) for i in conf['latest']['subscriptions'])
 71
 72        timeseries_partitions = (
 73            timeseriesdb.Partition(
 74                order_by=common.OrderBy[i['order_by']],
 75                subscription=common.create_subscription(
 76                    tuple(event_type) for event_type in i['subscriptions']),
 77                limit=(
 78                    timeseriesdb.Limit(
 79                        min_entries=i['limit'].get('min_entries'),
 80                        max_entries=i['limit'].get('max_entries'),
 81                        duration=i['limit'].get('duration'),
 82                        size=i['limit'].get('size'))
 83                    if 'limit' in i else None))
 84            for i in conf['timeseries'])
 85
 86        timeseries_max_results = conf.get('timeseries_max_results',
 87                                          default_timeseries_max_results)
 88        timeseries_event_type_cache_size = conf.get(
 89            'timeseries_event_type_cache_size',
 90            default_timeseries_event_type_cache_size)
 91
 92        backend._dbs = await backend._env.execute(
 93            _ext_create_dbs, backend._env, conf['identifier'],
 94            backend._conditions, latest_subscription, timeseries_partitions,
 95            timeseries_max_results, timeseries_event_type_cache_size)
 96
 97        backend.async_group.spawn(backend._flush_loop, conf['flush_period'])
 98        backend.async_group.spawn(backend._cleanup_loop,
 99                                  conf['cleanup_period'])
100
101    except BaseException:
102        await aio.uncancellable(backend._env.async_close())
103        raise
104
105    return backend
106
107
def _ext_create_dbs(env, identifier, conditions, latest_subscription,
                    timeseries_partitions, timeseries_max_results,
                    timeseries_event_type_cache_size):
    """Create all databases inside a single write transaction.

    Intended to be run through ``env.execute`` (see `create`).
    """
    with env.ext_begin(write=True) as txn:
        # keyword arguments are evaluated in declaration order, so the
        # databases are created system, latest, timeseries, ref
        dbs = Databases(
            system=systemdb.ext_create(env, txn, version, identifier),
            latest=latestdb.ext_create(env, txn, conditions,
                                       latest_subscription),
            timeseries=timeseriesdb.ext_create(
                env, txn, conditions, timeseries_partitions,
                timeseries_max_results, timeseries_event_type_cache_size),
            ref=refdb.RefDb(env))

    return dbs
124
125
class LmdbBackend(common.Backend):
    """LMDB based event backend.

    Instances should be created with the `create` coroutine, which
    initializes all private attributes referenced below. Database
    changes are buffered and periodically written to the LMDB
    environment by the background flush loop; expired timeseries
    entries are removed by the background cleanup loop.
    """

    @property
    def async_group(self) -> aio.Group:
        """Group controlling resource's lifetime."""
        return self._async_group

    async def get_last_event_id(self,
                                server_id: int
                                ) -> common.EventId:
        """Get last registered event id associated with server id.

        Raises:
            common.BackendClosedError

        """
        if not self.is_open:
            raise common.BackendClosedError()

        return self._dbs.system.get_last_event_id(server_id)

    async def register(self,
                       events: Collection[common.Event]
                       ) -> Collection[common.Event] | None:
        """Register events.

        Events with a non-increasing event id, a non-increasing
        timestamp (compared to the last registered values for their
        server) or not matching the configured conditions are skipped
        with a warning. The returned collection is the unmodified
        input, including any skipped events.

        Raises:
            common.BackendClosedError

        """
        if not self.is_open:
            raise common.BackendClosedError()

        for event in events:
            server_id = event.id.server

            last_event_id = self._dbs.system.get_last_event_id(server_id)
            last_timestamp = self._dbs.system.get_last_timestamp(server_id)

            if last_event_id >= event.id:
                mlog.warning("event registration skipped: invalid event id")
                continue

            if last_timestamp > event.timestamp:
                mlog.warning("event registration skipped: invalid timestamp")
                continue

            if not self._conditions.matches(event):
                mlog.warning(
                    "event %s registration skipped: invalid conditions",
                    event.type)
                continue

            # references to all database entries created for this event
            refs = collections.deque()

            latest_result = self._dbs.latest.add(event)

            if latest_result.added_ref:
                refs.append(latest_result.added_ref)

            if latest_result.removed_ref:
                self._dbs.ref.remove(*latest_result.removed_ref)

            refs.extend(self._dbs.timeseries.add(event))

            # event was not stored in any database - nothing to track
            if not refs:
                continue

            self._dbs.ref.add(event, refs)
            self._dbs.system.set_last_event_id(event.id)
            self._dbs.system.set_last_timestamp(server_id, event.timestamp)

        self._registered_queue.append(events)
        self._registered_count += len(events)

        # request a flush (without waiting for its completion) once too
        # many events accumulated since the last flush; `None` marks a
        # request that carries no future to resolve
        if self._registered_count > max_registered_count:
            await self._flush_queue.put(None)

        if self._registered_events_cb:
            await aio.call(self._registered_events_cb, events)

        return events

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Query events.

        Raises:
            common.BackendClosedError
            ValueError: unsupported params type

        """
        if not self.is_open:
            raise common.BackendClosedError()

        # latest queries are served synchronously from memory
        if isinstance(params, common.QueryLatestParams):
            return self._dbs.latest.query(params)

        if isinstance(params, common.QueryTimeseriesParams):
            return await self._dbs.timeseries.query(params)

        if isinstance(params, common.QueryServerParams):
            return await self._dbs.ref.query(params)

        raise ValueError('unsupported params type')

    async def flush(self):
        """Flush internal buffers and permanently persist events.

        The future is resolved by `_flush_loop` once the flush pass
        that includes this request completes.

        Raises:
            common.BackendClosedError

        """
        try:
            future = self._loop.create_future()
            await self._flush_queue.put(future)
            await future

        except aio.QueueClosedError:
            raise common.BackendClosedError()

    async def _flush_loop(self, flush_period):
        """Background loop periodically persisting buffered changes.

        Each iteration waits up to `flush_period` for a flush request,
        drains all pending requests, performs a flush and resolves the
        collected futures.
        """
        # futures of flush requests awaiting the next completed flush
        futures = collections.deque()

        async def cleanup():
            # best-effort final flush before closing the environment
            with contextlib.suppress(Exception):
                await self._flush()

            await self._env.async_close()

        try:
            while True:
                try:
                    future = await aio.wait_for(self._flush_queue.get(),
                                                flush_period)
                    futures.append(future)

                except asyncio.TimeoutError:
                    # no explicit request - periodic flush
                    pass

                except aio.CancelledWithResultError as e:
                    # queue.get was cancelled after retrieving an item -
                    # keep the item so its future is still resolved
                    if e.result:
                        futures.append(e.result)

                    raise

                # drain all pending requests so they are satisfied by a
                # single flush pass
                while not self._flush_queue.empty():
                    futures.append(self._flush_queue.get_nowait())

                await aio.uncancellable(self._flush())

                while futures:
                    future = futures.popleft()
                    # futures may be None (futureless requests from
                    # `register`) or already cancelled
                    if future and not future.done():
                        future.set_result(None)

        except Exception as e:
            mlog.error('backend flush error: %s', e, exc_info=e)

        finally:
            self.close()
            self._flush_queue.close()

            # fail all requests that will never be flushed
            while not self._flush_queue.empty():
                futures.append(self._flush_queue.get_nowait())

            for future in futures:
                if future and not future.done():
                    future.set_exception(common.BackendClosedError())

            await aio.uncancellable(cleanup())

    async def _cleanup_loop(self, cleanup_period):
        """Background loop removing expired timeseries entries.

        Runs cleanup passes back-to-back while full batches are being
        removed, then sleeps for `cleanup_period`.
        """
        try:
            while True:
                # cancellation point before each cleanup pass
                await asyncio.sleep(0)

                repeat = await self._env.execute(_ext_cleanup, self._env,
                                                 self._dbs, common.now())
                if repeat:
                    continue

                await asyncio.sleep(cleanup_period)

        except Exception as e:
            mlog.error('backend cleanup error: %s', e, exc_info=e)

        finally:
            self.close()

    async def _flush(self):
        """Persist buffered changes and notify flushed events callback."""
        if not self._env.is_open:
            return

        # take ownership of the batches registered since the last flush
        self._registered_count = 0
        registered_queue, self._registered_queue = (self._registered_queue,
                                                    collections.deque())

        changes = Changes(system=self._dbs.system.create_changes(),
                          latest=self._dbs.latest.create_changes(),
                          timeseries=self._dbs.timeseries.create_changes(),
                          ref=self._dbs.ref.create_changes())

        # TODO lock period between create_changes and locking executor
        #      (timeseries and ref must write changes before new queries are
        #      allowed)

        await self._env.execute(_ext_flush, self._env, self._dbs, changes)

        if not self._flushed_events_cb:
            return

        # notify per registered batch, in registration order
        while registered_queue:
            events = registered_queue.popleft()
            await aio.call(self._flushed_events_cb, events)
316
317
318def _ext_flush(env, dbs, changes):
319    with env.ext_begin(write=True) as txn:
320        dbs.system.ext_write(txn, changes.system)
321        dbs.latest.ext_write(txn, changes.latest)
322        dbs.timeseries.ext_write(txn, changes.timeseries)
323        dbs.ref.ext_write(txn, changes.ref)
324
325
def _ext_cleanup(env, dbs, now):
    """Remove a batch of expired timeseries entries and their refs.

    Returns ``True`` when a full batch (`cleanup_max_results` entries)
    was removed, i.e. the caller should immediately run another pass.
    """
    with env.ext_begin(write=True) as txn:
        removed = dbs.timeseries.ext_cleanup(txn, now, cleanup_max_results)
        if removed:
            dbs.ref.ext_cleanup(txn, removed)

    return len(removed) >= cleanup_max_results
mlog = <Logger hat.event.backends.lmdb.backend (WARNING)>
cleanup_max_results = 1024
flush_queue_size = 4096
max_registered_count = 262144
default_timeseries_max_results = 4096
default_timeseries_event_type_cache_size = 262144
version = '0.9'
class Databases(typing.NamedTuple):
37class Databases(typing.NamedTuple):
38    system: systemdb.SystemDb
39    latest: latestdb.LatestDb
40    timeseries: timeseriesdb.TimeseriesDb
41    ref: refdb.RefDb

Databases(system, latest, timeseries, ref)

Create new instance of Databases(system, latest, timeseries, ref)

Alias for field number 0

Alias for field number 1

Alias for field number 2

Alias for field number 3

class Changes(typing.NamedTuple):
44class Changes(typing.NamedTuple):
45    system: systemdb.Changes
46    latest: latestdb.Changes
47    timeseries: timeseriesdb.Changes
48    ref: refdb.Changes

Changes(system, latest, timeseries, ref)

Create new instance of Changes(system, latest, timeseries, ref)

Alias for field number 0

Alias for field number 1

timeseries: dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]

Alias for field number 2

Alias for field number 3

async def create( conf: None | bool | int | float | str | List[ForwardRef('Data')] | Dict[str, ForwardRef('Data')], registered_events_cb: Callable[[Collection[hat.event.common.Event]], None | Awaitable[None]] | None, flushed_events_cb: Callable[[Collection[hat.event.common.Event]], None | Awaitable[None]] | None) -> LmdbBackend:
 51async def create(conf: json.Data,
 52                 registered_events_cb: common.BackendRegisteredEventsCb | None,
 53                 flushed_events_cb: common.BackendFlushedEventsCb | None
 54                 ) -> 'LmdbBackend':
 55    backend = LmdbBackend()
 56    backend._registered_events_cb = registered_events_cb
 57    backend._flushed_events_cb = flushed_events_cb
 58    backend._conditions = Conditions(conf['conditions'])
 59    backend._loop = asyncio.get_running_loop()
 60    backend._flush_queue = aio.Queue(flush_queue_size)
 61    backend._registered_count = 0
 62    backend._registered_queue = collections.deque()
 63    backend._async_group = aio.Group()
 64
 65    backend._env = await environment.create(Path(conf['db_path']))
 66    backend.async_group.spawn(aio.call_on_done, backend._env.wait_closing(),
 67                              backend.close)
 68
 69    try:
 70        latest_subscription = common.create_subscription(
 71            tuple(i) for i in conf['latest']['subscriptions'])
 72
 73        timeseries_partitions = (
 74            timeseriesdb.Partition(
 75                order_by=common.OrderBy[i['order_by']],
 76                subscription=common.create_subscription(
 77                    tuple(event_type) for event_type in i['subscriptions']),
 78                limit=(
 79                    timeseriesdb.Limit(
 80                        min_entries=i['limit'].get('min_entries'),
 81                        max_entries=i['limit'].get('max_entries'),
 82                        duration=i['limit'].get('duration'),
 83                        size=i['limit'].get('size'))
 84                    if 'limit' in i else None))
 85            for i in conf['timeseries'])
 86
 87        timeseries_max_results = conf.get('timeseries_max_results',
 88                                          default_timeseries_max_results)
 89        timeseries_event_type_cache_size = conf.get(
 90            'timeseries_event_type_cache_size',
 91            default_timeseries_event_type_cache_size)
 92
 93        backend._dbs = await backend._env.execute(
 94            _ext_create_dbs, backend._env, conf['identifier'],
 95            backend._conditions, latest_subscription, timeseries_partitions,
 96            timeseries_max_results, timeseries_event_type_cache_size)
 97
 98        backend.async_group.spawn(backend._flush_loop, conf['flush_period'])
 99        backend.async_group.spawn(backend._cleanup_loop,
100                                  conf['cleanup_period'])
101
102    except BaseException:
103        await aio.uncancellable(backend._env.async_close())
104        raise
105
106    return backend
class LmdbBackend(hat.event.common.backend.Backend):
127class LmdbBackend(common.Backend):
128
129    @property
130    def async_group(self) -> aio.Group:
131        return self._async_group
132
133    async def get_last_event_id(self,
134                                server_id: int
135                                ) -> common.EventId:
136        if not self.is_open:
137            raise common.BackendClosedError()
138
139        return self._dbs.system.get_last_event_id(server_id)
140
141    async def register(self,
142                       events: Collection[common.Event]
143                       ) -> Collection[common.Event] | None:
144        if not self.is_open:
145            raise common.BackendClosedError()
146
147        for event in events:
148            server_id = event.id.server
149
150            last_event_id = self._dbs.system.get_last_event_id(server_id)
151            last_timestamp = self._dbs.system.get_last_timestamp(server_id)
152
153            if last_event_id >= event.id:
154                mlog.warning("event registration skipped: invalid event id")
155                continue
156
157            if last_timestamp > event.timestamp:
158                mlog.warning("event registration skipped: invalid timestamp")
159                continue
160
161            if not self._conditions.matches(event):
162                mlog.warning(
163                    "event %s registration skipped: invalid conditions",
164                    event.type)
165                continue
166
167            refs = collections.deque()
168
169            latest_result = self._dbs.latest.add(event)
170
171            if latest_result.added_ref:
172                refs.append(latest_result.added_ref)
173
174            if latest_result.removed_ref:
175                self._dbs.ref.remove(*latest_result.removed_ref)
176
177            refs.extend(self._dbs.timeseries.add(event))
178
179            if not refs:
180                continue
181
182            self._dbs.ref.add(event, refs)
183            self._dbs.system.set_last_event_id(event.id)
184            self._dbs.system.set_last_timestamp(server_id, event.timestamp)
185
186        self._registered_queue.append(events)
187        self._registered_count += len(events)
188
189        if self._registered_count > max_registered_count:
190            await self._flush_queue.put(None)
191
192        if self._registered_events_cb:
193            await aio.call(self._registered_events_cb, events)
194
195        return events
196
197    async def query(self,
198                    params: common.QueryParams
199                    ) -> common.QueryResult:
200        if not self.is_open:
201            raise common.BackendClosedError()
202
203        if isinstance(params, common.QueryLatestParams):
204            return self._dbs.latest.query(params)
205
206        if isinstance(params, common.QueryTimeseriesParams):
207            return await self._dbs.timeseries.query(params)
208
209        if isinstance(params, common.QueryServerParams):
210            return await self._dbs.ref.query(params)
211
212        raise ValueError('unsupported params type')
213
214    async def flush(self):
215        try:
216            future = self._loop.create_future()
217            await self._flush_queue.put(future)
218            await future
219
220        except aio.QueueClosedError:
221            raise common.BackendClosedError()
222
223    async def _flush_loop(self, flush_period):
224        futures = collections.deque()
225
226        async def cleanup():
227            with contextlib.suppress(Exception):
228                await self._flush()
229
230            await self._env.async_close()
231
232        try:
233            while True:
234                try:
235                    future = await aio.wait_for(self._flush_queue.get(),
236                                                flush_period)
237                    futures.append(future)
238
239                except asyncio.TimeoutError:
240                    pass
241
242                except aio.CancelledWithResultError as e:
243                    if e.result:
244                        futures.append(e.result)
245
246                    raise
247
248                while not self._flush_queue.empty():
249                    futures.append(self._flush_queue.get_nowait())
250
251                await aio.uncancellable(self._flush())
252
253                while futures:
254                    future = futures.popleft()
255                    if future and not future.done():
256                        future.set_result(None)
257
258        except Exception as e:
259            mlog.error('backend flush error: %s', e, exc_info=e)
260
261        finally:
262            self.close()
263            self._flush_queue.close()
264
265            while not self._flush_queue.empty():
266                futures.append(self._flush_queue.get_nowait())
267
268            for future in futures:
269                if future and not future.done():
270                    future.set_exception(common.BackendClosedError())
271
272            await aio.uncancellable(cleanup())
273
274    async def _cleanup_loop(self, cleanup_period):
275        try:
276            while True:
277                await asyncio.sleep(0)
278
279                repeat = await self._env.execute(_ext_cleanup, self._env,
280                                                 self._dbs, common.now())
281                if repeat:
282                    continue
283
284                await asyncio.sleep(cleanup_period)
285
286        except Exception as e:
287            mlog.error('backend cleanup error: %s', e, exc_info=e)
288
289        finally:
290            self.close()
291
292    async def _flush(self):
293        if not self._env.is_open:
294            return
295
296        self._registered_count = 0
297        registered_queue, self._registered_queue = (self._registered_queue,
298                                                    collections.deque())
299
300        changes = Changes(system=self._dbs.system.create_changes(),
301                          latest=self._dbs.latest.create_changes(),
302                          timeseries=self._dbs.timeseries.create_changes(),
303                          ref=self._dbs.ref.create_changes())
304
305        # TODO lock period between create_changes and locking executor
306        #      (timeseries and ref must write changes before new queries are
307        #      allowed)
308
309        await self._env.execute(_ext_flush, self._env, self._dbs, changes)
310
311        if not self._flushed_events_cb:
312            return
313
314        while registered_queue:
315            events = registered_queue.popleft()
316            await aio.call(self._flushed_events_cb, events)

Backend ABC

async_group: hat.aio.group.Group
129    @property
130    def async_group(self) -> aio.Group:
131        return self._async_group

Group controlling resource's lifetime.

async def get_last_event_id(self, server_id: int) -> hat.event.common.EventId:
133    async def get_last_event_id(self,
134                                server_id: int
135                                ) -> common.EventId:
136        if not self.is_open:
137            raise common.BackendClosedError()
138
139        return self._dbs.system.get_last_event_id(server_id)

Get last registered event id associated with server id

async def register( self, events: Collection[hat.event.common.Event]) -> Collection[hat.event.common.Event] | None:
141    async def register(self,
142                       events: Collection[common.Event]
143                       ) -> Collection[common.Event] | None:
144        if not self.is_open:
145            raise common.BackendClosedError()
146
147        for event in events:
148            server_id = event.id.server
149
150            last_event_id = self._dbs.system.get_last_event_id(server_id)
151            last_timestamp = self._dbs.system.get_last_timestamp(server_id)
152
153            if last_event_id >= event.id:
154                mlog.warning("event registration skipped: invalid event id")
155                continue
156
157            if last_timestamp > event.timestamp:
158                mlog.warning("event registration skipped: invalid timestamp")
159                continue
160
161            if not self._conditions.matches(event):
162                mlog.warning(
163                    "event %s registration skipped: invalid conditions",
164                    event.type)
165                continue
166
167            refs = collections.deque()
168
169            latest_result = self._dbs.latest.add(event)
170
171            if latest_result.added_ref:
172                refs.append(latest_result.added_ref)
173
174            if latest_result.removed_ref:
175                self._dbs.ref.remove(*latest_result.removed_ref)
176
177            refs.extend(self._dbs.timeseries.add(event))
178
179            if not refs:
180                continue
181
182            self._dbs.ref.add(event, refs)
183            self._dbs.system.set_last_event_id(event.id)
184            self._dbs.system.set_last_timestamp(server_id, event.timestamp)
185
186        self._registered_queue.append(events)
187        self._registered_count += len(events)
188
189        if self._registered_count > max_registered_count:
190            await self._flush_queue.put(None)
191
192        if self._registered_events_cb:
193            await aio.call(self._registered_events_cb, events)
194
195        return events

Register events

197    async def query(self,
198                    params: common.QueryParams
199                    ) -> common.QueryResult:
200        if not self.is_open:
201            raise common.BackendClosedError()
202
203        if isinstance(params, common.QueryLatestParams):
204            return self._dbs.latest.query(params)
205
206        if isinstance(params, common.QueryTimeseriesParams):
207            return await self._dbs.timeseries.query(params)
208
209        if isinstance(params, common.QueryServerParams):
210            return await self._dbs.ref.query(params)
211
212        raise ValueError('unsupported params type')

Query events

async def flush(self):
214    async def flush(self):
215        try:
216            future = self._loop.create_future()
217            await self._flush_queue.put(future)
218            await future
219
220        except aio.QueueClosedError:
221            raise common.BackendClosedError()

Flush internal buffers and permanently persist events