hat.event.backends.memory

Simple memory backend

All registered events are stored in single unsorted continuous event list.

  1"""Simple memory backend
  2
  3All registered events are stored in single unsorted continuous event list.
  4
  5"""
  6
  7import collections
  8
  9from hat import aio
 10
 11from hat.event import common
 12
 13
 14class MemoryBackend(common.Backend):
 15
 16    def __init__(self, conf, registered_events_cb, flushed_events_cb):
 17        self._registered_events_cb = registered_events_cb
 18        self._flushed_events_cb = flushed_events_cb
 19        self._async_group = aio.Group()
 20        self._events = collections.deque()
 21
 22    @property
 23    def async_group(self):
 24        return self._async_group
 25
 26    async def get_last_event_id(self, server_id):
 27        event_ids = (e.event_id for e in self._events
 28                     if e.server == server_id)
 29        default = common.EventId(server=server_id, session=0, instance=0)
 30        return max(event_ids, default=default)
 31
 32    async def register(self, events):
 33        self._events.extend(events)
 34
 35        if self._registered_events_cb:
 36            await aio.call(self._registered_events_cb, events)
 37
 38        if self._flushed_events_cb:
 39            await aio.call(self._flushed_events_cb, events)
 40
 41        return events
 42
 43    async def query(self, params):
 44        if isinstance(params, common.QueryLatestParams):
 45            return self._query_latest(params)
 46
 47        if isinstance(params, common.QueryTimeseriesParams):
 48            return self._query_timeseries(params)
 49
 50        if isinstance(params, common.QueryServerParams):
 51            return self._query_server(params)
 52
 53        raise ValueError('unsupported params type')
 54
 55    async def flush(self):
 56        pass
 57
 58    def _query_latest(self, params):
 59        events = self._events
 60
 61        if params.event_types is not None:
 62            events = _filter_event_types(events, params.event_types)
 63
 64        result = {}
 65        for event in events:
 66            previous = result.get(event.type)
 67            if previous is None or previous < event:
 68                result[event.type] = event
 69
 70        return common.QueryResult(events=list(result.values()),
 71                                  more_follows=False)
 72
 73    def _query_timeseries(self, params):
 74        events = self._events
 75
 76        if params.event_types is not None:
 77            events = _filter_event_types(events, params.event_types)
 78
 79        if params.t_from is not None:
 80            events = _filter_t_from(events, params.t_from)
 81
 82        if params.t_to is not None:
 83            events = _filter_t_to(events, params.t_to)
 84
 85        if params.source_t_from is not None:
 86            events = _filter_source_t_from(events, params.source_t_from)
 87
 88        if params.source_t_to is not None:
 89            events = _filter_source_t_to(events, params.source_t_to)
 90
 91        if params.order_by == common.OrderBy.TIMESTAMP:
 92            sort_key = lambda event: event.timestamp, event  # NOQA
 93        elif params.order_by == common.OrderBy.SOURCE_TIMESTAMP:
 94            sort_key = lambda event: event.source_timestamp, event  # NOQA
 95        else:
 96            raise ValueError('invalid order by')
 97
 98        if params.order == common.Order.ASCENDING:
 99            sort_reverse = False
100        elif params.order == common.Order.DESCENDING:
101            sort_reverse = True
102        else:
103            raise ValueError('invalid order by')
104
105        events = sorted(events, key=sort_key, reverse=sort_reverse)
106
107        if params.last_event_id and events:
108            for i, event in enumerate(events):
109                if event.id == params.last_event_id:
110                    break
111            events = events[i+1:]
112
113        if params.max_results is not None and len(events) > params.max_results:
114            more_follows = True
115            events = events[:params.max_results]
116        else:
117            more_follows = False
118
119        return common.QueryResult(events=events,
120                                  more_follows=more_follows)
121
122    def _query_server(self, params):
123        events = sorted(_filter_server_id(self._events, params.server_id))
124
125        if params.last_event_id and events:
126            for i, event in enumerate(events):
127                if event.id > params.last_event_id:
128                    break
129            events = events[i:]
130
131        if params.max_results is not None and len(events) > params.max_results:
132            more_follows = True
133            events = events[:params.max_results]
134        else:
135            more_follows = False
136
137        return common.QueryResult(events=events,
138                                  more_follows=more_follows)
139
140
141info = common.BackendInfo(MemoryBackend)
142
143
144def _filter_event_types(events, event_types):
145    subscription = common.create_subscription(event_types)
146    for event in events:
147        if subscription.matches(event.type):
148            yield event
149
150
151def _filter_t_from(events, t_from):
152    for event in events:
153        if event.timestamp >= t_from:
154            yield event
155
156
157def _filter_t_to(events, t_to):
158    for event in events:
159        if event.timestamp <= t_to:
160            yield event
161
162
163def _filter_source_t_from(events, source_t_from):
164    for event in events:
165        if event.source_timestamp >= source_t_from:
166            yield event
167
168
169def _filter_source_t_to(events, source_t_to):
170    for event in events:
171        if event.source_timestamp <= source_t_to:
172            yield event
173
174
175def _filter_server_id(events, server_id):
176    for event in events:
177        if event.event_id.server == server_id:
178            yield event
class MemoryBackend(hat.event.common.backend.Backend):
 15class MemoryBackend(common.Backend):
 16
 17    def __init__(self, conf, registered_events_cb, flushed_events_cb):
 18        self._registered_events_cb = registered_events_cb
 19        self._flushed_events_cb = flushed_events_cb
 20        self._async_group = aio.Group()
 21        self._events = collections.deque()
 22
 23    @property
 24    def async_group(self):
 25        return self._async_group
 26
 27    async def get_last_event_id(self, server_id):
 28        event_ids = (e.event_id for e in self._events
 29                     if e.server == server_id)
 30        default = common.EventId(server=server_id, session=0, instance=0)
 31        return max(event_ids, default=default)
 32
 33    async def register(self, events):
 34        self._events.extend(events)
 35
 36        if self._registered_events_cb:
 37            await aio.call(self._registered_events_cb, events)
 38
 39        if self._flushed_events_cb:
 40            await aio.call(self._flushed_events_cb, events)
 41
 42        return events
 43
 44    async def query(self, params):
 45        if isinstance(params, common.QueryLatestParams):
 46            return self._query_latest(params)
 47
 48        if isinstance(params, common.QueryTimeseriesParams):
 49            return self._query_timeseries(params)
 50
 51        if isinstance(params, common.QueryServerParams):
 52            return self._query_server(params)
 53
 54        raise ValueError('unsupported params type')
 55
 56    async def flush(self):
 57        pass
 58
 59    def _query_latest(self, params):
 60        events = self._events
 61
 62        if params.event_types is not None:
 63            events = _filter_event_types(events, params.event_types)
 64
 65        result = {}
 66        for event in events:
 67            previous = result.get(event.type)
 68            if previous is None or previous < event:
 69                result[event.type] = event
 70
 71        return common.QueryResult(events=list(result.values()),
 72                                  more_follows=False)
 73
 74    def _query_timeseries(self, params):
 75        events = self._events
 76
 77        if params.event_types is not None:
 78            events = _filter_event_types(events, params.event_types)
 79
 80        if params.t_from is not None:
 81            events = _filter_t_from(events, params.t_from)
 82
 83        if params.t_to is not None:
 84            events = _filter_t_to(events, params.t_to)
 85
 86        if params.source_t_from is not None:
 87            events = _filter_source_t_from(events, params.source_t_from)
 88
 89        if params.source_t_to is not None:
 90            events = _filter_source_t_to(events, params.source_t_to)
 91
 92        if params.order_by == common.OrderBy.TIMESTAMP:
 93            sort_key = lambda event: event.timestamp, event  # NOQA
 94        elif params.order_by == common.OrderBy.SOURCE_TIMESTAMP:
 95            sort_key = lambda event: event.source_timestamp, event  # NOQA
 96        else:
 97            raise ValueError('invalid order by')
 98
 99        if params.order == common.Order.ASCENDING:
100            sort_reverse = False
101        elif params.order == common.Order.DESCENDING:
102            sort_reverse = True
103        else:
104            raise ValueError('invalid order by')
105
106        events = sorted(events, key=sort_key, reverse=sort_reverse)
107
108        if params.last_event_id and events:
109            for i, event in enumerate(events):
110                if event.id == params.last_event_id:
111                    break
112            events = events[i+1:]
113
114        if params.max_results is not None and len(events) > params.max_results:
115            more_follows = True
116            events = events[:params.max_results]
117        else:
118            more_follows = False
119
120        return common.QueryResult(events=events,
121                                  more_follows=more_follows)
122
123    def _query_server(self, params):
124        events = sorted(_filter_server_id(self._events, params.server_id))
125
126        if params.last_event_id and events:
127            for i, event in enumerate(events):
128                if event.id > params.last_event_id:
129                    break
130            events = events[i:]
131
132        if params.max_results is not None and len(events) > params.max_results:
133            more_follows = True
134            events = events[:params.max_results]
135        else:
136            more_follows = False
137
138        return common.QueryResult(events=events,
139                                  more_follows=more_follows)

Backend ABC

MemoryBackend(conf, registered_events_cb, flushed_events_cb)
17    def __init__(self, conf, registered_events_cb, flushed_events_cb):
18        self._registered_events_cb = registered_events_cb
19        self._flushed_events_cb = flushed_events_cb
20        self._async_group = aio.Group()
21        self._events = collections.deque()
async_group
23    @property
24    def async_group(self):
25        return self._async_group

Group controlling resource's lifetime.

async def get_last_event_id(self, server_id):
27    async def get_last_event_id(self, server_id):
28        event_ids = (e.event_id for e in self._events
29                     if e.server == server_id)
30        default = common.EventId(server=server_id, session=0, instance=0)
31        return max(event_ids, default=default)

Get last registered event id associated with server id

async def register(self, events):
33    async def register(self, events):
34        self._events.extend(events)
35
36        if self._registered_events_cb:
37            await aio.call(self._registered_events_cb, events)
38
39        if self._flushed_events_cb:
40            await aio.call(self._flushed_events_cb, events)
41
42        return events

Register events

async def query(self, params):
44    async def query(self, params):
45        if isinstance(params, common.QueryLatestParams):
46            return self._query_latest(params)
47
48        if isinstance(params, common.QueryTimeseriesParams):
49            return self._query_timeseries(params)
50
51        if isinstance(params, common.QueryServerParams):
52            return self._query_server(params)
53
54        raise ValueError('unsupported params type')

Query events

async def flush(self):
56    async def flush(self):
57        pass

Flush internal buffers and permanently persist events

info = BackendInfo(create=<class 'MemoryBackend'>, json_schema_id=None, json_schema_repo=None)