Module library.classes.models
Expand source code
import os
import numpy as np
import tensorflow as tf
from library.classes.losses import BackmappingRelativeVectorLoss
from library.classes.generators import PADDING_X, PADDING_Y
import time
import matplotlib.pyplot as plt
class CNN:
def __init__(
input_size: tuple,
output_size: tuple,
display_name: str,
data_prefix: str,
keep_checkpoints: bool = False,
load_path: str = None,
loss: tf.keras.losses.Loss = tf.keras.losses.MeanSquaredError(),
test_sample: tuple = None,
This is the base class for all CNNs. It contains the basic structure of the CNN and the fit function.
input_size (tuple): The size of the input. Should be (x, y, 1)
output_size (tuple): The size of the output. Should be (x, y, 1)
display_name (str): The name of the model. Used for displaying the model summary and saving checkpoints/logs.
data_prefix (str): The prefix for all data. Used for saving the model and saving tensorboard logs.
keep_checkpoints (bool, optional): If true checkpoints will be kept. Defaults to False.
load_path (str, optional): The path to the weights to load. Defaults to None.
loss (tf.keras.losses.Loss, optional): The loss function to use. Defaults to tf.keras.losses.MeanSquaredError().
test_sample (tuple, optional): The test sample to track after each epoch. Defaults to None.
self.display_name = display_name
self.keep_checkpoints = keep_checkpoints
self.load_path = load_path
self.data_prefix = data_prefix
self.loss = loss
self.test_sample = test_sample
self.current_epoch = 0
# For showing samples
self.fig = plt.figure() = self.fig.add_subplot(111, projection='3d')
# Scale the model, this currently only affects the number of filters
scale = 32 # Scaled the filter dimensions by this factor
# The activation function to use for the convolutional layers
conv_activation = tf.keras.layers.LeakyReLU(alpha=0.01)
self.model = tf.keras.Sequential(
##### Input layer #####
tf.keras.layers.Input(input_size, sparse=False),
##### Encoder #####
filters=2**0 * scale,
kernel_size=(2, 2),
strides=(1, 1),
filters=2**1 * scale,
kernel_size=(2, 2),
strides=(1, 1),
filters=2**2 * scale,
kernel_size=(2, 1),
strides=(1, 1),
filters=2**3 * scale,
kernel_size=(2, 1),
strides=(1, 1),
filters=2**4 * scale,
kernel_size=(2, 1),
strides=(2, 1),
filters=2**5 * scale,
kernel_size=(2, 1),
strides=(2, 1),
filters=2**6 * scale,
kernel_size=(2, 1),
strides=(2, 1),
filters=2**7 * scale,
kernel_size=(2, 1),
strides=(2, 1),
pool_size=(3, 3),
##### Output #####
tf.keras.layers.Dense(, activation='tanh', kernel_initializer=tf.keras.initializers.Zeros()), # tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.1)),
], name=self.display_name)
# Compile the model
# learning_rate=0.00001, # Start with 1% of the default learning rate
metrics=['accuracy', 'mae'],
# Load weights if path is given
if load_path is not None and os.path.exists(load_path):
print("Loaded weights from " + load_path)
def fit(self,
Trains the model with the given data
train_generator (tf.keras.utils.Sequence): The training data generator
validation_gen (tf.keras.utils.Sequence): The validation data generator
epochs (int, optional): The number of epochs to train. Defaults to 150.
batch_size (int, optional): The batch size. Defaults to 64.
verbose (int, optional): The verbosity level. Defaults to 1.
early_stop (bool, optional): If true early stop will be used. Defaults to False.
# Create hist dir if it does not exist
if not os.path.exists(os.path.join(self.data_prefix, "hist")):
os.makedirs(os.path.join(self.data_prefix, "hist"))
callbacks = [
# The BackupAndRestore callback saves checkpoints every save_freq batches,
# this is set to 1 epoch here to save after each epoch. Make sure to set
# this to a value greater than one when training on a large dataset, because
# it can take a long time to save checkpoints.
backup_dir=os.path.join(self.data_prefix, "backup", self.display_name),
# save_freq=1,
delete_checkpoint=not self.keep_checkpoints,
# save_before_preemption=False,
# The ReduceLROnPlateau callback monitors a quantity and if no improvement
# is seen for a 'patience' number of epochs, the learning rate is reduced.
# Here, it monitors 'val_loss' and if no improvement is seen for 10 epochs,
# The CSVLogger callback streams epoch results to a CSV file.
# #TODO Send the CSV file via email or telegram after each epoch
# to monitor the training progress.
os.path.join(self.data_prefix, "hist", f"training_history_{self.display_name}.csv"),
# The TensorBoard callback writes a log for TensorBoard, which allows
# you to visualize dynamic graphs of your training and test metrics,
# as well as activation histograms for the different layers in your model.
log_dir=os.path.join(self.data_prefix, 'tensorboard', self.display_name),
# Update the current epoch and in the future other stuff
tf.keras.callbacks.LambdaCallback(on_epoch_end=lambda epoch, logs: self.update_internals(epoch, logs)),
# Track the test sample after each epoch
# tf.keras.callbacks.LambdaCallback(on_batch_end=lambda epoch, logs: self.track_test_samle(epoch, logs)),
# Track the test sample after each batch (live)
# tf.keras.callbacks.LambdaCallback(on_batch_end=lambda batch, logs: self.live_track_test_samle(batch, logs)),
# Add custom metrics to tensorboard
tf.keras.callbacks.LambdaCallback(on_batch_end=lambda batch, logs: self.custom_tensorboard_metrics(batch, logs)),
if early_stop:
# The EarlyStopping callback is used to exit the training early if
# the validation loss does not improve after 20 epochs.
# This makes sure that the model does not overfit the training data.
# Open plot for live tracking TODO: fix this
# self.fig.canvas.draw()
# Here we use generators to generate batches of data for the training.
# The training data is currently generated by the RelativeVectorsTrainingDataGenerator
# and is the main thing that changes the input data structure to the desired output.
self.hist =
def track_test_samle(self, batch, logs=None):
Tracks the test sample after each batch and saves the figure to the hist folder.
batch (int): The current batch
logs (dict, optional): The logs from the training. Defaults to None.
# Predict the output
pred = self.model.predict(self.test_sample[0][0:1, :, :, :])[0, :, :, 0]
true = self.test_sample[1][0, :, :, 0]
# Remove padding
# Make 3D scatter
pred_positions = []
true_positions = []
for i, bond in enumerate(pred):
pred_positions.append(pred[i, :])
for i, bond in enumerate(true):
true_positions.append(true[i, :])
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.scatter([x[0] for x in true_positions], [x[1] for x in true_positions], [x[2] for x in true_positions], c='b', marker='o')
ax.scatter([x[0] for x in pred_positions], [x[1] for x in pred_positions], [x[2] for x in pred_positions], c='r', marker='o')
# Draw lines between the atoms predicted and true positions
for i in range(len(true_positions)):
ax.plot([true_positions[i][0], pred_positions[i][0]], [true_positions[i][1], pred_positions[i][1]], [true_positions[i][2], pred_positions[i][2]], color='gray', linestyle='-', linewidth=0.1)
# Add legend
ax.legend(["True", "Predicted"])
# Add labels
ax.set_title(f"Atom Positions Epoch {self.current_epoch:03d} Batch {batch:03d}")
# Fix min, max of every coordinate of the true positions
min_x = min([x[0] for x in true_positions])
max_x = max([x[0] for x in true_positions])
min_y = min([x[1] for x in true_positions])
max_y = max([x[1] for x in true_positions])
min_z = min([x[2] for x in true_positions])
max_z = max([x[2] for x in true_positions])
# Make size fixed with padding
padding_factor = 0.1
scale_factor = 1
fixed_padding = 0.1
ax.set_xlim(scale_factor * min_x - padding_factor * (max_x - min_x) - fixed_padding, scale_factor * max_x + padding_factor * (max_x - min_x) + fixed_padding)
ax.set_ylim(scale_factor * min_y - padding_factor * (max_y - min_y) - fixed_padding, scale_factor * max_y + padding_factor * (max_y - min_y) + fixed_padding)
ax.set_zlim(scale_factor * min_z - padding_factor * (max_z - min_z) - fixed_padding, scale_factor * max_z + padding_factor * (max_z - min_z) + fixed_padding)
# Make angle fixed from 45 in x and 45 in y
ax.view_init(azim=-40, elev=35)
# Create directory if it does not exist
if not os.path.exists(os.path.join(self.data_prefix, "hist")):
os.makedirs(os.path.join(self.data_prefix, "hist"))
if not os.path.exists(os.path.join(self.data_prefix, "hist", "pred")):
os.makedirs(os.path.join(self.data_prefix, "hist", "pred"))
# Find the max number of the files in the folder
index = len(os.listdir(os.path.join(self.data_prefix, "hist", "pred")))
# Save the figure
# TODO: Fix save folder for atom specific models
fig.savefig(os.path.join(self.data_prefix, "hist", "pred", f"batch_{index}.png"))
def live_track_test_samle(self, batch, logs=None):
Does the same as track_test_samle but live. This is not working yet.
batch (int): The current batch
logs (dict, optional): The logs from the training. Defaults to None.
# Predict the output
pred = self.model.predict(self.test_sample[0][0:1, :, :, :])[0, :, :, 0]
true = self.test_sample[1][0, :, :, 0]
# Make 3D scatter
pred_positions = []
true_positions = []
for i, bond in enumerate(pred):
pred_positions.append(pred[i, :])
for i, bond in enumerate(true):
true_positions.append(true[i, :])[x[0] for x in true_positions], [x[1] for x in true_positions], [x[2] for x in true_positions], c='b', marker='o')[x[0] for x in pred_positions], [x[1] for x in pred_positions], [x[2] for x in pred_positions], c='r', marker='o')
# Add legend["True", "Predicted"])
# Add labels'X')'Y')'Z')"Atom Positions Batch {batch}")
# Fix min, max of every coordinate of the true positions
min_x = min([x[0] for x in true_positions])
max_x = max([x[0] for x in true_positions])
min_y = min([x[1] for x in true_positions])
max_y = max([x[1] for x in true_positions])
min_z = min([x[2] for x in true_positions])
max_z = max([x[2] for x in true_positions])
# Make size fixed with padding
padding_factor = 0 - padding_factor * (max_x - min_x), max_x + padding_factor * (max_x - min_x)) - padding_factor * (max_y - min_y), max_y + padding_factor * (max_y - min_y)) - padding_factor * (max_z - min_z), max_z + padding_factor * (max_z - min_z))
# Make angle fixed from 45 in x and 45 in y, elev=35)
# Update the plot
def update_internals(self, epoch, logs):
Update the current epoch and in the future other stuff like learning rate etc.
epoch (int): The current epoch
logs (dict): Dict with the logs from the training.
self.current_epoch = epoch
def custom_tensorboard_metrics(self, batch, logs):
Add custom metrics to tensorboard for each batch. This is not working correctly yet.
batch (int): The current batch
logs (dict): Dict with the logs from the training.
# Add custom metrics to tensorboard for each batch
# Write loss to tensorboard
summary_dir1 = os.path.join("data", "tensorboard", self.display_name, "custom")
summary_writer1 = tf.summary.create_file_writer(summary_dir1)
# Set step
step = batch + 148 * batch # TODO: Fix this
with summary_writer1.as_default():
tf.summary.scalar("loss_b", logs["loss"], step=step)
tf.summary.scalar("accuracy_b", logs["accuracy"], step=step)
tf.summary.scalar("mae_b", logs["mae"], step=step)
def test(self, data_generator):
Test the model with a test data generator
X = data_generator.__getitem__(0)[0]
Y = data_generator.__getitem__(0)[1]
loss, acc = self.model.evaluate(X, Y, verbose=0)
print(f"CNN-Test: acc = {100*acc:5.2f}%, loss = {loss:7.4f}")
def find_dataset_with_lowest_loss(self, data_generator):
Finds the dataset with the lowest loss
lowest_loss = 100000
lowest_loss_dataset = None
for i in range(data_generator.__len__()):
X = data_generator.__getitem__(i)[0]
Y = data_generator.__getitem__(i)[1]
loss, acc = self.model.evaluate(X, Y, verbose=0)
if loss < lowest_loss:
lowest_loss = loss
lowest_loss_dataset = i
return lowest_loss_dataset
def predict(self, x):
Predicts the output for a given input
return self.model.predict(x)
def activation(self, x, layer_name):
Returns the activation map of a given input for a given layer
for layer in self.model.layers:
if == layer_name:
x =
def save(self, path=None):
Saves the model
path = self.load_path if path is None else path
class CNN (input_size: tuple, output_size: tuple, display_name: str, data_prefix: str, keep_checkpoints: bool = False, load_path: str = None, loss: keras.losses.Loss = <keras.losses.MeanSquaredError object>, test_sample: tuple = None)
This is the base class for all CNNs. It contains the basic structure of the CNN and the fit function.
- The size of the input. Should be (x, y, 1)
- The size of the output. Should be (x, y, 1)
- The name of the model. Used for displaying the model summary and saving checkpoints/logs.
- The prefix for all data. Used for saving the model and saving tensorboard logs.
, optional- If true checkpoints will be kept. Defaults to False.
, optional- The path to the weights to load. Defaults to None.
, optional- The loss function to use. Defaults to tf.keras.losses.MeanSquaredError().
, optional- The test sample to track after each epoch. Defaults to None.
Expand source code
class CNN: def __init__( self, input_size: tuple, output_size: tuple, display_name: str, data_prefix: str, keep_checkpoints: bool = False, load_path: str = None, loss: tf.keras.losses.Loss = tf.keras.losses.MeanSquaredError(), test_sample: tuple = None, ): """ This is the base class for all CNNs. It contains the basic structure of the CNN and the fit function. Args: input_size (tuple): The size of the input. Should be (x, y, 1) output_size (tuple): The size of the output. Should be (x, y, 1) display_name (str): The name of the model. Used for displaying the model summary and saving checkpoints/logs. data_prefix (str): The prefix for all data. Used for saving the model and saving tensorboard logs. keep_checkpoints (bool, optional): If true checkpoints will be kept. Defaults to False. load_path (str, optional): The path to the weights to load. Defaults to None. loss (tf.keras.losses.Loss, optional): The loss function to use. Defaults to tf.keras.losses.MeanSquaredError(). test_sample (tuple, optional): The test sample to track after each epoch. Defaults to None. """ super().__init__() self.display_name = display_name self.keep_checkpoints = keep_checkpoints self.load_path = load_path self.data_prefix = data_prefix self.loss = loss self.test_sample = test_sample self.current_epoch = 0 # For showing samples self.fig = plt.figure() = self.fig.add_subplot(111, projection='3d') # Scale the model, this currently only affects the number of filters scale = 32 # Scaled the filter dimensions by this factor # The activation function to use for the convolutional layers conv_activation = tf.keras.layers.LeakyReLU(alpha=0.01) self.model = tf.keras.Sequential( [ ##### Input layer ##### tf.keras.layers.Input(input_size, sparse=False), ##### Encoder ##### tf.keras.layers.Conv2D( filters=2**0 * scale, kernel_size=(2, 2), strides=(1, 1), padding='valid', activation=conv_activation, ), tf.keras.layers.Conv2D( filters=2**1 * scale, kernel_size=(2, 2), strides=(1, 1), padding='valid', activation=conv_activation, ), tf.keras.layers.Conv2D( filters=2**2 * scale, kernel_size=(2, 1), strides=(1, 1), padding='valid', activation=conv_activation, ), tf.keras.layers.Conv2D( filters=2**3 * scale, kernel_size=(2, 1), strides=(1, 1), padding='valid', activation=conv_activation, ), tf.keras.layers.Conv2D( filters=2**4 * scale, kernel_size=(2, 1), strides=(2, 1), padding='valid', activation=conv_activation, ), tf.keras.layers.Conv2D( filters=2**5 * scale, kernel_size=(2, 1), strides=(2, 1), padding='valid', activation=conv_activation, ), tf.keras.layers.Conv2D( filters=2**6 * scale, kernel_size=(2, 1), strides=(2, 1), padding='valid', activation=conv_activation, ), tf.keras.layers.Conv2D( filters=2**7 * scale, kernel_size=(2, 1), strides=(2, 1), padding='valid', activation=conv_activation, ), tf.keras.layers.MaxPool2D( pool_size=(3, 3), ), tf.keras.layers.BatchNormalization(), ##### Output ##### tf.keras.layers.Flatten(), tf.keras.layers.Dense(, activation='tanh', kernel_initializer=tf.keras.initializers.Zeros()), # tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.1)), tf.keras.layers.Reshape(output_size) ], name=self.display_name) # Compile the model self.model.compile( optimizer=tf.optimizers.Adam( # learning_rate=0.00001, # Start with 1% of the default learning rate ), loss=self.loss, metrics=['accuracy', 'mae'], run_eagerly=True ) self.model.summary() # Load weights if path is given if load_path is not None and os.path.exists(load_path): self.model.load_weights(load_path) print("Loaded weights from " + load_path) def fit(self, train_generator, validation_gen, epochs=150, batch_size=64, verbose=1, early_stop=False, ): """ Trains the model with the given data Args: train_generator (tf.keras.utils.Sequence): The training data generator validation_gen (tf.keras.utils.Sequence): The validation data generator epochs (int, optional): The number of epochs to train. Defaults to 150. batch_size (int, optional): The batch size. Defaults to 64. verbose (int, optional): The verbosity level. Defaults to 1. early_stop (bool, optional): If true early stop will be used. Defaults to False. """ # Create hist dir if it does not exist if not os.path.exists(os.path.join(self.data_prefix, "hist")): os.makedirs(os.path.join(self.data_prefix, "hist")) callbacks = [ # The BackupAndRestore callback saves checkpoints every save_freq batches, # this is set to 1 epoch here to save after each epoch. Make sure to set # this to a value greater than one when training on a large dataset, because # it can take a long time to save checkpoints. tf.keras.callbacks.experimental.BackupAndRestore( backup_dir=os.path.join(self.data_prefix, "backup", self.display_name), # save_freq=1, delete_checkpoint=not self.keep_checkpoints, # save_before_preemption=False, ), # The ReduceLROnPlateau callback monitors a quantity and if no improvement # is seen for a 'patience' number of epochs, the learning rate is reduced. # Here, it monitors 'val_loss' and if no improvement is seen for 10 epochs, tf.keras.callbacks.ReduceLROnPlateau( monitor='val_loss', factor=0.75, patience=3, verbose=0, mode='max', min_delta=0.00000005, cooldown=5, min_lr=0.0000000000001, ), # The CSVLogger callback streams epoch results to a CSV file. # #TODO Send the CSV file via email or telegram after each epoch # to monitor the training progress. tf.keras.callbacks.CSVLogger( os.path.join(self.data_prefix, "hist", f"training_history_{self.display_name}.csv"), separator=',', append=True, ), # The TensorBoard callback writes a log for TensorBoard, which allows # you to visualize dynamic graphs of your training and test metrics, # as well as activation histograms for the different layers in your model. tf.keras.callbacks.TensorBoard( log_dir=os.path.join(self.data_prefix, 'tensorboard', self.display_name), histogram_freq=1, write_graph=True, write_images=True, update_freq='batch', ), # Update the current epoch and in the future other stuff tf.keras.callbacks.LambdaCallback(on_epoch_end=lambda epoch, logs: self.update_internals(epoch, logs)), # Track the test sample after each epoch # tf.keras.callbacks.LambdaCallback(on_batch_end=lambda epoch, logs: self.track_test_samle(epoch, logs)), # Track the test sample after each batch (live) # tf.keras.callbacks.LambdaCallback(on_batch_end=lambda batch, logs: self.live_track_test_samle(batch, logs)), # Add custom metrics to tensorboard tf.keras.callbacks.LambdaCallback(on_batch_end=lambda batch, logs: self.custom_tensorboard_metrics(batch, logs)), ] if early_stop: # The EarlyStopping callback is used to exit the training early if # the validation loss does not improve after 20 epochs. # This makes sure that the model does not overfit the training data. callbacks.append( tf.keras.callbacks.EarlyStopping( monitor='val_loss', min_delta=0.0001, patience=20, verbose=0, mode='auto', baseline=None, restore_best_weights=True, )) # Open plot for live tracking TODO: fix this # # self.fig.canvas.draw() # Here we use generators to generate batches of data for the training. # The training data is currently generated by the RelativeVectorsTrainingDataGenerator # and is the main thing that changes the input data structure to the desired output. self.hist = train_generator, validation_data=validation_gen, batch_size=batch_size, shuffle=True, epochs=epochs, verbose=verbose, callbacks=callbacks, ) def track_test_samle(self, batch, logs=None): """ Tracks the test sample after each batch and saves the figure to the hist folder. Args: batch (int): The current batch logs (dict, optional): The logs from the training. Defaults to None. """ # Predict the output pred = self.model.predict(self.test_sample[0][0:1, :, :, :])[0, :, :, 0] true = self.test_sample[1][0, :, :, 0] # Remove padding pred = pred[PADDING_X:-PADDING_X, PADDING_Y:-PADDING_Y] true = true[PADDING_X:-PADDING_X, PADDING_Y:-PADDING_Y] # Make 3D scatter pred_positions = [] true_positions = [] for i, bond in enumerate(pred): pred_positions.append(pred[i, :]) for i, bond in enumerate(true): true_positions.append(true[i, :]) fig = plt.figure() ax = fig.add_subplot(111, projection='3d') ax.scatter([x[0] for x in true_positions], [x[1] for x in true_positions], [x[2] for x in true_positions], c='b', marker='o') ax.scatter([x[0] for x in pred_positions], [x[1] for x in pred_positions], [x[2] for x in pred_positions], c='r', marker='o') # Draw lines between the atoms predicted and true positions for i in range(len(true_positions)): ax.plot([true_positions[i][0], pred_positions[i][0]], [true_positions[i][1], pred_positions[i][1]], [true_positions[i][2], pred_positions[i][2]], color='gray', linestyle='-', linewidth=0.1) # Add legend ax.legend(["True", "Predicted"]) # Add labels ax.set_xlabel('X') ax.set_ylabel('Y') ax.set_zlabel('Z') ax.set_title(f"Atom Positions Epoch {self.current_epoch:03d} Batch {batch:03d}") # Fix min, max of every coordinate of the true positions min_x = min([x[0] for x in true_positions]) max_x = max([x[0] for x in true_positions]) min_y = min([x[1] for x in true_positions]) max_y = max([x[1] for x in true_positions]) min_z = min([x[2] for x in true_positions]) max_z = max([x[2] for x in true_positions]) # Make size fixed with padding padding_factor = 0.1 scale_factor = 1 fixed_padding = 0.1 ax.set_xlim(scale_factor * min_x - padding_factor * (max_x - min_x) - fixed_padding, scale_factor * max_x + padding_factor * (max_x - min_x) + fixed_padding) ax.set_ylim(scale_factor * min_y - padding_factor * (max_y - min_y) - fixed_padding, scale_factor * max_y + padding_factor * (max_y - min_y) + fixed_padding) ax.set_zlim(scale_factor * min_z - padding_factor * (max_z - min_z) - fixed_padding, scale_factor * max_z + padding_factor * (max_z - min_z) + fixed_padding) # Make angle fixed from 45 in x and 45 in y ax.view_init(azim=-40, elev=35) # Create directory if it does not exist if not os.path.exists(os.path.join(self.data_prefix, "hist")): os.makedirs(os.path.join(self.data_prefix, "hist")) if not os.path.exists(os.path.join(self.data_prefix, "hist", "pred")): os.makedirs(os.path.join(self.data_prefix, "hist", "pred")) # Find the max number of the files in the folder index = len(os.listdir(os.path.join(self.data_prefix, "hist", "pred"))) # Save the figure # TODO: Fix save folder for atom specific models fig.savefig(os.path.join(self.data_prefix, "hist", "pred", f"batch_{index}.png")) def live_track_test_samle(self, batch, logs=None): """ Does the same as track_test_samle but live. This is not working yet. Args: batch (int): The current batch logs (dict, optional): The logs from the training. Defaults to None. """ # Predict the output pred = self.model.predict(self.test_sample[0][0:1, :, :, :])[0, :, :, 0] true = self.test_sample[1][0, :, :, 0] # Make 3D scatter pred_positions = [] true_positions = [] for i, bond in enumerate(pred): pred_positions.append(pred[i, :]) for i, bond in enumerate(true): true_positions.append(true[i, :])[x[0] for x in true_positions], [x[1] for x in true_positions], [x[2] for x in true_positions], c='b', marker='o')[x[0] for x in pred_positions], [x[1] for x in pred_positions], [x[2] for x in pred_positions], c='r', marker='o') # Add legend["True", "Predicted"]) # Add labels'X')'Y')'Z')"Atom Positions Batch {batch}") # Fix min, max of every coordinate of the true positions min_x = min([x[0] for x in true_positions]) max_x = max([x[0] for x in true_positions]) min_y = min([x[1] for x in true_positions]) max_y = max([x[1] for x in true_positions]) min_z = min([x[2] for x in true_positions]) max_z = max([x[2] for x in true_positions]) # Make size fixed with padding padding_factor = 0 - padding_factor * (max_x - min_x), max_x + padding_factor * (max_x - min_x)) - padding_factor * (max_y - min_y), max_y + padding_factor * (max_y - min_y)) - padding_factor * (max_z - min_z), max_z + padding_factor * (max_z - min_z)) # Make angle fixed from 45 in x and 45 in y, elev=35) # Update the plot self.fig.canvas.draw() self.fig.canvas.flush_events() def update_internals(self, epoch, logs): """ Update the current epoch and in the future other stuff like learning rate etc. Args: epoch (int): The current epoch logs (dict): Dict with the logs from the training. """ self.current_epoch = epoch def custom_tensorboard_metrics(self, batch, logs): """ Add custom metrics to tensorboard for each batch. This is not working correctly yet. Args: batch (int): The current batch logs (dict): Dict with the logs from the training. """ # Add custom metrics to tensorboard for each batch # Write loss to tensorboard summary_dir1 = os.path.join("data", "tensorboard", self.display_name, "custom") summary_writer1 = tf.summary.create_file_writer(summary_dir1) # Set step step = batch + 148 * batch # TODO: Fix this with summary_writer1.as_default(): tf.summary.scalar("loss_b", logs["loss"], step=step) tf.summary.scalar("accuracy_b", logs["accuracy"], step=step) tf.summary.scalar("mae_b", logs["mae"], step=step) def test(self, data_generator): """ Test the model with a test data generator """ X = data_generator.__getitem__(0)[0] Y = data_generator.__getitem__(0)[1] loss, acc = self.model.evaluate(X, Y, verbose=0) print(f"CNN-Test: acc = {100*acc:5.2f}%, loss = {loss:7.4f}") def find_dataset_with_lowest_loss(self, data_generator): """ Finds the dataset with the lowest loss """ lowest_loss = 100000 lowest_loss_dataset = None for i in range(data_generator.__len__()): X = data_generator.__getitem__(i)[0] Y = data_generator.__getitem__(i)[1] loss, acc = self.model.evaluate(X, Y, verbose=0) if loss < lowest_loss: lowest_loss = loss lowest_loss_dataset = i return lowest_loss_dataset def predict(self, x): """ Predicts the output for a given input """ return self.model.predict(x) def activation(self, x, layer_name): """ Returns the activation map of a given input for a given layer """ for layer in self.model.layers: if == layer_name: return else: x = def save(self, path=None): """ Saves the model """ path = self.load_path if path is None else path
def activation(self, x, layer_name)
Returns the activation map of a given input for a given layer
Expand source code
def activation(self, x, layer_name): """ Returns the activation map of a given input for a given layer """ for layer in self.model.layers: if == layer_name: return else: x =
def custom_tensorboard_metrics(self, batch, logs)
Add custom metrics to tensorboard for each batch. This is not working correctly yet.
- The current batch
- Dict with the logs from the training.
Expand source code
def custom_tensorboard_metrics(self, batch, logs): """ Add custom metrics to tensorboard for each batch. This is not working correctly yet. Args: batch (int): The current batch logs (dict): Dict with the logs from the training. """ # Add custom metrics to tensorboard for each batch # Write loss to tensorboard summary_dir1 = os.path.join("data", "tensorboard", self.display_name, "custom") summary_writer1 = tf.summary.create_file_writer(summary_dir1) # Set step step = batch + 148 * batch # TODO: Fix this with summary_writer1.as_default(): tf.summary.scalar("loss_b", logs["loss"], step=step) tf.summary.scalar("accuracy_b", logs["accuracy"], step=step) tf.summary.scalar("mae_b", logs["mae"], step=step)
def find_dataset_with_lowest_loss(self, data_generator)
Finds the dataset with the lowest loss
Expand source code
def find_dataset_with_lowest_loss(self, data_generator): """ Finds the dataset with the lowest loss """ lowest_loss = 100000 lowest_loss_dataset = None for i in range(data_generator.__len__()): X = data_generator.__getitem__(i)[0] Y = data_generator.__getitem__(i)[1] loss, acc = self.model.evaluate(X, Y, verbose=0) if loss < lowest_loss: lowest_loss = loss lowest_loss_dataset = i return lowest_loss_dataset
def fit(self, train_generator, validation_gen, epochs=150, batch_size=64, verbose=1, early_stop=False)
Trains the model with the given data
- The training data generator
- The validation data generator
, optional- The number of epochs to train. Defaults to 150.
, optional- The batch size. Defaults to 64.
, optional- The verbosity level. Defaults to 1.
, optional- If true early stop will be used. Defaults to False.
Expand source code
def fit(self, train_generator, validation_gen, epochs=150, batch_size=64, verbose=1, early_stop=False, ): """ Trains the model with the given data Args: train_generator (tf.keras.utils.Sequence): The training data generator validation_gen (tf.keras.utils.Sequence): The validation data generator epochs (int, optional): The number of epochs to train. Defaults to 150. batch_size (int, optional): The batch size. Defaults to 64. verbose (int, optional): The verbosity level. Defaults to 1. early_stop (bool, optional): If true early stop will be used. Defaults to False. """ # Create hist dir if it does not exist if not os.path.exists(os.path.join(self.data_prefix, "hist")): os.makedirs(os.path.join(self.data_prefix, "hist")) callbacks = [ # The BackupAndRestore callback saves checkpoints every save_freq batches, # this is set to 1 epoch here to save after each epoch. Make sure to set # this to a value greater than one when training on a large dataset, because # it can take a long time to save checkpoints. tf.keras.callbacks.experimental.BackupAndRestore( backup_dir=os.path.join(self.data_prefix, "backup", self.display_name), # save_freq=1, delete_checkpoint=not self.keep_checkpoints, # save_before_preemption=False, ), # The ReduceLROnPlateau callback monitors a quantity and if no improvement # is seen for a 'patience' number of epochs, the learning rate is reduced. # Here, it monitors 'val_loss' and if no improvement is seen for 10 epochs, tf.keras.callbacks.ReduceLROnPlateau( monitor='val_loss', factor=0.75, patience=3, verbose=0, mode='max', min_delta=0.00000005, cooldown=5, min_lr=0.0000000000001, ), # The CSVLogger callback streams epoch results to a CSV file. # #TODO Send the CSV file via email or telegram after each epoch # to monitor the training progress. tf.keras.callbacks.CSVLogger( os.path.join(self.data_prefix, "hist", f"training_history_{self.display_name}.csv"), separator=',', append=True, ), # The TensorBoard callback writes a log for TensorBoard, which allows # you to visualize dynamic graphs of your training and test metrics, # as well as activation histograms for the different layers in your model. tf.keras.callbacks.TensorBoard( log_dir=os.path.join(self.data_prefix, 'tensorboard', self.display_name), histogram_freq=1, write_graph=True, write_images=True, update_freq='batch', ), # Update the current epoch and in the future other stuff tf.keras.callbacks.LambdaCallback(on_epoch_end=lambda epoch, logs: self.update_internals(epoch, logs)), # Track the test sample after each epoch # tf.keras.callbacks.LambdaCallback(on_batch_end=lambda epoch, logs: self.track_test_samle(epoch, logs)), # Track the test sample after each batch (live) # tf.keras.callbacks.LambdaCallback(on_batch_end=lambda batch, logs: self.live_track_test_samle(batch, logs)), # Add custom metrics to tensorboard tf.keras.callbacks.LambdaCallback(on_batch_end=lambda batch, logs: self.custom_tensorboard_metrics(batch, logs)), ] if early_stop: # The EarlyStopping callback is used to exit the training early if # the validation loss does not improve after 20 epochs. # This makes sure that the model does not overfit the training data. callbacks.append( tf.keras.callbacks.EarlyStopping( monitor='val_loss', min_delta=0.0001, patience=20, verbose=0, mode='auto', baseline=None, restore_best_weights=True, )) # Open plot for live tracking TODO: fix this # # self.fig.canvas.draw() # Here we use generators to generate batches of data for the training. # The training data is currently generated by the RelativeVectorsTrainingDataGenerator # and is the main thing that changes the input data structure to the desired output. self.hist = train_generator, validation_data=validation_gen, batch_size=batch_size, shuffle=True, epochs=epochs, verbose=verbose, callbacks=callbacks, )
def live_track_test_samle(self, batch, logs=None)
Does the same as track_test_samle but live. This is not working yet.
- The current batch
, optional- The logs from the training. Defaults to None.
Expand source code
def live_track_test_samle(self, batch, logs=None): """ Does the same as track_test_samle but live. This is not working yet. Args: batch (int): The current batch logs (dict, optional): The logs from the training. Defaults to None. """ # Predict the output pred = self.model.predict(self.test_sample[0][0:1, :, :, :])[0, :, :, 0] true = self.test_sample[1][0, :, :, 0] # Make 3D scatter pred_positions = [] true_positions = [] for i, bond in enumerate(pred): pred_positions.append(pred[i, :]) for i, bond in enumerate(true): true_positions.append(true[i, :])[x[0] for x in true_positions], [x[1] for x in true_positions], [x[2] for x in true_positions], c='b', marker='o')[x[0] for x in pred_positions], [x[1] for x in pred_positions], [x[2] for x in pred_positions], c='r', marker='o') # Add legend["True", "Predicted"]) # Add labels'X')'Y')'Z')"Atom Positions Batch {batch}") # Fix min, max of every coordinate of the true positions min_x = min([x[0] for x in true_positions]) max_x = max([x[0] for x in true_positions]) min_y = min([x[1] for x in true_positions]) max_y = max([x[1] for x in true_positions]) min_z = min([x[2] for x in true_positions]) max_z = max([x[2] for x in true_positions]) # Make size fixed with padding padding_factor = 0 - padding_factor * (max_x - min_x), max_x + padding_factor * (max_x - min_x)) - padding_factor * (max_y - min_y), max_y + padding_factor * (max_y - min_y)) - padding_factor * (max_z - min_z), max_z + padding_factor * (max_z - min_z)) # Make angle fixed from 45 in x and 45 in y, elev=35) # Update the plot self.fig.canvas.draw() self.fig.canvas.flush_events()
def predict(self, x)
Predicts the output for a given input
Expand source code
def predict(self, x): """ Predicts the output for a given input """ return self.model.predict(x)
def save(self, path=None)
Saves the model
Expand source code
def save(self, path=None): """ Saves the model """ path = self.load_path if path is None else path
def test(self, data_generator)
Test the model with a test data generator
Expand source code
def test(self, data_generator): """ Test the model with a test data generator """ X = data_generator.__getitem__(0)[0] Y = data_generator.__getitem__(0)[1] loss, acc = self.model.evaluate(X, Y, verbose=0) print(f"CNN-Test: acc = {100*acc:5.2f}%, loss = {loss:7.4f}")
def track_test_samle(self, batch, logs=None)
Tracks the test sample after each batch and saves the figure to the hist folder.
- The current batch
, optional- The logs from the training. Defaults to None.
Expand source code
def track_test_samle(self, batch, logs=None): """ Tracks the test sample after each batch and saves the figure to the hist folder. Args: batch (int): The current batch logs (dict, optional): The logs from the training. Defaults to None. """ # Predict the output pred = self.model.predict(self.test_sample[0][0:1, :, :, :])[0, :, :, 0] true = self.test_sample[1][0, :, :, 0] # Remove padding pred = pred[PADDING_X:-PADDING_X, PADDING_Y:-PADDING_Y] true = true[PADDING_X:-PADDING_X, PADDING_Y:-PADDING_Y] # Make 3D scatter pred_positions = [] true_positions = [] for i, bond in enumerate(pred): pred_positions.append(pred[i, :]) for i, bond in enumerate(true): true_positions.append(true[i, :]) fig = plt.figure() ax = fig.add_subplot(111, projection='3d') ax.scatter([x[0] for x in true_positions], [x[1] for x in true_positions], [x[2] for x in true_positions], c='b', marker='o') ax.scatter([x[0] for x in pred_positions], [x[1] for x in pred_positions], [x[2] for x in pred_positions], c='r', marker='o') # Draw lines between the atoms predicted and true positions for i in range(len(true_positions)): ax.plot([true_positions[i][0], pred_positions[i][0]], [true_positions[i][1], pred_positions[i][1]], [true_positions[i][2], pred_positions[i][2]], color='gray', linestyle='-', linewidth=0.1) # Add legend ax.legend(["True", "Predicted"]) # Add labels ax.set_xlabel('X') ax.set_ylabel('Y') ax.set_zlabel('Z') ax.set_title(f"Atom Positions Epoch {self.current_epoch:03d} Batch {batch:03d}") # Fix min, max of every coordinate of the true positions min_x = min([x[0] for x in true_positions]) max_x = max([x[0] for x in true_positions]) min_y = min([x[1] for x in true_positions]) max_y = max([x[1] for x in true_positions]) min_z = min([x[2] for x in true_positions]) max_z = max([x[2] for x in true_positions]) # Make size fixed with padding padding_factor = 0.1 scale_factor = 1 fixed_padding = 0.1 ax.set_xlim(scale_factor * min_x - padding_factor * (max_x - min_x) - fixed_padding, scale_factor * max_x + padding_factor * (max_x - min_x) + fixed_padding) ax.set_ylim(scale_factor * min_y - padding_factor * (max_y - min_y) - fixed_padding, scale_factor * max_y + padding_factor * (max_y - min_y) + fixed_padding) ax.set_zlim(scale_factor * min_z - padding_factor * (max_z - min_z) - fixed_padding, scale_factor * max_z + padding_factor * (max_z - min_z) + fixed_padding) # Make angle fixed from 45 in x and 45 in y ax.view_init(azim=-40, elev=35) # Create directory if it does not exist if not os.path.exists(os.path.join(self.data_prefix, "hist")): os.makedirs(os.path.join(self.data_prefix, "hist")) if not os.path.exists(os.path.join(self.data_prefix, "hist", "pred")): os.makedirs(os.path.join(self.data_prefix, "hist", "pred")) # Find the max number of the files in the folder index = len(os.listdir(os.path.join(self.data_prefix, "hist", "pred"))) # Save the figure # TODO: Fix save folder for atom specific models fig.savefig(os.path.join(self.data_prefix, "hist", "pred", f"batch_{index}.png"))
def update_internals(self, epoch, logs)
Update the current epoch and in the future other stuff like learning rate etc.
- The current epoch
- Dict with the logs from the training.
Expand source code
def update_internals(self, epoch, logs): """ Update the current epoch and in the future other stuff like learning rate etc. Args: epoch (int): The current epoch logs (dict): Dict with the logs from the training. """ self.current_epoch = epoch