hat.event.common

Common functionality shared between clients and event server

 1"""Common functionality shared between clients and event server"""
 2
 3from hat.event.common.data import (json_schema_repo,
 4                                   sbs_repo,
 5                                   EventType,
 6                                   Order,
 7                                   OrderBy,
 8                                   EventPayloadType,
 9                                   EventId,
10                                   EventPayload,
11                                   SbsData,
12                                   Event,
13                                   RegisterEvent,
14                                   QueryData,
15                                   event_to_sbs,
16                                   event_from_sbs,
17                                   register_event_to_sbs,
18                                   register_event_from_sbs,
19                                   query_to_sbs,
20                                   query_from_sbs,
21                                   event_payload_to_sbs,
22                                   event_payload_from_sbs)
23from hat.event.common.timestamp import (Timestamp,
24                                        now,
25                                        timestamp_to_bytes,
26                                        timestamp_from_bytes,
27                                        timestamp_to_float,
28                                        timestamp_from_float,
29                                        timestamp_to_datetime,
30                                        timestamp_from_datetime,
31                                        timestamp_to_sbs,
32                                        timestamp_from_sbs)
33from hat.event.common.subscription import (matches_query_type,
34                                           Subscription)
35
36
37__all__ = ['json_schema_repo',
38           'sbs_repo',
39           'EventType',
40           'Order',
41           'OrderBy',
42           'EventPayloadType',
43           'EventId',
44           'EventPayload',
45           'SbsData',
46           'Event',
47           'RegisterEvent',
48           'QueryData',
49           'event_to_sbs',
50           'event_from_sbs',
51           'register_event_to_sbs',
52           'register_event_from_sbs',
53           'query_to_sbs',
54           'query_from_sbs',
55           'event_payload_to_sbs',
56           'event_payload_from_sbs',
57           'Timestamp',
58           'now',
59           'timestamp_to_bytes',
60           'timestamp_from_bytes',
61           'timestamp_to_float',
62           'timestamp_from_float',
63           'timestamp_to_datetime',
64           'timestamp_from_datetime',
65           'timestamp_to_sbs',
66           'timestamp_from_sbs',
67           'matches_query_type',
68           'Subscription']
json_schema_repo = <hat.json.repository.SchemaRepository object>
sbs_repo = <hat.sbs.repository.Repository object>
EventType = typing.Tuple[str, ...]
class Order(enum.Enum):

An enumeration.

DESCENDING = <Order.DESCENDING: 1>
ASCENDING = <Order.ASCENDING: 2>
Inherited Members
enum.Enum
name
value
class OrderBy(enum.Enum):

An enumeration.

TIMESTAMP = <OrderBy.TIMESTAMP: 1>
SOURCE_TIMESTAMP = <OrderBy.SOURCE_TIMESTAMP: 2>
Inherited Members
enum.Enum
name
value
class EventPayloadType(enum.Enum):

An enumeration.

BINARY = <EventPayloadType.BINARY: 1>
JSON = <EventPayloadType.JSON: 2>
Inherited Members
enum.Enum
name
value
class EventId(typing.NamedTuple):
class EventId(typing.NamedTuple):
    """Event identifier

    Composed of the server, session and event instance identifiers.

    """
    server: int
    """server identifier"""
    session: int
    """session identifier"""
    instance: int
    """event instance identifier"""

EventId(server, session, instance)

EventId(server: int, session: int, instance: int)

Create new instance of EventId(server, session, instance)

server: int

server identifier

session: int

session identifier

instance: int

event instance identifier

Inherited Members
builtins.tuple
index
count
class EventPayload(typing.NamedTuple):
class EventPayload(typing.NamedTuple):
    """Event payload: payload type tag together with the payload data"""
    # tag inspected by `event_payload_to_sbs` to choose the encoding of `data`
    type: EventPayloadType
    # payload content; `event_payload_to_sbs` forwards it unchanged for
    # BINARY, passes it to `json.encode` for JSON and to `_sbs_data_to_sbs`
    # for SBS payloads
    data: typing.Union[bytes, json.Data, 'SbsData']

EventPayload(type, data)

EventPayload( type: hat.event.common.EventPayloadType, data: Union[bytes, ~Data, ForwardRef('SbsData')])

Create new instance of EventPayload(type, data)

type: hat.event.common.EventPayloadType

Alias for field number 0

data: Union[bytes, ~Data, hat.event.common.SbsData]

Alias for field number 1

Inherited Members
builtins.tuple
index
count
class SbsData(typing.NamedTuple):
class SbsData(typing.NamedTuple):
    """Binary data annotated with SBS module and type names"""
    module: typing.Optional[str]
    """SBS module name"""
    type: str
    """SBS type name"""
    # NOTE(review): presumably the value serialized according to the SBS
    # type named by `module`/`type` - confirm
    data: bytes

SbsData(module, type, data)

SbsData(module: Optional[str], type: str, data: bytes)

Create new instance of SbsData(module, type, data)

module: Optional[str]

SBS module name

type: str

SBS type name

data: bytes

Alias for field number 2

Inherited Members
builtins.tuple
index
count
class Event(typing.NamedTuple):
class Event(typing.NamedTuple):
    """Event as converted by `event_to_sbs` and `event_from_sbs`"""
    event_id: EventId
    # tuple of subtype strings (see `matches_query_type` for matching rules)
    event_type: EventType
    timestamp: 'Timestamp'
    # optional - `None` is handled by `_optional_to_sbs` on serialization
    source_timestamp: typing.Optional['Timestamp']
    # optional - `None` is handled by `_optional_to_sbs` on serialization
    payload: typing.Optional[EventPayload]

Event(event_id, event_type, timestamp, source_timestamp, payload)

Event( event_id: hat.event.common.EventId, event_type: Tuple[str, ...], timestamp: ForwardRef('Timestamp'), source_timestamp: Optional[ForwardRef('Timestamp')], payload: Optional[hat.event.common.EventPayload])

Create new instance of Event(event_id, event_type, timestamp, source_timestamp, payload)

event_id: hat.event.common.EventId

Alias for field number 0

event_type: Tuple[str, ...]

Alias for field number 1

timestamp: hat.event.common.Timestamp

Alias for field number 2

source_timestamp: Optional[hat.event.common.Timestamp]

Alias for field number 3

payload: Optional[hat.event.common.EventPayload]

Alias for field number 4

Inherited Members
builtins.tuple
index
count
class RegisterEvent(typing.NamedTuple):
class RegisterEvent(typing.NamedTuple):
    """Event data without `event_id` and `timestamp`

    Holds the same `event_type`, `source_timestamp` and `payload` fields as
    `Event`.

    """
    # tuple of subtype strings
    event_type: EventType
    # optional - `None` is handled by `_optional_to_sbs` on serialization
    source_timestamp: typing.Optional['Timestamp']
    # optional - `None` is handled by `_optional_to_sbs` on serialization
    payload: typing.Optional[EventPayload]

RegisterEvent(event_type, source_timestamp, payload)

RegisterEvent( event_type: Tuple[str, ...], source_timestamp: Optional[ForwardRef('Timestamp')], payload: Optional[hat.event.common.EventPayload])

Create new instance of RegisterEvent(event_type, source_timestamp, payload)

event_type: Tuple[str, ...]

Alias for field number 0

source_timestamp: Optional[hat.event.common.Timestamp]

Alias for field number 1

payload: Optional[hat.event.common.EventPayload]

Alias for field number 2

Inherited Members
builtins.tuple
index
count
class QueryData(typing.NamedTuple):
class QueryData(typing.NamedTuple):
    """Query parameters converted by `query_to_sbs` and `query_from_sbs`

    Every field has a default value, so an instance can be created by naming
    only the fields of interest.

    """
    # fields defaulting to `None` are optional; `query_to_sbs` passes them
    # through `_optional_to_sbs`
    server_id: typing.Optional[int] = None
    event_ids: typing.Optional[typing.List[EventId]] = None
    event_types: typing.Optional[typing.List[EventType]] = None
    # NOTE(review): presumably bounds applied to `Event.timestamp` - confirm
    # whether they are inclusive
    t_from: typing.Optional['Timestamp'] = None
    t_to: typing.Optional['Timestamp'] = None
    # NOTE(review): presumably bounds applied to `Event.source_timestamp` -
    # confirm whether they are inclusive
    source_t_from: typing.Optional['Timestamp'] = None
    source_t_to: typing.Optional['Timestamp'] = None
    payload: typing.Optional[EventPayload] = None
    # serialized as SBS choice 'descending' or 'ascending'
    order: Order = Order.DESCENDING
    # serialized as SBS choice 'timestamp' or 'sourceTimestamp'
    order_by: OrderBy = OrderBy.TIMESTAMP
    unique_type: bool = False
    max_results: typing.Optional[int] = None

QueryData(server_id, event_ids, event_types, t_from, t_to, source_t_from, source_t_to, payload, order, order_by, unique_type, max_results)

QueryData( server_id: Optional[int] = None, event_ids: Optional[List[hat.event.common.EventId]] = None, event_types: Optional[List[Tuple[str, ...]]] = None, t_from: Optional[ForwardRef('Timestamp')] = None, t_to: Optional[ForwardRef('Timestamp')] = None, source_t_from: Optional[ForwardRef('Timestamp')] = None, source_t_to: Optional[ForwardRef('Timestamp')] = None, payload: Optional[hat.event.common.EventPayload] = None, order: hat.event.common.Order = <Order.DESCENDING: 1>, order_by: hat.event.common.OrderBy = <OrderBy.TIMESTAMP: 1>, unique_type: bool = False, max_results: Optional[int] = None)

Create new instance of QueryData(server_id, event_ids, event_types, t_from, t_to, source_t_from, source_t_to, payload, order, order_by, unique_type, max_results)

server_id: Optional[int]

Alias for field number 0

event_ids: Optional[List[hat.event.common.EventId]]

Alias for field number 1

event_types: Optional[List[Tuple[str, ...]]]

Alias for field number 2

t_from: Optional[hat.event.common.Timestamp]

Alias for field number 3

t_to: Optional[hat.event.common.Timestamp]

Alias for field number 4

source_t_from: Optional[hat.event.common.Timestamp]

Alias for field number 5

source_t_to: Optional[hat.event.common.Timestamp]

Alias for field number 6

payload: Optional[hat.event.common.EventPayload]

Alias for field number 7

order: hat.event.common.Order

Alias for field number 8

order_by: hat.event.common.OrderBy

Alias for field number 9

unique_type: bool

Alias for field number 10

max_results: Optional[int]

Alias for field number 11

Inherited Members
builtins.tuple
index
count
def event_to_sbs(event: hat.event.common.Event) -> ~Data:
 98def event_to_sbs(event: Event) -> sbs.Data:
 99    """Convert Event to SBS data"""
100    return {
101        'id': _event_id_to_sbs(event.event_id),
102        'type': list(event.event_type),
103        'timestamp': timestamp_to_sbs(event.timestamp),
104        'sourceTimestamp': _optional_to_sbs(event.source_timestamp,
105                                            timestamp_to_sbs),
106        'payload': _optional_to_sbs(event.payload, event_payload_to_sbs)}

Convert Event to SBS data

def event_from_sbs(data: ~Data) -> hat.event.common.Event:
def event_from_sbs(data: sbs.Data) -> Event:
    """Build an `Event` from its SBS data representation

    Reads the same keys that `event_to_sbs` writes; optional fields are
    decoded through `_optional_from_sbs`.

    """
    event_id = _event_id_from_sbs(data['id'])
    event_type = tuple(data['type'])
    timestamp = timestamp_from_sbs(data['timestamp'])
    source_timestamp = _optional_from_sbs(data['sourceTimestamp'],
                                          timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)
    return Event(event_id=event_id,
                 event_type=event_type,
                 timestamp=timestamp,
                 source_timestamp=source_timestamp,
                 payload=payload)

Create new Event based on SBS data

def register_event_to_sbs(event: hat.event.common.RegisterEvent) -> ~Data:
def register_event_to_sbs(event: RegisterEvent) -> sbs.Data:
    """Serialize `event` (a `RegisterEvent`) into its SBS data representation

    Optional fields are passed through `_optional_to_sbs` together with their
    converter.

    """
    subtypes = list(event.event_type)
    source_timestamp = _optional_to_sbs(event.source_timestamp,
                                        timestamp_to_sbs)
    payload = _optional_to_sbs(event.payload, event_payload_to_sbs)
    return {'type': subtypes,
            'sourceTimestamp': source_timestamp,
            'payload': payload}

Convert RegisterEvent to SBS data

def register_event_from_sbs(data: ~Data) -> hat.event.common.RegisterEvent:
def register_event_from_sbs(data: sbs.Data) -> RegisterEvent:
    """Build a `RegisterEvent` from its SBS data representation

    Reads the same keys that `register_event_to_sbs` writes; optional fields
    are decoded through `_optional_from_sbs`.

    """
    event_type = tuple(data['type'])
    source_timestamp = _optional_from_sbs(data['sourceTimestamp'],
                                          timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)
    return RegisterEvent(event_type=event_type,
                         source_timestamp=source_timestamp,
                         payload=payload)

Create new RegisterEvent based on SBS data

def query_to_sbs(query: hat.event.common.QueryData) -> ~Data:
def query_to_sbs(query: QueryData) -> sbs.Data:
    """Serialize `query` into its SBS data representation

    Optional fields are passed through `_optional_to_sbs`. `order` and
    `order_by` are encoded as `(name, None)` tuples; a value missing from
    the lookup tables raises `KeyError`.

    """
    def ids_to_sbs(event_ids):
        return [_event_id_to_sbs(event_id) for event_id in event_ids]

    def types_to_sbs(event_types):
        return [list(event_type) for event_type in event_types]

    server_id = _optional_to_sbs(query.server_id)
    ids = _optional_to_sbs(query.event_ids, ids_to_sbs)
    types = _optional_to_sbs(query.event_types, types_to_sbs)
    t_from = _optional_to_sbs(query.t_from, timestamp_to_sbs)
    t_to = _optional_to_sbs(query.t_to, timestamp_to_sbs)
    source_t_from = _optional_to_sbs(query.source_t_from, timestamp_to_sbs)
    source_t_to = _optional_to_sbs(query.source_t_to, timestamp_to_sbs)
    payload = _optional_to_sbs(query.payload, event_payload_to_sbs)

    order_choices = {Order.DESCENDING: ('descending', None),
                     Order.ASCENDING: ('ascending', None)}
    order = order_choices[query.order]

    order_by_choices = {OrderBy.TIMESTAMP: ('timestamp', None),
                        OrderBy.SOURCE_TIMESTAMP: ('sourceTimestamp', None)}
    order_by = order_by_choices[query.order_by]

    unique_type = query.unique_type
    max_results = _optional_to_sbs(query.max_results)

    return {'serverId': server_id,
            'ids': ids,
            'types': types,
            'tFrom': t_from,
            'tTo': t_to,
            'sourceTFrom': source_t_from,
            'sourceTTo': source_t_to,
            'payload': payload,
            'order': order,
            'orderBy': order_by,
            'uniqueType': unique_type,
            'maxResults': max_results}

Convert QueryData to SBS data

def query_from_sbs(data: ~Data) -> hat.event.common.QueryData:
def query_from_sbs(data: sbs.Data) -> QueryData:
    """Build a `QueryData` from its SBS data representation

    Reads the same keys that `query_to_sbs` writes. `order` and `orderBy`
    are looked up by the first element of their encoded tuple; an unknown
    name raises `KeyError`.

    """
    def ids_from_sbs(sbs_ids):
        return [_event_id_from_sbs(sbs_id) for sbs_id in sbs_ids]

    def types_from_sbs(sbs_types):
        return [tuple(sbs_type) for sbs_type in sbs_types]

    server_id = _optional_from_sbs(data['serverId'])
    event_ids = _optional_from_sbs(data['ids'], ids_from_sbs)
    event_types = _optional_from_sbs(data['types'], types_from_sbs)
    t_from = _optional_from_sbs(data['tFrom'], timestamp_from_sbs)
    t_to = _optional_from_sbs(data['tTo'], timestamp_from_sbs)
    source_t_from = _optional_from_sbs(data['sourceTFrom'],
                                       timestamp_from_sbs)
    source_t_to = _optional_from_sbs(data['sourceTTo'], timestamp_from_sbs)
    payload = _optional_from_sbs(data['payload'], event_payload_from_sbs)

    orders = {'descending': Order.DESCENDING,
              'ascending': Order.ASCENDING}
    order = orders[data['order'][0]]

    orders_by = {'timestamp': OrderBy.TIMESTAMP,
                 'sourceTimestamp': OrderBy.SOURCE_TIMESTAMP}
    order_by = orders_by[data['orderBy'][0]]

    unique_type = data['uniqueType']
    max_results = _optional_from_sbs(data['maxResults'])

    return QueryData(server_id=server_id,
                     event_ids=event_ids,
                     event_types=event_types,
                     t_from=t_from,
                     t_to=t_to,
                     source_t_from=source_t_from,
                     source_t_to=source_t_to,
                     payload=payload,
                     order=order,
                     order_by=order_by,
                     unique_type=unique_type,
                     max_results=max_results)

Create new QueryData based on SBS data

def event_payload_to_sbs(payload: hat.event.common.EventPayload) -> ~Data:
def event_payload_to_sbs(payload: EventPayload) -> sbs.Data:
    """Serialize `payload` into a `(name, data)` SBS choice

    The name is 'binary', 'json' or 'sbs' depending on `payload.type`.
    Raises `ValueError` for any other payload type.

    """
    payload_type = payload.type

    # NOTE(review): the SBS branch relies on `EventPayloadType.SBS` being
    # defined - confirm against the enum definition
    if payload_type == EventPayloadType.BINARY:
        encoded = 'binary', payload.data
    elif payload_type == EventPayloadType.JSON:
        encoded = 'json', json.encode(payload.data)
    elif payload_type == EventPayloadType.SBS:
        encoded = 'sbs', _sbs_data_to_sbs(payload.data)
    else:
        raise ValueError('unsupported payload type')

    return encoded

Convert EventPayload to SBS data

def event_payload_from_sbs(data: ~Data) -> hat.event.common.EventPayload:
def event_payload_from_sbs(data: sbs.Data) -> EventPayload:
    """Build an `EventPayload` from a `(name, data)` SBS choice

    Accepted names are 'binary', 'json' and 'sbs'. Raises `ValueError` for
    any other name.

    """
    choice, content = data

    # NOTE(review): the 'sbs' branch relies on `EventPayloadType.SBS` being
    # defined - confirm against the enum definition
    if choice == 'binary':
        payload = EventPayload(type=EventPayloadType.BINARY,
                               data=content)
    elif choice == 'json':
        payload = EventPayload(type=EventPayloadType.JSON,
                               data=json.decode(content))
    elif choice == 'sbs':
        payload = EventPayload(type=EventPayloadType.SBS,
                               data=_sbs_data_from_sbs(content))
    else:
        raise ValueError('unsupported payload type')

    return payload

Create new EventPayload based on SBS data

class Timestamp(typing.NamedTuple):
 9class Timestamp(typing.NamedTuple):
10    s: int
11    """seconds since 1970-01-01 (can be negative)"""
12    us: int
13    """microseconds added to timestamp seconds in range [0, 1e6)"""
14
15    def __lt__(self, other):
16        if not isinstance(other, Timestamp):
17            return NotImplemented
18        return self.s * 1000000 + self.us < other.s * 1000000 + other.us
19
20    def __gt__(self, other):
21        if not isinstance(other, Timestamp):
22            return NotImplemented
23        return self.s * 1000000 + self.us > other.s * 1000000 + other.us
24
25    def __eq__(self, other):
26        if not isinstance(other, Timestamp):
27            return NotImplemented
28        return self.s * 1000000 + self.us == other.s * 1000000 + other.us
29
30    def __ne__(self, other):
31        return not self == other
32
33    def __le__(self, other):
34        return self < other or self == other
35
36    def __ge__(self, other):
37        return self > other or self == other
38
39    def __hash__(self):
40        return self.s * 1000000 + self.us
41
42    def add(self, s: float) -> 'Timestamp':
43        """Create new timestamp by adding seconds to existing timestamp"""
44        us = self.us + round((s - int(s)) * 1e6)
45        s = self.s + int(s)
46        return Timestamp(s=s + us // int(1e6),
47                         us=us % int(1e6))

Timestamp(s, us)

Timestamp(s: int, us: int)

Create new instance of Timestamp(s, us)

s: int

seconds since 1970-01-01 (can be negative)

us: int

microseconds added to timestamp seconds in range [0, 1e6)

def add(self, s: float) -> hat.event.common.Timestamp:
42    def add(self, s: float) -> 'Timestamp':
43        """Create new timestamp by adding seconds to existing timestamp"""
44        us = self.us + round((s - int(s)) * 1e6)
45        s = self.s + int(s)
46        return Timestamp(s=s + us // int(1e6),
47                         us=us % int(1e6))

Create new timestamp by adding seconds to existing timestamp

Inherited Members
builtins.tuple
index
count
def now() -> hat.event.common.Timestamp:
def now() -> Timestamp:
    """Return timestamp of the current time

    The current time is read as a timezone-aware UTC datetime and converted
    with `timestamp_from_datetime`.

    """
    current_utc = datetime.datetime.now(datetime.timezone.utc)
    return timestamp_from_datetime(current_utc)

Create new timestamp representing current time

def timestamp_to_bytes(t: hat.event.common.Timestamp) -> bytes:
def timestamp_to_bytes(t: Timestamp) -> bytes:
    """Encode timestamp as 12 bytes

    Bytes [0, 8) are big endian unsigned `Timestamp.s` + 2^63 and
    bytes [8, 12) are big endian unsigned `Timestamp.us`.

    """
    # shifting by 2^63 lets negative seconds be packed as unsigned 64 bit
    shifted_s = t.s + (1 << 63)
    return struct.pack(">QI", shifted_s, t.us)

Convert timestamp to 12 byte representation

Bytes [0, 8) are the big-endian unsigned value Timestamp.s + 2^63, and bytes [8, 12) are the big-endian unsigned Timestamp.us.

def timestamp_from_bytes(data: bytes) -> hat.event.common.Timestamp:
def timestamp_from_bytes(data: bytes) -> Timestamp:
    """Decode timestamp from its 12 byte representation

    The layout is the one produced by `timestamp_to_bytes`.

    """
    shifted_s, us = struct.unpack(">QI", data)
    # undo the 2^63 shift applied to seconds when encoding
    return Timestamp(shifted_s - (1 << 63), us)

Create new timestamp from 12 byte representation

Bytes representation is same as defined for timestamp_to_bytes function.

def timestamp_to_float(t: hat.event.common.Timestamp) -> float:
def timestamp_to_float(t: Timestamp) -> float:
    """Return timestamp as float number of seconds since 1970-01-01 UTC

    The result is a float and therefore not exact; use
    `timestamp_to_bytes`/`timestamp_from_bytes` for precise serialization.

    """
    fraction = t.us * 1E-6
    return t.s + fraction

Convert timestamp to floating number of seconds since 1970-01-01 UTC

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_from_float(ts: float) -> hat.event.common.Timestamp:
def timestamp_from_float(ts: float) -> Timestamp:
    """Build timestamp from float number of seconds since 1970-01-01 UTC

    Use `timestamp_to_bytes`/`timestamp_from_bytes` for precise
    serialization.

    """
    whole = int(ts)
    # `int` truncates toward zero, so negative values step one second back
    # to keep the microsecond part non-negative
    s = whole - 1 if ts < 0 else whole
    us = round((ts - s) * 1E6)
    if us != 1000000:
        return Timestamp(s, us)
    # rounding reached a full second - carry it into seconds
    return Timestamp(s + 1, 0)

Create timestamp from floating number of seconds since 1970-01-01 UTC

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_to_datetime(t: hat.event.common.Timestamp) -> datetime.datetime:
def timestamp_to_datetime(t: Timestamp) -> datetime.datetime:
    """Return timezone-aware UTC datetime equivalent to the timestamp

    Use `timestamp_to_bytes`/`timestamp_from_bytes` for precise
    serialization.

    """
    utc = datetime.timezone.utc
    try:
        whole_seconds = datetime.datetime.fromtimestamp(t.s, utc)
    except OSError:
        # `fromtimestamp` failed for this value - derive the same instant
        # with timedelta arithmetic from the epoch
        epoch = datetime.datetime(1970, 1, 1, tzinfo=utc)
        whole_seconds = epoch + datetime.timedelta(seconds=t.s)
    return whole_seconds.replace(microsecond=t.us)

Convert timestamp to datetime (representing utc time)

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_from_datetime(dt: datetime.datetime) -> hat.event.common.Timestamp:
def timestamp_from_datetime(dt: datetime.datetime) -> Timestamp:
    """Create new timestamp from datetime

    If `tzinfo` is not set, it is assumed that provided datetime represents
    utc time.

    Seconds and microseconds are taken from the exact integer difference
    between `dt` and 1970-01-01 UTC, so no float rounding is involved and
    datetimes before 1970 with zero microseconds are not shifted by one
    second.

    For precise serialization see `timestamp_to_bytes`/`timestamp_from_bytes`.

    """
    if not dt.tzinfo:
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    delta = dt - epoch
    # timedelta is normalized: `days` may be negative while `seconds` and
    # `microseconds` are always non-negative, which matches the Timestamp
    # convention of `us` in range [0, 1e6)
    return Timestamp(s=delta.days * 86400 + delta.seconds,
                     us=delta.microseconds)

Create new timestamp from datetime

If tzinfo is not set, it is assumed that provided datetime represents utc time.

For precise serialization see timestamp_to_bytes/timestamp_from_bytes.

def timestamp_to_sbs(t: hat.event.common.Timestamp) -> ~Data:
def timestamp_to_sbs(t: Timestamp) -> sbs.Data:
    """Serialize timestamp into SBS data with keys 's' and 'us'"""
    return dict(s=t.s, us=t.us)

Convert timestamp to SBS data

def timestamp_from_sbs(data: ~Data) -> hat.event.common.Timestamp:
def timestamp_from_sbs(data: sbs.Data) -> Timestamp:
    """Build timestamp from SBS data with keys 's' and 'us'"""
    seconds = data['s']
    microseconds = data['us']
    return Timestamp(seconds, microseconds)

Create new timestamp from SBS data

def matches_query_type(event_type: Tuple[str, ...], query_type: Tuple[str, ...]) -> bool:
def matches_query_type(event_type: EventType,
                       query_type: EventType
                       ) -> bool:
    """Check whether event type is matched by query type

    Matching rules:

        * Subtypes are compared position by position, starting from the
          first one.
        * Event type matches only if every one of its subtypes is matched by
          the corresponding query subtype.
        * Query subtype '?' matches exactly one event subtype of any value;
          that subtype must exist.
        * Query subtype '*' matches 0 or more event subtypes of any value
          and is recognized only as the last query subtype.
        * Any other query subtype matches exactly one event subtype of the
          same value.
        * Query type without subtypes is matched only by event type without
          subtypes.

    Because of these rules, event subtypes '*' and '?' cannot be matched
    directly and should be avoided in event types.

    """
    has_wildcard_tail = bool(query_type) and query_type[-1] == '*'
    fixed = query_type[:-1] if has_wildcard_tail else query_type

    event_len = len(event_type)
    fixed_len = len(fixed)

    # every fixed query subtype needs a corresponding event subtype
    if event_len < fixed_len:
        return False

    # extra event subtypes are allowed only behind a trailing '*'
    if not has_wildcard_tail and event_len != fixed_len:
        return False

    return all(query_subtype == '?' or event_subtype == query_subtype
               for event_subtype, query_subtype in zip(event_type, fixed))

Determine if event type matches query type

Event type is tested if it matches query type according to the following rules:

* Matching is performed on subtypes in increasing order.
* Event type is a match only if all its subtypes are matched by
  corresponding query subtypes.
* Matching is finished when all query subtypes are exhausted.
* Query subtype '?' matches exactly one event subtype of any value.
  The subtype must exist.
* Query subtype '*' matches 0 or more event subtypes of any value. It
  must be the last query subtype.
* All other values of query subtype match exactly one event subtype
  of the same value.
* Query type without subtypes is matched only by event type with no
  subtypes.

As a consequence of aforementioned matching rules, event subtypes '*' and '?' cannot be directly matched and it is advisable not to use them in event types.

class Subscription:

Subscription defined by query event types

def get_query_types(unknown):

Calculate sanitized query event types

def matches(unknown):

Does event_type match subscription

def union(unknown):

Create new subscription including event types from this and other subscriptions

def isdisjoint(unknown):

Return True if this subscription has no event types in common with other subscription