mirror of https://github.com/H-uru/korman.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
294 lines
11 KiB
294 lines
11 KiB
# This file is part of Korman.
#
# Korman is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Korman is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Korman. If not, see <http://www.gnu.org/licenses/>.
from ..korlib import ConsoleToggler |
from pathlib import Path |
import threading |
import time |
class _ExportLogger:
    """Base export logger.

    Used as a context manager, it opens ``<age stem>_export.log`` next to the
    age file and closes it again on exit. ``msg``, ``port`` and ``warn`` write
    formatted lines to that file and, when ``print_logs`` is true, echo them
    to stdout. The ``progress_*`` methods are mostly no-op hooks that
    subclasses may override.
    """

    def __init__(self, print_logs, age_path=None):
        """Create a logger.

        :param print_logs: echo every logged line to stdout when true.
        :param age_path: path of the age file being exported, or ``None``.
            Required if the logger is going to be used as a context manager.
        """
        self._porting = []
        self._warnings = []
        self._age_path = Path(age_path) if age_path is not None else None
        self._file = None
        self._print_logs = print_logs
        self._time_start_overall = 0

    def __enter__(self):
        """Open the log file for writing and return the logger itself."""
        assert self._age_path is not None
        self._file = open(str(self._log_path()), "w")
        return self

    def __exit__(self, type, value, traceback):
        """Close the log file; never suppresses the in-flight exception."""
        try:
            if value is not None:
                ConsoleToggler().keep_console = True
        finally:
            # Drop the handle once it is closed so that late msg/warn/port calls
            # skip the file instead of raising "I/O operation on closed file".
            if self._file is not None:
                self._file.close()
                self._file = None
        return False

    def _log_path(self):
        """Return the log file path derived from the age file path.

        Deriving the name from the age path ensures we are not trying to write
        the log to the directory Blender.exe is in, which might be a permission
        error. The full file name is built in one step: chaining
        ``with_suffix(".log")`` onto the new name would cut off everything
        after the last dot of a dotted stem (``City.v2`` -> ``City.log``).
        """
        return self._age_path.with_name("{}_export.log".format(self._age_path.stem))

    def _compose(self, prefix, args, kwargs):
        """Build one log line from a template, its arguments and a prefix.

        ``args[0]`` is the template. It is only run through ``str.format``
        when further positional arguments are supplied; in that case the
        keyword arguments (``indent`` included) are forwarded as well.
        """
        # NOTE(review): the indent unit is a single space as it appears in this
        # listing -- confirm against upstream before relying on the width.
        indent = kwargs.get("indent", 0)
        line = "{}{}{}".format(" " * indent, prefix, args[0])
        if len(args) > 1:
            line = line.format(*args[1:], **kwargs)
        return line

    def _emit(self, line):
        """Write *line* to the open log file (if any) and echo it if enabled."""
        if self._file is not None:
            self._file.writelines((line, "\n"))
        if self._print_logs:
            print(line)

    def msg(self, *args, **kwargs):
        """Log a plain message.

        Nothing is formatted, written or printed while no log file is open.
        """
        assert args
        if self._file is None:
            return
        self._emit(self._compose("", args, kwargs))

    def port(self, *args, **kwargs):
        """Log a ``PORTING:`` note and remember its (unformatted) template."""
        assert args
        self._emit(self._compose("PORTING: ", args, kwargs))
        self._porting.append(args[0])

    def progress_add_step(self, name):
        """Hook: register a progress step. No-op in the base class."""
        pass

    def progress_advance(self):
        """Hook: move on to the next progress step. No-op in the base class."""
        pass

    def progress_complete_step(self):
        """Hook: mark the current step as complete. No-op in the base class."""
        pass

    def progress_end(self):
        """Log the total export time, measured from ``progress_start``."""
        # The base class owns no worker thread, so there is nothing to join
        # here; touching ``self._thread`` raised AttributeError for every
        # logger that does not create one.
        if self._age_path is not None:
            export_time = time.perf_counter() - self._time_start_overall
            self.msg("\nExported '{}' in {:.2f}s", self._age_path.name, export_time)

    def progress_increment(self):
        """Hook: bump the current step's progress. No-op in the base class."""
        pass

    def progress_start(self, action):
        """Log the start of the export and start the overall timer."""
        if self._age_path is not None:
            self.msg("Exporting '{}'", self._age_path.name)
        self._time_start_overall = time.perf_counter()

    def save(self):
        """Placeholder; currently does nothing."""
        # TODO
        pass

    def warn(self, *args, **kwargs):
        """Log a ``WARNING:`` line and remember its (unformatted) template."""
        assert args
        self._emit(self._compose("WARNING: ", args, kwargs))
        self._warnings.append(args[0])
class ExportProgressLogger(_ExportLogger):
    """Export logger that shows step-based progress on the console.

    Whole lines and a single "volatile" status line are handed to a printer
    thread through ``_print_condition``. The thread wakes at least once a
    second so it can animate an ellipsis behind the volatile line while a step
    is taking a long time.

    Relies on the module-level names ``_HEADING_SIZE``, ``_MAX_ELIPSES`` and
    ``_MAX_TIME_UNTIL_ELIPSES``, which are defined outside this class.
    """

    def __init__(self, age_path=None):
        """Create the logger; the printer thread is started by ``progress_start``."""
        super().__init__(False, age_path)

        # Long running operations like the Blender bake_image call make it seem like we've hung
        # because it is difficult to inspect the progress of Blender's internal operators. The best
        # solution here is to move printing into a thread that can detect long-running ops and display
        # something visible such as a moving ellipsis.
        self._thread = threading.Thread(target=self._progress_thread)
        self._queued_lines = []
        self._print_condition = threading.Condition()
        self._volatile_line = ""

        # Progress manager
        self._progress_alive = False
        self._progress_steps = []
        self._step_id = -1
        self._step_max = 0
        self._step_progress = 0
        self._time_start_step = 0

    def __exit__(self, type, value, traceback):
        """Report an aborted export (if any), stop the printer, then close the log."""
        if value is not None:
            export_time = time.perf_counter() - self._time_start_overall
            with self._print_condition:
                # Only report a step if one was actually started. With no
                # current step, indexing the step list either raises (masking
                # the real error) or names the wrong step.
                if self._step_id != -1:
                    self._progress_print_step(done=(self._step_progress == self._step_max), error=True)
                self._progress_print_line("\nABORTED AFTER {:.2f}s".format(export_time))
                self._progress_print_heading("ERROR")
                self._progress_print_line(str(value))
                self._progress_print_heading()
                self._progress_alive = False
            # Must happen outside the lock: the printer needs it to finish up.
            self._progress_shutdown()
        return super().__exit__(type, value, traceback)

    def progress_add_step(self, name):
        """Register a named step; only allowed before the first advance."""
        assert self._step_id == -1
        self._progress_steps.append(name)

    def progress_advance(self):
        """Advances the progress bar to the next step"""
        if self._step_id != -1:
            self._progress_print_step(done=True)
        # There must be a *next* step to advance to. Checking the index before
        # the increment let an advance past the last step slip through and
        # fail later with an IndexError.
        assert self._step_id + 1 < len(self._progress_steps)

        self._step_id += 1
        self._step_max = 0
        self._step_progress = 0
        self._time_start_step = time.perf_counter()
        self._progress_print_step()

    def progress_complete_step(self):
        """Manually completes the current step"""
        assert self._step_id != -1
        self._progress_print_step(done=True)

    def progress_end(self):
        """Finish the last step, print the summary and stop the printer thread."""
        self._progress_print_step(done=True)
        assert self._step_id+1 == len(self._progress_steps)

        export_time = time.perf_counter() - self._time_start_overall
        with self._print_condition:
            if self._age_path is not None:
                self.msg("\nExported '{}' in {:.2f}s", self._age_path.name, export_time)
                self._progress_print_line("\nEXPORTED '{}' IN {:.2f}s".format(self._age_path.name, export_time))
            else:
                self._progress_print_line("\nCOMPLETED IN {:.2f}s".format(export_time))
            self._progress_print_heading()
            self._progress_print_line("")
            self._progress_alive = False

        # Ensure the printer thread has gone away
        self._thread.join(timeout=5.0)
        assert not self._thread.is_alive()

    def progress_increment(self):
        """Increments the progress of the current step"""
        assert self._step_id != -1
        self._step_progress += 1
        if self._step_max != 0:
            self._progress_print_step()

    def _progress_print_line(self, line):
        """Queue a whole line for the printer thread and wake it."""
        with self._print_condition:
            self._queued_lines.append(line)
            self._print_condition.notify()

    def _progress_print_volatile(self, line):
        """Replace the volatile status line and wake the printer thread."""
        with self._print_condition:
            self._volatile_line = line
            self._print_condition.notify()

    def _progress_print_heading(self, text=None):
        """Queue a rule of dashes, with *text* centred in it when given."""
        if text:
            num_chars = len(text)
            border = "-" * int((_HEADING_SIZE - (num_chars + 2)) / 2)
            # An odd-length title gets one extra space on the left.
            pad = " " if num_chars % 2 == 1 else ""
            line = "{border} {pad}{text} {border}".format(border=border, pad=pad, text=text)
            self._progress_print_line(line)
        else:
            self._progress_print_line("-" * _HEADING_SIZE)

    def _progress_print_step(self, done=False, error=False):
        """Publish the status of the current step.

        :param done: print a permanent "DONE IN ..." line and clear the
            volatile line.
        :param error: when the step is not done, print its state as a
            permanent line instead of the volatile one.
        """
        with self._print_condition:
            if done:
                stage = "DONE IN {:.2f}s".format(time.perf_counter() - self._time_start_step)
                print_func = self._progress_print_line
                self._progress_print_volatile("")
            else:
                if self._step_max != 0 and self._step_progress != 0:
                    stage = "{} of {}".format(self._step_progress, self._step_max)
                else:
                    stage = ""
                print_func = self._progress_print_line if error else self._progress_print_volatile

            line = "{}\t(step {}/{}): {}".format(self._progress_steps[self._step_id], self._step_id+1,
                                                 len(self._progress_steps), stage)
            print_func(line)

    def _progress_get_max(self):
        return self._step_max

    def _progress_set_max(self, value):
        assert self._step_id != -1
        self._step_max = value
        self._progress_print_step()

    # Number of increments the current step is expected to take (0 = unknown).
    progress_range = property(_progress_get_max, _progress_set_max)

    def progress_start(self, action):
        """Print the headings for *action* and start the printer thread."""
        super().progress_start(action)
        self._progress_print_heading("Korman")
        self._progress_print_heading(action)
        self._progress_alive = True
        self._thread.start()

    def _progress_flush_lines(self):
        """Print and clear all queued whole lines.

        The caller must hold ``_print_condition``.
        """
        if self._queued_lines:
            print(*self._queued_lines, sep='\n')
            self._queued_lines.clear()

    def _progress_shutdown(self):
        """Wait for the printer thread (if running) and print any leftovers.

        Expects ``_progress_alive`` to have been cleared already. Lines still
        queued once the thread is gone -- or when it was never started -- are
        printed from the calling thread so they are not lost.
        """
        if self._thread.is_alive():
            self._thread.join(timeout=5.0)
        with self._print_condition:
            self._progress_flush_lines()

    def _progress_thread(self):
        """Printer thread body: drain queued lines and redraw the volatile line."""
        num_dots = 0

        while self._progress_alive:
            with self._print_condition:
                signalled = self._print_condition.wait(timeout=1.0)
                print(end='\r')

                # First, we need to print out any queued whole lines.
                self._progress_flush_lines()

                # Now, we need to print out the current volatile line, if any.
                if self._volatile_line:
                    print(self._volatile_line, end="")

                    # If the proc is long running, let us display some ellipses so as to not alarm the user
                    if self._time_start_step != 0:
                        if (time.perf_counter() - self._time_start_step) > _MAX_TIME_UNTIL_ELIPSES:
                            num_dots = 0 if signalled or num_dots == _MAX_ELIPSES else num_dots + 1
                        else:
                            num_dots = 0
                    print('.' * num_dots, end=" " * (_MAX_ELIPSES - num_dots))

        # Lines queued between our last wake-up and ``_progress_alive`` going
        # False would otherwise never be printed.
        with self._print_condition:
            if self._queued_lines:
                print(end='\r')
                self._progress_flush_lines()

    def _progress_get_current(self):
        return self._step_progress

    def _progress_set_current(self, value):
        assert self._step_id != -1
        self._step_progress = value
        if self._step_max != 0:
            self._progress_print_step()

    # Progress made so far within the current step.
    progress_value = property(_progress_get_current, _progress_set_current)
class ExportVerboseLogger(_ExportLogger):
    """Export logger that echoes every logged line to stdout.

    It has no progress display; ``progress_range`` and ``progress_value`` are
    plain attributes so callers can assign to them freely.
    """

    def __init__(self, age_path):
        """Create a logger for *age_path* with console echo switched on."""
        super().__init__(print_logs=True, age_path=age_path)
        self.progress_range = 0
        self.progress_value = 0

    def __exit__(self, type, value, traceback):
        """Log how and when the export failed (if it did), then close the log."""
        aborted = value is not None
        if aborted:
            elapsed = time.perf_counter() - self._time_start_overall
            self.msg("\nAborted after {:.2f}s", elapsed)
            self.msg("Error: {}", value)
        return super().__exit__(type, value, traceback)