videocr/video.py
from __future__ import annotations
from concurrent import futures
from typing import List, Optional
import pytesseract
import cv2
import timeit

from .models import PredictedFrame, PredictedSubtitle
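# Assumed interface of the imported models (defined in .models, not shown
# here): PredictedFrame(index, tesseract_data) wraps one frame's OCR output
# and offers is_similar_to(other); PredictedSubtitle(frames) merges a run of
# similar frames and exposes the combined text as .text.

# Longest run of consecutive dissimilar frames tolerated before a subtitle
# boundary is declared (see the sliding window in get_subtitles).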
SUBTITLE_BOUND = 10


class Video:
    path: str
    lang: str
    num_frames: int
    pred_frames: Optional[List[PredictedFrame]]

    def __init__(self, path: str, lang: str):
        self.path = path
        self.lang = lang
        # set to None so get_subtitles can detect that OCR has not run yet
        self.pred_frames = None
        v = cv2.VideoCapture(path)
        self.num_frames = int(v.get(cv2.CAP_PROP_FRAME_COUNT))
        v.release()

    def _single_frame_ocr(self, img) -> str:
        img = img[img.shape[0] // 2:, :]  # only use the bottom half of the frame
        # image_to_data returns Tesseract's TSV output as a string by default
        data = pytesseract.image_to_data(img, lang=self.lang)
        return data

    def run_ocr(self) -> None:
        v = cv2.VideoCapture(self.path)
        # v.read() returns (success, frame); keep only the frame
        frames = (v.read()[1] for _ in range(self.num_frames))

        # perform OCR on all frames in parallel
        with futures.ProcessPoolExecutor() as pool:
            frames_ocr = pool.map(self._single_frame_ocr, frames, chunksize=10)
            self.pred_frames = [PredictedFrame(i, data)
                                for i, data in enumerate(frames_ocr)]

        v.release()

    def get_subtitles(self) -> str:
        if self.pred_frames is None:
            raise AttributeError(
                'Please call run_ocr() first to generate OCR data for frames')

        # Divide the OCR'd frames into subtitle paragraphs with a sliding
        # window: i marks the first frame of the current paragraph, j scans
        # forward, and bound counts down through consecutive dissimilar frames.
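        # Example: with SUBTITLE_BOUND = 10, a subtitle visible on frames
        # 0-99 and gone afterwards is flushed once j reaches frame 110,
        # emitting PredictedSubtitle(pred_frames[0:100]).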
        i = 0
        j = 1
        bound = SUBTITLE_BOUND
        while j < self.num_frames:
            fi, fj = self.pred_frames[i], self.pred_frames[j]

            if fi.is_similar_to(fj):
                bound = SUBTITLE_BOUND
            elif bound > 0:
                bound -= 1
            else:
                # mismatches have persisted for SUBTITLE_BOUND frames, so
                # the paragraph actually ended where the mismatches began
                para_new = j - SUBTITLE_BOUND
                print(PredictedSubtitle(self.pred_frames[i:para_new]).text)
                i = para_new
                j = i
                bound = SUBTITLE_BOUND

            j += 1

        # flush the final paragraph
        if i < self.num_frames - 1:
            print(PredictedSubtitle(self.pred_frames[i:]).text)

        return ''
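# Demo timing run. The __main__ guard below is needed because
# ProcessPoolExecutor workers re-import this module. 'HanS' assumes the
# Tesseract script-level traineddata for simplified Han is installed; with
# only the standard language packs, 'chi_sim' would be the usual choice.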
if __name__ == '__main__':
    time_start = timeit.default_timer()
    v = Video('1.mp4', 'HanS')
    v.run_ocr()
    v.get_subtitles()
    time_stop = timeit.default_timer()
    print(time_stop - time_start)