import socket
import subprocess
import time
from contextlib import contextmanager
from datetime import datetime
from functools import wraps
from ipaddress import ip_address
from typing import TYPE_CHECKING, Callable, Final, Literal, cast
from zoneinfo import ZoneInfo
import psutil
from botocraft.services.abstract import PrimaryBoto3ModelQuerySet
if TYPE_CHECKING:
from botocraft.services import (
AMI,
Finding,
Instance,
Reservation,
Tag,
TagSpecification,
)
#: The EC2 resource types. We need this for specifying the proper tag for
#: resources in EC2.
ResourceType = Literal[
"capacity-reservation",
"client-vpn-endpoint",
"customer-gateway",
"carrier-gateway",
"coip-pool",
"dedicated-host",
"dhcp-options",
"egress-only-internet-gateway",
"elastic-ip",
"elastic-gpu",
"export-image-task",
"export-instance-task",
"fleet",
"fpga-image",
"host-reservation",
"image",
"import-image-task",
"import-snapshot-task",
"instance",
"instance-event-window",
"internet-gateway",
"ipam",
"ipam-pool",
"ipam-scope",
"ipv4pool-ec2",
"ipv6pool-ec2",
"key-pair",
"launch-template",
"local-gateway",
"local-gateway-route-table",
"local-gateway-virtual-interface",
"local-gateway-virtual-interface-group",
"local-gateway-route-table-vpc-association",
"local-gateway-route-table-virtual-interface-group-association",
"natgateway",
"network-acl",
"network-interface",
"network-insights-analysis",
"network-insights-path",
"network-insights-access-scope",
"network-insights-access-scope-analysis",
"placement-group",
"prefix-list",
"replace-root-volume-task",
"reserved-instances",
"route-table",
"security-group",
"security-group-rule",
"snapshot",
"spot-fleet-request",
"spot-instances-request",
"subnet",
"subnet-cidr-reservation",
"traffic-mirror-filter",
"traffic-mirror-session",
"traffic-mirror-target",
"transit-gateway",
"transit-gateway-attachment",
"transit-gateway-connect-peer",
"transit-gateway-multicast-domain",
"transit-gateway-policy-table",
"transit-gateway-route-table",
"transit-gateway-route-table-announcement",
"volume",
"vpc",
"vpc-endpoint",
"vpc-endpoint-connection",
"vpc-endpoint-service",
"vpc-endpoint-service-permission",
"vpc-peering-connection",
"vpn-connection",
"vpn-gateway",
"vpc-flow-log",
"capacity-reservation-fleet",
"traffic-mirror-filter-rule",
"vpc-endpoint-connection-device-type",
"verified-access-instance",
"verified-access-group",
"verified-access-endpoint",
"verified-access-policy",
"verified-access-trust-provider",
"vpn-connection-device-type",
"vpc-block-public-access-exclusion",
"ipam-resource-discovery",
"ipam-resource-discovery-association",
"instance-connect-endpoint",
]
# ----------
# Decorators
# ----------
def ec2_instances_only(
func: Callable[..., "PrimaryBoto3ModelQuerySet"],
) -> Callable[..., "PrimaryBoto3ModelQuerySet"]:
"""
Wraps a boto3 method that returns a list of :py:class:`Reservation` objects
to return a list of :py:class:`Instance` objects instead.
"""
@wraps(func)
def wrapper(*args, **kwargs) -> "PrimaryBoto3ModelQuerySet":
qs = func(*args, **kwargs)
instances: list["Instance"] = [] # noqa: UP037
if hasattr(qs, "results") and qs.results:
for reservation in qs.results:
instances.extend(cast("list[Instance]", reservation.Instances)) # type: ignore[attr-defined]
return PrimaryBoto3ModelQuerySet(instances) # type: ignore[arg-type]
return PrimaryBoto3ModelQuerySet([])
return wrapper
[docs]def ec2_instance_only(
func: Callable[..., "Reservation | None"],
) -> Callable[..., "Instance | None"]:
"""
Wraps a boto3 method that returns a list of :py:class:`Reservation` objects
to return a single :py:class:`Instance` object instead.
"""
@wraps(func)
def wrapper(*args, **kwargs) -> "Instance | None":
reservation = func(*args, **kwargs)
if not reservation:
return None
return cast("list[Instance]", reservation.Instances)[0]
return wrapper
# --------------
# Manager mixins
# --------------
[docs]class AMIManagerMixin:
"""
A mixin is used on :py:class:`botocraft.services.ec2.AMIManager` to add
miscellaneous methods to the class that are not normally part of the
object.
"""
#: The maximum number of filters that can be added to to :meth:`AMI.objects.list`.
MAX_AMI_FILTER_SIZE: Final[int] = 200
def _get_in_use_instance_amis(
self,
check_amis: list["AMI"],
amis: list["AMI"] | None = None,
) -> list["AMI"]:
"""
Return a list of AMIs that are in use by a running or stopped instance.
Args:
check_amis: the list of :py:class:`AMI` to check.
Keyword Args:
amis: the list of :py:class:`AMI` that have been identified so far as
being in use.
Returns:
A list of :py:class:`AMI` objects that are in use by a running or
stopped instance.
"""
from botocraft.services import (
Filter,
Instance,
)
if not amis:
amis = []
if not check_amis:
return amis
ami_by_id = {ami.ImageId: ami for ami in check_amis}
filters = [Filter(Name="image-id", Values=list(ami_by_id.keys()))]
instances = Instance.objects.list(Filters=filters)
seen_amis = {ami.ImageId for ami in amis}
for instance in instances:
image_id = getattr(instance, "ImageId", None)
if image_id in ami_by_id and image_id not in seen_amis:
seen_amis.add(image_id)
amis.append(ami_by_id[image_id])
return amis
def _get_in_use_asg_amis(
self,
check_amis: list["AMI"],
amis: list["AMI"] | None = None,
) -> list["AMI"]:
"""
Return a list of AMIs that are in use by an autoscaling group.
Args:
check_amis: the list of :py:class:`AMI` to check.
Keyword Args:
amis: the list of :py:class:`AMI` that have been identified so far as
being in use.
Returns:
A list of :py:class:`AMI` objects that are in use by an autoscaling group.
"""
# Avoid circular import
from botocraft.services import (
AutoScalingGroup,
LaunchConfiguration,
LaunchTemplateVersion,
ResponseLaunchTemplateData,
)
if amis is None:
amis = []
if not check_amis:
return amis
ami_by_id = {ami.ImageId: ami for ami in check_amis}
seen_amis = {ami.ImageId for ami in amis}
# Now search for any AMIs that are used by an autoscaling group
autoscaling_groups = AutoScalingGroup.objects.list()
for autoscaling_group in autoscaling_groups:
autoscaling_group = cast("AutoScalingGroup", autoscaling_group)
image_id = None
# First check if the AMI is used by a launch configuration
if autoscaling_group.LaunchConfigurationName:
launch_configuration = cast(
"LaunchConfiguration",
autoscaling_group.launch_configuration,
)
image_id = launch_configuration.ImageId
else:
# If there's no launch configuration, then the ASG uses a launch
# template. Check if the AMI is used by the launch template.
template = cast(
"LaunchTemplateVersion", autoscaling_group.launch_template
)
if template and template.LaunchTemplateData:
image_id = cast(
"ResponseLaunchTemplateData", template.LaunchTemplateData
).ImageId
if image_id in ami_by_id and image_id not in seen_amis:
seen_amis.add(image_id)
amis.append(ami_by_id[image_id])
return amis
[docs] def in_use(
self,
owners: list[str] | None = None,
tags: dict[str, str] | None = None,
created_since: datetime | None = None,
amis: list["AMI"] | None = None,
) -> "PrimaryBoto3ModelQuerySet":
"""
Return a list of AMIs that are currently in use by a running or stopped
instance, or by an autoscaling group's launch configuration or launch
template.
If ``amis`` is specified, all other filters are ignored, because we expect
the user to know which AMIs they are submitting.
.. important::
We're not checking for AMIs in the following places:
- CloudFormation templates
- OpsWorks stacks
- Elastic Beanstalk environments
- Launch Template Versions not in use by an autoscaling group
Keyword Args:
owners: Scopes the results to AMIs with the specified owners. You
can specify a combination of Amazon Web Services account IDs,
``self``, ``amazon``, and ``aws-marketplace``. If you omit this
parameter, the results include all images for which you have launch
permissions, regardless of ownership. If not specified, the
default is ``self``.
tags: Filters the AMIs to those who match the these tags.
created_since: Filters the AMIs to those created since this date.
amis: Filters the AMIs to those in this list. All other filters are
ignored if this is specified.
"""
from botocraft.services import (
Filter,
)
_owners = owners or ["self"]
_filters: list[Filter] = []
if tags:
_filters = [
Filter(Name=f"tag:{key}", Values=[value]) for key, value in tags.items()
]
if created_since:
# First convert the timezone to UTC if this is a timezone-aware
# datetime object.
if created_since.tzinfo:
created_since = created_since.astimezone(ZoneInfo("UTC"))
# Now append the filter to the list of filters for the AMI listing.
_filters.append(
Filter(Name="creation-date", Values=[created_since.isoformat()])
)
if amis is not None:
check_amis = amis
elif _filters:
check_amis = self.list(Owners=_owners, Filters=_filters) # type: ignore[attr-defined]
else:
check_amis = self.list(Owners=_owners) # type: ignore[attr-defined]
in_use_amis = self._get_in_use_instance_amis(list(check_amis))
in_use_amis = self._get_in_use_asg_amis(check_amis, amis=in_use_amis)
return PrimaryBoto3ModelQuerySet(in_use_amis) # type: ignore[arg-type]
# -------------
# Model mixins
# -------------
[docs]class InstanceModelMixin:
"""
Used on :py:class:`botocraft.services.ec2.Instance` to add miscellaneous
methods to the class that are not normally part of the object.
"""
def __maybe_resolve_ip(self, host: str) -> str:
"""
Given a hostname or IP address, return the IP address. If the host is
an IP address, return it as is. If the host is a hostname, resolve it
to an IP address.
Args:
host: either an IP address of the remote host to connect to, or
a hostname of that host
Returns:
The IP address of the host.
"""
try:
ip_address(host)
except ValueError:
# If it is not an IP address, then it must be a hostname.
# Resolve the hostname to an IP address.
try:
return socket.gethostbyname(host)
except socket.gaierror as e:
msg = f"Could not resolve hostname {host}: {e}"
raise RuntimeError(msg) from e
return host
def __find_open_port(self, start_port: int) -> int:
"""
Find an open port starting from ``start_port``. This is used to find
an open port for the SSH tunnel.
Args:
start_port: The port to start searching from.
Returns:
An open port.
Raises:
ValueError: If no open port is found.
"""
port = start_port
while True:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
if sock.connect_ex(("127.0.0.1", port)) != 0:
break
port += 1
if port > 65535: # noqa: PLR2004
msg = f"No available port found starting at {start_port}."
raise ValueError(msg)
return port
[docs] @contextmanager
def tunnel(
self,
host: str,
remote_port: int,
local_port: int | None = None,
profile: str | None = None,
):
"""
A context manager for opening and closing a tunnel.
This will call `open_tunnel` when entering the context and
`close_tunnel` when exiting the context.
Args:
host: The remote host to connect to (IP or hostname).
remote_port: The remote port to connect to.
local_port: The local port to use. If None, an unused port will be chosen.
profile: The AWS profile to use. If None, the default profile will be used.
Raises:
ValueError: If the local port is already in use.
RuntimeError: If the instance is not connected to SSM or if there
is an error when starting the session or SSH tunnel.
Yields:
The local port used for the tunnel.
"""
local_port = self.open_tunnel(
host=host,
remote_port=remote_port,
local_port=local_port,
profile=profile,
)
try:
yield local_port
finally:
self.close_tunnel(host=host, local_port=local_port)
[docs] def open_tunnel(
self,
host: str,
remote_port: int,
local_port: int | None = None,
profile: str | None = None,
) -> int:
"""
Open a tunnel to the instance using SSM and SSH. This is useful for
connecting to a database or other service on the instance that
is in a private subnet. This will open a tunnel from the ``local_port``
on the local machine through the instance to the ``remote_port`` on
``host``.
If ``local_port`` is not specified, a random port will be chosen
starting from between 8800 and 65535.
Args:
host: either an IP address of the remote host to connect to, or
a hostname of that host
remote_port: The remote port to connect to.
Keyword Args:
local_port: The local port to connect to.
profile: The AWS profile to use. If not specified, the default
profile will be used.
Raises:
ValueError: If the local port is already in use.
RuntimeError: If the instance is not connected to SSM or if there
is an error when starting the session or SSH tunnel.
Returns:
The local port that was used for the tunnel.
"""
# Find an unused local port if local_port is None
if local_port is None:
local_port = self.__find_open_port(8800)
else:
# Check if the local port is already in use
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
if sock.connect_ex(("127.0.0.1", local_port)) == 0:
msg = f"Local port {local_port} is already in use."
raise ValueError(msg)
host_ip = self.__maybe_resolve_ip(host)
# Build the AWS SSM start-session command
ssm_command = [
"aws",
"ssm",
"start-session",
"--target",
self.InstanceId, # type: ignore[attr-defined]
"--document-name",
"AWS-StartPortForwardingSessionToRemoteHost",
"--parameters",
f"host={host_ip},portNumber={remote_port},localPortNumber={local_port}",
]
if profile:
ssm_command.extend(["--profile", profile])
try:
# Start the SSM session
ssm_process = subprocess.Popen(
ssm_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except FileNotFoundError as e:
msg = "AWS CLI is not installed or not in PATH"
raise RuntimeError(msg) from e
# Wait for the port to be open
for _ in range(40):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
if sock.connect_ex(("127.0.0.1", local_port)) == 0:
break
time.sleep(0.25)
else:
# If the port is not open after 10 seconds, kill the process
ssm_process.kill()
msg = (
f"Failed to open tunnel to {host_ip}:{remote_port} on local "
f"port {local_port}."
)
raise RuntimeError(msg)
# Store the processes for later management
if self.Tunnels is None: # type: ignore[has-type]
self.Tunnels = {} # type: ignore[var-annotated]
if host_ip not in self.Tunnels:
self.Tunnels[host_ip] = []
self.Tunnels[host_ip].append(
{
"ssm_process": ssm_process,
"local_port": local_port,
}
)
return local_port
[docs] def close_tunnel(self, host: str, local_port: int | None = None) -> None:
"""
Close one or more tunnels to ``host``. This will terminate the SSM
session(s) and SSH process(es) that were opened by
:py:meth:`start_tunnel`.
If ``local_port`` is not specified, all tunnels to the host will be
closed. If ``local_port`` is specified, only the tunnel to that port
will be closed.
Args:
host: Either an IP address of the remote host to connect to, or
a hostname of that host.
Keyword Args:
local_port: The local port to close. If not specified, all
tunnels to the host will be closed.
Raises:
ValueError: If no tunnels are found for the given host or port.
"""
def terminate_process(process: subprocess.Popen) -> None:
"""
Helper function to terminate a process and its children.
Args:
process: The process to terminate.
"""
try:
for child in psutil.Process(process.pid).children(recursive=True):
child.terminate()
process.terminate()
except psutil.NoSuchProcess:
pass
host_ip = self.__maybe_resolve_ip(host)
if self.Tunnels is None or host_ip not in self.Tunnels:
msg = f"No open tunnels found for {host_ip}. Did you call start_tunnel?"
raise ValueError(msg)
tunnels = self.Tunnels[host_ip]
if local_port is not None:
# Close the specific tunnel for the given local port
for tunnel in tunnels:
if tunnel["local_port"] == local_port:
terminate_process(tunnel["ssm_process"])
tunnels.remove(tunnel)
break
else:
msg = f"No open tunnel found for {host_ip} on local port {local_port}."
raise ValueError(msg)
else:
# Close all tunnels for the given host
for tunnel in tunnels:
terminate_process(tunnel["ssm_process"])
del self.Tunnels[host_ip]
[docs]class SecurityGroupModelMixin:
"""
A mixin is used on :py:class:`botocraft.services.ec2.SecurityGroup` to
enhance the ``.save()`` method to allow for managing ingress and egress
rules at the same time as saving the security group.
Normally this is done with several boto3 calls, but this mixin allows for a
single call to ``.save()`` to create a security group and manage the rules.
"""
[docs] def save(self, **kwargs):
"""
Save the model. For security groups, ingress rules are managed via
separate boto3 calls than the security group itself. This override of
the ``save`` method will allow the user to create a security group and
add ingress rules in one step.
"""
# TODO: this needs to be enhanced to handle egress rules as well.
if not self.pk:
group_id = self.objects.create(self, **kwargs)
self.objects.using(self.session).authorize_ingress(
group_id, self.IpPermissions, **kwargs
)
else:
old_obj = self.objects.using(self.session).get(self.pk)
if self.IpPermissions != old_obj.IpPermissions:
if old_obj.IpPermissions:
self.objects.using(self.session).revoke_ingress(
self.pk, old_obj.IpPermissions, **kwargs
)
if self.IpPermissions:
self.objects.using(self.session).authorize_ingress(
self.pk, self.IpPermissions, **kwargs
)
[docs]class AMIModelMixin:
@property
def in_use(self) -> bool:
"""
Return ``True`` if the AMI is in use by a running or stopped instance.
"""
ids = self.objects.in_use(image_id=self.ImageId).values_list( # type: ignore[attr-defined]
"ImageId", flat=True
)
return self.ImageId in ids # type: ignore[attr-defined]
@property
def vulnerabilities(self) -> "PrimaryBoto3ModelQuerySet":
"""
Return a list of vulnerabilities for the instance. This is a
convenience method to get the vulnerabilities for the instance.
Returns:
A list of :py:class:`Finding` objects.
"""
from botocraft.services import (
FilterCriteria,
Finding,
StringFilter,
)
return Finding.objects.using(self.session).list( # type: ignore[attr-defined]
FilterCriteria(
ec2InstanceImageId=[
StringFilter(value=self.ImageId, comparison="EQUALS") # type: ignore[attr-defined]
],
),
)
class SubnetModelMixin:
"""
A mixin is used on :py:class:`botocraft.services.ec2.Subnet` to add
miscellaneous methods to the class that are not normally part of the
object.
"""
@property
def vulnerabilities(self) -> list["Finding"]:
"""
Return a list of vulnerabilities for the instance. This is a
convenience method to get the vulnerabilities for the instance.
Returns:
A list of :py:class:`Finding` objects.
"""
from botocraft.services import (
FilterCriteria,
Finding,
StringFilter,
)
return Finding.objects.using(self.session).list( # type: ignore[attr-defined]
filterCriteria=FilterCriteria(
ec2InstanceSubnetId=[
StringFilter(value=self.SubnetId, comparison="EQUALS") # type: ignore[attr-defined]
],
),
)
class VpcModelMixin:
"""
A mixin is used on :py:class:`botocraft.services.ec2.Vpc` to add
miscellaneous methods to the class that are not normally part of the
object.
"""
@property
def vulnerabilities(self) -> list["Finding"]:
"""
Return a list of vulnerabilities for the instance. This is a
convenience method to get the vulnerabilities for the instance.
Returns:
A list of :py:class:`Finding` objects.
"""
from botocraft.services import (
FilterCriteria,
Finding,
StringFilter,
)
return Finding.objects.using(self.session).list( # type: ignore[attr-defined]
filterCriteria=FilterCriteria(
ec2InstanceVpcId=[
StringFilter(value=self.VpcId, comparison="EQUALS") # type: ignore[attr-defined]
],
),
)