class VideoMaker:
    def __init__(
        self,
        video_filename,
        position_mean,
        orientation_mean,
        centroids,
        position_time,
        video_frame_inds=None,
        likelihoods=None,
        processor="matplotlib",
        frames=None,
        percent_frames=1,
        output_video_filename="output.mp4",
        cm_to_pixels=1.0,
        disable_progressbar=False,
        crop=None,
        batch_size=512,
        max_workers=256,
        max_jobs_in_queue=128,
        debug=False,
        key_hash=None,
        *args,
        **kwargs,
    ):
        """Create a video from a set of position data.

        Uses batch size as frame count for processing steps. All in temp_dir.

        1. Extract frames from original video to 'orig_XXXX.png'
        2. Multithread pool frames to matplotlib 'plot_XXXX.png'
        3. Stitch frames into partial video 'partial_XXXX.mp4'
        4. Concatenate partial videos into final video output
        """
        if processor != "matplotlib":
            raise ValueError(
                "open-cv processors are no longer supported. \n"
                + "Use matplotlib or submit a feature request via GitHub."
            )

        # key_hash supports resume from previous run
        self.temp_dir = Path(temp_dir) / f"dlc_vid_{key_hash}"
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        logger.debug(f"Temporary directory: {self.temp_dir}")

        if not Path(video_filename).exists():
            raise FileNotFoundError(f"Video not found: {video_filename}")

        try:
            position_mean = position_mean["DLC"]
            orientation_mean = orientation_mean["DLC"]
        except IndexError:
            pass  # trodes data provides bare arrays

        self.video_filename = video_filename
        self.video_frame_inds = video_frame_inds
        self.position_mean = position_mean
        self.orientation_mean = orientation_mean
        self.centroids = centroids
        self.likelihoods = likelihoods
        self.position_time = position_time
        self.percent_frames = percent_frames
        self.frames = frames
        self.output_video_filename = output_video_filename
        self.cm_to_pixels = cm_to_pixels
        self.crop = crop
        self.window_ind = np.arange(501) - 501 // 2
        self.debug = debug
        self.start_time = pd.to_datetime(position_time[0] * 1e9, unit="ns")
        self.dropped_frames = set()

        self.batch_size = batch_size
        self.max_workers = max_workers
        self.max_jobs_in_queue = max_jobs_in_queue
        self.timeout = 30 if test_mode else 300

        self.ffmpeg_log_args = ["-hide_banner", "-loglevel", "error"]
        self.ffmpeg_fmt_args = ["-c:v", "libx264", "-pix_fmt", "yuv420p"]

        prev_backend = matplotlib.get_backend()
        matplotlib.use("Agg")  # Use non-interactive backend

        _ = self._set_frame_info()
        _ = self._set_plot_bases()

        logger.info(
            f"Making video: {self.output_video_filename} "
            + f"in batches of {self.batch_size}"
        )
        self.process_frames()
        plt.close(self.fig)
        logger.info(f"Finished video: {self.output_video_filename}")
        logger.debug(f"Dropped frames: {self.dropped_frames}")

        if not debug:
            shutil.rmtree(self.temp_dir)  # Clean up temp directory

        matplotlib.use(prev_backend)  # Reset to previous backend

    def _set_frame_info(self):
        """Set the frame information for the video."""
        logger.debug("Setting frame information")

        ret = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-select_streams",
                "v",
                "-show_entries",
                "stream=width,height,r_frame_rate,nb_frames",
                "-of",
                "csv=p=0:s=x",
                str(self.video_filename),
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        if ret.returncode != 0:
            raise ValueError(f"Error getting video dimensions: {ret.stderr}")

        stats = ret.stdout.strip().split("x")
        self.width, self.height = tuple(map(int, stats[:2]))
        self.frame_rate = eval(stats[2])
        self.frame_size = (
            (self.width, self.height)
            if not self.crop
            else (
                self.crop[1] - self.crop[0],
                self.crop[3] - self.crop[2],
            )
        )
        self.ratio = (
            (self.crop[3] - self.crop[2]) / (self.crop[1] - self.crop[0])
            if self.crop
            else self.frame_size[1] / self.frame_size[0]
        )
        self.fps = int(np.round(self.frame_rate))

        if self.frames is None and self.video_frame_inds is not None:
            self.n_frames = int(
                len(self.video_frame_inds) * self.percent_frames
            )
            self.frames = np.arange(0, self.n_frames)
        elif self.frames is not None:
            self.n_frames = len(self.frames)
        else:
            self.n_frames = int(stats[3])

        if self.debug:  # If debugging, limit frames to available data
            self.n_frames = min(len(self.position_mean), self.n_frames)

        self.pad_len = len(str(self.n_frames))

    def _set_plot_bases(self):
        """Create the figure and axes for the video."""
        logger.debug("Setting plot bases")
        plt.style.use("dark_background")
        fig, axes = plt.subplots(
            2,
            1,
            figsize=(8, 6),
            gridspec_kw={"height_ratios": [8, 1]},
            constrained_layout=False,
        )

        axes[0].tick_params(colors="white", which="both")
        axes[0].spines["bottom"].set_color("white")
        axes[0].spines["left"].set_color("white")

        self.centroid_plot_objs = {
            bodypart: axes[0].scatter(
                [],
                [],
                s=2,
                zorder=102,
                color=color,
                label=f"{bodypart} position",
                alpha=0.6,
            )
            for color, bodypart in zip(COLOR_SWATCH, self.centroids.keys())
        }
        self.centroid_position_dot = axes[0].scatter(
            [],
            [],
            s=5,
            zorder=102,
            color="#b045f3",
            label="centroid position",
            alpha=0.6,
        )
        (self.orientation_line,) = axes[0].plot(
            [],
            [],
            color="cyan",
            linewidth=1,
            label="Orientation",
        )

        axes[0].set_xlabel("")
        axes[0].set_ylabel("")

        x_left, x_right = axes[0].get_xlim()
        y_low, y_high = axes[0].get_ylim()

        axes[0].set_aspect(
            abs((x_right - x_left) / (y_low - y_high)) * self.ratio
        )
        axes[0].spines["top"].set_color("black")
        axes[0].spines["right"].set_color("black")

        time_delta = pd.Timedelta(
            self.position_time[0] - self.position_time[-1]
        ).total_seconds()

        # TODO: Update legend location based on centroid position
        axes[0].legend(loc="lower right", fontsize=4)
        self.title = axes[0].set_title(
            f"time = {time_delta:3.4f}s\n frame = {0}",
            fontsize=8,
        )
        axes[0].axis("off")

        if self.likelihoods:
            self.likelihood_objs = {
                bodypart: axes[1].plot(
                    [],
                    [],
                    color=color,
                    linewidth=1,
                    clip_on=False,
                    label=bodypart,
                )[0]
                for color, bodypart in zip(
                    COLOR_SWATCH, self.likelihoods.keys()
                )
            }
            axes[1].set_ylim((0.0, 1))
            axes[1].set_xlim(
                (
                    self.window_ind[0] / self.frame_rate,
                    self.window_ind[-1] / self.frame_rate,
                )
            )
            axes[1].set_xlabel("Time [s]")
            axes[1].set_ylabel("Likelihood")
            axes[1].set_facecolor("black")
            axes[1].spines["top"].set_color("black")
            axes[1].spines["right"].set_color("black")
            axes[1].legend(loc="upper right", fontsize=4)

        self.fig = fig
        self.axes = axes

    def _get_centroid_data(self, pos_ind):
        def centroid_to_px(*idx):
            return _to_px(
                data=self.position_mean[idx], cm_to_pixels=self.cm_to_pixels
            )

        if not self.crop:
            return centroid_to_px(pos_ind)
        return np.hstack(
            (
                centroid_to_px((pos_ind, 0, np.newaxis)) - self.crop_offset_x,
                centroid_to_px((pos_ind, 1, np.newaxis)) - self.crop_offset_y,
            )
        )

    def _get_orient_line(self, pos_ind):
        orient = self.orientation_mean[pos_ind]
        if isinstance(orient, np.ndarray):
            orient = orient[0]  # Trodes passes orientation as a 1D array

        def orient_list(c, axis="x"):
            func = np.cos if axis == "x" else np.sin
            return [c, c + 30 * func(orient)]

        if np.all(np.isnan(orient)):
            return ([np.NaN], [np.NaN])
        else:
            x, y = self._get_centroid_data(pos_ind)
            return (orient_list(x), orient_list(y, axis="y"))

    def _generate_single_frame(self, frame_ind):
        """Generate a single frame and save it as an image."""
        # Zero-padded filename based on the dynamic padding length
        padded = self._pad(frame_ind)
        frame_out_path = self.temp_dir / f"plot_{padded}.png"
        if frame_out_path.exists() and not self.debug:
            return frame_ind  # Skip if frame already exists

        frame_file = self.temp_dir / f"orig_{padded}.png"
        if not frame_file.exists():  # Skip if input frame not found
            self.dropped_frames.add(frame_ind)
            self._debug_print(f"Frame not found: {frame_file}", end="")
            return

        frame = plt.imread(frame_file)
        _ = self.axes[0].imshow(frame)

        pos_ind = np.where(self.video_frame_inds == frame_ind)[0]

        if len(pos_ind) == 0:
            self.centroid_position_dot.set_offsets((np.NaN, np.NaN))
            for bodypart in self.centroid_plot_objs.keys():
                self.centroid_plot_objs[bodypart].set_offsets((np.NaN, np.NaN))
            empty_array = np.array([], dtype=float)
            self.orientation_line.set_data(empty_array, empty_array)
            self.title.set_text(f"time = {0:3.4f}s\n frame = {frame_ind}")
            self.fig.savefig(frame_out_path, dpi=400)
            plt.cla()  # clear the current axes
            return frame_ind

        pos_ind = pos_ind[0]
        likelihood_inds = pos_ind + self.window_ind
        neg_inds = np.where(likelihood_inds < 0)[0]
        likelihood_inds[neg_inds] = 0 if len(neg_inds) > 0 else -1

        dlc_centroid_data = self._get_centroid_data(pos_ind)

        for bodypart in self.centroid_plot_objs:
            self.centroid_plot_objs[bodypart].set_offsets(
                _to_px(
                    data=self.centroids[bodypart][pos_ind],
                    cm_to_pixels=self.cm_to_pixels,
                )
            )
        self.centroid_position_dot.set_offsets(dlc_centroid_data)
        self.orientation_line.set_data(self._get_orient_line(pos_ind))

        time_delta = pd.Timedelta(
            pd.to_datetime(self.position_time[pos_ind] * 1e9, unit="ns")
            - self.start_time
        ).total_seconds()

        self.title.set_text(f"time = {time_delta:3.4f}s\n frame = {frame_ind}")
        if self.likelihoods:
            for bodypart in self.likelihood_objs.keys():
                self.likelihood_objs[bodypart].set_data(
                    self.window_ind / self.frame_rate,
                    np.asarray(self.likelihoods[bodypart][likelihood_inds]),
                )
        self.fig.savefig(frame_out_path, dpi=400)
        plt.cla()  # clear the current axes
        return frame_ind

    def process_frames(self):
        """Process video frames in batches and generate matplotlib frames."""
        progress_bar = tqdm(leave=True, position=0, disable=self.debug)
        progress_bar.reset(total=self.n_frames)

        for start_frame in range(0, self.n_frames, self.batch_size):
            if start_frame >= self.n_frames:  # Skip if no frames left
                break
            end_frame = min(start_frame + self.batch_size, self.n_frames) - 1
            logger.debug(f"Processing frames: {start_frame} - {end_frame}")

            output_partial_video = (
                self.temp_dir / f"partial_{self._pad(start_frame)}.mp4"
            )
            if output_partial_video.exists():
                logger.debug(f"Skipping existing video: {output_partial_video}")
                progress_bar.update(end_frame - start_frame)
                continue

            self.ffmpeg_extract(start_frame, end_frame)
            self.plot_frames(start_frame, end_frame, progress_bar)
            self.ffmpeg_stitch_partial(start_frame, str(output_partial_video))

            for frame_file in self.temp_dir.glob("*.png"):
                frame_file.unlink()  # Delete orig and plot frames

        progress_bar.close()

        logger.info("Concatenating partial videos")
        self.concat_partial_videos()

    def _debug_print(self, msg=" ", end=""):
        """Print a self-overwiting message if debug is enabled."""
        if self.debug:
            print(f"\r{msg}", end=end)

    def plot_frames(
        self, start_frame, end_frame, progress_bar=None, process_pool=True
    ):
        logger.debug(f"Plotting frames: {start_frame} - {end_frame}")
        if not process_pool:  # Single-threaded processing for debugging
            for frame_ind in range(start_frame, end_frame):
                self._generate_single_frame(frame_ind)
                progress_bar.update()
            return

        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            jobs = {}  # dict of jobs
            frames_left = end_frame - start_frame
            frames_iter = iter(range(start_frame, end_frame))

            while frames_left:
                while len(jobs) < self.max_jobs_in_queue:
                    try:
                        this_frame = next(frames_iter)
                        self._debug_print(f"Submit: {self._pad(this_frame)}")
                        job = executor.submit(
                            self._generate_single_frame, this_frame
                        )
                        jobs[job] = this_frame
                    except StopIteration:
                        break  # No more frames to submit

                for job in as_completed(jobs):
                    frames_left -= 1
                    try:
                        ret = job.result(timeout=self.timeout)
                    except (IndexError, TimeoutError) as e:
                        ret = type(e).__name__
                    self._debug_print(f"Finish: {self._pad(ret)}")
                    progress_bar.update()
                    del jobs[job]

        self._debug_print(msg="", end="\n")

    def ffmpeg_extract(self, start_frame, end_frame):
        """Use ffmpeg to extract a batch of frames."""
        logger.debug(f"Extracting frames: {start_frame} - {end_frame}")
        last_frame = self.temp_dir / f"orig_{self._pad(end_frame)}.png"
        if last_frame.exists():  # assumes all frames previously extracted
            logger.debug(f"Skipping existing frames: {last_frame}")
            return

        output_pattern = str(self.temp_dir / f"orig_%0{self.pad_len}d.png")

        # Use ffmpeg to extract frames
        ffmpeg_cmd = [
            "ffmpeg",
            "-n",  # no overwrite
            "-i",
            self.video_filename,
            "-vf",
            f"select=between(n\\,{start_frame}\\,{end_frame})",
            "-vsync",
            "vfr",
            "-start_number",
            str(start_frame),
            output_pattern,
            *self.ffmpeg_log_args,
        ]
        ret = subprocess.run(ffmpeg_cmd, stderr=subprocess.PIPE)

        extracted = len(list(self.temp_dir.glob("orig_*.png")))
        logger.debug(f"Extracted frames: {start_frame}, len: {extracted}")
        if extracted < self.batch_size - 1:
            logger.warning(
                f"Could not extract frames: {extracted} / {self.batch_size-1}"
            )
            one_err = "\n".join(str(ret.stderr).split("\\")[-3:-1])
            logger.debug(f"\nExtract Error: {one_err}")

    def _pad(self, frame_ind=None):
        """Pad a frame index with leading zeros."""
        if frame_ind is None:
            return "?" * self.pad_len
        elif not isinstance(frame_ind, int):
            return frame_ind
        return f"{frame_ind:0{self.pad_len}d}"

    def ffmpeg_stitch_partial(self, start_frame, output_partial_video):
        """Stitch a partial movie from processed frames."""
        logger.debug(f"Stitch part vid : {start_frame}")
        frame_pattern = str(self.temp_dir / f"plot_%0{self.pad_len}d.png")
        ffmpeg_cmd = [
            "ffmpeg",
            "-y",  # overwrite
            "-r",
            str(self.fps),
            "-start_number",
            str(start_frame),
            "-i",
            frame_pattern,
            *self.ffmpeg_fmt_args,
            output_partial_video,
            *self.ffmpeg_log_args,
        ]
        try:
            ret = subprocess.run(
                ffmpeg_cmd,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
                check=True,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            logger.error(f"Error stitching partial video: {e.stderr}")
            logger.debug(f"stderr: {ret.stderr}")

    def concat_partial_videos(self):
        """Concatenate all the partial videos into one final video."""
        partial_vids = sorted(self.temp_dir.glob("partial_*.mp4"))
        logger.debug(f"Concat part vids: {len(partial_vids)}")
        concat_list_path = self.temp_dir / "concat_list.txt"
        with open(concat_list_path, "w") as f:
            for partial_video in partial_vids:
                f.write(f"file '{partial_video}'\n")

        ffmpeg_cmd = [
            "ffmpeg",
            "-y",  # overwrite
            "-f",
            "concat",
            "-safe",
            "0",
            "-i",
            str(concat_list_path),
            *self.ffmpeg_fmt_args,
            str(self.output_video_filename),
            *self.ffmpeg_log_args,
        ]
        try:
            ret = subprocess.run(
                ffmpeg_cmd,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            logger.error(f"Error stitching partial video: {e.stderr}")
            logger.debug(f"stderr: {ret.stderr}")
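The frame-rendering fan-out in plot_frames (step 2 of the pipeline summarized below) boils down to a bounded-queue pattern over ProcessPoolExecutor: keep at most max_jobs_in_queue futures outstanding, drain whatever has completed, then top the queue back up. The following is a simplified, self-contained sketch of that pattern; render_frame and the default limits are placeholders, not part of the class.

from concurrent.futures import ProcessPoolExecutor, as_completed


def render_frame(frame_ind):
    """Placeholder for VideoMaker._generate_single_frame."""
    return frame_ind


def render_batch(start_frame, end_frame, max_workers=8, max_jobs_in_queue=128):
    """Render [start_frame, end_frame) with a bounded submission queue."""
    frames_iter = iter(range(start_frame, end_frame))
    frames_left = end_frame - start_frame
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        jobs = {}
        while frames_left:
            # Top up the queue without holding a future per remaining frame
            while len(jobs) < max_jobs_in_queue:
                try:
                    this_frame = next(frames_iter)
                except StopIteration:
                    break
                jobs[executor.submit(render_frame, this_frame)] = this_frame
            # Drain completed jobs to make room for the next submissions
            for job in as_completed(list(jobs)):
                frames_left -= 1
                job.result()
                del jobs[job]


if __name__ == "__main__":
    render_batch(0, 512)

Bounding the queue keeps memory flat for long videos, since only one batch of pending futures and their results exists at any time.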
Uses batch size as frame count for processing steps. All in temp_dir.
1. Extract frames from original video to 'orig_XXXX.png'
2. Multithread pool frames to matplotlib 'plot_XXXX.png'
3. Stitch frames into partial video 'partial_XXXX.mp4'
4. Concatenate partial videos into final video output
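Steps 1 and 3 are plain ffmpeg calls. The sketch below mirrors the arguments assembled by ffmpeg_extract and ffmpeg_stitch_partial: the select filter pulls one batch of frames by index, and the rendered image sequence is re-encoded with libx264. The paths, padding width, frame range, and frame rate are placeholder values; step 2, the matplotlib rendering, is sketched after the class source above.

import subprocess

temp_dir = "/tmp/dlc_vid_example"  # placeholder working directory
pad_len = 5                        # zero-padding width, len(str(n_frames))
start_frame, end_frame = 0, 511    # one batch of 512 frames

# Step 1: extract the original frames of this batch as orig_XXXXX.png
subprocess.run(
    [
        "ffmpeg", "-n",
        "-i", "input_video.mp4",
        "-vf", f"select=between(n\\,{start_frame}\\,{end_frame})",
        "-vsync", "vfr",
        "-start_number", str(start_frame),
        f"{temp_dir}/orig_%0{pad_len}d.png",
    ],
    check=True,
)

# Step 3: stitch the rendered plot_XXXXX.png frames into a partial video
subprocess.run(
    [
        "ffmpeg", "-y",
        "-r", "30",  # source video frame rate
        "-start_number", str(start_frame),
        "-i", f"{temp_dir}/plot_%0{pad_len}d.png",
        "-c:v", "libx264",
        "-pix_fmt", "yuv420p",
        f"{temp_dir}/partial_{start_frame:0{pad_len}d}.mp4",
    ],
    check=True,
)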
Source code in src/spyglass/position/v1/dlc_utils_makevid.py
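The last pipeline step, handled by concat_partial_videos, relies on ffmpeg's concat demuxer, which reads a text file containing one "file '<path>'" line per partial video. A minimal sketch with placeholder paths:

import subprocess
from pathlib import Path

temp_dir = Path("/tmp/dlc_vid_example")  # placeholder working directory
concat_list = temp_dir / "concat_list.txt"

# One "file '<path>'" line per partial video, in playback order
with open(concat_list, "w") as f:
    for partial in sorted(temp_dir.glob("partial_*.mp4")):
        f.write(f"file '{partial}'\n")

subprocess.run(
    [
        "ffmpeg", "-y",
        "-f", "concat",
        "-safe", "0",  # allow absolute paths in the list file
        "-i", str(concat_list),
        "-c:v", "libx264",
        "-pix_fmt", "yuv420p",
        "output.mp4",
    ],
    check=True,
)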
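Finally, for orientation, a hypothetical instantiation. Constructing VideoMaker runs the entire pipeline immediately (there is no separate run method), and every value below (paths, arrays, scaling, body-part names) is a placeholder rather than output from a real session.

import numpy as np

from spyglass.position.v1.dlc_utils_makevid import VideoMaker

n_samples = 1000                                   # placeholder sample count
position_time = np.linspace(0, 33.3, n_samples)    # seconds
position_mean = np.random.rand(n_samples, 2) * 50  # centroid x/y in cm
orientation_mean = np.random.rand(n_samples) * 2 * np.pi
centroids = {"greenLED": np.random.rand(n_samples, 2) * 50}

VideoMaker(
    video_filename="session_video.mp4",  # must exist on disk
    position_mean=position_mean,
    orientation_mean=orientation_mean,
    centroids=centroids,
    position_time=position_time,
    video_frame_inds=np.arange(n_samples),
    likelihoods=None,
    output_video_filename="session_video_labeled.mp4",
    cm_to_pixels=0.2,
    key_hash="example",  # names the resumable temp directory
)

Because intermediate files are written to a temp directory keyed by key_hash, re-running with the same hash resumes from whatever partial videos and frames already exist.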