hat.event.backends.lmdb.convert.v09

 1import typing
 2
 3import lmdb
 4
 5from hat.event.backends.lmdb import common
 6
 7
 # Version string exposed by this module.
 version = '0.9'

 # Event-related names, re-exported unchanged from ``common``.
 Timestamp = common.Timestamp
 OrderBy = common.OrderBy
 EventId = common.EventId
 EventType = common.EventType
 EventPayloadBinary = common.EventPayloadBinary
 EventPayloadJson = common.EventPayloadJson
 Event = common.Event

 # Database-related names, re-exported unchanged from ``common``.
 DbKey = common.DbKey
 DbValue = common.DbValue
 DbType = common.DbType
 SettingsId = common.SettingsId
 LatestEventRef = common.LatestEventRef
 TimeseriesEventRef = common.TimeseriesEventRef
 db_defs = common.db_defs

 # ``common`` names these helpers ``ext_*``; here they are exposed without
 # the prefix.
 create_env = common.ext_create_env
 open_db = common.ext_open_db
27
28
def read(dbs: dict[DbType, lmdb._Database],
         txn: lmdb.Transaction,
         db_type: DbType
         ) -> typing.Iterable[tuple[DbKey, DbValue]]:
    """Iterate over all decoded entries of a single database

    Args:
        dbs: opened database handles keyed by database type
        txn: transaction used to create the cursor
        db_type: database to iterate; also selects the entry of `db_defs`
            used for decoding

    Yields:
        ``(key, value)`` pairs decoded with ``db_defs[db_type]``, in the
        order in which the cursor produces them

    """
    db_def = db_defs[db_type]
    # cursor is a context manager - it is closed when iteration finishes or
    # the generator is closed
    with txn.cursor(dbs[db_type]) as cursor:
        for encoded_key, encoded_value in cursor:
            key = db_def.decode_key(encoded_key)
            value = db_def.decode_value(encoded_value)
            yield key, value
39
40
def write(dbs: dict[DbType, lmdb._Database],
          txn: lmdb.Transaction,
          db_type: DbType,
          key: DbKey,
          value: DbValue):
    """Encode a key/value pair and put it into a single database

    Args:
        dbs: opened database handles keyed by database type
        txn: transaction in which the entry is put
        db_type: target database; also selects the entry of `db_defs`
            used for encoding
        key: key, encoded with ``db_defs[db_type].encode_key``
        value: value, encoded with ``db_defs[db_type].encode_value``

    """
    db_def = db_defs[db_type]
    encoded_key = db_def.encode_key(key)
    encoded_value = db_def.encode_value(value)

    # result of `put` is not inspected
    txn.put(encoded_key, encoded_value, db=dbs[db_type])
version = '0.9'
class Timestamp(typing.NamedTuple):
class Timestamp(typing.NamedTuple):
    # Named tuple with fields ``(s, us)``; being a tuple, instances compare
    # by ``s`` first and ``us`` second.
    s: int
    """seconds since 1970-01-01 (can be negative)"""
    us: int
    """microseconds added to timestamp seconds in range [0, 1e6)"""

    def add(self, s: float) -> 'Timestamp':
        """Create new timestamp by adding seconds to existing timestamp

        Args:
            s: seconds to add; may be negative or fractional (fraction is
                rounded to whole microseconds)

        Returns:
            new `Timestamp` with ``us`` normalized to range [0, 1e6)

        """
        whole = int(s)
        us = self.us + round((s - whole) * 1e6)
        # floor division keeps the remainder non-negative, so a negative
        # microsecond sum borrows from the seconds
        carry, us = divmod(us, 1_000_000)
        return Timestamp(s=self.s + whole + carry,
                         us=us)

Timestamp(s, us)

Timestamp(s: int, us: int)

Create new instance of Timestamp(s, us)

s: int

seconds since 1970-01-01 (can be negative)

us: int

microseconds added to timestamp seconds in range [0, 1e6)

def add(self, s: float) -> Timestamp:
def add(self, s: float) -> 'Timestamp':
    """Create new timestamp by adding seconds to existing timestamp

    Args:
        s: seconds to add; may be negative or fractional (fraction is
            rounded to whole microseconds)

    Returns:
        new `Timestamp` with ``us`` normalized to range [0, 1e6)

    """
    us = self.us + round((s - int(s)) * 1e6)
    s = self.s + int(s)
    # floor division / modulo keep ``us`` non-negative, so a negative
    # microsecond sum borrows from the seconds
    return Timestamp(s=s + us // int(1e6),
                     us=us % int(1e6))

Create new timestamp by adding seconds to existing timestamp

class OrderBy(enum.Enum):
class OrderBy(enum.Enum):
    # Member values are the strings 'timestamp' and 'sourceTimestamp'.
    # NOTE(review): presumably selects which event timestamp is used for
    # ordering - confirm against callers.
    TIMESTAMP = 'timestamp'
    SOURCE_TIMESTAMP = 'sourceTimestamp'

An enumeration.

TIMESTAMP = <OrderBy.TIMESTAMP: 'timestamp'>
SOURCE_TIMESTAMP = <OrderBy.SOURCE_TIMESTAMP: 'sourceTimestamp'>
class EventId(typing.NamedTuple):
class EventId(typing.NamedTuple):
    # Named tuple with fields ``(server, session, instance)``; being a
    # tuple, instances compare field by field in that order.
    server: ServerId
    session: SessionId
    instance: InstanceId

EventId(server, session, instance)

EventId(server: int, session: int, instance: int)

Create new instance of EventId(server, session, instance)

server: int

Alias for field number 0

session: int

Alias for field number 1

instance: int

Alias for field number 2

EventType = tuple[str, ...]
class EventPayloadBinary(typing.NamedTuple):
class EventPayloadBinary(typing.NamedTuple):
    # Named tuple pairing a string ``type`` with bytes-like ``data``.
    # NOTE(review): meaning of ``type`` is not visible here - presumably it
    # identifies how ``data`` should be interpreted; confirm.
    type: str
    data: util.Bytes

EventPayloadBinary(type, data)

EventPayloadBinary(type: str, data: bytes | bytearray | memoryview)

Create new instance of EventPayloadBinary(type, data)

type: str

Alias for field number 0

data: bytes | bytearray | memoryview

Alias for field number 1

class EventPayloadJson(typing.NamedTuple):
class EventPayloadJson(typing.NamedTuple):
    # Single-field named tuple wrapping JSON data.
    data: json.Data

EventPayloadJson(data,)

EventPayloadJson( data: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]])

Create new instance of EventPayloadJson(data,)

data: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]

Alias for field number 0

class Event(typing.NamedTuple):
 class Event(typing.NamedTuple):
     """Event

     Operators `>` and `<` test for natural order where it is assumed that
     first operand is registered before second operand.

     """

     id: EventId
     type: EventType
     timestamp: Timestamp
     source_timestamp: Timestamp | None
     payload: EventPayload | None

     def __lt__(self, other):
         # comparison with non-events is left to the other operand
         if not isinstance(other, Event):
             return NotImplemented

         # same event is never ordered before itself
         if self.id == other.id:
             return False

         # same server - event ids decide
         if self.id.server == other.id.server:
             return self.id < other.id

         # different servers - timestamps decide when they differ
         if self.timestamp != other.timestamp:
             return self.timestamp < other.timestamp

         # different servers, equal timestamps - first operand is assumed
         # to be registered first
         return True

     def __gt__(self, other):
         # comparison with non-events is left to the other operand
         if not isinstance(other, Event):
             return NotImplemented

         # same event is never ordered after itself
         if self.id == other.id:
             return False

         # same server - event ids decide
         if self.id.server == other.id.server:
             return self.id > other.id

         # different servers - timestamps decide when they differ
         if self.timestamp != other.timestamp:
             return self.timestamp > other.timestamp

         # different servers, equal timestamps - first operand is assumed
         # to be registered first, so it is not greater
         return False

Event

Operators > and < test for natural order where it is assumed that first operand is registered before second operand.

Event( id: EventId, type: tuple[str, ...], timestamp: Timestamp, source_timestamp: Timestamp | None, payload: EventPayloadBinary | EventPayloadJson | None)

Create new instance of Event(id, type, timestamp, source_timestamp, payload)

id: EventId

Alias for field number 0

type: tuple[str, ...]

Alias for field number 1

timestamp: Timestamp

Alias for field number 2

source_timestamp: Timestamp | None

Alias for field number 3

payload: EventPayloadBinary | EventPayloadJson | None

Alias for field number 4

class DbType(enum.Enum):
class DbType(enum.Enum):
    # Nine members with explicitly assigned consecutive integer values 0..8.
    SYSTEM_SETTINGS = 0
    SYSTEM_LAST_EVENT_ID = 1
    SYSTEM_LAST_TIMESTAMP = 2
    REF = 3
    LATEST_DATA = 4
    LATEST_TYPE = 5
    TIMESERIES_DATA = 6
    TIMESERIES_PARTITION = 7
    TIMESERIES_COUNT = 8

An enumeration.

SYSTEM_SETTINGS = <DbType.SYSTEM_SETTINGS: 0>
SYSTEM_LAST_EVENT_ID = <DbType.SYSTEM_LAST_EVENT_ID: 1>
SYSTEM_LAST_TIMESTAMP = <DbType.SYSTEM_LAST_TIMESTAMP: 2>
REF = <DbType.REF: 3>
LATEST_DATA = <DbType.LATEST_DATA: 4>
LATEST_TYPE = <DbType.LATEST_TYPE: 5>
TIMESERIES_DATA = <DbType.TIMESERIES_DATA: 6>
TIMESERIES_PARTITION = <DbType.TIMESERIES_PARTITION: 7>
TIMESERIES_COUNT = <DbType.TIMESERIES_COUNT: 8>
class SettingsId(enum.Enum):
class SettingsId(enum.Enum):
    # Two members with explicitly assigned integer values.
    VERSION = 0
    IDENTIFIER = 1

An enumeration.

VERSION = <SettingsId.VERSION: 0>
IDENTIFIER = <SettingsId.IDENTIFIER: 1>
class LatestEventRef(typing.NamedTuple):
class LatestEventRef(typing.NamedTuple):
    # Single-field named tuple wrapping an `EventTypeRef` key.
    key: EventTypeRef

LatestEventRef(key,)

LatestEventRef(key: int)

Create new instance of LatestEventRef(key,)

key: int

Alias for field number 0

class TimeseriesEventRef(typing.NamedTuple):
class TimeseriesEventRef(typing.NamedTuple):
    # Single-field named tuple wrapping a `TimeseriesKey` key.
    key: TimeseriesKey

TimeseriesEventRef(key,)

TimeseriesEventRef( key: tuple[int, Timestamp, EventId])

Create new instance of TimeseriesEventRef(key,)

key: tuple[int, Timestamp, EventId]

Alias for field number 0

db_defs = {<DbType.SYSTEM_SETTINGS: 0>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.SYSTEM_LAST_EVENT_ID: 1>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.SYSTEM_LAST_TIMESTAMP: 2>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.REF: 3>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.LATEST_DATA: 4>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.LATEST_TYPE: 5>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.TIMESERIES_DATA: 6>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.TIMESERIES_PARTITION: 7>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.TIMESERIES_COUNT: 8>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>)}
def create_env( path: pathlib.Path, max_size: int = 1099511627776, readonly: bool = False) -> Environment:
def ext_create_env(path: Path,
                   max_size: int = default_max_size,
                   readonly: bool = False
                   ) -> lmdb.Environment:
    """Create LMDB environment for the database at `path`

    Args:
        path: database location, passed to LMDB as a string
        max_size: passed to LMDB as ``map_size``
        readonly: passed to LMDB as ``readonly``

    Returns:
        new `lmdb.Environment`

    """
    return lmdb.Environment(str(path),
                            map_size=max_size,
                            # path is not treated as a directory
                            subdir=False,
                            # one named database per `DbType` member
                            max_dbs=len(DbType),
                            readonly=readonly)
def open_db( env: Environment, db_type: DbType, create: bool = True) -> _Database:
def ext_open_db(env: lmdb.Environment,
                db_type: DbType,
                create: bool = True
                ) -> lmdb._Database:
    """Open the named database that corresponds to `db_type`

    Args:
        env: environment in which the database is opened
        db_type: database type; its member name, UTF-8 encoded, is used as
            the database name
        create: passed to `lmdb.Environment.open_db` as ``create``

    Returns:
        database handle returned by `lmdb.Environment.open_db`

    """
    return env.open_db(db_type.name.encode('utf-8'), create=create)
def read( dbs: dict[DbType, _Database], txn: Transaction, db_type: DbType) -> Iterable[tuple[~DbKey, ~DbValue]]:
def read(dbs: dict[DbType, lmdb._Database],
         txn: lmdb.Transaction,
         db_type: DbType
         ) -> typing.Iterable[tuple[DbKey, DbValue]]:
    """Iterate over all decoded entries of a single database

    Args:
        dbs: opened database handles keyed by database type
        txn: transaction used to create the cursor
        db_type: database to iterate; also selects the entry of `db_defs`
            used for decoding

    Yields:
        ``(key, value)`` pairs decoded with ``db_defs[db_type]``, in the
        order in which the cursor produces them

    """
    db_def = db_defs[db_type]
    # cursor is a context manager - it is closed when iteration finishes or
    # the generator is closed
    with txn.cursor(dbs[db_type]) as cursor:
        for encoded_key, encoded_value in cursor:
            key = db_def.decode_key(encoded_key)
            value = db_def.decode_value(encoded_value)
            yield key, value
def write( dbs: dict[DbType, _Database], txn: Transaction, db_type: DbType, key: ~DbKey, value: ~DbValue):
def write(dbs: dict[DbType, lmdb._Database],
          txn: lmdb.Transaction,
          db_type: DbType,
          key: DbKey,
          value: DbValue):
    """Encode a key/value pair and put it into a single database

    Args:
        dbs: opened database handles keyed by database type
        txn: transaction in which the entry is put
        db_type: target database; also selects the entry of `db_defs`
            used for encoding
        key: key, encoded with ``db_defs[db_type].encode_key``
        value: value, encoded with ``db_defs[db_type].encode_value``

    """
    db_def = db_defs[db_type]
    encoded_key = db_def.encode_key(key)
    encoded_value = db_def.encode_value(value)

    # result of `put` is not inspected
    txn.put(encoded_key, encoded_value, db=dbs[db_type])