-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathatomutil.py
executable file
·125 lines (97 loc) · 4.06 KB
/
atomutil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/env python
from pathlib import Path
import argparse
import cv2
from atomcam import DetectMeteor, ATOM_CAM_IP, ATOM_CAM_USER, ATOM_CAM_PASS
from atomcam import check_clock, set_clock
def make_ftpcmd(meteor_list, directory):
'''
検出されたログから画像をダウンロードするFTPコマンドを生成する。
'''
wget = "wget -nc -r -nv -nH --cut-dirs=3"
if directory:
wget += " -P {}".format(directory)
with open(meteor_list, "r") as f:
for line in f.readlines():
if line.startswith('#'):
continue
# 検出時刻から動画ファイル名を生成する。
(date, time) = line.split()[0:2]
hh, mm, ss = time.split(':')
date_dir = ''.join(date.split('/'))
mp4_file = "{}/{}/{}.mp4".format(date_dir, hh, mm)
url = "ftp://{}:{}@{}/media/mmc/record/{}".format(
ATOM_CAM_USER, ATOM_CAM_PASS, ATOM_CAM_IP, mp4_file
)
print("{} {}".format(wget, url))
def detect_meteors(meteor_list):
'''
検出された流星リストから再検出(修正中)
'''
with open(meteor_list, "r") as f:
prev_file = None
for line in f.readlines():
if line.startswith('#'):
continue
(date, time) = line.split()[0:2]
date_dir = ''.join(date.split('/'))
hh, mm, ss = time.split(':')
file_path = Path(date_dir, hh, "{}.mp4".format(mm))
if file_path != prev_file:
print(file_path)
detecter = DetectMeteor(str(file_path))
detecter.meteor(2)
prev_file = file_path
def make_movie(meteor_list, output="movie.mp4", fps=1.0):
'''
検出された流星リストから動画作成(未完成)
Args:
meteor_list: 検出された流星のログファイル
outout: 出力動画ファイル名
'''
data_dir = Path(meteor_list).parents[0]
date_dir = Path(meteor_list).stem
# とりあえずATOM Camサイズ
size = (1920, 1080)
fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
print(f'FPS={fps}')
video = cv2.VideoWriter(output, fourcc, fps, size)
with open(meteor_list, "r") as f:
for line in f.readlines():
if line.startswith('#'):
continue
(date, time) = line.split()[0:2]
date_str = ''.join(date.split('/'))
hh, mm, ss = time.split(':')
filename = "{}{}{}{}.jpg".format(date_str, hh, mm, ss)
file_path = str(Path(data_dir, date_dir, filename))
print(file_path)
try:
img = cv2.imread(file_path)
video.write(img)
except Exception as e:
print(str(e))
video.release()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('meteors', nargs='?', help="List of detected meteors (text file)")
parser.add_argument('-f', '--ftp', action='store_true', help='FTPコマンド作成')
parser.add_argument('-d', '--directory', default=None, help='FTPコマンド取得先ディレクトリ名')
parser.add_argument('-m', '--movie', action='store_true', help='FTPコマンド作成')
parser.add_argument('-o', '--output', default='movie.mp4', help='動画ファイル名(.mp4)')
parser.add_argument('-c', '--clock', action='store_true', help='ATOM Camの時計のチェック')
parser.add_argument('-s', '--set_clock', action='store_true', help='ATOM Camの時計をホスト側に合わせる')
parser.add_argument('-F', '--fps', default=1, type=int, help='動画生成時のFPS')
args = parser.parse_args()
# print("# {}".format(args.meteors))
if args.ftp:
make_ftpcmd(args.meteors, args.directory)
elif args.movie:
make_movie(args.meteors, args.output, args.fps)
#make_movie(args.meteors, args.output)
elif args.clock:
check_clock()
elif args.set_clock:
set_clock()
else:
detect_meteors(args.meteors)