HEX
Server: LiteSpeed
System: Linux shams.tasjeel.ae 5.14.0-611.5.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Nov 11 08:09:09 EST 2025 x86_64
User: infowars (1469)
PHP: 8.2.29
Disabled: NONE
Upload Files
File: //proc/self/root/lib/check_mk_agent/plugins/mk_tinkerforge_2.py
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# Copyright (C) 2019 Checkmk GmbH - License: GNU General Public License v2
# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and
# conditions defined in the file COPYING, which is part of this source code package.
# Do not test generated 2.x files
# fmt: off
# type: ignore

from __future__ import with_statement
from __future__ import absolute_import
from io import open
__version__ = "2.3.0p38"

###################################################
# plugin to retrieve data from tinkerforge devices.
#
# please note that for this plugin to work, the tinkerforge api has to be installed
#  (included in OMD, otherwise get it from http://download.tinkerforge.com/bindings/python/)
# Also, if the tinkerforge device is connected directly to the computer via usb,
# the brick deamon has to be installed and running: http://download.tinkerforge.com/tools/brickd/)
#
# This has been designed to also work as a special agent. In this case the following configuration
# settings have to be provided on the command line

#######################################################
# sample configuration (/etc/check_mk/tinkerforge.cfg):
#
# host = "localhost"
# port = 4223
# segment_display_uid = "abc"         # uid of the sensor to display on the 7-segment display
# segment_display_brightness = 2      # brightness of the 7-segment display (0-7)
#
# to find the uid of a sensor, either use brickv or run the plugin
# manually. plugin output looks like this:
#   temperature,Ab3d5F.a.xyz,2475
# xyz is the uid you're looking for. It's always the last of the dot-separated sensor path
# (Ab3d5F is the id of the master brick to which the sensor is connected, a is the port
#  to which the sensor is connected)

##################
# developer notes:
#
# Support for individual bricklets has to be added in init_device_handlers.
#  Currently the bricklets included in the Starter Kit: Server Room Monitoring are
#  implemented

# Don't have tinkerforge module during tests. So disable those checks
# pylint: disable=import-error

import hashlib
import os
import sys
import time
from optparse import OptionParser  # pylint: disable=deprecated-module
from urllib2 import urlopen


def check_digest(data, expected):
    digest = hashlib.sha256(data.read()).hexdigest()
    if digest != expected:
        raise ValueError("Failed to validate digest: expected: %s, got: %s." % (expected, digest))


def install():
    dest = os.path.dirname(os.path.realpath(__file__))
    sys.stdout.write("installing tinkerforge python api to %s\n" % dest)
    if os.path.exists(os.path.join(dest, "tinkerforge")):
        sys.stdout.write("already installed\n")
        return 1

    if sys.version_info[0] >= 3:
        from io import BytesIO
    else:
        from cStringIO import StringIO as BytesIO
    import shutil
    from zipfile import ZipFile

    url = "https://download.tinkerforge.com/bindings/python/tinkerforge_python_bindings_2_1_30.zip"
    # sha256sum of the downloaded file. To update:
    #   `curl -s "https://download.tinkerforge.com/[new-version].zip | sha256sum`
    download_digest = "e735e0e53ad56e2c2919cf412f3ec28ec0997919eb556b20c27519a57fb7bad0"

    response = urlopen(url)  # nosec B310 # BNS:28af27 # pylint: disable=consider-using-with
    buf = BytesIO(response.read())
    check_digest(buf, download_digest)

    with ZipFile(buf) as z:
        extract_files = [f for f in z.namelist() if f.startswith("source/tinkerforge")]
        z.extractall(dest, extract_files)

    shutil.move(os.path.join(dest, "source", "tinkerforge"), os.path.join(dest, "tinkerforge"))
    shutil.rmtree(os.path.join(dest, "source"))

    return 0


DEFAULT_SETTINGS = {
    "host": "localhost",
    "port": 4223,
    "segment_display_uid": None,
    "segment_display_brightness": 2,
}

# globals
segment_display_value = None
segment_display_unit = ""
segment_display = None


def id_to_string(identifier):
    return "%s.%s.%s" % (identifier.connected_uid, identifier.position, identifier.uid)


def print_generic(settings, sensor_type, ident, factor, unit, *values):
    if ident.uid == settings["segment_display_uid"]:
        global segment_display_value, segment_display_unit
        segment_display_value = int(values[0] * factor)
        segment_display_unit = unit
    sys.stdout.write(
        "%s,%s,%s\n" % (sensor_type, id_to_string(ident), ",".join([str(val) for val in values]))
    )


def print_ambient_light(conn, settings, uid):
    from tinkerforge.bricklet_ambient_light import (  # type: ignore[import]
        BrickletAmbientLight,
    )

    br = BrickletAmbientLight(uid, conn)
    print_generic(settings, "ambient", br.get_identity(), 0.01, "L", br.get_illuminance())


def print_ambient_light_v2(conn, settings, uid):
    from tinkerforge.bricklet_ambient_light_v2 import (  # type: ignore[import]
        BrickletAmbientLightV2,
    )

    br = BrickletAmbientLightV2(uid, conn)
    print_generic(settings, "ambient", br.get_identity(), 0.01, "L", br.get_illuminance())


def print_temperature(conn, settings, uid):
    from tinkerforge.bricklet_temperature import (  # type: ignore[import]
        BrickletTemperature,
    )

    br = BrickletTemperature(uid, conn)
    print_generic(
        settings,
        "temperature",
        br.get_identity(),
        0.01,
        "\N{DEGREE SIGN}C",
        br.get_temperature(),
    )


def print_temperature_ext(conn, settings, uid):
    from tinkerforge.bricklet_ptc import BrickletPTC  # type: ignore[import]

    br = BrickletPTC(uid, conn)
    print_generic(
        settings,
        "temperature.ext",
        br.get_identity(),
        0.01,
        "\N{DEGREE SIGN}C",
        br.get_temperature(),
    )


def print_humidity(conn, settings, uid):
    from tinkerforge.bricklet_humidity import BrickletHumidity  # type: ignore[import]

    br = BrickletHumidity(uid, conn)
    print_generic(settings, "humidity", br.get_identity(), 0.1, "RH", br.get_humidity())


def print_master(conn, settings, uid):
    from tinkerforge.brick_master import BrickMaster  # type: ignore[import]

    br = BrickMaster(uid, conn)
    print_generic(
        settings,
        "master",
        br.get_identity(),
        1.0,
        "",
        br.get_stack_voltage(),
        br.get_stack_current(),
        br.get_chip_temperature(),
    )


def print_motion_detector(conn, settings, uid):
    from tinkerforge.bricklet_motion_detector import (  # type: ignore[import]
        BrickletMotionDetector,
    )

    br = BrickletMotionDetector(uid, conn)
    print_generic(settings, "motion", br.get_identity(), 1.0, "", br.get_motion_detected())


def display_on_segment(conn, settings, text):
    #        0x01
    #       ______
    #      |      |
    # 0x20 |      | 0x02
    #      |______|
    #      | 0x40 |
    # 0x10 |      | 0x04
    #      |______|
    #        0x08

    CHARACTERS = {
        "0": 0x3F,
        "1": 0x06,
        "2": 0x5B,
        "3": 0x4F,
        "4": 0x66,
        "5": 0x6D,
        "6": 0x7D,
        "7": 0x07,
        "8": 0x7F,
        "9": 0x6F,
        "C": 0x39,
        "H": 0x74,
        "L": 0x38,
        "R": 0x50,
        "\N{DEGREE SIGN}": 0x63,
    }

    from tinkerforge.bricklet_segment_display_4x7 import (  # type: ignore[import]
        BrickletSegmentDisplay4x7,
    )

    br = BrickletSegmentDisplay4x7(segment_display, conn)
    segments = []  # type: list
    for letter in text:
        if len(segments) >= 4:
            break
        if letter in CHARACTERS:
            segments.append(CHARACTERS[letter])

    # align to the right
    segments = [0] * (4 - len(segments)) + segments

    br.set_segments(segments, settings["segment_display_brightness"], False)


def init_device_handlers():
    device_handlers = {}

    # storing the dev_id is not necessary but may save a little time as otherwise the module
    # needs to be imported just to find out this id. If the bricklet is present the module
    # gets imported anyway of course
    for dev_id, module_name, clazz, handler in [
        (13, "brick_master", "BrickMaster", print_master),
        (21, "bricklet_ambient_light", "BrickletAmbientLight", print_ambient_light),
        (
            259,
            "bricklet_ambient_light_v2",
            "BrickletAmbientLightV2",
            print_ambient_light_v2,
        ),
        (216, "bricklet_temperature", "BrickletTemperature", print_temperature),
        (226, "bricklet_ptc", "BrickletPTC", print_temperature_ext),
        (27, "bricklet_humidity", "BrickletHumidity", print_humidity),
        (
            233,
            "bricklet_motion_detector",
            "BrickletMotionDetector",
            print_motion_detector,
        ),
    ]:
        if dev_id is not None:
            device_handlers[dev_id] = handler
        else:
            module = __import__("tinkerforge." + module_name)
            sub_module = module.__dict__[module_name]
            device_handlers[sub_module.__dict__[clazz].DEVICE_IDENTIFIER] = handler

    return device_handlers


def enumerate_callback(
    conn,
    device_handlers,
    settings,
    uid,
    connected_uid,
    position,
    hardware_version,
    firmware_version,
    device_identifier,
    enumeration_type,
):
    if device_identifier == 237:
        global segment_display
        segment_display = uid
    elif device_identifier in device_handlers:
        device_handlers[device_identifier](conn, settings, uid)


def read_config(env):
    settings = DEFAULT_SETTINGS
    cfg_path = os.path.join(os.getenv("MK_CONFDIR", "/etc/check_mk"), "tinkerforge.cfg")

    if os.path.isfile(cfg_path):
        with open(cfg_path) as opened_file:
            exec(opened_file.read(), settings, settings)  # nosec B102 # BNS:a29406
    return settings


def main():
    # host = "localhost"
    # port = 4223
    # segment_display_uid = "abc"         # uid of the sensor to display on the 7-segment display
    # segment_display_brightness = 2      # brightness of the 7-segment display (0-7)

    settings = read_config(os.environ)
    parser = OptionParser()
    parser.add_option(
        "--host",
        dest="host",
        default=settings["host"],
        help="host/ipaddress of the tinkerforge device",
        metavar="ADDRESS",
    )
    parser.add_option(
        "--port",
        dest="port",
        default=settings["port"],
        type=int,
        help="port of the tinkerforge device",
        metavar="PORT",
    )
    parser.add_option(
        "--segment_display_uid",
        dest="uid",
        default=settings["segment_display_uid"],
        help="uid of the bricklet which will be displayed in the 7-segment display",
        metavar="UID",
    )
    parser.add_option(
        "--segment_display_brightness",
        type=int,
        dest="brightness",
        default=settings["segment_display_brightness"],
        help="brightness of the 7-segment display (0-7)",
    )
    parser.add_option(
        "--install",
        action="store_true",
        help="install tinkerforge python api to same directory as the plugin",
    )

    options = parser.parse_args()[0]

    settings = {
        "host": options.host,
        "port": options.port,
        "segment_display_uid": options.uid,
        "segment_display_brightness": options.brightness,
    }

    if options.install:
        return install()

    try:
        from tinkerforge.ip_connection import IPConnection  # type: ignore[import]
    except ImportError:
        sys.stdout.write("<<<tinkerforge:sep(44)>>>\n")
        sys.stdout.write("master,0.0.0,tinkerforge api isn't installed\n")
        return 1

    conn = IPConnection()
    conn.connect(settings["host"], settings["port"])

    device_handlers = init_device_handlers()

    try:
        sys.stdout.write("<<<tinkerforge:sep(44)>>>\n")

        conn.register_callback(
            IPConnection.CALLBACK_ENUMERATE,
            lambda uid,
            connected_uid,
            position,
            hardware_version,
            firmware_version,
            device_identifier,
            enumeration_type: enumerate_callback(
                conn,
                device_handlers,
                settings,
                uid,
                connected_uid,
                position,
                hardware_version,
                firmware_version,
                device_identifier,
                enumeration_type,
            ),
        )
        conn.enumerate()

        # bricklets respond asynchronously in callbacks and we have no way of knowing
        # what bricklets to expect
        time.sleep(0.1)

        if segment_display is not None:
            if segment_display_value is not None:
                display_on_segment(
                    conn,
                    settings,
                    "%d%s" % (segment_display_value, segment_display_unit),
                )
            else:
                display_on_segment(conn, settings, "")
    finally:
        conn.disconnect()
    return None


if __name__ == "__main__":
    main()