aboutsummaryrefslogtreecommitdiff
path: root/core.py
blob: 7b3c69a8226ada0236f49b298866ecdf5b58bfaf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import sys
import io
import os
from PyQt4 import QtCore, QtGui, uic
from os.path import expanduser
import subprocess as sp
import numpy
from PIL import Image
import tempfile
from shutil import rmtree
import atexit
import time
from collections import OrderedDict
import json


class Core:
    """Backend helper for locating ffmpeg, decoding audio and temp storage.

    On construction the instance picks an ffmpeg/avconv executable name,
    makes sure a temporary working directory exists (and registers its
    removal at interpreter exit), and loads ``encoder-options.json`` from
    the directory containing this module.
    """

    def __init__(self):
        # Cancellation flag polled by readAudioFile(); initialised here so
        # that method works even when reset() was never called beforehand.
        self.canceled = False
        self.FFMPEG_BIN = self.findFfmpeg()
        self.tempDir = os.path.join(
            tempfile.gettempdir(), 'audio-visualizer-python-data')
        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(self.tempDir, exist_ok=True)
        atexit.register(self.deleteTempDir)
        self.wd = os.path.dirname(os.path.realpath(__file__))
        self.loadEncoderOptions()

    def loadEncoderOptions(self):
        """Load ``encoder-options.json`` from ``self.wd``.

        The parsed JSON document is stored in ``self.encoder_options``.
        """
        file_path = os.path.join(self.wd, 'encoder-options.json')
        # JSON is UTF-8 by specification; do not depend on the locale.
        with open(file_path, encoding='utf-8') as json_file:
            self.encoder_options = json.load(json_file)

    def findFfmpeg(self):
        """Return the name of the ffmpeg-compatible executable to invoke.

        On win32 this is always ``"ffmpeg.exe"``. Elsewhere ``ffmpeg
        -version`` is run as a probe: ``"ffmpeg"`` is returned when it
        succeeds and ``"avconv"`` when it cannot be started or exits with
        an error.
        """
        if sys.platform == "win32":
            return "ffmpeg.exe"
        try:
            sp.check_call(
                ['ffmpeg', '-version'], stdout=sp.DEVNULL, stderr=sp.DEVNULL)
        except (OSError, sp.CalledProcessError):
            # Binary missing / not executable / returned a failure status.
            return "avconv"
        return "ffmpeg"

    @staticmethod
    def _parseDuration(ffmpegOutput):
        """Extract the duration in seconds from ffmpeg's textual output.

        Looks for lines of the form ``Duration: HH:MM:SS.ss, ...``. Lines
        whose value is not three colon-separated numbers (for example
        ``Duration: N/A``) are ignored. When several lines match, the last
        one wins. Returns ``None`` when no usable line is found.
        """
        duration = None
        for line in ffmpegOutput.split('\n'):
            _, marker, rest = line.partition('Duration:')
            if not marker:
                continue
            fields = rest.split(',')[0].strip().split(':')
            if len(fields) != 3:
                continue
            try:
                duration = (
                    float(fields[0])*3600
                    + float(fields[1])*60
                    + float(fields[2]))
            except ValueError:
                continue
        return duration

    def readAudioFile(self, filename, parent):
        """Decode *filename* into mono 16-bit PCM samples at 44100 Hz.

        Parameters:
            filename: path of the audio file handed to ffmpeg via ``-i``.
            parent: object exposing ``progressBarSetText`` and
                ``progressBarUpdate`` attributes, each with an ``emit()``
                method; it receives the loading progress.

        Returns:
            A one-dimensional ``int16`` numpy array holding the decoded
            samples followed by 44100 zero samples. If ``self.canceled``
            becomes true while reading, the samples read so far (plus the
            padding) are returned.
        """
        probe = [self.FFMPEG_BIN, '-i', filename]
        try:
            fileInfo = sp.check_output(probe, stderr=sp.STDOUT, shell=False)
        except sp.CalledProcessError as ex:
            # No output file is given, so a failure exit status is
            # tolerated; the captured text is still inspected for Duration.
            fileInfo = ex.output

        # Tags in the banner may not be valid UTF-8; never fail on them.
        duration = self._parseDuration(
            fileInfo.decode("utf-8", errors="replace"))

        command = [
            self.FFMPEG_BIN,
            '-i', filename,
            '-f', 's16le',
            '-acodec', 'pcm_s16le',
            '-ar', '44100',  # output will have 44100 Hz
            '-ac', '1',  # mono (set to '2' for stereo)
            '-']
        in_pipe = sp.Popen(
            command, stdout=sp.PIPE, stderr=sp.DEVNULL, bufsize=10**8)

        # Chunks are gathered in a list and joined once at the end instead
        # of re-copying the whole array on every iteration.
        chunks = []
        progress = 0
        lastPercent = None
        try:
            while not self.canceled:
                # Each read is 88200*4 bytes = 176400 16-bit mono samples,
                # i.e. 4 seconds of audio at 44100 Hz.
                progress = progress + 4
                raw_audio = in_pipe.stdout.read(88200*4)
                if len(raw_audio) == 0:
                    break
                # Drop a dangling odd byte so the buffer holds only whole
                # 16-bit samples.
                usable = len(raw_audio) - len(raw_audio) % 2
                chunks.append(
                    numpy.frombuffer(raw_audio[:usable], dtype="int16"))

                if duration:
                    percent = min(100, int(100*(progress/duration)))
                else:
                    # Unknown or zero duration: progress cannot be computed.
                    percent = 0

                if lastPercent != percent:
                    string = 'Loading audio file: '+str(percent)+'%'
                    parent.progressBarSetText.emit(string)
                    parent.progressBarUpdate.emit(percent)

                lastPercent = percent
        finally:
            # Always stop and reap the decoder, even on cancel or error.
            in_pipe.kill()
            in_pipe.stdout.close()
            in_pipe.wait()

        # add 0s to the end (44100 samples = one second at 44100 Hz)
        chunks.append(numpy.zeros(44100, dtype="int16"))
        return numpy.concatenate(chunks)

    def deleteTempDir(self):
        """Remove ``self.tempDir`` and its contents; ignore it if absent."""
        try:
            rmtree(self.tempDir)
        except FileNotFoundError:
            pass

    def cancel(self):
        """Ask a running readAudioFile() loop to stop."""
        self.canceled = True

    def reset(self):
        """Clear the cancellation flag."""
        self.canceled = False

    @staticmethod
    def stringOrderedDict(dictionary):
        """Return the repr of an OrderedDict of *dictionary* sorted by key."""
        sorted_ = OrderedDict(sorted(dictionary.items(), key=lambda t: t[0]))
        return repr(sorted_)