hat.event.server.common
Common event server structures and functionality
"""Common event server structures and functionality"""

from hat.event.common import *  # NOQA

import abc
import enum
import typing

from hat import aio
from hat import json
from hat import util

from hat.event.common import (EventId,
                              Event,
                              QueryData,
                              Subscription,
                              RegisterEvent)


# closed set of source kinds referenced by `Source.type`
SourceType = enum.Enum('SourceType', [
    'SYNCER',
    'EVENTER',
    'MODULE',
    'ENGINE'])


class Source(typing.NamedTuple):
    """Source described by its type and numeric id"""
    type: SourceType
    id: int


EventsCb: typing.TypeAlias = typing.Callable[[list[Event]], None]
"""Events callback"""


class Engine(aio.Resource):
    """Engine ABC"""

    @abc.abstractmethod
    def register_events_cb(self,
                           cb: EventsCb
                           ) -> util.RegisterCallbackHandle:
        """Register events callback"""

    @abc.abstractmethod
    async def register(self,
                       source: Source,
                       events: list[RegisterEvent]
                       ) -> list[Event | None]:
        """Register events"""

    @abc.abstractmethod
    async def query(self,
                    data: QueryData
                    ) -> list[Event]:
        """Query events"""


BackendConf: typing.TypeAlias = json.Data
"""Backend configuration"""

CreateBackend: typing.TypeAlias = aio.AsyncCallable[[BackendConf], 'Backend']
"""Create backend callable"""


class Backend(aio.Resource):
    """Backend ABC

    Backend is implemented as a Python module which is dynamically imported.
    It is expected that this module implements:

    * json_schema_id (str | None): JSON schema id
    * json_schema_repo (json.SchemaRepository | None):
        JSON schema repo
    * create (CreateBackend): create new backend instance

    If module defines JSON schema repository and JSON schema id, JSON schema
    repository will be used for additional validation of backend configuration
    with JSON schema id.

    """

    @abc.abstractmethod
    def register_registered_events_cb(self,
                                      cb: EventsCb
                                      ) -> util.RegisterCallbackHandle:
        """Register registered events callback"""

    @abc.abstractmethod
    def register_flushed_events_cb(self,
                                   cb: EventsCb
                                   ) -> util.RegisterCallbackHandle:
        """Register flushed events callback"""

    @abc.abstractmethod
    async def get_last_event_id(self,
                                server_id: int
                                ) -> EventId:
        """Get last registered event id associated with server id"""

    @abc.abstractmethod
    async def register(self,
                       events: list[Event]
                       ) -> list[Event | None]:
        """Register events"""

    @abc.abstractmethod
    async def query(self,
                    data: QueryData
                    ) -> list[Event]:
        """Query events"""

    @abc.abstractmethod
    async def query_flushed(self,
                            event_id: EventId
                            ) -> typing.AsyncIterable[list[Event]]:
        """Get flushed events following the provided event id

        Returned events have the same `event_id.server` as the provided
        event id and `event_id.instance` greater than the provided one.
        Iterates over lists of events from the same session. Only
        permanently persisted (flushed) events are returned.

        """

    @abc.abstractmethod
    async def flush(self) -> None:
        """Flush internal buffers and permanently persist events"""


ModuleConf: typing.TypeAlias = json.Data
"""Module configuration"""

CreateModule: typing.TypeAlias = aio.AsyncCallable[[ModuleConf, Engine,
                                                    Source],
                                                   'Module']
"""Create module callable"""


class Module(aio.Resource):
    """Module ABC

    Module is implemented as a Python module which is dynamically imported.
    It is expected that this module implements:

    * json_schema_id (str | None): JSON schema id
    * json_schema_repo (json.SchemaRepository | None):
        JSON schema repo
    * create (CreateModule): create new module instance

    If module defines JSON schema repository and JSON schema id, JSON schema
    repository will be used for additional validation of module configuration
    with JSON schema id.

    Module's `subscription` is constant during module's lifetime.

    """

    @property
    @abc.abstractmethod
    def subscription(self) -> Subscription:
        """Subscribed event types filter"""

    async def on_session_start(self,
                               session_id: int) -> None:
        """Called on start of a session, identified by session_id."""

    async def on_session_stop(self,
                              session_id: int) -> None:
        """Called on stop of a session, identified by session_id."""

    @abc.abstractmethod
    async def process(self,
                      source: Source,
                      event: Event
                      ) -> typing.AsyncIterable[RegisterEvent]:
        """Process new session event.

        The provided event is matched by the module's subscription filter.

        Processing of a session event can result in registration of
        new register events.

        Single module session process is always called sequentially.

        """
An enumeration.
Inherited Members
- enum.Enum
- name
- value
Source(type, id)
Inherited Members
- builtins.tuple
- index
- count
Events callback
class Engine(aio.Resource):
    """Abstract engine interface.

    Declares callback registration, event registration and event querying.
    """

    @abc.abstractmethod
    def register_events_cb(self, cb: EventsCb) -> util.RegisterCallbackHandle:
        """Register a callback accepting a list of events.

        Returns a handle associated with the registered callback.
        """

    @abc.abstractmethod
    async def register(
        self,
        source: Source,
        events: list[RegisterEvent],
    ) -> list[Event | None]:
        """Register events.

        Returns a list whose entries are either an `Event` or `None`.
        """

    @abc.abstractmethod
    async def query(self, data: QueryData) -> list[Event]:
        """Query events described by `data` and return them as a list."""
Engine ABC
@abc.abstractmethod
def register_events_cb(self, cb: EventsCb) -> util.RegisterCallbackHandle:
    """Register a callback accepting a list of events.

    Returns a handle associated with the registered callback.
    """
Register events callback
@abc.abstractmethod
async def register(
    self,
    source: Source,
    events: list[RegisterEvent],
) -> list[Event | None]:
    """Register events.

    Returns a list whose entries are either an `Event` or `None`.
    """
Register events
@abc.abstractmethod
async def query(self, data: QueryData) -> list[Event]:
    """Query events described by `data` and return them as a list."""
Query events
Inherited Members
- hat.aio.group.Resource
- async_group
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
Backend configuration
Create backend callable
class Backend(aio.Resource):
    """Backend ABC

    Backend is implemented as a Python module which is dynamically imported.
    It is expected that this module implements:

    * json_schema_id (str | None): JSON schema id
    * json_schema_repo (json.SchemaRepository | None):
        JSON schema repo
    * create (CreateBackend): create new backend instance

    If module defines JSON schema repository and JSON schema id, JSON schema
    repository will be used for additional validation of backend configuration
    with JSON schema id.

    """

    @abc.abstractmethod
    def register_registered_events_cb(self,
                                      cb: EventsCb
                                      ) -> util.RegisterCallbackHandle:
        """Register registered events callback"""

    @abc.abstractmethod
    def register_flushed_events_cb(self,
                                   cb: EventsCb
                                   ) -> util.RegisterCallbackHandle:
        """Register flushed events callback"""

    @abc.abstractmethod
    async def get_last_event_id(self,
                                server_id: int
                                ) -> EventId:
        """Get last registered event id associated with server id"""

    @abc.abstractmethod
    async def register(self,
                       events: list[Event]
                       ) -> list[Event | None]:
        """Register events"""

    @abc.abstractmethod
    async def query(self,
                    data: QueryData
                    ) -> list[Event]:
        """Query events"""

    @abc.abstractmethod
    async def query_flushed(self,
                            event_id: EventId
                            ) -> typing.AsyncIterable[list[Event]]:
        """Get flushed events following the provided event id

        Returned events have the same `event_id.server` as the provided
        event id and `event_id.instance` greater than the provided one.
        Iterates over lists of events from the same session. Only
        permanently persisted (flushed) events are returned.

        """

    @abc.abstractmethod
    async def flush(self) -> None:
        """Flush internal buffers and permanently persist events"""
Backend ABC
Backend is implemented as a Python module which is dynamically imported. It is expected that this module implements:
- json_schema_id (typing.Optional[str]): JSON schema id
- json_schema_repo (typing.Optional[json.SchemaRepository]): JSON schema repo
- create (CreateBackend): create new backend instance
If module defines JSON schema repository and JSON schema id, JSON schema repository will be used for additional validation of backend configuration with JSON schema id.
@abc.abstractmethod
def register_registered_events_cb(self,
                                  cb: EventsCb
                                  ) -> util.RegisterCallbackHandle:
    """Register registered events callback

    The callback accepts a list of events and returns `None`. A handle
    associated with the registered callback is returned.

    """
Register registered events callback
@abc.abstractmethod
def register_flushed_events_cb(self,
                               cb: EventsCb
                               ) -> util.RegisterCallbackHandle:
    """Register flushed events callback

    The callback accepts a list of events and returns `None`. A handle
    associated with the registered callback is returned.

    """
Register flushed events callback
@abc.abstractmethod
async def get_last_event_id(self, server_id: int) -> EventId:
    """Return the id of the last registered event associated with `server_id`."""
Get last registered event id associated with server id
@abc.abstractmethod
async def register(self,
                   events: list[Event]
                   ) -> list[Event | None]:
    """Register events

    Returns a list whose entries are either an `Event` or `None`.

    """
Register events
@abc.abstractmethod
async def query(self,
                data: QueryData
                ) -> list[Event]:
    """Query events described by `data` and return them as a list"""
Query events
@abc.abstractmethod
async def query_flushed(self,
                        event_id: EventId
                        ) -> typing.AsyncIterable[list[Event]]:
    """Get flushed events following the provided event id

    Returned events have the same `event_id.server` as the provided
    event id and `event_id.instance` greater than the provided one.
    Iterates over lists of events from the same session. Only
    permanently persisted (flushed) events are returned.

    """
Get events with the same event_id.server as the provided event id and an event_id.instance greater than the provided one. Iterates over lists of events from the same session. Only permanently persisted (flushed) events are returned.
@abc.abstractmethod
async def flush(self) -> None:
    """Flush internal buffers and permanently persist events"""
Flush internal buffers and permanently persist events
Inherited Members
- hat.aio.group.Resource
- async_group
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
class Module(aio.Resource):
    """Module ABC

    Module is implemented as a Python module which is dynamically imported.
    It is expected that this module implements:

    * json_schema_id (str | None): JSON schema id
    * json_schema_repo (json.SchemaRepository | None):
        JSON schema repo
    * create (CreateModule): create new module instance

    If module defines JSON schema repository and JSON schema id, JSON schema
    repository will be used for additional validation of module configuration
    with JSON schema id.

    Module's `subscription` is constant during module's lifetime.

    """

    @property
    @abc.abstractmethod
    def subscription(self) -> Subscription:
        """Subscribed event types filter"""

    async def on_session_start(self,
                               session_id: int) -> None:
        """Called on start of a session, identified by session_id."""

    async def on_session_stop(self,
                              session_id: int) -> None:
        """Called on stop of a session, identified by session_id."""

    @abc.abstractmethod
    async def process(self,
                      source: Source,
                      event: Event
                      ) -> typing.AsyncIterable[RegisterEvent]:
        """Process new session event.

        The provided event is matched by the module's subscription filter.

        Processing of a session event can result in registration of
        new register events.

        Single module session process is always called sequentially.

        """
Module ABC
Module is implemented as a Python module which is dynamically imported.
It is expected that this module implements:
- json_schema_id (typing.Optional[str]): JSON schema id
- json_schema_repo (typing.Optional[json.SchemaRepository]): JSON schema repo
- create (CreateModule): create new module instance
If module defines JSON schema repository and JSON schema id, JSON schema repository will be used for additional validation of module configuration with JSON schema id.
Module's `subscription` is constant during module's lifetime.
Subscribed event types filter
async def on_session_start(self,
                           session_id: int) -> None:
    """Called on start of a session, identified by session_id.

    This method is not abstract; its body here is empty.

    """
Called on start of a session, identified by session_id.
async def on_session_stop(self,
                          session_id: int) -> None:
    """Called on stop of a session, identified by session_id.

    This method is not abstract; its body here is empty.

    """
Called on stop of a session, identified by session_id.
@abc.abstractmethod
async def process(
    self,
    source: Source,
    event: Event,
) -> typing.AsyncIterable[RegisterEvent]:
    """Process a new session event.

    The provided event is one matched by the module's subscription
    filter. Processing it can result in registration of new register
    events.

    Single module session process is always called sequentially.
    """
Process new session event.
The provided event is matched by the module's subscription filter.
Processing of session event can result in registration of new register events.
Single module session process is always called sequentially.
Inherited Members
- hat.aio.group.Resource
- async_group
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close