Diffstat (limited to 'src')
-rw-r--r--  src/.gitignore                           |   1
-rw-r--r--  src/conf/py-debug/aws.sample.jsonc       | 160
-rw-r--r--  src/conf/py-debug/conf.d/core.jsonc      |  44
-rw-r--r--  src/conf/py-debug/localfs.sample.jsonc   | 157
-rw-r--r--  src/conf/py-debug/null.sample.jsonc      | 140
-rwxr-xr-x  src/palhm-dnssec-check.sh                |  43
-rwxr-xr-x  src/palhm.py                             | 131
-rw-r--r--  src/palhm/__init__.py                    | 736
-rw-r--r--  src/palhm/mod/__init__.py                |   0
-rw-r--r--  src/palhm/mod/aws.py                     | 229
10 files changed, 1641 insertions, 0 deletions
diff --git a/src/.gitignore b/src/.gitignore
new file mode 100644
index 0000000..bee8a64
--- /dev/null
+++ b/src/.gitignore
@@ -0,0 +1 @@
+__pycache__
diff --git a/src/conf/py-debug/aws.sample.jsonc b/src/conf/py-debug/aws.sample.jsonc
new file mode 100644
index 0000000..46ad562
--- /dev/null
+++ b/src/conf/py-debug/aws.sample.jsonc
@@ -0,0 +1,160 @@
+// PALHM Instance Config
+{
+ "include": [ "conf/py-debug/conf.d/core.jsonc" ],
+ "modules": [ "aws" ],
+ "nb-workers": 0, // assumed $(nproc) - default
+ // "nb-workers": 1, // to disable concurrent task despatch
+ // To unlimit the number of workers.
+ // Does not fail on resource alloc failure.
+ // "nb-workers": -1,
+ "vl": 4,
+ "tasks": [
+ {
+ "id": "backup",
+ "type": "backup",
+ "backend": "aws-s3",
+ "backend-param": {
+ // "profile": "default",
+ "bucket": "palhm.test",
+ "root": "/palhm/backup",
+ "prefix": {
+ "type": "default"
+ // "type": "iso8601",
+ // "timespec": "seconds",
+ // "tz": "utc"
+ },
+ // "sink-storage-class": "STANDARD_IA",
+ // "rot-storage-class": "ONEZONE_IA",
+ "nb-copy-limit": 2, // or Infinity assumed(not in JSON spec)
+ "root-size-limit": "Infinity" // or Infinity assumed
+ },
+ "object-groups": [
+ { "id": "pre-start" },
+ {
+ "id": "data-dump",
+ "depends": [ "pre-start" ]
+ },
+ {
+ "id": "tar-media-0",
+ "depends": [ "data-dump" ]
+ },
+ {
+ "id": "tar-media-1",
+ "depends": [ "data-dump" ]
+ }
+ ],
+ "objects": [
+ {
+ "path": "pm-list.gz",
+ "group": "pre-start",
+ "pipeline": [
+ { "type": "exec", "exec-id": "dnf-list-installed" },
+ { "type": "exec", "exec-id": "filter-gzip-plain" }
+ ]
+ },
+ {
+ "path": "lsblk.json.gz",
+ "group": "pre-start",
+ "pipeline": [
+ {
+ "type": "exec-append",
+ "exec-id": "lsblk-all-json",
+ "argv": [ "-a" ]
+ },
+ { "type": "exec", "exec-id": "filter-gzip-plain" }
+ ]
+ },
+ {
+ "path": "random-dump.sql.xz",
+ "group": "data-dump",
+ "pipeline": [
+ {
+ "type": "exec-inline",
+ "argv": [
+ "/bin/dd",
+ "if=/dev/urandom",
+ "bs=4096",
+ "count=512",
+ "status=none"
+ ]
+ },
+ { "type": "exec", "exec-id": "filter-xz-parallel" }
+ ]
+ },
+ {
+ "path": "random-dump.0.xz",
+ "group": "tar-media-0",
+ "pipeline": [
+ {
+ "type": "exec-inline",
+ "argv": [
+ "/bin/dd",
+ "if=/dev/zero",
+ "bs=4096",
+ "count=512",
+ "status=none"
+ ]
+ },
+ { "type": "exec", "exec-id": "filter-xz-parallel" }
+ ]
+ },
+ {
+ "path": "random-dump.1.xz",
+ "group": "tar-media-1",
+ "pipeline": [
+ {
+ "type": "exec-inline",
+ "argv": [
+ "/bin/dd",
+ "if=/dev/zero",
+ "bs=4096",
+ "count=512",
+ "status=none"
+ ]
+ },
+ { "type": "exec", "exec-id": "filter-xz-parallel" }
+ ]
+ }
+ ]
+ },
+ {
+ "id": "update",
+ "type": "routine",
+ "routine": [
+ {
+ "type": "exec-inline",
+ "argv": [ "/bin/echo", "0" ]
+ },
+ {
+ "type": "exec-inline",
+ "argv": [ "/bin/sleep", "1" ]
+ },
+ {
+ "type": "exec-inline",
+ "argv": [ "/bin/echo", "1" ]
+ }
+ ]
+ },
+ {
+ "id": "default",
+ "type": "routine",
+ "routine": [
+ { "type": "task", "task-id": "backup" },
+ { "type": "task", "task-id": "update" },
+ {
+ // Block SIGTERM from systemd/init.d so the program is not
+ // affected by the reboot command.
+ "type": "builtin",
+ "builtin-id": "sigmask",
+ "param": [
+ { "action": "block", "sig": [ "TERM" ] }
+ ]
+ },
+ {
+ "type": "exec-inline",
+ "argv": [ "/bin/true" ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/src/conf/py-debug/conf.d/core.jsonc b/src/conf/py-debug/conf.d/core.jsonc
new file mode 100644
index 0000000..4afe7f5
--- /dev/null
+++ b/src/conf/py-debug/conf.d/core.jsonc
@@ -0,0 +1,44 @@
+// PALHM Core Config
+{
+ "execs": [
+ // {
+ // "id": "Exec ID",
+ // "argv": [ "cmd", "--option1=opt1_val", "-o", "opt2_val" ],
+ // "env": { "NAME": "VAL" },
+ // "ec": "0", // this is assumed
+ // "ec": "0-127", // inclusive range (not terminated by a signal)
+ // "ec": "<1", // range (only 0)
+ // "ec": "<=1", // range (0 and 1)
+ // "ec": ">0", // range (always fail)
+ // "ec": ">=0", // range (only 0)
+ // "vl-stderr": 1 // verbosity level of stderr produced by this process
+ // verbosity level of stderr produced by this process. Ignored if used
+ // as part of pipeline
+ // "vl-stdout": 2
+ // },
+ {
+ "id": "tar",
+ "argv": [ "/usr/bin/tar", "--xattrs", "--selinux" ]
+ },
+ {
+ "id": "filter-xz-parallel",
+ "argv": [ "/usr/bin/xz", "-T0" ]
+ },
+ {
+ "id": "filter-gzip-plain",
+ "argv": [ "/usr/bin/gzip" ]
+ },
+ {
+ "id": "filter-zstd-plain",
+ "argv": [ "/usr/bin/zstd" ]
+ },
+ {
+ "id": "dnf-list-installed",
+ "argv": [ "/usr/bin/dnf", "-yq", "list", "installed" ]
+ },
+ {
+ "id": "lsblk-all-json",
+ "argv": [ "/usr/bin/lsblk", "-JbOa" ]
+ }
+ ]
+}
diff --git a/src/conf/py-debug/localfs.sample.jsonc b/src/conf/py-debug/localfs.sample.jsonc
new file mode 100644
index 0000000..ec12808
--- /dev/null
+++ b/src/conf/py-debug/localfs.sample.jsonc
@@ -0,0 +1,157 @@
+// PALHM Instance Config
+{
+ "include": [ "conf/py-debug/conf.d/core.jsonc" ],
+ "nb-workers": 0, // assumed $(nproc) - default
+ // "nb-workers": 1, // to disable concurrent task despatch
+ // To unlimit the number of workers.
+ // Does not fail on resource alloc failure.
+ // "nb-workers": -1,
+ "vl": 4,
+ "tasks": [
+ {
+ "id": "backup",
+ "type": "backup",
+ "backend": "localfs",
+ "backend-param": {
+ "root": "/var/tmp/palhm-backup-root",
+ "prefix": {
+ "type": "default"
+ // "type": "iso8601",
+ // "timespec": "seconds",
+ // "tz": "utc"
+ },
+ // "dmode": "755",
+ // "fmode": "644",
+ "nb-copy-limit": 2,
+ "root-size-limit": "Infinity"
+ },
+ "object-groups": [
+ { "id": "pre-start" },
+ {
+ "id": "data-dump",
+ "depends": [ "pre-start" ]
+ },
+ {
+ "id": "tar-media-0",
+ "depends": [ "data-dump" ]
+ },
+ {
+ "id": "tar-media-1",
+ "depends": [ "data-dump" ]
+ }
+ ],
+ "objects": [
+ {
+ "path": "pm-list.gz",
+ "group": "pre-start",
+ "pipeline": [
+ { "type": "exec", "exec-id": "dnf-list-installed" },
+ { "type": "exec", "exec-id": "filter-gzip-plain" }
+ ]
+ },
+ {
+ "path": "lsblk.json.gz",
+ "group": "pre-start",
+ "pipeline": [
+ {
+ "type": "exec-append",
+ "exec-id": "lsblk-all-json",
+ "argv": [ "-a" ]
+ },
+ { "type": "exec", "exec-id": "filter-gzip-plain" }
+ ]
+ },
+ {
+ "path": "random-dump.sql.xz",
+ "group": "data-dump",
+ "pipeline": [
+ {
+ "type": "exec-inline",
+ "argv": [
+ "/bin/dd",
+ "if=/dev/urandom",
+ "bs=4096",
+ "count=512",
+ "status=none"
+ ]
+ },
+ { "type": "exec", "exec-id": "filter-xz-parallel" }
+ ]
+ },
+ {
+ "path": "random-dump.0.xz",
+ "group": "tar-media-0",
+ "pipeline": [
+ {
+ "type": "exec-inline",
+ "argv": [
+ "/bin/dd",
+ "if=/dev/zero",
+ "bs=4096",
+ "count=512",
+ "status=none"
+ ]
+ },
+ { "type": "exec", "exec-id": "filter-xz-parallel" }
+ ]
+ },
+ {
+ "path": "random-dump.1.xz",
+ "group": "tar-media-1",
+ "pipeline": [
+ {
+ "type": "exec-inline",
+ "argv": [
+ "/bin/dd",
+ "if=/dev/zero",
+ "bs=4096",
+ "count=512",
+ "status=none"
+ ]
+ },
+ { "type": "exec", "exec-id": "filter-xz-parallel" }
+ ]
+ }
+ ]
+ },
+ {
+ "id": "update",
+ "type": "routine",
+ "routine": [
+ {
+ "type": "exec-inline",
+ "argv": [ "/bin/echo", "0" ]
+ },
+ {
+ "type": "exec-inline",
+ "argv": [ "/bin/sleep", "1" ]
+ },
+ {
+ "type": "exec-inline",
+ "argv": [ "/bin/echo", "1" ]
+ }
+ ]
+ },
+ {
+ "id": "default",
+ "type": "routine",
+ "routine": [
+ { "type": "task", "task-id": "backup" },
+ { "type": "task", "task-id": "update" },
+ {
+ // Block SIGTERM from systemd/init.d so the program is not
+ // affected by the reboot command.
+ "type": "builtin",
+ "builtin-id": "sigmask",
+ "param": [
+ { "action": "block", "sig": [ "TERM" ] }
+ ]
+ },
+ {
+ "type": "exec-inline",
+ "argv": [ "/bin/true" ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/src/conf/py-debug/null.sample.jsonc b/src/conf/py-debug/null.sample.jsonc
new file mode 100644
index 0000000..a83de95
--- /dev/null
+++ b/src/conf/py-debug/null.sample.jsonc
@@ -0,0 +1,140 @@
+// PALHM Instance Config
+{
+ "include": [ "conf/py-debug/conf.d/core.jsonc" ],
+ "nb-workers": 1,
+ "vl": 3,
+ "tasks": [
+ {
+ "id": "backup",
+ "type": "backup",
+ "backend": "null",
+ "object-groups": [
+ { "id": "pre-start" },
+ {
+ "id": "data-dump",
+ "depends": [ "pre-start" ]
+ },
+ {
+ "id": "tar-media-0",
+ "depends": [ "data-dump" ]
+ },
+ {
+ "id": "tar-media-1",
+ "depends": [ "data-dump" ]
+ }
+ ],
+ "objects": [
+ {
+ "path": "pm-list.gz",
+ "group": "pre-start",
+ "pipeline": [
+ { "type": "exec", "exec-id": "dnf-list-installed" },
+ { "type": "exec", "exec-id": "filter-gzip-plain" }
+ ]
+ },
+ {
+ "path": "lsblk.json.gz",
+ "group": "pre-start",
+ "pipeline": [
+ {
+ "type": "exec-append",
+ "exec-id": "lsblk-all-json",
+ "argv": [ "-a" ]
+ },
+ { "type": "exec", "exec-id": "filter-gzip-plain" }
+ ]
+ },
+ {
+ "path": "random-dump.sql.xz",
+ "group": "data-dump",
+ "pipeline": [
+ {
+ "type": "exec-inline",
+ "argv": [
+ "/bin/dd",
+ "if=/dev/urandom",
+ "bs=4096",
+ "count=512",
+ "status=none"
+ ]
+ },
+ { "type": "exec", "exec-id": "filter-xz-parallel" }
+ ]
+ },
+ {
+ "path": "random-dump.0.xz",
+ "group": "tar-media-0",
+ "pipeline": [
+ {
+ "type": "exec-inline",
+ "argv": [
+ "/bin/dd",
+ "if=/dev/zero",
+ "bs=4096",
+ "count=512",
+ "status=none"
+ ]
+ },
+ { "type": "exec", "exec-id": "filter-xz-parallel" }
+ ]
+ },
+ {
+ "path": "random-dump.1.xz",
+ "group": "tar-media-1",
+ "pipeline": [
+ {
+ "type": "exec-inline",
+ "argv": [
+ "/bin/dd",
+ "if=/dev/zero",
+ "bs=4096",
+ "count=512",
+ "status=none"
+ ]
+ },
+ { "type": "exec", "exec-id": "filter-xz-parallel" }
+ ]
+ }
+ ]
+ },
+ {
+ "id": "update",
+ "type": "routine",
+ "routine": [
+ {
+ "type": "exec-inline",
+ "argv": [ "/bin/echo", "0" ]
+ },
+ {
+ "type": "exec-inline",
+ "argv": [ "/bin/sleep", "1" ]
+ },
+ {
+ "type": "exec-inline",
+ "argv": [ "/bin/echo", "1" ]
+ }
+ ]
+ },
+ {
+ "id": "default",
+ "type": "routine",
+ "routine": [
+ { "type": "task", "task-id": "backup" },
+ { "type": "task", "task-id": "update" },
+ {
+ // Block SIGTERM from systemd/init.d so the program is not
+ // affected by the reboot command.
+ "type": "builtin",
+ "builtin-id": "sigmask",
+ "param": [
+ { "action": "block", "sig": [ "TERM" ] }
+ ]
+ },
+ {
+ "type": "exec-inline",
+ "argv": [ "/bin/true" ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/src/palhm-dnssec-check.sh b/src/palhm-dnssec-check.sh
new file mode 100755
index 0000000..f5ee466
--- /dev/null
+++ b/src/palhm-dnssec-check.sh
@@ -0,0 +1,43 @@
+#!/bin/bash
+set -e
+. "$( dirname -- "${BASH_SOURCE[0]}" )"/common.sh
+
+do_query () {
+ # dig returns 0 upon successful reception and parsing of the response
+ # message. Any other exit code causes the script to terminate as a result
+ # of set -e. The +short option makes dig print only the RR values, so we
+ # assume that an error status was returned when dig produces no output
+ # with the option. Caution is required with this approach: a zone with no
+ # records also produces no output while dig exits with status 0.
+ dig +short +dnssec ANY "$TARGET" > "$tmpf"
+ if [ ! -s "$tmpf" ]; then
+ palhm_die \
+ "The nameserver returned no RR!
+DNSSEC verification probably failed."
+ fi
+}
+
+if [ "$#" -lt 1 ]; then
+ cat >&2 << EOF
+The Periodic Automatic Linux Host Maintenance (PALHM) DNSSEC check
+Usage: $0 <record name>
+
+The zone must contain at least one resource record set. The nameservers
+configured for the host must support DNSSEC validation.
+
+To test your host configuration, running
+ \$ $0 dnssec-failed.org
+should produce error messages.
+EOF
+ exit 2
+fi
+
+declare TARGET="$1"
+declare tmpf="$(mktemp --tmpdir "palhm-dnssec.XXXXXXXXXX")"
+
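+# Run the query in the background and wait for it with set -e disabled so
+# that the exit code can be captured and the temporary file removed
+# regardless of the outcome.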
+do_query & set +e
+wait -f "$!"
+ec="$?"
+rm "$tmpf"
+
+exit "$ec"
diff --git a/src/palhm.py b/src/palhm.py
new file mode 100755
index 0000000..f3f412b
--- /dev/null
+++ b/src/palhm.py
@@ -0,0 +1,131 @@
+#!/usr/bin/env python3
+import logging
+import sys
+from abc import ABC, abstractmethod
+from getopt import getopt
+
+import palhm
+
+
+class ProgConf:
+ conf = "/etc/palhm/palhm.conf"
+ cmd = None
+ override_vl = None
+ ctx = None
+
+ def alloc_ctx ():
+ ProgConf.ctx = palhm.setup_conf(palhm.load_conf(ProgConf.conf))
+ if not ProgConf.override_vl is None:
+ ProgConf.ctx.l.setLevel(ProgConf.override_vl)
+
+def err_unknown_cmd ():
+ sys.stderr.write("Unknown command. Run '" + sys.argv[0] + " help' for usage.\n")
+ exit(2)
+
+class Cmd (ABC):
+ @abstractmethod
+ def do_cmd (self):
+ ...
+
+class ConfigCmd (Cmd):
+ def __init__ (self, *args, **kwargs):
+ pass
+
+ def do_cmd (self):
+ ProgConf.alloc_ctx()
+ print(ProgConf.ctx)
+ return 0
+
+ def print_help ():
+ print(
+"Usage: " + sys.argv[0] + " config" + '''
+Load and parse config. Print the structure to stdout.''')
+
+class RunCmd (Cmd):
+ def __init__ (self, optlist, args):
+ self.optlist = optlist
+ self.args = args
+
+ def do_cmd (self):
+ ProgConf.alloc_ctx()
+
+ if self.args:
+ task = self.args[0]
+ else:
+ task = palhm.DEFAULT.RUN_TASK.value
+
+ ProgConf.ctx.task_map[task].run(ProgConf.ctx)
+
+ return 0
+
+ def print_help ():
+ print(
+"Usage: " + sys.argv[0] + " run [TASK]" + '''
+Run a task in config. Run the "''' + palhm.DEFAULT.RUN_TASK.value +
+'''" task if [TASK] is not specified.''')
+
+class HelpCmd (Cmd):
+ def __init__ (self, optlist, args):
+ self.optlist = optlist
+ self.args = args
+
+ def do_cmd (self):
+ if len(self.args) >= 1:
+ if not self.args[0] in CmdMap:
+ err_unknown_cmd()
+ else:
+ CmdMap[self.args[0]].print_help()
+ else:
+ HelpCmd.print_help()
+
+ return 0
+
+ def print_help ():
+ print(
+"Usage: " + sys.argv[0] + " [options] CMD [command options ...]" + '''
+Options:
+ -q Set the verbosity level to 0 (FATAL errors only). Overrides config
+ -v Increase the verbosity level by 1. Overrides config
+ -f FILE Load config from FILE instead of the hard-coded default
+Config: ''' + ProgConf.conf + '''
+Commands:
+ run run a task
+ config load config and print the contents
+ help [CMD] print this message and exit normally if [CMD] is not specified.
+ Print usage of [CMD] otherwise''')
+
+ return 0
+
+CmdMap = {
+ "config": ConfigCmd,
+ "run": RunCmd,
+ "help": HelpCmd
+}
+
+optlist, args = getopt(sys.argv[1:], "qvf:")
+optkset = set()
+for p in optlist:
+ optkset.add(p[0])
+
+if "-v" in optkset and "-q" in optkset:
+ sys.stderr.write("Options -v and -q cannot not used together.\n")
+ exit(2)
+
+if not args or not args[0] in CmdMap:
+ err_unknown_cmd()
+
+for p in optlist:
+ match p[0]:
+ case "-q": ProgConf.override_vl = logging.ERROR
+ case "-v":
+ if ProgConf.override_vl is None:
+ ProgConf.override_vl = palhm.DEFAULT.VL.value - 10
+ else:
+ ProgConf.override_vl -= 10
+ case "-f": ProgConf.conf = p[1]
+
+logging.basicConfig(format = "%(name)s %(message)s")
+
+ProgConf.cmd = CmdMap[args[0]](optlist, args)
+del args[0]
+exit(ProgConf.cmd.do_cmd())
diff --git a/src/palhm/__init__.py b/src/palhm/__init__.py
new file mode 100644
index 0000000..8c44ace
--- /dev/null
+++ b/src/palhm/__init__.py
@@ -0,0 +1,736 @@
+import io
+import json
+import logging
+import os
+import re
+import shutil
+import signal
+import subprocess
+from abc import ABC, abstractmethod
+from concurrent import futures
+from concurrent.futures import ThreadPoolExecutor
+from contextlib import contextmanager
+from copy import deepcopy
+from datetime import datetime, timezone
+from decimal import Decimal
+from enum import Enum
+from importlib import import_module
+from mailbox import FormatError
+from multiprocessing import ProcessError
+from typing import Iterable
+
+
+def default_workers ():
+ try:
+ return len(os.sched_getaffinity(0))
+ except NotImplementedError as e:
+ return os.cpu_count()
+
+class DEFAULT (Enum):
+ VL = logging.INFO
+ OBJ_GRP = "default"
+ NB_WORKERS = default_workers()
+ RUN_TASK = "default"
+
+def trans_vl (x: int) -> int:
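+ # Translate the config verbosity level ("vl") to a Python logging level:
+ # 0 -> CRITICAL(50), 1 -> ERROR(40), 2 -> WARNING(30), 3 -> INFO(20), ...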
+ return 50 - x * 10
+
+class ExecvHolder (ABC):
+ @abstractmethod
+ def get_argv (self) -> list:
+ ...
+ @abstractmethod
+ def get_env (self) -> dict:
+ ...
+
+class ValidObject (ABC):
+ @abstractmethod
+ def validate (self):
+ ...
+
+class GlobalContext:
+ def __init__ (self, jobj: dict):
+ self.nb_workers = jobj.get("nb-workers", 0)
+ if "vl" in jobj:
+ self.vl = trans_vl(jobj["vl"])
+ else:
+ self.vl = DEFAULT.VL.value
+ self.modules = {}
+ self.backup_backends = {
+ "null": NullBackupBackend,
+ "localfs": LocalfsBackupBackend
+ }
+ self.exec_map = {}
+ self.task_map = {}
+ self.l = logging.getLogger("palhm")
+ self.l.setLevel(self.vl)
+
+ if self.nb_workers == 0:
+ self.nb_workers = DEFAULT.NB_WORKERS.value
+ elif self.nb_workers < 0:
+ self.nb_workers = None
+
+ for m in jobj.get("modules", iter(())):
+ loaded = self.modules[m] = import_module("." + m, "palhm.mod")
+ intersect = set(self.backup_backends.keys()).intersection(loaded.backup_backends.keys())
+ if intersect:
+ raise RuntimeError("Backup Backend conflict detected. ID(s): " + intersect)
+ self.backup_backends |= loaded.backup_backends
+
+ def get_vl (self) -> int:
+ return self.vl
+
+ def get_nb_workers (self) -> int:
+ return self.nb_workers
+
+ def test_vl (self, x: int) -> bool:
+ return x <= self.get_vl()
+
+ def test_workers (self, n: int) -> bool:
+ return n <= self.nb_workers if n > 0 else True
+
+ def __str__ (self) -> str:
+ return "\n".join([
+ "nb_workers: " + str(self.nb_workers),
+ "vl: " + str(self.vl),
+ "modules: " + " ".join([ i for i in self.modules ]),
+ "backup_backends: " + " ".join([ i for i in self.backup_backends.keys() ]),
+ ("exec_map:\n" + "\n".join([ i[0] + ": " + str(i[1]) for i in self.exec_map.items() ])).replace("\n", "\n\t"),
+ ("task_map:\n" + "\n".join([ (i[0] + ":\n" + str(i[1])).replace("\n", "\n\t") for i in self.task_map.items() ])).replace("\n", "\n\t")
+ ]).replace("\t", " ")
+
+class Runnable (ABC):
+ @abstractmethod
+ def run (self, ctx: GlobalContext):
+ return self
+
+class Exec (Runnable, ExecvHolder):
+ class RE (Enum):
+ EC_INC_RANGE = re.compile('''([0-9]+)(?:\s+)?-(?:\s+)?([0-9]+)''')
+ EC_RANGE = re.compile('''(<|<=|>|>=|==)?(?:\s+)?([0-9]+)''')
+
+ class DEFAULT (Enum):
+ EC = range(0, 1)
+ VL_STDERR = logging.ERROR
+ VL_STDOUT = logging.INFO
+
+ def parse_ec (ec: str) -> range:
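+ # Parse an exit code spec: "A-B" is an inclusive range, and "<N", "<=N",
+ # ">N", ">=N" or "==N" (the default operator) are ranges bounded by
+ # 0 and 255.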
+ x = ec.strip()
+
+ m = re.match(Exec.RE.EC_INC_RANGE.value, x)
+ if m:
+ a = int(m[1])
+ b = int(m[2])
+ ret = range(a, b + 1)
+ if len(ret) == 0:
+ raise ValueError("Invalid range: " + ec)
+ return ret
+ m = re.match(Exec.RE.EC_RANGE.value, x)
+ if m:
+ op = str(m[1]) if m[1] else "=="
+ n = int(m[2])
+ match op:
+ case "==": return range(n, n + 1)
+ case "<": return range(0, n)
+ case "<=": return range(0, n + 1)
+ case ">": return range(n + 1, 256)
+ case ">=": return range(n, 256)
+ case _: raise RuntimeError("FIXME")
+
+ raise ValueError("Invalid value: " + ec)
+
+ def from_conf (ctx: GlobalContext, jobj: dict):
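+ # Construct an Exec from a pipeline/routine entry: "exec" references a
+ # predefined exec by id, "exec-append" references one and appends extra
+ # argv, and "exec-inline" defines the whole exec in place.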
+ match jobj["type"]:
+ case "exec":
+ exec_id = jobj["exec-id"]
+ exec = ctx.exec_map[exec_id]
+ ret = exec
+ case "exec-append":
+ exec_id = jobj["exec-id"]
+ exec = ctx.exec_map[exec_id]
+ ret = exec.mkappend(jobj["argv"])
+ case "exec-inline":
+ ret = Exec(jobj)
+ # case _:
+ # raise RuntimeError("FIXME")
+
+ ret.vl_stderr = jobj.get("vl-stderr", ret.vl_stderr)
+ ret.vl_stdout = jobj.get("vl-stdout", ret.vl_stdout)
+
+ return ret
+
+ def __init__ (self, jobj: dict = None):
+ if jobj is None:
+ self.argv = []
+ self.env = {}
+ self.ec = Exec.DEFAULT.EC.value
+ self.vl_stderr = Exec.DEFAULT.VL_STDERR.value
+ self.vl_stdout = Exec.DEFAULT.VL_STDOUT.value
+ else:
+ self.argv = jobj["argv"]
+ self.env = jobj.get("env") or {}
+ self.ec = Exec.parse_ec(jobj.get("ec", "0"))
+ self.vl_stderr = jobj.get("vl-stderr", Exec.DEFAULT.VL_STDERR.value)
+ self.vl_stdout = jobj.get("vl-stdout", Exec.DEFAULT.VL_STDOUT.value)
+
+ def mkappend (self, extra_argv: Iterable):
+ ny = deepcopy(self)
+ ny.argv.extend(extra_argv)
+ return ny
+
+ def run (self, ctx: GlobalContext):
+ stdout = None if ctx.test_vl(self.vl_stdout) else subprocess.DEVNULL
+ stderr = None if ctx.test_vl(self.vl_stderr) else subprocess.DEVNULL
+ p = subprocess.run(
+ self.argv,
+ env = self.env,
+ stdout = stdout,
+ stderr = stderr)
+ self.raise_oob_ec(p.returncode)
+
+ return self
+
+ def get_argv (self) -> list:
+ return self.argv
+
+ def get_env (self) -> dict:
+ return self.env
+
+ def test_ec (self, ec: int) -> bool:
+ return ec in self.ec
+
+ def raise_oob_ec (self, ec: int):
+ if not self.test_ec(ec):
+ raise ProcessError(
+ str(self) + " returned " + str(ec) + " not in " + str(self.ec))
+
+ def __str__ (self) -> str:
+ return str().join(
+ [ i[0] + "=\"" + i[1] + "\" " for i in self.env.items() ] +
+ [ i + " " for i in self.argv ]).strip()
+
+class BackupBackend (ABC):
+ @contextmanager
+ def open (self, ctx: GlobalContext):
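+ # Context manager for a backup run: rotate old copies on success, roll
+ # back the current copy on any exception, and always close the backend.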
+ try:
+ yield self
+ self.rotate(ctx)
+ except:
+ self.rollback(ctx)
+ raise
+ finally:
+ self.close(ctx)
+
+ @abstractmethod
+ def rollback (self, ctx: GlobalContext):
+ ...
+ @abstractmethod
+ def close (self, ctx: GlobalContext):
+ ...
+ @abstractmethod
+ def sink (self, ctx: GlobalContext, path: str) -> Exec:
+ ...
+ @abstractmethod
+ def rotate (self, ctx: GlobalContext):
+ ...
+ @abstractmethod
+ def _fs_usage_info (self, ctx: GlobalContext) -> Iterable[tuple[str, int]]:
+ # return: copy path: du
+ ...
+ @abstractmethod
+ def _excl_fs_copies (self, ctx: GlobalContext) -> set[str]:
+ ...
+ @abstractmethod
+ def _rm_fs_recursive (self, ctx: GlobalContext, pl: Iterable[str]):
+ ...
+
+ def _logger (self, ctx: GlobalContext) -> logging.Logger:
+ name = "bb." + str(self)
+ return ctx.l.getChild(name)
+
+ @abstractmethod
+ def _fs_quota_target (self, ctx: GlobalContext) -> tuple[Decimal, Decimal]:
+ # return: nb_copies, tot_size
+ ...
+
+ def _do_fs_rotate (self, ctx: GlobalContext):
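+ # Generic rotation for fs-like backends: when the number of copies or
+ # the total size exceeds the quota, delete the oldest copies (except
+ # those returned by _excl_fs_copies) until both limits are met.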
+ nb_copy_limit, root_size_limit = self._fs_quota_target(ctx)
+ dirs = self._fs_usage_info(ctx)
+ excl_copies = self._excl_fs_copies(ctx)
+ l = self._logger(ctx)
+
+ tot_size = 0
+ for i in dirs:
+ tot_size += i[1]
+
+ l.debug("du: tot_size=%u, nb_copies=%u" % (tot_size, len(dirs)))
+ if root_size_limit >= tot_size and nb_copy_limit >= len(dirs):
+ l.debug("no action required for rotation")
+ return
+
+ size_delta = tot_size - root_size_limit
+ dir_delta = len(dirs) - nb_copy_limit
+ del_size = 0
+ del_list = list[str]()
+ while dirs and (del_size < size_delta or len(del_list) < dir_delta):
+ p = dirs.pop(0)
+ if p[0] in excl_copies:
+ continue
+ del_list.append(p[0])
+ del_size += p[1]
+
+ l.debug("deemed expired: %u copies, totalling %u bytes" %
+ (len(del_list), del_size))
+
+ self._rm_fs_recursive(ctx, del_list)
+
+ def mkprefix_iso8601 (
+ timespec: str = "seconds",
+ tz: datetime.tzinfo = timezone.utc) -> str:
+ return datetime.now(tz).isoformat(timespec = timespec)
+
+class NullBackupBackend (BackupBackend):
+ def __init__ (self, *args, **kwargs):
+ pass
+
+ def rollback (self, ctx: GlobalContext):
+ pass
+
+ def close (self, ctx: GlobalContext):
+ pass
+
+ def sink (self, *args, **kwargs):
+ e = Exec()
+ e.argv = [ "/bin/cp", "/dev/stdin", "/dev/null" ]
+
+ return e
+
+ def rotate (self, ctx: GlobalContext):
+ pass
+
+ def __str__ (self):
+ return "null"
+
+class LocalfsBackupBackend (BackupBackend):
+ def __init__ (self, param: dict):
+ self.backup_root = param["root"]
+ self.mkprefix = BackupBackend.mkprefix_iso8601
+ self.nb_copy_limit = Decimal(param.get("nb-copy-limit", "Infinity"))
+ self.root_size_limit = Decimal(param.get("root-size-limit", "Infinity"))
+ self.dmode = int(param.get("dmode", "750"), 8)
+ self.fmode = int(param.get("fmode", "640"), 8)
+ self.cur_backup_path = None
+ self.sink_list = list[str]()
+
+ def open (self, ctx: GlobalContext):
+ self.cur_backup_path = os.sep.join([ self.backup_root, self.mkprefix() ])
+ os.makedirs(self.cur_backup_path, self.dmode)
+
+ return super().open(ctx)
+
+ def rollback (self, ctx: GlobalContext):
+ shutil.rmtree(self.cur_backup_path, ignore_errors = True)
+
+ def close (self, ctx: GlobalContext):
+ pass
+
+ def sink (self, ctx: GlobalContext, path: str) -> Exec:
+ path = os.sep.join([ self.cur_backup_path, path ])
+ os.makedirs(os.path.dirname(path), self.dmode, True)
+ self.sink_list.append(path)
+
+ e = Exec()
+ e.argv = [ "/bin/cp", "/dev/stdin", path ]
+
+ return e
+
+ def _fs_usage_info (self, ctx: GlobalContext) -> Iterable[tuple[str, int]]:
+ def get_name (entry: os.DirEntry) -> str:
+ return entry.name
+ ret = list[tuple[str, int]]()
+ dirs = LocalfsBackupBackend.get_dirs(self.backup_root)
+
+ dirs.sort(key = get_name)
+ for i in dirs:
+ e = (i.path, LocalfsBackupBackend.du(i.path))
+ ret.append(e)
+
+ return ret
+
+ def _rm_fs_recursive (self, ctx: GlobalContext, pl: Iterable[str]):
+ l = self._logger(ctx)
+
+ for i in pl:
+ l.debug("rm: " + i)
+ shutil.rmtree(i)
+
+ def _fs_quota_target (self, ctx: GlobalContext) -> tuple[Decimal, Decimal]:
+ return (self.nb_copy_limit, self.root_size_limit)
+
+ def _excl_fs_copies (self, ctx: GlobalContext) -> set[str]:
+ ret = set[str]()
+ ret.add(self.cur_backup_path)
+ return ret
+
+ def rotate (self, ctx: GlobalContext):
+ for i in self.sink_list:
+ os.chmod(i, self.fmode)
+ return super()._do_fs_rotate(ctx)
+
+ def __str__ (self):
+ return "localfs"
+
+ def du (path: str) -> int:
+ ret = 0
+ for root, dirs, files in os.walk(path):
+ for f in files:
+ p = os.path.join(root, f)
+ if os.path.islink(p):
+ continue
+ ret += os.path.getsize(p)
+
+ return ret
+
+ def get_dirs (path: str) -> list[os.DirEntry]:
+ ret = []
+ for i in os.scandir(path):
+ if not i.is_symlink() and i.is_dir():
+ ret.append(i)
+
+ return ret
+
+class BuiltinRunnable (Runnable, ValidObject):
+ def __init__ (self):
+ self.param = {}
+
+def parse_signals (x: Iterable) -> set:
+ ret = set()
+
+ for sig in x:
+ if sig.isnumeric():
+ ret.add(signal.Signals(int(sig)))
+ else:
+ sig = sig.upper()
+ if not sig.startswith("SIG"):
+ sig = "SIG" + sig
+ ret.add(signal.Signals.__members__[sig])
+
+ return ret
+
+class Sigmask (BuiltinRunnable):
+ VALID_ACTIONS = { "block": signal.SIG_BLOCK, "unblock": signal.SIG_UNBLOCK }
+
+ def __init__ (self, param: dict):
+ self.param = param
+
+ def validate (self):
+ for i in self.param:
+ self.VALID_ACTIONS[i["action"].lower()]
+ parse_signals(i["sig"])
+
+ return self
+
+ def run (self, ctx: GlobalContext):
+ for i in self.param:
+ signal.pthread_sigmask(
+ self.VALID_ACTIONS[i["action"].lower()],
+ parse_signals(i["sig"]))
+
+ return self
+
+ def __str__ (self) -> str:
+ return "sigmask(" + str(self.param) + ")"
+
+BuiltinRunMap = {
+ "sigmask": Sigmask
+}
+
+class Task (Runnable):
+ ...
+
+class RoutineTask (Task):
+ def __init__ (self, ctx: GlobalContext, jobj: dict):
+ self.l = ctx.l.getChild("RoutineTask@" + jobj.get("id", hex(id(self))))
+ self.routines = [] # Should hold Runnables
+
+ for i in jobj["routine"]:
+ type_str = i["type"]
+
+ if type_str.startswith("exec"):
+ r = Exec.from_conf(ctx, i)
+ elif type_str == "task":
+ r = ctx.task_map[i["task-id"]]
+ elif type_str == "builtin":
+ r = BuiltinRunMap[i["builtin-id"]](i["param"])
+ else:
+ raise RuntimeError("FIXME")
+
+ self.routines.append(r)
+
+ def run (self, ctx: GlobalContext):
+ for r in self.routines:
+ self.l.debug("run: " + str(r))
+ p = r.run(ctx)
+ return self
+
+ def __str__ (self) -> str:
+ return "\n".join([ str(i) for i in self.routines ])
+
+class BackupObject (Runnable):
+ def __init__ (
+ self,
+ jobj: dict,
+ ctx: GlobalContext):
+ self.pipeline = []
+ self.path = jobj["path"]
+ self.bbctx = None
+
+ for e in jobj["pipeline"]:
+ ny_exec = Exec.from_conf(ctx, e)
+ self.pipeline.append(ny_exec)
+
+ def run (self, ctx: GlobalContext):
+ last_stdio = subprocess.DEVNULL # Just in case the pipeline is empty
+ pmap = {}
+
+ for eh in self.pipeline:
+ p = subprocess.Popen(
+ args = eh.argv,
+ stdin = last_stdio,
+ stdout = subprocess.PIPE,
+ stderr = None if ctx.test_vl(eh.vl_stderr) else subprocess.DEVNULL,
+ env = eh.env)
+ pmap[eh] = p
+ last_stdio = p.stdout
+
+ sink_exec = self.bbctx.sink(ctx, self.path)
+ sink_p = subprocess.Popen(
+ args = sink_exec.argv,
+ stdin = last_stdio,
+ stdout = None if ctx.test_vl(sink_exec.vl_stdout) else subprocess.DEVNULL,
+ stderr = None if ctx.test_vl(sink_exec.vl_stderr) else subprocess.DEVNULL,
+ env = sink_exec.env)
+ pmap[sink_exec] = sink_p
+
+ for eh in pmap:
+ p = pmap[eh]
+ ec = p.wait()
+ eh.raise_oob_ec(ec)
+
+ return self
+
+ def __str__ (self):
+ return " | ".join([ str(i) for i in self.pipeline ]) + " > " + self.path
+
+class BackupObjectGroup:
+ def __init__ (self):
+ self.depends = set()
+ self.objects = []
+
+class DepResolv:
+ def __init__ (self):
+ self.obj_dep_map = {}
+ self.dep_obj_map = {}
+ self.avail_q = []
+
+ def build (og_map: dict):
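+ # Build the dependency graph from the object groups: obj_dep_map maps
+ # an object to the set of objects it waits on, dep_obj_map maps an
+ # object to the set of objects it unblocks, and objects belonging to
+ # groups with no dependencies start out in avail_q.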
+ def dive (og: BackupObjectGroup, obj_set: set, recurse_path: set):
+ if og in recurse_path:
+ raise RecursionError("Circular reference detected.")
+ recurse_path.add(og)
+
+ obj_set.update(og.objects)
+ for dep_og in og.depends:
+ dive(dep_og, obj_set, recurse_path)
+
+ ret = DepResolv()
+
+ for gid in og_map:
+ og = og_map[gid]
+ if og.depends:
+ dep_objs = set()
+ recurse_path = set()
+ for dep_og in og.depends:
+ dive(dep_og, dep_objs, recurse_path)
+
+ for obj in og.objects:
+ if obj in ret.obj_dep_map:
+ s = ret.obj_dep_map[obj]
+ else:
+ s = ret.obj_dep_map[obj] = set()
+ s.update(dep_objs)
+ for obj in dep_objs:
+ if obj in ret.dep_obj_map:
+ s = ret.dep_obj_map[obj]
+ else:
+ s = ret.dep_obj_map[obj] = set()
+ s.update(og.objects)
+ else:
+ ret.avail_q.extend(og.objects)
+
+ return ret
+
+ def mark_fulfilled (self, obj):
+ if obj in self.dep_obj_map:
+ dep_s = self.dep_obj_map[obj]
+ del self.dep_obj_map[obj]
+
+ for dep in dep_s:
+ obj_s = self.obj_dep_map[dep]
+ obj_s.remove(obj)
+ if not obj_s:
+ del self.obj_dep_map[dep]
+ self.avail_q.append(dep)
+
+ return self
+
+ def __str__ (self):
+ def enclosed (self, o: BackupObject, sb: list, l: int):
+ sb.append("\t" * l + o.path)
+ for i in self.obj_dep_map.get(o, iter(())):
+ enclosed(self, i, sb, l + 1)
+
+ sb = []
+
+ for i in self.obj_dep_map.keys():
+ enclosed(self, i, sb, 0)
+
+ return "\n".join(sb)
+
+class BackupTask (Task):
+ def __init__ (self, ctx: GlobalContext, jobj: dict):
+ og_map = {}
+ jobj_ogrps = jobj["object-groups"]
+ jobj_list = jobj["objects"]
+ obj_path_set = set()
+
+ self.l = ctx.l.getChild("BackupTask@" + jobj.get("id", hex(id(self))))
+ self.bb = ctx.backup_backends[jobj["backend"]](jobj.get("backend-param"))
+
+ # check for dup ids
+ for og in jobj_ogrps:
+ ogid = og["id"]
+ if ogid in og_map:
+ raise KeyError("Duplicate object group: " + ogid)
+ og_map[ogid] = BackupObjectGroup()
+
+ # load depends
+ for og in jobj_ogrps:
+ ogid = og["id"]
+ for depend in og.get("depends", iter(())):
+ if ogid == depend:
+ raise ReferenceError(
+ "An object group dependent on itself: " + ogid)
+ og_map[ogid].depends.add(og_map[depend])
+
+ # implicit default
+ if not DEFAULT.OBJ_GRP.value in og_map:
+ og_map[DEFAULT.OBJ_GRP.value] = BackupObjectGroup()
+
+ # load objs
+ for jo in jobj_list:
+ path = jo["path"]
+ gid = jo.get("group", DEFAULT.OBJ_GRP.value)
+
+ if path in obj_path_set:
+ raise KeyError("Duplicate path: " + path)
+ obj_path_set.add(path)
+ og_map[gid].objects.append(BackupObject(jo, ctx))
+
+ self.dep_tree = DepResolv.build(og_map)
+
+ def run (self, ctx: GlobalContext):
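+ # Open the backend and despatch backup objects to the thread pool as
+ # their dependencies are fulfilled, reaping completed futures until the
+ # dependency tree is exhausted.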
+ fs = set()
+
+ with (self.bb.open(ctx) as bbctx,
+ ThreadPoolExecutor(max_workers = ctx.nb_workers) as th_pool):
+ while self.dep_tree.avail_q or self.dep_tree.obj_dep_map:
+ if not fs and not self.dep_tree.avail_q:
+ # No despatched task units, but DepResolv won't return more work
+ raise RuntimeError("Invalid dependancy tree!")
+
+ for bo in self.dep_tree.avail_q:
+ bo.bbctx = bbctx
+ self.l.debug("despatch: " + str(bo))
+ fs.add(th_pool.submit(bo.run, ctx))
+ self.dep_tree.avail_q.clear()
+
+ f_ret = futures.wait(
+ fs = fs,
+ return_when = futures.FIRST_COMPLETED)
+ for f in f_ret[0]:
+ r = f.result()
+ self.l.debug("reap: " + str(bo))
+ self.dep_tree.mark_fulfilled(r)
+ fs.difference_update(f_ret[0])
+
+ for f in fs:
+ self.dep_tree.mark_fulfilled(f.result())
+
+ return self
+
+ def __str__ (self):
+ return "bb: " + str(self.bb) + "\n" + ("obj_dep_tree:\n" + str(self.dep_tree).strip()).replace("\n", "\n\t")
+
+TaskClassMap = {
+ "backup": BackupTask,
+ "routine": RoutineTask
+}
+
+def merge_conf (a: dict, b: dict) -> dict:
+ def chk_dup_id (key, a: dict, b: dict):
+ c = set(i["id"] for i in a.get(key, iter(()))).intersection(
+ set(i["id"] for i in b.get(key, iter(()))))
+ return c
+
+ # exec conflicts
+ c = chk_dup_id("execs", a, b)
+ if c:
+ raise KeyError("Dup execs: " + c)
+ # task conflicts
+ c = chk_dup_id("tasks", a, b)
+ if c:
+ raise KeyError("Dup tasks: " + c)
+
+ return a | b
+
+def load_jsonc (path: str) -> dict:
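+ # Convert the JSONC file to plain JSON by piping it through
+ # json_reformat (expected to strip the // comments) before handing the
+ # output to json.load(). A non-zero exit code is reported as FormatError.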
+ with open(path) as in_file:
+ p = subprocess.run(
+ [ "/bin/json_reformat" ],
+ stdin = in_file,
+ capture_output = True)
+ if p.returncode != 0:
+ raise FormatError(path)
+
+ return json.load(io.BytesIO(p.stdout))
+
+def load_conf (path: str, inc_set: set = None) -> dict:
+ if inc_set is None: # avoid the shared mutable default argument pitfall
+ inc_set = set()
+ if path in inc_set:
+ raise ReferenceError("Config included multiple times: " + path)
+ inc_set.add(path)
+
+ if path.endswith(".jsonc"):
+ jobj = load_jsonc(path)
+ else:
+ with open(path) as file:
+ jobj = json.load(file)
+
+ # TODO: do schema validation
+
+ for i in jobj.get("include", iter(())):
+ inc_conf = load_conf(i, inc_set)
+ jobj = merge_conf(jobj, inc_conf)
+
+ return jobj
+
+def setup_conf (jobj: dict) -> GlobalContext:
+ ret = GlobalContext(jobj)
+
+ for i in jobj.get("execs", iter(())):
+ ret.exec_map[i["id"]] = Exec(i)
+
+ for i in jobj["tasks"]:
+ ret.task_map[i["id"]] = TaskClassMap[i["type"]](ret, i)
+
+ return ret
diff --git a/src/palhm/mod/__init__.py b/src/palhm/mod/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/palhm/mod/__init__.py
diff --git a/src/palhm/mod/aws.py b/src/palhm/mod/aws.py
new file mode 100644
index 0000000..fcb16f1
--- /dev/null
+++ b/src/palhm/mod/aws.py
@@ -0,0 +1,229 @@
+from concurrent.futures import ThreadPoolExecutor, Future
+from decimal import Decimal
+from enum import Enum
+from time import sleep
+from typing import Callable, Iterable
+
+import boto3
+import botocore
+from palhm import BackupBackend, Exec, GlobalContext
+
+
+class CONST (Enum):
+ AWSCLI = "/bin/aws"
+
+def mks3objkey (keys: Iterable[str]) -> str:
+ ret = "/".join(keys)
+ return ret.strip("/")
+
+def mks3uri (bucket: str, keys: Iterable[str]) -> str:
+ return "s3://" + bucket + "/" + "/".join(keys)
+
+class S3BackupBackend (BackupBackend):
+ def __init__ (self, param: dict):
+ self.profile = param.get("profile", "default")
+ self.bucket = param["bucket"]
+ self.root_key = mks3objkey([param["root"]])
+ self.mkprefix = BackupBackend.mkprefix_iso8601
+ self.nb_copy_limit = Decimal(param.get("nb-copy-limit", "Infinity"))
+ self.root_size_limit = Decimal(param.get("root-size-limit", "Infinity"))
+ self.cur_backup_uri = None
+ self.cur_backup_key = None
+ self.sc_sink = param.get("sink-storage-class")
+ self.sc_rot = param.get("rot-storage-class")
+ self.client = None
+ self.sink_list = list[str]()
+
+ def _setup_cur_backup (self, ctx: GlobalContext):
+ self.cur_backup_key = mks3objkey([self.root_key, self.mkprefix()])
+ self.cur_backup_uri = mks3uri(self.bucket, [self.cur_backup_key])
+
+ def open (self, ctx: GlobalContext):
+ self.client = boto3.Session(profile_name = self.profile).client("s3")
+
+ try:
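+ # A new prefix is derived from the current time. head_object() is
+ # expected to raise a 404 ClientError for a prefix not in use; if the
+ # object exists, wait a second and retry with a fresh prefix, and bail
+ # out with FileExistsError if no free prefix could be found.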
+ for i in range(0, 2):
+ self._setup_cur_backup(ctx)
+ # This should raise
+ self.client.head_object(
+ Bucket = self.bucket,
+ Key = self.cur_backup_key)
+ sleep(1)
+ # Make sure we don't proceed
+ raise FileExistsError(self.cur_backup_uri)
+ except botocore.exceptions.ClientError as e:
+ if e.response["Error"]["Code"] != "404": # expected status code
+ raise
+
+ return super().open(ctx)
+
+ def _cleanup_multiparts (self, ctx: GlobalContext) -> bool:
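+ # Abort any multipart uploads left under the current backup prefix so
+ # that incomplete parts do not linger in the bucket.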
+ def do_abort (e):
+ try:
+ self.client.abort_multipart_upload(
+ Bucket = self.bucket,
+ Key = e["Key"],
+ UploadId = e["UploadId"])
+ except: pass
+
+ cont = None
+ fl = list[Future]()
+
+ with ThreadPoolExecutor(max_workers = ctx.nb_workers) as th_pool:
+ while True:
+ if cont:
+ r = self.client.list_multipart_uploads(
+ Bucket = self.bucket,
+ Prefix = self.cur_backup_key,
+ KeyMarker = cont[0],
+ UploadIdMarker = cont[1])
+ else:
+ r = self.client.list_multipart_uploads(
+ Bucket = self.bucket,
+ Prefix = self.cur_backup_key)
+
+ for i in r.get("Uploads", iter(())):
+ fl.append(th_pool.submit(do_abort, i))
+ for i in fl:
+ i.result()
+ fl.clear()
+
+ if r["IsTruncated"]:
+ cont = (r["NextKeyMarker"], r["UploadIdMarker"])
+ else:
+ break
+
+ def _foreach_objs (self, ctx: GlobalContext, prefix: str, cb: Callable):
+ cont_token = None
+
+ if not prefix.endswith("/"): prefix += "/"
+
+ while True:
+ if cont_token:
+ r = self.client.list_objects_v2(
+ Bucket = self.bucket,
+ Prefix = prefix,
+ ContinuationToken = cont_token)
+ else:
+ r = self.client.list_objects_v2(
+ Bucket = self.bucket,
+ Prefix = prefix)
+
+ for i in r["Contents"]:
+ cb(i)
+
+ if r["IsTruncated"]:
+ cont_token = r["NextContinuationToken"]
+ else:
+ break
+
+ def _fs_usage_info (self, ctx: GlobalContext) -> Iterable[tuple[str, int]]:
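+ # List every object under the root prefix and aggregate the sizes by
+ # backup copy (the first path component below the root), returning
+ # (key, total size) pairs sorted by key, i.e. oldest copy first.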
+ du_map = dict[str, int]()
+ ret = list[tuple[str, int]]()
+ prefix = self.root_key + "/"
+ def cb (i):
+ o_key = i["Key"]
+ o_size = i.get("Size", 0)
+ if not o_key.startswith(self.root_key):
+ raise RuntimeError("The endpoint returned an object " +
+ "irrelevant to the request: " + o_key)
+
+ l = o_key.find("/", len(prefix))
+ if l >= 0:
+ o_backup = o_key[:l]
+ else:
+ return
+
+ du_map[o_backup] = du_map.get(o_backup, 0) + o_size
+
+ self._foreach_objs(ctx, prefix, cb)
+ for i in sorted(du_map.keys()):
+ ret.append((i, du_map[i]))
+
+ return ret
+
+ def _excl_fs_copies (self, ctx: GlobalContext) -> set[str]:
+ ret = set[str]()
+ ret.add(self.cur_backup_key)
+ return ret
+
+ def _rm_fs_recursive (self, ctx: GlobalContext, pl: Iterable[str]):
+ l = self._logger(ctx)
+
+ with ThreadPoolExecutor(max_workers = ctx.nb_workers) as th_pool:
+ fl = list[Future]()
+
+ for i in pl:
+ e = Exec()
+ e.argv = [
+ CONST.AWSCLI.value,
+ "--profile=" + self.profile,
+ "s3",
+ "rm",
+ "--quiet",
+ "--recursive",
+ mks3uri(self.bucket, [i]) ]
+ l.debug("run: " + str(e))
+ fl.append(th_pool.submit(e.run, ctx))
+ for i in fl:
+ i.result()
+
+ def _fs_quota_target (self, ctx: GlobalContext) -> tuple[Decimal, Decimal]:
+ return (self.nb_copy_limit, self.root_size_limit)
+
+ def rollback (self, ctx: GlobalContext):
+ if not self.cur_backup_key is None:
+ self._rm_fs_recursive(ctx, [self.cur_backup_key])
+
+ def close (self, ctx: GlobalContext):
+ self._cleanup_multiparts(ctx)
+
+ def sink (self, ctx: GlobalContext, path: str) -> Exec:
+ l = self._logger(ctx)
+
+ e = Exec()
+ e.argv = [
+ CONST.AWSCLI.value,
+ "--profile=" + self.profile,
+ "s3",
+ "cp",
+ "--only-show-errors" ]
+ if self.sc_sink:
+ e.argv.append("--storage-class=" + self.sc_sink)
+ e.argv.extend(["-", "/".join([self.cur_backup_uri, path])])
+
+ l.debug("sink: " + str(e))
+ self.sink_list.append(mks3objkey([self.cur_backup_key, path]))
+
+ return e
+
+ def rotate (self, ctx: GlobalContext):
+ ret = super()._do_fs_rotate(ctx)
+
+ if self.sc_rot:
+ def chsc (k):
+ self.client.copy_object(
+ Bucket = self.bucket,
+ CopySource = mks3objkey([self.bucket, k]),
+ Key = k,
+ MetadataDirective = "COPY",
+ StorageClass = self.sc_rot)
+
+ with ThreadPoolExecutor(max_workers = ctx.nb_workers) as th_pool:
+ l = self._logger(ctx)
+ fl = list[Future]()
+
+ for i in self.sink_list:
+ l.debug("chsc: %s %s" % (self.sc_rot, i))
+ fl.append(th_pool.submit(chsc, i))
+ for i in fl:
+ i.result()
+
+ return ret
+
+ def __str__ (self):
+ return "aws-s3"
+
+backup_backends = {
+ "aws-s3": S3BackupBackend
+}