forked from OSSInnovation/mindspore
!2770 Capture the time before hand over to the processes pool to avoid time flips
Merge pull request !2770 from LiHongzhang/capture_time
This commit is contained in:
commit
bc42685436
|
@ -91,7 +91,7 @@ def package_graph_event(data):
|
|||
return graph_event
|
||||
|
||||
|
||||
def package_summary_event(data_list, step):
|
||||
def package_summary_event(data_list, step, wall_time):
|
||||
"""
|
||||
Package the summary to event protobuffer.
|
||||
|
||||
|
@ -105,7 +105,7 @@ def package_summary_event(data_list, step):
|
|||
# create the event of summary
|
||||
summary_event = Event()
|
||||
summary = summary_event.summary
|
||||
summary_event.wall_time = time.time()
|
||||
summary_event.wall_time = wall_time
|
||||
summary_event.step = int(step)
|
||||
|
||||
for value in data_list:
|
||||
|
|
|
@ -14,15 +14,16 @@
|
|||
# ============================================================================
|
||||
"""Write events to disk in a base directory."""
|
||||
import os
|
||||
import time
|
||||
from collections import deque
|
||||
from multiprocessing import Pool, Process, Queue, cpu_count
|
||||
|
||||
from ._lineage_adapter import serialize_to_lineage_event
|
||||
from ._summary_adapter import package_graph_event, package_summary_event
|
||||
from ._summary_writer import SummaryWriter, LineageWriter
|
||||
from ._summary_writer import LineageWriter, SummaryWriter
|
||||
|
||||
|
||||
def _pack_data(datadict):
|
||||
def _pack_data(datadict, wall_time):
|
||||
"""Pack data according to which plugin."""
|
||||
result = []
|
||||
summaries, step, mode = [], None, None
|
||||
|
@ -37,7 +38,7 @@ def _pack_data(datadict):
|
|||
step = data.get('step')
|
||||
mode = data.get('mode')
|
||||
if summaries:
|
||||
result.append(['summary', mode, package_summary_event(summaries, step).SerializeToString()])
|
||||
result.append(['summary', mode, package_summary_event(summaries, step, wall_time).SerializeToString()])
|
||||
return result
|
||||
|
||||
|
||||
|
@ -70,7 +71,7 @@ class WriterPool(Process):
|
|||
if not self._queue.empty():
|
||||
action, data = self._queue.get()
|
||||
if action == 'WRITE':
|
||||
deq.append(pool.apply_async(_pack_data, (data,)))
|
||||
deq.append(pool.apply_async(_pack_data, (data, time.time())))
|
||||
elif action == 'FLUSH':
|
||||
for writer in writers:
|
||||
writer.flush()
|
||||
|
|
Loading…
Reference in New Issue