aboutsummaryrefslogtreecommitdiff
path: root/toss-aws-eip/toss-aws-eip.py
diff options
context:
space:
mode:
Diffstat (limited to 'toss-aws-eip/toss-aws-eip.py')
-rwxr-xr-xtoss-aws-eip/toss-aws-eip.py378
1 files changed, 378 insertions, 0 deletions
diff --git a/toss-aws-eip/toss-aws-eip.py b/toss-aws-eip/toss-aws-eip.py
new file mode 100755
index 0000000..2029ae0
--- /dev/null
+++ b/toss-aws-eip/toss-aws-eip.py
@@ -0,0 +1,378 @@
+#!/bin/env python3
+
+# Copyright (c) 2023 David Timber <dxdt@dev.snart.me>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+import getopt
+import ipaddress
+import math
+import signal
+import sys
+import time
+from decimal import Decimal
+from enum import Enum
+from typing import Callable
+
+import boto3
+
+VER_STR = "0.0.0 (Nov 2023)"
+
+HELP_STR = '''Usage: {prog} [options] <SPEC> [SPEC ...]
+Repeat allocating Elastic IP address until an address in desired range is
+acquired. The multiple specs will be OR'd.
+
+SPEC: <RANGE | NET>
+ RANGE: <ADDR>-<ADDR>
+ ADDR: an IPv4 address
+ NET: <ADDR>/<CIDR>
+ CIDR: integer [0,32]
+Options:
+ -r <string> aws region name. Used if the default is not set in profile
+ -p <string> aws profile to use. Use the default profile if unspecified
+ -l <string> set the name for the allocated EIP. Defaults to an empty string
+ -x <string> the tag spec in "name=value" format
+ -n <int> limit number of attempts. Default: -1
+ -t <decimal> limit run time in seconds. Default: 'inf'
+ -d do a dry run
+ -h print this message and exit gracefully
+ -v increase verbosity
+ -q shut up
+ -V print other info and exit gracefully
+
+WARNING: check the pricing policy of your region prior to using this tool!'''
+V_STR = '''Version: {ver}
+by David Timber <dxdt@dev.snart.me> (c) 2023'''
+
+# Global classes
+
+class Verbosity (Enum):
+ '''Verbosity Level Enum'''
+ Q = ERR = 0
+ DEFAULT = WARN = 1
+ INFO = 2
+ DBG0 = 3
+ DBG1 = 4
+
+class AddrSpec:
+ '''The class works in two modes - A to B range mode and CIDR mode. `range()`
+ and `net()` factory methods can be used to instantiate a working instance.
+ The default constructor instantiates a unusable dummy instance.'''
+ def _range_in_op (self, addr: ipaddress.IPv4Address) -> bool:
+ return self.a <= addr and addr <= self.b
+
+ def _net_in_op (self, addr: ipaddress.IPv4Address) -> bool:
+ return addr in self.net
+
+ def _range_str_op (self) -> str:
+ return '''{a}-{b}'''.format(a = str(self.a), b = str(self.b))
+
+ def _net_str_op (self) -> str:
+ return str(self.net)
+
+ def range (a: ipaddress.IPv4Address, b: ipaddress.IPv4Address):
+ ret = AddrSpec()
+ ret.a = a
+ ret.b = b
+ ret.net = None
+ ret._contains_f = ret._range_in_op
+ ret._str_f = ret._range_str_op
+ return ret
+
+ def net (n: ipaddress.IPv4Network):
+ ret = AddrSpec()
+ ret.a = ret.b = None
+ ret.net = n
+ ret._contains_f = ret._net_in_op
+ ret._str_f = ret._net_str_op
+ return ret
+
+ def __init__(self):
+ self.a = None
+ self.b = None
+ self.net = None
+
+ def __contains__ (self, addr: ipaddress.IPv4Address) -> bool:
+ return self._contains_f(addr)
+
+ def __str__ (self) -> str:
+ return self._str_f()
+
+class ProgConf:
+ '''The program configuration class. The members represent the parametres
+ from the command line arguments.'''
+ def __init__(self):
+ self.range_specs = list[AddrSpec]()
+ self.profile = None
+ self.tag_spec = []
+ self.nb_runs = math.inf
+ self.runtime = math.inf
+ self.verbose = Verbosity.DEFAULT.value
+ self.help = False
+ self.dryrun = False
+ self.region = None
+ self.ver = False
+
+ def CompVerbosity (self, v: Verbosity) -> bool:
+ return self.verbose >= v.value
+
+class EIP:
+ '''Class for holding the EIP and the allocation id for a successful
+ iteration. The `str()` operator should return a string that can be more or
+ less parsed by a YAML parser.'''
+ def __init__(self, addr: ipaddress.IPv4Address, alloc_id: str):
+ self.addr = addr
+ self.alloc_id = alloc_id
+
+ def __str__(self) -> str:
+ return '''PublicIp: {addr}
+AllocationId: {alloc_id}'''.format(
+ addr = str(self.addr),
+ alloc_id = self.alloc_id)
+
+# Exceptions
+class FormatError (Exception):
+ '''Used for cmd line args parse error'''
+ ...
+
+
+def ParseAddrSpec (x: str) -> AddrSpec:
+ '''If the string contains a hyphen, try to extract the two addresses. If the
+ string contains a slash, treat the characters before it as an IP address and
+ the ones after it a CIDR length.'''
+ sep = x.find("-")
+ if sep > 0: # range
+ a = x[:sep].strip()
+ b = x[sep + 1:].strip()
+ return AddrSpec.range(ipaddress.IPv4Address(a), ipaddress.IPv4Address(b))
+
+ sep = x.find("/")
+ if sep > 0: # network
+ return AddrSpec.net(ipaddress.IPv4Network(x))
+
+ raise FormatError("Invalid address spec: " + x)
+
+def ParseParam (argv: list[str]) -> ProgConf:
+ '''The cmd line args parser'''
+ ret = ProgConf()
+ opt, args = getopt.getopt(argv, "p:l:x:n:t:hdvqr:V")
+
+ for v in args:
+ ret.range_specs.append(ParseAddrSpec(v))
+
+ for k, v in opt:
+ match k:
+ case "-r": ret.region = v
+ case "-p": ret.profile = v
+ case "-l": ret.tag_spec.append({ 'Key': 'Name', 'Value': v })
+ case "-x":
+ i = v.find("=")
+ if i < 0:
+ raise FormatError("Invalid format for option '-x': " + v)
+ tname = v[:i]
+ tvalue = v[i + 1:]
+ ret.tag_spec.append({ 'Key': tname, 'Value': tvalue })
+ case '-n':
+ ret.nb_runs = int(v)
+ if ret.nb_runs <= 0: ret.nb_runs = math.inf
+ case '-t': ret.runtime = Decimal(v) * 1000000000
+ case '-h': ret.help = True
+ case '-v': ret.verbose += 1
+ case '-q': ret.verbose = Verbosity.Q.value
+ case '-d': ret.dryrun = True
+ case '-V': ret.ver = True
+
+ return ret
+
+def GetRefClock () -> int:
+ '''A wrapper function for retrieving the monotonic clock tick value used to
+ measure the process run time.'''
+ return time.monotonic_ns()
+
+def DoPrint (s: str, v: Verbosity):
+ '''Check verbosity level and print the string to stderr.'''
+ if conf.CompVerbosity(v):
+ return sys.stderr.write(s)
+
+try:
+ conf = ParseParam(sys.argv[1:])
+except (FormatError, ipaddress.AddressValueError) as e:
+ sys.stderr.write(str(e) + "\n")
+ sys.exit(2)
+run_cnt = 0 # the number of iteration performed
+run_start = GetRefClock() # time the process started
+it_ret = None # acquired EIP information returned after a successful iteration
+flag = True # flag used to stop the main loop to serve exit signals(TERM, INT)
+
+if conf.help:
+ print(HELP_STR.format(prog = sys.argv[0]))
+ sys.exit(0)
+if conf.ver:
+ print(V_STR.format(ver = VER_STR))
+ sys.exit(0)
+
+if not conf.range_specs:
+ DoPrint("No SPEC specified. Run with '-h' option for help.\n", Verbosity.ERR)
+ sys.exit(2)
+
+# Init Boto3
+session = boto3.Session(
+ region_name = conf.region,
+ profile_name = conf.profile)
+client = session.client("ec2")
+
+def PrintedCall (func: Callable, fname: str, v: Verbosity, **kwargs):
+ DoPrint('''CALL {fname}({kwargs})\n'''.format(fname = fname, kwargs = kwargs), v)
+ return func(**kwargs)
+
+def PrintReturn (fname: str, ret, v: Verbosity):
+ return DoPrint('''RET {fname}(): {ret}'''.format(fname = fname, ret = str(ret)), v)
+
+def ShouldIterate () -> bool:
+ '''Check if the main loop should continue'''
+ global run_cnt, conf, flag
+
+ run_elapsed = GetRefClock() - run_start
+ ret = flag and run_cnt < conf.nb_runs and run_elapsed < conf.runtime
+ run_cnt += 1
+
+ return ret
+
+def OptInSignalHandler (sname: str, handler: Callable):
+ '''Install the signal handler if the signal with the name exists on the
+ platform.'''
+ if hasattr(signal, sname):
+ return signal.signal(signal.Signals(sname), handler)
+
+def HandleSignal (sn, sf):
+ '''Exit signal handler'''
+ global flag
+
+ flag = False
+ # Deregister the handler so that the subsequent signals kill the process
+ signal.signal(sn, signal.SIG_DFL)
+
+ # Signal names are not supported by all platforms
+ try:
+ signame = signal.Signals(sn).name
+ except:
+ signame = "?"
+ DoPrint('''CAUGHT {signame}({sn})\n'''.format(
+ signame = signame,
+ sn = sn), Verbosity.WARN)
+
+def ExtractBotoError (e: Exception) -> str:
+ '''Not many types of Boto3 exceptions are defined for errors. Use the "duck
+ typing technique" to extract the error code returned from the AWS
+ endpoint.'''
+ if (hasattr(e, "response") and
+ type(e.response) == dict and
+ type(e.response.get("Error")) == dict):
+ return e.response["Error"].get("Code")
+
+def IsDryRunError (e: Exception) -> bool:
+ return ExtractBotoError(e) == "DryRunOperation"
+
+def DoIteration () -> EIP | None:
+ '''Returns the EIP in the desired range if successful. None otherwise.'''
+ global conf
+ # Pre-construct the tag spec.
+ tag_spec = [ { 'ResourceType': 'elastic-ip' , 'Tags': conf.tag_spec } ] if conf.tag_spec else None
+
+ try:
+ # Send the allocation request!
+ r = PrintedCall(
+ client.allocate_address,
+ "client.allocate_address",
+ Verbosity.DBG0,
+ TagSpecifications = tag_spec,
+ DryRun = conf.dryrun)
+ PrintReturn("client.allocate_address", r, Verbosity.DBG0)
+ # The method will return the response object if successful
+ ip = ipaddress.IPv4Address(r['PublicIp'])
+ alloc_id = r['AllocationId']
+
+ DoPrint('''Got {ip}\n'''.format(ip = ip), Verbosity.INFO)
+ except Exception as e:
+ if conf.dryrun and IsDryRunError(e):
+ # This is expected for dry run. Carry on with mock data.
+ ip = None
+ alloc_id = "DryIce"
+ else:
+ # Propagate other errors
+ # Could be bad internet connection or insufficient privileges
+ raise e
+
+ if ip:
+ # Check if the address allocated is within the desired range
+ for spec in conf.range_specs:
+ DoPrint("IS {ip} in {range}?: ".format(
+ ip = ip,
+ range = spec
+ ), Verbosity.DBG1)
+ ret = ip in spec # this calls `_net_in_op()` or `_range_in_op()`
+ DoPrint("{verdict}\n".format(
+ verdict = "yes" if ret else "no"), Verbosity.DBG1)
+ if ret:
+ # Instantiate and return the result!
+ return EIP(ip, alloc_id)
+
+ # Reached because the allocated EIP is not in the desired range
+ # Release the EIP.
+ try:
+ r = PrintedCall(
+ client.release_address,
+ "client.release_address",
+ Verbosity.DBG0,
+ AllocationId = alloc_id,
+ DryRun = conf.dryrun)
+ PrintReturn("client.release_address", r, Verbosity.DBG0)
+ except Exception as e:
+ if conf.dryrun and IsDryRunError(e):
+ # This is expected for dry run. Let the function return. It is
+ # possible that the user has allocate rights but not release rights.
+ # If that's the case, this is where the user will find out.
+ pass
+ else:
+ # Propagate other errors
+ # Could be bad internet connection or insufficient privileges
+ raise e
+
+# Catch these normal case signals so that the current iteration can release the
+# EIP before coming out of the main loop.
+OptInSignalHandler("SIGINT", HandleSignal)
+OptInSignalHandler("SIGTERM", HandleSignal)
+OptInSignalHandler("SIGHUP", HandleSignal) # multitoss support
+
+while ShouldIterate():
+ DoPrint('''Iteration #{nr_run} ...\n'''.format(nr_run = run_cnt), Verbosity.INFO)
+ it_ret = DoIteration()
+ if it_ret:
+ DoPrint("Got EIP in target range!\n", Verbosity.INFO)
+ print(it_ret)
+ break
+
+run_elapsed = GetRefClock() - run_start
+DoPrint(
+ '''Run complete after {nb_runs} run(s) in {run_elapsed:.3f}\n'''.format(
+ nb_runs = run_cnt,
+ run_elapsed = run_elapsed / 1000000000.0
+ ), Verbosity.INFO)
+
+sys.exit(0 if it_ret else 3)