hat.event.server.common

Common event server structures and functionality

  1"""Common event server structures and functionality"""
  2
  3import abc
  4import enum
  5import typing
  6
  7from hat import aio
  8from hat import json
  9from hat import util
 10from hat.event.common import (EventId,
 11                              Event,
 12                              QueryData,
 13                              Subscription,
 14                              RegisterEvent)
 15from hat.event.common import *  # NOQA
 16
 17
 18SourceType = enum.Enum('SourceType', [
 19    'SYNCER',
 20    'EVENTER',
 21    'MODULE',
 22    'ENGINE'])
 23
 24
 25class Source(typing.NamedTuple):
 26    type: SourceType
 27    id: int
 28
 29
 30BackendConf = json.Data
 31"""Backend configuration"""
 32
 33CreateBackend = aio.AsyncCallable[[BackendConf], 'Backend']
 34"""Create backend callable"""
 35
 36
 37class Backend(aio.Resource):
 38    """Backend ABC
 39
 40    Backend is implemented as python module which is dynamically imported.
 41    It is expected that this module implements:
 42
 43    * json_schema_id (typing.Optional[str]): JSON schema id
 44    * json_schema_repo (typing.Optional[json.SchemaRepository]):
 45        JSON schema repo
 46    * create (CreateBackend): create new backend instance
 47
 48    If module defines JSON schema repository and JSON schema id, JSON schema
 49    repository will be used for additional validation of backend configuration
 50    with JSON schema id.
 51
 52    """
 53
 54    @abc.abstractmethod
 55    def register_flushed_events_cb(self,
 56                                   cb: typing.Callable[[typing.List[Event]],
 57                                                       None]
 58                                   ) -> util.RegisterCallbackHandle:
 59        """Register flushed events callback"""
 60
 61    @abc.abstractmethod
 62    async def get_last_event_id(self,
 63                                server_id: int
 64                                ) -> EventId:
 65        """Get last registered event id associated with server id"""
 66
 67    @abc.abstractmethod
 68    async def register(self,
 69                       events: typing.List[Event]
 70                       ) -> typing.List[typing.Optional[Event]]:
 71        """Register events"""
 72
 73    @abc.abstractmethod
 74    async def query(self,
 75                    data: QueryData
 76                    ) -> typing.List[Event]:
 77        """Query events"""
 78
 79    @abc.abstractmethod
 80    async def query_flushed(self,
 81                            event_id: EventId
 82                            ) -> typing.AsyncIterable[typing.List[Event]]:
 83        """Get events with the same event_id.server, and event_id.instance
 84        greater than provided. Iterates over lists of Events from the
 85        same session. Only permanently persisted events (flushed) are
 86        returned."""
 87
 88    @abc.abstractmethod
 89    async def flush(self):
 90        """Flush internal buffers and permanently persist events"""
 91
 92
 93ModuleConf = json.Data
 94
 95CreateModule = aio.AsyncCallable[
 96    [ModuleConf, 'hat.event.engine.Engine', Source],
 97    'Module']
 98
 99
100class Module(aio.Resource):
101    """Module ABC
102
103    Module is implemented as python module which is dynamically imported.
104    It is expected that this module implements:
105
106        * json_schema_id (typing.Optional[str]): JSON schema id
107        * json_schema_repo (typing.Optional[json.SchemaRepository]):
108            JSON schema repo
109        * create (CreateModule): create new module instance
110
111    If module defines JSON schema repository and JSON schema id, JSON schema
112    repository will be used for additional validation of module configuration
113    with JSON schema id.
114
115    Module's `subscription` is constant during module's lifetime.
116
117    """
118
119    @property
120    @abc.abstractmethod
121    def subscription(self) -> Subscription:
122        """Subscribed event types filter"""
123
124    async def on_session_start(self,
125                               session_id: int):
126        """Called on start of a session, identified by session_id."""
127
128    async def on_session_stop(self,
129                              session_id: int):
130        """Called on stop of a session, identified by session_id."""
131
132    @abc.abstractmethod
133    async def process(self,
134                      source: Source,
135                      event: Event
136                      ) -> typing.AsyncIterable[RegisterEvent]:
137        """Process new session event.
138
139        Provided event is matched by modules subscription filter.
140
141        Processing of session event can result in registration of
142        new register events.
143
144        Single module session process is always called sequentially.
145
146        """
class SourceType(enum.Enum):

An enumeration.

SYNCER = <SourceType.SYNCER: 1>
EVENTER = <SourceType.EVENTER: 2>
MODULE = <SourceType.MODULE: 3>
ENGINE = <SourceType.ENGINE: 4>
Inherited Members
enum.Enum
name
value
class Source(typing.NamedTuple):
26class Source(typing.NamedTuple):
27    type: SourceType
28    id: int

Source(type, id)

Source(type: hat.event.server.common.SourceType, id: int)

Create new instance of Source(type, id)

Alias for field number 0

id: int

Alias for field number 1

Inherited Members
builtins.tuple
index
count
BackendConf = ~Data

Backend configuration

CreateBackend = typing.Callable[[~Data], typing.Union[ForwardRef('Backend'), typing.Awaitable[ForwardRef('Backend')]]]

Create backend callable

class Backend(hat.aio.Resource):
38class Backend(aio.Resource):
39    """Backend ABC
40
41    Backend is implemented as python module which is dynamically imported.
42    It is expected that this module implements:
43
44    * json_schema_id (typing.Optional[str]): JSON schema id
45    * json_schema_repo (typing.Optional[json.SchemaRepository]):
46        JSON schema repo
47    * create (CreateBackend): create new backend instance
48
49    If module defines JSON schema repository and JSON schema id, JSON schema
50    repository will be used for additional validation of backend configuration
51    with JSON schema id.
52
53    """
54
55    @abc.abstractmethod
56    def register_flushed_events_cb(self,
57                                   cb: typing.Callable[[typing.List[Event]],
58                                                       None]
59                                   ) -> util.RegisterCallbackHandle:
60        """Register flushed events callback"""
61
62    @abc.abstractmethod
63    async def get_last_event_id(self,
64                                server_id: int
65                                ) -> EventId:
66        """Get last registered event id associated with server id"""
67
68    @abc.abstractmethod
69    async def register(self,
70                       events: typing.List[Event]
71                       ) -> typing.List[typing.Optional[Event]]:
72        """Register events"""
73
74    @abc.abstractmethod
75    async def query(self,
76                    data: QueryData
77                    ) -> typing.List[Event]:
78        """Query events"""
79
80    @abc.abstractmethod
81    async def query_flushed(self,
82                            event_id: EventId
83                            ) -> typing.AsyncIterable[typing.List[Event]]:
84        """Get events with the same event_id.server, and event_id.instance
85        greater than provided. Iterates over lists of Events from the
86        same session. Only permanently persisted events (flushed) are
87        returned."""
88
89    @abc.abstractmethod
90    async def flush(self):
91        """Flush internal buffers and permanently persist events"""

Backend ABC

Backend is implemented as python module which is dynamically imported. It is expected that this module implements:

  • json_schema_id (typing.Optional[str]): JSON schema id
  • json_schema_repo (typing.Optional[json.SchemaRepository]): JSON schema repo
  • create (CreateBackend): create new backend instance

If module defines JSON schema repository and JSON schema id, JSON schema repository will be used for additional validation of backend configuration with JSON schema id.

@abc.abstractmethod
def register_flushed_events_cb( self, cb: Callable[[List[hat.event.common.data.Event]], NoneType]) -> hat.util.RegisterCallbackHandle:
55    @abc.abstractmethod
56    def register_flushed_events_cb(self,
57                                   cb: typing.Callable[[typing.List[Event]],
58                                                       None]
59                                   ) -> util.RegisterCallbackHandle:
60        """Register flushed events callback"""

Register flushed events callback

@abc.abstractmethod
async def get_last_event_id(self, server_id: int) -> hat.event.common.data.EventId:
62    @abc.abstractmethod
63    async def get_last_event_id(self,
64                                server_id: int
65                                ) -> EventId:
66        """Get last registered event id associated with server id"""

Get last registered event id associated with server id

@abc.abstractmethod
async def register( self, events: List[hat.event.common.data.Event]) -> List[Optional[hat.event.common.data.Event]]:
68    @abc.abstractmethod
69    async def register(self,
70                       events: typing.List[Event]
71                       ) -> typing.List[typing.Optional[Event]]:
72        """Register events"""

Register events

@abc.abstractmethod
async def query( self, data: hat.event.common.data.QueryData) -> List[hat.event.common.data.Event]:
74    @abc.abstractmethod
75    async def query(self,
76                    data: QueryData
77                    ) -> typing.List[Event]:
78        """Query events"""

Query events

@abc.abstractmethod
async def query_flushed( self, event_id: hat.event.common.data.EventId) -> AsyncIterable[List[hat.event.common.data.Event]]:
80    @abc.abstractmethod
81    async def query_flushed(self,
82                            event_id: EventId
83                            ) -> typing.AsyncIterable[typing.List[Event]]:
84        """Get events with the same event_id.server, and event_id.instance
85        greater than provided. Iterates over lists of Events from the
86        same session. Only permanently persisted events (flushed) are
87        returned."""

Get events with the same event_id.server, and event_id.instance greater than provided. Iterates over lists of Events from the same session. Only permanently persisted events (flushed) are returned.

@abc.abstractmethod
async def flush(self):
89    @abc.abstractmethod
90    async def flush(self):
91        """Flush internal buffers and permanently persist events"""

Flush internal buffers and permanently persist events

Inherited Members
hat.aio.Resource
async_group
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
class Module(hat.aio.Resource):
101class Module(aio.Resource):
102    """Module ABC
103
104    Module is implemented as python module which is dynamically imported.
105    It is expected that this module implements:
106
107        * json_schema_id (typing.Optional[str]): JSON schema id
108        * json_schema_repo (typing.Optional[json.SchemaRepository]):
109            JSON schema repo
110        * create (CreateModule): create new module instance
111
112    If module defines JSON schema repository and JSON schema id, JSON schema
113    repository will be used for additional validation of module configuration
114    with JSON schema id.
115
116    Module's `subscription` is constant during module's lifetime.
117
118    """
119
120    @property
121    @abc.abstractmethod
122    def subscription(self) -> Subscription:
123        """Subscribed event types filter"""
124
125    async def on_session_start(self,
126                               session_id: int):
127        """Called on start of a session, identified by session_id."""
128
129    async def on_session_stop(self,
130                              session_id: int):
131        """Called on stop of a session, identified by session_id."""
132
133    @abc.abstractmethod
134    async def process(self,
135                      source: Source,
136                      event: Event
137                      ) -> typing.AsyncIterable[RegisterEvent]:
138        """Process new session event.
139
140        Provided event is matched by modules subscription filter.
141
142        Processing of session event can result in registration of
143        new register events.
144
145        Single module session process is always called sequentially.
146
147        """

Module ABC

Module is implemented as python module which is dynamically imported.

It is expected that this module implements:
  • json_schema_id (typing.Optional[str]): JSON schema id
  • json_schema_repo (typing.Optional[json.SchemaRepository]): JSON schema repo
  • create (CreateModule): create new module instance

If module defines JSON schema repository and JSON schema id, JSON schema repository will be used for additional validation of module configuration with JSON schema id.

Module's subscription is constant during module's lifetime.

subscription: hat.event.common._csubscription.Subscription

Subscribed event types filter

async def on_session_start(self, session_id: int):
125    async def on_session_start(self,
126                               session_id: int):
127        """Called on start of a session, identified by session_id."""

Called on start of a session, identified by session_id.

async def on_session_stop(self, session_id: int):
129    async def on_session_stop(self,
130                              session_id: int):
131        """Called on stop of a session, identified by session_id."""

Called on stop of a session, identified by session_id.

@abc.abstractmethod
async def process( self, source: hat.event.server.common.Source, event: hat.event.common.data.Event) -> AsyncIterable[hat.event.common.data.RegisterEvent]:
133    @abc.abstractmethod
134    async def process(self,
135                      source: Source,
136                      event: Event
137                      ) -> typing.AsyncIterable[RegisterEvent]:
138        """Process new session event.
139
140        Provided event is matched by modules subscription filter.
141
142        Processing of session event can result in registration of
143        new register events.
144
145        Single module session process is always called sequentially.
146
147        """

Process new session event.

Provided event is matched by modules subscription filter.

Processing of session event can result in registration of new register events.

Single module session process is always called sequentially.

Inherited Members
hat.aio.Resource
async_group
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close