hat.event.backends.lmdb.convert.convert_v06_to_v07

  1from pathlib import Path
  2import itertools
  3
  4from hat.event.backends.lmdb.convert import v06
  5from hat.event.backends.lmdb.convert import v07
  6
  7
  8def convert(src_path: Path,
  9            dst_path: Path):
 10    with v06.create_env(src_path) as src_env:
 11        src_system_db = src_env.open_db(b'system')
 12        server_id = _get_server_id(src_env, src_system_db)
 13
 14        if server_id < 1:
 15            server_id = 1
 16
 17        with v07.create_env(dst_path) as dst_env:
 18            dst_system_db = v07.open_db(dst_env, v07.DbType.SYSTEM)
 19            dst_ref_db = v07.open_db(dst_env, v07.DbType.REF)
 20
 21            _convert_latest(src_env=src_env,
 22                            dst_env=dst_env,
 23                            dst_system_db=dst_system_db,
 24                            dst_ref_db=dst_ref_db,
 25                            server_id=server_id)
 26
 27            _convert_ordered(src_env=src_env,
 28                             dst_env=dst_env,
 29                             dst_system_db=dst_system_db,
 30                             dst_ref_db=dst_ref_db,
 31                             server_id=server_id)
 32
 33
 34def _convert_latest(src_env, dst_env, dst_system_db, dst_ref_db, server_id):
 35    src_latest_db = src_env.open_db(b'latest')
 36
 37    dst_latest_data_db = v07.open_db(dst_env, v07.DbType.LATEST_DATA)
 38    dst_latest_type_db = v07.open_db(dst_env, v07.DbType.LATEST_TYPE)
 39
 40    latest_event = None
 41    next_event_type_ref = itertools.count(1)
 42
 43    with src_env.begin(db=src_latest_db, buffers=True) as src_txn:
 44        for _, src_encoded_value in src_txn.cursor():
 45            src_event = v06.decode_event(src_encoded_value)
 46            dst_event = _convert_event(src_event, server_id)
 47
 48            if not latest_event or latest_event.event_id < dst_event.event_id:
 49                latest_event = dst_event
 50
 51            event_type_ref = next(next_event_type_ref)
 52
 53            with dst_env.begin(db=dst_latest_data_db, write=True) as dst_txn:
 54                dst_txn.put(
 55                    v07.encode_latest_data_db_key(
 56                        event_type_ref),
 57                    v07.encode_latest_data_db_value(
 58                        dst_event))
 59
 60            with dst_env.begin(db=dst_latest_type_db, write=True) as dst_txn:
 61                dst_txn.put(
 62                    v07.encode_latest_type_db_key(
 63                        event_type_ref),
 64                    v07.encode_latest_type_db_value(
 65                        dst_event.event_type))
 66
 67            _update_ref_db(dst_env=dst_env,
 68                           dst_ref_db=dst_ref_db,
 69                           dst_event=dst_event,
 70                           event_ref=v07.LatestEventRef(event_type_ref))
 71
 72    if latest_event:
 73        _update_system_db(dst_env=dst_env,
 74                          dst_system_db=dst_system_db,
 75                          server_id=server_id,
 76                          latest_event=latest_event)
 77
 78
 79def _convert_ordered(src_env, dst_env, dst_system_db, dst_ref_db, server_id):
 80    src_ordered_data_db = src_env.open_db(b'ordered_data')
 81    src_ordered_partition_db = src_env.open_db(b'ordered_partition')
 82    src_ordered_count_db = src_env.open_db(b'ordered_count')
 83
 84    dst_ordered_data_db = v07.open_db(dst_env, v07.DbType.ORDERED_DATA)
 85    dst_ordered_partition_db = v07.open_db(
 86        dst_env, v07.DbType.ORDERED_PARTITION)
 87    dst_ordered_count_db = v07.open_db(dst_env, v07.DbType.ORDERED_COUNT)
 88
 89    latest_event = None
 90
 91    with src_env.begin(db=src_ordered_data_db, buffers=True) as src_txn:
 92        for src_encoded_key, src_encoded_value in src_txn.cursor():
 93            partition_id, src_timestamp, _ = v06.decode_uint_timestamp_uint(
 94                src_encoded_key)
 95            src_event = v06.decode_event(src_encoded_value)
 96            dst_timestamp = _convert_timestamp(src_timestamp)
 97            dst_event = _convert_event(src_event, server_id)
 98
 99            if not latest_event or latest_event.event_id < dst_event.event_id:
100                latest_event = dst_event
101
102            dst_key = partition_id, dst_timestamp, dst_event.event_id
103
104            with dst_env.begin(db=dst_ordered_data_db, write=True) as dst_txn:
105                dst_txn.put(
106                    v07.encode_ordered_data_db_key(
107                        dst_key),
108                    v07.encode_ordered_data_db_value(
109                        dst_event))
110
111            _update_ref_db(dst_env=dst_env,
112                           dst_ref_db=dst_ref_db,
113                           dst_event=dst_event,
114                           event_ref=v07.OrderedEventRef(dst_key))
115
116    if latest_event:
117        _update_system_db(dst_env=dst_env,
118                          dst_system_db=dst_system_db,
119                          server_id=server_id,
120                          latest_event=latest_event)
121
122    with src_env.begin(db=src_ordered_partition_db, buffers=True) as src_txn:
123        for src_encoded_key, src_encoded_value in src_txn.cursor():
124            partition_id = v06.decode_uint(src_encoded_key)
125            partition_data = v06.decode_json(src_encoded_value)
126
127            with dst_env.begin(db=dst_ordered_partition_db,
128                               write=True) as dst_txn:
129                dst_txn.put(
130                    v07.encode_ordered_partition_db_key(
131                        partition_id),
132                    v07.encode_ordered_partition_db_value(
133                        partition_data))
134
135    with src_env.begin(db=src_ordered_count_db, buffers=True) as src_txn:
136        for src_encoded_key, src_encoded_value in src_txn.cursor():
137            partition_id = v06.decode_uint(src_encoded_key)
138            count = v06.decode_json(src_encoded_value)
139
140            with dst_env.begin(db=dst_ordered_count_db,
141                               write=True) as dst_txn:
142                dst_txn.put(
143                    v07.encode_ordered_count_db_key(
144                        partition_id),
145                    v07.encode_ordered_count_db_value(
146                        count))
147
148
149def _update_ref_db(dst_env, dst_ref_db, dst_event, event_ref):
150    with dst_env.begin(db=dst_ref_db,
151                       write=True,
152                       buffers=True) as dst_txn:
153        dst_encoded_key = v07.encode_ref_db_key(dst_event.event_id)
154        dst_encoded_value = dst_txn.get(dst_encoded_key)
155
156        event_refs = (v07.decode_ref_db_value(dst_encoded_value)
157                      if dst_encoded_value else set())
158        event_refs.add(event_ref)
159        dst_encoded_value = v07.encode_ref_db_value(event_refs)
160
161        dst_txn.put(dst_encoded_key, dst_encoded_value)
162
163
164def _update_system_db(dst_env, dst_system_db, server_id, latest_event):
165    with dst_env.begin(db=dst_system_db,
166                       write=True,
167                       buffers=True) as dst_txn:
168        dst_encoded_key = v07.encode_system_db_key(server_id)
169        dst_encoded_value = dst_txn.get(dst_encoded_key)
170
171        event_id, _ = (v07.decode_system_db_value(dst_encoded_value)
172                       if dst_encoded_value
173                       else (v07.EventId(server_id, 0, 0), None))
174
175        if latest_event.event_id > event_id:
176            dst_txn.put(dst_encoded_key,
177                        v07.encode_system_db_value((latest_event.event_id,
178                                                    latest_event.timestamp)))
179
180
181def _get_server_id(src_env, src_system_db):
182    with src_env.begin(db=src_system_db, buffers=True) as txn:
183        src_encoded_server_id = txn.get(b'server_id')
184        return v06.decode_uint(src_encoded_server_id)
185
186
187def _convert_event(src_event, server_id):
188    return v07.Event(
189        event_id=_convert_event_id(src_event.event_id, server_id),
190        event_type=src_event.event_type,
191        timestamp=_convert_timestamp(src_event.timestamp),
192        source_timestamp=(_convert_timestamp(src_event.source_timestamp)
193                          if src_event.source_timestamp else None),
194        payload=(_convert_event_payload(src_event.payload)
195                 if src_event.payload else None))
196
197
198def _convert_event_id(src_event_id, server_id):
199    return v07.EventId(server=(src_event_id.server if src_event_id.server > 0
200                               else server_id),
201                       session=src_event_id.instance,
202                       instance=src_event_id.instance)
203
204
205def _convert_event_payload(src_event_payload):
206    return v07.EventPayload(
207        type=v07.EventPayloadType[src_event_payload.type.name],
208        data=src_event_payload.data)
209
210
211def _convert_timestamp(src_timestamp):
212    return v07.Timestamp(s=src_timestamp.s,
213                         us=src_timestamp.us)
def convert(src_path: pathlib.Path, dst_path: pathlib.Path):
 9def convert(src_path: Path,
10            dst_path: Path):
11    with v06.create_env(src_path) as src_env:
12        src_system_db = src_env.open_db(b'system')
13        server_id = _get_server_id(src_env, src_system_db)
14
15        if server_id < 1:
16            server_id = 1
17
18        with v07.create_env(dst_path) as dst_env:
19            dst_system_db = v07.open_db(dst_env, v07.DbType.SYSTEM)
20            dst_ref_db = v07.open_db(dst_env, v07.DbType.REF)
21
22            _convert_latest(src_env=src_env,
23                            dst_env=dst_env,
24                            dst_system_db=dst_system_db,
25                            dst_ref_db=dst_ref_db,
26                            server_id=server_id)
27
28            _convert_ordered(src_env=src_env,
29                             dst_env=dst_env,
30                             dst_system_db=dst_system_db,
31                             dst_ref_db=dst_ref_db,
32                             server_id=server_id)