hat.event.backends.lmdb.backend

  1from collections.abc import Collection
  2from pathlib import Path
  3import asyncio
  4import collections
  5import contextlib
  6import logging
  7import typing
  8
  9from hat import aio
 10from hat import json
 11
 12from hat.event.backends.lmdb import common
 13from hat.event.backends.lmdb import environment
 14from hat.event.backends.lmdb import latestdb
 15from hat.event.backends.lmdb import refdb
 16from hat.event.backends.lmdb import systemdb
 17from hat.event.backends.lmdb import timeseriesdb
 18from hat.event.backends.lmdb.conditions import Conditions
 19
 20
# module logger
mlog = logging.getLogger(__name__)

# maximum number of timeseries entries removed in a single cleanup
# transaction (used by _ext_cleanup; cleanup repeats while this limit
# is reached)
cleanup_max_results = 1024

# maximum number of pending flush request futures in the flush queue
flush_queue_size = 4096

# once more than this many events are registered without a flush,
# register() enqueues a flush request itself
max_registered_count = 1024 * 256

# database format version passed to systemdb.ext_create
version = '0.9'
 30
 31
class Databases(typing.NamedTuple):
    """All databases backing a single LMDB environment.

    Created together in one write transaction by `_ext_create_dbs` and
    flushed together by `_ext_flush`.
    """

    system: systemdb.SystemDb          # last event id/timestamp per server
    latest: latestdb.LatestDb          # latest event state
    timeseries: timeseriesdb.TimeseriesDb  # partitioned timeseries entries
    ref: refdb.RefDb                   # event references
 37
 38
class Changes(typing.NamedTuple):
    """In-memory changes of each database accumulated between flushes.

    Obtained via each database's `create_changes` and persisted via
    `ext_write` (see `_ext_flush`).
    """

    system: systemdb.Changes
    latest: latestdb.Changes
    timeseries: timeseriesdb.Changes
    ref: refdb.Changes
 44
 45
async def create(conf: json.Data,
                 registered_events_cb: common.BackendRegisteredEventsCb | None,
                 flushed_events_cb: common.BackendFlushedEventsCb | None
                 ) -> 'LmdbBackend':
    """Create LMDB backend

    Args:
        conf: backend configuration; the visible keys used here are
            ``conditions``, ``db_path``, ``identifier``, ``latest``
            (with ``subscriptions``), ``timeseries``, ``flush_period``
            and ``cleanup_period``
        registered_events_cb: optional callback invoked after events are
            registered (before being persisted)
        flushed_events_cb: optional callback invoked after registered
            events are persisted

    """
    backend = LmdbBackend()
    backend._registered_events_cb = registered_events_cb
    backend._flushed_events_cb = flushed_events_cb
    backend._conditions = Conditions(conf['conditions'])
    backend._loop = asyncio.get_running_loop()
    backend._flush_queue = aio.Queue(flush_queue_size)
    backend._registered_count = 0
    backend._registered_queue = collections.deque()
    backend._async_group = aio.Group()

    backend._env = await environment.create(Path(conf['db_path']))
    # close the whole backend if the environment starts closing on its own
    backend.async_group.spawn(aio.call_on_done, backend._env.wait_closing(),
                              backend.close)

    try:
        latest_subscription = common.create_subscription(
            tuple(i) for i in conf['latest']['subscriptions'])

        timeseries_partitions = (
            timeseriesdb.Partition(
                order_by=common.OrderBy[i['order_by']],
                subscription=common.create_subscription(
                    tuple(event_type) for event_type in i['subscriptions']),
                limit=(
                    timeseriesdb.Limit(
                        min_entries=i['limit'].get('min_entries'),
                        max_entries=i['limit'].get('max_entries'),
                        duration=i['limit'].get('duration'),
                        size=i['limit'].get('size'))
                    if 'limit' in i else None))
            for i in conf['timeseries'])

        # databases are created on the environment's executor within a
        # single write transaction
        backend._dbs = await backend._env.execute(
            _ext_create_dbs, backend._env, conf['identifier'],
            backend._conditions, latest_subscription, timeseries_partitions)

        backend.async_group.spawn(backend._flush_loop, conf['flush_period'])
        backend.async_group.spawn(backend._cleanup_loop,
                                  conf['cleanup_period'])

    except BaseException:
        # make sure the environment is fully closed on failed initialization
        await aio.uncancellable(backend._env.async_close())
        raise

    return backend
 95
 96
 97def _ext_create_dbs(env, identifier, conditions, latest_subscription,
 98                    timeseries_partitions):
 99    with env.ext_begin(write=True) as txn:
100        system_db = systemdb.ext_create(env, txn, version, identifier)
101        latest_db = latestdb.ext_create(env, txn, conditions,
102                                        latest_subscription)
103        timeseries_db = timeseriesdb.ext_create(env, txn, conditions,
104                                                timeseries_partitions)
105        ref_db = refdb.RefDb(env)
106
107    return Databases(system=system_db,
108                     latest=latest_db,
109                     timeseries=timeseries_db,
110                     ref=ref_db)
111
112
class LmdbBackend(common.Backend):
    """Backend storing events in LMDB database.

    Instances are created exclusively with `create`, which sets all
    private attributes (this class defines no ``__init__``).
    """

    @property
    def async_group(self) -> aio.Group:
        """Group controlling resource's lifetime"""
        return self._async_group

    async def get_last_event_id(self,
                                server_id: int
                                ) -> common.EventId:
        """Get last registered event id associated with server id

        Raises:
            common.BackendClosedError

        """
        if not self.is_open:
            raise common.BackendClosedError()

        return self._dbs.system.get_last_event_id(server_id)

    async def register(self,
                       events: Collection[common.Event]
                       ) -> Collection[common.Event] | None:
        """Register events

        Events failing validation (non-increasing event id,
        decreasing timestamp, unmatched conditions) are skipped with a
        warning, but the whole input collection is still returned.

        Raises:
            common.BackendClosedError

        """
        if not self.is_open:
            raise common.BackendClosedError()

        for event in events:
            server_id = event.id.server

            last_event_id = self._dbs.system.get_last_event_id(server_id)
            last_timestamp = self._dbs.system.get_last_timestamp(server_id)

            # event ids must be strictly increasing per server
            if last_event_id >= event.id:
                mlog.warning("event registration skipped: invalid event id")
                continue

            # timestamps must be non-decreasing per server
            if last_timestamp > event.timestamp:
                mlog.warning("event registration skipped: invalid timestamp")
                continue

            if not self._conditions.matches(event):
                mlog.warning("event registration skipped: invalid conditions")
                continue

            refs = collections.deque()

            latest_result = self._dbs.latest.add(event)

            if latest_result.added_ref:
                refs.append(latest_result.added_ref)

            # the newly added latest entry may replace a previous one
            if latest_result.removed_ref:
                self._dbs.ref.remove(*latest_result.removed_ref)

            refs.extend(self._dbs.timeseries.add(event))

            # event not stored by any database - nothing to reference
            if not refs:
                continue

            self._dbs.ref.add(event, refs)
            self._dbs.system.set_last_event_id(event.id)
            self._dbs.system.set_last_timestamp(server_id, event.timestamp)

        self._registered_queue.append(events)
        self._registered_count += len(events)

        # too many unflushed events - request a flush without awaiting it
        if self._registered_count > max_registered_count:
            await self._flush_queue.put(self._loop.create_future())

        if self._registered_events_cb:
            await aio.call(self._registered_events_cb, events)

        return events

    async def query(self,
                    params: common.QueryParams
                    ) -> common.QueryResult:
        """Query events

        Dispatches to the database matching the params type.

        Raises:
            common.BackendClosedError
            ValueError: unsupported params type

        """
        if not self.is_open:
            raise common.BackendClosedError()

        if isinstance(params, common.QueryLatestParams):
            return self._dbs.latest.query(params)

        if isinstance(params, common.QueryTimeseriesParams):
            return await self._dbs.timeseries.query(params)

        if isinstance(params, common.QueryServerParams):
            return await self._dbs.ref.query(params)

        raise ValueError('unsupported params type')

    async def flush(self):
        """Flush internal buffers and permanently persist events

        Completes once the flush loop resolves the enqueued future.

        Raises:
            common.BackendClosedError

        """
        try:
            future = self._loop.create_future()
            await self._flush_queue.put(future)
            await future

        except aio.QueueClosedError:
            raise common.BackendClosedError()

    async def _flush_loop(self, flush_period):
        # futures of flush requests collected for the current iteration
        futures = collections.deque()

        async def cleanup():
            # best-effort final flush before closing the environment
            with contextlib.suppress(Exception):
                await self._flush()

            await self._env.async_close()

        try:
            while True:
                try:
                    # wait for an explicit flush request or periodic timeout
                    future = await aio.wait_for(self._flush_queue.get(),
                                                flush_period)
                    futures.append(future)

                except asyncio.TimeoutError:
                    # periodic flush without explicit request
                    pass

                except aio.CancelledWithResultError as e:
                    # queue item retrieved while being cancelled - keep it
                    # so its future is resolved in the finally block
                    if e.result:
                        futures.append(e.result)

                    raise

                # batch all currently pending requests into a single flush
                while not self._flush_queue.empty():
                    futures.append(self._flush_queue.get_nowait())

                # flush must not be interrupted half-way
                await aio.uncancellable(self._flush())

                while futures:
                    future = futures.popleft()
                    if not future.done():
                        future.set_result(None)

        except Exception as e:
            mlog.error('backend flush error: %s', e, exc_info=e)

        finally:
            self.close()
            self._flush_queue.close()

            # drain remaining requests so every waiter is notified
            while not self._flush_queue.empty():
                futures.append(self._flush_queue.get_nowait())

            for future in futures:
                if not future.done():
                    future.set_exception(common.BackendClosedError())

            await aio.uncancellable(cleanup())

    async def _cleanup_loop(self, cleanup_period):
        try:
            while True:
                # yield to the event loop between cleanup batches
                await asyncio.sleep(0)

                repeat = await self._env.execute(_ext_cleanup, self._env,
                                                 self._dbs, common.now())
                # keep cleaning without delay while batches are full
                if repeat:
                    continue

                await asyncio.sleep(cleanup_period)

        except Exception as e:
            mlog.error('backend cleanup error: %s', e, exc_info=e)

        finally:
            self.close()

    async def _flush(self):
        if not self._env.is_open:
            return

        # take ownership of currently registered events
        self._registered_count = 0
        registered_queue, self._registered_queue = (self._registered_queue,
                                                    collections.deque())

        changes = Changes(system=self._dbs.system.create_changes(),
                          latest=self._dbs.latest.create_changes(),
                          timeseries=self._dbs.timeseries.create_changes(),
                          ref=self._dbs.ref.create_changes())

        # TODO lock period between create_changes and locking executor
        #      (timeseries and ref must write changes before new queries are
        #      allowed)

        await self._env.execute(_ext_flush, self._env, self._dbs, changes)

        if not self._flushed_events_cb:
            return

        while registered_queue:
            events = registered_queue.popleft()
            await aio.call(self._flushed_events_cb, events)
301
302
303def _ext_flush(env, dbs, changes):
304    with env.ext_begin(write=True) as txn:
305        dbs.system.ext_write(txn, changes.system)
306        dbs.latest.ext_write(txn, changes.latest)
307        dbs.timeseries.ext_write(txn, changes.timeseries)
308        dbs.ref.ext_write(txn, changes.ref)
309
310
def _ext_cleanup(env, dbs, now):
    """Remove one batch of expired timeseries entries and their refs.

    Executor function - runs on the environment's executor. Returns
    ``True`` when a full batch was removed, signalling the caller to
    repeat cleanup immediately.
    """
    with env.ext_begin(write=True) as txn:
        removed = dbs.timeseries.ext_cleanup(txn, now, cleanup_max_results)
        if removed:
            dbs.ref.ext_cleanup(txn, removed)

    return bool(removed) and len(removed) >= cleanup_max_results
mlog = <Logger hat.event.backends.lmdb.backend (WARNING)>
cleanup_max_results = 1024
flush_queue_size = 4096
max_registered_count = 262144
version = '0.9'
class Databases(typing.NamedTuple):
33class Databases(typing.NamedTuple):
34    system: systemdb.SystemDb
35    latest: latestdb.LatestDb
36    timeseries: timeseriesdb.TimeseriesDb
37    ref: refdb.RefDb

Databases(system, latest, timeseries, ref)

Create new instance of Databases(system, latest, timeseries, ref)

Alias for field number 0

Alias for field number 1

Alias for field number 2

Alias for field number 3

class Changes(typing.NamedTuple):
40class Changes(typing.NamedTuple):
41    system: systemdb.Changes
42    latest: latestdb.Changes
43    timeseries: timeseriesdb.Changes
44    ref: refdb.Changes

Changes(system, latest, timeseries, ref)

Create new instance of Changes(system, latest, timeseries, ref)

Alias for field number 0

Alias for field number 1

timeseries: dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]

Alias for field number 2

Alias for field number 3

async def create( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], registered_events_cb: Optional[Callable[[Collection[hat.event.common.Event]], Optional[Awaitable[NoneType]]]], flushed_events_cb: Optional[Callable[[Collection[hat.event.common.Event]], Optional[Awaitable[NoneType]]]]) -> LmdbBackend:
47async def create(conf: json.Data,
48                 registered_events_cb: common.BackendRegisteredEventsCb | None,
49                 flushed_events_cb: common.BackendFlushedEventsCb | None
50                 ) -> 'LmdbBackend':
51    backend = LmdbBackend()
52    backend._registered_events_cb = registered_events_cb
53    backend._flushed_events_cb = flushed_events_cb
54    backend._conditions = Conditions(conf['conditions'])
55    backend._loop = asyncio.get_running_loop()
56    backend._flush_queue = aio.Queue(flush_queue_size)
57    backend._registered_count = 0
58    backend._registered_queue = collections.deque()
59    backend._async_group = aio.Group()
60
61    backend._env = await environment.create(Path(conf['db_path']))
62    backend.async_group.spawn(aio.call_on_done, backend._env.wait_closing(),
63                              backend.close)
64
65    try:
66        latest_subscription = common.create_subscription(
67            tuple(i) for i in conf['latest']['subscriptions'])
68
69        timeseries_partitions = (
70            timeseriesdb.Partition(
71                order_by=common.OrderBy[i['order_by']],
72                subscription=common.create_subscription(
73                    tuple(event_type) for event_type in i['subscriptions']),
74                limit=(
75                    timeseriesdb.Limit(
76                        min_entries=i['limit'].get('min_entries'),
77                        max_entries=i['limit'].get('max_entries'),
78                        duration=i['limit'].get('duration'),
79                        size=i['limit'].get('size'))
80                    if 'limit' in i else None))
81            for i in conf['timeseries'])
82
83        backend._dbs = await backend._env.execute(
84            _ext_create_dbs, backend._env, conf['identifier'],
85            backend._conditions, latest_subscription, timeseries_partitions)
86
87        backend.async_group.spawn(backend._flush_loop, conf['flush_period'])
88        backend.async_group.spawn(backend._cleanup_loop,
89                                  conf['cleanup_period'])
90
91    except BaseException:
92        await aio.uncancellable(backend._env.async_close())
93        raise
94
95    return backend
class LmdbBackend(hat.event.common.backend.Backend):
114class LmdbBackend(common.Backend):
115
116    @property
117    def async_group(self) -> aio.Group:
118        return self._async_group
119
120    async def get_last_event_id(self,
121                                server_id: int
122                                ) -> common.EventId:
123        if not self.is_open:
124            raise common.BackendClosedError()
125
126        return self._dbs.system.get_last_event_id(server_id)
127
128    async def register(self,
129                       events: Collection[common.Event]
130                       ) -> Collection[common.Event] | None:
131        if not self.is_open:
132            raise common.BackendClosedError()
133
134        for event in events:
135            server_id = event.id.server
136
137            last_event_id = self._dbs.system.get_last_event_id(server_id)
138            last_timestamp = self._dbs.system.get_last_timestamp(server_id)
139
140            if last_event_id >= event.id:
141                mlog.warning("event registration skipped: invalid event id")
142                continue
143
144            if last_timestamp > event.timestamp:
145                mlog.warning("event registration skipped: invalid timestamp")
146                continue
147
148            if not self._conditions.matches(event):
149                mlog.warning("event registration skipped: invalid conditions")
150                continue
151
152            refs = collections.deque()
153
154            latest_result = self._dbs.latest.add(event)
155
156            if latest_result.added_ref:
157                refs.append(latest_result.added_ref)
158
159            if latest_result.removed_ref:
160                self._dbs.ref.remove(*latest_result.removed_ref)
161
162            refs.extend(self._dbs.timeseries.add(event))
163
164            if not refs:
165                continue
166
167            self._dbs.ref.add(event, refs)
168            self._dbs.system.set_last_event_id(event.id)
169            self._dbs.system.set_last_timestamp(server_id, event.timestamp)
170
171        self._registered_queue.append(events)
172        self._registered_count += len(events)
173
174        if self._registered_count > max_registered_count:
175            await self._flush_queue.put(self._loop.create_future())
176
177        if self._registered_events_cb:
178            await aio.call(self._registered_events_cb, events)
179
180        return events
181
182    async def query(self,
183                    params: common.QueryParams
184                    ) -> common.QueryResult:
185        if not self.is_open:
186            raise common.BackendClosedError()
187
188        if isinstance(params, common.QueryLatestParams):
189            return self._dbs.latest.query(params)
190
191        if isinstance(params, common.QueryTimeseriesParams):
192            return await self._dbs.timeseries.query(params)
193
194        if isinstance(params, common.QueryServerParams):
195            return await self._dbs.ref.query(params)
196
197        raise ValueError('unsupported params type')
198
199    async def flush(self):
200        try:
201            future = self._loop.create_future()
202            await self._flush_queue.put(future)
203            await future
204
205        except aio.QueueClosedError:
206            raise common.BackendClosedError()
207
208    async def _flush_loop(self, flush_period):
209        futures = collections.deque()
210
211        async def cleanup():
212            with contextlib.suppress(Exception):
213                await self._flush()
214
215            await self._env.async_close()
216
217        try:
218            while True:
219                try:
220                    future = await aio.wait_for(self._flush_queue.get(),
221                                                flush_period)
222                    futures.append(future)
223
224                except asyncio.TimeoutError:
225                    pass
226
227                except aio.CancelledWithResultError as e:
228                    if e.result:
229                        futures.append(e.result)
230
231                    raise
232
233                while not self._flush_queue.empty():
234                    futures.append(self._flush_queue.get_nowait())
235
236                await aio.uncancellable(self._flush())
237
238                while futures:
239                    future = futures.popleft()
240                    if not future.done():
241                        future.set_result(None)
242
243        except Exception as e:
244            mlog.error('backend flush error: %s', e, exc_info=e)
245
246        finally:
247            self.close()
248            self._flush_queue.close()
249
250            while not self._flush_queue.empty():
251                futures.append(self._flush_queue.get_nowait())
252
253            for future in futures:
254                if not future.done():
255                    future.set_exception(common.BackendClosedError())
256
257            await aio.uncancellable(cleanup())
258
259    async def _cleanup_loop(self, cleanup_period):
260        try:
261            while True:
262                await asyncio.sleep(0)
263
264                repeat = await self._env.execute(_ext_cleanup, self._env,
265                                                 self._dbs, common.now())
266                if repeat:
267                    continue
268
269                await asyncio.sleep(cleanup_period)
270
271        except Exception as e:
272            mlog.error('backend cleanup error: %s', e, exc_info=e)
273
274        finally:
275            self.close()
276
277    async def _flush(self):
278        if not self._env.is_open:
279            return
280
281        self._registered_count = 0
282        registered_queue, self._registered_queue = (self._registered_queue,
283                                                    collections.deque())
284
285        changes = Changes(system=self._dbs.system.create_changes(),
286                          latest=self._dbs.latest.create_changes(),
287                          timeseries=self._dbs.timeseries.create_changes(),
288                          ref=self._dbs.ref.create_changes())
289
290        # TODO lock period between create_changes and locking executor
291        #      (timeseries and ref must write changes before new queries are
292        #      allowed)
293
294        await self._env.execute(_ext_flush, self._env, self._dbs, changes)
295
296        if not self._flushed_events_cb:
297            return
298
299        while registered_queue:
300            events = registered_queue.popleft()
301            await aio.call(self._flushed_events_cb, events)

Backend ABC

async_group: hat.aio.group.Group
116    @property
117    def async_group(self) -> aio.Group:
118        return self._async_group

Group controlling resource's lifetime.

async def get_last_event_id(self, server_id: int) -> hat.event.common.EventId:
120    async def get_last_event_id(self,
121                                server_id: int
122                                ) -> common.EventId:
123        if not self.is_open:
124            raise common.BackendClosedError()
125
126        return self._dbs.system.get_last_event_id(server_id)

Get last registered event id associated with server id

async def register( self, events: Collection[hat.event.common.Event]) -> Collection[hat.event.common.Event] | None:
128    async def register(self,
129                       events: Collection[common.Event]
130                       ) -> Collection[common.Event] | None:
131        if not self.is_open:
132            raise common.BackendClosedError()
133
134        for event in events:
135            server_id = event.id.server
136
137            last_event_id = self._dbs.system.get_last_event_id(server_id)
138            last_timestamp = self._dbs.system.get_last_timestamp(server_id)
139
140            if last_event_id >= event.id:
141                mlog.warning("event registration skipped: invalid event id")
142                continue
143
144            if last_timestamp > event.timestamp:
145                mlog.warning("event registration skipped: invalid timestamp")
146                continue
147
148            if not self._conditions.matches(event):
149                mlog.warning("event registration skipped: invalid conditions")
150                continue
151
152            refs = collections.deque()
153
154            latest_result = self._dbs.latest.add(event)
155
156            if latest_result.added_ref:
157                refs.append(latest_result.added_ref)
158
159            if latest_result.removed_ref:
160                self._dbs.ref.remove(*latest_result.removed_ref)
161
162            refs.extend(self._dbs.timeseries.add(event))
163
164            if not refs:
165                continue
166
167            self._dbs.ref.add(event, refs)
168            self._dbs.system.set_last_event_id(event.id)
169            self._dbs.system.set_last_timestamp(server_id, event.timestamp)
170
171        self._registered_queue.append(events)
172        self._registered_count += len(events)
173
174        if self._registered_count > max_registered_count:
175            await self._flush_queue.put(self._loop.create_future())
176
177        if self._registered_events_cb:
178            await aio.call(self._registered_events_cb, events)
179
180        return events

Register events

182    async def query(self,
183                    params: common.QueryParams
184                    ) -> common.QueryResult:
185        if not self.is_open:
186            raise common.BackendClosedError()
187
188        if isinstance(params, common.QueryLatestParams):
189            return self._dbs.latest.query(params)
190
191        if isinstance(params, common.QueryTimeseriesParams):
192            return await self._dbs.timeseries.query(params)
193
194        if isinstance(params, common.QueryServerParams):
195            return await self._dbs.ref.query(params)
196
197        raise ValueError('unsupported params type')

Query events

async def flush(self):
199    async def flush(self):
200        try:
201            future = self._loop.create_future()
202            await self._flush_queue.put(future)
203            await future
204
205        except aio.QueueClosedError:
206            raise common.BackendClosedError()

Flush internal buffers and permanently persist events