Source code for botocraft.mixins.ecr

# mypy: disable-error-code="attr-defined"

import base64
import datetime
import re
from functools import cached_property, wraps
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Literal,
    cast,
)

import click
import docker
from pydantic import BaseModel

if TYPE_CHECKING:
    from botocraft.services import (
        ECRImage,
        ECRImageManager,
        ImageIdentifier,
        Repository,
        RepositoryManager,
    )
    from botocraft.services.abstract import PrimaryBoto3ModelQuerySet  # noqa: TC004


class ECRDockerClient(BaseModel):
    """
    A return type suitable for the docker client.

    We need to return a docker client that is logged into our ECR registry,
    along with the username, password, and registry, because you need the
    latter 3 to do any pulling or pushing of images.
    """

    #: The docker client.
    client: Any
    #: The username to use for the remote registry.
    username: str
    #: The password to use for the remote registry.
    password: str
    #: The registry
    registry: str


[docs]class ImageInfo(BaseModel): """ A class to hold information about a :py:class:`botocraft.services.ecr.Image` that is not available from the boto3 library. We extract this information by pulling the image from the repository and inspecting it with the docker Python library. Important: You must have the docker daemon running to use the methods that return this object. """ # The image name, including the registry, repository, and tag. name: str #: The OS platform of the image platform: str #: The architecture of the image architecture: str #: Size of the image in bytes size: int #: This is a dictionary of port mappings. The key is the port #: and the value is i'm not sure what ports: dict[str, dict[str, Any]] = {} #: Docker Version used to build the image docker_version: str #: The user that the image runs as user: str | None = None #: When the image was created, as a UTC datetime object created: datetime.datetime
# ----------- # Decorators # -----------
[docs]def repo_list_images_ecr_images_only( func: Callable[..., "PrimaryBoto3ModelQuerySet"], ) -> Callable[..., "PrimaryBoto3ModelQuerySet"]: """ Convert a list of ECR image identifiers returned by :py:meth:`botocraft.services.ecr.RepositoryManager.list_images` into a list of :py:class:`botocraft.services.ecr.Image` objects. """ @wraps(func) def wrapper(self, *args, **kwargs) -> "PrimaryBoto3ModelQuerySet": from botocraft.services.abstract import PrimaryBoto3ModelQuerySet qs: "PrimaryBoto3ModelQuerySet" = func(self, *args, **kwargs) # noqa: UP037 images: list["ECRImage"] = [] # noqa: UP037 # NOTE: to be honest i'm not sure if there is a per request limit # for the number of images that can be retrieved, but i'm going to # assume that there is a limit of 100 images per request. for i in range(0, len(qs.results), 100): _images = self.get_images( repositoryName=kwargs["repositoryName"], imageIds=qs.results[i : i + 100], ) if _images: images.extend(_images) return PrimaryBoto3ModelQuerySet(images) # type: ignore[arg-type] return wrapper
def repo_list_add_tags( func: Callable[..., "PrimaryBoto3ModelQuerySet"], ) -> Callable[..., "PrimaryBoto3ModelQuerySet"]: """ Add tags to all :py:class:`botocraft.services.ecr.Repository` objects returned by :py:meth:`botocraft.services.ecr.RepositoryManager.list`. This has to be done in a separate call because the tags are not returned in the response from the get call. """ @wraps(func) def wrapper(self, *args, **kwargs) -> "PrimaryBoto3ModelQuerySet": qs: PrimaryBoto3ModelQuerySet = func(self, *args, **kwargs) extras = kwargs.get("include", []) if "TAGS" in extras: for repo in qs.results: tags = self.get_tags(resourceArn=repo.arn) if tags: repo.tags = tags return qs return wrapper def repo_get_add_tags( func: Callable[..., "Repository | None"], ) -> Callable[..., "Repository | None"]: """ Add tags to a :py:class:`botocraft.services.ecr.Repository` object returned by :py:meth:`botocraft.services.ecr.RepositoryManager.get`. This has to be done in a separate call because the tags are not returned in the response from the get call. """ @wraps(func) def wrapper(self, *args, **kwargs) -> "Repository | None": repo = func(self, *args, **kwargs) if repo is None: return None extras = kwargs.get("include", []) if "TAGS" in extras: tags = self.get_tags(resourceArn=repo.arn) if tags: repo.tags = tags return repo return wrapper
[docs]def image_list_images_ecr_images_only( func: Callable[..., "PrimaryBoto3ModelQuerySet"], ) -> Callable[..., "list[ECRImage]"]: """ Convert a list of ECR image identifiers returned by :py:meth:`botocraft.services.ecr.Image.list` into a list of :py:class:`botocraft.services.ecr.Image` objects. """ @wraps(func) def wrapper(self, *args, **kwargs) -> "list[ECRImage]": qs: "PrimaryBoto3ModelQuerySet" = func(self, *args, **kwargs) # noqa: UP037 images: list["ECRImage"] = [] # noqa: UP037 # NOTE: to be honest i'm not sure if there is a per request limit # for the number of images that can be retrieved, but i'm going to # assume that there is a limit of 100 images per request. for i in range(0, len(qs.results), 100): _images = self.get_many( repositoryName=args[0], imageIds=qs.results[i : i + 100] ) if _images: images.extend(_images.images) return PrimaryBoto3ModelQuerySet(images) # type: ignore[arg-type] return wrapper
# ------------- # Mixins # -------------
[docs]class RepositoryMixin: objects: ClassVar["RepositoryManager"] # properties @property def images(self) -> "PrimaryBoto3ModelQuerySet": """ Get a list of images for a given repository. """ return self.objects.using(self.session).list_images( repositoryName=self.repositoryName ) # type: ignore[attr-defined] # methods
[docs] def get_image(self, imageId: "ImageIdentifier") -> "ECRImage | None": # noqa: N803 """ Get an image object for a given repository and image identifier. Args: imageId: The image ID or tag to describe. The format of the imageId reference is ``imageTag=tag`` or ``imageDigest=digest`` """ return self.objects.using(self.session).get_image( self.repositoryName, # type: ignore[attr-defined] imageId=imageId, )
class ECRImageManagerMixin: @cached_property def account_id(self) -> str: """ Get the account id for the current session. """ from botocraft.services import CallerIdentity return CallerIdentity.objects.using(self.session).get().Account def __filter_image( self, image_id: str, repositoryNames: list[str] | None = None, # noqa: N803 repositoryPrefix: str | None = None, # noqa: N803 tags: dict[str, str] | None = None, ) -> "ECRImage | None": """ Filter an image by repository name, prefix, or tags. If no filters are provided, then the image is returned. Raises: LookupError: If the image is not found in this account, either because its from another account, it is a public-ecr image in this account, or it doesn't exist. Args: image_id: the image id of the image we want to examine repositoryNames: a list of repository names to filter by repositoryPrefix: a prefix to filter the repositories by tags: a dictionary of tags to filter the image by Returns: The :py:class:`botocraft.services.ecr.ECRImage` object if the image passes the filters, otherwise None. """ from botocraft.services import ImageIdentifier if tags is None: tags = {} image: "ECRImage" | None = None # noqa: UP037 # See if this is even an ECR image if not image_id.startswith(self.account_id): if not re.match(r"\d{12}\.dkr\.ecr\..+\.amazonaws\.com", image_id): msg = f"Image {image_id} is from a different AWS account. Skipping." raise LookupError(msg) # TODO: we should also check our own public ECR repositories msg = f"Image {image_id} is not an ECR image. Skipping." raise LookupError(msg) # break image_id into its parts: repository_name, image_tag image_tag = image_id.split(":")[1] repository_name = image_id.split(".com/")[1].split(":", maxsplit=1)[0] _image = self.using(self.session).get( repository_name, imageId=ImageIdentifier(imageTag=image_tag), ) if not _image: msg = f"Image {image_id} belongs to this AWS account, but does not exist." raise LookupError(msg) if repositoryNames or repositoryPrefix: if repositoryNames: if _image.repositoryName in repositoryNames: image = _image if repositoryPrefix: if _image.repositoryName.startswith(repositoryPrefix): image = _image elif tags: if tags.items() <= _image.repository.tags.items(): image = _image return image def in_use( # noqa: PLR0912 self, repositoryNames: list[str] | None = None, # noqa: N803 repositoryPrefix: str | None = None, # noqa: N803 tags: dict[str, str] | None = None, verbose: bool = False, ) -> "PrimaryBoto3ModelQuerySet": """ Return a list of :py:class:`botocraft.services.ECRImage` objects are currently in use by a task definition. Important: Purge any task definitions that are not in use before running this method, otherwise you will get a lot of false positives and keep images from being deleted. Keyword Args: repositoryNames: Look at only the repositories with these names. This and ``repositoryPrefix`` are mutually exclusive. repositoryPrefix: A prefix to filter the repositories by. tags: A dictionary of tags to filter the task, services and periodic tasks by. verbose: If True, print out some information about what is happening. Returns: A list of :py:class:`botocraft.services.ecs.TaskDefinition` objects that are currently in use. """ from botocraft.services import ( TaskDefinition, ) assert not (repositoryNames and repositoryPrefix), ( "You can't use both repositoryNames and repositoryPrefix at the same time." ) if not tags: tags = {} # I'd like to use a set() here, but pydantic classes are not hashable # unless they are frozen, which ECRImage is not used_images: dict[str, "ECRImage"] = {} # noqa: UP037 # First get all the task definitions and add their images to the # used_images dictionary if verbose: click.secho( "Listing all task definitions ...", fg="green", ) tds = TaskDefinition.objects.using(self.session).list() if verbose: click.secho( f"Found {len(tds)} task definitions. Adding their images to the " "used_images dictionary...", fg="green", ) for td in tds: for image_object in td.image_objects: if image_object is None: # An image object will be None if the task definition has a # container definition that uses an image that does not # exist. if verbose: click.secho( f"Task definition {td.family_revision} has a None " "image object. Which means the image does not exist.", fg="red", ) for cd in td.containerDefinitions: if cd.image_object is not None: continue click.secho( f" Container definition: {cd.name}", fg="cyan", ) click.secho( f" Image: {cd.image}", fg="cyan", ) click.secho( f" Image object: {cd.image_object}", fg="cyan", ) continue if image_object.image_name in used_images: continue used_images[image_object.image_name] = image_object # Now filter the images by the repository names, prefix, or tags, and # add the remaining images to the _used_images dictionary if verbose: click.secho( "Filtering the images by the repository names, prefix, or tags...", fg="green", ) _used_images = used_images.copy() for image in used_images.values(): # Filter out images that are not in the list of desired repository # names or match the repository prefix. if repositoryNames and image.repositoryName not in repositoryNames: del _used_images[image.image] if repositoryPrefix and not image.repositoryName.startswith( repositoryPrefix ): del _used_images[image.image] # Image themselves don't have AWS tags, so instead we will look at # the repository tags to see if it matches the supplied tags. repo = image.repository for tag in tags: if tag not in repo.tags or tags[tag] != repo.tags[tag]: del _used_images[image.image] # Now we should have a list of images that are in use by task definitions, # so we can return a list of ECRImage objects as a PrimaryBoto3ModelQuerySet. return PrimaryBoto3ModelQuerySet(list(used_images.values())) # type: ignore[arg-type]
[docs]class ECRImageMixin: """ Add a bunch of support for inspecting ECR images and getting information from them that AWS does not provide. This is done by using the docker Python library to pull the image and inspect it. Note: I don't love doing this because it is not pure AWS, which was my intention for botocraft, but I need these features for business purposes and they are not available in the boto3 library. """ objects: ClassVar["ECRImageManager"] repositoryName: str | None imageId: "ImageIdentifier" @property def version(self) -> str: """ Get the version of the image. """ return cast("str", self.imageId.imageTag) @property def name(self) -> str: """ Get the name of the image. """ if self.imageId.imageTag is None: return f"{self.repository.repositoryUri}:{self.imageId.imageDigest}" # type: ignore[attr-defined] return f"{self.repository.repositoryUri}:{self.imageId.imageTag}" # type: ignore[attr-defined] @property def image_name(self) -> str: """ Return just the image name, excluding the registry. """ if self.imageId.imageTag is None: return f"{self.repository.repositoryName}:{self.imageId.imageDigest}" # type: ignore[attr-defined] return f"{self.repository.repositoryName}:{self.imageId.imageTag}" # type: ignore[attr-defined] @property def is_pulled(self) -> bool: """ Check if the image is pulled. Returns: ``True`` if the image is pulled, ``False`` otherwise. """ ecr_client = self.docker_client exists = False if ecr_client.client.images.list(self.name): exists = True ecr_client.client.close() return exists @property def dockerd_is_running(self) -> bool: """ Check if the docker daemon is running. We need dockerd to be running to perform these operations: * :py:meth:`docker_client` * :py:meth:`pull` * :py:meth:`is_pulled` * :py:meth:`info` * :py:meth:`docker_image` * :py:meth:`history` * :py:meth:`clean` * :py:meth:`clean_other_versions` """ try: docker.from_env() except docker.errors.DockerException: return False return True @property def docker_client(self) -> ECRDockerClient: """ Return a docker client, logged into our ECR registry. Raises: RuntimeError: If the docker daemon is not running. Returns: A :py:class:`botocraft.mixins.ecr.ECRDockerClient` object, which has a docker client, username, password, and registry. """ if not self.dockerd_is_running: msg = "Docker daemon is not running, so this command is not available." raise RuntimeError(msg) docker_client = docker.from_env() # Get our authorization token from AWS response = self.objects.using(self.session).client.get_authorization_token() # type: ignore[attr-defined] auth_token = base64.b64decode( response["authorizationData"][0]["authorizationToken"] ) username, password = auth_token.decode().split(":") registry = response["authorizationData"][0]["proxyEndpoint"] bare_registry = registry.split("//")[1] docker_client.login(username, password=password, registry=registry, reauth=True) return ECRDockerClient( client=docker_client, username=username, password=password, registry=bare_registry, ) @property def info(self) -> ImageInfo: """ Return information about the image. We're doing this by pulling the image from the repository and inspecting it. Note: I'd love to get the base image for this image, but there is no direct way to do it. You would to look up the layers for the image, get the sha256 hash of the first layer (which is the base image), then look in in various repositories to find the image that has the same layer, then get that image's name. That seems stupid hard to do, especially if the base image is in the ECR registry of another AWS account. Raises: RuntimeError: If the docker daemon is not running. Returns: A :py:class:`botocraft.services.ecr.ImageInfo` object. """ ecr_client = self.docker_client data = ecr_client.client.api.inspect_image(self.name) # you can't be logged into two ECR registries at the same time for some reason # so we need to log out of the registry we are using. ecr_client.client.close() # Strip off the nanoseconds from the created date so that strptime can # parse it. created_date = data["Created"].split(".")[0] + "Z" return ImageInfo( name=data["RepoTags"][0], platform=data["Os"], architecture=data["Architecture"], size=data["Size"], docker_version=data["DockerVersion"], user=data["Config"]["User"], ports=data["Config"]["ExposedPorts"], # Created date looks like: '2024-08-19T21:59:57', convert # that to a datetime object. created=datetime.datetime.strptime(created_date, "%Y-%m-%dT%H:%M:%SZ"), # noqa: DTZ007 ) @cached_property def docker_image(self) -> docker.models.images.Image: """ Return the :py:class:`docker.models.images.Image` object for this image. Raises: RuntimeError: If the docker daemon is not running. """ ecr_client = self.docker_client if not self.is_pulled: docker_image = ecr_client.client.images.pull( f"{ecr_client.registry}/{self.repositoryName}", auth_config={ "username": ecr_client.username, "password": ecr_client.password, }, tag=self.imageId.imageTag, ) else: docker_image = ecr_client.client.images.get(self.name) ecr_client.client.close() return docker_image @cached_property def history(self) -> list[dict[str, Any]]: """ Return the build history for this image. You can use this to reconstruct **most** of the Dockerfile that was used to build the image. You won't have the ``FROM`` line, but you can get most of the rest of it. Raises: RuntimeError: If the docker daemon is not running. """ return self.docker_image.history()
[docs] def clean(self) -> None: """ Remove the image from our local docker storage, if it exists. Raises: RuntimeError: If the docker daemon is not running. """ if self.is_pulled: ecr_client = self.docker_client ecr_client.client.images.remove(self.name) ecr_client.client.close()
[docs] def clean_other_versions(self) -> None: """ Remove the all images for this repository except for the one with our version. Raises: RuntimeError: If the docker daemon is not running. """ ecr_client = self.docker_client prefix = f"{ecr_client.registry}/{self.repositoryName}" images = ecr_client.client.images.list(prefix) for image in images: if self.name not in image.tags: ecr_client.client.images.remove(f"{prefix}:{image.imageTag}") ecr_client.client.close()
[docs] def task_definitions( self, status: Literal["ACTIVE", "INACTIVE", "ALL"] | None = "ACTIVE", tags: dict[str, str] | None = None, verbose: bool = False, ) -> "PrimaryBoto3ModelQuerySet": """ Return a list of ECS task definitions that use this image. Warning: This will be quite slow if you have a lot of families and revisions, because the only way to deal with this is to get all the task definition families, and then look at each revision to see if one of its containers uses this image. There is no way to filter the task definitions by image. Args: status: The status of the task definition to filter by. Valid values are ``ACTIVE``, ``INACTIVE``, or ``ALL``. The default is ``ACTIVE``. tags: A dictionary of tags to filter by. The default is an empty dictionary. verbose: If ``True``, print out the task definition family and revision that uses this image. The default is ``False``. Returns: A list of ECS task definitions that use this image. """ from botocraft.services import TaskDefinition if not tags: tags = {} # First get the families families = TaskDefinition.objects.using(self.session).families(status=status) task_definitions: list[TaskDefinition] = [] # Now iterate through each family and revision for family in families: if verbose: click.secho(f" Family: {family}", fg="cyan") revisions = TaskDefinition.objects.using(self.session).list( familyPrefix=family, sort="DESC", status=status, ) task_definitions.extend( [ revision for revision in revisions if self.name in revision.container_images if tags.items() <= revision.tags.items() ] ) return PrimaryBoto3ModelQuerySet(task_definitions) # type: ignore[arg-type]
[docs] def services( self, status: Literal["ACTIVE", "INACTIVE", "ALL"] | None = "ACTIVE", tags: dict[str, str] | None = None, verbose: bool = False, ) -> "PrimaryBoto3ModelQuerySet": """ Return a list of ECS Services that use this image. Warning: This will be quite slow if you have a lot of families and revisions, because the only way to deal with this is to get all the task definition families, and then look at each revision to see if one of its containers uses this image. Then look through all our services to see if there is a service that uses that task definition. Args: status: The status of the task definition to filter by. Valid values are ``ACTIVE``, ``INACTIVE``, or ``ALL``. The default is ``ACTIVE``. tags: A dictionary of tags to filter task definitions and services by. The default is an empty dictionary. verbose: If ``True``, print out status messages as we work. Returns: A list of ECS Services that use this image. """ from botocraft.services import Cluster, Service if not tags: tags = {} task_definitions = self.task_definitions( status=status, tags=tags, verbose=verbose ) # There's no way to directly list all services in an account. We have # to list all clusters, then check each service in each cluster. services: list[Service] = [] clusters = Cluster.objects.using(self.session).list() for cluster in clusters: services.extend( [ service for service in cluster.services if service.taskDefinition in task_definitions if tags.items() <= service.tags.items() ] ) return PrimaryBoto3ModelQuerySet(services) # type: ignore[arg-type]
@property def vulnerabilities(self) -> "PrimaryBoto3ModelQuerySet": """ Return a list of vulnerabilities for this image. This is done by using the AWS Inspector2 service to scan the image and return the vulnerabilities. Note: The AWS Inspector service is not instantaneous, but runs occasionally. This doesn't matter much for us, because we are using the ECR immutable images, so we can just get the vulnerabilities for the image we are using. Warning: If this image was just pushed, then the scan may not have run yet. In that case, you will need to wait for the scan to run before you can get the vulnerabilities. Returns: A list of vulnerabilities for this image. """ from botocraft.services import ( FilterCriteria, Finding, StringFilter, ) return Finding.objects.using(self.session).list( # type: ignore[attr-defined] filterCriteria=FilterCriteria( ecrImageHash=[ StringFilter(value=self.imageId.imageDigest, comparison="EQUALS") ], ) )