hat.event.server.common

Common event server structures and functionality

  1"""Common event server structures and functionality"""
  2
  3from hat.event.common import *  # NOQA
  4
  5import abc
  6import enum
  7import typing
  8
  9from hat import aio
 10from hat import json
 11from hat import util
 12
 13from hat.event.common import (EventId,
 14                              Event,
 15                              QueryData,
 16                              Subscription,
 17                              RegisterEvent)
 18
 19
 20SourceType = enum.Enum('SourceType', [
 21    'SYNCER',
 22    'EVENTER',
 23    'MODULE',
 24    'ENGINE'])
 25
 26
class Source(typing.NamedTuple):
    """Identifies a source as a ``(type, id)`` pair.

    Instances are passed as the ``source`` argument of `Engine.register`
    and `Module.process`, and to the `CreateModule` callable.

    """
    # kind of the source
    type: SourceType
    # numeric identifier; presumably distinguishes sources of the same
    # type - TODO confirm against the code creating `Source` instances
    id: int
 30
 31
EventsCb: typing.TypeAlias = typing.Callable[[list[Event]], None]
"""Events callback

Callable which accepts a single list of events and returns ``None``.
This is the callback type accepted by `Engine.register_events_cb`.

"""
 34
 35
 36class Engine(aio.Resource):
 37    """Engine ABC"""
 38
 39    @abc.abstractmethod
 40    def register_events_cb(self,
 41                           cb: EventsCb
 42                           ) -> util.RegisterCallbackHandle:
 43        """Register events callback"""
 44
 45    @abc.abstractmethod
 46    async def register(self,
 47                       source: Source,
 48                       events: list[RegisterEvent]
 49                       ) -> list[Event | None]:
 50        """Register events"""
 51
 52    @abc.abstractmethod
 53    async def query(self,
 54                    data: QueryData
 55                    ) -> list[Event]:
 56        """Query events"""
 57
 58
BackendConf: typing.TypeAlias = json.Data
"""Backend configuration

JSON data passed as the only argument of `CreateBackend`.

"""

CreateBackend: typing.TypeAlias = aio.AsyncCallable[[BackendConf], 'Backend']
"""Create backend callable

Called with backend configuration. Result is a `Backend` instance,
provided either directly or as an awaitable.

"""
 64
 65
 66class Backend(aio.Resource):
 67    """Backend ABC
 68
 69    Backend is implemented as python module which is dynamically imported.
 70    It is expected that this module implements:
 71
 72    * json_schema_id (typing.Optional[str]): JSON schema id
 73    * json_schema_repo (typing.Optional[json.SchemaRepository]):
 74        JSON schema repo
 75    * create (CreateBackend): create new backend instance
 76
 77    If module defines JSON schema repository and JSON schema id, JSON schema
 78    repository will be used for additional validation of backend configuration
 79    with JSON schema id.
 80
 81    """
 82
 83    @abc.abstractmethod
 84    def register_registered_events_cb(self,
 85                                      cb: typing.Callable[[typing.List[Event]],
 86                                                          None]
 87                                      ) -> util.RegisterCallbackHandle:
 88        """Register registered events callback"""
 89
 90    @abc.abstractmethod
 91    def register_flushed_events_cb(self,
 92                                   cb: typing.Callable[[typing.List[Event]],
 93                                                       None]
 94                                   ) -> util.RegisterCallbackHandle:
 95        """Register flushed events callback"""
 96
 97    @abc.abstractmethod
 98    async def get_last_event_id(self,
 99                                server_id: int
100                                ) -> EventId:
101        """Get last registered event id associated with server id"""
102
103    @abc.abstractmethod
104    async def register(self,
105                       events: typing.List[Event]
106                       ) -> typing.List[typing.Optional[Event]]:
107        """Register events"""
108
109    @abc.abstractmethod
110    async def query(self,
111                    data: QueryData
112                    ) -> typing.List[Event]:
113        """Query events"""
114
115    @abc.abstractmethod
116    async def query_flushed(self,
117                            event_id: EventId
118                            ) -> typing.AsyncIterable[typing.List[Event]]:
119        """Get events with the same event_id.server, and event_id.instance
120        greater than provided. Iterates over lists of Events from the
121        same session. Only permanently persisted events (flushed) are
122        returned."""
123
124    @abc.abstractmethod
125    async def flush(self):
126        """Flush internal buffers and permanently persist events"""
127
128
ModuleConf: typing.TypeAlias = json.Data
"""Module configuration"""

CreateModule: typing.TypeAlias = aio.AsyncCallable[[ModuleConf, Engine,
                                                    Source],
                                                   'Module']
"""Create module callable"""
134
135
class Module(aio.Resource):
    """Module ABC

    A module is provided as a dynamically imported python module which
    is expected to implement:

    * json_schema_id (typing.Optional[str]): JSON schema id
    * json_schema_repo (typing.Optional[json.SchemaRepository]):
        JSON schema repo
    * create (CreateModule): create new module instance

    When both JSON schema repository and JSON schema id are defined, the
    repository is used for additional validation of module configuration
    with that JSON schema id.

    Value of `subscription` does not change during module's lifetime.

    """

    @property
    @abc.abstractmethod
    def subscription(self) -> Subscription:
        """Filter of event types this module is subscribed to"""

    async def on_session_start(self, session_id: int):
        """Hook called when the session identified by `session_id` starts.

        This default implementation does nothing.

        """

    async def on_session_stop(self, session_id: int):
        """Hook called when the session identified by `session_id` stops.

        This default implementation does nothing.

        """

    @abc.abstractmethod
    async def process(
        self,
        source: Source,
        event: Event,
    ) -> typing.AsyncIterable[RegisterEvent]:
        """Process new session event.

        The provided event matches the module's subscription filter.

        Processing of a session event can result in registration of
        new register events.

        Calls to process of a single module session are always made
        sequentially.

        NOTE(review): declared with ``async def`` while annotated as
        returning an async iterable - implementations are presumably
        async generators consumed with ``async for``; confirm with callers.

        """
class SourceType(enum.Enum):

An enumeration.

SYNCER = <SourceType.SYNCER: 1>
EVENTER = <SourceType.EVENTER: 2>
MODULE = <SourceType.MODULE: 3>
ENGINE = <SourceType.ENGINE: 4>
Inherited Members
enum.Enum
name
value
class Source(typing.NamedTuple):
28class Source(typing.NamedTuple):
29    type: SourceType
30    id: int

Source(type, id)

Source(type: SourceType, id: int)

Create new instance of Source(type, id)

type: SourceType

Alias for field number 0

id: int

Alias for field number 1

Inherited Members
builtins.tuple
index
count
EventsCb: TypeAlias = typing.Callable[[list[hat.event.common.data.Event]], NoneType]

Events callback

class Engine(hat.aio.group.Resource):
37class Engine(aio.Resource):
38    """Engine ABC"""
39
40    @abc.abstractmethod
41    def register_events_cb(self,
42                           cb: EventsCb
43                           ) -> util.RegisterCallbackHandle:
44        """Register events callback"""
45
46    @abc.abstractmethod
47    async def register(self,
48                       source: Source,
49                       events: list[RegisterEvent]
50                       ) -> list[Event | None]:
51        """Register events"""
52
53    @abc.abstractmethod
54    async def query(self,
55                    data: QueryData
56                    ) -> list[Event]:
57        """Query events"""

Engine ABC

@abc.abstractmethod
def register_events_cb( self, cb: Callable[[list[hat.event.common.data.Event]], NoneType]) -> hat.util.RegisterCallbackHandle:
40    @abc.abstractmethod
41    def register_events_cb(self,
42                           cb: EventsCb
43                           ) -> util.RegisterCallbackHandle:
44        """Register events callback"""

Register events callback

@abc.abstractmethod
async def register( self, source: Source, events: list[hat.event.common.data.RegisterEvent]) -> list[hat.event.common.data.Event | None]:
46    @abc.abstractmethod
47    async def register(self,
48                       source: Source,
49                       events: list[RegisterEvent]
50                       ) -> list[Event | None]:
51        """Register events"""

Register events

@abc.abstractmethod
async def query( self, data: hat.event.common.data.QueryData) -> list[hat.event.common.data.Event]:
53    @abc.abstractmethod
54    async def query(self,
55                    data: QueryData
56                    ) -> list[Event]:
57        """Query events"""

Query events

Inherited Members
hat.aio.group.Resource
async_group
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
BackendConf: TypeAlias = None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]

Backend configuration

CreateBackend: TypeAlias = typing.Callable[[None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]], typing.Union[ForwardRef('Backend'), typing.Awaitable[ForwardRef('Backend')]]]

Create backend callable

class Backend(hat.aio.group.Resource):
 67class Backend(aio.Resource):
 68    """Backend ABC
 69
 70    Backend is implemented as python module which is dynamically imported.
 71    It is expected that this module implements:
 72
 73    * json_schema_id (typing.Optional[str]): JSON schema id
 74    * json_schema_repo (typing.Optional[json.SchemaRepository]):
 75        JSON schema repo
 76    * create (CreateBackend): create new backend instance
 77
 78    If module defines JSON schema repository and JSON schema id, JSON schema
 79    repository will be used for additional validation of backend configuration
 80    with JSON schema id.
 81
 82    """
 83
 84    @abc.abstractmethod
 85    def register_registered_events_cb(self,
 86                                      cb: typing.Callable[[typing.List[Event]],
 87                                                          None]
 88                                      ) -> util.RegisterCallbackHandle:
 89        """Register registered events callback"""
 90
 91    @abc.abstractmethod
 92    def register_flushed_events_cb(self,
 93                                   cb: typing.Callable[[typing.List[Event]],
 94                                                       None]
 95                                   ) -> util.RegisterCallbackHandle:
 96        """Register flushed events callback"""
 97
 98    @abc.abstractmethod
 99    async def get_last_event_id(self,
100                                server_id: int
101                                ) -> EventId:
102        """Get last registered event id associated with server id"""
103
104    @abc.abstractmethod
105    async def register(self,
106                       events: typing.List[Event]
107                       ) -> typing.List[typing.Optional[Event]]:
108        """Register events"""
109
110    @abc.abstractmethod
111    async def query(self,
112                    data: QueryData
113                    ) -> typing.List[Event]:
114        """Query events"""
115
116    @abc.abstractmethod
117    async def query_flushed(self,
118                            event_id: EventId
119                            ) -> typing.AsyncIterable[typing.List[Event]]:
120        """Get events with the same event_id.server, and event_id.instance
121        greater than provided. Iterates over lists of Events from the
122        same session. Only permanently persisted events (flushed) are
123        returned."""
124
125    @abc.abstractmethod
126    async def flush(self):
127        """Flush internal buffers and permanently persist events"""

Backend ABC

Backend is implemented as python module which is dynamically imported. It is expected that this module implements:

  • json_schema_id (typing.Optional[str]): JSON schema id
  • json_schema_repo (typing.Optional[json.SchemaRepository]): JSON schema repo
  • create (CreateBackend): create new backend instance

If module defines JSON schema repository and JSON schema id, JSON schema repository will be used for additional validation of backend configuration with JSON schema id.

@abc.abstractmethod
def register_registered_events_cb( self, cb: Callable[[List[hat.event.common.data.Event]], NoneType]) -> hat.util.RegisterCallbackHandle:
84    @abc.abstractmethod
85    def register_registered_events_cb(self,
86                                      cb: typing.Callable[[typing.List[Event]],
87                                                          None]
88                                      ) -> util.RegisterCallbackHandle:
89        """Register registered events callback"""

Register registered events callback

@abc.abstractmethod
def register_flushed_events_cb( self, cb: Callable[[List[hat.event.common.data.Event]], NoneType]) -> hat.util.RegisterCallbackHandle:
91    @abc.abstractmethod
92    def register_flushed_events_cb(self,
93                                   cb: typing.Callable[[typing.List[Event]],
94                                                       None]
95                                   ) -> util.RegisterCallbackHandle:
96        """Register flushed events callback"""

Register flushed events callback

@abc.abstractmethod
async def get_last_event_id(self, server_id: int) -> hat.event.common.data.EventId:
 98    @abc.abstractmethod
 99    async def get_last_event_id(self,
100                                server_id: int
101                                ) -> EventId:
102        """Get last registered event id associated with server id"""

Get last registered event id associated with server id

@abc.abstractmethod
async def register( self, events: List[hat.event.common.data.Event]) -> List[Optional[hat.event.common.data.Event]]:
104    @abc.abstractmethod
105    async def register(self,
106                       events: typing.List[Event]
107                       ) -> typing.List[typing.Optional[Event]]:
108        """Register events"""

Register events

@abc.abstractmethod
async def query( self, data: hat.event.common.data.QueryData) -> List[hat.event.common.data.Event]:
110    @abc.abstractmethod
111    async def query(self,
112                    data: QueryData
113                    ) -> typing.List[Event]:
114        """Query events"""

Query events

@abc.abstractmethod
async def query_flushed( self, event_id: hat.event.common.data.EventId) -> AsyncIterable[List[hat.event.common.data.Event]]:
116    @abc.abstractmethod
117    async def query_flushed(self,
118                            event_id: EventId
119                            ) -> typing.AsyncIterable[typing.List[Event]]:
120        """Get events with the same event_id.server, and event_id.instance
121        greater than provided. Iterates over lists of Events from the
122        same session. Only permanently persisted events (flushed) are
123        returned."""

Get events with the same event_id.server, and event_id.instance greater than provided. Iterates over lists of Events from the same session. Only permanently persisted events (flushed) are returned.

@abc.abstractmethod
async def flush(self):
125    @abc.abstractmethod
126    async def flush(self):
127        """Flush internal buffers and permanently persist events"""

Flush internal buffers and permanently persist events

Inherited Members
hat.aio.group.Resource
async_group
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
ModuleConf: TypeAlias = None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]
CreateModule: TypeAlias = typing.Callable[[None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], Engine, Source], typing.Union[ForwardRef('Module'), typing.Awaitable[ForwardRef('Module')]]]
class Module(hat.aio.group.Resource):
137class Module(aio.Resource):
138    """Module ABC
139
140    Module is implemented as python module which is dynamically imported.
141    It is expected that this module implements:
142
143        * json_schema_id (typing.Optional[str]): JSON schema id
144        * json_schema_repo (typing.Optional[json.SchemaRepository]):
145            JSON schema repo
146        * create (CreateModule): create new module instance
147
148    If module defines JSON schema repository and JSON schema id, JSON schema
149    repository will be used for additional validation of module configuration
150    with JSON schema id.
151
152    Module's `subscription` is constant during module's lifetime.
153
154    """
155
156    @property
157    @abc.abstractmethod
158    def subscription(self) -> Subscription:
159        """Subscribed event types filter"""
160
161    async def on_session_start(self,
162                               session_id: int):
163        """Called on start of a session, identified by session_id."""
164
165    async def on_session_stop(self,
166                              session_id: int):
167        """Called on stop of a session, identified by session_id."""
168
169    @abc.abstractmethod
170    async def process(self,
171                      source: Source,
172                      event: Event
173                      ) -> typing.AsyncIterable[RegisterEvent]:
174        """Process new session event.
175
176        Provided event is matched by modules subscription filter.
177
178        Processing of session event can result in registration of
179        new register events.
180
181        Single module session process is always called sequentially.
182
183        """

Module ABC

Module is implemented as python module which is dynamically imported.

It is expected that this module implements:
  • json_schema_id (typing.Optional[str]): JSON schema id
  • json_schema_repo (typing.Optional[json.SchemaRepository]): JSON schema repo
  • create (CreateModule): create new module instance

If module defines JSON schema repository and JSON schema id, JSON schema repository will be used for additional validation of module configuration with JSON schema id.

Module's subscription is constant during module's lifetime.

subscription: hat.event.common.subscription.csubscription.CSubscription

Subscribed event types filter

async def on_session_start(self, session_id: int):
161    async def on_session_start(self,
162                               session_id: int):
163        """Called on start of a session, identified by session_id."""

Called on start of a session, identified by session_id.

async def on_session_stop(self, session_id: int):
165    async def on_session_stop(self,
166                              session_id: int):
167        """Called on stop of a session, identified by session_id."""

Called on stop of a session, identified by session_id.

@abc.abstractmethod
async def process( self, source: Source, event: hat.event.common.data.Event) -> AsyncIterable[hat.event.common.data.RegisterEvent]:
169    @abc.abstractmethod
170    async def process(self,
171                      source: Source,
172                      event: Event
173                      ) -> typing.AsyncIterable[RegisterEvent]:
174        """Process new session event.
175
176        Provided event is matched by modules subscription filter.
177
178        Processing of session event can result in registration of
179        new register events.
180
181        Single module session process is always called sequentially.
182
183        """

Process new session event.

The provided event matches the module's subscription filter.

Processing of session event can result in registration of new register events.

Single module session process is always called sequentially.

Inherited Members
hat.aio.group.Resource
async_group
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close