How to start¶
First, you will need to obtain a certificate signed by SMOK sp. z o. o. Contact Piotr Maslanka to have you do that. A certificate is valid either for production or for testing environments.
Then you will need to think of what pathpoints your device supports. SMOK sensors are a server-side construct, so your device thinks in pathpoints. A pathpoint is a single value, that can be read and written to by SMOK. An example pathpoint would be a single MODBUS register.
You can view the example, or keep on reading these docs.
What exactly does smok-client handle for me¶
It handles:
executing read and write orders
executing message and wait orders
handling archiving of pathpoints
handling scheduled execution of macros
buffering data about executed macros, opened events and readed data until a contact with the server can be made
Starting with some code¶
First of all, you need to subclass SMOKDevice and define the method
provide_unknown_pathpoint()
.
from smok.client import SMOKDevice
from smok.basics import StorageLevel
from smok.pathpoint import Pathpoint, AdviseLevel, PathpointValueType
from concurrent.futures import Future
class MyModbusRegister(Pathpoint):
def on_read(advise: AdviseLevel) -> Future:
...
def on_write(value: PathpointValueType, advise: AdviseLevel) -> Future:
...
class MyDevice(SMOKDevice):
def __init__(self):
super().__init__('path to cert file', 'path to key file',
'path to pickle for predicates')
sd = MyDevice()
pp = MyModbusRegister('W1', StorageLevel.TREND)
sd.register_pathpoint(pp)
A very important method of your custom class is
provide_unknown_pathpoint()
. When smok-client encounters
an unknown pathpoint (for example, an order for it was made) it tries to create it.
This method should provide this pathpoint. Note that it doesn’t need to provide pathpoints
that you will create and register manually. If a pathpoint cannot be found, and smok-client should
ignore the order sent to it, it should raise KeyError.
The pickle for predicates will be used for persisting the state of alarm detectors, aka predicate .
Note that first letter of the pathpoint defines it’s type. Allowed are:
- class smok.pathpoint.PathpointType(value)¶
An enumeration.
- B = 'B'¶
binary, 0 or 1
- W = 'W'¶
unsigned 16-bit
- d = 'd'¶
IEEE 754 double
- f = 'f'¶
IEEE 754 float
- static get_type(path: str) PathpointType ¶
Return the type of given pathpoint
- Returns
type of the pathpoint
- u = 'u'¶
an Unicode string
- w = 'w'¶
signed 16-bit
If you need to coerce a value to target pathpoint’s type, use the following method:
- smok.pathpoint.to_type(value: Union[int, float, str], type_: PathpointType) Union[int, float, str] ¶
Coerces a value to target pathpoint type
- Parameters
value – value to coerce
type – type of target pathpoint
- Returns
a coerced value
If the first letter is r, then the type of the pathpoint is declared by the second letter. This pathpoint will be called a reparse pathpoint The rest represents an expression, where other pathpoint are taken in braces and the resulting expression is evaluated. This is called a reparse pathpoint, and you don’t need to deal directly with them. You just need to provide the non-reparse, ie. native pathpoints. Eg. a reparse pathpoint that would be a sum of two other pathpoints would be:
`
rW{W1r4002}+{W1r4002}
`
You should override two calls, ie. on_read()
and
on_write()
. By default they do nothing, not even read their
pathpoints.
on_read()
may return:
* a value - in that case it will be the pathpoint’s value
* raise an OperationFailedError - in that case it will be the pathpoint’s value
* return a Future, that results in:
a value - in that case it will be the pathpoint’s value
raises an OperationFailedError - in that case it will be the pathpoint’s value
on_write()
may return a Future, or it may just return a None
to signal that write has been successfully completed. The future must complete when the write
finishes, and it may either return a None to signal that the write went OK, or it may
raise an OperationFailedError to signal that it went wrong.
Operations will be retried according to their advise level policy.
- class smok.exceptions.OperationFailedError(reason: OperationFailedReason = OperationFailedReason.TIMEOUT, timestamp: Optional[Union[int, float]] = None)¶
Raised by the pathpoint’s on_read and on_write futures when the operation fails
- Variables
reason – reason of failure, or None if just not readed
timestamp – timestamp of failure in milliseconds
A reason has to be given, it is an enum
- class smok.exceptions.OperationFailedReason(value)¶
An enumeration.
- INVALID = 'invalid'¶
The device responded OK, but told us that this pathpoint is bogus
- MALFORMED = 'malformed'¶
The device responded with a malformed protocol frame
- TIMEOUT = 'timeout'¶
The device did not respond within given time
- TYPE = 'type'¶
There was an error with the typing of the value
The operation will be automatically retried, depending of the advise level of the command.
When you’re done, don’t forget to close the SMOKDevice, since it spawns 3 threads and makes temporary files with the certificate content, if you provide them not by files, but by file-like objects.
During invoking the smok.pathpoint.Pathpoint.get()
you might get the previous exceptions,
but also a new one:
- class smok.exceptions.NotReadedError(timestamp: Optional[float] = None)¶
The value is not available, due to it having not been yet read.
Note that this is invalid to return in read handling futures!
- Variables
timestamp – timestamp of failure in milliseconds
sd.close() # this may block for like 10 seconds
Logging¶
You might want to set the logging level for ngtt
to ERROR
, since it will spam with
different messages up to WARNING
in severity for it’s normal operation.
Threads¶
SMOKDevice
spawns 4 threads to help it out with it’s chores. They are as follows:
CommunicatorThread
handles communication with the SMOK server, and following things:reading macros (tasks to be executed in the future)
synchronizes BAOBs
fetches orders to execute on
OrderExecutorThread
synchronizes pathpoints and sensors
synchronizes and executes predicates
ArchivingAndMacroThread
takes care of reading the pathpoints that are archived, about executing macros, and synchronizes the metadata in the background.OrderExecutorThread
handles the loop executing orders.LogPublisherThread
handles publishing your logs to the SMOK server
SMOKDevice
also creates 2 temporary files which will hold the device
certificate and private key, if the certificate and private key is given not as a path to file,
but as raw bytes
data. . These are cleaned up on close()
.
Note
You can opt not to launch first three threads. The log publisher thread will always start.
Warning
This will fetch macros every 30 minutes, so don’t schedule events to happen sooner than 30 minutes from now on, they are likely to be missed (but still will be executed).
Nearly all of the callbacks that you provide will be called in the context of one of aforementioned threads. It will be documented which thread calls your callback.
All metadata calls are blocking so far. Metadata is best utilized when there’s an Internet uplink. It is not advised, due to performance reasons, to use it locally.
List of basic classes¶
SMOKDevice¶
Pathpoint¶
- class smok.pathpoint.Pathpoint(device: Optional[SMOKDevice], name: str, storage_level: StorageLevel = StorageLevel.TREND, callable_on_change: Optional[Callable[[Pathpoint], None]] = None, read_no_often_than: Optional[float] = None)¶
Base class for an user-defined pathpoint.
Note that pathpoint is registered in the device as part of it’s creation.
- Parameters
device – device that this pathpoint should be attached to. None is also a valid option.
name – pathpoint name.
storage_level – storage level for this pathpoint
callable_on_change – a callable to be called each time this pathpoint changes value, with this pathpoint as it’s sole argument. Should this callable return an exception, it will be logged as an ERROR along with it’s traceback.
read_no_often_than – this pathpoint should be readed at fastest each this seconds. Leave it at None (default) to disable this mechanism.
- Variables
name – pathpoint name
storage_level – pathpoint’s storage level
current_timestamp – a timestamp in seconds of the last read
current_value – last readed value or an exception instance
device – a weak reference to the device
- can_read() bool ¶
Called by the executor before a ReadOrder is processed to determine whether this pathpoint can be read.
- Returns
Can this pathpoint be read?
- get() Tuple[Union[int, float], Union[int, float, str]] ¶
Return the current pathpoint value
- Returns
a tuple of (last timestamp, when the operation has failed)
- Raises
OperationFailedError – when pathpoint’s read has failed
NotReadedError – pathpoint not yet readed
- get_archive(starting_at: int, stopping_at: Optional[int] = None) Iterator[Tuple[int, Union[int, float, str, OperationFailedError]]] ¶
Get archive readings.
This will be readed entirely from the device’s data, the server will not be queried
- Parameters
starting_at – timestamp of start, in milliseconds
stopping_at – timestamp of end, in milliseconds, or None for the end of the park
- Returns
an iterator of tuple (timestamp in milliseconds, pathpoint value or exception instance)
- on_new_storage_level(new_storage_level: StorageLevel) None ¶
Called when this pathpoint’s storage level is updated.
This is usually invoked by syncing the pathpoints with the server.
This should complete ASAP, as this is called by a thread that is responsible for communication with the server.
- Parameters
new_storage_level – new storage level for this pathpoint
- on_read(advise: AdviseLevel) Optional[Union[int, float, str, Future]] ¶
Called when there’s a request to read this pathpoint.
This is called from a separate thread spawned by SMOKDevice.
Note that
can_read()
will be checked for before this is called.This may return a Future, that results in the value readed, or should raise OperationFailedError when the read fails, or should directly return the value, or should directly raise the exception.
Note
current_timestamp
andcurrent_value
will be automatically updated, so there’s no need for the future to do that.- Parameters
advise – advise level of this read operation
- Returns
a Future that returns the value of this pathpoint or raises OperationFailedError. This Future can also return None, if the value will be set later via
set_new_value()
- Raises
OperationFailedError – operation failed
- on_write(value: Union[int, float, str], advise: AdviseLevel) Optional[Future] ¶
Called when there’s a request to write this pathpoint with a particular value
This is called from a separate thread spawned by SMOKDevice
The future should raise OperationFailedError when the write fails. This can return None if the write won’t be executed and should be treated as successful.
- Parameters
value – value to be written
advise – advise level of this read operation
- Returns
a Future that completes successfully if written correctly or excepts if failed (you should raise OperationFailedError).
- read(advise_level: AdviseLevel = AdviseLevel.ADVISE) Section ¶
Return an order that reads this pathpoint
Dispatch them like this:
>>> my_sd.execute(pp.read(2))
- Parameters
advise_level – required advise level
- Returns
a Section, whose execution will refresh the value in the pathpoint
- set_new_value(*args) None ¶
May be called asynchronously by user threads to asynchronously update a pathpoint.
This is also called by the executor thread upon reading a new piece of data.
You can use it in one of two ways:
>>> pp.set_new_value(timestamp, value)
or:
>>> pp.set_new_value(OperationFailedError(...))
- Parameters
timestamp – new timestamp
value – new value
- write(value: Union[int, float, str], advise_level: AdviseLevel = AdviseLevel.ADVISE, stale_after: Optional[float] = None) Section ¶
Return an order, whose execution will put target value in target pathpoint.
Dispatch them like this:
>>> my_sd.execute(pp.write(2))
- Parameters
value – value to write
advise_level – advise level with which to write
stale_after – timestamp, that if given, will prevent the write from being executed past it
- Returns
a Section, whose execution will put the target value in the pathpoint
- class smok.pathpoint.ReparsePathpoint(device: Optional[SMOKDevice], name: str, storage_level: StorageLevel = StorageLevel.TREND)¶
A pathpoint whose value is derived from other pathpoints.
It’s first letter is ‘r’ then there goes the type letter and a Python expression with other pathpoints substituted by braces, so eg. rf{W1r4002}+2 would be pathpoint “W1r4002” plus two.
Writing a reparse pathpoint always results in a zero operation, while reading it reads all constituent pathpoints.
Good news is that you don’t need to override it, nor provide your own implementations.
- get() Tuple[Union[int, float], Union[int, float, str]] ¶
Calculate the reparse pathpoint’s value
- Returns
a tuple of (timestamp in milliseconds, value)
- Raises
OperationFailedError – when any constituent’s pathpoint’s read has failed
- get_archive(starting_at: int, stopping_at: Optional[int] = None) Iterator[Tuple[int, Union[int, float, str, OperationFailedError]]] ¶
Get archive readings.
This will be readed entirely from the device’s data, the server will not be queried
- Parameters
starting_at – timestamp of start, in milliseconds
stopping_at – timestamp of end, in milliseconds, or None for the end of the park
- Returns
an iterator of tuple (timestamp in milliseconds, sensor value or exception instance)
- on_new_storage_level(new_storage_level: StorageLevel) None ¶
Called when this pathpoint’s storage level is updated.
- Parameters
new_storage_level – new storage level for this pathpoint
- on_read(advise) NoReturn ¶
It is an ImpossibleError to call this
- on_write(value, advise) NoReturn ¶
It is an ImpossibleError to call this
- read(advise_level=AdviseLevel.ADVISE) Section ¶
Reading a reparse pathpoint results in all of it’s constituents being read.
- Returns
a Section, whose execution will result in this pathpoint being re-read
- set_new_value(timestamp: Union[int, float], value: Union[int, float, str, OperationFailedError]) None ¶
A no-op
smok-client
will handle reparse pathpoints automatically for you, so you don’t need
to concern yourself with them being called as part of
smok.client.SMOKDevice.provide_unknown_pathpoint()
.
Enums¶
- class smok.pathpoint.AdviseLevel(value)¶
Advise level specifies how hard should the device try to execute this command.
- ADVISE = 0¶
Best-effort
- FORCE = 1¶
Nearly guarantees correct delivery, up to blocking the pipeline if need be
Executing orders¶
If you want to read given pathpoint, just do the following:
read_order = pathpoint.read()
sd.execute(read_order)
Note that any smok.pathpoint.orders.Section
is also a perfectly valid Future, so
you may cancel it and wait for it’s result:
read_order.result()
Sadly, the completion of a Section just means that all orders have been executed, it bears no relevance how they completed. You may even cancel it:
if read_order.cancel():
print('Successfully cancelled')
Custom order-execution loop¶
If you want to write a custom order-execution loop, just override
smok.client.SMOKDevice.execute_section()
. It will accept a single argument of
Section
, about which you can read up in orders.
You will also need to provide a method
smok.client.SMOKDevice.sync_sections()
to block until all orders from previous section
have been completed.
Both of these methods will be called by the OrderExecutorThread.