#
# Factory for creation of a population based on an input file
#
import numpy as np
import pandas as pd
import random
import copy
import logging
from packaging import version
from pyEpiabm.core import Cell, Microcell, Person, Population, Parameters
from pyEpiabm.property import InfectionStatus, PlaceType
from pyEpiabm.sweep import HostProgressionSweep, InitialHouseholdSweep
from pyEpiabm.utility import log_exceptions
[docs]
class FilePopulationFactory:
""" Class that creates a population based on an input .csv file.
"""
[docs]
@staticmethod
@log_exceptions()
def make_pop(input_file: str, random_seed: int = None, time: float = 0):
"""Initialize a population object from an input csv file, with one
row per microcell. A uniform multinomial distribution is
used to distribute the number of people into the different households
within each microcell. A random seed may be specified for reproducible
populations.
Input file contains columns:
* `cell`: ID code for cell
* `microcell`: ID code for microcell
* `location_x`: The x coordinate of the parent cell location
* `location_y`: The y coordinate of the parent cell location
* `household_number`: Number of households in that microcell
* `place_number`: Number of places in that microcell
* Any number of columns with titles from the `InfectionStatus` \
enum (such as `InfectionStatus.Susceptible`), giving the \
number of people with that status in that cell
Parameters
----------
input_file : str
Path to input file
random_seed : int
Seed for reproducible household and place distribution
time : float
Start time of simulation where this population is used (default 0)
Returns
-------
Population
Population object with individuals distributed into households
"""
# If random seed is specified in parameters, set this
if random_seed is not None:
np.random.seed(random_seed)
random.seed(random_seed)
logging.info(f"Set population random seed to: {random_seed}")
# Read file into pandas dataframe
input = pd.read_csv(filepath_or_buffer=input_file, dtype={"cell": int,
"microcell": int})
loc_given = ("location_x" and "location_y" in input.columns.values)
# Sort csv on cell and microcell ID
input = input.sort_values(by=["cell", "microcell"])
# Validate all column names in input
valid_names = ["cell", "microcell", "location_x",
"location_y", "household_number", "place_number"]
for col in input.columns.values: # Check all column headings
if not ((col in valid_names) or hasattr(InfectionStatus, col)):
raise ValueError(f"Unknown column heading '{col}'")
# Initialise a population class
new_pop = Population()
# Initialise sweep to assign new people their next infection status
host_sweep = HostProgressionSweep()
# Store current cell
current_cell = None
# Iterate through lines (one per microcell)
for line in input.itertuples():
# Converting from float to string
cell_id_csv = str(line.cell)
microcell_id_csv = cell_id_csv + "." + str(line.microcell)
# Check if cell exists, or create it
cell = FilePopulationFactory.find_cell(new_pop, cell_id_csv,
current_cell)
if current_cell != cell:
current_cell = cell
if loc_given:
location = (line.location_x, line.location_y)
cell.set_location(location)
# Raise error if microcell exists, then create new one
microcell_ids = [microcell.id for microcell in cell.microcells]
if microcell_id_csv in microcell_ids:
raise ValueError(f"Duplicate microcells: {microcell_id_csv}"
+ f" already exists in cell {cell.id}")
new_microcell = Microcell(cell)
new_microcell.set_id(microcell_id_csv)
cell.microcells.append(new_microcell)
for column in input.columns.values:
if hasattr(InfectionStatus, column):
value = getattr(InfectionStatus, column)
for _ in range(int(getattr(line, column))):
person = Person(new_microcell)
person.set_random_age()
new_microcell.add_person(person)
person.update_status(InfectionStatus(value))
if (person.infection_status
== InfectionStatus.Susceptible):
continue # Next status set upon infection
host_sweep.update_next_infection_status(person)
host_sweep.update_time_status_change(person, time)
if str(person.infection_status).startswith('Infect'):
HostProgressionSweep.set_infectiousness(person,
time)
# Add households and places to microcell
if len(Parameters.instance().household_size_distribution) == 0:
if (hasattr(line, 'household_number') and
line.household_number > 0):
households = int(line.household_number)
FilePopulationFactory.add_households(new_microcell,
households)
if hasattr(line, 'place_number') and line.place_number > 0:
for _ in range(int(line.place_number)):
new_microcell.add_place(1, cell.location,
random.choice(list(PlaceType)))
# if household_size_distribution parameters are available use
# appropriate function
if len(Parameters.instance().household_size_distribution) != 0:
InitialHouseholdSweep().household_allocation(new_pop)
# Verify all people are logged in cell
for cell in new_pop.cells:
updated_persons = [person for mcell in cell.microcells
for person in mcell.persons]
assert len(updated_persons) == len(cell.persons), \
"Person gone missing in microcell allocation"
logging.info(f"New Population from file {input_file} configured")
return new_pop
[docs]
@staticmethod
def find_cell(population: Population, cell_id: str, current_cell: Cell):
"""Returns cell with given ID in population, creates one if
current cell has another ID. As input is sorted on cell no
cell will exist with that ID.
Parameters
----------
population : Population
Population containing target cell
cell_id : str
ID for target cell
current_cell : Cell or None
Cell object of current cell
Returns
-------
Cell
Cell with given ID in population
"""
if (current_cell is not None) and (current_cell.id == cell_id):
return current_cell
new_cell = Cell()
population.cells.append(new_cell)
new_cell.set_id(cell_id, population.cells)
return new_cell
[docs]
@staticmethod
def add_households(microcell: Microcell, household_number: int):
"""Groups people in a microcell into households together.
Parameters
----------
microcell : Microcell
Microcell containing all person objects to be considered
for grouping
household_number : int
Number of households to form
"""
# Initialises another multinomial distribution
q = [1 / household_number] * household_number
people_list = microcell.persons.copy()
people_number = len(people_list)
household_split = np.random.multinomial(people_number, q,
size=1)[0]
for j in range(household_number):
people_in_household = household_split[j]
household_people = []
for i in range(people_in_household):
person_choice = people_list[0]
people_list.remove(person_choice)
household_people.append(person_choice)
microcell.add_household(household_people)
[docs]
@staticmethod
@log_exceptions()
def print_population(population: Population, output_file: str):
"""Outputs population as .csv file, in format usable by the make_pop()
method. Used for verification, or saving current simulation state. Note
the current household distribution is random, and so the seed for
household allocation must also be recorded to precisely save the
simulation state.
WARNING: This function is only tested with versions of pandas > 1.4,
and may not function correctly in older cases. This will include cases
where the user is running python 3.7 or older versions.
Parameters
----------
population : Population
Population object to output
output_file: str
Path to output file
"""
if version.parse(pd.__version__) < version.parse("1.4.0"):
logging.warning(f"Pandas version {pd.__version__} is outdated,"
+ " only tests version 1.4 and above.")
columns = ['cell', 'microcell', 'location_x', 'location_y',
'household_number', 'place_number']
for status in InfectionStatus:
columns.append(str(status.name))
df = pd.DataFrame(columns=columns)
for cell in population.cells:
for microcell in cell.microcells:
data_dict = {
"cell": cell.id,
"microcell": microcell.id,
"location_x": cell.location[0],
"location_y": cell.location[1],
}
inf_dict = {str(status.name): 0 for status in InfectionStatus}
data_dict.update(inf_dict)
for person in microcell.persons:
status = str(person.infection_status.name)
data_dict[status] += 1
data_dict['household_number'] = len(microcell.households)
data_dict['place_number'] = len(microcell.places)
new_row = pd.DataFrame(data=data_dict, columns=columns,
index=[0])
df = pd.concat([df, new_row], ignore_index=True) \
if df.size else new_row
df['household_number'] = df['household_number'].astype(int)
df['place_number'] = df['place_number'].astype(int)
for status in InfectionStatus:
df[str(status.name)] = df[str(status.name)].fillna(0)\
.astype(int)
if (df[str(status.name)] == 0).all(): # Delete unused statuses
df.drop(columns=str(status.name), inplace=True)
output_df = copy.copy(df) # To access dataframe in testing
output_df.to_csv(output_file, header=True, index=False)
logging.info(f"Population saved to location {output_file}")