HEX
Server: LiteSpeed
System: Linux shams.tasjeel.ae 5.14.0-611.5.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Nov 11 08:09:09 EST 2025 x86_64
User: infowars (1469)
PHP: 8.2.29
Disabled: NONE
Upload Files
File: //proc/self/root/lib/check_mk_agent/plugins/mk_postgres.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2019 Checkmk GmbH - License: GNU General Public License v2
# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and
# conditions defined in the file COPYING, which is part of this source code package.
r"""Check_MK Agent Plugin: mk_postgres

This is a Check_MK Agent plugin. If configured, it will be called by the
agent without any arguments.

Can be configured with $MK_CONFDIR/postgres.cfg
Example for postgres.cfg file:

-----postgres.cfg-----------------------------------------
DBUSER=postgres
PG_BINARY_PATH=/usr/bin/psql
INSTANCE=/home/postgres/db1.env:USER_NAME:/PATH/TO/.pgpass:
INSTANCE=/home/postgres/db2.env:USER_NAME:/PATH/TO/.pgpass:
----------------------------------------------------------

Example of an environment file:

-----/home/postgres/db1.env-----------------------------------------
export PGDATABASE="data"
export PGPORT="5432"
export PGVERSION="14"
----------------------------------------------------------

Inside of the environment file, only `PGPORT` is mandatory.
In case there is no `INSTANCE` specified by the postgres.cfg, then the plugin assumes defaults.
For example, the configuration

-----postgres.cfg-----------------------------------------
DBUSER=postgres
PG_BINARY_PATH=/usr/bin/psql
----------------------------------------------------------

is equivalent to

-----postgres.cfg-----------------------------------------
DBUSER=postgres
PG_BINARY_PATH=/usr/bin/psql
INSTANCE=/home/postgres/does-not-exist.env:postgres::postgres
----------------------------------------------------------

-----/home/postgres/does-not-exist.env--------------------
export PGDATABASE="main"
export PGPORT="5432"
----------------------------------------------------------

The only difference being `/home/postgres/does-not-exist.env` does not exist in the first setup.
Different defaults are chosen for Windows.
"""

__version__ = "2.3.0p38"

import abc
import io
import logging

# optparse exist in python2.6 up to python 3.8. Do not use argparse, because it will not run with python2.6
import optparse  # pylint: disable=W0402
import os
import platform
import re
import stat
import subprocess
import sys
import tempfile

try:
    from collections.abc import (  # noqa: F401 # pylint: disable=unused-import
        Callable,
        Iterable,
        Sequence,
    )
    from typing import Any  # noqa: F401 # pylint: disable=unused-import
except ImportError:
    # We need typing only for testing
    pass

# For Python 3 sys.stdout creates \r\n as newline for Windows.
# Checkmk can't handle this therefore we rewrite sys.stdout to a new_stdout function.
# If you want to use the old behaviour just use old_stdout.
if sys.version_info[0] >= 3:
    new_stdout = io.TextIOWrapper(
        sys.stdout.buffer,
        newline="\n",
        encoding=sys.stdout.encoding,
        errors=sys.stdout.errors,
    )
    old_stdout, sys.stdout = sys.stdout, new_stdout

OS = platform.system()
IS_LINUX = OS == "Linux"
IS_WINDOWS = OS == "Windows"
LOGGER = logging.getLogger(__name__)
LINUX_PROCESS_MATCH_PATTERNS = [
    re.compile(pattern)
    for pattern in [
        "(.*)bin/postgres(.*)",
        "(.*)bin/postmaster(.*)",
        "(.*)bin/edb-postgres(.*)",
        "^[0-9]+ postgres(.*)",
        "^[0-9]+ postmaster(.*)",
        "^[0-9]+ edb-postgres(.*)",
    ]
]
WINDOWS_PROCESS_MATCH_PATTERNS = [
    re.compile(pattern)
    for pattern in [
        r"(.*)bin\\postgres(.*)",
        r"(.*)bin\\postmaster(.*)",
        r"(.*)bin\\edb-postgres(.*)",
    ]
]

if sys.version_info[0] >= 3:
    UTF_8_NEWLINE_CHARS = re.compile(r"[\n\r\u2028\u000B\u0085\u2028\u2029]+")
else:
    UTF_8_NEWLINE_CHARS = re.compile(u"[\u000A\u000D\u2028\u000B\u0085\u2028\u2029]+")  # fmt: skip


class OSNotImplementedError(NotImplementedError):
    def __str__(self):
        # type: () -> str
        return "The OS type ({}) is not yet implemented.".format(platform.system())


if IS_LINUX:
    import resource
elif IS_WINDOWS:
    import time
else:
    raise OSNotImplementedError


# for compatibility with python 2.6
def subprocess_check_output(args):
    return subprocess.Popen(args, stdout=subprocess.PIPE).communicate()[0]


# Borrowed from six
def ensure_str(s):
    if sys.version_info[0] >= 3:
        if isinstance(s, bytes):
            return s.decode("utf-8")
    else:
        if isinstance(s, unicode):  # pylint: disable=undefined-variable
            return s.encode("utf-8")
    return s


class PostgresPsqlError(RuntimeError):
    pass


class PostgresBase:
    """
    Base class for x-plattform postgres queries
    :param db_user: The postgres db user
    :param instance: Pass an instance, in case of monitoring a server with multiple instances

    All abstract methods must have individual implementation depending on the OS type
    which runs postgres.
    All non-abstract methods are meant to work on all OS types which were subclassed.
    """

    __metaclass__ = abc.ABCMeta
    _supported_pg_versions = ["12", "15"]

    def __init__(self, db_user, pg_binary_path, instance, process_match_patterns):
        # type: (str, str | None, dict, Sequence[re.Pattern]) -> None
        self.db_user = db_user
        self.name = instance["name"]
        self.pg_user = instance["pg_user"]
        self.pg_port = instance["pg_port"]
        self.pg_database = instance["pg_database"]
        self.pg_passfile = instance.get("pg_passfile", "")
        self.pg_version = instance.get("pg_version")
        self.my_env = os.environ.copy()
        self.my_env["PGPASSFILE"] = instance.get("pg_passfile", "")
        self.psql_binary_name = "psql"
        if pg_binary_path is None:
            self.psql_binary_path = self.get_psql_binary_path()
        else:
            self.psql_binary_path = pg_binary_path
        self.psql_binary_dirname = self.get_psql_binary_dirname()
        self.conn_time = ""  # For caching as conn_time and version are in one query
        self.process_match_patterns = process_match_patterns

    @abc.abstractmethod
    def run_sql_as_db_user(
        self,
        sql_cmd,
        extra_args="",
        field_sep=";",
        quiet=True,
        rows_only=True,
        mixed_cmd=False,
    ):
        # type: (str, str, str, bool, bool, bool) -> str
        """This method implements the system specific way to call the psql interface"""

    @abc.abstractmethod
    def get_psql_binary_path(self):
        """This method returns the system specific psql binary and its path"""

    @abc.abstractmethod
    def get_psql_binary_dirname(self):
        """This method returns the system specific psql binary and its path"""

    @abc.abstractmethod
    def get_instances(self):
        """Gets all instances"""

    @abc.abstractmethod
    def get_stats(self, databases):
        """Get the stats"""

    @abc.abstractmethod
    def get_version_and_connection_time(self):
        """Get the pg version and the time for the query connection"""

    @abc.abstractmethod
    def get_bloat(self, databases, numeric_version):
        """Get the db bloats"""

    def get_databases(self):
        """Gets all non template databases"""
        sql_cmd = "SELECT datname FROM pg_database WHERE datistemplate = false;"
        out = self.run_sql_as_db_user(sql_cmd)
        return out.replace("\r", "").split("\n")

    def get_server_version(self):
        """Gets the server version"""
        out = self.run_sql_as_db_user("SHOW server_version;")
        if out == "":
            raise PostgresPsqlError("psql connection returned with no data")
        version_as_string = out.split()[0]
        # Use Major and Minor version for float casting: "12.6.4" -> 12.6
        return float(".".join(version_as_string.split(".")[0:2]))

    def get_condition_vars(self, numeric_version):
        """Gets condition variables for other queries"""
        if numeric_version > 9.2:
            return "state", "'idle'"
        return "current_query", "'<IDLE>'"

    def get_connections(self):
        """Gets the the idle and active connections"""
        connection_sql_cmd = (
            "SELECT datname, "
            "(SELECT setting AS mc FROM pg_settings "
            "WHERE name = 'max_connections') AS mc, "
            "COUNT(state) FILTER (WHERE state='idle') AS idle, "
            "COUNT(state) FILTER (WHERE state='active') AS active "
            "FROM pg_stat_activity group by 1;"
        )

        return self.run_sql_as_db_user(
            connection_sql_cmd, rows_only=False, extra_args="-P footer=off"
        )

    def get_sessions(self, row, idle):
        """Gets idle and open sessions"""
        condition = "%s = %s" % (row, idle)

        sql_cmd = (
            "SELECT %s, count(*) FROM pg_stat_activity " "WHERE %s IS NOT NULL GROUP BY (%s);"
        ) % (condition, row, condition)

        out = self.run_sql_as_db_user(
            sql_cmd, quiet=False, extra_args="--variable ON_ERROR_STOP=1", field_sep=" "
        )

        # line with number of idle sessions is sometimes missing on Postgres 8.x. This can lead
        # to an altogether empty section and thus the check disappearing.
        if not out.startswith("t"):
            out += "\nt 0"
        return out

    def get_query_duration(self, numeric_version):
        """Gets the query duration"""
        # Previously part of simple_queries

        if numeric_version > 9.2:
            querytime_sql_cmd = (
                "SELECT datname, datid, usename, client_addr, state AS state, "
                "COALESCE(ROUND(EXTRACT(epoch FROM now()-query_start)),0) "
                "AS seconds, pid, "
                "query "
                "AS current_query FROM pg_stat_activity "
                "WHERE (query_start IS NOT NULL AND "
                "(state NOT LIKE 'idle%' OR state IS NULL)) "
                "ORDER BY query_start, pid DESC;"
            )

        else:
            querytime_sql_cmd = (
                "SELECT datname, datid, usename, client_addr, '' AS state,"
                " COALESCE(ROUND(EXTRACT(epoch FROM now()-query_start)),0) "
                "AS seconds, procpid as pid, query AS current_query "
                "FROM pg_stat_activity WHERE "
                "(query_start IS NOT NULL AND current_query NOT LIKE '<IDLE>%') "
                "ORDER BY query_start, procpid DESC;"
            )

        return self.run_sql_as_db_user(
            querytime_sql_cmd, rows_only=False, extra_args="-P footer=off"
        )

    def get_stat_database(self):
        """Gets the database stats"""
        # Previously part of simple_queries
        sql_cmd = (
            "SELECT datid, datname, numbackends, xact_commit, xact_rollback, blks_read, "
            "blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, "
            "pg_database_size(datname) AS datsize FROM pg_stat_database;"
        )
        return self.run_sql_as_db_user(sql_cmd, rows_only=False, extra_args="-P footer=off")

    def get_locks(self):
        """Get the locks"""
        # Previously part of simple_queries
        sql_cmd = (
            "SELECT datname, granted, mode FROM pg_locks l RIGHT "
            "JOIN pg_database d ON (d.oid=l.database) WHERE d.datallowconn;"
        )
        return self.run_sql_as_db_user(sql_cmd, rows_only=False, extra_args="-P footer=off")

    def get_version(self):
        """Wrapper around get_version_conn_time"""
        version, self.conn_time = self.get_version_and_connection_time()
        return version

    def get_connection_time(self):
        """
        Wrapper around get_version_conn time.
        Execute query only if conn_time wasn't already set
        """
        if self.conn_time == "":
            _, self.conn_time = self.get_version_and_connection_time()
        return self.conn_time

    def is_pg_ready(self):
        """Executes pg_isready.
        pg_isready is a utility for checking the connection status of a PostgreSQL database server.
        """

        out = subprocess_check_output(
            ["%s%spg_isready" % (self.psql_binary_dirname, os.sep), "-p", self.pg_port],
        )

        sys.stdout.write("%s\n" % ensure_str(out))

    def is_postgres_process(self, process):
        # type: (str) -> bool
        """Determine whether a process is a PostgreSQL process.

        Note that the relevant binaries are contained in PATH under Linux, so they
        may or may not be called using the full path. Starting from PostgreSQL
        verion >= 13, they are not called using the full path.

        Examples:

        1252 /usr/bin/postmaster -D /var/lib/pgsql/data
        3148 postmaster -D /var/lib/pgsql/data
        """
        return any(re.search(p, process) for p in self.process_match_patterns)

    def execute_all_queries(self):
        """Executes all queries and writes the output formatted to stdout"""
        instance = "\n[[[%s]]]" % self.name

        try:
            databases = self.get_databases()
            database_text = "\n[databases_start]\n%s\n[databases_end]" % "\n".join(databases)
            version = self.get_server_version()
            row, idle = self.get_condition_vars(version)
        except PostgresPsqlError:
            # if tcp connection to db instance failed variables are empty
            databases = ""
            database_text = ""
            version = None
            row, idle = "", ""

        out = "<<<postgres_instances>>>"
        out += instance
        out += "\n%s" % self.get_instances()
        sys.stdout.write("%s\n" % out)

        out = "<<<postgres_sessions>>>"
        if row and idle:
            out += instance
            out += "\n%s" % self.get_sessions(row, idle)
            sys.stdout.write("%s\n" % out)

        out = "<<<postgres_stat_database:sep(59)>>>"
        out += instance
        out += "\n%s" % self.get_stat_database()
        sys.stdout.write("%s\n" % out)

        out = "<<<postgres_locks:sep(59)>>>"
        if database_text:
            out += instance
            out += database_text
            out += "\n%s" % self.get_locks()
            sys.stdout.write("%s\n" % out)

        out = "<<<postgres_query_duration:sep(59)>>>"
        if version:
            out += instance
            out += database_text
            out += "\n%s" % self.get_query_duration(version)
            sys.stdout.write("%s\n" % out)

        out = "<<<postgres_connections:sep(59)>>>"
        if database_text:
            out += instance
            out += database_text
            out += "\n%s" % self.get_connections()
            sys.stdout.write("%s\n" % out)

        out = "<<<postgres_stats:sep(59)>>>"
        if databases:
            out += instance
            out += database_text
            out += "\n%s" % self.get_stats(databases)
            sys.stdout.write("%s\n" % out)

        out = "<<<postgres_version:sep(1)>>>"
        out += instance
        out += "\n%s" % self.get_version()
        sys.stdout.write("%s\n" % out)

        out = "<<<postgres_conn_time>>>"
        out += instance
        out += "\n%s" % self.get_connection_time()
        sys.stdout.write("%s\n" % out)

        out = "<<<postgres_bloat:sep(59)>>>"
        if databases and version:
            out += instance
            out += database_text
            out += "\n%s" % self.get_bloat(databases, version)
            sys.stdout.write("%s\n" % out)


def _sanitize_sql_query(out):
    # type: (bytes) -> str
    utf_8_out = ensure_str(out)
    # The sql queries may contain any char in `UTF_8_NEWLINE_CHARS`. However,
    # Checkmk only knows how to handle `\n`. Furthermore, `\n` is always
    # interpreted as a record break by Checkmk (see `parse_dbs`). This means
    # that we have to remove all newline chars, before printing the section. We
    # solve the issue in three steps.
    # - Make Postgres return the NULL byte (instead of newlines). This achieved
    #   by using the flag `-0`.
    # - Remove all newlines from whatever Postgres returns. This is safe,
    #   because of the first step.
    # - Finally, turn the NULL bytes into linebreaks, so Checkmk interprets
    #   them as record breaks.
    utf_8_out_no_new_lines = UTF_8_NEWLINE_CHARS.sub(" ", utf_8_out)
    return utf_8_out_no_new_lines.replace("\x00", "\n").rstrip()


class PostgresWin(PostgresBase):
    def run_sql_as_db_user(
        self,
        sql_cmd,
        extra_args="",
        field_sep=";",
        quiet=True,
        rows_only=True,
        mixed_cmd=False,
    ):
        # type: (str, str, str, bool | None, bool | None,bool | None) -> str
        """This method implements the system specific way to call the psql interface"""
        extra_args += " -U %s" % self.pg_user
        extra_args += " -d %s" % self.pg_database
        extra_args += " -p %s" % self.pg_port

        if quiet:
            extra_args += " -q"
        if rows_only:
            extra_args += " -t"

        if mixed_cmd:
            cmd_str = 'cmd /c echo %s | cmd /c ""%s" -X %s -A -0 -F"%s" -U %s"' % (
                sql_cmd,
                self.psql_binary_path,
                extra_args,
                field_sep,
                self.db_user,
            )

        else:
            cmd_str = 'cmd /c ""%s" -X %s -A -0 -F"%s" -U %s -c "%s"" ' % (
                self.psql_binary_path,
                extra_args,
                field_sep,
                self.db_user,
                sql_cmd,
            )
        proc = subprocess.Popen(  # pylint: disable=consider-using-with
            cmd_str,
            env=self.my_env,
            stdout=subprocess.PIPE,
        )
        out = proc.communicate()[0]
        return _sanitize_sql_query(out)

    @staticmethod
    def _call_wmic_logicaldisk():
        # type: () -> str
        return ensure_str(
            subprocess_check_output(
                [
                    "wmic",
                    "logicaldisk",
                    "get",
                    "deviceid",
                ]
            )
        )

    @staticmethod
    def _parse_wmic_logicaldisk(wmic_output):
        # type: (str) -> Iterable[str]
        for drive in wmic_output.replace("DeviceID", "").split(":")[:-1]:
            yield drive.strip()

    @classmethod
    def _logical_drives(cls):
        # type: () -> Iterable[str]
        for drive in cls._parse_wmic_logicaldisk(cls._call_wmic_logicaldisk()):
            yield drive

    def get_psql_binary_path(self):
        # type: () -> str
        """This method returns the system specific psql interface binary as callable string"""
        if self.pg_version is None:
            # This is a fallback in case the user does not have any instances
            # configured.
            return self._default_psql_binary_path()
        return self._psql_path(self.pg_version)

    def _default_psql_binary_path(self):
        # type: () -> str
        for pg_version in self._supported_pg_versions:
            try:
                return self._psql_path(pg_version)
            except IOError as e:
                ioerr = e
                continue
        raise ioerr

    def _psql_path(self, pg_version):
        # type: (str) -> str

        # TODO: Make this more clever...
        for drive in self._logical_drives():
            for program_path in [
                "Program Files\\PostgreSQL",
                "Program Files (x86)\\PostgreSQL",
                "PostgreSQL",
            ]:
                psql_path = (
                    "{drive}:\\{program_path}\\{pg_version}\\bin\\{psql_binary_name}.exe".format(
                        drive=drive,
                        program_path=program_path,
                        pg_version=pg_version.split(".", 1)[
                            0
                        ],  # Only the major version is relevant
                        psql_binary_name=self.psql_binary_name,
                    )
                )
                if os.path.isfile(psql_path):
                    return psql_path

        raise IOError("Could not determine %s bin and its path." % self.psql_binary_name)

    def get_psql_binary_dirname(self):
        # type: () -> str
        return self.psql_binary_path.rsplit("\\", 1)[0]

    def get_instances(self):
        # type: () -> str
        """Gets all instances"""

        taskslist = ensure_str(
            subprocess_check_output(
                ["wmic", "process", "get", "processid,commandline", "/format:list"]
            )
        ).split("\r\r\n\r\r\n\r\r\n")

        out = ""
        for task in taskslist:
            task = task.lstrip().rstrip()
            if len(task) == 0:
                continue
            cmd_line, PID = task.split("\r\r\n")
            cmd_line = cmd_line.split("CommandLine=")[1]
            PID = PID.split("ProcessId=")[1]
            if self.is_postgres_process(cmd_line):
                if task.find(self.name) != -1:
                    out += "%s %s\n" % (PID, cmd_line)
        return out.rstrip()

    def get_stats(self, databases):
        # type: (list[str]) -> str
        """Get the stats"""
        # The next query had to be slightly modified:
        # As cmd.exe interprets > as redirect and we need <> as "not equal", this was changed to
        # != as it has the same SQL implementation
        sql_cmd_lastvacuum = (
            "SELECT "
            "current_database() AS datname, nspname AS sname, "
            "relname AS tname, CASE WHEN v IS NULL THEN -1 "
            "ELSE round(extract(epoch FROM v)) END AS vtime, "
            "CASE WHEN g IS NULL THEN -1 ELSE round(extract(epoch FROM g)) "
            "END AS atime FROM (SELECT nspname, relname, "
            "GREATEST(pg_stat_get_last_vacuum_time(c.oid), "
            "pg_stat_get_last_autovacuum_time(c.oid)) AS v, "
            "GREATEST(pg_stat_get_last_analyze_time(c.oid), "
            "pg_stat_get_last_autoanalyze_time(c.oid)) AS g "
            "FROM pg_class c, pg_namespace n WHERE relkind = 'r' "
            "AND n.oid = c.relnamespace AND n.nspname != 'information_schema' "
            "ORDER BY 3) AS foo;"
        )

        query = "\\pset footer off \\\\ BEGIN;SET statement_timeout=30000;COMMIT;"

        cur_rows_only = False
        for cnt, database in enumerate(databases):
            query = "%s \\c %s \\\\ %s" % (query, database, sql_cmd_lastvacuum)
            if cnt == 0:
                query = "%s \\pset tuples_only on" % query

        return self.run_sql_as_db_user(query, mixed_cmd=True, rows_only=cur_rows_only)

    def get_version_and_connection_time(self):
        # type: () -> tuple[str, str]
        """Get the pg version and the time for the query connection"""
        cmd = "SELECT version() AS v"

        # TODO: Verify this time measurement
        start_time = time.time()
        out = self.run_sql_as_db_user(cmd)
        diff = time.time() - start_time
        return out, "%.3f" % diff

    def get_bloat(self, databases, numeric_version):
        # type: (list[Any], float) -> str
        """Get the db bloats"""
        # Bloat index and tables
        # Supports versions <9.0, >=9.0
        # This huge query has been gratefully taken from Greg Sabino Mullane's check_postgres.pl
        if numeric_version > 9.0:
            # TODO: Reformat query in a more readable way
            # Here as well: "<" and ">" must be escaped. As we're using meta-command + SQL in one
            # query, we need to use pipe. Due to Window's cmd behaviour, we need to escape those
            # symbols with 3 (!) carets. See https://ss64.com/nt/syntax-redirection.html
            bloat_query = (
                "SELECT current_database() AS db, "
                "schemaname, tablename, reltuples::bigint "
                "AS tups, relpages::bigint AS pages, otta, "
                "ROUND(CASE WHEN sml.relpages=0 "
                "OR sml.relpages=otta THEN 0.0 "
                "ELSE (sml.relpages-otta::numeric)/sml.relpages END,3) AS tbloat, "
                "CASE WHEN relpages ^^^< otta THEN 0 "
                "ELSE relpages::bigint - otta END AS wastedpages, "
                "CASE WHEN relpages ^^^< otta THEN 0 ELSE bs*(sml.relpages-otta)::bigint END "
                "AS wastedbytes, CASE WHEN relpages ^^^< otta THEN 0 "
                "ELSE (bs*(relpages-otta))::bigint END "
                "AS wastedsize, iname, ituples::bigint AS itups, ipages::bigint "
                "AS ipages, iotta, ROUND(CASE WHEN ipages=0 OR ipages^^^<=iotta THEN 0.0 "
                "ELSE (ipages-iotta::numeric)/ipages END,3) AS ibloat, "
                "CASE WHEN ipages ^^^< iotta THEN 0 ELSE ipages::bigint - iotta END "
                "AS wastedipages, CASE WHEN ipages ^^^< iotta THEN 0 ELSE bs*(ipages-iotta) "
                "END AS wastedibytes, CASE WHEN ipages ^^^< iotta THEN 0 "
                "ELSE (bs*(ipages-iotta))::bigint END AS wastedisize, "
                "CASE WHEN relpages ^^^< otta THEN CASE WHEN ipages ^^^< iotta THEN 0 "
                "ELSE bs*(ipages-iotta::bigint) END ELSE CASE WHEN ipages ^^^< iotta "
                "THEN bs*(relpages-otta::bigint) "
                "ELSE bs*(relpages-otta::bigint + ipages-iotta::bigint) "
                "END END AS totalwastedbytes "
                "FROM ( SELECT nn.nspname AS schemaname, cc.relname AS tablename, "
                "COALESCE(cc.reltuples,0) AS reltuples, COALESCE(cc.relpages,0) "
                "AS relpages, COALESCE(bs,0) AS bs, "
                "COALESCE(CEIL((cc.reltuples*((datahdr+ma- (CASE WHEN datahdr%ma=0 "
                "THEN ma ELSE datahdr%ma END))+nullhdr2+4))/(bs-20::float)),0) "
                "AS otta, COALESCE(c2.relname,'?') AS iname, COALESCE(c2.reltuples,0) "
                "AS ituples, COALESCE(c2.relpages,0) "
                "AS ipages, COALESCE(CEIL((c2.reltuples*(datahdr-12))/(bs-20::float)),0) "
                "AS iotta FROM pg_class cc "
                "JOIN pg_namespace nn ON cc.relnamespace = nn.oid "
                "AND nn.nspname != 'information_schema' LEFT JOIN "
                "( SELECT ma,bs,foo.nspname,foo.relname, "
                "(datawidth+(hdr+ma-(case when hdr%ma=0 "
                "THEN ma ELSE hdr%ma END)))::numeric AS datahdr, "
                "(maxfracsum*(nullhdr+ma-(case when nullhdr%ma=0 THEN ma "
                "ELSE nullhdr%ma END))) AS nullhdr2 "
                "FROM ( SELECT ns.nspname, tbl.relname, hdr, ma, bs, "
                "SUM((1-coalesce(null_frac,0))*coalesce(avg_width, 2048)) AS datawidth, "
                "MAX(coalesce(null_frac,0)) AS maxfracsum, hdr+( SELECT 1+count(*)/8 "
                "FROM pg_stats s2 WHERE null_frac != 0 AND s2.schemaname = ns.nspname "
                "AND s2.tablename = tbl.relname ) AS nullhdr FROM pg_attribute att "
                "JOIN pg_class tbl ON att.attrelid = tbl.oid JOIN pg_namespace ns "
                "ON ns.oid = tbl.relnamespace LEFT JOIN pg_stats s "
                "ON s.schemaname=ns.nspname AND s.tablename = tbl.relname AND "
                "s.inherited=false AND s.attname=att.attname, "
                "( SELECT (SELECT current_setting('block_size')::numeric) AS bs, CASE WHEN "
                "SUBSTRING(SPLIT_PART(v, ' ', 2) FROM '#\\[0-9]+.[0-9]+#\\%' for '#') "
                "IN ('8.0','8.1','8.2') THEN 27 ELSE 23 END AS hdr, CASE "
                "WHEN v ~ 'mingw32' OR v ~ '64-bit' THEN 8 ELSE 4 END AS ma "
                "FROM (SELECT version() AS v) AS foo ) AS constants WHERE att.attnum ^^^> 0 "
                "AND tbl.relkind='r' GROUP BY 1,2,3,4,5 ) AS foo ) AS rs "
                "ON cc.relname = rs.relname AND nn.nspname = rs.nspname LEFT "
                "JOIN pg_index i ON indrelid = cc.oid LEFT JOIN pg_class c2 "
                "ON c2.oid = i.indexrelid ) AS sml WHERE sml.relpages - otta ^^^> 0 "
                "OR ipages - iotta ^^^> 10 ORDER BY totalwastedbytes DESC LIMIT 10;"
            )

        else:
            bloat_query = (
                "SELECT "
                "current_database() AS db, schemaname, tablename, "
                "reltuples::bigint AS tups, relpages::bigint AS pages, otta, "
                "ROUND(CASE WHEN sml.relpages=0 OR sml.relpages=otta THEN 0.0 "
                "ELSE (sml.relpages-otta::numeric)/sml.relpages END,3) AS tbloat, "
                "CASE WHEN relpages ^^^< otta THEN 0 ELSE relpages::bigint - otta END "
                "AS wastedpages, CASE WHEN relpages ^^^< otta THEN 0 "
                "ELSE bs*(sml.relpages-otta)::bigint END AS wastedbytes, "
                "CASE WHEN relpages ^^^< otta THEN '0 bytes'::text "
                "ELSE (bs*(relpages-otta))::bigint || ' bytes' END AS wastedsize, "
                "iname, ituples::bigint AS itups, ipages::bigint AS ipages, iotta, "
                "ROUND(CASE WHEN ipages=0 OR ipages^^^<=iotta THEN 0.0 ELSE "
                "(ipages-iotta::numeric)/ipages END,3) AS ibloat, "
                "CASE WHEN ipages ^^^< iotta THEN 0 ELSE ipages::bigint - iotta END "
                "AS wastedipages, CASE WHEN ipages ^^^< iotta THEN 0 "
                "ELSE bs*(ipages-iotta) END AS wastedibytes, "
                "CASE WHEN ipages ^^^< iotta THEN '0 bytes' ELSE "
                "(bs*(ipages-iotta))::bigint || ' bytes' END AS wastedisize, CASE "
                "WHEN relpages ^^^< otta THEN CASE WHEN ipages ^^^< iotta THEN 0 "
                "ELSE bs*(ipages-iotta::bigint) END ELSE CASE WHEN ipages ^^^< iotta "
                "THEN bs*(relpages-otta::bigint) "
                "ELSE bs*(relpages-otta::bigint + ipages-iotta::bigint) END "
                "END AS totalwastedbytes FROM (SELECT nn.nspname AS schemaname, "
                "cc.relname AS tablename, COALESCE(cc.reltuples,0) AS reltuples, "
                "COALESCE(cc.relpages,0) AS relpages, COALESCE(bs,0) AS bs, "
                "COALESCE(CEIL((cc.reltuples*((datahdr+ma-(CASE WHEN datahdr%ma=0 "
                "THEN ma ELSE datahdr%ma END))+nullhdr2+4))/(bs-20::float)),0) AS otta, "
                "COALESCE(c2.relname,'?') AS iname, COALESCE(c2.reltuples,0) AS ituples, "
                "COALESCE(c2.relpages,0) AS ipages, "
                "COALESCE(CEIL((c2.reltuples*(datahdr-12))/(bs-20::float)),0) AS iotta "
                "FROM pg_class cc JOIN pg_namespace nn ON cc.relnamespace = nn.oid "
                "AND nn.nspname ^^^<^^^> 'information_schema' LEFT "
                "JOIN(SELECT ma,bs,foo.nspname,foo.relname, "
                "(datawidth+(hdr+ma-(case when hdr%ma=0 THEN ma "
                "ELSE hdr%ma END)))::numeric AS datahdr, "
                "(maxfracsum*(nullhdr+ma-(case when nullhdr%ma=0 THEN ma "
                "ELSE nullhdr%ma END))) AS nullhdr2 "
                "FROM (SELECT ns.nspname, tbl.relname, hdr, ma, bs, "
                "SUM((1-coalesce(null_frac,0))*coalesce(avg_width, 2048)) "
                "AS datawidth, MAX(coalesce(null_frac,0)) AS maxfracsum, hdr+("
                "SELECT 1+count(*)/8 FROM pg_stats s2 WHERE null_frac^^^<^^^>0 "
                "AND s2.schemaname = ns.nspname AND s2.tablename = tbl.relname) "
                "AS nullhdr FROM pg_attribute att JOIN pg_class tbl "
                "ON att.attrelid = tbl.oid JOIN pg_namespace ns ON "
                "ns.oid = tbl.relnamespace LEFT JOIN pg_stats s "
                "ON s.schemaname=ns.nspname AND s.tablename = tbl.relname "
                "AND s.attname=att.attname, (SELECT ("
                "SELECT current_setting('block_size')::numeric) AS bs, CASE WHEN "
                "SUBSTRING(SPLIT_PART(v, ' ', 2) FROM '#\"[0-9]+.[0-9]+#\"%' for '#') "
                "IN ('8.0','8.1','8.2') THEN 27 ELSE 23 END AS hdr, CASE "
                "WHEN v ~ 'mingw32' OR v ~ '64-bit' THEN 8 ELSE 4 END AS ma "
                "FROM (SELECT version() AS v) AS foo) AS constants WHERE att.attnum ^^^> 0 "
                "AND tbl.relkind='r' GROUP BY 1,2,3,4,5) AS foo) AS rs ON "
                "cc.relname = rs.relname AND nn.nspname = rs.nspname LEFT JOIN pg_index i "
                "ON indrelid = cc.oid LEFT JOIN pg_class c2 ON c2.oid = i.indexrelid) "
                "AS sml WHERE sml.relpages - otta ^^^> 0 OR ipages - iotta ^^^> 10 ORDER "
                "BY totalwastedbytes DESC LIMIT 10;"
            )

        cur_rows_only = False
        output = ""
        for idx, database in enumerate(databases):
            query = "\\pset footer off \\\\ \\c %s \\\\ %s" % (database, bloat_query)
            if idx == 0:
                query = "%s \\pset tuples_only on" % query
            output += self.run_sql_as_db_user(query, mixed_cmd=True, rows_only=cur_rows_only)
            cur_rows_only = True
        return output


class PostgresLinux(PostgresBase):
    def _run_sql_as_db_user(
        self, sql_file_path, extra_args="", field_sep=";", quiet=True, rows_only=True
    ):
        # type: (str, str, str, bool, bool) -> str
        base_cmd_list = [
            "su",
            "-",
            self.db_user,
            "-c",
            r"""PGPASSFILE=%s %s -X %s -A0 -F'%s' -f %s""",
        ]
        extra_args += " -U %s" % self.pg_user
        extra_args += " -d %s" % self.pg_database
        extra_args += " -p %s" % self.pg_port

        if quiet:
            extra_args += " -q"
        if rows_only:
            extra_args += " -t"

        base_cmd_list[-1] = base_cmd_list[-1] % (
            self.pg_passfile,
            self.psql_binary_path,
            extra_args,
            field_sep,
            sql_file_path,
        )
        proc = subprocess.Popen(  # pylint: disable=consider-using-with
            base_cmd_list, env=self.my_env, stdout=subprocess.PIPE
        )
        return _sanitize_sql_query(proc.communicate()[0])

    def run_sql_as_db_user(
        self, sql_cmd, extra_args="", field_sep=";", quiet=True, rows_only=True, mixed_cmd=False
    ):
        # type: (str, str, str, bool, bool, bool) -> str
        with tempfile.NamedTemporaryFile(delete=True) as tmp:
            tmp.write(sql_cmd.encode("utf-8"))
            # set cursor to the beginning of the file
            tmp.seek(0)
            # We use 'psql ... -f <FILE_PATH>', the tmp file has to be readable to all users,
            # ie. stat.S_IROTH
            os.chmod(tmp.name, stat.S_IROTH)
            return self._run_sql_as_db_user(
                tmp.name,
                extra_args=extra_args,
                field_sep=field_sep,
                quiet=quiet,
                rows_only=rows_only,
            )

    def get_psql_binary_path(self):
        # type: () -> str
        """If possible, do not use the binary from PATH directly. This could lead to a generic
        binary that is not able to find the correct UNIX socket. See SUP-11729.
        In case the user does not have any instances configured or if the assembled path does not
        exist, fallback to the PATH location. See SUP-12878"""

        if self.pg_version is None:
            return self._default_psql_binary_path()

        binary_path = "/{pg_database}/{pg_version}/bin/{psql_binary_name}".format(
            pg_database=self.pg_database,
            pg_version=self.pg_version,
            psql_binary_name=self.psql_binary_name,
        )

        if not os.path.isfile(binary_path):
            return self._default_psql_binary_path()
        return binary_path

    def _default_psql_binary_path(self):
        # type: () -> str
        proc = subprocess.Popen(  # pylint: disable=consider-using-with
            ["which", self.psql_binary_name], stdout=subprocess.PIPE
        )
        out = ensure_str(proc.communicate()[0])

        if proc.returncode != 0:
            raise RuntimeError("Could not determine %s executable." % self.psql_binary_name)

        return out.strip()

    def get_psql_binary_dirname(self):
        # type: () -> str
        return self.psql_binary_path.rsplit("/", 1)[0]

    def _matches_main(self, proc):
        # type: (str) -> bool
        # the data directory for the instance "main" is not called "main" but "data" on some
        # platforms
        return self.name == "main" and "data" in proc

    def _filter_instances(self, procs_list, proc_sensitive_filter):
        # type: (list[str], Callable[[str], bool]) -> list[str]
        return [
            proc
            for proc in procs_list
            if self.is_postgres_process(proc)
            and (proc_sensitive_filter(proc) or self._matches_main(proc))
        ]

    def get_instances(self):
        # type: () -> str

        procs_list = ensure_str(
            subprocess_check_output(["ps", "h", "-eo", "pid:1,command:1"])
        ).split("\n")

        # trying to address setups in SUP-12878 (instance "A01" -> process "A01") and SUP-12539
        # (instance "epcomt" -> process "EPCOMT") as well as possible future setups (containing
        # instances where the names only differ in case (e.g. "instance" and "INSTANCE"))
        procs = self._filter_instances(procs_list, proc_sensitive_filter=lambda p: self.name in p)
        if not procs:
            procs = self._filter_instances(
                procs_list,
                proc_sensitive_filter=lambda p: self.name.lower() in p.lower(),
            )
        out = "\n".join(procs)
        return out.rstrip()

    def get_query_duration(self, numeric_version):
        # type: (float) -> str
        # Previously part of simple_queries

        if numeric_version > 9.2:
            querytime_sql_cmd = (
                "SELECT datname, datid, usename, client_addr, state AS state, "
                "COALESCE(ROUND(EXTRACT(epoch FROM now()-query_start)),0) "
                "AS seconds, pid, "
                "query AS current_query FROM pg_stat_activity "
                "WHERE (query_start IS NOT NULL AND "
                "(state NOT LIKE 'idle%' OR state IS NULL)) "
                "ORDER BY query_start, pid DESC;"
            )

        else:
            querytime_sql_cmd = (
                "SELECT datname, datid, usename, client_addr, '' AS state, "
                "COALESCE(ROUND(EXTRACT(epoch FROM now()-query_start)),0) "
                "AS seconds, procpid as pid, "
                "query "
                "AS current_query FROM pg_stat_activity WHERE "
                "(query_start IS NOT NULL AND current_query NOT LIKE '<IDLE>%') "
                "ORDER BY query_start, procpid DESC;"
            )

        return self.run_sql_as_db_user(
            querytime_sql_cmd, rows_only=False, extra_args="-P footer=off"
        )

    def get_stats(self, databases):
        # type: (list[str]) -> str
        sql_cmd_lastvacuum = (
            "SELECT "
            "current_database() AS datname, nspname AS sname, "
            "relname AS tname, CASE WHEN v IS NULL THEN -1 "
            "ELSE round(extract(epoch FROM v)) END AS vtime, "
            "CASE WHEN g IS NULL THEN -1 ELSE round(extract(epoch FROM g)) "
            "END AS atime FROM (SELECT nspname, relname, "
            "GREATEST(pg_stat_get_last_vacuum_time(c.oid), "
            "pg_stat_get_last_autovacuum_time(c.oid)) AS v, "
            "GREATEST(pg_stat_get_last_analyze_time(c.oid), "
            "pg_stat_get_last_autoanalyze_time(c.oid)) AS g "
            "FROM pg_class c, pg_namespace n WHERE relkind = 'r' "
            "AND n.oid = c.relnamespace AND n.nspname <> 'information_schema' "
            "ORDER BY 3) AS foo;"
        )

        query = "\\pset footer off\nBEGIN;\nSET statement_timeout=30000;\nCOMMIT;"

        cur_rows_only = False
        for cnt, database in enumerate(databases):
            query = "%s\n\\c %s\n%s" % (query, database, sql_cmd_lastvacuum)
            if cnt == 0:
                query = "%s\n\\pset tuples_only on" % query

        return self.run_sql_as_db_user(query, mixed_cmd=True, rows_only=cur_rows_only)

    def get_version_and_connection_time(self):
        # type: () -> tuple[str, str]
        cmd = "SELECT version() AS v"
        usage_start = resource.getrusage(resource.RUSAGE_CHILDREN)
        out = self.run_sql_as_db_user(cmd)
        usage_end = resource.getrusage(resource.RUSAGE_CHILDREN)

        sys_time = usage_end.ru_stime - usage_start.ru_stime
        usr_time = usage_end.ru_utime - usage_start.ru_utime
        real = sys_time + usr_time

        return out, "%.3f" % real

    def get_bloat(self, databases, numeric_version):
        # type: (list[Any], float) -> str
        # Bloat index and tables
        # Supports versions <9.0, >=9.0
        # This huge query has been gratefully taken from Greg Sabino Mullane's check_postgres.pl
        if numeric_version > 9.0:
            # TODO: Reformat query in a more readable way
            bloat_query = (
                "SELECT current_database() AS db, schemaname, tablename, reltuples::bigint "
                "AS tups, relpages::bigint AS pages, otta, ROUND(CASE WHEN sml.relpages=0 "
                "OR sml.relpages=otta THEN 0.0 "
                "ELSE (sml.relpages-otta::numeric)/sml.relpages END,3) AS tbloat, "
                "CASE WHEN relpages < otta THEN 0 "
                "ELSE relpages::bigint - otta END AS wastedpages, "
                "CASE WHEN relpages < otta THEN 0 ELSE bs*(sml.relpages-otta)::bigint END "
                "AS wastedbytes, CASE WHEN relpages < otta THEN 0 "
                "ELSE (bs*(relpages-otta))::bigint END "
                "AS wastedsize, iname, ituples::bigint AS itups, ipages::bigint "
                "AS ipages, iotta, ROUND(CASE WHEN ipages=0 OR ipages<=iotta THEN 0.0 "
                "ELSE (ipages-iotta::numeric)/ipages END,3) AS ibloat, "
                "CASE WHEN ipages < iotta THEN 0 ELSE ipages::bigint - iotta END "
                "AS wastedipages, CASE WHEN ipages < iotta THEN 0 ELSE bs*(ipages-iotta) "
                "END AS wastedibytes, CASE WHEN ipages < iotta THEN 0 "
                "ELSE (bs*(ipages-iotta))::bigint END AS wastedisize, "
                "CASE WHEN relpages < otta THEN CASE WHEN ipages < iotta THEN 0 "
                "ELSE bs*(ipages-iotta::bigint) END ELSE CASE WHEN ipages < iotta "
                "THEN bs*(relpages-otta::bigint) "
                "ELSE bs*(relpages-otta::bigint + ipages-iotta::bigint) "
                "END END AS totalwastedbytes "
                "FROM ( SELECT nn.nspname AS schemaname, cc.relname AS tablename, "
                "COALESCE(cc.reltuples,0) AS reltuples, COALESCE(cc.relpages,0) "
                "AS relpages, COALESCE(bs,0) AS bs, "
                "COALESCE(CEIL((cc.reltuples*((datahdr+ma- (CASE WHEN datahdr%ma=0 "
                "THEN ma ELSE datahdr%ma END))+nullhdr2+4))/(bs-20::float)),0) "
                "AS otta, COALESCE(c2.relname,'?') AS iname, COALESCE(c2.reltuples,0) "
                "AS ituples, COALESCE(c2.relpages,0) "
                "AS ipages, COALESCE(CEIL((c2.reltuples*(datahdr-12))/(bs-20::float)),0) "
                "AS iotta FROM pg_class cc "
                "JOIN pg_namespace nn ON cc.relnamespace = nn.oid "
                "AND nn.nspname <> 'information_schema' LEFT JOIN "
                "( SELECT ma,bs,foo.nspname,foo.relname, "
                "(datawidth+(hdr+ma-(case when hdr%ma=0 "
                "THEN ma ELSE hdr%ma END)))::numeric AS datahdr, "
                "(maxfracsum*(nullhdr+ma-(case when nullhdr%ma=0 THEN ma "
                "ELSE nullhdr%ma END))) AS nullhdr2 "
                "FROM ( SELECT ns.nspname, tbl.relname, hdr, ma, bs, "
                "SUM((1-coalesce(null_frac,0))*coalesce(avg_width, 2048)) AS datawidth, "
                "MAX(coalesce(null_frac,0)) AS maxfracsum, hdr+( SELECT 1+count(*)/8 "
                "FROM pg_stats s2 WHERE null_frac<>0 AND s2.schemaname = ns.nspname "
                "AND s2.tablename = tbl.relname ) AS nullhdr FROM pg_attribute att "
                "JOIN pg_class tbl ON att.attrelid = tbl.oid JOIN pg_namespace ns "
                "ON ns.oid = tbl.relnamespace LEFT JOIN pg_stats s "
                "ON s.schemaname=ns.nspname AND s.tablename = tbl.relname AND "
                "s.inherited=false AND s.attname=att.attname, "
                "( SELECT (SELECT current_setting('block_size')::numeric) AS bs, CASE WHEN "
                "SUBSTRING(SPLIT_PART(v, ' ', 2) FROM '#\\[0-9]+.[0-9]+#\\%' for '#') "
                "IN ('8.0','8.1','8.2') THEN 27 ELSE 23 END AS hdr, CASE "
                "WHEN v ~ 'mingw32' OR v ~ '64-bit' THEN 8 ELSE 4 END AS ma "
                "FROM (SELECT version() AS v) AS foo ) AS constants WHERE att.attnum > 0 "
                "AND tbl.relkind='r' GROUP BY 1,2,3,4,5 ) AS foo ) AS rs "
                "ON cc.relname = rs.relname AND nn.nspname = rs.nspname LEFT JOIN pg_index i "
                "ON indrelid = cc.oid LEFT JOIN pg_class c2 ON c2.oid = i.indexrelid ) "
                "AS sml WHERE sml.relpages - otta > 0 OR ipages - iotta > 10 ORDER "
                "BY totalwastedbytes DESC LIMIT 10;"
            )
        else:
            bloat_query = (
                "SELECT "
                "current_database() AS db, schemaname, tablename, "
                "reltuples::bigint AS tups, relpages::bigint AS pages, otta, "
                "ROUND(CASE WHEN sml.relpages=0 OR sml.relpages=otta THEN 0.0 "
                "ELSE (sml.relpages-otta::numeric)/sml.relpages END,3) AS tbloat, "
                "CASE WHEN relpages < otta THEN 0 ELSE relpages::bigint - otta END "
                "AS wastedpages, CASE WHEN relpages < otta THEN 0 "
                "ELSE bs*(sml.relpages-otta)::bigint END AS wastedbytes, "
                "CASE WHEN relpages < otta THEN '0 bytes'::text "
                "ELSE (bs*(relpages-otta))::bigint || ' bytes' END AS wastedsize, "
                "iname, ituples::bigint AS itups, ipages::bigint AS ipages, iotta, "
                "ROUND(CASE WHEN ipages=0 OR ipages<=iotta THEN 0.0 ELSE "
                "(ipages-iotta::numeric)/ipages END,3) AS ibloat, "
                "CASE WHEN ipages < iotta THEN 0 ELSE ipages::bigint - iotta END "
                "AS wastedipages, CASE WHEN ipages < iotta THEN 0 "
                "ELSE bs*(ipages-iotta) END AS wastedibytes, "
                "CASE WHEN ipages < iotta THEN '0 bytes' ELSE "
                "(bs*(ipages-iotta))::bigint || ' bytes' END AS wastedisize, CASE "
                "WHEN relpages < otta THEN CASE WHEN ipages < iotta THEN 0 "
                "ELSE bs*(ipages-iotta::bigint) END ELSE CASE WHEN ipages < iotta "
                "THEN bs*(relpages-otta::bigint) "
                "ELSE bs*(relpages-otta::bigint + ipages-iotta::bigint) END "
                "END AS totalwastedbytes FROM (SELECT nn.nspname AS schemaname, "
                "cc.relname AS tablename, COALESCE(cc.reltuples,0) AS reltuples, "
                "COALESCE(cc.relpages,0) AS relpages, COALESCE(bs,0) AS bs, "
                "COALESCE(CEIL((cc.reltuples*((datahdr+ma-(CASE WHEN datahdr%ma=0 "
                "THEN ma ELSE datahdr%ma END))+nullhdr2+4))/(bs-20::float)),0) AS otta, "
                "COALESCE(c2.relname,'?') AS iname, COALESCE(c2.reltuples,0) AS ituples, "
                "COALESCE(c2.relpages,0) AS ipages, "
                "COALESCE(CEIL((c2.reltuples*(datahdr-12))/(bs-20::float)),0) AS iotta "
                "FROM pg_class cc JOIN pg_namespace nn ON cc.relnamespace = nn.oid "
                "AND nn.nspname <> 'information_schema' LEFT "
                "JOIN(SELECT ma,bs,foo.nspname,foo.relname, "
                "(datawidth+(hdr+ma-(case when hdr%ma=0 THEN ma "
                "ELSE hdr%ma END)))::numeric AS datahdr, "
                "(maxfracsum*(nullhdr+ma-(case when nullhdr%ma=0 THEN ma "
                "ELSE nullhdr%ma END))) AS nullhdr2 "
                "FROM (SELECT ns.nspname, tbl.relname, hdr, ma, bs, "
                "SUM((1-coalesce(null_frac,0))*coalesce(avg_width, 2048)) "
                "AS datawidth, MAX(coalesce(null_frac,0)) AS maxfracsum, hdr+("
                "SELECT 1+count(*)/8 FROM pg_stats s2 WHERE null_frac<>0 "
                "AND s2.schemaname = ns.nspname AND s2.tablename = tbl.relname) "
                "AS nullhdr FROM pg_attribute att JOIN pg_class tbl "
                "ON att.attrelid = tbl.oid JOIN pg_namespace ns ON "
                "ns.oid = tbl.relnamespace LEFT JOIN pg_stats s "
                "ON s.schemaname=ns.nspname AND s.tablename = tbl.relname "
                "AND s.attname=att.attname, (SELECT ("
                "SELECT current_setting('block_size')::numeric) AS bs, CASE WHEN "
                "SUBSTRING(SPLIT_PART(v, ' ', 2) FROM '#\"[0-9]+.[0-9]+#\"%' for '#') "
                "IN ('8.0','8.1','8.2') THEN 27 ELSE 23 END AS hdr, CASE "
                "WHEN v ~ 'mingw32' OR v ~ '64-bit' THEN 8 ELSE 4 END AS ma "
                "FROM (SELECT version() AS v) AS foo) AS constants WHERE att.attnum > 0 "
                "AND tbl.relkind='r' GROUP BY 1,2,3,4,5) AS foo) AS rs ON "
                "cc.relname = rs.relname AND nn.nspname = rs.nspname LEFT JOIN pg_index i "
                "ON indrelid = cc.oid LEFT JOIN pg_class c2 ON c2.oid = i.indexrelid) "
                "AS sml WHERE sml.relpages - otta > 0 OR ipages - iotta > 10 ORDER "
                "BY totalwastedbytes DESC LIMIT 10;"
            )

        query = "\\pset footer off"

        cur_rows_only = False
        for idx, database in enumerate(databases):
            query = "%s\n\\c %s\n%s" % (query, database, bloat_query)
            if idx == 0:
                query = "%s\n\\pset tuples_only on" % query

        return self.run_sql_as_db_user(query, mixed_cmd=True, rows_only=cur_rows_only)


def postgres_factory(db_user, pg_binary_path, pg_instance):
    # type: (str, str | None, dict[str, str | None]) -> PostgresBase
    if IS_LINUX:
        return PostgresLinux(db_user, pg_binary_path, pg_instance, LINUX_PROCESS_MATCH_PATTERNS)
    if IS_WINDOWS:
        return PostgresWin(db_user, pg_binary_path, pg_instance, WINDOWS_PROCESS_MATCH_PATTERNS)
    raise OSNotImplementedError


def helper_factory():
    # type: () -> Helpers
    if IS_LINUX:
        return LinuxHelpers()
    if IS_WINDOWS:
        return WindowsHelpers()
    raise OSNotImplementedError


class Helpers:
    """
    Base class for x-plattform postgres helper functions

    All abstract methods must have individual implementation depending on the OS type
    which runs postgres.
    All non-abstract methods are meant to work on all OS types which were subclassed.
    """

    __metaclass__ = abc.ABCMeta

    @staticmethod
    @abc.abstractmethod
    def get_default_postgres_user():
        pass

    @staticmethod
    @abc.abstractmethod
    def get_default_path():
        pass

    @staticmethod
    @abc.abstractmethod
    def get_conf_sep():
        pass

    @staticmethod
    @abc.abstractmethod
    def get_default_db_name():
        pass


class WindowsHelpers(Helpers):
    @staticmethod
    def get_default_postgres_user():
        return "postgres"

    @staticmethod
    def get_default_path():
        return "c:\\ProgramData\\checkmk\\agent\\config"

    @staticmethod
    def get_conf_sep():
        return "|"

    @staticmethod
    def get_default_db_name():
        return "data"


class LinuxHelpers(Helpers):
    @staticmethod
    def get_default_postgres_user():
        for user_id in ("pgsql", "postgres"):
            try:
                proc = subprocess.Popen(  # pylint: disable=consider-using-with
                    ["id", user_id], stdout=subprocess.PIPE, stderr=subprocess.PIPE
                )
                proc.communicate()
                if proc.returncode == 0:
                    return user_id.rstrip()
            except subprocess.CalledProcessError:
                pass
        LOGGER.warning('Could not determine postgres user, using "postgres" as default')
        return "postgres"

    @staticmethod
    def get_default_path():
        return "/etc/check_mk"

    @staticmethod
    def get_conf_sep():
        return ":"

    @staticmethod
    def get_default_db_name():
        return "main"


def open_env_file(file_to_open):
    """Wrapper around built-in open to be able to monkeypatch through all python versions"""
    return open(file_to_open).readlines()


def parse_env_file(env_file):
    # type: (str) -> tuple[str, str, str | None]
    pg_port = None  # mandatory in env_file
    pg_database = "postgres"  # default value
    pg_version = None

    for line in open_env_file(env_file):
        line = line.strip()
        if not line or "=" not in line or line.startswith("#"):
            continue
        if "PGDATABASE=" in line:
            pg_database = re.sub(re.compile("#.*"), "", line.split("=")[-1]).strip()
        elif "PGPORT=" in line:
            pg_port = re.sub(re.compile("#.*"), "", line.split("=")[-1]).strip()
        elif "PGVERSION=" in line:
            pg_version = re.sub(re.compile("#.*"), "", line.split("=")[-1]).strip()

    if pg_port is None:
        raise ValueError("PGPORT is not specified in %s" % env_file)
    return pg_database, pg_port, pg_version


def _parse_INSTANCE_value(value, config_separator):
    # type: (str, str) -> tuple[str, str, str, str]
    keys = value.split(config_separator)
    if len(keys) == 3:
        # Old format (deprecated in Werk 16016), but we don't force updates unless there is
        # a substantial benefit.
        keys = keys + [""]
    env_file, pg_user, pg_passfile, instance_name = keys
    env_file = env_file.strip()
    return (
        env_file,
        pg_user,
        pg_passfile,
        instance_name or env_file.split(os.sep)[-1].split(".")[0],
    )


def parse_postgres_cfg(postgres_cfg, config_separator):
    # type: (list[str], str) -> tuple[str, str | None, list[dict[str, str | None]]]
    """
    Parser for Postgres config. x-Plattform compatible.
    See comment at the beginning of this file for an example.
    """
    dbuser = None
    pg_binary_path = None
    instances = []
    for line in postgres_cfg:
        if line.startswith("#") or "=" not in line:
            continue
        line = line.strip()
        key, value = line.split("=")
        if key == "DBUSER":
            dbuser = value.rstrip()
        if key == "PG_BINARY_PATH":
            pg_binary_path = value.rstrip()
        if key == "INSTANCE":
            env_file, pg_user, pg_passfile, instance_name = _parse_INSTANCE_value(
                value, config_separator
            )
            pg_database, pg_port, pg_version = parse_env_file(env_file)
            instances.append(
                {
                    "name": instance_name.strip(),
                    "pg_user": pg_user.strip(),
                    "pg_passfile": pg_passfile.strip(),
                    "pg_database": pg_database,
                    "pg_port": pg_port,
                    "pg_version": pg_version,
                }
            )
    if dbuser is None:
        raise ValueError("DBUSER must be specified in postgres.cfg")
    return dbuser, pg_binary_path, instances


def parse_arguments(argv):
    parser = optparse.OptionParser()
    parser.add_option("-v", "--verbose", action="count", default=0)
    parser.add_option(
        "-t",
        "--test-connection",
        default=False,
        action="store_true",
        help="Test if postgres is ready",
    )
    options, _ = parser.parse_args(argv)
    return options


def main(argv=None):
    # type: (list | None) -> int

    helper = helper_factory()
    if argv is None:
        argv = sys.argv[1:]

    opt = parse_arguments(argv)

    logging.basicConfig(
        format="%(levelname)s %(asctime)s %(name)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level={0: logging.WARN, 1: logging.INFO, 2: logging.DEBUG}.get(opt.verbose, logging.DEBUG),
    )

    instances = []  # type: list[dict[str, str | None]]
    try:
        postgres_cfg_path = os.path.join(
            os.getenv("MK_CONFDIR", helper.get_default_path()), "postgres.cfg"
        )
        with open(postgres_cfg_path) as opened_file:
            postgres_cfg = opened_file.readlines()
        postgres_cfg = [ensure_str(el) for el in postgres_cfg]
        dbuser, pg_binary_path, instances = parse_postgres_cfg(postgres_cfg, helper.get_conf_sep())
    except Exception:
        _, e = sys.exc_info()[:2]  # python2 and python3 compatible exception logging
        dbuser = helper.get_default_postgres_user()
        pg_binary_path = None
        LOGGER.debug("try_parse_config: exception: %s", str(e))
        LOGGER.debug('Using "%s" as default postgres user.', dbuser)

    if not instances:
        default_postgres_installation_parameters = {
            # default database name of postgres installation
            "name": helper.get_default_db_name(),
            "pg_user": "postgres",
            "pg_database": "postgres",
            "pg_port": "5432",
            # Assumption: if no pg_passfile is specified no password will be required.
            # If a password is required but no pg_passfile is specified the process will
            # interactivly prompt for a password.
            "pg_passfile": "",
        }
        instances.append(default_postgres_installation_parameters)

    for instance in instances:
        postgres = postgres_factory(dbuser, pg_binary_path, instance)
        if opt.test_connection:
            postgres.is_pg_ready()
            sys.exit(0)
        postgres.execute_all_queries()
    return 0


if __name__ == "__main__":
    main()