hat.event.backends.dummy

Dummy backend

Simple backend which returns constant values where:

Registered and flushed events callback is called on every register.

 1"""Dummy backend
 2
 3Simple backend which returns constant values where:
 4
 5    * `DummyBackend.get_last_event_id` returns session and instance ``0``
 6    * `DummyBackend.register` returns input arguments
 7    * `DummyBackend.query` returns ``QueryResult([], False)``
 8
 9Registered and flushed events callback is called on every register.
10
11"""
12
13from hat import aio
14
15from hat.event import common
16
17
18class DummyBackend(common.Backend):
19
20    def __init__(self, conf, registered_events_cb, flushed_events_cb):
21        self._registered_events_cbs = registered_events_cb
22        self._flushed_events_cbs = flushed_events_cb
23        self._async_group = aio.Group()
24
25    @property
26    def async_group(self):
27        return self._async_group
28
29    async def get_last_event_id(self, server_id):
30        return common.EventId(server_id, 0, 0)
31
32    async def register(self, events):
33        if self._registered_events_cbs:
34            await aio.call(self._registered_events_cbs, events)
35
36        if self._flushed_events_cbs:
37            await aio.call(self._flushed_events_cbs, events)
38
39        return events
40
41    async def query(self, params):
42        return common.QueryResult(events=[],
43                                  more_follows=False)
44
45    async def flush(self):
46        pass
47
48
49info = common.BackendInfo(DummyBackend)
class DummyBackend(hat.event.common.backend.Backend):
19class DummyBackend(common.Backend):
20
21    def __init__(self, conf, registered_events_cb, flushed_events_cb):
22        self._registered_events_cbs = registered_events_cb
23        self._flushed_events_cbs = flushed_events_cb
24        self._async_group = aio.Group()
25
26    @property
27    def async_group(self):
28        return self._async_group
29
30    async def get_last_event_id(self, server_id):
31        return common.EventId(server_id, 0, 0)
32
33    async def register(self, events):
34        if self._registered_events_cbs:
35            await aio.call(self._registered_events_cbs, events)
36
37        if self._flushed_events_cbs:
38            await aio.call(self._flushed_events_cbs, events)
39
40        return events
41
42    async def query(self, params):
43        return common.QueryResult(events=[],
44                                  more_follows=False)
45
46    async def flush(self):
47        pass

Backend ABC

DummyBackend(conf, registered_events_cb, flushed_events_cb)
21    def __init__(self, conf, registered_events_cb, flushed_events_cb):
22        self._registered_events_cbs = registered_events_cb
23        self._flushed_events_cbs = flushed_events_cb
24        self._async_group = aio.Group()
async_group
26    @property
27    def async_group(self):
28        return self._async_group

Group controlling resource's lifetime.

async def get_last_event_id(self, server_id):
30    async def get_last_event_id(self, server_id):
31        return common.EventId(server_id, 0, 0)

Get last registered event id associated with server id

async def register(self, events):
33    async def register(self, events):
34        if self._registered_events_cbs:
35            await aio.call(self._registered_events_cbs, events)
36
37        if self._flushed_events_cbs:
38            await aio.call(self._flushed_events_cbs, events)
39
40        return events

Register events

async def query(self, params):
42    async def query(self, params):
43        return common.QueryResult(events=[],
44                                  more_follows=False)

Query events

async def flush(self):
46    async def flush(self):
47        pass

Flush internal buffers and permanently persist events

info = BackendInfo(create=<class 'DummyBackend'>, json_schema_id=None, json_schema_repo=None)