from PIL import Image from PyQt5 import QtGui, QtCore, QtWidgets import os import math import subprocess import time import logging from ..component import Component from ..toolkit.frame import BlankFrame, scale from ..toolkit import checkOutput, connectWidget from ..toolkit.ffmpeg import ( openPipe, closePipe, getAudioDuration, FfmpegVideo, exampleSound ) log = logging.getLogger('AVP.Components.Spectrum') class Component(Component): name = 'Spectrum' version = '1.0.1' def widget(self, *args): self.previewFrame = None super().widget(*args) self._image = BlankFrame(self.width, self.height) self.chunkSize = 4 * self.width * self.height self.changedOptions = True self.previewSize = (214, 120) self.previewPipe = None if hasattr(self.parent, 'lineEdit_audioFile'): # update preview when audio file changes (if genericPreview is off) self.parent.lineEdit_audioFile.textChanged.connect( self.update ) self.trackWidgets({ 'filterType': self.page.comboBox_filterType, 'window': self.page.comboBox_window, 'mode': self.page.comboBox_mode, 'amplitude': self.page.comboBox_amplitude0, 'amplitude1': self.page.comboBox_amplitude1, 'amplitude2': self.page.comboBox_amplitude2, 'display': self.page.comboBox_display, 'zoom': self.page.spinBox_zoom, 'tc': self.page.spinBox_tc, 'x': self.page.spinBox_x, 'y': self.page.spinBox_y, 'mirror': self.page.checkBox_mirror, 'draw': self.page.checkBox_draw, 'scale': self.page.spinBox_scale, 'color': self.page.comboBox_color, 'compress': self.page.checkBox_compress, 'mono': self.page.checkBox_mono, 'hue': self.page.spinBox_hue, }, relativeWidgets=[ 'x', 'y', ]) for widget in self._trackedWidgets.values(): connectWidget(widget, lambda: self.changed()) def changed(self): self.changedOptions = True def update(self): filterType = self.page.comboBox_filterType.currentIndex() self.page.stackedWidget.setCurrentIndex(filterType) if filterType == 3: self.page.spinBox_hue.setEnabled(False) else: self.page.spinBox_hue.setEnabled(True) if filterType == 2 or filterType == 4: self.page.checkBox_mono.setEnabled(False) else: self.page.checkBox_mono.setEnabled(True) def previewRender(self): changedSize = self.updateChunksize() if not changedSize \ and not self.changedOptions \ and self.previewFrame is not None: log.debug( 'Spectrum #%s is reusing old preview frame' % self.compPos) return self.previewFrame frame = self.getPreviewFrame() self.changedOptions = False if not frame: log.warning( 'Spectrum #%s failed to create a preview frame' % self.compPos) self.previewFrame = None return BlankFrame(self.width, self.height) else: self.previewFrame = frame return frame def preFrameRender(self, **kwargs): super().preFrameRender(**kwargs) if self.previewPipe is not None: self.previewPipe.wait() self.updateChunksize() w, h = scale(self.scale, self.width, self.height, str) self.video = FfmpegVideo( inputPath=self.audioFile, filter_=self.makeFfmpegFilter(), width=w, height=h, chunkSize=self.chunkSize, frameRate=int(self.settings.value("outputFrameRate")), parent=self.parent, component=self, ) def frameRender(self, frameNo): if FfmpegVideo.threadError is not None: raise FfmpegVideo.threadError return self.finalizeFrame(self.video.frame(frameNo)) def postFrameRender(self): closePipe(self.video.pipe) def getPreviewFrame(self): genericPreview = self.settings.value("pref_genericPreview") startPt = 0 if not genericPreview: inputFile = self.parent.lineEdit_audioFile.text() if not inputFile or not os.path.exists(inputFile): return duration = getAudioDuration(inputFile) if not duration: return startPt = duration / 3 command = [ self.core.FFMPEG_BIN, '-thread_queue_size', '512', '-r', str(self.settings.value("outputFrameRate")), '-ss', "{0:.3f}".format(startPt), '-i', self.core.junkStream if genericPreview else inputFile, '-f', 'image2pipe', '-pix_fmt', 'rgba', ] command.extend(self.makeFfmpegFilter(preview=True, startPt=startPt)) command.extend([ '-an', '-s:v', '%sx%s' % scale(self.scale, self.width, self.height, str), '-codec:v', 'rawvideo', '-', '-frames:v', '1', ]) if self.core.logEnabled: logFilename = os.path.join( self.core.logDir, 'preview_%s.log' % str(self.compPos)) log.debug('Creating FFmpeg process (log at %s)' % logFilename) with open(logFilename, 'w') as logf: logf.write(" ".join(command) + '\n\n') with open(logFilename, 'a') as logf: self.previewPipe = openPipe( command, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=logf, bufsize=10**8 ) else: self.previewPipe = openPipe( command, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8 ) byteFrame = self.previewPipe.stdout.read(self.chunkSize) closePipe(self.previewPipe) frame = self.finalizeFrame(byteFrame) return frame def makeFfmpegFilter(self, preview=False, startPt=0): '''Makes final FFmpeg filter command''' def getFilterComplexCommand(): '''Inner function that creates the final, complex part of the filter command''' nonlocal self genericPreview = self.settings.value("pref_genericPreview") def getFilterComplexCommandForType(): '''Determine portion of filter command that changes depending on selected type''' nonlocal self if preview: w, h = self.previewSize else: w, h = (self.width, self.height) color = self.page.comboBox_color.currentText().lower() if self.filterType == 0: # Spectrum if self.amplitude == 0: amplitude = 'sqrt' elif self.amplitude == 1: amplitude = 'cbrt' elif self.amplitude == 2: amplitude = '4thrt' elif self.amplitude == 3: amplitude = '5thrt' elif self.amplitude == 4: amplitude = 'lin' elif self.amplitude == 5: amplitude = 'log' filter_ = ( f'showspectrum=s={w}x{h}:' 'slide=scroll:' f'win_func={self.page.comboBox_window.currentText()}:' f'color={color}:' f'scale={amplitude},' 'colorkey=color=black:' 'similarity=0.1:blend=0.5' ) elif self.filterType == 1: # Histogram if self.amplitude1 == 0: amplitude = 'log' elif self.amplitude1 == 1: amplitude = 'lin' if self.display == 0: display = 'log' elif self.display == 1: display = 'sqrt' elif self.display == 2: display = 'cbrt' elif self.display == 3: display = 'lin' elif self.display == 4: display = 'rlog' filter_ = ( f'ahistogram=r={str(self.settings.value("outputFrameRate"))}:' f's={w}x{h}:' 'dmode=separate:' f'ascale={amplitude}:' f'scale={display}' ) elif self.filterType == 2: # Vector Scope if self.amplitude2 == 0: amplitude = 'log' elif self.amplitude2 == 1: amplitude = 'sqrt' elif self.amplitude2 == 2: amplitude = 'cbrt' elif self.amplitude2 == 3: amplitude = 'lin' m = self.page.comboBox_mode.currentText() filter_ = ( f'avectorscope=s={w}x{h}:' f'draw={"line" if self.draw else "dot"}:' f'm={m}:' f'scale={amplitude}:' f'zoom={str(self.zoom)}' ) elif self.filterType == 3: # Musical Scale filter_ = ( f'showcqt=r={str(self.settings.value("outputFrameRate"))}:' f's={w}x{h}:' 'count=30:' 'text=0:' f'tc={str(self.tc)},' 'colorkey=color=black:' 'similarity=0.1:blend=0.5' ) elif self.filterType == 4: # Phase filter_ = ( f'aphasemeter=r={str(self.settings.value("outputFrameRate"))}:' f's={w}x{h}:' 'video=1 [atrash][vtmp1]; ' '[atrash] anullsink; ' '[vtmp1] colorkey=color=black:' 'similarity=0.1:blend=0.5, ' 'crop=in_w/8:in_h:(in_w/8)*7:0 ' ) return filter_ if self.filterType < 2: exampleSnd = exampleSound('freq') elif self.filterType == 2 or self.filterType == 4: exampleSnd = exampleSound('stereo') elif self.filterType == 3: exampleSnd = exampleSound('white') compression = 'compand=gain=4,' if self.compress else '' aformat = 'aformat=channel_layouts=mono,' if self.mono and self.filterType not in (2, 4) else '' filter_ = getFilterComplexCommandForType() hflip = 'hflip, ' if self.mirror else '' trim = 'trim=start=%s:end=%s, ' % ("{0:.3f}".format(startPt + 12), "{0:.3f}".format(startPt + 12.5)) if preview else '' scale_ = 'scale=%sx%s' % scale(self.scale, self.width, self.height, str) hue = ', hue=h=%s:s=10' % str(self.hue) if self.hue > 0 and self.filterType != 3 else '' convolution = ', convolution=-2 -1 0 -1 1 1 0 1 2:-2 -1 0 -1 1 1 0 1 2:-2 -1 0 -1 1 1 0 1 2:-2 -1 0 -1 1 1 0 1 2' if self.filterType == 3 else '' return ( f"{exampleSnd if preview and genericPreview else '[0:a] '}" f"{compression}{aformat}{filter_} [v1]; " f"[v1] {hflip}{trim}{scale_}{hue}{convolution} [v]" ) return [ '-filter_complex', getFilterComplexCommand(), '-map', '[v]', ] def updateChunksize(self): width, height = scale(self.scale, self.width, self.height, int) oldChunkSize = int(self.chunkSize) self.chunkSize = 4 * width * height changed = self.chunkSize != oldChunkSize return changed def finalizeFrame(self, imageData): try: image = Image.frombytes( 'RGBA', scale(self.scale, self.width, self.height, int), imageData ) self._image = image except ValueError: image = self._image frame = BlankFrame(self.width, self.height) frame.paste(image, box=(self.x, self.y)) return frame