HEX
Server: LiteSpeed
System: Linux shams.tasjeel.ae 5.14.0-611.5.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Nov 11 08:09:09 EST 2025 x86_64
User: infowars (1469)
PHP: 8.2.29
Disabled: NONE
Upload Files
File: //usr/lib/python3.9/site-packages/ipaclient/plugins/vault.py
# Authors:
#   Endi S. Dewata <edewata@redhat.com>
#
# Copyright (C) 2015  Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import print_function

import base64
import errno
import io
import json
import logging
import os
import ssl
import tempfile

from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
try:
    # cryptography>=43.0.0
    from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES
except ImportError:
    # will be removed from this module in cryptography 48.0.0
    from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.primitives.serialization import (
    load_pem_public_key, load_pem_private_key)

from ipaclient.frontend import MethodOverride
from ipalib import x509
from ipalib import constants
from ipalib.frontend import Local, Method, Object
from ipalib.util import classproperty
from ipalib import api, errors
from ipalib import Bytes, Flag, Str
from ipalib.plugable import Registry
from ipalib import _
from ipapython import ipautil
from ipapython.dnsutil import DNSName

logger = logging.getLogger(__name__)


def validated_read(argname, filename, mode='r', encoding=None):
    """Read file and catch errors

    IOError and UnicodeError (for text files) are turned into a
    ValidationError
    """
    try:
        with io.open(filename, mode=mode, encoding=encoding) as f:
            data = f.read()
    except IOError as exc:
        raise errors.ValidationError(
            name=argname,
            error=_("Cannot read file '%(filename)s': %(exc)s") % {
                'filename': filename, 'exc': exc.args[1]
                }
        )
    except UnicodeError as exc:
        raise errors.ValidationError(
            name=argname,
            error=_("Cannot decode file '%(filename)s': %(exc)s") % {
                'filename': filename, 'exc': exc
                }
        )
    return data


register = Registry()

MAX_VAULT_DATA_SIZE = 2**20  # = 1 MB


def generate_symmetric_key(password, salt):
    """
    Generates symmetric key from password and salt.
    """
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100000,
        backend=default_backend()
    )

    return base64.b64encode(kdf.derive(password.encode('utf-8')))


def encrypt(data, symmetric_key=None, public_key=None):
    """
    Encrypts data with symmetric key or public key.
    """
    if symmetric_key is not None:
        if public_key is not None:
            raise ValueError(
                "Either a symmetric or a public key is required, not both."
            )
        fernet = Fernet(symmetric_key)
        return fernet.encrypt(data)

    elif public_key is not None:
        public_key_obj = load_pem_public_key(
            data=public_key,
            backend=default_backend()
        )
        return public_key_obj.encrypt(
            data,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
    else:
        raise ValueError("Either a symmetric or a public key is required.")


def decrypt(data, symmetric_key=None, private_key=None):
    """
    Decrypts data with symmetric key or public key.
    """
    if symmetric_key is not None:
        if private_key is not None:
            raise ValueError(
                "Either a symmetric or a private key is required, not both."
            )
        try:
            fernet = Fernet(symmetric_key)
            return fernet.decrypt(data)
        except InvalidToken:
            raise errors.AuthenticationError(
                message=_('Invalid credentials'))

    elif private_key is not None:
        try:
            private_key_obj = load_pem_private_key(
                data=private_key,
                password=None,
                backend=default_backend()
            )
            return private_key_obj.decrypt(
                data,
                padding.OAEP(
                    mgf=padding.MGF1(algorithm=hashes.SHA256()),
                    algorithm=hashes.SHA256(),
                    label=None
                )
            )
        except ValueError:
            raise errors.AuthenticationError(
                message=_('Invalid credentials'))
    else:
        raise ValueError("Either a symmetric or a private key is required.")


@register(no_fail=True)
class _fake_vault(Object):
    name = 'vault'


@register(no_fail=True)
class _fake_vault_add_internal(Method):
    name = 'vault_add_internal'
    NO_CLI = True


@register()
class vault_add(Local):
    __doc__ = _('Create a new vault.')

    takes_options = (
        Str(
            'password?',
            cli_name='password',
            doc=_('Vault password'),
        ),
        Str(  # TODO: use File parameter
            'password_file?',
            cli_name='password_file',
            doc=_('File containing the vault password'),
        ),
        Str(  # TODO: use File parameter
            'public_key_file?',
            cli_name='public_key_file',
            doc=_('File containing the vault public key'),
        ),
    )

    @classmethod
    def __NO_CLI_getter(cls):
        return (api.Command.get_plugin('vault_add_internal') is
                _fake_vault_add_internal)

    NO_CLI = classproperty(__NO_CLI_getter)

    @property
    def api_version(self):
        return self.api.Command.vault_add_internal.api_version

    def get_args(self):
        for arg in self.api.Command.vault_add_internal.args():
            yield arg
        for arg in super(vault_add, self).get_args():
            yield arg

    def get_options(self):
        for option in self.api.Command.vault_add_internal.options():
            if option.name not in ('ipavaultsalt', 'version'):
                yield option
        for option in super(vault_add, self).get_options():
            yield option

    def get_output_params(self):
        for param in self.api.Command.vault_add_internal.output_params():
            yield param
        for param in super(vault_add, self).get_output_params():
            yield param

    def _iter_output(self):
        return self.api.Command.vault_add_internal.output()

    def forward(self, *args, **options):

        vault_type = options.get('ipavaulttype')

        if vault_type is None:
            internal_cmd = self.api.Command.vault_add_internal
            vault_type = internal_cmd.params.ipavaulttype.default

        password = options.get('password')
        password_file = options.get('password_file')
        public_key = options.get('ipavaultpublickey')
        public_key_file = options.get('public_key_file')

        # don't send these parameters to server
        if 'password' in options:
            del options['password']
        if 'password_file' in options:
            del options['password_file']
        if 'public_key_file' in options:
            del options['public_key_file']

        if vault_type != u'symmetric' and (password or password_file):
            raise errors.MutuallyExclusiveError(
                reason=_('Password can be specified only for '
                         'symmetric vault')
            )

        if vault_type != u'asymmetric' and (public_key or public_key_file):
            raise errors.MutuallyExclusiveError(
                reason=_('Public key can be specified only for '
                         'asymmetric vault')
            )

        if self.api.env.in_server:
            backend = self.api.Backend.ldap2
        else:
            backend = self.api.Backend.rpcclient
        if not backend.isconnected():
            backend.connect()

        if vault_type == u'standard':

            pass

        elif vault_type == u'symmetric':

            # get password
            if password and password_file:
                raise errors.MutuallyExclusiveError(
                    reason=_('Password specified multiple times'))

            elif password:
                pass

            elif password_file:
                password = validated_read('password-file',
                                          password_file,
                                          encoding='utf-8')
                password = password.rstrip('\n')

            else:
                password = self.api.Backend.textui.prompt_password(
                    'New password')

            # generate vault salt
            options['ipavaultsalt'] = os.urandom(16)

        elif vault_type == u'asymmetric':

            # get new vault public key
            if public_key and public_key_file:
                raise errors.MutuallyExclusiveError(
                    reason=_('Public key specified multiple times'))

            elif public_key:
                pass

            elif public_key_file:
                public_key = validated_read('public-key-file',
                                            public_key_file,
                                            mode='rb')

                # store vault public key
                options['ipavaultpublickey'] = public_key

            else:
                raise errors.ValidationError(
                    name='ipavaultpublickey',
                    error=_('Missing vault public key'))

            # validate public key and prevent users from accidentally
            # sending a private key to the server.
            try:
                load_pem_public_key(
                    data=public_key,
                    backend=default_backend()
                )
            except ValueError as e:
                raise errors.ValidationError(
                    name='ipavaultpublickey',
                    error=_('Invalid or unsupported vault public key: %s') % e,
                )

        # create vault
        response = self.api.Command.vault_add_internal(*args, **options)

        # prepare parameters for archival
        opts = options.copy()
        if 'description' in opts:
            del opts['description']
        if 'ipavaulttype' in opts:
            del opts['ipavaulttype']

        if vault_type == u'symmetric':
            opts['password'] = password
            del opts['ipavaultsalt']

        elif vault_type == u'asymmetric':
            del opts['ipavaultpublickey']

        # archive blank data
        self.api.Command.vault_archive(*args, **opts)

        return response


@register(no_fail=True)
class _fake_vault_mod_internal(Method):
    name = 'vault_mod_internal'
    NO_CLI = True


@register()
class vault_mod(Local):
    __doc__ = _('Modify a vault.')

    takes_options = (
        Flag(
            'change_password?',
            doc=_('Change password'),
        ),
        Str(
            'old_password?',
            cli_name='old_password',
            doc=_('Old vault password'),
        ),
        Str(  # TODO: use File parameter
            'old_password_file?',
            cli_name='old_password_file',
            doc=_('File containing the old vault password'),
        ),
        Str(
            'new_password?',
            cli_name='new_password',
            doc=_('New vault password'),
        ),
        Str(  # TODO: use File parameter
            'new_password_file?',
            cli_name='new_password_file',
            doc=_('File containing the new vault password'),
        ),
        Bytes(
            'private_key?',
            cli_name='private_key',
            doc=_('Old vault private key'),
        ),
        Str(  # TODO: use File parameter
            'private_key_file?',
            cli_name='private_key_file',
            doc=_('File containing the old vault private key'),
        ),
        Str(  # TODO: use File parameter
            'public_key_file?',
            cli_name='public_key_file',
            doc=_('File containing the new vault public key'),
        ),
    )

    @classmethod
    def __NO_CLI_getter(cls):
        return (api.Command.get_plugin('vault_mod_internal') is
                _fake_vault_mod_internal)

    NO_CLI = classproperty(__NO_CLI_getter)

    @property
    def api_version(self):
        return self.api.Command.vault_mod_internal.api_version

    def get_args(self):
        for arg in self.api.Command.vault_mod_internal.args():
            yield arg
        for arg in super(vault_mod, self).get_args():
            yield arg

    def get_options(self):
        for option in self.api.Command.vault_mod_internal.options():
            if option.name != 'version':
                yield option
        for option in super(vault_mod, self).get_options():
            yield option

    def get_output_params(self):
        for param in self.api.Command.vault_mod_internal.output_params():
            yield param
        for param in super(vault_mod, self).get_output_params():
            yield param

    def _iter_output(self):
        return self.api.Command.vault_mod_internal.output()

    def forward(self, *args, **options):

        vault_type = options.pop('ipavaulttype', False)
        salt = options.pop('ipavaultsalt', False)
        change_password = options.pop('change_password', False)

        old_password = options.pop('old_password', None)
        old_password_file = options.pop('old_password_file', None)
        new_password = options.pop('new_password', None)
        new_password_file = options.pop('new_password_file', None)

        old_private_key = options.pop('private_key', None)
        old_private_key_file = options.pop('private_key_file', None)
        new_public_key = options.pop('ipavaultpublickey', None)
        new_public_key_file = options.pop('public_key_file', None)

        if self.api.env.in_server:
            backend = self.api.Backend.ldap2
        else:
            backend = self.api.Backend.rpcclient
        if not backend.isconnected():
            backend.connect()

        # determine the vault type based on parameters specified
        if vault_type:
            pass

        elif change_password or new_password or new_password_file or salt:
            vault_type = u'symmetric'

        elif new_public_key or new_public_key_file:
            vault_type = u'asymmetric'

        # if vault type is specified, retrieve existing secret
        if vault_type:
            opts = options.copy()
            opts.pop('description', None)

            opts['password'] = old_password
            opts['password_file'] = old_password_file
            opts['private_key'] = old_private_key
            opts['private_key_file'] = old_private_key_file

            response = self.api.Command.vault_retrieve(*args, **opts)
            data = response['result']['data']

        opts = options.copy()

        # if vault type is specified, update crypto attributes
        if vault_type:
            opts['ipavaulttype'] = vault_type

            if vault_type == u'standard':
                opts['ipavaultsalt'] = None
                opts['ipavaultpublickey'] = None

            elif vault_type == u'symmetric':
                if salt:
                    opts['ipavaultsalt'] = salt
                else:
                    opts['ipavaultsalt'] = os.urandom(16)

                opts['ipavaultpublickey'] = None

            elif vault_type == u'asymmetric':

                # get new vault public key
                if new_public_key and new_public_key_file:
                    raise errors.MutuallyExclusiveError(
                        reason=_('New public key specified multiple times'))

                elif new_public_key:
                    pass

                elif new_public_key_file:
                    new_public_key = validated_read('public_key_file',
                                                    new_public_key_file,
                                                    mode='rb')

                else:
                    raise errors.ValidationError(
                        name='ipavaultpublickey',
                        error=_('Missing new vault public key'))

                opts['ipavaultsalt'] = None
                opts['ipavaultpublickey'] = new_public_key

        response = self.api.Command.vault_mod_internal(*args, **opts)

        # if vault type is specified, rearchive existing secret
        if vault_type:
            opts = options.copy()
            opts.pop('description', None)

            opts['data'] = data
            opts['password'] = new_password
            opts['password_file'] = new_password_file
            opts['override_password'] = True

            self.api.Command.vault_archive(*args, **opts)

        return response


class _KraConfigCache:
    """The KRA config cache stores vaultconfig-show result.
    """
    def __init__(self, api):
        self._dirname = os.path.join(api.env.cache_dir, 'kra-config')

    def _get_filename(self, domain):
        basename = DNSName(domain).ToASCII() + '.json'
        return os.path.join(self._dirname, basename)

    def load(self, domain):
        """Load config from cache

        :param domain: IPA domain
        :return: dict or None
        """
        filename = self._get_filename(domain)
        try:
            try:
                with open(filename) as f:
                    return json.load(f)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise
        except Exception:
            logger.warning("Failed to load %s", filename, exc_info=True)
        return None

    def store(self, domain, response):
        """Store config in cache

        :param domain: IPA domain
        :param config: ipa vaultconfig-show response
        :return: True if config was stored successfully
        """
        config = response['result'].copy()
        # store certificate as PEM-encoded ASCII
        config['transport_cert'] = ssl.DER_cert_to_PEM_cert(
            config['transport_cert']
        )
        filename = self._get_filename(domain)
        try:
            try:
                os.makedirs(self._dirname)
            except EnvironmentError as e:
                if e.errno != errno.EEXIST:
                    raise
            with tempfile.NamedTemporaryFile(dir=self._dirname, delete=False,
                                             mode='w') as f:
                try:
                    json.dump(config, f)
                    ipautil.flush_sync(f)
                    f.close()
                    os.rename(f.name, filename)
                except Exception:
                    os.unlink(f.name)
                    raise
        except Exception:
            logger.warning("Failed to save %s", filename, exc_info=True)
            return False
        else:
            return True

    def remove(self, domain):
        """Remove a config from cache, ignores errors

        :param domain: IPA domain
        :return: True if cert was found and removed
        """
        filename = self._get_filename(domain)
        try:
            os.unlink(filename)
        except EnvironmentError as e:
            if e.errno != errno.ENOENT:
                logger.warning("Failed to remove %s", filename, exc_info=True)
            return False
        else:
            return True


_kra_config_cache = _KraConfigCache(api)


@register(override=True, no_fail=True)
class vaultconfig_show(MethodOverride):
    def forward(self, *args, **options):

        file = options.get('transport_out')

        # don't send these parameters to server
        if 'transport_out' in options:
            del options['transport_out']

        response = super(vaultconfig_show, self).forward(*args, **options)

        # cache config
        _kra_config_cache.store(self.api.env.domain, response)

        if file:
            with open(file, 'wb') as f:
                f.write(response['result']['transport_cert'])

        return response


class ModVaultData(Local):
    def _generate_session_key(self, name):
        if name not in constants.VAULT_WRAPPING_SUPPORTED_ALGOS:
            msg = _("{algo} is not a supported vault wrapping algorithm")
            raise errors.ValidationError(msg.format(algo=repr(name)))
        if name == constants.VAULT_WRAPPING_AES128_CBC:
            return algorithms.AES(os.urandom(128 // 8))
        elif name == constants.VAULT_WRAPPING_3DES:
            return TripleDES(os.urandom(196 // 8))
        else:
            # unreachable
            raise ValueError(name)

    def _get_vaultconfig(self, force_refresh=False):
        config = None
        if not force_refresh:
            config = _kra_config_cache.load(self.api.env.domain)
        if config is None:
            # vaultconfig_show also caches data
            response = self.api.Command.vaultconfig_show()
            config = response['result']
            transport_cert = x509.load_der_x509_certificate(
                config['transport_cert']
            )
        else:
            # cached JSON uses PEM-encoded ASCII string
            transport_cert = x509.load_pem_x509_certificate(
                config['transport_cert'].encode('ascii')
            )

        default_algo = config.get('wrapping_default_algorithm')
        if default_algo is None:
            # old server
            wrapping_algo = constants.VAULT_WRAPPING_3DES
        elif default_algo in constants.VAULT_WRAPPING_SUPPORTED_ALGOS:
            # try to use server default
            wrapping_algo = default_algo
        else:
            # prefer server's sorting order
            for algo in config['wrapping_supported_algorithms']:
                if algo in constants.VAULT_WRAPPING_SUPPORTED_ALGOS:
                    wrapping_algo = algo
                    break
            else:
                raise errors.ValidationError(
                    "No overlapping wrapping algorithm between server and "
                    "client."
                )
        return transport_cert, wrapping_algo

    def _do_internal(self, algo, transport_cert, raise_unexpected,
                     use_oaep=False, *args, **options):
        public_key = transport_cert.public_key()

        # wrap session key with transport certificate
        # KRA may be configured using either the default PKCS1v15 or RSA-OAEP.
        # there is no way to query this info using the REST interface.
        if not use_oaep:
            # PKCS1v15() causes an OpenSSL exception when FIPS is enabled
            # if so, we fallback to RSA-OAEP
            try:
                wrapped_session_key = public_key.encrypt(
                    algo.key,
                    padding.PKCS1v15()
                )
            except ValueError:
                wrapped_session_key = public_key.encrypt(
                    algo.key,
                    padding.OAEP(
                        mgf=padding.MGF1(algorithm=hashes.SHA256()),
                        algorithm=hashes.SHA256(),
                        label=None
                    )
                )
        else:
            wrapped_session_key = public_key.encrypt(
                algo.key,
                padding.OAEP(
                    mgf=padding.MGF1(algorithm=hashes.SHA256()),
                    algorithm=hashes.SHA256(),
                    label=None
                )
            )

        options['session_key'] = wrapped_session_key

        name = self.name + '_internal'
        try:
            # ipalib.errors.NotFound exception can be propagated
            return self.api.Command[name](*args, **options)
        except (errors.InternalError,
                errors.ExecutionError,
                errors.GenericError):
            _kra_config_cache.remove(self.api.env.domain)
            if raise_unexpected and use_oaep:
                raise
        return None

    def internal(self, algo, transport_cert, *args, **options):
        """
        Calls the internal counterpart of the command.
        """
        # try call with cached transport certificate
        try:
            result = self._do_internal(algo, transport_cert, False,
                                       False, *args, **options)
        except errors.EncodingError:
            result = self._do_internal(algo, transport_cert, False,
                                       True, *args, **options)
        if result is not None:
            return result

        # retrieve transport certificate (cached by vaultconfig_show)
        transport_cert = self._get_vaultconfig(force_refresh=True)[0]

        # call with the retrieved transport certificate
        result = self._do_internal(algo, transport_cert, True,
                                   False, *args, **options)

        if result is not None:
            return result

        # call and use_oaep this time, last attempt
        return self._do_internal(algo, transport_cert, True,
                                 True, *args, **options)


@register(no_fail=True)
class _fake_vault_archive_internal(Method):
    name = 'vault_archive_internal'
    NO_CLI = True


@register()
class vault_archive(ModVaultData):
    __doc__ = _('Archive data into a vault.')

    takes_options = (
        Bytes(
            'data?',
            doc=_('Binary data to archive'),
        ),
        Str(  # TODO: use File parameter
            'in?',
            doc=_('File containing data to archive'),
        ),
        Str(
            'password?',
            cli_name='password',
            doc=_('Vault password'),
        ),
        Str(  # TODO: use File parameter
            'password_file?',
            cli_name='password_file',
            doc=_('File containing the vault password'),
        ),
        Flag(
            'override_password?',
            doc=_('Override existing password'),
        ),
    )

    @classmethod
    def __NO_CLI_getter(cls):
        return (api.Command.get_plugin('vault_archive_internal') is
                _fake_vault_archive_internal)

    NO_CLI = classproperty(__NO_CLI_getter)

    @property
    def api_version(self):
        return self.api.Command.vault_archive_internal.api_version

    def get_args(self):
        for arg in self.api.Command.vault_archive_internal.args():
            yield arg
        for arg in super(vault_archive, self).get_args():
            yield arg

    def get_options(self):
        for option in self.api.Command.vault_archive_internal.options():
            if option.name not in ('nonce',
                                   'session_key',
                                   'vault_data',
                                   'version',
                                   'wrapping_algo'):
                yield option
        for option in super(vault_archive, self).get_options():
            yield option

    def get_output_params(self):
        for param in self.api.Command.vault_archive_internal.output_params():
            yield param
        for param in super(vault_archive, self).get_output_params():
            yield param

    def _iter_output(self):
        return self.api.Command.vault_archive_internal.output()

    def _wrap_data(self, algo, json_vault_data):
        """Encrypt data with wrapped session key and transport cert

        :param algo: wrapping algorithm instance
        :param bytes json_vault_data: dumped vault data
        :return:
        """
        nonce = os.urandom(algo.block_size // 8)

        # wrap vault_data with session key
        padder = PKCS7(algo.block_size).padder()
        padded_data = padder.update(json_vault_data)
        padded_data += padder.finalize()

        cipher = Cipher(algo, modes.CBC(nonce), backend=default_backend())
        encryptor = cipher.encryptor()
        wrapped_vault_data = encryptor.update(padded_data) + encryptor.finalize()

        return nonce, wrapped_vault_data

    def forward(self, *args, **options):
        data = options.get('data')
        input_file = options.get('in')

        password = options.get('password')
        password_file = options.get('password_file')

        override_password = options.pop('override_password', False)

        # don't send these parameters to server
        if 'data' in options:
            del options['data']
        if 'in' in options:
            del options['in']
        if 'password' in options:
            del options['password']
        if 'password_file' in options:
            del options['password_file']

        # get data
        if data and input_file:
            raise errors.MutuallyExclusiveError(
                reason=_('Input data specified multiple times'))

        elif data:
            if len(data) > MAX_VAULT_DATA_SIZE:
                raise errors.ValidationError(name="data", error=_(
                    "Size of data exceeds the limit. Current vault data size "
                    "limit is %(limit)d B")
                    % {'limit': MAX_VAULT_DATA_SIZE})

        elif input_file:
            try:
                stat = os.stat(input_file)
            except OSError as exc:
                raise errors.ValidationError(name="in", error=_(
                    "Cannot read file '%(filename)s': %(exc)s")
                    % {'filename': input_file, 'exc': exc.args[1]})
            if stat.st_size > MAX_VAULT_DATA_SIZE:
                raise errors.ValidationError(name="in", error=_(
                    "Size of data exceeds the limit. Current vault data size "
                    "limit is %(limit)d B")
                    % {'limit': MAX_VAULT_DATA_SIZE})
            data = validated_read('in', input_file, mode='rb')

        else:
            data = b''

        if self.api.env.in_server:
            backend = self.api.Backend.ldap2
        else:
            backend = self.api.Backend.rpcclient
        if not backend.isconnected():
            backend.connect()

        # retrieve vault info
        vault = self.api.Command.vault_show(*args, **options)['result']

        vault_type = vault['ipavaulttype'][0]

        if vault_type == u'standard':

            encrypted_key = None

        elif vault_type == u'symmetric':

            # get password
            if password and password_file:
                raise errors.MutuallyExclusiveError(
                    reason=_('Password specified multiple times'))

            elif password:
                pass

            elif password_file:
                password = validated_read('password-file',
                                          password_file,
                                          encoding='utf-8')
                password = password.rstrip('\n')

            else:
                if override_password:
                    password = self.api.Backend.textui.prompt_password(
                        'New password')
                else:
                    password = self.api.Backend.textui.prompt_password(
                        'Password', confirm=False)

            if not override_password:
                # verify password by retrieving existing data
                opts = options.copy()
                opts['password'] = password
                try:
                    self.api.Command.vault_retrieve(*args, **opts)
                except errors.NotFound:
                    pass

            salt = vault['ipavaultsalt'][0]

            # generate encryption key from vault password
            encryption_key = generate_symmetric_key(password, salt)

            # encrypt data with encryption key
            data = encrypt(data, symmetric_key=encryption_key)

            encrypted_key = None

        elif vault_type == u'asymmetric':

            public_key = vault['ipavaultpublickey'][0]

            # generate encryption key
            encryption_key = base64.b64encode(os.urandom(32))

            # encrypt data with encryption key
            data = encrypt(data, symmetric_key=encryption_key)

            # encrypt encryption key with public key
            encrypted_key = encrypt(encryption_key, public_key=public_key)

        else:
            raise errors.ValidationError(
                name='vault_type',
                error=_('Invalid vault type'))


        vault_data = {
            'data': base64.b64encode(data).decode('utf-8')
        }
        if encrypted_key:
            vault_data[u'encrypted_key'] = base64.b64encode(encrypted_key)\
                .decode('utf-8')

        json_vault_data = json.dumps(vault_data).encode('utf-8')

        # get config
        transport_cert, wrapping_algo = self._get_vaultconfig()
        # let options override wrapping algo
        # For backwards compatibility do not send old legacy wrapping algo
        # to server. Only send the option when non-3DES is used.
        wrapping_algo = options.pop('wrapping_algo', wrapping_algo)
        if wrapping_algo != constants.VAULT_WRAPPING_3DES:
            options['wrapping_algo'] = wrapping_algo

        # generate session key
        algo = self._generate_session_key(wrapping_algo)
        # wrap vault data
        nonce, wrapped_vault_data = self._wrap_data(algo, json_vault_data)
        options.update(
            nonce=nonce,
            vault_data=wrapped_vault_data
        )
        return self.internal(algo, transport_cert, *args, **options)


@register(no_fail=True)
class _fake_vault_retrieve_internal(Method):
    name = 'vault_retrieve_internal'
    NO_CLI = True


@register()
class vault_retrieve(ModVaultData):
    __doc__ = _('Retrieve a data from a vault.')

    takes_options = (
        Str(
            'out?',
            doc=_('File to store retrieved data'),
        ),
        Str(
            'password?',
            cli_name='password',
            doc=_('Vault password'),
        ),
        Str(  # TODO: use File parameter
            'password_file?',
            cli_name='password_file',
            doc=_('File containing the vault password'),
        ),
        Bytes(
            'private_key?',
            cli_name='private_key',
            doc=_('Vault private key'),
        ),
        Str(  # TODO: use File parameter
            'private_key_file?',
            cli_name='private_key_file',
            doc=_('File containing the vault private key'),
        ),
    )

    has_output_params = (
        Bytes(
            'data',
            label=_('Data'),
        ),
    )

    @classmethod
    def __NO_CLI_getter(cls):
        return (api.Command.get_plugin('vault_retrieve_internal') is
                _fake_vault_retrieve_internal)

    NO_CLI = classproperty(__NO_CLI_getter)

    @property
    def api_version(self):
        return self.api.Command.vault_retrieve_internal.api_version

    def get_args(self):
        for arg in self.api.Command.vault_retrieve_internal.args():
            yield arg
        for arg in super(vault_retrieve, self).get_args():
            yield arg

    def get_options(self):
        for option in self.api.Command.vault_retrieve_internal.options():
            if option.name not in ('session_key', 'version', 'wrapping_algo'):
                yield option
        for option in super(vault_retrieve, self).get_options():
            yield option

    def get_output_params(self):
        for param in self.api.Command.vault_retrieve_internal.output_params():
            yield param
        for param in super(vault_retrieve, self).get_output_params():
            yield param

    def _iter_output(self):
        return self.api.Command.vault_retrieve_internal.output()

    def _unwrap_response(self, algo, nonce, vault_data):
        cipher = Cipher(algo, modes.CBC(nonce), backend=default_backend())
        # decrypt
        decryptor = cipher.decryptor()
        padded_data = decryptor.update(vault_data)
        padded_data += decryptor.finalize()
        # remove padding
        unpadder = PKCS7(algo.block_size).unpadder()
        json_vault_data = unpadder.update(padded_data)
        json_vault_data += unpadder.finalize()
        # load JSON
        return json.loads(json_vault_data.decode('utf-8'))

    def forward(self, *args, **options):
        output_file = options.get('out')

        password = options.get('password')
        password_file = options.get('password_file')
        private_key = options.get('private_key')
        private_key_file = options.get('private_key_file')

        # don't send these parameters to server
        if 'out' in options:
            del options['out']
        if 'password' in options:
            del options['password']
        if 'password_file' in options:
            del options['password_file']
        if 'private_key' in options:
            del options['private_key']
        if 'private_key_file' in options:
            del options['private_key_file']

        if self.api.env.in_server:
            backend = self.api.Backend.ldap2
        else:
            backend = self.api.Backend.rpcclient
        if not backend.isconnected():
            backend.connect()

        # retrieve vault info
        vault = self.api.Command.vault_show(*args, **options)['result']
        vault_type = vault['ipavaulttype'][0]

        # get config
        transport_cert, wrapping_algo = self._get_vaultconfig()
        # let options override wrapping algo
        # For backwards compatibility do not send old legacy wrapping algo
        # to server. Only send the option when non-3DES is used.
        wrapping_algo = options.pop('wrapping_algo', wrapping_algo)
        if wrapping_algo != constants.VAULT_WRAPPING_3DES:
            options['wrapping_algo'] = wrapping_algo

        # generate session key
        algo = self._generate_session_key(wrapping_algo)
        # send retrieval request to server
        response = self.internal(algo, transport_cert, *args, **options)
        # unwrap data with session key
        vault_data = self._unwrap_response(
            algo,
            response['result']['nonce'],
            response['result']['vault_data']
        )
        del algo

        data = base64.b64decode(vault_data[u'data'].encode('utf-8'))
        encrypted_key = None

        if 'encrypted_key' in vault_data:
            encrypted_key = base64.b64decode(vault_data[u'encrypted_key']
                                             .encode('utf-8'))

        if vault_type == u'standard':

            pass

        elif vault_type == u'symmetric':

            salt = vault['ipavaultsalt'][0]

            # get encryption key from vault password
            if password and password_file:
                raise errors.MutuallyExclusiveError(
                    reason=_('Password specified multiple times'))

            elif password:
                pass

            elif password_file:
                password = validated_read('password-file',
                                          password_file,
                                          encoding='utf-8')
                password = password.rstrip('\n')

            else:
                password = self.api.Backend.textui.prompt_password(
                    'Password', confirm=False)

            # generate encryption key from password
            encryption_key = generate_symmetric_key(password, salt)

            # decrypt data with encryption key
            data = decrypt(data, symmetric_key=encryption_key)

        elif vault_type == u'asymmetric':

            # get encryption key with vault private key
            if private_key and private_key_file:
                raise errors.MutuallyExclusiveError(
                    reason=_('Private key specified multiple times'))

            elif private_key:
                pass

            elif private_key_file:
                private_key = validated_read('private-key-file',
                                             private_key_file,
                                             mode='rb')

            else:
                raise errors.ValidationError(
                    name='private_key',
                    error=_('Missing vault private key'))

            # decrypt encryption key with private key
            encryption_key = decrypt(encrypted_key, private_key=private_key)

            # decrypt data with encryption key
            data = decrypt(data, symmetric_key=encryption_key)

        else:
            raise errors.ValidationError(
                name='vault_type',
                error=_('Invalid vault type'))

        if output_file:
            with open(output_file, 'wb') as f:
                f.write(data)

        else:
            response['result'] = {'data': data}

        return response