Event Server#

Event Server is a central component responsible for registering, processing, storing and providing access to events.

Configuration for Event Server is defined with schema hat-event://main.yaml#.

Running#

By installing Event Server from hat-event package, executable hat-event becomes available and can be used for starting this component.

usage: hat-event [-h] [--conf PATH]

options:
  -h, --help   show this help message and exit
  --conf PATH  configuration defined by hat-event://main.yaml# (default
               $XDG_CONFIG_HOME/hat/event.{yaml|yml|json})

Event Server can be run as standalone component or component connecting to Monitor Server.

Event#

Event is a generic data structure used for communication and storage of relevant changes happening through time in a Hat system. For example, event can represent a data change triggered by a device outside of the system, or an inner notification created by a component inside the system. Each event is immutable and uniquely identified by its event id. Event Server is the only component responsible for creating events - all other components shall request Event Server to create a new event.

Event data structure:

  • id

    A unique Event identifier always set by the server. It contains:

    • server

      Event Server unique identifier (defined in configuration as server_id).

    • session

      Unique identifier of event processing session (unique only on the server that creates it). It is sequential integer that starts from 1. Session of value 0 is never assigned to an event - it is reserved for identifier of non-existing session preceding first available session.

    • instance

      Unique identifier of an Event instance (unique only for the session and server that created it). It is sequential integer that starts from 1. Instance of value 0 is never assigned to an event, it is reserved for identifier of non-existing instance preceding first available instance in associated session.

  • type

    Type is a user (client) defined list of strings. Semantics of the list’s elements and their position in the list is determined by the user and is not predefined by the Event Server. This list should be used as the main identifier of the occurred event’s type. Each component registering an event should have its own naming convention defined which does not collide with other components’ naming conventions. This property is set by the user while registering an event. Subtypes ? and * are not allowed as parts of event type.

    When used in querying and subscription, this property has additional semantics. Any string in the list can be replaced with ? while the last string can also be replaced with *. Replacements must substitute an entire string in the list. The semantics of these replacements are:

    • The string ?

      is matched with a single arbitrary string.

    • The string *

      is matched with any number (zero or more) of arbitrary strings.

  • timestamp

    This property determines the moment when the event was registered on the server. It is always set by the server (that is by Engine).

  • source timestamp

    This property is optional. It represents the moment the Event occurred as detected by the source of the Event. It is always set by the Eventer Client (in the remainder of this article mostly referenced only as client).

  • payload

    This property is optional. It can be used to provide additional data bound to an event. The payload property is always set by the client registering the event. Its contents are determined by the Event’s type and can be decoded by clients who understand Events of that type. Payload can be encoded as binary data, JSON data or SBS encoded data. Event Server doesn’t decode payload while receiving event requests, storing events or providing query results. Payload can be optionally decoded by Event Server’s modules.

Eventer communication#

Communication between Eventer Server and Eventer Client (in the remainder of this article, mostly referred only as server and client) is based on Chatter communication, defined in HatEventer SBS module (see Eventer Chatter messages) with the following messages:

Message

Conversation

Direction

First

Last

Token

MsgSubscribe

T

T

T

c > s

MsgNotify

T

T

T

s > c

MsgRegisterReq

T

T/F

T

c > s

MsgRegisterRes

F

T

T

s > c

MsgQueryReq

T

F

T

c > s

MsgQueryRes

F

T

T

s > c

Actions available to clients which are directly mapped to exchange of communication messages:

  • subscribe

    A client can, at any time, subscribe to events of certain types by sending a subscribe message (MsgSubscribe) to the server. After server receives subscribe message, it will spontaneously notify the client (by sending MsgNotify) whenever an event occurs with the type matched to any type in the subscription message. Matching is done as described in event’s type property including the ? and * options. A client can send as many subscribe messages as it wants, each new subscription message implicitly invalidates previous subscriptions. Initially, after new connection between server and client is established, client isn’t subscribed to any events. Both subscribe and notify messages can be sent at any time independently of other communication messages. Events that are notified by single MsgNotify are part of same event processing session.

  • register event

    A client can, at any time, send new request for event registration. Those register requests are sent as part of MsgRegisterReq message. Single MsgRegisterReq may contain an arbitrary number of registration requests which are all registered at the same time (as part of single processing session). Single register event contains event type; and optional source timestamp and payload. Upon receiving MsgRegisterReq, it is responsibility of a server to create new event for each register event. All events created based on a single MsgRegisterReq have the same timestamp and same session identifier. If a client doesn’t end Chatter conversation (MsgRegisterReq last flag is false), once associated events are created, server will respond with MsgRegisterRes and end conversation. For each register event in MsgRegisterReq, associated MsgRegisterRes contains newly created event, or information about event registration failure.

  • query events

    At any time, client can initiate new event query by sending MsgQueryReq message. Upon receiving query request, server will provide all available events that match query criteria as part of single MsgQueryRes. Single query request can contain multiple filter conditions which ALL must be met for all events provided to client as query result. Query request contains:

    • server_id - optional filter condition

      If set, only events with event_id.server equal to server_id are matched.

    • event_ids - optional filter condition

      If set, only events with ids which are defined as part of filter condition are matched.

    • event_types - optional filter condition

      List of event types. If set, event type has to match at least one type from the list. Matching is done as defined in event’s type property description - including the ? and * options.

    • t_from - from timestamp, optional filter condition

      If set, only events with timestamp greater than or equal are matched.

    • t_to - to timestamp, optional filter condition

      If set, only events with timestamp lower than or equal are matched.

    • source_t_from - from source timestamp, optional filter condition

      If set, only events with source timestamp defined, and greater than or equal, are matched.

    • source_t_to - to source timestamp, optional filter condition

      If set, only events with source timestamp defined, and lower than or equal, are matched.

    • payload - optional filter condition

      If set, only events with payload defined and whose payload is the same as the query’s payload are matched.

    • order

      Can be set to ascending or descending. If set to ascending, matched events will be returned ordered from the earliest to the latest dependent on their timestamp or source timestamp (this choice is determined by the order by property of the query). Earliest meaning lower timestamp, latest meaning greater timestamp. If set to descending the same logic applies, but the order is reversed.

    • order_by

      Can be set to timestamp or source timestamp. Ordering events by source timestamp has events with source timestamp undefined returned last in an arbitrary order.

    • unique_type

      If set to true, it determines whether the matched events will contain only one event instance of the same type. With the query order set to descending, only one event with the greatest timestamp or source timestamp will be matched. Setting the order to ascending will match the event with the lowest timestamp or source timestamp.

    • max_results

      If set, limits the number of matched events to this number. Matched events are dependent on the query order the same way as in unique_type.

Redundancy#

To run multiple redundant Event Servers, monitor parameter has to be defined in the configuration. Event Server connects to configured Monitor Server and creates a corresponding Monitor Component. Besides execution of redundancy algorithm, Monitor Server is also used for service discovery. Each Monitor Component is described with its info, that is Component Information. One of the properties of component info is data, through which a component can share some arbitrary information with any other component. Event Server uses data (specified by hat-event://monitor_data.yaml#) to share the following information:

  • eventer_server_address

    Eventer Server address is needed to any component that uses Eventer Client to interact with currently active Event Server by events.

  • syncer_server_address

    Syncer Server address is needed to any component that uses Syncer Client to synchronize with currently active Event Server.

  • server_id

    Event Server unique identifier (defined in configuration as server_id). Needed to Syncer Client in order to request synchronization with the specific server.

  • syncer_token (optional property)

    This token string is used as a key for synchronization between servers. Event Server will request synchronization with a remote Event Server only if their syncer_token are identical (for more details see Syncer Client). If this token is not defined, synchronization is performed irrespective.

Syncer communication#

Communication between Syncer Server and Syncer Client, based on Chatter communication, is aimed primarily for events synchronization between two, or more redundant Event Servers. Thus, each Event Server implements both server and client side of this communication. Generally, this communication is made not to be exclusively used within an Event Server: Syncer Client can be used to establish synchronization of any remote instance (e.g. an arbitrary database) with an Event Server.

Messages used in Syncer communication, defined in HatSyncer SBS module (see Syncer Chatter messages), are the following:

Message

Conversation

Direction

First

Last

Token

MsgReq

T

T

T

c > s

MsgSynced

T

T

T

s > c

MsgEvents

T

T

T

s > c

Communication between Syncer Client and server starts with synchronization request initiated by the client with MsgReq message. The connection between server and client is considered established when MsgReq message is received on the server side. With this message, client requests for server events in order to get “synchronized”. More specifically, by MsgReq client represents itself by clientName and requests all the events that have EventId such that session is same or greater and instance is greater (ie. newer) than lastEventId and server corresponds to Syncer Server’s Event Server.

Upon receiving MsgReq, server communicates back the requested events by MsgEvents message. Events are grouped in MsgEvents such that all events in the message are from the same session (session of EventId). Once all of these events are sent, server sends MsgSynced message in order to signalize the client that synchronization phase ended.

All events created (by the Event Server with running Syncer Server) after the MsgReq (and before MsgSynced) are not included in the synchronization phase. They are buffered on the server side and sent to client only after the MsgSynced message, using the same MsgEvents message. In the remaining life of the connection between Syncer Server and client, only MsgEvents messages are sent from server to a client in order to notify client with the new events. All events in one MsgEvents message always belong to the same session.

Todo

add subscription to MsgReq.

Components#

Event Server functionality can be defined by using the following components:

Eventer Client#

Eventer Client is a component that provides client functionality in Eventer communication. Package hat-event provides python implementation of hat.event.eventer_client module which can be used as a basis for communication with Eventer Server. This module provides low-level and high-level communication API. For more detail see documentation of hat.event.eventer_client module.

Eventer Server#

Eventer Server module is responsible for providing implementation of server side of Eventer communication. This component translates client requests to Engine’s method calls. At the same time, it observes all new event notifications made by Engine and notifies clients with appropriate messages.

RegisterEvent objects obtained from client’s register requests are forwarded to Engine which converts them to Event and submits to further processing.

A unique identifier is assigned to each chatter connection established with Eventer Server (unique for the single execution lifetime of Event Server process). This identifier is associated with all Event`s obtained from the corresponding connection as through `id of Source (source type is EVENTER).

Each Client makes its subscription to Eventer Server by its last subscription message MsgSubscribe. This subscription is used as a filter for selecting subset of event notifications which are sent to associated connection.

Eventer Server module is responsible for registering events each time new chatter connection is established and each time existing chatter connection is closed. These events are defined by unique event type:

  • ‘event’, ‘eventer’

    • source_timestamp is None

    • payload is specified by hat-event://main.yaml#/definitions/events/eventer.

This events are registered through Engine with Source.type EVENTER.

Syncer Client#

Syncer Client module is responsible for providing implementation of client side of Syncer communication. It is instanced with reference to Backends instance and a Monitor Client instance from (see Monitor Server).

Event Server uses Syncer Client to synchronize with a remote Event Server. In case when Event Server is run without Monitor Server (without monitor in configuration), Syncer Client is not run: no other Event Servers are expected, so there is no need for synchronization.

Syncer Client uses Monitor Client in order to discover remote Event Servers, that is their Syncer Servers. It registers for notifications made by Monitor Client once global state changes. Once it gets ComponentInfo, with same group as its own (correspond to remote Event Server), it is interested in its data property in order to get syncer_token, server_id and syncer_server_address (see here). First it checks that syncer_token of the remote server matches with its own syncer_token (defined in configuration). This mechanism can be used, for example, to check if remote server node has matching system configuration. If they doesn’t match, client ignores that component and doesn’t connect to its Syncer Server. Otherwise, client uses syncer_server_address to connect to the remote Syncer Server. Once it connects, client immediately sends MsgReq in order to request synchronization. The main reference for synchronization of events is lastEventId, that it gets from backend using server_id obtained from Monitor Client.

On all the MsgEvents messages received from the Syncer Server, client transforms messages to corresponding Events and registers them to backend (see Syncer communication for more details about messages exchanged with server).

In case client discovered (through Monitor Client) multiple Syncer Servers, it will connect to all of them and try to synchronize with all of them independently.

Syncer Client is responsible for registering events in regard to synchronization status of all active Syncer Server connections. This event is defined with unique event type:

  • ‘event’, ‘syncer’, ‘client’, <server_id>, ‘synced’

    • source_timestamp is None

    • payload is specified by hat-event://main.yaml#/definitions/events/syncer/client.

More precisely, this event is not registered by the Syncer Client itself, but Event Server registers it on behalf of Syncer Client, with Source.type set to SYNCER (for the sake of simplicity hereafter we consider as Syncer Client registers this event).

Payload of this event is boolean flag that represents current synchronization state. When client receives remote events prior to receiving MsgSynced message (see Syncer communication), it will register new synced event with payload False. When client receives MsgSynced, it will register new synced event with payload True. If no new events are received prior to receiving MsgSynced, synced True event is registered without prior registration of associated synced False event.

Syncer Server#

Syncer Server module is responsible for providing implementation of server side of Syncer communication. It is in charge of synchronizing arbitrary number of clients connected to it, with all the events from its Event Server.

Server is instanced with backend instance. It registers to backend notifications of newly stored events. Server also retrieves events from backend, needed for a client synchronization, by its query method query_from_event_id, specialized for Syncer Server usage.

Syncer Server is responsible for registering event in regard to connection status of all active Syncer Client. This event represents current Syncer Server state and should be registered immediately after Engine initialization and each time this state changes. Event is defined with unique event type:

  • ‘event’, ‘syncer’, ‘server’

    • source_timestamp is None

    • payload is specified by hat-event://main.yaml#/definitions/events/syncer/server.

More precisely, this event is not registered by the Syncer Server itself, but Event Server registers it on behalf of Syncer Server, with Source.type set to SYNCER (for the sake of simplicity hereafter we consider as Syncer Server registers this event).

Payload of this event contains list of all currently active client connections. Once connection between server and client is broken, it will no longer be part of future event’s payloads. New entry to list of active connections is added once server receives MsgReq from client. Each client connection is represented with:

  • client_name

    Name of the Syncer Client defined as clientName in MsgReq message (see Syncer communication)

  • synced

    Status flag which is initially set to False. Once server has sent all the events that client requested to get “synchronized”, it changes connection’s synced status to True.

Multiple clients can be connected to a server, where each connection is handled independently, as described.

Engine#

Engine is responsible for creating Modules and coordinating event registration, executing processing sessions and backend (see Backends).

Engine’s method register enables registration of events to backend based on a list of register events. Entity who requests registration is identified by Source identifier which is used only during processing of events and is discarded once event is created. There are three types of sources that may register events on Engine:

By creating event, Engine enhances register event with

  • event_id: unique event identifier,

  • timestamp: single point in time when events are registered. All events registered in a single session get the same timestamp.

Process of creating events based on a single list of register events provided through register is called session. Engine starts new session each time Eventer Server, Syncer Server or a module requests new registration through register method. After processing session ends, events are registered with backend. Start and end of each session is notified to each module by calling module’s on_session_start and on_session_stop methods respectively. These methods are called sequentially for each module: only when method of one module ended, method of the other module can be called.

During session processing, each module is notified with a newly created event in case it matches its subscription. By subscription, each module defines event type filter condition used for filtering new events that will get notified to the module. Processing of this event by module can result with new register events. Out of register events, Engine creates events and adds them to the current session. All modules, including the one that added that new events, are notified with new additions. This process continues iteratively until all modules return empty lists of new register events. Processing events by single module is always sequential - Engine keeps order of new events added to session so that new events are always added to the end of the queue for module notification.

Warning

Care should be taken by module implementation not to cause self recursive or mutually recursive endless processing loop.

Engine registers an event that signalizes when the Engine was started or stopped. Event is registered by Engine, with Source.Type set to ENGINE and with the following event type:

* 'event', 'engine'

    * `source timestamp` - None

    * `payload`  is specified by
      ``hat-event://main.yaml#/definitions/events/engine``.

Modules#

Warning

Event Server does not provide sandbox environment for loading end executing modules. Modules have full access to Event Server functionality which is controlled with module execution. Module implementation and configuration should be written in accordance to other modules and Event Server as a whole, keeping in mind processing execution time overhead and possible interference between modules.

Each module represents predefined and configurable closely related functions that can modify the process of registering new events or initiate new event registration sessions. When created, module is provided with reference to Engine which can be used for registering and querying events and its own unique source identification (used in case of registering new events).

Implementation of specific module can be maintained independently of Event Server implementation. Location of Python module implementing Event Server’s module is defined by Event Server’s configuration.

Backends#

Backend is wrapper for storing and retrieving events from specialized storage engines.

Backends available as part of hat-event package:

Other backend implementations can be maintained independently of Event Server implementation. Location of Python module implementing backend is defined by Event Server’s configuration.

Implementation#

Documentation is available as part of generated API reference:

Eventer Chatter messages#

module HatEventer

MsgSubscribe = Array(EventType)

MsgNotify = Array(Event)

MsgRegisterReq = Array(RegisterEvent)

MsgRegisterRes = Array(Choice {
    event:    Event
    failure:  None
})

MsgQueryReq = QueryData

MsgQueryRes = Array(Event)

Timestamp = Record {
    s:   Integer
    us:  Integer
}

EventId = Record {
    server:    Integer
    session:   Integer
    instance:  Integer
}

Order = Choice {
    descending:  None
    ascending:   None
}

OrderBy = Choice {
    timestamp:        None
    sourceTimestamp:  None
}

EventType = Array(String)

EventPayload = Choice {
    binary:  Bytes
    json:    String
    sbs:     Hat.Data
}

Event = Record {
    id:               EventId
    type:             EventType
    timestamp:        Timestamp
    sourceTimestamp:  Optional(Timestamp)
    payload:          Optional(EventPayload)
}

RegisterEvent = Record {
    type:             EventType
    sourceTimestamp:  Optional(Timestamp)
    payload:          Optional(EventPayload)
}

QueryData = Record {
    serverId:           Optional(Integer)
    ids:                Optional(Array(EventId))
    types:              Optional(Array(EventType))
    tFrom:              Optional(Timestamp)
    tTo:                Optional(Timestamp)
    sourceTFrom:        Optional(Timestamp)
    sourceTTo:          Optional(Timestamp)
    payload:            Optional(EventPayload)
    order:              Order
    orderBy:            OrderBy
    uniqueType:         Boolean
    maxResults:         Optional(Integer)
}

Syncer Chatter messages#

module HatSyncer

MsgReq = Record {
    lastEventId:  HatEventer.EventId
    clientName:   String
}

MsgSynced = None

MsgEvents = Array(HatEventer.Event)

MsgFlushReq = None

MsgFlushRes = None

Configuration#

---
"$schema": "http://json-schema.org/schema#"
id: "hat-event://main.yaml#"
title: Event Server
description: Event Server's configuration
type: object
required:
    - type
    - log
    - backend
    - engine
    - eventer_server
    - syncer_server
properties:
    type:
        const: event
        description: configuration type identification
    version:
        type: string
        description: component version
    syncer_token:
        type: string
        description: |
            match of event servers' syncer_token is necessery condition for 
            their synchronization
    monitor:
        "$ref": "hat-monitor://client.yaml#"
    log:
        "$ref": "hat-json://logging.yaml#"
    backend:
        "$ref": "hat-event://main.yaml#/definitions/backend"
    engine:
        "$ref": "hat-event://main.yaml#/definitions/engine"
    eventer_server:
        "$ref": "hat-event://main.yaml#/definitions/eventer_server"
    syncer_server:
        "$ref": "hat-event://main.yaml#/definitions/syncer_server"
definitions:
    backend:
        type: object
        description: |
            structure of backend configuration depends on backend type
        required:
            - module
        properties:
            module:
                type: string
                description: |
                    full python module name that implements backend
    module:
        type: object
        description: |
            structure of module configuration depends on module type
        required:
            - module
        properties:
            module:
                type: string
                description: |
                    full python module name that implements module
    engine:
        type: object
        required:
            - server_id
            - modules
        properties:
            server_id:
                type: integer
            modules:
                type: array
                items:
                    "$ref": "hat-event://main.yaml#/definitions/module"
    eventer_server:
        type: object
        required:
            - address
        properties:
            address:
                type: string
                default: "tcp+sbs://localhost:23012"
    syncer_server:
        type: object
        required:
            - address
        properties:
            address:
                type: string
                default: "tcp+sbs://localhost:23013"
    events:
        eventer:
            enum:
                - CONNECTED
                - DISCONNECTED
        syncer:
            server:
                type: array
                items:
                    type: object
                    required:
                        - client_name
                        - synced
                    properties:
                        client_name:
                            type: string
                        synced:
                            type: boolean
            client:
                type: boolean
        engine:
            enum:
                - STARTED
                - STOPPED
...