Diffstat (limited to 'src/palhm/mod')
-rw-r--r--   src/palhm/mod/__init__.py     0
-rw-r--r--   src/palhm/mod/aws.py        229
2 files changed, 229 insertions, 0 deletions
diff --git a/src/palhm/mod/__init__.py b/src/palhm/mod/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/palhm/mod/__init__.py
diff --git a/src/palhm/mod/aws.py b/src/palhm/mod/aws.py
new file mode 100644
index 0000000..fcb16f1
--- /dev/null
+++ b/src/palhm/mod/aws.py
@@ -0,0 +1,229 @@
+from concurrent.futures import ThreadPoolExecutor, Future
+from decimal import Decimal
+from enum import Enum
+from time import sleep
+from typing import Callable, Iterable
+
+import boto3
+import botocore.exceptions
+from palhm import BackupBackend, Exec, GlobalContext
+
+
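+# Hard-coded path to the AWS CLI executable used for data transfer.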
+class CONST (Enum):
+	AWSCLI = "/bin/aws"
+
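+# Join key components with "/" and strip any leading and trailing slashes,
+# producing a normalised S3 object key.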
+def mks3objkey (keys: Iterable[str]) -> str:
+	ret = "/".join(keys)
+	return ret.strip("/")
+
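+# Build an "s3://" URI from a bucket name and key components.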
+def mks3uri (bucket: str, keys: Iterable[str]) -> str:
+	return "s3://" + bucket + "/" + "/".join(keys)
+
+class S3BackupBackend (BackupBackend):
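+	# Backup backend that stores each backup run as a timestamped
+	# (ISO 8601) prefix under the configured root key of an S3 bucket.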
+	def __init__ (self, param: dict):
+		self.profile = param.get("profile", "default")
+		self.bucket = param["bucket"]
+		self.root_key = mks3objkey([param["root"]])
+		self.mkprefix = BackupBackend.mkprefix_iso8601
+		self.nb_copy_limit = Decimal(param.get("nb-copy-limit", "Infinity"))
+		self.root_size_limit = Decimal(param.get("root-size-limit", "Infinity"))
+		self.cur_backup_uri = None
+		self.cur_backup_key = None
+		self.sc_sink = param.get("sink-storage-class")
+		self.sc_rot = param.get("rot-storage-class")
+		self.client = None
+		self.sink_list = list[str]()
+
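+	# Derive the object key and s3:// URI of the current backup prefix
+	# from a freshly generated timestamp.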
+	def _setup_cur_backup (self, ctx: GlobalContext):
+		self.cur_backup_key = mks3objkey([self.root_key, self.mkprefix()])
+		self.cur_backup_uri = mks3uri(self.bucket, [self.cur_backup_key])
+
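+	# Opening the backend probes the candidate prefix with head_object()
+	# and retries once on a name collision before giving up.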
+	def open (self, ctx: GlobalContext):
+		self.client = boto3.Session(profile_name = self.profile).client("s3")
+
+		try:
+			for i in range(0, 2):
+				self._setup_cur_backup(ctx)
+				# Expected to raise a 404 ClientError if the prefix is free
+				self.client.head_object(
+					Bucket = self.bucket,
+					Key = self.cur_backup_key)
+				# Collision: wait for the timestamp to advance and retry
+				sleep(1)
+			# Two collisions in a row: make sure we don't proceed
+			raise FileExistsError(self.cur_backup_uri)
+		except botocore.exceptions.ClientError as e:
+			if e.response["Error"]["Code"] != "404": # expected status code
+				raise
+
+		return super().open(ctx)
+
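+	# Abort any incomplete multipart uploads left under the current backup
+	# prefix; unfinished parts would otherwise linger and keep incurring
+	# storage charges.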
+	def _cleanup_multiparts (self, ctx: GlobalContext):
+		def do_abort (e):
+			try:
+				self.client.abort_multipart_upload(
+					Bucket = self.bucket,
+					Key = e["Key"],
+					UploadId = e["UploadId"])
+			except Exception: pass # best-effort
+
+		cont = None
+		fl = list[Future]()
+
+		with ThreadPoolExecutor(max_workers = ctx.nb_workers) as th_pool:
+			while True:
+				if cont:
+					r = self.client.list_multipart_uploads(
+						Bucket = self.bucket,
+						Prefix = self.cur_backup_key,
+						KeyMarker = cont[0],
+						UploadIdMarker = cont[1])
+				else:
+					r = self.client.list_multipart_uploads(
+						Bucket = self.bucket,
+						Prefix = self.cur_backup_key)
+
+				for i in r.get("Uploads", []):
+					fl.append(th_pool.submit(do_abort, i))
+				for i in fl:
+					i.result()
+				fl.clear()
+
+				if r["IsTruncated"]:
+					cont = (r["NextKeyMarker"], r["NextUploadIdMarker"])
+				else:
+					break
+
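+	# Invoke `cb` for every object under `prefix`, following
+	# list_objects_v2() pagination.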
+	def _foreach_objs (self, ctx: GlobalContext, prefix: str, cb: Callable):
+		cont_token = None
+
+		if not prefix.endswith("/"):
+			prefix += "/"
+
+		while True:
+			if cont_token:
+				r = self.client.list_objects_v2(
+					Bucket = self.bucket,
+					Prefix = prefix,
+					ContinuationToken = cont_token)
+			else:
+				r = self.client.list_objects_v2(
+					Bucket = self.bucket,
+					Prefix = prefix)
+
+			# "Contents" is absent when the prefix holds no objects
+			for i in r.get("Contents", []):
+				cb(i)
+
+			if r["IsTruncated"]:
+				cont_token = r["NextContinuationToken"]
+			else:
+				break
+
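+	# Sum object sizes per backup copy (the first path component under the
+	# root key) and return the (prefix, size) pairs sorted by prefix.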
+	def _fs_usage_info (self, ctx: GlobalContext) -> Iterable[tuple[str, int]]:
+		du_map = dict[str, int]()
+		ret = list[tuple[str, int]]()
+		prefix = self.root_key + "/"
+		def cb (i):
+			o_key = i["Key"]
+			o_size = i.get("Size", 0)
+			if not o_key.startswith(self.root_key):
+				raise RuntimeError("The endpoint returned an object " +
+					"irrelevant to the request: " + o_key)
+
+			l = o_key.find("/", len(prefix))
+			if l >= 0:
+				o_backup = o_key[:l]
+			else:
+				return
+
+			du_map[o_backup] = du_map.get(o_backup, 0) + o_size
+
+		self._foreach_objs(ctx, prefix, cb)
+		for i in sorted(du_map.keys()):
+			ret.append((i, du_map[i]))
+
+		return ret
+
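+	# The backup in progress is never considered for rotation.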
+	def _excl_fs_copies (self, ctx: GlobalContext) -> set[str]:
+		ret = set[str]()
+		ret.add(self.cur_backup_key)
+		return ret
+
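+	# Delete the given prefixes recursively by running `aws s3 rm` in
+	# parallel worker threads.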
+	def _rm_fs_recursive (self, ctx: GlobalContext, pl: Iterable[str]):
+		l = self._logger(ctx)
+
+		with ThreadPoolExecutor(max_workers = ctx.nb_workers) as th_pool:
+			fl = list[Future]()
+
+			for i in pl:
+				e = Exec()
+				e.argv = [
+					CONST.AWSCLI.value,
+					"--profile=" + self.profile,
+					"s3",
+					"rm",
+					"--quiet",
+					"--recursive",
+					mks3uri(self.bucket, [i]) ]
+				l.debug("run: " + str(e))
+				fl.append(th_pool.submit(e.run, ctx))
+			for i in fl:
+				i.result()
+
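+	# Rotation quota: maximum number of copies and maximum total size of
+	# everything under the root key.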
+	def _fs_quota_target (self, ctx: GlobalContext) -> tuple[Decimal, Decimal]:
+		return (self.nb_copy_limit, self.root_size_limit)
+
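+	# Roll back a failed run by deleting everything uploaded under the
+	# current backup prefix.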
+	def rollback (self, ctx: GlobalContext):
+		if self.cur_backup_key is not None:
+			# _rm_fs_recursive() expects object keys, not s3:// URIs
+			self._rm_fs_recursive(ctx, [self.cur_backup_key])
+
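+	# Closing the backend only has to clean up dangling multipart uploads.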
+	def close (self, ctx: GlobalContext):
+		self._cleanup_multiparts(ctx)
+
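+	# Return an Exec that pipes its stdin to `aws s3 cp - <uri>` under the
+	# current backup prefix, recording the object key for later rotation.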
+	def sink (self, ctx: GlobalContext, path: str) -> Exec:
+		l = self._logger(ctx)
+
+		e = Exec()
+		e.argv = [
+			CONST.AWSCLI.value,
+			"--profile=" + self.profile,
+			"s3",
+			"cp",
+			"--only-show-errors" ]
+		if self.sc_sink:
+			e.argv.append("--storage-class=" + self.sc_sink)
+		e.argv.extend(["-", "/".join([self.cur_backup_uri, path])])
+
+		l.debug("sink: " + str(e))
+		self.sink_list.append(mks3objkey([self.cur_backup_key, path]))
+
+		return e
+
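+	# After the generic rotation, transition the objects written by sink()
+	# to the configured storage class with an in-place copy_object() call.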
+	def rotate (self, ctx: GlobalContext):
+		ret = super()._do_fs_rotate(ctx)
+
+		if self.sc_rot:
+			def chsc (k):
+				self.client.copy_object(
+					Bucket = self.bucket,
+					CopySource = mks3objkey([self.bucket, k]),
+					Key = k,
+					MetadataDirective = "COPY",
+					StorageClass = self.sc_rot)
+
+			with ThreadPoolExecutor(max_workers = ctx.nb_workers) as th_pool:
+				l = self._logger(ctx)
+				fl = list[Future]()
+
+				for i in self.sink_list:
+					l.debug("chsc: %s %s" % (self.sc_rot, i))
+					fl.append(th_pool.submit(chsc, i))
+				for i in fl:
+					i.result()
+
+		return ret
+
+	def __str__ (self):
+		return "aws-s3"
+
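+# Backup backend classes this module exports, keyed by the name used in
+# the config.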
+backup_backends = {
+ "aws-s3": S3BackupBackend
+}