hat.event.backends.lmdb.common

  1from hat.event.common import *  # NOQA
  2
  3from pathlib import Path
  4import enum
  5import itertools
  6import platform
  7import struct
  8import sys
  9import typing
 10
 11import lmdb
 12
 13from hat import json
 14from hat import util
 15
 16from hat.event.common import (ServerId, Event, EventId, EventType,
 17                              Timestamp, EventPayloadBinary,
 18                              sbs_repo, event_to_sbs, event_from_sbs)
 19
 20
 21default_max_size: int = (1024 * 1024 * 1024 * 1024
 22                         if platform.architecture()[0] == '64bit'
 23                         else 2 * 1024 * 1024 * 1024)
 24
 25
# Type variables standing for the decoded key type and the decoded value type
# of a single database definition (see the `DbDef` field annotations).
DbKey = typing.TypeVar('DbKey')
DbValue = typing.TypeVar('DbValue')
 28
 29
class DbType(enum.Enum):
    """Identifiers of the databases kept inside one LMDB environment.

    The member name is used as the database name by `ext_open_db`, and the
    member value is written as a one-byte tag in encoded event references.
    """
    SYSTEM_SETTINGS = 0
    SYSTEM_LAST_EVENT_ID = 1
    SYSTEM_LAST_TIMESTAMP = 2
    REF = 3
    LATEST_DATA = 4
    LATEST_TYPE = 5
    TIMESERIES_DATA = 6
    TIMESERIES_PARTITION = 7
    TIMESERIES_COUNT = 8
 40
 41
# A `typing.NamedTuple` subclass may also inherit from `typing.Generic` only
# on Python 3.11 and newer; older interpreters get a non-generic fallback
# with the same fields.
if sys.version_info[:2] >= (3, 11):

    class DbDef(typing.NamedTuple, typing.Generic[DbKey, DbValue]):
        """Encoder and decoder callables for one database's keys and values."""
        encode_key: typing.Callable[[DbKey], util.Bytes]
        decode_key: typing.Callable[[util.Bytes], DbKey]
        encode_value: typing.Callable[[DbValue], util.Bytes]
        decode_value: typing.Callable[[util.Bytes], DbValue]

    def create_db_def(key_type: typing.Type,
                      value_type: typing.Type
                      ) -> typing.Type[DbDef]:
        """Return `DbDef` parametrized with `key_type` and `value_type`."""
        return DbDef[key_type, value_type]

else:

    class DbDef(typing.NamedTuple):
        """Encoder and decoder callables for one database's keys and values."""
        encode_key: typing.Callable[[DbKey], util.Bytes]
        decode_key: typing.Callable[[util.Bytes], DbKey]
        encode_value: typing.Callable[[DbValue], util.Bytes]
        decode_value: typing.Callable[[util.Bytes], DbValue]

    def create_db_def(key_type: typing.Type,
                      value_type: typing.Type
                      ) -> typing.Type[DbDef]:
        """Return the plain `DbDef` class; both type arguments are ignored."""
        return DbDef
 67
 68
# Integer key type of the LATEST_DATA and LATEST_TYPE databases.
EventTypeRef: typing.TypeAlias = int
# Integer key type of the TIMESERIES_PARTITION and TIMESERIES_COUNT databases.
PartitionId: typing.TypeAlias = int
# Key of the TIMESERIES_DATA database: (partition id, timestamp, event id).
TimeseriesKey: typing.TypeAlias = tuple[PartitionId, Timestamp, EventId]
 72
 73
class SettingsId(enum.Enum):
    """Keys of the SYSTEM_SETTINGS database."""
    VERSION = 0
    IDENTIFIER = 1
 77
 78
class LatestEventRef(typing.NamedTuple):
    """Reference to an entry of the LATEST_DATA database."""
    # key of the referenced LATEST_DATA entry
    key: EventTypeRef
 81
 82
class TimeseriesEventRef(typing.NamedTuple):
    """Reference to an entry of the TIMESERIES_DATA database."""
    # key of the referenced TIMESERIES_DATA entry
    key: TimeseriesKey
 85
 86
# Any supported event reference; sets of these are the REF database values.
EventRef: typing.TypeAlias = LatestEventRef | TimeseriesEventRef
 88
 89
# Key/value encoding rules for every database, indexed by `DbType`.
#
# Each codec is wrapped in a lambda so that the private `_encode_*` /
# `_decode_*` helpers, which are defined further down in this module, are
# only looked up when the codec is actually called.
db_defs = {
    # settings identifier -> JSON data
    DbType.SYSTEM_SETTINGS: create_db_def(SettingsId, json.Data)(
        encode_key=lambda key: _encode_uint(key.value),
        decode_key=lambda key_bytes: SettingsId(_decode_uint(key_bytes)),
        encode_value=lambda value: _encode_json(value),
        decode_value=lambda value_bytes: _decode_json(value_bytes)),

    # server id -> event id
    DbType.SYSTEM_LAST_EVENT_ID: create_db_def(ServerId, EventId)(
        encode_key=lambda key: _encode_uint(key),
        decode_key=lambda key_bytes: _decode_uint(key_bytes),
        encode_value=lambda value: _encode_event_id(value),
        decode_value=lambda value_bytes: _decode_event_id(value_bytes)),

    # server id -> timestamp
    DbType.SYSTEM_LAST_TIMESTAMP: create_db_def(ServerId, Timestamp)(
        encode_key=lambda key: _encode_uint(key),
        decode_key=lambda key_bytes: _decode_uint(key_bytes),
        encode_value=lambda value: _encode_timestamp(value),
        decode_value=lambda value_bytes: _decode_timestamp(value_bytes)),

    # event id -> set of references into LATEST_DATA / TIMESERIES_DATA
    DbType.REF: create_db_def(EventId, set[EventRef])(
        encode_key=lambda key: _encode_event_id(key),
        decode_key=lambda key_bytes: _decode_event_id(key_bytes),
        encode_value=lambda value: bytes(_encode_event_refs(value)),
        decode_value=lambda value_bytes: set(_decode_event_refs(value_bytes))),

    # event type reference -> event
    DbType.LATEST_DATA: create_db_def(EventTypeRef, Event)(
        encode_key=lambda key: _encode_uint(key),
        decode_key=lambda key_bytes: _decode_uint(key_bytes),
        encode_value=lambda value: _encode_event(value),
        decode_value=lambda value_bytes: _decode_event(value_bytes)),

    # event type reference -> event type (stored as a JSON list, decoded
    # back into a tuple)
    DbType.LATEST_TYPE: create_db_def(EventTypeRef, EventType)(
        encode_key=lambda key: _encode_uint(key),
        decode_key=lambda key_bytes: _decode_uint(key_bytes),
        encode_value=lambda value: _encode_json(list(value)),
        decode_value=lambda value_bytes: tuple(_decode_json(value_bytes))),

    # (partition id, timestamp, event id) -> event
    DbType.TIMESERIES_DATA: create_db_def(TimeseriesKey, Event)(
        encode_key=lambda key: _encode_timeseries_key(key),
        decode_key=lambda key_bytes: _decode_timeseries_key(key_bytes),
        encode_value=lambda value: _encode_event(value),
        decode_value=lambda value_bytes: _decode_event(value_bytes)),

    # partition id -> JSON data
    DbType.TIMESERIES_PARTITION: create_db_def(PartitionId, json.Data)(
        encode_key=lambda key: _encode_uint(key),
        decode_key=lambda key_bytes: _decode_uint(key_bytes),
        encode_value=lambda value: _encode_json(value),
        decode_value=lambda value_bytes: _decode_json(value_bytes)),

    # partition id -> unsigned integer count
    DbType.TIMESERIES_COUNT: create_db_def(PartitionId, int)(
        encode_key=lambda key: _encode_uint(key),
        decode_key=lambda key_bytes: _decode_uint(key_bytes),
        encode_value=lambda value: _encode_uint(value),
        decode_value=lambda value_bytes: _decode_uint(value_bytes))}
144
145
def ext_create_env(path: Path,
                   max_size: int = default_max_size,
                   readonly: bool = False
                   ) -> lmdb.Environment:
    """Create the LMDB environment located at `path`.

    Args:
        path: environment location, handed to LMDB as a string
        max_size: value passed to LMDB as ``map_size``
        readonly: forwarded unchanged to LMDB

    Returns:
        environment that can hold one named database per `DbType` member

    """
    env_options = {'map_size': max_size,
                   'subdir': False,
                   'max_dbs': len(DbType),
                   'readonly': readonly}
    return lmdb.Environment(str(path), **env_options)
155
156
def ext_open_db(env: lmdb.Environment,
                db_type: DbType,
                create: bool = True
                ) -> lmdb._Database:
    """Open the named database of `env` that corresponds to `db_type`.

    The database name is the UTF-8 encoded name of the `db_type` member;
    `create` is forwarded unchanged to `env.open_db`.

    """
    db_name = db_type.name.encode('utf-8')
    return env.open_db(db_name, create=create)
162
163
def _encode_uint(value):
    """Pack `value` as an 8-byte big-endian unsigned integer."""
    packed = struct.pack(">Q", value)
    return packed
166
167
def _decode_uint(value_bytes):
    """Unpack an 8-byte big-endian unsigned integer from `value_bytes`."""
    (value,) = struct.unpack(">Q", value_bytes)
    return value
170
171
def _encode_event_id(event_id):
    """Pack an event id as three consecutive 8-byte big-endian unsigned ints.

    The fields are written in the order server, session, instance.

    """
    id_fields = (event_id.server, event_id.session, event_id.instance)
    return struct.pack(">QQQ", *id_fields)
175
176
def _decode_event_id(event_id_bytes):
    """Rebuild an `EventId` from its 24-byte encoding.

    The bytes hold three big-endian unsigned 64-bit integers in the order
    server, session, instance.

    """
    server, session, instance = struct.unpack(">QQQ", event_id_bytes)
    return EventId(server=server, session=session, instance=instance)
182
183
def _encode_timestamp(timestamp):
    """Pack a timestamp into 12 bytes (big-endian).

    The layout is an unsigned 64-bit seconds field followed by an unsigned
    32-bit microseconds field.

    """
    # seconds are biased by 2**63 so that negative values fit the unsigned
    # 64-bit field
    biased_seconds = timestamp.s + (1 << 63)
    return struct.pack(">QI", biased_seconds, timestamp.us)
186
187
def _decode_timestamp(timestamp_bytes):
    """Rebuild a `Timestamp` from its 12-byte encoding.

    Reverses `_encode_timestamp`: the 2**63 bias is subtracted from the
    stored seconds field.

    """
    biased_seconds, microseconds = struct.unpack(">QI", timestamp_bytes)
    seconds = biased_seconds - (1 << 63)
    return Timestamp(seconds, microseconds)
191
192
def _encode_event_ref(ref):
    """Encode one event reference as a tag byte followed by the encoded key.

    The tag byte is the value of the `DbType` the reference points into; the
    key is encoded with that database's key encoder.

    Raises:
        ValueError: `ref` is neither `LatestEventRef` nor
            `TimeseriesEventRef`

    """
    supported = ((LatestEventRef, DbType.LATEST_DATA),
                 (TimeseriesEventRef, DbType.TIMESERIES_DATA))

    for ref_cls, db_type in supported:
        if isinstance(ref, ref_cls):
            break

    else:
        raise ValueError('unsupported event reference')

    tag = bytes([db_type.value])
    return tag + db_defs[db_type].encode_key(ref.key)
204
205
def _decode_event_ref(ref_bytes):
    """Decode one event reference from a tag byte followed by an encoded key.

    The key is decoded with the key decoder of the tagged database before the
    tag is checked against the supported reference kinds.

    Raises:
        ValueError: the tag names a database other than LATEST_DATA or
            TIMESERIES_DATA (or is not a valid `DbType` value)

    """
    db_type = DbType(ref_bytes[0])
    decoded_key = db_defs[db_type].decode_key(ref_bytes[1:])

    if db_type == DbType.LATEST_DATA:
        ref_cls = LatestEventRef

    elif db_type == DbType.TIMESERIES_DATA:
        ref_cls = TimeseriesEventRef

    else:
        raise ValueError('unsupported database type')

    return ref_cls(decoded_key)
217
218
def _encode_event_refs(refs):
    """Encode every reference in `refs` and concatenate the results."""
    return b''.join(_encode_event_ref(ref) for ref in refs)
222
223
def _decode_event_refs(refs_bytes):
    """Yield event references decoded from concatenated encodings.

    Each encoded reference is one tag byte followed by a fixed-length key
    whose length depends on the tagged database.

    Raises:
        ValueError: a tag names a database other than LATEST_DATA or
            TIMESERIES_DATA (or is not a valid `DbType` value)

    """
    # encoded key lengths: uint key is 8 bytes; timeseries key is
    # 8 (partition id) + 12 (timestamp) + 24 (event id) bytes
    key_lengths = {DbType.LATEST_DATA: 8,
                   DbType.TIMESERIES_DATA: 44}

    remaining = refs_bytes
    while remaining:
        key_len = key_lengths.get(DbType(remaining[0]))
        if key_len is None:
            raise ValueError('unsupported event reference')

        ref_len = key_len + 1
        ref = _decode_event_ref(remaining[:ref_len])
        remaining = remaining[ref_len:]
        yield ref
240
241
def _encode_timeseries_key(key):
    """Encode a (partition id, timestamp, event id) key.

    The result is the concatenation of the encoded partition id, the encoded
    timestamp and the encoded event id, in that order.

    """
    partition_id, timestamp, event_id = key
    encoded_parts = (_encode_uint(partition_id),
                     _encode_timestamp(timestamp),
                     _encode_event_id(event_id))
    return b''.join(encoded_parts)
247
248
def _decode_timeseries_key(key_bytes):
    """Decode a timeseries key into (partition id, timestamp, event id).

    Bytes 0-7 hold the partition id, bytes 8-19 the timestamp and the
    remaining bytes the event id.

    """
    partition_bytes = key_bytes[:8]
    timestamp_bytes = key_bytes[8:20]
    event_id_bytes = key_bytes[20:]
    return (_decode_uint(partition_bytes),
            _decode_timestamp(timestamp_bytes),
            _decode_event_id(event_id_bytes))
254
255
def _encode_event(event):
    """Serialize `event` as a 'HatEventer.Event' SBS structure."""
    return sbs_repo.encode('HatEventer.Event', event_to_sbs(event))
259
260
def _decode_event(event_bytes):
    """Deserialize an event from its 'HatEventer.Event' SBS encoding.

    When the payload is an `EventPayloadBinary`, its data is converted to a
    `bytes` object before the event is returned.

    """
    event = event_from_sbs(sbs_repo.decode('HatEventer.Event', event_bytes))

    payload = event.payload
    if not isinstance(payload, EventPayloadBinary):
        return event

    # NOTE(review): presumably the decoded data may be a view into
    # `event_bytes`; copying into `bytes` detaches it - confirm
    binary_payload = payload._replace(data=bytes(payload.data))
    return event._replace(payload=binary_payload)
270
271
def _encode_json(data):
    """Encode `data` as JSON text and return it as UTF-8 bytes."""
    json_text = json.encode(data)
    return json_text.encode('utf-8')
274
275
def _decode_json(data_bytes):
    """Decode JSON data from UTF-8 encoded `data_bytes`."""
    json_text = str(data_bytes, encoding='utf-8')
    return json.decode(json_text)
default_max_size: int = 1099511627776
class DbType(enum.Enum):
31class DbType(enum.Enum):
32    SYSTEM_SETTINGS = 0
33    SYSTEM_LAST_EVENT_ID = 1
34    SYSTEM_LAST_TIMESTAMP = 2
35    REF = 3
36    LATEST_DATA = 4
37    LATEST_TYPE = 5
38    TIMESERIES_DATA = 6
39    TIMESERIES_PARTITION = 7
40    TIMESERIES_COUNT = 8

An enumeration.

SYSTEM_SETTINGS = <DbType.SYSTEM_SETTINGS: 0>
SYSTEM_LAST_EVENT_ID = <DbType.SYSTEM_LAST_EVENT_ID: 1>
SYSTEM_LAST_TIMESTAMP = <DbType.SYSTEM_LAST_TIMESTAMP: 2>
REF = <DbType.REF: 3>
LATEST_DATA = <DbType.LATEST_DATA: 4>
LATEST_TYPE = <DbType.LATEST_TYPE: 5>
TIMESERIES_DATA = <DbType.TIMESERIES_DATA: 6>
TIMESERIES_PARTITION = <DbType.TIMESERIES_PARTITION: 7>
TIMESERIES_COUNT = <DbType.TIMESERIES_COUNT: 8>
EventTypeRef: TypeAlias = int
PartitionId: TypeAlias = int
TimeseriesKey: TypeAlias = tuple[int, hat.event.common.Timestamp, hat.event.common.EventId]
class SettingsId(enum.Enum):
75class SettingsId(enum.Enum):
76    VERSION = 0
77    IDENTIFIER = 1

An enumeration.

VERSION = <SettingsId.VERSION: 0>
IDENTIFIER = <SettingsId.IDENTIFIER: 1>
class LatestEventRef(typing.NamedTuple):
80class LatestEventRef(typing.NamedTuple):
81    key: EventTypeRef

LatestEventRef(key,)

LatestEventRef(key: int)

Create new instance of LatestEventRef(key,)

key: int

Alias for field number 0

class TimeseriesEventRef(typing.NamedTuple):
84class TimeseriesEventRef(typing.NamedTuple):
85    key: TimeseriesKey

TimeseriesEventRef(key,)

TimeseriesEventRef( key: tuple[int, hat.event.common.Timestamp, hat.event.common.EventId])

Create new instance of TimeseriesEventRef(key,)

Alias for field number 0

EventRef: TypeAlias = LatestEventRef | TimeseriesEventRef
db_defs = {<DbType.SYSTEM_SETTINGS: 0>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.SYSTEM_LAST_EVENT_ID: 1>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.SYSTEM_LAST_TIMESTAMP: 2>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.REF: 3>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.LATEST_DATA: 4>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.LATEST_TYPE: 5>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.TIMESERIES_DATA: 6>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.TIMESERIES_PARTITION: 7>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.TIMESERIES_COUNT: 8>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>)}
def ext_create_env( path: pathlib.Path, max_size: int = 1099511627776, readonly: bool = False) -> Environment:
147def ext_create_env(path: Path,
148                   max_size: int = default_max_size,
149                   readonly: bool = False
150                   ) -> lmdb.Environment:
151    return lmdb.Environment(str(path),
152                            map_size=max_size,
153                            subdir=False,
154                            max_dbs=len(DbType),
155                            readonly=readonly)
def ext_open_db( env: Environment, db_type: DbType, create: bool = True) -> _Database:
158def ext_open_db(env: lmdb.Environment,
159                db_type: DbType,
160                create: bool = True
161                ) -> lmdb._Database:
162    return env.open_db(db_type.name.encode('utf-8'), create=create)
class DbDef(typing.NamedTuple):
45    class DbDef(typing.NamedTuple, typing.Generic[DbKey, DbValue]):
46        encode_key: typing.Callable[[DbKey], util.Bytes]
47        decode_key: typing.Callable[[util.Bytes], DbKey]
48        encode_value: typing.Callable[[DbValue], util.Bytes]
49        decode_value: typing.Callable[[util.Bytes], DbValue]

DbDef(encode_key, decode_key, encode_value, decode_value)

DbDef( encode_key: Callable[[~DbKey], bytes | bytearray | memoryview], decode_key: Callable[[bytes | bytearray | memoryview], ~DbKey], encode_value: Callable[[~DbValue], bytes | bytearray | memoryview], decode_value: Callable[[bytes | bytearray | memoryview], ~DbValue])

Create new instance of DbDef(encode_key, decode_key, encode_value, decode_value)

encode_key: Callable[[~DbKey], bytes | bytearray | memoryview]

Alias for field number 0

decode_key: Callable[[bytes | bytearray | memoryview], ~DbKey]

Alias for field number 1

encode_value: Callable[[~DbValue], bytes | bytearray | memoryview]

Alias for field number 2

decode_value: Callable[[bytes | bytearray | memoryview], ~DbValue]

Alias for field number 3

def create_db_def( key_type: Type, value_type: Type) -> Type[DbDef]:
64    def create_db_def(key_type: typing.Type,
65                      value_type: typing.Type
66                      ) -> typing.Type[DbDef]:
67        return DbDef