Source code for pyEpiabm.py2c.py2c_population

import time
from pyEpiabm.core import Population
from pyEpiabm.property import InfectionStatus


[docs] def py2c_convert_population(py_population: Population, c_factory, c_status_map): return _py2c_converter(py_population, c_factory, c_status_map).c_population
class _Timer: def __init__(self, name: str): self.name = name self.t = time.perf_counter() def __del__(self): print(f'{self.name} took {time.perf_counter()-self.t}s') class _py2c_converter: def __init__(self, py_population: Population, c_factory, c_status_map): self.py_population = py_population self.c_factory = c_factory self.c_population = None self.c_status_map = c_status_map self._index_population() self._validate_households() self._copy_structure() self._add_people() self._configure_people() self._configure_households() self._link_places() def _index_population(self): _ = _Timer("_index_population") place_i = 0 for c_i, cell in enumerate(self.py_population.cells): cell._index = c_i # Index cell person_index = 0 self.households = set() for mc_i, m_cell in enumerate(cell.microcells): # Index microcells m_cell._index = (c_i, mc_i) assert m_cell.cell._index == c_i, \ "Microcell incorrectly linked to cell" for person in m_cell.persons: # Index people assert person.microcell.cell == cell, \ "Person incorrectly linked to microcell\ outside of cell" assert person.microcell == m_cell assert not hasattr(person, "_index"), \ "Person already indexed (is person in two microcells?)" person._index = (c_i, person.microcell._index[1], person_index) person_index += 1 if person.household is not None: person.household._microcell_index = None self.households.add(person.household) for place in cell.places: # cEpiabm stores places alongside population not cell place._index = place_i place_i += 1 self.n_places = place_i def _validate_households(self): _ = _Timer("_validate_households") for household in self.households: for person in household.persons: if household._microcell_index is None: household._microcell_index = person.microcell._index else: assert household._microcell_index ==\ person.microcell._index, \ ("Household cannot link two people" + " in different microcells.") if household._microcell_index is None: print("Warning: Empty Household exists.") for cell in self.py_population.cells: for m_cell in cell.microcells: m_cell._n_households = 0 for household in self.households: id = household._microcell_index if id is None: continue self.py_population.cells[id[0]].microcells[id[1]]._n_households\ += 1 def _copy_structure(self): _ = _Timer("_copy_structure") self.c_population = self.c_factory.make_empty_population() n_cells = len(self.py_population.cells) self.c_factory.add_cells(self.c_population, n_cells) self.c_factory.add_places(self.c_population, self.n_places) for py_cell, c_cell in zip( self.py_population.cells, self.c_population.cells()): if py_cell.location is not None: c_cell.set_location(py_cell.location) n_microcells = len(py_cell.microcells) self.c_factory.add_microcells(c_cell, n_microcells) for py_mcell, c_mcell in zip( py_cell.microcells, c_cell.microcells()): self.c_factory.add_households(c_mcell, py_mcell._n_households) def _add_people(self): _ = _Timer("_add_people") for py_cell in self.py_population.cells: c_cell = self.c_population.get_cell(py_cell._index) for py_mcell in py_cell.microcells: c_mcell = c_cell.get_microcell(py_mcell._index[1]) self.c_factory.add_persons( c_cell, c_mcell, len(py_mcell.persons)) def _configure_people(self): _ = _Timer("_configure_people") for py_cell, c_cell in zip( self.py_population.cells, self.c_population.cells()): assert py_cell._index == c_cell.index() for py_person, c_person in zip(py_cell.persons, c_cell.persons()): (c_i, mc_i, p_i) = py_person._index assert c_i == py_cell._index params = c_person.params() params.infectiousness = py_person.infectiousness params.initial_infectiousness =\ py_person.initial_infectiousness params.susceptibility = 1.0 # Is this correct? each # person doesn't have their own susceptibility? params.age_group = py_person.age_group \ if py_person.age_group is not None else 0 params.next_status_time = int( py_person.time_of_status_change) \ if py_person.time_of_status_change is not None else 0 if py_person.next_infection_status is not None: params.next_status =\ self.c_status_map[py_person.next_infection_status] c_person.set_status( self.c_status_map[py_person.infection_status]) if py_person.infection_status == InfectionStatus.Susceptible: c_cell.mark_non_infectious(c_i) elif py_person.infection_status == InfectionStatus.Exposed: c_cell.mark_exposed(c_i) elif py_person.infection_status == InfectionStatus.Recovered: c_cell.mark_recovered(c_i) elif py_person.infection_status == InfectionStatus.Dead: c_cell.mark_dead(c_i) else: c_cell.mark_infectious(c_i) def _configure_households(self): _ = _Timer("_configure_households") index_counter = {} for _, py_household in enumerate(self.households): (c_i, mc_i) = py_household._microcell_index c_cell = self.c_population.get_cell(c_i) c_mcell = c_cell.get_microcell(mc_i) if py_household._microcell_index in index_counter: index_counter[py_household._microcell_index] += 1 else: index_counter[py_household._microcell_index] = 0 hh_i = index_counter[py_household._microcell_index] py_household._microcell_index = (c_i, mc_i, hh_i) c_household = c_mcell.get_household(hh_i) # Link People for py_person in py_household.persons: c_person = c_cell.get_person(py_person._index[2]) assert c_person.set_household(hh_i), \ "Person was already in a different household." c_household.add_member(c_person.microcell_pos()) # Configure Household Parameters params = c_household.params() params.infectiousness = py_household.infectiousness params.susceptibility = py_household.susceptibility params.location = py_household.location def _link_places(self): _ = _Timer("_link_places") for py_cell in self.py_population.cells: for py_place in py_cell.places: c_place = self.c_population.get_place(py_place._index) assert c_place.index() == py_place._index for group, persons in py_place.person_groups.items(): for py_person in persons: (c_i, mc_i, p_i) = py_person._index c_cell = self.c_population.get_cell(c_i) c_cell.get_person(p_i).add_place( self.c_population, c_cell, c_place.index(), group)