aboutsummaryrefslogtreecommitdiff
path: root/src/toolkit
diff options
context:
space:
mode:
Diffstat (limited to 'src/toolkit')
-rw-r--r--src/toolkit/common.py87
-rw-r--r--src/toolkit/ffmpeg.py455
-rw-r--r--src/toolkit/frame.py69
3 files changed, 338 insertions, 273 deletions
diff --git a/src/toolkit/common.py b/src/toolkit/common.py
index 2e800eb..e35aba2 100644
--- a/src/toolkit/common.py
+++ b/src/toolkit/common.py
@@ -1,7 +1,8 @@
-'''
- Common functions
-'''
-from PyQt5 import QtWidgets
+"""
+Common functions
+"""
+
+from PyQt6 import QtWidgets
import string
import os
import sys
@@ -11,43 +12,38 @@ from copy import copy
from collections import OrderedDict
-log = logging.getLogger('AVP.Toolkit.Common')
+log = logging.getLogger("AVP.Toolkit.Common")
class blockSignals:
- '''
- Context manager to temporarily block list of QtWidgets from updating,
- and guarantee restoring the previous state afterwards.
- '''
+ """
+ Context manager to temporarily block list of QtWidgets from updating,
+ and guarantee restoring the previous state afterwards.
+ """
+
def __init__(self, widgets):
if type(widgets) is dict:
self.widgets = concatDictVals(widgets)
else:
- self.widgets = (
- widgets if hasattr(widgets, '__iter__')
- else [widgets]
- )
+ self.widgets = widgets if hasattr(widgets, "__iter__") else [widgets]
def __enter__(self):
log.verbose(
- 'Blocking signals for %s',
- ", ".join([
- str(w.__class__.__name__) for w in self.widgets
- ])
+ "Blocking signals for %s",
+ ", ".join([str(w.__class__.__name__) for w in self.widgets]),
)
self.oldStates = [w.signalsBlocked() for w in self.widgets]
for w in self.widgets:
w.blockSignals(True)
def __exit__(self, *args):
- log.verbose(
- 'Resetting blockSignals to %s', str(bool(sum(self.oldStates))))
+ log.verbose("Resetting blockSignals to %s", str(bool(sum(self.oldStates))))
for w, state in zip(self.widgets, self.oldStates):
w.blockSignals(state)
def concatDictVals(d):
- '''Concatenates all values in given dict into one list.'''
+ """Concatenates all values in given dict into one list."""
key, value = d.popitem()
d[key] = value
final = copy(value)
@@ -60,22 +56,22 @@ def concatDictVals(d):
def badName(name):
- '''Returns whether a name contains non-alphanumeric chars'''
+ """Returns whether a name contains non-alphanumeric chars"""
return any([letter in string.punctuation for letter in name])
def alphabetizeDict(dictionary):
- '''Alphabetizes a dict into OrderedDict '''
+ """Alphabetizes a dict into OrderedDict"""
return OrderedDict(sorted(dictionary.items(), key=lambda t: t[0]))
def presetToString(dictionary):
- '''Returns string repr of a preset'''
+ """Returns string repr of a preset"""
return repr(alphabetizeDict(dictionary))
def presetFromString(string):
- '''Turns a string repr of OrderedDict into a regular dict'''
+ """Turns a string repr of OrderedDict into a regular dict"""
return dict(eval(string))
@@ -86,19 +82,21 @@ def appendUppercase(lst):
def pipeWrapper(func):
- '''A decorator to insert proper kwargs into Popen objects.'''
+ """A decorator to insert proper kwargs into Popen objects."""
+
def pipeWrapper(commandList, **kwargs):
- if sys.platform == 'win32':
+ if sys.platform == "win32":
# Stop CMD window from appearing on Windows
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
- kwargs['startupinfo'] = startupinfo
+ kwargs["startupinfo"] = startupinfo
- if 'bufsize' not in kwargs:
- kwargs['bufsize'] = 10**8
- if 'stdin' not in kwargs:
- kwargs['stdin'] = subprocess.DEVNULL
+ if "bufsize" not in kwargs:
+ kwargs["bufsize"] = 10**8
+ if "stdin" not in kwargs:
+ kwargs["stdin"] = subprocess.DEVNULL
return func(commandList, **kwargs)
+
return pipeWrapper
@@ -113,6 +111,7 @@ def disableWhenEncoding(func):
return
else:
return func(self, *args, **kwargs)
+
return decorator
@@ -122,13 +121,14 @@ def disableWhenOpeningProject(func):
return
else:
return func(self, *args, **kwargs)
+
return decorator
def rgbFromString(string):
- '''Turns an RGB string like "255, 255, 255" into a tuple'''
+ """Turns an RGB string like "255, 255, 255" into a tuple"""
try:
- tup = tuple([int(i) for i in string.split(',')])
+ tup = tuple([int(i) for i in string.split(",")])
if len(tup) != 3:
raise ValueError
for i in tup:
@@ -141,42 +141,42 @@ def rgbFromString(string):
def formatTraceback(tb=None):
import traceback
+
if tb is None:
import sys
+
tb = sys.exc_info()[2]
- return 'Traceback:\n%s' % "\n".join(traceback.format_tb(tb))
+ return "Traceback:\n%s" % "\n".join(traceback.format_tb(tb))
def connectWidget(widget, func):
if type(widget) == QtWidgets.QLineEdit:
widget.textChanged.connect(func)
- elif type(widget) == QtWidgets.QSpinBox \
- or type(widget) == QtWidgets.QDoubleSpinBox:
+ elif type(widget) == QtWidgets.QSpinBox or type(widget) == QtWidgets.QDoubleSpinBox:
widget.valueChanged.connect(func)
elif type(widget) == QtWidgets.QCheckBox:
widget.stateChanged.connect(func)
elif type(widget) == QtWidgets.QComboBox:
widget.currentIndexChanged.connect(func)
else:
- log.warning('Failed to connect %s ', str(widget.__class__.__name__))
+ log.warning("Failed to connect %s ", str(widget.__class__.__name__))
return False
return True
def setWidgetValue(widget, val):
- '''Generic setValue method for use with any typical QtWidget'''
- log.verbose('Setting %s to %s' % (str(widget.__class__.__name__), val))
+ """Generic setValue method for use with any typical QtWidget"""
+ log.verbose("Setting %s to %s" % (str(widget.__class__.__name__), val))
if type(widget) == QtWidgets.QLineEdit:
widget.setText(val)
- elif type(widget) == QtWidgets.QSpinBox \
- or type(widget) == QtWidgets.QDoubleSpinBox:
+ elif type(widget) == QtWidgets.QSpinBox or type(widget) == QtWidgets.QDoubleSpinBox:
widget.setValue(val)
elif type(widget) == QtWidgets.QCheckBox:
widget.setChecked(val)
elif type(widget) == QtWidgets.QComboBox:
widget.setCurrentIndex(val)
else:
- log.warning('Failed to set %s ', str(widget.__class__.__name__))
+ log.warning("Failed to set %s ", str(widget.__class__.__name__))
return False
return True
@@ -184,8 +184,7 @@ def setWidgetValue(widget, val):
def getWidgetValue(widget):
if type(widget) == QtWidgets.QLineEdit:
return widget.text()
- elif type(widget) == QtWidgets.QSpinBox \
- or type(widget) == QtWidgets.QDoubleSpinBox:
+ elif type(widget) == QtWidgets.QSpinBox or type(widget) == QtWidgets.QDoubleSpinBox:
return widget.value()
elif type(widget) == QtWidgets.QCheckBox:
return widget.isChecked()
diff --git a/src/toolkit/ffmpeg.py b/src/toolkit/ffmpeg.py
index ff06469..5aedff3 100644
--- a/src/toolkit/ffmpeg.py
+++ b/src/toolkit/ffmpeg.py
@@ -1,6 +1,7 @@
-'''
- Tools for using ffmpeg
-'''
+"""
+Tools for using ffmpeg
+"""
+
import numpy
import sys
import os
@@ -14,67 +15,74 @@ from .. import core
from .common import checkOutput, pipeWrapper
-log = logging.getLogger('AVP.Toolkit.Ffmpeg')
+log = logging.getLogger("AVP.Toolkit.Ffmpeg")
class FfmpegVideo:
- '''Opens a pipe to ffmpeg and stores a buffer of raw video frames.'''
+ """Opens a pipe to ffmpeg and stores a buffer of raw video frames."""
# error from the thread used to fill the buffer
threadError = None
def __init__(self, **kwargs):
mandatoryArgs = [
- 'inputPath',
- 'filter_',
- 'width',
- 'height',
- 'frameRate', # frames per second
- 'chunkSize', # number of bytes in one frame
- 'parent', # mainwindow object
- 'component', # component object
+ "inputPath",
+ "filter_",
+ "width",
+ "height",
+ "frameRate", # frames per second
+ "chunkSize", # number of bytes in one frame
+ "parent", # mainwindow object
+ "component", # component object
]
for arg in mandatoryArgs:
setattr(self, arg, kwargs[arg])
self.frameNo = -1
- self.currentFrame = 'None'
+ self.currentFrame = "None"
self.map_ = None
- if 'loopVideo' in kwargs and kwargs['loopVideo']:
- self.loopValue = '-1'
+ if "loopVideo" in kwargs and kwargs["loopVideo"]:
+ self.loopValue = "-1"
else:
- self.loopValue = '0'
- if 'filter_' in kwargs:
- if kwargs['filter_'][0] != '-filter_complex':
- kwargs['filter_'].insert(0, '-filter_complex')
+ self.loopValue = "0"
+ if "filter_" in kwargs:
+ if kwargs["filter_"][0] != "-filter_complex":
+ kwargs["filter_"].insert(0, "-filter_complex")
else:
- kwargs['filter_'] = None
+ kwargs["filter_"] = None
self.command = [
core.Core.FFMPEG_BIN,
- '-thread_queue_size', '512',
- '-r', str(self.frameRate),
- '-stream_loop', str(self.loopValue),
- '-i', self.inputPath,
- '-f', 'image2pipe',
- '-pix_fmt', 'rgba',
+ "-thread_queue_size",
+ "512",
+ "-r",
+ str(self.frameRate),
+ "-stream_loop",
+ str(self.loopValue),
+ "-i",
+ self.inputPath,
+ "-f",
+ "image2pipe",
+ "-pix_fmt",
+ "rgba",
]
- if type(kwargs['filter_']) is list:
- self.command.extend(
- kwargs['filter_']
- )
- self.command.extend([
- '-codec:v', 'rawvideo', '-',
- ])
+ if type(kwargs["filter_"]) is list:
+ self.command.extend(kwargs["filter_"])
+ self.command.extend(
+ [
+ "-codec:v",
+ "rawvideo",
+ "-",
+ ]
+ )
self.frameBuffer = PriorityQueue()
self.frameBuffer.maxsize = self.frameRate
self.finishedFrames = {}
self.thread = threading.Thread(
- target=self.fillBuffer,
- name='FFmpeg Frame-Fetcher'
+ target=self.fillBuffer, name="FFmpeg Frame-Fetcher"
)
self.thread.daemon = True
self.thread.start()
@@ -91,22 +99,29 @@ class FfmpegVideo:
def fillBuffer(self):
from ..component import ComponentError
+
if core.Core.logEnabled:
logFilename = os.path.join(
- core.Core.logDir, 'render_%s.log' % str(self.component.compPos)
+ core.Core.logDir, "render_%s.log" % str(self.component.compPos)
)
- log.debug('Creating ffmpeg process (log at %s)', logFilename)
- with open(logFilename, 'w') as logf:
- logf.write(" ".join(self.command) + '\n\n')
- with open(logFilename, 'a') as logf:
+ log.debug("Creating ffmpeg process (log at %s)", logFilename)
+ with open(logFilename, "w") as logf:
+ logf.write(" ".join(self.command) + "\n\n")
+ with open(logFilename, "a") as logf:
self.pipe = openPipe(
- self.command, stdin=subprocess.DEVNULL,
- stdout=subprocess.PIPE, stderr=logf, bufsize=10**8
+ self.command,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.PIPE,
+ stderr=logf,
+ bufsize=10**8,
)
else:
self.pipe = openPipe(
- self.command, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
- stderr=subprocess.DEVNULL, bufsize=10**8
+ self.command,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ bufsize=10**8,
)
while True:
@@ -117,12 +132,13 @@ class FfmpegVideo:
# If we run out of frames, use the last good frame and loop.
try:
if len(self.currentFrame) == 0:
- self.frameBuffer.put((self.frameNo-1, self.lastFrame))
+ self.frameBuffer.put((self.frameNo - 1, self.lastFrame))
continue
except AttributeError:
FfmpegVideo.threadError = ComponentError(
- self.component, 'video',
- "Video seemed playable but wasn't."
+ self.component,
+ "video",
+ "Video seemed playable but wasn't.",
)
break
@@ -130,11 +146,12 @@ class FfmpegVideo:
self.currentFrame = self.pipe.stdout.read(self.chunkSize)
except ValueError as e:
if str(e) == "PyMemoryView_FromBuffer(): info->buf must not be NULL":
- log.debug("Ignored 'info->buf must not be NULL' error from FFmpeg pipe")
+ log.debug(
+ "Ignored 'info->buf must not be NULL' error from FFmpeg pipe"
+ )
return
else:
- FfmpegVideo.threadError = ComponentError(
- self.component, 'video')
+ FfmpegVideo.threadError = ComponentError(self.component, "video")
if len(self.currentFrame) != 0:
self.frameBuffer.put((self.frameNo, self.currentFrame))
@@ -153,19 +170,17 @@ def closePipe(pipe):
def findFfmpeg():
if sys.platform == "win32":
- bin = 'ffmpeg.exe'
+ bin = "ffmpeg.exe"
else:
- bin = 'ffmpeg'
+ bin = "ffmpeg"
- if getattr(sys, 'frozen', False):
+ if getattr(sys, "frozen", False):
# The application is frozen
bin = os.path.join(core.Core.wd, bin)
with open(os.devnull, "w") as f:
try:
- checkOutput(
- [bin, '-version'], stderr=f
- )
+ checkOutput([bin, "-version"], stderr=f)
except (subprocess.CalledProcessError, FileNotFoundError):
bin = ""
@@ -173,9 +188,9 @@ def findFfmpeg():
def createFfmpegCommand(inputFile, outputFile, components, duration=-1):
- '''
- Constructs the major ffmpeg command used to export the video
- '''
+ """
+ Constructs the major ffmpeg command used to export the video
+ """
if duration == -1:
duration = getAudioDuration(inputFile)
safeDuration = "{0:.3f}".format(duration - 0.05) # used by filters
@@ -183,31 +198,33 @@ def createFfmpegCommand(inputFile, outputFile, components, duration=-1):
Core = core.Core
# Test if user has libfdk_aac
- encoders = checkOutput(
- "%s -encoders -hide_banner" % Core.FFMPEG_BIN, shell=True
- )
+ encoders = checkOutput("%s -encoders -hide_banner" % Core.FFMPEG_BIN, shell=True)
encoders = encoders.decode("utf-8")
- acodec = Core.settings.value('outputAudioCodec')
+ acodec = Core.settings.value("outputAudioCodec")
options = Core.encoderOptions
- containerName = Core.settings.value('outputContainer')
- vcodec = Core.settings.value('outputVideoCodec')
- vbitrate = str(Core.settings.value('outputVideoBitrate'))+'k'
- acodec = Core.settings.value('outputAudioCodec')
- abitrate = str(Core.settings.value('outputAudioBitrate'))+'k'
-
- for cont in options['containers']:
- if cont['name'] == containerName:
- container = cont['container']
+ containerName = Core.settings.value("outputContainer")
+ vcodec = Core.settings.value("outputVideoCodec")
+ vbitrate = str(Core.settings.value("outputVideoBitrate")) + "k"
+ acodec = Core.settings.value("outputAudioCodec")
+ abitrate = str(Core.settings.value("outputAudioBitrate")) + "k"
+
+ for cont in options["containers"]:
+ if cont["name"] == containerName:
+ container = cont["container"]
break
- vencoders = options['video-codecs'][vcodec]
- aencoders = options['audio-codecs'][acodec]
+ vencoders = options["video-codecs"][vcodec]
+ aencoders = options["audio-codecs"][acodec]
def error():
nonlocal encoders, encoder
- log.critical("Selected encoder (%s) is not supported by Ffmpeg. The supported encoders are: %s", encoder, encoders)
+ log.critical(
+ "Selected encoder (%s) is not supported by Ffmpeg. The supported encoders are: %s",
+ encoder,
+ encoders,
+ )
return []
for encoder in vencoders:
@@ -226,57 +243,75 @@ def createFfmpegCommand(inputFile, outputFile, components, duration=-1):
ffmpegCommand = [
Core.FFMPEG_BIN,
- '-thread_queue_size', '512',
- '-y', # overwrite the output file if it already exists.
-
+ "-thread_queue_size",
+ "512",
+ "-y", # overwrite the output file if it already exists.
# INPUT VIDEO
- '-f', 'rawvideo',
- '-vcodec', 'rawvideo',
- '-s', f'{Core.settings.value("outputWidth")}x{Core.settings.value("outputHeight")}',
- '-pix_fmt', 'rgba',
- '-r', str(Core.settings.value('outputFrameRate')),
- '-t', duration,
- '-an', # the video input has no sound
- '-i', '-', # the video input comes from a pipe
-
+ "-f",
+ "rawvideo",
+ "-vcodec",
+ "rawvideo",
+ "-s",
+ f'{Core.settings.value("outputWidth")}x{Core.settings.value("outputHeight")}',
+ "-pix_fmt",
+ "rgba",
+ "-r",
+ str(Core.settings.value("outputFrameRate")),
+ "-t",
+ duration,
+ "-an", # the video input has no sound
+ "-i",
+ "-", # the video input comes from a pipe
# INPUT SOUND
- '-t', duration,
- '-i', inputFile
+ "-t",
+ duration,
+ "-i",
+ inputFile,
]
- extraAudio = [
- comp.audio for comp in components
- if 'audio' in comp.properties()
- ]
+ extraAudio = [comp.audio for comp in components if "audio" in comp.properties()]
segment = createAudioFilterCommand(extraAudio, safeDuration)
ffmpegCommand.extend(segment)
# Map audio from the filters or the single audio input, and map video from the pipe
- ffmpegCommand.extend([
- '-map', '0:v',
- '-map', '[a]' if segment else '1:a',
- ])
-
- ffmpegCommand.extend([
- # OUTPUT
- '-vcodec', vencoder,
- '-acodec', aencoder,
- '-b:v', vbitrate,
- '-b:a', abitrate,
- '-pix_fmt', Core.settings.value('outputVideoFormat'),
- '-preset', Core.settings.value('outputPreset'),
- '-f', container
- ])
-
- if acodec == 'aac':
- ffmpegCommand.append('-strict')
- ffmpegCommand.append('-2')
+ ffmpegCommand.extend(
+ [
+ "-map",
+ "0:v",
+ "-map",
+ "[a]" if segment else "1:a",
+ ]
+ )
+
+ ffmpegCommand.extend(
+ [
+ # OUTPUT
+ "-vcodec",
+ vencoder,
+ "-acodec",
+ aencoder,
+ "-b:v",
+ vbitrate,
+ "-b:a",
+ abitrate,
+ "-pix_fmt",
+ Core.settings.value("outputVideoFormat"),
+ "-preset",
+ Core.settings.value("outputPreset"),
+ "-f",
+ container,
+ ]
+ )
+
+ if acodec == "aac":
+ ffmpegCommand.append("-strict")
+ ffmpegCommand.append("-2")
ffmpegCommand.append(outputFile)
return ffmpegCommand
def createAudioFilterCommand(extraAudio, duration):
- '''Add extra inputs and any needed filters to the main ffmpeg command.'''
+ """Add extra inputs and any needed filters to the main ffmpeg command."""
# NOTE: Global filters are currently hard-coded here for debugging use
globalFilters = 0 # increase to add global filters
@@ -288,21 +323,23 @@ def createAudioFilterCommand(extraAudio, duration):
extraFilters = {}
for streamNo, params in enumerate(reversed(extraAudio)):
extraInputFile, params = params
- ffmpegCommand.extend([
- '-t', duration,
- # Tell ffmpeg about shorter clips (seemingly not needed)
- # streamDuration = getAudioDuration(extraInputFile)
- # if streamDuration and streamDuration > float(safeDuration)
- # else "{0:.3f}".format(streamDuration),
- '-i', extraInputFile
- ])
+ ffmpegCommand.extend(
+ [
+ "-t",
+ duration,
+ # Tell ffmpeg about shorter clips (seemingly not needed)
+ # streamDuration = getAudioDuration(extraInputFile)
+ # if streamDuration and streamDuration > float(safeDuration)
+ # else "{0:.3f}".format(streamDuration),
+ "-i",
+ extraInputFile,
+ ]
+ )
# Construct dataset of extra filters we'll need to add later
for ffmpegFilter in params:
if streamNo + 2 not in extraFilters:
extraFilters[streamNo + 2] = []
- extraFilters[streamNo + 2].append((
- ffmpegFilter, params[ffmpegFilter]
- ))
+ extraFilters[streamNo + 2].append((ffmpegFilter, params[ffmpegFilter]))
# Start creating avfilters! Popen-style, so don't use semicolons;
extraFilterCommand = []
@@ -318,63 +355,73 @@ def createAudioFilterCommand(extraAudio, duration):
extraFilters[streamNo + 1] = []
# Also filter the primary audio track
extraFilters[1] = []
- tmpInputs = {
- streamNo: globalFilters - 1
- for streamNo in extraFilters
- }
+ tmpInputs = {streamNo: globalFilters - 1 for streamNo in extraFilters}
# Add the global filters!
# NOTE: list length must = globalFilters, currently hardcoded
if tmpInputs:
- extraFilterCommand.extend([
- '[%s:a] ashowinfo [%stmp0]' % (
- str(streamNo),
- str(streamNo)
- )
- for streamNo in tmpInputs
- ])
+ extraFilterCommand.extend(
+ [
+ "[%s:a] ashowinfo [%stmp0]" % (str(streamNo), str(streamNo))
+ for streamNo in tmpInputs
+ ]
+ )
# Now add the per-stream filters!
for streamNo, paramList in extraFilters.items():
for param in paramList:
- source = '[%s:a]' % str(streamNo) \
- if tmpInputs[streamNo] == -1 else \
- '[%stmp%s]' % (
- str(streamNo), str(tmpInputs[streamNo])
- )
+ source = (
+ "[%s:a]" % str(streamNo)
+ if tmpInputs[streamNo] == -1
+ else "[%stmp%s]" % (str(streamNo), str(tmpInputs[streamNo]))
+ )
tmpInputs[streamNo] = tmpInputs[streamNo] + 1
extraFilterCommand.append(
- '%s %s%s [%stmp%s]' % (
- source, param[0], param[1], str(streamNo),
- str(tmpInputs[streamNo])
+ "%s %s%s [%stmp%s]"
+ % (
+ source,
+ param[0],
+ param[1],
+ str(streamNo),
+ str(tmpInputs[streamNo]),
)
)
# Join all the filters together and combine into 1 stream
- extraFilterCommand = "; ".join(extraFilterCommand) + '; ' \
- if tmpInputs else ''
- ffmpegCommand.extend([
- '-filter_complex',
- extraFilterCommand +
- '%s amix=inputs=%s:duration=first [a]'
- % (
- "".join([
- '[%stmp%s]' % (str(i), tmpInputs[i])
- if i in extraFilters else '[%s:a]' % str(i)
- for i in range(1, len(extraAudio) + 2)
- ]),
- str(len(extraAudio) + 1)
- ),
- ])
+ extraFilterCommand = "; ".join(extraFilterCommand) + "; " if tmpInputs else ""
+ ffmpegCommand.extend(
+ [
+ "-filter_complex",
+ extraFilterCommand
+ + "%s amix=inputs=%s:duration=first [a]"
+ % (
+ "".join(
+ [
+ (
+ "[%stmp%s]" % (str(i), tmpInputs[i])
+ if i in extraFilters
+ else "[%s:a]" % str(i)
+ )
+ for i in range(1, len(extraAudio) + 2)
+ ]
+ ),
+ str(len(extraAudio) + 1),
+ ),
+ ]
+ )
return ffmpegCommand
def testAudioStream(filename):
- '''Test if an audio stream definitely exists'''
+ """Test if an audio stream definitely exists"""
audioTestCommand = [
core.Core.FFMPEG_BIN,
- '-i', filename,
- '-vn', '-f', 'null', '-'
+ "-i",
+ filename,
+ "-vn",
+ "-f",
+ "null",
+ "-",
]
try:
checkOutput(audioTestCommand, stderr=subprocess.DEVNULL)
@@ -385,8 +432,8 @@ def testAudioStream(filename):
def getAudioDuration(filename):
- '''Try to get duration of audio file as float, or False if not possible'''
- command = [core.Core.FFMPEG_BIN, '-i', filename]
+ """Try to get duration of audio file as float, or False if not possible"""
+ command = [core.Core.FFMPEG_BIN, "-i", filename]
try:
fileInfo = checkOutput(command, stderr=subprocess.STDOUT)
@@ -397,17 +444,17 @@ def getAudioDuration(filename):
return False
try:
- info = fileInfo.decode("utf-8").split('\n')
+ info = fileInfo.decode("utf-8").split("\n")
except UnicodeDecodeError as e:
- log.error('Unicode error:', str(e))
+ log.error("Unicode error:", str(e))
return False
for line in info:
- if 'Duration' in line:
- d = line.split(',')[0]
- d = d.split(' ')[3]
- d = d.split(':')
- duration = float(d[0])*3600 + float(d[1])*60 + float(d[2])
+ if "Duration" in line:
+ d = line.split(",")[0]
+ d = d.split(" ")[3]
+ d = d.split(":")
+ duration = float(d[0]) * 3600 + float(d[1]) * 60 + float(d[2])
break
else:
# String not found in output
@@ -416,10 +463,10 @@ def getAudioDuration(filename):
def readAudioFile(filename, videoWorker):
- '''
- Creates the completeAudioArray given to components
- and used to draw the classic visualizer.
- '''
+ """
+ Creates the completeAudioArray given to components
+ and used to draw the classic visualizer.
+ """
duration = getAudioDuration(filename)
if not duration:
log.error(f"Audio file {filename} doesn't exist or unreadable.")
@@ -427,15 +474,23 @@ def readAudioFile(filename, videoWorker):
command = [
core.Core.FFMPEG_BIN,
- '-i', filename,
- '-f', 's16le',
- '-acodec', 'pcm_s16le',
- '-ar', '44100', # ouput will have 44100 Hz
- '-ac', '1', # mono (set to '2' for stereo)
- '-']
+ "-i",
+ filename,
+ "-f",
+ "s16le",
+ "-acodec",
+ "pcm_s16le",
+ "-ar",
+ "44100", # ouput will have 44100 Hz
+ "-ac",
+ "1", # mono (set to '2' for stereo)
+ "-",
+ ]
in_pipe = openPipe(
command,
- stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ bufsize=10**8,
)
completeAudioArray = numpy.empty(0, dtype="int16")
@@ -447,18 +502,18 @@ def readAudioFile(filename, videoWorker):
return
# read 2 seconds of audio
progress += 4
- raw_audio = in_pipe.stdout.read(88200*4)
+ raw_audio = in_pipe.stdout.read(88200 * 4)
if len(raw_audio) == 0:
break
- audio_array = numpy.fromstring(raw_audio, dtype="int16")
+ audio_array = numpy.frombuffer(raw_audio, dtype="int16")
completeAudioArray = numpy.append(completeAudioArray, audio_array)
- percent = int(100*(progress/duration))
+ percent = int(100 * (progress / duration))
if percent >= 100:
percent = 100
if lastPercent != percent:
- string = 'Loading audio file: '+str(percent)+'%'
+ string = "Loading audio file: " + str(percent) + "%"
videoWorker.progressBarSetText.emit(string)
videoWorker.progressBarUpdate.emit(percent)
@@ -468,25 +523,23 @@ def readAudioFile(filename, videoWorker):
in_pipe.wait()
# add 0s the end
- completeAudioArrayCopy = numpy.zeros(
- len(completeAudioArray) + 44100, dtype="int16")
- completeAudioArrayCopy[:len(completeAudioArray)] = completeAudioArray
+ completeAudioArrayCopy = numpy.zeros(len(completeAudioArray) + 44100, dtype="int16")
+ completeAudioArrayCopy[: len(completeAudioArray)] = completeAudioArray
completeAudioArray = completeAudioArrayCopy
return (completeAudioArray, duration)
-def exampleSound(
- style='white', extra='apulsator=offset_l=0.35:offset_r=0.67'):
- '''Help generate an example sound for use in creating a preview'''
+def exampleSound(style="white", extra="apulsator=offset_l=0.35:offset_r=0.67"):
+ """Help generate an example sound for use in creating a preview"""
- if style == 'white':
- src = '-2+random(0)'
- elif style == 'freq':
- src = 'sin(1000*t*PI*t)'
- elif style == 'wave':
- src = 'sin(random(0)*2*PI*t)*tan(random(0)*2*PI*t)'
- elif style == 'stereo':
- src = '0.1*sin(2*PI*(360-2.5/2)*t) | 0.1*sin(2*PI*(360+2.5/2)*t)'
+ if style == "white":
+ src = "-2+random(0)"
+ elif style == "freq":
+ src = "sin(1000*t*PI*t)"
+ elif style == "wave":
+ src = "sin(random(0)*2*PI*t)*tan(random(0)*2*PI*t)"
+ elif style == "stereo":
+ src = "0.1*sin(2*PI*(360-2.5/2)*t) | 0.1*sin(2*PI*(360+2.5/2)*t)"
- return "aevalsrc='%s', %s%s" % (src, extra, ', ' if extra else '')
+ return "aevalsrc='%s', %s%s" % (src, extra, ", " if extra else "")
diff --git a/src/toolkit/frame.py b/src/toolkit/frame.py
index 520bd43..94537a6 100644
--- a/src/toolkit/frame.py
+++ b/src/toolkit/frame.py
@@ -1,25 +1,27 @@
-'''
- Common tools for drawing compatible frames in a Component's frameRender()
-'''
-from PyQt5 import QtGui
+"""
+Common tools for drawing compatible frames in a Component's frameRender()
+"""
+
+from PyQt6 import QtGui
from PIL import Image
from PIL.ImageQt import ImageQt
+from PyQt6 import QtCore
import sys
import os
import math
import logging
-
from .. import core
-log = logging.getLogger('AVP.Toolkit.Frame')
+log = logging.getLogger("AVP.Toolkit.Frame")
class FramePainter(QtGui.QPainter):
- '''
- A QPainter for a blank frame, which can be converted into a
- Pillow image with finalize()
- '''
+ """
+ A QPainter for a blank frame, which can be converted into a
+ Pillow image with finalize()
+ """
+
def __init__(self, width, height):
image = BlankFrame(width, height)
log.debug("Creating QImage from PIL image object")
@@ -34,21 +36,33 @@ class FramePainter(QtGui.QPainter):
def finalize(self):
log.verbose("Finalizing FramePainter")
+ buffer = QtCore.QBuffer()
+ buffer.open(QtCore.QBuffer.OpenModeFlag.ReadWrite)
+ self.image.save(buffer, "PNG")
+ import io
+
+ frame = Image.open(io.BytesIO(buffer.data()))
+ buffer.close()
+ self.end()
+ return frame
imBytes = self.image.bits().asstring(self.image.byteCount())
- frame = Image.frombytes(
- 'RGBA', (self.image.width(), self.image.height()), imBytes
+ frame = Image.frombytes(
+ "RGBA", (self.image.width(), self.image.height()), imBytes
)
self.end()
return frame
class PaintColor(QtGui.QColor):
- '''Reverse the painter colour if the hardware stores RGB values backward'''
+ """
+ Subclass of QtGui.QColor with an added scale() method
+ Previously this class reversed the painter colour to solve
+ hardware issues related to endianness,
+ but Qt appears to deal with this itself nowadays
+ """
+
def __init__(self, r, g, b, a=255):
- if sys.byteorder == 'big':
- super().__init__(r, g, b, a)
- else:
- super().__init__(b, g, r, a)
+ super().__init__(r, g, b, a)
def scale(scalePercent, width, height, returntype=None):
@@ -63,7 +77,8 @@ def scale(scalePercent, width, height, returntype=None):
def defaultSize(framefunc):
- '''Makes width/height arguments optional'''
+ """Makes width/height arguments optional"""
+
def decorator(*args):
if len(args) < 2:
newArgs = list(args)
@@ -75,6 +90,7 @@ def defaultSize(framefunc):
newArgs.insert(0, width)
args = tuple(newArgs)
return framefunc(*args)
+
return decorator
@@ -84,21 +100,18 @@ def FloodFrame(width, height, RgbaTuple):
@defaultSize
def BlankFrame(width, height):
- '''The base frame used by each component to start drawing.'''
+ """The base frame used by each component to start drawing."""
return FloodFrame(width, height, (0, 0, 0, 0))
@defaultSize
def Checkerboard(width, height):
- '''
- A checkerboard to represent transparency to the user.
- TODO: Would be cool to generate this image with numpy instead.
- '''
- log.debug('Creating new %s*%s checkerboard' % (width, height))
+ """
+ A checkerboard to represent transparency to the user.
+ """
+ # TODO: Would be cool to generate this image with numpy instead.
+ log.debug("Creating new %s*%s checkerboard" % (width, height))
image = FloodFrame(1920, 1080, (0, 0, 0, 0))
- image.paste(Image.open(
- os.path.join(core.Core.wd, 'gui', "background.png")),
- (0, 0)
- )
+ image.paste(Image.open(os.path.join(core.Core.wd, "gui", "background.png")), (0, 0))
image = image.resize((width, height))
return image