Source code for pyEpiabm.sweep.host_progression_sweep

#
# Progression of infection within individuals
#
import random
import typing

import numpy as np
from collections import defaultdict

import pyEpiabm as pe
from pyEpiabm.core import Parameters, Person
from pyEpiabm.property import InfectionStatus

from .abstract_sweep import AbstractSweep
from .transition_matrices import StateTransitionMatrix, TransitionTimeMatrix


[docs] class HostProgressionSweep(AbstractSweep): """Class for sweeping through population and updating host infection status and time to next infection status change. """ def __init__(self): """Initialise parameters to be used in class methods. State transition matrix is set where each row of the matrix corresponds to a current infection status of a person. The columns of that row then indicate the transition probabilities to the remaining infection statuses. Number of infection states is set by taking the size of the InfectionStatus enum. Waning transition matrix is initialised and adapts the state transition matrix with rate multipliers relating to waning immunity which are called from the parameters class. Transition time matrix is also initialised and associated parameters are called from the parameters class. Infectiousness progression defines an array used to scale a person's infectiousness and which depends on time since the start of the infection, measured in timesteps (following what is done in Covidsim). """ # Instantiate state transition matrix use_ages = Parameters.instance().use_ages coefficients = defaultdict(int, Parameters.instance() .host_progression_lists) multipliers = defaultdict(list, Parameters.instance() .rate_multiplier_params) matrix_object = StateTransitionMatrix(coefficients, multipliers, use_ages) self.state_transition_matrix = matrix_object.matrix if pe.Parameters.instance().use_waning_immunity: self.waning_transition_matrix = matrix_object.waning_matrix self.number_of_states = len(InfectionStatus) assert self.state_transition_matrix.shape == \ (self.number_of_states, self.number_of_states), \ 'Matrix dimensions must match number of infection states' # Instantiate transmission time matrix time_matrix_object = TransitionTimeMatrix() self.transition_time_matrix = \ time_matrix_object.create_transition_time_matrix() # Instantiate parameters to be used in update transition time # method self.latent_to_symptom_delay = \ pe.Parameters.instance().latent_to_sympt_delay # Defining the length of the model time step (in days, can be a # fraction of day as well). self.model_time_step = 1 / pe.Parameters.instance().time_steps_per_day self.delay = np.floor(self.latent_to_symptom_delay / self.model_time_step) # Infectiousness progression # Instantiate parameters to be used in update infectiousness infectious_profile = pe.Parameters.instance().infectiousness_prof inf_prof_resolution = len(infectious_profile) - 1 inf_prof_average = np.average(infectious_profile) infectious_period = pe.Parameters.instance().asympt_infect_period # Extreme case where model time step would be too small max_inf_steps = 2550 # Define number of time steps a person is infectious: num_infectious_ts = \ int(np.ceil(infectious_period / self.model_time_step)) if num_infectious_ts >= max_inf_steps: raise ValueError('Number of timesteps in infectious period exceeds' + ' limit') # Initialisation infectious_profile[inf_prof_resolution] = 0 infectiousness_prog = np.zeros(max_inf_steps) # Fill infectiousness progression array by doing linear interpolation # of infectious_profile for i in range(num_infectious_ts): t = (((i * self.model_time_step) / infectious_period) * inf_prof_resolution) # Infectiousness value associated to infectiousness profile: associated_inf_value = int(np.floor(t)) t -= associated_inf_value if associated_inf_value < inf_prof_resolution: infectiousness_prog[i] = \ (infectious_profile[associated_inf_value] * (1 - t) + infectious_profile[associated_inf_value + 1] * t) else: # limit case where we define infectiousness to 0 infectiousness_prog[i] = \ infectious_profile[inf_prof_resolution] # Scaling scaling_param = inf_prof_average for i in range(num_infectious_ts + 1): infectiousness_prog[i] /= scaling_param self.infectiousness_progression = infectiousness_prog
[docs] @staticmethod def set_infectiousness(person: Person, time: float): """Assigns the initial infectiousness of a person for when they go from the exposed infection state to the next state, either InfectAsympt, InfectMild or InfectGP. Also assigns the infection start time and stores it as an attribute of the person. Called right after an exposed person has been given its new infection status in the call method below. This static method is non private as it is also used by the initial infected sweep to give new infected individuals an infectiousness. Parameters ---------- Person : Person Instance of person class with infection status attributes time : float Current simulation time """ init_infectiousness = np.random.gamma(1, 1) if person.infection_status == InfectionStatus.InfectASympt: infectiousness = (init_infectiousness * pe.Parameters.instance().asympt_infectiousness) person.initial_infectiousness = infectiousness elif (person.infection_status == InfectionStatus.InfectMild or person.infection_status == InfectionStatus.InfectGP): infectiousness = (init_infectiousness * pe.Parameters.instance().sympt_infectiousness) person.initial_infectiousness = infectiousness # Add new infection start time, increment number of times infected and # add a new entry to the secondary_infections_counts list person.infection_start_times.append(time) person.increment_num_times_infected() person.secondary_infections_counts.append(0) if person.infection_start_times[-1] < 0: raise ValueError('The infection start time cannot be negative')
[docs] def update_next_infection_status(self, person: Person, time: float = None): """Assigns next infection status based on current infection status and on probabilities of transition to different statuses. Weights are taken from row in state transition matrix that corresponds to the person's current infection status. Weights are then used in random.choices method to select person's next infection status. Exception is carehome residents who die with probability=1 if reach ICU and probability=1-'carehome_rel_prob_hosp' if reach hospital. Parameters ---------- person : Person Instance of person class with infection status attributes time : float Current simulation time (if necessary for the method, default = None) """ if person.infection_status in [InfectionStatus.Dead, InfectionStatus.Vaccinated]: person.next_infection_status = None return elif (person.infection_status == InfectionStatus.Recovered and not Parameters.instance().use_waning_immunity): person.next_infection_status = None return elif (person.care_home_resident and person.infection_status == InfectionStatus.InfectICU): person.next_infection_status = InfectionStatus.Dead return elif (person.care_home_resident and person.infection_status == InfectionStatus.InfectHosp): carehome_params = Parameters.instance().carehome_params carehome_hosp = carehome_params['carehome_rel_prob_hosp'] if random.uniform(0, 1) > carehome_hosp: person.next_infection_status = InfectionStatus.Dead return row_index = person.infection_status.name # If we are not using waning immunity or person.time_of_recovery is # None (so they have never reached Recovered) then we choose weights # from the state_transition_matrix. Otherwise, we use the # waning_transition_matrix. if (not Parameters.instance().use_waning_immunity or not person.time_of_recovery): weights = self.state_transition_matrix.loc[row_index].to_numpy() weights = [w[person.age_group] if isinstance(w, list) else w for w in weights] else: if time is None: raise ValueError("Simulation time must be passed to " "update_next_infection_status when waning " "immunity is active") weights = self._get_waning_weights(person, time) outcomes = range(1, self.number_of_states + 1) if len(weights) != len(outcomes): raise AssertionError('The number of infection statuses must' + ' match the number of transition' + ' probabilities') next_infection_status_number = random.choices(outcomes, weights)[0] next_infection_status = \ InfectionStatus(next_infection_status_number) person.next_infection_status = next_infection_status
def _get_waning_weights(self, person: Person, time: float) -> typing.List: """Given that the current person has previously recovered, this method will return a list of updated weights based on the level of immunity the person has. The weights taken from the waning_transition_matrix are lambda expressions parameterized by t (time_since_recovery). Parameters ---------- person : Person Instance of person class with infection status attributes time : float Current simulation time Returns ------- list: List of weights representing the probability of transitioning to a given compartment. """ row_index = person.infection_status.name time_since_recovery = time - person.time_of_recovery weights = list(self.waning_transition_matrix.loc[row_index]) # Note that below, each entry w will be a lambda expression returning # a np.array either representing a float (shape = ()) or a list # (shape = (n,)) hence the conditions. new_weights = [] for w in weights: if isinstance(w, (int, float)): new_weights.append(w) else: # This is evaluating the lambda expressions at t = # time_since_recovery w_evaluated = w(time_since_recovery) if w_evaluated.shape: new_weights.append(w_evaluated[person.age_group]) else: new_weights.append(w_evaluated) return new_weights
[docs] def update_time_status_change(self, person: Person, time: float): """Calculates transition time as calculated in CovidSim, and updates the time_of_status_change for the given Person, given as the time until next infection status for a person who has a new infection status. If it is expected that the person will not transition again (for example in Recovered or Dead statuses), then the time of status change is set to infinity. Parameters ---------- person : Person Instance of Person class with :class:`InfectionStatus` attributes time : float Current simulation time """ # Defines the transition time. If the person will not transition again, # the transition time is set to infinity. Else, the transition time is # defined using the TransitionTimeMatrix class, with the method # `choose` from the InverseCdf class. if person.infection_status == InfectionStatus.Susceptible: raise ValueError("Method should not be used to infect people") if person.infection_status in [InfectionStatus.Dead, InfectionStatus.Vaccinated]: transition_time = np.inf elif (person.infection_status == InfectionStatus.Recovered and not Parameters.instance().use_waning_immunity): transition_time = np.inf else: row_index = person.infection_status.name column_index = person.next_infection_status.name # Checks for susceptible to exposed case # where transition time is zero try: if person.infection_status != InfectionStatus.Recovered: transition_time_icdf_object = \ self.transition_time_matrix.loc[row_index, column_index] transition_time = \ transition_time_icdf_object.icdf_choose_noexp() else: # If someone is recovered, then their transition time # will be equal to 1 when waning immunity is turned on. # This means that everyone spends exactly 1 day in the # Recovered compartment with waning immunity transition_time = 1 except AttributeError as e: if "object has no attribute 'icdf_choose_noexp'" in str(e): transition_time = transition_time_icdf_object assert isinstance( transition_time_icdf_object, (float, int)), \ ("Entries of transition time matrix" + " must either be ICDF" + " objects or numbers") else: raise # Adds delay to transition time for first level symptomatic infection # statuses (InfectMild or InfectGP), as is done in CovidSim. if person.infection_status in [InfectionStatus.InfectMild, InfectionStatus.InfectGP]: transition_time += self.delay # Assigns the time of status change using current time and transition # time: if transition_time < 0: raise ValueError('New transition time must be larger than' + ' or equal to 0') person.time_of_status_change = time + transition_time # Finally, if the person is Exposed, we can store their latency period # as the transition_time. This can be used for calculating the serial # interval. We can also store their generation time in this step. if person.infection_status == InfectionStatus.Exposed: latent_period = transition_time person.set_latent_period(latent_period) person.store_generation_time() person.store_serial_interval()
def _updates_infectiousness(self, person: Person, time: float): """Updates infectiousness. Scales using the initial infectiousness if the person is in an infectious state. Updates the infectiousness to 0 if the person has just been transferred to Recovered or Dead. Doesn't do anything if the person was already in Recovered, Dead, Susceptible, or Exposed (ie if the infectiousness of the person was 0). Parameters ---------- Person : Person Instance of Person class with :class:`InfectionStatus`, initial infectiousness, and infection start time attributes time : float Current simulation time """ # Updates infectiousness with scaling if person is infectious: if str(person.infection_status).startswith('InfectionStatus.Infect'): scale_infectiousness = self.infectiousness_progression time_since_infection = (int((time - person.infection_start_times[-1]) / self.model_time_step)) person.infectiousness = person.initial_infectiousness *\ scale_infectiousness[time_since_infection] # Sets infectiousness to 0 if person just became Recovered, Dead, or # Vaccinated elif person.infectiousness != 0: if person.infection_status in [InfectionStatus.Recovered, InfectionStatus.Dead, InfectionStatus.Vaccinated]: person.infectiousness = 0
[docs] def __call__(self, time: float): """Sweeps through all people in the population, updates their infection status if it is time and assigns them their next infection status and the time of their next status change. Also updates their infectiousness. Parameters ---------- time : float Current simulation time """ # store list of uninfected or asymptomatic people for processing # for disease testing. asympt_or_uninf_people = [] for cell in self._population.cells: for person in cell.persons: if person.time_of_status_change is None: assert person.is_susceptible() asympt_or_uninf_people.append((cell, person)) continue # pragma: no cover if person.infection_status in [InfectionStatus.Recovered, InfectionStatus.Vaccinated]: asympt_or_uninf_people.append((cell, person)) while person.time_of_status_change <= time: person.update_status(person.next_infection_status) if person.infection_status in \ [InfectionStatus.InfectASympt, InfectionStatus.InfectMild, InfectionStatus.InfectGP]: self.set_infectiousness(person, time) if not person.is_symptomatic(): asympt_or_uninf_people.append((cell, person)) self.update_next_infection_status(person, time) if person.infection_status == InfectionStatus.Susceptible: person.time_of_status_change = None break elif person.infection_status == InfectionStatus.Recovered: person.set_time_of_recovery(time) self.update_time_status_change(person, time) self.sympt_testing_queue(cell, person) self._updates_infectiousness(person, time) self.asympt_uninf_testing_queue(asympt_or_uninf_people, time)
[docs] def sympt_testing_queue(self, cell, person: Person): """ Adds symptomatic people to a testing queue with a given probability depedent on their status as either a care home resident or a key worker. Detailed description of the implementation can be found in github wiki: https://github.com/SABS-R3-Epidemiology/epiabm/wiki/Interventions#testing Parameters ---------- cell : Cell cell for which the person is a member of and therefore will be added to the testing queue of. person : Person symptomatic inndividual to be added to a testing queue. """ if hasattr(Parameters.instance(), 'intervention_params'): if 'disease_testing' in Parameters.instance(). \ intervention_params.keys(): testing_params = Parameters.instance(). \ intervention_params['disease_testing'] r = random.random() type_r = random.random() if (person.is_symptomatic() and person.date_positive is None): if person.care_home_resident: test_probability = testing_params['testing_sympt'][0] type_probability = testing_params['sympt_pcr'][0] elif person.key_worker: test_probability = testing_params['testing_sympt'][1] type_probability = testing_params['sympt_pcr'][1] else: test_probability = testing_params['testing_sympt'][2] type_probability = testing_params['sympt_pcr'][2] if r < test_probability: if type_r < type_probability: cell.enqueue_PCR_testing(person) else: cell.enqueue_LFT_testing(person) if (person.date_positive is not None and (person.next_infection_status in [InfectionStatus.Dead, InfectionStatus.Recovered])): person.date_positive = None
[docs] def asympt_uninf_testing_queue(self, person_list: list, time): """ Adds asymptomatic and uninfected people to a testing queue with a given probability depedent on their status as either a care home resident or key worker. Detailed description of the implementation can be found in github wiki: https://github.com/SABS-R3-Epidemiology/epiabm/wiki/Interventions#testing Parameters ---------- person_list : list list of (cell, person) tuples giving the list of people to be added to a testing queue and their cell. time : float current time point to determine whether uninfected indivuals should stop being considered as positive. """ if hasattr(Parameters.instance(), 'intervention_params'): if 'disease_testing' in Parameters.instance(). \ intervention_params.keys(): testing_params = Parameters.instance(). \ intervention_params['disease_testing'] for item in person_list: cell = item[0] person = item[1] if person.is_symptomatic(): raise ValueError("Function should not be called on" + "symptomatic individuals.") r = random.random() type_r = random.random() if person.care_home_resident: test_probability = testing_params[ 'testing_asympt_uninf'][0] type_probability = testing_params[ 'asympt_uninf_pcr'][0] elif person.key_worker: test_probability = testing_params[ 'testing_asympt_uninf'][1] type_probability = testing_params[ 'asympt_uninf_pcr'][1] else: test_probability = testing_params[ 'testing_asympt_uninf'][2] type_probability = testing_params[ 'asympt_uninf_pcr'][2] if (r < test_probability and person.date_positive is None): if type_r < type_probability: cell.enqueue_PCR_testing(person) else: cell.enqueue_LFT_testing(person) elif (person.date_positive is not None and person.date_positive + 10 >= time): person.date_positive = None