all repos — videocr @ 9360ebdd4067739cb0658dbc983c6117512243d0

Extract hardcoded subtitles from videos using machine learning

videocr/video.py (view raw)

  1from __future__ import annotations
  2from multiprocessing import Pool
  3import datetime
  4import pytesseract
  5import cv2
  6
  7from . import constants
  8from .models import PredictedFrame, PredictedSubtitle
  9from .opencv_adapter import Capture
 10
 11
 12class Video:
 13    path: str
 14    lang: str
 15    use_fullframe: bool
 16    num_frames: int
 17    fps: float
 18    height: int
 19    pred_frames: List[PredictedFrame]
 20    pred_subs: List[PredictedSubtitle]
 21
 22    def __init__(self, path: str):
 23        self.path = path
 24        with Capture(path) as v:
 25            self.num_frames = int(v.get(cv2.CAP_PROP_FRAME_COUNT))
 26            self.fps = v.get(cv2.CAP_PROP_FPS)
 27            self.height = int(v.get(cv2.CAP_PROP_FRAME_HEIGHT))
 28
 29    def run_ocr(self, lang: str, time_start: str, time_end: str,
 30                conf_threshold:int, use_fullframe: bool) -> None:
 31        self.lang = lang
 32        self.use_fullframe = use_fullframe
 33
 34        ocr_start = self._frame_index(time_start) if time_start else 0
 35        ocr_end = self._frame_index(time_end) if time_end else self.num_frames
 36
 37        if ocr_end < ocr_start:
 38            raise ValueError('time_start is later than time_end')
 39        num_ocr_frames = ocr_end - ocr_start
 40
 41        # get frames from ocr_start to ocr_end
 42        with Capture(self.path) as v, multiprocessing.Pool() as pool:
 43            v.set(cv2.CAP_PROP_POS_FRAMES, ocr_start)
 44            frames = (v.read()[1] for _ in range(num_ocr_frames))
 45
 46            # perform ocr to frames in parallel
 47            it_ocr = pool.imap(self._image_to_data, frames, chunksize=10)
 48            self.pred_frames = [
 49                PredictedFrame(i + ocr_start, data, conf_threshold) 
 50                for i, data in enumerate(it_ocr)]
 51
 52        v.release()
 53
 54    # convert time str to frame index
 55    def _frame_index(self, time: str) -> int:
 56        t = time.split(':')
 57        t = list(map(float, t))
 58        if len(t) == 3:
 59            td = datetime.timedelta(hours=t[0], minutes=t[1], seconds=t[2])
 60        elif len(t) == 2:
 61            td = datetime.timedelta(minutes=t[0], seconds=t[1])
 62        else:
 63            raise ValueError(
 64                'time data "{}" does not match format "%H:%M:%S"'.format(time))
 65
 66        index = int(td.total_seconds() * self.fps)
 67        if index > self.num_frames or index < 0:
 68            raise ValueError(
 69                'time data "{}" exceeds video duration'.format(time))
 70
 71        return index
 72
 73    def _single_frame_ocr(self, img) -> str:
 74        if not self.use_fullframe:
 75            # only use bottom half of the frame by default
 76            img = img[self.height // 2:, :]
 77        config = '--tessdata-dir "{}"'.format(constants.TESSDATA_DIR)
 78        return pytesseract.image_to_data(img, lang=self.lang, config=config)
 79
 80    def get_subtitles(self, sim_threshold: int) -> str:
 81        self._generate_subtitles(sim_threshold)
 82        return ''.join(
 83            '{}\n{} --> {}\n{}\n\n'.format(
 84                i,
 85                self._srt_timestamp(sub.index_start),
 86                self._srt_timestamp(sub.index_end),
 87                sub.text)
 88            for i, sub in enumerate(self.pred_subs))
 89
 90    def _generate_subtitles(self, sim_threshold: int) -> None:
 91        self.pred_subs = []
 92
 93        if self.pred_frames is None:
 94            raise AttributeError(
 95                'Please call self.run_ocr() first to perform ocr on frames')
 96
 97        # divide ocr of frames into subtitle paragraphs using sliding window
 98        WIN_BOUND = int(self.fps // 2)  # 1/2 sec sliding window boundary
 99        bound = WIN_BOUND
100        i = 0
101        j = 1
102        while j < len(self.pred_frames):
103            fi, fj = self.pred_frames[i], self.pred_frames[j]
104
105            if fi.is_similar_to(fj):
106                bound = WIN_BOUND
107            elif bound > 0:
108                bound -= 1
109            else:
110                # divide subtitle paragraphs
111                para_new = j - WIN_BOUND
112                self._append_sub(PredictedSubtitle(
113                    self.pred_frames[i:para_new], sim_threshold))
114                i = para_new
115                j = i
116                bound = WIN_BOUND
117
118            j += 1
119
120        # also handle the last remaining frames
121        if i < len(self.pred_frames) - 1:
122            self._append_sub(PredictedSubtitle(
123                self.pred_frames[i:], sim_threshold))
124
125    def _append_sub(self, sub: PredictedSubtitle) -> None:
126        if len(sub.text) == 0:
127            return
128
129        # merge new sub to the last subs if they are similar
130        while self.pred_subs and sub.is_similar_to(self.pred_subs[-1]):
131            ls = self.pred_subs[-1]
132            del self.pred_subs[-1]
133            sub = PredictedSubtitle(ls.frames + sub.frames, sub.sim_threshold)
134
135        self.pred_subs.append(sub)
136
137    def _srt_timestamp(self, frame_index: int) -> str:
138        td = datetime.timedelta(seconds=frame_index / self.fps)
139        ms = td.microseconds // 1000
140        m, s = divmod(td.seconds, 60)
141        h, m = divmod(m, 60)
142        return '{:02d}:{:02d}:{:02d},{:03d}'.format(h, m, s, ms)