diff options
Diffstat (limited to 'src/video_thread.py')
| -rw-r--r-- | src/video_thread.py | 419 |
1 file changed, 419 insertions, 0 deletions
'''
    Thread created to export a video. It has a slot to begin export using
    an input file, output path, and component list. During export multiple
    threads are created to render the video as quickly as possible. Signals
    are emitted to update MainWindow's progress bar, detail text, and preview.
    Export can be cancelled with cancel()
'''
from PyQt5 import QtCore, QtGui
from PyQt5.QtCore import pyqtSignal, pyqtSlot
from PIL import Image
from PIL.ImageQt import ImageQt
import numpy
import subprocess as sp
import sys
import os
from queue import Queue, PriorityQueue
from threading import Thread, Event
import time
import signal
import logging

from .component import ComponentError
from .toolkit.frame import Checkerboard
from .toolkit.ffmpeg import (
    openPipe, readAudioFile,
    getAudioDuration, createFfmpegCommand
)


log = logging.getLogger("AVP.VideoThread")


class Worker(QtCore.QObject):
    '''
    Renders a video export on a background Qt thread. MainWindow connects
    its createVideo signal to this object's createVideo slot; progress and
    preview updates flow back through the pyqtSignals below.
    '''

    imageCreated = pyqtSignal('QImage')       # live preview frame for the GUI
    videoCreated = pyqtSignal()               # export finished (success or not)
    progressBarUpdate = pyqtSignal(int)       # 0-100 progress value
    progressBarSetText = pyqtSignal(str)      # progress bar detail text
    encoding = pyqtSignal(bool)               # True while an export is running

    def __init__(self, parent, inputFile, outputFile, components):
        '''
        parent: object exposing .core, .settings and a createVideo signal.
        inputFile: path of the audio file to visualize.
        outputFile: path the rendered video is written to.
        components: list of component objects, top layer first.
        '''
        super().__init__()
        self.core = parent.core
        self.settings = parent.settings
        self.modules = parent.core.modules
        parent.createVideo.connect(self.createVideo)

        self.components = components
        self.outputFile = outputFile
        self.inputFile = inputFile

        self.hertz = 44100
        self.sampleSize = 1470  # 44100 / 30 = 1470, i.e. one 30fps frame
        self.canceled = False
        self.error = False
        self.stopped = False

    def renderNode(self):
        '''
        Grabs audio data indices at frames to export, from compositeQueue.
        Sends it to the components' frameRender methods in layer order
        to create subframes & composite them into the final frame.
        The resulting frames are collected in the renderQueue
        '''
        def err(comp, e):
            # A component failed while rendering: tear down the pipe,
            # cancel the export, and surface the error through the
            # component's error signal.
            # (comp and e are passed explicitly: relying on the enclosing
            # except-block's bindings is fragile since Python 3 unbinds
            # the exception name when the except block exits.)
            self.closePipe()
            self.cancelExport()
            self.error = True
            msg = 'A render node failed critically.'
            log.critical(msg)
            comp._error.emit(msg, str(e))

        while not self.stopped:
            audioI = self.compositeQueue.get()
            bgI = int(audioI / self.sampleSize)
            frame = None
            # components is ordered top layer first; composite bottom-up
            for layerNo, comp in enumerate(reversed(self.components)):
                try:
                    if layerNo in self.staticComponents:
                        if self.staticComponents[layerNo] is None:
                            # this layer was merged into a following layer
                            continue
                        # static component
                        if frame is None:  # bottom-most layer
                            frame = self.staticComponents[layerNo]
                        else:
                            frame = Image.alpha_composite(
                                frame, self.staticComponents[layerNo]
                            )

                    else:
                        # animated component
                        if frame is None:  # bottom-most layer
                            frame = comp.frameRender(bgI)
                        else:
                            frame = Image.alpha_composite(
                                frame, comp.frameRender(bgI)
                            )
                except Exception as e:
                    err(comp, e)

            self.renderQueue.put([audioI, frame])
            self.compositeQueue.task_done()

    def renderDispatch(self):
        '''
        Places audio data indices in the compositeQueue, to be used
        by a renderNode later. All indices are multiples of self.sampleSize
        sampleSize * frameNo = audioI, AKA audio data starting at frameNo
        '''
        log.debug('Dispatching Frames for Compositing...')

        for audioI in range(0, self.audioArrayLen, self.sampleSize):
            self.compositeQueue.put(audioI)

    def previewDispatch(self):
        '''
        Grabs frames from the previewQueue, adds them to the checkerboard
        and emits a final QImage to the MainWindow for the live preview
        '''
        background = Checkerboard(self.width, self.height)

        while not self.stopped:
            audioI, frame = self.previewQueue.get()
            # Throttle preview updates to ~16fps; always show the first frame
            if time.time() - self.lastPreview >= 0.06 or audioI == 0:
                image = Image.alpha_composite(background.copy(), frame)
                self.imageCreated.emit(QtGui.QImage(ImageQt(image)))
                self.lastPreview = time.time()

            self.previewQueue.task_done()

    @pyqtSlot()
    def createVideo(self):
        '''
        Main export entry point. Reads the audio, initializes components,
        opens a pipe to ffmpeg, then spawns render/dispatch/preview threads
        and pipes each composited frame to ffmpeg's stdin.
        '''
        log.debug("Video worker received signal to createVideo")
        numpy.seterr(divide='ignore')
        self.encoding.emit(True)
        self.extraAudio = []
        self.width = int(self.settings.value('outputWidth'))
        self.height = int(self.settings.value('outputHeight'))

        # Bounded queues keep memory usage flat if one stage outpaces another
        self.compositeQueue = Queue()
        self.compositeQueue.maxsize = 20
        self.renderQueue = PriorityQueue()
        self.renderQueue.maxsize = 20
        self.previewQueue = PriorityQueue()

        self.reset()
        progressBarValue = 0
        self.progressBarUpdate.emit(progressBarValue)

        # =~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~==~=~=~=~=~=~=~=~=~=~=~=~=~=~
        # READ AUDIO, INITIALIZE COMPONENTS, OPEN A PIPE TO FFMPEG
        # =~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~==~=~=~=~=~=~=~=~=~=~=~=~=~=~
        log.debug("Determining length of audio...")
        # Only decode the full waveform if some component needs pcm data
        if any('pcm' in comp.properties() for comp in self.components):
            self.progressBarSetText.emit("Loading audio file...")
            audioFileTraits = readAudioFile(
                self.inputFile, self
            )
            if audioFileTraits is None:
                self.cancelExport()
                return
            self.completeAudioArray, duration = audioFileTraits
            self.audioArrayLen = len(self.completeAudioArray)
        else:
            duration = getAudioDuration(self.inputFile)
            self.completeAudioArray = []
            self.audioArrayLen = int(
                ((duration * self.hertz) +
                    self.hertz) - self.sampleSize)

        self.progressBarUpdate.emit(0)
        self.progressBarSetText.emit("Starting components...")
        canceledByComponent = False
        initText = ", ".join([
            "%s) %s" % (num, str(component))
            for num, component in enumerate(reversed(self.components))
        ])
        print('Loaded Components:', initText)
        log.info('Calling preFrameRender for %s', initText)
        self.staticComponents = {}
        for compNo, comp in enumerate(reversed(self.components)):
            try:
                comp.preFrameRender(
                    audioFile=self.inputFile,
                    completeAudioArray=self.completeAudioArray,
                    audioArrayLen=self.audioArrayLen,
                    sampleSize=self.sampleSize,
                    progressBarUpdate=self.progressBarUpdate,
                    progressBarSetText=self.progressBarSetText
                )
            except ComponentError:
                log.warning(
                    '#%s %s encountered an error in its preFrameRender method',
                    compNo,
                    comp
                )

            compProps = comp.properties()
            if 'error' in compProps or comp._lockedError is not None:
                self.cancel()
                self.canceled = True
                canceledByComponent = True
                compError = comp.error() \
                    if type(comp.error()) is tuple else (comp.error(), '')
                errMsg = (
                    "Component #%s (%s) encountered an error!" % (
                        str(compNo), comp.name
                    )
                    if comp.error() is None else
                    'Export cancelled by component #%s (%s): %s' % (
                        str(compNo),
                        comp.name,
                        compError[0]
                    )
                )
                log.error(errMsg)
                comp._error.emit(errMsg, compError[1])
                break
            if 'static' in compProps:
                # Static layers render once; the frame is reused every frame
                log.info('Saving static frame from #%s %s', compNo, comp)
                self.staticComponents[compNo] = \
                    comp.frameRender(0).copy()

        log.debug("Checking if a component wishes to cancel the export...")
        if self.canceled:
            if canceledByComponent:
                log.error(
                    'Export cancelled by component #%s (%s): %s',
                    compNo,
                    comp.name,
                    'No message.' if comp.error() is None else (
                        comp.error() if type(comp.error()) is str
                        else comp.error()[0]
                    )
                )
            self.cancelExport()
            return

        # Pre-composite adjacent static layers so render nodes do less work
        log.info("Merging consecutive static component frames")
        for compNo in range(len(self.components)):
            if compNo not in self.staticComponents \
                    or compNo + 1 not in self.staticComponents:
                continue
            self.staticComponents[compNo + 1] = Image.alpha_composite(
                self.staticComponents.pop(compNo),
                self.staticComponents[compNo + 1]
            )
            self.staticComponents[compNo] = None

        try:
            ffmpegCommand = createFfmpegCommand(
                self.inputFile, self.outputFile, self.components, duration
            )
        except sp.CalledProcessError as e:
            # FIXME video_thread should own this error signal, not components
            self.components[0]._error.emit(
                "Ffmpeg could not be found. Is it installed?", str(e))
            self.error = True
            return

        cmd = " ".join(ffmpegCommand)
        print('###### FFMPEG COMMAND ######\n%s' % cmd)
        print('############################')
        if not cmd:
            # FIXME video_thread should own this error signal, not components
            self.components[0]._error.emit(
                "The ffmpeg command could not be generated.", "")
            log.critical(
                "Cancelling render process due to failure while generating "
                "the ffmpeg command.")
            self.failExport()
            return

        log.info('Opening pipe to ffmpeg')
        log.info(cmd)
        try:
            self.out_pipe = openPipe(
                ffmpegCommand,
                stdin=sp.PIPE, stdout=sys.stdout, stderr=sys.stdout
            )
        except sp.CalledProcessError:
            log.critical('Ffmpeg pipe couldn\'t be created!', exc_info=True)
            raise

        # =~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~==~=~=~=~=~=~=~=~=~=~=~=~=~=~
        # START CREATING THE VIDEO
        # =~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~=~==~=~=~=~=~=~=~=~=~=~=~=~=~=~

        # Make 2 or 3 renderNodes in new threads to create the frames
        self.renderThreads = []
        try:
            # Prefer the affinity mask; available cores may be < cpu_count()
            numCpus = len(os.sched_getaffinity(0))
        except Exception:
            numCpus = os.cpu_count()

        for i in range(2 if numCpus <= 2 else 3):
            self.renderThreads.append(
                Thread(target=self.renderNode, name="Render Thread"))
            self.renderThreads[i].daemon = True
            self.renderThreads[i].start()

        self.dispatchThread = Thread(
            target=self.renderDispatch, name="Render Dispatch Thread")
        self.dispatchThread.daemon = True
        self.dispatchThread.start()

        self.lastPreview = 0.0
        # Stored under a distinct attribute name so the previewDispatch
        # *method* is not shadowed by the Thread object (the original code
        # overwrote self.previewDispatch, and also reused the render
        # dispatch thread's name).
        self.previewDispatchThread = Thread(
            target=self.previewDispatch, name="Preview Dispatch Thread"
        )
        self.previewDispatchThread.daemon = True
        self.previewDispatchThread.start()

        # Begin piping into ffmpeg!
        # Render nodes may finish frames out of order; frameBuffer holds
        # early frames until their turn to be written.
        frameBuffer = {}
        progressBarValue = 0
        self.progressBarUpdate.emit(progressBarValue)
        self.progressBarSetText.emit("Exporting video...")
        if not self.canceled:
            for audioI in range(
                    0, self.audioArrayLen, self.sampleSize):
                while True:
                    if audioI in frameBuffer or self.canceled:
                        # if frame's in buffer, pipe it to ffmpeg
                        break
                    # else fetch the next frame & add to the buffer
                    audioI_, frame = self.renderQueue.get()
                    frameBuffer[audioI_] = frame
                    self.renderQueue.task_done()
                if self.canceled:
                    break

                try:
                    self.out_pipe.stdin.write(frameBuffer[audioI].tobytes())
                    self.previewQueue.put([audioI, frameBuffer.pop(audioI)])
                except Exception:
                    break

                # increase progress bar value
                completion = (audioI / self.audioArrayLen) * 100
                if progressBarValue + 1 <= completion:
                    progressBarValue = numpy.floor(completion).astype(int)
                    self.progressBarUpdate.emit(progressBarValue)
                    self.progressBarSetText.emit(
                        "Exporting video: %s%%" % str(int(progressBarValue))
                    )

        numpy.seterr(all='print')

        self.closePipe()

        for comp in reversed(self.components):
            comp.postFrameRender()

        if self.canceled:
            print("Export Canceled")
            try:
                os.remove(self.outputFile)
            except Exception:
                pass
            self.progressBarUpdate.emit(0)
            self.progressBarSetText.emit('Export Canceled')
        else:
            if self.error:
                self.failExport()
            else:
                print("Export Complete")
                self.progressBarUpdate.emit(100)
                self.progressBarSetText.emit('Export Complete')

        self.error = False
        self.canceled = False
        self.stopped = True
        self.encoding.emit(False)
        self.videoCreated.emit()

    def closePipe(self):
        '''Close ffmpeg's stdin and wait for it to exit; log a broken pipe.'''
        try:
            self.out_pipe.stdin.close()
        except BrokenPipeError:
            log.error('Broken pipe to ffmpeg!')
        if self.out_pipe.stderr is not None:
            log.error(self.out_pipe.stderr.read())
            self.out_pipe.stderr.close()
            self.error = True
        self.out_pipe.wait()

    def cancelExport(self, message='Export Canceled'):
        '''Reset the progress UI and notify MainWindow the export ended.'''
        self.progressBarUpdate.emit(0)
        self.progressBarSetText.emit(message)
        self.encoding.emit(False)
        self.videoCreated.emit()

    def failExport(self):
        '''Like cancelExport, but labels the export as failed.'''
        self.cancelExport('Export Failed')

    def updateProgress(self, pStr, pVal):
        '''Update the progress bar value and text.

        Fixed: the value signal is named progressBarUpdate; the original
        emitted self.progressBarValue, which does not exist on this class
        and raised AttributeError whenever this method was called.
        '''
        self.progressBarUpdate.emit(pVal)
        self.progressBarSetText.emit(pStr)

    def cancel(self):
        '''Cancel the export: stop threads, components, and ffmpeg.'''
        self.canceled = True
        self.stopped = True
        self.core.cancel()

        for comp in self.components:
            comp.cancel()

        try:
            self.out_pipe.send_signal(signal.SIGTERM)
        except Exception:
            # pipe may not exist yet or ffmpeg may have already exited
            pass

    def reset(self):
        '''Clear cancel state on the core, this worker, and all components.'''
        self.core.reset()
        self.canceled = False
        for comp in self.components:
            comp.reset()
