From 669756b391d26661cf2e2a97a304e73343ef6655 Mon Sep 17 00:00:00 2001 From: tassaron Date: Sun, 11 Jan 2026 14:29:58 -0500 Subject: update to Qt 6 and Pillow 12 and yeah, I accidentally ran black on the codebase. I don't want to spend more free time fixing that. All of these changes are simple renames or removals, nothing too major. --- src/toolkit/common.py | 87 +++++----- src/toolkit/ffmpeg.py | 455 ++++++++++++++++++++++++++++---------------------- src/toolkit/frame.py | 69 ++++---- 3 files changed, 338 insertions(+), 273 deletions(-) (limited to 'src/toolkit') diff --git a/src/toolkit/common.py b/src/toolkit/common.py index 2e800eb..e35aba2 100644 --- a/src/toolkit/common.py +++ b/src/toolkit/common.py @@ -1,7 +1,8 @@ -''' - Common functions -''' -from PyQt5 import QtWidgets +""" +Common functions +""" + +from PyQt6 import QtWidgets import string import os import sys @@ -11,43 +12,38 @@ from copy import copy from collections import OrderedDict -log = logging.getLogger('AVP.Toolkit.Common') +log = logging.getLogger("AVP.Toolkit.Common") class blockSignals: - ''' - Context manager to temporarily block list of QtWidgets from updating, - and guarantee restoring the previous state afterwards. - ''' + """ + Context manager to temporarily block list of QtWidgets from updating, + and guarantee restoring the previous state afterwards. + """ + def __init__(self, widgets): if type(widgets) is dict: self.widgets = concatDictVals(widgets) else: - self.widgets = ( - widgets if hasattr(widgets, '__iter__') - else [widgets] - ) + self.widgets = widgets if hasattr(widgets, "__iter__") else [widgets] def __enter__(self): log.verbose( - 'Blocking signals for %s', - ", ".join([ - str(w.__class__.__name__) for w in self.widgets - ]) + "Blocking signals for %s", + ", ".join([str(w.__class__.__name__) for w in self.widgets]), ) self.oldStates = [w.signalsBlocked() for w in self.widgets] for w in self.widgets: w.blockSignals(True) def __exit__(self, *args): - log.verbose( - 'Resetting blockSignals to %s', str(bool(sum(self.oldStates)))) + log.verbose("Resetting blockSignals to %s", str(bool(sum(self.oldStates)))) for w, state in zip(self.widgets, self.oldStates): w.blockSignals(state) def concatDictVals(d): - '''Concatenates all values in given dict into one list.''' + """Concatenates all values in given dict into one list.""" key, value = d.popitem() d[key] = value final = copy(value) @@ -60,22 +56,22 @@ def concatDictVals(d): def badName(name): - '''Returns whether a name contains non-alphanumeric chars''' + """Returns whether a name contains non-alphanumeric chars""" return any([letter in string.punctuation for letter in name]) def alphabetizeDict(dictionary): - '''Alphabetizes a dict into OrderedDict ''' + """Alphabetizes a dict into OrderedDict""" return OrderedDict(sorted(dictionary.items(), key=lambda t: t[0])) def presetToString(dictionary): - '''Returns string repr of a preset''' + """Returns string repr of a preset""" return repr(alphabetizeDict(dictionary)) def presetFromString(string): - '''Turns a string repr of OrderedDict into a regular dict''' + """Turns a string repr of OrderedDict into a regular dict""" return dict(eval(string)) @@ -86,19 +82,21 @@ def appendUppercase(lst): def pipeWrapper(func): - '''A decorator to insert proper kwargs into Popen objects.''' + """A decorator to insert proper kwargs into Popen objects.""" + def pipeWrapper(commandList, **kwargs): - if sys.platform == 'win32': + if sys.platform == "win32": # Stop CMD window from appearing on Windows startupinfo = subprocess.STARTUPINFO() startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW - kwargs['startupinfo'] = startupinfo + kwargs["startupinfo"] = startupinfo - if 'bufsize' not in kwargs: - kwargs['bufsize'] = 10**8 - if 'stdin' not in kwargs: - kwargs['stdin'] = subprocess.DEVNULL + if "bufsize" not in kwargs: + kwargs["bufsize"] = 10**8 + if "stdin" not in kwargs: + kwargs["stdin"] = subprocess.DEVNULL return func(commandList, **kwargs) + return pipeWrapper @@ -113,6 +111,7 @@ def disableWhenEncoding(func): return else: return func(self, *args, **kwargs) + return decorator @@ -122,13 +121,14 @@ def disableWhenOpeningProject(func): return else: return func(self, *args, **kwargs) + return decorator def rgbFromString(string): - '''Turns an RGB string like "255, 255, 255" into a tuple''' + """Turns an RGB string like "255, 255, 255" into a tuple""" try: - tup = tuple([int(i) for i in string.split(',')]) + tup = tuple([int(i) for i in string.split(",")]) if len(tup) != 3: raise ValueError for i in tup: @@ -141,42 +141,42 @@ def rgbFromString(string): def formatTraceback(tb=None): import traceback + if tb is None: import sys + tb = sys.exc_info()[2] - return 'Traceback:\n%s' % "\n".join(traceback.format_tb(tb)) + return "Traceback:\n%s" % "\n".join(traceback.format_tb(tb)) def connectWidget(widget, func): if type(widget) == QtWidgets.QLineEdit: widget.textChanged.connect(func) - elif type(widget) == QtWidgets.QSpinBox \ - or type(widget) == QtWidgets.QDoubleSpinBox: + elif type(widget) == QtWidgets.QSpinBox or type(widget) == QtWidgets.QDoubleSpinBox: widget.valueChanged.connect(func) elif type(widget) == QtWidgets.QCheckBox: widget.stateChanged.connect(func) elif type(widget) == QtWidgets.QComboBox: widget.currentIndexChanged.connect(func) else: - log.warning('Failed to connect %s ', str(widget.__class__.__name__)) + log.warning("Failed to connect %s ", str(widget.__class__.__name__)) return False return True def setWidgetValue(widget, val): - '''Generic setValue method for use with any typical QtWidget''' - log.verbose('Setting %s to %s' % (str(widget.__class__.__name__), val)) + """Generic setValue method for use with any typical QtWidget""" + log.verbose("Setting %s to %s" % (str(widget.__class__.__name__), val)) if type(widget) == QtWidgets.QLineEdit: widget.setText(val) - elif type(widget) == QtWidgets.QSpinBox \ - or type(widget) == QtWidgets.QDoubleSpinBox: + elif type(widget) == QtWidgets.QSpinBox or type(widget) == QtWidgets.QDoubleSpinBox: widget.setValue(val) elif type(widget) == QtWidgets.QCheckBox: widget.setChecked(val) elif type(widget) == QtWidgets.QComboBox: widget.setCurrentIndex(val) else: - log.warning('Failed to set %s ', str(widget.__class__.__name__)) + log.warning("Failed to set %s ", str(widget.__class__.__name__)) return False return True @@ -184,8 +184,7 @@ def setWidgetValue(widget, val): def getWidgetValue(widget): if type(widget) == QtWidgets.QLineEdit: return widget.text() - elif type(widget) == QtWidgets.QSpinBox \ - or type(widget) == QtWidgets.QDoubleSpinBox: + elif type(widget) == QtWidgets.QSpinBox or type(widget) == QtWidgets.QDoubleSpinBox: return widget.value() elif type(widget) == QtWidgets.QCheckBox: return widget.isChecked() diff --git a/src/toolkit/ffmpeg.py b/src/toolkit/ffmpeg.py index ff06469..5aedff3 100644 --- a/src/toolkit/ffmpeg.py +++ b/src/toolkit/ffmpeg.py @@ -1,6 +1,7 @@ -''' - Tools for using ffmpeg -''' +""" +Tools for using ffmpeg +""" + import numpy import sys import os @@ -14,67 +15,74 @@ from .. import core from .common import checkOutput, pipeWrapper -log = logging.getLogger('AVP.Toolkit.Ffmpeg') +log = logging.getLogger("AVP.Toolkit.Ffmpeg") class FfmpegVideo: - '''Opens a pipe to ffmpeg and stores a buffer of raw video frames.''' + """Opens a pipe to ffmpeg and stores a buffer of raw video frames.""" # error from the thread used to fill the buffer threadError = None def __init__(self, **kwargs): mandatoryArgs = [ - 'inputPath', - 'filter_', - 'width', - 'height', - 'frameRate', # frames per second - 'chunkSize', # number of bytes in one frame - 'parent', # mainwindow object - 'component', # component object + "inputPath", + "filter_", + "width", + "height", + "frameRate", # frames per second + "chunkSize", # number of bytes in one frame + "parent", # mainwindow object + "component", # component object ] for arg in mandatoryArgs: setattr(self, arg, kwargs[arg]) self.frameNo = -1 - self.currentFrame = 'None' + self.currentFrame = "None" self.map_ = None - if 'loopVideo' in kwargs and kwargs['loopVideo']: - self.loopValue = '-1' + if "loopVideo" in kwargs and kwargs["loopVideo"]: + self.loopValue = "-1" else: - self.loopValue = '0' - if 'filter_' in kwargs: - if kwargs['filter_'][0] != '-filter_complex': - kwargs['filter_'].insert(0, '-filter_complex') + self.loopValue = "0" + if "filter_" in kwargs: + if kwargs["filter_"][0] != "-filter_complex": + kwargs["filter_"].insert(0, "-filter_complex") else: - kwargs['filter_'] = None + kwargs["filter_"] = None self.command = [ core.Core.FFMPEG_BIN, - '-thread_queue_size', '512', - '-r', str(self.frameRate), - '-stream_loop', str(self.loopValue), - '-i', self.inputPath, - '-f', 'image2pipe', - '-pix_fmt', 'rgba', + "-thread_queue_size", + "512", + "-r", + str(self.frameRate), + "-stream_loop", + str(self.loopValue), + "-i", + self.inputPath, + "-f", + "image2pipe", + "-pix_fmt", + "rgba", ] - if type(kwargs['filter_']) is list: - self.command.extend( - kwargs['filter_'] - ) - self.command.extend([ - '-codec:v', 'rawvideo', '-', - ]) + if type(kwargs["filter_"]) is list: + self.command.extend(kwargs["filter_"]) + self.command.extend( + [ + "-codec:v", + "rawvideo", + "-", + ] + ) self.frameBuffer = PriorityQueue() self.frameBuffer.maxsize = self.frameRate self.finishedFrames = {} self.thread = threading.Thread( - target=self.fillBuffer, - name='FFmpeg Frame-Fetcher' + target=self.fillBuffer, name="FFmpeg Frame-Fetcher" ) self.thread.daemon = True self.thread.start() @@ -91,22 +99,29 @@ class FfmpegVideo: def fillBuffer(self): from ..component import ComponentError + if core.Core.logEnabled: logFilename = os.path.join( - core.Core.logDir, 'render_%s.log' % str(self.component.compPos) + core.Core.logDir, "render_%s.log" % str(self.component.compPos) ) - log.debug('Creating ffmpeg process (log at %s)', logFilename) - with open(logFilename, 'w') as logf: - logf.write(" ".join(self.command) + '\n\n') - with open(logFilename, 'a') as logf: + log.debug("Creating ffmpeg process (log at %s)", logFilename) + with open(logFilename, "w") as logf: + logf.write(" ".join(self.command) + "\n\n") + with open(logFilename, "a") as logf: self.pipe = openPipe( - self.command, stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, stderr=logf, bufsize=10**8 + self.command, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=logf, + bufsize=10**8, ) else: self.pipe = openPipe( - self.command, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, bufsize=10**8 + self.command, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + bufsize=10**8, ) while True: @@ -117,12 +132,13 @@ class FfmpegVideo: # If we run out of frames, use the last good frame and loop. try: if len(self.currentFrame) == 0: - self.frameBuffer.put((self.frameNo-1, self.lastFrame)) + self.frameBuffer.put((self.frameNo - 1, self.lastFrame)) continue except AttributeError: FfmpegVideo.threadError = ComponentError( - self.component, 'video', - "Video seemed playable but wasn't." + self.component, + "video", + "Video seemed playable but wasn't.", ) break @@ -130,11 +146,12 @@ class FfmpegVideo: self.currentFrame = self.pipe.stdout.read(self.chunkSize) except ValueError as e: if str(e) == "PyMemoryView_FromBuffer(): info->buf must not be NULL": - log.debug("Ignored 'info->buf must not be NULL' error from FFmpeg pipe") + log.debug( + "Ignored 'info->buf must not be NULL' error from FFmpeg pipe" + ) return else: - FfmpegVideo.threadError = ComponentError( - self.component, 'video') + FfmpegVideo.threadError = ComponentError(self.component, "video") if len(self.currentFrame) != 0: self.frameBuffer.put((self.frameNo, self.currentFrame)) @@ -153,19 +170,17 @@ def closePipe(pipe): def findFfmpeg(): if sys.platform == "win32": - bin = 'ffmpeg.exe' + bin = "ffmpeg.exe" else: - bin = 'ffmpeg' + bin = "ffmpeg" - if getattr(sys, 'frozen', False): + if getattr(sys, "frozen", False): # The application is frozen bin = os.path.join(core.Core.wd, bin) with open(os.devnull, "w") as f: try: - checkOutput( - [bin, '-version'], stderr=f - ) + checkOutput([bin, "-version"], stderr=f) except (subprocess.CalledProcessError, FileNotFoundError): bin = "" @@ -173,9 +188,9 @@ def findFfmpeg(): def createFfmpegCommand(inputFile, outputFile, components, duration=-1): - ''' - Constructs the major ffmpeg command used to export the video - ''' + """ + Constructs the major ffmpeg command used to export the video + """ if duration == -1: duration = getAudioDuration(inputFile) safeDuration = "{0:.3f}".format(duration - 0.05) # used by filters @@ -183,31 +198,33 @@ def createFfmpegCommand(inputFile, outputFile, components, duration=-1): Core = core.Core # Test if user has libfdk_aac - encoders = checkOutput( - "%s -encoders -hide_banner" % Core.FFMPEG_BIN, shell=True - ) + encoders = checkOutput("%s -encoders -hide_banner" % Core.FFMPEG_BIN, shell=True) encoders = encoders.decode("utf-8") - acodec = Core.settings.value('outputAudioCodec') + acodec = Core.settings.value("outputAudioCodec") options = Core.encoderOptions - containerName = Core.settings.value('outputContainer') - vcodec = Core.settings.value('outputVideoCodec') - vbitrate = str(Core.settings.value('outputVideoBitrate'))+'k' - acodec = Core.settings.value('outputAudioCodec') - abitrate = str(Core.settings.value('outputAudioBitrate'))+'k' - - for cont in options['containers']: - if cont['name'] == containerName: - container = cont['container'] + containerName = Core.settings.value("outputContainer") + vcodec = Core.settings.value("outputVideoCodec") + vbitrate = str(Core.settings.value("outputVideoBitrate")) + "k" + acodec = Core.settings.value("outputAudioCodec") + abitrate = str(Core.settings.value("outputAudioBitrate")) + "k" + + for cont in options["containers"]: + if cont["name"] == containerName: + container = cont["container"] break - vencoders = options['video-codecs'][vcodec] - aencoders = options['audio-codecs'][acodec] + vencoders = options["video-codecs"][vcodec] + aencoders = options["audio-codecs"][acodec] def error(): nonlocal encoders, encoder - log.critical("Selected encoder (%s) is not supported by Ffmpeg. The supported encoders are: %s", encoder, encoders) + log.critical( + "Selected encoder (%s) is not supported by Ffmpeg. The supported encoders are: %s", + encoder, + encoders, + ) return [] for encoder in vencoders: @@ -226,57 +243,75 @@ def createFfmpegCommand(inputFile, outputFile, components, duration=-1): ffmpegCommand = [ Core.FFMPEG_BIN, - '-thread_queue_size', '512', - '-y', # overwrite the output file if it already exists. - + "-thread_queue_size", + "512", + "-y", # overwrite the output file if it already exists. # INPUT VIDEO - '-f', 'rawvideo', - '-vcodec', 'rawvideo', - '-s', f'{Core.settings.value("outputWidth")}x{Core.settings.value("outputHeight")}', - '-pix_fmt', 'rgba', - '-r', str(Core.settings.value('outputFrameRate')), - '-t', duration, - '-an', # the video input has no sound - '-i', '-', # the video input comes from a pipe - + "-f", + "rawvideo", + "-vcodec", + "rawvideo", + "-s", + f'{Core.settings.value("outputWidth")}x{Core.settings.value("outputHeight")}', + "-pix_fmt", + "rgba", + "-r", + str(Core.settings.value("outputFrameRate")), + "-t", + duration, + "-an", # the video input has no sound + "-i", + "-", # the video input comes from a pipe # INPUT SOUND - '-t', duration, - '-i', inputFile + "-t", + duration, + "-i", + inputFile, ] - extraAudio = [ - comp.audio for comp in components - if 'audio' in comp.properties() - ] + extraAudio = [comp.audio for comp in components if "audio" in comp.properties()] segment = createAudioFilterCommand(extraAudio, safeDuration) ffmpegCommand.extend(segment) # Map audio from the filters or the single audio input, and map video from the pipe - ffmpegCommand.extend([ - '-map', '0:v', - '-map', '[a]' if segment else '1:a', - ]) - - ffmpegCommand.extend([ - # OUTPUT - '-vcodec', vencoder, - '-acodec', aencoder, - '-b:v', vbitrate, - '-b:a', abitrate, - '-pix_fmt', Core.settings.value('outputVideoFormat'), - '-preset', Core.settings.value('outputPreset'), - '-f', container - ]) - - if acodec == 'aac': - ffmpegCommand.append('-strict') - ffmpegCommand.append('-2') + ffmpegCommand.extend( + [ + "-map", + "0:v", + "-map", + "[a]" if segment else "1:a", + ] + ) + + ffmpegCommand.extend( + [ + # OUTPUT + "-vcodec", + vencoder, + "-acodec", + aencoder, + "-b:v", + vbitrate, + "-b:a", + abitrate, + "-pix_fmt", + Core.settings.value("outputVideoFormat"), + "-preset", + Core.settings.value("outputPreset"), + "-f", + container, + ] + ) + + if acodec == "aac": + ffmpegCommand.append("-strict") + ffmpegCommand.append("-2") ffmpegCommand.append(outputFile) return ffmpegCommand def createAudioFilterCommand(extraAudio, duration): - '''Add extra inputs and any needed filters to the main ffmpeg command.''' + """Add extra inputs and any needed filters to the main ffmpeg command.""" # NOTE: Global filters are currently hard-coded here for debugging use globalFilters = 0 # increase to add global filters @@ -288,21 +323,23 @@ def createAudioFilterCommand(extraAudio, duration): extraFilters = {} for streamNo, params in enumerate(reversed(extraAudio)): extraInputFile, params = params - ffmpegCommand.extend([ - '-t', duration, - # Tell ffmpeg about shorter clips (seemingly not needed) - # streamDuration = getAudioDuration(extraInputFile) - # if streamDuration and streamDuration > float(safeDuration) - # else "{0:.3f}".format(streamDuration), - '-i', extraInputFile - ]) + ffmpegCommand.extend( + [ + "-t", + duration, + # Tell ffmpeg about shorter clips (seemingly not needed) + # streamDuration = getAudioDuration(extraInputFile) + # if streamDuration and streamDuration > float(safeDuration) + # else "{0:.3f}".format(streamDuration), + "-i", + extraInputFile, + ] + ) # Construct dataset of extra filters we'll need to add later for ffmpegFilter in params: if streamNo + 2 not in extraFilters: extraFilters[streamNo + 2] = [] - extraFilters[streamNo + 2].append(( - ffmpegFilter, params[ffmpegFilter] - )) + extraFilters[streamNo + 2].append((ffmpegFilter, params[ffmpegFilter])) # Start creating avfilters! Popen-style, so don't use semicolons; extraFilterCommand = [] @@ -318,63 +355,73 @@ def createAudioFilterCommand(extraAudio, duration): extraFilters[streamNo + 1] = [] # Also filter the primary audio track extraFilters[1] = [] - tmpInputs = { - streamNo: globalFilters - 1 - for streamNo in extraFilters - } + tmpInputs = {streamNo: globalFilters - 1 for streamNo in extraFilters} # Add the global filters! # NOTE: list length must = globalFilters, currently hardcoded if tmpInputs: - extraFilterCommand.extend([ - '[%s:a] ashowinfo [%stmp0]' % ( - str(streamNo), - str(streamNo) - ) - for streamNo in tmpInputs - ]) + extraFilterCommand.extend( + [ + "[%s:a] ashowinfo [%stmp0]" % (str(streamNo), str(streamNo)) + for streamNo in tmpInputs + ] + ) # Now add the per-stream filters! for streamNo, paramList in extraFilters.items(): for param in paramList: - source = '[%s:a]' % str(streamNo) \ - if tmpInputs[streamNo] == -1 else \ - '[%stmp%s]' % ( - str(streamNo), str(tmpInputs[streamNo]) - ) + source = ( + "[%s:a]" % str(streamNo) + if tmpInputs[streamNo] == -1 + else "[%stmp%s]" % (str(streamNo), str(tmpInputs[streamNo])) + ) tmpInputs[streamNo] = tmpInputs[streamNo] + 1 extraFilterCommand.append( - '%s %s%s [%stmp%s]' % ( - source, param[0], param[1], str(streamNo), - str(tmpInputs[streamNo]) + "%s %s%s [%stmp%s]" + % ( + source, + param[0], + param[1], + str(streamNo), + str(tmpInputs[streamNo]), ) ) # Join all the filters together and combine into 1 stream - extraFilterCommand = "; ".join(extraFilterCommand) + '; ' \ - if tmpInputs else '' - ffmpegCommand.extend([ - '-filter_complex', - extraFilterCommand + - '%s amix=inputs=%s:duration=first [a]' - % ( - "".join([ - '[%stmp%s]' % (str(i), tmpInputs[i]) - if i in extraFilters else '[%s:a]' % str(i) - for i in range(1, len(extraAudio) + 2) - ]), - str(len(extraAudio) + 1) - ), - ]) + extraFilterCommand = "; ".join(extraFilterCommand) + "; " if tmpInputs else "" + ffmpegCommand.extend( + [ + "-filter_complex", + extraFilterCommand + + "%s amix=inputs=%s:duration=first [a]" + % ( + "".join( + [ + ( + "[%stmp%s]" % (str(i), tmpInputs[i]) + if i in extraFilters + else "[%s:a]" % str(i) + ) + for i in range(1, len(extraAudio) + 2) + ] + ), + str(len(extraAudio) + 1), + ), + ] + ) return ffmpegCommand def testAudioStream(filename): - '''Test if an audio stream definitely exists''' + """Test if an audio stream definitely exists""" audioTestCommand = [ core.Core.FFMPEG_BIN, - '-i', filename, - '-vn', '-f', 'null', '-' + "-i", + filename, + "-vn", + "-f", + "null", + "-", ] try: checkOutput(audioTestCommand, stderr=subprocess.DEVNULL) @@ -385,8 +432,8 @@ def testAudioStream(filename): def getAudioDuration(filename): - '''Try to get duration of audio file as float, or False if not possible''' - command = [core.Core.FFMPEG_BIN, '-i', filename] + """Try to get duration of audio file as float, or False if not possible""" + command = [core.Core.FFMPEG_BIN, "-i", filename] try: fileInfo = checkOutput(command, stderr=subprocess.STDOUT) @@ -397,17 +444,17 @@ def getAudioDuration(filename): return False try: - info = fileInfo.decode("utf-8").split('\n') + info = fileInfo.decode("utf-8").split("\n") except UnicodeDecodeError as e: - log.error('Unicode error:', str(e)) + log.error("Unicode error:", str(e)) return False for line in info: - if 'Duration' in line: - d = line.split(',')[0] - d = d.split(' ')[3] - d = d.split(':') - duration = float(d[0])*3600 + float(d[1])*60 + float(d[2]) + if "Duration" in line: + d = line.split(",")[0] + d = d.split(" ")[3] + d = d.split(":") + duration = float(d[0]) * 3600 + float(d[1]) * 60 + float(d[2]) break else: # String not found in output @@ -416,10 +463,10 @@ def getAudioDuration(filename): def readAudioFile(filename, videoWorker): - ''' - Creates the completeAudioArray given to components - and used to draw the classic visualizer. - ''' + """ + Creates the completeAudioArray given to components + and used to draw the classic visualizer. + """ duration = getAudioDuration(filename) if not duration: log.error(f"Audio file {filename} doesn't exist or unreadable.") @@ -427,15 +474,23 @@ def readAudioFile(filename, videoWorker): command = [ core.Core.FFMPEG_BIN, - '-i', filename, - '-f', 's16le', - '-acodec', 'pcm_s16le', - '-ar', '44100', # ouput will have 44100 Hz - '-ac', '1', # mono (set to '2' for stereo) - '-'] + "-i", + filename, + "-f", + "s16le", + "-acodec", + "pcm_s16le", + "-ar", + "44100", # ouput will have 44100 Hz + "-ac", + "1", # mono (set to '2' for stereo) + "-", + ] in_pipe = openPipe( command, - stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8 + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + bufsize=10**8, ) completeAudioArray = numpy.empty(0, dtype="int16") @@ -447,18 +502,18 @@ def readAudioFile(filename, videoWorker): return # read 2 seconds of audio progress += 4 - raw_audio = in_pipe.stdout.read(88200*4) + raw_audio = in_pipe.stdout.read(88200 * 4) if len(raw_audio) == 0: break - audio_array = numpy.fromstring(raw_audio, dtype="int16") + audio_array = numpy.frombuffer(raw_audio, dtype="int16") completeAudioArray = numpy.append(completeAudioArray, audio_array) - percent = int(100*(progress/duration)) + percent = int(100 * (progress / duration)) if percent >= 100: percent = 100 if lastPercent != percent: - string = 'Loading audio file: '+str(percent)+'%' + string = "Loading audio file: " + str(percent) + "%" videoWorker.progressBarSetText.emit(string) videoWorker.progressBarUpdate.emit(percent) @@ -468,25 +523,23 @@ def readAudioFile(filename, videoWorker): in_pipe.wait() # add 0s the end - completeAudioArrayCopy = numpy.zeros( - len(completeAudioArray) + 44100, dtype="int16") - completeAudioArrayCopy[:len(completeAudioArray)] = completeAudioArray + completeAudioArrayCopy = numpy.zeros(len(completeAudioArray) + 44100, dtype="int16") + completeAudioArrayCopy[: len(completeAudioArray)] = completeAudioArray completeAudioArray = completeAudioArrayCopy return (completeAudioArray, duration) -def exampleSound( - style='white', extra='apulsator=offset_l=0.35:offset_r=0.67'): - '''Help generate an example sound for use in creating a preview''' +def exampleSound(style="white", extra="apulsator=offset_l=0.35:offset_r=0.67"): + """Help generate an example sound for use in creating a preview""" - if style == 'white': - src = '-2+random(0)' - elif style == 'freq': - src = 'sin(1000*t*PI*t)' - elif style == 'wave': - src = 'sin(random(0)*2*PI*t)*tan(random(0)*2*PI*t)' - elif style == 'stereo': - src = '0.1*sin(2*PI*(360-2.5/2)*t) | 0.1*sin(2*PI*(360+2.5/2)*t)' + if style == "white": + src = "-2+random(0)" + elif style == "freq": + src = "sin(1000*t*PI*t)" + elif style == "wave": + src = "sin(random(0)*2*PI*t)*tan(random(0)*2*PI*t)" + elif style == "stereo": + src = "0.1*sin(2*PI*(360-2.5/2)*t) | 0.1*sin(2*PI*(360+2.5/2)*t)" - return "aevalsrc='%s', %s%s" % (src, extra, ', ' if extra else '') + return "aevalsrc='%s', %s%s" % (src, extra, ", " if extra else "") diff --git a/src/toolkit/frame.py b/src/toolkit/frame.py index 520bd43..94537a6 100644 --- a/src/toolkit/frame.py +++ b/src/toolkit/frame.py @@ -1,25 +1,27 @@ -''' - Common tools for drawing compatible frames in a Component's frameRender() -''' -from PyQt5 import QtGui +""" +Common tools for drawing compatible frames in a Component's frameRender() +""" + +from PyQt6 import QtGui from PIL import Image from PIL.ImageQt import ImageQt +from PyQt6 import QtCore import sys import os import math import logging - from .. import core -log = logging.getLogger('AVP.Toolkit.Frame') +log = logging.getLogger("AVP.Toolkit.Frame") class FramePainter(QtGui.QPainter): - ''' - A QPainter for a blank frame, which can be converted into a - Pillow image with finalize() - ''' + """ + A QPainter for a blank frame, which can be converted into a + Pillow image with finalize() + """ + def __init__(self, width, height): image = BlankFrame(width, height) log.debug("Creating QImage from PIL image object") @@ -34,21 +36,33 @@ class FramePainter(QtGui.QPainter): def finalize(self): log.verbose("Finalizing FramePainter") + buffer = QtCore.QBuffer() + buffer.open(QtCore.QBuffer.OpenModeFlag.ReadWrite) + self.image.save(buffer, "PNG") + import io + + frame = Image.open(io.BytesIO(buffer.data())) + buffer.close() + self.end() + return frame imBytes = self.image.bits().asstring(self.image.byteCount()) - frame = Image.frombytes( - 'RGBA', (self.image.width(), self.image.height()), imBytes + frame = Image.frombytes( + "RGBA", (self.image.width(), self.image.height()), imBytes ) self.end() return frame class PaintColor(QtGui.QColor): - '''Reverse the painter colour if the hardware stores RGB values backward''' + """ + Subclass of QtGui.QColor with an added scale() method + Previously this class reversed the painter colour to solve + hardware issues related to endianness, + but Qt appears to deal with this itself nowadays + """ + def __init__(self, r, g, b, a=255): - if sys.byteorder == 'big': - super().__init__(r, g, b, a) - else: - super().__init__(b, g, r, a) + super().__init__(r, g, b, a) def scale(scalePercent, width, height, returntype=None): @@ -63,7 +77,8 @@ def scale(scalePercent, width, height, returntype=None): def defaultSize(framefunc): - '''Makes width/height arguments optional''' + """Makes width/height arguments optional""" + def decorator(*args): if len(args) < 2: newArgs = list(args) @@ -75,6 +90,7 @@ def defaultSize(framefunc): newArgs.insert(0, width) args = tuple(newArgs) return framefunc(*args) + return decorator @@ -84,21 +100,18 @@ def FloodFrame(width, height, RgbaTuple): @defaultSize def BlankFrame(width, height): - '''The base frame used by each component to start drawing.''' + """The base frame used by each component to start drawing.""" return FloodFrame(width, height, (0, 0, 0, 0)) @defaultSize def Checkerboard(width, height): - ''' - A checkerboard to represent transparency to the user. - TODO: Would be cool to generate this image with numpy instead. - ''' - log.debug('Creating new %s*%s checkerboard' % (width, height)) + """ + A checkerboard to represent transparency to the user. + """ + # TODO: Would be cool to generate this image with numpy instead. + log.debug("Creating new %s*%s checkerboard" % (width, height)) image = FloodFrame(1920, 1080, (0, 0, 0, 0)) - image.paste(Image.open( - os.path.join(core.Core.wd, 'gui', "background.png")), - (0, 0) - ) + image.paste(Image.open(os.path.join(core.Core.wd, "gui", "background.png")), (0, 0)) image = image.resize((width, height)) return image -- cgit v1.2.3