hat.event.backends.lmdb.latestdb
"""In-memory view of the latest event per event type, backed by LMDB.

The module keeps the most recent event for every event type, remembers
which entries changed since the last flush and writes those changes
through an `environment.Environment`.
"""

import itertools
import typing

import lmdb

from hat.event.backends.lmdb import common
from hat.event.backends.lmdb import environment
from hat.event.backends.lmdb.conditions import Conditions


class Changes(typing.NamedTuple):
    """Pending modifications that still have to be written to LMDB."""

    # latest event for each event type reference
    data: dict[common.EventTypeRef, common.Event]
    # event type registered under each newly allocated reference
    # (`LatestDb.add` stores `event.type` here, hence `EventType`)
    types: dict[common.EventTypeRef, common.EventType]


class AddResult(typing.NamedTuple):
    """Outcome of `LatestDb.add`."""

    # reference of the stored event, ``None`` when the event was ignored
    added_ref: common.EventRef | None
    # id and reference of the event that was replaced, ``None`` when
    # nothing was replaced
    removed_ref: tuple[common.EventId, common.EventRef] | None


def ext_create(env: environment.Environment,
               txn: lmdb.Transaction,
               conditions: Conditions,
               subscription: common.Subscription
               ) -> 'LatestDb':
    """Create a `LatestDb` populated from the content read through `env`.

    Args:
        env: environment used for reading now and for writing later
        txn: transaction passed to `env.ext_read`
        conditions: only events accepted by `conditions.matches` are loaded
        subscription: only event types accepted by `subscription.matches`
            are loaded and later accepted by `LatestDb.add`

    Returns:
        initialized database instance

    """
    db = LatestDb()
    db._env = env
    db._subscription = subscription
    db._changes = Changes({}, {})

    # stored entries are (ref, event type); keep the inverse mapping
    db._event_type_refs = {
        event_type: ref
        for ref, event_type in env.ext_read(txn, common.DbType.LATEST_TYPE)}

    # skip data entries whose key does not agree with the reference
    # registered for their event type
    db._events = {
        event.type: event
        for ref, event in env.ext_read(txn, common.DbType.LATEST_DATA)
        if conditions.matches(event) and
        subscription.matches(event.type) and
        db._event_type_refs.get(event.type) == ref}

    # new references continue after the largest known one (or start at 1)
    db._next_event_type_refs = itertools.count(
        max(db._event_type_refs.values(), default=0) + 1)

    return db


class LatestDb:
    """Latest event per event type.

    Instances are expected to be created with `ext_create`, which sets all
    private attributes used by the methods.
    """

    def add(self,
            event: common.Event
            ) -> AddResult:
        """Register `event` as the latest event of its type.

        The event is ignored (both result fields are ``None``) when its
        type is not matched by the subscription or when the currently
        stored event of the same type compares greater than `event`.

        Otherwise the event replaces the stored one and is recorded in the
        pending changes. A new event type reference is allocated and
        recorded the first time an event type is seen.
        """
        if not self._subscription.matches(event.type):
            return AddResult(added_ref=None,
                             removed_ref=None)

        previous_event = self._events.get(event.type)
        if previous_event and previous_event > event:
            return AddResult(added_ref=None,
                             removed_ref=None)

        event_type_ref = self._event_type_refs.get(event.type)
        if event_type_ref is None:
            event_type_ref = next(self._next_event_type_refs)
            self._changes.types[event_type_ref] = event.type
            self._event_type_refs[event.type] = event_type_ref

        self._changes.data[event_type_ref] = event
        self._events[event.type] = event

        # the replaced event lived under the same reference as the new one
        added_ref = common.LatestEventRef(event_type_ref)
        removed_ref = ((previous_event.id, added_ref)
                       if previous_event else None)
        return AddResult(added_ref=added_ref,
                         removed_ref=removed_ref)

    def query(self,
              params: common.QueryLatestParams
              ) -> common.QueryResult:
        """Return stored events selected by `params.event_types`.

        ``None`` or a set containing ``('*', )`` selects every stored
        event. Query types containing ``'*'`` or ``'?'`` segments are
        evaluated with `common.create_subscription`; otherwise exact
        event types are looked up. `more_follows` is always ``False``.
        """
        event_types = (set(params.event_types)
                       if params.event_types is not None
                       else None)

        if event_types is None or ('*', ) in event_types:
            events = self._events.values()

        elif any('*' in event_type or '?' in event_type
                 for event_type in event_types):
            subscription = common.create_subscription(event_types)
            events = (event for event in self._events.values()
                      if subscription.matches(event.type))

        elif len(event_types) < len(self._events):
            # fewer requested types than stored events - direct lookups
            events = (self._events.get(event_type)
                      for event_type in event_types)
            events = (event for event in events if event)

        else:
            # otherwise a single pass over stored events
            events = (event for event in self._events.values()
                      if event.type in event_types)

        return common.QueryResult(events=list(events),
                                  more_follows=False)

    def create_changes(self) -> Changes:
        """Return accumulated changes and start collecting new ones."""
        changes, self._changes = self._changes, Changes({}, {})
        return changes

    def ext_write(self,
                  txn: lmdb.Transaction,
                  changes: Changes):
        """Write `changes` (data first, then types) using `txn`."""
        self._env.ext_write(txn, common.DbType.LATEST_DATA,
                            changes.data.items())

        self._env.ext_write(txn, common.DbType.LATEST_TYPE,
                            changes.types.items())
class
Changes(typing.NamedTuple):
class Changes(typing.NamedTuple):
    """Pending modifications that still have to be written to LMDB."""

    # latest event for each event type reference
    data: dict[common.EventTypeRef, common.Event]
    # event type registered under each newly allocated reference; the
    # values are event types (`LatestDb.add` stores `event.type`), not
    # event ids
    types: dict[common.EventTypeRef, common.EventType]
Changes(data, types)
Changes( data: dict[int, hat.event.common.Event], types: dict[int, hat.event.common.EventId])
Create new instance of Changes(data, types)
class
AddResult(typing.NamedTuple):
class AddResult(typing.NamedTuple):
    """Outcome of `LatestDb.add`."""

    # reference of the stored event, ``None`` when the event was ignored
    added_ref: common.EventRef | None
    # id of the replaced event together with its reference, ``None`` when
    # no previously stored event was replaced
    removed_ref: tuple[common.EventId, common.EventRef] | None
AddResult(added_ref, removed_ref)
AddResult( added_ref: hat.event.backends.lmdb.common.LatestEventRef | hat.event.backends.lmdb.common.TimeseriesEventRef | None, removed_ref: tuple[hat.event.common.EventId, hat.event.backends.lmdb.common.LatestEventRef | hat.event.backends.lmdb.common.TimeseriesEventRef] | None)
Create new instance of AddResult(added_ref, removed_ref)
added_ref: hat.event.backends.lmdb.common.LatestEventRef | hat.event.backends.lmdb.common.TimeseriesEventRef | None
Alias for field number 0
removed_ref: tuple[hat.event.common.EventId, hat.event.backends.lmdb.common.LatestEventRef | hat.event.backends.lmdb.common.TimeseriesEventRef] | None
Alias for field number 1
def
ext_create( env: hat.event.backends.lmdb.environment.Environment, txn: Transaction, conditions: hat.event.backends.lmdb.conditions.Conditions, subscription: hat.event.common.Subscription) -> LatestDb:
def ext_create(env: environment.Environment,
               txn: lmdb.Transaction,
               conditions: Conditions,
               subscription: common.Subscription
               ) -> 'LatestDb':
    """Build a `LatestDb` from the entries read through `env`.

    Event type references are loaded first. A stored event is kept only
    when `conditions` and `subscription` accept it and when it is stored
    under the reference registered for its event type. New references are
    numbered after the largest loaded one (starting at 1 when none exist).
    """
    type_refs = {}
    for ref, event_type in env.ext_read(txn, common.DbType.LATEST_TYPE):
        type_refs[event_type] = ref

    latest = {}
    for ref, event in env.ext_read(txn, common.DbType.LATEST_DATA):
        keep = (conditions.matches(event) and
                subscription.matches(event.type) and
                type_refs.get(event.type) == ref)
        if keep:
            latest[event.type] = event

    first_free_ref = max(type_refs.values(), default=0) + 1

    db = LatestDb()
    db._env = env
    db._subscription = subscription
    db._changes = Changes({}, {})
    db._event_type_refs = type_refs
    db._events = latest
    db._next_event_type_refs = itertools.count(first_free_ref)
    return db
class
LatestDb:
class LatestDb:
    """Latest event per event type with change tracking.

    Private attributes (`_env`, `_subscription`, `_changes`,
    `_event_type_refs`, `_events`, `_next_event_type_refs`) are assigned
    by `ext_create`.
    """

    def add(self,
            event: common.Event
            ) -> AddResult:
        """Store `event` as the latest one of its type.

        Returns a result with both fields ``None`` when the event type is
        not subscribed or when the stored event compares greater than
        `event`.
        """
        ignored = AddResult(added_ref=None,
                            removed_ref=None)

        if not self._subscription.matches(event.type):
            return ignored

        replaced = self._events.get(event.type)
        if replaced and replaced > event:
            return ignored

        type_ref = self._event_type_refs.get(event.type)
        if type_ref is None:
            # first event of this type - allocate and record a reference
            type_ref = next(self._next_event_type_refs)
            self._changes.types[type_ref] = event.type
            self._event_type_refs[event.type] = type_ref

        self._changes.data[type_ref] = event
        self._events[event.type] = event

        ref = common.LatestEventRef(type_ref)
        if replaced:
            # replaced event occupied the same reference
            removed = (replaced.id, ref)
        else:
            removed = None
        return AddResult(added_ref=ref,
                         removed_ref=removed)

    def query(self,
              params: common.QueryLatestParams
              ) -> common.QueryResult:
        """Return stored events selected by `params.event_types`."""
        stored = self._events
        wanted = (None if params.event_types is None
                  else set(params.event_types))

        if wanted is None or ('*', ) in wanted:
            # no filter or match-all query type
            found = list(stored.values())

        elif any('*' in query_type or '?' in query_type
                 for query_type in wanted):
            # wildcard segments require subscription matching
            subscription = common.create_subscription(wanted)
            found = [event for event in stored.values()
                     if subscription.matches(event.type)]

        elif len(wanted) < len(stored):
            # few exact types - look each of them up
            candidates = (stored.get(query_type) for query_type in wanted)
            found = [event for event in candidates if event]

        else:
            # many exact types - scan stored events once
            found = [event for event in stored.values()
                     if event.type in wanted]

        return common.QueryResult(events=found,
                                  more_follows=False)

    def create_changes(self) -> Changes:
        """Hand over collected changes and reset the collection."""
        pending = self._changes
        self._changes = Changes({}, {})
        return pending

    def ext_write(self,
                  txn: lmdb.Transaction,
                  changes: Changes):
        """Write data changes followed by type changes using `txn`."""
        write = self._env.ext_write
        write(txn, common.DbType.LATEST_DATA, changes.data.items())
        write(txn, common.DbType.LATEST_TYPE, changes.types.items())
def add(self,
        event: common.Event
        ) -> AddResult:
    """Store `event` as the latest event of its type.

    Nothing is stored, and a result with both fields ``None`` is returned,
    when the subscription does not match the event type or when the
    currently stored event of that type compares greater than `event`.
    Otherwise the event and, for a type seen for the first time, its newly
    allocated reference are recorded in the pending changes.
    """
    nothing_added = AddResult(added_ref=None,
                              removed_ref=None)

    if not self._subscription.matches(event.type):
        return nothing_added

    current = self._events.get(event.type)
    if current and current > event:
        return nothing_added

    type_ref = self._event_type_refs.get(event.type)
    if type_ref is None:
        type_ref = next(self._next_event_type_refs)
        self._changes.types[type_ref] = event.type
        self._event_type_refs[event.type] = type_ref

    self._changes.data[type_ref] = event
    self._events[event.type] = event

    latest_ref = common.LatestEventRef(type_ref)
    removed_ref = None
    if current:
        # the replaced event was stored under the same reference
        removed_ref = (current.id, latest_ref)

    return AddResult(added_ref=latest_ref,
                     removed_ref=removed_ref)
def query(self,
          params: common.QueryLatestParams
          ) -> common.QueryResult:
    """Return stored events selected by `params.event_types`.

    ``None`` or a collection containing ``('*', )`` selects everything.
    Query types with ``'*'`` or ``'?'`` segments are matched through
    `common.create_subscription`; exact types are resolved either by
    lookups or by one scan, depending on which collection is smaller.
    The result never reports `more_follows`.
    """
    stored = self._events

    if params.event_types is None:
        wanted = None
    else:
        wanted = set(params.event_types)

    if wanted is None or ('*', ) in wanted:
        result = list(stored.values())

    elif any('*' in query_type or '?' in query_type
             for query_type in wanted):
        subscription = common.create_subscription(wanted)
        result = [event for event in stored.values()
                  if subscription.matches(event.type)]

    elif len(wanted) < len(stored):
        # drop lookups that found nothing
        hits = (stored.get(query_type) for query_type in wanted)
        result = [event for event in hits if event]

    else:
        result = [event for event in stored.values()
                  if event.type in wanted]

    return common.QueryResult(events=result,
                              more_follows=False)