Source code for sceptre.plan.actions

# -*- coding: utf-8 -*-

"""
sceptre.plan.actions

This module implements the StackActions class which provides the functionality
available to a Stack.
"""

import json
import logging
import time
import typing
import urllib
import botocore

from datetime import datetime, timedelta
from dateutil.tz import tzutc
from os import path

from sceptre.connection_manager import ConnectionManager

from sceptre.exceptions import (
    CannotUpdateFailedStackError,
    ProtectedStackError,
    StackDoesNotExistError,
    UnknownStackChangeSetStatusError,
    UnknownStackStatusError,
)
from sceptre.helpers import extract_datetime_from_aws_response_headers
from sceptre.hooks import add_stack_hooks, add_stack_hooks_with_aliases
from sceptre.stack import Stack
from sceptre.stack_status import StackChangeSetStatus, StackStatus

from typing import Dict, Optional, Tuple, Union

if typing.TYPE_CHECKING:
    from sceptre.diffing.stack_differ import StackDiff, StackDiffer


[docs]class StackActions: """ StackActions stores the operations a Stack can take, such as creating or deleting the Stack. :param stack: A Stack object :type stack: sceptre.stack.Stack """ def __init__(self, stack: Stack): self.stack = stack self.name = self.stack.name self.logger = logging.getLogger(__name__) self.connection_manager = ConnectionManager( self.stack.region, self.stack.profile, self.stack.external_name, self.stack.sceptre_role, self.stack.sceptre_role_session_duration, )
[docs] @add_stack_hooks def create(self): """ Creates a Stack. :returns: The Stack's status. :rtype: sceptre.stack_status.StackStatus """ self._protect_execution() self.logger.info("%s - Creating Stack", self.stack.name) create_stack_kwargs = { "StackName": self.stack.external_name, "Parameters": self._format_parameters(self.stack.parameters), "Capabilities": [ "CAPABILITY_IAM", "CAPABILITY_NAMED_IAM", "CAPABILITY_AUTO_EXPAND", ], "NotificationARNs": self.stack.notifications, "Tags": [ {"Key": str(k), "Value": str(v)} for k, v in self.stack.tags.items() ], } # can specify either DisableRollback or OnFailure , but not both if self.stack.disable_rollback: create_stack_kwargs.update({"DisableRollback": self.stack.disable_rollback}) elif self.stack.on_failure: create_stack_kwargs.update({"OnFailure": self.stack.on_failure}) create_stack_kwargs.update(self.stack.template.get_boto_call_parameter()) create_stack_kwargs.update(self._get_role_arn()) create_stack_kwargs.update(self._get_stack_timeout()) try: response = self.connection_manager.call( service="cloudformation", command="create_stack", kwargs=create_stack_kwargs, ) self.logger.debug( "%s - Create stack response: %s", self.stack.name, response ) status = self._wait_for_completion(boto_response=response) except botocore.exceptions.ClientError as exp: if exp.response["Error"]["Code"] == "AlreadyExistsException": self.logger.info("%s - Stack already exists", self.stack.name) status = StackStatus.COMPLETE else: raise return status
[docs] @add_stack_hooks def update(self): """ Updates the Stack. :returns: The Stack's status. :rtype: sceptre.stack_status.StackStatus """ self._protect_execution() self.logger.info("%s - Updating Stack", self.stack.name) try: update_stack_kwargs = { "StackName": self.stack.external_name, "Parameters": self._format_parameters(self.stack.parameters), "Capabilities": [ "CAPABILITY_IAM", "CAPABILITY_NAMED_IAM", "CAPABILITY_AUTO_EXPAND", ], "NotificationARNs": self.stack.notifications, "Tags": [ {"Key": str(k), "Value": str(v)} for k, v in self.stack.tags.items() ], } update_stack_kwargs.update(self.stack.template.get_boto_call_parameter()) update_stack_kwargs.update(self._get_role_arn()) response = self.connection_manager.call( service="cloudformation", command="update_stack", kwargs=update_stack_kwargs, ) status = self._wait_for_completion( self.stack.stack_timeout, boto_response=response ) self.logger.debug( "%s - Update Stack response: %s", self.stack.name, response ) # Cancel update after timeout if status == StackStatus.IN_PROGRESS: status = self.cancel_stack_update() return status except botocore.exceptions.ClientError as exp: error_message = exp.response["Error"]["Message"] if error_message == "No updates are to be performed.": self.logger.info("%s - No updates to perform.", self.stack.name) return StackStatus.COMPLETE else: raise
[docs] def cancel_stack_update(self): """ Cancels a Stack update. :returns: The cancelled Stack status. :rtype: sceptre.stack_status.StackStatus """ self.logger.warning( "%s - Update Stack time exceeded the specified timeout", self.stack.name ) response = self.connection_manager.call( service="cloudformation", command="cancel_update_stack", kwargs={"StackName": self.stack.external_name}, ) self.logger.debug( "%s - Cancel update Stack response: %s", self.stack.name, response ) return self._wait_for_completion(boto_response=response)
[docs] @add_stack_hooks def launch(self) -> StackStatus: """ Launches the Stack. If the Stack status is create_failed or rollback_complete, the Stack is deleted. Launch then tries to create or update the Stack, depending if it already exists. If there are no updates to be performed, launch exits gracefully. :returns: The Stack's status. """ self._protect_execution() self.logger.info(f"{self.stack.name} - Launching Stack") try: existing_status = self._get_status() except StackDoesNotExistError: existing_status = "PENDING" self.logger.info( "%s - Stack is in the %s state", self.stack.name, existing_status ) if existing_status == "PENDING": status = self.create() elif existing_status in ["CREATE_FAILED", "ROLLBACK_COMPLETE"]: self.delete() status = self.create() elif existing_status.endswith("COMPLETE"): status = self.update() elif existing_status.endswith("IN_PROGRESS"): self.logger.info( "%s - Stack action is already in progress state and cannot " "be updated", self.stack.name, ) status = StackStatus.IN_PROGRESS elif existing_status.endswith("FAILED"): raise CannotUpdateFailedStackError( "'{0}' is in a the state '{1}' and cannot be updated".format( self.stack.name, existing_status ) ) else: raise UnknownStackStatusError("{0} is unknown".format(existing_status)) return status
[docs] @add_stack_hooks def delete(self): """ Deletes the Stack. :returns: The Stack's status. :rtype: sceptre.stack_status.StackStatus """ self._protect_execution() self.logger.info("%s - Deleting stack", self.stack.name) try: status = self._get_status() except StackDoesNotExistError: self.logger.info("%s - Does not exist.", self.stack.name) status = StackStatus.COMPLETE return status delete_stack_kwargs = {"StackName": self.stack.external_name} delete_stack_kwargs.update(self._get_role_arn()) response = self.connection_manager.call( service="cloudformation", command="delete_stack", kwargs=delete_stack_kwargs ) try: status = self._wait_for_completion(boto_response=response) except StackDoesNotExistError: status = StackStatus.COMPLETE except botocore.exceptions.ClientError as error: if error.response["Error"]["Message"].endswith("does not exist"): status = StackStatus.COMPLETE else: raise self.logger.info("%s - delete %s", self.stack.name, status) return status
[docs] def lock(self): """ Locks the Stack by applying a deny-all updates Stack Policy. """ policy_path = path.join( # need to get to the base install path. __file__ will take us into # sceptre/actions so need to walk up the path. path.abspath(path.join(__file__, "..", "..")), "stack_policies/lock.json", ) self.set_policy(policy_path) self.logger.info("%s - Successfully locked Stack", self.stack.name)
[docs] def unlock(self): """ Unlocks the Stack by applying an allow-all updates Stack Policy. """ policy_path = path.join( # need to get to the base install path. __file__ will take us into # sceptre/actions so need to walk up the path. path.abspath(path.join(__file__, "..", "..")), "stack_policies/unlock.json", ) self.set_policy(policy_path) self.logger.info("%s - Successfully unlocked Stack", self.stack.name)
[docs] def describe(self): """ Returns the a description of the Stack. :returns: A Stack description. :rtype: dict """ try: return self.connection_manager.call( service="cloudformation", command="describe_stacks", kwargs={"StackName": self.stack.external_name}, ) except botocore.exceptions.ClientError as e: if e.response["Error"]["Message"].endswith("does not exist"): return raise
[docs] def describe_events(self): """ Returns the CloudFormation events for a Stack. :returns: CloudFormation events for a Stack. :rtype: dict """ return self.connection_manager.call( service="cloudformation", command="describe_stack_events", kwargs={"StackName": self.stack.external_name}, )
[docs] def describe_resources(self): """ Returns the logical and physical resource IDs of the Stack's resources. :returns: Information about the Stack's resources. :rtype: dict """ self.logger.debug("%s - Describing stack resources", self.stack.name) try: response = self.connection_manager.call( service="cloudformation", command="describe_stack_resources", kwargs={"StackName": self.stack.external_name}, ) except botocore.exceptions.ClientError as e: if e.response["Error"]["Message"].endswith("does not exist"): return {self.stack.name: []} raise self.logger.debug( "%s - Describe Stack resource response: %s", self.stack.name, response ) desired_properties = ["LogicalResourceId", "PhysicalResourceId"] formatted_response = { self.stack.name: [ {k: v for k, v in item.items() if k in desired_properties} for item in response["StackResources"] ] } return formatted_response
[docs] def describe_outputs(self): """ Returns the Stack's outputs. :returns: The Stack's outputs. :rtype: list """ self.logger.debug("%s - Describing stack outputs", self.stack.name) try: response = self._describe() except botocore.exceptions.ClientError: return [] return {self.stack.name: response["Stacks"][0].get("Outputs", [])}
[docs] def continue_update_rollback(self): """ Rolls back a Stack in the UPDATE_ROLLBACK_FAILED state to UPDATE_ROLLBACK_COMPLETE. """ self.logger.debug("%s - Continuing update rollback", self.stack.name) continue_update_rollback_kwargs = {"StackName": self.stack.external_name} continue_update_rollback_kwargs.update(self._get_role_arn()) self.connection_manager.call( service="cloudformation", command="continue_update_rollback", kwargs=continue_update_rollback_kwargs, ) self.logger.info( "%s - Successfully initiated continuation of update rollback", self.stack.name, )
[docs] def set_policy(self, policy_path): """ Applies a Stack Policy. :param policy_path: The relative path of JSON file containing\ the AWS Policy to apply. :type policy_path: str """ with open(policy_path) as f: policy = f.read() self.logger.debug("%s - Setting Stack policy: \n%s", self.stack.name, policy) self.connection_manager.call( service="cloudformation", command="set_stack_policy", kwargs={"StackName": self.stack.external_name, "StackPolicyBody": policy}, ) self.logger.info("%s - Successfully set Stack Policy", self.stack.name)
[docs] def get_policy(self): """ Returns a Stack's Policy. :returns: The Stack's Stack Policy. :rtype: str """ self.logger.debug("%s - Getting Stack Policy", self.stack.name) response = self.connection_manager.call( service="cloudformation", command="get_stack_policy", kwargs={"StackName": self.stack.external_name}, ) json_formatting = json.loads( response.get("StackPolicyBody", json.dumps("No Policy Information")) ) return {self.stack.name: json_formatting}
[docs] @add_stack_hooks def create_change_set(self, change_set_name): """ Creates a Change Set with the name ``change_set_name``. :param change_set_name: The name of the Change Set. :type change_set_name: str """ create_change_set_kwargs = { "StackName": self.stack.external_name, "Parameters": self._format_parameters(self.stack.parameters), "Capabilities": [ "CAPABILITY_IAM", "CAPABILITY_NAMED_IAM", "CAPABILITY_AUTO_EXPAND", ], "ChangeSetName": change_set_name, "NotificationARNs": self.stack.notifications, "Tags": [ {"Key": str(k), "Value": str(v)} for k, v in self.stack.tags.items() ], } create_change_set_kwargs.update(self.stack.template.get_boto_call_parameter()) create_change_set_kwargs.update(self._get_role_arn()) self.logger.debug( "%s - Creating Change Set '%s'", self.stack.name, change_set_name ) self.connection_manager.call( service="cloudformation", command="create_change_set", kwargs=create_change_set_kwargs, ) # After the call successfully completes, AWS CloudFormation # starts creating the Change Set. self.logger.info( "%s - Successfully initiated creation of Change Set '%s'", self.stack.name, change_set_name, )
[docs] def delete_change_set(self, change_set_name): """ Deletes the Change Set ``change_set_name``. :param change_set_name: The name of the Change Set. :type change_set_name: str """ self.logger.debug( "%s - Deleting Change Set '%s'", self.stack.name, change_set_name ) self.connection_manager.call( service="cloudformation", command="delete_change_set", kwargs={ "ChangeSetName": change_set_name, "StackName": self.stack.external_name, }, ) # If the call successfully completes, AWS CloudFormation # successfully deleted the Change Set. self.logger.info( "%s - Successfully deleted Change Set '%s'", self.stack.name, change_set_name, )
[docs] def describe_change_set(self, change_set_name): """ Describes the Change Set ``change_set_name``. :param change_set_name: The name of the Change Set. :type change_set_name: str :returns: The description of the Change Set. :rtype: dict """ self.logger.debug( "%s - Describing Change Set '%s'", self.stack.name, change_set_name ) return self.connection_manager.call( service="cloudformation", command="describe_change_set", kwargs={ "ChangeSetName": change_set_name, "StackName": self.stack.external_name, }, )
[docs] def execute_change_set(self, change_set_name): """ Executes the Change Set ``change_set_name``. :param change_set_name: The name of the Change Set. :type change_set_name: str :returns: The Stack status :rtype: str """ self._protect_execution() change_set = self.describe_change_set(change_set_name) status = change_set.get("Status") reason = change_set.get("StatusReason") if status == "FAILED" and self.change_set_creation_failed_due_to_no_changes( reason ): self.logger.info( "Skipping ChangeSet on Stack: {} - there are no changes".format( change_set.get("StackName") ) ) return 0 self.logger.debug( "%s - Executing Change Set '%s'", self.stack.name, change_set_name ) response = self.connection_manager.call( service="cloudformation", command="execute_change_set", kwargs={ "ChangeSetName": change_set_name, "StackName": self.stack.external_name, }, ) status = self._wait_for_completion(boto_response=response) return status
[docs] def change_set_creation_failed_due_to_no_changes(self, reason: str) -> bool: """Indicates the change set failed when it was created because there were actually no changes introduced from the change set. :param reason: The reason reported by CloudFormation for the Change Set failure """ reason = reason.lower() no_change_substrings = ( "submitted information didn't contain changes", "no updates are to be performed", # The reason returned for SAM templates ) for substring in no_change_substrings: if substring in reason: return True return False
[docs] def list_change_sets(self, url=False): """ Lists the Stack's Change Sets. :param url: Write out a console URL instead. :type url: bool :returns: The Stack's Change Sets. :rtype: dict or list """ response = self._list_change_sets() summaries = response.get("Summaries", []) if url: summaries = self._convert_to_url(summaries) return {self.stack.name: summaries}
def _list_change_sets(self): self.logger.debug("%s - Listing change sets", self.stack.name) try: return self.connection_manager.call( service="cloudformation", command="list_change_sets", kwargs={"StackName": self.stack.external_name}, ) except botocore.exceptions.ClientError: return [] def _convert_to_url(self, summaries): """ Convert the list_change_sets response from CloudFormation to a URL in the AWS Console. """ new_summaries = [] for summary in summaries: stack_id = summary["StackId"] change_set_id = summary["ChangeSetId"] region = self.stack.region encoded = urllib.parse.urlencode( {"stackId": stack_id, "changeSetId": change_set_id} ) new_summaries.append( f"https://{region}.console.aws.amazon.com/cloudformation/home?" f"region={region}#/stacks/changesets/changes?{encoded}" ) return new_summaries
[docs] def generate(self): """ Returns the Template for the Stack. An alias for dump_template for historical reasons. """ return self.dump_template()
[docs] @add_stack_hooks def validate(self): """ Validates the Stack's CloudFormation Template. Raises an error if the Template is invalid. :returns: Validation information about the Template. :rtype: dict :raises: botocore.exceptions.ClientError """ self.logger.debug("%s - Validating Template", self.stack.name) response = self.connection_manager.call( service="cloudformation", command="validate_template", kwargs=self.stack.template.get_boto_call_parameter(), ) self.logger.debug( "%s - Validate Template response: %s", self.stack.name, response ) return response
[docs] def estimate_cost(self): """ Estimates a Stack's cost. :returns: An estimate of the Stack's cost. :rtype: dict :raises: botocore.exceptions.ClientError """ self.logger.debug("%s - Estimating template cost", self.stack.name) parameters = [ {"ParameterKey": key, "ParameterValue": value} for key, value in self.stack.parameters.items() ] kwargs = self.stack.template.get_boto_call_parameter() kwargs.update({"Parameters": parameters}) response = self.connection_manager.call( service="cloudformation", command="estimate_template_cost", kwargs=kwargs ) self.logger.debug( "%s - Estimate Stack cost response: %s", self.stack.name, response ) return response
[docs] def get_status(self): """ Returns the Stack's status. :returns: The Stack's status. :rtype: sceptre.stack_status.StackStatus """ try: return self._get_status() except StackDoesNotExistError: return "PENDING"
def _format_parameters(self, parameters): """ Converts CloudFormation parameters to the format used by Boto3. :param parameters: A dictionary of parameters. :type parameters: dict :returns: A list of the formatted parameters. :rtype: list """ formatted_parameters = [] for name, value in parameters.items(): if value is None: continue if isinstance(value, list): value = ",".join(value) formatted_parameters.append({"ParameterKey": name, "ParameterValue": value}) return formatted_parameters def _get_role_arn(self): """ Returns the Role ARN assumed by CloudFormation when building a Stack. Returns an empty dict if no Role is to be assumed. :returns: The a Role ARN :rtype: dict """ if self.stack.cloudformation_service_role: return {"RoleARN": self.stack.cloudformation_service_role} else: return {} def _get_stack_timeout(self): """ Return the timeout before considering the Stack to be failing. Returns an empty dict if no timeout is set. :returns: the creation/update timeout :rtype: dict """ if self.stack.stack_timeout: return {"TimeoutInMinutes": self.stack.stack_timeout} else: return {} def _protect_execution(self): """ Raises a ProtectedStackError if protect == True. :raises: sceptre.exceptions.ProtectedStackError """ if self.stack.protected: raise ProtectedStackError( "Cannot perform action on '{0}': Stack protection is " "currently enabled".format(self.stack.name) ) def _wait_for_completion( self, timeout=0, boto_response: Optional[dict] = None ) -> StackStatus: """ Waits for a Stack operation to finish. Prints CloudFormation events while it waits. :param timeout: Timeout before returning, in minutes. :param boto_response: Response from the boto call which initiated the stack change. :returns: The final Stack status. """ timeout = 60 * timeout def timed_out(elapsed): return elapsed >= timeout if timeout else False status = StackStatus.IN_PROGRESS most_recent_event_datetime = extract_datetime_from_aws_response_headers( boto_response ) or (datetime.now(tzutc()) - timedelta(seconds=3)) elapsed = 0 while status == StackStatus.IN_PROGRESS and not timed_out(elapsed): status = self._get_simplified_status(self._get_status()) most_recent_event_datetime = self._log_new_events( most_recent_event_datetime ) time.sleep(4) elapsed += 4 return status def _describe(self): return self.connection_manager.call( service="cloudformation", command="describe_stacks", kwargs={"StackName": self.stack.external_name}, ) def _get_status(self): try: status = self._describe()["Stacks"][0]["StackStatus"] except botocore.exceptions.ClientError as exp: if exp.response["Error"]["Message"].endswith("does not exist"): raise StackDoesNotExistError(exp.response["Error"]["Message"]) else: raise exp return status @staticmethod def _get_simplified_status(status): """ Returns the simplified Stack Status. The simplified Stack status is represented by the struct ``sceptre.StackStatus()`` and can take one of the following options: * complete * in_progress * failed :param status: The CloudFormation Stack status to simplify. :type status: str :returns: The Stack's simplified status :rtype: sceptre.stack_status.StackStatus """ if status.endswith("ROLLBACK_COMPLETE"): return StackStatus.FAILED elif status.endswith("_COMPLETE"): return StackStatus.COMPLETE elif status.endswith("_IN_PROGRESS"): return StackStatus.IN_PROGRESS elif status.endswith("_FAILED"): return StackStatus.FAILED else: raise UnknownStackStatusError("{0} is unknown".format(status)) def _log_new_events(self, after_datetime: datetime) -> datetime: """ Log the latest Stack events while the Stack is being built. :param after_datetime: Only events after this datetime will be logged. :returns: The datetime of the last logged event or after_datetime if no events were logged. """ events = self.describe_events()["StackEvents"] events.reverse() new_events = [event for event in events if event["Timestamp"] > after_datetime] for event in new_events: stack_event_status = [ self.stack.name, event["LogicalResourceId"], event["ResourceType"], event["ResourceStatus"], event.get("ResourceStatusReason", ""), ] if "HookStatus" in event: stack_event_status.extend( [ event["HookType"], event["HookStatus"], event.get("HookStatusReason", ""), event["HookFailureMode"], ] ) self.logger.info(" ".join(stack_event_status)) after_datetime = event["Timestamp"] return after_datetime
[docs] def wait_for_cs_completion(self, change_set_name): """ Waits while the Stack Change Set status is "pending". :param change_set_name: The name of the Change Set. :type change_set_name: str :returns: The Change Set's status. :rtype: sceptre.stack_status.StackChangeSetStatus """ while True: status = self._get_cs_status(change_set_name) if status != StackChangeSetStatus.PENDING: break time.sleep(2) return status
def _get_cs_status(self, change_set_name): """ Returns the status of a Change Set. :param change_set_name: The name of the Change Set. :type change_set_name: str :returns: The Change Set's status. :rtype: sceptre.stack_status.StackChangeSetStatus """ cs_description = self.describe_change_set(change_set_name) cs_status = cs_description["Status"] cs_exec_status = cs_description["ExecutionStatus"] possible_statuses = [ "CREATE_PENDING", "CREATE_IN_PROGRESS", "CREATE_COMPLETE", "DELETE_COMPLETE", "FAILED", ] possible_execution_statuses = [ "UNAVAILABLE", "AVAILABLE", "EXECUTE_IN_PROGRESS", "EXECUTE_COMPLETE", "EXECUTE_FAILED", "OBSOLETE", ] if cs_status not in possible_statuses: raise UnknownStackChangeSetStatusError( "Status {0} is unknown".format(cs_status) ) if cs_exec_status not in possible_execution_statuses: raise UnknownStackChangeSetStatusError( "ExecutionStatus {0} is unknown".format(cs_status) ) if cs_status == "CREATE_COMPLETE" and cs_exec_status == "AVAILABLE": return StackChangeSetStatus.READY elif cs_status in [ "CREATE_PENDING", "CREATE_IN_PROGRESS", "CREATE_COMPLETE", ] and cs_exec_status in ["UNAVAILABLE", "AVAILABLE"]: return StackChangeSetStatus.PENDING elif cs_status in ["DELETE_COMPLETE", "FAILED"] or cs_exec_status in [ "EXECUTE_IN_PROGRESS", "EXECUTE_COMPLETE", "EXECUTE_FAILED", "OBSOLETE", ]: return StackChangeSetStatus.DEFUNCT else: # pragma: no cover raise Exception("This else should not be reachable.")
[docs] def fetch_remote_template(self) -> Optional[str]: """ Returns the Template for the remote Stack :returns: the template body. """ self.logger.debug(f"{self.stack.name} - Fetching remote template") original_template = self._fetch_original_template_stage() if isinstance(original_template, dict): # While not documented behavior, boto3 will attempt to deserialize the TemplateBody # with json.loads and return the template as a dict if it is successful; otherwise (such # as in when the template is in yaml, it will return the string. Therefore, we need to # dump the template to json if we get a dict. original_template = json.dumps(original_template, indent=4) return original_template
def _fetch_original_template_stage(self) -> Optional[Union[str, dict]]: try: response = self.connection_manager.call( service="cloudformation", command="get_template", kwargs={ "StackName": self.stack.external_name, "TemplateStage": "Original", }, ) return response["TemplateBody"] # Sometimes boto returns a string, sometimes a dictionary except botocore.exceptions.ClientError as e: # AWS returns a ValidationError if the stack doesn't exist if e.response["Error"]["Code"] == "ValidationError": return None raise
[docs] def fetch_remote_template_summary(self): return self._get_template_summary(StackName=self.stack.external_name)
[docs] def fetch_local_template_summary(self): boto_call_parameter = self.stack.template.get_boto_call_parameter() return self._get_template_summary(**boto_call_parameter)
def _get_template_summary(self, **kwargs) -> Optional[dict]: try: template_summary = self.connection_manager.call( service="cloudformation", command="get_template_summary", kwargs=kwargs ) return template_summary except botocore.exceptions.ClientError as e: error_response = e.response["Error"] if ( error_response["Code"] == "ValidationError" and "does not exist" in error_response["Message"] ): return None raise
[docs] @add_stack_hooks def diff(self, stack_differ: "StackDiffer") -> "StackDiff": """ Returns a diff of local and deployed template and stack configuration using a specific diff library. :param stack_differ: The differ to use :returns: A StackDiff object with the full, computed diff """ return stack_differ.diff(self)
[docs] @add_stack_hooks def drift_detect(self) -> Dict[str, str]: """ Show stack drift for a running stack. :returns: The stack drift detection status. If the stack does not exist, we return a detection and stack drift status of STACK_DOES_NOT_EXIST. If drift detection times out after 5 minutes, we return TIMED_OUT. """ try: self._get_status() except StackDoesNotExistError: self.logger.info(f"{self.stack.name} - Does not exist.") return { "DetectionStatus": "STACK_DOES_NOT_EXIST", "StackDriftStatus": "STACK_DOES_NOT_EXIST", } response = self._detect_stack_drift() detection_id = response["StackDriftDetectionId"] try: response = self._wait_for_drift_status(detection_id) except TimeoutError as exc: self.logger.info(f"{self.stack.name} - {exc}") response = {"DetectionStatus": "TIMED_OUT", "StackDriftStatus": "TIMED_OUT"} return response
[docs] @add_stack_hooks def drift_show(self, drifted: bool = False) -> Tuple[str, dict]: """ Detect drift status on stacks. :param drifted: Filter out IN_SYNC resources. :returns: The detection status and resource drifts. """ response = self.drift_detect() detection_status = response["DetectionStatus"] if detection_status in ["DETECTION_COMPLETE", "DETECTION_FAILED"]: response = self._describe_stack_resource_drifts() elif detection_status in ["TIMED_OUT", "STACK_DOES_NOT_EXIST"]: response = {"StackResourceDriftStatus": detection_status} else: raise Exception("Not expected to be reachable") response = self._filter_drifts(response, drifted) return (detection_status, response)
def _wait_for_drift_status(self, detection_id: str) -> dict: """ Waits for drift detection to complete. :param detection_id: The drift detection ID. :returns: The response from describe_stack_drift_detection_status. """ timeout = 300 sleep_interval = 10 elapsed = 0 while True: if elapsed >= timeout: raise TimeoutError(f"Timed out after {elapsed} seconds") self.logger.info(f"{self.stack.name} - Waiting for drift detection") response = self._describe_stack_drift_detection_status(detection_id) detection_status = response["DetectionStatus"] self._log_drift_status(response) if detection_status == "DETECTION_IN_PROGRESS": time.sleep(sleep_interval) elapsed += sleep_interval else: return response def _log_drift_status(self, response: dict) -> None: """ Log the drift status while waiting for drift detection to complete. """ keys = [ "StackDriftDetectionId", "DetectionStatus", "DetectionStatusReason", "StackDriftStatus", ] for key in keys: if key in response: self.logger.debug(f"{self.stack.name} - {key} - {response[key]}") def _detect_stack_drift(self) -> dict: """ Run detect_stack_drift. """ self.logger.info(f"{self.stack.name} - Detecting Stack Drift") return self.connection_manager.call( service="cloudformation", command="detect_stack_drift", kwargs={"StackName": self.stack.external_name}, ) def _describe_stack_drift_detection_status(self, detection_id: str) -> dict: """ Run describe_stack_drift_detection_status. """ self.logger.info(f"{self.stack.name} - Describing Stack Drift Detection Status") return self.connection_manager.call( service="cloudformation", command="describe_stack_drift_detection_status", kwargs={"StackDriftDetectionId": detection_id}, ) def _describe_stack_resource_drifts(self) -> dict: """ Detects stack resource_drifts for a running stack. """ self.logger.info(f"{self.stack.name} - Describing Stack Resource Drifts") return self.connection_manager.call( service="cloudformation", command="describe_stack_resource_drifts", kwargs={"StackName": self.stack.external_name}, ) def _filter_drifts(self, response: dict, drifted: bool) -> dict: """ The filtered response after filtering out StackResourceDriftStatus. :param drifted: Filter out IN_SYNC resources from CLI --drifted. """ if "StackResourceDrifts" not in response: return response result = {"StackResourceDrifts": []} include_all_drift_statuses = not drifted for drift in response["StackResourceDrifts"]: is_drifted = drift["StackResourceDriftStatus"] != "IN_SYNC" if include_all_drift_statuses or is_drifted: result["StackResourceDrifts"].append(drift) return result
[docs] @add_stack_hooks def dump_config(self): """ Dump the config for a stack. """ return self.stack.config
[docs] @add_stack_hooks_with_aliases([generate.__name__]) def dump_template(self): """ Dump the template for the Stack. An alias for generate for historical reasons. """ return self.stack.template.body