HEX
Server: LiteSpeed
System: Linux shams.tasjeel.ae 5.14.0-611.5.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Nov 11 08:09:09 EST 2025 x86_64
User: infowars (1469)
PHP: 8.2.29
Disabled: NONE
Upload Files
File: //usr/lib/python3.9/site-packages/ipalib/ipajson.py
#
# Copyright (C) 2024  FreeIPA Contributors see COPYING for license
#

import base64
from cryptography import x509 as crypto_x509
import datetime
from decimal import Decimal
import json
import six
from ipalib.constants import LDAP_GENERALIZED_TIME_FORMAT
from ipalib import capabilities, x509
from ipalib.x509 import Encoding as x509_Encoding
from ipapython.dn import DN
from ipapython.dnsutil import DNSName
from ipapython.kerberos import Principal

# Python 3 has no separate ``unicode`` builtin; alias it to ``str`` so the
# rest of this module can refer to ``unicode`` on both major versions.
if six.PY3:
    unicode = str


class _JSONPrimer(dict):
    """Pre-converter that makes a data structure safe for json.dumps()

    The primer is a dict mapping a Python type to the callable that turns a
    value of that type into something JSON can represent. Ideally this would
    be done by the ``default`` hook of json.dumps(), but that hook treats
    Python 2 str as text whereas IPA treats str as bytes.

    Techniques used to achieve maximum performance:

    * O(1) type lookup instead of an O(n) chain of isinstance() calls
    * __missing__ walks __mro__ and caches the result to handle subclasses
    * the encoder lookup is inlined in the list/dict encoders, at the price
      of a little code duplication
    * surplus function calls are avoided (``func is _identity`` check,
      obj.__class__ instead of type(obj))
    * function default arguments turn global lookups into local lookups
    * bound method objects are created once (e.g. result.append)
    * client capabilities are looked up on demand and then cached

    Conversions performed:

    * bytes -> {'__base64__': b64encode}
    * datetime -> {'__datetime__': LDAP_GENERALIZED_TIME}, or the plain
      LDAP_GENERALIZED_TIME string if the client lacks 'datetime_values'
    * DNSName -> {'__dns_name__': unicode}, or the plain unicode string if
      the client lacks 'dns_name_values'
    * certificates and CSRs -> DER bytes, then handled like bytes
    * Decimal, Principal -> unicode; DN -> str
    * list, tuple -> list with converted items
    * dict -> dict with converted values (keys are left alone)

    The _ipa_obj_hook() function deserializes the marked JSON objects back
    to bytes, datetime and DNSName.

    :see: _ipa_obj_hook
    """
    __slots__ = ('version', '_cap_datetime', '_cap_dnsname')

    # Sentinel stored for types that need no conversion at all.
    _identity = object()

    def __init__(self, version, _identity=_identity):
        """Register the encoder for every supported type

        :param version: client version, used for capability checks
        """
        super(_JSONPrimer, self).__init__()
        self.version = version
        # None means "capability not looked up yet".
        self._cap_datetime = None
        self._cap_dnsname = None

        # NOTE: registration order matters; the issubclass() fallback in
        # __missing__ iterates the mapping in insertion order.

        # Types JSON represents natively pass through untouched.
        for native in (unicode, bool, int, type(None), float):
            self[native] = _identity
        # Types that are simply turned into text.
        self[Decimal] = unicode
        self[DN] = str
        self[Principal] = unicode
        # Types whose wire format depends on client capabilities.
        self[DNSName] = self._enc_dnsname
        self[datetime.datetime] = self._enc_datetime
        # Binary data and containers.
        self[bytes] = self._enc_bytes
        self[list] = self._enc_list
        self[tuple] = self._enc_list
        self[dict] = self._enc_dict
        # Certificate-like objects are shipped as DER blobs.
        self[x509.IPACertificate] = self._enc_certificate
        self[crypto_x509.CertificateSigningRequest] = self._enc_certificate

    def __missing__(self, typ):
        """Resolve and cache the encoder for a type that is not registered

        :param typ: the class that had no direct entry
        :return: the encoder (or the identity sentinel) for typ
        :raises TypeError: if no registered type matches
        """
        matched = None
        # First choice: the closest registered ancestor in the MRO.
        for base in typ.__mro__:
            if base in self:
                matched = base
                break
        else:
            # Fallback: issubclass() also honours registered ABCs.
            for known in self:
                if issubclass(typ, known):
                    matched = known
                    break
        if matched is None:
            raise TypeError(typ)
        # Remember the answer so the next lookup is a plain dict hit.
        encoder = self[matched]
        self[typ] = encoder
        return encoder

    def convert(self, obj, _identity=_identity):
        """Return the JSON-ready representation of obj"""
        # obj.__class__ is twice as fast as type(obj)
        encoder = self[obj.__class__]
        if encoder is _identity:
            return obj
        return encoder(obj)

    def _enc_datetime(self, val):
        """Encode a datetime as LDAP generalized time

        The string is wrapped in a '__datetime__' marker object when the
        client has the 'datetime_values' capability.
        """
        if self._cap_datetime is None:
            self._cap_datetime = capabilities.client_has_capability(
                self.version, 'datetime_values')
        stamp = val.strftime(LDAP_GENERALIZED_TIME_FORMAT)
        return {'__datetime__': stamp} if self._cap_datetime else stamp

    def _enc_dnsname(self, val):
        """Encode a DNSName as text

        The text is wrapped in a '__dns_name__' marker object when the
        client has the 'dns_name_values' capability.
        """
        if self._cap_dnsname is None:
            self._cap_dnsname = capabilities.client_has_capability(
                self.version, 'dns_name_values')
        text = unicode(val)
        return {'__dns_name__': text} if self._cap_dnsname else text

    def _enc_bytes(self, val):
        """Encode bytes as a '__base64__' marker object"""
        payload = base64.b64encode(val)
        if six.PY2:
            return {'__base64__': payload}
        # b64encode() returns bytes; JSON needs text.
        return {'__base64__': payload.decode('ascii')}

    def _enc_list(self, val, _identity=_identity):
        """Return a new list holding the converted items of val"""
        primed = []
        push = primed.append
        for item in val:
            encoder = self[item.__class__]
            if encoder is _identity:
                push(item)
            else:
                push(encoder(item))
        return primed

    def _enc_dict(self, val, _identity=_identity, _iteritems=six.iteritems):
        """Return a new dict with converted values; keys are kept as is"""
        primed = {}
        for key, item in _iteritems(val):
            encoder = self[item.__class__]
            if encoder is _identity:
                primed[key] = item
            else:
                primed[key] = encoder(item)
        return primed

    def _enc_certificate(self, val):
        """Encode a certificate or CSR as base64 of its DER serialization"""
        der = val.public_bytes(x509_Encoding.DER)
        return self._enc_bytes(der)


def json_encode_binary(val, version, pretty_print=False):
    """Turn a Python object structure into JSON text

    The structure is first primed with _JSONPrimer and then handed to
    json.dumps().

    :param object val: Python object structure
    :param str version: client version
    :param bool pretty_print: indent and sort JSON (warning: slow!)
    :return: text
    :note: pretty printing triggers a slow path in Python's JSON module. Only
           use pretty_print in debug mode.
    """
    primed = _JSONPrimer(version).convert(val)
    if not pretty_print:
        return json.dumps(primed)
    return json.dumps(primed, sort_keys=True, indent=4)


def _ipa_obj_hook(dct, _iteritems=six.iteritems, _list=list):
    """JSON object hook that reverses the _JSONPrimer markers

    Marker objects are replaced by the value they stand for; in any other
    object, list values are replaced by tuples in place.

    :param dict dct: JSON object as decoded by the json module
    :return: bytes, datetime, DNSName or the (modified) dct itself
    :see: _JSONPrimer
    """
    if '__base64__' in dct:
        return base64.b64decode(dct['__base64__'])

    if '__datetime__' in dct:
        stamp = dct['__datetime__']
        return datetime.datetime.strptime(stamp, LDAP_GENERALIZED_TIME_FORMAT)

    if '__dns_name__' in dct:
        return DNSName(dct['__dns_name__'])

    # XXX tests assume tuples. Is this really necessary?
    for key, item in _iteritems(dct):
        if item.__class__ is _list:
            dct[key] = tuple(item)
    return dct


def json_decode_binary(val):
    """Parse serialized JSON back into a Python data structure

    Bytes input is decoded as UTF-8 before parsing. Every JSON object is
    passed through _ipa_obj_hook.

    :param val: JSON string
    :type val: str, bytes
    :return: Python data structure
    :see: _ipa_obj_hook, _JSONPrimer
    """
    text = val.decode('utf-8') if isinstance(val, bytes) else val
    return json.loads(text, object_hook=_ipa_obj_hook)