Source code for botocraft.mixins.autoscaling

# mypy: disable-error-code="attr-defined"

import time
from collections import OrderedDict
from typing import TYPE_CHECKING, ClassVar

import boto3
from botocore.exceptions import WaiterError

from botocraft.services.abstract import PrimaryBoto3ModelQuerySet

if TYPE_CHECKING:
    from botocraft.services import (
        AutoScalingGroupManager,
    )


[docs]class AutoScalingGroupModelMixin: """ A mixin for :py:class:`AutoScalingGroup` that adds some convenience methods. Sometimes we like full :py:class:`Instance` objects instead of the :py:class:`AutoScalingInstanceReference` objects that get listed on :py:attr:`AutoScalingGroup.Instances`. EC2 is weird and doesn't have an easy way to list instances in an autoscaling group except to use one of the ``Filter`` parameters on ``describe_instances``. Instances that are part of an autoscaling group have a tag called ``aws:autoscaling:groupName`` whose value is the name of the autoscaling group. We can use this to filter instances that belong to a particular autoscaling group. This method is too specialized at the moment to be included as one of the transformers for model related objects, so we'll just add it as a mixin. """ objects: ClassVar["AutoScalingGroupManager"] session: boto3.session.Session AutoScalingGroupName: str MinSize: int MaxSize: int @property def ec2_instances(self) -> PrimaryBoto3ModelQuerySet: """ Return the running :py:class:`Instance` objects that belong to this group, if any. """ # Avoid circular import from botocraft.services.ec2 import Instance if self.AutoScalingGroupName: pk = OrderedDict( Filters=[ { "Name": "tag:aws:autoscaling:groupName", "Values": [self.AutoScalingGroupName], }, {"Name": "instance-state-name", "Values": ["running"]}, ] ) return Instance.objects.using(self.session).list(**pk) # type: ignore[attr-defined] return PrimaryBoto3ModelQuerySet([]) @property def is_stable(self) -> bool: """ Return ``True`` if the autoscaling group is stable, ``False`` otherwise. An autoscaling group is considered stable if all instances are running and healthy and the DesiredCapacity is equal to the number of instances. """ ec2_instances = self.ec2_instances if len(ec2_instances) == self.DesiredCapacity: # type: ignore[attr-defined] # check if all instances are in service instance_ids = [instance.InstanceId for instance in ec2_instances] details = self.objects.using(self.session).instance_status( InstanceIds=instance_ids ) if all(detail.HealthStatus == "HEALTHY" for detail in details): return True return False
[docs] def wait_until_stable(self, max_attempts: int = 40, delay: int = 15) -> None: """ Since there is no waiter for this, we'll use this method to wait until the autoscaling group is stable. Raises: botocore.exceptions.WaiterError: If the autoscaling group is not stable after ``max_attempts``. Keyword Args: max_attempts: The maximum number of attempts to make before giving up. delay: The number of seconds to wait between attempts. """ from botocraft.services import AutoScalingGroupsType wait_count: int = 0 # There is no waiter for this, so we'll just poll until the desired # count is reached, or we reach max_attempts. while True: asg = self.objects.using(self.session).get( AutoScalingGroupName=self.AutoScalingGroupName ) assert asg is not None, ( "AutoScalingGroup.wait_until_stable(): Autoscaling group not found." ) if wait_count >= max_attempts: reason = "Max attempts exceeded" raise WaiterError( name="asg_stable", reason=reason, last_response=AutoScalingGroupsType( AutoScalingGroups=[asg] ).model_dump(), # type: ignore[arg-type] ) if asg.is_stable: break wait_count += 1 time.sleep(delay)
[docs] def scale( self, desired_count: int, wait: bool = False, max_attempts: int = 40, ) -> None: """ Scale the autoscaling group to the desired count. Args: desired_count: The number of tasks to run. Keyword Args: wait: If True, wait for the service to reach the desired count. max_attempts: The maximum number of attempts to make before giving up """ if desired_count < self.MinSize: msg = ( f"desired_count must be greater than or equal to MinSize, " f"which is {self.MinSize}." ) raise ValueError(msg) if desired_count > self.MaxSize: msg = ( f"desired_count must be less than or equal to MaxSize, " f"which is {self.MaxSize}." ) raise ValueError(msg) self.objects.using(self.session).scale(self.AutoScalingGroupName, desired_count) time.sleep(10) if wait: wait_count: int = 0 # There is no waiter for this, so we'll just poll until the desired # count is reached, or we reach max_attempts. while True: if wait_count >= max_attempts: msg = ( f"Reached max attempts of {max_attempts} to reach desired " f"count of {desired_count}." ) raise TimeoutError(msg) if self.is_stable: break wait_count += 1 time.sleep(5)