import sys
import numpy as np
import abc
from abc import abstractmethod
from collections import defaultdict
import quantities as pq
if sys.version_info >= (3, 4):
ABC = abc.ABC
else:
ABC = abc.ABCMeta('ABC', (), {})
def _unpack_kernel_tuple(kernel):
if sys.version_info >= (3, 4):
from inspect import signature
else:
from funcsigs import signature
if not isinstance(kernel, tuple):
raise TypeError("kernel is not a tuple", type(kernel))
if "w" in str(signature(kernel[0])):
temporal, spatial = kernel[0], kernel[1]
spatiotemporal_kernel = lambda w, kx, ky: temporal(w) * spatial(kx, ky)
else:
temporal, spatial = kernel[1], kernel[0]
spatiotemporal_kernel = lambda w, kx, ky: temporal(w) * spatial(kx, ky)
return spatiotemporal_kernel
[docs]def closure_params(closure):
"""
Stores closure parameters in a dict
Parameters
----------
closure : function
A closure function
Returns
-------
out : dict
Dictionary
"""
if sys.version_info >= (3, 4):
pass
else:
from qualname import qualname
attrs = {}
for cell_name, cell in zip(closure.__code__.co_freevars,
closure.__closure__):
cell_contents = cell.cell_contents
if not callable(cell_contents):
if "params" not in attrs:
attrs["params"] = {}
attrs["params"][cell_name] = cell_contents
if sys.version_info >= (3, 4):
attrs["type"] = closure.__qualname__.split(".")[0]
else:
attrs["type"] = qualname(closure).split(".")[0]
else:
attrs[cell_name] = closure_params(cell_contents)
if sys.version_info >= (3, 4):
attrs[cell_name]["type"] = cell_contents.__qualname__.split("."
)[0]
else:
attrs[cell_name]["type"] = qualname(cell_contents).split(".")[0]
return attrs
##############################################################
# Integrator class
##############################################################
[docs]class Integrator:
"""
Integrator class for fast Fourier transform calculations.
Attributes
----------
times
positions
temporal_angular_freqs
spatial_angular_freqs
Nt : int
Number of spatial points.
Nr : int
Number of temporal points
dt : quantity scalar
Temporal resolution
dr : quantity scalar
Spatial resolution
dw : quantity scalar
Temporal frequency resolution
dk : quantity scalar
Spatial frequency resolution
"""
[docs] def __init__(self, nt, nr, dt, dr):
"""
Integrator constructor
Parameters
----------
nt : int
The power to raise 2 to. Number of temporal points is 2**nt.
nr : int
The power to raise 2 to. Number of spatial points is 2**nr.
dt : quantity scalar
Temporal resolution
dr : quantity scalar
Spatial resolution
"""
self.Nt = int(2**nt)
self.Nr = int(2**nr)
self.dt = dt.rescale("ms") if isinstance(dt, pq.Quantity) else dt * pq.ms
self.dr = dr.rescale("deg") if isinstance(dr, pq.Quantity) else dr * pq.deg
self.w_s = 2 * np.pi / self.dt
self.k_s = 2 * np.pi / self.dr
self.dw = self.w_s / self.Nt
self.dk = self.k_s / self.Nr
self._t_vec = np.linspace(0, self.Nt - 1, self.Nt) * self.dt
self._r_vec = np.linspace(-self.Nr/2, self.Nr/2 - 1, self.Nr) * self.dr
self._w_vec = np.fft.fftfreq(self.Nt, self.dt) * -2 * np.pi
self._k_vec = np.fft.fftfreq(self.Nr, self.dr) * 2 * np.pi
self._fft_factor = self.dt.magnitude * self.dr.magnitude**2
self._ifft_factor = 1. / self._fft_factor
[docs] def meshgrid(self):
"""
Spatiotemporal meshgrid
Returns
-------
out : t_vec, y_vec, x_vec: quantity arrays
time, x, and y values.
"""
return np.meshgrid(self._t_vec, self._r_vec, self._r_vec,
indexing='ij', sparse=True)
[docs] def freq_meshgrid(self):
"""
Frequency meshgrid
Returns
-------
out : w_vec, ky_vec, kx_vec: quantity arrays
temporal and spatial frequency values.
"""
return np.meshgrid(self._w_vec, self._k_vec,
np.abs(self._k_vec[:int(self.Nr/2+1)]),
indexing='ij', sparse=True)
@property
def times(self):
"""
Time array
Returns
-------
out : quantity array
times
"""
return self._t_vec
@property
def positions(self):
"""
Position array
Returns
-------
out : quantity array
positions
"""
return self._r_vec
@property
def temporal_angular_freqs(self):
"""
Temporal angular frequency array
Returns
-------
out : quantity array
temporal angular frequencies
"""
return self._w_vec.rescale(pq.kHz)
@property
def temporal_freqs(self):
"""
Temporal frequency array
Returns
-------
out : quantity array
temporal frequencies
"""
return self.temporal_angular_freqs / 2 / np.pi
@property
def spatial_angular_freqs(self):
"""
Spatial angular frequency array
Returns
-------
out : quantity array
spatial angular frequencies
"""
return self._k_vec
@property
def spatial_freqs(self):
"""
Spatial frequency array
Returns
-------
out : quantity array
spatial frequencies
"""
return self.spatial_angular_freqs / 2 / np.pi
[docs] def compute_inverse_fft(self, cube):
"""
Computes inverse fast Fourier transform.
Parameters
----------
cube : array_like
input array (3-dimensional)
Returns
-------
out : array_like
transformed array
"""
cube = np.fft.irfftn(cube)
return np.fft.fftshift(cube, axes=(1, 2)) * self._ifft_factor
[docs] def compute_fft(self, cube):
"""
Computes fast Fourier transform.
Parameters
----------
cube : array_like
input array (3-dimensional)
Returns
-------
out : array_like
transformed array
"""
cube = np.fft.rfftn(np.fft.fftshift(cube, axes=(1, 2)))
return cube * self._fft_factor
##############################################################
# Network
##############################################################
[docs]class Network:
"""
Network class
Attributes
----------
neurons : list
List with pylgn.Neuron objects
integrator : pylgn.Integrator
Integrator object
stimulus : pylgn.Stimulus
Stimulus object
"""
[docs] def __init__(self, memory_efficient=False):
"""
Network constructor
"""
self.neurons = []
self.stimulus = None
self.memory_efficient = memory_efficient
[docs] def create_integrator(self, nt, nr, dt, dr):
"""
Create and set integrator
Parameters
----------
nt : int
The power to raise 2 to. Number of temporal points is 2**nt.
nr : int
The power to raise 2 to. Number of spatial points is 2**nr.
dt : quantity scalar
Temporal resolution
dr : quantity scalar
Spatial resolution
Returns
-------
out : pylgn.Integrator
Integrator object
"""
self.integrator = Integrator(nt, nr, dt, dr)
return self.integrator
[docs] def create_descriptive_neuron(self, background_response=0/pq.s, kernel=None,
annotations={}):
"""
Create descriptive neuron
Parameters
----------
background_response : quantity scalar
Background activity.
kernel : function
Impulse-response function.
annotations : dict
Dictionary with various annotations.
Returns
-------
out : pylgn.DescriptiveNeuron
Descriptive neuron object
"""
neuron = DescriptiveNeuron(background_response=background_response,
kernel=kernel, annotations=annotations)
self.neurons.append(neuron)
return neuron
[docs] def create_ganglion_cell(self, background_response=0/pq.s, kernel=None,
annotations={}):
"""
Create ganglion cell
Parameters
----------
background_response : quantity scalar
Background activity.
kernel : function
Impulse-response function.
annotations : dict
Dictionary with various annotations.
Returns
-------
out : pylgn.Ganglion
Ganglion object
"""
ganglion = Ganglion(background_response=background_response,
kernel=kernel, annotations=annotations)
self.neurons.append(ganglion)
return ganglion
[docs] def create_relay_cell(self, background_response=0/pq.s, annotations={}):
"""
Create relay cell
Parameters
----------
background_response : quantity scalar
Background activity.
annotations : dict
Dictionary with various annotations.
Returns
-------
out : pylgn.Relay
Relay object
"""
if len([neuron for neuron in self.neurons if type(neuron).__name__ == "Relay"]) != 0:
raise ValueError("network already has relay cell population")
relay = Relay(background_response, annotations=annotations)
self.neurons.append(relay)
return relay
[docs] def create_cortical_cell(self, background_response=0/pq.s,
annotations={}):
"""
Create cortical cell
Parameters
----------
background_response : quantity scalar
Background activity.
annotations : dict
Dictionary with various annotations.
Returns
-------
out : pylgn.Cortical
Cortical object
"""
cortical = Cortical(background_response, annotations=annotations)
self.neurons.append(cortical)
return cortical
[docs] def set_stimulus(self, closure, compute_fft=False):
"""
Sets stimulus.
Parameters
----------
closure : callable (closure)
stimulus function. If compute_fft is False the
stimulus function should be the Fourier transform
of the stimulus.
compute_fft : bool
If True numerical integration is used to calculate
the Fourier transform of the stimulus.
"""
from inspect import getargspec
self.stimulus = Stimulus(closure)
closure_args = getargspec(closure).args
if not compute_fft:
if not all(arg in ("w", "kx", "ky") for arg in closure_args):
raise TypeError("Fourier transformed stimulus closure should have w, kx, and ky as arguments, not {}".format(closure_args))
w_vec, ky_vec, kx_vec = self.integrator.freq_meshgrid()
stimulus_ft = closure(w=w_vec, kx=kx_vec, ky=ky_vec)
self.stimulus.ft = stimulus_ft
else:
if not all(arg in ("t", "x", "y") for arg in closure_args):
raise TypeError("Stimulus closure should have t, x, and y as arguments, not {} for computing FFT".format(closure_args))
t_vec, y_vec, x_vec = self.integrator.meshgrid()
self.stimulus.ft = self.integrator.compute_fft(closure(t=t_vec,
x=x_vec,
y=y_vec))
[docs] def connect(self, source, target, kernel, weight=1.0):
"""
Connect neurons.
Parameters
----------
source : pylgn.Neuron
Source neuron
target : pylgn.Neuron
Target neuron
kernel : function
Connectivity kernel
weight : float
Connectivity weight
"""
if isinstance(kernel, tuple):
spatiotemporal_kernel = _unpack_kernel_tuple(kernel)
else:
spatiotemporal_kernel = kernel
target.add_connection(source, spatiotemporal_kernel, weight)
[docs] def compute_irf_ft(self, neuron):
"""
Computes the Fourier transform of the
impulse-response function of a neuron.
Parameters
----------
neuron : pylgn.Neuron
"""
neuron.irf_ft_is_computed = True
w_vec, ky_vec, kx_vec = self.integrator.freq_meshgrid()
neuron.irf_ft = neuron.evaluate_irf_ft(w_vec, kx_vec, ky_vec)
[docs] def compute_response_ft(self, neuron, recompute_irf_ft=False):
"""
Computes the Fourier transform of the response of a neuron.
Parameters
----------
neuron : pylgn.Neuron
"""
if self.stimulus.ft is None:
raise ValueError("Stimulus is not set. Use network.set_stimulus(stimuls).")
neuron.response_ft_is_computed = True
if not neuron.irf_ft_is_computed or recompute_irf_ft:
self.compute_irf_ft(neuron)
neuron.response_ft = np.multiply(neuron.irf_ft, self.stimulus.ft)
if neuron.background_response.magnitude != 0:
neuron.response_ft[0, 0, 0] += 8*np.pi**3 / self.integrator.dk.magnitude**2 / self.integrator.dw.magnitude * neuron.background_response.rescale(1/pq.s).magnitude
if self.memory_efficient:
neuron.irf_ft_is_computed = False
neuron.irf_ft = None
self.stimulus.ft = None
[docs] def compute_irf(self, neuron, recompute_ft=False):
"""
Computes the impulse-response function of a neuron.
Parameters
----------
neuron : pylgn.Neuron
recompute_ft : bool
If True the Fourier transform is recalculated.
"""
if not neuron.irf_ft_is_computed or recompute_ft:
self.compute_irf_ft(neuron)
neuron.irf = self.integrator.compute_inverse_fft(neuron.irf_ft) * neuron.unit
if self.memory_efficient:
neuron.irf_ft_is_computed = False
neuron.irf_ft = None
[docs] def compute_response(self, neuron, recompute_ft=False):
"""
Computes the response of a neuron.
Parameters
----------
neuron : pylgn.Neuron
recompute_ft : bool
If True the Fourier transform is recalculated.
"""
if not neuron.response_ft_is_computed or recompute_ft:
self.compute_response_ft(neuron, recompute_irf_ft=True)
neuron.response = self.integrator.compute_inverse_fft(neuron.response_ft) * neuron.unit
if self.memory_efficient:
neuron.response_ft_is_computed = False
neuron.response_ft = None
# NOTE half-wave rectified function for cortical cells
if isinstance(neuron, Cortical):
neuron.response = neuron.response.clip(min=0*neuron.unit)
[docs] def clear(self):
"""
Clears the neuron list.
"""
del self.neurons[:]
##############################################################
# Stimulus
##############################################################
class Stimulus:
"""
Stimulus class
Attributes
----------
neurons : list
List with pylgn.Neuron objects
integrator : pylgn.Integrator
Integrator object
"""
def __init__(self, closure):
"""
Network constructor
"""
self.spatiotemporal = None
self.ft = None
self.closure = closure
##############################################################
# Neuron classes
##############################################################
[docs]class Neuron(ABC):
"""
Neuron base class.
Attributes
----------
center_response
background_response : quantity scalar
Background activity.
annotations : dict
Dictionary with various annotations on the Neuron object.
connections : dict
Dictionary with connected neurons including the connectivity
kernel and weight.
response : quantity array
Spatiotemporal response
response_ft : quantity array
Fourier transformed response
irf : quantity array
Spatiotemporal impulse-response function
irf_ft : quantity array
Fourier transformed impulse-response function
"""
[docs] def __init__(self, background_response, annotations):
"""
Neuron constructor
Parameters
----------
background_response : quantity scalar
Background activity.
annotations : dict
Dictionary with various annotations on the Neuron object.
"""
self.unit = 1. / pq.s
self.background_response = background_response if isinstance(background_response, pq.Quantity) else background_response * self.unit
self.connections = defaultdict(list)
self.response = None
self.response_ft = None
self.irf = None
self.irf_ft = None
self.irf_ft_is_computed = False
self.response_ft_is_computed = False
self.annotations = {"background_response": background_response}
self.annotations.update({"connections": defaultdict(list)})
self.annotate(annotations)
[docs] def annotate(self, annotations):
"""
Add annotations to a Neuron object.
Parameters
----------
annotations : dict
Dictionary containing annotations
"""
self.annotations.update(annotations)
[docs] def add_connection(self, neuron, kernel, weight):
"""
Add connection to another neuron.
Parameters
----------
neuron : pylgn.Neuron
Source neuron
kernel : functions
Connectivity kernel
weight : float
Connectivity weight
"""
self._check_if_connection_is_allowed(neuron)
connection = {"neuron": neuron,
"kernel": kernel,
"weight": weight}
self.connections[type(neuron).__name__].append(connection)
annotation = {"kernel": closure_params(kernel), "weight": weight}
self.annotations["connections"][type(neuron).__name__.lower()].append(annotation)
@abstractmethod
def _check_if_connection_is_allowed(self, neuron):
pass
[docs] @abstractmethod
def evaluate_irf_ft(self, w, kx, ky):
"""
Evaluates the Fourier transform of
impulse-response function
"""
pass
@property
def center_response(self):
"""
Response of neuron in the center of grid over time
Returns
-------
out : quantity array
Response of neuron in the center of grid over time
"""
idx = int(self.response.shape[1] / 2)
return np.real(self.response[:, idx, idx])
[docs]class DescriptiveNeuron(Neuron):
[docs] def __init__(self, background_response, kernel, annotations={}):
"""
Descriptive neuron constructor
Parameters
----------
background_response : quantity scalar
Background activity.
kernel : function
Impulse-response function.
annotations : dict
Dictionary with various annotations.
"""
Neuron.__init__(self, background_response, annotations)
self.set_kernel(kernel)
def _check_if_connection_is_allowed(self, neuron):
raise TypeError("Descriptive cells cannot receive connection")
[docs] def evaluate_irf_ft(self, w, kx, ky):
"""
Evaluates the Fourier transform of
impulse-response function
"""
return self.kernel(w, kx, ky)
[docs] def set_kernel(self, kernel):
"""
Set the impulse-response function.
Parameters
----------
kernel : func or tuple
Fourier transformed kernel/
tuple of Fourier transformed spatial
and temporal kernel
"""
if isinstance(kernel, tuple):
self.kernel = _unpack_kernel_tuple(kernel)
else:
self.kernel = kernel
self.annotations["kernel"] = closure_params(self.kernel)
[docs]class Ganglion(Neuron):
[docs] def __init__(self, background_response, kernel, annotations={}):
"""
Ganglion constructor
Parameters
----------
background_response : quantity scalar
Background activity.
kernel : function
Impulse-response function.
annotations : dict
Dictionary with various annotations.
"""
from .kernels.spatial import create_dog_ft
from .kernels.temporal import create_biphasic_ft
Neuron.__init__(self, background_response, annotations)
if kernel is None:
kernel = (create_dog_ft(), create_biphasic_ft())
self.set_kernel(kernel)
def _check_if_connection_is_allowed(self, neuron):
raise TypeError("Ganglion cells cannot receive connection")
[docs] def evaluate_irf_ft(self, w, kx, ky):
"""
Evaluates the Fourier transform of
impulse-response function
"""
return self.kernel(w, kx, ky)
[docs] def set_kernel(self, kernel):
"""
Set the impulse-response function.
Parameters
----------
kernel : func or tuple
Fourier transformed kernel/
tuple of Fourier transformed spatial
and temporal kernel
"""
if isinstance(kernel, tuple):
self.kernel = _unpack_kernel_tuple(kernel)
else:
self.kernel = kernel
self.annotations["kernel"] = closure_params(self.kernel)
[docs]class Relay(Neuron):
[docs] def __init__(self, background_response, annotations={}):
"""
Relay constructor
Parameters
----------
background_response : quantity scalar
Background activity.
annotations : dict
Dictionary with various annotations.
"""
Neuron.__init__(self, background_response, annotations)
def _check_if_connection_is_allowed(self, neuron):
if not isinstance(neuron, (Ganglion, Cortical)):
raise TypeError("Unsupported connection to relay cell: ", type(neuron))
[docs] def evaluate_irf_ft(self, w, kx, ky):
"""
Evaluates the Fourier transform of
impulse-response function
"""
retinal_input = 0
cortical_input = 0
for c in self.connections["Ganglion"]:
Krg = c["kernel"](w, kx, ky)
Wg = c["neuron"].evaluate_irf_ft(w, kx, ky)
w_rg = c["weight"]
retinal_input += w_rg * Krg * Wg
for c in self.connections["Cortical"]:
thalamocortical_connection = c["neuron"].connections["Relay"][0]
Kcr = thalamocortical_connection["kernel"](w, kx, ky)
Krc = c["kernel"](w, kx, ky)
w_rc = c["weight"]
w_cr = thalamocortical_connection["weight"]
cortical_input += w_rc * w_cr * Krc * Kcr
return retinal_input / (1 - cortical_input)
[docs]class Cortical(Neuron):
[docs] def __init__(self, background_response, annotations={}):
"""
Cortical constructor
Parameters
----------
background_response : quantity scalar
Background activity.
annotations : dict
Dictionary with various annotations.
"""
Neuron.__init__(self, background_response, annotations)
[docs] def evaluate_irf_ft(self, w, kx, ky):
"""
Evaluates the Fourier transform of
impulse-response function
"""
thalamic_input = 0
for c in self.connections["Relay"]:
Kcr = c["kernel"](w, kx, ky)
Rr = c["neuron"].evaluate_irf_ft(w, kx, ky)
w_cr = c["weight"]
thalamic_input += w_cr * Kcr * Rr
return thalamic_input
def _check_if_connection_is_allowed(self, neuron):
if not isinstance(neuron, Relay):
raise TypeError("Unsupported connection to cortical cell: ", type(neuron))
if type(neuron).__name__ in self.connections:
raise ValueError("Cortical cell already has a thalamic connection: ",
self.connections)