Source code for spiketimes.correlate

import numpy as np
import pandas as pd
from scipy import stats
from scipy import signal
from .binning import binned_spiketrain
from .utils import p_adjust, _ppois, _random_combination
from .statistics import ifr
from .surrogates import shuffled_isi_spiketrains


def auto_corr(
    spiketrain: np.ndarray,
    binsize: float = 0.01,
    num_lags: int = 100,
    as_df: bool = False,
    t_start: float = None,
    t_stop: float = None,
):
    """
    Returns the autocorrelation function of a spiketrain.

    Args:
        spiketrain: A numpy array of spiketimes
        binsize: The size of the time bin in seconds
        num_lags: The number of lags forward and backwards around lag 0 to return
        as_df: Whether to return results as pandas DataFrame
        t_start: Minimum timepoint
        t_stop: Maximum timepoint
    Returns:
        time_bins, autocorrelation_values
    """
    # get lag labels
    time_span = binsize * num_lags
    time_bins = np.arange(-time_span, time_span + binsize, binsize)
    time_bins = np.delete(time_bins, len(time_bins) // 2)  # delete 0 element

    # discretise the spiketrain
    if t_start is None:
        t_start = np.nanmin(spiketrain)
    if t_stop is None:
        t_stop = np.nanmax(spiketrain)
    _, binned_spiketrain_ = binned_spiketrain(
        spiketrain, fs=(1 / binsize), t_start=t_start, t_stop=t_stop
    )

    # get autocorrelation values
    vals = signal.correlate(binned_spiketrain_, binned_spiketrain_, mode="same")

    # restrict to the lags of interest around lag 0
    zero_idx = len(vals) // 2
    vals = vals[(zero_idx - num_lags) : (zero_idx + num_lags + 1)]
    vals = np.delete(vals, len(vals) // 2)  # delete 0 element

    if not as_df:
        return time_bins, vals
    else:
        return pd.DataFrame({"time_bin": time_bins, "autocorrelation": vals})
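
A minimal usage sketch, not part of the module source: the spiketrain below is synthetic, and the module is assumed importable as spiketimes.correlate.

import numpy as np
from spiketimes.correlate import auto_corr

rng = np.random.default_rng(42)
spiketrain = np.sort(rng.uniform(0, 120, size=600))  # ~5 Hz over 120 s
df = auto_corr(spiketrain, binsize=0.01, num_lags=50, as_df=True)
print(df.head())  # lag in seconds and autocorrelation count per bin
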
def cross_corr(
    spiketrain_1: np.ndarray,
    spiketrain_2: np.ndarray,
    binsize: float = 0.01,
    num_lags: int = 100,
    as_df: bool = False,
    t_start: float = None,
    t_stop: float = None,
    delete_0_lag: bool = False,
):
    """
    Calculate crosscorrelation between two spiketrains.

    Args:
        spiketrain_1: A numpy array of spiketimes
        spiketrain_2: A numpy array of spiketimes
        binsize: The size of the time bin in seconds
        num_lags: The number of lags forward and backwards around lag 0 to return
        as_df: Whether to return results as pandas DataFrame
        t_start: Minimum timepoint
        t_stop: Maximum timepoint
        delete_0_lag: Whether to remove the 0-lag element
    Returns:
        time_bins, crosscorrelation_values
    """
    # get lag labels
    time_span = binsize * num_lags
    time_bins = np.arange(-time_span, time_span + binsize, binsize)

    # discretise the spiketrains
    if t_start is None:
        t_start = np.nanmin([np.nanmin(spiketrain_1), np.nanmin(spiketrain_2)])
    if t_stop is None:
        t_stop = np.nanmax([np.nanmax(spiketrain_1), np.nanmax(spiketrain_2)])
    _, bins_1 = binned_spiketrain(
        spiketrain_1, fs=(1 / binsize), t_start=t_start, t_stop=t_stop
    )
    _, bins_2 = binned_spiketrain(
        spiketrain_2, fs=(1 / binsize), t_start=t_start, t_stop=t_stop
    )

    # get crosscorrelation values
    vals = signal.correlate(bins_1, bins_2, mode="same")

    # restrict to the lags of interest around lag 0
    zero_idx = len(vals) // 2
    vals = vals[(zero_idx - num_lags) : (zero_idx + num_lags + 1)]

    if delete_0_lag:
        time_bins = np.delete(time_bins, len(time_bins) // 2)
        vals = np.delete(vals, len(vals) // 2)

    if not as_df:
        return time_bins, vals
    else:
        return pd.DataFrame({"time_bin": time_bins, "crosscorrelation": vals})
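
A minimal usage sketch, not part of the module source: one synthetic train is a jittered copy of the other, so the crosscorrelation should peak near lag 0.

import numpy as np
from spiketimes.correlate import cross_corr

rng = np.random.default_rng(42)
st1 = np.sort(rng.uniform(0, 60, size=300))
st2 = np.sort(st1 + rng.normal(0, 0.005, size=st1.size))  # st1 jittered by ~5 ms
df = cross_corr(st1, st2, binsize=0.01, num_lags=20, as_df=True)
print(df.loc[df["crosscorrelation"].idxmax()])  # peak should sit near lag 0
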
def cross_corr_test(
    spiketrain_1: np.ndarray,
    spiketrain_2: np.ndarray,
    binsize: float = 0.01,
    num_lags: int = 100,
    as_df: bool = False,
    t_start: float = None,
    t_stop: float = None,
    tail: str = "two_tailed",
    adjust_p: bool = True,
    p_adjust_method: str = "Benjamini-Hochberg",
):
    """
    Calculate crosscorrelation between two spiketrains and test the
    significance of each bin.

    The significance test compares the observed crosscorrelation to the
    crosscorrelation expected of Poisson spiketrains.

    Args:
        spiketrain_1: A numpy array of spiketimes
        spiketrain_2: A numpy array of spiketimes
        binsize: The size of the time bin in seconds
        num_lags: The number of lags forward and backwards around lag 0 to return
        as_df: Whether to return results as pandas DataFrame
        t_start: Minimum timepoint
        t_stop: Maximum timepoint
        tail: Tail for hypothesis test {"two_tailed", "upper", "lower"}.
              Two tailed recommended
        adjust_p: Whether to adjust p-values for multiple comparisons
        p_adjust_method: If adjusting p-values, specifies which method to use
                         {'Benjamini-Hochberg', 'Bonferroni', 'Bonferroni-Holm'}
    Returns:
        time_bins, crosscorrelation_values, p
    """
    if t_start is None:
        t_start = np.nanmin([np.nanmin(spiketrain_1), np.nanmin(spiketrain_2)])
    if t_stop is None:
        t_stop = np.nanmax([np.nanmax(spiketrain_1), np.nanmax(spiketrain_2)])

    # get observed crosscorrelation
    t, cc = cross_corr(
        spiketrain_1,
        spiketrain_2,
        binsize=binsize,
        num_lags=num_lags,
        as_df=False,
        t_start=t_start,
        t_stop=t_stop,
    )

    # compare each bin count to a Poisson expectation whose rate is the mean count
    lam = np.mean(cc)
    p = np.array([_ppois(x, mu=lam, tail=tail) for x in cc])
    if tail == "two_tailed":
        p = p * 2
    if adjust_p:
        p = p_adjust(p, method=p_adjust_method)

    if not as_df:
        return t, cc, p
    else:
        return pd.DataFrame({"time_bin": t, "crosscorrelation": cc, "p": p})
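
A minimal usage sketch, not part of the module source, reusing synthetic jittered trains; with the default settings, p-values are Benjamini-Hochberg adjusted.

import numpy as np
from spiketimes.correlate import cross_corr_test

rng = np.random.default_rng(42)
st1 = np.sort(rng.uniform(0, 60, size=300))
st2 = np.sort(st1 + rng.normal(0, 0.005, size=st1.size))
df = cross_corr_test(st1, st2, binsize=0.01, num_lags=20, as_df=True)
print(df.loc[df["p"] < 0.05])  # bins departing from the Poisson expectation
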
def spike_count_correlation(
    spiketrain_1: np.ndarray,
    spiketrain_2: np.ndarray,
    binsize: float,
    min_firing_rate: float = None,
    t_start: float = None,
    t_stop: float = None,
):
    """
    Calculate spike count correlation between two spiketrains.

    Args:
        spiketrain_1: A numpy array of spiketimes
        spiketrain_2: A numpy array of spiketimes
        binsize: The size of the time bin in seconds
        min_firing_rate: If specified, only bins in which the geometric mean
                         firing rate of the two spiketrains exceeds this value
                         are included
        t_start: Minimum timepoint
        t_stop: Maximum timepoint
    Returns:
        Pearson's correlation coefficient
    """
    if t_start is None:
        t_start = np.min([np.min(spiketrain_1), np.min(spiketrain_2)])
    if t_stop is None:
        t_stop = np.max([np.max(spiketrain_1), np.max(spiketrain_2)])

    # convert to binned spike trains
    _, st1_binned = binned_spiketrain(
        spiketrain_1, fs=(1 / binsize), t_start=t_start, t_stop=t_stop
    )
    _, st2_binned = binned_spiketrain(
        spiketrain_2, fs=(1 / binsize), t_start=t_start, t_stop=t_stop
    )

    if min_firing_rate is None:
        return np.corrcoef(st1_binned, st2_binned)[0, 1]

    # drop bins whose geometric mean instantaneous firing rate is too low
    ifr_1 = ifr(spiketrain_1, fs=(1 / binsize), t_start=t_start, t_stop=t_stop).rename(
        columns={"ifr": "ifr_1"}
    )
    ifr_2 = ifr(spiketrain_2, fs=(1 / binsize), t_start=t_start, t_stop=t_stop).rename(
        columns={"ifr": "ifr_2"}
    )
    df = pd.merge(ifr_1, ifr_2).assign(st1_binned=st1_binned, st2_binned=st2_binned)
    df["gmean"] = stats.gmean(df.loc[:, ["ifr_1", "ifr_2"]], axis=1)
    df = df.loc[df["gmean"] > min_firing_rate]
    if len(df) == 0:
        return np.nan
    return np.corrcoef(df["st1_binned"], df["st2_binned"])[0, 1]
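
A minimal usage sketch, not part of the module source: two independent synthetic trains, counted in 1-second bins.

import numpy as np
from spiketimes.correlate import spike_count_correlation

rng = np.random.default_rng(42)
st1 = np.sort(rng.uniform(0, 60, size=300))
st2 = np.sort(rng.uniform(0, 60, size=300))  # independent of st1
r = spike_count_correlation(st1, st2, binsize=1.0)
print(r)  # expected to be near 0 for independent trains
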
def spike_count_correlation_test(
    spiketrain_1: np.ndarray,
    spiketrain_2: np.ndarray,
    binsize: float,
    n_boot: int = 500,
    min_firing_rate: float = None,
    t_start: float = None,
    t_stop: float = None,
    tail: str = "two_tailed",
):
    """
    Calculate Pearson's correlation coefficient between spike counts of a pair
    of spiketrains and test its significance against ISI-shuffled surrogates.

    Args:
        spiketrain_1: The first spiketrain to correlate. Should be a numpy
                      array of spiketimes in seconds
        spiketrain_2: The second spiketrain to correlate. Should be a numpy
                      array of spiketimes in seconds
        binsize: The size of time bins used in seconds
        n_boot: The number of bootstrap replicates used to build the null
                distribution
        min_firing_rate: If specified, only bins in which the geometric mean
                         firing rate of the two spiketrains exceeds this value
                         are included
        t_start: The startpoint of the first bin. Defaults to the first spike
                 in the two trains
        t_stop: The maximum time for a time bin. Defaults to the last spike
                in the two trains
        tail: Tail for hypothesis test {"two_tailed", "upper", "lower"}
    Returns:
        R, p
    """
    r = spike_count_correlation(
        spiketrain_1,
        spiketrain_2,
        binsize=binsize,
        min_firing_rate=min_firing_rate,
        t_start=t_start,
        t_stop=t_stop,
    )

    # construct surrogate spiketrains by shuffling inter-spike intervals, then
    # correlate randomly-paired surrogates to build a null distribution
    n_surrogates = int(n_boot / 3)
    st1_shuffled = shuffled_isi_spiketrains(spiketrain_1, n=n_surrogates)
    st2_shuffled = shuffled_isi_spiketrains(spiketrain_2, n=n_surrogates)
    combs = np.array(
        [_random_combination(np.arange(n_surrogates), r=2) for _ in range(n_boot)],
        dtype=np.int64,
    )
    replicates = np.apply_along_axis(
        lambda x: spike_count_correlation(
            st1_shuffled[x[0]], st2_shuffled[x[1]], binsize=binsize
        ),
        arr=combs,
        axis=1,
    )

    if tail == "two_tailed":
        replicates = np.absolute(replicates)
        p = np.nanmean(replicates >= np.absolute(r))
    elif tail == "upper":
        p = np.nanmean(replicates >= r)
    elif tail == "lower":
        p = np.nanmean(replicates <= r)
    else:
        raise ValueError(
            "Could not parse tail value. Select one of "
            "{'two_tailed', 'lower', 'upper'} - 'upper' if hypothesising a positive r"
        )
    return r, p
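
A minimal usage sketch, not part of the module source: independent synthetic trains, so the surrogate test should usually return a non-significant p.

import numpy as np
from spiketimes.correlate import spike_count_correlation_test

rng = np.random.default_rng(42)
st1 = np.sort(rng.uniform(0, 60, size=300))
st2 = np.sort(rng.uniform(0, 60, size=300))
r, p = spike_count_correlation_test(st1, st2, binsize=1.0, n_boot=300)
print(r, p)
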