#
# Person Class
#
import random
import re
from pyEpiabm.property import InfectionStatus
from .parameters import Parameters
[docs]
class Person:
"""Class to represent each person in a population.
Parameters
----------
microcell : Microcell
An instance of an :class:`Microcell`
Attributes
----------
infection_status : InfectionStatus
Person's current infection status
next_infection_status : InfectionStatus
Person's next infection status after current one
time_of_status_change: int
Time when person's infection status is updated
"""
def __init__(self, microcell, age_group=None):
"""Constructor Method.
Parameters
----------
microcell : Microcell
Person's parent :class:`Microcell` instance
age_group : int or None
Integer identifying persons age group or None if random age should
be assigned
"""
self.initial_infectiousness = 0
self.infectiousness = 0
self.microcell = microcell
self.infection_status = InfectionStatus.Susceptible
self.household = None
self.places = []
self.place_types = []
self.next_infection_status = None
self.time_of_status_change = None
self.infection_start_times = []
self.secondary_infections_counts = []
self.time_of_recovery = None
self.num_times_infected = 0
self.latent_period = None
self.exposure_period = None
self.infector_latent_period = None
self.serial_interval_dict = {}
self.generation_time_dict = {}
self.care_home_resident = False
self.key_worker = False
self.date_positive = None
self.is_vaccinated = False
self.id = self.microcell.id + "." + "." + \
str(len(self.microcell.persons))
self.set_random_age(age_group)
[docs]
def set_random_age(self, age_group=None):
"""Set random age of person, and save index of their age group.
Note that the max age in the 80+ group is 84 here, however the precise
age of 80+ people is never used (exact ages are only used to assign
school/workplaces) so this does not cause any issues.
"""
if Parameters.instance().use_ages:
if age_group is None:
group_probs = Parameters.instance().age_proportions
self.age_group = random.choices(range(len(group_probs)),
weights=group_probs)[0]
else:
self.age_group = age_group
self.age = random.randint(0, 4) + 5 * self.age_group
else:
self.age = None
# If age is not used in the model, then every person is in the
# same age group (to conserve same output structure)
self.age_group = 0
[docs]
def update_age_group(self):
"""Update the age_group attribute based on the current age value.
This should be called after the age is set directly (e.g., in household
age assignment) to ensure the age_group is consistent with the age.
"""
if Parameters.instance().use_ages and self.age is not None:
# Assign age groups in 5-year bands, with all ages 80 and above
# in the last group (index 16)
self.age_group = min(self.age // 5, 16)
[docs]
def is_symptomatic(self):
"""Query if the person is currently symptomatic.
Returns
-------
bool
Whether person is currently symptomatic
"""
return Person.is_infectious(self) and self.infection_status != \
InfectionStatus.InfectASympt
[docs]
def is_infectious(self):
"""Query if the person is currently infectious.
Returns
-------
bool
Whether person is currently infectious
"""
return str(self.infection_status).startswith('InfectionStatus.Infect')
[docs]
def is_susceptible(self):
"""Query if the person is currently susceptible.
Returns
-------
bool
Whether person is currently susceptible
"""
return self.infection_status == InfectionStatus. \
Susceptible
def __repr__(self):
"""Returns a string representation of Person.
Returns
-------
str
String representation of person
"""
return f"Person ({self.id}), " \
f"Age = {self.age}, Status = {self.infection_status}."
[docs]
def update_status(self,
new_status: InfectionStatus) -> None:
"""Update Person's Infection Status.
Parameters
----------
new_status : InfectionStatus
Person's new status
"""
self.microcell.notify_person_status_change(
self.infection_status, new_status, self.age_group)
self.infection_status = new_status
if self.infection_status == InfectionStatus.Susceptible and \
self.household is not None:
self.household.add_susceptible_person(self)
if self.infection_status == InfectionStatus.Exposed:
if self.household is not None:
self.household.remove_susceptible_person(self)
[docs]
def add_place(self, place, person_group: int = 0):
"""Method adds a place to the place list if the person visits
or is associated with this place. Places are saved as a tuple
with the place as the first entry and the group the person is
associated with as the second.
Parameters
----------
place: Place
Place person should be added to
person_group : int
Key for the person group dictionary
"""
if place.cell != self.microcell.cell:
raise AttributeError("Place and person are not in the same\
cell")
self.places.append((place, person_group))
self.place_types.append(place.place_type)
[docs]
def remove_place(self, place):
"""Method to remove person for each associated place, to be
used when updating places.
Parameters
----------
place: Place
Place person should be removed from
"""
place_list = [i[0] for i in self.places]
if place not in place_list:
raise KeyError("Person not found in this place")
else:
ind = place_list.index(place)
self.places.pop(ind)
self.place_types.pop(ind)
[docs]
def is_place_closed(self, closure_place_type):
"""Method to check if any of the place in the person's place list
will be closed based on the place type, to be
used when place closure intervention is active.
Parameters
----------
closure_place_type: a list of PlaceType
PlaceType should be closed if in place closure intervention
"""
if (hasattr(self.microcell, 'closure_start_time')) and (
self.microcell.closure_start_time is not None):
for place_type in self.place_types:
if place_type.value in closure_place_type:
return True
return False
[docs]
def vaccinate(self, time):
"""Used to set a persons vaccination status to vaccinated
if they are drawn from the vaccine queue.
"""
self.is_vaccinated = True
self.date_vaccinated = time
[docs]
def remove_person(self):
"""Method to remove Person object from population.
Used to remove travellers from the population.
"""
self.microcell.cell.compartment_counter. \
_increment_compartment(-1, self.infection_status,
self.age_group)
self.microcell.compartment_counter. \
_increment_compartment(-1, self.infection_status,
self.age_group)
self.microcell.cell.persons.remove(self)
self.microcell.persons.remove(self)
self.household.persons.remove(self)
[docs]
def set_id(self, id: str):
"""Updates id of current person (i.e. for input from file).
id format: 4.3.2.1 represents cell 4, microcell 3 within this cell,
household 2 within this microcell, and person 1 within this
household. The id will only be changed if it is of the correct format.
Parameters
----------
id : str
Identity of person
"""
# Ensure id is a string
if not isinstance(id, str):
raise TypeError("Provided id must be a string")
# This regex will match on any string which takes the form "i.j.k.l"
# where i, j, k and l are integers (k can be empty)
if not re.match("^-?\\d+\\.-?\\d+\\.-?\\d*\\.-?\\d+$", id):
raise ValueError(f"Invalid id: {id}. id must be of the form "
f"'i.j.k.l' where i, j, k, l are integers (k"
f"can be empty)")
# Finally, check for duplicates
person_ids = [person.id for person in self.microcell.persons]
if id in person_ids:
raise ValueError(f"Duplicate id: {id}.")
self.id = id
[docs]
def set_time_of_recovery(self, time: float):
"""Records the time at which a person enters the Recovered compartment.
Parameters
----------
time : float
Current simulation time
"""
self.time_of_recovery = time
[docs]
def increment_num_times_infected(self):
"""Increments the number of times the person has been Infected as a
useful parameter to keep track of.
"""
self.num_times_infected += 1
[docs]
def increment_secondary_infections(self):
"""Increments the number of secondary infections the given person has
for this specific infection period (i.e. if the given person has been
infected multiple times, then we only increment the current secondary
infection count).
"""
try:
self.secondary_infections_counts[-1] += 1
except IndexError:
raise RuntimeError("Cannot call increment_secondary_infections "
"while secondary_infections_counts is empty")
[docs]
def set_latent_period(self, latent_period: float):
"""Sets the latent period of the current Person.
Parameters
----------
latent_period : float
The time between the exposure and infection onset of the current
Person.
"""
self.latent_period = latent_period
[docs]
def set_exposure_period(self, exposure_period: float):
"""Sets the exposure period (we define here as the time between a
primary case infection and a secondary case exposure, with the current
`Person` being the secondary case). We store this to be added to the
latent period of the infection to give a serial interval.
Parameters
----------
exposure_period : float
The time between the infector's time of infection and the time
of exposure to the current Person
"""
self.exposure_period = exposure_period
[docs]
def set_infector_latent_period(self, latent_period: float):
"""Sets the latent period of the primary infector of this Person. We
store this in order to calculate the generation_time of the interaction
between infector and infectee.
Parameters
----------
latent_period : float
The latency period of the primary infector (the individual who
infected the current Person).
"""
self.infector_latent_period = latent_period
[docs]
def store_serial_interval(self):
"""Adds the `latent_period` to the current `exposure_period` to give
a `serial_interval`, which will be stored in the
`serial_interval_dict`. The serial interval is the time between a
primary case infection and a secondary case infection. This method
is called immediately after a person becomes exposed.
"""
# This method has been called erroneously if the latent period or
# exposure period is None
if self.exposure_period is None:
raise RuntimeError("Cannot call store_serial_interval while the"
" exposure_period is None")
elif self.latent_period is None:
raise RuntimeError("Cannot call store_serial_interval while the"
" latent_period is None")
serial_interval = self.exposure_period + self.latent_period
# The reference day is the day the primary case was first infected
# This is what we will store in the dictionary
reference_day = self.time_of_status_change - serial_interval
try:
(self.serial_interval_dict[reference_day]
.append(serial_interval))
except KeyError:
self.serial_interval_dict[reference_day] = [serial_interval]
# Reset the exposure period for the next infection
self.exposure_period = None
[docs]
def store_generation_time(self):
"""Adds the `infector_latent_period` to the current
`exposure_period` to give a `generation_time`, which will be stored
in the `generation_time_dict`. The generation time is the time between
a primary case exposure and a secondary case exposure. This method
is called immediately after the infectee becomes exposed.
"""
# This method has been called erroneously if the exposure period is
# None or if the latent period of primary infector is None
if self.exposure_period is None:
raise RuntimeError("Cannot call store_generation_time while the"
" exposure_period is None")
elif self.latent_period is None:
raise RuntimeError("Cannot call store_generation_time while the"
" latent_period is None")
elif self.infector_latent_period is None:
if self.time_of_status_change - self.latent_period - \
self.exposure_period <= 0.0:
# We do not record the generation time if the infector has
# no latent period (if their time of infection was day 0)
return
raise RuntimeError("Cannot call store_generation_time while the"
" infector_latent_period is None")
generation_time = self.exposure_period + self.infector_latent_period
# The reference day is the day the primary case was first exposed
# This is what we will store in the dictionary
reference_day = (self.time_of_status_change - self.latent_period
- generation_time)
try:
(self.generation_time_dict[reference_day]
.append(generation_time))
except KeyError:
self.generation_time_dict[reference_day] = [generation_time]
# Reset the latency period of the infector for the next infection
self.infector_latent_period = None