
Common functionality shared between clients and event server

  1"""Common functionality shared between clients and event server"""
  3import time
  5from hat.event.common.backend import (BackendClosedError,
  6                                      Backend,
  7                                      BackendConf,
  8                                      BackendRegisteredEventsCb,
  9                                      BackendFlushedEventsCb,
 10                                      CreateBackend,
 11                                      BackendInfo,
 12                                      import_backend_info)
 13from hat.event.common.collection import (EventTypeCollection,
 14                                         create_event_type_collection)
 15from hat.event.common.common import (json_schema_repo,
 16                                     sbs_repo,
 17                                     ServerId,
 18                                     SessionId,
 19                                     InstanceId,
 20                                     EventTypeSegment,
 21                                     EventType,
 22                                     Timestamp,
 23                                     min_timestamp,
 24                                     max_timestamp,
 25                                     Status,
 26                                     Order,
 27                                     OrderBy,
 28                                     EventId,
 29                                     EventPayloadBinary,
 30                                     EventPayloadJson,
 31                                     EventPayload,
 32                                     Event,
 33                                     RegisterEvent,
 34                                     QueryLatestParams,
 35                                     QueryTimeseriesParams,
 36                                     QueryServerParams,
 37                                     QueryParams,
 38                                     QueryResult)
 39from hat.event.common.encoder import (timestamp_to_bytes,
 40                                      timestamp_from_bytes,
 41                                      timestamp_to_float,
 42                                      timestamp_from_float,
 43                                      timestamp_to_datetime,
 44                                      timestamp_from_datetime,
 45                                      timestamp_to_sbs,
 46                                      timestamp_from_sbs,
 47                                      status_to_sbs,
 48                                      status_from_sbs,
 49                                      event_to_sbs,
 50                                      event_from_sbs,
 51                                      register_event_to_sbs,
 52                                      register_event_from_sbs,
 53                                      query_params_to_sbs,
 54                                      query_params_from_sbs,
 55                                      query_result_to_sbs,
 56                                      query_result_from_sbs,
 57                                      event_payload_to_sbs,
 58                                      event_payload_from_sbs)
 59from hat.event.common.matches import matches_query_type
 60from hat.event.common.module import (SourceType,
 61                                     Source,
 62                                     Engine,
 63                                     Module,
 64                                     ModuleConf,
 65                                     CreateModule,
 66                                     ModuleInfo,
 67                                     import_module_info)
 68from hat.event.common.subscription import (Subscription,
 69                                           create_subscription)
140def now() -> Timestamp:
141    """Create new timestamp representing current time"""
142    return timestamp_from_float(time.time())
Backend closed

Backend ABC

async def register( self, events: Collection[Event]) -> Collection[Event] | None:
29    @abc.abstractmethod
30    async def register(self,
31                       events: Collection[Event]
32                       ) -> Collection[Event] | None:
33        """Register events"""

Register events

BackendConf = typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]
BackendRegisteredEventsCb = typing.Callable[[collections.abc.Collection[Event]], None | collections.abc.Awaitable[None]]
BackendFlushedEventsCb = typing.Callable[[collections.abc.Collection[Event]], None | collections.abc.Awaitable[None]]
CreateBackend = typing.Callable[[typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]], typing.Optional[typing.Callable[[collections.abc.Collection[Event]], None | collections.abc.Awaitable[None]]], typing.Optional[typing.Callable[[collections.abc.Collection[Event]], None | collections.abc.Awaitable[None]]]], Backend | collections.abc.Awaitable[Backend]]
class BackendInfo(typing.NamedTuple):
67class BackendInfo(typing.NamedTuple):
68    """Backend info
70    Backend is implemented as python module which is dynamically imported.
71    It is expected that this module contains `info` which is instance of
72    `BackendInfo`.
74    If backend defines JSON schema repository and JSON schema id, JSON schema
75    repository will be used for additional validation of backend configuration
76    with JSON schema id.
78    """
79    create: CreateBackend
80    json_schema_id: str | None = None
81    json_schema_repo: json.SchemaRepository | None = None

Backend info

Backend is implemented as python module which is dynamically imported. It is expected that this module contains info which is instance of BackendInfo.

If backend defines JSON schema repository and JSON schema id, JSON schema repository will be used for additional validation of backend configuration with JSON schema id.

def import_backend_info(py_module_str: str) -> BackendInfo:
84def import_backend_info(py_module_str: str) -> BackendInfo:
85    """Import backend info"""
86    py_module = importlib.import_module(py_module_str)
87    info = py_module.info
89    if not isinstance(info, BackendInfo):
90        raise Exception('invalid backend implementation')
92    return info

Import backend info

def create_event_type_collection( items: Iterable[Subscription, ~T] = []) -> EventTypeCollection[~T]:
23def create_event_type_collection(items: Iterable[Subscription, T] = []
24                                 ) -> EventTypeCollection[T]:
25    if CTreeEventTypeCollection is not None:
26        return CTreeEventTypeCollection(items)
28    return PyTreeEventTypeCollection(items)
sbs_repo = <hat.sbs.repository.Repository object>
ServerId = <class 'int'>
SessionId = <class 'int'>
InstanceId = <class 'int'>
EventTypeSegment = <class 'str'>
EventType = tuple[str, ...]
class Timestamp(typing.NamedTuple):
40class Timestamp(typing.NamedTuple):
41    s: int
42    """seconds since 1970-01-01 (can be negative)"""
43    us: int
44    """microseconds added to timestamp seconds in range [0, 1e6)"""
46    def add(self, s: float) -> 'Timestamp':
47        """Create new timestamp by adding seconds to existing timestamp"""
48        us = self.us + round((s - int(s)) * 1e6)
49        s = self.s + int(s)
50        return Timestamp(s=s + us // int(1e6),
51                         us=us % int(1e6))

min_timestamp = Timestamp(s=-9223372036854775808, us=0)
max_timestamp = Timestamp(s=9223372036854775807, us=999999)
class EventId(typing.NamedTuple):
78class EventId(typing.NamedTuple):
79    server: ServerId
80    session: SessionId
81    instance: InstanceId

class EventPayloadBinary(typing.NamedTuple):
84class EventPayloadBinary(typing.NamedTuple):
85    type: str
86    data: util.Bytes

class EventPayloadJson(typing.NamedTuple):
89class EventPayloadJson(typing.NamedTuple):
90    data: json.Data


class Event(typing.NamedTuple):
 96class Event(typing.NamedTuple):
 97    """Event
 99    Operators `>` and `<` test for natural order where it is assumed that
100    first operand is registered before second operand.
102    """
104    id: EventId
105    type: EventType
106    timestamp: Timestamp
107    source_timestamp: Timestamp | None
108    payload: EventPayload | None
110    def __lt__(self, other):
111        if not isinstance(other, Event):
112            return NotImplemented
114        if self.id == other.id:
115            return False
117        if self.id.server == other.id.server:
118            return self.id < other.id
120        if self.timestamp != other.timestamp:
121            return self.timestamp < other.timestamp
123        return True
125    def __gt__(self, other):
126        if not isinstance(other, Event):
127            return NotImplemented
129        if self.id == other.id:
130            return False
132        if self.id.server == other.id.server:
133            return self.id > other.id
135        if self.timestamp != other.timestamp:
136            return self.timestamp > other.timestamp
138        return False


class RegisterEvent(typing.NamedTuple):
141class RegisterEvent(typing.NamedTuple):
142    type: EventType
143    source_timestamp: Timestamp | None
144    payload: EventPayload | None

class QueryLatestParams(typing.NamedTuple):
147class QueryLatestParams(typing.NamedTuple):
148    event_types: Collection[EventType] | None = None


class QueryTimeseriesParams(typing.NamedTuple):
151class QueryTimeseriesParams(typing.NamedTuple):
152    event_types: Collection[EventType] | None = None
153    t_from: Timestamp | None = None
154    t_to: Timestamp | None = None
155    source_t_from: Timestamp | None = None
156    source_t_to: Timestamp | None = None
157    order: Order = Order.DESCENDING
158    order_by: OrderBy = OrderBy.TIMESTAMP
159    max_results: int | None = None
160    last_event_id: EventId | None = None

class QueryServerParams(typing.NamedTuple):
163class QueryServerParams(typing.NamedTuple):
164    server_id: ServerId
165    persisted: bool = False
166    max_results: int | None = None
167    last_event_id: EventId | None = None

class QueryResult(typing.NamedTuple):
175class QueryResult(typing.NamedTuple):
176    events: Collection[Event]
177    more_follows: bool

def timestamp_to_bytes(t: Timestamp) -> bytes | bytearray | memoryview:
26def timestamp_to_bytes(t: Timestamp) -> util.Bytes:
27    """Convert timestamp to 12 byte representation
29    Bytes [0, 8] are big endian unsigned `Timestamp.s` + 2^63 and
30    bytes [9, 12] are big endian unsigned `Timestamp.us`.
32    """
33    return struct.pack(">QI", t.s + (1 << 63), t.us)

Convert timestamp to 12 byte representation

Bytes [0, 8] are big endian unsigned Timestamp.s + 2^63 and bytes [9, 12] are big endian unsigned Timestamp.us.

def timestamp_from_bytes( data: bytes | bytearray | memoryview) -> Timestamp:
36def timestamp_from_bytes(data: util.Bytes) -> Timestamp:
37    """Create new timestamp from 12 byte representation
39    Bytes representation is same as defined for `timestamp_to_bytes` function.
41    """
42    s, us = struct.unpack(">QI", data)
43    return Timestamp(s - (1 << 63), us)

Create new timestamp from 12 byte representation

Bytes representation is same as defined for timestamp_to_bytes function.

def timestamp_to_float(t: Timestamp) -> float:
46def timestamp_to_float(t: Timestamp) -> float:
47    """Convert timestamp to floating number of seconds since 1970-01-01 UTC
49    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.
51    """
52    return t.s + t.us * 1E-6

Convert timestamp to floating number of seconds since 1970-01-01 UTC

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_from_float(ts: float) -> Timestamp:
55def timestamp_from_float(ts: float) -> Timestamp:
56    """Create timestamp from floating number of seconds since 1970-01-01 UTC
58    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.
60    """
61    s = int(ts)
62    if ts < 0:
63        s = s - 1
65    us = round((ts - s) * 1E6)
67    if us == 1_000_000:
68        return Timestamp(s + 1, 0)
70    else:
71        return Timestamp(s, us)

Create timestamp from floating number of seconds since 1970-01-01 UTC

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_to_datetime(t: Timestamp) -> datetime.datetime:
74def timestamp_to_datetime(t: Timestamp) -> datetime.datetime:
75    """Convert timestamp to datetime (representing utc time)
77    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.
79    """
80    try:
81        dt_from_s = datetime.datetime.fromtimestamp(t.s, datetime.timezone.utc)
83    except OSError:
84        dt_from_s = (
85            datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) +
86            datetime.timedelta(seconds=t.s))
88    return datetime.datetime(
89        year=dt_from_s.year,
90        month=dt_from_s.month,
91        day=dt_from_s.day,
92        hour=dt_from_s.hour,
93        minute=dt_from_s.minute,
94        second=dt_from_s.second,
95        microsecond=t.us,
96        tzinfo=datetime.timezone.utc)

Convert timestamp to datetime (representing utc time)

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_from_datetime(dt: datetime.datetime) -> Timestamp:
 99def timestamp_from_datetime(dt: datetime.datetime) -> Timestamp:
100    """Create new timestamp from datetime
102    If `tzinfo` is not set, it is assumed that provided datetime represents
103    utc time.
105    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.
107    """
108    if not dt.tzinfo:
109        dt = dt.replace(tzinfo=datetime.timezone.utc)
111    s = int(dt.timestamp())
113    if dt.timestamp() < 0:
114        s = s - 1
116    return Timestamp(s=s, us=dt.microsecond)

Create new timestamp from datetime

If tzinfo is not set, it is assumed that provided datetime represents utc time.

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def matches_query_type(event_type: tuple[str, ...], query_type: tuple[str, ...]) -> bool:
 5def matches_query_type(event_type: EventType,
 6                       query_type: EventType
 7                       ) -> bool:
 8    """Determine if event type matches query type
10    Event type is tested if it matches query type according to the following
11    rules:
13        * Matching is performed on subtypes in increasing order.
14        * Event type is a match only if all its subtypes are matched by
15          corresponding query subtypes.
16        * Matching is finished when all query subtypes are exhausted.
17        * Query subtype '?' matches exactly one event subtype of any value.
18          The subtype must exist.
19        * Query subtype '*' matches 0 or more event subtypes of any value. It
20          must be the last query subtype.
21        * All other values of query subtype match exactly one event subtype
22          of the same value.
23        * Query type without subtypes is matched only by event type with no
24          subtypes.
26    As a consequence of aforementioned matching rules, event subtypes '*' and
27    '?' cannot be directly matched and it is advisable not to use them in event
28    types.
30    """
31    is_variable = bool(query_type and query_type[-1] == '*')
32    if is_variable:
33        query_type = query_type[:-1]
35    if len(event_type) < len(query_type):
36        return False
38    if len(event_type) > len(query_type) and not is_variable:
39        return False
41    for i, j in zip(event_type, query_type):
42        if j != '?' and i != j:
43            return False
45    return True

Determine if event type matches query type

Event type is tested if it matches query type according to the following rules:

* Matching is performed on subtypes in increasing order.
* Event type is a match only if all its subtypes are matched by
  corresponding query subtypes.
* Matching is finished when all query subtypes are exhausted.
* Query subtype '?' matches exactly one event subtype of any value.
  The subtype must exist.
* Query subtype '*' matches 0 or more event subtypes of any value. It
  must be the last query subtype.
* All other values of query subtype match exactly one event subtype
  of the same value.
* Query type without subtypes is matched only by event type with no

As a consequence of aforementioned matching rules, event subtypes '*' and '?' cannot be directly matched and it is advisable not to use them in event types.

class SourceType(enum.Enum):
18class SourceType(enum.Enum):
19    EVENTER = 1
20    MODULE = 2
21    ENGINE = 3
22    SERVER = 4

An enumeration.

EVENTER = <SourceType.EVENTER: 1>
MODULE = <SourceType.MODULE: 2>
ENGINE = <SourceType.ENGINE: 3>
SERVER = <SourceType.SERVER: 4>
class Source(typing.NamedTuple):
25class Source(typing.NamedTuple):
26    type: SourceType
27    id: int

class Engine(hat.aio.group.Resource):
30class Engine(aio.Resource):
31    """Engine ABC"""
33    @property
34    @abc.abstractmethod
35    def server_id(self) -> int:
36        """Event server identifier"""
38    @abc.abstractmethod
39    async def register(self,
40                       source: Source,
41                       events: Collection[RegisterEvent]
42                       ) -> Collection[Event] | None:
43        """Register events"""
45    @abc.abstractmethod
46    async def query(self,
47                    params: QueryParams
48                    ) -> QueryResult:
49        """Query events"""
51    @abc.abstractmethod
52    def get_client_names(self) -> Iterable[tuple[Source, str]]:
53        """Get client names connected to local eventer server"""
55    @abc.abstractmethod
56    def restart(self):
57        """Schedule engine restart"""
59    @abc.abstractmethod
60    def reset_monitor_ready(self):
61        """Schedule reseting of monitor component's ready flag"""

Engine ABC

class Module(hat.aio.group.Resource):
 64class Module(aio.Resource):
 65    """Module ABC"""
 67    @property
 68    @abc.abstractmethod
 69    def subscription(self) -> Subscription:
 70        """Subscribed event types filter.
 72        `subscription` is constant during module's lifetime.
 74        """
 76    async def on_session_start(self, session_id: int):
 77        """Called on start of a session, identified by session_id.
 79        This method can be coroutine or regular function.
 81        """
 83    async def on_session_stop(self, session_id: int):
 84        """Called on stop of a session, identified by session_id.
 86        This method can be coroutine or regular function.
 88        """
 90    @abc.abstractmethod
 91    async def process(self,
 92                      source: Source,
 93                      event: Event
 94                      ) -> Iterable[RegisterEvent] | None:
 95        """Process new session event.
 97        Provided event is matched by modules subscription filter.
 99        Processing of session event can result in registration of
100        new register events.
102        Single module session process is always called sequentially.
104        This method can be coroutine or regular function.
106        """

Module ABC

ModuleConf = typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]
CreateModule = typing.Callable[[typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]], Engine, Source], Module | collections.abc.Awaitable[Module]]
class ModuleInfo(typing.NamedTuple):
118class ModuleInfo(typing.NamedTuple):
119    """Module info
121    Module is implemented as python module which is dynamically imported.
122    It is expected that this module contains `info` which is instance of
123    `ModuleInfo`.
125    If module defines JSON schema repository and JSON schema id, JSON schema
126    repository will be used for additional validation of module configuration
127    with JSON schema id.
129    """
130    create: CreateModule
131    json_schema_id: str | None = None
132    json_schema_repo: json.SchemaRepository | None = None

Module info

Module is implemented as python module which is dynamically imported. It is expected that this module contains info which is instance of ModuleInfo.

If module defines JSON schema repository and JSON schema id, JSON schema repository will be used for additional validation of module configuration with JSON schema id.

def import_module_info(py_module_str: str) -> ModuleInfo:
135def import_module_info(py_module_str: str) -> ModuleInfo:
136    """Import module info"""
137    py_module = importlib.import_module(py_module_str)
138    info = py_module.info
140    if not isinstance(info, ModuleInfo):
141        raise Exception('invalid module implementation')
143    return info

Import module info

class Subscription(abc.ABC):
10class Subscription(abc.ABC):
11    """Subscription defined by query event types"""
13    @abc.abstractmethod
14    def __init__(self, query_types: Iterable[EventType]):
15        """Create subscription instance"""
17    @abc.abstractmethod
18    def get_query_types(self) -> Iterable[EventType]:
19        """Calculate sanitized query event types"""
21    @abc.abstractmethod
22    def matches(self, event_type: EventType) -> bool:
23        """Does `event_type` match subscription"""
25    @abc.abstractmethod
26    def union(self, *others: 'Subscription') -> 'Subscription':
27        """Create new subscription including event types from this and
28        other subscriptions."""
30    @abc.abstractmethod
31    def intersection(self, *others: 'Subscription') -> 'Subscription':
32        """Create new subscription containing event types in common with
33        other subscriptions."""
35    @abc.abstractmethod
36    def isdisjoint(self, other: 'Subscription') -> bool:
37        """Return ``True`` if this subscription has no event types in common
38        with other subscription."""

Subscription defined by query event types

def create_subscription( query_types: Iterable[tuple[str, ...]]) -> Subscription:
19def create_subscription(query_types: Iterable[EventType]) -> Subscription:
20    if CSubscription is not None:
21        return CSubscription(query_types)
23    return PySubscription(query_types)
def now() -> Timestamp:
141def now() -> Timestamp:
142    """Create new timestamp representing current time"""
143    return timestamp_from_float(time.time())

Create new timestamp representing current time