hat.event.backends.lmdb.latestdb

  1import itertools
  2import typing
  3
  4import lmdb
  5
  6from hat.event.backends.lmdb import common
  7from hat.event.backends.lmdb import environment
  8from hat.event.backends.lmdb.conditions import Conditions
  9
 10
 11class Changes(typing.NamedTuple):
 12    data: dict[common.EventTypeRef, common.Event]
 13    types: dict[common.EventTypeRef, common.EventId]
 14
 15
 16class AddResult(typing.NamedTuple):
 17    added_ref: common.EventRef | None
 18    removed_ref: tuple[common.EventId, common.EventRef] | None
 19
 20
 21def ext_create(env: environment.Environment,
 22               txn: lmdb.Transaction,
 23               conditions: Conditions,
 24               subscription: common.Subscription
 25               ) -> 'LatestDb':
 26    db = LatestDb()
 27    db._env = env
 28    db._subscription = subscription
 29    db._changes = Changes({}, {})
 30
 31    db._event_type_refs = {
 32        event_type: ref
 33        for ref, event_type in env.ext_read(txn, common.DbType.LATEST_TYPE)}
 34
 35    db._events = {
 36        event.type: event
 37        for ref, event in env.ext_read(txn, common.DbType.LATEST_DATA)
 38        if conditions.matches(event) and
 39        subscription.matches(event.type) and
 40        db._event_type_refs.get(event.type) == ref}
 41
 42    db._next_event_type_refs = itertools.count(
 43        max(db._event_type_refs.values(), default=0) + 1)
 44
 45    return db
 46
 47
 48class LatestDb:
 49
 50    def add(self,
 51            event: common.Event
 52            ) -> AddResult:
 53        if not self._subscription.matches(event.type):
 54            return AddResult(added_ref=None,
 55                             removed_ref=None)
 56
 57        previous_event = self._events.get(event.type)
 58        if previous_event and previous_event > event:
 59            return AddResult(added_ref=None,
 60                             removed_ref=None)
 61
 62        event_type_ref = self._event_type_refs.get(event.type)
 63        if event_type_ref is None:
 64            event_type_ref = next(self._next_event_type_refs)
 65            self._changes.types[event_type_ref] = event.type
 66            self._event_type_refs[event.type] = event_type_ref
 67
 68        self._changes.data[event_type_ref] = event
 69        self._events[event.type] = event
 70
 71        added_ref = common.LatestEventRef(event_type_ref)
 72        removed_ref = ((previous_event.id, added_ref)
 73                       if previous_event else None)
 74        return AddResult(added_ref=added_ref,
 75                         removed_ref=removed_ref)
 76
 77    def query(self,
 78              params: common.QueryLatestParams
 79              ) -> common.QueryResult:
 80        event_types = (set(params.event_types)
 81                       if params.event_types is not None
 82                       else None)
 83
 84        if event_types is None or ('*', ) in event_types:
 85            events = self._events.values()
 86
 87        elif any('*' in event_type or '?' in event_type
 88                 for event_type in event_types):
 89            subscription = common.create_subscription(event_types)
 90            events = (event for event in self._events.values()
 91                      if subscription.matches(event.type))
 92
 93        elif len(event_types) < len(self._events):
 94            events = (self._events.get(event_type)
 95                      for event_type in event_types)
 96            events = (event for event in events if event)
 97
 98        else:
 99            events = (event for event in self._events.values()
100                      if event.type in event_types)
101
102        return common.QueryResult(events=list(events),
103                                  more_follows=False)
104
105    def create_changes(self) -> Changes:
106        changes, self._changes = self._changes, Changes({}, {})
107        return changes
108
109    def ext_write(self,
110                  txn: lmdb.Transaction,
111                  changes: Changes):
112        self._env.ext_write(txn, common.DbType.LATEST_DATA,
113                            changes.data.items())
114
115        self._env.ext_write(txn, common.DbType.LATEST_TYPE,
116                            changes.types.items())
class Changes(typing.NamedTuple):
12class Changes(typing.NamedTuple):
13    data: dict[common.EventTypeRef, common.Event]
14    types: dict[common.EventTypeRef, common.EventId]

Changes(data, types)

Changes( data: dict[int, hat.event.common.Event], types: dict[int, hat.event.common.EventId])

Create new instance of Changes(data, types)

data: dict[int, hat.event.common.Event]

Alias for field number 0

types: dict[int, hat.event.common.EventId]

Alias for field number 1

class AddResult(typing.NamedTuple):
17class AddResult(typing.NamedTuple):
18    added_ref: common.EventRef | None
19    removed_ref: tuple[common.EventId, common.EventRef] | None

AddResult(added_ref, removed_ref)

def ext_create( env: hat.event.backends.lmdb.environment.Environment, txn: Transaction, conditions: hat.event.backends.lmdb.conditions.Conditions, subscription: hat.event.common.Subscription) -> LatestDb:
22def ext_create(env: environment.Environment,
23               txn: lmdb.Transaction,
24               conditions: Conditions,
25               subscription: common.Subscription
26               ) -> 'LatestDb':
27    db = LatestDb()
28    db._env = env
29    db._subscription = subscription
30    db._changes = Changes({}, {})
31
32    db._event_type_refs = {
33        event_type: ref
34        for ref, event_type in env.ext_read(txn, common.DbType.LATEST_TYPE)}
35
36    db._events = {
37        event.type: event
38        for ref, event in env.ext_read(txn, common.DbType.LATEST_DATA)
39        if conditions.matches(event) and
40        subscription.matches(event.type) and
41        db._event_type_refs.get(event.type) == ref}
42
43    db._next_event_type_refs = itertools.count(
44        max(db._event_type_refs.values(), default=0) + 1)
45
46    return db
class LatestDb:
 49class LatestDb:
 50
 51    def add(self,
 52            event: common.Event
 53            ) -> AddResult:
 54        if not self._subscription.matches(event.type):
 55            return AddResult(added_ref=None,
 56                             removed_ref=None)
 57
 58        previous_event = self._events.get(event.type)
 59        if previous_event and previous_event > event:
 60            return AddResult(added_ref=None,
 61                             removed_ref=None)
 62
 63        event_type_ref = self._event_type_refs.get(event.type)
 64        if event_type_ref is None:
 65            event_type_ref = next(self._next_event_type_refs)
 66            self._changes.types[event_type_ref] = event.type
 67            self._event_type_refs[event.type] = event_type_ref
 68
 69        self._changes.data[event_type_ref] = event
 70        self._events[event.type] = event
 71
 72        added_ref = common.LatestEventRef(event_type_ref)
 73        removed_ref = ((previous_event.id, added_ref)
 74                       if previous_event else None)
 75        return AddResult(added_ref=added_ref,
 76                         removed_ref=removed_ref)
 77
 78    def query(self,
 79              params: common.QueryLatestParams
 80              ) -> common.QueryResult:
 81        event_types = (set(params.event_types)
 82                       if params.event_types is not None
 83                       else None)
 84
 85        if event_types is None or ('*', ) in event_types:
 86            events = self._events.values()
 87
 88        elif any('*' in event_type or '?' in event_type
 89                 for event_type in event_types):
 90            subscription = common.create_subscription(event_types)
 91            events = (event for event in self._events.values()
 92                      if subscription.matches(event.type))
 93
 94        elif len(event_types) < len(self._events):
 95            events = (self._events.get(event_type)
 96                      for event_type in event_types)
 97            events = (event for event in events if event)
 98
 99        else:
100            events = (event for event in self._events.values()
101                      if event.type in event_types)
102
103        return common.QueryResult(events=list(events),
104                                  more_follows=False)
105
106    def create_changes(self) -> Changes:
107        changes, self._changes = self._changes, Changes({}, {})
108        return changes
109
110    def ext_write(self,
111                  txn: lmdb.Transaction,
112                  changes: Changes):
113        self._env.ext_write(txn, common.DbType.LATEST_DATA,
114                            changes.data.items())
115
116        self._env.ext_write(txn, common.DbType.LATEST_TYPE,
117                            changes.types.items())
def add( self, event: hat.event.common.Event) -> AddResult:
51    def add(self,
52            event: common.Event
53            ) -> AddResult:
54        if not self._subscription.matches(event.type):
55            return AddResult(added_ref=None,
56                             removed_ref=None)
57
58        previous_event = self._events.get(event.type)
59        if previous_event and previous_event > event:
60            return AddResult(added_ref=None,
61                             removed_ref=None)
62
63        event_type_ref = self._event_type_refs.get(event.type)
64        if event_type_ref is None:
65            event_type_ref = next(self._next_event_type_refs)
66            self._changes.types[event_type_ref] = event.type
67            self._event_type_refs[event.type] = event_type_ref
68
69        self._changes.data[event_type_ref] = event
70        self._events[event.type] = event
71
72        added_ref = common.LatestEventRef(event_type_ref)
73        removed_ref = ((previous_event.id, added_ref)
74                       if previous_event else None)
75        return AddResult(added_ref=added_ref,
76                         removed_ref=removed_ref)
def query( self, params: hat.event.common.QueryLatestParams) -> hat.event.common.QueryResult:
 78    def query(self,
 79              params: common.QueryLatestParams
 80              ) -> common.QueryResult:
 81        event_types = (set(params.event_types)
 82                       if params.event_types is not None
 83                       else None)
 84
 85        if event_types is None or ('*', ) in event_types:
 86            events = self._events.values()
 87
 88        elif any('*' in event_type or '?' in event_type
 89                 for event_type in event_types):
 90            subscription = common.create_subscription(event_types)
 91            events = (event for event in self._events.values()
 92                      if subscription.matches(event.type))
 93
 94        elif len(event_types) < len(self._events):
 95            events = (self._events.get(event_type)
 96                      for event_type in event_types)
 97            events = (event for event in events if event)
 98
 99        else:
100            events = (event for event in self._events.values()
101                      if event.type in event_types)
102
103        return common.QueryResult(events=list(events),
104                                  more_follows=False)
def create_changes(self) -> Changes:
106    def create_changes(self) -> Changes:
107        changes, self._changes = self._changes, Changes({}, {})
108        return changes
def ext_write( self, txn: Transaction, changes: Changes):
110    def ext_write(self,
111                  txn: lmdb.Transaction,
112                  changes: Changes):
113        self._env.ext_write(txn, common.DbType.LATEST_DATA,
114                            changes.data.items())
115
116        self._env.ext_write(txn, common.DbType.LATEST_TYPE,
117                            changes.types.items())