aboutsummaryrefslogtreecommitdiff
path: root/src/avp/components/video.py
diff options
context:
space:
mode:
authorBrianna Rainey2026-01-13 19:34:55 -0500
committerGitHub2026-01-13 19:34:55 -0500
commit50f5a76603a3f97f2c6f6a1d3cefea88ed3497aa (patch)
tree226fe223b31af6f217b1dd413629ab2cf26964d4 /src/avp/components/video.py
parentb8703752ffc7768b0275897b3c2a869ff41504e5 (diff)
parentf975144f25d34f97329b2d4e52891061573cea08 (diff)
Merge pull request #85 from aeliton/add-pyproject
Use pyproject.toml + uv_build
Diffstat (limited to 'src/avp/components/video.py')
-rw-r--r--src/avp/components/video.py254
1 files changed, 254 insertions, 0 deletions
diff --git a/src/avp/components/video.py b/src/avp/components/video.py
new file mode 100644
index 0000000..65a05af
--- /dev/null
+++ b/src/avp/components/video.py
@@ -0,0 +1,254 @@
+from PIL import Image
+from PyQt6 import QtGui, QtCore, QtWidgets
+import os
+import math
+import subprocess
+import logging
+
+from ..component import Component
+from ..toolkit.frame import BlankFrame, scale
+from ..toolkit.ffmpeg import openPipe, closePipe, testAudioStream, FfmpegVideo
+from ..toolkit import checkOutput
+
+
+log = logging.getLogger("AVP.Components.Video")
+
+
+class Component(Component):
+ name = "Video"
+ version = "1.0.0"
+
+ def widget(self, *args):
+ self.videoPath = ""
+ self.badAudio = False
+ self.x = 0
+ self.y = 0
+ self.loopVideo = False
+ super().widget(*args)
+ self._image = BlankFrame(self.width, self.height)
+ self.page.pushButton_video.clicked.connect(self.pickVideo)
+ self.trackWidgets(
+ {
+ "videoPath": self.page.lineEdit_video,
+ "loopVideo": self.page.checkBox_loop,
+ "useAudio": self.page.checkBox_useAudio,
+ "distort": self.page.checkBox_distort,
+ "scale": self.page.spinBox_scale,
+ "volume": self.page.spinBox_volume,
+ "xPosition": self.page.spinBox_x,
+ "yPosition": self.page.spinBox_y,
+ },
+ presetNames={
+ "videoPath": "video",
+ "loopVideo": "loop",
+ "xPosition": "x",
+ "yPosition": "y",
+ },
+ relativeWidgets=[
+ "xPosition",
+ "yPosition",
+ ],
+ )
+
+ def update(self):
+ if self.page.checkBox_useAudio.isChecked():
+ self.page.label_volume.setEnabled(True)
+ self.page.spinBox_volume.setEnabled(True)
+ else:
+ self.page.label_volume.setEnabled(False)
+ self.page.spinBox_volume.setEnabled(False)
+
+ def previewRender(self):
+ self.updateChunksize()
+ frame = self.getPreviewFrame(self.width, self.height)
+ if not frame:
+ return BlankFrame(self.width, self.height)
+ else:
+ return frame
+
+ def properties(self):
+ props = []
+ outputFile = None
+ if hasattr(self.parent, "lineEdit_outputFile"):
+ # check only happens in GUI mode
+ outputFile = self.parent.lineEdit_outputFile.text()
+
+ if not self.videoPath:
+ self.lockError("There is no video selected.")
+ elif not os.path.exists(self.videoPath):
+ self.lockError("The video selected does not exist!")
+ elif outputFile and os.path.realpath(self.videoPath) == os.path.realpath(
+ outputFile
+ ):
+ self.lockError("Input and output paths match.")
+
+ if self.useAudio:
+ props.append("audio")
+ if not testAudioStream(self.videoPath) and self.error() is None:
+ self.lockError("Could not identify an audio stream in this video.")
+
+ return props
+
+ def audio(self):
+ params = {}
+ if self.volume != 1.0:
+ params["volume"] = "=%s:replaygain_noclip=0" % str(self.volume)
+ return (self.videoPath, params)
+
+ def preFrameRender(self, **kwargs):
+ super().preFrameRender(**kwargs)
+ self.updateChunksize()
+ self.video = (
+ FfmpegVideo(
+ inputPath=self.videoPath,
+ filter_=self.makeFfmpegFilter(),
+ width=self.width,
+ height=self.height,
+ chunkSize=self.chunkSize,
+ frameRate=int(self.settings.value("outputFrameRate")),
+ parent=self.parent,
+ loopVideo=self.loopVideo,
+ component=self,
+ )
+ if os.path.exists(self.videoPath)
+ else None
+ )
+
+ def frameRender(self, frameNo):
+ if FfmpegVideo.threadError is not None:
+ raise FfmpegVideo.threadError
+ return self.finalizeFrame(self.video.frame(frameNo))
+
+ def postFrameRender(self):
+ closePipe(self.video.pipe)
+
+ def pickVideo(self):
+ imgDir = self.settings.value("componentDir", os.path.expanduser("~"))
+ filename, _ = QtWidgets.QFileDialog.getOpenFileName(
+ self.page,
+ "Choose Video",
+ imgDir,
+ "Video Files (%s)" % " ".join(self.core.videoFormats),
+ )
+ if filename:
+ self.settings.setValue("componentDir", os.path.dirname(filename))
+ self.mergeUndo = False
+ self.page.lineEdit_video.setText(filename)
+ self.mergeUndo = True
+
+ def getPreviewFrame(self, width, height):
+ if not self.videoPath or not os.path.exists(self.videoPath):
+ return
+
+ command = [
+ self.core.FFMPEG_BIN,
+ "-thread_queue_size",
+ "512",
+ "-i",
+ self.videoPath,
+ "-f",
+ "image2pipe",
+ "-pix_fmt",
+ "rgba",
+ ]
+ command.extend(self.makeFfmpegFilter())
+ command.extend(
+ [
+ "-codec:v",
+ "rawvideo",
+ "-",
+ "-ss",
+ "90",
+ "-frames:v",
+ "1",
+ ]
+ )
+
+ if self.core.logEnabled:
+ logFilename = os.path.join(
+ self.core.logDir, "preview_%s.log" % str(self.compPos)
+ )
+ log.debug("Creating ffmpeg process (log at %s)" % logFilename)
+ with open(logFilename, "w") as logf:
+ logf.write(" ".join(command) + "\n\n")
+ with open(logFilename, "a") as logf:
+ pipe = openPipe(
+ command,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.PIPE,
+ stderr=logf,
+ bufsize=10**8,
+ )
+ else:
+ pipe = openPipe(
+ command,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ bufsize=10**8,
+ )
+
+ byteFrame = pipe.stdout.read(self.chunkSize)
+ closePipe(pipe)
+
+ frame = self.finalizeFrame(byteFrame)
+ return frame
+
+ def makeFfmpegFilter(self):
+ return [
+ "-filter_complex",
+ "[0:v] scale=%s:%s" % scale(self.scale, self.width, self.height, str),
+ ]
+
+ def updateChunksize(self):
+ if self.scale != 100 and not self.distort:
+ width, height = scale(self.scale, self.width, self.height, int)
+ else:
+ width, height = self.width, self.height
+ self.chunkSize = 4 * width * height
+
+ def command(self, arg):
+ if "=" in arg:
+ key, arg = arg.split("=", 1)
+ if key == "path" and os.path.exists(arg):
+ if "*%s" % os.path.splitext(arg)[1] in self.core.videoFormats:
+ self.page.lineEdit_video.setText(arg)
+ self.page.spinBox_scale.setValue(100)
+ self.page.checkBox_loop.setChecked(True)
+ return
+ else:
+ print("Not a supported video format")
+ quit(1)
+ elif arg == "audio":
+ if not self.page.lineEdit_video.text():
+ print("'audio' option must follow a video selection")
+ quit(1)
+ self.page.checkBox_useAudio.setChecked(True)
+ return
+ super().command(arg)
+
+ def commandHelp(self):
+ print("Load a video:\n path=/filepath/to/video.mp4")
+ print("Using audio:\n path=/filepath/to/video.mp4 audio")
+
+ def finalizeFrame(self, imageData):
+ try:
+ if self.distort:
+ image = Image.frombytes("RGBA", (self.width, self.height), imageData)
+ else:
+ image = Image.frombytes(
+ "RGBA",
+ scale(self.scale, self.width, self.height, int),
+ imageData,
+ )
+ self._image = image
+ except ValueError:
+ # use last good frame
+ image = self._image
+
+ if self.scale != 100 or self.xPosition != 0 or self.yPosition != 0:
+ frame = BlankFrame(self.width, self.height)
+ frame.paste(image, box=(self.xPosition, self.yPosition))
+ else:
+ frame = image
+ return frame