-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathutils.py
129 lines (95 loc) · 4.04 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import os
import subprocess, random, string
from time import time, sleep
from glob import glob
from threading import Timer
from datetime import datetime
ASSETS_DIR = 'assets'
TMP_DIR = f"{ASSETS_DIR}/tmp"
LOG_FILE = open('ffm.log','a')
class RepeatTimer(Timer):
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
def getRandomString():
return ''.join(random.choice(string.ascii_letters) for _ in range(10))
def getFileName(name: str, ext: str) -> str:
now = datetime.now()
fileName = f"{name}_{now.strftime('%d%m%y_%H%M%S')}.{ext}"
dirPath = os.path.join(ASSETS_DIR, f"{now.strftime('%d_%m_%Y')}")
os.makedirs(dirPath, exist_ok=True)
return os.path.join(dirPath, fileName)
def splitFilesInChunks(inFile, chunks=300):
# split in chunks of 5 minutes
if os.path.exists(TMP_DIR):
for f in glob(f'{TMP_DIR}/*'): os.remove(f)
else:
os.makedirs(TMP_DIR)
_, extn = os.path.splitext(inFile)
subprocess.call(
['ffmpeg', '-y','-i', inFile, '-f', 'segment', '-segment_time', f'{chunks}',
'-c', 'copy', '-reset_timestamps', 'true', f'{TMP_DIR}/out%03d.{extn}'],
shell=False,
stdin=subprocess.PIPE,
stdout=LOG_FILE,
stderr=LOG_FILE
)
return glob(f'{TMP_DIR}/*')
class AVRecorder:
def __init__(self):
# get list of devices with `ffmpeg -list_devices true -f dshow -i dummy`
# control the `video_size`, `vcodec`, `crf`, `-r`(FPS) to tweak the video quality/size
self.commonFlags = 'ffmpeg -hide_banner -f dshow -y -video_size 1280x720 -rtbufsize 2G'.split()
self.vidFlags = "-vcodec libx265 -crf 28 -r 17".split()
# self.audioInput = "audio=Headset (realme Buds Wireless 2 Neo Hands-Free AG Audio)"
# self.videoInput = "video=HP HD Camera"
self.videoInput = "video=GENERAL WEBCAM"
self.audioInput = "audio=Microphone (GENERAL WEBCAM)"
self.isRunning = False
def startVideoRec(self):
self.fileName = getFileName('Video', 'mp4')
self.startTime = time()
return self.runCommand([*self.commonFlags, '-i', f"{self.videoInput}:{self.audioInput}", *self.vidFlags, self.fileName])
def startAudeoRec(self):
self.fileName = getFileName('Audio', 'm4a')
self.startTime = time()
return self.runCommand([*self.commonFlags, '-i', self.audioInput, self.fileName])
def takePicture(self):
# Often times the first photo that ffmpeg takes does not have proper focus and exposure.
# Thus, take 5 photo to focus the camera and use the last photo
self.runCommand([*self.commonFlags, '-i', self.videoInput, '-frames:v', '5', f'{TMP_DIR}/pic%03d.jpg'])
ret = self.release()
if ret:
self.isRunning = False
return None
fileName = getFileName('Photo', 'jpg')
os.rename(f'{TMP_DIR}/pic005.jpg',fileName)
return fileName
def runCommand(self, command):
if self.isRunning : return
self.isRunning = True
self.recorder = subprocess.Popen(
command,
shell=False,
stdin=subprocess.PIPE,
stdout=LOG_FILE,
stderr=LOG_FILE,
)
sleep(1)
# wait and check if the command fails
# it should return None if it working
ifFailed = self.recorder.poll()
if ifFailed: self.isRunning = False
return ifFailed
def close(self):
self.recorder.stdin.write('q'.encode("GBK"))
self.release()
duration = time() - self.startTime
return self.fileName, duration
def release(self):
self.recorder.communicate()
# self.recorder.wait() # communicate explicitly calls the wait
self.isRunning = False
self.recorder.terminate()
self.recorder.kill()
return self.recorder.returncode