Source code for pyEpiabm.routine.simulation
#
# Simulates a complete pandemic
#
import random
import os
import logging
import typing
import numpy as np
import pandas as pd
from tqdm import tqdm
from pyEpiabm.core import Parameters, Population
from pyEpiabm.output import _CsvDictWriter
from pyEpiabm.output import AbstractReporter
from pyEpiabm.property import InfectionStatus
from pyEpiabm.sweep import AbstractSweep
from pyEpiabm.utility import log_exceptions
[docs]
class Simulation:
"""Class to run a full simulation.
"""
def __init__(self):
""" Constructor
"""
self.writers = []
[docs]
@log_exceptions()
def configure(self,
population: Population,
initial_sweeps: typing.List[AbstractSweep],
sweeps: typing.List[AbstractSweep],
sim_params: typing.Dict,
file_params: typing.Dict,
inf_history_params: typing.Dict = None):
"""Initialise a population structure for use in the simulation.
sim_params Contains:
* `simulation_start_time`: The initial time for the simulation
* `simulation_end_time`: The final time to stop the simulation
* `initial_infected_number`: The initial number of infected \
individuals in the population
* `simulation_seed`: Random seed for reproducible simulations
* `include_waning`: Boolean to determine whether immunity waning \
is included in the simulation
file_params Contains:
* `output_file`: String for the name of the output .csv file
* `output_dir`: String for the location of the output file, \
as a relative path
* `spatial_output`: Boolean to determine whether a spatial output \
should be used
* `age_stratified`: Boolean to determine whether the output will \
be age stratified
inf_history_params Contains:
* `output_dir`: String for the location for the output files, \
as a relative path
* `status_output`: Boolean to determine whether we need \
a csv file containing infection status values
* `infectiousness_output`: Boolean to determine whether we need \
a csv file containing infectiousness (viral load) values
* `secondary_infections_output`: Boolean to determine whether we \
need a csv file containing secondary infections and R_t values
* `serial_interval_output`: Boolean to determine whether we \
need a csv file containing serial interval data
* `generation_time_output`: Boolean to determine whether we \
need a csv file containing generation time data
* `compress`: Boolean to determine whether we compress \
the infection history csv files
Parameters
----------
population : Population
Population structure for the model
pop_params : dict
Dictionary of parameter specific to the population
initial_sweeps : typing.List
List of sweeps used to initialise the simulation
sweeps : typing.List
List of sweeps used in the simulation. Queue sweep and host
progression sweep should appear at the end of the list
sim_params : dict
Dictionary of parameters specific to the simulation used and used
as input for call method of initial sweeps
file_params : dict
Dictionary of parameters specific to the output file
inf_history_params : dict
This is short for 'infection history file parameters' and we will
use the abbreviation 'ih' to refer to infection history throughout
this class. If `status_output`, `infectiousness_output`,
`secondary_infections_output`, `serial_interval_output` and
`generation_time_output` are all False, then no infection history
csv files are produced (or if the dictionary is None). These files
contain the infection status, infectiousness and secondary
infection counts of each person every time step respectively. The
EpiOS tool (https://github.com/SABS-R3-Epidemiology/EpiOS) samples
data from these files to mimic real life epidemic sampling
techniques. These files can be compressed when 'compress' is True,
reducing the size of these files.
"""
self.sim_params = sim_params
self.population = population
self.initial_sweeps = initial_sweeps
self.sweeps = sweeps
self.spatial_output = file_params["spatial_output"] \
if "spatial_output" in file_params else False
self.age_stratified = file_params["age_stratified"] \
if "age_stratified" in file_params else False
Parameters.instance().use_ages = self.age_stratified
self.include_waning = sim_params["include_waning"] \
if "include_waning" in sim_params else False
Parameters.instance().use_waning_immunity = self.include_waning
# If random seed is specified in parameters, set this in numpy
if "simulation_seed" in self.sim_params:
Simulation.set_random_seed(self.sim_params["simulation_seed"])
# Initial sweeps configure the population by changing the type,
# infection status, infectiousness or susceptibility of people
# or places. Only runs on the first timestep.
for s in initial_sweeps + sweeps:
s.bind_population(self.population)
logging.info(f"Bound sweep {s.__class__.__name__} to"
+ " population")
# General sweeps run through the population on every timestep, and
# include host progression and spatial infections.
folder = os.path.join(os.getcwd(),
file_params["output_dir"])
filename = file_params["output_file"]
logging.info(
f"Set output location to {os.path.join(folder, filename)}")
# Setting up writer for infection status distribution for each cell
output_titles = ["time"] + [s for s in InfectionStatus]
if self.spatial_output:
output_titles.insert(1, "cell")
output_titles.insert(2, "location_x")
output_titles.insert(3, "location_y")
if self.age_stratified:
output_titles.insert(1, "age_group")
self.writer = _CsvDictWriter(
folder, filename,
output_titles)
self.status_output = False
self.infectiousness_output = False
self.secondary_infections_output = False
self.serial_interval_output = False
self.generation_time_output = False
self.ih_status_writer = None
self.ih_infectiousness_writer = None
self.secondary_infections_writer = None
self.serial_interval_writer = None
self.generation_time_writer = None
self.compress = False
if inf_history_params:
# Setting up writer for infection history for each person. If the
# inf_history_params dict is empty then we do not need to record
# this
self.status_output = inf_history_params.get("status_output")
self.infectiousness_output = inf_history_params\
.get("infectiousness_output")
self.secondary_infections_output = inf_history_params\
.get("secondary_infections_output")
self.serial_interval_output = inf_history_params \
.get("serial_interval_output")
self.generation_time_output = inf_history_params \
.get("generation_time_output")
self.compress = inf_history_params.get("compress", False)
person_ids = []
person_ids += [person.id for cell in population.cells for person
in cell.persons]
self.ih_output_titles = ["time"] + person_ids
self.Rt_output_titles = ["time"] + person_ids + ["R_t"]
ts = 1 / Parameters.instance().time_steps_per_day
times = np.arange(self.sim_params["simulation_start_time"],
self.sim_params["simulation_end_time"] + ts,
ts).tolist()
self.si_output_titles = times
ih_folder = os.path.join(os.getcwd(),
inf_history_params["output_dir"])
if not (self.status_output or self.infectiousness_output
or self.secondary_infections_output
or self.serial_interval_output
or self.generation_time_output):
logging.warning("status_output, infectiousness_output, "
+ "secondary_infections_output, "
+ "serial_interval_output and "
+ "generation_time_output are False. "
+ "No infection history csvs will be created.")
if self.status_output:
ih_file_name = "inf_status_history.csv"
logging.info(
f"Set infection history infection status location to "
f"{os.path.join(ih_folder, ih_file_name)}")
self.ih_status_writer = _CsvDictWriter(
ih_folder, ih_file_name,
self.ih_output_titles
)
if self.infectiousness_output:
ih_file_name = "infectiousness_history.csv"
logging.info(
f"Set infection history infectiousness location to "
f"{os.path.join(ih_folder, ih_file_name)}")
self.ih_infectiousness_writer = _CsvDictWriter(
ih_folder, ih_file_name,
self.ih_output_titles
)
if self.secondary_infections_output:
ih_file_name = "secondary_infections.csv"
logging.info(
f"Set secondary infections (R_t) location to "
f"{os.path.join(ih_folder, ih_file_name)}")
self.secondary_infections_writer = _CsvDictWriter(
ih_folder, ih_file_name,
self.Rt_output_titles
)
if self.serial_interval_output:
file_name = "serial_intervals.csv"
logging.info(
f"Set serial interval location to "
f"{os.path.join(ih_folder, file_name)}")
self.serial_interval_writer = _CsvDictWriter(
ih_folder, file_name,
self.si_output_titles
)
if self.generation_time_output:
file_name = "generation_times.csv"
logging.info(
f"Set generation time location to "
f"{os.path.join(ih_folder, file_name)}")
self.generation_time_writer = _CsvDictWriter(
ih_folder, file_name,
self.si_output_titles
)
[docs]
@log_exceptions()
def run_sweeps(self):
"""Iteration step of the simulation. First the initialisation sweeps
configure the population on the first timestep. Then at each
subsequent timestep the sweeps run, updating the population. At each
timepoint, a count of each infection status is written to file. Note
that the elements of initial sweeps take the sim_params dict as an
argument for their call method but the elements of sweeps take time
as an argument for their call method.
"""
# Define time step between sweeps
ts = 1 / Parameters.instance().time_steps_per_day
# Initialise on the time step before starting.
for sweep in self.initial_sweeps:
sweep(self.sim_params)
logging.info("Initial Sweeps Completed at time "
+ f"{self.sim_params['simulation_start_time']} days")
# First entry of the data file is the initial state
self.write_to_file(self.sim_params["simulation_start_time"])
if self.ih_status_writer:
self.write_to_ih_file(self.sim_params["simulation_start_time"],
output_option="status")
if self.ih_infectiousness_writer:
self.write_to_ih_file(self.sim_params["simulation_start_time"],
output_option="infectiousness")
times = np.arange(self.sim_params["simulation_start_time"] + ts,
self.sim_params["simulation_end_time"] + ts,
ts)
for t in tqdm(times):
for sweep in self.sweeps:
sweep(t)
self.write_to_file(t)
if self.ih_status_writer:
self.write_to_ih_file(t, output_option="status")
if self.ih_infectiousness_writer:
self.write_to_ih_file(t, output_option="infectiousness")
for writer in self.writers:
writer.write(t, self.population)
logging.debug(f'Iteration at time {t} days completed')
logging.info(f"Final time {t} days reached")
if self.secondary_infections_writer:
self.write_to_Rt_file(times)
if self.serial_interval_writer:
self.write_to_serial_interval_file(times)
if self.generation_time_writer:
self.write_to_generation_time_file(times)
[docs]
def write_to_file(self, time):
"""Records the count number of a given list of infection statuses
and writes these to file.
Parameters
----------
time : float
Time of output data
"""
if Parameters.instance().use_ages:
nb_age_groups = len(Parameters.instance().age_proportions)
else:
nb_age_groups = 1
if Parameters.instance().use_ages:
if self.spatial_output: # Separate output line for each cell
for cell in self.population.cells:
for age_i in range(0, nb_age_groups):
data = {s: 0 for s in list(InfectionStatus)}
for inf_status in list(InfectionStatus):
data_per_inf_status = \
cell.compartment_counter.retrieve()[inf_status]
data[inf_status] += data_per_inf_status[age_i]
# Age groups are numbered from 1 to the total number
# of age groups (thus the +1):
data["age_group"] = age_i + 1
data["time"] = time
data["cell"] = cell.id
data["location_x"] = cell.location[0]
data["location_y"] = cell.location[1]
self.writer.write(data)
else: # Summed output across all cells in population
data = {s: 0 for s in list(InfectionStatus)}
for cell in self.population.cells:
for age_i in range(0, nb_age_groups):
for inf_status in list(InfectionStatus):
data_per_inf_status = \
cell.compartment_counter.retrieve()[inf_status]
data[inf_status] += data_per_inf_status[age_i]
data["age_group"] = age_i + 1
data["time"] = time
self.writer.write(data)
else: # If age not considered, age_group not written in csv
if self.spatial_output: # Separate output line for each cell
for cell in self.population.cells:
data = {s: 0 for s in list(InfectionStatus)}
for k in data:
data[k] += sum(cell.compartment_counter.retrieve()[k])
data["time"] = time
data["cell"] = cell.id
data["location_x"] = cell.location[0]
data["location_y"] = cell.location[1]
self.writer.write(data)
else: # Summed output across all cells in population
data = {s: 0 for s in list(InfectionStatus)}
for cell in self.population.cells:
for k in data:
# Sum across age compartments
data[k] += sum(cell.compartment_counter.retrieve()[k])
data["time"] = time
self.writer.write(data)
[docs]
def write_to_ih_file(self, time, output_option: str):
"""Records the infection history of the individual people
and writes these to file.
Parameters
----------
time : float
Time of output data
output_option : str
Determines if you write data of infection status where \
output_option="status" and/or infectiousness where \
output_option="infectiousness"
"""
if self.status_output and output_option == "status":
ih_data = {column: 0 for column in
self.ih_status_writer.fieldnames}
for cell in self.population.cells:
for person in cell.persons:
ih_data[person.id] = person.infection_status.value
ih_data["time"] = time
self.ih_status_writer.write(ih_data)
if self.infectiousness_output and output_option == "infectiousness":
infect_data = {column: 0 for column in
self.ih_infectiousness_writer.fieldnames}
for cell in self.population.cells:
for person in cell.persons:
infect_data[person.id] = person.infectiousness
infect_data["time"] = time
self.ih_infectiousness_writer.write(infect_data)
[docs]
def write_to_Rt_file(self, times: np.array):
"""Records the number of secondary infections of each `Person` at the
time they first became infected. Each `Person` may have multiple
entries if they have been infected multiple times. Also records the
R_t value for each time step.
Parameters
----------
times : np.array
An array of all time steps of the simulation
"""
# Initialise the dataframe
all_times = np.hstack((np.array(self
.sim_params["simulation_start_time"]),
times))
data_dict = {"time": all_times}
for cell in self.population.cells:
for person in cell.persons:
person_data = np.empty(len(all_times))
# Initialise the person column as a column of NaNs
person_data[:] = np.nan
# The only non-NaN entries will be at the time steps in which
# the person was infected, and each entry will represent the
# number of secondary cases the person accumulated for that
# specific infection (if they have been infected multiple
# times)
for j in range(person.num_times_infected):
person_data[int(person.infection_start_times[j])] = \
person.secondary_infections_counts[j]
data_dict[person.id] = person_data
# Change to dataframe to record the R_t values and to get the data
# in a list of dicts format
df = pd.DataFrame(data_dict)
df["R_t"] = np.nanmean(df.iloc[:, 1:].to_numpy(), axis=1)
# The below is a list of dictionaries for each time step
list_of_dicts = df.to_dict(orient='records')
for dict_row in list_of_dicts:
# Write each time step in dictionary form
self.secondary_infections_writer.write(dict_row)
[docs]
def write_to_serial_interval_file(self, times: np.array):
"""Records the intervals between an infector and an infectee getting
infected to provide an overall serial interval for each time-step of
the epidemic. This can be used as a histogram of values for each
time step.
Parameters
----------
times : np.array
An array of all time steps of the simulation
"""
# Initialise the dataframe
all_times = np.hstack((np.array(self
.sim_params["simulation_start_time"]),
times))
data_dict = {time: [] for time in all_times}
for cell in self.population.cells:
for person in cell.persons:
# For every time the person was infected, add their list of
# serial intervals to the timepoint at which their infector
# became infected
for t_inf, intervals in person.serial_interval_dict.items():
data_dict[t_inf] += intervals
# Here we will fill out the rest of the dataframe with NaN values,
# as all lists will have different lengths
max_list_length = max([len(intervals)
for intervals in data_dict.values()])
for t in data_dict.keys():
data_dict[t] += ([np.nan] * (max_list_length - len(data_dict[t])))
# Change to dataframe to get the data in a list of dicts format
df = pd.DataFrame(data_dict)
list_of_dicts = df.to_dict(orient='records')
for row_t in list_of_dicts:
self.serial_interval_writer.write(row_t)
[docs]
def write_to_generation_time_file(self, times: np.array):
"""Records the intervals between an infector and an infectee getting
exposed to provide an overall generation time for each time-step of
the epidemic. This can be used as a histogram of values for each
time step.
Parameters
----------
times : np.array
An array of all time steps of the simulation
"""
# Initialise the dataframe
all_times = np.hstack((np.array(self
.sim_params["simulation_start_time"]),
times))
data_dict = {time: [] for time in all_times}
for cell in self.population.cells:
for person in cell.persons:
# For every time the person was infected, add their list of
# generation times to the timepoint at which their infector
# became exposed
for t_inf, intervals in person.generation_time_dict.items():
data_dict[t_inf] += intervals
# Here we will fill out the rest of the dataframe with NaN values,
# as all lists will have different lengths
max_list_length = max([len(intervals)
for intervals in data_dict.values()])
for t in data_dict.keys():
data_dict[t] += ([np.nan] * (max_list_length - len(data_dict[t])))
# Change to dataframe to get the data in a list of dicts format
df = pd.DataFrame(data_dict)
list_of_dicts = df.to_dict(orient='records')
for row_t in list_of_dicts:
self.generation_time_writer.write(row_t)
def add_writer(self, writer: AbstractReporter):
self.writers.append(writer)
[docs]
def compress_csv(self):
"""Compresses the infection history csvs when they are written.
"""
if self.compress and self.ih_status_writer:
self.ih_status_writer.compress()
if self.compress and self.ih_infectiousness_writer:
self.ih_infectiousness_writer.compress()
if self.compress and self.secondary_infections_writer:
self.secondary_infections_writer.compress()
[docs]
@staticmethod
def set_random_seed(seed):
""" Set random seed for all subsequent operations. Should be used
before population configuration to control this process as well.
Parameters
----------
seed : int
Seed for RandomState
"""
random.seed(seed)
np.random.seed(seed)
logging.info(f"Set simulation random seed to: {seed}")