Extras¶
If you wish to extend the functionality of smok-client, here’s how you can do it.
Most options are, for the time being, geared towards extended work in an Internet-less environment.
Note that if the documentation says that client threads can invoke these routines, you should make them as thread-safe as is feasible for you.
Predicate database¶
Implement a class with this interface:
- class smok.extras.BasePredicateDatabase¶
One predicate is a dict obtained from its to_kwargs().
All of these methods are called by CommunicatorThread, and get_all_predicates() is also called by SMOKDevice during initialization.
- abstract get_all_predicates() List[Dict] ¶
Return all predicates
- abstract set_new_predicates(v: List[Dict])¶
Called upon a successful predicate synchronization from the server
- abstract update_predicate(v: Dict) None ¶
A single predicate has been changed.
Note that when this is called, there can be no doubt that the predicate has changed. This is verified by CommunicatorThread.
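As an illustration of the interface above, here is a minimal in-memory sketch. It is written as a standalone class so that it runs without smok-client installed; a real implementation would subclass smok.extras.BasePredicateDatabase. The `predicate_id` key used to match predicates, and the `silenced` field in the usage lines, are assumptions about what to_kwargs() produces and are not confirmed by this page.

```python
import threading
from typing import Dict, List


class InMemoryPredicateDatabase:
    """Standalone sketch; a real one would subclass smok.extras.BasePredicateDatabase."""

    # Assumption: every predicate dict carries a unique identifier under this key.
    ID_KEY = 'predicate_id'

    def __init__(self):
        # The methods may be invoked from more than one thread, so guard the state.
        self._lock = threading.Lock()
        self._predicates: Dict[str, Dict] = {}

    def get_all_predicates(self) -> List[Dict]:
        with self._lock:
            # Return copies so that callers cannot mutate the stored dicts.
            return [dict(p) for p in self._predicates.values()]

    def set_new_predicates(self, v: List[Dict]) -> None:
        with self._lock:
            self._predicates = {p[self.ID_KEY]: dict(p) for p in v}

    def update_predicate(self, v: Dict) -> None:
        with self._lock:
            self._predicates[v[self.ID_KEY]] = dict(v)


db = InMemoryPredicateDatabase()
db.set_new_predicates([{'predicate_id': 'a', 'silenced': False}])
db.update_predicate({'predicate_id': 'a', 'silenced': True})
print(db.get_all_predicates())
```

This sketch keeps everything in memory, so it does not survive a restart; persistence would have to be added on top of it.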
If you need a quick pickling solution, use:
- class smok.extras.PicklingPredicateDatabase(path: str)¶
- get_all_predicates() List[Dict] ¶
Return all predicates
- set_new_predicates(v: List[Dict])¶
Called upon a successful predicate synchronization from the server
- update_predicate(v: Dict) None ¶
A single predicate has been changed.
Note that when this is called, there can be no doubt that the predicate has changed. This is verified by CommunicatorThread.
Sensor writes database¶
In a SCADA system it is very important to know who has been changing what. SMOK allows you to register sensor writes to send to the cloud. However, if you would like them to persist across a reboot, override this:
- class smok.extras.BaseSensorWriteSynchronization(events: List[SensorWriteEvent], swdb: BaseSensorWriteDatabase)¶
At most a single instance of this object will be kept alive.
Must be boolable - return False if this is empty
- class smok.extras.BaseSensorWriteDatabase¶
- abstract add_sw(event: SensorWriteEvent)¶
Add a new event to synchronize
- abstract get_sw_sync() BaseSensorWriteSynchronization ¶
Get a synchronization
This will never be called if dont_do_pathpoints is True
- on_sync_sw_failed(event: SensorWriteEvent)¶
Called when given event fails to sync.
This will never be called if dont_do_pathpoints is True
- on_synced_sw(event: SensorWriteEvent)¶
Called when given event becomes synced
This will never be called if dont_do_pathpoints is True
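The two classes above could fit together roughly as in the following sketch. Both classes are standalone stand-ins with hypothetical names so the example runs without smok-client; real ones would subclass BaseSensorWriteSynchronization and BaseSensorWriteDatabase. Plain strings are used in place of SensorWriteEvent objects, and keeping failed events queued for a retry is an assumed policy, not documented behaviour.

```python
import threading
from typing import Any, List


class ListSensorWriteSynchronization:
    """Stand-in for a BaseSensorWriteSynchronization subclass (hypothetical name)."""

    def __init__(self, events: List[Any], swdb: 'InMemorySensorWriteDatabase'):
        self.events = events
        self.swdb = swdb

    def __bool__(self) -> bool:
        # "Must be boolable": False when there is nothing to synchronize.
        return bool(self.events)


class InMemorySensorWriteDatabase:
    """Stand-in for a BaseSensorWriteDatabase subclass (hypothetical name)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending: List[Any] = []

    def add_sw(self, event: Any) -> None:
        with self._lock:
            self._pending.append(event)

    def get_sw_sync(self) -> ListSensorWriteSynchronization:
        with self._lock:
            # Hand out a snapshot so later add_sw() calls do not alter it.
            return ListSensorWriteSynchronization(list(self._pending), self)

    def on_synced_sw(self, event: Any) -> None:
        with self._lock:
            if event in self._pending:
                self._pending.remove(event)

    def on_sync_sw_failed(self, event: Any) -> None:
        # Assumed policy: leave the event queued so that it is retried later.
        pass


swdb = InMemorySensorWriteDatabase()
swdb.add_sw('write-1')              # a string stands in for a SensorWriteEvent
sync = swdb.get_sw_sync()
print(bool(sync))
swdb.on_synced_sw('write-1')
print(bool(swdb.get_sw_sync()))
```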
If you need something which persists the data, but isn’t too fast itself, try:
- class smok.extras.PicklingSensorWriteDatabase(pickle_addr: str)¶
- add_sw(event: SensorWriteEvent)¶
Add a new event to synchronize
- on_synced_sw(event: SensorWriteEvent)¶
Called when given event becomes synced
This will never be called if dont_do_pathpoints is True
Archive database¶
Lists of pathpoints to be archived also need to persist in case of restart. You need to implement the following:
- class smok.extras.BaseArchivesDatabase¶
A base class for databases with archiving instructions.
- abstract get_archiving_instructions() Dict[int, List[str]] ¶
Return currently stored archiving instructions, or an empty dict if none available.
Called by ArchivingAndMacroThread
- Returns
a dictionary of (archiving_interval::int => list of pathpoints to archive)
- abstract on_archiving_data_sync(new_data: Dict[int, List[str]]) None ¶
Called on new archiving data instructions.
Called by ArchivingAndMacroThread
- Parameters
new_data – a dictionary of (archiving_interval::int => list of pathpoints to archive)
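A minimal sketch of such a database, storing the instructions as JSON rather than as a pickle, might look as follows. It is a standalone class with a hypothetical name (a real one would subclass smok.extras.BaseArchivesDatabase), and the pathpoint names in the usage lines are made up for illustration.

```python
import json
import os
import tempfile
from typing import Dict, List


class JSONArchivesDatabase:
    """Standalone sketch; a real one would subclass smok.extras.BaseArchivesDatabase."""

    def __init__(self, path: str):
        self.path = path

    def get_archiving_instructions(self) -> Dict[int, List[str]]:
        try:
            with open(self.path, 'r', encoding='utf-8') as f_in:
                raw = json.load(f_in)
        except FileNotFoundError:
            return {}   # nothing stored yet
        # JSON object keys are always strings, so turn them back into int intervals.
        return {int(interval): pathpoints for interval, pathpoints in raw.items()}

    def on_archiving_data_sync(self, new_data: Dict[int, List[str]]) -> None:
        # Write to a temporary file and rename it, so that a crash in the middle
        # of the write does not leave a truncated file behind.
        tmp_path = self.path + '.tmp'
        with open(tmp_path, 'w', encoding='utf-8') as f_out:
            json.dump(new_data, f_out)
        os.replace(tmp_path, self.path)


path = os.path.join(tempfile.mkdtemp(), 'archives.json')
archives = JSONArchivesDatabase(path)
print(archives.get_archiving_instructions())
archives.on_archiving_data_sync({60: ['W1r4001', 'W1r4002']})   # hypothetical names
print(archives.get_archiving_instructions())
```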
In case you want a quick solution, there’s also
- class smok.extras.PicklingArchivesDatabase(path: str)¶
An archiving database based on pickling data to disk
- Parameters
path – path to the pickle. If it does not exist, it will be created
- get_archiving_instructions() dict ¶
Return currently stored archiving instructions, or an empty dict if none available.
Called by ArchivingAndMacroThread
- Returns
a dictionary of (archiving_interval::int => list of pathpoints to archive)
- on_archiving_data_sync(new_data) None ¶
Called on new archiving data instructions.
Called by ArchivingAndMacroThread
- Parameters
new_data – a dictionary of (archiving_interval::int => list of pathpoints to archive)
BAOB storage¶
BAOBs are essentially similar to metadata, except for being versioned monotonically and being composed of bytes instead of characters.
If you want your BAOBs to persist across restarts, feel free to implement the following class:
- class smok.extras.BaseBAOBDatabase¶
- check_consistency() None ¶
Check that, for every key returned by get_all_keys, a version can be obtained via get_baob_version.
If not, delete such a key and log an ERROR.
Called by the constructor of SMOKDevice
- abstract delete_baob(key: str) None ¶
Delete particular BAOB
- Raises
KeyError – BAOB does not exist
- abstract get_all_keys() Iterator[str] ¶
Stream all keys of BAOBs stored locally
- abstract get_baob_value(key: str) bytes ¶
Return a value of a particular BAOB
- Raises
KeyError – BAOB does not exist
- abstract get_baob_version(key: str) int ¶
Return a version of particular BAOB
- Raises
KeyError – BAOB does not exist
- abstract set_baob_value(key: str, data: bytes, version: int)¶
Store a particular version of a BAOB within the database
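To make the contract of the abstract methods concrete, here is a minimal in-memory sketch. It is a standalone class with a hypothetical name; a real implementation would subclass smok.extras.BaseBAOBDatabase, store the data on disk, and inherit check_consistency() from the base class. The key name in the usage lines is made up.

```python
from typing import Dict, Iterator, Tuple


class InMemoryBAOBDatabase:
    """Standalone sketch; a real one would subclass smok.extras.BaseBAOBDatabase."""

    def __init__(self):
        # key -> (data, version)
        self._baobs: Dict[str, Tuple[bytes, int]] = {}

    def get_all_keys(self) -> Iterator[str]:
        # Iterate over a snapshot so keys may be deleted while iterating.
        return iter(list(self._baobs))

    def get_baob_value(self, key: str) -> bytes:
        return self._baobs[key][0]      # raises KeyError if the BAOB does not exist

    def get_baob_version(self, key: str) -> int:
        return self._baobs[key][1]      # raises KeyError if the BAOB does not exist

    def set_baob_value(self, key: str, data: bytes, version: int) -> None:
        self._baobs[key] = (data, version)

    def delete_baob(self, key: str) -> None:
        del self._baobs[key]            # raises KeyError if the BAOB does not exist


baobs = InMemoryBAOBDatabase()
baobs.set_baob_value('config', b'\x00\x01', 3)      # hypothetical key
print(list(baobs.get_all_keys()), baobs.get_baob_version('config'))
```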
- class smok.extras.BAOBDigest(key: str, version: int)¶
A descriptor of a BAOB
- Variables
key – key of this BAOB (str)
version – version of this BAOB (int)
If you want a quick database that persists restarts, take a look at:
- class smok.extras.PicklingBAOBDatabase(path: str)¶
- Parameters
path – path that has to be a directory where BAOB data will be stored. If that directory does not exist, it will be created
- delete_baob(key: str) None ¶
Delete particular BAOB
- Raises
KeyError – BAOB does not exist
- get_all_keys()¶
Stream all keys of BAOBs stored locally
- get_baob_value(key: str) bytes ¶
Return a value of a particular BAOB
- Raises
KeyError – BAOB does not exist
- get_baob_version(key: str) int ¶
Return a version of particular BAOB
- Raises
KeyError – BAOB does not exist
- set_baob_value(key: str, data: bytes, version: int)¶
Store a particular version of a BAOB within the database
Sensor storage¶
If you wish to persist your sensor definitions across restarts, feel free to implement the following:
- class smok.extras.BaseSensorDatabase¶
- abstract get_all_sensors() Iterator[Sensor] ¶
Return all sensors stored in the database.
Called by user threads and possibly predicates (so also CommunicatorThread).
- abstract get_sensor(fqts: str) Sensor ¶
Return a sensor
Called by user threads and possibly predicates (so also CommunicatorThread)
- Parameters
fqts – fqts for target sensor, always in canonical form
- Raises
KeyError – sensor not defined
- on_register(device: SMOKDevice) None ¶
Called by SMOKDevice upon registering this database.
Called by SMOKDevice’s constructor.
If you quickly need a persistent sensor database, try
- class smok.extras.PicklingSensorDatabase(path: str)¶
Pathpoint value storage¶
If you want to store your pathpoint values in a way that would survive restarts, just define both classes:
- class smok.extras.BasePathpointDatabase¶
An abstract base class for pathpoint archives database.
Try to make it reasonably thread-safe. It is documented which call is called by which thread.
- checkpoint() None ¶
Called by the communicator thread, about once every 60 seconds. May be called much more often; it is the function’s responsibility to throttle.
- abstract get_archive_data(pathpoint: str, start: Optional[Union[int, float]], stop: Optional[Union[int, float]]) Iterator[Tuple[Union[int, float], Union[int, float, str, OperationFailedError]]] ¶
Return some archival data for this pathpoint
Called by user threads.
- Parameters
pathpoint – name of the pathpoint
start – start of the period, in milliseconds since Epoch, or the earliest timestamp that is registered if None
stop – stop of the period, in milliseconds since Epoch, or the latest timestamp that is registered if None
- Returns
an iterator of two-tuples (timestamp in milliseconds, value or OperationFailedError instance)
- abstract get_current_value(pathpoint: str) Tuple[Union[int, float], Union[int, float, str]] ¶
Get the current value for given pathpoint.
Called by user threads.
- Parameters
pathpoint – name of the pathpoint
- Returns
a tuple of timestamp, value
- Raises
OperationFailedError – read of this pathpoint has failed
NotReadedError – pathpoint has no last value (or does not exist)
- abstract get_data_to_sync() Optional[BaseDataToSynchronize] ¶
At most one instance of BaseDataToSynchronize is guaranteed to exist at given time.
Called by communicator thread.
- Returns
return data to synchronize, or None if nothing is to synchronize
- abstract on_new_data(pathpoint: str, timestamp: Union[int, float], value_or_exception: Union[int, float, str, OperationFailedError]) None ¶
Called whenever there’s a new value.
Called by order executor thread and user-threads if you make use of method set_new_value().
- Parameters
pathpoint – Native pathpoint that has been written
timestamp – timestamp of the operation in milliseconds
value_or_exception – a value of the pathpoint or an OperationFailedError instance
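The read and write side of this interface can be sketched in memory as shown below. The class is standalone with a hypothetical name, and the two exception classes are local stand-ins for smok's OperationFailedError and NotReadedError so that the example runs without smok-client. Treating `start` and `stop` as inclusive bounds is an assumption; get_data_to_sync() and checkpoint() are left out.

```python
import threading
from typing import Dict, Iterator, List, Optional, Tuple, Union

Number = Union[int, float]


class OperationFailedError(Exception):
    """Local stand-in for smok's OperationFailedError."""


class NotReadedError(Exception):
    """Local stand-in for smok's NotReadedError."""


Value = Union[int, float, str, OperationFailedError]


class InMemoryPathpointDatabase:
    """Standalone sketch; a real one would subclass smok.extras.BasePathpointDatabase."""

    def __init__(self):
        self._lock = threading.Lock()
        self._series: Dict[str, List[Tuple[Number, Value]]] = {}

    def on_new_data(self, pathpoint: str, timestamp: Number,
                    value_or_exception: Value) -> None:
        with self._lock:
            series = self._series.setdefault(pathpoint, [])
            series.append((timestamp, value_or_exception))
            # Sort by timestamp only, since values of mixed types do not compare.
            series.sort(key=lambda item: item[0])

    def get_current_value(self, pathpoint: str) -> Tuple[Number, Union[int, float, str]]:
        with self._lock:
            series = self._series.get(pathpoint)
            if not series:
                raise NotReadedError(pathpoint)
            timestamp, value = series[-1]
        if isinstance(value, OperationFailedError):
            raise value     # the most recent read has failed
        return timestamp, value

    def get_archive_data(self, pathpoint: str, start: Optional[Number],
                         stop: Optional[Number]) -> Iterator[Tuple[Number, Value]]:
        with self._lock:
            series = list(self._series.get(pathpoint, ()))
        for timestamp, value in series:
            if start is not None and timestamp < start:
                continue
            if stop is not None and timestamp > stop:
                continue
            yield timestamp, value


ppdb = InMemoryPathpointDatabase()
ppdb.on_new_data('W1r4001', 1000, 5)        # hypothetical pathpoint name
ppdb.on_new_data('W1r4001', 2000, 7)
print(ppdb.get_current_value('W1r4001'))
print(list(ppdb.get_archive_data('W1r4001', 1500, None)))
```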
- class smok.extras.BaseDataToSynchronize¶
- abstract acknowledge() None ¶
Mark the data as successfully synchronized
- abstract negative_acknowledge() None ¶
Mark the data as failed to synchronize
- abstract to_json() List ¶
Return a JSON structure that looks like this (specification expressed in OpenAPI 3.0 format)
    type: array
    items:
      type: object
      properties:
        path:
          type: string
          description: Name of the pathpoint to serve
        values:
          type: array
          items:
            type: object
            description: This has either error_code or value
            properties:
              timestamp:
                type: integer
                format: int64
                description: Timestamp in milliseconds
              error_code:
                type: string
                description: Reason of error code
                enum:
                  - malformed
                  - timeout
                  - invalid
              value:
                type:
                  oneOf:
                    - integer
                    - string
                    - number
                description: Value of the pathpoint
            required:
              - timestamp
Then create an instance of your BasePathpointDatabase implementation and pass it as an argument to SMOKDevice.
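The structure that to_json() is documented to return can be assembled with a small helper such as the one sketched here. The function name, the `(timestamp, value, error_code)` input tuples and the pathpoint name are all hypothetical; only the output shape follows the specification given for to_json().

```python
import json
from typing import Dict, List, Optional, Tuple, Union

# The error codes allowed by the specification of to_json().
ERROR_CODES = ('malformed', 'timeout', 'invalid')

# Hypothetical input format: (timestamp in ms, value or None, error code or None)
Reading = Tuple[int, Optional[Union[int, float, str]], Optional[str]]


def build_sync_payload(readings: Dict[str, List[Reading]]) -> List[dict]:
    """Build a list shaped like the documented return value of to_json()."""
    payload = []
    for path, samples in readings.items():
        values = []
        for timestamp, value, error_code in samples:
            entry = {'timestamp': int(timestamp)}      # the only required property
            if error_code is not None:
                if error_code not in ERROR_CODES:
                    raise ValueError('unknown error code: %r' % (error_code,))
                entry['error_code'] = error_code       # either error_code ...
            else:
                entry['value'] = value                 # ... or value, never both
            values.append(entry)
        payload.append({'path': path, 'values': values})
    return payload


payload = build_sync_payload({
    'W1r4001': [(1000, 5, None), (2000, None, 'timeout')],   # hypothetical pathpoint
})
print(json.dumps(payload, indent=2))
```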
If you quickly need a persistent pathpoint database, try
- class smok.extras.PicklingPathpointDatabase(path: str)¶
An example pathpoint database that persists its data on disk.
It persists its store on every checkpoint() call.
- Parameters
path – path to file containing pickled data.
It does not, however, provide archives.
Event storage¶
- class smok.extras.BaseEventSynchronization¶
- abstract acknowledge(*uuids: str) None ¶
Called by the communicator, when sync succeeds
- Parameters
uuids – UUIDs assigned to events
- negative_acknowledge() None ¶
Called by the communicator, when sync fails
- class smok.extras.BaseEventDatabase¶
- abstract add_event(event: Event) None ¶
Register a new event in the database.
Can be called by any thread.
- Parameters
event – event to register
- checkpoint() None ¶
Called by the communicator thread, about once every 60 seconds. May be called much more often; it is the function’s responsibility to throttle.
- abstract clear_closed_and_synced_events() None ¶
Clear all events that were both closed and are already on the server
- abstract get_cache(predicate_id: str) Any ¶
Return predicate’s internal data
- Raises
KeyError – predicate internal data not found
- abstract get_events_to_sync() Optional[BaseEventSynchronization] ¶
At most a single instance of BaseEventSynchronization will be alive at a time.
- Returns
object to sync, or None if there’s nothing to sync.
- abstract on_predicate_deleted(predicate_id: str) None ¶
Called when a predicate is deleted.
Called by communicator thread.
- Parameters
predicate_id – ID of the predicate that was deleted
- abstract set_cache(predicate_id: str, cache) None ¶
Store predicate’s internal data. Do it in a way that will survive restarts.
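Since checkpoint() may be called far more often than every 60 seconds, an implementation has to throttle its own disk writes. The sketch below shows one way to do that for the predicate cache part of an event database. The class name, the `min_interval` parameter and the `writes` counter are hypothetical, and event handling is left out. Note the trade-off: data passed to set_cache() only survives a restart once a checkpoint has actually written it.

```python
import os
import pickle
import tempfile
import threading
import time
from typing import Any, Dict, Optional


class ThrottledCacheStore:
    """Sketch of the cache and checkpoint part of an event database (hypothetical name)."""

    def __init__(self, path: str, min_interval: float = 60.0):
        self.path = path
        self.min_interval = min_interval
        self.writes = 0                             # disk writes, kept for illustration
        self._lock = threading.Lock()
        self._dirty = False
        self._last_write: Optional[float] = None    # time.monotonic() of the last write
        try:
            with open(path, 'rb') as f_in:
                self._cache: Dict[str, Any] = pickle.load(f_in)
        except FileNotFoundError:
            self._cache = {}

    def get_cache(self, predicate_id: str) -> Any:
        with self._lock:
            return self._cache[predicate_id]        # KeyError if not found

    def set_cache(self, predicate_id: str, cache: Any) -> None:
        with self._lock:
            self._cache[predicate_id] = cache
            self._dirty = True

    def on_predicate_deleted(self, predicate_id: str) -> None:
        with self._lock:
            self._cache.pop(predicate_id, None)
            self._dirty = True

    def checkpoint(self) -> None:
        now = time.monotonic()
        with self._lock:
            throttled = (self._last_write is not None
                         and now - self._last_write < self.min_interval)
            if not self._dirty or throttled:
                return
            with open(self.path, 'wb') as f_out:
                pickle.dump(self._cache, f_out)
            self._dirty = False
            self._last_write = now
            self.writes += 1


path = os.path.join(tempfile.mkdtemp(), 'cache.pkl')
store = ThrottledCacheStore(path, min_interval=60.0)
store.set_cache('p1', {'n': 1})
store.checkpoint()      # first call with unsaved data: written to disk
store.set_cache('p1', {'n': 2})
store.checkpoint()      # called again right away: throttled, nothing is written
print(store.writes)
```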
If you quickly need a pickling database, try
- class smok.extras.PicklingEventDatabase(cache_path: str, data_path: str)¶
A simple database that pickles events to disk on each checkpoint() call.
- Parameters
cache_path – path to cache with predicate’s data
data_path – path to cache with event data
- add_event(event: Event) None ¶
Register a new event in the database.
Can be called by any thread.
- Parameters
event – event to register
- checkpoint()¶
Called by the communicator thread, about once every 60 seconds. May be called much more often; it is the function’s responsibility to throttle.
- clear_closed_and_synced_events() None ¶
Clear all events that were both closed and are already on the server
- get_events_to_sync() Optional[PicklingEventSynchronization] ¶
At most a single instance of BaseEventSynchronization will be alive at a time.
- Returns
object to sync, or None if there’s nothing to sync.
If you don’t care about events, you can also use
- class smok.extras.NullEventDatabase¶
For those use cases where you don’t use events at all.
- add_event(event: Event) None ¶
Register a new event in the database.
Can be called by any thread.
- Parameters
event – event to register
- checkpoint() None ¶
Called by the communicator thread, about once every 60 seconds. May be called much more often; it is the function’s responsibility to throttle.
- clear_closed_and_synced_events() None ¶
Clear all events that were both closed and are already on the server
- get_cache(predicate_id: str) Any ¶
Return predicate’s internal data
- Raises
KeyError – predicate internal data not found
- get_events_to_sync() Optional[BaseEventSynchronization] ¶
At most a single instance of BaseEventSynchronization will be alive at a time.
- Returns
object to sync, or None if there’s nothing to sync.
- on_predicate_deleted(predicate_id: str) None ¶
Called when a predicate is deleted.
Called by communicator thread.
- Parameters
predicate_id – ID of the predicate that was deleted
- set_cache(predicate_id: str, cache) None ¶
Store predicate’s internal data. Do it in a way that will survive restarts.
Macro storage¶
- class smok.extras.BaseMacroDatabase¶
- checkpoint() None ¶
Called about every 60 seconds by the communicator thread
- abstract get_done_macros() Iterator[Tuple[str, int]] ¶
Get a list of macros that were done, but the server wasn’t told about that yet.
Called by communicator thread
- Returns
an iterator of tuples (macro_id, execution timestamp)
- abstract get_macros() List[Macro] ¶
Get a list of macros to execute
- abstract notify_macro_executed(macro_id: str, timestamp: int) None ¶
Notify the DB that the device has executed a given macro.
This should update the database’s macro definition as well.
Called by archive & macro thread
- abstract notify_macro_synced(macro_id: str, timestamp: int) None ¶
Notify the DB that server has received notification about executing given macro.
Called by communicator thread
- abstract set_macros(macros: List[Macro]) None ¶
Set the current list of macros to execute (those that have not yet been executed)
- Parameters
macros – list of macros to execute
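The bookkeeping of executed but not yet acknowledged macros can be sketched as follows. The class is standalone with a hypothetical name; a real one would subclass smok.extras.BaseMacroDatabase. Because this page does not describe the internals of Macro, the sketch stores macros as opaque objects and does not update the macro definition in notify_macro_executed(), which a full implementation is expected to do.

```python
import threading
from typing import Any, Iterator, List, Set, Tuple


class InMemoryMacroDatabase:
    """Standalone sketch; a real one would subclass smok.extras.BaseMacroDatabase."""

    def __init__(self):
        self._lock = threading.Lock()
        self._macros: List[Any] = []
        # (macro_id, timestamp) pairs executed locally but not yet synced
        self._done: Set[Tuple[str, int]] = set()

    def get_macros(self) -> List[Any]:
        with self._lock:
            return list(self._macros)

    def set_macros(self, macros: List[Any]) -> None:
        with self._lock:
            self._macros = list(macros)

    def notify_macro_executed(self, macro_id: str, timestamp: int) -> None:
        with self._lock:
            self._done.add((macro_id, timestamp))

    def notify_macro_synced(self, macro_id: str, timestamp: int) -> None:
        with self._lock:
            self._done.discard((macro_id, timestamp))

    def get_done_macros(self) -> Iterator[Tuple[str, int]]:
        with self._lock:
            snapshot = sorted(self._done)   # copy taken while holding the lock
        return iter(snapshot)

    def checkpoint(self) -> None:
        pass    # nothing to persist in this in-memory sketch


macros = InMemoryMacroDatabase()
macros.notify_macro_executed('m1', 1000)    # hypothetical macro ID
macros.notify_macro_executed('m1', 2000)
macros.notify_macro_synced('m1', 1000)
print(list(macros.get_done_macros()))
```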
If you quickly need a pickling database, try
- class smok.extras.PicklingMacroDatabase(path)¶
An example persistent macro database
- Parameters
path – path to pickle file
- get_done_macros() Iterator[Tuple[str, int]] ¶
Get a list of macros that were done, but the server wasn’t told about that yet.
Called by communicator thread
- Returns
an iterator of tuples (macro_id, execution timestamp)
- get_macros() List[Macro] ¶
Get a list of macros to execute
- notify_macro_executed(macro_id: str, timestamp: int) None ¶
Notify the DB that the device has executed a given macro.
This should update the database’s macro definition as well.
Called by archive & macro thread
- notify_macro_synced(macro_id: str, timestamp: int) None ¶
Notify the DB that server has received notification about executing given macro.
Called by communicator thread
- set_macros(macros: List[Macro]) None ¶
Set the current list of macros to execute (those that have not yet been executed)
- Parameters
macros – list of macros to execute
Metadata store¶
- class smok.extras.BaseMetadataDatabase¶
Base class for metadata databases
- abstract delete_plain(key: str) None ¶
Delete a particular key for plain database
No-op on key not being in the database.
- Parameters
key – key to delete
- abstract get_all_plain() Iterator[Tuple[str, str, float]] ¶
Get all plain keys in the database
- Returns
an iterator of tuple(key, value, timestamp of the write)
- abstract get_plain(key: str) str ¶
Return a value for given plain key
- Parameters
key – key to give a value for
- Raises
KeyError – key not found, ask the server
- abstract put_plain(key: str, value: str, timestamp: Optional[float] = None) None ¶
A plain value was just written.
- Parameters
key – key of the write
value – value of the write
timestamp – timestamp of the write, default is current
- abstract update_plain(key: str, value: str, timestamp: float) None ¶
On an update plain request from the server
- Parameters
key – key of the key
value – value of the key
timestamp – timestamp at which time it was last written
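A minimal in-memory sketch of this interface is shown below. It is a standalone class with a hypothetical name (a real one would subclass smok.extras.BaseMetadataDatabase). Using time.time() for the default timestamp assumes that timestamps are expressed in seconds, which this page does not state, and the key in the usage lines is made up.

```python
import threading
import time
from typing import Dict, Iterator, Optional, Tuple


class InMemoryMetadataDatabase:
    """Standalone sketch; a real one would subclass smok.extras.BaseMetadataDatabase."""

    def __init__(self):
        self._lock = threading.Lock()
        # key -> (value, timestamp of the write)
        self._data: Dict[str, Tuple[str, float]] = {}

    def put_plain(self, key: str, value: str, timestamp: Optional[float] = None) -> None:
        if timestamp is None:
            timestamp = time.time()     # assumption: timestamps are in seconds
        with self._lock:
            self._data[key] = (value, timestamp)

    def update_plain(self, key: str, value: str, timestamp: float) -> None:
        with self._lock:
            self._data[key] = (value, timestamp)

    def get_plain(self, key: str) -> str:
        with self._lock:
            return self._data[key][0]   # KeyError tells the caller to ask the server

    def delete_plain(self, key: str) -> None:
        with self._lock:
            self._data.pop(key, None)   # no-op if the key is not in the database

    def get_all_plain(self) -> Iterator[Tuple[str, str, float]]:
        with self._lock:
            snapshot = [(k, v, ts) for k, (v, ts) in self._data.items()]
        return iter(snapshot)


meta = InMemoryMetadataDatabase()
meta.put_plain('location', 'boiler room', timestamp=100.0)     # hypothetical key
meta.update_plain('location', 'basement', 200.0)
print(list(meta.get_all_plain()))
```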
If you quickly need a pickling database, try
- class smok.extras.PicklingMetadataDatabase(path: str)¶
A metadata database that pickles its contents to disk after every change.
- delete_plain(key: str) None ¶
Delete a particular key for plain database
No-op on key not being in the database.
- Parameters
key – key to delete
- get_all_plain() Iterator[Tuple[str, str, float]] ¶
Get all plain keys in the database
- Returns
an iterator of tuple(key, value, timestamp of the write)
- get_plain(key: str) str ¶
Return a value for given plain key
- Parameters
key – key to give a value for
- Raises
KeyError – key not found, ask the server
- put_plain(key: str, value: str, timestamp: Optional[float] = None) None ¶
A plain value was just written.
- Parameters
key – key of the write
value – value of the write
timestamp – timestamp of the write, default is current
- update_plain(key: str, value: str, timestamp: float) None ¶
On an update plain request from the server
- Parameters
key – key of the key
value – value of the key
timestamp – timestamp at which time it was last written