hat.event.common

Common functionality shared between clients and event server

 1"""Common functionality shared between clients and event server"""
 2
 3from hat.event.common.data import (json_schema_repo,
 4                                   sbs_repo,
 5                                   EventType,
 6                                   Order,
 7                                   OrderBy,
 8                                   EventPayloadType,
 9                                   EventId,
10                                   EventPayload,
11                                   SbsData,
12                                   Event,
13                                   RegisterEvent,
14                                   QueryData,
15                                   event_to_sbs,
16                                   event_from_sbs,
17                                   register_event_to_sbs,
18                                   register_event_from_sbs,
19                                   query_to_sbs,
20                                   query_from_sbs,
21                                   event_payload_to_sbs,
22                                   event_payload_from_sbs)
23from hat.event.common.timestamp import (Timestamp,
24                                        now,
25                                        timestamp_to_bytes,
26                                        timestamp_from_bytes,
27                                        timestamp_to_float,
28                                        timestamp_from_float,
29                                        timestamp_to_datetime,
30                                        timestamp_from_datetime,
31                                        timestamp_to_sbs,
32                                        timestamp_from_sbs)
33from hat.event.common.subscription import (matches_query_type,
34                                           Subscription)
35
36
# Public names re-exported by this package, listed in import order and
# grouped by the submodule they come from.
__all__ = [
    # hat.event.common.data
    'json_schema_repo', 'sbs_repo',
    'EventType', 'Order', 'OrderBy', 'EventPayloadType',
    'EventId', 'EventPayload', 'SbsData',
    'Event', 'RegisterEvent', 'QueryData',
    'event_to_sbs', 'event_from_sbs',
    'register_event_to_sbs', 'register_event_from_sbs',
    'query_to_sbs', 'query_from_sbs',
    'event_payload_to_sbs', 'event_payload_from_sbs',
    # hat.event.common.timestamp
    'Timestamp', 'now',
    'timestamp_to_bytes', 'timestamp_from_bytes',
    'timestamp_to_float', 'timestamp_from_float',
    'timestamp_to_datetime', 'timestamp_from_datetime',
    'timestamp_to_sbs', 'timestamp_from_sbs',
    # hat.event.common.subscription
    'matches_query_type', 'Subscription',
]
json_schema_repo = <hat.json.repository.SchemaRepository object>
sbs_repo = <hat.sbs.repository.Repository object>
EventType = typing.Tuple[str, ...]
class Order(enum.Enum):

An enumeration.

DESCENDING = <Order.DESCENDING: 1>
ASCENDING = <Order.ASCENDING: 2>
Inherited Members
enum.Enum
name
value
class OrderBy(enum.Enum):

An enumeration.

TIMESTAMP = <OrderBy.TIMESTAMP: 1>
SOURCE_TIMESTAMP = <OrderBy.SOURCE_TIMESTAMP: 2>
Inherited Members
enum.Enum
name
value
class EventPayloadType(enum.Enum):

An enumeration.

BINARY = <EventPayloadType.BINARY: 1>
JSON = <EventPayloadType.JSON: 2>
Inherited Members
enum.Enum
name
value
class EventId(typing.NamedTuple):
class EventId(typing.NamedTuple):
    """Event identifier composed of server, session and instance numbers"""

    server: int
    """server identifier"""

    session: int
    """session identifier"""

    instance: int
    """event instance identifier"""

EventId(server, session, instance)

EventId(server: int, session: int, instance: int)

Create new instance of EventId(server, session, instance)

server: int

server identifier

session: int

session identifier

instance: int

event instance identifier

Inherited Members
builtins.tuple
index
count
class EventPayload(typing.NamedTuple):
class EventPayload(typing.NamedTuple):
    """Event payload given as a payload type together with its data"""

    # tag used by the payload converters to decide how `data` is handled
    type: EventPayloadType

    # raw bytes, JSON data or an `SbsData` instance
    data: typing.Union[bytes, json.Data, 'SbsData']

EventPayload(type, data)

EventPayload( type: EventPayloadType, data: Union[bytes, NoneType, bool, int, float, str, list[ForwardRef('Data')], dict[str, ForwardRef('Data')], ForwardRef('SbsData')])

Create new instance of EventPayload(type, data)

type: EventPayloadType

Alias for field number 0

data: Union[bytes, NoneType, bool, int, float, str, list[ForwardRef('Data')], dict[str, ForwardRef('Data')], ForwardRef('SbsData')]

Alias for field number 1

Inherited Members
builtins.tuple
index
count
class SbsData(typing.NamedTuple):
69class SbsData(typing.NamedTuple):
70    module: str | None
71    """SBS module name"""
72    type: str
73    """SBS type name"""
74    data: bytes

SbsData(module, type, data)

SbsData(module: str | None, type: str, data: bytes)

Create new instance of SbsData(module, type, data)

module: str | None

SBS module name

type: str

SBS type name

data: bytes

Alias for field number 2

Inherited Members
builtins.tuple
index
count
class Event(typing.NamedTuple):
class Event(typing.NamedTuple):
    """Event described by identifier, type, timestamps and payload"""

    event_id: EventId

    event_type: EventType

    timestamp: Timestamp

    # optional - may be `None`
    source_timestamp: Timestamp | None

    # optional - may be `None`
    payload: EventPayload | None

Event(event_id, event_type, timestamp, source_timestamp, payload)

Event( event_id: EventId, event_type: Tuple[str, ...], timestamp: Timestamp, source_timestamp: Timestamp | None, payload: EventPayload | None)

Create new instance of Event(event_id, event_type, timestamp, source_timestamp, payload)

event_id: EventId

Alias for field number 0

event_type: Tuple[str, ...]

Alias for field number 1

timestamp: Timestamp

Alias for field number 2

source_timestamp: Timestamp | None

Alias for field number 3

payload: EventPayload | None

Alias for field number 4

Inherited Members
builtins.tuple
index
count
class RegisterEvent(typing.NamedTuple):
class RegisterEvent(typing.NamedTuple):
    """Event data without identifier and timestamp

    Contains the same `event_type`, `source_timestamp` and `payload` members
    as `Event`.

    """

    event_type: EventType

    # optional - may be `None`
    source_timestamp: Timestamp | None

    # optional - may be `None`
    payload: EventPayload | None

RegisterEvent(event_type, source_timestamp, payload)

RegisterEvent( event_type: Tuple[str, ...], source_timestamp: Timestamp | None, payload: EventPayload | None)

Create new instance of RegisterEvent(event_type, source_timestamp, payload)

event_type: Tuple[str, ...]

Alias for field number 0

source_timestamp: Timestamp | None

Alias for field number 1

payload: EventPayload | None

Alias for field number 2

Inherited Members
builtins.tuple
index
count
class QueryData(typing.NamedTuple):
 91class QueryData(typing.NamedTuple):
 92    server_id: int | None = None
 93    event_ids: list[EventId] | None = None
 94    event_types: list[EventType] | None = None
 95    t_from: Timestamp | None = None
 96    t_to: Timestamp | None = None
 97    source_t_from: Timestamp | None = None
 98    source_t_to: Timestamp | None = None
 99    payload: EventPayload | None = None
100    order: Order = Order.DESCENDING
101    order_by: OrderBy = OrderBy.TIMESTAMP
102    unique_type: bool = False
103    max_results: int | None = None

QueryData(server_id, event_ids, event_types, t_from, t_to, source_t_from, source_t_to, payload, order, order_by, unique_type, max_results)

QueryData( server_id: int | None = None, event_ids: list[EventId] | None = None, event_types: list[typing.Tuple[str, ...]] | None = None, t_from: Timestamp | None = None, t_to: Timestamp | None = None, source_t_from: Timestamp | None = None, source_t_to: Timestamp | None = None, payload: EventPayload | None = None, order: Order = <Order.DESCENDING: 1>, order_by: OrderBy = <OrderBy.TIMESTAMP: 1>, unique_type: bool = False, max_results: int | None = None)

Create new instance of QueryData(server_id, event_ids, event_types, t_from, t_to, source_t_from, source_t_to, payload, order, order_by, unique_type, max_results)

server_id: int | None

Alias for field number 0

event_ids: list[EventId] | None

Alias for field number 1

event_types: list[typing.Tuple[str, ...]] | None

Alias for field number 2

t_from: Timestamp | None

Alias for field number 3

t_to: Timestamp | None

Alias for field number 4

source_t_from: Timestamp | None

Alias for field number 5

source_t_to: Timestamp | None

Alias for field number 6

payload: EventPayload | None

Alias for field number 7

order: Order

Alias for field number 8

order_by: OrderBy

Alias for field number 9

unique_type: bool

Alias for field number 10

max_results: int | None

Alias for field number 11

Inherited Members
builtins.tuple
index
count
def event_to_sbs( event: Event) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def event_to_sbs(event: Event) -> sbs.Data:
    """Convert Event to SBS data

    Optional members (source timestamp and payload) are converted with
    `_optional_to_sbs` using their dedicated converters.

    """
    event_id = _event_id_to_sbs(event.event_id)
    event_type = list(event.event_type)
    timestamp = timestamp_to_sbs(event.timestamp)
    source_timestamp = _optional_to_sbs(event.source_timestamp,
                                        timestamp_to_sbs)
    payload = _optional_to_sbs(event.payload, event_payload_to_sbs)

    return {'id': event_id,
            'type': event_type,
            'timestamp': timestamp,
            'sourceTimestamp': source_timestamp,
            'payload': payload}

Convert Event to SBS data

def event_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> Event:
def event_from_sbs(data: sbs.Data) -> Event:
    """Create new Event based on SBS data

    Inverse of `event_to_sbs` - reads members `id`, `type`, `timestamp`,
    `sourceTimestamp` and `payload`.

    """
    event_id = _event_id_from_sbs(data['id'])
    event_type = tuple(data['type'])
    timestamp = timestamp_from_sbs(data['timestamp'])
    source_timestamp = _optional_from_sbs(data['sourceTimestamp'],
                                          timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)

    return Event(event_id=event_id,
                 event_type=event_type,
                 timestamp=timestamp,
                 source_timestamp=source_timestamp,
                 payload=payload)

Create new Event based on SBS data

def register_event_to_sbs( event: RegisterEvent) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def register_event_to_sbs(event: RegisterEvent) -> sbs.Data:
    """Convert RegisterEvent to SBS data

    Optional members (source timestamp and payload) are converted with
    `_optional_to_sbs` using their dedicated converters.

    """
    event_type = list(event.event_type)
    source_timestamp = _optional_to_sbs(event.source_timestamp,
                                        timestamp_to_sbs)
    payload = _optional_to_sbs(event.payload, event_payload_to_sbs)

    return {'type': event_type,
            'sourceTimestamp': source_timestamp,
            'payload': payload}

Convert RegisterEvent to SBS data

def register_event_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> RegisterEvent:
def register_event_from_sbs(data: sbs.Data) -> RegisterEvent:
    """Create new RegisterEvent based on SBS data

    Inverse of `register_event_to_sbs` - reads members `type`,
    `sourceTimestamp` and `payload`.

    """
    event_type = tuple(data['type'])
    source_timestamp = _optional_from_sbs(data['sourceTimestamp'],
                                          timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)

    return RegisterEvent(event_type=event_type,
                         source_timestamp=source_timestamp,
                         payload=payload)

Create new RegisterEvent based on SBS data

def query_to_sbs( query: QueryData) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def query_to_sbs(query: QueryData) -> sbs.Data:
    """Convert QueryData to SBS data

    Optional members are converted with `_optional_to_sbs`. Members `order`
    and `orderBy` are encoded as ``(name, None)`` tuples. Lookup of unknown
    `order`/`order_by` value raises `KeyError`.

    """

    def encode_ids(event_ids):
        return [_event_id_to_sbs(event_id) for event_id in event_ids]

    def encode_types(event_types):
        return [list(event_type) for event_type in event_types]

    result = {}
    result['serverId'] = _optional_to_sbs(query.server_id)
    result['ids'] = _optional_to_sbs(query.event_ids, encode_ids)
    result['types'] = _optional_to_sbs(query.event_types, encode_types)
    result['tFrom'] = _optional_to_sbs(query.t_from, timestamp_to_sbs)
    result['tTo'] = _optional_to_sbs(query.t_to, timestamp_to_sbs)
    result['sourceTFrom'] = _optional_to_sbs(query.source_t_from,
                                             timestamp_to_sbs)
    result['sourceTTo'] = _optional_to_sbs(query.source_t_to,
                                           timestamp_to_sbs)
    result['payload'] = _optional_to_sbs(query.payload, event_payload_to_sbs)

    order_names = {Order.DESCENDING: ('descending', None),
                   Order.ASCENDING: ('ascending', None)}
    result['order'] = order_names[query.order]

    order_by_names = {OrderBy.TIMESTAMP: ('timestamp', None),
                      OrderBy.SOURCE_TIMESTAMP: ('sourceTimestamp', None)}
    result['orderBy'] = order_by_names[query.order_by]

    result['uniqueType'] = query.unique_type
    result['maxResults'] = _optional_to_sbs(query.max_results)
    return result

Convert QueryData to SBS data

def query_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> QueryData:
def query_from_sbs(data: sbs.Data) -> QueryData:
    """Create new QueryData based on SBS data

    Inverse of `query_to_sbs`. Only the first element (name) of `order` and
    `orderBy` members is used; unknown name raises `KeyError`.

    """

    def decode_ids(ids):
        return [_event_id_from_sbs(event_id) for event_id in ids]

    def decode_types(event_types):
        return [tuple(event_type) for event_type in event_types]

    members = {}
    members['server_id'] = _optional_from_sbs(data['serverId'])
    members['event_ids'] = _optional_from_sbs(data['ids'], decode_ids)
    members['event_types'] = _optional_from_sbs(data['types'], decode_types)
    members['t_from'] = _optional_from_sbs(data['tFrom'], timestamp_from_sbs)
    members['t_to'] = _optional_from_sbs(data['tTo'], timestamp_from_sbs)
    members['source_t_from'] = _optional_from_sbs(data['sourceTFrom'],
                                                  timestamp_from_sbs)
    members['source_t_to'] = _optional_from_sbs(data['sourceTTo'],
                                                timestamp_from_sbs)
    members['payload'] = _optional_from_sbs(data['payload'],
                                            event_payload_from_sbs)

    orders = {'descending': Order.DESCENDING,
              'ascending': Order.ASCENDING}
    members['order'] = orders[data['order'][0]]

    orders_by = {'timestamp': OrderBy.TIMESTAMP,
                 'sourceTimestamp': OrderBy.SOURCE_TIMESTAMP}
    members['order_by'] = orders_by[data['orderBy'][0]]

    members['unique_type'] = data['uniqueType']
    members['max_results'] = _optional_from_sbs(data['maxResults'])
    return QueryData(**members)

Create new QueryData based on SBS data

def event_payload_to_sbs( payload: EventPayload) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def event_payload_to_sbs(payload: EventPayload) -> sbs.Data:
    """Convert EventPayload to SBS data

    Result is ``(name, value)`` tuple where name is one of ``'binary'``,
    ``'json'`` or ``'sbs'``. Any other payload type raises `ValueError`.

    """
    # NOTE(review): relies on `EventPayloadType.SBS` member - confirm that
    # the enumeration defines it
    kind = payload.type

    if kind == EventPayloadType.BINARY:
        encoded = 'binary', payload.data

    elif kind == EventPayloadType.JSON:
        encoded = 'json', json.encode(payload.data)

    elif kind == EventPayloadType.SBS:
        encoded = 'sbs', _sbs_data_to_sbs(payload.data)

    else:
        raise ValueError('unsupported payload type')

    return encoded

Convert EventPayload to SBS data

def event_payload_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> EventPayload:
def event_payload_from_sbs(data: sbs.Data) -> EventPayload:
    """Create new EventPayload based on SBS data

    `data` is unpacked as ``(name, value)`` pair where name is one of
    ``'binary'``, ``'json'`` or ``'sbs'``. Any other name raises
    `ValueError`.

    """
    # NOTE(review): relies on `EventPayloadType.SBS` member - confirm that
    # the enumeration defines it
    name, value = data

    if name == 'binary':
        payload_type = EventPayloadType.BINARY
        payload_data = value

    elif name == 'json':
        payload_type = EventPayloadType.JSON
        payload_data = json.decode(value)

    elif name == 'sbs':
        payload_type = EventPayloadType.SBS
        payload_data = _sbs_data_from_sbs(value)

    else:
        raise ValueError('unsupported payload type')

    return EventPayload(type=payload_type,
                        data=payload_data)

Create new EventPayload based on SBS data

class Timestamp(typing.NamedTuple):
 9class Timestamp(typing.NamedTuple):
10    s: int
11    """seconds since 1970-01-01 (can be negative)"""
12    us: int
13    """microseconds added to timestamp seconds in range [0, 1e6)"""
14
15    def __lt__(self, other):
16        if not isinstance(other, Timestamp):
17            return NotImplemented
18        return self.s * 1000000 + self.us < other.s * 1000000 + other.us
19
20    def __gt__(self, other):
21        if not isinstance(other, Timestamp):
22            return NotImplemented
23        return self.s * 1000000 + self.us > other.s * 1000000 + other.us
24
25    def __eq__(self, other):
26        if not isinstance(other, Timestamp):
27            return NotImplemented
28        return self.s * 1000000 + self.us == other.s * 1000000 + other.us
29
30    def __ne__(self, other):
31        return not self == other
32
33    def __le__(self, other):
34        return self < other or self == other
35
36    def __ge__(self, other):
37        return self > other or self == other
38
39    def __hash__(self):
40        return self.s * 1000000 + self.us
41
42    def add(self, s: float) -> 'Timestamp':
43        """Create new timestamp by adding seconds to existing timestamp"""
44        us = self.us + round((s - int(s)) * 1e6)
45        s = self.s + int(s)
46        return Timestamp(s=s + us // int(1e6),
47                         us=us % int(1e6))

Timestamp(s, us)

Timestamp(s: int, us: int)

Create new instance of Timestamp(s, us)

s: int

seconds since 1970-01-01 (can be negative)

us: int

microseconds added to timestamp seconds in range [0, 1e6)

def add(self, s: float) -> Timestamp:
42    def add(self, s: float) -> 'Timestamp':
43        """Create new timestamp by adding seconds to existing timestamp"""
44        us = self.us + round((s - int(s)) * 1e6)
45        s = self.s + int(s)
46        return Timestamp(s=s + us // int(1e6),
47                         us=us % int(1e6))

Create new timestamp by adding seconds to existing timestamp

Inherited Members
builtins.tuple
index
count
def now() -> Timestamp:
def now() -> Timestamp:
    """Create new timestamp representing current time

    Current time is obtained as timezone aware utc datetime.

    """
    current = datetime.datetime.now(datetime.timezone.utc)
    return timestamp_from_datetime(current)

Create new timestamp representing current time

def timestamp_to_bytes(t: Timestamp) -> bytes:
def timestamp_to_bytes(t: Timestamp) -> bytes:
    """Convert timestamp to 12 byte representation

    Bytes [0, 8) are big endian unsigned `Timestamp.s` + 2^63 and
    bytes [8, 12) are big endian unsigned `Timestamp.us`.

    Adding 2^63 maps signed seconds to unsigned values, so byte-wise
    ordering of results matches ordering of timestamps (for `us` in its
    documented range).

    Raises `struct.error` if `Timestamp.s` is outside of [-2^63, 2^63) or
    `Timestamp.us` is outside of [0, 2^32).

    """
    return struct.pack(">QI", t.s + (1 << 63), t.us)

Convert timestamp to 12 byte representation

Bytes [0, 8) are big endian unsigned Timestamp.s + 2^63 and bytes [8, 12) are big endian unsigned Timestamp.us.

def timestamp_from_bytes(data: bytes) -> Timestamp:
def timestamp_from_bytes(data: bytes) -> Timestamp:
    """Create new timestamp from 12 byte representation

    Bytes representation is same as defined for `timestamp_to_bytes` function.

    """
    biased_s, us = struct.unpack(">QI", data)
    # remove the 2^63 offset applied to seconds during encoding
    seconds = biased_s - (1 << 63)
    return Timestamp(seconds, us)

Create new timestamp from 12 byte representation

Bytes representation is same as defined for timestamp_to_bytes function.

def timestamp_to_float(t: Timestamp) -> float:
def timestamp_to_float(t: Timestamp) -> float:
    """Convert timestamp to floating number of seconds since 1970-01-01 UTC

    Result is subject to floating point rounding. For precise serialization
    see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    fraction = t.us * 1E-6
    return t.s + fraction

Convert timestamp to floating number of seconds since 1970-01-01 UTC

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_from_float(ts: float) -> Timestamp:
def timestamp_from_float(ts: float) -> Timestamp:
    """Create timestamp from floating number of seconds since 1970-01-01 UTC

    Fractional part is rounded to microseconds. For precise serialization
    see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    # `int` truncates toward zero - step one second back for negative
    # values so that remaining fraction is not negative
    seconds = int(ts) - 1 if ts < 0 else int(ts)
    micros = round((ts - seconds) * 1E6)
    if micros != 1000000:
        return Timestamp(seconds, micros)
    # rounding produced full second - carry it into seconds
    return Timestamp(seconds + 1, 0)

Create timestamp from floating number of seconds since 1970-01-01 UTC

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_to_datetime(t: Timestamp) -> datetime.datetime:
def timestamp_to_datetime(t: Timestamp) -> datetime.datetime:
    """Convert timestamp to datetime (representing utc time)

    Resulting datetime is timezone aware with `tzinfo` set to utc.

    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    utc = datetime.timezone.utc
    try:
        whole_seconds = datetime.datetime.fromtimestamp(t.s, utc)
    except OSError:
        # fall back to arithmetic relative to epoch when `fromtimestamp`
        # reports OS level failure
        epoch = datetime.datetime(1970, 1, 1, tzinfo=utc)
        whole_seconds = epoch + datetime.timedelta(seconds=t.s)
    # microseconds are always taken from timestamp itself
    return whole_seconds.replace(microsecond=t.us)

Convert timestamp to datetime (representing utc time)

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_from_datetime(dt: datetime.datetime) -> Timestamp:
def timestamp_from_datetime(dt: datetime.datetime) -> Timestamp:
    """Create new timestamp from datetime

    If `tzinfo` is not set, it is assumed that provided datetime represents
    utc time.

    Conversion is based on exact integer difference between `dt` and
    1970-01-01 UTC, so resulting `Timestamp.s` is floor of seconds since
    epoch and `Timestamp.us` is in range [0, 1e6). This also holds for
    datetimes before 1970, including those falling on whole second.

    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    if not dt.tzinfo or dt.utcoffset() is None:
        # naive datetime - interpret as utc
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    # timedelta is normalized so that only `days` can be negative while
    # 0 <= seconds < 86400 and 0 <= microseconds < 1e6 - no float rounding
    # and no separate correction for negative values is needed
    delta = dt - epoch
    return Timestamp(s=delta.days * 86400 + delta.seconds,
                     us=delta.microseconds)

Create new timestamp from datetime

If tzinfo is not set, it is assumed that provided datetime represents utc time.

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_to_sbs( t: Timestamp) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def timestamp_to_sbs(t: Timestamp) -> sbs.Data:
    """Convert timestamp to SBS data

    Result is dictionary with members `s` and `us`.

    """
    return dict(s=t.s, us=t.us)

Convert timestamp to SBS data

def timestamp_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> Timestamp:
def timestamp_from_sbs(data: sbs.Data) -> Timestamp:
    """Create new timestamp from SBS data

    `data` is expected to provide members `s` and `us`.

    """
    seconds, micros = data['s'], data['us']
    return Timestamp(s=seconds, us=micros)

Create new timestamp from SBS data

def matches_query_type(event_type: Tuple[str, ...], query_type: Tuple[str, ...]) -> bool:
 9def matches_query_type(event_type: EventType,
10                       query_type: EventType
11                       ) -> bool:
12    """Determine if event type matches query type
13
14    Event type is tested if it matches query type according to the following
15    rules:
16
17        * Matching is performed on subtypes in increasing order.
18        * Event type is a match only if all its subtypes are matched by
19          corresponding query subtypes.
20        * Matching is finished when all query subtypes are exhausted.
21        * Query subtype '?' matches exactly one event subtype of any value.
22          The subtype must exist.
23        * Query subtype '*' matches 0 or more event subtypes of any value. It
24          must be the last query subtype.
25        * All other values of query subtype match exactly one event subtype
26          of the same value.
27        * Query type without subtypes is matched only by event type with no
28          subtypes.
29
30    As a consequence of aforementioned matching rules, event subtypes '*' and
31    '?' cannot be directly matched and it is advisable not to use them in event
32    types.
33
34    """
35    is_variable = bool(query_type and query_type[-1] == '*')
36    if is_variable:
37        query_type = query_type[:-1]
38
39    if len(event_type) < len(query_type):
40        return False
41
42    if len(event_type) > len(query_type) and not is_variable:
43        return False
44
45    for i, j in zip(event_type, query_type):
46        if j != '?' and i != j:
47            return False
48
49    return True

Determine if event type matches query type

Event type is tested if it matches query type according to the following rules:

* Matching is performed on subtypes in increasing order.
* Event type is a match only if all its subtypes are matched by
  corresponding query subtypes.
* Matching is finished when all query subtypes are exhausted.
* Query subtype '?' matches exactly one event subtype of any value.
  The subtype must exist.
* Query subtype '*' matches 0 or more event subtypes of any value. It
  must be the last query subtype.
* All other values of query subtype match exactly one event subtype
  of the same value.
* Query type without subtypes is matched only by event type with no
  subtypes.

As a consequence of aforementioned matching rules, event subtypes '*' and '?' cannot be directly matched and it is advisable not to use them in event types.

Subscription = <class 'hat.event.common.subscription.csubscription.CSubscription'>