Source code for xrfeitoria.renderer.renderer_blender

import json
import os
import re
import shutil
import subprocess
import threading
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Tuple, Union

from loguru import logger

from .. import _tls
from ..actor.actor_blender import ActorBlender
from ..camera.camera_blender import CameraBlender
from ..data_structure.constants import (
    ImageFileFormatEnum,
    PathLike,
    RenderEngineEnumBlender,
    RenderOutputEnumBlender,
    actor_info_type,
    tmp_dir,
)
from ..rpc import remote_blender
from ..utils.functions import blender_functions
from .renderer_base import RendererBase, render_status

DISABLE_SPINNER = os.environ.get('XRFEITORIA__DISABLE_SPINNER', '').lower() in ('true', '1', 'yes')
LOG_INTERVAL = int(os.environ.get('XRFEITORIA__LOG_INTERVAL', '10'))

try:
    # linting and for engine
    from XRFeitoriaBpy.core.factory import XRFeitoriaBlenderFactory  # defined in src/XRFeitoriaBpy/core/factory.py
except ModuleNotFoundError:
    pass


try:
    from ..data_structure.models import RenderJobBlender, RenderPass  # isort:skip
except (ImportError, ModuleNotFoundError):
    pass


def receive_stdout(
    process: subprocess.Popen,
    in_background: bool,
    frame_range: Tuple[int, int],
    job_idx: Optional[int] = None,
    refresh_per_second: int = 1,
    disable_spinner: bool = DISABLE_SPINNER,
    log_interval: int = LOG_INTERVAL,
) -> None:
    """Receive output from the subprocess, and update the spinner.

    Args:
        process (subprocess.Popen): Subprocess of the blender process.
        frame_range (Tuple[int, int]): Frame range of the rendering job.
        job_range (Tuple[int, int]): Job range of all jobs in render queue.
        disable_spinner (bool, optional): If True, the spinner will be disabled. Defaults to environment variable XRFEITORIA__DISABLE_SPINNER (False).
        log_interval (int, optional): Log progress every N frames. Defaults to environment variable XRFEITORIA__LOG_INTERVAL (10).
    """
    from rich import get_console
    from rich.live import Live
    from rich.spinner import Spinner

    frame_count = 0
    frame_current = frame_range[0] - 1
    frame_length = frame_range[1] - frame_range[0] + 1
    job_info = f' {job_idx}' if job_idx else ''

    console = get_console()
    spinner = None
    live = None

    if not disable_spinner:
        # TODO: change to progress bar
        try:
            spinner: Spinner = console._live.renderable
            # Update the refresh rate of existing live display
            console._live.refresh_per_second = refresh_per_second
        except AttributeError:
            # Create new spinner with custom refresh rate
            spinner = Spinner('dots', '[bold green]:rocket: Rendering...[/bold green]')
            live = Live(spinner, refresh_per_second=refresh_per_second)
            live.start()
            spinner = live.renderable
    else:
        logger.warning(f'Spinner disabled, instead logging every {log_interval} frames')

    # init
    __frame_current__ = frame_current
    first_trigger = second_trigger = False
    # start receiving
    while True:
        try:
            data = process.stdout.readline().decode()
        except AttributeError:
            break

        if not data:
            break

        if in_background:
            # Fra:{idx}
            matched_frame_info = re.match(r'.*Fra:(\d+).*', data)
            if matched_frame_info:
                first_trigger = True
                __frame_current__ = int(matched_frame_info.group(1))
            if frame_current != __frame_current__:
                second_trigger = True
            if first_trigger and second_trigger:
                frame_current = __frame_current__
                frame_count += 1
                # XXX: can't leave process early,
                # must have on process reading stdout,
                # otherwise the process will hang
                if frame_count > frame_length:
                    break
                text = f'[bold green]:rocket: Rendering Job{job_info}: frame {frame_count}/{frame_length}[/bold green]'
                if spinner:
                    spinner.update(text=text)
                if disable_spinner and (frame_count % log_interval == 0 or frame_count == frame_length):
                    logger.info(text)
                else:
                    logger.debug(text)
                # reset
                first_trigger = second_trigger = False
        else:
            # Saved: ...
            matched_save_info = re.match(re.compile(r'.*Saved: .*', flags=re.DOTALL), data)
            #   Time: ...
            matched_time_info = re.match(re.compile(r'.*Time: .*', flags=re.DOTALL), data)
            matched_all_info = re.match(re.compile(r'.*Saved: .*Time: .*', flags=re.DOTALL), data)
            if matched_save_info:
                first_trigger = True
            if matched_time_info:
                second_trigger = True
            if matched_all_info:
                first_trigger = second_trigger = True

            if first_trigger and second_trigger:
                frame_count += 1
                if frame_count > frame_length:
                    break
                text = f'[bold green]:rocket: Rendering Job{job_info}: frame {frame_count}/{frame_length}[/bold green]'
                if spinner:
                    spinner.update(text=text)
                # Always log debug
                logger.debug(f'(XF-Rendering) Job{job_info}: frame {frame_count}/{frame_length}')
                # When spinner is disabled, also log info at intervals
                if disable_spinner and (frame_count % log_interval == 0 or frame_count == frame_length):
                    logger.info(f'(XF-Rendering) Job{job_info}: frame {frame_count}/{frame_length}')
                # reset
                first_trigger = second_trigger = False


[docs] @remote_blender(dec_class=True, suffix='_in_engine') class RendererBlender(RendererBase): """Renderer class for Blender.""" render_queue: 'List[RenderJobBlender]' = []
[docs] @classmethod def add_job( cls, sequence_name: str, output_path: PathLike, resolution: Tuple[int, int], render_passes: 'List[RenderPass]', render_engine: Union[RenderEngineEnumBlender, Literal['cycles', 'eevee', 'workbench']] = 'cycles', render_samples: int = 128, transparent_background: bool = False, arrange_file_structure: bool = True, ) -> None: """Add a rendering job with specific settings. Args: output_path (PathLike): Output path of the rendered images. resolution (Tuple[int, int]): Resolution of the rendered images. render_passes (List[RenderPass]): Render passes. scene_name (str, optional): Name of the scene be rendered. Defaults to 'XRFeitoria'. render_engine (Union[RenderEngineEnumBlender, Literal['cycles', 'eevee', 'workbench']], optional): Render engine. Defaults to cycles. render_samples (int, optional): Render samples. Defaults to 128. transparent_background (bool, optional): Transparent background. Defaults to False. arrange_file_structure (bool, optional): Arrange output images to every camera's folder. Defaults to True. """ if not isinstance(render_passes, list) or len(render_passes) == 0: raise ValueError('render_passes must be a list of RenderPass') if not isinstance(render_engine, RenderEngineEnumBlender): render_engine = RenderEngineEnumBlender.get(render_engine.lower()) job = RenderJobBlender( sequence_name=sequence_name, output_path=Path(output_path).resolve() / sequence_name, resolution=resolution, render_passes=render_passes, render_engine=render_engine, render_samples=render_samples, transparent_background=transparent_background, arrange_file_structure=arrange_file_structure, ) cls.render_queue.append(job)
[docs] @classmethod @render_status def render_jobs( cls, use_gpu: bool = True, disable_spinner: bool = DISABLE_SPINNER, log_interval: int = LOG_INTERVAL, ) -> None: """Render all jobs in the render queue, and this method will clear the render queue after rendering. Args: use_gpu (bool, optional): Use GPU to render. Defaults to True. disable_spinner (bool, optional): If True, the spinner will be disabled. Defaults to environment variable XRFEITORIA__DISABLE_SPINNER or False. The Spinner is a progress bar to show the rendering progress in the console. If you are using a container, you may want to set this to True. You can also set this via environment variable XRFEITORIA__DISABLE_SPINNER=true. log_interval (int, optional): Log progress every N frames. Defaults to environment variable XRFEITORIA__LOG_INTERVAL or 10. You can also set this via environment variable XRFEITORIA__LOG_INTERVAL=20. """ if len(cls.render_queue) == 0: logger.warning( '[bold red]Skip rendering[/bold red], no render job in the queue. \n' ':bulb: Please add a job first via: \n' '>>> with xf_runner.sequence(sequence_name=...) as seq:\n' '... seq.add_to_renderer(...)' ) return process: subprocess.Popen = _tls.cache.get('engine_process') in_background = blender_functions.is_background_mode() if use_gpu: blender_functions.enable_gpu() # render for job_idx, job in enumerate(cls.render_queue): if len(job.render_passes) == 0: raise RuntimeError('No render pass in the render job') logger.info( f'Job rendering {job_idx + 1}/{len(cls.render_queue)}: ' f'seq_name="{job.sequence_name}", saving to "{job.output_path.as_posix()}"' ) job.output_path.mkdir(exist_ok=True, parents=True) # set tmp path tmp_render_path = (tmp_dir / 'blender_render_tmp').as_posix() + '/' # set renderer active_cameras = cls._set_renderer_in_engine( job=job.model_dump(mode='json'), tmp_render_path=tmp_render_path, ) # export actor infos actor_infos: List[actor_info_type] = [] for actor_name in blender_functions.get_all_object_in_current_level(obj_type='MESH'): actor = ActorBlender(actor_name) mask_color = actor.mask_color actor_infos.append({'actor_name': actor_name, 'mask_color': mask_color}) with (job.output_path / RenderOutputEnumBlender.actor_infos.value).open('w') as f: json.dump(actor_infos, f, indent=4) # start a thread to receive stdout output_thread = threading.Thread( target=receive_stdout, kwargs={ 'process': process, 'in_background': in_background, 'frame_range': blender_functions.get_frame_range(), 'job_idx': job_idx + 1, 'disable_spinner': disable_spinner, 'log_interval': log_interval, }, ) output_thread.start() # render cls._render_in_engine() # delete tmp path folder shutil.rmtree(tmp_render_path) # ------ post-processing ------ # # export camera parameters for camera_name in active_cameras: camera = CameraBlender(name=camera_name) camera.dump_params( output_path=job.output_path / RenderOutputEnumBlender.camera_params.value / f'{camera_name}.json' ) # arrange output if job.arrange_file_structure: logger.debug(f'Arranging outputs for {job.output_path}') cls._arrange_output(job.render_passes, job.output_path, active_cameras) # clear render queue cls.clear()
@classmethod def _arrange_output(cls, render_passes: 'List[RenderPass]', output_path: PathLike, camera_names: List[str]) -> None: """Arrange output images to every camera's folder. Args: render_passes (List[RenderPass): Render passes of the render job. output_path (PathLike): Output path of the render job. camera_names (List[str]): all active camera names in the rendered scene. """ import glob output_path = Path(output_path) for render_pass in render_passes: pass_name = RenderOutputEnumBlender.get(render_pass.render_layer).name pass_ext = ImageFileFormatEnum.get(render_pass.image_format).name if len(camera_names) == 0: idx = f'{0:04d}' camera_names = [f.name.replace(idx, '') for f in output_path.glob(f'{idx}*.{pass_ext}')] pass_dir = output_path / pass_name for camera_name in camera_names: camera_pass_dir = pass_dir / camera_name camera_pass_dir.mkdir(exist_ok=True, parents=True) camera_name_escape = glob.escape(camera_name) camera_pass_files = pass_dir.glob(f'*{camera_name_escape}.*') for camera_pass_file in camera_pass_files: new_camera_pass_file = camera_pass_dir / camera_pass_file.name.replace(camera_name, '') if new_camera_pass_file.exists(): new_camera_pass_file.unlink() camera_pass_file.rename(new_camera_pass_file) ##################################### ###### RPC METHODS (Private) ######## ##################################### @staticmethod def _set_renderer_in_engine(job: 'Dict[str, Any]', tmp_render_path: str) -> None: """Set renderer in Blender. Args: job (Dict[str, Any]): Rendering job. Raises: RuntimeError: If no active camera in the scene. """ scene = XRFeitoriaBlenderFactory.open_sequence(seq_name=job['sequence_name']) # clear compositing nodes if scene.node_tree: scene.node_tree.nodes.clear() # get active cameras in the scene active_cameras = XRFeitoriaBlenderFactory.get_active_cameras(scene=scene) if len(active_cameras) == 0: raise RuntimeError(f'Cannot render, no active camera in "{scene.name}"') # set renderer XRFeitoriaBlenderFactory.set_render_engine(engine=job['render_engine'], scene=scene) XRFeitoriaBlenderFactory.add_render_passes( output_path=job['output_path'], render_passes=job['render_passes'], scene=scene, ) XRFeitoriaBlenderFactory.set_background_transparent(transparent=job['transparent_background'], scene=scene) XRFeitoriaBlenderFactory.set_render_samples(render_samples=job['render_samples'], scene=scene) XRFeitoriaBlenderFactory.set_resolution(resolution=job['resolution'], scene=scene) # set tmp path scene.render.filepath = tmp_render_path return active_cameras @staticmethod def _render_in_engine() -> 'List[str]': """Render a rendering job. Args: render_job (Dict[str, Any]): Rendering job. """ # render XRFeitoriaBlenderFactory.render() # close sequence XRFeitoriaBlenderFactory.close_sequence() @staticmethod def _clear_queue_in_engine(): # nothing to do pass