diff options
author | David Timber <dxdt@dev.snart.me> | 2025-06-02 10:43:48 +0900 |
---|---|---|
committer | David Timber <dxdt@dev.snart.me> | 2025-06-02 11:53:40 +0900 |
commit | 35f2ceaa417262bab002067e65b6a84389a199e0 (patch) | |
tree | 50f515cebed7ec2568c6d03abf50a065d333d9fe | |
parent | 17d3e1285b9507a99c0d903a2716ca43831d98ee (diff) |
Add mmfwd.recsync ...calls
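Encode recordings with ffmpeg and upload them to S3. Requires ffmpeg and
awscli to be installed, and awscli to be configured. Set the destination S3
prefix with the MMFWD_RECSYNC_S3_UPLOAD_PREFIX environment variable; the value
is prepended verbatim to the object key, so include a trailing slash if needed.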
Invocation: python -m mmfwd.recsync
-rw-r--r-- | .vscode/launch.json | 13 | ||||
-rw-r--r-- | src/mmfwd/recsync/__init__.py | 125 | ||||
-rw-r--r-- | src/mmfwd/recsync/__main__.py | 9 | ||||
-rw-r--r-- | src/mmfwd/recsync/exceptions.py | 2 |
4 files changed, 148 insertions, 1 deletions
diff --git a/.vscode/launch.json b/.vscode/launch.json index 1a65bdf..04398d4 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -5,11 +5,22 @@ "version": "0.2.0", "configurations": [ { - "name": "Python Debugger: Module", + "name": "Python Debugger: Module mmfwd", "type": "debugpy", "request": "launch", "module": "mmfwd", "cwd": "${workspaceFolder}/src", + }, + { + "name": "Python Debugger: Module mmfwd.recsync", + "type": "debugpy", + "request": "launch", + "module": "mmfwd.recsync", + "cwd": "${workspaceFolder}/src", + "env": { + // your s3 prefix here, with or without trailing slash + "MMFWD_RECSYNC_S3_UPLOAD_PREFIX": "s3://.../mmfwd/rec/" + } } ] } diff --git a/src/mmfwd/recsync/__init__.py b/src/mmfwd/recsync/__init__.py new file mode 100644 index 0000000..32d0fc1 --- /dev/null +++ b/src/mmfwd/recsync/__init__.py @@ -0,0 +1,125 @@ +import datetime +from fcntl import LOCK_EX, flock +import glob +from io import SEEK_SET +import os +import subprocess +import sys +from typing import assert_never +from .exceptions import * + +def acquire_plock (): + f = open("mmfwd-recsync.pid", "w+") + try: + flock(f, LOCK_EX) + f.write(str(os.getpid())) + f.flush() + + return f + except OSError as e: + f.seek(0, SEEK_SET) + pid = f.read(255).strip() + f.close() + + raise MMFWDRSProcessLockException( + "Process lock failed (Another process: %s)" % (pid), + e) + + assert_never() + +def strip_fileext (x: str) -> str: + last_dot = x.rfind('.') + last_sep = x.rfind(os.path.sep) + + # the last . is after the last / + # or there's no /, but dot exists + if last_dot >= 0 and last_sep < last_dot: + # then, it's safe to strip + return x[:last_dot] + # there's no . 
+ return x + + +def proc_dir (dir: str): + # including trailing '/' if desired + s3_upload_prefix = os.getenv('MMFWD_RECSYNC_S3_UPLOAD_PREFIX') + # minimum audio length: 5 seconds + minsize = 5 * 1 * 2 * 16000 # seconds * nb_channels * bytes_per_sample * rate + + for path in glob.glob(dir + "/*.pcm"): # for each raw audio file, + if not os.path.isfile(path): + continue + + # do stat() + stat = os.stat(path) + + # delete short recordings + if stat.st_size < minsize: + sys.stderr.write( + "mmfwd.recsync: deleting short recording: " + + path + + os.linesep) + os.remove(path) + continue + + # ignore if less than an hour as a security measure against KYC attempts + now = datetime.datetime.now(datetime.UTC) + f_mtime = datetime.datetime.fromtimestamp(stat.st_mtime, datetime.UTC) + if now - f_mtime < datetime.timedelta(hours = 1.0): + continue + + # be defensive: align the original raw audio file to 2 bytes boundary + # so that ffmpeg won't complain + fsize = stat.st_size + if fsize % 2 != 0: + fsize = (fsize // 2) * 2 + os.truncate(path, fsize) + + # sep file ext, fabricate output file names + basename = strip_fileext(os.path.basename(path)) + s3_basename = "%d-%02d/%s" % (f_mtime.year, f_mtime.month, basename) + local_basepath = os.path.dirname(path) + '/' + basename + s3_basepath = s3_upload_prefix + s3_basename + local_outf_flac = local_basepath + ".flac" + local_outf_ogg = local_basepath + ".ogg" + s3_outf_flac = s3_basepath + ".flac" + s3_outf_ogg = s3_basepath + ".ogg" + + # run ffmpeg for FLAC, lossless original not post processed + subprocess.run([ + "ffmpeg", "-nostdin", "-loglevel", "error", + "-f", "s16le", "-ac", "1", "-ar", "16000", + "-i", path, + "-y", + local_outf_flac + ], check = True) + # run ffmpeg for opus OGG w/ RMS normalisation + subprocess.run([ + "ffmpeg", "-nostdin", "-loglevel", "error", + "-f", "s16le", "-ac", "1", "-ar", "16000", + "-i", path, + "-filter:a", "loudnorm", + "-c:a", "libopus", + "-y", + local_outf_ogg + ], check = True) + + # 
do upload + subprocess.run([ + "aws", "s3", "cp", "--no-progress", local_outf_flac, s3_outf_flac + ], check = True) + subprocess.run([ + "aws", "s3", "cp", "--no-progress", local_outf_ogg, s3_outf_ogg + ], check = True) + + # good! now delete files including the original from local + os.remove(local_outf_flac) + os.remove(local_outf_ogg) + os.remove(path) + + # if the directory becomes empty + dir_not_empty = True + for _ in os.scandir(dir): + dir_not_empty = False + break + if dir_not_empty: os.rmdir(dir) diff --git a/src/mmfwd/recsync/__main__.py b/src/mmfwd/recsync/__main__.py new file mode 100644 index 0000000..6f56adf --- /dev/null +++ b/src/mmfwd/recsync/__main__.py @@ -0,0 +1,9 @@ +import glob +import os +from . import * + +with acquire_plock(): # flock on pid file + for path in glob.glob("rec/????-??"): # for each YYYY-MM dir, + if not os.path.isdir(path): + continue + proc_dir(path) diff --git a/src/mmfwd/recsync/exceptions.py b/src/mmfwd/recsync/exceptions.py new file mode 100644 index 0000000..052c48d --- /dev/null +++ b/src/mmfwd/recsync/exceptions.py @@ -0,0 +1,2 @@ +class MMFWDRSException (BaseException): ... +class MMFWDRSProcessLockException (MMFWDRSException): ... |