diff options
Diffstat (limited to 'src/toolkit')
| -rw-r--r-- | src/toolkit/__init__.py | 1 | ||||
| -rw-r--r-- | src/toolkit/common.py | 192 | ||||
| -rw-r--r-- | src/toolkit/ffmpeg.py | 545 | ||||
| -rw-r--r-- | src/toolkit/frame.py | 117 |
4 files changed, 0 insertions, 855 deletions
diff --git a/src/toolkit/__init__.py b/src/toolkit/__init__.py deleted file mode 100644 index 55e5f84..0000000 --- a/src/toolkit/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .common import * diff --git a/src/toolkit/common.py b/src/toolkit/common.py deleted file mode 100644 index e35aba2..0000000 --- a/src/toolkit/common.py +++ /dev/null @@ -1,192 +0,0 @@ -""" -Common functions -""" - -from PyQt6 import QtWidgets -import string -import os -import sys -import subprocess -import logging -from copy import copy -from collections import OrderedDict - - -log = logging.getLogger("AVP.Toolkit.Common") - - -class blockSignals: - """ - Context manager to temporarily block list of QtWidgets from updating, - and guarantee restoring the previous state afterwards. - """ - - def __init__(self, widgets): - if type(widgets) is dict: - self.widgets = concatDictVals(widgets) - else: - self.widgets = widgets if hasattr(widgets, "__iter__") else [widgets] - - def __enter__(self): - log.verbose( - "Blocking signals for %s", - ", ".join([str(w.__class__.__name__) for w in self.widgets]), - ) - self.oldStates = [w.signalsBlocked() for w in self.widgets] - for w in self.widgets: - w.blockSignals(True) - - def __exit__(self, *args): - log.verbose("Resetting blockSignals to %s", str(bool(sum(self.oldStates)))) - for w, state in zip(self.widgets, self.oldStates): - w.blockSignals(state) - - -def concatDictVals(d): - """Concatenates all values in given dict into one list.""" - key, value = d.popitem() - d[key] = value - final = copy(value) - if type(final) is not list: - final = [final] - final.extend([val for val in d.values()]) - else: - value.extend([item for val in d.values() for item in val]) - return final - - -def badName(name): - """Returns whether a name contains non-alphanumeric chars""" - return any([letter in string.punctuation for letter in name]) - - -def alphabetizeDict(dictionary): - """Alphabetizes a dict into OrderedDict""" - return OrderedDict(sorted(dictionary.items(), key=lambda t: t[0])) - - -def presetToString(dictionary): - """Returns string repr of a preset""" - return repr(alphabetizeDict(dictionary)) - - -def presetFromString(string): - """Turns a string repr of OrderedDict into a regular dict""" - return dict(eval(string)) - - -def appendUppercase(lst): - for form, i in zip(lst, range(len(lst))): - lst.append(form.upper()) - return lst - - -def pipeWrapper(func): - """A decorator to insert proper kwargs into Popen objects.""" - - def pipeWrapper(commandList, **kwargs): - if sys.platform == "win32": - # Stop CMD window from appearing on Windows - startupinfo = subprocess.STARTUPINFO() - startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW - kwargs["startupinfo"] = startupinfo - - if "bufsize" not in kwargs: - kwargs["bufsize"] = 10**8 - if "stdin" not in kwargs: - kwargs["stdin"] = subprocess.DEVNULL - return func(commandList, **kwargs) - - return pipeWrapper - - -@pipeWrapper -def checkOutput(commandList, **kwargs): - return subprocess.check_output(commandList, **kwargs) - - -def disableWhenEncoding(func): - def decorator(self, *args, **kwargs): - if self.encoding: - return - else: - return func(self, *args, **kwargs) - - return decorator - - -def disableWhenOpeningProject(func): - def decorator(self, *args, **kwargs): - if self.core.openingProject: - return - else: - return func(self, *args, **kwargs) - - return decorator - - -def rgbFromString(string): - """Turns an RGB string like "255, 255, 255" into a tuple""" - try: - tup = tuple([int(i) for i in string.split(",")]) - if len(tup) != 3: - raise ValueError - for i in tup: - if i > 255 or i < 0: - raise ValueError - return tup - except: - return (255, 255, 255) - - -def formatTraceback(tb=None): - import traceback - - if tb is None: - import sys - - tb = sys.exc_info()[2] - return "Traceback:\n%s" % "\n".join(traceback.format_tb(tb)) - - -def connectWidget(widget, func): - if type(widget) == QtWidgets.QLineEdit: - widget.textChanged.connect(func) - elif type(widget) == QtWidgets.QSpinBox or type(widget) == QtWidgets.QDoubleSpinBox: - widget.valueChanged.connect(func) - elif type(widget) == QtWidgets.QCheckBox: - widget.stateChanged.connect(func) - elif type(widget) == QtWidgets.QComboBox: - widget.currentIndexChanged.connect(func) - else: - log.warning("Failed to connect %s ", str(widget.__class__.__name__)) - return False - return True - - -def setWidgetValue(widget, val): - """Generic setValue method for use with any typical QtWidget""" - log.verbose("Setting %s to %s" % (str(widget.__class__.__name__), val)) - if type(widget) == QtWidgets.QLineEdit: - widget.setText(val) - elif type(widget) == QtWidgets.QSpinBox or type(widget) == QtWidgets.QDoubleSpinBox: - widget.setValue(val) - elif type(widget) == QtWidgets.QCheckBox: - widget.setChecked(val) - elif type(widget) == QtWidgets.QComboBox: - widget.setCurrentIndex(val) - else: - log.warning("Failed to set %s ", str(widget.__class__.__name__)) - return False - return True - - -def getWidgetValue(widget): - if type(widget) == QtWidgets.QLineEdit: - return widget.text() - elif type(widget) == QtWidgets.QSpinBox or type(widget) == QtWidgets.QDoubleSpinBox: - return widget.value() - elif type(widget) == QtWidgets.QCheckBox: - return widget.isChecked() - elif type(widget) == QtWidgets.QComboBox: - return widget.currentIndex() diff --git a/src/toolkit/ffmpeg.py b/src/toolkit/ffmpeg.py deleted file mode 100644 index 5aedff3..0000000 --- a/src/toolkit/ffmpeg.py +++ /dev/null @@ -1,545 +0,0 @@ -""" -Tools for using ffmpeg -""" - -import numpy -import sys -import os -import subprocess -import threading -import signal -from queue import PriorityQueue -import logging - -from .. import core -from .common import checkOutput, pipeWrapper - - -log = logging.getLogger("AVP.Toolkit.Ffmpeg") - - -class FfmpegVideo: - """Opens a pipe to ffmpeg and stores a buffer of raw video frames.""" - - # error from the thread used to fill the buffer - threadError = None - - def __init__(self, **kwargs): - mandatoryArgs = [ - "inputPath", - "filter_", - "width", - "height", - "frameRate", # frames per second - "chunkSize", # number of bytes in one frame - "parent", # mainwindow object - "component", # component object - ] - for arg in mandatoryArgs: - setattr(self, arg, kwargs[arg]) - - self.frameNo = -1 - self.currentFrame = "None" - self.map_ = None - - if "loopVideo" in kwargs and kwargs["loopVideo"]: - self.loopValue = "-1" - else: - self.loopValue = "0" - if "filter_" in kwargs: - if kwargs["filter_"][0] != "-filter_complex": - kwargs["filter_"].insert(0, "-filter_complex") - else: - kwargs["filter_"] = None - - self.command = [ - core.Core.FFMPEG_BIN, - "-thread_queue_size", - "512", - "-r", - str(self.frameRate), - "-stream_loop", - str(self.loopValue), - "-i", - self.inputPath, - "-f", - "image2pipe", - "-pix_fmt", - "rgba", - ] - if type(kwargs["filter_"]) is list: - self.command.extend(kwargs["filter_"]) - self.command.extend( - [ - "-codec:v", - "rawvideo", - "-", - ] - ) - - self.frameBuffer = PriorityQueue() - self.frameBuffer.maxsize = self.frameRate - self.finishedFrames = {} - - self.thread = threading.Thread( - target=self.fillBuffer, name="FFmpeg Frame-Fetcher" - ) - self.thread.daemon = True - self.thread.start() - - def frame(self, num): - while True: - if num in self.finishedFrames: - image = self.finishedFrames.pop(num) - return image - - i, image = self.frameBuffer.get() - self.finishedFrames[i] = image - self.frameBuffer.task_done() - - def fillBuffer(self): - from ..component import ComponentError - - if core.Core.logEnabled: - logFilename = os.path.join( - core.Core.logDir, "render_%s.log" % str(self.component.compPos) - ) - log.debug("Creating ffmpeg process (log at %s)", logFilename) - with open(logFilename, "w") as logf: - logf.write(" ".join(self.command) + "\n\n") - with open(logFilename, "a") as logf: - self.pipe = openPipe( - self.command, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=logf, - bufsize=10**8, - ) - else: - self.pipe = openPipe( - self.command, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, - bufsize=10**8, - ) - - while True: - if self.parent.canceled: - break - self.frameNo += 1 - - # If we run out of frames, use the last good frame and loop. - try: - if len(self.currentFrame) == 0: - self.frameBuffer.put((self.frameNo - 1, self.lastFrame)) - continue - except AttributeError: - FfmpegVideo.threadError = ComponentError( - self.component, - "video", - "Video seemed playable but wasn't.", - ) - break - - try: - self.currentFrame = self.pipe.stdout.read(self.chunkSize) - except ValueError as e: - if str(e) == "PyMemoryView_FromBuffer(): info->buf must not be NULL": - log.debug( - "Ignored 'info->buf must not be NULL' error from FFmpeg pipe" - ) - return - else: - FfmpegVideo.threadError = ComponentError(self.component, "video") - - if len(self.currentFrame) != 0: - self.frameBuffer.put((self.frameNo, self.currentFrame)) - self.lastFrame = self.currentFrame - - -@pipeWrapper -def openPipe(commandList, **kwargs): - return subprocess.Popen(commandList, **kwargs) - - -def closePipe(pipe): - pipe.stdout.close() - pipe.send_signal(signal.SIGTERM) - - -def findFfmpeg(): - if sys.platform == "win32": - bin = "ffmpeg.exe" - else: - bin = "ffmpeg" - - if getattr(sys, "frozen", False): - # The application is frozen - bin = os.path.join(core.Core.wd, bin) - - with open(os.devnull, "w") as f: - try: - checkOutput([bin, "-version"], stderr=f) - except (subprocess.CalledProcessError, FileNotFoundError): - bin = "" - - return bin - - -def createFfmpegCommand(inputFile, outputFile, components, duration=-1): - """ - Constructs the major ffmpeg command used to export the video - """ - if duration == -1: - duration = getAudioDuration(inputFile) - safeDuration = "{0:.3f}".format(duration - 0.05) # used by filters - duration = "{0:.3f}".format(duration + 0.1) # used by input sources - Core = core.Core - - # Test if user has libfdk_aac - encoders = checkOutput("%s -encoders -hide_banner" % Core.FFMPEG_BIN, shell=True) - encoders = encoders.decode("utf-8") - - acodec = Core.settings.value("outputAudioCodec") - - options = Core.encoderOptions - containerName = Core.settings.value("outputContainer") - vcodec = Core.settings.value("outputVideoCodec") - vbitrate = str(Core.settings.value("outputVideoBitrate")) + "k" - acodec = Core.settings.value("outputAudioCodec") - abitrate = str(Core.settings.value("outputAudioBitrate")) + "k" - - for cont in options["containers"]: - if cont["name"] == containerName: - container = cont["container"] - break - - vencoders = options["video-codecs"][vcodec] - aencoders = options["audio-codecs"][acodec] - - def error(): - nonlocal encoders, encoder - log.critical( - "Selected encoder (%s) is not supported by Ffmpeg. The supported encoders are: %s", - encoder, - encoders, - ) - return [] - - for encoder in vencoders: - if encoder in encoders: - vencoder = encoder - break - else: - return error() - - for encoder in aencoders: - if encoder in encoders: - aencoder = encoder - break - else: - return error() - - ffmpegCommand = [ - Core.FFMPEG_BIN, - "-thread_queue_size", - "512", - "-y", # overwrite the output file if it already exists. - # INPUT VIDEO - "-f", - "rawvideo", - "-vcodec", - "rawvideo", - "-s", - f'{Core.settings.value("outputWidth")}x{Core.settings.value("outputHeight")}', - "-pix_fmt", - "rgba", - "-r", - str(Core.settings.value("outputFrameRate")), - "-t", - duration, - "-an", # the video input has no sound - "-i", - "-", # the video input comes from a pipe - # INPUT SOUND - "-t", - duration, - "-i", - inputFile, - ] - - extraAudio = [comp.audio for comp in components if "audio" in comp.properties()] - segment = createAudioFilterCommand(extraAudio, safeDuration) - ffmpegCommand.extend(segment) - # Map audio from the filters or the single audio input, and map video from the pipe - ffmpegCommand.extend( - [ - "-map", - "0:v", - "-map", - "[a]" if segment else "1:a", - ] - ) - - ffmpegCommand.extend( - [ - # OUTPUT - "-vcodec", - vencoder, - "-acodec", - aencoder, - "-b:v", - vbitrate, - "-b:a", - abitrate, - "-pix_fmt", - Core.settings.value("outputVideoFormat"), - "-preset", - Core.settings.value("outputPreset"), - "-f", - container, - ] - ) - - if acodec == "aac": - ffmpegCommand.append("-strict") - ffmpegCommand.append("-2") - - ffmpegCommand.append(outputFile) - return ffmpegCommand - - -def createAudioFilterCommand(extraAudio, duration): - """Add extra inputs and any needed filters to the main ffmpeg command.""" - # NOTE: Global filters are currently hard-coded here for debugging use - globalFilters = 0 # increase to add global filters - - if not extraAudio and not globalFilters: - return [] - - ffmpegCommand = [] - # Add -i options for extra input files - extraFilters = {} - for streamNo, params in enumerate(reversed(extraAudio)): - extraInputFile, params = params - ffmpegCommand.extend( - [ - "-t", - duration, - # Tell ffmpeg about shorter clips (seemingly not needed) - # streamDuration = getAudioDuration(extraInputFile) - # if streamDuration and streamDuration > float(safeDuration) - # else "{0:.3f}".format(streamDuration), - "-i", - extraInputFile, - ] - ) - # Construct dataset of extra filters we'll need to add later - for ffmpegFilter in params: - if streamNo + 2 not in extraFilters: - extraFilters[streamNo + 2] = [] - extraFilters[streamNo + 2].append((ffmpegFilter, params[ffmpegFilter])) - - # Start creating avfilters! Popen-style, so don't use semicolons; - extraFilterCommand = [] - - if globalFilters <= 0: - # Dictionary of last-used tmp labels for a given stream number - tmpInputs = {streamNo: -1 for streamNo in extraFilters} - else: - # Insert blank entries for global filters into extraFilters - # so the per-stream filters know what input to source later - for streamNo in range(len(extraAudio), 0, -1): - if streamNo + 1 not in extraFilters: - extraFilters[streamNo + 1] = [] - # Also filter the primary audio track - extraFilters[1] = [] - tmpInputs = {streamNo: globalFilters - 1 for streamNo in extraFilters} - - # Add the global filters! - # NOTE: list length must = globalFilters, currently hardcoded - if tmpInputs: - extraFilterCommand.extend( - [ - "[%s:a] ashowinfo [%stmp0]" % (str(streamNo), str(streamNo)) - for streamNo in tmpInputs - ] - ) - - # Now add the per-stream filters! - for streamNo, paramList in extraFilters.items(): - for param in paramList: - source = ( - "[%s:a]" % str(streamNo) - if tmpInputs[streamNo] == -1 - else "[%stmp%s]" % (str(streamNo), str(tmpInputs[streamNo])) - ) - tmpInputs[streamNo] = tmpInputs[streamNo] + 1 - extraFilterCommand.append( - "%s %s%s [%stmp%s]" - % ( - source, - param[0], - param[1], - str(streamNo), - str(tmpInputs[streamNo]), - ) - ) - - # Join all the filters together and combine into 1 stream - extraFilterCommand = "; ".join(extraFilterCommand) + "; " if tmpInputs else "" - ffmpegCommand.extend( - [ - "-filter_complex", - extraFilterCommand - + "%s amix=inputs=%s:duration=first [a]" - % ( - "".join( - [ - ( - "[%stmp%s]" % (str(i), tmpInputs[i]) - if i in extraFilters - else "[%s:a]" % str(i) - ) - for i in range(1, len(extraAudio) + 2) - ] - ), - str(len(extraAudio) + 1), - ), - ] - ) - return ffmpegCommand - - -def testAudioStream(filename): - """Test if an audio stream definitely exists""" - audioTestCommand = [ - core.Core.FFMPEG_BIN, - "-i", - filename, - "-vn", - "-f", - "null", - "-", - ] - try: - checkOutput(audioTestCommand, stderr=subprocess.DEVNULL) - except subprocess.CalledProcessError: - return False - else: - return True - - -def getAudioDuration(filename): - """Try to get duration of audio file as float, or False if not possible""" - command = [core.Core.FFMPEG_BIN, "-i", filename] - - try: - fileInfo = checkOutput(command, stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as ex: - fileInfo = ex.output - except (FileNotFoundError, PermissionError): - # ffmpeg is possibly not installed - return False - - try: - info = fileInfo.decode("utf-8").split("\n") - except UnicodeDecodeError as e: - log.error("Unicode error:", str(e)) - return False - - for line in info: - if "Duration" in line: - d = line.split(",")[0] - d = d.split(" ")[3] - d = d.split(":") - duration = float(d[0]) * 3600 + float(d[1]) * 60 + float(d[2]) - break - else: - # String not found in output - return False - return duration - - -def readAudioFile(filename, videoWorker): - """ - Creates the completeAudioArray given to components - and used to draw the classic visualizer. - """ - duration = getAudioDuration(filename) - if not duration: - log.error(f"Audio file {filename} doesn't exist or unreadable.") - return - - command = [ - core.Core.FFMPEG_BIN, - "-i", - filename, - "-f", - "s16le", - "-acodec", - "pcm_s16le", - "-ar", - "44100", # ouput will have 44100 Hz - "-ac", - "1", # mono (set to '2' for stereo) - "-", - ] - in_pipe = openPipe( - command, - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, - bufsize=10**8, - ) - - completeAudioArray = numpy.empty(0, dtype="int16") - - progress = 0 - lastPercent = None - while True: - if core.Core.canceled: - return - # read 2 seconds of audio - progress += 4 - raw_audio = in_pipe.stdout.read(88200 * 4) - if len(raw_audio) == 0: - break - audio_array = numpy.frombuffer(raw_audio, dtype="int16") - completeAudioArray = numpy.append(completeAudioArray, audio_array) - - percent = int(100 * (progress / duration)) - if percent >= 100: - percent = 100 - - if lastPercent != percent: - string = "Loading audio file: " + str(percent) + "%" - videoWorker.progressBarSetText.emit(string) - videoWorker.progressBarUpdate.emit(percent) - - lastPercent = percent - - in_pipe.kill() - in_pipe.wait() - - # add 0s the end - completeAudioArrayCopy = numpy.zeros(len(completeAudioArray) + 44100, dtype="int16") - completeAudioArrayCopy[: len(completeAudioArray)] = completeAudioArray - completeAudioArray = completeAudioArrayCopy - - return (completeAudioArray, duration) - - -def exampleSound(style="white", extra="apulsator=offset_l=0.35:offset_r=0.67"): - """Help generate an example sound for use in creating a preview""" - - if style == "white": - src = "-2+random(0)" - elif style == "freq": - src = "sin(1000*t*PI*t)" - elif style == "wave": - src = "sin(random(0)*2*PI*t)*tan(random(0)*2*PI*t)" - elif style == "stereo": - src = "0.1*sin(2*PI*(360-2.5/2)*t) | 0.1*sin(2*PI*(360+2.5/2)*t)" - - return "aevalsrc='%s', %s%s" % (src, extra, ", " if extra else "") diff --git a/src/toolkit/frame.py b/src/toolkit/frame.py deleted file mode 100644 index 94537a6..0000000 --- a/src/toolkit/frame.py +++ /dev/null @@ -1,117 +0,0 @@ -""" -Common tools for drawing compatible frames in a Component's frameRender() -""" - -from PyQt6 import QtGui -from PIL import Image -from PIL.ImageQt import ImageQt -from PyQt6 import QtCore -import sys -import os -import math -import logging -from .. import core - - -log = logging.getLogger("AVP.Toolkit.Frame") - - -class FramePainter(QtGui.QPainter): - """ - A QPainter for a blank frame, which can be converted into a - Pillow image with finalize() - """ - - def __init__(self, width, height): - image = BlankFrame(width, height) - log.debug("Creating QImage from PIL image object") - self.image = ImageQt(image) - super().__init__(self.image) - - def setPen(self, penStyle): - if type(penStyle) is tuple: - super().setPen(PaintColor(*penStyle)) - else: - super().setPen(penStyle) - - def finalize(self): - log.verbose("Finalizing FramePainter") - buffer = QtCore.QBuffer() - buffer.open(QtCore.QBuffer.OpenModeFlag.ReadWrite) - self.image.save(buffer, "PNG") - import io - - frame = Image.open(io.BytesIO(buffer.data())) - buffer.close() - self.end() - return frame - imBytes = self.image.bits().asstring(self.image.byteCount()) - frame = Image.frombytes( - "RGBA", (self.image.width(), self.image.height()), imBytes - ) - self.end() - return frame - - -class PaintColor(QtGui.QColor): - """ - Subclass of QtGui.QColor with an added scale() method - Previously this class reversed the painter colour to solve - hardware issues related to endianness, - but Qt appears to deal with this itself nowadays - """ - - def __init__(self, r, g, b, a=255): - super().__init__(r, g, b, a) - - -def scale(scalePercent, width, height, returntype=None): - width = (float(width) / 100.0) * float(scalePercent) - height = (float(height) / 100.0) * float(scalePercent) - if returntype == str: - return (str(math.ceil(width)), str(math.ceil(height))) - elif returntype == int: - return (math.ceil(width), math.ceil(height)) - else: - return (width, height) - - -def defaultSize(framefunc): - """Makes width/height arguments optional""" - - def decorator(*args): - if len(args) < 2: - newArgs = list(args) - if len(args) == 0 or len(args) == 1: - height = int(core.Core.settings.value("outputHeight")) - newArgs.append(height) - if len(args) == 0: - width = int(core.Core.settings.value("outputWidth")) - newArgs.insert(0, width) - args = tuple(newArgs) - return framefunc(*args) - - return decorator - - -def FloodFrame(width, height, RgbaTuple): - return Image.new("RGBA", (width, height), RgbaTuple) - - -@defaultSize -def BlankFrame(width, height): - """The base frame used by each component to start drawing.""" - return FloodFrame(width, height, (0, 0, 0, 0)) - - -@defaultSize -def Checkerboard(width, height): - """ - A checkerboard to represent transparency to the user. - """ - # TODO: Would be cool to generate this image with numpy instead. - log.debug("Creating new %s*%s checkerboard" % (width, height)) - image = FloodFrame(1920, 1080, (0, 0, 0, 0)) - image.paste(Image.open(os.path.join(core.Core.wd, "gui", "background.png")), (0, 0)) - image = image.resize((width, height)) - return image |
