hat.event.backends.lmdb.convert.convert_v07_to_v09

  1from pathlib import Path
  2
  3from hat.event.backends.lmdb.convert import v07
  4from hat.event.backends.lmdb.convert import v09
  5
  6
  7def convert(src_path: Path,
  8            dst_path: Path):
  9    with v07.create_env(src_path) as src_env:
 10        with v09.create_env(dst_path) as dst_env:
 11            src_dbs = {db_type: v07.open_db(src_env, db_type)
 12                       for db_type in v07.DbType}
 13            dst_dbs = {db_type: v09.open_db(dst_env, db_type)
 14                       for db_type in v09.DbType}
 15
 16            with src_env.begin(buffers=True) as src_txn:
 17                with dst_env.begin(write=True) as dst_txn:
 18                    _convert_system_db(src_txn, src_dbs, dst_txn, dst_dbs)
 19                    _convert_ref_db(src_txn, src_dbs, dst_txn, dst_dbs)
 20                    _convert_latest_db(src_txn, src_dbs, dst_txn, dst_dbs)
 21                    _convert_timeseries_db(src_txn, src_dbs, dst_txn, dst_dbs)
 22
 23
 24def _convert_system_db(src_txn, src_dbs, dst_txn, dst_dbs):
 25    v09.write(dst_dbs, dst_txn, v09.DbType.SYSTEM_SETTINGS,
 26              v09.SettingsId.VERSION, v09.version)
 27
 28    with src_txn.cursor(db=src_dbs[v07.DbType.SYSTEM]) as src_cursor:
 29        for src_key, src_value in src_cursor:
 30            server_id = v07.decode_system_db_key(src_key)
 31            src_event_id, src_timestamp = v07.decode_system_db_value(src_value)
 32
 33            dst_event_id = _convert_event_id(src_event_id)
 34            dst_timestamp = _convert_timestamp(src_timestamp)
 35
 36            v09.write(dst_dbs, dst_txn, v09.DbType.SYSTEM_LAST_EVENT_ID,
 37                      server_id, dst_event_id)
 38
 39            v09.write(dst_dbs, dst_txn, v09.DbType.SYSTEM_LAST_TIMESTAMP,
 40                      server_id, dst_timestamp)
 41
 42
 43def _convert_ref_db(src_txn, src_dbs, dst_txn, dst_dbs):
 44    with src_txn.cursor(db=src_dbs[v07.DbType.REF]) as src_cursor:
 45        for src_key, src_value in src_cursor:
 46            src_event_id = v07.decode_ref_db_key(src_key)
 47            src_event_refs = v07.decode_ref_db_value(src_value)
 48
 49            dst_event_id = _convert_event_id(src_event_id)
 50            dst_event_refs = {_convert_event_ref(src_event_ref)
 51                              for src_event_ref in src_event_refs}
 52
 53            v09.write(dst_dbs, dst_txn, v09.DbType.REF,
 54                      dst_event_id, dst_event_refs)
 55
 56
 57def _convert_latest_db(src_txn, src_dbs, dst_txn, dst_dbs):
 58    with src_txn.cursor(db=src_dbs[v07.DbType.LATEST_DATA]) as src_cursor:
 59        for src_key, src_value in src_cursor:
 60            event_type_ref = v07.decode_latest_data_db_key(src_key)
 61            src_event = v07.decode_latest_data_db_value(src_value)
 62
 63            dst_event = _convert_event(src_event)
 64
 65            v09.write(dst_dbs, dst_txn, v09.DbType.LATEST_DATA,
 66                      event_type_ref, dst_event)
 67
 68    with src_txn.cursor(db=src_dbs[v07.DbType.LATEST_TYPE]) as src_cursor:
 69        for src_key, src_value in src_cursor:
 70            event_type_ref = v07.decode_latest_type_db_key(src_key)
 71            event_type = v07.decode_latest_type_db_value(src_value)
 72
 73            v09.write(dst_dbs, dst_txn, v09.DbType.LATEST_TYPE,
 74                      event_type_ref, event_type)
 75
 76
 77def _convert_timeseries_db(src_txn, src_dbs, dst_txn, dst_dbs):
 78    with src_txn.cursor(db=src_dbs[v07.DbType.ORDERED_DATA]) as src_cursor:
 79        for src_key, src_value in src_cursor:
 80            partition_id, src_timestamp, src_event_id = \
 81                v07.decode_ordered_data_db_key(src_key)
 82            src_event = v07.decode_ordered_data_db_value(src_value)
 83
 84            dst_timestamp = _convert_timestamp(src_timestamp)
 85            dst_event_id = _convert_event_id(src_event_id)
 86            dst_event = _convert_event(src_event)
 87
 88            v09.write(dst_dbs, dst_txn, v09.DbType.TIMESERIES_DATA,
 89                      (partition_id, dst_timestamp, dst_event_id), dst_event)
 90
 91    with src_txn.cursor(db=src_dbs[v07.DbType.ORDERED_PARTITION]) as src_cursor:  # NOQA
 92        for src_key, src_value in src_cursor:
 93            partition_id = v07.decode_ordered_partition_db_key(src_key)
 94            src_partition_data = v07.decode_ordered_partition_db_value(
 95                src_value)
 96
 97            dst_partition_data = _convert_partition_data(src_partition_data)
 98
 99            v09.write(dst_dbs, dst_txn, v09.DbType.TIMESERIES_PARTITION,
100                      partition_id, dst_partition_data)
101
102    with src_txn.cursor(db=src_dbs[v07.DbType.ORDERED_COUNT]) as src_cursor:
103        for src_key, src_value in src_cursor:
104            partition_id = v07.decode_ordered_count_db_key(src_key)
105            count = v07.decode_ordered_count_db_value(src_value)
106
107            v09.write(dst_dbs, dst_txn, v09.DbType.TIMESERIES_COUNT,
108                      partition_id, count)
109
110
111def _convert_event_id(src_event_id):
112    return v09.EventId(*src_event_id)
113
114
115def _convert_timestamp(src_timestamp):
116    return v09.Timestamp(*src_timestamp)
117
118
119def _convert_event_ref(src_event_ref):
120    if isinstance(src_event_ref, v07.LatestEventRef):
121        return v09.LatestEventRef(src_event_ref.key)
122
123    if isinstance(src_event_ref, v07.OrderedEventRef):
124        partition_id, src_timestamp, src_event_id = src_event_ref.key
125
126        dst_timestamp = _convert_timestamp(src_timestamp)
127        dst_event_id = _convert_event_id(src_event_id)
128
129        dst_key = partition_id, dst_timestamp, dst_event_id
130        return v09.TimeseriesEventRef(dst_key)
131
132    raise ValueError('unsupported event reference')
133
134
135def _convert_event(src_event):
136    return v09.Event(
137        id=_convert_event_id(src_event.event_id),
138        type=src_event.event_type,
139        timestamp=_convert_timestamp(src_event.timestamp),
140        source_timestamp=(_convert_timestamp(src_event.source_timestamp)
141                          if src_event.source_timestamp else None),
142        payload=_convert_payload(src_event.payload))
143
144
145def _convert_payload(src_payload):
146    if src_payload is None:
147        return
148
149    if src_payload.type == v07.EventPayloadType.BINARY:
150        return v09.EventPayloadBinary(type='',
151                                      data=src_payload.data)
152
153    if src_payload.type == v07.EventPayloadType.JSON:
154        return v09.EventPayloadJson(data=src_payload.data)
155
156    if src_payload.type == v07.EventPayloadType.SBS:
157        binary_type = (f'{src_payload.data.module}.{src_payload.data.type}'
158                       if src_payload.data.module is not None else
159                       src_payload.data.type)
160        return v09.EventPayloadBinary(type=binary_type,
161                                      data=src_payload.data.data)
162
163    raise ValueError('unsupported payload type')
164
165
166def _convert_partition_data(src_data):
167    src_order_by = v07.OrderBy(src_data['order'])
168    dst_order_by = _convert_order_by(src_order_by)
169
170    return {'order': dst_order_by.value,
171            'subscriptions': src_data['subscriptions']}
172
173
174def _convert_order_by(src_order_by):
175    return v09.OrderBy[src_order_by.name]
def convert(src_path: pathlib.Path, dst_path: pathlib.Path):
 8def convert(src_path: Path,
 9            dst_path: Path):
10    with v07.create_env(src_path) as src_env:
11        with v09.create_env(dst_path) as dst_env:
12            src_dbs = {db_type: v07.open_db(src_env, db_type)
13                       for db_type in v07.DbType}
14            dst_dbs = {db_type: v09.open_db(dst_env, db_type)
15                       for db_type in v09.DbType}
16
17            with src_env.begin(buffers=True) as src_txn:
18                with dst_env.begin(write=True) as dst_txn:
19                    _convert_system_db(src_txn, src_dbs, dst_txn, dst_dbs)
20                    _convert_ref_db(src_txn, src_dbs, dst_txn, dst_dbs)
21                    _convert_latest_db(src_txn, src_dbs, dst_txn, dst_dbs)
22                    _convert_timeseries_db(src_txn, src_dbs, dst_txn, dst_dbs)