hat.event.backends.lmdb.convert.v06

  1from pathlib import Path
  2import enum
  3import struct
  4import platform
  5import typing
  6
  7import lmdb
  8
  9from hat import json
 10from hat import sbs
 11
 12
 13EventType: typing.TypeAlias = typing.Tuple[str, ...]
 14
 15
 16EventPayloadType = enum.Enum('EventPayloadType', [
 17    'BINARY',
 18    'JSON',
 19    'SBS'])
 20
 21
 22class EventId(typing.NamedTuple):
 23    server: int
 24    instance: int
 25
 26
 27class SbsData(typing.NamedTuple):
 28    module: str | None
 29    type: str
 30    data: bytes
 31
 32
 33class EventPayload(typing.NamedTuple):
 34    type: EventPayloadType
 35    data: bytes | json.Data | SbsData
 36
 37
 38class Timestamp(typing.NamedTuple):
 39    s: int
 40    us: int
 41
 42
 43class Event(typing.NamedTuple):
 44    event_id: EventId
 45    event_type: EventType
 46    timestamp: Timestamp
 47    source_timestamp: Timestamp | None
 48    payload: EventPayload | None
 49
 50
 51def decode_uint(x: bytes) -> int:
 52    return struct.unpack(">Q", x)[0]
 53
 54
 55def decode_timestamp(x: bytes) -> Timestamp:
 56    res = struct.unpack(">QI", x)
 57    return Timestamp(res[0] - (1 << 63), res[1])
 58
 59
 60def decode_tuple_str(x: bytes) -> typing.Tuple[str, ...]:
 61    return tuple(json.decode(str(x, encoding='utf-8')))
 62
 63
 64def decode_json(x: bytes) -> json.Data:
 65    return json.decode(str(x, encoding='utf-8'))
 66
 67
 68def decode_uint_timestamp_uint(x: bytes
 69                               ) -> tuple[int, Timestamp, int]:
 70    res = struct.unpack(">QQIQ", x)
 71    return res[0], Timestamp(res[1] - (1 << 63), res[2]), res[3]
 72
 73
 74def decode_event(event_bytes: bytes) -> Event:
 75    event_sbs = _sbs_repo.decode('HatEvent.Event', event_bytes)
 76    return _event_from_sbs(event_sbs)
 77
 78
 79def create_env(path: Path):
 80    max_dbs = 5
 81    max_db_size = (512 * 1024 * 1024 * 1024
 82                   if platform.architecture()[0] == '64bit'
 83                   else 1024 * 1024 * 1024)
 84    return lmdb.Environment(str(path),
 85                            map_size=max_db_size,
 86                            subdir=False,
 87                            max_dbs=max_dbs)
 88
 89
 90_sbs_repo = sbs.Repository(r"""
 91module HatEvent
 92
 93MsgSubscribe = Array(EventType)
 94
 95MsgNotify = Array(Event)
 96
 97MsgRegisterReq = Array(RegisterEvent)
 98
 99MsgRegisterRes = Array(Choice {
100    event:    Event
101    failure:  None
102})
103
104MsgQueryReq = QueryData
105
106MsgQueryRes = Array(Event)
107
108Timestamp = Record {
109    s:   Integer
110    us:  Integer
111}
112
113EventId = Record {
114    server:    Integer
115    instance:  Integer
116}
117
118Order = Choice {
119    descending:  None
120    ascending:   None
121}
122
123OrderBy = Choice {
124    timestamp:        None
125    sourceTimestamp:  None
126}
127
128EventType = Array(String)
129
130EventPayload = Choice {
131    binary:  Bytes
132    json:    String
133    sbs:     Record {
134        module:  Optional(String)
135        type:    String
136        data:    Bytes
137    }
138}
139
140Event = Record {
141    id:               EventId
142    type:             EventType
143    timestamp:        Timestamp
144    sourceTimestamp:  Optional(Timestamp)
145    payload:          Optional(EventPayload)
146}
147
148RegisterEvent = Record {
149    type:             EventType
150    sourceTimestamp:  Optional(Timestamp)
151    payload:          Optional(EventPayload)
152}
153
154QueryData = Record {
155    ids:                Optional(Array(EventId))
156    types:              Optional(Array(EventType))
157    tFrom:              Optional(Timestamp)
158    tTo:                Optional(Timestamp)
159    sourceTFrom:        Optional(Timestamp)
160    sourceTTo:          Optional(Timestamp)
161    payload:            Optional(EventPayload)
162    order:              Order
163    orderBy:            OrderBy
164    uniqueType:         Boolean
165    maxResults:         Optional(Integer)
166}
167""")
168
169
170def _event_from_sbs(data: sbs.Data) -> Event:
171    return Event(
172        event_id=_event_id_from_sbs(data['id']),
173        event_type=tuple(data['type']),
174        timestamp=_timestamp_from_sbs(data['timestamp']),
175        source_timestamp=_optional_from_sbs(data['sourceTimestamp'],
176                                            _timestamp_from_sbs),
177        payload=_optional_from_sbs(data['payload'], _event_payload_from_sbs))
178
179
180def _event_payload_from_sbs(data: sbs.Data) -> EventPayload:
181    data_type, data_data = data
182
183    if data_type == 'binary':
184        return EventPayload(type=EventPayloadType.BINARY,
185                            data=data_data)
186
187    if data_type == 'json':
188        return EventPayload(type=EventPayloadType.JSON,
189                            data=json.decode(data_data))
190
191    if data_type == 'sbs':
192        return EventPayload(type=EventPayloadType.SBS,
193                            data=_sbs_data_from_sbs(data_data))
194
195    raise ValueError('unsupported payload type')
196
197
198def _timestamp_from_sbs(data: sbs.Data) -> Timestamp:
199    return Timestamp(s=data['s'], us=data['us'])
200
201
202def _event_id_from_sbs(data: sbs.Data) -> EventId:
203    return EventId(server=data['server'],
204                   instance=data['instance'])
205
206
207def _sbs_data_from_sbs(data: sbs.Data) -> SbsData:
208    return SbsData(module=_optional_from_sbs(data['module']),
209                   type=data['type'],
210                   data=data['data'])
211
212
213def _optional_from_sbs(data: sbs.Data,
214                       fn=lambda i: i
215                       ) -> typing.Any | None:
216    return fn(data[1]) if data[0] == 'value' else None
EventType: TypeAlias = Tuple[str, ...]
class EventPayloadType(enum.Enum):

An enumeration.

BINARY = <EventPayloadType.BINARY: 1>
JSON = <EventPayloadType.JSON: 2>
class EventId(typing.NamedTuple):
23class EventId(typing.NamedTuple):
24    server: int
25    instance: int

EventId(server, instance)

EventId(server: int, instance: int)

Create new instance of EventId(server, instance)

server: int

Alias for field number 0

instance: int

Alias for field number 1

class SbsData(typing.NamedTuple):
28class SbsData(typing.NamedTuple):
29    module: str | None
30    type: str
31    data: bytes

SbsData(module, type, data)

SbsData(module: str | None, type: str, data: bytes)

Create new instance of SbsData(module, type, data)

module: str | None

Alias for field number 0

type: str

Alias for field number 1

data: bytes

Alias for field number 2

class EventPayload(typing.NamedTuple):
34class EventPayload(typing.NamedTuple):
35    type: EventPayloadType
36    data: bytes | json.Data | SbsData

EventPayload(type, data)

EventPayload( type: EventPayloadType, data: Union[bytes, NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], SbsData])

Create new instance of EventPayload(type, data)

Alias for field number 0

data: Union[bytes, NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], SbsData]

Alias for field number 1

class Timestamp(typing.NamedTuple):
39class Timestamp(typing.NamedTuple):
40    s: int
41    us: int

Timestamp(s, us)

Timestamp(s: int, us: int)

Create new instance of Timestamp(s, us)

s: int

Alias for field number 0

us: int

Alias for field number 1

class Event(typing.NamedTuple):
44class Event(typing.NamedTuple):
45    event_id: EventId
46    event_type: EventType
47    timestamp: Timestamp
48    source_timestamp: Timestamp | None
49    payload: EventPayload | None

Event(event_id, event_type, timestamp, source_timestamp, payload)

Event( event_id: EventId, event_type: Tuple[str, ...], timestamp: Timestamp, source_timestamp: Timestamp | None, payload: EventPayload | None)

Create new instance of Event(event_id, event_type, timestamp, source_timestamp, payload)

event_id: EventId

Alias for field number 0

event_type: Tuple[str, ...]

Alias for field number 1

timestamp: Timestamp

Alias for field number 2

source_timestamp: Timestamp | None

Alias for field number 3

payload: EventPayload | None

Alias for field number 4

def decode_uint(x: bytes) -> int:
52def decode_uint(x: bytes) -> int:
53    return struct.unpack(">Q", x)[0]
def decode_timestamp(x: bytes) -> Timestamp:
56def decode_timestamp(x: bytes) -> Timestamp:
57    res = struct.unpack(">QI", x)
58    return Timestamp(res[0] - (1 << 63), res[1])
def decode_tuple_str(x: bytes) -> Tuple[str, ...]:
61def decode_tuple_str(x: bytes) -> typing.Tuple[str, ...]:
62    return tuple(json.decode(str(x, encoding='utf-8')))
def decode_json( x: bytes) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
65def decode_json(x: bytes) -> json.Data:
66    return json.decode(str(x, encoding='utf-8'))
def decode_uint_timestamp_uint( x: bytes) -> tuple[int, Timestamp, int]:
69def decode_uint_timestamp_uint(x: bytes
70                               ) -> tuple[int, Timestamp, int]:
71    res = struct.unpack(">QQIQ", x)
72    return res[0], Timestamp(res[1] - (1 << 63), res[2]), res[3]
def decode_event(event_bytes: bytes) -> Event:
75def decode_event(event_bytes: bytes) -> Event:
76    event_sbs = _sbs_repo.decode('HatEvent.Event', event_bytes)
77    return _event_from_sbs(event_sbs)
def create_env(path: pathlib.Path):
80def create_env(path: Path):
81    max_dbs = 5
82    max_db_size = (512 * 1024 * 1024 * 1024
83                   if platform.architecture()[0] == '64bit'
84                   else 1024 * 1024 * 1024)
85    return lmdb.Environment(str(path),
86                            map_size=max_db_size,
87                            subdir=False,
88                            max_dbs=max_dbs)