How to start

First, you will need to obtain a certificate signed by SMOK sp. z o. o. Contact Piotr Maslanka to get one. A certificate is valid either for the production or for the testing environment.

Then you will need to decide which pathpoints your device supports. SMOK sensors are a server-side construct, so your device thinks in pathpoints. A pathpoint is a single value that can be read and written by SMOK. An example pathpoint would be a single MODBUS register.

You can view the example, or keep on reading these docs.

What exactly does smok-client handle for me

It handles:

  • executing read and write orders

  • executing message and wait orders

  • handling archiving of pathpoints

  • handling scheduled execution of macros

  • buffering data about executed macros, opened events and read data until contact with the server can be made

Starting with some code

First of all, you need to subclass SMOKDevice and define the method provide_unknown_pathpoint().

from smok.client import SMOKDevice
from smok.basics import StorageLevel
from smok.pathpoint import Pathpoint, AdviseLevel, PathpointValueType
from concurrent.futures import Future

class MyModbusRegister(Pathpoint):
    def on_read(self, advise: AdviseLevel) -> Future:
        ...

    def on_write(self, value: PathpointValueType, advise: AdviseLevel) -> Future:
        ...

class MyDevice(SMOKDevice):
    def __init__(self):
        super().__init__('path to cert file', 'path to key file',
                         'path to pickle for predicates')

sd = MyDevice()
pp = MyModbusRegister(sd, 'W1', StorageLevel.TREND)
sd.register_pathpoint(pp)

A very important method of your custom class is provide_unknown_pathpoint(). When smok-client encounters an unknown pathpoint (for example, an order for it was received), it tries to create it by calling this method, which should return the pathpoint. Note that it doesn’t need to provide pathpoints that you create and register manually. If the pathpoint cannot be found and smok-client should ignore the order sent to it, the method should raise KeyError.
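The lookup described above can be sketched without the library. This is only an illustration of the "return a pathpoint or raise KeyError" contract: KNOWN_REGISTERS, FakeRegister and lookup_pathpoint are hypothetical names, and a real implementation would return an instance of your Pathpoint subclass.

```python
from dataclasses import dataclass

# Hypothetical table of MODBUS registers this device can serve,
# keyed by pathpoint name.
KNOWN_REGISTERS = {'W1r4002': 4002, 'W1r4003': 4003}


@dataclass
class FakeRegister:
    """Stand-in for a Pathpoint subclass; not part of smok-client."""
    name: str
    address: int


def lookup_pathpoint(name: str) -> FakeRegister:
    """Logic you might place inside provide_unknown_pathpoint()."""
    try:
        address = KNOWN_REGISTERS[name]
    except KeyError:
        # KeyError signals that the order for this pathpoint should be ignored.
        raise KeyError(f'unsupported pathpoint: {name}') from None
    return FakeRegister(name, address)
```

Keeping the table of supported registers in one place makes it easy to see which names the device will accept.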

The pickle for predicates will be used for persisting the state of alarm detectors, aka predicates.

Note that the first letter of the pathpoint’s name defines its type. Allowed are:

class smok.pathpoint.PathpointType(value)

An enumeration.

B = 'B'

binary, 0 or 1

W = 'W'

unsigned 16-bit

d = 'd'

IEEE 754 double

f = 'f'

IEEE 754 float

u = 'u'

a Unicode string

w = 'w'

signed 16-bit

static get_type(path: str) → PathpointType

Return the type of given pathpoint

Returns

type of the pathpoint
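To make the first-letter rule concrete, here is a small standalone mirror of the documented letters. It is a sketch, not the library's implementation: SketchPathpointType and sketch_get_type are hypothetical names, and treating a leading r as "take the second letter" is an assumption based on the reparse rule described later in this section.

```python
from enum import Enum


class SketchPathpointType(Enum):
    """Local mirror of the documented type letters; not the library class."""
    B = 'B'   # binary, 0 or 1
    W = 'W'   # unsigned 16-bit
    w = 'w'   # signed 16-bit
    f = 'f'   # IEEE 754 float
    d = 'd'   # IEEE 754 double
    u = 'u'   # Unicode string


def sketch_get_type(path: str) -> SketchPathpointType:
    # Assumption: for a reparse pathpoint ('r' prefix) the second letter
    # carries the type; otherwise the first letter does.
    letter = path[1] if path.startswith('r') else path[0]
    return SketchPathpointType(letter)
```

With the real library you would call PathpointType.get_type() instead.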

If you need to coerce a value to target pathpoint’s type, use the following method:

smok.pathpoint.to_type(value: Union[int, float, str], type_: PathpointType) → Union[int, float, str]

Coerces a value to target pathpoint type

Parameters
  • value – value to coerce

  • type_ – type of target pathpoint

Returns

a coerced value
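What coercion to a pathpoint type might look like is sketched below. This is purely illustrative: sketch_coerce is a hypothetical function, and how the real to_type() treats out-of-range values is not stated here, so the 16-bit wrap-around is an assumption made only for the example.

```python
def sketch_coerce(value, type_letter: str):
    """Coerce value to the Python type matching a pathpoint type letter.

    Assumption: 16-bit integers wrap modulo 2**16. The library may
    clamp or reject such values instead.
    """
    if type_letter == 'B':
        return 1 if int(value) else 0
    if type_letter == 'W':
        return int(value) & 0xFFFF                     # unsigned 16-bit
    if type_letter == 'w':
        raw = int(value) & 0xFFFF
        return raw - 0x10000 if raw >= 0x8000 else raw  # signed 16-bit
    if type_letter in ('f', 'd'):
        return float(value)
    if type_letter == 'u':
        return str(value)
    raise ValueError(f'unknown pathpoint type: {type_letter!r}')
```

In real code, prefer smok.pathpoint.to_type() so that your values follow the library's own rules.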

If the first letter is r, then the type of the pathpoint is declared by the second letter. The rest is an expression in which other pathpoints are written in braces; the resulting expression is then evaluated. This is called a reparse pathpoint, and you don’t need to deal with them directly. You just need to provide the non-reparse, i.e. native, pathpoints. E.g. a reparse pathpoint that would be a sum of two pathpoint values would be:

rW{W1r4002}+{W1r4002}
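The evaluation of such an expression can be sketched as follows. smok-client does this for you; evaluate_reparse is a hypothetical helper written only to show the brace substitution, and the use of eval() with empty builtins is a simplification for the sketch.

```python
import re


def evaluate_reparse(name: str, values: dict):
    """Return (type letter, result) for a reparse pathpoint name.

    values maps native pathpoint names to their current values.
    """
    if not name.startswith('r'):
        raise ValueError('not a reparse pathpoint')
    type_letter, expression = name[1], name[2:]
    # Replace every {pathpoint} with the literal form of its value.
    substituted = re.sub(r'\{([^{}]+)\}',
                         lambda match: repr(values[match.group(1)]),
                         expression)
    # No builtins are exposed to the expression; fine for a sketch only.
    result = eval(substituted, {'__builtins__': {}}, {})
    return type_letter, result
```

For example, with W1r4002 currently equal to 21, the name rW{W1r4002}+{W1r4002} is substituted to 21+21 and evaluates to 42 with type letter W.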

You should override two methods, i.e. on_read() and on_write(). By default they do nothing, not even read their pathpoints. on_read() may:

  • return a value - in that case it will be the pathpoint’s value

  • raise an OperationFailedError - in that case the exception will be recorded as the pathpoint’s value

  • return a Future, that results in:
    • a value - in that case it will be the pathpoint’s value

    • an OperationFailedError being raised - in that case the exception will be recorded as the pathpoint’s value

on_write() may return a Future, or it may just return None to signal that the write has been successfully completed. The future must complete when the write finishes, and it may either return None to signal that the write went OK, or raise an OperationFailedError to signal that it went wrong.

Operations will be retried according to their advise level policy.

class smok.exceptions.OperationFailedError(reason: OperationFailedReason = OperationFailedReason.TIMEOUT, timestamp: Optional[Union[int, float]] = None)

Raised by the pathpoint’s on_read and on_write futures when the operation fails

Variables
  • reason – reason of failure, or None if just not read

  • timestamp – timestamp of failure in milliseconds

A reason has to be given; it is an enum:

class smok.exceptions.OperationFailedReason(value)

An enumeration.

INVALID = 'invalid'

The device responded OK, but told us that this pathpoint is bogus

MALFORMED = 'malformed'

The device responded with a malformed protocol frame

TIMEOUT = 'timeout'

The device did not respond within given time

TYPE = 'type'

There was an error with the typing of the value

The operation will be automatically retried, depending on the advise level of the command.
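One way to pick a reason is to map the low-level exceptions of your driver onto the four documented values. The sketch below is an assumption about what such a mapping could look like: SketchReason mirrors the documented enum, and which Python exceptions your driver actually raises depends entirely on that driver.

```python
from enum import Enum


class SketchReason(Enum):
    """Local mirror of the documented OperationFailedReason values."""
    INVALID = 'invalid'
    MALFORMED = 'malformed'
    TIMEOUT = 'timeout'
    TYPE = 'type'


def classify_failure(exc: Exception) -> SketchReason:
    """Hypothetical mapping from driver exceptions to failure reasons."""
    if isinstance(exc, TimeoutError):
        return SketchReason.TIMEOUT      # device did not respond in time
    if isinstance(exc, LookupError):
        return SketchReason.INVALID      # device says the pathpoint is bogus
    if isinstance(exc, TypeError):
        return SketchReason.TYPE         # value had the wrong type
    return SketchReason.MALFORMED        # anything else: bad protocol frame
```

In real code the result would be passed as the reason argument of OperationFailedError.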

When you’re done, don’t forget to close the SMOKDevice, since it spawns several threads and, if you provide the certificate not as files but as file-like objects, makes temporary files with the certificate content.

sd.close()      # this may block for about 10 seconds

When invoking smok.pathpoint.Pathpoint.get() you might get the previous exceptions, but also a new one:

class smok.exceptions.NotReadedError(timestamp: Optional[float] = None)

The value is not available, because it has not been read yet.

Note that this is invalid to return in read handling futures!

Variables

timestamp – timestamp of failure in milliseconds
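Handling the exceptions that get() may raise, as described above, can be sketched like this. The two exception classes are local stand-ins for smok.exceptions.OperationFailedError and smok.exceptions.NotReadedError, and describe() is a hypothetical helper that takes the bound get method of a pathpoint.

```python
class SketchOperationFailedError(Exception):
    """Stand-in for smok.exceptions.OperationFailedError."""


class SketchNotReadedError(Exception):
    """Stand-in for smok.exceptions.NotReadedError."""


def describe(get) -> str:
    """Call a pathpoint's get() and turn the outcome into text."""
    try:
        timestamp, value = get()
    except SketchNotReadedError:
        # Checked first, so the order stays correct even if the real
        # NotReadedError turns out to derive from OperationFailedError.
        return 'not read yet'
    except SketchOperationFailedError as error:
        return f'read failed: {error}'
    return f'{value} @ {timestamp}'
```

Usage with a real pathpoint would be describe(pp.get), with the stand-in classes replaced by the library's exceptions.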

Logging

You might want to set the logging level for ngtt to ERROR, since during its normal operation it will spam various messages of up to WARNING severity.
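With the standard logging module this is a one-liner. The sketch assumes that ngtt logs under a logger named 'ngtt' (or children of it); check the logger names in your own output if messages still get through.

```python
import logging

# Assumption: ngtt uses loggers named 'ngtt' or 'ngtt.<something>'.
# Child loggers inherit this level unless they set their own.
logging.getLogger('ngtt').setLevel(logging.ERROR)
```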

Threads

SMOKDevice spawns 4 threads to help it out with its chores. They are as follows:

  • CommunicatorThread handles communication with the SMOK server, and the following things:
    • reading macros (tasks to be executed in the future)

    • synchronizing BAOBs

    • fetching orders to execute on OrderExecutorThread

    • synchronizing pathpoints and sensors

    • synchronizing and executing predicates

  • ArchivingAndMacroThread takes care of reading the pathpoints that are archived and of executing macros, and synchronizes the metadata in the background.

  • OrderExecutorThread handles the loop executing orders.

  • LogPublisherThread handles publishing your logs to the SMOK server

SMOKDevice also creates 2 temporary files which will hold the device certificate and private key, if these are given not as paths to files, but as raw bytes data. These are cleaned up on close().

Note

You can opt not to launch the first three threads. The log publisher thread will always start.

Warning

smok-client fetches macros every 30 minutes, so don’t schedule events to happen sooner than 30 minutes from now; they are likely to miss their scheduled time (but will still be executed).

Nearly all of the callbacks that you provide will be called in the context of one of the aforementioned threads. It will be documented which thread calls your callback.

All metadata calls are blocking so far. Metadata is best utilized when there’s an Internet uplink. It is not advised, due to performance reasons, to use it locally.
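Because close() is what stops these threads and removes the temporary files, it helps to make sure it runs even when your code raises. A minimal sketch using contextlib.closing from the standard library; FakeDevice is a hypothetical stand-in for your SMOKDevice subclass, used only so the example runs without certificates.

```python
from contextlib import closing


class FakeDevice:
    """Stand-in for a SMOKDevice subclass; only close() matters here."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run(device, work):
    # closing() calls device.close() on exit, whether work() returns
    # normally or raises.
    with closing(device) as sd:
        return work(sd)
```

The same pattern should apply to any object exposing a close() method.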

List of basic classes

SMOKDevice

Pathpoint

class smok.pathpoint.Pathpoint(device: Optional[SMOKDevice], name: str, storage_level: StorageLevel = StorageLevel.TREND, callable_on_change: Optional[Callable[[Pathpoint], None]] = None, read_no_often_than: Optional[float] = None)

Base class for a user-defined pathpoint.

Note that the pathpoint is registered in the device as part of its creation.

Parameters
  • device – device that this pathpoint should be attached to. None is also a valid option.

  • name – pathpoint name.

  • storage_level – storage level for this pathpoint

  • callable_on_change – a callable to be called each time this pathpoint changes value, with this pathpoint as its sole argument. Should this callable raise an exception, it will be logged as an ERROR along with its traceback.

  • read_no_often_than – this pathpoint will be read at most once every this many seconds. Leave it at None (default) to disable this mechanism.

Variables
  • name – pathpoint name

  • storage_level – pathpoint’s storage level

  • current_timestamp – a timestamp in seconds of the last read

  • current_value – last read value or an exception instance

  • device – a weak reference to the device

can_read() → bool

Called by the executor before a ReadOrder is processed to determine whether this pathpoint can be read.

Returns

Can this pathpoint be read?

get() → Tuple[Union[int, float], Union[int, float, str]]

Return the current pathpoint value

Returns

a tuple of (timestamp of the last read, last read value)

Raises
  • OperationFailedError – when the last read of this pathpoint has failed

  • NotReadedError – when the pathpoint has not been read yet

get_archive(starting_at: int, stopping_at: Optional[int] = None) → Iterator[Tuple[int, Union[int, float, str, OperationFailedError]]]

Get archive readings.

This will be read entirely from the device’s data; the server will not be queried

Parameters
  • starting_at – timestamp of start, in milliseconds

  • stopping_at – timestamp of end, in milliseconds, or None to read until the end of the archive

Returns

an iterator of tuple (timestamp in milliseconds, pathpoint value or exception instance)
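Since each archived item is either a value or an exception instance, a consumer usually has to separate the two. A minimal sketch, where split_archive is a hypothetical helper and the sample data in the usage note stands in for the iterator returned by get_archive():

```python
def split_archive(readings):
    """Split (timestamp, item) pairs into successful and failed reads."""
    values, failures = [], []
    for timestamp, item in readings:
        if isinstance(item, Exception):
            failures.append((timestamp, item))   # a failed read
        else:
            values.append((timestamp, item))     # a regular value
    return values, failures
```

Usage would look like values, failures = split_archive(pp.get_archive(start_ms, stop_ms)), where start_ms and stop_ms are millisecond timestamps of your choosing.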

on_new_storage_level(new_storage_level: StorageLevel) → None

Called when this pathpoint’s storage level is updated.

This is usually invoked by syncing the pathpoints with the server.

This should complete ASAP, as this is called by a thread that is responsible for communication with the server.

Parameters

new_storage_level – new storage level for this pathpoint

on_read(advise: AdviseLevel) → Optional[Union[int, float, str, Future]]

Called when there’s a request to read this pathpoint.

This is called from a separate thread spawned by SMOKDevice.

Note that can_read() will be checked for before this is called.

This may return a Future that results in the value read or raises OperationFailedError when the read fails. Alternatively, it may directly return the value or directly raise the exception.

Note

current_timestamp and current_value will be automatically updated, so there’s no need for the future to do that.

Parameters

advise – advise level of this read operation

Returns

a Future that returns the value of this pathpoint or raises OperationFailedError. This Future can also return None, if the value will be set later via set_new_value()

Raises

OperationFailedError – operation failed

on_write(value: Union[int, float, str], advise: AdviseLevel) → Optional[Future]

Called when there’s a request to write this pathpoint with a particular value

This is called from a separate thread spawned by SMOKDevice

The future should raise OperationFailedError when the write fails. This can return None if the write won’t be executed and should be treated as successful.

Parameters
  • value – value to be written

  • advise – advise level of this write operation

Returns

a Future that completes successfully if written correctly or excepts if failed (you should raise OperationFailedError).

read(advise_level: AdviseLevel = AdviseLevel.ADVISE) → Section

Return an order that reads this pathpoint

Dispatch it like this:

>>> my_sd.execute(pp.read())

Parameters

advise_level – required advise level

Returns

a Section, whose execution will refresh the value in the pathpoint

set_new_value(*args) → None

May be called asynchronously by user threads to asynchronously update a pathpoint.

This is also called by the executor thread upon reading a new piece of data.

You can use it in one of two ways:

>>> pp.set_new_value(timestamp, value)

or:

>>> pp.set_new_value(OperationFailedError(...))

Parameters
  • timestamp – new timestamp

  • value – new value
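The two call forms, and pushing a value from your own thread, can be illustrated with a stand-in class. SketchPathpoint only mimics the documented call signatures; how the real Pathpoint stores the values internally is an assumption, and push_reading() is a hypothetical callback.

```python
import threading


class SketchPathpoint:
    """Stand-in mimicking the two documented call forms; not library code."""

    def __init__(self):
        self._lock = threading.Lock()
        self.current_timestamp = None
        self.current_value = None

    def set_new_value(self, *args):
        with self._lock:
            if len(args) == 1:
                # Second form: a single exception describing a failed read.
                (error,) = args
                self.current_value = error
            else:
                # First form: (timestamp, value).
                self.current_timestamp, self.current_value = args


def push_reading(pp, timestamp, reading):
    # Hypothetical callback run by your own thread whenever the hardware
    # reports a new reading without being asked.
    pp.set_new_value(timestamp, reading)
```

This suits devices that push data on their own schedule, where waiting for a read order would lose readings.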

write(value: Union[int, float, str], advise_level: AdviseLevel = AdviseLevel.ADVISE, stale_after: Optional[float] = None) → Section

Return an order, whose execution will put target value in target pathpoint.

Dispatch them like this:

>>> my_sd.execute(pp.write(2))

Parameters
  • value – value to write

  • advise_level – advise level with which to write

  • stale_after – timestamp which, if given, will prevent the write from being executed after it has passed

Returns

a Section, whose execution will put the target value in the pathpoint

class smok.pathpoint.ReparsePathpoint(device: Optional[SMOKDevice], name: str, storage_level: StorageLevel = StorageLevel.TREND)

A pathpoint whose value is derived from other pathpoints.

Its name starts with the letter ‘r’, followed by the type letter and a Python expression in which other pathpoints are given in braces, so e.g. rf{W1r4002}+2 would be the pathpoint “W1r4002” plus two.

Writing a reparse pathpoint is always a no-op, while reading it reads all constituent pathpoints.

Good news is that you don’t need to override it, nor provide your own implementations.

get() → Tuple[Union[int, float], Union[int, float, str]]

Calculate the reparse pathpoint’s value

Returns

a tuple of (timestamp in milliseconds, value)

Raises

OperationFailedError – when the read of any constituent pathpoint has failed

get_archive(starting_at: int, stopping_at: Optional[int] = None) → Iterator[Tuple[int, Union[int, float, str, OperationFailedError]]]

Get archive readings.

This will be read entirely from the device’s data; the server will not be queried

Parameters
  • starting_at – timestamp of start, in milliseconds

  • stopping_at – timestamp of end, in milliseconds, or None to read until the end of the archive

Returns

an iterator of tuple (timestamp in milliseconds, pathpoint value or exception instance)

on_new_storage_level(new_storage_level: StorageLevel) → None

Called when this pathpoint’s storage level is updated.

Parameters

new_storage_level – new storage level for this pathpoint

on_read(advise) → NoReturn

It is an ImpossibleError to call this

on_write(value, advise) → NoReturn

It is an ImpossibleError to call this

read(advise_level=AdviseLevel.ADVISE) → Section

Reading a reparse pathpoint results in all of its constituents being read.

Returns

a Section, whose execution will result in this pathpoint being re-read

set_new_value(timestamp: Union[int, float], value: Union[int, float, str, OperationFailedError]) → None

A no-op

write(value, advise_level=AdviseLevel.ADVISE, stale_after=None) → Section

Writing a reparse pathpoint is always a no-op

smok-client will handle reparse pathpoints automatically for you, so you don’t need to concern yourself with them being called as part of smok.client.SMOKDevice.provide_unknown_pathpoint().

Enums

class smok.pathpoint.AdviseLevel(value)

Advise level specifies how hard the device should try to execute this command.

ADVISE = 0

Best-effort

FORCE = 1

Nearly guarantees correct delivery, up to blocking the pipeline if need be

class smok.basics.StorageLevel(value)

A storage level defines how long the pathpoint’s values are kept on the SMOK server.

PERMANENT = 0

hold all values indefinitely

TREND = 1

values at most 2 weeks old will be kept

class smok.basics.Environment(value)

An environment in which this device runs

LOCAL_DEVELOPMENT = 2

CI or local development

PRODUCTION = 0

production

STAGING = 1

testing environment

Executing orders

If you want to read a given pathpoint, just do the following:

read_order = pathpoint.read()
sd.execute(read_order)

Note that any smok.pathpoint.orders.Section is also a perfectly valid Future, so you may wait for its result:

read_order.result()

Sadly, the completion of a Section just means that all orders have been executed; it says nothing about how they completed. You may even cancel it:

if read_order.cancel():
    print('Successfully cancelled')
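The waiting and cancelling behaviour can be tried out with a plain concurrent.futures.Future. This sketch assumes that a Section follows the standard Future semantics, as the text above states; try_cancel is a hypothetical helper.

```python
from concurrent.futures import Future


def try_cancel(order: Future) -> str:
    """Cancel an order if still possible, otherwise wait for it."""
    if order.cancel():
        # Only an order that has not started running can be cancelled.
        return 'cancelled'
    # Too late to cancel: block until the order has been executed.
    order.result(timeout=10)
    return 'completed'
```

With a real device you would pass the Section returned by pathpoint.read() after handing it to sd.execute().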

Custom order-execution loop

If you want to write a custom order-execution loop, just override smok.client.SMOKDevice.execute_section(). It accepts a single argument, a Section, which is described in orders.

You will also need to provide a method smok.client.SMOKDevice.sync_sections() to block until all orders from the previous section have been completed.

Both of these methods will be called by the OrderExecutorThread.
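The division of labour between the two methods can be sketched with a thread pool. This is a hypothetical stand-in, not a SMOKDevice subclass: a section is modelled as a list of zero-argument callables, and running the orders of one section in parallel is an assumption of this sketch.

```python
from concurrent.futures import ThreadPoolExecutor, wait


class SketchOrderLoop:
    """Stand-in showing the roles of execute_section() and sync_sections()."""

    def __init__(self, workers: int = 4):
        self._pool = ThreadPoolExecutor(max_workers=workers)
        self._pending = []

    def execute_section(self, section):
        # Start every order of the section without waiting for it.
        for order in section:
            self._pending.append(self._pool.submit(order))

    def sync_sections(self):
        # Block until all orders submitted so far have finished.
        wait(self._pending)
        self._pending.clear()

    def close(self):
        self._pool.shutdown(wait=True)
```

No lock guards the pending list, because both methods are called from the same thread, the OrderExecutorThread.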