aboutsummaryrefslogtreecommitdiff
path: root/src/avp/toolkit
diff options
context:
space:
mode:
Diffstat (limited to 'src/avp/toolkit')
-rw-r--r--src/avp/toolkit/__init__.py1
-rw-r--r--src/avp/toolkit/common.py192
-rw-r--r--src/avp/toolkit/ffmpeg.py545
-rw-r--r--src/avp/toolkit/frame.py117
4 files changed, 855 insertions, 0 deletions
diff --git a/src/avp/toolkit/__init__.py b/src/avp/toolkit/__init__.py
new file mode 100644
index 0000000..55e5f84
--- /dev/null
+++ b/src/avp/toolkit/__init__.py
@@ -0,0 +1 @@
+from .common import *
diff --git a/src/avp/toolkit/common.py b/src/avp/toolkit/common.py
new file mode 100644
index 0000000..e35aba2
--- /dev/null
+++ b/src/avp/toolkit/common.py
@@ -0,0 +1,192 @@
+"""
+Common functions
+"""
+
+from PyQt6 import QtWidgets
+import string
+import os
+import sys
+import subprocess
+import logging
+from copy import copy
+from collections import OrderedDict
+
+
+log = logging.getLogger("AVP.Toolkit.Common")
+
+
class blockSignals:
    """
    Context manager that temporarily blocks Qt signals for a group of
    widgets and guarantees each widget's previous blocked-state is
    restored on exit.
    """

    def __init__(self, widgets):
        # Accept a dict of widgets (flattened), any iterable, or one widget.
        if type(widgets) is dict:
            self.widgets = concatDictVals(widgets)
        elif hasattr(widgets, "__iter__"):
            self.widgets = widgets
        else:
            self.widgets = [widgets]

    def __enter__(self):
        names = ", ".join(str(widget.__class__.__name__) for widget in self.widgets)
        log.verbose("Blocking signals for %s", names)
        # Remember each widget's current state so nesting works correctly.
        self.oldStates = [widget.signalsBlocked() for widget in self.widgets]
        for widget in self.widgets:
            widget.blockSignals(True)

    def __exit__(self, *args):
        log.verbose("Resetting blockSignals to %s", str(bool(sum(self.oldStates))))
        for widget, previous in zip(self.widgets, self.oldStates):
            widget.blockSignals(previous)
+
+
def concatDictVals(d):
    """Concatenate all values of *d* into one flat list.

    List values are flattened one level; scalar values are appended as-is.

    BUG FIX: the previous implementation mutated the dict's own list value
    in place, took its shallow copy *before* extending (so the returned
    list was missing the other values), and re-counted the popped item via
    ``d.values()``. This version leaves *d* untouched and includes every
    value exactly once.
    """
    final = []
    for val in d.values():
        if isinstance(val, list):
            final.extend(val)
        else:
            final.append(val)
    return final
+
+
def badName(name):
    """Return True if *name* contains any punctuation character."""
    return any(char in string.punctuation for char in name)
+
+
def alphabetizeDict(dictionary):
    """Return an OrderedDict of *dictionary*'s items, sorted by key."""
    items = sorted(dictionary.items(), key=lambda pair: pair[0])
    return OrderedDict(items)
+
+
def presetToString(dictionary):
    """Return the canonical string form of a preset (alphabetized repr)."""
    ordered = alphabetizeDict(dictionary)
    return repr(ordered)
+
+
def presetFromString(string):
    """Turn a string repr of OrderedDict into a regular dict.

    Inverse of presetToString.

    NOTE(review): this uses eval(), so it must only ever receive trusted,
    locally-generated preset strings — evaluating user-supplied input here
    would be arbitrary code execution.
    """
    # The parameter name shadows the stdlib ``string`` module imported at
    # the top of this file; kept for backward compatibility of the API.
    return dict(eval(string))
+
+
def appendUppercase(lst):
    """Append the uppercase form of every existing entry to *lst* in place.

    Returns the same (mutated) list for convenience.

    The old implementation appended while iterating, relying on
    ``zip(lst, range(len(lst)))`` exhausting to stop the feedback loop,
    and carried an unused index variable. Iterating a snapshot is explicit
    and equivalent.
    """
    lst.extend(form.upper() for form in list(lst))
    return lst
+
+
def pipeWrapper(func):
    """Decorator that injects safe default kwargs into Popen-style calls.

    Adds a large default bufsize, a DEVNULL stdin, and (on Windows) a
    STARTUPINFO that suppresses the console window.
    """

    def wrapper(commandList, **kwargs):
        if sys.platform == "win32":
            # Stop a CMD window from flashing up on Windows.
            info = subprocess.STARTUPINFO()
            info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            kwargs["startupinfo"] = info
        kwargs.setdefault("bufsize", 10**8)
        kwargs.setdefault("stdin", subprocess.DEVNULL)
        return func(commandList, **kwargs)

    return wrapper
+
+
@pipeWrapper
def checkOutput(commandList, **kwargs):
    """subprocess.check_output with the safe defaults added by pipeWrapper."""
    return subprocess.check_output(commandList, **kwargs)
+
+
def disableWhenEncoding(func):
    """Decorator: turn a method into a no-op while ``self.encoding`` is truthy."""

    def wrapper(self, *args, **kwargs):
        if self.encoding:
            return None
        return func(self, *args, **kwargs)

    return wrapper
+
+
def disableWhenOpeningProject(func):
    """Decorator: turn a method into a no-op while a project is being opened."""

    def wrapper(self, *args, **kwargs):
        if self.core.openingProject:
            return None
        return func(self, *args, **kwargs)

    return wrapper
+
+
def rgbFromString(string):
    """Turn an RGB string like "255, 255, 255" into a 3-tuple of ints.

    Any malformed input (wrong count, non-integers, out-of-range channels,
    or a non-string argument) falls back to white (255, 255, 255).
    """
    try:
        channels = tuple(int(part) for part in string.split(","))
        if len(channels) != 3:
            raise ValueError
        if any(value < 0 or value > 255 for value in channels):
            raise ValueError
        return channels
    except (ValueError, AttributeError, TypeError):
        # Narrowed from a bare ``except:`` so genuine bugs (and things like
        # KeyboardInterrupt) are no longer silently swallowed; parse
        # failures still yield the white fallback as before.
        return (255, 255, 255)
+
+
def formatTraceback(tb=None):
    """Return a printable "Traceback:" string for *tb*.

    When *tb* is None, the traceback of the exception currently being
    handled (if any) is used instead.
    """
    import sys
    import traceback

    if tb is None:
        tb = sys.exc_info()[2]
    lines = traceback.format_tb(tb)
    return "Traceback:\n%s" % "\n".join(lines)
+
+
def connectWidget(widget, func):
    """Hook *func* up to the appropriate change-signal of a standard widget.

    Returns True on success, False (after logging a warning) for widget
    types this helper does not know about.
    """
    # Exact type comparison is deliberate here: subclasses may need their
    # own handling, mirroring setWidgetValue/getWidgetValue below.
    widgetType = type(widget)
    if widgetType == QtWidgets.QLineEdit:
        widget.textChanged.connect(func)
    elif widgetType in (QtWidgets.QSpinBox, QtWidgets.QDoubleSpinBox):
        widget.valueChanged.connect(func)
    elif widgetType == QtWidgets.QCheckBox:
        widget.stateChanged.connect(func)
    elif widgetType == QtWidgets.QComboBox:
        widget.currentIndexChanged.connect(func)
    else:
        log.warning("Failed to connect %s ", str(widget.__class__.__name__))
        return False
    return True
+
+
def setWidgetValue(widget, val):
    """Generic setValue method for use with any typical QtWidget"""
    log.verbose("Setting %s to %s" % (str(widget.__class__.__name__), val))
    # Dispatch on the exact widget type; mirrors connectWidget above.
    widgetType = type(widget)
    if widgetType == QtWidgets.QLineEdit:
        widget.setText(val)
    elif widgetType in (QtWidgets.QSpinBox, QtWidgets.QDoubleSpinBox):
        widget.setValue(val)
    elif widgetType == QtWidgets.QCheckBox:
        widget.setChecked(val)
    elif widgetType == QtWidgets.QComboBox:
        widget.setCurrentIndex(val)
    else:
        log.warning("Failed to set %s ", str(widget.__class__.__name__))
        return False
    return True
+
+
def getWidgetValue(widget):
    """Generic value getter for the widget types handled by setWidgetValue.

    Returns the widget's current value, or None for unsupported widget
    types — now after logging a warning, for consistency with
    connectWidget and setWidgetValue, instead of silently falling off the
    end of the if-chain.
    """
    if type(widget) == QtWidgets.QLineEdit:
        return widget.text()
    elif type(widget) == QtWidgets.QSpinBox or type(widget) == QtWidgets.QDoubleSpinBox:
        return widget.value()
    elif type(widget) == QtWidgets.QCheckBox:
        return widget.isChecked()
    elif type(widget) == QtWidgets.QComboBox:
        return widget.currentIndex()
    log.warning("Failed to get %s ", str(widget.__class__.__name__))
    return None
diff --git a/src/avp/toolkit/ffmpeg.py b/src/avp/toolkit/ffmpeg.py
new file mode 100644
index 0000000..5aedff3
--- /dev/null
+++ b/src/avp/toolkit/ffmpeg.py
@@ -0,0 +1,545 @@
+"""
+Tools for using ffmpeg
+"""
+
+import numpy
+import sys
+import os
+import subprocess
+import threading
+import signal
+from queue import PriorityQueue
+import logging
+
+from .. import core
+from .common import checkOutput, pipeWrapper
+
+
+log = logging.getLogger("AVP.Toolkit.Ffmpeg")
+
+
class FfmpegVideo:
    """Opens a pipe to ffmpeg and stores a buffer of raw video frames.

    A daemon thread ("FFmpeg Frame-Fetcher") decodes the input into raw
    RGBA frames and feeds them into a bounded PriorityQueue; frame()
    hands finished frames out by frame number.
    """

    # error from the thread used to fill the buffer
    threadError = None

    def __init__(self, **kwargs):
        # All of these kwargs are required; a missing one raises KeyError.
        mandatoryArgs = [
            "inputPath",
            "filter_",
            "width",
            "height",
            "frameRate",  # frames per second
            "chunkSize",  # number of bytes in one frame
            "parent",  # mainwindow object
            "component",  # component object
        ]
        for arg in mandatoryArgs:
            setattr(self, arg, kwargs[arg])

        self.frameNo = -1
        # Sentinel string: len("None") != 0, so the first pass of the
        # len()-check in fillBuffer() treats it as a non-empty frame.
        self.currentFrame = "None"
        self.map_ = None

        # -stream_loop -1 loops the input forever; 0 plays it once.
        if "loopVideo" in kwargs and kwargs["loopVideo"]:
            self.loopValue = "-1"
        else:
            self.loopValue = "0"
        # Normalize filter_ so it always begins with "-filter_complex".
        if "filter_" in kwargs:
            if kwargs["filter_"][0] != "-filter_complex":
                kwargs["filter_"].insert(0, "-filter_complex")
        else:
            kwargs["filter_"] = None

        # Decode to raw RGBA frames written to stdout (image2pipe).
        self.command = [
            core.Core.FFMPEG_BIN,
            "-thread_queue_size",
            "512",
            "-r",
            str(self.frameRate),
            "-stream_loop",
            str(self.loopValue),
            "-i",
            self.inputPath,
            "-f",
            "image2pipe",
            "-pix_fmt",
            "rgba",
        ]
        if type(kwargs["filter_"]) is list:
            self.command.extend(kwargs["filter_"])
        self.command.extend(
            [
                "-codec:v",
                "rawvideo",
                "-",
            ]
        )

        # Bound the buffer to ~1 second of frames so the fetcher thread
        # blocks instead of reading the whole video into memory.
        self.frameBuffer = PriorityQueue()
        self.frameBuffer.maxsize = self.frameRate
        self.finishedFrames = {}

        self.thread = threading.Thread(
            target=self.fillBuffer, name="FFmpeg Frame-Fetcher"
        )
        self.thread.daemon = True
        self.thread.start()

    def frame(self, num):
        """Block until frame *num* is available and return its raw bytes.

        Frames pulled off the queue while waiting are parked in
        finishedFrames so out-of-order requests are still served.
        """
        while True:
            if num in self.finishedFrames:
                image = self.finishedFrames.pop(num)
                return image

            i, image = self.frameBuffer.get()
            self.finishedFrames[i] = image
            self.frameBuffer.task_done()

    def fillBuffer(self):
        """Thread target: read chunkSize-byte frames from ffmpeg into the queue."""
        from ..component import ComponentError

        if core.Core.logEnabled:
            # Per-component render log captures ffmpeg's stderr.
            logFilename = os.path.join(
                core.Core.logDir, "render_%s.log" % str(self.component.compPos)
            )
            log.debug("Creating ffmpeg process (log at %s)", logFilename)
            with open(logFilename, "w") as logf:
                logf.write(" ".join(self.command) + "\n\n")
            with open(logFilename, "a") as logf:
                self.pipe = openPipe(
                    self.command,
                    stdin=subprocess.DEVNULL,
                    stdout=subprocess.PIPE,
                    stderr=logf,
                    bufsize=10**8,
                )
        else:
            self.pipe = openPipe(
                self.command,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                bufsize=10**8,
            )

        while True:
            if self.parent.canceled:
                break
            self.frameNo += 1

            # If we run out of frames, use the last good frame and loop.
            try:
                if len(self.currentFrame) == 0:
                    self.frameBuffer.put((self.frameNo - 1, self.lastFrame))
                    continue
            except AttributeError:
                # lastFrame was never set: ffmpeg produced no frames at all.
                FfmpegVideo.threadError = ComponentError(
                    self.component,
                    "video",
                    "Video seemed playable but wasn't.",
                )
                break

            try:
                self.currentFrame = self.pipe.stdout.read(self.chunkSize)
            except ValueError as e:
                if str(e) == "PyMemoryView_FromBuffer(): info->buf must not be NULL":
                    log.debug(
                        "Ignored 'info->buf must not be NULL' error from FFmpeg pipe"
                    )
                    return
                else:
                    # NOTE(review): threadError is recorded but the loop is
                    # neither broken nor returned from here, so the stale
                    # currentFrame is re-queued on the next iteration —
                    # confirm whether a ``break`` was intended.
                    FfmpegVideo.threadError = ComponentError(self.component, "video")

            if len(self.currentFrame) != 0:
                self.frameBuffer.put((self.frameNo, self.currentFrame))
                self.lastFrame = self.currentFrame
+
+
@pipeWrapper
def openPipe(commandList, **kwargs):
    """subprocess.Popen with the safe defaults added by pipeWrapper."""
    return subprocess.Popen(commandList, **kwargs)
+
+
def closePipe(pipe):
    """Close a pipe's stdout and ask the child process to terminate (SIGTERM)."""
    pipe.stdout.close()
    pipe.send_signal(signal.SIGTERM)
+
+
def findFfmpeg():
    """Locate the ffmpeg binary and verify that it actually runs.

    Returns the binary name (or bundled path when frozen), or "" if
    ffmpeg could not be executed.
    """
    # Renamed the local from ``bin`` — it shadowed the builtin of the
    # same name.
    if sys.platform == "win32":
        binary = "ffmpeg.exe"
    else:
        binary = "ffmpeg"

    if getattr(sys, "frozen", False):
        # The application is frozen: use the copy bundled next to it.
        binary = os.path.join(core.Core.wd, binary)

    try:
        # subprocess.DEVNULL replaces the old open(os.devnull) handle,
        # matching how stderr is discarded elsewhere in this module.
        checkOutput([binary, "-version"], stderr=subprocess.DEVNULL)
    except (subprocess.CalledProcessError, FileNotFoundError):
        binary = ""

    return binary
+
+
def createFfmpegCommand(inputFile, outputFile, components, duration=-1):
    """
    Constructs the major ffmpeg command used to export the video.

    Returns the argv list, or [] when the selected container or encoders
    are not supported by the installed ffmpeg.
    """
    if duration == -1:
        duration = getAudioDuration(inputFile)
    safeDuration = "{0:.3f}".format(duration - 0.05)  # used by filters
    duration = "{0:.3f}".format(duration + 0.1)  # used by input sources
    Core = core.Core

    # Ask ffmpeg which encoders it supports (e.g. to test for libfdk_aac).
    # BUG FIX: this was a shell=True string command, which broke when
    # FFMPEG_BIN contained spaces; an argv list needs no shell at all.
    encoders = checkOutput([Core.FFMPEG_BIN, "-encoders", "-hide_banner"])
    encoders = encoders.decode("utf-8")

    options = Core.encoderOptions
    containerName = Core.settings.value("outputContainer")
    vcodec = Core.settings.value("outputVideoCodec")
    vbitrate = str(Core.settings.value("outputVideoBitrate")) + "k"
    # (a duplicate read of "outputAudioCodec" was removed here)
    acodec = Core.settings.value("outputAudioCodec")
    abitrate = str(Core.settings.value("outputAudioBitrate")) + "k"

    # Resolve the container string; previously an unmatched container name
    # fell through to a NameError when ``container`` was used below.
    container = None
    for cont in options["containers"]:
        if cont["name"] == containerName:
            container = cont["container"]
            break
    if container is None:
        log.critical(
            "Selected container (%s) is not present in the encoder options.",
            containerName,
        )
        return []

    vencoders = options["video-codecs"][vcodec]
    aencoders = options["audio-codecs"][acodec]

    def error():
        # Report whichever encoder just failed the availability check.
        nonlocal encoders, encoder
        log.critical(
            "Selected encoder (%s) is not supported by Ffmpeg. The supported encoders are: %s",
            encoder,
            encoders,
        )
        return []

    # Pick the first available encoder for each codec, or bail out.
    for encoder in vencoders:
        if encoder in encoders:
            vencoder = encoder
            break
    else:
        return error()

    for encoder in aencoders:
        if encoder in encoders:
            aencoder = encoder
            break
    else:
        return error()

    ffmpegCommand = [
        Core.FFMPEG_BIN,
        "-thread_queue_size",
        "512",
        "-y",  # overwrite the output file if it already exists.
        # INPUT VIDEO
        "-f",
        "rawvideo",
        "-vcodec",
        "rawvideo",
        "-s",
        f'{Core.settings.value("outputWidth")}x{Core.settings.value("outputHeight")}',
        "-pix_fmt",
        "rgba",
        "-r",
        str(Core.settings.value("outputFrameRate")),
        "-t",
        duration,
        "-an",  # the video input has no sound
        "-i",
        "-",  # the video input comes from a pipe
        # INPUT SOUND
        "-t",
        duration,
        "-i",
        inputFile,
    ]

    extraAudio = [comp.audio for comp in components if "audio" in comp.properties()]
    segment = createAudioFilterCommand(extraAudio, safeDuration)
    ffmpegCommand.extend(segment)
    # Map audio from the filters or the single audio input, and map video from the pipe
    ffmpegCommand.extend(
        [
            "-map",
            "0:v",
            "-map",
            "[a]" if segment else "1:a",
        ]
    )

    ffmpegCommand.extend(
        [
            # OUTPUT
            "-vcodec",
            vencoder,
            "-acodec",
            aencoder,
            "-b:v",
            vbitrate,
            "-b:a",
            abitrate,
            "-pix_fmt",
            Core.settings.value("outputVideoFormat"),
            "-preset",
            Core.settings.value("outputPreset"),
            "-f",
            container,
        ]
    )

    if acodec == "aac":
        # ffmpeg's native aac encoder requires the "experimental" flag.
        ffmpegCommand.append("-strict")
        ffmpegCommand.append("-2")

    ffmpegCommand.append(outputFile)
    return ffmpegCommand
+
+
def createAudioFilterCommand(extraAudio, duration):
    """Add extra inputs and any needed filters to the main ffmpeg command.

    *extraAudio* is a list of (inputFile, paramsDict) pairs; returns the
    ["-t", dur, "-i", file, ..., "-filter_complex", graph] segment, or []
    when there is nothing to mix.
    """
    # NOTE: Global filters are currently hard-coded here for debugging use
    globalFilters = 0  # increase to add global filters

    if not extraAudio and not globalFilters:
        return []

    ffmpegCommand = []
    # Add -i options for extra input files
    extraFilters = {}
    for streamNo, params in enumerate(reversed(extraAudio)):
        extraInputFile, params = params
        ffmpegCommand.extend(
            [
                "-t",
                duration,
                # Tell ffmpeg about shorter clips (seemingly not needed)
                #   streamDuration = getAudioDuration(extraInputFile)
                #   if streamDuration and streamDuration > float(safeDuration)
                #   else "{0:.3f}".format(streamDuration),
                "-i",
                extraInputFile,
            ]
        )
        # Construct dataset of extra filters we'll need to add later.
        # Stream numbers are offset by 2: stream 0 is the video pipe and
        # stream 1 the primary audio input in the main command.
        for ffmpegFilter in params:
            if streamNo + 2 not in extraFilters:
                extraFilters[streamNo + 2] = []
            extraFilters[streamNo + 2].append((ffmpegFilter, params[ffmpegFilter]))

    # Start creating avfilters! Popen-style, so don't use semicolons;
    extraFilterCommand = []

    if globalFilters <= 0:
        # Dictionary of last-used tmp labels for a given stream number
        tmpInputs = {streamNo: -1 for streamNo in extraFilters}
    else:
        # Insert blank entries for global filters into extraFilters
        # so the per-stream filters know what input to source later
        for streamNo in range(len(extraAudio), 0, -1):
            if streamNo + 1 not in extraFilters:
                extraFilters[streamNo + 1] = []
        # Also filter the primary audio track
        extraFilters[1] = []
        tmpInputs = {streamNo: globalFilters - 1 for streamNo in extraFilters}

        # Add the global filters!
        # NOTE: list length must = globalFilters, currently hardcoded
        if tmpInputs:
            extraFilterCommand.extend(
                [
                    "[%s:a] ashowinfo [%stmp0]" % (str(streamNo), str(streamNo))
                    for streamNo in tmpInputs
                ]
            )

    # Now add the per-stream filters!
    # Each filter reads the previous tmp label (or the raw stream on the
    # first hop) and writes a new incremented tmp label.
    for streamNo, paramList in extraFilters.items():
        for param in paramList:
            source = (
                "[%s:a]" % str(streamNo)
                if tmpInputs[streamNo] == -1
                else "[%stmp%s]" % (str(streamNo), str(tmpInputs[streamNo]))
            )
            tmpInputs[streamNo] = tmpInputs[streamNo] + 1
            extraFilterCommand.append(
                "%s %s%s [%stmp%s]"
                % (
                    source,
                    param[0],
                    param[1],
                    str(streamNo),
                    str(tmpInputs[streamNo]),
                )
            )

    # Join all the filters together and combine into 1 stream via amix,
    # labelled [a] for the -map option in the main command.
    extraFilterCommand = "; ".join(extraFilterCommand) + "; " if tmpInputs else ""
    ffmpegCommand.extend(
        [
            "-filter_complex",
            extraFilterCommand
            + "%s amix=inputs=%s:duration=first [a]"
            % (
                "".join(
                    [
                        (
                            "[%stmp%s]" % (str(i), tmpInputs[i])
                            if i in extraFilters
                            else "[%s:a]" % str(i)
                        )
                        for i in range(1, len(extraAudio) + 2)
                    ]
                ),
                str(len(extraAudio) + 1),
            ),
        ]
    )
    return ffmpegCommand
+
+
def testAudioStream(filename):
    """Return True if *filename* definitely contains a decodable audio stream.

    Runs a null-output ffmpeg decode with video disabled; a non-zero exit
    status means no usable audio.
    """
    audioTestCommand = [
        core.Core.FFMPEG_BIN,
        "-i",
        filename,
        "-vn",
        "-f",
        "null",
        "-",
    ]
    try:
        checkOutput(audioTestCommand, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        return False
    return True
+
+
def getAudioDuration(filename):
    """Try to get duration of audio file as float, or False if not possible"""
    command = [core.Core.FFMPEG_BIN, "-i", filename]

    try:
        fileInfo = checkOutput(command, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as ex:
        # ffmpeg exits non-zero when given no output file, but the banner
        # we need was merged into the captured output via stderr=STDOUT.
        fileInfo = ex.output
    except (FileNotFoundError, PermissionError):
        # ffmpeg is possibly not installed
        return False

    try:
        info = fileInfo.decode("utf-8").split("\n")
    except UnicodeDecodeError as e:
        # BUG FIX: the old call ``log.error("Unicode error:", str(e))``
        # passed an argument with no format placeholder, which raises a
        # logging error instead of logging the message.
        log.error("Unicode error: %s", e)
        return False

    # Parse e.g. "  Duration: 00:03:25.51, start: ..." into seconds.
    for line in info:
        if "Duration" in line:
            d = line.split(",")[0]
            d = d.split(" ")[3]
            d = d.split(":")
            duration = float(d[0]) * 3600 + float(d[1]) * 60 + float(d[2])
            break
    else:
        # String not found in output
        return False
    return duration
+
+
def readAudioFile(filename, videoWorker):
    """
    Creates the completeAudioArray given to components
    and used to draw the classic visualizer.

    Decodes *filename* to mono 16-bit 44100 Hz PCM via ffmpeg and returns
    (completeAudioArray, duration), or None if the file is unreadable or
    the export is canceled.
    """
    duration = getAudioDuration(filename)
    if not duration:
        # BUG FIX: the old message was an f-string with no placeholder and
        # logged the literal text "(unknown)" instead of the filename.
        log.error("Audio file %s doesn't exist or unreadable.", filename)
        return

    command = [
        core.Core.FFMPEG_BIN,
        "-i",
        filename,
        "-f",
        "s16le",
        "-acodec",
        "pcm_s16le",
        "-ar",
        "44100",  # output will have 44100 Hz
        "-ac",
        "1",  # mono (set to '2' for stereo)
        "-",
    ]
    in_pipe = openPipe(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        bufsize=10**8,
    )

    completeAudioArray = numpy.empty(0, dtype="int16")

    progress = 0
    lastPercent = None
    while True:
        if core.Core.canceled:
            return
        # Read 4 seconds of audio: 44100 Hz * 2 bytes/sample = 88200 B/s.
        # (The old comment incorrectly said 2 seconds.)
        progress += 4
        raw_audio = in_pipe.stdout.read(88200 * 4)
        if len(raw_audio) == 0:
            break
        audio_array = numpy.frombuffer(raw_audio, dtype="int16")
        completeAudioArray = numpy.append(completeAudioArray, audio_array)

        percent = min(int(100 * (progress / duration)), 100)
        if lastPercent != percent:
            msg = "Loading audio file: " + str(percent) + "%"
            videoWorker.progressBarSetText.emit(msg)
            videoWorker.progressBarUpdate.emit(percent)

        lastPercent = percent

    in_pipe.kill()
    in_pipe.wait()

    # Pad one second of silence (44100 zero samples) onto the end.
    completeAudioArrayCopy = numpy.zeros(len(completeAudioArray) + 44100, dtype="int16")
    completeAudioArrayCopy[: len(completeAudioArray)] = completeAudioArray
    completeAudioArray = completeAudioArrayCopy

    return (completeAudioArray, duration)
+
+
def exampleSound(style="white", extra="apulsator=offset_l=0.35:offset_r=0.67"):
    """Help generate an example sound for use in creating a preview.

    Returns an ffmpeg aevalsrc filter string ending in ", " so further
    filters can be appended directly by the caller.

    Raises ValueError for an unknown *style* — previously this crashed
    with an opaque UnboundLocalError because ``src`` was never assigned.
    """
    sources = {
        "white": "-2+random(0)",
        "freq": "sin(1000*t*PI*t)",
        "wave": "sin(random(0)*2*PI*t)*tan(random(0)*2*PI*t)",
        "stereo": "0.1*sin(2*PI*(360-2.5/2)*t) | 0.1*sin(2*PI*(360+2.5/2)*t)",
    }
    try:
        src = sources[style]
    except KeyError:
        raise ValueError("Unknown example sound style: %s" % style) from None

    return "aevalsrc='%s', %s%s" % (src, extra, ", " if extra else "")
diff --git a/src/avp/toolkit/frame.py b/src/avp/toolkit/frame.py
new file mode 100644
index 0000000..94537a6
--- /dev/null
+++ b/src/avp/toolkit/frame.py
@@ -0,0 +1,117 @@
+"""
+Common tools for drawing compatible frames in a Component's frameRender()
+"""
+
+from PyQt6 import QtGui
+from PIL import Image
+from PIL.ImageQt import ImageQt
+from PyQt6 import QtCore
+import sys
+import os
+import math
+import logging
+from .. import core
+
+
+log = logging.getLogger("AVP.Toolkit.Frame")
+
+
class FramePainter(QtGui.QPainter):
    """
    A QPainter for a blank frame, which can be converted into a
    Pillow image with finalize()
    """

    def __init__(self, width, height):
        image = BlankFrame(width, height)
        log.debug("Creating QImage from PIL image object")
        self.image = ImageQt(image)
        super().__init__(self.image)

    def setPen(self, penStyle):
        # Accept either an (r, g, b[, a]) tuple or a ready-made pen/colour.
        if type(penStyle) is tuple:
            super().setPen(PaintColor(*penStyle))
        else:
            super().setPen(penStyle)

    def finalize(self):
        """Round-trip the painted QImage through PNG into a Pillow Image,
        end painting, and return the frame.

        BUG FIX: removed an old byteCount()-based conversion path that sat
        unreachable after the return statement.
        """
        import io

        log.verbose("Finalizing FramePainter")
        buffer = QtCore.QBuffer()
        buffer.open(QtCore.QBuffer.OpenModeFlag.ReadWrite)
        self.image.save(buffer, "PNG")
        frame = Image.open(io.BytesIO(buffer.data()))
        buffer.close()
        self.end()
        return frame
+
+
class PaintColor(QtGui.QColor):
    """
    Subclass of QtGui.QColor with an added scale() method
    Previously this class reversed the painter colour to solve
    hardware issues related to endianness,
    but Qt appears to deal with this itself nowadays
    """

    # NOTE(review): the docstring mentions an added scale() method, but no
    # such method is defined on this class (a module-level scale() exists
    # below) — the docstring looks stale; confirm and update.
    def __init__(self, r, g, b, a=255):
        # Alpha defaults to fully opaque.
        super().__init__(r, g, b, a)
+
+
+def scale(scalePercent, width, height, returntype=None):
+ width = (float(width) / 100.0) * float(scalePercent)
+ height = (float(height) / 100.0) * float(scalePercent)
+ if returntype == str:
+ return (str(math.ceil(width)), str(math.ceil(height)))
+ elif returntype == int:
+ return (math.ceil(width), math.ceil(height))
+ else:
+ return (width, height)
+
+
def defaultSize(framefunc):
    """Decorator making width/height arguments optional.

    Missing dimensions are filled in from the project's output settings.
    """

    def wrapper(*args):
        if len(args) >= 2:
            return framefunc(*args)
        supplied = list(args)
        # Height is always the missing trailing argument.
        supplied.append(int(core.Core.settings.value("outputHeight")))
        if not args:
            # No arguments at all: prepend the default width too.
            supplied.insert(0, int(core.Core.settings.value("outputWidth")))
        return framefunc(*supplied)

    return wrapper
+
+
def FloodFrame(width, height, RgbaTuple):
    """Return a new width*height RGBA Pillow image filled with RgbaTuple."""
    return Image.new("RGBA", (width, height), RgbaTuple)
+
+
@defaultSize
def BlankFrame(width, height):
    """The base frame used by each component to start drawing.

    Fully transparent RGBA; width/height default to the configured output
    size via the defaultSize decorator.
    """
    return FloodFrame(width, height, (0, 0, 0, 0))
+
+
@defaultSize
def Checkerboard(width, height):
    """
    A checkerboard to represent transparency to the user.
    """
    # TODO: Would be cool to generate this image with numpy instead.
    log.debug("Creating new %s*%s checkerboard" % (width, height))
    # NOTE(review): the board is composed at a fixed 1920x1080 and then
    # resized to the target size — outputs larger than 1080p will get an
    # upscaled (blurry) pattern; confirm whether that's acceptable.
    image = FloodFrame(1920, 1080, (0, 0, 0, 0))
    image.paste(Image.open(os.path.join(core.Core.wd, "gui", "background.png")), (0, 0))
    image = image.resize((width, height))
    return image