hat.event.backends.lmdb.backend

from collections.abc import Collection
from pathlib import Path
import asyncio
import collections
import contextlib
import logging
import typing

from hat import aio
from hat import json

from hat.event.backends.lmdb import common
from hat.event.backends.lmdb import environment
from hat.event.backends.lmdb import latestdb
from hat.event.backends.lmdb import refdb
from hat.event.backends.lmdb import systemdb
from hat.event.backends.lmdb import timeseriesdb
from hat.event.backends.lmdb.conditions import Conditions


mlog = logging.getLogger(__name__)

cleanup_max_results = 1024

flush_queue_size = 4096

max_registered_count = 1024 * 256

default_timeseries_max_results = 4096

version = '0.9'

class Databases(typing.NamedTuple):
    system: systemdb.SystemDb
    latest: latestdb.LatestDb
    timeseries: timeseriesdb.TimeseriesDb
    ref: refdb.RefDb


class Changes(typing.NamedTuple):
    system: systemdb.Changes
    latest: latestdb.Changes
    # timeseriesdb.Changes expands to
    # dict[int, collections.deque[tuple[hat.event.common.Timestamp,
    #                                   hat.event.common.Event]]]
    timeseries: timeseriesdb.Changes
    ref: refdb.Changes

async def create(conf: json.Data,
                 registered_events_cb: common.BackendRegisteredEventsCb | None,
                 flushed_events_cb: common.BackendFlushedEventsCb | None
                 ) -> 'LmdbBackend':
    backend = LmdbBackend()
    backend._registered_events_cb = registered_events_cb
    backend._flushed_events_cb = flushed_events_cb
    backend._conditions = Conditions(conf['conditions'])
    backend._loop = asyncio.get_running_loop()
    backend._flush_queue = aio.Queue(flush_queue_size)
    backend._registered_count = 0
    backend._registered_queue = collections.deque()
    backend._async_group = aio.Group()

    backend._env = await environment.create(Path(conf['db_path']))
    backend.async_group.spawn(aio.call_on_done, backend._env.wait_closing(),
                              backend.close)

    try:
        latest_subscription = common.create_subscription(
            tuple(i) for i in conf['latest']['subscriptions'])

        timeseries_partitions = (
            timeseriesdb.Partition(
                order_by=common.OrderBy[i['order_by']],
                subscription=common.create_subscription(
                    tuple(event_type) for event_type in i['subscriptions']),
                limit=(
                    timeseriesdb.Limit(
                        min_entries=i['limit'].get('min_entries'),
                        max_entries=i['limit'].get('max_entries'),
                        duration=i['limit'].get('duration'),
                        size=i['limit'].get('size'))
                    if 'limit' in i else None))
            for i in conf['timeseries'])

        timeseries_max_results = conf.get('timeseries_max_results',
                                          default_timeseries_max_results)

        backend._dbs = await backend._env.execute(
            _ext_create_dbs, backend._env, conf['identifier'],
            backend._conditions, latest_subscription, timeseries_partitions,
            timeseries_max_results)

        backend.async_group.spawn(backend._flush_loop, conf['flush_period'])
        backend.async_group.spawn(backend._cleanup_loop,
                                  conf['cleanup_period'])

    except BaseException:
        await aio.uncancellable(backend._env.async_close())
        raise

    return backend

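The shape of the configuration that create() reads can be sketched as follows. The key names (db_path, identifier, conditions, latest, timeseries, flush_period, cleanup_period, timeseries_max_results) come straight from the lookups above; the concrete values and the OrderBy member name are illustrative assumptions, not documented defaults:

conf = {
    'db_path': '/var/lib/hat/event.db',       # assumed path
    'identifier': 'example-backend',          # assumed identifier string
    'flush_period': 1,                        # seconds between periodic flushes
    'cleanup_period': 5,                      # seconds between cleanup passes
    'conditions': [],
    'latest': {'subscriptions': [['*']]},
    'timeseries': [{'order_by': 'TIMESTAMP',  # assumed common.OrderBy member
                    'subscriptions': [['*']],
                    'limit': {'max_entries': 100000}}],
    'timeseries_max_results': 4096}

backend = await create(conf,
                       registered_events_cb=None,
                       flushed_events_cb=None)
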
def _ext_create_dbs(env, identifier, conditions, latest_subscription,
                    timeseries_partitions, timeseries_max_results):
    with env.ext_begin(write=True) as txn:
        system_db = systemdb.ext_create(env, txn, version, identifier)
        latest_db = latestdb.ext_create(env, txn, conditions,
                                        latest_subscription)
        timeseries_db = timeseriesdb.ext_create(env, txn, conditions,
                                                timeseries_partitions,
                                                timeseries_max_results)
        ref_db = refdb.RefDb(env)

    return Databases(system=system_db,
                     latest=latest_db,
                     timeseries=timeseries_db,
                     ref=ref_db)

class LmdbBackend(common.Backend):

    @property
    def async_group(self) -> aio.Group:
        """Group controlling resource's lifetime."""
        return self._async_group

    async def get_last_event_id(self,
                                server_id: int
                                ) -> common.EventId:
        """Get last registered event id associated with server id."""
        if not self.is_open:
            raise common.BackendClosedError()

        return self._dbs.system.get_last_event_id(server_id)

    async def register(self,
                       events: Collection[common.Event]
                       ) -> Collection[common.Event] | None:
        """Register events."""
        if not self.is_open:
            raise common.BackendClosedError()

        for event in events:
            server_id = event.id.server

            last_event_id = self._dbs.system.get_last_event_id(server_id)
            last_timestamp = self._dbs.system.get_last_timestamp(server_id)

            if last_event_id >= event.id:
                mlog.warning("event registration skipped: invalid event id")
                continue

            if last_timestamp > event.timestamp:
                mlog.warning("event registration skipped: invalid timestamp")
                continue

            if not self._conditions.matches(event):
                mlog.warning("event registration skipped: invalid conditions")
                continue

            refs = collections.deque()

            latest_result = self._dbs.latest.add(event)

            if latest_result.added_ref:
                refs.append(latest_result.added_ref)

            if latest_result.removed_ref:
                self._dbs.ref.remove(*latest_result.removed_ref)

            refs.extend(self._dbs.timeseries.add(event))

            if not refs:
                continue

            self._dbs.ref.add(event, refs)
            self._dbs.system.set_last_event_id(event.id)
            self._dbs.system.set_last_timestamp(server_id, event.timestamp)

        self._registered_queue.append(events)
        self._registered_count += len(events)

        if self._registered_count > max_registered_count:
            await self._flush_queue.put(None)

        if self._registered_events_cb:
            await aio.call(self._registered_events_cb, events)

        return events

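As a caller-side sketch, registering one event could look like the following; the Event and EventId field names are assumptions based on hat.event.common, as re-exported through this package's common module:

# Hypothetical event; per server, ids and timestamps must increase
# monotonically or register() skips the event with a warning (see above).
event = common.Event(
    id=common.EventId(server=1, session=1, instance=1),
    type=('example', 'temperature'),
    timestamp=common.now(),
    source_timestamp=None,
    payload=None)

registered = await backend.register([event])
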
    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Query events."""
        if not self.is_open:
            raise common.BackendClosedError()

        if isinstance(params, common.QueryLatestParams):
            return self._dbs.latest.query(params)

        if isinstance(params, common.QueryTimeseriesParams):
            return await self._dbs.timeseries.query(params)

        if isinstance(params, common.QueryServerParams):
            return await self._dbs.ref.query(params)

        raise ValueError('unsupported params type')

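A minimal query sketch, assuming QueryLatestParams accepts an event_types filter and QueryResult exposes the matched events:

# Hypothetical latest-state query by event type.
result = await backend.query(
    common.QueryLatestParams(event_types=[('example', '*')]))
for event in result.events:
    print(event.type, event.timestamp)
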
    async def flush(self):
        """Flush internal buffers and permanently persist events."""
        try:
            future = self._loop.create_future()
            await self._flush_queue.put(future)
            await future

        except aio.QueueClosedError:
            raise common.BackendClosedError()

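flush() hands _flush_loop a future and awaits it, giving callers a durability barrier: once it returns, everything registered so far is persisted. A typical pairing:

await backend.register(events)
await backend.flush()  # returns only after _flush_loop writes to LMDB
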
    async def _flush_loop(self, flush_period):
        futures = collections.deque()

        async def cleanup():
            with contextlib.suppress(Exception):
                await self._flush()

            await self._env.async_close()

        try:
            while True:
                try:
                    # wait up to flush_period for an explicit request
                    # (a future from flush() or None from register())
                    future = await aio.wait_for(self._flush_queue.get(),
                                                flush_period)
                    futures.append(future)

                except asyncio.TimeoutError:
                    pass

                except aio.CancelledWithResultError as e:
                    if e.result:
                        futures.append(e.result)

                    raise

                # batch any additional pending requests into this flush
                while not self._flush_queue.empty():
                    futures.append(self._flush_queue.get_nowait())

                await aio.uncancellable(self._flush())

                # resolve waiters only after data is persisted
                while futures:
                    future = futures.popleft()
                    if future and not future.done():
                        future.set_result(None)

        except Exception as e:
            mlog.error('backend flush error: %s', e, exc_info=e)

        finally:
            self.close()
            self._flush_queue.close()

            while not self._flush_queue.empty():
                futures.append(self._flush_queue.get_nowait())

            for future in futures:
                if future and not future.done():
                    future.set_exception(common.BackendClosedError())

            await aio.uncancellable(cleanup())

    async def _cleanup_loop(self, cleanup_period):
        try:
            while True:
                await asyncio.sleep(0)

                # _ext_cleanup removes at most cleanup_max_results entries;
                # repeat immediately while full batches keep coming back
                repeat = await self._env.execute(_ext_cleanup, self._env,
                                                 self._dbs, common.now())
                if repeat:
                    continue

                await asyncio.sleep(cleanup_period)

        except Exception as e:
            mlog.error('backend cleanup error: %s', e, exc_info=e)

        finally:
            self.close()

    async def _flush(self):
        if not self._env.is_open:
            return

        self._registered_count = 0
        registered_queue, self._registered_queue = (self._registered_queue,
                                                    collections.deque())

        changes = Changes(system=self._dbs.system.create_changes(),
                          latest=self._dbs.latest.create_changes(),
                          timeseries=self._dbs.timeseries.create_changes(),
                          ref=self._dbs.ref.create_changes())

        # TODO lock period between create_changes and locking executor
        #      (timeseries and ref must write changes before new queries are
        #      allowed)

        await self._env.execute(_ext_flush, self._env, self._dbs, changes)

        if not self._flushed_events_cb:
            return

        while registered_queue:
            events = registered_queue.popleft()
            await aio.call(self._flushed_events_cb, events)

def _ext_flush(env, dbs, changes):
    with env.ext_begin(write=True) as txn:
        dbs.system.ext_write(txn, changes.system)
        dbs.latest.ext_write(txn, changes.latest)
        dbs.timeseries.ext_write(txn, changes.timeseries)
        dbs.ref.ext_write(txn, changes.ref)

def _ext_cleanup(env, dbs, now):
    with env.ext_begin(write=True) as txn:
        result = dbs.timeseries.ext_cleanup(txn, now, cleanup_max_results)
        if not result:
            return False

        dbs.ref.ext_cleanup(txn, result)

    # a full batch means more entries may remain, so signal the caller to
    # repeat
    return len(result) >= cleanup_max_results
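
Tying the pieces together, an end-to-end driver might look like the sketch below, with conf and event as in the earlier hypothetical sketches; async_close is assumed to come from the aio resource machinery behind common.Backend, consistent with the async_group property above:

import asyncio

from hat.event.backends.lmdb import backend as lmdb_backend


async def main():
    # conf and event as in the hypothetical sketches above
    backend = await lmdb_backend.create(conf, None, None)
    try:
        await backend.register([event])
        await backend.flush()
    finally:
        await backend.async_close()

asyncio.run(main())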