hat.event.backends.lmdb.common
from hat.event.common import *  # NOQA

from pathlib import Path
import enum
import itertools
import platform
import struct
import sys
import typing

import lmdb

from hat import json
from hat import util

from hat.event.common import (ServerId, Event, EventId, EventType,
                              Timestamp, EventPayloadBinary,
                              sbs_repo, event_to_sbs, event_from_sbs)


# Default LMDB map size: 1 TiB when running on a 64-bit interpreter,
# 2 GiB otherwise.
default_max_size: int = (1024 * 1024 * 1024 * 1024
                         if platform.architecture()[0] == '64bit'
                         else 2 * 1024 * 1024 * 1024)


DbKey = typing.TypeVar('DbKey')
DbValue = typing.TypeVar('DbValue')


class DbType(enum.Enum):
    """LMDB named databases used by the backend.

    `ext_open_db` uses the member name as the LMDB database name. The values
    of `LATEST_DATA` and `TIMESERIES_DATA` are also written as the tag byte
    of encoded event references (values of the `REF` database), so member
    values must stay stable for existing databases to remain readable.
    """
    SYSTEM_SETTINGS = 0
    SYSTEM_LAST_EVENT_ID = 1
    SYSTEM_LAST_TIMESTAMP = 2
    REF = 3
    LATEST_DATA = 4
    LATEST_TYPE = 5
    TIMESERIES_DATA = 6
    TIMESERIES_PARTITION = 7
    TIMESERIES_COUNT = 8


if sys.version_info[:2] >= (3, 11):

    # Generic NamedTuple subclasses are supported starting with Python 3.11,
    # so key/value types can be carried as type parameters.
    class DbDef(typing.NamedTuple, typing.Generic[DbKey, DbValue]):
        """Key and value codec functions of a single database."""
        encode_key: typing.Callable[[DbKey], util.Bytes]
        decode_key: typing.Callable[[util.Bytes], DbKey]
        encode_value: typing.Callable[[DbValue], util.Bytes]
        decode_value: typing.Callable[[util.Bytes], DbValue]

    def create_db_def(key_type: typing.Type,
                      value_type: typing.Type
                      ) -> typing.Type[DbDef]:
        """Return `DbDef` parameterized with `key_type` and `value_type`."""
        return DbDef[key_type, value_type]

else:

    class DbDef(typing.NamedTuple):
        """Key and value codec functions of a single database."""
        encode_key: typing.Callable[[DbKey], util.Bytes]
        decode_key: typing.Callable[[util.Bytes], DbKey]
        encode_value: typing.Callable[[DbValue], util.Bytes]
        decode_value: typing.Callable[[util.Bytes], DbValue]

    def create_db_def(key_type: typing.Type,
                      value_type: typing.Type
                      ) -> typing.Type[DbDef]:
        """Return `DbDef` (type parameters are ignored before Python 3.11)."""
        return DbDef


EventTypeRef: typing.TypeAlias = int
PartitionId: typing.TypeAlias = int
TimeseriesKey: typing.TypeAlias = tuple[PartitionId, Timestamp, EventId]


class SettingsId(enum.Enum):
    """Keys of the `SYSTEM_SETTINGS` database."""
    VERSION = 0
    IDENTIFIER = 1


class LatestEventRef(typing.NamedTuple):
    """Reference to an event stored in the `LATEST_DATA` database."""
    key: EventTypeRef


class TimeseriesEventRef(typing.NamedTuple):
    """Reference to an event stored in the `TIMESERIES_DATA` database."""
    key: TimeseriesKey


EventRef: typing.TypeAlias = LatestEventRef | TimeseriesEventRef


# Codec definitions for every database type. The lambdas resolve the
# module-level helpers below at call time, so definition order is irrelevant.
db_defs = {
    DbType.SYSTEM_SETTINGS: create_db_def(SettingsId, json.Data)(
        encode_key=lambda key: _encode_uint(key.value),
        decode_key=lambda key_bytes: SettingsId(_decode_uint(key_bytes)),
        encode_value=lambda value: _encode_json(value),
        decode_value=lambda value_bytes: _decode_json(value_bytes)),

    DbType.SYSTEM_LAST_EVENT_ID: create_db_def(ServerId, EventId)(
        encode_key=lambda key: _encode_uint(key),
        decode_key=lambda key_bytes: _decode_uint(key_bytes),
        encode_value=lambda value: _encode_event_id(value),
        decode_value=lambda value_bytes: _decode_event_id(value_bytes)),

    DbType.SYSTEM_LAST_TIMESTAMP: create_db_def(ServerId, Timestamp)(
        encode_key=lambda key: _encode_uint(key),
        decode_key=lambda key_bytes: _decode_uint(key_bytes),
        encode_value=lambda value: _encode_timestamp(value),
        decode_value=lambda value_bytes: _decode_timestamp(value_bytes)),

    DbType.REF: create_db_def(EventId, set[EventRef])(
        encode_key=lambda key: _encode_event_id(key),
        decode_key=lambda key_bytes: _decode_event_id(key_bytes),
        encode_value=lambda value: bytes(_encode_event_refs(value)),
        decode_value=lambda value_bytes: set(_decode_event_refs(value_bytes))),

    DbType.LATEST_DATA: create_db_def(EventTypeRef, Event)(
        encode_key=lambda key: _encode_uint(key),
        decode_key=lambda key_bytes: _decode_uint(key_bytes),
        encode_value=lambda value: _encode_event(value),
        decode_value=lambda value_bytes: _decode_event(value_bytes)),

    DbType.LATEST_TYPE: create_db_def(EventTypeRef, EventType)(
        encode_key=lambda key: _encode_uint(key),
        decode_key=lambda key_bytes: _decode_uint(key_bytes),
        encode_value=lambda value: _encode_json(list(value)),
        decode_value=lambda value_bytes: tuple(_decode_json(value_bytes))),

    DbType.TIMESERIES_DATA: create_db_def(TimeseriesKey, Event)(
        encode_key=lambda key: _encode_timeseries_key(key),
        decode_key=lambda key_bytes: _decode_timeseries_key(key_bytes),
        encode_value=lambda value: _encode_event(value),
        decode_value=lambda value_bytes: _decode_event(value_bytes)),

    DbType.TIMESERIES_PARTITION: create_db_def(PartitionId, json.Data)(
        encode_key=lambda key: _encode_uint(key),
        decode_key=lambda key_bytes: _decode_uint(key_bytes),
        encode_value=lambda value: _encode_json(value),
        decode_value=lambda value_bytes: _decode_json(value_bytes)),

    DbType.TIMESERIES_COUNT: create_db_def(PartitionId, int)(
        encode_key=lambda key: _encode_uint(key),
        decode_key=lambda key_bytes: _decode_uint(key_bytes),
        encode_value=lambda value: _encode_uint(value),
        decode_value=lambda value_bytes: _decode_uint(value_bytes))}


def ext_create_env(path: Path,
                   max_size: int = default_max_size,
                   readonly: bool = False
                   ) -> lmdb.Environment:
    """Open the LMDB environment stored as a single file at `path`.

    Args:
        path: database file path (`subdir=False`, so no directory is used)
        max_size: LMDB map size in bytes
        readonly: open the environment read-only

    The environment allows one named database per `DbType` member.
    """
    return lmdb.Environment(str(path),
                            map_size=max_size,
                            subdir=False,
                            max_dbs=len(DbType),
                            readonly=readonly)


def ext_open_db(env: lmdb.Environment,
                db_type: DbType,
                create: bool = True
                ) -> lmdb._Database:
    """Open the named database for `db_type` (named after the member name).

    `create` is passed through to `lmdb.Environment.open_db`.
    """
    return env.open_db(db_type.name.encode('utf-8'), create=create)


def _encode_uint(value):
    """Encode an unsigned 64-bit integer as 8 big-endian bytes."""
    return struct.pack(">Q", value)


def _decode_uint(value_bytes):
    """Decode 8 big-endian bytes into an unsigned integer."""
    return struct.unpack(">Q", value_bytes)[0]


def _encode_event_id(event_id):
    """Encode an `EventId` as three big-endian uint64 (24 bytes)."""
    return struct.pack(">QQQ", event_id.server, event_id.session,
                       event_id.instance)


def _decode_event_id(event_id_bytes):
    """Decode 24 bytes produced by `_encode_event_id`."""
    server_id, session_id, instance_id = struct.unpack(">QQQ", event_id_bytes)
    return EventId(server=server_id,
                   session=session_id,
                   instance=instance_id)


def _encode_timestamp(timestamp):
    """Encode a `Timestamp` as 12 bytes (uint64 seconds + uint32 micros).

    Seconds are offset by 2**63 so that signed values map onto unsigned
    big-endian integers whose byte order matches numeric order.
    """
    return struct.pack(">QI", timestamp.s + (1 << 63), timestamp.us)


def _decode_timestamp(timestamp_bytes):
    """Decode 12 bytes produced by `_encode_timestamp`."""
    s, us = struct.unpack(">QI", timestamp_bytes)
    return Timestamp(s - (1 << 63), us)


def _encode_event_ref(ref):
    """Encode an event reference as a tag byte followed by its key.

    The tag byte is the `DbType` value of the referenced database.

    Raises:
        ValueError: unsupported reference type
    """
    if isinstance(ref, LatestEventRef):
        db_type = DbType.LATEST_DATA

    elif isinstance(ref, TimeseriesEventRef):
        db_type = DbType.TIMESERIES_DATA

    else:
        raise ValueError('unsupported event reference')

    return bytes([db_type.value]) + db_defs[db_type].encode_key(ref.key)


def _decode_event_ref(ref_bytes):
    """Decode a single reference produced by `_encode_event_ref`.

    Raises:
        ValueError: unknown or unsupported tag byte
    """
    db_type = DbType(ref_bytes[0])
    key = db_defs[db_type].decode_key(ref_bytes[1:])

    if db_type == DbType.LATEST_DATA:
        return LatestEventRef(key)

    if db_type == DbType.TIMESERIES_DATA:
        return TimeseriesEventRef(key)

    raise ValueError('unsupported database type')


# Encoded key sizes of event references, derived from the struct formats
# used by the corresponding key encoders (uint / timeseries key).
_event_ref_key_sizes = {
    DbType.LATEST_DATA: struct.calcsize(">Q"),
    DbType.TIMESERIES_DATA: (struct.calcsize(">Q") +
                             struct.calcsize(">QI") +
                             struct.calcsize(">QQQ"))}


def _encode_event_refs(refs):
    """Concatenate encoded event references into a single bytes value."""
    return b''.join(_encode_event_ref(ref) for ref in refs)


def _decode_event_refs(refs_bytes):
    """Yield event references from bytes produced by `_encode_event_refs`.

    Walks the buffer with an offset so each reference is sliced once
    (linear in total size, unlike re-slicing the remaining tail).

    Raises:
        ValueError: unknown or unsupported tag byte
    """
    offset = 0
    while offset < len(refs_bytes):
        db_type = DbType(refs_bytes[offset])

        key_size = _event_ref_key_sizes.get(db_type)
        if key_size is None:
            raise ValueError('unsupported event reference')

        end = offset + 1 + key_size
        yield _decode_event_ref(refs_bytes[offset:end])
        offset = end


def _encode_timeseries_key(key):
    """Encode (partition id, timestamp, event id) as 8 + 12 + 24 bytes."""
    partition_id, timestamp, event_id = key
    return bytes(itertools.chain(_encode_uint(partition_id),
                                 _encode_timestamp(timestamp),
                                 _encode_event_id(event_id)))


def _decode_timeseries_key(key_bytes):
    """Decode bytes produced by `_encode_timeseries_key`."""
    partition_id = _decode_uint(key_bytes[:8])
    timestamp = _decode_timestamp(key_bytes[8:20])
    event_id = _decode_event_id(key_bytes[20:])
    return partition_id, timestamp, event_id


def _encode_event(event):
    """Encode an `Event` with the `HatEventer.Event` SBS schema."""
    event_sbs = event_to_sbs(event)
    return sbs_repo.encode('HatEventer.Event', event_sbs)


def _decode_event(event_bytes):
    """Decode `HatEventer.Event` SBS bytes into an `Event`.

    Binary payload data is converted to `bytes`. The SBS decoder may return
    another bytes-like type here (NOTE(review): presumably a memoryview —
    confirm).
    """
    event_sbs = sbs_repo.decode('HatEventer.Event', event_bytes)
    event = event_from_sbs(event_sbs)

    if isinstance(event.payload, EventPayloadBinary):
        event = event._replace(
            payload=event.payload._replace(data=bytes(event.payload.data)))

    return event


def _encode_json(data):
    """Encode JSON data as UTF-8 bytes."""
    return json.encode(data).encode('utf-8')


def _decode_json(data_bytes):
    """Decode UTF-8 encoded JSON bytes."""
    return json.decode(str(data_bytes, encoding='utf-8'))
default_max_size: int =
1099511627776
class
DbType(enum.Enum):
31class DbType(enum.Enum): 32 SYSTEM_SETTINGS = 0 33 SYSTEM_LAST_EVENT_ID = 1 34 SYSTEM_LAST_TIMESTAMP = 2 35 REF = 3 36 LATEST_DATA = 4 37 LATEST_TYPE = 5 38 TIMESERIES_DATA = 6 39 TIMESERIES_PARTITION = 7 40 TIMESERIES_COUNT = 8
An enumeration.
SYSTEM_SETTINGS =
<DbType.SYSTEM_SETTINGS: 0>
SYSTEM_LAST_EVENT_ID =
<DbType.SYSTEM_LAST_EVENT_ID: 1>
SYSTEM_LAST_TIMESTAMP =
<DbType.SYSTEM_LAST_TIMESTAMP: 2>
REF =
<DbType.REF: 3>
LATEST_DATA =
<DbType.LATEST_DATA: 4>
LATEST_TYPE =
<DbType.LATEST_TYPE: 5>
TIMESERIES_DATA =
<DbType.TIMESERIES_DATA: 6>
TIMESERIES_PARTITION =
<DbType.TIMESERIES_PARTITION: 7>
TIMESERIES_COUNT =
<DbType.TIMESERIES_COUNT: 8>
EventTypeRef: TypeAlias =
int
PartitionId: TypeAlias =
int
class
SettingsId(enum.Enum):
An enumeration.
VERSION =
<SettingsId.VERSION: 0>
IDENTIFIER =
<SettingsId.IDENTIFIER: 1>
class
LatestEventRef(typing.NamedTuple):
LatestEventRef(key,)
class
TimeseriesEventRef(typing.NamedTuple):
TimeseriesEventRef(key,)
TimeseriesEventRef( key: tuple[int, hat.event.common.Timestamp, hat.event.common.EventId])
Create new instance of TimeseriesEventRef(key,)
db_defs =
{<DbType.SYSTEM_SETTINGS: 0>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.SYSTEM_LAST_EVENT_ID: 1>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.SYSTEM_LAST_TIMESTAMP: 2>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.REF: 3>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.LATEST_DATA: 4>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.LATEST_TYPE: 5>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.TIMESERIES_DATA: 6>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.TIMESERIES_PARTITION: 7>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.TIMESERIES_COUNT: 8>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>)}
def
ext_create_env( path: pathlib.Path, max_size: int = 1099511627776, readonly: bool = False) -> Environment:
class
DbDef(typing.NamedTuple):
45 class DbDef(typing.NamedTuple, typing.Generic[DbKey, DbValue]): 46 encode_key: typing.Callable[[DbKey], util.Bytes] 47 decode_key: typing.Callable[[util.Bytes], DbKey] 48 encode_value: typing.Callable[[DbValue], util.Bytes] 49 decode_value: typing.Callable[[util.Bytes], DbValue]
DbDef(encode_key, decode_key, encode_value, decode_value)
DbDef( encode_key: Callable[[~DbKey], bytes | bytearray | memoryview], decode_key: Callable[[bytes | bytearray | memoryview], ~DbKey], encode_value: Callable[[~DbValue], bytes | bytearray | memoryview], decode_value: Callable[[bytes | bytearray | memoryview], ~DbValue])
Create new instance of DbDef(encode_key, decode_key, encode_value, decode_value)