hat.event.backends.dummy
Dummy backend
Simple backend which returns constant values where:
DummyBackend.get_last_event_id
returns session and instance0
DummyBackend.register
returns input argumentsDummyBackend.query
returnsQueryResult([], False)
Registered and flushed events callback is called on every register.
1"""Dummy backend 2 3Simple backend which returns constant values where: 4 5 * `DummyBackend.get_last_event_id` returns session and instance ``0`` 6 * `DummyBackend.register` returns input arguments 7 * `DummyBackend.query` returns ``QueryResult([], False)`` 8 9Registered and flushed events callback is called on every register. 10 11""" 12 13from hat import aio 14 15from hat.event import common 16 17 18class DummyBackend(common.Backend): 19 20 def __init__(self, conf, registered_events_cb, flushed_events_cb): 21 self._registered_events_cbs = registered_events_cb 22 self._flushed_events_cbs = flushed_events_cb 23 self._async_group = aio.Group() 24 25 @property 26 def async_group(self): 27 return self._async_group 28 29 async def get_last_event_id(self, server_id): 30 return common.EventId(server_id, 0, 0) 31 32 async def register(self, events): 33 if self._registered_events_cbs: 34 await aio.call(self._registered_events_cbs, events) 35 36 if self._flushed_events_cbs: 37 await aio.call(self._flushed_events_cbs, events) 38 39 return events 40 41 async def query(self, params): 42 return common.QueryResult(events=[], 43 more_follows=False) 44 45 async def flush(self): 46 pass 47 48 49info = common.BackendInfo(DummyBackend)
class
DummyBackend(hat.event.common.backend.Backend):
19class DummyBackend(common.Backend): 20 21 def __init__(self, conf, registered_events_cb, flushed_events_cb): 22 self._registered_events_cbs = registered_events_cb 23 self._flushed_events_cbs = flushed_events_cb 24 self._async_group = aio.Group() 25 26 @property 27 def async_group(self): 28 return self._async_group 29 30 async def get_last_event_id(self, server_id): 31 return common.EventId(server_id, 0, 0) 32 33 async def register(self, events): 34 if self._registered_events_cbs: 35 await aio.call(self._registered_events_cbs, events) 36 37 if self._flushed_events_cbs: 38 await aio.call(self._flushed_events_cbs, events) 39 40 return events 41 42 async def query(self, params): 43 return common.QueryResult(events=[], 44 more_follows=False) 45 46 async def flush(self): 47 pass
Backend ABC
async def
get_last_event_id(self, server_id):
Get last registered event id associated with server id
async def
register(self, events):
33 async def register(self, events): 34 if self._registered_events_cbs: 35 await aio.call(self._registered_events_cbs, events) 36 37 if self._flushed_events_cbs: 38 await aio.call(self._flushed_events_cbs, events) 39 40 return events
Register events
info =
BackendInfo(create=<class 'DummyBackend'>, json_schema_id=None, json_schema_repo=None)