author    | David Timber <dxdt@dev.snart.me> | 2022-04-27 17:40:41 +0800
committer | David Timber <dxdt@dev.snart.me> | 2022-04-27 17:40:41 +0800
commit    | a01c87416b241315a9268bb4eb5206ade8328069 (patch)
tree      | 2aff1601c93d20c88ec505b8bc183b9fcbb847dd /src
Initial commit
Diffstat (limited to 'src')
-rw-r--r-- | src/.gitignore                         |   1
-rw-r--r-- | src/conf/py-debug/aws.sample.jsonc     | 160
-rw-r--r-- | src/conf/py-debug/conf.d/core.jsonc    |  44
-rw-r--r-- | src/conf/py-debug/localfs.sample.jsonc | 157
-rw-r--r-- | src/conf/py-debug/null.sample.jsonc    | 140
-rwxr-xr-x | src/palhm-dnssec-check.sh              |  43
-rwxr-xr-x | src/palhm.py                           | 131
-rw-r--r-- | src/palhm/__init__.py                  | 736
-rw-r--r-- | src/palhm/mod/__init__.py              |   0
-rw-r--r-- | src/palhm/mod/aws.py                   | 229
10 files changed, 1641 insertions, 0 deletions
diff --git a/src/.gitignore b/src/.gitignore new file mode 100644 index 0000000..bee8a64 --- /dev/null +++ b/src/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/src/conf/py-debug/aws.sample.jsonc b/src/conf/py-debug/aws.sample.jsonc new file mode 100644 index 0000000..46ad562 --- /dev/null +++ b/src/conf/py-debug/aws.sample.jsonc @@ -0,0 +1,160 @@ +// PALHM Instance Config +{ + "include": [ "conf/py-debug/conf.d/core.jsonc" ], + "modules": [ "aws" ], + "nb-workers": 0, // assumed $(nproc) - default + // "nb-workers": 1, // to disable concurrent task despatch + // To unlimit the number of workers. + // Does not fail on resource alloc failure. + // "nb-workers": -1, + "vl": 4, + "tasks": [ + { + "id": "backup", + "type": "backup", + "backend": "aws-s3", + "backend-param": { + // "profile": "default", + "bucket": "palhm.test", + "root": "/palhm/backup", + "prefix": { + "type": "default" + // "type": "iso8601", + // "timespec": "seconds", + // "tz": "utc" + }, + // "sink-storage-class": "STANDARD_IA", + // "rot-storage-class": "ONEZONE_IA", + "nb-copy-limit": 2, // or Infinity assumed(not in JSON spec) + "root-size-limit": "Infinity" // or Infinity assumed + }, + "object-groups": [ + { "id": "pre-start" }, + { + "id": "data-dump", + "depends": [ "pre-start" ] + }, + { + "id": "tar-media-0", + "depends": [ "data-dump" ] + }, + { + "id": "tar-media-1", + "depends": [ "data-dump" ] + } + ], + "objects": [ + { + "path": "pm-list.gz", + "group": "pre-start", + "pipeline": [ + { "type": "exec", "exec-id": "dnf-list-installed" }, + { "type": "exec", "exec-id": "filter-gzip-plain" } + ] + }, + { + "path": "lsblk.json.gz", + "group": "pre-start", + "pipeline": [ + { + "type": "exec-append", + "exec-id": "lsblk-all-json", + "argv": [ "-a" ] + }, + { "type": "exec", "exec-id": "filter-gzip-plain" } + ] + }, + { + "path": "random-dump.sql.xz", + "group": "data-dump", + "pipeline": [ + { + "type": "exec-inline", + "argv": [ + "/bin/dd", + "if=/dev/urandom", + "bs=4096", + "count=512", + "status=none" + ] + }, + { "type": "exec", "exec-id": "filter-xz-parallel" } + ] + }, + { + "path": "random-dump.0.xz", + "group": "tar-media-0", + "pipeline": [ + { + "type": "exec-inline", + "argv": [ + "/bin/dd", + "if=/dev/zero", + "bs=4096", + "count=512", + "status=none" + ] + }, + { "type": "exec", "exec-id": "filter-xz-parallel" } + ] + }, + { + "path": "random-dump.1.xz", + "group": "tar-media-1", + "pipeline": [ + { + "type": "exec-inline", + "argv": [ + "/bin/dd", + "if=/dev/zero", + "bs=4096", + "count=512", + "status=none" + ] + }, + { "type": "exec", "exec-id": "filter-xz-parallel" } + ] + } + ] + }, + { + "id": "update", + "type": "routine", + "routine": [ + { + "type": "exec-inline", + "argv": [ "/bin/echo", "0" ] + }, + { + "type": "exec-inline", + "argv": [ "/bin/sleep", "1" ] + }, + { + "type": "exec-inline", + "argv": [ "/bin/echo", "1" ] + } + ] + }, + { + "id": "default", + "type": "routine", + "routine": [ + { "type": "task", "task-id": "backup" }, + { "type": "task", "task-id": "update" }, + { + // Block SIGTERM from systemd/init.d so the program is not + // affected by the reboot command. 
+ "type": "builtin", + "builtin-id": "sigmask", + "param": [ + { "action": "block", "sig": [ "TERM" ] } + ] + }, + { + "type": "exec-inline", + "argv": [ "/bin/true" ] + } + ] + } + ] +} diff --git a/src/conf/py-debug/conf.d/core.jsonc b/src/conf/py-debug/conf.d/core.jsonc new file mode 100644 index 0000000..4afe7f5 --- /dev/null +++ b/src/conf/py-debug/conf.d/core.jsonc @@ -0,0 +1,44 @@ +// PALHM Core Config +{ + "execs": [ + // { + // "id": "Exec ID", + // "argv": [ "cmd", "--option1=opt1_val", "-o", "opt2_val" ], + // "env": { "NAME": "VAL" }, + // "ec": "0", // this is assumed + // "ec": "0-127", // inclusive range (not terminated by a signal) + // "ec": "<1", // range (only 0) + // "ec": "<=1", // range (0 and 1) + // "ec": ">0", // range (always fail) + // "ec": ">=0", // range (only 0) + // "vl-stderr": 1 // verbosity level of stderr produced by this process + // verbosity level of stderr produced by this process. Ignored if used + // as part of pipeline + // "vl-stdout": 2 + // }, + { + "id": "tar", + "argv": [ "/usr/bin/tar", "--xattrs", "--selinux" ] + }, + { + "id": "filter-xz-parallel", + "argv": [ "/usr/bin/xz", "-T0" ] + }, + { + "id": "filter-gzip-plain", + "argv": [ "/usr/bin/gzip" ] + }, + { + "id": "filter-zstd-plain", + "argv": [ "/usr/bin/zstd" ] + }, + { + "id": "dnf-list-installed", + "argv": [ "/usr/bin/dnf", "-yq", "list", "installed" ] + }, + { + "id": "lsblk-all-json", + "argv": [ "/usr/bin/lsblk", "-JbOa" ] + } + ] +} diff --git a/src/conf/py-debug/localfs.sample.jsonc b/src/conf/py-debug/localfs.sample.jsonc new file mode 100644 index 0000000..ec12808 --- /dev/null +++ b/src/conf/py-debug/localfs.sample.jsonc @@ -0,0 +1,157 @@ +// PALHM Instance Config +{ + "include": [ "conf/py-debug/conf.d/core.jsonc" ], + "nb-workers": 0, // assumed $(nproc) - default + // "nb-workers": 1, // to disable concurrent task despatch + // To unlimit the number of workers. + // Does not fail on resource alloc failure. 
+ // "nb-workers": -1, + "vl": 4, + "tasks": [ + { + "id": "backup", + "type": "backup", + "backend": "localfs", + "backend-param": { + "root": "/var/tmp/palhm-backup-root", + "prefix": { + "type": "default" + // "type": "iso8601", + // "timespec": "seconds", + // "tz": "utc" + }, + // "dmode": "755", + // "fmode": "644", + "nb-copy-limit": 2, + "root-size-limit": "Infinity" + }, + "object-groups": [ + { "id": "pre-start" }, + { + "id": "data-dump", + "depends": [ "pre-start" ] + }, + { + "id": "tar-media-0", + "depends": [ "data-dump" ] + }, + { + "id": "tar-media-1", + "depends": [ "data-dump" ] + } + ], + "objects": [ + { + "path": "pm-list.gz", + "group": "pre-start", + "pipeline": [ + { "type": "exec", "exec-id": "dnf-list-installed" }, + { "type": "exec", "exec-id": "filter-gzip-plain" } + ] + }, + { + "path": "lsblk.json.gz", + "group": "pre-start", + "pipeline": [ + { + "type": "exec-append", + "exec-id": "lsblk-all-json", + "argv": [ "-a" ] + }, + { "type": "exec", "exec-id": "filter-gzip-plain" } + ] + }, + { + "path": "random-dump.sql.xz", + "group": "data-dump", + "pipeline": [ + { + "type": "exec-inline", + "argv": [ + "/bin/dd", + "if=/dev/urandom", + "bs=4096", + "count=512", + "status=none" + ] + }, + { "type": "exec", "exec-id": "filter-xz-parallel" } + ] + }, + { + "path": "random-dump.0.xz", + "group": "tar-media-0", + "pipeline": [ + { + "type": "exec-inline", + "argv": [ + "/bin/dd", + "if=/dev/zero", + "bs=4096", + "count=512", + "status=none" + ] + }, + { "type": "exec", "exec-id": "filter-xz-parallel" } + ] + }, + { + "path": "random-dump.1.xz", + "group": "tar-media-1", + "pipeline": [ + { + "type": "exec-inline", + "argv": [ + "/bin/dd", + "if=/dev/zero", + "bs=4096", + "count=512", + "status=none" + ] + }, + { "type": "exec", "exec-id": "filter-xz-parallel" } + ] + } + ] + }, + { + "id": "update", + "type": "routine", + "routine": [ + { + "type": "exec-inline", + "argv": [ "/bin/echo", "0" ] + }, + { + "type": "exec-inline", + "argv": [ "/bin/sleep", "1" ] + }, + { + "type": "exec-inline", + "argv": [ "/bin/echo", "1" ] + } + ] + }, + { + "id": "default", + "type": "routine", + "routine": [ + { "type": "task", "task-id": "backup" }, + { "type": "task", "task-id": "update" }, + { + // Block SIGTERM from systemd/init.d so the program is not + // affected by the reboot command. 
+ "type": "builtin", + "builtin-id": "sigmask", + "param": [ + { "action": "block", "sig": [ "TERM" ] } + ] + }, + { + "type": "exec-inline", + "argv": [ "/bin/true" ] + } + ] + } + ] +} diff --git a/src/conf/py-debug/null.sample.jsonc b/src/conf/py-debug/null.sample.jsonc new file mode 100644 index 0000000..a83de95 --- /dev/null +++ b/src/conf/py-debug/null.sample.jsonc @@ -0,0 +1,140 @@ +// PALHM Instance Config +{ + "include": [ "conf/py-debug/conf.d/core.jsonc" ], + "nb-workers": 1, + "vl": 3, + "tasks": [ + { + "id": "backup", + "type": "backup", + "backend": "null", + "object-groups": [ + { "id": "pre-start" }, + { + "id": "data-dump", + "depends": [ "pre-start" ] + }, + { + "id": "tar-media-0", + "depends": [ "data-dump" ] + }, + { + "id": "tar-media-1", + "depends": [ "data-dump" ] + } + ], + "objects": [ + { + "path": "pm-list.gz", + "group": "pre-start", + "pipeline": [ + { "type": "exec", "exec-id": "dnf-list-installed" }, + { "type": "exec", "exec-id": "filter-gzip-plain" } + ] + }, + { + "path": "lsblk.json.gz", + "group": "pre-start", + "pipeline": [ + { + "type": "exec-append", + "exec-id": "lsblk-all-json", + "argv": [ "-a" ] + }, + { "type": "exec", "exec-id": "filter-gzip-plain" } + ] + }, + { + "path": "random-dump.sql.xz", + "group": "data-dump", + "pipeline": [ + { + "type": "exec-inline", + "argv": [ + "/bin/dd", + "if=/dev/urandom", + "bs=4096", + "count=512", + "status=none" + ] + }, + { "type": "exec", "exec-id": "filter-xz-parallel" } + ] + }, + { + "path": "random-dump.0.xz", + "group": "tar-media-0", + "pipeline": [ + { + "type": "exec-inline", + "argv": [ + "/bin/dd", + "if=/dev/zero", + "bs=4096", + "count=512", + "status=none" + ] + }, + { "type": "exec", "exec-id": "filter-xz-parallel" } + ] + }, + { + "path": "random-dump.1.xz", + "group": "tar-media-1", + "pipeline": [ + { + "type": "exec-inline", + "argv": [ + "/bin/dd", + "if=/dev/zero", + "bs=4096", + "count=512", + "status=none" + ] + }, + { "type": "exec", "exec-id": "filter-xz-parallel" } + ] + } + ] + }, + { + "id": "update", + "type": "routine", + "routine": [ + { + "type": "exec-inline", + "argv": [ "/bin/echo", "0" ] + }, + { + "type": "exec-inline", + "argv": [ "/bin/sleep", "1" ] + }, + { + "type": "exec-inline", + "argv": [ "/bin/echo", "1" ] + } + ] + }, + { + "id": "default", + "type": "routine", + "routine": [ + { "type": "task", "task-id": "backup" }, + { "type": "task", "task-id": "update" }, + { + // Block SIGTERM from systemd/init.d so the program is not + // affected by the reboot command. + "type": "builtin", + "builtin-id": "sigmask", + "param": [ + { "action": "block", "sig": [ "TERM" ] } + ] + }, + { + "type": "exec-inline", + "argv": [ "/bin/true" ] + } + ] + } + ] +} diff --git a/src/palhm-dnssec-check.sh b/src/palhm-dnssec-check.sh new file mode 100755 index 0000000..f5ee466 --- /dev/null +++ b/src/palhm-dnssec-check.sh @@ -0,0 +1,43 @@ +#!/bin/bash +set -e +. "$( dirname -- "${BASH_SOURCE[0]}" )"/common.sh + +do_query () { + # dig returns 0 upon successful reception and parse of the response message. + # All the other exit codes other than 0 will cause the script to terminate + # as a result of set -e. +short option makes dig return the values of RR. + # We assume that a status code has returned when dig produces no output with + # the option. Caution must be taken in this approach as zones with no + # record will also return nothing with the status code zero. + dig +short +dnssec ANY "$TARGET" > "$tmpf" + if [ ! 
-s "$tmpf" ]; then + palhm_die \ + "The nameserver returned no RR! +DNSSEC verification probably failed." + fi +} + +if [ "$#" -lt 1 ]; then + cat >&2 << EOF +The Periodic Automatic Linux Host Maintenance (PALHM) DNSSEC check +Usage: $0 <record name> + +The zone must contain at least one resource record set. The nameservers +configured for the host must support DNSSEC validation. + +To test your host configuration, running + \$ $0 dnssec-failed.org +should produce error messages. +EOF + exit 2 +fi + +declare TARGET="$1" +declare tmpf="$(mktemp --tmpdir "palhm-dnssec.XXXXXXXXXX")" + +do_query & set +e +wait -f "$!" +ec="$?" +rm "$tmpf" + +exit "$ec" diff --git a/src/palhm.py b/src/palhm.py new file mode 100755 index 0000000..f3f412b --- /dev/null +++ b/src/palhm.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +import logging +import sys +from abc import ABC, abstractmethod +from getopt import getopt + +import palhm + + +class ProgConf: + conf = "/etc/palhm/palhm.conf" + cmd = None + override_vl = None + ctx = None + + def alloc_ctx (): + ProgConf.ctx = palhm.setup_conf(palhm.load_conf(ProgConf.conf)) + if not ProgConf.override_vl is None: + ProgConf.ctx.l.setLevel(ProgConf.override_vl) + +def err_unknown_cmd (): + sys.stderr.write("Unknown command. Run '" + sys.argv[0] + " help' for usage.\n") + exit(2) + +class Cmd (ABC): + @abstractmethod + def do_cmd (self): + ... + +class ConfigCmd (Cmd): + def __init__ (self, *args, **kwargs): + pass + + def do_cmd (self): + ProgConf.alloc_ctx() + print(ProgConf.ctx) + return 0 + + def print_help (): + print( +"Usage: " + sys.argv[0] + " config" + ''' +Load and parse config. Print the structure to stdout.''') + +class RunCmd (Cmd): + def __init__ (self, optlist, args): + self.optlist = optlist + self.args = args + + def do_cmd (self): + ProgConf.alloc_ctx() + + if self.args: + task = self.args[0] + else: + task = palhm.DEFAULT.RUN_TASK.value + + ProgConf.ctx.task_map[task].run(ProgConf.ctx) + + return 0 + + def print_help (): + print( +"Usage: " + sys.argv[0] + " run [TASK]" + ''' +Run a task in config. Run the "''' + palhm.DEFAULT.RUN_TASK.value + +'''" task if [TASK] is not specified.''') + +class HelpCmd (Cmd): + def __init__ (self, optlist, args): + self.optlist = optlist + self.args = args + + def do_cmd (self): + if len(self.args) >= 2: + if not args[0] in CmdMap: + err_unknown_cmd() + else: + CmdMap[self.args[0]].print_help() + else: + HelpCmd.print_help() + + return 0 + + def print_help (): + print( +"Usage: " + sys.argv[0] + " [options] CMD [command options ...]" + ''' +Options: + -q Set the verbosity level to 0(FATAL error only). Overrides config + -v Increase the verbosity level by 1. Overrides config + -f FILE Load config from FILE instead of the hard-coded default +Config: ''' + ProgConf.conf + ''' +Commands: + run run a task + config load config and print the contents + help [CMD] print this message and exit normally if [CMD] is not specified. 
+ Print usage of [CMD] otherwise''') + + return 0 + +CmdMap = { + "config": ConfigCmd, + "run": RunCmd, + "help": HelpCmd +} + +optlist, args = getopt(sys.argv[1:], "qvf:") +optkset = set() +for p in optlist: + optkset.add(p[0]) + +if "-v" in optkset and "-q" in optkset: + sys.stderr.write("Options -v and -q cannot not used together.\n") + exit(2) + +if not args or not args[0] in CmdMap: + err_unknown_cmd() + +for p in optlist: + match p[0]: + case "-q": ProgConf.override_vl = logging.ERROR + case "-v": + if ProgConf.override_vl is None: + ProgConf.override_vl = palhm.DEFAULT.VL.value - 10 + else: + ProgConf.override_vl -= 10 + case "-f": ProgConf.conf = p[1] + +logging.basicConfig(format = "%(name)s %(message)s") + +ProgConf.cmd = CmdMap[args[0]](optlist, args) +del args[0] +exit(ProgConf.cmd.do_cmd()) diff --git a/src/palhm/__init__.py b/src/palhm/__init__.py new file mode 100644 index 0000000..8c44ace --- /dev/null +++ b/src/palhm/__init__.py @@ -0,0 +1,736 @@ +import io +import json +import logging +import os +import re +import shutil +import signal +import subprocess +from abc import ABC, abstractmethod +from concurrent import futures +from concurrent.futures import ThreadPoolExecutor +from contextlib import contextmanager +from copy import deepcopy +from datetime import datetime, timezone +from decimal import Decimal +from enum import Enum +from importlib import import_module +from mailbox import FormatError +from multiprocessing import ProcessError +from typing import Iterable + + +def default_workers (): + try: + return len(os.sched_getaffinity(0)) + except NotImplementedError as e: + return os.cpu_count() + +class DEFAULT (Enum): + VL = logging.INFO + OBJ_GRP = "default" + NB_WORKERS = default_workers() + RUN_TASK = "default" + +def trans_vl (x: int) -> int: + return 50 - x * 10 + +class ExecvHolder (ABC): + @abstractmethod + def get_argv (self) -> list: + ... + @abstractmethod + def get_env (self) -> dict: + ... + +class ValidObject (ABC): + @abstractmethod + def validate (self): + ... + +class GlobalContext: + def __init__ (self, jobj: dict): + self.nb_workers = jobj.get("nb-workers", 0) + if "vl" in jobj: + self.vl = trans_vl(jobj["vl"]) + else: + self.vl = DEFAULT.VL.value + self.modules = {} + self.backup_backends = { + "null": NullBackupBackend, + "localfs": LocalfsBackupBackend + } + self.exec_map = {} + self.task_map = {} + self.l = logging.getLogger("palhm") + self.l.setLevel(self.vl) + + if self.nb_workers == 0: + self.nb_workers = DEFAULT.NB_WORKERS.value + elif self.nb_workers < 0: + self.nb_workers = None + + for m in jobj.get("modules", iter(())): + loaded = self.modules[m] = import_module("." + m, "palhm.mod") + intersect = set(self.backup_backends.keys()).intersection(loaded.backup_backends.keys()) + if intersect: + raise RuntimeError("Backup Backend conflict detected. 
ID(s): " + intersect) + self.backup_backends |= loaded.backup_backends + + def get_vl (self) -> int: + return self.vl + + def get_nb_workers (self) -> int: + return self.nb_workers + + def test_vl (self, x: int) -> bool: + return x <= self.get_vl() + + def test_workers (self, n: int) -> bool: + return n <= self.nb_workers if n > 0 else True + + def __str__ (self) -> str: + return "\n".join([ + "nb_workers: " + str(self.nb_workers), + "vl: " + str(self.vl), + "modules: " + " ".join([ i for i in self.modules ]), + "backup_backends: " + " ".join([ i for i in self.backup_backends.keys() ]), + ("exec_map:\n" + "\n".join([ i[0] + ": " + str(i[1]) for i in self.exec_map.items() ])).replace("\n", "\n\t"), + ("task_map:\n" + "\n".join([ (i[0] + ":\n" + str(i[1])).replace("\n", "\n\t") for i in self.task_map.items() ])).replace("\n", "\n\t") + ]).replace("\t", " ") + +class Runnable (ABC): + @abstractmethod + def run (self, ctx: GlobalContext): + return self + +class Exec (Runnable, ExecvHolder): + class RE (Enum): + EC_INC_RANGE = re.compile('''([0-9]+)(?:\s+)?-(?:\s+)?([0-9]+)''') + EC_RANGE = re.compile('''(<|<=|>|>=|==)?(?:\s+)?([0-9]+)''') + + class DEFAULT (Enum): + EC = range(0, 1) + VL_STDERR = logging.ERROR + VL_STDOUT = logging.INFO + + def parse_ec (ec: str) -> range: + x = ec.strip() + + m = re.match(Exec.RE.EC_INC_RANGE.value, x) + if m: + a = int(m[1]) + b = int(m[2]) + ret = range(a, b + 1) + if len(ret) == 0: + raise ValueError("Invalid range: " + ec) + return ret + m = re.match(Exec.RE.EC_RANGE.value, x) + if m: + op = str(m[1]) if m[1] else "==" + n = int(m[2]) + match op: + case "==": return range(n, n + 1) + case "<": return range(0, n) + case "<=": return range(0, n + 1) + case ">": return range(n + 1, 256) + case ">=": return range(n, 256) + case _: raise RuntimeError("FIXME") + + raise ValueError("Invalid value: " + ec) + + def from_conf (ctx: GlobalContext, jobj: dict): + match jobj["type"]: + case "exec": + exec_id = jobj["exec-id"] + exec = ctx.exec_map[exec_id] + ret = exec + case "exec-append": + exec_id = jobj["exec-id"] + exec = ctx.exec_map[exec_id] + ret = exec.mkappend(jobj["argv"]) + case "exec-inline": + ret = Exec(jobj) + # case _: + # raise RuntimeError("FIXME") + + ret.vl_stderr = jobj.get("vl-stderr", ret.vl_stderr) + ret.vl_stdout = jobj.get("vl-stdout", ret.vl_stdout) + + return ret + + def __init__ (self, jobj: dict = None): + if jobj is None: + self.argv = [] + self.env = {} + self.ec = Exec.DEFAULT.EC.value + self.vl_stderr = Exec.DEFAULT.VL_STDERR.value + self.vl_stdout = Exec.DEFAULT.VL_STDOUT.value + else: + self.argv = jobj["argv"] + self.env = jobj.get("env") or {} + self.ec = Exec.parse_ec(jobj.get("ec", "0")) + self.vl_stderr = jobj.get("vl-stderr", Exec.DEFAULT.VL_STDERR.value) + self.vl_stdout = jobj.get("vl-stdout", Exec.DEFAULT.VL_STDOUT.value) + + def mkappend (self, extra_argv: Iterable): + ny = deepcopy(self) + ny.argv.extend(extra_argv) + return ny + + def run (self, ctx: GlobalContext): + stdout = None if ctx.test_vl(self.vl_stdout) else subprocess.DEVNULL + stderr = None if ctx.test_vl(self.vl_stderr) else subprocess.DEVNULL + p = subprocess.run( + self.argv, + env = self.env, + stdout = stdout, + stderr = stderr) + self.raise_oob_ec(p.returncode) + + return self + + def get_argv (self) -> list: + return self.argv + + def get_env (self) -> dict: + return self.env + + def test_ec (self, ec: int) -> bool: + return ec in self.ec + + def raise_oob_ec (self, ec: int): + if not self.test_ec(ec): + raise ProcessError( + str(self) + " returned " + 
str(ec) + " not in " + str(self.ec)) + + def __str__ (self) -> str: + return str().join( + [ i[0] + "=\"" + i[1] + "\" " for i in self.env.items() ] + + [ i + " " for i in self.argv ]).strip() + +class BackupBackend (ABC): + @contextmanager + def open (self, ctx: GlobalContext): + try: + yield self + self.rotate(ctx) + except: + self.rollback(ctx) + raise + finally: + self.close(ctx) + + @abstractmethod + def rollback (self, ctx: GlobalContext): + ... + @abstractmethod + def close (self, ctx: GlobalContext): + ... + @abstractmethod + def sink (self, ctx: GlobalContext, path: str) -> Exec: + ... + @abstractmethod + def rotate (self, ctx: GlobalContext): + ... + @abstractmethod + def _fs_usage_info (self, ctx: GlobalContext) -> Iterable[tuple[str, int]]: + # return: copy path: du + ... + @abstractmethod + def _excl_fs_copies (self, ctx: GlobalContext) -> set[str]: + ... + @abstractmethod + def _rm_fs_recursive (self, ctx: GlobalContext, pl: Iterable[str]): + ... + + def _logger (self, ctx: GlobalContext) -> logging.Logger: + name = "bb." + str(self) + return ctx.l.getChild(name) + + @abstractmethod + def _fs_quota_target (self, ctx: GlobalContext) -> tuple[Decimal, Decimal]: + # return: nb_copies, tot_size + ... + + def _do_fs_rotate (self, ctx: GlobalContext): + nb_copy_limit, root_size_limit = self._fs_quota_target(ctx) + dirs = self._fs_usage_info(ctx) + excl_copies = self._excl_fs_copies(ctx) + l = self._logger(ctx) + + tot_size = 0 + for i in dirs: + tot_size += i[1] + + l.debug("du: tot_size=%u, nb_copies=%u" % (tot_size, len(dirs))) + if root_size_limit >= tot_size and nb_copy_limit >= len(dirs): + l.debug("no action required for rotation") + return + + size_delta = tot_size - root_size_limit + dir_delta = len(dirs) - nb_copy_limit + del_size = 0 + del_list = list[str]() + while dirs and (del_size < size_delta or len(del_list) < dir_delta): + p = dirs.pop(0) + if p[0] in excl_copies: + continue + del_list.append(p[0]) + del_size += p[1] + + l.debug("deemed expired: %u copies, totalling %u bytes" % + (len(del_list), del_size)) + + self._rm_fs_recursive(ctx, del_list) + + def mkprefix_iso8601 ( + timespec: str = "seconds", + tz: datetime.tzinfo = timezone.utc) -> str: + return datetime.now(tz).isoformat(timespec = timespec) + +class NullBackupBackend (BackupBackend): + def __init__ (self, *args, **kwargs): + pass + + def rollback (self, ctx: GlobalContext): + pass + + def close (self, ctx: GlobalContext): + pass + + def sink (self, *args, **kwargs): + e = Exec() + e.argv = [ "/bin/cp", "/dev/stdin", "/dev/null" ] + + return e + + def rotate (self, ctx: GlobalContext): + pass + + def __str__ (self): + return "null" + +class LocalfsBackupBackend (BackupBackend): + def __init__ (self, param: dict): + self.backup_root = param["root"] + self.mkprefix = BackupBackend.mkprefix_iso8601 + self.nb_copy_limit = Decimal(param.get("nb-copy-limit", "Infinity")) + self.root_size_limit = Decimal(param.get("root-size-limit", "Infinity")) + self.dmode = int(param.get("dmode", "750"), 8) + self.fmode = int(param.get("fmode", "640"), 8) + self.cur_backup_path = None + self.sink_list = list[str]() + + def open (self, ctx: GlobalContext): + self.cur_backup_path = os.sep.join([ self.backup_root, self.mkprefix() ]) + os.makedirs(self.cur_backup_path, self.dmode) + + return super().open(ctx) + + def rollback (self, ctx: GlobalContext): + shutil.rmtree(self.cur_backup_path, ignore_errors = True) + + def close (self, ctx: GlobalContext): + pass + + def sink (self, ctx: GlobalContext, path: str) -> Exec: + path = 
os.sep.join([ self.cur_backup_path, path ]) + os.makedirs(os.path.dirname(path), self.dmode, True) + self.sink_list.append(path) + + e = Exec() + e.argv = [ "/bin/cp", "/dev/stdin", path ] + + return e + + def _fs_usage_info (self, ctx: GlobalContext) -> Iterable[tuple[str, int]]: + def get_name (entry: os.DirEntry) -> str: + return entry.name + ret = list[tuple[str, int]]() + dirs = LocalfsBackupBackend.get_dirs(self.backup_root) + + dirs.sort(key = get_name) + for i in dirs: + e = (i.path, LocalfsBackupBackend.du(i.path)) + ret.append(e) + + return ret + + def _rm_fs_recursive (self, ctx: GlobalContext, pl: Iterable[str]): + l = self._logger(ctx) + + for i in pl: + l.debug("rm: " + i) + shutil.rmtree(i) + + def _fs_quota_target (self, ctx: GlobalContext) -> tuple[Decimal, Decimal]: + return (self.nb_copy_limit, self.root_size_limit) + + def _excl_fs_copies (self, ctx: GlobalContext) -> set[str]: + ret = set[str]() + ret.add(self.cur_backup_path) + return ret + + def rotate (self, ctx: GlobalContext): + for i in self.sink_list: + os.chmod(i, self.fmode) + return super()._do_fs_rotate(ctx) + + def __str__ (self): + return "localfs" + + def du (path: str) -> int: + ret = 0 + for root, dirs, files in os.walk(path): + for f in files: + p = os.path.join(root, f) + if os.path.islink(p): + continue + ret += os.path.getsize(p) + + return ret + + def get_dirs (path: str) -> list[os.DirEntry]: + ret = [] + for i in os.scandir(path): + if not i.is_symlink() and i.is_dir(): + ret.append(i) + + return ret + +class BuiltinRunnable (Runnable, ValidObject): + def __init__ (self): + self.param = {} + +def parse_signals (x: Iterable) -> set: + ret = set() + + for sig in x: + if sig.isnumeric(): + ret.add(signal.Signals(int(sig))) + else: + sig = sig.upper() + if not sig.startswith("SIG"): + sig = "SIG" + sig + ret.add(signal.Signals.__members__[sig]) + + return ret + +class Sigmask (BuiltinRunnable): + VALID_ACTIONS = { "block": signal.SIG_BLOCK, "unblock": signal.SIG_UNBLOCK } + + def __init__ (self, param: dict): + self.param = param + + def validate (self): + for i in self.param: + self.VALID_ACTIONS[i["action"].lower()] + parse_signals(i["sig"]) + + return self + + def run (self, ctx: GlobalContext): + for i in self.param: + signal.pthread_sigmask( + self.VALID_ACTIONS[i["action"].lower()], + parse_signals(i["sig"])) + + return self + + def __str__ (self) -> str: + return "sigmask(" + str(self.param) + ")" + +BuiltinRunMap = { + "sigmask": Sigmask +} + +class Task (Runnable): + ... 
+ +class RoutineTask (Task): + def __init__ (self, ctx: GlobalContext, jobj: dict): + self.l = ctx.l.getChild("RoutineTask@" + jobj.get("id", hex(id(self)))) + self.routines = [] # Should hold Runnables + + for i in jobj["routine"]: + type_str = i["type"] + + if type_str.startswith("exec"): + r = Exec.from_conf(ctx, i) + elif type_str == "task": + r = ctx.task_map[i["task-id"]] + elif type_str == "builtin": + r = BuiltinRunMap[i["builtin-id"]](i["param"]) + else: + raise RuntimeError("FIXME") + + self.routines.append(r) + + def run (self, ctx: GlobalContext): + for r in self.routines: + self.l.debug("run: " + str(r)) + p = r.run(ctx) + return self + + def __str__ (self) -> str: + return "\n".join([ str(i) for i in self.routines ]) + +class BackupObject (Runnable): + def __init__ ( + self, + jobj: dict, + ctx: GlobalContext): + self.pipeline = [] + self.path = jobj["path"] + self.bbctx = None + + for e in jobj["pipeline"]: + ny_exec = Exec.from_conf(ctx, e) + self.pipeline.append(ny_exec) + + def run (self, ctx: GlobalContext): + last_stdio = subprocess.DEVNULL # Just in case the pipeline is empty + pmap = {} + + for eh in self.pipeline: + p = subprocess.Popen( + args = eh.argv, + stdin = last_stdio, + stdout = subprocess.PIPE, + stderr = None if ctx.test_vl(eh.vl_stderr) else subprocess.DEVNULL, + env = eh.env) + pmap[eh] = p + last_stdio = p.stdout + + sink_exec = self.bbctx.sink(ctx, self.path) + sink_p = subprocess.Popen( + args = sink_exec.argv, + stdin = last_stdio, + stdout = None if ctx.test_vl(sink_exec.vl_stdout) else subprocess.DEVNULL, + stderr = None if ctx.test_vl(sink_exec.vl_stderr) else subprocess.DEVNULL, + env = sink_exec.env) + pmap[sink_exec] = sink_p + + for eh in pmap: + p = pmap[eh] + ec = p.wait() + eh.raise_oob_ec(ec) + + return self + + def __str__ (self): + return " | ".join([ str(i) for i in self.pipeline ]) + " > " + self.path + +class BackupObjectGroup: + def __init__ (self): + self.depends = set() + self.objects = [] + +class DepResolv: + def __init__ (self): + self.obj_dep_map = {} + self.dep_obj_map = {} + self.avail_q = [] + + def build (og_map: dict): + def dive (og: BackupObjectGroup, obj_set: set, recurse_path: set): + if og in recurse_path: + raise RecursionError("Circular reference detected.") + recurse_path.add(og) + + obj_set.update(og.objects) + for dep_og in og.depends: + dive(dep_og, obj_set, recurse_path) + + ret = DepResolv() + + for gid in og_map: + og = og_map[gid] + if og.depends: + dep_objs = set() + recurse_path = set() + for dep_og in og.depends: + dive(dep_og, dep_objs, recurse_path) + + for obj in og.objects: + if obj in ret.obj_dep_map: + s = ret.obj_dep_map[obj] + else: + s = ret.obj_dep_map[obj] = set() + s.update(dep_objs) + for obj in dep_objs: + if obj in ret.dep_obj_map: + s = ret.dep_obj_map[obj] + else: + s = ret.dep_obj_map[obj] = set() + s.update(og.objects) + else: + ret.avail_q.extend(og.objects) + + return ret + + def mark_fulfilled (self, obj): + if obj in self.dep_obj_map: + dep_s = self.dep_obj_map[obj] + del self.dep_obj_map[obj] + + for dep in dep_s: + obj_s = self.obj_dep_map[dep] + obj_s.remove(obj) + if not obj_s: + del self.obj_dep_map[dep] + self.avail_q.append(dep) + + return self + + def __str__ (self): + def enclosed (self, o: BackupObject, sb: list, l: int): + sb.append("\t" * l + o.path) + for i in self.obj_dep_map.get(o, iter(())): + enclosed(self, i, sb, l + 1) + + sb = [] + + for i in self.obj_dep_map.keys(): + enclosed(self, i, sb, 0) + + return "\n".join(sb) + +class BackupTask (Task): + def __init__ 
(self, ctx: GlobalContext, jobj: dict): + og_map = {} + jobj_ogrps = jobj["object-groups"] + jobj_list = jobj["objects"] + obj_path_set = set() + + self.l = ctx.l.getChild("BackupTask@" + jobj.get("id", hex(id(self)))) + self.bb = ctx.backup_backends[jobj["backend"]](jobj.get("backend-param")) + + # check for dup ids + for og in jobj_ogrps: + ogid = og["id"] + if ogid in og_map: + raise KeyError("Duplicate object group: " + ogid) + og_map[ogid] = BackupObjectGroup() + + # load depends + for og in jobj_ogrps: + ogid = og["id"] + for depend in og.get("depends", iter(())): + if ogid == depend: + raise ReferenceError( + "An object group dependent on itself: " + ogid) + og_map[ogid].depends.add(og_map[depend]) + + # implicit default + if not DEFAULT.OBJ_GRP.value in og_map: + og_map[DEFAULT.OBJ_GRP.value] = BackupObjectGroup() + + # load objs + for jo in jobj_list: + path = jo["path"] + gid = jo.get("group", DEFAULT.OBJ_GRP.value) + + if path in obj_path_set: + raise KeyError("Duplicate path: " + path) + obj_path_set.add(path) + og_map[gid].objects.append(BackupObject(jo, ctx)) + + self.dep_tree = DepResolv.build(og_map) + + def run (self, ctx: GlobalContext): + fs = set() + + with (self.bb.open(ctx) as bbctx, + ThreadPoolExecutor(max_workers = ctx.nb_workers) as th_pool): + while self.dep_tree.avail_q or self.dep_tree.obj_dep_map: + if not fs and not self.dep_tree.avail_q: + # No despatched task units, but DepResolv won't return more work + raise RuntimeError("Invalid dependancy tree!") + + for bo in self.dep_tree.avail_q: + bo.bbctx = bbctx + self.l.debug("despatch: " + str(bo)) + fs.add(th_pool.submit(bo.run, ctx)) + self.dep_tree.avail_q.clear() + + f_ret = futures.wait( + fs = fs, + return_when = futures.FIRST_COMPLETED) + for f in f_ret[0]: + r = f.result() + self.l.debug("reap: " + str(bo)) + self.dep_tree.mark_fulfilled(r) + fs.difference_update(f_ret[0]) + + for f in fs: + self.dep_tree.mark_fulfilled(f.result()) + + return self + + def __str__ (self): + return "bb: " + str(self.bb) + "\n" + ("obj_dep_tree:\n" + str(self.dep_tree).strip()).replace("\n", "\n\t") + +TaskClassMap = { + "backup": BackupTask, + "routine": RoutineTask +} + +def merge_conf (a: dict, b: dict) -> dict: + def chk_dup_id (key, a: dict, b: dict): + c = set(i["id"] for i in a.get(key, iter(()))).intersection( + set(i["id"] for i in b.get(key, iter(())))) + return c + + # exec conflicts + c = chk_dup_id("execs", a, b) + if c: + raise KeyError("Dup execs: " + c) + # task conflicts + c = chk_dup_id("tasks", a, b) + if c: + raise KeyError("Dup tasks: " + c) + + return a | b + +def load_jsonc (path: str) -> dict: + with open(path) as in_file: + p = subprocess.run( + [ "/bin/json_reformat" ], + stdin = in_file, + capture_output = True) + if p.returncode != 0: + raise FormatError(path) + + return json.load(io.BytesIO(p.stdout)) + +def load_conf (path: str, inc_set: set = set()) -> dict: + if path in inc_set: + raise ReferenceError("Config included multiple times: " + path) + inc_set.add(path) + + if path.endswith(".jsonc"): + jobj = load_jsonc(path) + else: + with open(path) as file: + jobj = json.load(file) + + # TODO: do schema validation + + for i in jobj.get("include", iter(())): + inc_conf = load_conf(i, inc_set) + jobj = merge_conf(jobj, inc_conf) + + return jobj + +def setup_conf (jobj: dict) -> GlobalContext: + ret = GlobalContext(jobj) + + for i in jobj.get("execs", iter(())): + ret.exec_map[i["id"]] = Exec(i) + + for i in jobj["tasks"]: + ret.task_map[i["id"]] = TaskClassMap[i["type"]](ret, i) + + return ret 
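
For reference, the exit-code ("ec") strings documented in the conf.d/core.jsonc sample above are turned into Python range objects of acceptable exit codes by Exec.parse_ec() in src/palhm/__init__.py. The following is a minimal sanity-check sketch of that grammar, assuming it is run from the src/ directory so that the palhm package is importable; it needs no config file.

```python
# Sketch: exercise the "ec" grammar handled by Exec.parse_ec()
# (src/palhm/__init__.py). Ranges compare equal when they cover
# the same exit codes, so plain asserts are enough here.
from palhm import Exec

assert Exec.parse_ec("0") == range(0, 1)        # default: only exit code 0 passes
assert Exec.parse_ec("0-127") == range(0, 128)  # inclusive range
assert Exec.parse_ec("<1") == range(0, 1)       # strictly below 1, i.e. only 0
assert Exec.parse_ec("<=1") == range(0, 2)      # 0 and 1
assert Exec.parse_ec(">0") == range(1, 256)     # the process is expected to fail
assert Exec.parse_ec(">=0") == range(0, 256)    # any exit code 0-255 is accepted
print("ec ranges parsed as expected")
```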
diff --git a/src/palhm/mod/__init__.py b/src/palhm/mod/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/palhm/mod/__init__.py diff --git a/src/palhm/mod/aws.py b/src/palhm/mod/aws.py new file mode 100644 index 0000000..fcb16f1 --- /dev/null +++ b/src/palhm/mod/aws.py @@ -0,0 +1,229 @@ +from concurrent.futures import ThreadPoolExecutor, Future +from decimal import Decimal +from enum import Enum +from time import sleep +from typing import Callable, Iterable + +import boto3 +import botocore +from palhm import BackupBackend, Exec, GlobalContext + + +class CONST (Enum): + AWSCLI = "/bin/aws" + +def mks3objkey (keys: Iterable[str]) -> str: + ret = "/".join(keys) + return ret.strip("/") + +def mks3uri (bucket: str, keys: Iterable[str]) -> str: + return "s3://" + bucket + "/" + "/".join(keys) + +class S3BackupBackend (BackupBackend): + def __init__ (self, param: dict): + self.profile = param.get("profile", "default") + self.bucket = param["bucket"] + self.root_key = mks3objkey([param["root"]]) + self.mkprefix = BackupBackend.mkprefix_iso8601 + self.nb_copy_limit = Decimal(param.get("nb-copy-limit", "Infinity")) + self.root_size_limit = Decimal(param.get("root-size-limit", "Infinity")) + self.cur_backup_uri = None + self.cur_backup_key = None + self.sc_sink = param.get("sink-storage-class") + self.sc_rot = param.get("rot-storage-class") + self.client = None + self.sink_list = list[str]() + + def _setup_cur_backup (self, ctx: GlobalContext): + self.cur_backup_key = mks3objkey([self.root_key, self.mkprefix()]) + self.cur_backup_uri = mks3uri(self.bucket, [self.cur_backup_key]) + + def open (self, ctx: GlobalContext): + self.client = boto3.Session(profile_name = self.profile).client("s3") + + try: + for i in range(0, 2): + self._setup_cur_backup(ctx) + # This should raise + self.client.head_object( + Bucket = self.bucket, + Key = self.cur_backup_key) + sleep(1) + # Make sure we don't proceed + raise FileExistsError(self.cur_backup_uri) + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] != "404": # expected status code + raise + + return super().open(ctx) + + def _cleanup_multiparts (self, ctx: GlobalContext) -> bool: + def do_abort (e): + try: + self.client.abort_multipart_upload( + Bucket = self.bucket, + Key = e["Key"], + UploadId = e["UploadId"]) + except: pass + + cont = None + fl = list[Future]() + + with ThreadPoolExecutor(max_workers = ctx.nb_workers) as th_pool: + while True: + if cont: + r = self.client.list_multipart_uploads( + Bucket = self.bucket, + Prefix = self.cur_backup_key, + KeyMarker = cont[0], + UploadIdMarker = cont[1]) + else: + r = self.client.list_multipart_uploads( + Bucket = self.bucket, + Prefix = self.cur_backup_key) + + for i in r.get("Uploads", iter(())): + fl.append(th_pool.submit(do_abort, i)) + for i in fl: + i.result() + fl.clear() + + if r["IsTruncated"]: + cont = (r["NextKeyMarker"], r["UploadIdMarker"]) + else: + break + + def _foreach_objs (self, ctx: GlobalContext, prefix: str, cb: Callable): + cont_token = None + + if not prefix.endswith("/"): prefix += "/" + + while True: + if cont_token: + r = self.client.list_objects_v2( + Bucket = self.bucket, + Prefix = prefix, + ContinuationToken = cont_token) + else: + r = self.client.list_objects_v2( + Bucket = self.bucket, + Prefix = prefix) + + for i in r["Contents"]: + cb(i) + + if r["IsTruncated"]: + cont_token = r["NextContinuationToken"] + else: + break + + def _fs_usage_info (self, ctx: GlobalContext) -> Iterable[tuple[str, int]]: + du_map = dict[str, 
int]() + ret = list[tuple[str, int]]() + prefix = self.root_key + "/" + def cb (i): + o_key = i["Key"] + o_size = i.get("Size", 0) + if not o_key.startswith(self.root_key): + raise RuntimeError("The endpoint returned an object " + + "irrelevant to the request: " + o_key) + + l = o_key.find("/", len(prefix)) + if l >= 0: + o_backup = o_key[:l] + else: + return + + du_map[o_backup] = du_map.get(o_backup, 0) + o_size + + self._foreach_objs(ctx, prefix, cb) + for i in sorted(du_map.keys()): + ret.append((i, du_map[i])) + + return ret + + def _excl_fs_copies (self, ctx: GlobalContext) -> set[str]: + ret = set[str]() + ret.add(self.cur_backup_key) + return ret + + def _rm_fs_recursive (self, ctx: GlobalContext, pl: Iterable[str]): + l = self._logger(ctx) + + with ThreadPoolExecutor(max_workers = ctx.nb_workers) as th_pool: + fl = list[Future]() + + for i in pl: + e = Exec() + e.argv = [ + CONST.AWSCLI.value, + "--profile=" + self.profile, + "s3", + "rm", + "--quiet", + "--recursive", + mks3uri(self.bucket, [i]) ] + l.debug("run: " + str(e)) + fl.append(th_pool.submit(e.run, ctx)) + for i in fl: + i.result() + + def _fs_quota_target (self, ctx: GlobalContext) -> tuple[Decimal, Decimal]: + return (self.nb_copy_limit, self.root_size_limit) + + def rollback (self, ctx: GlobalContext): + if not self.cur_backup_uri is None: + self._rm_fs_recursive(ctx, [self.cur_backup_uri]) + + def close (self, ctx: GlobalContext): + self._cleanup_multiparts(ctx) + + def sink (self, ctx: GlobalContext, path: str) -> Exec: + l = self._logger(ctx) + + e = Exec() + e.argv = [ + CONST.AWSCLI.value, + "--profile=" + self.profile, + "s3", + "cp", + "--only-show-errors" ] + if self.sc_sink: + e.argv.append("--storage-class=" + self.sc_sink) + e.argv.extend(["-", "/".join([self.cur_backup_uri, path])]) + + l.debug("sink: " + str(e)) + self.sink_list.append(mks3objkey([self.cur_backup_key, path])) + + return e + + def rotate (self, ctx: GlobalContext): + ret = super()._do_fs_rotate(ctx) + + if self.sc_rot: + def chsc (k): + self.client.copy_object( + Bucket = self.bucket, + CopySource = mks3objkey([self.bucket, k]), + Key = k, + MetadataDirective = "COPY", + StorageClass = self.sc_rot) + + with ThreadPoolExecutor(max_workers = ctx.nb_workers) as th_pool: + l = self._logger(ctx) + fl = list[Future]() + + for i in self.sink_list: + l.debug("chsc: %s %s" % (self.sc_rot, i)) + fl.append(th_pool.submit(chsc, i)) + for i in fl: + i.result() + + return ret + + def __str__ (self): + return "aws-s3" + +backup_backends = { + "aws-s3": S3BackupBackend +} |
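
To see how these pieces fit together, the sketch below mirrors what the `run` command in src/palhm.py does: load and merge the config (resolving "include" entries), build the GlobalContext, and run a named task. It is a hypothetical driver, not part of the tree; it assumes the working directory is src/ (the py-debug samples use include paths relative to it) and that /bin/json_reformat is installed, since load_jsonc() pipes .jsonc files through it.

```python
#!/usr/bin/env python3
# Hypothetical equivalent of "./palhm.py -f conf/py-debug/null.sample.jsonc run".
# Assumes CWD is src/ and /bin/json_reformat is available for .jsonc parsing.
import logging
import palhm

logging.basicConfig(format = "%(name)s %(message)s")

conf = palhm.load_conf("conf/py-debug/null.sample.jsonc")  # resolves "include" and merges
ctx = palhm.setup_conf(conf)                               # builds exec_map and task_map
ctx.task_map[palhm.DEFAULT.RUN_TASK.value].run(ctx)        # runs the "default" routine task
```

The "default" task in the sample is a routine that chains the "backup" and "update" tasks and then masks SIGTERM via the "sigmask" builtin, so running it exercises the backup backend, the pipeline objects, and the dependency resolution in one pass.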