Файловый менеджер - Редактировать - /home/freeclou/app.optimyar.com/front-web/build/assets/styles/__init__.py.tar
Назад
usr/lib/python3.9/site-packages/tracer/packageManagers/__init__.py 0000644 00000000000 15110473313 0021044 0 ustar 00 usr/lib/python3.9/site-packages/dnfpluginscore/__init__.py 0000644 00000002340 15111326677 0017561 0 ustar 00 # Copyright (C) 2014 Red Hat, Inc. # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # the GNU General Public License v.2, or (at your option) any later version. # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY expressed or implied, including the implied warranties of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. You should have received a copy of the # GNU General Public License along with this program; if not, write to the # Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. Any Red Hat trademarks that are incorporated in the # source code or documentation are not subject to the GNU General Public # License and may only be used or replicated with the express permission of # Red Hat, Inc. # """ Common code for dnf-plugins-core""" from __future__ import absolute_import from __future__ import unicode_literals import dnf.exceptions import logging _, P_ = dnf.i18n.translation('dnf-plugins-core') logger = logging.getLogger('dnf.plugin') rpm_logger = logging.getLogger('dnf.rpm') usr/lib/python3.9/site-packages/pip/_internal/operations/__init__.py 0000644 00000000000 15111422561 0021441 0 ustar 00 usr/lib/python3.9/site-packages/dnf/module/__init__.py 0000644 00000002356 15111605226 0016570 0 ustar 00 # Copyright (C) 2017 Red Hat, Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Library General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. from dnf.i18n import _ DIFFERENT_STREAM_INFO = 1 NOTHING_TO_SHOW = 2 INSTALLING_NEWER_VERSION = 4 ENABLED_MODULES = 5 NO_PROFILE_SPECIFIED = 6 module_messages = { DIFFERENT_STREAM_INFO: _("Enabling different stream for '{}'."), NOTHING_TO_SHOW: _("Nothing to show."), INSTALLING_NEWER_VERSION: _("Installing newer version of '{}' than specified. Reason: {}"), ENABLED_MODULES: _("Enabled modules: {}."), NO_PROFILE_SPECIFIED: _("No profile specified for '{}', please specify profile."), } usr/lib/python3.9/site-packages/dns/dnssecalgs/__init__.py 0000644 00000010267 15111613261 0017444 0 ustar 00 from typing import Dict, Optional, Tuple, Type, Union import dns.name from dns.dnssecalgs.base import GenericPrivateKey from dns.dnssectypes import Algorithm from dns.exception import UnsupportedAlgorithm from dns.rdtypes.ANY.DNSKEY import DNSKEY if dns._features.have("dnssec"): from dns.dnssecalgs.dsa import PrivateDSA, PrivateDSANSEC3SHA1 from dns.dnssecalgs.ecdsa import PrivateECDSAP256SHA256, PrivateECDSAP384SHA384 from dns.dnssecalgs.eddsa import PrivateED448, PrivateED25519 from dns.dnssecalgs.rsa import ( PrivateRSAMD5, PrivateRSASHA1, PrivateRSASHA1NSEC3SHA1, PrivateRSASHA256, PrivateRSASHA512, ) _have_cryptography = True else: _have_cryptography = False AlgorithmPrefix = Optional[Union[bytes, dns.name.Name]] algorithms: Dict[Tuple[Algorithm, AlgorithmPrefix], Type[GenericPrivateKey]] = {} if _have_cryptography: algorithms.update( { (Algorithm.RSAMD5, None): PrivateRSAMD5, (Algorithm.DSA, None): PrivateDSA, (Algorithm.RSASHA1, None): PrivateRSASHA1, (Algorithm.DSANSEC3SHA1, None): PrivateDSANSEC3SHA1, (Algorithm.RSASHA1NSEC3SHA1, None): PrivateRSASHA1NSEC3SHA1, (Algorithm.RSASHA256, None): PrivateRSASHA256, (Algorithm.RSASHA512, None): PrivateRSASHA512, (Algorithm.ECDSAP256SHA256, None): PrivateECDSAP256SHA256, (Algorithm.ECDSAP384SHA384, None): PrivateECDSAP384SHA384, (Algorithm.ED25519, None): PrivateED25519, (Algorithm.ED448, None): PrivateED448, } ) def get_algorithm_cls( algorithm: Union[int, str], prefix: AlgorithmPrefix = None ) -> Type[GenericPrivateKey]: """Get Private Key class from Algorithm. *algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm. Raises ``UnsupportedAlgorithm`` if the algorithm is unknown. Returns a ``dns.dnssecalgs.GenericPrivateKey`` """ algorithm = Algorithm.make(algorithm) cls = algorithms.get((algorithm, prefix)) if cls: return cls raise UnsupportedAlgorithm( 'algorithm "%s" not supported by dnspython' % Algorithm.to_text(algorithm) ) def get_algorithm_cls_from_dnskey(dnskey: DNSKEY) -> Type[GenericPrivateKey]: """Get Private Key class from DNSKEY. *dnskey*, a ``DNSKEY`` to get Algorithm class for. Raises ``UnsupportedAlgorithm`` if the algorithm is unknown. Returns a ``dns.dnssecalgs.GenericPrivateKey`` """ prefix: AlgorithmPrefix = None if dnskey.algorithm == Algorithm.PRIVATEDNS: prefix, _ = dns.name.from_wire(dnskey.key, 0) elif dnskey.algorithm == Algorithm.PRIVATEOID: length = int(dnskey.key[0]) prefix = dnskey.key[0 : length + 1] return get_algorithm_cls(dnskey.algorithm, prefix) def register_algorithm_cls( algorithm: Union[int, str], algorithm_cls: Type[GenericPrivateKey], name: Optional[Union[dns.name.Name, str]] = None, oid: Optional[bytes] = None, ) -> None: """Register Algorithm Private Key class. *algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm. *algorithm_cls*: A `GenericPrivateKey` class. *name*, an optional ``dns.name.Name`` or ``str``, for for PRIVATEDNS algorithms. *oid*: an optional BER-encoded `bytes` for PRIVATEOID algorithms. Raises ``ValueError`` if a name or oid is specified incorrectly. """ if not issubclass(algorithm_cls, GenericPrivateKey): raise TypeError("Invalid algorithm class") algorithm = Algorithm.make(algorithm) prefix: AlgorithmPrefix = None if algorithm == Algorithm.PRIVATEDNS: if name is None: raise ValueError("Name required for PRIVATEDNS algorithms") if isinstance(name, str): name = dns.name.from_text(name) prefix = name elif algorithm == Algorithm.PRIVATEOID: if oid is None: raise ValueError("OID required for PRIVATEOID algorithms") prefix = bytes([len(oid)]) + oid elif name: raise ValueError("Name only supported for PRIVATEDNS algorithm") elif oid: raise ValueError("OID only supported for PRIVATEOID algorithm") algorithms[(algorithm, prefix)] = algorithm_cls usr/lib/python3.9/site-packages/sos/cleaner/__init__.py 0000644 00000115166 15111613264 0016756 0 ustar 00 # Copyright 2020 Red Hat, Inc. Jake Hunsaker <jhunsake@redhat.com> # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. import hashlib import json import logging import os import shutil import fnmatch from concurrent.futures import ProcessPoolExecutor from datetime import datetime from pwd import getpwuid import sos.cleaner.preppers from sos import __version__ from sos.component import SoSComponent from sos.cleaner.parsers.ip_parser import SoSIPParser from sos.cleaner.parsers.mac_parser import SoSMacParser from sos.cleaner.parsers.hostname_parser import SoSHostnameParser from sos.cleaner.parsers.keyword_parser import SoSKeywordParser from sos.cleaner.parsers.username_parser import SoSUsernameParser from sos.cleaner.parsers.ipv6_parser import SoSIPv6Parser from sos.cleaner.archives.sos import (SoSReportArchive, SoSReportDirectory, SoSCollectorArchive, SoSCollectorDirectory) from sos.cleaner.archives.generic import DataDirArchive, TarballArchive from sos.cleaner.archives.insights import InsightsArchive from sos.utilities import (get_human_readable, import_module, ImporterHelper) # an auxiliary method to kick off child processes over its instances def obfuscate_arc_files(arc, flist): return arc.obfuscate_arc_files(flist) class SoSCleaner(SoSComponent): """ This function is designed to obfuscate potentially sensitive information from an sos report archive in a consistent and reproducible manner. It may either be invoked during the creation of a report by using the --clean option in the report command, or may be used on an already existing archive by way of 'sos clean'. The target of obfuscation are items such as IP addresses, MAC addresses, hostnames, usernames, and also keywords provided by users via the --keywords and/or --keyword-file options. For every collection made in a report the collection is parsed for such items, and when items are found SoS will generate an obfuscated replacement for it, and in all places that item is found replace the text with the obfuscated replacement mapped to it. These mappings are saved locally so that future iterations will maintain the same consistent obfuscation pairing. In the case of IP addresses, support is for IPv4 and IPv6 - effort is made to keep network topology intact so that later analysis is as accurate and easily understandable as possible. If an IP address is encountered that we cannot determine the netmask for, a private IP address from 172.17.0.0/22 range is used instead. For IPv6, note that IPv4-mapped addresses, e.g. ::ffff:10.11.12.13, are NOT supported currently, and will remain unobfuscated. For hostnames, domains are obfuscated as whole units, leaving the TLD in place. For instance, 'example.com' may be obfuscated to 'obfuscateddomain0.com' and 'foo.example.com' may end up being 'obfuscateddomain1.com'. Users will be notified of a 'mapping' file that records all items and the obfuscated counterpart mapped to them for ease of reference later on. This file should be kept private. """ desc = "Obfuscate sensitive networking information in a report" arg_defaults = { 'archive_type': 'auto', 'domains': [], 'disable_parsers': [], 'skip_cleaning_files': [], 'jobs': 4, 'keywords': [], 'keyword_file': None, 'map_file': '/etc/sos/cleaner/default_mapping', 'no_update': False, 'keep_binary_files': False, 'target': '', 'usernames': [] } def __init__(self, parser=None, args=None, cmdline=None, in_place=False, hook_commons=None): if not in_place: # we are running `sos clean` directly super().__init__(parser, args, cmdline) self.from_cmdline = True else: # we are being hooked by either SoSReport or SoSCollector, don't # re-init everything as that will cause issues, but instead load # the needed bits from the calling component self.opts = hook_commons['options'] self.tmpdir = hook_commons['tmpdir'] self.sys_tmp = hook_commons['sys_tmp'] self.policy = hook_commons['policy'] self.manifest = hook_commons['manifest'] self.from_cmdline = False # precede 'report -t' option above 'cleaner --jobs' if not hasattr(self.opts, 'jobs'): self.opts.jobs = self.opts.threads self.opts.archive_type = 'auto' self.soslog = logging.getLogger('sos') self.ui_log = logging.getLogger('sos_ui') # create the tmp subdir here to avoid a potential race condition # when obfuscating a SoSCollector run during archive extraction os.makedirs(os.path.join(self.tmpdir, 'cleaner'), exist_ok=True) self.review_parser_values() self.cleaner_mapping = self.load_map_file() os.umask(0o77) self.in_place = in_place self.hash_name = self.policy.get_preferred_hash_name() self.cleaner_md = self.manifest.components.add_section('cleaner') cleaner_dir = os.path.dirname(self.opts.map_file) \ if self.opts.map_file else '/etc/sos/cleaner' parser_args = [ self.cleaner_mapping, cleaner_dir, self.opts.skip_cleaning_files, ] self.parsers = [ SoSHostnameParser(*parser_args), SoSIPParser(*parser_args), SoSIPv6Parser(*parser_args), SoSMacParser(*parser_args), SoSKeywordParser(*parser_args), SoSUsernameParser(*parser_args), ] for _parser in self.opts.disable_parsers: for _loaded in self.parsers: _temp = _loaded.name.lower().split('parser', maxsplit=1)[0] _loaded_name = _temp.strip() if _parser.lower().strip() == _loaded_name: self.log_info(f"Disabling parser: {_loaded_name}") self.ui_log.warning( f"Disabling the '{_parser}' parser. Be aware that this" " may leave sensitive plain-text data in the archive." ) self.parsers.remove(_loaded) self.archive_types = [ SoSReportDirectory, SoSReportArchive, SoSCollectorDirectory, SoSCollectorArchive, InsightsArchive, # make sure these two are always last as they are fallbacks DataDirArchive, TarballArchive ] self.nested_archive = None self.log_info( f"Cleaner initialized. From cmdline: {self.from_cmdline}") def _fmt_log_msg(self, msg, caller=None): return f"[cleaner{f':{caller}' if caller else ''}] {msg}" def log_debug(self, msg, caller=None): self.soslog.debug(self._fmt_log_msg(msg, caller)) def log_info(self, msg, caller=None): self.soslog.info(self._fmt_log_msg(msg, caller)) def log_error(self, msg, caller=None): self.soslog.error(self._fmt_log_msg(msg, caller)) @classmethod def display_help(cls, section): section.set_title("SoS Cleaner Detailed Help") section.add_text(cls.__doc__) def load_map_file(self): """Verifies that the map file exists and has usable content. If the provided map file does not exist, or it is empty, we will print a warning and continue on with cleaning building a fresh map """ _conf = {} default_map = '/etc/sos/cleaner/default_mapping' if os.path.isdir(self.opts.map_file): raise Exception(f"Requested map file {self.opts.map_file} is a " "directory") if not os.path.exists(self.opts.map_file): if self.opts.map_file != default_map: self.log_error( f"ERROR: map file {self.opts.map_file} does not exist, " "will not load any obfuscation matches") else: with open(self.opts.map_file, 'r', encoding='utf-8') as mf: try: _conf = json.load(mf) except json.JSONDecodeError: self.log_error("ERROR: Unable to parse map file, json is " "malformed. Will not load any mappings.") except Exception as err: self.log_error("ERROR: Could not load " f"'{self.opts.map_file}': {err}") return _conf def print_disclaimer(self): """When we are directly running `sos clean`, rather than hooking into SoSCleaner via report or collect, print a disclaimer banner """ msg = self._fmt_msg("""\ This command will attempt to obfuscate information that is generally \ considered to be potentially sensitive. Such information includes IP \ addresses, MAC addresses, domain names, and any user-provided keywords. Note that this utility provides a best-effort approach to data obfuscation, \ but it does not guarantee that such obfuscation provides complete coverage of \ all such data in the archive, or that any obfuscation is provided to data that\ does not fit the description above. Users should review any resulting data and/or archives generated or processed \ by this utility for remaining sensitive content before being passed to a \ third party. """) self.ui_log.info(f"\nsos clean (version {__version__})\n") self.ui_log.info(msg) if not self.opts.batch: try: input("\nPress ENTER to continue, or CTRL-C to quit.\n") except KeyboardInterrupt: self.ui_log.info("\nExiting on user cancel") self._exit(130) except Exception as e: self._exit(1, e) @classmethod def add_parser_options(cls, parser): parser.usage = 'sos clean|mask TARGET [options]' clean_grp = parser.add_argument_group( 'Cleaner/Masking Options', 'These options control how data obfuscation is performed' ) clean_grp.add_argument('target', metavar='TARGET', help='The directory or archive to obfuscate') clean_grp.add_argument('--archive-type', default='auto', choices=['auto', 'report', 'collect', 'insights', 'data-dir', 'tarball'], help=('Specify what kind of archive the target ' 'was generated as')) clean_grp.add_argument('--domains', action='extend', default=[], help='List of domain names to obfuscate') clean_grp.add_argument('--disable-parsers', action='extend', default=[], dest='disable_parsers', help=('Disable specific parsers, so that those ' 'elements are not obfuscated')) clean_grp.add_argument('--skip-cleaning-files', '--skip-masking-files', action='extend', default=[], dest='skip_cleaning_files', help=('List of files to skip/ignore during ' 'cleaning. Globs are supported.')) clean_grp.add_argument('-j', '--jobs', default=4, type=int, help='Number of concurrent archives to clean') clean_grp.add_argument('--keywords', action='extend', default=[], dest='keywords', help='List of keywords to obfuscate') clean_grp.add_argument('--keyword-file', default=None, dest='keyword_file', help='Provide a file a keywords to obfuscate') clean_grp.add_argument('--map-file', dest='map_file', default='/etc/sos/cleaner/default_mapping', help=('Provide a previously generated mapping ' 'file for obfuscation')) clean_grp.add_argument('--no-update', dest='no_update', default=False, action='store_true', help='Do not update the --map-file with new ' 'mappings from this run') clean_grp.add_argument('--keep-binary-files', default=False, action='store_true', dest='keep_binary_files', help='Keep unprocessable binary files in the ' 'archive instead of removing them') clean_grp.add_argument('--usernames', dest='usernames', default=[], action='extend', help='List of usernames to obfuscate') def set_target_path(self, path): """For use by report and collect to set the TARGET option appropriately so that execute() can be called just as if we were running `sos clean` directly from the cmdline. """ self.opts.target = path def inspect_target_archive(self): """The target path is not a directory, so inspect it for being an archive or an archive of archives. In the event the target path is not an archive, abort. """ _arc = None if self.opts.archive_type != 'auto': check_type = self.opts.archive_type.replace('-', '_') for archive in self.archive_types: if archive.type_name == check_type: _arc = archive(self.opts.target, self.tmpdir, self.opts.keep_binary_files) else: for arc in self.archive_types: if arc.check_is_type(self.opts.target): _arc = arc(self.opts.target, self.tmpdir, self.opts.keep_binary_files) break if not _arc: return self.main_archive = _arc self.report_paths.append(_arc) if _arc.is_nested: self.report_paths.extend(_arc.get_nested_archives()) # We need to preserve the top level archive until all # nested archives are processed self.report_paths.remove(_arc) self.nested_archive = _arc if self.nested_archive: self.nested_archive.ui_name = self.nested_archive.description def review_parser_values(self): """Check any values passed to the parsers via the commandline: - For the --domains option, ensure that they are valid for the parser in question. - Convert --skip-cleaning-files from globs to regular expressions. """ for _dom in self.opts.domains: if len(_dom.split('.')) < 2: raise Exception( f"Invalid value '{_dom}' given: --domains values must be " "actual domains" ) self.opts.skip_cleaning_files = [fnmatch.translate(p) for p in self.opts.skip_cleaning_files] def execute(self): """SoSCleaner will begin by inspecting the TARGET option to determine if it is a directory, archive, or archive of archives. In the case of a directory, the default behavior will be to edit the data in place. For an archive will we unpack the archive, iterate over the contents, and then repack the archive. In the case of an archive of archives, such as one from SoSCollector, each archive will be unpacked, cleaned, and repacked and the final top-level archive will then be repacked as well. """ self.opts.target = self.opts.target.rstrip('/') self.arc_name = self.opts.target.split('/')[-1].split('.tar')[0] if self.from_cmdline: self.print_disclaimer() self.report_paths = [] if not os.path.exists(self.opts.target): self.ui_log.error("Invalid target: no such file or directory " f"{self.opts.target}") self._exit(1) self.inspect_target_archive() if not self.report_paths: self.ui_log.error("No valid archives or directories found\n") self._exit(1) # we have at least one valid target to obfuscate self.completed_reports = [] # TODO: as we separate mappings and parsers further, do this in a less # janky manner for parser in self.parsers: if parser.name == 'Hostname Parser': parser.mapping.set_initial_counts() self.preload_all_archives_into_maps() self.generate_parser_item_regexes() self.obfuscate_report_paths() if not self.completed_reports: if self.in_place: return None self.ui_log.info("No reports obfuscated, aborting...\n") self._exit(1) self.ui_log.info("\nSuccessfully obfuscated " f"{len(self.completed_reports)} report(s)\n") _map = self.compile_mapping_dict() map_path = self.write_map_for_archive(_map) self.write_map_for_config(_map) self.write_stats_to_manifest() if self.in_place: arc_paths = [a.final_archive_path for a in self.completed_reports] return map_path, arc_paths final_path = None if len(self.completed_reports) > 1: arc_path = self.rebuild_nested_archive() else: arc = self.completed_reports[0] arc_path = arc.final_archive_path checksum = self.get_new_checksum(arc.final_archive_path) if checksum is not None: chksum_name = self.obfuscate_string( f"{arc_path.split('/')[-1]}.{self.hash_name}" ) with open(os.path.join(self.sys_tmp, chksum_name), 'w', encoding='utf-8') as cf: cf.write(checksum) self.write_cleaner_log() final_path = os.path.join( self.sys_tmp, self.obfuscate_string(arc_path.split('/')[-1]) ) shutil.move(arc_path, final_path) arcstat = os.stat(final_path) # while these messages won't be included in the log file in the archive # some facilities, such as our avocado test suite, will sometimes not # capture print() output, so leverage the ui_log to print to console self.ui_log.info( f"A mapping of obfuscated elements is available at\n\t{map_path}" ) self.ui_log.info( f"\nThe obfuscated archive is available at\n\t{final_path}\n" ) self.ui_log.info(f"\tSize\t{get_human_readable(arcstat.st_size)}") self.ui_log.info(f"\tOwner\t{getpwuid(arcstat.st_uid).pw_name}\n") self.ui_log.info("Please send the obfuscated archive to your support " "representative and keep the mapping file private") self.cleanup() return None def rebuild_nested_archive(self): """Handles repacking the nested tarball, now containing only obfuscated copies of the reports, log files, manifest, etc... """ # we have an archive of archives, so repack the obfuscated tarball arc_name = self.arc_name + '-obfuscated' self.setup_archive(name=arc_name) for archive in self.completed_reports: arc_dest = archive.final_archive_path.split('/')[-1] checksum = self.get_new_checksum(archive.final_archive_path) if checksum is not None: dname = f"checksums/{arc_dest}.{self.hash_name}" self.archive.add_string(checksum, dest=dname) for dirn, _, files in os.walk(self.nested_archive.extracted_path): for filename in files: fname = os.path.join(dirn, filename) dname = fname.split(self.nested_archive.extracted_path)[-1] dname = dname.lstrip('/') self.archive.add_file(fname, dest=dname) # remove it now so we don't balloon our fs space needs os.remove(fname) self.write_cleaner_log(archive=True) return self.archive.finalize(self.opts.compression_type) def compile_mapping_dict(self): """Build a dict that contains each parser's map as a key, with the contents as that key's value. This will then be written to disk in the same directory as the obfuscated report so that sysadmins have a way to 'decode' the obfuscation locally """ _map = {} for parser in self.parsers: _map[parser.map_file_key] = {} _map[parser.map_file_key].update(parser.get_map_contents()) return _map def write_map_to_file(self, _map, path): """Write the mapping to a file on disk that is in the same location as the final archive(s). """ with open(path, 'w', encoding='utf-8') as mf: mf.write(json.dumps(_map, indent=4)) return path def write_map_for_archive(self, _map): try: map_path = os.path.join( self.sys_tmp, self.obfuscate_string(f"{self.arc_name}-private_map") ) return self.write_map_to_file(_map, map_path) except Exception as err: self.log_error(f"Could not write private map file: {err}") return None def write_map_for_config(self, _map): """Write the mapping to the config file so that subsequent runs are able to provide the same consistent mapping """ if self.opts.map_file and not self.opts.no_update: cleaner_dir = os.path.dirname(self.opts.map_file) # Attempt to create the directory /etc/sos/cleaner # just in case it didn't exist previously try: os.makedirs(cleaner_dir, exist_ok=True) self.write_map_to_file(_map, self.opts.map_file) self.log_debug(f"Wrote mapping to {self.opts.map_file}") except Exception as err: self.log_error(f"Could not update mapping config file: {err}") def write_cleaner_log(self, archive=False): """When invoked via the command line, the logging from SoSCleaner will not be added to the archive(s) it processes, so we need to write it separately to disk """ log_name = os.path.join( self.sys_tmp, f"{self.arc_name}-obfuscation.log" ) with open(log_name, 'w', encoding='utf-8') as logfile: self.sos_log_file.seek(0) for line in self.sos_log_file.readlines(): logfile.write(line) if archive: self.obfuscate_file(log_name) self.archive.add_file(log_name, dest="sos_logs/cleaner.log") def get_new_checksum(self, archive_path): """Calculate a new checksum for the obfuscated archive, as the previous checksum will no longer be valid """ try: hash_size = 1024**2 # Hash 1MiB of content at a time. with open(archive_path, 'rb') as archive_fp: digest = hashlib.new(self.hash_name) while True: hashdata = archive_fp.read(hash_size) if not hashdata: break digest.update(hashdata) return digest.hexdigest() + '\n' except Exception as err: self.log_debug(f"Could not generate new checksum: {err}") return None def obfuscate_report_paths(self): """Perform the obfuscation for each archive or sos directory discovered during setup. Each archive is handled in a separate thread, up to self.opts.jobs will be obfuscated concurrently. """ try: msg = ( f"Found {len(self.report_paths)} total reports to obfuscate, " f"processing up to {self.opts.jobs} concurrently within one " "archive\n" ) self.ui_log.info(msg) if self.opts.keep_binary_files: self.ui_log.warning( "WARNING: binary files that potentially contain sensitive " "information will NOT be removed from the final archive\n" ) for report_path in self.report_paths: self.ui_log.info(f"Obfuscating {report_path.archive_path}") self.obfuscate_report(report_path) # finally, obfuscate the nested archive if one exists if self.nested_archive: self._replace_obfuscated_archives() self.obfuscate_report(self.nested_archive) except KeyboardInterrupt: self.ui_log.info("Exiting on user cancel") os._exit(130) def _replace_obfuscated_archives(self): """When we have a nested archive, we need to rebuild the original archive, which entails replacing the existing archives with their obfuscated counterparts """ for archive in self.completed_reports: os.remove(archive.archive_path) dest = self.nested_archive.extracted_path tarball = archive.final_archive_path.split('/')[-1] dest_name = os.path.join(dest, tarball) shutil.move(archive.final_archive_path, dest) archive.final_archive_path = dest_name def generate_parser_item_regexes(self): """For the parsers that use prebuilt lists of items, generate those regexes now since all the parsers should be preloaded by the archive(s) as well as being handed cmdline options and mapping file configuration. """ for parser in self.parsers: parser.generate_item_regexes() def _prepare_archive_with_prepper(self, archive, prepper): """ For each archive we've determined we need to operate on, pass it to each prepper so that we can extract necessary files and/or items for direct regex replacement. Preppers define these methods per parser, so it is possible that a single prepper will read the same file for different parsers/mappings. This is preferable to the alternative of building up monolithic lists of file paths, as we'd still need to manipulate these on a per-archive basis. :param archive: The archive we are currently using to prepare our mappings with :type archive: ``SoSObfuscationArchive`` subclass :param prepper: The individual prepper we're using to source items :type prepper: ``SoSPrepper`` subclass """ for _parser in self.parsers: pname = _parser.name.lower().split()[0].strip() for _file in prepper.get_parser_file_list(pname, archive): content = archive.get_file_content(_file) if not content: continue self.log_debug(f"Prepping {pname} parser with file {_file} " f"from {archive.ui_name}") for line in content.splitlines(): try: _parser.parse_line(line) except Exception as err: self.log_debug( f"Failed to prep {pname} map from {_file}: {err}" ) map_items = prepper.get_items_for_map(pname, archive) if map_items: self.log_debug(f"Prepping {pname} mapping with items from " f"{archive.ui_name}") for item in map_items: _parser.mapping.add(item) for ritem in prepper.regex_items[pname]: _parser.mapping.add_regex_item(ritem) # we must initialize stuff inside (cloned processes') archive - REALLY? archive.set_parsers(self.parsers) def get_preppers(self): """ Discover all locally available preppers so that we can prepare the mappings with obfuscation matches in a controlled manner :returns: All preppers that can be leveraged locally :rtype: A generator of `SoSPrepper` items """ helper = ImporterHelper(sos.cleaner.preppers) preps = [] for _prep in helper.get_modules(): preps.extend(import_module(f"sos.cleaner.preppers.{_prep}")) for prepper in sorted(preps, key=lambda x: x.priority): yield prepper(options=self.opts) def preload_all_archives_into_maps(self): """Before doing the actual obfuscation, if we have multiple archives to obfuscate then we need to preload each of them into the mappings to ensure that node1 is obfuscated in node2 as well as node2 being obfuscated in node1's archive. """ self.log_info("Pre-loading all archives into obfuscation maps") for prepper in self.get_preppers(): for archive in self.report_paths: self._prepare_archive_with_prepper(archive, prepper) self.main_archive.set_parsers(self.parsers) def obfuscate_report(self, archive): # pylint: disable=too-many-branches """Individually handle each archive or directory we've discovered by running through each file therein. Positional arguments: :param archive str: Filepath to the directory or archive """ try: arc_md = self.cleaner_md.add_section(archive.archive_name) start_time = datetime.now() arc_md.add_field('start_time', start_time) # don't double extract nested archives if not archive.is_extracted: archive.extract() archive.report_msg("Beginning obfuscation...") file_list = list(archive.get_files()) # we can't call simple # executor.map(archive.obfuscate_arc_files,archive.get_files()) # because a child process does not carry forward internal changes # (e.g. mappings' datasets) from one call of obfuscate_arc_files # method to another. Each obfuscate_arc_files method starts with # vanilla parent archive, that is initialised *once* at its # beginning via initializer=archive.load_parser_entries # - but not afterwards.. # # So we must pass list of all files for each worker at the # beginning. This means less granularity of the child processes # work (one worker can finish much sooner than the other), but # it is the best we can have (or have found) # # At least, the "file_list[i::self.opts.jobs]" means subsequent # files (speculativelly of similar size and content) are # distributed to different processes, which attempts to split the # load evenly. Yet better approach might be reorderig file_list # based on files' sizes. files_obfuscated_count = total_sub_count = removed_file_count = 0 archive_list = [archive for i in range(self.opts.jobs)] with ProcessPoolExecutor( max_workers=self.opts.jobs, initializer=archive.load_parser_entries) as executor: futures = executor.map(obfuscate_arc_files, archive_list, [file_list[i::self.opts.jobs] for i in range(self.opts.jobs)]) for (foc, tsc, rfc) in futures: files_obfuscated_count += foc total_sub_count += tsc removed_file_count += rfc # As there is no easy way to get dataset dicts from child # processes' mappings, we can reload our own parent-process # archive from the disk files. The trick is that sequence of # files/entries is the source of truth of *sequence* of calling # *all* mapping.all(item) methods - so replaying this will # generate the right datasets! archive.load_parser_entries() try: self.obfuscate_directory_names(archive) except Exception as err: self.log_info(f"Failed to obfuscate directories: {err}", caller=archive.archive_name) try: self.obfuscate_symlinks(archive) except Exception as err: self.log_info(f"Failed to obfuscate symlinks: {err}", caller=archive.archive_name) # if the archive was already a tarball, repack it if not archive.is_nested: method = archive.get_compression() if method: archive.report_msg("Re-compressing...") try: archive.rename_top_dir( self.obfuscate_string(archive.archive_name) ) archive.compress(method) except Exception as err: self.log_debug(f"Archive {archive.archive_name} failed" f" to compress: {err}") archive.report_msg( f"Failed to re-compress archive: {err}") return self.completed_reports.append(archive) end_time = datetime.now() arc_md.add_field('end_time', end_time) arc_md.add_field('run_time', end_time - start_time) arc_md.add_field('files_obfuscated', files_obfuscated_count) arc_md.add_field('total_substitutions', total_sub_count) rmsg = '' if removed_file_count: rmsg = " [removed %s unprocessable files]" rmsg = rmsg % removed_file_count archive.report_msg(f"Obfuscation completed{rmsg}") except Exception as err: self.ui_log.info("Exception while processing " f"{archive.archive_name}: {err}") def obfuscate_file(self, filename): self.main_archive.obfuscate_arc_files([filename]) def obfuscate_symlinks(self, archive): """Iterate over symlinks in the archive and obfuscate their names. The content of the link target will have already been cleaned, and this second pass over just the names of the links is to ensure we avoid a possible race condition dependent on the order in which the link or the target get obfuscated. :param archive: The archive being obfuscated :type archive: ``SoSObfuscationArchive`` """ self.log_info("Obfuscating symlink names", caller=archive.archive_name) for symlink in archive.get_symlinks(): try: # relative name of the symlink in the archive _sym = symlink.split(archive.extracted_path)[1].lstrip('/') # don't obfuscate symlinks for files that we skipped the first # obfuscation of, as that would create broken links _parsers = [ _p for _p in self.parsers if not any(_skip.match(_sym) for _skip in _p.skip_patterns) ] if not _parsers: self.log_debug( f"Skipping obfuscation of symlink {_sym} due to skip " f"pattern match" ) continue self.log_debug(f"Obfuscating symlink {_sym}", caller=archive.archive_name) # current target of symlink, again relative to the archive _target = os.readlink(symlink) # get the potentially renamed symlink name, this time the full # path as it exists on disk _ob_sym_name = os.path.join(archive.extracted_path, self.obfuscate_string(_sym)) # get the potentially renamed relative target filename _ob_target = self.obfuscate_string(_target) # if either the symlink name or the target name has changed, # recreate the symlink if (_ob_sym_name != symlink) or (_ob_target != _target): os.remove(symlink) os.symlink(_ob_target, _ob_sym_name) except Exception as err: self.log_info(f"Error obfuscating symlink '{symlink}': {err}") def obfuscate_directory_names(self, archive): """For all directories that exist within the archive, obfuscate the directory name if it contains sensitive strings found during execution """ self.log_info("Obfuscating directory names in archive " f"{archive.archive_name}") for dirpath in sorted(archive.get_directory_list(), reverse=True): for _name in os.listdir(dirpath): _dirname = os.path.join(dirpath, _name) _arc_dir = _dirname.split(archive.extracted_path)[-1] if os.path.isdir(_dirname): _ob_dirname = self.obfuscate_string(_name) if _ob_dirname != _name: _ob_arc_dir = _arc_dir.rstrip(_name) _ob_arc_dir = os.path.join( archive.extracted_path, _ob_arc_dir.lstrip('/'), _ob_dirname ) os.rename(_dirname, _ob_arc_dir) # TODO: this is a duplicate method from SoSObfuscationArchive but we can't # easily remove either of them..? def obfuscate_string(self, string_data): for parser in self.parsers: try: string_data = parser.parse_string_for_keys(string_data) except Exception as err: self.log_info(f"Error obfuscating string data: {err}") return string_data def write_stats_to_manifest(self): """Write some cleaner-level, non-report-specific stats to the manifest """ parse_sec = self.cleaner_md.add_section('parsers') for parser in self.parsers: _sec = parse_sec.add_section(parser.name.replace(' ', '_').lower()) _sec.add_field('entries', len(parser.mapping.dataset.keys())) # vim: set et ts=4 sw=4 : usr/lib/python3.9/site-packages/dnf/__init__.py 0000644 00000002555 15112213753 0015305 0 ustar 00 # __init__.py # The toplevel DNF package. # # Copyright (C) 2012-2016 Red Hat, Inc. # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # the GNU General Public License v.2, or (at your option) any later version. # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY expressed or implied, including the implied warranties of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. You should have received a copy of the # GNU General Public License along with this program; if not, write to the # Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. Any Red Hat trademarks that are incorporated in the # source code or documentation are not subject to the GNU General Public # License and may only be used or replicated with the express permission of # Red Hat, Inc. # from __future__ import unicode_literals import warnings import dnf.pycomp warnings.filterwarnings('once', category=DeprecationWarning, module=r'^dnf\..*$') from dnf.const import VERSION __version__ = VERSION # :api import dnf.base Base = dnf.base.Base # :api import dnf.plugin Plugin = dnf.plugin.Plugin # :api # setup libraries dnf.pycomp.urlparse.uses_fragment.append("media") usr/lib/python3.9/site-packages/dnf/yum/__init__.py 0000644 00000001750 15112225662 0016115 0 ustar 00 # __init__.py # The legacy YUM subpackage. # # Copyright (C) 2013 Red Hat, Inc. # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # the GNU General Public License v.2, or (at your option) any later version. # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY expressed or implied, including the implied warranties of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. You should have received a copy of the # GNU General Public License along with this program; if not, write to the # Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. Any Red Hat trademarks that are incorporated in the # source code or documentation are not subject to the GNU General Public # License and may only be used or replicated with the express permission of # Red Hat, Inc. # usr/lib/python3.9/site-packages/certbot/tests/__init__.py 0000644 00000000052 15112241777 0017337 0 ustar 00 """Utilities for running Certbot tests""" usr/lib/python3.9/site-packages/sos/collector/clusters/__init__.py 0000644 00000037310 15112245474 0021176 0 ustar 00 # Copyright Red Hat 2020, Jake Hunsaker <jhunsake@redhat.com> # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. import logging from threading import Lock from sos.options import ClusterOption from sos.utilities import bold class Cluster(): """This is the class that cluster profiles should subclass in order to add support for different clustering technologies and environments to sos collect. A profile should at minimum define a package that indicates the node is configured for the type of cluster the profile is intended to serve and then additionally be able to return a list of enumerated nodes via the ``get_nodes()`` method :param commons: The commons dict containing system information. The same as what is handed to ``Plugin()`` :type commons: ``dict`` :cvar option_list: Options supported by the profile, and set by the --cluster-option cmdline arg :vartype option_list: ``list`` of ``tuples`` :cvar packages: What package(s) should this profile enable on :vartype packages: ``tuple`` :cvar sos_plugins: Which plugins to forcibly enable for node reports :vartype sos_plugins: ``list`` :cvar sos_options: Options to pass to report on every node :vartype sos_options: ``dict`` :cvar sos_plugin_options: Plugin options to forcibly set for nodes :vartype sos_plugin_options: ``dict`` :cvar sos_preset: A SoSReport preset to forcibly enable on nodes :vartype sos_preset: ``str`` :cvar cluster_name: The name of the cluster type :vartype cluster_name: ``str`` """ option_list = [] packages = ('',) sos_plugins = [] sos_options = {} sos_plugin_options = {} sos_preset = '' cluster_name = None # set this to True if the local host running collect should *not* be # forcibly added to the node list. This can be helpful in situations where # the host's fqdn and the name the cluster uses are different strict_node_list = False def __init__(self, commons): self.primary = None self.cluster_ssh_key = None self.tmpdir = commons['tmpdir'] self.opts = commons['cmdlineopts'] self.cluster_type = [self.__class__.__name__] for cls in self.__class__.__bases__: if cls.__name__ != 'Cluster': self.cluster_type.append(cls.__name__) self.node_list = None self.lock = Lock() self.soslog = logging.getLogger('sos') self.ui_log = logging.getLogger('sos_ui') self.options = [] self._get_options() @classmethod def name(cls): """Returns the cluster's name as a string. """ if cls.cluster_name: return cls.cluster_name return cls.__name__.lower() @classmethod def display_help(cls, section): # pylint: disable=too-many-branches if cls is Cluster: cls.display_self_help(section) return section.set_title(f"{cls.cluster_name} Cluster Profile Detailed Help") if cls.__doc__ and cls.__doc__ is not Cluster.__doc__: section.add_text(cls.__doc__) # [1] here is the actual cluster profile elif cls.__mro__[1].__doc__ and cls.__mro__[1] is not Cluster: section.add_text(cls.__mro__[1].__doc__) else: section.add_text( "\n\tDetailed help not available for this profile\n" ) if cls.packages: section.add_text( "Enabled by the following packages: " f"{', '.join(p for p in cls.packages)}", newline=False ) if cls.sos_preset: section.add_text( f"Uses the following sos preset: {cls.sos_preset}", newline=False ) if cls.sos_options: _opts = ', '.join(f'--{k} {v}' for k, v in cls.sos_options.items()) section.add_text(f"Sets the following sos options: {_opts}") if cls.sos_plugins: section.add_text( "Enables the following plugins: " f"{', '.join(plug for plug in cls.sos_plugins)}", newline=False ) if cls.sos_plugin_options: _opts = cls.sos_plugin_options opts = ', '.join(f"{k}={v}" for k, v in _opts.items()) section.add_text( f"Sets the following plugin options: {opts}", newline=False ) if cls.option_list: optsec = section.add_section("Available cluster options") optsec.add_text( "These options may be toggled or changed using " f"'{bold(f'-c {cls.__name__}.$option=$value')}'" ) optsec.add_text( bold( f"\n{' ':<4}{'Option Name':<20}{'Default':<30}" f"{'Description':<20}\n"), newline=False ) for opt in cls.option_list: val = opt[1] if isinstance(val, bool): if val: val = 'True/On' else: val = 'False/Off' _ln = f"{' ':<4}{opt[0]:<20}{val:<30}{opt[2]:<20}" optsec.add_text(_ln, newline=False) @classmethod def display_self_help(cls, section): section.set_title('SoS Collect Cluster Profiles Detailed Help') section.add_text( '\nCluster profiles are used to represent different clustering ' 'technologies or platforms. Profiles define how cluster nodes are ' 'discovered, and optionally filtered, for default executions of ' 'collector.' ) section.add_text( 'Cluster profiles are enabled similarly to SoS report plugins; ' 'usually by package, command, or configuration file presence. ' 'Clusters may also define default transports for SoS collect.' ) from sos.collector import SoSCollector import inspect clusters = SoSCollector._load_modules(inspect.getmodule(cls), 'clusters') section.add_text( 'The following cluster profiles are locally available:\n' ) section.add_text( f"{' ':>8}{'Name':<40}{'Description':<30}", newline=False ) for cluster in clusters: _sec = bold(f"collect.clusters.{cluster[0]}") section.add_text( f"{' ':>8}{_sec:<40}{cluster[1].cluster_name:<30}", newline=False ) def _get_options(self): """Loads the options defined by a cluster and sets the default value""" for opt in self.option_list: option = ClusterOption(name=opt[0], opt_type=opt[1].__class__, value=opt[1], cluster=self.cluster_type, description=opt[2]) self.options.append(option) def _fmt_msg(self, msg): return f'[{self.cluster_type[0]}] {msg}' def log_info(self, msg): """Used to print info messages""" self.soslog.info(self._fmt_msg(msg)) def log_error(self, msg): """Used to print error messages""" self.soslog.error(msg) def log_debug(self, msg): """Used to print debug messages""" self.soslog.debug(self._fmt_msg(msg)) def log_warn(self, msg): """Used to print warning messages""" self.soslog.warning(self._fmt_msg(msg)) def get_option(self, option): """ This is used to by clusters to check if a cluster option was supplied to sos collect :param option: The name of the option to fetch :type option: ``str`` :returns: The value of the requested option if it exists, or ``False`` """ # check CLI before defaults for opt in self.opts.cluster_options: if opt.name == option and opt.cluster in self.cluster_type: return opt.value # provide defaults otherwise for opt in self.options: if opt.name == option: return opt.value return False def add_default_ssh_key(self, key): """Some clusters generate and/or deploy well-known and consistent SSH keys across environments. If this is the case, the cluster profile may call this command so that subsequent node connections will use that key rather than prompting the user for one or a password. Note this will only function if collector is being run locally on the primary node. """ self.cluster_ssh_key = key def set_node_options(self, node): """If there is a need to set specific options on ONLY the non-primary nodes in a collection, override this method in the cluster profile and do that here. :param node: The non-primary node :type node: ``SoSNode`` """ def set_transport_type(self): """The default connection type used by sos collect is to leverage the local system's SSH installation using ControlPersist, however certain cluster types may want to use something else. Override this in a specific cluster profile to set the ``transport`` option according to what type of transport should be used. """ return 'control_persist' def set_primary_options(self, node): """If there is a need to set specific options in the sos command being run on the cluster's primary nodes, override this method in the cluster profile and do that here. :param node: The primary node :type node: ``SoSNode`` """ def check_node_is_primary(self, node): """In the event there are multiple primaries, or if the collect command is being run from a system that is technically capable of enumerating nodes but the cluster profiles needs to specify primary-specific options for other nodes, override this method in the cluster profile :param node: The node for the cluster to check :type node: ``SoSNode`` """ return node.address == self.primary.address def exec_primary_cmd(self, cmd, need_root=False, timeout=180, use_shell='auto'): """Used to retrieve command output from a (primary) node in a cluster :param cmd: The command to run :type cmd: ``str`` :param need_root: Does the command require root privileges :type need_root: ``bool`` :param timeout: Amount of time to allow cmd to run in seconds :type timeout: ``int`` :param use_shell: Does the command required execution within a shell? :type use_shell: ``auto`` or ``bool`` :returns: The output and status of `cmd` :rtype: ``dict`` """ res = self.primary.run_command(cmd, need_root=need_root, use_shell=use_shell, timeout=timeout) if res['output']: res['output'] = res['output'].replace('Password:', '') return res def setup(self): """ This MAY be used by a cluster to do prep work in case there are extra commands to be run even if a node list is given by the user, and thus get_nodes() would not be called """ def check_enabled(self): """ This may be overridden by clusters This is called by sos collect on each cluster type that exists, and is meant to return True when the cluster type matches a criteria that indicates that is the cluster type is in use. Only the first cluster type to determine a match is run :returns: ``True`` if the cluster profile should be used, or ``False`` :rtype: ``bool`` """ for pkg in self.packages: if self.primary.is_installed(pkg): return True return False def cleanup(self): """ This may be overridden by clusters Perform any necessary cleanup steps required by the cluster profile. This helps ensure that sos does make lasting changes to the environment in which we are running """ def get_nodes(self): """ This MUST be overridden by a cluster profile subclassing this class A cluster should use this method to return a list or string that contains all the nodes that a report should be collected from :returns: A list of node FQDNs or IP addresses :rtype: ``list`` or ``None`` """ raise NotImplementedError def _get_nodes(self): try: return self.format_node_list() except Exception as e: self.log_debug(f'Failed to get node list: {e}') return [] def get_node_label(self, node): """ Used by ``SosNode()`` to retrieve the appropriate label from the cluster as set by ``set_node_label()`` in the cluster profile. :param node: The name of the node to get a label for :type node: ``str`` :returns: The label to use for the node's report :rtype: ``str`` """ label = self.set_node_label(node) node.manifest.add_field('label', label) return label def set_node_label(self, node): # pylint: disable=unused-argument """This may be overridden by clusters profiles subclassing this class If there is a distinction between primaries and nodes, or types of nodes, then this can be used to label the sos report archives as needed """ return '' def format_node_list(self): """ Format the returned list of nodes from a cluster into a known format. This being a list that contains no duplicates :returns: A list of nodes, without extraneous entries from cmd output :rtype: ``list`` """ try: nodes = self.get_nodes() except Exception as err: raise Exception(f"Cluster failed to enumerate nodes: {err}") \ from err if isinstance(nodes, list): node_list = [n.strip() for n in nodes if n] elif isinstance(nodes, str): node_list = [n.split(',').strip() for n in nodes] else: raise Exception(f"Cluster returned unexpected node list: {nodes}") node_list = list(set(node_list)) for node in node_list: if node.startswith(('-', '_', '(', ')', '[', ']', '/', '\\')): node_list.remove(node) return node_list def _run_extra_cmd(self): """ Ensures that any files returned by a cluster's run_extra_cmd() method are properly typed as a list for iterative collection. If any of the files are an additional sos report (e.g. the ovirt db dump) then the md5 sum file is automatically added to the list """ files = [] try: res = self.run_extra_cmd() if res: if not isinstance(res, list): res = [res] for extra_file in res: extra_file = extra_file.strip() files.append(extra_file) if 'sosreport' in extra_file: files.append(extra_file + '.md5') except AttributeError: # run_extra_cmd() not defined for cluster profile pass return files usr/lib/python3.9/site-packages/requests/__init__.py 0000644 00000010055 15112354150 0016400 0 ustar 00 # -*- coding: utf-8 -*- # __ # /__) _ _ _ _ _/ _ # / ( (- (/ (/ (- _) / _) # / """ Requests HTTP Library ~~~~~~~~~~~~~~~~~~~~~ Requests is an HTTP library, written in Python, for human beings. Basic GET usage: >>> import requests >>> r = requests.get('https://www.python.org') >>> r.status_code 200 >>> b'Python is a programming language' in r.content True ... or POST: >>> payload = dict(key1='value1', key2='value2') >>> r = requests.post('https://httpbin.org/post', data=payload) >>> print(r.text) { ... "form": { "key1": "value1", "key2": "value2" }, ... } The other HTTP methods are supported - see `requests.api`. Full documentation is at <https://requests.readthedocs.io>. :copyright: (c) 2017 by Kenneth Reitz. :license: Apache 2.0, see LICENSE for more details. """ import urllib3 import chardet import warnings from .exceptions import RequestsDependencyWarning def check_compatibility(urllib3_version, chardet_version): urllib3_version = urllib3_version.split('.') assert urllib3_version != ['dev'] # Verify urllib3 isn't installed from git. # Sometimes, urllib3 only reports its version as 16.1. if len(urllib3_version) == 2: urllib3_version.append('0') # Check urllib3 for compatibility. major, minor, patch = urllib3_version # noqa: F811 major, minor, patch = int(major), int(minor), int(patch) # urllib3 >= 1.21.1, <= 1.26 assert major == 1 assert minor >= 21 assert minor <= 26 # Check chardet for compatibility. major, minor, patch = chardet_version.split('.')[:3] major, minor, patch = int(major), int(minor), int(patch) # chardet >= 3.0.2, < 5.0.0 assert (3, 0, 2) <= (major, minor, patch) < (5, 0, 0) def _check_cryptography(cryptography_version): # cryptography < 1.3.4 try: cryptography_version = list(map(int, cryptography_version.split('.'))) except ValueError: return if cryptography_version < [1, 3, 4]: warning = 'Old version of cryptography ({}) may cause slowdown.'.format(cryptography_version) warnings.warn(warning, RequestsDependencyWarning) # Check imported dependencies for compatibility. try: check_compatibility(urllib3.__version__, chardet.__version__) except (AssertionError, ValueError): warnings.warn("urllib3 ({}) or chardet ({}) doesn't match a supported " "version!".format(urllib3.__version__, chardet.__version__), RequestsDependencyWarning) # Attempt to enable urllib3's fallback for SNI support # if the standard library doesn't support SNI or the # 'ssl' library isn't available. try: try: import ssl except ImportError: ssl = None if not getattr(ssl, "HAS_SNI", False): from urllib3.contrib import pyopenssl pyopenssl.inject_into_urllib3() # Check cryptography version from cryptography import __version__ as cryptography_version _check_cryptography(cryptography_version) except ImportError: pass # urllib3's DependencyWarnings should be silenced. from urllib3.exceptions import DependencyWarning warnings.simplefilter('ignore', DependencyWarning) from .__version__ import __title__, __description__, __url__, __version__ from .__version__ import __build__, __author__, __author_email__, __license__ from .__version__ import __copyright__, __cake__ from . import utils from . import packages from .models import Request, Response, PreparedRequest from .api import request, get, head, post, patch, put, delete, options from .sessions import session, Session from .status_codes import codes from .exceptions import ( RequestException, Timeout, URLRequired, TooManyRedirects, HTTPError, ConnectionError, FileModeWarning, ConnectTimeout, ReadTimeout ) # Set default logging handler to avoid "No handler found" warnings. import logging from logging import NullHandler logging.getLogger(__name__).addHandler(NullHandler()) # FileModeWarnings go off per the default. warnings.simplefilter('default', FileModeWarning, append=True)
| ver. 1.4 |
Github
|
.
| PHP 8.1.33 | Генерация страницы: 0.01 |
proxy
|
phpinfo
|
Настройка