diff options
Diffstat (limited to 'src/palhm/mod')
-rw-r--r--  src/palhm/mod/__init__.py |   0
-rw-r--r--  src/palhm/mod/aws.py      | 229
2 files changed, 229 insertions(+), 0 deletions(-)
from concurrent.futures import ThreadPoolExecutor, Future
from decimal import Decimal
from enum import Enum
from time import sleep
from typing import Callable, Iterable

import boto3
import botocore
from palhm import BackupBackend, Exec, GlobalContext


class CONST (Enum):
	# Absolute path of the AWS CLI binary, used for bulk transfer and
	# recursive removal (cheaper than per-object API calls from Python).
	AWSCLI = "/bin/aws"

def mks3objkey (keys: Iterable[str]) -> str:
	'''Join the key components into a single S3 object key with no leading
	or trailing slash.'''
	ret = "/".join(keys)
	return ret.strip("/")

def mks3uri (bucket: str, keys: Iterable[str]) -> str:
	'''Return the "s3://<bucket>/<key>..." URI for the given bucket and key
	components.'''
	return "s3://" + bucket + "/" + "/".join(keys)

class S3BackupBackend (BackupBackend):
	'''BackupBackend implementation that stores each backup copy under a
	timestamped prefix of one S3 bucket. Metadata operations go through the
	boto3 client whilst bulk data transfer is delegated to the AWS CLI.'''

	def __init__ (self, param: dict):
		# AWS credential profile and location of the backup tree
		self.profile = param.get("profile", "default")
		self.bucket = param["bucket"]
		self.root_key = mks3objkey([param["root"]])
		# Each backup copy is named after its creation time
		self.mkprefix = BackupBackend.mkprefix_iso8601
		# Rotation limits: max number of copies and max total size of root
		self.nb_copy_limit = Decimal(param.get("nb-copy-limit", "Infinity"))
		self.root_size_limit = Decimal(param.get("root-size-limit", "Infinity"))
		# Key and URI of the backup copy in progress (set up in open())
		self.cur_backup_uri = None
		self.cur_backup_key = None
		# Optional storage classes applied on sink and on rotate
		self.sc_sink = param.get("sink-storage-class")
		self.sc_rot = param.get("rot-storage-class")
		self.client = None
		# Keys of the objects sunk during this run, consumed by rotate()
		self.sink_list = list[str]()

	def _setup_cur_backup (self, ctx: GlobalContext):
		# Derive the key and URI of the new backup copy from the current time.
		self.cur_backup_key = mks3objkey([self.root_key, self.mkprefix()])
		self.cur_backup_uri = mks3uri(self.bucket, [self.cur_backup_key])

	def open (self, ctx: GlobalContext):
		'''Create the S3 client and claim a fresh, unused prefix for the new
		backup copy.

		Raises FileExistsError if a free prefix could not be claimed after
		retrying.'''
		self.client = boto3.Session(profile_name = self.profile).client("s3")

		try:
			for i in range(0, 2):
				self._setup_cur_backup(ctx)
				# This should raise a 404 ClientError: the prefix is free.
				self.client.head_object(
					Bucket = self.bucket,
					Key = self.cur_backup_key)
				# The prefix is taken. Wait for the clock to tick so the
				# next attempt yields a different timestamped prefix.
				sleep(1)
			# Make sure we don't proceed
			raise FileExistsError(self.cur_backup_uri)
		except botocore.exceptions.ClientError as e:
			if e.response["Error"]["Code"] != "404": # expected status code
				raise

		return super().open(ctx)

	def _cleanup_multiparts (self, ctx: GlobalContext) -> None:
		'''Abort every incomplete multipart upload under the current backup
		prefix so abandoned parts do not keep accruing storage cost.'''
		def do_abort (e):
			try:
				self.client.abort_multipart_upload(
					Bucket = self.bucket,
					Key = e["Key"],
					UploadId = e["UploadId"])
			except Exception: pass # best-effort: a failed abort is not fatal

		cont = None
		fl = list[Future]()

		with ThreadPoolExecutor(max_workers = ctx.nb_workers) as th_pool:
			while True:
				if cont:
					r = self.client.list_multipart_uploads(
						Bucket = self.bucket,
						Prefix = self.cur_backup_key,
						KeyMarker = cont[0],
						UploadIdMarker = cont[1])
				else:
					r = self.client.list_multipart_uploads(
						Bucket = self.bucket,
						Prefix = self.cur_backup_key)

				# "Uploads" is absent when there is nothing to abort
				for i in r.get("Uploads", iter(())):
					fl.append(th_pool.submit(do_abort, i))
				for i in fl:
					i.result()
				fl.clear()

				if r["IsTruncated"]:
					# Continue from the *Next* markers of the response;
					# the plain "UploadIdMarker" merely echoes the request.
					cont = (r["NextKeyMarker"], r["NextUploadIdMarker"])
				else:
					break

	def _foreach_objs (self, ctx: GlobalContext, prefix: str, cb: Callable):
		'''Invoke ``cb`` with each object dict listed under ``prefix``,
		transparently following list_objects_v2 pagination.'''
		cont_token = None

		# Only match whole "directory" components of the prefix
		if not prefix.endswith("/"): prefix += "/"

		while True:
			if cont_token:
				r = self.client.list_objects_v2(
					Bucket = self.bucket,
					Prefix = prefix,
					ContinuationToken = cont_token)
			else:
				r = self.client.list_objects_v2(
					Bucket = self.bucket,
					Prefix = prefix)

			# "Contents" is absent when no object matches the prefix
			for i in r.get("Contents", iter(())):
				cb(i)

			if r["IsTruncated"]:
				cont_token = r["NextContinuationToken"]
			else:
				break

	def _fs_usage_info (self, ctx: GlobalContext) -> Iterable[tuple[str, int]]:
		'''Return (backup key, total size) pairs for every backup copy under
		the root, sorted by key (i.e. chronologically for ISO 8601 names).'''
		du_map = dict[str, int]()
		ret = list[tuple[str, int]]()
		prefix = self.root_key + "/"
		def cb (i):
			o_key = i["Key"]
			o_size = i.get("Size", 0)
			if not o_key.startswith(self.root_key):
				raise RuntimeError("The endpoint returned an object " +
					"irrelevant to the request: " + o_key)

			# Attribute the object to its top-level backup prefix
			l = o_key.find("/", len(prefix))
			if l >= 0:
				o_backup = o_key[:l]
			else:
				# Object sitting directly under the root: not a backup copy
				return

			du_map[o_backup] = du_map.get(o_backup, 0) + o_size

		self._foreach_objs(ctx, prefix, cb)
		for i in sorted(du_map.keys()):
			ret.append((i, du_map[i]))

		return ret

	def _excl_fs_copies (self, ctx: GlobalContext) -> set[str]:
		# The copy in progress must never be rotated away.
		ret = set[str]()
		ret.add(self.cur_backup_key)
		return ret

	def _rm_fs_recursive (self, ctx: GlobalContext, pl: Iterable[str]):
		'''Recursively delete the objects under each key in ``pl`` using the
		AWS CLI, one process per key, fanned out over the worker pool.'''
		l = self._logger(ctx)

		with ThreadPoolExecutor(max_workers = ctx.nb_workers) as th_pool:
			fl = list[Future]()

			for i in pl:
				e = Exec()
				e.argv = [
					CONST.AWSCLI.value,
					"--profile=" + self.profile,
					"s3",
					"rm",
					"--quiet",
					"--recursive",
					mks3uri(self.bucket, [i]) ]
				l.debug("run: " + str(e))
				fl.append(th_pool.submit(e.run, ctx))
			for i in fl:
				i.result()

	def _fs_quota_target (self, ctx: GlobalContext) -> tuple[Decimal, Decimal]:
		# (max number of copies, max total size of the root)
		return (self.nb_copy_limit, self.root_size_limit)

	def rollback (self, ctx: GlobalContext):
		'''Delete whatever has been written for the backup copy in progress.'''
		if self.cur_backup_key is not None:
			# _rm_fs_recursive() expects object keys, not s3:// URIs:
			# it builds the URI itself via mks3uri().
			self._rm_fs_recursive(ctx, [self.cur_backup_key])

	def close (self, ctx: GlobalContext):
		# Drop any multipart upload left dangling by an interrupted sink.
		self._cleanup_multiparts(ctx)

	def sink (self, ctx: GlobalContext, path: str) -> Exec:
		'''Return an Exec that uploads its stdin to ``path`` under the
		current backup prefix, recording the key for later rotation.'''
		l = self._logger(ctx)

		e = Exec()
		e.argv = [
			CONST.AWSCLI.value,
			"--profile=" + self.profile,
			"s3",
			"cp",
			"--only-show-errors" ]
		if self.sc_sink:
			e.argv.append("--storage-class=" + self.sc_sink)
		# "-" makes the CLI read the object data from stdin
		e.argv.extend(["-", "/".join([self.cur_backup_uri, path])])

		l.debug("sink: " + str(e))
		self.sink_list.append(mks3objkey([self.cur_backup_key, path]))

		return e

	def rotate (self, ctx: GlobalContext):
		'''Run the generic quota-based rotation, then optionally transition
		this run's objects to the configured rotation storage class.'''
		ret = super()._do_fs_rotate(ctx)

		if self.sc_rot:
			def chsc (k):
				# In-place self-copy: the documented way to change the
				# storage class of an existing object.
				self.client.copy_object(
					Bucket = self.bucket,
					CopySource = mks3objkey([self.bucket, k]),
					Key = k,
					MetadataDirective = "COPY",
					StorageClass = self.sc_rot)

			with ThreadPoolExecutor(max_workers = ctx.nb_workers) as th_pool:
				l = self._logger(ctx)
				fl = list[Future]()

				for i in self.sink_list:
					l.debug("chsc: %s %s" % (self.sc_rot, i))
					fl.append(th_pool.submit(chsc, i))
				for i in fl:
					i.result()

		return ret

	def __str__ (self):
		return "aws-s3"

# Backend registry consumed by the PALHM module loader.
backup_backends = {
	"aws-s3": S3BackupBackend
}