Extras

If you wish to extend the functionality of smok-client, here’s how you can do it.

Most options are, for the time being, geared towards extended operation in an environment without Internet access.

Note that where the documentation says that client threads can invoke a routine, you should make it as thread-safe as is feasible.

Predicate database

Implement a class with this interface:

class smok.extras.BasePredicateDatabase

One predicate is a dict obtained from its to_kwargs().

All of these methods are called by CommunicatorThread, and get_all_predicates() by SMOKDevice during initialization.

abstract get_all_predicates() List[Dict]

Return all predicates

abstract set_new_predicates(v: List[Dict])

Called upon a successful predicate synchronization from the server

abstract update_predicate(v: Dict) None

A single predicate has been changed.

Note that when this is called, there can be no doubt that the predicate has changed. This is verified by CommunicatorThread.
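As an illustration, the three methods above could be satisfied by a small in-memory store. This is a minimal sketch rather than the library's implementation: it is written as a standalone class so that it runs without smok-client installed (in real use you would presumably subclass smok.extras.BasePredicateDatabase), and it assumes that every predicate dict carries a "predicate_id" key, which this page does not confirm. A lock is used because the methods may be called from more than one thread.

```python
import threading
from typing import Dict, List


class InMemoryPredicateDatabase:
    """Hypothetical stand-in mirroring the BasePredicateDatabase interface."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # Assumption: predicates are keyed by a "predicate_id" entry.
        self._predicates: Dict[str, Dict] = {}

    def get_all_predicates(self) -> List[Dict]:
        with self._lock:
            # Return copies so callers cannot mutate the stored dicts.
            return [dict(p) for p in self._predicates.values()]

    def set_new_predicates(self, v: List[Dict]) -> None:
        with self._lock:
            # A full synchronization replaces everything stored so far.
            self._predicates = {p["predicate_id"]: dict(p) for p in v}

    def update_predicate(self, v: Dict) -> None:
        with self._lock:
            self._predicates[v["predicate_id"]] = dict(v)
```

Nothing here survives a restart; a persistent variant would write the dictionary to disk inside set_new_predicates() and update_predicate().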

If you need a quick pickling solution, use:

class smok.extras.PicklingPredicateDatabase(path: str)
get_all_predicates() List[Dict]

Return all predicates

set_new_predicates(v: List[Dict])

Called upon a successful predicate synchronization from the server

update_predicate(v: Dict) None

A single predicate has been changed.

Note that when this is called, there can be no doubt that the predicate has changed. This is verified by CommunicatorThread.

Sensor writes database

In a SCADA system it is very important to know who has been changing what. SMOK allows you to register sensor writes to send to the cloud. However, if you would like them to persist across a reboot, override this:

class smok.extras.BaseSensorWriteSynchronization(events: List[SensorWriteEvent], swdb: BaseSensorWriteDatabase)

At most a single instance of this object will be kept alive.

Must be boolable: it should evaluate to False if it is empty.

class smok.extras.BaseSensorWriteDatabase
abstract add_sw(event: SensorWriteEvent)

Add a new event to synchronize

abstract get_sw_sync() BaseSensorWriteSynchronization

Get a synchronization

This will never be called if dont_do_pathpoints is True

on_sync_sw_failed(event: SensorWriteEvent)

Called when given event fails to sync.

This will never be called if dont_do_pathpoints is True

on_synced_sw(event: SensorWriteEvent)

Called when given event becomes synced

This will never be called if dont_do_pathpoints is True

If you need something that persists the data but is not particularly fast, try:

class smok.extras.PicklingSensorWriteDatabase(pickle_addr: str)
add_sw(event: SensorWriteEvent)

Add a new event to synchronize

on_synced_sw(event: SensorWriteEvent)

Called when given event becomes synced

This will never be called if dont_do_pathpoints is True

Archive database

Lists of pathpoints to be archived also need to persist in case of restart. You need to implement the following:

class smok.extras.BaseArchivesDatabase

A base class for databases with archiving instructions.

abstract get_archiving_instructions() Dict[int, List[str]]

Return currently stored archiving instructions, or an empty dict if none available.

Called by ArchivingAndMacroThread

Returns

a dictionary of (archiving_interval::int => list of pathpoints to archive)

abstract on_archiving_data_sync(new_data: Dict[int, List[str]]) None

Called on new archiving data instructions.

Called by ArchivingAndMacroThread

Parameters

new_data – a dictionary of (archiving_interval::int => list of pathpoints to archive)
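The two methods above amount to loading and storing one dictionary, so a file-backed version can be quite short. The following is a minimal sketch under stated assumptions, not the library's code: the class name JSONArchivesDatabase is hypothetical, it uses JSON instead of pickle, and it is standalone so it runs without smok-client installed (in real use you would presumably subclass smok.extras.BaseArchivesDatabase).

```python
import json
import os
from typing import Dict, List


class JSONArchivesDatabase:
    """Hypothetical stand-in mirroring the BaseArchivesDatabase interface."""

    def __init__(self, path: str) -> None:
        self._path = path

    def get_archiving_instructions(self) -> Dict[int, List[str]]:
        if not os.path.exists(self._path):
            return {}
        with open(self._path, "r", encoding="utf-8") as f:
            raw = json.load(f)
        # JSON object keys are always strings, so convert them back to int.
        return {int(interval): list(paths) for interval, paths in raw.items()}

    def on_archiving_data_sync(self, new_data: Dict[int, List[str]]) -> None:
        tmp_path = self._path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(new_data, f)
        # Swap the file in one step so a crash cannot leave it half-written.
        os.replace(tmp_path, self._path)
```

Writing to a temporary file and renaming it is a design choice that matters on devices that can lose power at any moment; the previous instructions stay readable until the new ones are fully on disk.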

If you want a quick solution, there is also:

class smok.extras.PicklingArchivesDatabase(path: str)

An archiving database based on pickling data to disk

Parameters

path – path to the pickle. If it does not exist, it will be created

get_archiving_instructions() dict

Return currently stored archiving instructions, or an empty dict if none available.

Called by ArchivingAndMacroThread

Returns

a dictionary of (archiving_interval::int => list of pathpoints to archive)

on_archiving_data_sync(new_data) None

Called on new archiving data instructions.

Called by ArchivingAndMacroThread

Parameters

new_data – a dictionary of (archiving_interval::int => list of pathpoints to archive)

BAOB storage

BAOBs are essentially similar to metadata, except for being versioned monotonically and being composed of bytes instead of characters.

If you want your BAOBs to persist across restarts, feel free to implement the following class:

class smok.extras.BaseBAOBDatabase
check_consistency() None

Check that, for every key returned by get_all_keys, a version can be obtained via get_baob_version.

If not, delete such a key and log an ERROR.

Called by constructor of SMOKDevice

abstract delete_baob(key: str) None

Delete particular BAOB

Raises

KeyError – BAOB does not exist

abstract get_all_keys() Iterator[str]

Stream all keys of BAOBs stored locally

abstract get_baob_value(key: str) bytes

Return a value of a particular BAOB

Raises

KeyError – BAOB does not exist

abstract get_baob_version(key: str) int

Return a version of particular BAOB

Raises

KeyError – BAOB does not exist

abstract set_baob_value(key: str, data: bytes, version: int)

Store a particular version of a BAOB within the database
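To make the contract of the abstract methods concrete, here is a minimal in-memory sketch. It is an illustration only: the class name is hypothetical, it is standalone so it runs without smok-client installed (in real use you would presumably subclass smok.extras.BaseBAOBDatabase), and it does not persist anything, so it shows the KeyError behaviour rather than the storage itself.

```python
from typing import Dict, Iterator, Tuple


class InMemoryBAOBDatabase:
    """Hypothetical stand-in mirroring the BaseBAOBDatabase interface."""

    def __init__(self) -> None:
        # Maps key -> (value, version).
        self._baobs: Dict[str, Tuple[bytes, int]] = {}

    def delete_baob(self, key: str) -> None:
        # del raises KeyError when the BAOB does not exist, as documented.
        del self._baobs[key]

    def get_all_keys(self) -> Iterator[str]:
        # Iterate over a snapshot so deletions during iteration are safe.
        return iter(list(self._baobs))

    def get_baob_value(self, key: str) -> bytes:
        return self._baobs[key][0]

    def get_baob_version(self, key: str) -> int:
        return self._baobs[key][1]

    def set_baob_value(self, key: str, data: bytes, version: int) -> None:
        self._baobs[key] = (data, version)
```

Storing the value and the version together in one entry means a key can never have a value without a version, which is the inconsistency that check_consistency() is described as guarding against.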

class smok.extras.BAOBDigest(key: str, version: int)

A descriptor of a BAOB

Variables
  • key – key of this BAOB (str)

  • version – version of this BAOB (int)

If you want a quick database that persists restarts, take a look at:

class smok.extras.PicklingBAOBDatabase(path: str)
Parameters

path – path that has to be a directory where BAOB data will be stored. If that directory does not exist, it will be created

delete_baob(key: str) None

Delete particular BAOB

Raises

KeyError – BAOB does not exist

get_all_keys()

Stream all keys of BAOBs stored locally

get_baob_value(key: str) bytes

Return a value of a particular BAOB

Raises

KeyError – BAOB does not exist

get_baob_version(key: str) int

Return a version of particular BAOB

Raises

KeyError – BAOB does not exist

set_baob_value(key: str, data: bytes, version: int)

Store a particular version of a BAOB within the database

Sensor storage

If you wish to persist your sensor definitions across restarts, feel free to implement the following:

class smok.extras.BaseSensorDatabase
abstract get_all_sensors() Iterator[Sensor]

Return all sensors stored in the database.

Called by user threads and possibly predicates (so also CommunicatorThread).

abstract get_sensor(fqts: str) Sensor

Return a sensor

Called by user threads and possibly predicates (so also CommunicatorThread)

Parameters

fqts – fqts for target sensor, always in canonical form

Raises

KeyError – sensor not defined

on_register(device: SMOKDevice) None

Called by SMOKDevice upon registering this database.

Called by SMOKDevice’s constructor.

abstract on_sensors_sync(sensors: List[Sensor])

Sensors have just been synchronized, this is the entire list.

Called by communicator threads. Will never be called if dont_do_pathpoints is enabled.

If you quickly need a persistent sensor database, try:

class smok.extras.PicklingSensorDatabase(path: str)

Pathpoint value storage

If you want to store your pathpoint values in a way that would survive restarts, just define both classes:

class smok.extras.BasePathpointDatabase

An abstract base class for pathpoint archives database.

Try to make it reasonably thread-safe. It is documented which call is called by which thread.

checkpoint() None

Called by the communicator thread about once every 60 seconds. May be called much more often; it is the function’s responsibility to throttle.

abstract get_archive_data(pathpoint: str, start: Optional[Union[int, float]], stop: Optional[Union[int, float]]) Iterator[Tuple[Union[int, float], Union[int, float, str, OperationFailedError]]]

Return some archival data for this pathpoint

Called by user threads.

Parameters
  • pathpoint – name of the pathpoint

  • start – start of the period, in milliseconds since Epoch, or the earliest timestamp that is registered if None

  • stop – stop of the period, in milliseconds since Epoch, or the latest timestamp that is registered if None

Returns

an iterator of two-tuples (timestamp in milliseconds, value or OperationFailedError instance)

abstract get_current_value(pathpoint: str) Tuple[Union[int, float], Union[int, float, str]]

Get the current value for given pathpoint.

Called by user threads.

Parameters

pathpoint – name of the pathpoint

Returns

a tuple of timestamp, value

Raises
abstract get_data_to_sync() Optional[BaseDataToSynchronize]

At most one instance of BaseDataToSynchronize is guaranteed to exist at given time.

Called by communicator thread.

Returns

data to synchronize, or None if there is nothing to synchronize

abstract on_new_data(pathpoint: str, timestamp: Union[int, float], value_or_exception: Union[int, float, str, OperationFailedError]) None

Called whenever there’s a new value.

Called by the order executor thread, and by user threads if you make use of the set_new_value() method.

Parameters
  • pathpoint – native pathpoint that has been written

  • timestamp – timestamp of the operation in milliseconds

  • value_or_exception – a value of the pathpoint or an OperationFailedError instance
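Since checkpoint() may be called far more often than once a minute, an implementation that writes to disk will usually want to throttle itself. The helper below is a minimal sketch of one way to do that; the class name, the min_interval parameter and the _flush() hook are all hypothetical and not part of smok-client. The clock is injectable only so that the behaviour can be exercised without waiting.

```python
import time
from typing import Callable, Optional


class ThrottledCheckpoint:
    """Hypothetical helper: runs the expensive flush at most once per interval."""

    def __init__(self, min_interval: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._min_interval = min_interval
        self._clock = clock
        self._last_run: Optional[float] = None
        self.flush_count = 0

    def checkpoint(self) -> None:
        now = self._clock()
        # Skip the call if the previous flush happened too recently.
        if self._last_run is not None and now - self._last_run < self._min_interval:
            return
        self._last_run = now
        self._flush()

    def _flush(self) -> None:
        # Placeholder for the real work, e.g. writing the store to disk.
        self.flush_count += 1
```

time.monotonic() is used instead of time.time() so that adjustments to the wall clock cannot suppress or force a flush.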

class smok.extras.BaseDataToSynchronize
abstract acknowledge() None

Mark the data as successfully synchronized

abstract negative_acknowledge() None

Mark the data as failed to synchronize

abstract to_json() List

Return a JSON structure that looks like this (specification expressed in OpenAPI 3.0 format):

type: array
items:
    type: object
    properties:
        path:
            type: string
            description: Name of the pathpoint to serve
        values:
            type: array
            items:
                type: object
                description: This has either error_code or value
                properties:
                    timestamp:
                        type: integer
                        format: int64
                        description: Timestamp in milliseconds
                    error_code:
                        type: string
                        description: Reason of error code
                        enum:
                            - malformed
                            - timeout
                            - invalid
                    value:
                        oneOf:
                            - type: integer
                            - type: string
                            - type: number
                        description: Value of the pathpoint
                required:
                    - timestamp
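For illustration, a payload matching the specification above could be assembled as follows. This is a sketch under assumptions: the function name build_sync_payload and its input format (a mapping of path to (timestamp, value, error_code) triples) are hypothetical, and how an OperationFailedError maps onto one of the three error codes is not covered on this page, so the code takes the error code as a plain string.

```python
from typing import Dict, List, Optional, Tuple, Union

Value = Union[int, float, str]
Sample = Tuple[int, Optional[Value], Optional[str]]

# The error codes permitted by the specification above.
ERROR_CODES = ("malformed", "timeout", "invalid")


def build_sync_payload(readings: Dict[str, List[Sample]]) -> List[dict]:
    """Turn {path: [(timestamp_ms, value, error_code), ...]} into the JSON structure."""
    payload = []
    for path, samples in readings.items():
        values = []
        for timestamp, value, error_code in samples:
            entry: dict = {"timestamp": int(timestamp)}
            if error_code is not None:
                if error_code not in ERROR_CODES:
                    raise ValueError("unknown error code: %r" % (error_code,))
                # A failed read carries error_code instead of value.
                entry["error_code"] = error_code
            else:
                entry["value"] = value
            values.append(entry)
        payload.append({"path": path, "values": values})
    return payload
```

A to_json() implementation could return the result of such a function directly, since it consists only of lists, dicts, strings and numbers.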

Then create an instance of your BasePathpointDatabase subclass and pass it as an argument to SMOKDevice.

If you quickly need a persistent pathpoint database, try:

class smok.extras.PicklingPathpointDatabase(path: str)

An example pathpoint database that persists its data on disk.

It persists its store on every checkpoint() call.

Parameters

path – path to file containing pickled data.

It does not, however, provide for archives.

Event storage

class smok.extras.BaseEventSynchronization
abstract acknowledge(*uuids: str) None

Called by the communicator, when sync succeeds

Parameters

uuids – UUIDs assigned to events

abstract get_events() List[Event]
Returns

a list of events to synchronize

negative_acknowledge() None

Called by the communicator, when sync fails

class smok.extras.BaseEventDatabase
abstract add_event(event: Event) None

Register a new event in the database.

Can be called by any thread.

Parameters

event – event to register

checkpoint() None

Called by the communicator thread about once every 60 seconds. May be called much more often; it is the function’s responsibility to throttle.

abstract clear_closed_and_synced_events() None

Clear all events that were both closed and are already on the server

abstract close_event(event: Event) None

Close provided event

Parameters

event – event to close

abstract get_all_events() Iterator[Event]
Returns

all events kept in the database

abstract get_cache(predicate_id: str) Any

Return predicate’s internal data

Raises

KeyError – predicate internal data not found

abstract get_events_to_sync() Optional[BaseEventSynchronization]

At most a single instance of BaseEventSynchronization will be alive at a time.

Returns

object to sync, or None if there’s nothing to sync.

abstract get_open_events() Iterator[Event]
Returns

an iterator with all open events

abstract on_predicate_deleted(predicate_id: str) None

Called when a predicate is deleted.

Called by communicator thread.

Parameters

predicate_id – ID of the predicate that was deleted

abstract set_cache(predicate_id: str, cache) None

Store predicate’s internal data. Do it in a way that will survive restarts.

If you quickly need a pickling database, try:

class smok.extras.PicklingEventDatabase(cache_path: str, data_path: str)

A simple database that pickles events on disk each checkpoint()

Parameters
  • cache_path – path to cache with predicate’s data

  • data_path – path to cache with event data

add_event(event: Event) None

Register a new event in the database.

Can be called by any thread.

Parameters

event – event to register

checkpoint()

Called by the communicator thread about once every 60 seconds. May be called much more often; it is the function’s responsibility to throttle.

clear_closed_and_synced_events() None

Clear all events that were both closed and are already on the server

close_event(event: Event) None

Close provided event

Parameters

event – event to close

get_events_to_sync() Optional[PicklingEventSynchronization]

At most a single instance of BaseEventSynchronization will be alive at a time.

Returns

object to sync, or None if there’s nothing to sync.

If you don’t care about events, you can also use

class smok.extras.NullEventDatabase

For those use cases where you don’t use events at all.

add_event(event: Event) None

Register a new event in the database.

Can be called by any thread.

Parameters

event – event to register

checkpoint() None

Called by the communicator thread about once every 60 seconds. May be called much more often; it is the function’s responsibility to throttle.

clear_closed_and_synced_events() None

Clear all events that were both closed and are already on the server

close_event(event: Event) None

Close provided event

Parameters

event – event to close

get_all_events() Iterator[Event]
Returns

all events kept in the database

get_cache(predicate_id: str) Any

Return predicate’s internal data

Raises

KeyError – predicate internal data not found

get_events_to_sync() Optional[BaseEventSynchronization]

At most a single instance of BaseEventSynchronization will be alive at a time.

Returns

object to sync, or None if there’s nothing to sync.

get_open_events() Iterator[Event]
Returns

an iterator with all open events

on_predicate_deleted(predicate_id: str) None

Called when a predicate is deleted.

Called by communicator thread.

Parameters

predicate_id – ID of the predicate that was deleted

set_cache(predicate_id: str, cache) None

Store predicate’s internal data. Do it in a way that will survive restarts.

Macro storage

class smok.extras.BaseMacroDatabase
checkpoint() None

Called about every 60 seconds by the communicator thread

abstract get_done_macros() Iterator[Tuple[str, int]]

Get a list of macros that were done, but the server wasn’t told about that yet.

Called by communicator thread

Returns

an iterator of tuples (macro_id, execution timestamp)

abstract get_macros() List[Macro]

Get a list of macros to execute

abstract notify_macro_executed(macro_id: str, timestamp: int) None

Notify the DB that the device has executed a given macro.

This should update the database’s macro definition as well.

Called by archive & macro thread

abstract notify_macro_synced(macro_id: str, timestamp: int) None

Notify the DB that server has received notification about executing given macro.

Called by communicator thread

abstract set_macros(macros: List[Macro]) None

Set the current list of macros to execute (those that have not yet been executed)

Parameters

macros – list of macros to execute
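The interplay between notify_macro_executed(), notify_macro_synced() and get_done_macros() can be sketched as a small ledger of executions the server has not yet acknowledged. This is only an illustration with a hypothetical class name; it deliberately leaves out get_macros(), set_macros() and the update of the macro definition itself, because the Macro class is not described on this page.

```python
import threading
from typing import Iterator, Set, Tuple


class DoneMacroLedger:
    """Hypothetical helper tracking executed-but-unsynced macro runs."""

    def __init__(self) -> None:
        # Two different threads call into this, so guard the set with a lock.
        self._lock = threading.Lock()
        self._done: Set[Tuple[str, int]] = set()

    def notify_macro_executed(self, macro_id: str, timestamp: int) -> None:
        with self._lock:
            self._done.add((macro_id, timestamp))

    def notify_macro_synced(self, macro_id: str, timestamp: int) -> None:
        with self._lock:
            # discard() tolerates a repeated acknowledgement of the same run.
            self._done.discard((macro_id, timestamp))

    def get_done_macros(self) -> Iterator[Tuple[str, int]]:
        with self._lock:
            snapshot = sorted(self._done)
        # Iterate over a snapshot so the lock is not held while syncing.
        return iter(snapshot)
```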

If you quickly need a pickling database, try:

class smok.extras.PicklingMacroDatabase(path)

An example persistent macro database

Parameters

path – path to pickle file

get_done_macros() Iterator[Tuple[str, int]]

Get a list of macros that were done, but the server wasn’t told about that yet.

Called by communicator thread

Returns

an iterator of tuples (macro_id, execution timestamp)

get_macros() List[Macro]

Get a list of macros to execute

notify_macro_executed(macro_id: str, timestamp: int) None

Notify the DB that the device has executed a given macro.

This should update the database’s macro definition as well.

Called by archive & macro thread

notify_macro_synced(macro_id: str, timestamp: int) None

Notify the DB that server has received notification about executing given macro.

Called by communicator thread

set_macros(macros: List[Macro]) None

Set the current list of macros to execute (those that have not yet been executed)

Parameters

macros – list of macros to execute

Metadata store

class smok.extras.BaseMetadataDatabase

Base class for metadata databases

abstract delete_plain(key: str) None

Delete a particular key for plain database

No-op on key not being in the database.

Parameters

key – key to delete

abstract get_all_plain() Iterator[Tuple[str, str, float]]

Get all plain keys in the database

Returns

an iterator of tuple(key, value, timestamp of the write)

abstract get_plain(key: str) str

Return a value for given plain key

Parameters

key – key to give a value for

Raises

KeyError – key not found, ask the server

abstract put_plain(key: str, value: str, timestamp: Optional[float] = None) None

A plain value was just written.

Parameters
  • key – key of the write

  • value – value of the write

  • timestamp – timestamp of the write, default is current

abstract update_plain(key: str, value: str, timestamp: float) None

On an update plain request from the server

Parameters
  • key – key being updated

  • value – value of the key

  • timestamp – timestamp at which time it was last written
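The semantics documented above (delete is a no-op for a missing key, get raises KeyError, put defaults the timestamp to the current time) can be shown with a minimal in-memory sketch. The class name is hypothetical and the class is standalone so it runs without smok-client installed. Two points are assumptions: timestamps are taken to be seconds as returned by time.time(), and update_plain() is taken to overwrite the local entry unconditionally.

```python
import time
from typing import Dict, Iterator, Optional, Tuple


class InMemoryMetadataDatabase:
    """Hypothetical stand-in mirroring the BaseMetadataDatabase interface."""

    def __init__(self) -> None:
        # Maps key -> (value, timestamp of the write).
        self._plain: Dict[str, Tuple[str, float]] = {}

    def delete_plain(self, key: str) -> None:
        # pop() with a default makes a missing key a no-op.
        self._plain.pop(key, None)

    def get_all_plain(self) -> Iterator[Tuple[str, str, float]]:
        for key, (value, timestamp) in list(self._plain.items()):
            yield key, value, timestamp

    def get_plain(self, key: str) -> str:
        # Raises KeyError for an unknown key, signalling "ask the server".
        return self._plain[key][0]

    def put_plain(self, key: str, value: str,
                  timestamp: Optional[float] = None) -> None:
        if timestamp is None:
            timestamp = time.time()  # assumption: seconds since Epoch
        self._plain[key] = (value, timestamp)

    def update_plain(self, key: str, value: str, timestamp: float) -> None:
        # Assumption: the server's copy always replaces the local one.
        self._plain[key] = (value, timestamp)
```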

If you quickly need a pickling database, try:

class smok.extras.PicklingMetadataDatabase(path: str)

A metadata database that pickles its contents to disk after every change.

delete_plain(key: str) None

Delete a particular key for plain database

No-op on key not being in the database.

Parameters

key – key to delete

get_all_plain() Iterator[Tuple[str, str, float]]

Get all plain keys in the database

Returns

an iterator of tuple(key, value, timestamp of the write)

get_plain(key: str) str

Return a value for given plain key

Parameters

key – key to give a value for

Raises

KeyError – key not found, ask the server

put_plain(key: str, value: str, timestamp: Optional[float] = None) None

A plain value was just written.

Parameters
  • key – key of the write

  • value – value of the write

  • timestamp – timestamp of the write, default is current

update_plain(key: str, value: str, timestamp: float) None

On an update plain request from the server

Parameters
  • key – key being updated

  • value – value of the key

  • timestamp – timestamp at which time it was last written

Retrieving SMOK certificates