aboutsummaryrefslogtreecommitdiff
path: root/src/avp/core.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/avp/core.py')
-rw-r--r--src/avp/core.py597
1 files changed, 597 insertions, 0 deletions
diff --git a/src/avp/core.py b/src/avp/core.py
new file mode 100644
index 0000000..df6ff63
--- /dev/null
+++ b/src/avp/core.py
@@ -0,0 +1,597 @@
+"""
+Home to the Core class which tracks program state. Used by GUI & commandline
+to create a list of components and create a video thread to export.
+"""
+
+from PyQt6 import QtCore, QtGui, uic
+import sys
+import os
+import json
+from importlib import import_module
+import logging
+
+from . import toolkit
+
+
+log = logging.getLogger("AVP.Core")
+STDOUT_LOGLVL = logging.WARNING
+FILE_LIBLOGLVL = logging.WARNING
+FILE_LOGLVL = logging.INFO
+
+
+class Core:
+ """
+ MainWindow and Command module both use an instance of this class
+ to store the core program state. This object tracks the components,
+ talks to the components, handles opening/creating project files
+ and presets, and creates the video thread to export.
+ This class also stores constants as class variables.
+ """
+
+ def __init__(self):
+ self.importComponents()
+ self.selectedComponents = []
+ self.savedPresets = {} # copies of presets to detect modification
+ self.openingProject = False
+
+ def __repr__(self):
+ return "\n=~=~=~=\n".join([repr(comp) for comp in self.selectedComponents])
+
+ def importComponents(self):
+ def findComponents():
+ for f in os.listdir(Core.componentsPath):
+ name, ext = os.path.splitext(f)
+ if name.startswith("__"):
+ continue
+ elif ext == ".py":
+ yield name
+
+ log.debug("Importing component modules")
+ self.modules = [
+ import_module(".components.%s" % name, __package__)
+ for name in findComponents()
+ ]
+ # store canonical module names and indexes
+ self.moduleIndexes = [i for i in range(len(self.modules))]
+ self.compNames = [mod.Component.name for mod in self.modules]
+ # alphabetize modules by Component name
+ sortedModules = sorted(zip(self.compNames, self.modules))
+ self.compNames = [y[0] for y in sortedModules]
+ self.modules = [y[1] for y in sortedModules]
+
+ # store alternative names for modules
+ self.altCompNames = []
+ for i, mod in enumerate(self.modules):
+ if hasattr(mod.Component, "names"):
+ for name in mod.Component.names():
+ self.altCompNames.append((name, i))
+
+ def componentListChanged(self):
+ for i, component in enumerate(self.selectedComponents):
+ component.compPos = i
+
+ def insertComponent(self, compPos, component, loader):
+ """
+ Creates a new component using these args:
+ (compPos, component obj or moduleIndex, MWindow/Command/Core obj)
+ """
+ if compPos < 0 or compPos > len(self.selectedComponents):
+ compPos = len(self.selectedComponents)
+ if len(self.selectedComponents) > 50:
+ return -1
+ if type(component) is int:
+ # create component using module index in self.modules
+ moduleIndex = int(component)
+ log.debug("Creating new component from module #%s", str(moduleIndex))
+ component = self.modules[moduleIndex].Component(moduleIndex, compPos, self)
+ component.widget(loader)
+ else:
+ moduleIndex = -1
+ log.debug("Inserting previously-created %s component", component.name)
+
+ component._error.connect(loader.videoThreadError)
+ self.selectedComponents.insert(compPos, component)
+ if hasattr(loader, "insertComponent"):
+ loader.insertComponent(compPos)
+
+ self.componentListChanged()
+ self.updateComponent(compPos)
+ return compPos
+
+ def moveComponent(self, startI, endI):
+ comp = self.selectedComponents.pop(startI)
+ self.selectedComponents.insert(endI, comp)
+
+ self.componentListChanged()
+ return endI
+
+ def removeComponent(self, i):
+ self.selectedComponents.pop(i)
+ self.componentListChanged()
+
+ def clearComponents(self):
+ self.selectedComponents = list()
+ self.componentListChanged()
+
+ def updateComponent(self, i):
+ log.debug("Auto-updating %s #%s", self.selectedComponents[i], str(i))
+ self.selectedComponents[i].update(auto=True)
+
+ def moduleIndexFor(self, compName):
+ try:
+ index = self.compNames.index(compName)
+ return self.moduleIndexes[index]
+ except ValueError:
+ for altName, modI in self.altCompNames:
+ if altName == compName:
+ return self.moduleIndexes[modI]
+
+ def clearPreset(self, compIndex):
+ self.selectedComponents[compIndex].currentPreset = None
+
+ def openPreset(self, filepath, compIndex, presetName):
+ """Applies a preset to a specific component"""
+ saveValueStore = self.getPreset(filepath)
+ if not saveValueStore:
+ return False
+ comp = self.selectedComponents[compIndex]
+ comp.loadPreset(saveValueStore, presetName)
+
+ self.savedPresets[presetName] = dict(saveValueStore)
+ return True
+
+ def getPreset(self, filepath):
+ """Returns the preset dict stored at this filepath"""
+ if not os.path.exists(filepath):
+ return False
+ with open(filepath, "r") as f:
+ for line in f:
+ saveValueStore = toolkit.presetFromString(line.strip())
+ break
+ return saveValueStore
+
+ def getPresetDir(self, comp):
+ """Get the preset subdir for a particular version of a component"""
+ return os.path.join(Core.presetDir, comp.name, str(comp.version))
+
+ def openProject(self, loader, filepath):
+ """loader is the object calling this method which must have
+ its own showMessage(**kwargs) method for displaying errors.
+ """
+ if not os.path.exists(filepath):
+ loader.showMessage(msg="Project file not found.")
+ return
+
+ errcode, data = self.parseAvFile(filepath)
+ if errcode == 0:
+ self.openingProject = True
+ try:
+ if hasattr(loader, "window"):
+ for widget, value in data["WindowFields"]:
+ widget = eval("loader.%s" % widget)
+ with toolkit.blockSignals(widget):
+ toolkit.setWidgetValue(widget, value)
+
+ for key, value in data["Settings"]:
+ Core.settings.setValue(key, value)
+ for tup in data["Components"]:
+ name, vers, preset = tup
+ clearThis = False
+ modified = False
+
+ # add loaded named presets to savedPresets dict
+ if "preset" in preset and preset["preset"] is not None:
+ nam = preset["preset"]
+ filepath2 = os.path.join(Core.presetDir, name, str(vers), nam)
+ origSaveValueStore = self.getPreset(filepath2)
+ if origSaveValueStore:
+ self.savedPresets[nam] = dict(origSaveValueStore)
+ modified = not origSaveValueStore == preset
+ else:
+ # saved preset was renamed or deleted
+ clearThis = True
+
+ # create the actual component object & get its index
+ i = self.insertComponent(-1, self.moduleIndexFor(name), loader)
+ if i is None:
+ loader.showMessage(
+ msg=f"Component '{name}' didn't initialize correctly and had to be removed."
+ )
+ continue
+ if i == -1:
+ loader.showMessage(msg="Too many components!")
+ break
+
+ try:
+ if "preset" in preset and preset["preset"] is not None:
+ self.selectedComponents[i].loadPreset(preset)
+ else:
+ self.selectedComponents[i].loadPreset(
+ preset, preset["preset"]
+ )
+ except KeyError as e:
+ log.warning(
+ "%s missing value: %s" % (self.selectedComponents[i], e)
+ )
+
+ if clearThis:
+ self.clearPreset(i)
+ if hasattr(loader, "updateComponentTitle"):
+ loader.updateComponentTitle(i, modified)
+ self.openingProject = False
+ return True
+ except Exception:
+ errcode = 1
+ data = sys.exc_info()
+
+ if errcode == 1:
+ typ, value, tb = data
+ if typ.__name__ == "KeyError":
+ # probably just an old version, still loadable
+ log.warning("Project file missing value: %s" % value)
+ return
+ if hasattr(loader, "createNewProject"):
+ loader.createNewProject(prompt=False)
+ msg = "%s: %s\n\n" % (typ.__name__, value)
+ msg += toolkit.formatTraceback(tb)
+ loader.showMessage(
+ msg="Project file '%s' is corrupted." % filepath,
+ showCancel=False,
+ icon="Warning",
+ detail=msg,
+ )
+ self.openingProject = False
+ return False
+
+ def parseAvFile(self, filepath):
+ """
+ Parses an avp (project) or avl (preset package) file.
+ Returns dictionary with section names as the keys, each one
+ contains a list of tuples: (compName, version, compPresetDict)
+ """
+ log.debug("Parsing av file: %s", filepath)
+ validSections = ("Components", "Settings", "WindowFields")
+ data = {sect: [] for sect in validSections}
+ try:
+ with open(filepath, "r") as f:
+
+ def parseLine(line):
+ """Decides if a file line is a section header"""
+ line = line.strip()
+ newSection = ""
+
+ if (
+ line.startswith("[")
+ and line.endswith("]")
+ and line[1:-1] in validSections
+ ):
+ newSection = line[1:-1]
+
+ return line, newSection
+
+ section = ""
+ i = 0
+ for line in f:
+ line, newSection = parseLine(line)
+ if newSection:
+ section = str(newSection)
+ continue
+ if line and section == "Components":
+ if i == 0:
+ lastCompName = str(line)
+ i += 1
+ elif i == 1:
+ lastCompVers = str(line)
+ i += 1
+ elif i == 2:
+ lastCompPreset = toolkit.presetFromString(line)
+ data[section].append(
+ (lastCompName, lastCompVers, lastCompPreset)
+ )
+ i = 0
+ elif line and section:
+ key, value = line.split("=", 1)
+ data[section].append((key, value.strip()))
+
+ return 0, data
+ except Exception:
+ return 1, sys.exc_info()
+
+ def importPreset(self, filepath):
+ errcode, data = self.parseAvFile(filepath)
+ returnList = []
+ if errcode == 0:
+ name, vers, preset = data["Components"][0]
+ presetName = (
+ preset["preset"]
+ if preset["preset"]
+ else os.path.basename(filepath)[:-4]
+ )
+ newPath = os.path.join(Core.presetDir, name, vers, presetName)
+ if os.path.exists(newPath):
+ return False, newPath
+ preset["preset"] = presetName
+ self.createPresetFile(name, vers, presetName, preset)
+ return True, presetName
+ elif errcode == 1:
+ # TODO: an error message
+ return False, ""
+
+ def exportPreset(self, exportPath, compName, vers, origName):
+ internalPath = os.path.join(Core.presetDir, compName, str(vers), origName)
+ if not os.path.exists(internalPath):
+ return
+ if os.path.exists(exportPath):
+ os.remove(exportPath)
+ with open(internalPath, "r") as f:
+ internalData = [line for line in f]
+ try:
+ saveValueStore = toolkit.presetFromString(internalData[0].strip())
+ self.createPresetFile(compName, vers, origName, saveValueStore, exportPath)
+ return True
+ except Exception:
+ return False
+
+ def createPresetFile(self, compName, vers, presetName, saveValueStore, filepath=""):
+ """Create a preset file (.avl) at filepath using args.
+ Or if filepath is empty, create an internal preset using args"""
+ if not filepath:
+ dirname = os.path.join(Core.presetDir, compName, str(vers))
+ if not os.path.exists(dirname):
+ os.makedirs(dirname)
+ filepath = os.path.join(dirname, presetName)
+ internal = True
+ else:
+ if not filepath.endswith(".avl"):
+ filepath += ".avl"
+ internal = False
+
+ with open(filepath, "w") as f:
+ if not internal:
+ f.write("[Components]\n")
+ f.write("%s\n" % compName)
+ f.write("%s\n" % str(vers))
+ f.write(toolkit.presetToString(saveValueStore))
+
+ def createProjectFile(self, filepath, window=None):
+ """Create a project file (.avp) using the current program state"""
+ log.info("Creating %s", filepath)
+ settingsKeys = [
+ "componentDir",
+ "inputDir",
+ "outputDir",
+ "presetDir",
+ "projectDir",
+ ]
+ try:
+ if not filepath.endswith(".avp"):
+ filepath += ".avp"
+ if os.path.exists(filepath):
+ os.remove(filepath)
+
+ with open(filepath, "w") as f:
+ f.write("[Components]\n")
+ for comp in self.selectedComponents:
+ saveValueStore = comp.savePreset()
+ saveValueStore["preset"] = comp.currentPreset
+ f.write("%s\n" % str(comp))
+ f.write("%s\n" % str(comp.version))
+ f.write("%s\n" % toolkit.presetToString(saveValueStore))
+
+ f.write("\n[Settings]\n")
+ for key in Core.settings.allKeys():
+ if key in settingsKeys:
+ f.write("%s=%s\n" % (key, Core.settings.value(key)))
+
+ if window:
+ f.write("\n[WindowFields]\n")
+ f.write(
+ "lineEdit_audioFile=%s\n"
+ "lineEdit_outputFile=%s\n"
+ % (
+ window.lineEdit_audioFile.text(),
+ window.lineEdit_outputFile.text(),
+ )
+ )
+ return True
+ except Exception:
+ return False
+
+ def newVideoWorker(self, loader, audioFile, outputPath):
+ """loader is MainWindow or Command object which must own the thread"""
+ from . import video_thread
+
+ self.videoThread = QtCore.QThread(loader)
+ videoWorker = video_thread.Worker(
+ loader, audioFile, outputPath, self.selectedComponents
+ )
+ videoWorker.moveToThread(self.videoThread)
+ videoWorker.videoCreated.connect(self.stopVideoThread)
+
+ self.videoThread.start()
+ return videoWorker
+
+ def stopVideoThread(self):
+ self.videoThread.quit()
+ self.videoThread.wait()
+
+ def cancel(self):
+ Core.canceled = True
+
+ def reset(self):
+ Core.canceled = False
+
+ @classmethod
+ def storeSettings(cls):
+ """Store settings/paths to directories as class variables"""
+ from .__init__ import wd
+ from .toolkit.ffmpeg import findFfmpeg
+
+ cls.wd = wd
+ dataDir = QtCore.QStandardPaths.writableLocation(
+ QtCore.QStandardPaths.StandardLocation.AppConfigLocation
+ )
+ # Windows: C:/Users/<USER>/AppData/Local/audio-visualizer
+ # macOS: ~/Library/Preferences/audio-visualizer
+ # Linux: ~/.config/audio-visualizer
+ with open(os.path.join(wd, "encoder-options.json")) as json_file:
+ encoderOptions = json.load(json_file)
+
+ # Locate FFmpeg
+ ffmpegBin = findFfmpeg()
+ if not ffmpegBin:
+ print("Could not find FFmpeg")
+
+ settings = {
+ "canceled": False,
+ "FFMPEG_BIN": ffmpegBin,
+ "dataDir": dataDir,
+ "settings": QtCore.QSettings(
+ os.path.join(dataDir, "settings.ini"),
+ QtCore.QSettings.Format.IniFormat,
+ ),
+ "presetDir": os.path.join(dataDir, "presets"),
+ "componentsPath": os.path.join(wd, "components"),
+ "junkStream": os.path.join(wd, "gui", "background.png"),
+ "encoderOptions": encoderOptions,
+ "resolutions": [
+ "1920x1080",
+ "1280x720",
+ "854x480",
+ ],
+ "logDir": os.path.join(dataDir, "log"),
+ "logEnabled": False,
+ "previewEnabled": True,
+ }
+
+ settings["videoFormats"] = toolkit.appendUppercase(
+ [
+ "*.mp4",
+ "*.mov",
+ "*.mkv",
+ "*.avi",
+ "*.webm",
+ "*.flv",
+ ]
+ )
+ settings["audioFormats"] = toolkit.appendUppercase(
+ [
+ "*.mp3",
+ "*.wav",
+ "*.ogg",
+ "*.fla",
+ "*.flac",
+ "*.aac",
+ ]
+ )
+ settings["imageFormats"] = toolkit.appendUppercase(
+ [
+ "*.png",
+ "*.jpg",
+ "*.tif",
+ "*.tiff",
+ "*.gif",
+ "*.bmp",
+ "*.ico",
+ "*.xbm",
+ "*.xpm",
+ ]
+ )
+
+ # Register all settings as class variables
+ for classvar, val in settings.items():
+ setattr(cls, classvar, val)
+
+ cls.loadDefaultSettings()
+ if not os.path.exists(cls.dataDir):
+ os.makedirs(cls.dataDir)
+ for neededDirectory in (
+ cls.presetDir,
+ cls.logDir,
+ cls.settings.value("projectDir"),
+ ):
+ if not os.path.exists(neededDirectory):
+ os.mkdir(neededDirectory)
+ cls.makeLogger(deleteOldLogs=True)
+
+ @classmethod
+ def loadDefaultSettings(cls):
+ # settings that get saved into the ini file
+ cls.defaultSettings = {
+ "outputWidth": 1280,
+ "outputHeight": 720,
+ "outputFrameRate": 30,
+ "outputAudioCodec": "AAC",
+ "outputAudioBitrate": "192",
+ "outputVideoCodec": "H264",
+ "outputVideoBitrate": "2500",
+ "outputVideoFormat": "yuv420p",
+ "outputPreset": "medium",
+ "outputFormat": "mp4",
+ "outputContainer": "MP4",
+ "projectDir": os.path.join(cls.dataDir, "projects"),
+ "pref_insertCompAtTop": True,
+ "pref_genericPreview": True,
+ "pref_undoLimit": 10,
+ }
+
+ for parm, value in cls.defaultSettings.items():
+ if cls.settings.value(parm) is None:
+ cls.settings.setValue(parm, value)
+
+ # Allow manual editing of prefs. (Surprisingly necessary as Qt seems to
+ # store True as 'true' but interprets a manually-added 'true' as str.)
+ for key in cls.settings.allKeys():
+ if not key.startswith("pref_"):
+ continue
+ val = cls.settings.value(key)
+ try:
+ val = int(val)
+ except ValueError:
+ if val == "true":
+ val = True
+ elif val == "false":
+ val = False
+ cls.settings.setValue(key, val)
+
+ @staticmethod
+ def makeLogger(deleteOldLogs=False):
+ # send critical log messages to stdout
+ logStream = logging.StreamHandler()
+ logStream.setLevel(STDOUT_LOGLVL)
+ streamFormatter = logging.Formatter("<%(name)s> %(levelname)s: %(message)s")
+ logStream.setFormatter(streamFormatter)
+ log = logging.getLogger("AVP")
+ log.addHandler(logStream)
+
+ if FILE_LOGLVL is not None:
+ # write log files as well!
+ Core.logEnabled = True
+ logFilename = os.path.join(Core.logDir, "avp_debug.log")
+ libLogFilename = os.path.join(Core.logDir, "global_debug.log")
+
+ if deleteOldLogs:
+ for log_ in (logFilename, libLogFilename):
+ if os.path.exists(log_):
+ os.remove(log_)
+
+ logFile = logging.FileHandler(logFilename, delay=True)
+ logFile.setLevel(FILE_LOGLVL)
+ libLogFile = logging.FileHandler(libLogFilename, delay=True)
+ libLogFile.setLevel(FILE_LIBLOGLVL)
+ fileFormatter = logging.Formatter(
+ "[%(asctime)s] %(threadName)-10.10s %(name)-23.23s %(levelname)s: "
+ "%(message)s"
+ )
+ logFile.setFormatter(fileFormatter)
+ libLogFile.setFormatter(fileFormatter)
+
+ libLog = logging.getLogger()
+ log.addHandler(logFile)
+ libLog.addHandler(libLogFile)
+ # lowest level must be explicitly set on the root Logger
+ libLog.setLevel(0)
+
+
+# always store settings in class variables even if a Core object is not created
+Core.storeSettings()