hat.event.backends.lmdb.convert.v09
"""Definitions used by the LMDB backend for database version 0.9.

Every type and helper below, except `version`, `read` and `write`, is an
alias of the definition with the matching name in
`hat.event.backends.lmdb.common`.
"""

import typing

import lmdb

from hat.event.backends.lmdb import common


version = '0.9'

# event related types
Timestamp = common.Timestamp
OrderBy = common.OrderBy
EventId = common.EventId
EventType = common.EventType
EventPayloadBinary = common.EventPayloadBinary
EventPayloadJson = common.EventPayloadJson
Event = common.Event

# database related types and helpers
DbKey = common.DbKey
DbValue = common.DbValue
DbType = common.DbType
SettingsId = common.SettingsId
LatestEventRef = common.LatestEventRef
TimeseriesEventRef = common.TimeseriesEventRef
db_defs = common.db_defs
create_env = common.ext_create_env
open_db = common.ext_open_db


def read(dbs: dict[DbType, lmdb._Database],
         txn: lmdb.Transaction,
         db_type: DbType
         ) -> typing.Iterable[tuple[DbKey, DbValue]]:
    """Lazily yield decoded ``(key, value)`` pairs of one database.

    Args:
        dbs: opened databases, indexed by database type
        txn: transaction used for opening the cursor
        db_type: selects both the database in `dbs` and the decoders in
            `db_defs`

    Entries are produced in cursor iteration order. The cursor is opened
    on first iteration and stays open until the generator is exhausted
    or closed.

    """
    definition = db_defs[db_type]
    with txn.cursor(dbs[db_type]) as entries:
        for raw_key, raw_value in entries:
            yield (definition.decode_key(raw_key),
                   definition.decode_value(raw_value))


def write(dbs: dict[DbType, lmdb._Database],
          txn: lmdb.Transaction,
          db_type: DbType,
          key: DbKey,
          value: DbValue):
    """Encode a single ``(key, value)`` pair and put it into one database.

    Args:
        dbs: opened databases, indexed by database type
        txn: transaction used for the put operation
        db_type: selects both the database in `dbs` and the encoders in
            `db_defs`
        key: decoded key
        value: decoded value

    """
    definition = db_defs[db_type]
    encoded = (definition.encode_key(key), definition.encode_value(value))
    txn.put(*encoded, db=dbs[db_type])
version =
'0.9'
class
Timestamp(typing.NamedTuple):
class Timestamp(typing.NamedTuple):
    s: int
    """seconds since 1970-01-01 (can be negative)"""
    us: int
    """microseconds added to timestamp seconds in range [0, 1e6)"""

    def add(self, s: float) -> 'Timestamp':
        """Return a new timestamp shifted by `s` seconds.

        `s` may be negative or fractional. The fractional part is rounded
        to whole microseconds and the result is normalized so that `us`
        is again in range [0, 1e6).

        """
        whole_seconds = int(s)
        total_us = self.us + round((s - whole_seconds) * 1e6)
        # floor division keeps the remainder non-negative, so a negative
        # microsecond sum borrows from the seconds part
        carry, remainder = divmod(total_us, int(1e6))
        return Timestamp(s=self.s + whole_seconds + carry,
                         us=remainder)
Timestamp(s, us)
def add(self, s: float) -> 'Timestamp':
    """Return a new timestamp shifted by `s` seconds.

    `s` may be negative or fractional. The fractional part is rounded to
    whole microseconds and the result is normalized so that `us` is again
    in range [0, 1e6).

    """
    whole_seconds = int(s)
    total_us = self.us + round((s - whole_seconds) * 1e6)
    # floor division keeps the remainder non-negative, so a negative
    # microsecond sum borrows from the seconds part
    carry, remainder = divmod(total_us, int(1e6))
    return Timestamp(s=self.s + whole_seconds + carry,
                     us=remainder)
Create new timestamp by adding seconds to existing timestamp
class
OrderBy(enum.Enum):
An enumeration.
TIMESTAMP =
<OrderBy.TIMESTAMP: 'timestamp'>
SOURCE_TIMESTAMP =
<OrderBy.SOURCE_TIMESTAMP: 'sourceTimestamp'>
class
EventId(typing.NamedTuple):
class EventId(typing.NamedTuple):
    """Event identifier composed of server, session and instance parts.

    Being a named tuple, instances compare field by field in the order
    `server`, `session`, `instance`.

    """

    server: ServerId
    session: SessionId
    instance: InstanceId
EventId(server, session, instance)
EventType =
tuple[str, ...]
class
EventPayloadBinary(typing.NamedTuple):
EventPayloadBinary(type, data)
class
EventPayloadJson(typing.NamedTuple):
EventPayloadJson(data,)
class
Event(typing.NamedTuple):
class Event(typing.NamedTuple):
    """Event

    Operators `<` and `>` implement natural (registration) order instead of
    the plain tuple order:

    * events with equal `id` are neither less nor greater than each other
    * events with the same `id.server` are ordered by `id`
    * events from different servers are ordered by `timestamp`; when the
      timestamps are equal, the first operand is assumed to be registered
      before the second one (`<` is ``True``, `>` is ``False``)

    Comparison with objects which are not `Event` returns
    ``NotImplemented``.

    """

    id: EventId
    type: EventType
    timestamp: Timestamp
    source_timestamp: Timestamp | None
    payload: EventPayload | None

    def __lt__(self, other):
        if not isinstance(other, Event):
            return NotImplemented

        own_id, other_id = self.id, other.id
        if own_id == other_id:
            return False

        if own_id.server == other_id.server:
            return own_id < other_id

        # different servers - ids are not comparable, fall back to time
        timestamps_differ = self.timestamp != other.timestamp
        return self.timestamp < other.timestamp if timestamps_differ else True

    def __gt__(self, other):
        if not isinstance(other, Event):
            return NotImplemented

        own_id, other_id = self.id, other.id
        if own_id == other_id:
            return False

        if own_id.server == other_id.server:
            return own_id > other_id

        # different servers - ids are not comparable, fall back to time
        timestamps_differ = self.timestamp != other.timestamp
        return self.timestamp > other.timestamp if timestamps_differ else False
Event
Operators `>` and `<` test for natural order, where it is assumed that the first operand is registered before the second operand.
Event( id: EventId, type: tuple[str, ...], timestamp: Timestamp, source_timestamp: Timestamp | None, payload: EventPayloadBinary | EventPayloadJson | None)
Create new instance of Event(id, type, timestamp, source_timestamp, payload)
class
DbType(enum.Enum):
class DbType(enum.Enum):
    """Identifiers of the individual databases.

    Member values are the consecutive integers 0 to 8.

    """

    # members prefixed with SYSTEM_
    SYSTEM_SETTINGS = 0
    SYSTEM_LAST_EVENT_ID = 1
    SYSTEM_LAST_TIMESTAMP = 2

    REF = 3

    # members prefixed with LATEST_
    LATEST_DATA = 4
    LATEST_TYPE = 5

    # members prefixed with TIMESERIES_
    TIMESERIES_DATA = 6
    TIMESERIES_PARTITION = 7
    TIMESERIES_COUNT = 8
An enumeration.
SYSTEM_SETTINGS =
<DbType.SYSTEM_SETTINGS: 0>
SYSTEM_LAST_EVENT_ID =
<DbType.SYSTEM_LAST_EVENT_ID: 1>
SYSTEM_LAST_TIMESTAMP =
<DbType.SYSTEM_LAST_TIMESTAMP: 2>
REF =
<DbType.REF: 3>
LATEST_DATA =
<DbType.LATEST_DATA: 4>
LATEST_TYPE =
<DbType.LATEST_TYPE: 5>
TIMESERIES_DATA =
<DbType.TIMESERIES_DATA: 6>
TIMESERIES_PARTITION =
<DbType.TIMESERIES_PARTITION: 7>
TIMESERIES_COUNT =
<DbType.TIMESERIES_COUNT: 8>
class
SettingsId(enum.Enum):
An enumeration.
VERSION =
<SettingsId.VERSION: 0>
IDENTIFIER =
<SettingsId.IDENTIFIER: 1>
class
LatestEventRef(typing.NamedTuple):
LatestEventRef(key,)
class
TimeseriesEventRef(typing.NamedTuple):
TimeseriesEventRef(key,)
db_defs =
{<DbType.SYSTEM_SETTINGS: 0>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.SYSTEM_LAST_EVENT_ID: 1>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.SYSTEM_LAST_TIMESTAMP: 2>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.REF: 3>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.LATEST_DATA: 4>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.LATEST_TYPE: 5>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.TIMESERIES_DATA: 6>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.TIMESERIES_PARTITION: 7>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>), <DbType.TIMESERIES_COUNT: 8>: DbDef(encode_key=<function <lambda>>, decode_key=<function <lambda>>, encode_value=<function <lambda>>, decode_value=<function <lambda>>)}
def
create_env( path: pathlib.Path, max_size: int = 1099511627776, readonly: bool = False) -> Environment:
def
read( dbs: dict[DbType, _Database], txn: Transaction, db_type: DbType) -> Iterable[tuple[~DbKey, ~DbValue]]:
def read(dbs: dict[DbType, lmdb._Database],
         txn: lmdb.Transaction,
         db_type: DbType
         ) -> typing.Iterable[tuple[DbKey, DbValue]]:
    """Lazily yield decoded ``(key, value)`` pairs of one database.

    Args:
        dbs: opened databases, indexed by database type
        txn: transaction used for opening the cursor
        db_type: selects both the database in `dbs` and the decoders in
            `db_defs`

    Entries are produced in cursor iteration order. The cursor is opened
    on first iteration and stays open until the generator is exhausted
    or closed.

    """
    definition = db_defs[db_type]
    with txn.cursor(dbs[db_type]) as entries:
        for raw_key, raw_value in entries:
            yield (definition.decode_key(raw_key),
                   definition.decode_value(raw_value))
def
write( dbs: dict[DbType, _Database], txn: Transaction, db_type: DbType, key: ~DbKey, value: ~DbValue):
def write(dbs: dict[DbType, lmdb._Database],
          txn: lmdb.Transaction,
          db_type: DbType,
          key: DbKey,
          value: DbValue):
    """Encode a single ``(key, value)`` pair and put it into one database.

    Args:
        dbs: opened databases, indexed by database type
        txn: transaction used for the put operation
        db_type: selects both the database in `dbs` and the encoders in
            `db_defs`
        key: decoded key
        value: decoded value

    """
    definition = db_defs[db_type]
    encoded = (definition.encode_key(key), definition.encode_value(value))
    txn.put(*encoded, db=dbs[db_type])