hat.event.backends.lmdb.timeseriesdb
from collections.abc import Iterable
import collections
import itertools
import typing

import lmdb

from hat.event.backends.lmdb import common
from hat.event.backends.lmdb import environment
from hat.event.backends.lmdb.conditions import Conditions


# in-memory, not-yet-flushed events, grouped by partition id;
# each entry pairs the ordering timestamp with the event itself
Changes: typing.TypeAlias = dict[common.PartitionId,
                                 collections.deque[tuple[common.Timestamp,
                                                         common.Event]]]


class Limit(typing.NamedTuple):
    """Partition retention limit.

    All criteria are optional; `None` disables that criterion.
    """

    # number of entries always kept, regardless of other limits
    min_entries: int | None = None
    # maximum number of entries kept
    max_entries: int | None = None
    # maximum entry age in seconds (relative to `now` passed to cleanup)
    duration: float | None = None
    # approximate maximum partition size in bytes (derived from db stat)
    size: int | None = None


class Partition(typing.NamedTuple):
    """Timeseries partition definition."""

    order_by: common.OrderBy
    subscription: common.Subscription
    limit: Limit | None


def ext_create(env: environment.Environment,
               txn: lmdb.Transaction,
               conditions: Conditions,
               partitions: Iterable[Partition],
               max_results: int = 4096
               ) -> 'TimeseriesDb':
    """Create TimeseriesDb, registering `partitions` in the database.

    Must be called inside an open LMDB write transaction `txn`.
    `max_results` caps the number of events returned by a single query.
    """
    db = TimeseriesDb()
    db._env = env
    db._conditions = conditions
    db._max_results = max_results
    db._changes = collections.defaultdict(collections.deque)

    # depending on dict order
    db._partitions = dict(_ext_init_partitions(env, txn, partitions))

    return db


class TimeseriesDb:
    """Timeseries database.

    Buffers added events in memory (`create_changes`) and persists them
    with `ext_write`; queries merge persisted data with buffered changes.
    Instances are created with `ext_create`.
    """

    def add(self,
            event: common.Event
            ) -> Iterable[common.EventRef]:
        """Register event with every matching partition.

        Yields one `TimeseriesEventRef` per partition the event was
        added to. Events without source timestamp are skipped by
        SOURCE_TIMESTAMP-ordered partitions.
        """
        for partition_id, partition in self._partitions.items():
            if not partition.subscription.matches(event.type):
                continue

            if partition.order_by == common.OrderBy.TIMESTAMP:
                timestamp = event.timestamp

            elif partition.order_by == common.OrderBy.SOURCE_TIMESTAMP:
                if event.source_timestamp is None:
                    continue

                timestamp = event.source_timestamp

            else:
                raise ValueError('unsupported order by')

            self._changes[partition_id].append((timestamp, event))

            yield common.TimeseriesEventRef(
                (partition_id, timestamp, event.id))

    async def query(self,
                    params: common.QueryTimeseriesParams
                    ) -> common.QueryResult:
        """Query timeseries events.

        Queries the first partition matching `params.order_by` and
        (if given) `params.event_types`; returns an empty result when
        no partition matches.
        """
        subscription = (common.create_subscription(params.event_types)
                        if params.event_types is not None else None)

        # requested max_results is clamped to the instance-wide limit
        max_results = (params.max_results
                       if params.max_results is not None and
                       params.max_results < self._max_results
                       else self._max_results)

        for partition_id, partition in self._partitions.items():
            if partition.order_by != params.order_by:
                continue

            if (subscription and
                    subscription.isdisjoint(partition.subscription)):
                continue

            return await self._query_partition(partition_id, params,
                                               subscription, max_results)

        return common.QueryResult(events=[],
                                  more_follows=False)

    def create_changes(self) -> Changes:
        """Take ownership of buffered changes and reset the buffer."""
        changes, self._changes = (self._changes,
                                  collections.defaultdict(collections.deque))
        return changes

    def ext_write(self,
                  txn: lmdb.Transaction,
                  changes: Changes):
        """Persist `changes` (as produced by `create_changes`)."""
        data = (((partition_id, timestamp, event.id), event)
                for partition_id, partition_changes in changes.items()
                for timestamp, event in partition_changes)
        self._env.ext_write(txn, common.DbType.TIMESERIES_DATA, data)

        counts = ((partition_id, len(partition_changes))
                  for partition_id, partition_changes in changes.items())
        _ext_inc_partition_count(self._env, txn, counts)

    def ext_cleanup(self,
                    txn: lmdb.Transaction,
                    now: common.Timestamp,
                    max_results: int | None = None,
                    ) -> collections.deque[tuple[common.EventId,
                                                 common.EventRef]]:
        """Delete events violating partition limits.

        Returns references of deleted events. At most `max_results`
        events are deleted in a single call (when not `None`).
        """
        result = collections.deque()

        for partition_id, partition in self._partitions.items():
            if not partition.limit:
                continue

            partition_max_results = (max_results - len(result)
                                     if max_results is not None else None)
            if partition_max_results is not None and partition_max_results < 1:
                break

            result.extend(_ext_cleanup_partition(self._env, txn, now,
                                                 partition_id, partition.limit,
                                                 partition_max_results))

        return result

    async def _query_partition(self, partition_id, params, subscription,
                               max_results):
        # merges persisted events with in-memory changes, preserving
        # requested order; one extra event is requested to detect
        # `more_follows`
        events = collections.deque()
        changes = self._changes

        filter = _Filter(subscription=subscription,
                         t_from=params.t_from,
                         t_to=params.t_to,
                         source_t_from=params.source_t_from,
                         source_t_to=params.source_t_to,
                         max_results=max_results + 1,
                         last_event_id=params.last_event_id)

        if params.order == common.Order.DESCENDING:
            # newest first: in-memory changes precede persisted data
            events.extend(_query_partition_changes(
                changes[partition_id], params, filter))

            if not filter.done:
                events.extend(await self._env.execute(
                    _ext_query_partition_events, self._env, self._conditions,
                    partition_id, params, filter))

        elif params.order == common.Order.ASCENDING:
            # oldest first: persisted data precedes in-memory changes
            events.extend(await self._env.execute(
                _ext_query_partition_events, self._env, self._conditions,
                partition_id, params, filter))

            if not filter.done:
                events.extend(_query_partition_changes(
                    changes[partition_id], params, filter))

        else:
            raise ValueError('unsupported order')

        more_follows = len(events) > max_results
        while len(events) > max_results:
            events.pop()

        return common.QueryResult(events=events,
                                  more_follows=more_follows)


def _ext_init_partitions(env, txn, partitions):
    # reuses persisted partition ids for identical (order, subscriptions)
    # definitions; persists new definitions under fresh ids
    db_data = dict(env.ext_read(txn, common.DbType.TIMESERIES_PARTITION))
    next_partition_ids = itertools.count(max(db_data.keys(), default=0) + 1)

    for partition in partitions:
        event_types = sorted(partition.subscription.get_query_types())
        partition_data = {'order': partition.order_by.value,
                          'subscriptions': [list(i) for i in event_types]}

        for partition_id, i in db_data.items():
            if i == partition_data:
                break

        else:
            partition_id = next(next_partition_ids)
            db_data[partition_id] = partition_data
            env.ext_write(txn, common.DbType.TIMESERIES_PARTITION,
                          [(partition_id, partition_data)])

        yield partition_id, partition


def _ext_query_partition_events(env, conditions, partition_id, params,
                                filter):
    # executed on the environment's executor thread
    if params.order_by == common.OrderBy.TIMESTAMP:
        events = _ext_query_partition_events_range(
            env, partition_id, params.t_from, params.t_to, params.order)

    elif params.order_by == common.OrderBy.SOURCE_TIMESTAMP:
        events = _ext_query_partition_events_range(
            env, partition_id, params.source_t_from, params.source_t_to,
            params.order)

    else:
        raise ValueError('unsupported order by')

    events = (event for event in events if conditions.matches(event))
    events = filter.process(events)
    return collections.deque(events)


def _ext_query_partition_events_range(env, partition_id, t_from, t_to, order):
    # iterates encoded keys (partition_id, timestamp, event_id) in the
    # requested order, bounded inclusively by [t_from, t_to]
    db_def = common.db_defs[common.DbType.TIMESERIES_DATA]

    if not t_from:
        t_from = common.min_timestamp

    from_key = partition_id, t_from, common.EventId(0, 0, 0)
    encoded_from_key = db_def.encode_key(from_key)

    if not t_to:
        t_to = common.max_timestamp

    to_key = (partition_id,
              t_to,
              common.EventId((1 << 64) - 1, (1 << 64) - 1, (1 << 64) - 1))
    encoded_to_key = db_def.encode_key(to_key)

    with env.ext_begin() as txn:
        with env.ext_cursor(txn, common.DbType.TIMESERIES_DATA) as cursor:
            if order == common.Order.DESCENDING:
                encoded_start_key, encoded_stop_key = (encoded_to_key,
                                                       encoded_from_key)

                # position at last key <= start key
                if cursor.set_range(encoded_start_key):
                    more = cursor.prev()
                else:
                    more = cursor.last()

                while more and encoded_stop_key <= bytes(cursor.key()):
                    yield db_def.decode_value(cursor.value())
                    more = cursor.prev()

            elif order == common.Order.ASCENDING:
                encoded_start_key, encoded_stop_key = (encoded_from_key,
                                                       encoded_to_key)

                more = cursor.set_range(encoded_start_key)

                while more and bytes(cursor.key()) <= encoded_stop_key:
                    yield db_def.decode_value(cursor.value())
                    more = cursor.next()

            else:
                raise ValueError('unsupported order')


def _ext_get_partition_count(env, txn, partition_id):
    # number of persisted entries in partition (0 when not recorded)
    db_def = common.db_defs[common.DbType.TIMESERIES_COUNT]

    with env.ext_cursor(txn, common.DbType.TIMESERIES_COUNT) as cursor:
        encoded_key = db_def.encode_key(partition_id)
        encoded_value = cursor.get(encoded_key)

        return db_def.decode_value(encoded_value) if encoded_value else 0


def _ext_set_partition_count(env, txn, partition_id, count):
    env.ext_write(txn, common.DbType.TIMESERIES_COUNT, [(partition_id, count)])


def _ext_inc_partition_count(env, txn, partition_counts):
    # read-modify-write of per-partition entry counters
    db_def = common.db_defs[common.DbType.TIMESERIES_COUNT]

    with env.ext_cursor(txn, common.DbType.TIMESERIES_COUNT) as cursor:
        for partition_id, count in partition_counts:
            encoded_key = db_def.encode_key(partition_id)
            encoded_value = cursor.get(encoded_key)

            value = db_def.decode_value(encoded_value) if encoded_value else 0
            inc_value = value + count

            encoded_value = db_def.encode_value(inc_value)
            cursor.put(encoded_key, encoded_value)


def _ext_cleanup_partition(env, txn, now, partition_id, limit, max_results):
    # deletes oldest entries first until all limit criteria are satisfied,
    # yielding (event_id, ref) for each deleted entry
    db_def = common.db_defs[common.DbType.TIMESERIES_DATA]

    timestamp = common.min_timestamp
    start_key = (partition_id,
                 timestamp,
                 common.EventId(0, 0, 0))
    stop_key = ((partition_id + 1),
                timestamp,
                common.EventId(0, 0, 0))

    encoded_start_key = db_def.encode_key(start_key)
    encoded_stop_key = db_def.encode_key(stop_key)

    min_entries = limit.min_entries or 0
    max_entries = None
    encoded_duration_key = None

    if limit.size is not None:
        # approximate entry count budget from average entry size
        stat = env.ext_stat(txn, common.DbType.TIMESERIES_DATA)

        if stat['entries']:
            total_size = stat['psize'] * (stat['branch_pages'] +
                                          stat['leaf_pages'] +
                                          stat['overflow_pages'])
            entry_size = total_size / stat['entries']
            max_entries = int(limit.size / entry_size)

    if limit.max_entries is not None:
        max_entries = (limit.max_entries if max_entries is None
                       else min(max_entries, limit.max_entries))

    if limit.duration is not None:
        duration_key = (partition_id,
                        now.add(-limit.duration),
                        common.EventId(0, 0, 0))
        encoded_duration_key = db_def.encode_key(duration_key)

    result_count = 0
    entries_count = _ext_get_partition_count(env, txn, partition_id)

    with env.ext_cursor(txn, common.DbType.TIMESERIES_DATA) as cursor:
        more = cursor.set_range(encoded_start_key)
        while more:
            if max_results is not None and result_count >= max_results:
                break

            # never drop below min_entries
            if entries_count - result_count <= min_entries:
                break

            encoded_key = bytes(cursor.key())
            if encoded_key >= encoded_stop_key:
                break

            # stop once both count and duration criteria are satisfied
            if ((max_entries is None or
                 entries_count - result_count <= max_entries) and
                (encoded_duration_key is None or
                 encoded_key >= encoded_duration_key)):
                break

            key = db_def.decode_key(encoded_key)
            event_id = key[2]

            more = cursor.delete()
            result_count += 1

            yield event_id, common.TimeseriesEventRef(key)

    if result_count > 0:
        _ext_set_partition_count(env, txn, partition_id,
                                 entries_count - result_count)


def _query_partition_changes(changes, params, filter):
    # applies timestamp range bounds to in-memory changes (which are
    # appended in ascending timestamp order) and filters the rest
    if params.order == common.Order.DESCENDING:
        events = (event for _, event in reversed(changes))

        if (params.order_by == common.OrderBy.TIMESTAMP and
                params.t_to is not None):
            events = itertools.dropwhile(
                lambda i: params.t_to < i.timestamp,
                events)

        elif (params.order_by == common.OrderBy.SOURCE_TIMESTAMP and
                params.source_t_to is not None):
            events = itertools.dropwhile(
                lambda i: params.source_t_to < i.source_timestamp,
                events)

        if (params.order_by == common.OrderBy.TIMESTAMP and
                params.t_from is not None):
            events = itertools.takewhile(
                lambda i: params.t_from <= i.timestamp,
                events)

        elif (params.order_by == common.OrderBy.SOURCE_TIMESTAMP and
                params.source_t_from is not None):
            events = itertools.takewhile(
                lambda i: params.source_t_from <= i.source_timestamp,
                events)

    elif params.order == common.Order.ASCENDING:
        events = (event for _, event in changes)

        if (params.order_by == common.OrderBy.TIMESTAMP and
                params.t_from is not None):
            events = itertools.dropwhile(
                lambda i: i.timestamp < params.t_from,
                events)

        elif (params.order_by == common.OrderBy.SOURCE_TIMESTAMP and
                params.source_t_from is not None):
            events = itertools.dropwhile(
                lambda i: i.source_timestamp < params.source_t_from,
                events)

        if (params.order_by == common.OrderBy.TIMESTAMP and
                params.t_to is not None):
            events = itertools.takewhile(
                lambda i: i.timestamp <= params.t_to,
                events)

        elif (params.order_by == common.OrderBy.SOURCE_TIMESTAMP and
                params.source_t_to is not None):
            events = itertools.takewhile(
                lambda i: i.source_timestamp <= params.source_t_to,
                events)

    else:
        raise ValueError('unsupported order')

    return filter.process(events)


class _Filter:
    """Stateful event filter shared across query stages.

    Tracks remaining result budget and `last_event_id` skipping, so it
    can be applied first to one event source and continued on another.
    """

    def __init__(self,
                 subscription: common.Subscription,
                 t_from: common.Timestamp | None,
                 t_to: common.Timestamp | None,
                 source_t_from: common.Timestamp | None,
                 source_t_to: common.Timestamp | None,
                 max_results: int,
                 last_event_id: common.EventId | None):
        self._subscription = subscription
        self._t_from = t_from
        self._t_to = t_to
        self._source_t_from = source_t_from
        self._source_t_to = source_t_to
        self._max_results = max_results
        self._last_event_id = last_event_id

    @property
    def done(self):
        # `True` once the result budget is exhausted
        return self._max_results < 1

    def process(self, events: Iterable[common.Event]):
        for event in events:
            if self._max_results < 1:
                return

            # skip everything up to and including last_event_id
            if self._last_event_id:
                if event.id == self._last_event_id:
                    self._last_event_id = None

                continue

            if self._t_from is not None and event.timestamp < self._t_from:
                continue

            if self._t_to is not None and self._t_to < event.timestamp:
                continue

            if self._source_t_from is not None and (
                    event.source_timestamp is None or
                    event.source_timestamp < self._source_t_from):
                continue

            if self._source_t_to is not None and (
                    event.source_timestamp is None or
                    self._source_t_to < event.source_timestamp):
                continue

            if (self._subscription and
                    not self._subscription.matches(event.type)):
                continue

            self._max_results -= 1
            yield event
Changes: TypeAlias =
dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]
class
Limit(typing.NamedTuple):
19class Limit(typing.NamedTuple): 20 min_entries: int | None = None 21 max_entries: int | None = None 22 duration: float | None = None 23 size: int | None = None
Limit(min_entries, max_entries, duration, size)
class
Partition(typing.NamedTuple):
26class Partition(typing.NamedTuple): 27 order_by: common.OrderBy 28 subscription: common.Subscription 29 limit: Limit | None
Partition(order_by, subscription, limit)
Partition( order_by: hat.event.common.OrderBy, subscription: hat.event.common.Subscription, limit: Limit | None)
Create new instance of Partition(order_by, subscription, limit)
def
ext_create( env: hat.event.backends.lmdb.environment.Environment, txn: Transaction, conditions: hat.event.backends.lmdb.conditions.Conditions, partitions: Iterable[Partition], max_results: int = 4096) -> TimeseriesDb:
32def ext_create(env: environment.Environment, 33 txn: lmdb.Transaction, 34 conditions: Conditions, 35 partitions: Iterable[Partition], 36 max_results: int = 4096 37 ) -> 'TimeseriesDb': 38 db = TimeseriesDb() 39 db._env = env 40 db._conditions = conditions 41 db._max_results = max_results 42 db._changes = collections.defaultdict(collections.deque) 43 44 # depending on dict order 45 db._partitions = dict(_ext_init_partitions(env, txn, partitions)) 46 47 return db
class
TimeseriesDb:
50class TimeseriesDb: 51 52 def add(self, 53 event: common.Event 54 ) -> Iterable[common.EventRef]: 55 for partition_id, partition in self._partitions.items(): 56 if not partition.subscription.matches(event.type): 57 continue 58 59 if partition.order_by == common.OrderBy.TIMESTAMP: 60 timestamp = event.timestamp 61 62 elif partition.order_by == common.OrderBy.SOURCE_TIMESTAMP: 63 if event.source_timestamp is None: 64 continue 65 66 timestamp = event.source_timestamp 67 68 else: 69 raise ValueError('unsupported order by') 70 71 self._changes[partition_id].append((timestamp, event)) 72 73 yield common.TimeseriesEventRef( 74 (partition_id, timestamp, event.id)) 75 76 async def query(self, 77 params: common.QueryTimeseriesParams 78 ) -> common.QueryResult: 79 subscription = (common.create_subscription(params.event_types) 80 if params.event_types is not None else None) 81 82 max_results = (params.max_results 83 if params.max_results is not None and 84 params.max_results < self._max_results 85 else self._max_results) 86 87 for partition_id, partition in self._partitions.items(): 88 if partition.order_by != params.order_by: 89 continue 90 91 if (subscription and 92 subscription.isdisjoint(partition.subscription)): 93 continue 94 95 return await self._query_partition(partition_id, params, 96 subscription, max_results) 97 98 return common.QueryResult(events=[], 99 more_follows=False) 100 101 def create_changes(self) -> Changes: 102 changes, self._changes = (self._changes, 103 collections.defaultdict(collections.deque)) 104 return changes 105 106 def ext_write(self, 107 txn: lmdb.Transaction, 108 changes: Changes): 109 data = (((partition_id, timestamp, event.id), event) 110 for partition_id, partition_changes in changes.items() 111 for timestamp, event in partition_changes) 112 self._env.ext_write(txn, common.DbType.TIMESERIES_DATA, data) 113 114 counts = ((partition_id, len(partition_changes)) 115 for partition_id, partition_changes in changes.items()) 116 
_ext_inc_partition_count(self._env, txn, counts) 117 118 def ext_cleanup(self, 119 txn: lmdb.Transaction, 120 now: common.Timestamp, 121 max_results: int | None = None, 122 ) -> collections.deque[tuple[common.EventId, 123 common.EventRef]]: 124 result = collections.deque() 125 126 for partition_id, partition in self._partitions.items(): 127 if not partition.limit: 128 continue 129 130 partition_max_results = (max_results - len(result) 131 if max_results is not None else None) 132 if partition_max_results is not None and partition_max_results < 1: 133 break 134 135 result.extend(_ext_cleanup_partition(self._env, txn, now, 136 partition_id, partition.limit, 137 partition_max_results)) 138 139 return result 140 141 async def _query_partition(self, partition_id, params, subscription, 142 max_results): 143 events = collections.deque() 144 changes = self._changes 145 146 filter = _Filter(subscription=subscription, 147 t_from=params.t_from, 148 t_to=params.t_to, 149 source_t_from=params.source_t_from, 150 source_t_to=params.source_t_to, 151 max_results=max_results + 1, 152 last_event_id=params.last_event_id) 153 154 if params.order == common.Order.DESCENDING: 155 events.extend(_query_partition_changes( 156 changes[partition_id], params, filter)) 157 158 if not filter.done: 159 events.extend(await self._env.execute( 160 _ext_query_partition_events, self._env, self._conditions, 161 partition_id, params, filter)) 162 163 elif params.order == common.Order.ASCENDING: 164 events.extend(await self._env.execute( 165 _ext_query_partition_events, self._env, self._conditions, 166 partition_id, params, filter)) 167 168 if not filter.done: 169 events.extend(_query_partition_changes( 170 changes[partition_id], params, filter)) 171 172 else: 173 raise ValueError('unsupported order') 174 175 more_follows = len(events) > max_results 176 while len(events) > max_results: 177 events.pop() 178 179 return common.QueryResult(events=events, 180 more_follows=more_follows)
def
add( self, event: hat.event.common.Event) -> Iterable[hat.event.backends.lmdb.common.LatestEventRef | hat.event.backends.lmdb.common.TimeseriesEventRef]:
52 def add(self, 53 event: common.Event 54 ) -> Iterable[common.EventRef]: 55 for partition_id, partition in self._partitions.items(): 56 if not partition.subscription.matches(event.type): 57 continue 58 59 if partition.order_by == common.OrderBy.TIMESTAMP: 60 timestamp = event.timestamp 61 62 elif partition.order_by == common.OrderBy.SOURCE_TIMESTAMP: 63 if event.source_timestamp is None: 64 continue 65 66 timestamp = event.source_timestamp 67 68 else: 69 raise ValueError('unsupported order by') 70 71 self._changes[partition_id].append((timestamp, event)) 72 73 yield common.TimeseriesEventRef( 74 (partition_id, timestamp, event.id))
async def
query( self, params: hat.event.common.QueryTimeseriesParams) -> hat.event.common.QueryResult:
76 async def query(self, 77 params: common.QueryTimeseriesParams 78 ) -> common.QueryResult: 79 subscription = (common.create_subscription(params.event_types) 80 if params.event_types is not None else None) 81 82 max_results = (params.max_results 83 if params.max_results is not None and 84 params.max_results < self._max_results 85 else self._max_results) 86 87 for partition_id, partition in self._partitions.items(): 88 if partition.order_by != params.order_by: 89 continue 90 91 if (subscription and 92 subscription.isdisjoint(partition.subscription)): 93 continue 94 95 return await self._query_partition(partition_id, params, 96 subscription, max_results) 97 98 return common.QueryResult(events=[], 99 more_follows=False)
def
create_changes( self) -> dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]:
def
ext_write( self, txn: Transaction, changes: dict[int, collections.deque[tuple[hat.event.common.Timestamp, hat.event.common.Event]]]):
106 def ext_write(self, 107 txn: lmdb.Transaction, 108 changes: Changes): 109 data = (((partition_id, timestamp, event.id), event) 110 for partition_id, partition_changes in changes.items() 111 for timestamp, event in partition_changes) 112 self._env.ext_write(txn, common.DbType.TIMESERIES_DATA, data) 113 114 counts = ((partition_id, len(partition_changes)) 115 for partition_id, partition_changes in changes.items()) 116 _ext_inc_partition_count(self._env, txn, counts)
def
ext_cleanup( self, txn: Transaction, now: hat.event.common.Timestamp, max_results: int | None = None) -> collections.deque[tuple[hat.event.common.EventId, hat.event.backends.lmdb.common.LatestEventRef | hat.event.backends.lmdb.common.TimeseriesEventRef]]:
118 def ext_cleanup(self, 119 txn: lmdb.Transaction, 120 now: common.Timestamp, 121 max_results: int | None = None, 122 ) -> collections.deque[tuple[common.EventId, 123 common.EventRef]]: 124 result = collections.deque() 125 126 for partition_id, partition in self._partitions.items(): 127 if not partition.limit: 128 continue 129 130 partition_max_results = (max_results - len(result) 131 if max_results is not None else None) 132 if partition_max_results is not None and partition_max_results < 1: 133 break 134 135 result.extend(_ext_cleanup_partition(self._env, txn, now, 136 partition_id, partition.limit, 137 partition_max_results)) 138 139 return result