Source code for sceptre.config.reader

# -*- coding: utf-8 -*-


"""
This module implements a ConfigReader class, which is responsible for reading
configuration files and constructing Stacks.
"""

import collections
import copy
import datetime
import fnmatch
import logging
from os import environ, path, walk
from pkg_resources import iter_entry_points
from pathlib import Path
import yaml

from jinja2 import Environment
from jinja2 import StrictUndefined
from jinja2 import FileSystemLoader
from jinja2 import select_autoescape
from packaging.specifiers import SpecifierSet
from packaging.version import Version

from sceptre import __version__
from sceptre.exceptions import DependencyDoesNotExistError
from sceptre.exceptions import InvalidConfigFileError
from sceptre.exceptions import InvalidSceptreDirectoryError
from sceptre.exceptions import VersionIncompatibleError
from sceptre.exceptions import ConfigFileNotFoundError
from sceptre.helpers import sceptreise_path
from sceptre.stack import Stack
from sceptre.config import strategies

# Pair of attribute-name collections: those that must be present and those
# that may be present.
ConfigAttributes = collections.namedtuple("Attributes", "required optional")

# Maps each supported config key to the strategy used to merge a value
# inherited from a parent StackGroup with the value found in a child config.
# Each strategy is called as ``strategy(parent_value, child_value)``.
#
# NOTE(review): "template_key_value" looks like it may be intended to be
# "template_key_prefix" (the key actually read when building S3 details) --
# left unchanged here; confirm before renaming.
CONFIG_MERGE_STRATEGIES = {
    "dependencies": strategies.list_join,
    "hooks": strategies.child_wins,
    "iam_role": strategies.child_wins,
    "notifications": strategies.child_wins,
    "on_failure": strategies.child_wins,
    "parameters": strategies.child_wins,
    "profile": strategies.child_wins,
    "project_code": strategies.child_wins,
    "protect": strategies.child_wins,
    "region": strategies.child_wins,
    "required_version": strategies.child_wins,
    "role_arn": strategies.child_wins,
    "sceptre_user_data": strategies.child_wins,
    "stack_name": strategies.child_wins,
    "stack_tags": strategies.child_wins,
    "stack_timeout": strategies.child_wins,
    "template_bucket_name": strategies.child_wins,
    "template_key_value": strategies.child_wins,
    "template_path": strategies.child_wins,
    "template": strategies.child_wins
}





class ConfigReader(object):
    """
    Parses YAML configuration files and produces Stack objects.

    Responsible for loading Resolvers and Hook classes and adding them as
    constructors to the PyYAML parser.

    :param context: A SceptreContext.
    :type context: sceptre.context.SceptreContext
    """

    def __init__(self, context):
        self.logger = logging.getLogger(__name__)
        self.context = context
        self.full_config_path = self.context.full_config_path()

        # Check is valid sceptre project folder
        self._check_valid_project_path(self.full_config_path)

        # Add Resolver and Hook classes to PyYAML loader
        self._add_yaml_constructors(["sceptre.hooks", "sceptre.resolvers"])

        if not self.context.user_variables:
            self.context.user_variables = {}

        # Variables handed to Jinja2 when rendering config files.
        self.templating_vars = {"var": self.context.user_variables}

    def _add_yaml_constructors(self, entry_point_groups):
        """
        Adds PyYAML constructor functions for all classes found registered at
        the given entry point groups. Classes are registered whereby the node
        tag is the entry point name.

        The constructors are added to ``yaml.SafeLoader`` itself, so they
        apply to every subsequent ``yaml.safe_load`` call.

        :param entry_point_groups: Names of entry point groups.
        :type entry_point_groups: list
        """
        self.logger.debug(
            "Adding yaml constructors for the entry point groups {0}".format(
                entry_point_groups
            )
        )

        def constructor_factory(node_class):
            """
            Returns constructor that will initialise objects from a
            given node class.

            :param node_class: Class representing the node.
            :type node_class: class
            :returns: Class initialiser.
            :rtype: func
            """
            # This function signature is required by PyYAML
            def class_constructor(loader, node):
                return node_class(
                    loader.construct_object(self.resolve_node_tag(loader, node))
                )  # pragma: no cover

            return class_constructor

        for group in entry_point_groups:
            for entry_point in iter_entry_points(group):
                # Retrieve name and class from entry point.
                # NOTE(review): ``entry_point.name`` restores an operand that
                # was missing from the source line; it follows the documented
                # rule that the node tag is the entry point name.
                node_tag = u'!' + entry_point.name
                node_class = entry_point.load()

                # Add constructor to PyYAML loader
                yaml.SafeLoader.add_constructor(
                    node_tag, constructor_factory(node_class)
                )
                self.logger.debug(
                    "Added constructor for %s with node tag %s",
                    str(node_class), node_tag
                )

    def resolve_node_tag(self, loader, node):
        """
        Returns a shallow copy of ``node`` whose tag has been replaced by the
        tag the loader resolves for the node's type and value. The node passed
        in is left untouched.

        :param loader: The PyYAML loader processing the document.
        :param node: The YAML node carrying a custom tag.
        :returns: A copy of the node with its tag re-resolved.
        """
        node = copy.copy(node)
        node.tag = loader.resolve(type(node), node.value, (True, False))
        return node

    def construct_stacks(self):
        """
        Traverses the files under the command path.
        For each file encountered, a Stack is constructed
        using the correct config. Dependencies are traversed
        and a final set of Stacks is returned.

        :returns: A tuple ``(stacks, command_stacks)``: the set of all
            constructed Stacks (including dependencies) and the set of Stacks
            located at or under the command path.
        :rtype: tuple
        :raises: sceptre.exceptions.DependencyDoesNotExistError
        """
        stack_map = {}
        command_stacks = set()
        root = self.context.full_command_path()

        if self.context.full_scan:
            root = self.context.full_config_path()

        if path.isfile(root):
            todo = {root}
        else:
            todo = set()
            for directory_name, sub_directories, files in walk(root, followlinks=True):
                for filename in fnmatch.filter(files, '*.yaml'):
                    # StackGroup config files are not stacks themselves.
                    if filename.startswith('config.'):
                        continue

                    todo.add(path.join(directory_name, filename))

        # Cache of StackGroup configs, keyed by relative directory.
        stack_group_configs = {}
        full_todo = todo.copy()
        deps_todo = set()

        while todo:
            abs_path = todo.pop()
            rel_path = path.relpath(
                abs_path, start=self.context.full_config_path())
            directory, filename = path.split(rel_path)

            if directory in stack_group_configs:
                stack_group_config = stack_group_configs[directory]
            else:
                # NOTE(review): the start of this call was missing from the
                # source line; reconstructed as reading the directory's
                # StackGroup config file -- confirm.
                stack_group_config = stack_group_configs[directory] = \
                    self.read(path.join(directory, self.context.config_file))

            stack = self._construct_stack(rel_path, stack_group_config)
            for dep in stack.dependencies:
                full_dep = str(Path(self.context.full_config_path(), dep))
                if not path.exists(full_dep):
                    raise DependencyDoesNotExistError(
                        "{stackname}: Dependency {dep} not found. "
                        "Please make sure that your dependencies stack_outputs "
                        "have their full path from `config` defined."
                        .format(stackname=stack.name, dep=dep))

                # Queue each dependency exactly once.
                if full_dep not in full_todo and full_dep not in deps_todo:
                    todo.add(full_dep)
                    deps_todo.add(full_dep)

            stack_map[sceptreise_path(rel_path)] = stack

            full_command_path = self.context.full_command_path()
            if abs_path == full_command_path\
                    or abs_path.startswith(full_command_path.rstrip(path.sep) + path.sep):
                command_stacks.add(stack)

        stacks = self.resolve_stacks(stack_map)

        return stacks, command_stacks

    def resolve_stacks(self, stack_map):
        """
        Transforms map of Stacks into a set of Stacks, transforms dependencies
        from a list of Strings (stack names) to a list of Stacks.

        When the context has ``ignore_dependencies`` set, every Stack's
        dependencies are replaced with an empty list instead.

        :param stack_map: Map of stacks, containing dependencies as list of Strings.
        :type stack_map: dict
        :returns: Set of stacks, containing dependencies as list of Stacks.
        :rtype: set
        :raises: sceptre.exceptions.DependencyDoesNotExistError
        """
        stacks = set()
        for stack in stack_map.values():
            if not self.context.ignore_dependencies:
                for i, dep in enumerate(stack.dependencies):
                    try:
                        if not isinstance(dep, Stack):
                            # If the dependency was inherited from a stack group, it might already
                            # have been mapped and so doesn't need to be mapped again.
                            stack.dependencies[i] = stack_map[sceptreise_path(dep)]
                    except KeyError:
                        raise DependencyDoesNotExistError(
                            "{stackname}: Dependency {dep} not found. "
                            "Valid dependency names are: "
                            "{stackkeys}. "
                            "Please make sure that your dependencies stack_outputs "
                            "have their full path from `config` defined."
                            .format(stackname=stack.name, dep=dep,
                                    stackkeys=", ".join(stack_map.keys())))
            else:
                stack.dependencies = []
            stacks.add(stack)
        return stacks

    def read(self, rel_path, base_config=None):
        """
        Reads in configuration from one or more YAML files
        within the Sceptre project folder.

        :param rel_path: Relative path to config to read.
        :type rel_path: str
        :param base_config: Base config to provide defaults.
        :type base_config: dict
        :returns: Config read from config files.
        :rtype: dict
        :raises: sceptre.exceptions.ConfigFileNotFoundError
        :raises: sceptre.exceptions.VersionIncompatibleError
        """
        self.logger.debug("Reading in '%s' files...", rel_path)
        directory_path, filename = path.split(rel_path)
        abs_path = path.join(self.full_config_path, rel_path)

        # Adding properties from class
        config = {
            "project_path": self.context.project_path,
            "stack_group_path": directory_path
        }

        # Adding defaults from base config.
        if base_config:
            config.update(base_config)

        # Check if file exists, but ignore config.yaml as can be inherited.
        if not path.isfile(abs_path)\
                and not filename.endswith(self.context.config_file):
            raise ConfigFileNotFoundError(
                "Config file \"{0}\" not found.".format(rel_path)
            )

        # Parse and read in the config files.
        this_config = self._recursive_read(directory_path, filename, config)

        # Dependencies are merged rather than overwritten by the update below.
        if "dependencies" in config or "dependencies" in this_config:
            this_config['dependencies'] = \
                CONFIG_MERGE_STRATEGIES['dependencies'](
                    this_config.get("dependencies"),
                    config.get("dependencies")
                )
        config.update(this_config)

        self._check_version(config)

        self.logger.debug("Config: %s", config)
        return config

    def _recursive_read(self, directory_path, filename, stack_group_config):
        """
        Traverses the directory_path, from top to bottom, reading in all
        relevant config files. If config attributes are encountered further
        down the StackGroup they are merged with the parent as defined in the
        `CONFIG_MERGE_STRATEGIES` dict.

        :param directory_path: Relative directory path to config to read.
        :type directory_path: str
        :param filename: File name for the config to read.
        :type filename: str
        :param stack_group_config: The loaded config file for the StackGroup
        :type stack_group_config: dict
        :returns: Representation of inherited config.
        :rtype: dict
        """
        parent_directory = path.split(directory_path)[0]

        # Base condition for recursion
        config = {}

        if directory_path:
            config = self._recursive_read(parent_directory, filename, stack_group_config)

        # Combine the stack_group_config with the nested config dict
        config_group = stack_group_config.copy()
        config_group.update(config)

        # Read config file and overwrite inherited properties
        child_config = self._render(directory_path, filename, config_group) or {}

        for config_key, strategy in CONFIG_MERGE_STRATEGIES.items():
            value = strategy(
                config.get(config_key), child_config.get(config_key)
            )

            # Only truthy merge results are written back to the child config.
            if value:
                child_config[config_key] = value

        config.update(child_config)

        return config

    def _render(self, directory_path, basename, stack_group_config):
        """
        Reads a configuration file, loads the config file as a template
        and returns config loaded from the file.

        Returns an empty dict when the file does not exist. Note that
        ``self.templating_vars`` is updated in place with
        ``stack_group_config`` before rendering.

        :param directory_path: Relative directory path to config to read.
        :type directory_path: str
        :param basename: The filename of the config file
        :type basename: str
        :param stack_group_config: The loaded config file for the StackGroup
        :type stack_group_config: dict
        :returns: rendered template of config file.
        :rtype: dict
        :raises: ValueError if the rendered template cannot be parsed as YAML.
        """
        config = {}
        abs_directory_path = path.join(self.full_config_path, directory_path)
        if path.isfile(path.join(abs_directory_path, basename)):
            default_j2_environment_config = {
                "autoescape": select_autoescape(
                    disabled_extensions=('yaml',),
                    default=True,
                ),
                "loader": FileSystemLoader(abs_directory_path),
                "undefined": StrictUndefined
            }
            # User-supplied "j2_environment" settings are merged with the defaults.
            j2_environment_config = strategies.dict_merge(
                default_j2_environment_config,
                stack_group_config.get("j2_environment", {}))
            j2_environment = Environment(**j2_environment_config)
            template = j2_environment.get_template(basename)
            self.templating_vars.update(stack_group_config)
            rendered_template = template.render(
                self.templating_vars,
                command_path=self.context.command_path.split(path.sep),
                environment_variable=environ
            )

            try:
                config = yaml.safe_load(rendered_template)
            except Exception as err:
                raise ValueError(
                    "Error parsing {}:\n{}".format(abs_directory_path, err)
                )

        return config

    @staticmethod
    def _check_valid_project_path(config_path):
        """
        Raises an InvalidSceptreDirectoryError if ``config_path`` is not a
        directory.

        :param config_path: A config directory path.
        :type config_path: str
        :raises: sceptre.exceptions.InvalidSceptreDirectoryError
        """
        if not path.isdir(config_path):
            raise InvalidSceptreDirectoryError(
                "Check '{0}' exists.".format(config_path)
            )

    def _check_version(self, config):
        """
        Raises a VersionIncompatibleError when the current Sceptre
        version does not comply with the configured version requirement.

        Does nothing when ``config`` has no ``required_version`` key.

        :param config: Config that may contain ``required_version``.
        :type config: dict
        :raises: sceptre.exceptions.VersionIncompatibleError
        """
        sceptre_version = __version__
        if 'required_version' in config:
            required_version = config['required_version']
            # Second positional argument is SpecifierSet's ``prereleases`` flag.
            if Version(sceptre_version) not in SpecifierSet(required_version, True):
                raise VersionIncompatibleError(
                    "Current sceptre version ({0}) does not meet version "
                    "requirements: {1}".format(
                        sceptre_version, required_version
                    )
                )

    @staticmethod
    def _collect_s3_details(stack_name, config):
        """
        Collects and constructs details for where to store the Template in S3.

        :param stack_name: Stack name.
        :type stack_name: str
        :param config: Config with details.
        :type config: dict
        :returns: S3 details, or None when ``template_bucket_name`` is not
            configured.
        :rtype: dict
        """
        s3_details = None
        if "template_bucket_name" in config:
            # Key is "<stack name>/<UTC timestamp>.json".
            template_key = "/".join([
                sceptreise_path(stack_name), "{time_stamp}.json".format(
                    time_stamp=datetime.datetime.utcnow().strftime(
                        "%Y-%m-%d-%H-%M-%S-%fZ"
                    )
                )
            ])

            if "template_key_prefix" in config:
                prefix = config["template_key_prefix"]
                template_key = "/".join([prefix.strip("/"), template_key])

            s3_details = {
                "bucket_name": config["template_bucket_name"],
                "bucket_key": template_key
            }
        return s3_details

    def _construct_stack(self, rel_path, stack_group_config=None):
        """
        Constructs an individual Stack object from a config path and a
        base config.

        :param rel_path: A relative config file path.
        :type rel_path: str
        :param stack_group_config: The Stack group config to use as defaults.
        :type stack_group_config: dict
        :returns: Stack object
        :rtype: sceptre.stack.Stack
        :raises: sceptre.exceptions.InvalidConfigFileError
        """
        # Expose the StackGroup config to templates while this stack's config
        # file is rendered; removed again once the Stack has been built.
        self.templating_vars["stack_group_config"] = stack_group_config
        parsed_stack_group_config = self._parsed_stack_group_config(stack_group_config)
        # NOTE(review): the start of this call was missing from the source
        # line; reconstructed from the signature of ``read`` -- confirm.
        config = self.read(rel_path, stack_group_config)
        stack_name = path.splitext(rel_path)[0]

        # Check for missing mandatory attributes.
        # NOTE(review): REQUIRED_KEYS is expected to be a module-level
        # constant; its definition is not visible in this part of the file.
        for required_key in REQUIRED_KEYS:
            if required_key not in config:
                raise InvalidConfigFileError(
                    "Required attribute '{0}' not found in configuration of '{1}'.".format(
                        required_key, stack_name
                    )
                )

        s3_details = self._collect_s3_details(
            stack_name, config
        )
        stack = Stack(
            name=stack_name,
            project_code=config["project_code"],
            template_path=config.get("template_path"),
            template_handler_config=config.get("template"),
            region=config["region"],
            template_bucket_name=config.get("template_bucket_name"),
            template_key_prefix=config.get("template_key_prefix"),
            required_version=config.get("required_version"),
            iam_role=config.get("iam_role"),
            profile=config.get("profile"),
            parameters=config.get("parameters", {}),
            sceptre_user_data=config.get("sceptre_user_data", {}),
            hooks=config.get("hooks", {}),
            s3_details=s3_details,
            dependencies=config.get("dependencies", []),
            role_arn=config.get("role_arn"),
            protected=config.get("protect", False),
            tags=config.get("stack_tags", {}),
            external_name=config.get("stack_name"),
            notifications=config.get("notifications"),
            on_failure=config.get("on_failure"),
            stack_timeout=config.get("stack_timeout", 0),
            stack_group_config=parsed_stack_group_config
        )

        del self.templating_vars["stack_group_config"]
        return stack

    def _parsed_stack_group_config(self, stack_group_config):
        """
        Remove all config items that are supported by Sceptre and
        remove the `project_path` and `stack_group_path` added by `read()`.
        Return a dictionary that has only user-specified config items.

        :param stack_group_config: The StackGroup config; ``None`` is treated
            as an empty config.
        :type stack_group_config: dict
        :returns: The user-specified items of the StackGroup config.
        :rtype: dict
        """
        # ``_construct_stack`` defaults this argument to None.
        stack_group_config = stack_group_config or {}
        parsed_config = {
            key: stack_group_config[key]
            for key in
            set(stack_group_config) - set(CONFIG_MERGE_STRATEGIES)
        }
        # Both keys are injected by ``read()``; tolerate their absence so a
        # config that did not come from ``read()`` does not raise KeyError.
        parsed_config.pop("project_path", None)
        parsed_config.pop("stack_group_path", None)
        return parsed_config