"""Parse and analyze logging files (:mod:`fluidimage.topologies.log`)
=====================================================================
.. autoclass:: LogTopology
   :members:
   :private-members:
"""
import time
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
from fluiddyn.util import is_run_from_ipython
# Inside IPython, turn on matplotlib's interactive mode at import time.
if is_run_from_ipython():
    plt.ion()
def float_no_valueerror(word):
    """Convert ``word`` to a float, giving NaN when it is not a valid number.

    Only ``ValueError`` is handled; any other exception raised by ``float``
    propagates to the caller.
    """
    try:
        value = float(word)
    except ValueError:
        value = np.nan
    return value
class DataLogFile:
    """Parse one log file and collect the works it reports.

    The file is read once, line by line, when the object is created. Lines
    starting with ``"ERROR: "`` are ignored.

    Parameters
    ----------
    path_file : str or path-like
        Path of the log file to parse (opened as UTF-8 text).

    Attributes
    ----------
    works : list of dict
        One entry per "Launch work" line, with the keys ``"name"``,
        ``"key"``, ``"mem_start"`` and ``"time"``.
    works_ended : list of dict
        One entry per "work ... done in ..." line whose duration is a valid
        float, with the keys ``"name"``, ``"key"`` and ``"duration"``.
    nb_cpus_allowed, nb_max_workers : int or None
        First values found in the file, ``None`` if the file has none.
    topology_name, executor_name : str or None
        First values found in the file, ``None`` if the file has none.
    date_start, mem_start
        Only set when a "starting execution" line is found.
    date_end, mem_end
        Only set when an "end of `compute`" line is found.
    duration : float
        Time between the "starting execution" and "end of `compute`" lines.
        Only set when both lines are found, in this order.
    """

    def __init__(self, path_file):
        self.works = []
        self.works_ended = []
        self.nb_cpus_allowed = None
        self.nb_max_workers = None
        self.executor_name = None
        self.topology_name = None
        self._title = str(path_file)
        # Timestamp of the "starting execution" line, needed to compute
        # `duration` once the "end of `compute`" line is reached.
        self._time_start = None

        print(f"Parsing log file: {path_file}")
        with open(path_file, "r", encoding="utf-8") as logfile:
            for iline, line in enumerate(logfile):
                if iline % 100 == 0:
                    print(f"\rparse line {iline}", end="", flush=True)

                if line.startswith("ERROR: "):
                    continue

                self._parse_header_line(line)

                if ". mem usage: " in line:
                    # to remove the characters coding for color
                    line = line[5:]
                    self._parse_mem_usage_line(line)

                # Deliberately evaluated on the (possibly shortened) line.
                if line.startswith("work ") and " done in " in line:
                    self._parse_work_done_line(line)

        print("\rdone" + 20 * " ")

    def _parse_header_line(self, line):
        """Store the run parameters given by ``line`` (first occurrence only)."""
        if self.nb_cpus_allowed is None and line.startswith(
            " nb_cpus_allowed = "
        ):
            self.nb_cpus_allowed = int(line.split()[2])
            self._title += f", nb_cpus_allowed = {self.nb_cpus_allowed}"

        if self.nb_max_workers is None and line.startswith(
            " nb_max_workers = "
        ):
            self.nb_max_workers = int(line.split()[2])
            self._title += f", nb_max_workers = {self.nb_max_workers}"

        if self.topology_name is None:
            begin = " topology: "
            if line.startswith(begin):
                self.topology_name = line.split(begin)[1].strip()

        if self.executor_name is None:
            begin = " executor: "
            if line.startswith(begin):
                self.executor_name = line.split(begin)[1].strip()

    def _parse_mem_usage_line(self, line):
        """Handle a "mem usage" line whose color prefix was already removed."""
        words = line.split()

        try:
            mem = float(words[-2])
        except ValueError:
            # Unreadable memory value: record NaN rather than failing on an
            # unbound variable or silently reusing the previous line's value.
            mem = np.nan

        if ". Launch work " in line:
            self.works.append(
                {
                    "name": words[4],
                    # drop the leading "(" and the two trailing characters
                    "key": words[5][1:-2],
                    "mem_start": mem,
                    "time": float(words[0]),
                }
            )
            return

        # Other "mem usage" lines start with a date followed by one character
        # which is dropped. The last 3 characters of the date are the
        # fraction of second, the rest follows "%Y-%m-%d_%H-%M-%S".
        date = words[0][:-1]
        t = time.mktime(
            time.strptime(date[:-3], "%Y-%m-%d_%H-%M-%S")
        ) + float(date[-3:])

        if ": starting execution. mem usage" in line:
            self.date_start = date
            self.mem_start = mem
            self._time_start = t
        elif ": end of `compute`. mem usage" in line:
            self.date_end = date
            self.mem_end = mem
            # Without a start line the duration is unknown: leave the
            # attribute unset instead of raising.
            if self._time_start is not None:
                self.duration = t - self._time_start

    def _parse_work_done_line(self, line):
        """Handle a "work <name> (<key>) done in <duration> ..." line."""
        words = line.split()
        name = words[1]
        key = words[2][1:-1]
        try:
            duration = float(words[-2])
        except ValueError:
            # Unreadable duration: this ended work is not recorded.
            return
        self.works_ended.append(
            {"name": name, "key": key, "duration": duration}
        )
class LogTopology:
    """Parse and analyze logging files.

    Parameters
    ----------
    path : str or path-like
        Where to find the log. ``path`` can point towards:

        - the result directory: the last (in sorted order) entry matching
          ``log_*`` is then used in place of ``path``,
        - a log file: it is the main log file and its parent is the log
          directory,
        - a log directory: the last (in sorted order) ``log_*.txt`` file
          it contains is the main log file.

    Raises
    ------
    ValueError
        If no log file can be found.

    Notes
    -----
    The main log file and all the ``process_*.txt`` files of the log
    directory are parsed with ``DataLogFile`` and their works are merged.

    Attributes
    ----------
    names_works : list of str
        Names of the launched works, in order of first appearance.
    times, keys, durations : dict
        For each work name, the lists of launch times, keys and durations.
        The duration is NaN when no matching ended work is found.
    """

    def __init__(self, path):
        path = Path(path)

        if path.is_dir() and not path.name.startswith("log_"):
            paths = sorted(path.glob("log_*"))
            if not paths:
                raise ValueError("No log files found in the current directory.")
            # last saved file
            path = paths[-1]

        if path.is_file():
            path_log_file = path
            path_log_dir = path.parent
        else:
            path_log_dir = path
            paths = sorted(path_log_dir.glob("log_*.txt"))
            if not paths:
                raise ValueError(f"No log files found in {path_log_dir}.")
            path_log_file = paths[-1]

        self.path_log_file = path_log_file
        self.path_log_dir = path_log_dir
        self._title = str(path_log_file.name)

        data_main_file = DataLogFile(path_log_file)
        self.works = data_main_file.works
        self.works_ended = data_main_file.works_ended
        self.nb_cpus_allowed = data_main_file.nb_cpus_allowed
        self.nb_max_workers = data_main_file.nb_max_workers
        self.executor_name = data_main_file.executor_name
        self.topology_name = data_main_file.topology_name
        self.mem_start = data_main_file.mem_start

        # `duration` only exists when the end of the computation was logged;
        # `mem_end` is copied only in that case.
        try:
            self.duration = data_main_file.duration
        except AttributeError:
            pass
        else:
            self.mem_end = data_main_file.mem_end

        self.paths_log_files = sorted(path_log_dir.glob("process_*.txt"))
        for path_log_process in self.paths_log_files:
            data = DataLogFile(path_log_process)
            self.works.extend(data.works)
            self.works_ended.extend(data.works_ended)

        # unique names, in order of first appearance
        self.names_works = list(
            dict.fromkeys(work["name"] for work in self.works)
        )

        self.times = {name: [] for name in self.names_works}
        self.keys = {name: [] for name in self.names_works}
        for work in self.works:
            self.times[work["name"]].append(work["time"])
            self.keys[work["name"]].append(work["key"])

        # For each name, duration of the ended works indexed by key. When a
        # key appears several times, the last ended work wins.
        durations_by_key = {name: {} for name in self.names_works}
        for work in self.works_ended:
            by_key = durations_by_key.get(work["name"])
            if by_key is not None:
                by_key[work["key"]] = work["duration"]

        self.durations = {
            name: [
                durations_by_key[name].get(key, np.nan)
                for key in self.keys[name]
            ]
            for name in self.names_works
        }

    def plot_memory(self):
        """Plot the memory usage versus time."""
        plt.figure()
        ax = plt.gca()
        ax.set_xlabel("time (s)")
        ax.set_ylabel("memory (Mo)")
        ax.set_title(self._title, fontdict={"fontsize": 12})

        memories = np.array(
            [work["mem_start"] for work in self.works], dtype=float
        )
        times = np.array([work["time"] for work in self.works], dtype=float)

        ax.plot(times, memories, "o-")
        ax.plot(0, self.mem_start, "x")

        # `duration` (and `mem_end`) only exist for a finished computation
        if hasattr(self, "duration"):
            ax.plot(self.duration, self.mem_end, "x")

        plt.show()

    def plot_durations(self):
        """Plot the duration of the works."""
        plt.figure()
        ax = plt.gca()
        ax.set_xlabel("time (s)")
        ax.set_ylabel("duration (s)")
        ax.set_title(self._title, fontdict={"fontsize": 12})

        lines = []
        for i, name in enumerate(self.names_works):
            times = np.array(self.times[name])
            durations = self.durations[name]

            (line,) = ax.plot(times, durations, f"C{i}o")
            lines.append(line)

            # horizontal segment covering the execution of each work
            for t, d in zip(times, durations):
                ax.plot([t, t + d], [d, d], f"C{i}")

            # dotted line: mean duration of the works that ended
            d_mean = np.nanmean(durations)
            ax.plot(
                [times.min(), times.max()],
                [d_mean, d_mean],
                f"C{i}:",
                linewidth=2,
            )

        ax.legend(lines, self.names_works, loc="center left", fontsize="x-small")
        plt.show()

    def _compute_nb_workers(self, names):
        """Compute the step curves (times, number of workers) for ``names``.

        Returns two dicts indexed by work name. Each curve is made of one
        horizontal segment between two consecutive start/stop events.
        """
        times = {}
        nb_workers = {}
        for name in names:
            times_start = np.array(self.times[name], dtype=float)
            # NaN for the works without known duration
            times_stop = times_start + np.array(
                self.durations[name], dtype=float
            )

            # +1 when a work starts, -1 when it stops
            deltas = np.concatenate(
                (
                    np.ones(len(times_start), dtype=int),
                    -np.ones(len(times_stop), dtype=int),
                )
            )
            times_unsorted = np.concatenate((times_start, times_stop))
            argsort = np.argsort(times_unsorted)
            ts = times_unsorted[argsort]
            nbws = np.cumsum(deltas[argsort])

            # Duplicate the points to draw steps:
            # (t0, n0), (t1, n0), (t1, n1), (t2, n1), ...
            times[name] = np.repeat(ts, 2)[1:-1]
            nb_workers[name] = np.repeat(nbws[:-1], 2)
        return times, nb_workers

    def plot_nb_workers(self, str_names=None):
        """Plot the number of workers versus time.

        Parameters
        ----------
        str_names : str, optional
            If given, only the works whose name contains ``str_names`` are
            plotted.
        """
        if str_names is not None:
            names = [name for name in self.names_works if str_names in name]
        else:
            names = self.names_works

        times, nb_workers = self._compute_nb_workers(names)

        plt.figure()
        ax = plt.gca()
        ax.set_xlabel("time (s)")
        ax.set_ylabel("number of workers")
        ax.set_title(self._title, fontdict={"fontsize": 12})

        lines = []
        # Only the selected names have been computed. The color index is the
        # position in `names_works` so that a work keeps the same color with
        # and without filtering.
        for name in names:
            i = self.names_works.index(name)
            (line,) = ax.plot(times[name], nb_workers[name], f"C{i}-")
            lines.append(line)

        ax.legend(lines, names, loc="center left", fontsize="x-small")
        plt.show()