# Source code for limix.stats._confusion

from __future__ import division

import logging

import numpy as np
from numpy import argsort, asarray, concatenate, nan, where


def _get_jit():
    try:
        from numba import jit
    except ImportError:

        def jit(x, *args, **kwargs):
            return x

    return jit


def _get_walk_left():
    """Build the (optionally numba-jitted) left-walk helper."""
    jit = _get_jit()

    @jit(cache=True)
    def _walk_left(pos, c, dist):
        # Scan leftwards from index c until the gap to pos[c]
        # reaches dist (or the array start is hit).
        center = pos[c]
        idx = c
        gap = 0
        while idx > 0 and gap < dist:
            idx -= 1
            gap = center - pos[idx]
        # Step back if the last move overshot the allowed distance.
        return idx + 1 if gap > dist else idx

    return _walk_left


def _get_walk_right():
    """Build the (optionally numba-jitted) right-walk helper."""
    jit = _get_jit()

    @jit(cache=True)
    def _walk_right(pos, c, dist):
        # Scan rightwards from index c until the gap to pos[c]
        # reaches dist (or the array end is hit).
        center = pos[c]
        last = len(pos) - 1
        idx = c
        gap = 0
        while idx < last and gap < dist:
            idx += 1
            gap = pos[idx] - center
        # Step back if the last move overshot the allowed distance.
        return idx - 1 if gap > dist else idx

    return _walk_right


def roc_curve(multi_score, method, max_fpr=0.05):
    """Mean ROC curve (with standard errors) over repeated scorings.

    Evaluates ``multi_score.get_tprs`` on a 0.001-spaced FPR grid in
    ``[0, max_fpr)`` and returns ``(fprs, tprs, tprs_stde)`` where
    ``tprs`` is the per-FPR mean true positive rate and ``tprs_stde``
    its standard error.
    """
    fpr_grid = np.arange(0.0, float(max_fpr), step=0.001)
    means = []
    stderrs = []
    for fpr in fpr_grid:
        sample = multi_score.get_tprs(method, fpr=fpr, approach="rank")
        means.append(np.mean(sample))
        stderrs.append(np.std(sample) / np.sqrt(len(sample)))
    return (fpr_grid, np.asarray(means), np.asarray(stderrs))


def confusion_matrix(df, wsize=50000):
    """Provide a couple of scores based on the idea of windows around
    genetic markers.

    Parameters
    ----------
    df : data frame
        Candidate markers. Must contain the columns ``pos``
        (within-chromosome base-pair position), ``pv`` (p-values), and
        ``causal`` (truth indicator); a ``chrom`` column is optional and
        defaults to a single chromosome.
    wsize : int
        Window size, in base pairs, placed around each causal marker.

    Returns
    -------
    ConfusionMatrix
        Confusion counts over the p-value ranking of the markers.
    """
    logger = logging.getLogger(__name__)
    wsize = int(wsize)

    if "chrom" not in df:
        df = df.assign(chrom=["1"] * len(df))

    # Sort on a copy instead of mutating the caller's frame in place.
    df = df.sort_values(by=["chrom", "pos"])

    chromids = df["chrom"].unique()

    # Lay chromosomes end-to-end on a single coordinate axis, keeping a
    # half-window of padding between them so windows never cross.
    offset = 0
    idx_offset = 0
    pos = []
    causal = []
    pv = []
    for cid in sorted(chromids):
        df0 = df.query("chrom=='%s'" % cid)

        pos.append(offset + asarray(df0["pos"], float))
        pv.append(asarray(df0["pv"], float))
        offset += pos[-1][-1] + wsize // 2 + 1

        if df0["causal"].sum() > 0:
            causal.append(idx_offset + where(df0["causal"])[0])
        idx_offset += len(df0)

    pos = concatenate(pos)
    pv = concatenate(pv)
    # concatenate([]) raises; handle a truth set with no causal markers.
    if causal:
        causal = asarray(concatenate(causal), int)
    else:
        causal = asarray([], int)

    total_size = pos[-1] - pos[0]
    if wsize > 0.1 * total_size:
        # True division: floor division always reported 0% here.
        perc = wsize / total_size * 100
        logger.warning(
            "The window size is %d%% of the total candidate" + " region.", int(perc)
        )

    # Build the (possibly jitted) walkers once, outside the loop.
    walk_left = _get_walk_left()
    walk_right = _get_walk_right()

    # Every marker inside a window around a causal marker counts as a
    # positive.
    ld_causal_markers = set()
    for c in causal:
        if wsize == 1:
            # A one-base window covers only the causal marker itself
            # (indices, not positions, feed range() below).
            left = right = c
        else:
            left = walk_left(pos, c, wsize // 2)
            right = walk_right(pos, c, wsize // 2)
        for i in range(left, right + 1):
            ld_causal_markers.add(i)

    P = len(ld_causal_markers)
    N = len(pos) - P

    ld_causal_markers = list(ld_causal_markers)

    logger.info("Found %d positive and %d negative markers.", P, N)

    return ConfusionMatrix(P, N, ld_causal_markers, argsort(pv))
def getter(func):
    """Wrap ``func`` in an object that can be indexed and compared.

    ``obj[i]`` evaluates ``func(i)``; the comparison operators apply
    ``func`` to the full slice before comparing.
    """

    class ItemGetter(object):
        def __getitem__(self, i):
            return func(i)

        def __lt__(self, other):
            return func(np.s_[:]) < other

        def __le__(self, other):
            return func(np.s_[:]) <= other

        def __gt__(self, other):
            return func(np.s_[:]) > other

        def __ge__(self, other):
            return func(np.s_[:]) >= other

        def __eq__(self, other):
            return func(np.s_[:]) == other

    return ItemGetter()


# TODO: document it
class ConfusionMatrix(object):
    """Cumulative confusion counts along a ranked list of markers."""

    def __init__(self, P, N, true_set, idx_rank):
        total = P + N
        # Entry k holds the count after taking the top-k ranked markers.
        self._TP = np.empty(total + 1, dtype=int)
        self._FP = np.empty(total + 1, dtype=int)
        if len(idx_rank) != total:
            raise ValueError(
                "Rank indices array has to have length equal" + " to ``P + N``."
            )
        true_set = np.asarray(true_set, int)
        true_set.sort()
        idx_rank = np.asarray(idx_rank, int)

        # Where each ranked marker would be inserted into the sorted
        # truth set; used to test membership in one vectorized pass.
        ins_pos = np.searchsorted(true_set, idx_rank)
        _confusion_matrix_tp_fp(total, ins_pos, true_set, idx_rank, self._TP, self._FP)
        self._N = N
        self._P = P

    @property
    def TP(self):
        return getter(lambda i: self._TP[i])

    @property
    def FP(self):
        return getter(lambda i: self._FP[i])

    @property
    def TN(self):
        return getter(lambda i: self._N - self.FP[i])

    @property
    def FN(self):
        return getter(lambda i: self._P - self.TP[i])

    @property
    def sensitivity(self):
        """ Sensitivity (also known as true positive rate.)
        """
        return getter(lambda i: self.TP[i] / self._P)

    @property
    def tpr(self):
        return self.sensitivity

    @property
    def recall(self):
        return self.sensitivity

    @property
    def specifity(self):
        """ Specifity (also known as true negative rate.)
        """
        return getter(lambda i: self.TN[i] / self._N)

    @property
    def precision(self):
        def _prec(i):
            # Undefined before any marker has been selected.
            return nan if i == 0 else self.TP[i] / (self.TP[i] + self.FP[i])

        return getter(_prec)

    @property
    def npv(self):
        """ Negative predictive value.
        """
        return getter(lambda i: self.TN[i] / (self.TN[i] + self.FN[i]))

    @property
    def fallout(self):
        """ Fall-out (also known as false positive rate.)
        """
        return getter(lambda i: 1 - self.specifity[i])

    @property
    def fpr(self):
        return self.fallout

    @property
    def fnr(self):
        """ False negative rate.
        """
        return getter(lambda i: 1 - self.sensitivity[i])

    @property
    def fdr(self):
        """ False discovery rate.
        """
        return getter(lambda i: 1 - self.precision[i])

    @property
    def accuracy(self):
        """ Accuracy.
        """
        return getter(lambda i: (self.TP[i] + self.TN[i]) / (self._N + self._P))

    @property
    def f1score(self):
        """ F1 score (harmonic mean of precision and sensitivity).
        """

        def _score(i):
            denom = 2 * self.TP[i] + self.FP[i] + self.FN[i]
            return 2 * self.TP[i] / denom

        return getter(_score)

    def roc(self):
        """Return ``(fpr, tpr)`` sorted by increasing false positive rate."""
        rate_tp = self.tpr[1:]
        rate_fp = self.fpr[1:]
        order = np.argsort(rate_fp)
        return (rate_fp[order], rate_tp[order])


def auc(fpr, tpr):
    """Area under a step-wise ROC curve, extended out to fpr == 1."""
    area = 0.
    prev = fpr[0]
    for k in range(1, len(fpr)):
        area += (fpr[k] - prev) * tpr[k - 1]
        prev = fpr[k]
    # Carry the final TPR across the remaining FPR range.
    area += (1 - prev) * tpr[-1]
    return area


def _confusion_matrix_tp_fp(n, ins_pos, true_set, idx_rank, TP, FP):
    """Fill cumulative TP/FP arrays by scanning the ranked markers."""
    TP[0] = 0
    FP[0] = 0
    for i in range(n):
        j = ins_pos[i]
        # The ranked marker is a true positive iff it appears in the
        # sorted truth set at its insertion point.
        hit = j < len(true_set) and true_set[j] == idx_rank[i]
        TP[i + 1] = TP[i] + (1 if hit else 0)
        FP[i + 1] = FP[i] + (0 if hit else 1)