hat.event.backends.lmdb.convert.v07

  1from pathlib import Path
  2import enum
  3import itertools
  4import platform
  5import struct
  6import typing
  7
  8import lmdb
  9
 10from hat import json
 11from hat import sbs
 12
 13
 14OrderBy = enum.Enum('OrderBy', [
 15    'TIMESTAMP',
 16    'SOURCE_TIMESTAMP'])
 17
 18
# Event type: a tuple of strings (encoded as an SBS `Array(String)` and,
# in the latest-type database, as a JSON list).
EventType: typing.TypeAlias = typing.Tuple[str, ...]
 20
 21
class EventId(typing.NamedTuple):
    """Event identifier made of three integers.

    `_encode_event_id` packs the fields, in declaration order, as three
    big-endian unsigned 64-bit integers.
    """
    # server component of the identifier
    server: int
    # session component of the identifier
    session: int
    # instance component of the identifier
    instance: int
 26
 27
 28EventPayloadType = enum.Enum('EventPayloadType', [
 29    'BINARY',
 30    'JSON',
 31    'SBS'])
 32
 33
class SbsData(typing.NamedTuple):
    """Payload data for `EventPayloadType.SBS` payloads.

    Mirrors the `sbs` record of the `EventPayload` choice in the embedded
    `HatEventer` schema.
    """
    # optional module name (`Optional(String)` in the schema)
    module: str | None
    # type name (`String` in the schema)
    type: str
    # raw bytes (`Bytes` in the schema)
    data: bytes
 38
 39
class EventPayload(typing.NamedTuple):
    """Event payload: a type tag together with the matching data."""
    # selects how `data` is interpreted
    type: EventPayloadType
    # bytes for BINARY, JSON data for JSON, `SbsData` for SBS
    data: bytes | json.Data | SbsData
 43
 44
 45class Timestamp(typing.NamedTuple):
 46    s: int
 47    us: int
 48
 49    def add(self, s: float) -> 'Timestamp':
 50        us = self.us + round((s - int(s)) * 1e6)
 51        s = self.s + int(s)
 52        return Timestamp(s=s + us // int(1e6),
 53                         us=us % int(1e6))
 54
 55
class Event(typing.NamedTuple):
    """Single event as stored in the latest and ordered data databases.

    Serialized through the `HatEventer.Event` SBS type by `_encode_event`.
    """
    # event identifier
    event_id: EventId
    # event type (tuple of strings)
    event_type: EventType
    # event timestamp
    timestamp: Timestamp
    # optional source timestamp
    source_timestamp: Timestamp | None
    # optional payload
    payload: EventPayload | None
 62
 63
class DbType(enum.Enum):
    """Kinds of named databases kept inside one LMDB environment.

    `open_db` uses the member name (UTF-8 encoded) as the database name and
    `create_env` derives `max_dbs` from the number of members.  The values
    of `LATEST_DATA` and `ORDERED_DATA` are additionally written as the
    leading tag byte of an encoded event reference.
    """
    SYSTEM = 0
    LATEST_DATA = 1
    LATEST_TYPE = 2
    ORDERED_DATA = 3
    ORDERED_PARTITION = 4
    ORDERED_COUNT = 5
    REF = 6
 72
 73
# Plain-int aliases naming the role an integer plays in database keys.
ServerId: typing.TypeAlias = int
EventTypeRef: typing.TypeAlias = int
PartitionId: typing.TypeAlias = int
 77
 78
class LatestEventRef(typing.NamedTuple):
    """Reference to an event by its latest data database key."""
    # key as accepted by `encode_latest_data_db_key`
    key: 'LatestDataDbKey'
 81
 82
class OrderedEventRef(typing.NamedTuple):
    """Reference to an event by its ordered data database key."""
    # key as accepted by `encode_ordered_data_db_key`
    key: 'OrderedDataDbKey'
 85
 86
# Reference to an event stored in the latest or in the ordered data database.
EventRef: typing.TypeAlias = LatestEventRef | OrderedEventRef

# Key/value types handled by `encode_system_db_*` / `decode_system_db_*`.
SystemDbKey = ServerId
SystemDbValue = tuple[EventId, Timestamp]

# Key/value types handled by `*_latest_data_db_*`.
LatestDataDbKey = EventTypeRef
LatestDataDbValue = Event

# Key/value types of the latest type database (value is an event type).
LatestTypeDbKey = EventTypeRef
LatestTypeDbValue = EventType

# Key/value types handled by `*_ordered_data_db_*`.
OrderedDataDbKey = tuple[PartitionId, Timestamp, EventId]
OrderedDataDbValue = Event

# Key/value types handled by `*_ordered_partition_db_*` (value is JSON data).
OrderedPartitionDbKey = PartitionId
OrderedPartitionDbValue = json.Data

# Key/value types handled by `*_ordered_count_db_*`.
OrderedCountDbKey = PartitionId
OrderedCountDbValue = int

# Key/value types handled by `*_ref_db_*`.
RefDbKey = EventId
RefDbValue = set[EventRef]
109
110
def encode_system_db_key(key: SystemDbKey) -> bytes:
    """Encode a system database key as an 8-byte big-endian unsigned int."""
    return _encode_uint(key)
113
114
def decode_system_db_key(key_bytes: bytes) -> SystemDbKey:
    """Decode a system database key from its first 8 bytes."""
    return _decode_uint(key_bytes)[0]
118
119
def encode_system_db_value(value: SystemDbValue) -> bytes:
    """Encode an `(event_id, timestamp)` pair as event id bytes followed by
    timestamp bytes."""
    event_id, timestamp = value
    encoded_parts = (_encode_event_id(event_id),
                     _encode_timestamp(timestamp))
    return b''.join(encoded_parts)
123
124
def decode_system_db_value(value_bytes: bytes) -> SystemDbValue:
    """Decode an `(event_id, timestamp)` pair; trailing bytes are ignored."""
    event_id, remaining = _decode_event_id(value_bytes)
    timestamp = _decode_timestamp(remaining)[0]
    return (event_id, timestamp)
129
130
def encode_latest_data_db_key(key: LatestDataDbKey) -> bytes:
    """Encode a latest data database key as an 8-byte big-endian unsigned
    int."""
    return _encode_uint(key)
133
134
def decode_latest_data_db_key(key_bytes: bytes) -> LatestDataDbKey:
    """Decode a latest data database key from its first 8 bytes."""
    return _decode_uint(key_bytes)[0]
138
139
def encode_latest_data_db_value(value: LatestDataDbValue) -> bytes:
    """Serialize an event stored in the latest data database (delegates to
    `_encode_event`)."""
    return _encode_event(value)
142
143
def decode_latest_data_db_value(value_bytes: bytes) -> LatestDataDbValue:
    """Deserialize an event stored in the latest data database (delegates
    to `_decode_event`)."""
    return _decode_event(value_bytes)
146
147
def encode_latest_type_db_key(key: LatestTypeDbKey) -> bytes:
    """Encode a latest type database key as an 8-byte big-endian unsigned
    int."""
    return _encode_uint(key)
150
151
def decode_latest_type_db_key(key_bytes: bytes) -> LatestTypeDbKey:
    """Decode a latest type database key from its first 8 bytes."""
    return _decode_uint(key_bytes)[0]
155
156
def encode_latest_type_db_value(value: LatestTypeDbValue) -> bytes:
    """Encode a latest type database value.

    The value is an event type (tuple of strings); it is stored as a UTF-8
    encoded JSON list.  The annotation previously named `OrderedDataDbKey`,
    which does not match the JSON list-of-strings encoding used here.
    """
    return _encode_json(list(value))
159
160
def decode_latest_type_db_value(value_bytes: bytes) -> LatestTypeDbValue:
    """Decode a latest type database value.

    The stored UTF-8 JSON list is returned as a tuple, i.e. an event type.
    The annotation previously named `OrderedDataDbKey`, which this function
    never produces.
    """
    return tuple(_decode_json(value_bytes))
163
164
def encode_ordered_data_db_key(key: OrderedDataDbKey) -> bytes:
    """Encode an ordered data key as partition id, timestamp and event id
    bytes, concatenated in that order."""
    partition_id, timestamp, event_id = key
    encoded_parts = (_encode_uint(partition_id),
                     _encode_timestamp(timestamp),
                     _encode_event_id(event_id))
    return b''.join(encoded_parts)
170
171
def decode_ordered_data_db_key(key_bytes: bytes) -> OrderedDataDbKey:
    """Decode an ordered data key into `(partition_id, timestamp,
    event_id)`; trailing bytes are ignored."""
    partition_id, tail = _decode_uint(key_bytes)
    timestamp, tail = _decode_timestamp(tail)
    event_id = _decode_event_id(tail)[0]
    return (partition_id, timestamp, event_id)
177
178
def encode_ordered_data_db_value(value: OrderedDataDbValue) -> bytes:
    """Serialize an event stored in the ordered data database (delegates to
    `_encode_event`)."""
    return _encode_event(value)
181
182
def decode_ordered_data_db_value(value_bytes: bytes) -> OrderedDataDbValue:
    """Deserialize an event stored in the ordered data database (delegates
    to `_decode_event`)."""
    return _decode_event(value_bytes)
185
186
def encode_ordered_partition_db_key(key: OrderedPartitionDbKey) -> bytes:
    """Encode a partition id as an 8-byte big-endian unsigned int."""
    return _encode_uint(key)
189
190
def decode_ordered_partition_db_key(key_bytes: bytes) -> OrderedPartitionDbKey:
    """Decode a partition id from the first 8 bytes of the key."""
    return _decode_uint(key_bytes)[0]
194
195
def encode_ordered_partition_db_value(value: OrderedPartitionDbValue) -> bytes:
    """Encode partition JSON data as UTF-8 encoded JSON text."""
    return _encode_json(value)
198
199
def decode_ordered_partition_db_value(value_bytes: bytes
                                      ) -> OrderedPartitionDbValue:
    """Decode partition JSON data from UTF-8 encoded JSON text."""
    return _decode_json(value_bytes)
203
204
def encode_ordered_count_db_key(key: OrderedCountDbKey) -> bytes:
    """Encode a partition id as an 8-byte big-endian unsigned int."""
    return _encode_uint(key)
207
208
def decode_ordered_count_db_key(key_bytes: bytes) -> OrderedCountDbKey:
    """Decode a partition id from the first 8 bytes of the key."""
    return _decode_uint(key_bytes)[0]
212
213
def encode_ordered_count_db_value(value: OrderedCountDbValue) -> bytes:
    """Encode a count as an 8-byte big-endian unsigned int."""
    return _encode_uint(value)
216
217
def decode_ordered_count_db_value(value_bytes: bytes) -> OrderedCountDbValue:
    """Decode a count from the first 8 bytes of the value."""
    return _decode_uint(value_bytes)[0]
221
222
def encode_ref_db_key(key: RefDbKey) -> bytes:
    """Encode an event id as three 8-byte big-endian unsigned ints."""
    return _encode_event_id(key)
225
226
def decode_ref_db_key(key_bytes: bytes) -> RefDbKey:
    """Decode an event id from the first 24 bytes of the key."""
    return _decode_event_id(key_bytes)[0]
230
231
def encode_ref_db_value(value: RefDbValue) -> bytes:
    """Encode a set of event references as the concatenation of the
    individual encoded references (in set iteration order)."""
    return b''.join(bytes(_encode_event_ref(ref)) for ref in value)
235
236
def decode_ref_db_value(value_bytes: bytes) -> RefDbValue:
    """Decode concatenated event references until the input is consumed."""
    decoded_refs = set()
    remaining = value_bytes
    while remaining:
        event_ref, remaining = _decode_event_ref(remaining)
        decoded_refs.add(event_ref)
    return decoded_refs
243
244
def open_db(env: lmdb.Environment,
            db_type: DbType
            ) -> lmdb._Database:
    """Open the named database of `env` that corresponds to `db_type`.

    The database name is the UTF-8 encoded name of the `DbType` member.
    """
    db_name = db_type.name.encode('utf-8')
    return env.open_db(db_name)
249
250
def create_env(path: Path) -> lmdb.Environment:
    """Create an LMDB environment at `path`.

    The map size is 512 GiB when `platform.architecture()` reports a 64bit
    interpreter and 1 GiB otherwise.  The environment is opened with
    `subdir=False` and room for one named database per `DbType` member.
    """
    gib = 1024 * 1024 * 1024
    if platform.architecture()[0] == '64bit':
        map_size = 512 * gib
    else:
        map_size = gib
    return lmdb.Environment(str(path),
                            map_size=map_size,
                            subdir=False,
                            max_dbs=len(DbType))
260
261
# SBS schema repository embedded in this module.  Only the
# `HatEventer.Event` type is used here (by `_encode_event` and
# `_decode_event`) to serialize stored events.
_sbs_repo = sbs.Repository(r"""
module HatEventer

MsgSubscribe = Array(EventType)

MsgNotify = Array(Event)

MsgRegisterReq = Array(RegisterEvent)

MsgRegisterRes = Array(Choice {
    event:    Event
    failure:  None
})

MsgQueryReq = QueryData

MsgQueryRes = Array(Event)

Timestamp = Record {
    s:   Integer
    us:  Integer
}

EventId = Record {
    server:    Integer
    session:   Integer
    instance:  Integer
}

Order = Choice {
    descending:  None
    ascending:   None
}

OrderBy = Choice {
    timestamp:        None
    sourceTimestamp:  None
}

EventType = Array(String)

EventPayload = Choice {
    binary:  Bytes
    json:    String
    sbs:     Record {
        module:  Optional(String)
        type:    String
        data:    Bytes
    }
}

Event = Record {
    id:               EventId
    type:             EventType
    timestamp:        Timestamp
    sourceTimestamp:  Optional(Timestamp)
    payload:          Optional(EventPayload)
}

RegisterEvent = Record {
    type:             EventType
    sourceTimestamp:  Optional(Timestamp)
    payload:          Optional(EventPayload)
}

QueryData = Record {
    serverId:           Optional(Integer)
    ids:                Optional(Array(EventId))
    types:              Optional(Array(EventType))
    tFrom:              Optional(Timestamp)
    tTo:                Optional(Timestamp)
    sourceTFrom:        Optional(Timestamp)
    sourceTTo:          Optional(Timestamp)
    payload:            Optional(EventPayload)
    order:              Order
    orderBy:            OrderBy
    uniqueType:         Boolean
    maxResults:         Optional(Integer)
}
""")
342
343
def _event_to_sbs(event):
    """Convert an `Event` into the dict form of the SBS `Event` record."""
    sbs_event = {}
    sbs_event['id'] = _event_id_to_sbs(event.event_id)
    sbs_event['type'] = list(event.event_type)
    sbs_event['timestamp'] = _timestamp_to_sbs(event.timestamp)
    sbs_event['sourceTimestamp'] = _optional_to_sbs(event.source_timestamp,
                                                    _timestamp_to_sbs)
    sbs_event['payload'] = _optional_to_sbs(event.payload,
                                            _event_payload_to_sbs)
    return sbs_event
352
353
def _event_from_sbs(data):
    """Build an `Event` from the dict form of the SBS `Event` record."""
    event_id = _event_id_from_sbs(data['id'])
    event_type = tuple(data['type'])
    timestamp = _timestamp_from_sbs(data['timestamp'])
    source_timestamp = _optional_from_sbs(data['sourceTimestamp'],
                                          _timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], _event_payload_from_sbs)
    return Event(event_id, event_type, timestamp, source_timestamp, payload)
362
363
def _event_id_to_sbs(event_id):
    """Convert an event id into the dict form of the SBS `EventId` record."""
    return {field: getattr(event_id, field)
            for field in ('server', 'session', 'instance')}
368
369
def _event_id_from_sbs(data):
    """Build an `EventId` from the dict form of the SBS `EventId` record."""
    # positional order matches the EventId fields: server, session, instance
    return EventId(data['server'], data['session'], data['instance'])
374
375
def _timestamp_to_sbs(t):
    """Convert a timestamp into the dict form of the SBS `Timestamp`
    record."""
    seconds, microseconds = t.s, t.us
    return dict(zip(('s', 'us'), (seconds, microseconds)))
378
379
def _timestamp_from_sbs(data):
    """Build a `Timestamp` from the dict form of the SBS `Timestamp`
    record."""
    # positional order matches the Timestamp fields: s, us
    return Timestamp(data['s'], data['us'])
382
383
384def _event_payload_to_sbs(payload):
385    if payload.type == EventPayloadType.BINARY:
386        return 'binary', payload.data
387
388    if payload.type == EventPayloadType.JSON:
389        return 'json', json.encode(payload.data)
390
391    if payload.type == EventPayloadType.SBS:
392        return 'sbs', _sbs_data_to_sbs(payload.data)
393
394    raise ValueError('unsupported payload type')
395
396
397def _event_payload_from_sbs(data):
398    data_type, data_data = data
399
400    if data_type == 'binary':
401        return EventPayload(type=EventPayloadType.BINARY,
402                            data=data_data)
403
404    if data_type == 'json':
405        return EventPayload(type=EventPayloadType.JSON,
406                            data=json.decode(data_data))
407
408    if data_type == 'sbs':
409        return EventPayload(type=EventPayloadType.SBS,
410                            data=_sbs_data_from_sbs(data_data))
411
412    raise ValueError('unsupported payload type')
413
414
def _sbs_data_to_sbs(data):
    """Convert `SbsData` into the dict form of the SBS payload record."""
    optional_module = _optional_to_sbs(data.module)
    return {'module': optional_module,
            'type': data.type,
            'data': data.data}
419
420
def _sbs_data_from_sbs(data):
    """Build `SbsData` from the dict form of the SBS payload record."""
    module = _optional_from_sbs(data['module'])
    # positional order matches the SbsData fields: module, type, data
    return SbsData(module, data['type'], data['data'])
425
426
def _optional_to_sbs(value, fn=lambda i: i):
    """Wrap `value` as an SBS optional `(name, value)` pair.

    `None` becomes ``('none', None)``; any other value is converted with
    `fn` and returned as ``('value', converted)``.
    """
    if value is None:
        return ('none', None)
    return ('value', fn(value))
429
430
def _optional_from_sbs(data, fn=lambda i: i):
    """Unwrap an SBS optional `(name, value)` pair.

    Returns `fn(value)` when the name is ``'value'`` and `None` otherwise.
    """
    if data[0] != 'value':
        return None
    return fn(data[1])
433
434
def _encode_uint(value):
    """Pack `value` as an 8-byte big-endian unsigned integer."""
    packer = struct.Struct(">Q")
    return packer.pack(value)
437
438
def _decode_uint(value_bytes):
    """Unpack an 8-byte big-endian unsigned integer.

    Returns the integer together with the bytes that follow it.
    """
    head, tail = value_bytes[:8], value_bytes[8:]
    (value,) = struct.unpack(">Q", head)
    return value, tail
441
442
def _encode_event_id(event_id):
    """Pack server, session and instance as three 8-byte big-endian
    unsigned integers (24 bytes in total)."""
    components = (event_id.server, event_id.session, event_id.instance)
    return struct.pack(">QQQ", *components)
446
447
def _decode_event_id(event_id_bytes):
    """Unpack an `EventId` from the first 24 bytes.

    Returns the event id together with the bytes that follow it.
    """
    head, tail = event_id_bytes[:24], event_id_bytes[24:]
    server, session, instance = struct.unpack(">QQQ", head)
    return EventId(server, session, instance), tail
455
456
def _encode_timestamp(timestamp):
    """Pack a timestamp as 12 big-endian bytes.

    Seconds are shifted by 2**63 so that signed values fit an unsigned
    64-bit field; microseconds follow as an unsigned 32-bit field.
    """
    shifted_seconds = timestamp.s + (1 << 63)
    return struct.pack(">QI", shifted_seconds, timestamp.us)
459
460
def _decode_timestamp(timestamp_bytes):
    """Unpack a `Timestamp` from the first 12 bytes.

    Returns the timestamp together with the bytes that follow it.
    """
    head, tail = timestamp_bytes[:12], timestamp_bytes[12:]
    shifted_seconds, microseconds = struct.unpack(">QI", head)
    # undo the 2**63 shift applied when encoding
    return Timestamp(shifted_seconds - (1 << 63), microseconds), tail
464
465
def _encode_event(event):
    """Serialize an `Event` with the `HatEventer.Event` SBS type."""
    return _sbs_repo.encode('HatEventer.Event', _event_to_sbs(event))
469
470
def _decode_event(event_bytes):
    """Deserialize an `Event` from `HatEventer.Event` SBS bytes."""
    return _event_from_sbs(_sbs_repo.decode('HatEventer.Event', event_bytes))
474
475
def _encode_json(data):
    """Encode JSON data as UTF-8 bytes of its JSON text."""
    json_text = json.encode(data)
    return json_text.encode('utf-8')
478
479
def _decode_json(data_bytes):
    """Decode JSON data from UTF-8 bytes of JSON text."""
    json_text = str(data_bytes, encoding='utf-8')
    return json.decode(json_text)
482
483
484def _encode_event_ref(ref):
485    if isinstance(ref, LatestEventRef):
486        yield DbType.LATEST_DATA.value
487        yield from encode_latest_data_db_key(ref.key)
488
489    elif isinstance(ref, OrderedEventRef):
490        yield DbType.ORDERED_DATA.value
491        yield from encode_ordered_data_db_key(ref.key)
492
493    else:
494        raise ValueError('unsupported event reference')
495
496
497def _decode_event_ref(ref_bytes):
498    db_type, rest = DbType(ref_bytes[0]), ref_bytes[1:]
499
500    if db_type == DbType.LATEST_DATA:
501        ref = LatestEventRef(decode_latest_data_db_key(rest[:8]))
502        return ref, rest[8:]
503
504    if db_type == DbType.ORDERED_DATA:
505        ref = OrderedEventRef(decode_ordered_data_db_key(rest[:44]))
506        return ref, rest[44:]
507
508    raise ValueError('unsupported database type')
class OrderBy(enum.Enum):

An enumeration.

TIMESTAMP = <OrderBy.TIMESTAMP: 1>
SOURCE_TIMESTAMP = <OrderBy.SOURCE_TIMESTAMP: 2>
EventType: TypeAlias = Tuple[str, ...]
class EventId(typing.NamedTuple):
23class EventId(typing.NamedTuple):
24    server: int
25    session: int
26    instance: int

EventId(server, session, instance)

EventId(server: int, session: int, instance: int)

Create new instance of EventId(server, session, instance)

server: int

Alias for field number 0

session: int

Alias for field number 1

instance: int

Alias for field number 2

class EventPayloadType(enum.Enum):

An enumeration.

BINARY = <EventPayloadType.BINARY: 1>
JSON = <EventPayloadType.JSON: 2>
SBS = <EventPayloadType.SBS: 3>
class SbsData(typing.NamedTuple):
35class SbsData(typing.NamedTuple):
36    module: str | None
37    type: str
38    data: bytes

SbsData(module, type, data)

SbsData(module: str | None, type: str, data: bytes)

Create new instance of SbsData(module, type, data)

module: str | None

Alias for field number 0

type: str

Alias for field number 1

data: bytes

Alias for field number 2

class EventPayload(typing.NamedTuple):
41class EventPayload(typing.NamedTuple):
42    type: EventPayloadType
43    data: bytes | json.Data | SbsData

EventPayload(type, data)

EventPayload( type: EventPayloadType, data: Union[bytes, NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], SbsData])

Create new instance of EventPayload(type, data)

type: EventPayloadType

Alias for field number 0

data: Union[bytes, NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], SbsData]

Alias for field number 1

class Timestamp(typing.NamedTuple):
46class Timestamp(typing.NamedTuple):
47    s: int
48    us: int
49
50    def add(self, s: float) -> 'Timestamp':
51        us = self.us + round((s - int(s)) * 1e6)
52        s = self.s + int(s)
53        return Timestamp(s=s + us // int(1e6),
54                         us=us % int(1e6))

Timestamp(s, us)

Timestamp(s: int, us: int)

Create new instance of Timestamp(s, us)

s: int

Alias for field number 0

us: int

Alias for field number 1

def add(self, s: float) -> Timestamp:
50    def add(self, s: float) -> 'Timestamp':
51        us = self.us + round((s - int(s)) * 1e6)
52        s = self.s + int(s)
53        return Timestamp(s=s + us // int(1e6),
54                         us=us % int(1e6))
class Event(typing.NamedTuple):
57class Event(typing.NamedTuple):
58    event_id: EventId
59    event_type: EventType
60    timestamp: Timestamp
61    source_timestamp: Timestamp | None
62    payload: EventPayload | None

Event(event_id, event_type, timestamp, source_timestamp, payload)

Event( event_id: EventId, event_type: Tuple[str, ...], timestamp: Timestamp, source_timestamp: Timestamp | None, payload: EventPayload | None)

Create new instance of Event(event_id, event_type, timestamp, source_timestamp, payload)

event_id: EventId

Alias for field number 0

event_type: Tuple[str, ...]

Alias for field number 1

timestamp: Timestamp

Alias for field number 2

source_timestamp: Timestamp | None

Alias for field number 3

payload: EventPayload | None

Alias for field number 4

class DbType(enum.Enum):
65class DbType(enum.Enum):
66    SYSTEM = 0
67    LATEST_DATA = 1
68    LATEST_TYPE = 2
69    ORDERED_DATA = 3
70    ORDERED_PARTITION = 4
71    ORDERED_COUNT = 5
72    REF = 6

An enumeration.

SYSTEM = <DbType.SYSTEM: 0>
LATEST_DATA = <DbType.LATEST_DATA: 1>
LATEST_TYPE = <DbType.LATEST_TYPE: 2>
ORDERED_DATA = <DbType.ORDERED_DATA: 3>
ORDERED_PARTITION = <DbType.ORDERED_PARTITION: 4>
ORDERED_COUNT = <DbType.ORDERED_COUNT: 5>
REF = <DbType.REF: 6>
ServerId: TypeAlias = int
EventTypeRef: TypeAlias = int
PartitionId: TypeAlias = int
class LatestEventRef(typing.NamedTuple):
80class LatestEventRef(typing.NamedTuple):
81    key: 'LatestDataDbKey'

LatestEventRef(key,)

LatestEventRef(key: ForwardRef('LatestDataDbKey'))

Create new instance of LatestEventRef(key,)

key: int

Alias for field number 0

class OrderedEventRef(typing.NamedTuple):
84class OrderedEventRef(typing.NamedTuple):
85    key: 'OrderedDataDbKey'

OrderedEventRef(key,)

OrderedEventRef(key: ForwardRef('OrderedDataDbKey'))

Create new instance of OrderedEventRef(key,)

key: tuple[int, Timestamp, EventId]

Alias for field number 0

EventRef: TypeAlias = LatestEventRef | OrderedEventRef
SystemDbKey = <class 'int'>
SystemDbValue = tuple[EventId, Timestamp]
LatestDataDbKey = <class 'int'>
LatestDataDbValue = <class 'Event'>
LatestTypeDbKey = <class 'int'>
LatestTypeDbValue = typing.Tuple[str, ...]
OrderedDataDbKey = tuple[int, Timestamp, EventId]
OrderedDataDbValue = <class 'Event'>
OrderedPartitionDbKey = <class 'int'>
OrderedPartitionDbValue = typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]
OrderedCountDbKey = <class 'int'>
OrderedCountDbValue = <class 'int'>
RefDbKey = <class 'EventId'>
RefDbValue = set[LatestEventRef | OrderedEventRef]
def encode_system_db_key(key: int) -> bytes:
112def encode_system_db_key(key: SystemDbKey) -> bytes:
113    return _encode_uint(key)
def decode_system_db_key(key_bytes: bytes) -> int:
116def decode_system_db_key(key_bytes: bytes) -> SystemDbKey:
117    key, _ = _decode_uint(key_bytes)
118    return key
def encode_system_db_value( value: tuple[EventId, Timestamp]) -> bytes:
121def encode_system_db_value(value: SystemDbValue) -> bytes:
122    event_id, timestamp = value
123    return _encode_event_id(event_id) + _encode_timestamp(timestamp)
def decode_system_db_value( value_bytes: bytes) -> tuple[EventId, Timestamp]:
126def decode_system_db_value(value_bytes: bytes) -> SystemDbValue:
127    event_id, rest = _decode_event_id(value_bytes)
128    timestamp, _ = _decode_timestamp(rest)
129    return event_id, timestamp
def encode_latest_data_db_key(key: int) -> bytes:
132def encode_latest_data_db_key(key: LatestDataDbKey) -> bytes:
133    return _encode_uint(key)
def decode_latest_data_db_key(key_bytes: bytes) -> int:
136def decode_latest_data_db_key(key_bytes: bytes) -> LatestDataDbKey:
137    key, _ = _decode_uint(key_bytes)
138    return key
def encode_latest_data_db_value(value: Event) -> bytes:
141def encode_latest_data_db_value(value: LatestDataDbValue) -> bytes:
142    return _encode_event(value)
def decode_latest_data_db_value(value_bytes: bytes) -> Event:
145def decode_latest_data_db_value(value_bytes: bytes) -> LatestDataDbValue:
146    return _decode_event(value_bytes)
def encode_latest_type_db_key(key: int) -> bytes:
149def encode_latest_type_db_key(key: LatestTypeDbKey) -> bytes:
150    return _encode_uint(key)
def decode_latest_type_db_key(key_bytes: bytes) -> int:
153def decode_latest_type_db_key(key_bytes: bytes) -> LatestTypeDbKey:
154    key, _ = _decode_uint(key_bytes)
155    return key
def encode_latest_type_db_value( value: tuple[int, Timestamp, EventId]) -> bytes:
158def encode_latest_type_db_value(value: OrderedDataDbKey) -> bytes:
159    return _encode_json(list(value))
def decode_latest_type_db_value( value_bytes: bytes) -> tuple[int, Timestamp, EventId]:
162def decode_latest_type_db_value(value_bytes: bytes) -> OrderedDataDbKey:
163    return tuple(_decode_json(value_bytes))
def encode_ordered_data_db_key( key: tuple[int, Timestamp, EventId]) -> bytes:
166def encode_ordered_data_db_key(key: OrderedDataDbKey) -> bytes:
167    partition_id, timestamp, event_id = key
168    return (_encode_uint(partition_id) +
169            _encode_timestamp(timestamp) +
170            _encode_event_id(event_id))
def decode_ordered_data_db_key( key_bytes: bytes) -> tuple[int, Timestamp, EventId]:
173def decode_ordered_data_db_key(key_bytes: bytes) -> OrderedDataDbKey:
174    partition_id, rest = _decode_uint(key_bytes)
175    timestamp, rest = _decode_timestamp(rest)
176    event_id, _ = _decode_event_id(rest)
177    return partition_id, timestamp, event_id
def encode_ordered_data_db_value(value: Event) -> bytes:
180def encode_ordered_data_db_value(value: OrderedDataDbValue) -> bytes:
181    return _encode_event(value)
def decode_ordered_data_db_value(value_bytes: bytes) -> Event:
184def decode_ordered_data_db_value(value_bytes: bytes) -> OrderedDataDbValue:
185    return _decode_event(value_bytes)
def encode_ordered_partition_db_key(key: int) -> bytes:
188def encode_ordered_partition_db_key(key: OrderedPartitionDbKey) -> bytes:
189    return _encode_uint(key)
def decode_ordered_partition_db_key(key_bytes: bytes) -> int:
192def decode_ordered_partition_db_key(key_bytes: bytes) -> OrderedPartitionDbKey:
193    key, _ = _decode_uint(key_bytes)
194    return key
def encode_ordered_partition_db_value( value: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> bytes:
197def encode_ordered_partition_db_value(value: OrderedPartitionDbValue) -> bytes:
198    return _encode_json(value)
def decode_ordered_partition_db_value( value_bytes: bytes) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
201def decode_ordered_partition_db_value(value_bytes: bytes
202                                      ) -> OrderedPartitionDbValue:
203    return _decode_json(value_bytes)
def encode_ordered_count_db_key(key: int) -> bytes:
206def encode_ordered_count_db_key(key: OrderedCountDbKey) -> bytes:
207    return _encode_uint(key)
def decode_ordered_count_db_key(key_bytes: bytes) -> int:
210def decode_ordered_count_db_key(key_bytes: bytes) -> OrderedCountDbKey:
211    key, _ = _decode_uint(key_bytes)
212    return key
def encode_ordered_count_db_value(value: int) -> bytes:
215def encode_ordered_count_db_value(value: OrderedCountDbValue) -> bytes:
216    return _encode_uint(value)
def decode_ordered_count_db_value(value_bytes: bytes) -> int:
219def decode_ordered_count_db_value(value_bytes: bytes) -> OrderedCountDbValue:
220    value, _ = _decode_uint(value_bytes)
221    return value
def encode_ref_db_key(key: EventId) -> bytes:
224def encode_ref_db_key(key: RefDbKey) -> bytes:
225    return _encode_event_id(key)
def decode_ref_db_key(key_bytes: bytes) -> EventId:
228def decode_ref_db_key(key_bytes: bytes) -> RefDbKey:
229    event_id, _ = _decode_event_id(key_bytes)
230    return event_id
def encode_ref_db_value( value: set[LatestEventRef | OrderedEventRef]) -> bytes:
233def encode_ref_db_value(value: RefDbValue) -> bytes:
234    return bytes(itertools.chain.from_iterable(
235        _encode_event_ref(ref) for ref in value))
def decode_ref_db_value( value_bytes: bytes) -> set[LatestEventRef | OrderedEventRef]:
238def decode_ref_db_value(value_bytes: bytes) -> RefDbValue:
239    refs = set()
240    while value_bytes:
241        ref, value_bytes = _decode_event_ref(value_bytes)
242        refs.add(ref)
243    return refs
def open_db( env: Environment, db_type: DbType) -> _Database:
246def open_db(env: lmdb.Environment,
247            db_type: DbType
248            ) -> lmdb._Database:
249    return env.open_db(db_type.name.encode('utf-8'))
def create_env(path: pathlib.Path) -> Environment:
252def create_env(path: Path) -> lmdb.Environment:
253    max_dbs = len(DbType)
254    max_db_size = (512 * 1024 * 1024 * 1024
255                   if platform.architecture()[0] == '64bit'
256                   else 1024 * 1024 * 1024)
257    return lmdb.Environment(str(path),
258                            map_size=max_db_size,
259                            subdir=False,
260                            max_dbs=max_dbs)