Source code for botocraft.mixins.ecs

# mypy: disable-error-code="attr-defined"
import re
import warnings
from functools import cached_property, wraps
from typing import TYPE_CHECKING, Any, Callable, Literal, cast

from botocraft.services.abstract import PrimaryBoto3ModelQuerySet

if TYPE_CHECKING:
    from botocraft.services import (
        Cluster,
        Daemon,
        DaemonTaskDefinition,
        DeleteTaskDefinitionsResponse,
        ECRImage,
        ExpressGatewayService,
        Failure,
        Service,
        ServiceDeploymentBrief,
        Task,
        TaskDefinition,
        TaskManager,
    )


# ---------
# Functions
# ---------


def extract_task_family_and_revision(task_definition_arn: str) -> str:
    """
    Extract the task family and revision from a task definition ARN.

    The ARN partition may be ``aws`` or any ``aws-*`` partition (for example
    ``aws-us-gov`` or ``aws-cn``).

    Args:
        task_definition_arn: The ARN of the task definition.

    Raises:
        AssertionError: if ``task_definition_arn`` does not look like an ECS
            task definition ARN.

    Returns:
        The task family and revision in the format ``<family>:<revision>``.

    """
    task_definition_arn_re = (
        r"arn:aws[a-z0-9-]*:ecs:[^:]+:[^:]+:task-definition/"
        r"(?P<family>[^:]+):(?P<revision>[0-9]+)"
    )
    match = re.match(task_definition_arn_re, task_definition_arn)
    if match is None:
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped when Python runs with ``-O``.  The exception type is kept
        # as AssertionError for callers that already catch it.
        raise AssertionError(
            f"Could not extract task family and revision from {task_definition_arn}"
        )
    return f"{match.group('family')}:{match.group('revision')}"


# ----------
# Decorators
# ----------


# Service


def ecs_services_only(
    func: Callable[..., list[str]],
) -> Callable[..., "PrimaryBoto3ModelQuerySet"]:
    """
    Wraps :py:meth:`botocraft.services.ecs.ServiceManager.list` so that it
    returns a :py:class:`PrimaryBoto3ModelQuerySet` of
    :py:class:`botocraft.services.ecs.Service` objects instead of only a list
    of ARNs.
    """
    # ``get_many`` uses the boto3 ``describe_services`` method, which only
    # accepts 10 ARNs at a time, so the ARNs are described in batches.
    batch_size = 10

    @wraps(func)
    def wrapper(self, *args, **kwargs) -> "PrimaryBoto3ModelQuerySet":
        service_arns = func(self, *args, **kwargs)
        described = [
            service
            for start in range(0, len(service_arns), batch_size)
            for service in self.get_many(
                service_arns[start : start + batch_size],
                cluster=kwargs["cluster"],
                include=["TAGS"],
            ).results
        ]
        return PrimaryBoto3ModelQuerySet(described)

    return wrapper
# Cluster
def ecs_clusters_only(
    func: Callable[..., list[str]],
) -> Callable[..., "PrimaryBoto3ModelQuerySet"]:
    """
    Wraps :py:meth:`botocraft.services.ecs.ClusterManager.list` so that it
    returns a :py:class:`botocraft.services.abstract.PrimaryBoto3ModelQuerySet`
    of :py:class:`botocraft.services.ecs.Cluster` objects instead of only a
    list of ARNs.
    """
    # ``get_many`` uses the boto3 ``describe_clusters`` method, which only
    # accepts 100 ARNs at a time, so the ARNs are described in batches.
    batch_size = 100

    @wraps(func)
    def wrapper(self, *args, **kwargs) -> "PrimaryBoto3ModelQuerySet":
        cluster_arns = func(self, *args, **kwargs)
        described: "list[Cluster]" = []
        for offset in range(0, len(cluster_arns), batch_size):
            batch = self.get_many(
                clusters=cluster_arns[offset : offset + batch_size],
                include=["TAGS"],
            )
            # ``get_many`` may hand back either a queryset or a raw response
            # object that carries the clusters on ``.clusters``.
            if isinstance(batch, PrimaryBoto3ModelQuerySet):
                described.extend(batch.results)  # type: ignore[arg-type]
            else:
                described.extend(batch.clusters)  # type: ignore[arg-type]
        return PrimaryBoto3ModelQuerySet(described)  # type: ignore[arg-type]

    return wrapper
def ecs_task_definition_include_tags(
    func: Callable[..., "TaskDefinition | None"],
) -> Callable[..., "TaskDefinition | None"]:
    """
    Decorator that unwraps the response of the decorated call into a
    :py:class:`botocraft.services.ecs.TaskDefinition` with its tags attached.

    The decorated function's result is expected to expose ``taskDefinition``
    and ``tags`` attributes; the tags are copied onto the task definition,
    which is then returned.  A falsy result is passed through as ``None``.
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs) -> "TaskDefinition | None":
        response = func(self, *args, **kwargs)
        if response:
            # Move the tags from the response envelope onto the model itself.
            task_definition = response.taskDefinition
            task_definition.tags = response.tags
            return cast("TaskDefinition", task_definition)
        return None

    return wrapper
def ecs_task_definitions_only(
    func: Callable[..., list[str]],
) -> Callable[..., "PrimaryBoto3ModelQuerySet"]:
    """
    Decorator to convert a list of ECS task definition identifiers into a
    :py:class:`PrimaryBoto3ModelQuerySet` of
    :py:class:`botocraft.services.ecs.TaskDefinition` objects.

    Each identifier is resolved individually through ``self.get`` with tags
    included.
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs) -> "PrimaryBoto3ModelQuerySet":
        task_definitions = []
        for identifier in func(self, *args, **kwargs):
            task_definitions.append(self.get(identifier, include=["TAGS"]))
        return PrimaryBoto3ModelQuerySet(task_definitions)

    return wrapper
# ContainerInstance
def ecs_container_instances_only(
    func: Callable[..., list[str]],
) -> Callable[..., "PrimaryBoto3ModelQuerySet"]:
    """
    Decorator to convert a list of ECS container instance arns to a
    :py:class:`botocraft.services.abstract.PrimaryBoto3ModelQuerySet` of
    :py:class:`botocraft.services.ecs.ContainerInstance` objects.

    The decorated method must be called with a ``cluster`` keyword argument,
    which is forwarded to ``self.get_many``.
    """

    # ``wraps`` keeps the decorated method's name and docstring, matching the
    # other decorators in this module.
    @wraps(func)
    def wrapper(self, *args, **kwargs) -> "PrimaryBoto3ModelQuerySet":
        arns = func(self, *args, **kwargs)
        container_instances = []
        # Describe the instances 100 ARNs per ``get_many`` call.
        for i in range(0, len(arns), 100):
            _instances = self.get_many(
                cluster=kwargs["cluster"], containerInstances=arns[i : i + 100]
            )
            # ``get_many`` may return a queryset; unwrap it to a plain list.
            if isinstance(_instances, PrimaryBoto3ModelQuerySet):
                _instances = _instances.results
            if _instances:
                container_instances.extend(_instances)
        return PrimaryBoto3ModelQuerySet(container_instances)

    return wrapper
def ecs_container_instances_tasks_only(
    func: Callable[..., list[str]],
) -> Callable[..., "PrimaryBoto3ModelQuerySet"]:
    """
    Decorator to convert a list of ECS task arns to a
    :py:class:`PrimaryBoto3ModelQuerySet` of
    :py:class:`botocraft.services.ecs.Task` objects.

    The ARNs returned by the decorated function are resolved through
    ``Task.objects.get_many`` in batches of 100.
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs) -> "PrimaryBoto3ModelQuerySet":
        # Imported here rather than at module level, as in the rest of this
        # module's wrappers that need concrete model classes.
        from botocraft.services.ecs import Task

        arns = func(self, *args, **kwargs)
        tasks: list[Task] = []
        for i in range(0, len(arns), 100):
            tasks.extend(
                cast("TaskManager", Task.objects).get_many(arns[i : i + 100]).results  # type: ignore[arg-type]
            )
        return PrimaryBoto3ModelQuerySet(tasks)  # type: ignore[arg-type]

    return wrapper


def ecs_service_deployments_only(
    func: Callable[..., list["ServiceDeploymentBrief"]],
) -> Callable[..., "PrimaryBoto3ModelQuerySet"]:
    """
    Decorator to convert a list of service deployment briefs to a
    :py:class:`PrimaryBoto3ModelQuerySet` of
    :py:class:`botocraft.services.ecs.ServiceDeployment` objects.

    The briefs' ``serviceDeploymentArn`` values are resolved through
    ``ServiceDeployment.objects.get_many`` in batches of 20.  Briefs without
    an ARN are skipped, and a ``None`` result from the decorated function
    yields an empty queryset.
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs) -> "PrimaryBoto3ModelQuerySet":
        from botocraft.services.ecs import (
            ServiceDeployment,
            ServiceDeploymentManager,
        )

        response = func(self, *args, **kwargs)
        if response is None:
            return PrimaryBoto3ModelQuerySet([])
        # Reuse the response we already have instead of invoking ``func`` a
        # second time, which would repeat the underlying list call.
        arns = [d.serviceDeploymentArn for d in response if d.serviceDeploymentArn]
        deployments: list[ServiceDeployment] = []
        for i in range(0, len(arns), 20):
            _deployments = cast(
                "ServiceDeploymentManager", ServiceDeployment.objects
            ).get_many(arns[i : i + 20])
            # ``get_many`` may return a queryset; unwrap it to a plain list.
            if isinstance(_deployments, PrimaryBoto3ModelQuerySet):
                _deployments = _deployments.results  # type: ignore[assignment]
            if _deployments:
                deployments.extend(_deployments)  # type: ignore[arg-type]
        return PrimaryBoto3ModelQuerySet(deployments)  # type: ignore[arg-type]

    return wrapper


# Task
def ecs_task_populate_taskDefinition(
    func: Callable[..., "Task | None"],
) -> Callable[..., "Task | None"]:
    """
    Wraps :py:meth:`botocraft.services.ecs.TaskManager.get` to populate the
    :py:attr:`botocraft.services.ecs.Task.taskDefinition` attribute.

    ``taskDefinition`` is set to the task family and revision in the format
    ``<family>:<revision>``.  It is an extra field that we add to the
    :py:class:`botocraft.services.ecs.Task` object that is not in the
    original botocore shape, but is useful for our purposes.
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs) -> "Task | None":
        fetched = func(self, *args, **kwargs)
        if not fetched:
            # Nothing to annotate; hand the falsy result back unchanged.
            return fetched
        family_revision = extract_task_family_and_revision(fetched.taskDefinitionArn)
        fetched.taskDefinition = family_revision
        return fetched

    return wrapper
def ecs_task_populate_taskDefinitions(
    func: Callable[..., "PrimaryBoto3ModelQuerySet"],
) -> Callable[..., "PrimaryBoto3ModelQuerySet"]:
    """
    Wraps :py:meth:`botocraft.services.ecs.TaskManager.get_many` to populate
    the :py:attr:`botocraft.services.ecs.Task.taskDefinition` attribute on
    each task.

    ``taskDefinition`` is set to the task family and revision in the format
    ``<family>:<revision>``.  It is an extra field that we add to the
    :py:class:`botocraft.services.ecs.Task` object that is not in the
    original botocore shape, but is useful for our purposes.
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs) -> "PrimaryBoto3ModelQuerySet":
        fetched = func(self, *args, **kwargs)
        for item in fetched:
            family_revision = extract_task_family_and_revision(item.taskDefinitionArn)
            item.taskDefinition = family_revision
        return fetched

    return wrapper
def ecs_tasks_only(
    func: Callable[..., list[str]],
) -> Callable[..., "PrimaryBoto3ModelQuerySet"]:
    """
    Wrap :py:meth:`botocraft.services.ecs.TaskManager.list` so that it returns
    a :py:class:`PrimaryBoto3ModelQuerySet` of
    :py:class:`botocraft.services.ecs.Task` objects instead of only a list of
    ARNs.
    """
    # ``get_many`` uses the boto3 ``describe_tasks`` method, which only
    # accepts 100 ARNs at a time, so the ARNs are described in batches.
    batch_size = 100

    @wraps(func)
    def wrapper(self, *args, **kwargs) -> "PrimaryBoto3ModelQuerySet":
        task_arns = func(self, *args, **kwargs)
        described = []
        for offset in range(0, len(task_arns), batch_size):
            batch = self.get_many(
                cluster=kwargs["cluster"],
                tasks=task_arns[offset : offset + batch_size],
            )
            described.extend(batch)
        return PrimaryBoto3ModelQuerySet(described)  # type: ignore[arg-type]

    return wrapper
def ecs_task_definition_delete_all(
    func: Callable[..., "DeleteTaskDefinitionsResponse"],
) -> Callable[..., "DeleteTaskDefinitionsResponse"]:
    """
    Decorator to delete any number of task definitions.

    :py:meth:`botocraft.services.ecs.TaskDefinitionManager.delete` only
    accepts up to 10 task definitions at a time, so when the first positional
    argument holds more than 10 entries the decorated function is called once
    per batch of 10 and the per-batch ``taskDefinitions`` and ``failures`` are
    merged into a single response.
    """
    max_per_call = 10

    @wraps(func)
    def wrapper(self, *args, **kwargs) -> "DeleteTaskDefinitionsResponse":
        from botocraft.services import DeleteTaskDefinitionsResponse

        identifiers = args[0]
        if len(identifiers) <= max_per_call:
            # Small enough for a single call; pass everything straight through.
            return func(self, *args, **kwargs)

        combined: DeleteTaskDefinitionsResponse = DeleteTaskDefinitionsResponse(
            taskDefinitions=[],
            failures=[],
        )
        for start in range(0, len(identifiers), max_per_call):
            partial = func(self, identifiers[start : start + max_per_call], **kwargs)
            if partial.taskDefinitions:
                cast("list[TaskDefinition]", combined.taskDefinitions).extend(
                    partial.taskDefinitions
                )
            if partial.failures:
                cast("list[Failure]", combined.failures).extend(partial.failures)  # type: ignore[attr-defined]
        return combined

    return wrapper


# Mixins
class ECSServiceModelMixin:
    """
    A mixin for :py:class:`botocraft.services.ecs.Service` that adds some
    additional methods that we can't auto generate.
    """

    @property
    def required_cpu(self) -> int:
        """
        The required CPU for the service in CPU shares.  One full CPU is
        equivalent to 1024 CPU shares.

        The task-level ``cpu`` of the task definition is used when it is set;
        otherwise the ``cpu`` values of the individual containers are summed.
        """
        cpu: int = 0
        td = self.task_definition  # type: ignore[attr-defined]
        if td.cpu:
            cpu = int(td.cpu)
        else:
            for container in td.containerDefinitions:
                if container.cpu:
                    cpu += container.cpu
        return cpu

    @property
    def required_memory(self) -> int:
        """
        Return the required memory for the service in MiB.

        The task-level ``memory`` of the task definition is used when it is
        set; otherwise the ``memory`` values of the individual containers are
        summed.
        """
        memory: int = 0
        td = self.task_definition  # type: ignore[attr-defined]
        if td.memory:
            memory = int(td.memory)
        else:
            for container in td.containerDefinitions:
                if container.memory:
                    memory += container.memory
        return int(memory)

    @property
    def container_instances(self) -> "PrimaryBoto3ModelQuerySet":
        """
        Return the :py:class:`botocraft.services.ecs.ContainerInstance` objects
        which are running our tasks for the service.
        """
        return PrimaryBoto3ModelQuerySet(
            [task.container_instance for task in self.tasks]  # type: ignore[attr-defined]
        )

    @property
    def is_stable(self) -> bool:
        """
        Return whether the service is stable or not.
        """
        # this is the same test that the `services_stable` waiter uses
        return len(self.deployments) == 1 and (self.runningCount == self.desiredCount)  # type: ignore[attr-defined]

    def wait_until_stable(self, max_attempts: int = 40, delay: int = 15) -> None:
        """
        Wait until the service is stable.

        Raises:
            botocore.exceptions.WaiterError: if the service is not stable after
                ``max_attempts``, or some other error occurred.

        Keyword Args:
            max_attempts: The maximum number of attempts to make before giving up.
            delay: The number of seconds to wait between attempts.

        """
        waiter_config = {}
        if max_attempts:
            waiter_config["maxAttempts"] = max_attempts
        if delay:
            waiter_config["delay"] = delay
        if waiter_config:
            waiter_config["operation"] = "DescribeServices"  # type: ignore[assignment]
        waiter = self.objects.using(self.session).get_waiter(
            "services_stable", WaiterConfig=waiter_config
        )  # type: ignore[attr-defined]
        waiter.wait(cluster=self.clusterArn, services=[self.serviceName])  # type: ignore[attr-defined]

    def scale(
        self,
        desired_count: int,
        wait: bool = False,
    ) -> None:
        """
        Scale the service to the desired count.

        If ``wait`` is True, this will wait for the service to reach the
        desired count using the ``services_stable`` boto3 waiter.

        Args:
            desired_count: The number of tasks to run.

        Keyword Args:
            wait: If True, wait for the service to reach the desired count.

        """
        self.objects.using(self.session).partial_update(  # type: ignore[attr-defined]
            self.serviceName,  # type: ignore[attr-defined]
            cluster=self.clusterArn,  # type: ignore[attr-defined]
            desiredCount=desired_count,
        )
        if not wait:
            return
        # Only build the waiter when the caller actually asked to wait.
        waiter = self.objects.using(self.session).get_waiter("services_stable")  # type: ignore[attr-defined]
        waiter.wait(
            cluster=self.clusterArn,  # type: ignore[attr-defined]
            services=[self.serviceName],  # type: ignore[attr-defined]
        )

    @property
    def load_balancers(self) -> "PrimaryBoto3ModelQuerySet":
        """
        Return the :py:class:`LoadBalancer` objects that are associated with
        the service.

        The load balancer ARNs are collected from the service's target groups
        and de-duplicated; an empty queryset is returned when there are none.
        """
        from botocraft.services import LoadBalancer

        arns: set[str] = set()
        for tg in self.target_groups:  # type: ignore[attr-defined]
            for arn in tg.LoadBalancerArns:
                arns.add(arn)
        if arns:
            return LoadBalancer.objects.using(self.session).list(
                LoadBalancerArns=list(arns)
            )
        return PrimaryBoto3ModelQuerySet([])  # type: ignore[arg-type]
class ECSServiceManagerMixin:
    """
    A mixin for :py:class:`botocraft.services.ecs.ServiceManager` that adds
    some additional methods that we can't auto generate.
    """

    def all(
        self,
        launchType: Literal["EC2", "FARGATE", "EXTERNAL"] | None = None,  # noqa: N803
        schedulingStrategy: Literal["REPLICA", "DAEMON"] | None = None,  # noqa: N803
        tags: dict[str, str] | None = None,
    ) -> "PrimaryBoto3ModelQuerySet":
        """
        Return all the services in the account.

        This differs from :py:meth:`botocraft.services.ServiceManager.list` in
        that it iterates through all the clusters in the account and gets the
        services for each cluster.  Normally you would expect to use
        :py:meth:`botocraft.services.ServiceManager.list` to get all the
        services, but ``describe_services``, on which our method is based, only
        returns services for a single cluster, so we need to roll our own
        method.

        Args:
            launchType: The launch type of the services to return.
            schedulingStrategy: The scheduling strategy of the services to
                return.
            tags: A dictionary of tags used to select clusters.  Only clusters
                whose tags contain every given key/value pair are searched for
                services; the tags of the services themselves are not checked
                here.

        Returns:
            A queryset of :py:class:`Service` objects.

        """
        from botocraft.services import Cluster, Service

        if not tags:
            tags = {}
        clusters = Cluster.objects.using(self.session).list()
        services: list["Service"] = []  # noqa: UP037
        for cluster in clusters:
            # Subset test: every requested tag must be present on the cluster.
            if tags.items() <= cluster.tags.items():
                services.extend(
                    Service.objects.using(self.session).list(
                        cluster=cluster.clusterArn,
                        launchType=launchType,
                        schedulingStrategy=schedulingStrategy,
                    )
                )
        return PrimaryBoto3ModelQuerySet(services)  # type: ignore[arg-type]


class CapacityProviderManagerMixin:
    """
    A mixin for :py:class:`botocraft.services.ecs.CapacityProviderManager`.

    This mixin provides list behavior for capacity providers because
    ``describe_capacity_providers`` is both the detail endpoint and the only
    API that can enumerate all capacity providers, including paginated
    unfiltered results.
    """

    def list(
        self,
        *,
        capacityProviders: list[str] | None = None,  # noqa: N803
        cluster: str | None = None,
        include: list[Literal["TAGS"]] | None = None,
    ) -> "PrimaryBoto3ModelQuerySet":
        """
        Return capacity providers for the given scope.

        When ``capacityProviders`` is given the call is delegated to
        ``get_many``; otherwise every page of ``describe_capacity_providers``
        is fetched and collected.

        Keyword Args:
            capacityProviders: Optional capacity provider names or ARNs to
                resolve directly.
            cluster: Optional cluster name or ARN to scope the describe call.
            include: Optional extra fields to include in the response.

        Returns:
            A queryset of :py:class:`botocraft.services.ecs.CapacityProvider`
            objects.

        """
        from botocraft.services import CapacityProvider

        if capacityProviders:
            return self.get_many(  # type: ignore[attr-defined]
                capacityProviders=capacityProviders,
                cluster=cluster,
                include=include,
            )
        providers: list[CapacityProvider] = []
        next_token: str | None = None
        while True:
            args = {
                "cluster": cluster,
                "include": include,
                "maxResults": 100,
                "nextToken": next_token,
            }
            # Arguments that are None are left out of the request entirely.
            response = self.client.describe_capacity_providers(
                **{key: value for key, value in args.items() if value is not None}
            )
            page = [
                CapacityProvider(**provider)
                for provider in response.get("capacityProviders", [])
            ]
            self.sessionize(page)  # type: ignore[attr-defined]
            providers.extend(page)
            # Keep paging until the response no longer carries a token.
            next_token = response.get("nextToken")
            if not next_token:
                break
        return PrimaryBoto3ModelQuerySet(providers)  # type: ignore[arg-type]


class TaskSetManagerMixin:
    """
    A mixin for :py:class:`botocraft.services.ecs.TaskSetManager`.

    This mixin provides a scoped list operation because ``describe_task_sets``
    requires service and cluster context, requires a non-empty ``taskSets``
    argument at runtime, and does not have a standalone paginator-backed list
    API.

    .. note::

        Task sets apply only to services that use the ``EXTERNAL`` deployment
        controller.  Services that use the default ``ECS`` deployment
        controller (including rolling and blue/green deployments managed by
        ECS) do not expose task sets on
        :py:meth:`~botocraft.services.ecs.ServiceManager.get_many` or
        :py:attr:`~botocraft.services.ecs.Service.taskSets`, so :py:meth:`list`
        and :py:attr:`~botocraft.services.ecs.Service.task_sets` typically
        return empty results for those services.
    """

    def list(
        self,
        *,
        service: str,
        cluster: str,
        include: list[Literal["TAGS"]] | None = None,
        taskSets: list[str] | None = None,  # noqa: N803
    ) -> "PrimaryBoto3ModelQuerySet":
        """
        Return task sets for a service and cluster.

        Task sets exist only for ``EXTERNAL`` deployment-controller services.
        For ``ECS`` deployment-controller services, omitting ``taskSets`` is
        expected and returns an empty queryset without calling AWS.

        Keyword Args:
            service: The service name or ARN that owns the task sets.
            cluster: The cluster name or ARN that owns the task sets.
            include: Optional extra fields to include in the response.
            taskSets: Task set names or ARNs to describe.  Although botocore
                marks this argument optional, ECS rejects omitted, ``None``,
                and empty values with ``InvalidParameterException``.

        Returns:
            A queryset of :py:class:`botocraft.services.ecs.TaskSet` objects.

        """
        # Short-circuit: nothing to describe, so skip the ``get_many`` call.
        if not taskSets:
            return PrimaryBoto3ModelQuerySet([])  # type: ignore[arg-type]
        return self.get_many(  # type: ignore[attr-defined]
            cluster=cluster,
            service=service,
            include=include,
            taskSets=taskSets,
        )


class ServiceRevisionManagerMixin:
    """
    A mixin for :py:class:`botocraft.services.ecs.ServiceRevisionManager`.

    This mixin preserves relation ergonomics for service revisions by exposing
    a list method that delegates to ``get_many`` with ARN batches.
    """

    def list(
        self,
        *,
        serviceRevisionArns: list[str] | None = None,  # noqa: N803
    ) -> "PrimaryBoto3ModelQuerySet":
        """
        Return service revisions for the given ARNs.

        An empty queryset is returned when no ARNs are given.

        Keyword Args:
            serviceRevisionArns: The service revision ARNs to describe.

        Returns:
            A queryset of :py:class:`botocraft.services.ecs.ServiceRevision`
            objects.

        """
        if not serviceRevisionArns:
            return PrimaryBoto3ModelQuerySet([])  # type: ignore[arg-type]
        return self.get_many(  # type: ignore[attr-defined]
            serviceRevisionArns=serviceRevisionArns
        )


class DaemonManagerMixin:
    """
    A mixin for :py:class:`botocraft.services.ecs.DaemonManager`.

    This mixin preserves a model-centric daemon contract even though the ECS
    daemon APIs return partial objects for create, update, delete, and list.
    """

    def _create_args(self, model: "Daemon") -> dict[str, Any]:
        """
        Build create-daemon request arguments from a daemon model.

        Entries whose value is ``None`` are dropped from the result.

        Args:
            model: The daemon model to serialize.

        Returns:
            The request arguments for ``create_daemon``.

        """
        args: dict[str, Any] = {
            "daemonName": model.daemonName,
            "clusterArn": model.clusterArn,
            "daemonTaskDefinitionArn": model.daemonTaskDefinitionArn,
            "capacityProviderArns": model.capacityProviderArns,
            "propagateTags": model.propagateTags,
            "enableECSManagedTags": model.enableECSManagedTags,
            "enableExecuteCommand": model.enableExecuteCommand,
        }
        if model.deploymentConfiguration:
            args["deploymentConfiguration"] = self.serialize(
                model.deploymentConfiguration
            )
        # The model's ``Tags`` attribute is sent under the ``tags`` request key.
        if model.Tags:
            args["tags"] = self.serialize(model.Tags)
        return {key: value for key, value in args.items() if value is not None}

    def _update_args(self, model: "Daemon") -> dict[str, Any]:
        """
        Build update-daemon request arguments from a daemon model.

        Entries whose value is ``None`` are dropped from the result.

        Args:
            model: The daemon model to serialize.

        Returns:
            The request arguments for ``update_daemon``.

        """
        args: dict[str, Any] = {
            "daemonArn": model.daemonArn,
            "daemonTaskDefinitionArn": model.daemonTaskDefinitionArn,
            "capacityProviderArns": model.capacityProviderArns,
            "propagateTags": model.propagateTags,
            "enableECSManagedTags": model.enableECSManagedTags,
            "enableExecuteCommand": model.enableExecuteCommand,
        }
        if model.deploymentConfiguration:
            args["deploymentConfiguration"] = self.serialize(
                model.deploymentConfiguration
            )
        return {key: value for key, value in args.items() if value is not None}

    def get_many(
        self,
        daemonArns: list[str],  # noqa: N803
    ) -> "PrimaryBoto3ModelQuerySet":
        """
        Return daemons for the given ARNs.

        Each ARN is fetched with its own ``get`` call; falsy results are
        left out of the queryset.

        Args:
            daemonArns: The daemon ARNs to hydrate.

        Returns:
            A queryset of :py:class:`botocraft.services.ecs.Daemon` objects.

        """
        daemons = [self.get(daemonArn=daemon_arn) for daemon_arn in daemonArns]  # type: ignore[attr-defined]
        return PrimaryBoto3ModelQuerySet([daemon for daemon in daemons if daemon])

    def list(
        self,
        *,
        clusterArn: str | None = None,  # noqa: N803
        capacityProviderArns: list[str] | None = None,  # noqa: N803
    ) -> "PrimaryBoto3ModelQuerySet":
        """
        Return daemons for the given ECS scope.

        Every page of ``list_daemons`` is read to collect daemon ARNs, and
        each ARN is then fetched with its own ``get`` call.

        Keyword Args:
            clusterArn: Optional cluster ARN to scope the daemon listing.
            capacityProviderArns: Optional capacity provider ARNs to filter by.

        Returns:
            A queryset of :py:class:`botocraft.services.ecs.Daemon` objects.

        """
        daemon_arns: list[str] = []
        next_token: str | None = None
        while True:
            args = {
                "clusterArn": clusterArn,
                "capacityProviderArns": capacityProviderArns,
                "maxResults": 100,
                "nextToken": next_token,
            }
            # Arguments that are None are left out of the request entirely.
            response = self.client.list_daemons(
                **{key: value for key, value in args.items() if value is not None}
            )
            daemon_arns.extend(
                summary["daemonArn"]
                for summary in response.get("daemonSummariesList", [])
                if summary.get("daemonArn")
            )
            # Keep paging until the response no longer carries a token.
            next_token = response.get("nextToken")
            if not next_token:
                break
        daemons = [self.get(daemonArn=daemon_arn) for daemon_arn in daemon_arns]  # type: ignore[attr-defined]
        return PrimaryBoto3ModelQuerySet(
            [cast("Daemon", daemon) for daemon in daemons if daemon]
        )

    def create(self, model: "Daemon") -> "Daemon":
        """
        Create a daemon and return hydrated model.

        Args:
            model: The daemon to create.

        Side Effects:
            Calls ``create_daemon`` against AWS.

        Returns:
            The created :py:class:`botocraft.services.ecs.Daemon`.

        """
        response = self.client.create_daemon(**self._create_args(model))
        # Re-fetch by ARN so the caller gets the full model, not the response.
        return self.get(daemonArn=response["daemonArn"])  # type: ignore[attr-defined]

    def update(self, model: "Daemon") -> "Daemon":
        """
        Update a daemon and return hydrated model.

        Args:
            model: The daemon to update.

        Side Effects:
            Calls ``update_daemon`` against AWS.

        Returns:
            The updated :py:class:`botocraft.services.ecs.Daemon`.

        """
        response = self.client.update_daemon(**self._update_args(model))
        # Re-fetch by ARN so the caller gets the full model, not the response.
        return self.get(daemonArn=response["daemonArn"])  # type: ignore[attr-defined]

    def delete(self, model: "Daemon") -> "Daemon":
        """
        Delete a daemon and return pre-delete snapshot.

        Args:
            model: The daemon to delete.

        Side Effects:
            Calls ``delete_daemon`` against AWS.

        Returns:
            The prefetched :py:class:`botocraft.services.ecs.Daemon`.

        """
        # Fetch first: the snapshot is taken before the delete call is made.
        daemon = self.get(daemonArn=model.daemonArn)  # type: ignore[attr-defined]
        self.client.delete_daemon(daemonArn=model.daemonArn)
        return cast("Daemon", daemon)


class DaemonTaskDefinitionManagerMixin:
    """
    A mixin for :py:class:`botocraft.services.ecs.DaemonTaskDefinitionManager`.

    This mixin hydrates identifier-centric daemon task definition workflows
    into full Botocraft models.
    """

    def _create_args(self, model: "DaemonTaskDefinition") -> dict[str, Any]:
        """
        Build register-daemon-task-definition request arguments.

        Entries whose value is ``None`` are dropped from the result.

        Args:
            model: The daemon task definition model to serialize.

        Returns:
            The request arguments for ``register_daemon_task_definition``.

        """
        args: dict[str, Any] = {
            "family": model.family,
            "taskRoleArn": model.taskRoleArn,
            "executionRoleArn": model.executionRoleArn,
            "containerDefinitions": self.serialize(model.containerDefinitions),
            "cpu": model.cpu,
            "memory": model.memory,
        }
        if model.volumes:
            args["volumes"] = self.serialize(model.volumes)
        # The model's ``Tags`` attribute is sent under the ``tags`` request key.
        if model.Tags:
            args["tags"] = self.serialize(model.Tags)
        return {key: value for key, value in args.items() if value is not None}

    def get_many(
        self,
        daemonTaskDefinitions: list[str],  # noqa: N803
    ) -> "PrimaryBoto3ModelQuerySet":
        """
        Return daemon task definitions for the given identifiers.

        Each identifier is fetched with its own ``get`` call; falsy results
        are left out of the queryset.

        Args:
            daemonTaskDefinitions: The daemon task definition identifiers to
                hydrate.

        Returns:
            A queryset of
            :py:class:`botocraft.services.ecs.DaemonTaskDefinition` objects.

        """
        task_definitions = [
            self.get(daemonTaskDefinition=identifier)  # type: ignore[attr-defined]
            for identifier in daemonTaskDefinitions
        ]
        return PrimaryBoto3ModelQuerySet(
            [task_definition for task_definition in task_definitions if task_definition]
        )

    def list(
        self,
        *,
        familyPrefix: str | None = None,  # noqa: N803
        family: str | None = None,
        revision: str | None = None,
        status: str | None = None,
        sort: str | None = None,
    ) -> "PrimaryBoto3ModelQuerySet":
        """
        Return daemon task definitions for the given ECS scope.

        Every page of ``list_daemon_task_definitions`` is read to collect
        ARNs, which are then hydrated through :py:meth:`get_many`.

        Keyword Args:
            familyPrefix: Optional family prefix filter.
            family: Optional exact family filter.
            revision: Optional revision filter.
            status: Optional status filter.
            sort: Optional sort order.

        Returns:
            A queryset of
            :py:class:`botocraft.services.ecs.DaemonTaskDefinition` objects.

        """
        identifiers: list[str] = []
        next_token: str | None = None
        while True:
            args = {
                "familyPrefix": familyPrefix,
                "family": family,
                "revision": revision,
                "status": status,
                "sort": sort,
                "maxResults": 100,
                "nextToken": next_token,
            }
            # Arguments that are None are left out of the request entirely.
            response = self.client.list_daemon_task_definitions(
                **{key: value for key, value in args.items() if value is not None}
            )
            identifiers.extend(
                summary["daemonTaskDefinitionArn"]
                for summary in response.get("daemonTaskDefinitions", [])
                if summary.get("daemonTaskDefinitionArn")
            )
            # Keep paging until the response no longer carries a token.
            next_token = response.get("nextToken")
            if not next_token:
                break
        return self.get_many(identifiers)

    def create(self, model: "DaemonTaskDefinition") -> "DaemonTaskDefinition":
        """
        Register daemon task definition and return hydrated model.

        Args:
            model: The daemon task definition to register.

        Side Effects:
            Calls ``register_daemon_task_definition`` against AWS.

        Returns:
            The created :py:class:`botocraft.services.ecs.DaemonTaskDefinition`.

        """
        response = self.client.register_daemon_task_definition(
            **self._create_args(model)
        )
        # Re-fetch by ARN so the caller gets the full model, not the response.
        return self.get(  # type: ignore[attr-defined]
            daemonTaskDefinition=response["daemonTaskDefinitionArn"]
        )

    def delete(self, model: "DaemonTaskDefinition") -> "DaemonTaskDefinition":
        """
        Delete daemon task definition and return pre-delete snapshot.

        Args:
            model: The daemon task definition to delete.

        Side Effects:
            Calls ``delete_daemon_task_definition`` against AWS.

        Returns:
            The prefetched
            :py:class:`botocraft.services.ecs.DaemonTaskDefinition`.

        """
        # Fetch first: the snapshot is taken before the delete call is made.
        task_definition = self.get(  # type: ignore[attr-defined]
            daemonTaskDefinition=model.daemonTaskDefinitionArn
        )
        self.client.delete_daemon_task_definition(
            daemonTaskDefinition=model.daemonTaskDefinitionArn
        )
        return cast("DaemonTaskDefinition", task_definition)


class DaemonDeploymentManagerMixin:
    """
    A mixin for :py:class:`botocraft.services.ecs.DaemonDeploymentManager`.

    This mixin models daemon deployments as a scoped read-only list surface.
    """

    def list(
        self,
        *,
        daemonArn: str,  # noqa: N803
        status: list[str] | None = None,
        createdAt: dict[str, Any] | None = None,  # noqa: N803
        nextToken: str | None = None,  # noqa: N803
        maxResults: int | None = None,  # noqa: N803
    ) -> "PrimaryBoto3ModelQuerySet":
        """
        Return daemon deployments for a daemon scope.

        A single ``list_daemon_deployments`` call is made; further pages are
        not followed here, so callers page by passing ``nextToken``.

        Keyword Args:
            daemonArn: The daemon ARN that owns the deployments.
            status: Optional deployment status filters.
            createdAt: Optional created-at filter object accepted by AWS.
            nextToken: Optional pagination token.
            maxResults: Optional page size.

        Returns:
            A queryset of :py:class:`botocraft.services.ecs.DaemonDeployment`
            objects.

        """
        from botocraft.services import DaemonDeployment

        args = {
            "daemonArn": daemonArn,
            "status": status,
            "createdAt": createdAt,
            "nextToken": nextToken,
            "maxResults": maxResults,
        }
        # Arguments that are None are left out of the request entirely.
        response = self.client.list_daemon_deployments(
            **{key: value for key, value in args.items() if value is not None}
        )
        deployments = [
            DaemonDeployment(**deployment)
            for deployment in response.get("daemonDeployments", [])
        ]
        self.sessionize(deployments)  # type: ignore[attr-defined]
        return PrimaryBoto3ModelQuerySet(deployments)  # type: ignore[arg-type]


class ExpressGatewayServiceManagerMixin:
    """
    A mixin for :py:class:`botocraft.services.ecs.ExpressGatewayServiceManager`.

    This mixin preserves scoped list semantics and hydrates partial update
    responses into full Botocraft models.
    """

    def _update_args(self, model: "ExpressGatewayService") -> dict[str, Any]:
        """
        Build update-express-gateway-service request arguments.

        Entries whose value is ``None`` are dropped from the result.

        Args:
            model: The Express gateway service model to serialize.

        Returns:
            The request arguments for ``update_express_gateway_service``.

        """
        args = {
            "serviceArn": model.serviceArn,
            "executionRoleArn": model.executionRoleArn,
            "healthCheckPath": model.healthCheckPath,
            "primaryContainer": self.serialize(model.primaryContainer),
            "taskRoleArn": model.taskRoleArn,
            "networkConfiguration": self.serialize(model.networkConfiguration),
            "cpu": model.cpu,
            "memory": model.memory,
            "scalingTarget": self.serialize(model.scalingTarget),
        }
        return {key: value for key, value in args.items() if value is not None}

    def list(
        self,
        *,
        namespace: str,
        nextToken: str | None = None,  # noqa: N803
        maxResults: int | None = None,  # noqa: N803
    ) -> "PrimaryBoto3ModelQuerySet":
        """
        Return Express gateway services for a namespace scope.

        A single ``list_services_by_namespace`` call is made and each returned
        ARN is fetched with its own ``get`` call; further pages are not
        followed here, so callers page by passing ``nextToken``.

        Keyword Args:
            namespace: The namespace that scopes the service list.
            nextToken: Optional pagination token.
            maxResults: Optional page size.

        Returns:
            A queryset of
            :py:class:`botocraft.services.ecs.ExpressGatewayService` objects.

        """
        services = []
        args = {
            "namespace": namespace,
            "nextToken": nextToken,
            "maxResults": maxResults,
        }
        # Arguments that are None are left out of the request entirely.
        response = self.client.list_services_by_namespace(
            **{key: value for key, value in args.items() if value is not None}
        )
        for service_arn in response.get("serviceArns", []):
            service = self.get(serviceArn=service_arn)  # type: ignore[attr-defined]
            # Falsy ``get`` results are skipped rather than added.
            if service:
                services.append(service)
        return PrimaryBoto3ModelQuerySet(services)

    def update(self, model: "ExpressGatewayService") -> "ExpressGatewayService":
        """
        Update Express gateway service and return hydrated model.

        Args:
            model: The Express gateway service to update.

        Side Effects:
            Calls ``update_express_gateway_service`` against AWS.

        Returns:
            The updated :py:class:`botocraft.services.ecs.ExpressGatewayService`.

        """
        response = self.client.update_express_gateway_service(
            **self._update_args(model)
        )
        # Re-fetch by ARN so the caller gets the full model, not the response.
        service_arn = response["service"]["serviceArn"]
        return self.get(serviceArn=service_arn)  # type: ignore[attr-defined]
class ECSContainerInstanceModelMixin:
    """
    Convenience accessors over the ``remainingResources`` list of the model
    this class is mixed into.
    """

    @property
    def free_cpu(self) -> int:
        """
        Return the free CPU shares on the container instance.  One full CPU is
        equivalent to 1024 CPU shares.

        Returns 0 when no ``CPU`` resource is listed.
        """
        cpu_values = [
            int(resource.integerValue)
            for resource in self.remainingResources  # type: ignore[attr-defined]
            if resource.name == "CPU"
        ]
        # If several entries match, the last one listed is the one reported.
        return cpu_values[-1] if cpu_values else 0

    @property
    def free_ram(self) -> int:
        """
        Return the free RAM in MiB on the container instance.

        Returns 0 when no ``MEMORY`` resource is listed.
        """
        memory_values = [
            int(resource.integerValue)
            for resource in self.remainingResources  # type: ignore[attr-defined]
            if resource.name == "MEMORY"
        ]
        # If several entries match, the last one listed is the one reported.
        return memory_values[-1] if memory_values else 0
class TaskDefinitionManagerMixin: def in_use( self, tags: dict[str, str] | None = None, ) -> "PrimaryBoto3ModelQuerySet": """ Return a list of task definitions that are currently in use by a service or periodic task. A periodic task is a task that is run via a :py:class:`botocraft.services.events.EventRule`. Important: If you have tasks that are run ad-hoc, then this method will not return those task definitions. Keyword Args: tags: A dictionary of tags to filter the task, services and periodic tasks by. Default: None verbose: If True, print out some information about what is happening. Returns: A list of :py:class:`botocraft.services.ecs.TaskDefinition` objects that are currently in use. """ from botocraft.services import ( EventRule, Service, TaskDefinition, ) if not tags: tags = {} task_definitions: dict[str, TaskDefinition] = {} # First get all the services in the account services: list[Service] = Service.objects.using(self.session).all() ClientException = self.session.client("ecs").exceptions.ClientException # noqa: N806 # Now iterate through each service and get the append its task definition # to the list of task definitions if we have not already seen it for service in services: try: task_definition = cast("TaskDefinition", service.task_definition) except ClientException: warnings.warn( f"Task definition {service.taskDefinition} used by " f"{service.cluster_name}:{service.serviceName} does not exist", UserWarning, stacklevel=2, ) continue family_revision = task_definition.family_revision if family_revision not in task_definitions: task_definitions[family_revision] = task_definition # Now deal with the periodc tasks rules = EventRule.objects.using(self.session).list() for rule in rules: for target in rule.targets: if target.EcsParameters is not None: try: task_definition = TaskDefinition.objects.using( self.session ).get(target.EcsParameters.TaskDefinitionArn) except ClientException: warnings.warn( f"Task definition {target.EcsParameters.TaskDefinitionArn} " 
f"used by {rule.name} does not exist", UserWarning, stacklevel=2, ) continue family_revision = task_definition.family_revision if family_revision not in task_definitions: task_definitions[family_revision] = task_definition return PrimaryBoto3ModelQuerySet(list(task_definitions.values())) # type: ignore[arg-type] class TaskDefinitionModelMixin: @property def family_revision(self) -> str: """ Return the family and revision of the task definition in the format ``<family>:<revision>``. """ return f"{self.family}:{self.revision}" @property def image_objects(self) -> list["ECRImage"]: """ Return the :class:`~botocraft.services.ecr.ECRImage` objects that this task definition uses across all its container definitions. Returns: A list of :class:`~botocraft.services.ecr.ECRImage` objects. """ return [container.image_object for container in self.containerDefinitions] @property def images(self) -> list[str]: """ Return the container images as a list of strings. Returns: A list of container images. """ return [container.image for container in self.containerDefinitions] # type: ignore[attr-defined] @property def container_images(self) -> list[str]: """ Return the container images as a list of strings. Returns: A list of container images. """ return [container.image for container in self.containerDefinitions] # type: ignore[attr-defined] @cached_property def services(self) -> list["Service"]: """ Return the services that use this task definition revision. Warning: This will be quite slow because we need to all our services to see if there is a service that uses that task definition. There's no way to get all the services in an account, so we have to list all the clusters, then check each cluster for services, and see if the service uses this task definition. Returns: A list of :py:class:`botocraft.services.ecs.Service` objects that use this task definition. 
""" from botocraft.services import Cluster, Service clusters = Cluster.objects.using(self.session).list() services: list[Service] = [] for cluster in clusters: services.extend( service for service in cluster.services if service.taskDefinition == self.family_revision ) return services def delete(self) -> None: """ Delete the task definition. We're overriding the default delete method because in this case, the manager method accepts a list of task definitions to delete, so we need to pass in the task definition ARN as a list. """ self.objects.using(self.session).delete([self.taskDefinitionArn]) class ServiceDeploymentModelMixin: """ A mixin for :py:class:`botocraft.services.ecs.ServiceDeployment` that adds some additional methods that we can't auto generate. """ @property def source_task_definitions(self) -> "PrimaryBoto3ModelQuerySet": """ Return the task definition for the deployment. """ from botocraft.services import TaskDefinition arns = [source.arn for source in self.sourceServiceRevisions] return PrimaryBoto3ModelQuerySet( [TaskDefinition.objects.using(self.session).get(arn) for arn in arns] ) # type: ignore[arg-type]