hat.event.backends.memory
Simple memory backend
All registered events are stored in a single unsorted continuous event list.
"""Simple memory backend

All registered events are stored in a single unsorted continuous event list.

"""

import collections

from hat import aio

from hat.event import common


class MemoryBackend(common.Backend):
    """Backend that keeps every registered event in an in-memory deque.

    Events are appended in registration order; each query scans the whole
    collection.

    """

    def __init__(self, conf, registered_events_cb, flushed_events_cb):
        # `conf` is not used by this backend
        self._registered_events_cb = registered_events_cb
        self._flushed_events_cb = flushed_events_cb
        self._async_group = aio.Group()
        self._events = collections.deque()

    @property
    def async_group(self):
        """Async group created together with this backend"""
        return self._async_group

    async def get_last_event_id(self, server_id):
        """Get last registered event id associated with server id

        If no stored event belongs to `server_id`, an event id with
        ``session=0`` and ``instance=0`` is returned.

        """
        # the server identifier is a field of the event id, not of the event
        event_ids = (event.id for event in self._events
                     if event.id.server == server_id)
        default = common.EventId(server=server_id, session=0, instance=0)
        return max(event_ids, default=default)

    async def register(self, events):
        """Register events

        Events are appended to the in-memory collection, after which the
        registered and flushed callbacks (when set) are called with the
        same events. The provided `events` are returned.

        """
        self._events.extend(events)

        if self._registered_events_cb:
            await aio.call(self._registered_events_cb, events)

        # nothing is buffered, so events are reported as flushed right away
        if self._flushed_events_cb:
            await aio.call(self._flushed_events_cb, events)

        return events

    async def query(self, params):
        """Query events

        Raises:
            ValueError: `params` is not one of the supported query
                parameter types

        """
        if isinstance(params, common.QueryLatestParams):
            return self._query_latest(params)

        if isinstance(params, common.QueryTimeseriesParams):
            return self._query_timeseries(params)

        if isinstance(params, common.QueryServerParams):
            return self._query_server(params)

        raise ValueError('unsupported params type')

    async def flush(self):
        """Do nothing - there is no pending data to flush"""
        pass

    def _query_latest(self, params):
        """Return the greatest stored event for each event type"""
        events = self._events

        if params.event_types is not None:
            events = _filter_event_types(events, params.event_types)

        result = {}
        for event in events:
            previous = result.get(event.type)
            if previous is None or previous < event:
                result[event.type] = event

        return common.QueryResult(events=list(result.values()),
                                  more_follows=False)

    def _query_timeseries(self, params):
        """Return filtered events ordered by (source) timestamp

        Raises:
            ValueError: `params.order_by` or `params.order` is not a
                supported value

        """
        events = self._events

        if params.event_types is not None:
            events = _filter_event_types(events, params.event_types)

        if params.t_from is not None:
            events = _filter_t_from(events, params.t_from)

        if params.t_to is not None:
            events = _filter_t_to(events, params.t_to)

        if params.source_t_from is not None:
            events = _filter_source_t_from(events, params.source_t_from)

        if params.source_t_to is not None:
            events = _filter_source_t_to(events, params.source_t_to)

        # the key has to be a parenthesized tuple - without parentheses the
        # comma ends the lambda and `sort_key` becomes a (lambda, event) tuple
        if params.order_by == common.OrderBy.TIMESTAMP:
            sort_key = lambda event: (event.timestamp, event)  # NOQA
        elif params.order_by == common.OrderBy.SOURCE_TIMESTAMP:
            sort_key = lambda event: (event.source_timestamp, event)  # NOQA
        else:
            raise ValueError('invalid order by')

        if params.order == common.Order.ASCENDING:
            sort_reverse = False
        elif params.order == common.Order.DESCENDING:
            sort_reverse = True
        else:
            raise ValueError('invalid order')

        events = sorted(events, key=sort_key, reverse=sort_reverse)

        if params.last_event_id and events:
            # keep only events positioned after the one identified by
            # last_event_id; when no event matches, the slice starts past
            # the final event and the result is empty
            for i, event in enumerate(events):
                if event.id == params.last_event_id:
                    break
            events = events[i+1:]

        if params.max_results is not None and len(events) > params.max_results:
            more_follows = True
            events = events[:params.max_results]
        else:
            more_follows = False

        return common.QueryResult(events=events,
                                  more_follows=more_follows)

    def _query_server(self, params):
        """Return sorted events of a single server following last_event_id"""
        events = sorted(_filter_server_id(self._events, params.server_id))

        if params.last_event_id:
            # filtering (instead of slicing from the first greater id) also
            # yields an empty result when no event id is greater
            events = [event for event in events
                      if event.id > params.last_event_id]

        if params.max_results is not None and len(events) > params.max_results:
            more_follows = True
            events = events[:params.max_results]
        else:
            more_follows = False

        return common.QueryResult(events=events,
                                  more_follows=more_follows)


info = common.BackendInfo(MemoryBackend)


def _filter_event_types(events, event_types):
    """Yield events whose type matches a subscription for `event_types`"""
    subscription = common.create_subscription(event_types)
    for event in events:
        if subscription.matches(event.type):
            yield event


def _filter_t_from(events, t_from):
    """Yield events with timestamp greater than or equal to `t_from`"""
    for event in events:
        if event.timestamp >= t_from:
            yield event


def _filter_t_to(events, t_to):
    """Yield events with timestamp less than or equal to `t_to`"""
    for event in events:
        if event.timestamp <= t_to:
            yield event


def _filter_source_t_from(events, source_t_from):
    """Yield events with source timestamp not less than `source_t_from`"""
    for event in events:
        if event.source_timestamp >= source_t_from:
            yield event


def _filter_source_t_to(events, source_t_to):
    """Yield events with source timestamp not greater than `source_t_to`"""
    for event in events:
        if event.source_timestamp <= source_t_to:
            yield event


def _filter_server_id(events, server_id):
    """Yield events whose id belongs to server `server_id`"""
    for event in events:
        if event.id.server == server_id:
            yield event
class
MemoryBackend(hat.event.common.backend.Backend):
class MemoryBackend(common.Backend):
    """Backend that keeps every registered event in an in-memory deque.

    Events are appended in registration order; each query scans the whole
    collection.

    """

    def __init__(self, conf, registered_events_cb, flushed_events_cb):
        # `conf` is not used by this backend
        self._registered_events_cb = registered_events_cb
        self._flushed_events_cb = flushed_events_cb
        self._async_group = aio.Group()
        self._events = collections.deque()

    @property
    def async_group(self):
        """Async group created together with this backend"""
        return self._async_group

    async def get_last_event_id(self, server_id):
        """Get last registered event id associated with server id

        If no stored event belongs to `server_id`, an event id with
        ``session=0`` and ``instance=0`` is returned.

        """
        # the server identifier is a field of the event id, not of the event
        event_ids = (event.id for event in self._events
                     if event.id.server == server_id)
        default = common.EventId(server=server_id, session=0, instance=0)
        return max(event_ids, default=default)

    async def register(self, events):
        """Register events

        Events are appended to the in-memory collection, after which the
        registered and flushed callbacks (when set) are called with the
        same events. The provided `events` are returned.

        """
        self._events.extend(events)

        if self._registered_events_cb:
            await aio.call(self._registered_events_cb, events)

        # nothing is buffered, so events are reported as flushed right away
        if self._flushed_events_cb:
            await aio.call(self._flushed_events_cb, events)

        return events

    async def query(self, params):
        """Query events

        Raises:
            ValueError: `params` is not one of the supported query
                parameter types

        """
        if isinstance(params, common.QueryLatestParams):
            return self._query_latest(params)

        if isinstance(params, common.QueryTimeseriesParams):
            return self._query_timeseries(params)

        if isinstance(params, common.QueryServerParams):
            return self._query_server(params)

        raise ValueError('unsupported params type')

    async def flush(self):
        """Do nothing - there is no pending data to flush"""
        pass

    def _query_latest(self, params):
        """Return the greatest stored event for each event type"""
        events = self._events

        if params.event_types is not None:
            events = _filter_event_types(events, params.event_types)

        result = {}
        for event in events:
            previous = result.get(event.type)
            if previous is None or previous < event:
                result[event.type] = event

        return common.QueryResult(events=list(result.values()),
                                  more_follows=False)

    def _query_timeseries(self, params):
        """Return filtered events ordered by (source) timestamp

        Raises:
            ValueError: `params.order_by` or `params.order` is not a
                supported value

        """
        events = self._events

        if params.event_types is not None:
            events = _filter_event_types(events, params.event_types)

        if params.t_from is not None:
            events = _filter_t_from(events, params.t_from)

        if params.t_to is not None:
            events = _filter_t_to(events, params.t_to)

        if params.source_t_from is not None:
            events = _filter_source_t_from(events, params.source_t_from)

        if params.source_t_to is not None:
            events = _filter_source_t_to(events, params.source_t_to)

        # the key has to be a parenthesized tuple - without parentheses the
        # comma ends the lambda and `sort_key` becomes a (lambda, event) tuple
        if params.order_by == common.OrderBy.TIMESTAMP:
            sort_key = lambda event: (event.timestamp, event)  # NOQA
        elif params.order_by == common.OrderBy.SOURCE_TIMESTAMP:
            sort_key = lambda event: (event.source_timestamp, event)  # NOQA
        else:
            raise ValueError('invalid order by')

        if params.order == common.Order.ASCENDING:
            sort_reverse = False
        elif params.order == common.Order.DESCENDING:
            sort_reverse = True
        else:
            raise ValueError('invalid order')

        events = sorted(events, key=sort_key, reverse=sort_reverse)

        if params.last_event_id and events:
            # keep only events positioned after the one identified by
            # last_event_id; when no event matches, the slice starts past
            # the final event and the result is empty
            for i, event in enumerate(events):
                if event.id == params.last_event_id:
                    break
            events = events[i+1:]

        if params.max_results is not None and len(events) > params.max_results:
            more_follows = True
            events = events[:params.max_results]
        else:
            more_follows = False

        return common.QueryResult(events=events,
                                  more_follows=more_follows)

    def _query_server(self, params):
        """Return sorted events of a single server following last_event_id"""
        events = sorted(_filter_server_id(self._events, params.server_id))

        if params.last_event_id:
            # filtering (instead of slicing from the first greater id) also
            # yields an empty result when no event id is greater
            events = [event for event in events
                      if event.id > params.last_event_id]

        if params.max_results is not None and len(events) > params.max_results:
            more_follows = True
            events = events[:params.max_results]
        else:
            more_follows = False

        return common.QueryResult(events=events,
                                  more_follows=more_follows)
Backend ABC
async def
get_last_event_id(self, server_id):
async def get_last_event_id(self, server_id):
    """Get last registered event id associated with server id

    If no stored event belongs to `server_id`, an event id with
    ``session=0`` and ``instance=0`` is returned.

    """
    # the server identifier is a field of the event id, not of the event
    event_ids = (event.id for event in self._events
                 if event.id.server == server_id)
    default = common.EventId(server=server_id, session=0, instance=0)
    return max(event_ids, default=default)
Get last registered event id associated with server id
async def
register(self, events):
async def register(self, events):
    """Register events

    Appends `events` to the in-memory collection, calls the registered
    callback and then the flushed callback (each only when set) with the
    same events, and returns the provided `events`.

    """
    stored = self._events
    stored.extend(events)

    on_registered = self._registered_events_cb
    if on_registered:
        await aio.call(on_registered, events)

    # read only after the first callback has completed
    on_flushed = self._flushed_events_cb
    if on_flushed:
        await aio.call(on_flushed, events)

    return events
Register events
async def
query(self, params):
async def query(self, params):
    """Query events

    Dispatches to the query implementation matching the type of `params`.

    Raises:
        ValueError: `params` is not one of the supported query
            parameter types

    """
    # checked in order: latest, timeseries, server
    dispatch = (
        (common.QueryLatestParams, self._query_latest),
        (common.QueryTimeseriesParams, self._query_timeseries),
        (common.QueryServerParams, self._query_server),
    )

    for params_type, handler in dispatch:
        if isinstance(params, params_type):
            return handler(params)

    raise ValueError('unsupported params type')
Query events
info =
BackendInfo(create=<class 'MemoryBackend'>, json_schema_id=None, json_schema_repo=None)