Add LogFile and AvgChunkFile readers

Implements changes proposed in #144
This commit is contained in:
Richard Berger 2021-02-25 15:32:53 -05:00
parent f245467f32
commit 977ba9ff66
No known key found for this signature in database
GPG Key ID: A9E83994E0BA0CAB
3 changed files with 263 additions and 0 deletions

167
python/lammps/formats.py Normal file
View File

@ -0,0 +1,167 @@
# ----------------------------------------------------------------------
# LAMMPS - Large-scale Atomic/Molecular Massively Parallel Simulator
# http://lammps.sandia.gov, Sandia National Laboratories
# Steve Plimpton, sjplimp@sandia.gov
#
# Copyright (2003) Sandia Corporation. Under the terms of Contract
# DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains
# certain rights in this software. This software is distributed under
# the GNU General Public License.
#
# See the README file in the top-level LAMMPS directory.
# -------------------------------------------------------------------------
################################################################################
# LAMMPS data formats
# Written by Richard Berger <richard.berger@temple.edu>
################################################################################
import re
class LogFile:
    """Extract thermodynamic output from a LAMMPS log file.

    Two layouts of thermo output are recognised:

    * one-line style: a header line starting with ``Step `` names the
      columns and every following numeric line is one row of values;
    * multi-line style: each step starts with a line beginning with
      ``---------------- Step`` (carrying the step number and CPU time),
      followed by lines of ``Key = value`` pairs.

    A line starting with ``Loop time of`` closes the current run.

    Attributes:
        runs: list with one dict per completed run, mapping each column
            name to the list of float values read for it.
        errors: list of the raw lines that contain ``ERROR`` or
            ``exited on signal``.
    """

    STYLE_DEFAULT = 0
    STYLE_MULTI = 1

    def __init__(self, filename):
        """Read ``filename`` and populate ``self.runs`` and ``self.errors``.

        Raises:
            OSError: if the file cannot be opened (e.g. FileNotFoundError).
        """
        # Any letter other than e/E means the line is not a pure row of
        # numbers (e/E is allowed because of floating-point exponents).
        alpha = re.compile(r'[a-df-zA-DF-Z]')
        # "Key = value" pairs; the value is a full float literal, so signed
        # exponents such as 1.5e+03 are captured whole.
        kvpairs = re.compile(
            r'([a-zA-Z_0-9]+)\s+=\s*'
            r'([-+]?(?:[0-9]+\.?[0-9]*|\.[0-9]+)(?:[eE][-+]?[0-9]+)?)')

        style = LogFile.STYLE_DEFAULT
        keys = []
        current_run = None  # run being collected; None when there is none

        self.runs = []
        self.errors = []

        with open(filename, 'rt') as f:
            in_thermo = False
            in_data_section = False

            for line in f:
                if "ERROR" in line or "exited on signal" in line:
                    self.errors.append(line)

                elif line.startswith('Step '):
                    # A column header only occurs in one-line style, so switch
                    # back in case an earlier run used the multi-line style.
                    style = LogFile.STYLE_DEFAULT
                    in_thermo = True
                    in_data_section = True
                    keys = line.split()
                    current_run = {k: [] for k in keys}

                elif line.startswith('---------------- Step'):
                    if not in_thermo:
                        current_run = {'Step': [], 'CPU': []}
                    in_thermo = True
                    in_data_section = True
                    style = LogFile.STYLE_MULTI
                    str_step, str_cpu = line.strip('-\n').split('-----')
                    step = float(str_step.split()[1])
                    cpu = float(str_cpu.split('=')[1].split()[0])
                    current_run["Step"].append(step)
                    current_run["CPU"].append(cpu)

                elif line.startswith('Loop time of'):
                    in_thermo = False
                    # Only store a run that was actually started, and never
                    # store the same run twice.
                    if current_run is not None:
                        self.runs.append(current_run)
                        current_run = None

                elif in_thermo and in_data_section:
                    if style == LogFile.STYLE_DEFAULT:
                        if alpha.search(line):
                            # e.g. a warning printed between data rows
                            continue
                        for k, v in zip(keys, map(float, line.split())):
                            current_run[k].append(v)

                    elif style == LogFile.STYLE_MULTI:
                        if '=' not in line:
                            # end of this step's key/value lines; wait for
                            # the next step header
                            in_data_section = False
                            continue
                        for k, v in kvpairs.findall(line):
                            current_run.setdefault(k, []).append(float(v))
class AvgChunkFile:
    """Read a chunk-averaged data file that uses the default header lines.

    The file must start with three comment lines: the title line
    (``# Chunk-averaged data for fix ...``), the per-timestep header
    (``# Timestep Number-of-chunks Total-count``) and the per-chunk column
    header. After that, each block is one timestep line followed by one
    line per chunk.

    Attributes:
        fix_name: 6th whitespace-separated word of the title line.
        group_name: 9th whitespace-separated word of the title line.
        timesteps: list of int, one entry per block.
        total_count: list of float, one entry per block (same length as
            ``timesteps``).
        chunks: list of dicts, one per chunk. Each dict holds ``'id'``,
            ``'coord'`` (list of tuples, empty when the file has no
            ``Coord`` columns), ``'ncount'`` (list of float) and one list
            of float per data column, keyed by the column name.
    """

    def __init__(self, filename):
        """Parse ``filename``.

        Raises:
            OSError: if the file cannot be opened (e.g. FileNotFoundError).
            Exception: if a header line is not in the default format or the
                number of chunks changes between timesteps.
        """
        bad_header = "Chunk data reader only supports default avg/chunk headers!"

        with open(filename, 'rt') as f:
            timestep = None
            num_chunks = 0
            chunks_read = 0

            self.timesteps = []
            self.total_count = []
            self.chunks = []

            for lineno, line in enumerate(f):
                if lineno == 0:
                    if not line.startswith("# Chunk-averaged data for fix"):
                        raise Exception(bad_header)
                    parts = line.split()
                    self.fix_name = parts[5]
                    self.group_name = parts[8]
                    continue

                if lineno == 1:
                    if not line.startswith("# Timestep Number-of-chunks Total-count"):
                        raise Exception(bad_header)
                    continue

                if lineno == 2:
                    if not line.startswith("#"):
                        raise Exception(bad_header)
                    columns = line.split()[1:]
                    ndim = line.count("Coord")
                    compress = 'OrigID' in line

                    if ndim > 0:
                        coord_start = columns.index("Coord1")
                        coord_end = columns.index("Coord%d" % ndim)
                    else:
                        coord_start = None
                        coord_end = None

                    # Locate the count column by name instead of assuming a
                    # fixed position: its index depends on whether OrigID
                    # and Coord columns are present.
                    if "Ncount" not in columns:
                        raise Exception(bad_header)
                    ncount_start = columns.index("Ncount")
                    data_start = ncount_start + 1
                    continue

                parts = line.split()
                if not parts:
                    # tolerate blank lines (e.g. at the end of the file)
                    continue

                if timestep is not None and chunks_read < num_chunks:
                    # ---- one chunk line of the current block ----
                    chunk = int(parts[0])
                    ncount = float(parts[ncount_start])
                    chunk_id = int(parts[1]) if compress else chunk

                    current = self.chunks[chunk_id - 1]
                    current['id'] = chunk_id
                    current['ncount'].append(ncount)

                    if ndim > 0:
                        coord = tuple(map(float, parts[coord_start:coord_end + 1]))
                        current['coord'].append(coord)

                    for i, data_column in enumerate(columns[data_start:], data_start):
                        current.setdefault(data_column, []).append(float(parts[i]))

                    chunks_read += 1
                    # chunk lines are expected to be numbered 1..num_chunks
                    assert (chunk == chunks_read)
                else:
                    # ---- timestep line that opens a new block ----
                    block_chunks = int(parts[1])
                    if timestep is None:
                        # first block: allocate the per-chunk storage
                        num_chunks = block_chunks
                        self.chunks.extend(
                            {'coord': [], 'ncount': []} for _ in range(num_chunks))
                    elif block_chunks != num_chunks:
                        # do not support changing number of chunks
                        raise Exception("Currently, changing numbers of chunks are not supported.")

                    timestep = int(parts[0])
                    total_count = float(parts[2])
                    chunks_read = 0

                    # record both values for every block, including the
                    # first, so the two lists stay the same length
                    self.timesteps.append(timestep)
                    self.total_count.append(total_count)

View File

@ -79,6 +79,11 @@ if(Python_EXECUTABLE)
COMMAND ${PYTHON_TEST_RUNNER} ${CMAKE_CURRENT_SOURCE_DIR}/python-pylammps.py -v COMMAND ${PYTHON_TEST_RUNNER} ${CMAKE_CURRENT_SOURCE_DIR}/python-pylammps.py -v
WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH}) WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH})
set_tests_properties(PythonPyLammps PROPERTIES ENVIRONMENT "${PYTHON_TEST_ENVIRONMENT}") set_tests_properties(PythonPyLammps PROPERTIES ENVIRONMENT "${PYTHON_TEST_ENVIRONMENT}")
# Register python-formats.py as the "PythonFormats" test. It is launched
# through PYTHON_TEST_RUNNER from the executable output directory and gets
# the same PYTHON_TEST_ENVIRONMENT as the PythonPyLammps test.
add_test(NAME PythonFormats
COMMAND ${PYTHON_TEST_RUNNER} ${CMAKE_CURRENT_SOURCE_DIR}/python-formats.py -v
WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH})
set_tests_properties(PythonFormats PROPERTIES ENVIRONMENT "${PYTHON_TEST_ENVIRONMENT}")
else() else()
message(STATUS "Skipping Tests for the LAMMPS Python Module: no suitable Python interpreter") message(STATUS "Skipping Tests for the LAMMPS Python Module: no suitable Python interpreter")
endif() endif()

View File

@ -0,0 +1,91 @@
import os
import unittest
from lammps.formats import LogFile, AvgChunkFile
# Absolute path of the "examples" directory, resolved relative to this file:
# the first '..' strips the file name, the next two climb two directories.
EXAMPLES_DIR=os.path.abspath(os.path.join(__file__, '..', '..', '..', 'examples'))
# Input files used by the tests below, given relative to EXAMPLES_DIR.
# Log used by testDefaultLogFile (one-line thermo output).
DEFAULT_STYLE_EXAMPLE_LOG="melt/log.27Nov18.melt.g++.1"
# Log used by testMultiLogFile (multi-line thermo output).
MULTI_STYLE_EXAMPLE_LOG="USER/fep/CC-CO/fep10/log.lammps"
# Chunk-averaged profile used by AvgChunkFiles.testRead.
AVG_CHUNK_FILE="VISCOSITY/profile.13Oct16.nemd.2d.g++.1"
class Logfiles(unittest.TestCase):
    """Tests of the LogFile reader using log files from EXAMPLES_DIR."""

    def testLogFileNotFound(self):
        """Opening a log that does not exist raises FileNotFoundError."""
        self.assertRaises(FileNotFoundError, LogFile, 'test.log')

    def testDefaultLogFile(self):
        """The one-line style example yields one run with six columns of six rows."""
        expected_columns = ("Step", "Temp", "E_pair", "E_mol", "TotEng", "Press")
        log_path = os.path.join(EXAMPLES_DIR, DEFAULT_STYLE_EXAMPLE_LOG)

        log = LogFile(log_path)
        self.assertEqual(len(log.runs), 1)

        thermo = log.runs[0]
        self.assertEqual(len(thermo.keys()), 6)
        for name in expected_columns:
            self.assertIn(name, thermo)
        for name in expected_columns:
            self.assertEqual(len(thermo[name]), 6)

        self.assertEqual(log.runs[0]["Step"], [0, 50, 100, 150, 200, 250])

    def testMultiLogFile(self):
        """The multi-line style example yields two runs; the first has 15 columns of 51 rows."""
        expected_columns = (
            "Step", "CPU", "TotEng", "KinEng", "Temp",
            "PotEng", "E_bond", "E_angle", "E_dihed", "E_impro",
            "E_vdwl", "E_coul", "E_long", "Press", "Volume",
        )
        log_path = os.path.join(EXAMPLES_DIR, MULTI_STYLE_EXAMPLE_LOG)

        log = LogFile(log_path)
        self.assertEqual(len(log.runs), 2)

        first = log.runs[0]
        self.assertEqual(len(first.keys()), 15)
        for name in expected_columns:
            self.assertIn(name, first)

        # every column of the first run has the same number of samples
        for name in first:
            self.assertEqual(len(first[name]), 51)

        self.assertEqual(first["Step"], list(range(0,255000, 5000)))
class AvgChunkFiles(unittest.TestCase):
    """Tests of the AvgChunkFile reader using a profile from EXAMPLES_DIR."""

    def testAvgChunkFileNotFound(self):
        """Opening a chunk file that does not exist raises FileNotFoundError."""
        self.assertRaises(FileNotFoundError, AvgChunkFile, 'test.log')

    def testRead(self):
        """Header fields, timesteps and per-chunk series match the example file."""
        profile_path = os.path.join(EXAMPLES_DIR, AVG_CHUNK_FILE)
        cfile = AvgChunkFile(profile_path)

        self.assertEqual(cfile.fix_name, "4")
        self.assertEqual(cfile.group_name, "all")
        self.assertEqual(cfile.timesteps, list(range(10000, 110000, 5000)))

        ntimesteps = len(cfile.timesteps)
        self.assertEqual(len(cfile.chunks), 20)

        # chunk ids are 1-based and stored in order; every series has one
        # sample per timestep
        for expected_id, chunk in enumerate(cfile.chunks, start=1):
            self.assertEqual(chunk['id'], expected_id)
            self.assertEqual(len(chunk['coord']), ntimesteps)
            self.assertEqual(len(chunk['ncount']), ntimesteps)
            self.assertIn("vx", chunk)
            self.assertEqual(len(chunk['vx']), ntimesteps)
            self.assertEqual(len(chunk['coord'][0]), 1)
# Run all test cases in this file when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()