hat.event.server.common
Common event server structures and functionality
"""Common event server structures and functionality"""

import abc
import enum
import typing

from hat import aio
from hat import json
from hat import util
from hat.event.common import (EventId,
                              Event,
                              QueryData,
                              Subscription,
                              RegisterEvent)
from hat.event.common import *  # NOQA


# functional enum API - members get consecutive values starting at 1
SourceType = enum.Enum('SourceType', [
    'SYNCER',
    'EVENTER',
    'MODULE',
    'ENGINE'])


class Source(typing.NamedTuple):
    """Event source described by its type and numeric id"""
    type: SourceType
    id: int


BackendConf = json.Data
"""Backend configuration"""

CreateBackend = aio.AsyncCallable[[BackendConf], 'Backend']
"""Create backend callable"""


class Backend(aio.Resource):
    """Backend ABC

    Backend is implemented as a python module which is dynamically imported.
    It is expected that this module implements:

    * json_schema_id (typing.Optional[str]): JSON schema id
    * json_schema_repo (typing.Optional[json.SchemaRepository]):
        JSON schema repo
    * create (CreateBackend): create new backend instance

    If the module defines a JSON schema repository and a JSON schema id,
    the JSON schema repository will be used for additional validation of
    the backend configuration with the JSON schema id.

    """

    @abc.abstractmethod
    def register_flushed_events_cb(self,
                                   cb: typing.Callable[[typing.List[Event]],
                                                       None]
                                   ) -> util.RegisterCallbackHandle:
        """Register flushed events callback

        Callback receives a list of events. Returned handle represents
        the callback registration.

        """

    @abc.abstractmethod
    async def get_last_event_id(self,
                                server_id: int
                                ) -> EventId:
        """Get last registered event id associated with server id"""

    @abc.abstractmethod
    async def register(self,
                       events: typing.List[Event]
                       ) -> typing.List[typing.Optional[Event]]:
        """Register events

        Returns a list whose elements are either events or ``None``.

        """

    @abc.abstractmethod
    async def query(self,
                    data: QueryData
                    ) -> typing.List[Event]:
        """Query events"""

    @abc.abstractmethod
    async def query_flushed(self,
                            event_id: EventId
                            ) -> typing.AsyncIterable[typing.List[Event]]:
        """Get events with the same event_id.server, and event_id.instance
        greater than provided. Iterates over lists of Events from the
        same session. Only permanently persisted events (flushed) are
        returned."""

    @abc.abstractmethod
    async def flush(self):
        """Flush internal buffers and permanently persist events"""


ModuleConf = json.Data
"""Module configuration"""

CreateModule = aio.AsyncCallable[
    [ModuleConf, 'hat.event.engine.Engine', Source],
    'Module']
"""Create module callable"""


class Module(aio.Resource):
    """Module ABC

    Module is implemented as a python module which is dynamically imported.
    It is expected that this module implements:

    * json_schema_id (typing.Optional[str]): JSON schema id
    * json_schema_repo (typing.Optional[json.SchemaRepository]):
        JSON schema repo
    * create (CreateModule): create new module instance

    If the module defines a JSON schema repository and a JSON schema id,
    the JSON schema repository will be used for additional validation of
    the module configuration with the JSON schema id.

    Module's `subscription` is constant during module's lifetime.

    """

    @property
    @abc.abstractmethod
    def subscription(self) -> Subscription:
        """Subscribed event types filter"""

    async def on_session_start(self,
                               session_id: int):
        """Called on start of a session, identified by session_id."""

    async def on_session_stop(self,
                              session_id: int):
        """Called on stop of a session, identified by session_id."""

    @abc.abstractmethod
    async def process(self,
                      source: Source,
                      event: Event
                      ) -> typing.AsyncIterable[RegisterEvent]:
        """Process new session event.

        Provided event is matched by the module's subscription filter.

        Processing of session event can result in registration of
        new register events.

        Single module session process is always called sequentially.

        """
An enumeration of event source types: SYNCER, EVENTER, MODULE and ENGINE.
Inherited Members
- enum.Enum
- name
- value
Source(type, id)
Inherited Members
- builtins.tuple
- index
- count
Backend configuration
Create backend callable
class Backend(aio.Resource):
    """Backend ABC

    Backend is implemented as a python module which is dynamically imported.
    It is expected that this module implements:

    * json_schema_id (typing.Optional[str]): JSON schema id
    * json_schema_repo (typing.Optional[json.SchemaRepository]):
        JSON schema repo
    * create (CreateBackend): create new backend instance

    If the module defines a JSON schema repository and a JSON schema id,
    the JSON schema repository will be used for additional validation of
    the backend configuration with the JSON schema id.

    """

    @abc.abstractmethod
    def register_flushed_events_cb(self,
                                   cb: typing.Callable[[typing.List[Event]],
                                                       None]
                                   ) -> util.RegisterCallbackHandle:
        """Register flushed events callback

        Callback receives a list of events. Returned handle represents
        the callback registration.

        """

    @abc.abstractmethod
    async def get_last_event_id(self,
                                server_id: int
                                ) -> EventId:
        """Get last registered event id associated with server id"""

    @abc.abstractmethod
    async def register(self,
                       events: typing.List[Event]
                       ) -> typing.List[typing.Optional[Event]]:
        """Register events

        Returns a list whose elements are either events or ``None``.

        """

    @abc.abstractmethod
    async def query(self,
                    data: QueryData
                    ) -> typing.List[Event]:
        """Query events"""

    @abc.abstractmethod
    async def query_flushed(self,
                            event_id: EventId
                            ) -> typing.AsyncIterable[typing.List[Event]]:
        """Get events with the same event_id.server, and event_id.instance
        greater than provided. Iterates over lists of Events from the
        same session. Only permanently persisted events (flushed) are
        returned."""

    @abc.abstractmethod
    async def flush(self):
        """Flush internal buffers and permanently persist events"""
Backend ABC
Backend is implemented as a Python module which is dynamically imported. It is expected that this module implements:
- json_schema_id (typing.Optional[str]): JSON schema id
- json_schema_repo (typing.Optional[json.SchemaRepository]): JSON schema repo
- create (CreateBackend): create new backend instance
If the module defines both a JSON schema repository and a JSON schema id, the JSON schema repository will be used for additional validation of the backend configuration against that JSON schema id.
@abc.abstractmethod
def register_flushed_events_cb(self,
                               cb: typing.Callable[[typing.List[Event]],
                                                   None]
                               ) -> util.RegisterCallbackHandle:
    """Register flushed events callback

    Callback receives a list of events. Returned handle represents
    the callback registration.

    """
Register flushed events callback
@abc.abstractmethod
async def get_last_event_id(self,
                            server_id: int
                            ) -> EventId:
    """Get last registered event id associated with server id"""
Get last registered event id associated with server id
@abc.abstractmethod
async def register(self,
                   events: typing.List[Event]
                   ) -> typing.List[typing.Optional[Event]]:
    """Register events

    Returns a list whose elements are either events or ``None``.

    """
Register events
@abc.abstractmethod
async def query(self,
                data: QueryData
                ) -> typing.List[Event]:
    """Query events"""
Query events
@abc.abstractmethod
async def query_flushed(self,
                        event_id: EventId
                        ) -> typing.AsyncIterable[typing.List[Event]]:
    """Get events with the same event_id.server, and event_id.instance
    greater than provided. Iterates over lists of Events from the
    same session. Only permanently persisted events (flushed) are
    returned."""
Get events whose event_id.server is the same as, and whose event_id.instance is greater than, that of the provided event_id. Iterates over lists of Events, each list holding events from the same session. Only permanently persisted (flushed) events are returned.
@abc.abstractmethod
async def flush(self):
    """Flush internal buffers and permanently persist events"""
Flush internal buffers and permanently persist events
Inherited Members
- hat.aio.Resource
- async_group
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
class Module(aio.Resource):
    """Module ABC

    Module is implemented as a python module which is dynamically imported.
    It is expected that this module implements:

    * json_schema_id (typing.Optional[str]): JSON schema id
    * json_schema_repo (typing.Optional[json.SchemaRepository]):
        JSON schema repo
    * create (CreateModule): create new module instance

    If the module defines a JSON schema repository and a JSON schema id,
    the JSON schema repository will be used for additional validation of
    the module configuration with the JSON schema id.

    Module's `subscription` is constant during module's lifetime.

    """

    @property
    @abc.abstractmethod
    def subscription(self) -> Subscription:
        """Subscribed event types filter"""

    async def on_session_start(self,
                               session_id: int):
        """Called on start of a session, identified by session_id."""

    async def on_session_stop(self,
                              session_id: int):
        """Called on stop of a session, identified by session_id."""

    @abc.abstractmethod
    async def process(self,
                      source: Source,
                      event: Event
                      ) -> typing.AsyncIterable[RegisterEvent]:
        """Process new session event.

        Provided event is matched by the module's subscription filter.

        Processing of session event can result in registration of
        new register events.

        Single module session process is always called sequentially.

        """
Module ABC
Module is implemented as a Python module which is dynamically imported.
It is expected that this module implements:
- json_schema_id (typing.Optional[str]): JSON schema id
- json_schema_repo (typing.Optional[json.SchemaRepository]): JSON schema repo
- create (CreateModule): create new module instance
If the module defines both a JSON schema repository and a JSON schema id, the JSON schema repository will be used for additional validation of the module configuration against that JSON schema id.
Module's `subscription` is constant during module's lifetime.
async def on_session_start(self,
                           session_id: int):
    """Called on start of a session, identified by session_id."""
Called on start of a session, identified by session_id.
async def on_session_stop(self,
                          session_id: int):
    """Called on stop of a session, identified by session_id."""
Called on stop of a session, identified by session_id.
@abc.abstractmethod
async def process(self,
                  source: Source,
                  event: Event
                  ) -> typing.AsyncIterable[RegisterEvent]:
    """Process new session event.

    Provided event is matched by the module's subscription filter.

    Processing of session event can result in registration of
    new register events.

    Single module session process is always called sequentially.

    """
Process new session event.
Provided event is matched by the module's subscription filter.
Processing of session event can result in registration of new register events.
Within a single module session, process is always called sequentially.
Inherited Members
- hat.aio.Resource
- async_group
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close