
API reference

Auto-generated from in-source docstrings via mkdocstrings.

Variants

varcode.Variant

varcode.Variant(contig, start, ref, alt, genome=None, ensembl=None, allow_extended_nucleotides=False, normalize_contig_names=True, convert_ucsc_contig_names=None)

Bases: Serializable

Construct a Variant object.

PARAMETER DESCRIPTION
contig

Chromosome that this variant is on

TYPE: str

start

1-based position on the chromosome of first reference nucleotide

TYPE: int

ref

Reference nucleotide(s)

TYPE: str

alt

Alternate nucleotide(s)

TYPE: str

genome

Name of reference genome, Ensembl release number, or object derived from pyensembl.Genome. Defaults to the latest available release of GRCh38

TYPE: Genome, EnsemblRelease, str, or int DEFAULT: None

ensembl

Previous name for the 'genome' argument; the two are mutually exclusive.

TYPE: Genome, EnsemblRelease, str, or int (DEPRECATED) DEFAULT: None

allow_extended_nucleotides

Extended nucleotides include 'Y' for pyrimidines or 'N' for any base

TYPE: bool DEFAULT: False

normalize_contig_names

By default the contig name will be normalized by converting integers to strings (e.g. 1 -> "1"), and converting any letters after "chr" to uppercase (e.g. "chrx" -> "chrX"). If you don't want this behavior then pass normalize_contig_names=False.

TYPE: bool DEFAULT: True

convert_ucsc_contig_names

Setting this argument to True causes UCSC chromosome names to be converted, such as "chr1" to "1". If the default value (None) is used then it defaults to whether or not a UCSC genome was passed in for the 'genome' argument.

TYPE: bool DEFAULT: None

Source code in varcode/variant.py
def __init__(
        self,
        contig,
        start,
        ref,
        alt,
        genome=None,
        ensembl=None,
        allow_extended_nucleotides=False,
        normalize_contig_names=True,
        convert_ucsc_contig_names=None):
    """
    Construct a Variant object.

    Parameters
    ----------
    contig : str
        Chromosome that this variant is on

    start : int
        1-based position on the chromosome of first reference nucleotide

    ref : str
        Reference nucleotide(s)

    alt : str
        Alternate nucleotide(s)

    genome : Genome, EnsemblRelease, str, or int
        Name of reference genome, Ensembl release number, or object
        derived from pyensembl.Genome. Defaults to the latest available
        release of GRCh38

    ensembl : Genome, EnsemblRelease, str, or int (DEPRECATED)
        Previous name for the 'genome' argument; the two are
        mutually exclusive.

    allow_extended_nucleotides : bool
        Extended nucleotides include 'Y' for pyrimidines or 'N' for any base

    normalize_contig_names : bool
        By default the contig name will be normalized by converting integers
        to strings (e.g. 1 -> "1"), and converting any letters after "chr"
        to uppercase (e.g. "chrx" -> "chrX"). If you don't want
        this behavior then pass normalize_contig_names=False.

    convert_ucsc_contig_names : bool, optional
        Setting this argument to True causes UCSC chromosome names to be
        converted, such as "chr1" to "1". If the default value (None) is used
        then it defaults to whether or not a UCSC genome was passed in for
        the 'genome' argument.
    """

    # first initialize the fields we use to cache lists of overlapping
    # pyensembl Gene and Transcript objects, or their properties such
    # as names/IDs
    self._genes = None
    self._transcripts = None
    self._gene_ids = None
    self._gene_names = None

    # Provenance: when this variant was produced by a
    # ``varcode.transforms`` operation (e.g. ``pair_breakends``,
    # ``combine_cis_snvs``), the source rows are recorded here.
    # Empty tuple for variants loaded directly from a VCF/MAF.
    # Not part of hash/equality — identity is still
    # (contig, start, ref, alt, reference_name).
    self.source_variants: tuple = ()

    # store the options which affect how properties of this variant
    # may be changed/transformed
    self.normalize_contig_names = normalize_contig_names
    self.allow_extended_nucleotides = allow_extended_nucleotides

    # if genome not specified, try the old name 'ensembl'
    # if ensembl is also None, then default to "GRCh38"
    if genome is None and ensembl is None:
        genome = "GRCh38"
    elif genome is None:
        genome = ensembl


    # user might supply Ensembl release as an integer, reference name,
    # or pyensembl.Genome object
    self.original_genome = genome
    self.genome, self.original_genome_was_ucsc = infer_genome(genome)

    self.reference_name = self.genome.reference_name
    if self.original_genome_was_ucsc:
        self.original_reference_name = ensembl_to_ucsc_reference_names[
            self.reference_name]
    else:
        self.original_reference_name = self.reference_name

    self.original_contig = contig
    self.contig = normalize_chromosome(contig) if normalize_contig_names else contig

    if convert_ucsc_contig_names is None:
        self.convert_ucsc_contig_names = self.original_genome_was_ucsc
    else:
        self.convert_ucsc_contig_names = convert_ucsc_contig_names

    # trim off the starting "chr" from hg19 chromosome names to make them
    # match GRCh37, also convert "chrM" to "MT".
    if self.convert_ucsc_contig_names:
        self.contig = self._convert_ucsc_contig_name_to_ensembl(self.contig)

    if ref != alt and ref in STANDARD_NUCLEOTIDES and alt in STANDARD_NUCLEOTIDES:
        # Optimization for common case.
        self.original_ref = self.ref = ref
        self.original_alt = self.alt = alt
        self.original_start = self.start = self.end = int(start)
        return

    # we want to preserve the ref/alt/pos both as they appeared in the
    # original VCF or MAF file but also normalize variants to get rid
    # of shared prefixes/suffixes between the ref and alt nucleotide
    # strings e.g. g.10 CTT>T can be normalized into g.10delCT
    #
    # The normalized variant properties go into fields
    #    Variant.{original_ref, original_alt, original_pos}
    # whereas the trimmed fields are:
    #    Variant.{ref, alt, start, end}

    # the original entries must preserve the number of nucleotides in
    # ref and alt but we still want to normalize e.g. '-' and '.' into ''
    self.original_ref = normalize_nucleotide_string(
        ref,
        allow_extended_nucleotides=allow_extended_nucleotides)
    self.original_alt = normalize_nucleotide_string(
        alt,
        allow_extended_nucleotides=allow_extended_nucleotides)
    self.original_start = int(start)

    # normalize the variant by trimming any shared prefix or suffix
    # between ref and alt nucleotide sequences and then
    # offset the variant position in a strand-dependent manner
    (trimmed_ref, trimmed_alt, prefix, _) = \
        trim_shared_flanking_strings(self.original_ref, self.original_alt)

    self.ref = trimmed_ref
    self.alt = trimmed_alt

    if len(trimmed_ref) == 0:
        # insertions must be treated differently since the meaning of a
        # position for an insertion is:
        #   "insert the alt nucleotides after this position"
        #
        # Aside: what if both trimmed ref and alt strings are empty?
        # This means we had a "null" variant, probably from a VCF
        # generated by force-calling mutations which weren't actually
        # found in the sample.
        # Null variants are interpreted as inserting zero nucleotides
        # after the whole reference sequence.
        #
        # Start and end both are base-1 nucleotide position before
        # insertion.
        self.start = self.original_start + max(0, len(prefix) - 1)
        self.end = self.start
    else:
        # for substitutions and deletions the [start:end] interval is
        # an inclusive selection of reference nucleotides
        self.start = self.original_start + len(prefix)
        self.end = self.start + len(trimmed_ref) - 1
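The trimming and coordinate arithmetic above can be reproduced with a standalone sketch. This is a simplified re-implementation for illustration only (the real trim_shared_flanking_strings lives in varcode's string utilities); it mirrors the insertion vs. deletion/substitution branches in __init__:

```python
def trim_shared_flanking_strings(ref, alt):
    """Trim any prefix and suffix shared by ref and alt (simplified sketch)."""
    # shared prefix
    p = 0
    while p < len(ref) and p < len(alt) and ref[p] == alt[p]:
        p += 1
    ref_t, alt_t = ref[p:], alt[p:]
    # shared suffix of the remainder
    s = 0
    while (s < len(ref_t) and s < len(alt_t)
           and ref_t[-1 - s] == alt_t[-1 - s]):
        s += 1
    suffix = ref_t[len(ref_t) - s:] if s else ""
    if s:
        ref_t, alt_t = ref_t[:-s], alt_t[:-s]
    return ref_t, alt_t, ref[:p], suffix

def trimmed_coordinates(start, ref, alt):
    """Base-1 [start, end] interval after trimming, mirroring __init__ above."""
    t_ref, t_alt, prefix, _ = trim_shared_flanking_strings(ref, alt)
    if len(t_ref) == 0:
        # insertion: start/end name the base *before* the inserted nucleotides
        new_start = start + max(0, len(prefix) - 1)
        return t_ref, t_alt, new_start, new_start
    # substitution/deletion: inclusive interval of reference nucleotides
    new_start = start + len(prefix)
    return t_ref, t_alt, new_start, new_start + len(t_ref) - 1
```

For example, g.10 CTT>T trims to a deletion of "CT" spanning positions 10-11, matching the g.10delCT normalization described in the comments above.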

ensembl property

Deprecated alias for Variant.genome

RETURNS DESCRIPTION
Genome

trimmed_ref property

Eventually the field Variant.ref will store the reference nucleotides as given in a VCF or MAF and trimming of any shared prefix/suffix between ref and alt will be done via the properties trimmed_ref and trimmed_alt.

trimmed_alt property

Eventually the field Variant.alt will store the alternate nucleotides as given in a VCF or MAF and trimming of any shared prefix/suffix between ref and alt will be done via the properties trimmed_ref and trimmed_alt.

trimmed_base1_start property

Currently the field Variant.start carries the base-1 starting position adjusted by trimming any shared prefix between Variant.ref and Variant.alt. Eventually this trimming should be done more explicitly via trimmed_* properties.

trimmed_base1_end property

Currently the field Variant.end carries the base-1 "last" position of this variant, adjusted by trimming any shared suffix between Variant.ref and Variant.alt. Eventually this trimming should be done more explicitly via trimmed_* properties.

short_description property

HGVS nomenclature for genomic variants. More info: http://www.hgvs.org/mutnomen/

coding_transcripts property

Protein coding transcripts

genes property

Return Gene object for all genes which overlap this variant.

gene_ids property

Return IDs of all genes which overlap this variant. Calling this method is significantly cheaper than calling Variant.genes(), which has to issue many more queries to construct each Gene object.

gene_names property

Return names of all genes which overlap this variant. Calling this method is significantly cheaper than calling Variant.genes(), which has to issue many more queries to construct each Gene object.

coding_genes property

Genes containing protein coding transcripts

is_insertion property

Does this variant represent the insertion of nucleotides into the reference genome?

is_deletion property

Does this variant represent the deletion of nucleotides from the reference genome?

is_indel property

Is this variant either an insertion or deletion?

is_snv property

Is the variant a single nucleotide variant?

is_transition property

Is this variant a pyrimidine to pyrimidine change or purine to purine change?

is_transversion property

Is this variant a pyrimidine to purine change or vice versa?
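The SNV/transition/transversion predicates can be sketched for standard bases as follows (an illustrative re-implementation over trimmed ref/alt strings, assuming only A/C/G/T inputs):

```python
PURINES = frozenset("AG")
PYRIMIDINES = frozenset("CT")

def is_snv(ref, alt):
    # single reference base replaced by a single, different alternate base
    return len(ref) == 1 and len(alt) == 1 and ref != alt

def is_transition(ref, alt):
    # purine <-> purine or pyrimidine <-> pyrimidine
    return is_snv(ref, alt) and (
        {ref, alt} <= PURINES or {ref, alt} <= PYRIMIDINES)

def is_transversion(ref, alt):
    # purine <-> pyrimidine (complement of is_transition among SNVs)
    return is_snv(ref, alt) and not is_transition(ref, alt)
```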

__lt__(other)

Variants are ordered by locus.

Source code in varcode/variant.py
def __lt__(self, other):
    """
    Variants are ordered by locus.
    """
    require_instance(other, Variant, name="variant")
    if self.contig == other.contig:
        return self.start < other.start
    return self.contig < other.contig
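The locus ordering can be demonstrated with a minimal stand-in class (illustrative only; Locus is not a varcode type). Note that contigs compare lexicographically as strings, so "10" sorts before "2":

```python
class Locus:
    # minimal stand-in reproducing Variant's __lt__ ordering
    def __init__(self, contig, start):
        self.contig = contig
        self.start = start

    def __lt__(self, other):
        if self.contig == other.contig:
            return self.start < other.start
        return self.contig < other.contig

# same contig: ordered by start; different contigs: ordered by name
ordered = sorted([Locus("2", 50), Locus("1", 300), Locus("1", 100)])
```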

to_dict()

We want the original (un-normalized) field values when serializing, since normalization will happen in __init__.

Source code in varcode/variant.py
def to_dict(self):
    """
    We want the original (un-normalized) field values when
    serializing, since normalization will happen in __init__.
    """
    return dict(
        contig=self.original_contig,
        start=self.original_start,
        ref=self.original_ref,
        alt=self.original_alt,
        genome=self.original_genome,
        allow_extended_nucleotides=self.allow_extended_nucleotides,
        normalize_contig_names=self.normalize_contig_names,
        convert_ucsc_contig_names=self.convert_ucsc_contig_names)

effects(raise_on_error=True, splice_outcomes=False, annotator=None, phase_resolver=None, rna_resolver=None, germline=None)

Predict the variant's effects on overlapping transcripts.

PARAMETER DESCRIPTION
raise_on_error

If True, raise on annotation errors; if False, capture them as Failure effects.

TYPE: bool DEFAULT: True

splice_outcomes

If True, splice-disrupting effects are wrapped in a :class:varcode.splice_outcomes.SpliceOutcomeSet carrying multiple plausible outcomes (normal splicing, exon skipping, intron retention, cryptic splice). Opt-in; default False preserves existing behaviour. See openvax/varcode#262.

TYPE: bool DEFAULT: False

annotator

Per-call annotator override. None uses the currently configured default ("fast" today; swappable via :func:varcode.set_default_annotator or :func:varcode.use_annotator). String names are resolved against the registry. See openvax/varcode#271.

TYPE: str, EffectAnnotator, or None DEFAULT: None

phase_resolver

Optional phase-evidence source (typically an :class:~varcode.phasing.IsovarPhaseResolver). When provided and the resolver has an assembled contig for (self, transcript), the returned effect's mutant_transcript is populated with the contig-derived :class:~varcode.MutantTranscript — the protein is the protein actually observed in RNA rather than one inferred from the reference. See openvax/varcode#269.

TYPE: PhaseResolver or None DEFAULT: None

rna_resolver

Optional RNA-observed-outcome source. When provided, any :class:~varcode.MultiOutcomeEffect in the result has observed outcomes from the resolver appended to its outcomes view (DNA-predicted first, RNA-observed after). Useful for refining SV / splice / haplotype predictions with isoform-level evidence from Isovar, Exacto, or a custom long-read pipeline. See openvax/varcode#259.

TYPE: RNAEvidenceResolver or None DEFAULT: None

germline

Optional patient-germline context. When non-empty, every per-transcript effect is computed against the patient's germline-applied transcript instead of the reference. Codons / splice signals where germline overlaps the somatic and phase is unknown produce a :class:PhaseAmbiguousEffect with one outcome per haplotype hypothesis; LOH at germline het positions sets effect.is_loh = True. None or :meth:GermlineContext.empty falls through to today's reference-relative behaviour byte-identically. See openvax/varcode#268.

TYPE: GermlineContext or None DEFAULT: None

Source code in varcode/variant.py
def effects(
        self, raise_on_error=True, splice_outcomes=False,
        annotator=None, phase_resolver=None, rna_resolver=None,
        germline=None):
    """Predict the variant's effects on overlapping transcripts.

    Parameters
    ----------
    raise_on_error : bool
        If True, raise on annotation errors; if False, capture
        them as Failure effects.

    splice_outcomes : bool
        If True, splice-disrupting effects are wrapped in a
        :class:`varcode.splice_outcomes.SpliceOutcomeSet` carrying
        multiple plausible outcomes (normal splicing, exon
        skipping, intron retention, cryptic splice). Opt-in;
        default False preserves existing behaviour. See
        openvax/varcode#262.

    annotator : str, EffectAnnotator, or None
        Per-call annotator override. ``None`` uses the currently
        configured default (``"fast"`` today; swappable via
        :func:`varcode.set_default_annotator` or
        :func:`varcode.use_annotator`). String names are resolved
        against the registry. See openvax/varcode#271.

    phase_resolver : PhaseResolver or None
        Optional phase-evidence source (typically an
        :class:`~varcode.phasing.IsovarPhaseResolver`). When
        provided and the resolver has an assembled contig for
        ``(self, transcript)``, the returned effect's
        ``mutant_transcript`` is populated with the
        contig-derived :class:`~varcode.MutantTranscript` — the
        protein is the protein actually observed in RNA rather
        than one inferred from the reference. See
        openvax/varcode#269.

    rna_resolver : RNAEvidenceResolver or None
        Optional RNA-observed-outcome source. When provided, any
        :class:`~varcode.MultiOutcomeEffect` in the result has
        observed outcomes from the resolver appended to its
        ``outcomes`` view (DNA-predicted first, RNA-observed
        after). Useful for refining SV / splice / haplotype
        predictions with isoform-level evidence from Isovar,
        Exacto, or a custom long-read pipeline. See
        openvax/varcode#259.

    germline : GermlineContext or None
        Optional patient-germline context. When non-empty, every
        per-transcript effect is computed against the patient's
        germline-applied transcript instead of the reference.
        Codons / splice signals where germline overlaps the
        somatic and phase is unknown produce a
        :class:`PhaseAmbiguousEffect` with one outcome per
        haplotype hypothesis; LOH at germline het positions sets
        ``effect.is_loh = True``. ``None`` or
        :meth:`GermlineContext.empty` falls through to today's
        reference-relative behaviour byte-identically. See
        openvax/varcode#268.
    """
    effects = predict_variant_effects(
        variant=self,
        raise_on_error=raise_on_error,
        splice_outcomes=splice_outcomes,
        annotator=annotator,
        germline=germline,
        phase_resolver=phase_resolver,
    )
    if phase_resolver is not None:
        from .phasing import apply_phase_resolver_to_effects
        apply_phase_resolver_to_effects(effects, phase_resolver)
    if rna_resolver is not None:
        from .rna_evidence import apply_rna_evidence_to_effects
        apply_rna_evidence_to_effects(effects, rna_resolver)
    return effects
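The raise_on_error contract can be illustrated with a generic capture pattern (a hedged sketch: annotate_all and the ("Failure", ...) tuple here are hypothetical stand-ins, not varcode's actual predict_variant_effects or Failure effect classes):

```python
def annotate_all(items, annotate, raise_on_error=True):
    """Apply `annotate` to each item; on error either re-raise or
    record the failure in the results (sketch of the contract)."""
    results = []
    for item in items:
        try:
            results.append(annotate(item))
        except Exception as exc:
            if raise_on_error:
                raise
            # captured instead of raised, so one bad transcript
            # doesn't abort annotation of the rest
            results.append(("Failure", item, str(exc)))
    return results
```

With raise_on_error=False a failing item yields a failure entry rather than aborting the whole annotation pass.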

clone_without_ucsc_data()

Clone this variant, discarding the original format of its genome and contig: useful if we want to mix hg19 and GRCh37 variants.

RETURNS DESCRIPTION
Variant
Source code in varcode/variant.py
def clone_without_ucsc_data(self):
    """
    Clone this variant, discarding the original format of its genome
    and contig: useful if we want to mix hg19 and GRCh37 variants.

    Returns
    -------
    Variant
    """
    return Variant(
        contig=self.contig,
        start=self.original_start,
        ref=self.original_ref,
        alt=self.original_alt,
        genome=self.genome,
        allow_extended_nucleotides=self.allow_extended_nucleotides,
        normalize_contig_names=self.normalize_contig_names,
        convert_ucsc_contig_names=False)
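The UCSC-to-Ensembl contig renaming referred to throughout this section ("chr" prefix stripped, "chrM" becoming "MT") can be sketched as a small helper (a simplified illustration, not varcode's internal conversion function):

```python
def ucsc_contig_to_ensembl(contig):
    # mitochondrial contig is the special case: UCSC "chrM" vs Ensembl "MT"
    if contig == "chrM":
        return "MT"
    # otherwise drop the UCSC "chr" prefix, e.g. "chr1" -> "1"
    if contig.startswith("chr"):
        return contig[len("chr"):]
    # already Ensembl-style: leave unchanged
    return contig
```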

varcode.VariantCollection

varcode.VariantCollection(variants, distinct=True, sort_key=variant_ascending_position_sort_key, sources=None, source_to_metadata_dict={})

Bases: Collection

Construct a VariantCollection from a list of Variant records.

PARAMETER DESCRIPTION
variants

Variant objects contained in this VariantCollection

TYPE: iterable

distinct

Don't keep repeated variants

TYPE: bool DEFAULT: True

sort_key

TYPE: callable DEFAULT: variant_ascending_position_sort_key

sources

Optional set of source names, may be larger than those for which we have metadata dictionaries.

TYPE: set DEFAULT: None

source_to_metadata_dict

Dictionary mapping each source name (e.g. VCF path) to a dictionary from metadata attributes to values.

TYPE: dict DEFAULT: {}

Source code in varcode/variant_collection.py
def __init__(
        self,
        variants,
        distinct=True,
        sort_key=variant_ascending_position_sort_key,
        sources=None,
        source_to_metadata_dict={}):
    """
    Construct a VariantCollection from a list of Variant records.

    Parameters
    ----------
    variants : iterable
        Variant objects contained in this VariantCollection

    distinct : bool
        Don't keep repeated variants

    sort_key : callable

    sources : set
        Optional set of source names, may be larger than those for
        which we have metadata dictionaries.

    source_to_metadata_dict : dict
        Dictionary mapping each source name (e.g. VCF path) to a dictionary
        from metadata attributes to values.
    """
    self.source_to_metadata_dict = source_to_metadata_dict
    if sources is None:
        sources = set(source_to_metadata_dict.keys())
    if any(source not in sources for source in source_to_metadata_dict.keys()):
        raise ValueError(
            "Mismatch between sources=%s and keys of source_to_metadata_dict=%s" % (
                sources,
                set(source_to_metadata_dict.keys())))
    Collection.__init__(
        self,
        elements=variants,
        distinct=distinct,
        sort_key=sort_key,
        sources=sources)
    # Keep self.variants in sync with the Collection's post-sort,
    # post-dedup elements so that iterating `vc` and reading
    # `vc.variants` produce the same order.  See openvax/varcode#220.
    self.variants = self.elements
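The distinct and sort_key handling delegated to the Collection base class can be sketched independently (an illustrative stand-in, not the actual Collection implementation):

```python
def normalize_elements(variants, distinct=True, sort_key=None):
    """Deduplicate (order-preserving) then sort, mirroring how the
    collection stores its elements (simplified sketch)."""
    if distinct:
        # dict.fromkeys keeps first occurrence of each duplicate
        variants = list(dict.fromkeys(variants))
    if sort_key is not None:
        variants = sorted(variants, key=sort_key)
    return variants
```

For example, with (contig, start) tuples as stand-ins for variants, duplicates are dropped and the remainder is sorted by locus.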

metadata property

The most common usage of a VariantCollection is loading a single VCF, in which case it's annoying to have to always specify that path when accessing metadata fields. This property is meant to both maintain backward compatibility with old versions of Varcode and make the common case easier.

samples property

Sorted list of sample names present in the collection's sample_info metadata (empty if no VCFs with sample columns were loaded).

to_dict()

Since Collection.to_dict() returns a state dictionary with an 'elements' field we have to rename it to 'variants'.

Source code in varcode/variant_collection.py
def to_dict(self):
    """
    Since Collection.to_dict() returns a state dictionary with an
    'elements' field we have to rename it to 'variants'.
    """
    return dict(
        variants=self.variants,
        distinct=self.distinct,
        sort_key=self.sort_key,
        sources=self.sources,
        source_to_metadata_dict=self.source_to_metadata_dict)

clone_with_new_elements(new_elements)

Create another VariantCollection of the same class and with same state (including metadata) but possibly different entries.

Warning: metadata is a dictionary keyed by variants. This method leaves that dictionary as-is, which may result in extraneous entries or missing entries.

Source code in varcode/variant_collection.py
def clone_with_new_elements(self, new_elements):
    """
    Create another VariantCollection of the same class and with
    same state (including metadata) but possibly different entries.

    Warning: metadata is a dictionary keyed by variants. This method
    leaves that dictionary as-is, which may result in extraneous entries
    or missing entries.
    """
    kwargs = self.to_dict()
    kwargs["variants"] = new_elements
    return self.from_dict(kwargs)

effects(raise_on_error=True, splice_outcomes=False, annotator=None, phase_resolver=None, rna_resolver=None, germline=None, validate_reference=True)

PARAMETER DESCRIPTION
raise_on_error

If True, exceptions raised while determining a variant's effect on a transcript are propagated; if False, they are only logged.

TYPE: bool DEFAULT: True

splice_outcomes

If True, splice-disrupting effects are wrapped in a :class:varcode.splice_outcomes.SpliceOutcomeSet carrying multiple plausible outcomes. Opt-in; default False preserves existing behaviour. See openvax/varcode#262.

TYPE: bool DEFAULT: False

annotator

Per-call annotator override applied to every variant in the collection. See :meth:Variant.effects and openvax/varcode#271.

TYPE: str, EffectAnnotator, or None DEFAULT: None

phase_resolver

Optional phase-evidence source (e.g. :class:~varcode.phasing.IsovarPhaseResolver). When provided, any effect whose (variant, transcript) is covered by an assembled contig has its mutant_transcript populated with the contig-derived :class:~varcode.MutantTranscript. See openvax/varcode#269.

TYPE: PhaseResolver or None DEFAULT: None

rna_resolver

Optional RNA-observed-outcome source. When provided, any :class:~varcode.MultiOutcomeEffect in the result has observed outcomes from the resolver appended to its outcomes view. See openvax/varcode#259.

TYPE: RNAEvidenceResolver or None DEFAULT: None

germline

Optional patient-germline context. When non-empty, every per-transcript effect is computed against the patient's germline-applied transcript instead of the reference. See :meth:Variant.effects and openvax/varcode#268.

TYPE: GermlineContext or None DEFAULT: None

validate_reference

Cross-check that the germline context's reference build matches this collection's reference build before running annotation. Hard error on mismatch. Set to False if you've already lifted over and know the builds agree. Ignored when germline is None.

TYPE: bool DEFAULT: True

Source code in varcode/variant_collection.py
def effects(
        self, raise_on_error=True, splice_outcomes=False,
        annotator=None, phase_resolver=None, rna_resolver=None,
        germline=None, validate_reference=True):
    """
    Parameters
    ----------
    raise_on_error : bool, optional
        If True, exceptions raised while determining a variant's effect
        on a transcript are propagated; if False, they are only logged.

    splice_outcomes : bool, optional
        If True, splice-disrupting effects are wrapped in a
        :class:`varcode.splice_outcomes.SpliceOutcomeSet` carrying
        multiple plausible outcomes. Opt-in; default False
        preserves existing behaviour. See openvax/varcode#262.

    annotator : str, EffectAnnotator, or None
        Per-call annotator override applied to every variant in
        the collection. See :meth:`Variant.effects` and
        openvax/varcode#271.

    phase_resolver : PhaseResolver or None
        Optional phase-evidence source (e.g.
        :class:`~varcode.phasing.IsovarPhaseResolver`). When
        provided, any effect whose ``(variant, transcript)`` is
        covered by an assembled contig has its
        ``mutant_transcript`` populated with the contig-derived
        :class:`~varcode.MutantTranscript`. See
        openvax/varcode#269.

    rna_resolver : RNAEvidenceResolver or None
        Optional RNA-observed-outcome source. When provided, any
        :class:`~varcode.MultiOutcomeEffect` in the result has
        observed outcomes from the resolver appended to its
        ``outcomes`` view. See openvax/varcode#259.

    germline : GermlineContext or None
        Optional patient-germline context. When non-empty, every
        per-transcript effect is computed against the patient's
        germline-applied transcript instead of the reference.
        See :meth:`Variant.effects` and openvax/varcode#268.

    validate_reference : bool, default True
        Cross-check that the germline context's reference build
        matches this collection's reference build before running
        annotation. Hard error on mismatch. Set to False if
        you've already lifted over and know the builds agree.
        Ignored when ``germline`` is ``None``.
    """
    from datetime import datetime, timezone

    from .annotators.registry import resolve_annotator
    from .phasing import (
        apply_phase_resolver_to_effects,
        build_haplotype_effects,
    )

    # Pre-flight: cross-VCF build mismatch surfaces here as one
    # readable error rather than per-variant ReferenceMismatchError
    # noise. The empty/None context is a no-op.
    if germline:
        germline.validate_against(
            self, validate_reference=validate_reference)

    annotator_instance = resolve_annotator(annotator)
    per_variant = [
        effect
        for variant in self
        for effect in variant.effects(
            raise_on_error=raise_on_error,
            splice_outcomes=splice_outcomes,
            annotator=annotator,
            germline=germline,
            phase_resolver=phase_resolver,
        )
    ]
    if phase_resolver is not None:
        apply_phase_resolver_to_effects(per_variant, phase_resolver)
        # Joint cis-variant effects sit alongside the per-variant
        # ones (#269). Consumers pick whichever granularity they
        # need; top-priority sort still reflects the highest
        # individual effect severity.
        per_variant.extend(build_haplotype_effects(
            self, per_variant, phase_resolver))
    if rna_resolver is not None:
        from .rna_evidence import apply_rna_evidence_to_effects
        apply_rna_evidence_to_effects(per_variant, rna_resolver)
    return EffectCollection(per_variant,
        annotator=getattr(annotator_instance, "name", None),
        annotator_version=getattr(annotator_instance, "version", None),
        annotated_at=datetime.now(timezone.utc).isoformat(
            timespec="seconds"),
    )

reference_names()

All distinct reference names used by Variants in this collection.

RETURNS DESCRIPTION
set of str
Source code in varcode/variant_collection.py
@memoize
def reference_names(self):
    """
    All distinct reference names used by Variants in this
    collection.

    Returns
    -------
    set of str
    """
    return {variant.reference_name for variant in self}

original_reference_names()

Similar to reference_names but preserves UCSC references, so that a variant collection derived from an hg19 VCF would return {"hg19"} instead of {"GRCh37"}.

RETURNS DESCRIPTION
set of str
Source code in varcode/variant_collection.py
@memoize
def original_reference_names(self):
    """
    Similar to `reference_names` but preserves UCSC references,
    so that a variant collection derived from an hg19 VCF would
    return {"hg19"} instead of {"GRCh37"}.

    Returns
    -------
    set of str
    """
    return {variant.original_reference_name for variant in self}

groupby_gene_name()

Group variants by the gene names they overlap, which may put each variant in multiple groups.

Source code in varcode/variant_collection.py
def groupby_gene_name(self):
    """
    Group variants by the gene names they overlap, which may put each
    variant in multiple groups.
    """
    return self.multi_groupby(lambda x: x.gene_names)

gene_counts()

Returns number of elements overlapping each gene name. Expects the derived class (VariantCollection or EffectCollection) to have an implementation of groupby_gene_name.

Source code in varcode/variant_collection.py
def gene_counts(self):
    """
    Returns number of elements overlapping each gene name. Expects the
    derived class (VariantCollection or EffectCollection) to have
    an implementation of groupby_gene_name.
    """
    return {
        gene_name: len(group)
        for (gene_name, group)
        in self.groupby_gene_name().items()
    }
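The multi-group behaviour behind groupby_gene_name and gene_counts can be sketched with plain dictionaries (an illustrative re-implementation; multi_groupby here mimics the base-class method of the same name):

```python
from collections import defaultdict

def multi_groupby(items, multi_key_fn):
    """Place each item in one group per key it maps to, so a variant
    overlapping two genes appears in both groups (sketch)."""
    groups = defaultdict(list)
    for item in items:
        for key in multi_key_fn(item):
            groups[key].append(item)
    return dict(groups)

def gene_counts(items, gene_names_fn):
    return {name: len(group)
            for name, group in multi_groupby(items, gene_names_fn).items()}
```

A variant overlapping two genes is counted once per gene, which is why the group counts can sum to more than the number of variants.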

filter_by_transcript_expression(transcript_expression_dict, min_expression_value=0.0)

Filters variants down to those which overlap a transcript whose expression value in the transcript_expression_dict argument is greater than min_expression_value.

PARAMETER DESCRIPTION
transcript_expression_dict

Dictionary mapping Ensembl transcript IDs to expression estimates (either FPKM or TPM)

TYPE: dict

min_expression_value

Threshold above which we'll keep an effect in the result collection

TYPE: float DEFAULT: 0.0

Source code in varcode/variant_collection.py
def filter_by_transcript_expression(
        self,
        transcript_expression_dict,
        min_expression_value=0.0):
    """
    Filters variants down to those which overlap a transcript whose
    expression value in the transcript_expression_dict argument is greater
    than min_expression_value.

    Parameters
    ----------
    transcript_expression_dict : dict
        Dictionary mapping Ensembl transcript IDs to expression estimates
        (either FPKM or TPM)

    min_expression_value : float
        Threshold above which we'll keep an effect in the result collection
    """
    return self.filter_any_above_threshold(
        multi_key_fn=lambda variant: variant.transcript_ids,
        value_dict=transcript_expression_dict,
        threshold=min_expression_value)
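The shared filter_any_above_threshold machinery can be sketched as follows (an illustrative re-implementation of the base-class method both expression filters delegate to; missing keys are treated as 0.0 here, which is an assumption of this sketch):

```python
def filter_any_above_threshold(items, multi_key_fn, value_dict, threshold):
    """Keep items with at least one key whose value exceeds the
    threshold; keys absent from value_dict count as 0.0 (sketch)."""
    return [
        item for item in items
        if any(value_dict.get(key, 0.0) > threshold
               for key in multi_key_fn(item))
    ]
```

With (name, transcript_ids) tuples standing in for variants, an item is kept as soon as any one of its transcript IDs is expressed above the threshold.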

filter_by_gene_expression(gene_expression_dict, min_expression_value=0.0)

Filters variants down to those which overlap a gene whose expression value in the gene_expression_dict argument is greater than min_expression_value.

PARAMETER DESCRIPTION
gene_expression_dict

Dictionary mapping Ensembl gene IDs to expression estimates (either FPKM or TPM)

TYPE: dict

min_expression_value

Threshold above which we'll keep a variant in the result collection

TYPE: float DEFAULT: 0.0

Source code in varcode/variant_collection.py
def filter_by_gene_expression(
        self,
        gene_expression_dict,
        min_expression_value=0.0):
    """
    Filters variants down to those which overlap a gene whose
    expression value in the gene_expression_dict argument is greater
    than min_expression_value.

    Parameters
    ----------
    gene_expression_dict : dict
        Dictionary mapping Ensembl gene IDs to expression estimates
        (either FPKM or TPM)

    min_expression_value : float
        Threshold above which we'll keep a variant in the result collection
    """
    return self.filter_any_above_threshold(
        multi_key_fn=lambda variant: variant.gene_ids,
        value_dict=gene_expression_dict,
        threshold=min_expression_value)

exactly_equal(other)

Comparison between VariantCollection instances that takes into account the info field of Variant instances.

RETURNS DESCRIPTION
True if the variants in this collection equal the variants in the other
collection. The Variant.info fields are included in the comparison.
Source code in varcode/variant_collection.py
def exactly_equal(self, other):
    '''
    Comparison between VariantCollection instances that takes into account
    the info field of Variant instances.

    Returns
    -------
    True if the variants in this collection equal the variants in the other
    collection. The Variant.info fields are included in the comparison.
    '''
    return (
        self.__class__ == other.__class__ and
        len(self) == len(other) and
        all(x.exactly_equal(y) for (x, y) in zip(self, other)))

union(*others, **kwargs)

Returns the union of variants in several VariantCollection objects.

Source code in varcode/variant_collection.py
def union(self, *others, **kwargs):
    """
    Returns the union of variants in several VariantCollection objects.
    """
    return self._combine_variant_collections(
        combine_fn=set.union,
        variant_collections=(self,) + others,
        kwargs=kwargs)

intersection(*others, **kwargs)

Returns the intersection of variants in several VariantCollection objects.

Source code in varcode/variant_collection.py
def intersection(self, *others, **kwargs):
    """
    Returns the intersection of variants in several VariantCollection objects.
    """
    return self._combine_variant_collections(
        combine_fn=set.intersection,
        variant_collections=(self,) + others,
        kwargs=kwargs)

difference(*others, **kwargs)

Returns variants present in this collection but not in any of the others.

Source code in varcode/variant_collection.py
def difference(self, *others, **kwargs):
    """
    Returns variants present in this collection but not in any of the others.
    """
    return self._combine_variant_collections(
        combine_fn=set.difference,
        variant_collections=(self,) + others,
        kwargs=kwargs)
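
All three combinators delegate to _combine_variant_collections with a set method as combine_fn. Reducing variants to hashable (contig, start, ref, alt) keys (a simplification, not how VariantCollection actually stores them) shows the set semantics directly:

```python
# Two toy collections, represented as sets of hashable variant keys.
a = {("1", 100, "A", "T"), ("1", 200, "G", "C")}
b = {("1", 100, "A", "T"), ("2", 50, "C", "G")}

union = set.union(a, b)                # all variants from both
intersection = set.intersection(a, b)  # shared variants only
difference = set.difference(a, b)      # in a but in none of the others
```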

to_dataframe()

Build a DataFrame from this variant collection.

Source code in varcode/variant_collection.py
def to_dataframe(self):
    """Build a DataFrame from this variant collection."""
    def row_from_variant(variant):
        return OrderedDict([
            ("chr", variant.contig),
            ("start", variant.original_start),
            ("ref", variant.original_ref),
            ("alt", variant.original_alt),
            ("gene_name", ";".join(variant.gene_names)),
            ("gene_id", ";".join(variant.gene_ids))
        ])
    rows = [row_from_variant(v) for v in self]
    # Always return a DataFrame with the expected columns, even
    # when empty, so downstream code (CSV round-trip, joins) doesn't
    # have to special-case len == 0.
    return pd.DataFrame.from_records(rows, columns=self._DATAFRAME_COLUMNS)

to_csv(path, include_header=True)

Write this collection to CSV.

PARAMETER DESCRIPTION
path

Output path.

TYPE: str

include_header

If True (default), prepend # key=value metadata lines with varcode version and reference genome so the file can be read back via from_csv without supplying a genome explicitly. Pass include_header=False for fast consumers that don't tolerate comment lines.

TYPE: bool DEFAULT: True

Source code in varcode/variant_collection.py
def to_csv(self, path, include_header=True):
    """Write this collection to CSV.

    Parameters
    ----------
    path : str
        Output path.

    include_header : bool
        If True (default), prepend ``# key=value`` metadata lines
        with varcode version and reference genome so the file can
        be read back via ``from_csv`` without supplying a genome
        explicitly. Pass ``include_header=False`` for fast
        consumers that don't tolerate comment lines.
    """
    df = self.to_dataframe()
    if not include_header:
        df.to_csv(path, index=False)
        return

    metadata = OrderedDict()
    metadata["varcode_version"] = _varcode_version
    metadata["reference_name"] = self._serialized_reference_name()
    with open(path, "w") as f:
        write_metadata_header(f, metadata)
        df.to_csv(f, index=False)

from_csv(path, genome=None, distinct=True, sort_key=variant_ascending_position_sort_key) classmethod

Rebuild a VariantCollection from a CSV previously written by VariantCollection.to_csv().

The CSV round-trip is human-readable and easy to inspect. For byte-for-byte round-trip or for faster loading of large collections (≳10k variants), prefer from_json — CSV parsing plus per-row Variant construction is significantly slower.

PARAMETER DESCRIPTION
path

Path to the CSV file. Lines starting with '#' are treated as comments and parsed as key=value metadata.

TYPE: str

genome

Reference genome to associate with the loaded variants. If None, the reference is read from the CSV's metadata header (# reference_name=...). If neither is available, raises ValueError.

TYPE: pyensembl.Genome, str, int, or None DEFAULT: None

distinct

Drop duplicate variants (same as the constructor).

TYPE: bool DEFAULT: True

sort_key

Sort key for the resulting collection.

TYPE: callable DEFAULT: variant_ascending_position_sort_key

RETURNS DESCRIPTION
VariantCollection
Source code in varcode/variant_collection.py
@classmethod
def from_csv(
        cls,
        path,
        genome=None,
        distinct=True,
        sort_key=variant_ascending_position_sort_key):
    """Rebuild a VariantCollection from a CSV previously written by
    ``VariantCollection.to_csv()``.

    The CSV round-trip is human-readable and easy to inspect. For
    byte-for-byte round-trip or for faster loading of large
    collections (≳10k variants), prefer ``from_json`` — CSV parsing
    plus per-row Variant construction is significantly slower.

    Parameters
    ----------
    path : str
        Path to the CSV file. Lines starting with '#' are treated as
        comments and parsed as ``key=value`` metadata.

    genome : pyensembl.Genome, str, int, or None
        Reference genome to associate with the loaded variants. If
        ``None``, the reference is read from the CSV's metadata
        header (``# reference_name=...``). If neither is available,
        raises ``ValueError``.

    distinct : bool
        Drop duplicate variants (same as the constructor).

    sort_key : callable
        Sort key for the resulting collection.

    Returns
    -------
    VariantCollection
    """
    header = read_metadata_header(path)
    warn_on_version_drift(header, _varcode_version, path)
    if genome is None:
        genome = header.get("reference_name")
    if genome is None:
        raise ValueError(
            "from_csv needs a reference genome: pass the `genome` "
            "argument explicitly, or write the CSV with "
            "`to_csv(include_header=True)` so `# reference_name=...` "
            "is recorded in the header. Neither was found at %s." % path)

    # Accept either "chr" or "contig" as the contig column so
    # CSVs are interchangeable between VariantCollection and
    # EffectCollection (openvax/varcode#274). Declaring both in
    # dtype is a no-op for whichever column is absent.
    df = pd.read_csv(
        path, comment="#", dtype={"chr": str, "contig": str})
    contig_col = resolve_contig_column(df.columns)
    if contig_col is None:
        raise ValueError(
            "CSV at %s is missing a contig column: expected one of %s."
            % (path, list(CONTIG_COLUMN_ALIASES)))
    required = {"start", "ref", "alt"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(
            "CSV at %s is missing required columns: %s" % (
                path, sorted(missing)))

    # Extract the columns we need by name and coerce types up front,
    # then iterate with zip. This is robust against column reordering
    # and extra columns (unlike itertuples, which depends on
    # attribute access to valid-identifier column names in fixed
    # positions) and avoids the per-row overhead of iterrows.
    contigs = df[contig_col].astype(str)
    starts = df["start"].astype(int)
    refs = df["ref"].fillna("")
    alts = df["alt"].fillna("")

    variants = [
        Variant(
            contig=contig,
            start=start,
            ref=ref,
            alt=alt,
            genome=genome,
        )
        for contig, start, ref, alt in zip(contigs, starts, refs, alts)
    ]
    return cls(
        variants=variants,
        distinct=distinct,
        sort_key=sort_key,
    )
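
The read_metadata_header and write_metadata_header helpers aren't shown on this page; a stdlib-only sketch of the assumed `# key=value` header shape (helper names and the exact format are assumptions inferred from the docstrings above) illustrates the round-trip:

```python
import io

def write_metadata_header(f, metadata):
    # One "# key=value" comment line per entry, before the CSV body.
    for key, value in metadata.items():
        f.write("# %s=%s\n" % (key, value))

def read_metadata_header_lines(lines):
    # Collect leading comment lines; stop at the first non-comment
    # line (the CSV header row).
    header = {}
    for line in lines:
        if not line.startswith("#"):
            break
        key, _, value = line[1:].strip().partition("=")
        header[key.strip()] = value.strip()
    return header

buf = io.StringIO()
write_metadata_header(buf, {"varcode_version": "1.2.0",
                            "reference_name": "GRCh38"})
buf.write("chr,start,ref,alt\n")
parsed = read_metadata_header_lines(buf.getvalue().splitlines())
```

This is why from_csv can recover the reference genome without an explicit `genome` argument: the `# reference_name=...` line survives pandas parsing because `comment="#"` skips it.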

has_sample_data()

True if the collection has any per-sample genotype info.

Source code in varcode/variant_collection.py
def has_sample_data(self):
    """True if the collection has any per-sample genotype info."""
    return len(self.samples) > 0

genotype(variant, sample)

Return the Genotype for sample at variant.

PARAMETER DESCRIPTION
variant

TYPE: Variant

sample

TYPE: str

RETURNS DESCRIPTION
Genotype or None

None if the variant has no sample_info metadata at all (e.g. it was constructed directly rather than loaded from a multi-sample VCF).

RAISES DESCRIPTION
SampleNotFoundError

If the variant's metadata exists but doesn't include the requested sample. Subclass of KeyError.

Source code in varcode/variant_collection.py
def genotype(self, variant, sample):
    """Return the ``Genotype`` for ``sample`` at ``variant``.

    Parameters
    ----------
    variant : Variant
    sample : str

    Returns
    -------
    Genotype or None
        ``None`` if the variant has no sample_info metadata at all
        (e.g. it was constructed directly rather than loaded from
        a multi-sample VCF).

    Raises
    ------
    SampleNotFoundError
        If the variant's metadata exists but doesn't include the
        requested sample. Subclass of ``KeyError``.
    """
    meta = self._metadata_for(variant)
    if meta is None:
        return None
    sample_info = meta.get("sample_info")
    if sample_info is None:
        return None
    if sample not in sample_info:
        raise SampleNotFoundError(
            "Sample %r not found in %s. Available samples: %s" % (
                sample, variant, sorted(sample_info.keys())))
    return Genotype.from_sample_info(sample_info[sample])

zygosity(variant, sample)

Zygosity of the given sample at the given variant.

Multi-allelic aware: at a site split into multiple Variants, each asks "does this sample carry this alt?".

Source code in varcode/variant_collection.py
def zygosity(self, variant, sample):
    """Zygosity of the given sample at the given variant.

    Multi-allelic aware: at a site split into multiple Variants,
    each asks "does this sample carry *this* alt?".
    """
    gt = self.genotype(variant, sample)
    if gt is None:
        return Zygosity.MISSING
    return gt.zygosity_for_alt(self._alt_index_for(variant))

for_sample(sample)

Return a VariantCollection restricted to variants where sample carries the alt (heterozygous or homozygous). Useful for multi-sample VCFs where not every row is called in every sample.

Source code in varcode/variant_collection.py
def for_sample(self, sample):
    """Return a VariantCollection restricted to variants where
    ``sample`` carries the alt (heterozygous or homozygous). Useful
    for multi-sample VCFs where not every row is called in every
    sample.
    """
    return self._filter_by_zygosity(
        sample,
        keep=lambda z: z in (Zygosity.HETEROZYGOUS, Zygosity.HOMOZYGOUS),
    )

heterozygous_in(sample)

Variants where sample is heterozygous for this variant's alt.

Source code in varcode/variant_collection.py
def heterozygous_in(self, sample):
    """Variants where ``sample`` is heterozygous for this variant's alt."""
    return self._filter_by_zygosity(
        sample,
        keep=lambda z: z is Zygosity.HETEROZYGOUS,
    )

homozygous_alt_in(sample)

Variants where sample is homozygous for this variant's alt.

Source code in varcode/variant_collection.py
def homozygous_alt_in(self, sample):
    """Variants where ``sample`` is homozygous for this variant's alt."""
    return self._filter_by_zygosity(
        sample,
        keep=lambda z: z is Zygosity.HOMOZYGOUS,
    )

varcode.StructuralVariant

varcode.StructuralVariant(contig: str, start: int, sv_type: str, end: Optional[int] = None, alt: Optional[str] = None, ref: str = 'N', mate_contig: Optional[str] = None, mate_start: Optional[int] = None, mate_orientation: Optional[str] = None, ci_start: Optional[Tuple[int, int]] = None, ci_end: Optional[Tuple[int, int]] = None, alt_assembly: Optional[str] = None, info: Optional[Mapping[str, Any]] = None, genome=None, ensembl=None, normalize_contig_names: bool = True, convert_ucsc_contig_names=None)

Bases: Variant

A structural variant — deletion, duplication, inversion, insertion, CNV, or breakend — too large or too complex to represent as a simple ref/alt nucleotide pair.

Subclasses :class:Variant so isinstance(v, Variant) still works; downstream code that handles variant kinds generically (effect collections, serialization) sees a :class:Variant and the shared contract still applies. The SV-specific fields (:attr:sv_type, :attr:end, breakend mate fields) live here and are consulted by SV-aware code.

The SV position model:

  • :attr:start — 1-based start of the affected region (matches VCF POS).
  • :attr:end — 1-based inclusive end. For a DEL/DUP/INV/CNV this is the SV endpoint on the same contig. For an INS it equals start (insertions are zero-width in reference coords). For a BND, end == start and the other breakpoint lives in :attr:mate_contig / :attr:mate_start.
PARAMETER DESCRIPTION
contig

Chromosome of the (first) breakpoint.

TYPE: str

start

1-based start position.

TYPE: int

sv_type

One of :data:SV_TYPES.

TYPE: str

end

1-based inclusive end position. Defaults to start for zero-width SVs (INS, BND).

TYPE: int DEFAULT: None

alt

Original ALT field from the VCF — <DEL>, <INS:ME:ALU>, G]17:198982], etc. Kept so round-trip to VCF is possible. Defaults to "<{sv_type}>".

TYPE: str DEFAULT: None

ref

Original REF base (usually one nucleotide, the anchor). Defaults to "N".

TYPE: str DEFAULT: 'N'

mate_contig

For BND: the mate breakpoint's chromosome.

TYPE: str DEFAULT: None

mate_start

For BND: the mate breakpoint's position.

TYPE: int DEFAULT: None

mate_orientation

For BND: one of "[[", "[]", "][", "]]" encoding the VCF 4.1 breakend strand + direction shorthand (first bracket = preceding; second = following). See VCF §5.4 for the full grammar.

TYPE: str DEFAULT: None

ci_start

Confidence interval around start (VCF CIPOS).

TYPE: (int, int) DEFAULT: None

ci_end

Confidence interval around end (VCF CIEND).

TYPE: (int, int) DEFAULT: None

alt_assembly

Caller-supplied assembled sequence of the rearranged allele. Hook for long-read / targeted-assembly pipelines. The SV annotator can prefer this over inferring from breakpoints.

TYPE: str DEFAULT: None

info

Open-ended bag for extra VCF INFO fields the core class doesn't model (HOMLEN, SVMETHOD, MATEID, etc.). Kept as a Mapping so callers can pass whatever shape their variant caller produces.

TYPE: Mapping[str, Any] DEFAULT: None

genome

Same meaning as :class:Variant.

DEFAULT: None

ensembl

Same meaning as :class:Variant.

DEFAULT: None

normalize_contig_names

Same meaning as :class:Variant.

DEFAULT: True

convert_ucsc_contig_names

Same meaning as :class:Variant.

DEFAULT: None

Source code in varcode/structural_variant.py
def __init__(
        self,
        contig: str,
        start: int,
        sv_type: str,
        end: Optional[int] = None,
        alt: Optional[str] = None,
        ref: str = "N",
        mate_contig: Optional[str] = None,
        mate_start: Optional[int] = None,
        mate_orientation: Optional[str] = None,
        ci_start: Optional[Tuple[int, int]] = None,
        ci_end: Optional[Tuple[int, int]] = None,
        alt_assembly: Optional[str] = None,
        info: Optional[Mapping[str, Any]] = None,
        genome=None,
        ensembl=None,
        normalize_contig_names: bool = True,
        convert_ucsc_contig_names=None):
    if sv_type not in SV_TYPES:
        raise ValueError(
            "Unknown sv_type %r (expected one of %s)"
            % (sv_type, sorted(SV_TYPES)))
    if end is None:
        end = start

    # Initialize the base Variant with a placeholder ref/alt so the
    # nucleotide-normalization path doesn't reject <DEL>-style
    # symbolic alleles. The original symbolic ALT is preserved in
    # ``_sv_alt``; :attr:`alt` returns it via a property override.
    Variant.__init__(
        self,
        contig=contig,
        start=start,
        ref=ref if ref else "N",
        alt="A",  # placeholder, overridden below
        genome=genome,
        ensembl=ensembl,
        allow_extended_nucleotides=True,
        normalize_contig_names=normalize_contig_names,
        convert_ucsc_contig_names=convert_ucsc_contig_names)

    # Override end position — base Variant ignores end for SNVs
    # but we need it as an explicit SV endpoint.
    self.end = int(end)

    # SV-specific fields.
    self.sv_type = sv_type
    self.mate_contig = mate_contig
    self.mate_start = int(mate_start) if mate_start is not None else None
    self.mate_orientation = mate_orientation
    self.ci_start = tuple(ci_start) if ci_start is not None else None
    self.ci_end = tuple(ci_end) if ci_end is not None else None
    self.alt_assembly = alt_assembly
    self.info = dict(info) if info is not None else {}

    # Preserve the original symbolic ALT string so round-tripping
    # and downstream consumers see what the VCF said.
    self._sv_alt = alt if alt is not None else "<%s>" % sv_type

symbolic_alt: str property

The original symbolic / breakend ALT string from the VCF.

length: Optional[int] property

Length of the SV span in reference coordinates, when well-defined. None for breakends (the span depends on the mate, which may be on another contig).

varcode.parse_symbolic_alt

varcode.parse_symbolic_alt(contig: str, start: int, ref: str, alt: str, info=None, genome=None) -> Optional[StructuralVariant]

Parse a single symbolic or breakend ALT into a :class:StructuralVariant. Returns None if the ALT is not symbolic (the caller keeps handling it as a simple variant).

info is an optional mapping (e.g. a pyvcf INFO dict) that may carry END, SVTYPE, CIPOS, CIEND, MATEID, etc. The parser reads those when present but doesn't require them — the ALT shape alone is enough to distinguish symbolic from breakend from inline.

Source code in varcode/sv_allele_parser.py
def parse_symbolic_alt(
        contig: str,
        start: int,
        ref: str,
        alt: str,
        info=None,
        genome=None) -> Optional[StructuralVariant]:
    """Parse a single symbolic or breakend ALT into a
    :class:`StructuralVariant`. Returns ``None`` if the ALT is not
    symbolic (the caller keeps handling it as a simple variant).

    ``info`` is an optional mapping (e.g. a ``pyvcf`` INFO dict) that
    may carry ``END``, ``SVTYPE``, ``CIPOS``, ``CIEND``, ``MATEID``,
    etc. The parser reads those when present but doesn't require
    them — the ALT shape alone is enough to distinguish symbolic
    from breakend from inline.
    """
    if not alt:
        return None

    # Symbolic allele: <DEL>, <DUP>, <INS:ME:ALU>, <CN0>, etc.
    m = _SYMBOLIC_RE.match(alt)
    if m:
        token = m.group(1).upper()
        # Top-level type is the first colon-delimited token; e.g.
        # ``INS:ME:ALU`` → ``INS``. The rest stays in ``info`` under
        # the custom ``symbolic_subtype`` key.
        top, _, subtype = token.partition(":")

        # Copy-number alleles: <CN0>, <CN1>, <CN2>, ..., <CNV>. We collapse
        # to sv_type="CNV" but preserve the integer count separately so
        # downstream code can distinguish a deletion (CN0 / CN1 in a
        # diploid) from a duplication (CN3+) without re-parsing the ALT.
        copy_number = None
        cn_match = _CN_TOKEN_RE.match(top)
        if cn_match:
            cn_digits = cn_match.group("n")
            if cn_digits:
                copy_number = int(cn_digits)
            top = "CNV"

        if top not in SV_TYPES:
            # Unknown symbolic — keep the raw token as subtype, fall
            # back to BND so the caller still gets a usable object.
            subtype = token
            top = "BND"

        end = _extract_info(info, "END") or start
        svtype_from_info = _extract_info(info, "SVTYPE")
        ci_start = _extract_info(info, "CIPOS")
        ci_end = _extract_info(info, "CIEND")

        # INFO/SVTYPE overrides when present and recognized.
        if svtype_from_info and svtype_from_info.upper() in SV_TYPES:
            top = svtype_from_info.upper()

        # Inserted-sequence hint for INS rows. Long-read callers ship
        # the assembled inserted sequence as INSSEQ (Manta) or
        # SVINSSEQ (some Sniffles/PBSV variants). Both are accepted;
        # the resolved sequence flows through StructuralVariant's
        # ``alt_assembly`` slot, where the SV annotator already prefers
        # it over inferring from breakpoint coordinates alone.
        alt_assembly = None
        if top == "INS":
            alt_assembly = (
                _extract_info_scalar(info, "INSSEQ")
                or _extract_info_scalar(info, "SVINSSEQ"))

        sv_info = {}
        if subtype:
            sv_info["symbolic_subtype"] = subtype
        if copy_number is not None:
            sv_info["copy_number"] = copy_number

        return StructuralVariant(
            contig=contig,
            start=int(start),
            end=int(end),
            sv_type=top,
            alt=alt,
            ref=ref or "N",
            alt_assembly=alt_assembly,
            ci_start=tuple(ci_start) if ci_start else None,
            ci_end=tuple(ci_end) if ci_end else None,
            info=sv_info,
            genome=genome,
        )

    # Breakend: t[CHROM:POS[ etc. Four orientation shapes per §5.4.
    m = _BREAKEND_RE.match(alt)
    if m:
        prefix = m.group("prefix")
        suffix = m.group("suffix")
        open_br = m.group("open")
        close_br = m.group("close")
        mate_contig = m.group("mate_contig")
        mate_pos = int(m.group("mate_pos"))
        # Two-letter orientation token captures which side the REF
        # base is on (prefix = joined-after, suffix = joined-before)
        # and the bracket direction (mate strand).
        orientation = open_br + close_br
        return StructuralVariant(
            contig=contig,
            start=int(start),
            end=int(start),
            sv_type="BND",
            alt=alt,
            ref=ref or "N",
            mate_contig=mate_contig,
            mate_start=mate_pos,
            mate_orientation=orientation,
            info={"mateid": (_extract_info(info, "MATEID")
                             or _extract_info(info, "PARID")),
                  "bnd_anchor": prefix or suffix},
            genome=genome,
        )

    # Spanning-deletion placeholder. VCF 4.2+ uses ``*`` to mean "this
    # position is covered by a deletion recorded on another row".
    # We drop these like before — there's no SV object to construct.
    return None
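
The private _SYMBOLIC_RE and _BREAKEND_RE patterns aren't reproduced on this page; the stand-in regexes below (assumptions, not varcode's actual patterns) are enough to show the three-way classification the parser performs:

```python
import re

# Plausible stand-ins for the module's private patterns (assumed).
SYMBOLIC_RE = re.compile(r"^<([^<>]+)>$")
BREAKEND_RE = re.compile(
    r"^(?P<prefix>[ACGTN]*)"
    r"(?P<open>[\[\]])(?P<mate_contig>[^\[\]:]+):(?P<mate_pos>\d+)"
    r"(?P<close>[\[\]])"
    r"(?P<suffix>[ACGTN]*)$")

def classify_alt(alt):
    # Symbolic allele: <DEL>, <INS:ME:ALU>, <CN0>, ...
    m = SYMBOLIC_RE.match(alt)
    if m:
        top, _, _subtype = m.group(1).upper().partition(":")
        return ("symbolic", top)
    # Breakend: t[13:123457[, G]17:198982], etc.
    m = BREAKEND_RE.match(alt)
    if m:
        return ("breakend", m.group("mate_contig"), int(m.group("mate_pos")))
    # Anything else is handled as an inline (simple) allele.
    return ("inline", alt)
```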

varcode.SV_TYPES

varcode.SV_TYPES = frozenset({'DEL', 'DUP', 'INV', 'INS', 'CNV', 'BND'}) module-attribute

Genotypes

varcode.Genotype

varcode.Genotype(raw_gt: str, alleles: Tuple[Optional[int], ...], phased: bool = False, phase_set: Optional[int] = None, allele_depths: Optional[Tuple[int, ...]] = None, total_depth: Optional[int] = None, genotype_quality: Optional[int] = None) dataclass

Bases: DataclassSerializable

One sample's genotype at one variant locus.

The alleles tuple encodes the observed alleles using VCF GT semantics: 0 is the reference allele, 1 is the first ALT listed on the VCF row, 2 is the second, and so on. None indicates a no-call on that haplotype.

For varcode's variant-level API, note that Variant.alt is a specific alt (a multi-allelic VCF row is split into one Variant per alt). When querying zygosity relative to a Variant, use the variant's alt_allele_index from the collection's metadata and add 1 to get the GT-encoded index, then call :meth:zygosity_for_alt or :meth:carries_alt.

is_called: bool property

True if at least one allele is non-None.

ploidy: int property

Number of alleles in the call (including missing).

from_sample_info(sample_info) classmethod

Build a Genotype from pyvcf's call.data._asdict() output.

Handles the keys varcode normally sees: GT, AD, DP, GQ, PS. Missing keys default to None.

Source code in varcode/genotype.py
@classmethod
def from_sample_info(cls, sample_info):
    """Build a Genotype from pyvcf's ``call.data._asdict()`` output.

    Handles the keys varcode normally sees: ``GT``, ``AD``, ``DP``,
    ``GQ``, ``PS``. Missing keys default to ``None``.
    """
    if sample_info is None:
        return cls(raw_gt="./.", alleles=(None, None), phased=False)
    gt_str = sample_info.get("GT")
    if gt_str is None:
        gt_str = "./."
    alleles, phased = parse_gt_string(gt_str)
    ad = sample_info.get("AD")
    return cls(
        raw_gt=gt_str,
        alleles=alleles,
        phased=phased,
        phase_set=sample_info.get("PS"),
        allele_depths=tuple(ad) if ad is not None else None,
        total_depth=sample_info.get("DP"),
        genotype_quality=sample_info.get("GQ"),
    )
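
parse_gt_string is referenced above but its source isn't shown; a minimal stand-in, inferred from the (alleles, phased) return shape used here and therefore an assumption about its contract, might look like:

```python
def parse_gt_string(gt):
    # "|" separates phased alleles, "/" unphased; "." is a no-call.
    phased = "|" in gt
    sep = "|" if phased else "/"
    alleles = tuple(
        None if tok == "." else int(tok)
        for tok in gt.split(sep))
    return alleles, phased
```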

carries_alt(alt_index: int) -> bool

True if this sample's genotype contains the given alt.

alt_index uses VCF GT encoding: 1 is the first alt on the row, 2 is the second, etc. (i.e. one more than alt_allele_index from the VariantCollection metadata).

Source code in varcode/genotype.py
def carries_alt(self, alt_index: int) -> bool:
    """True if this sample's genotype contains the given alt.

    ``alt_index`` uses VCF GT encoding: ``1`` is the first alt on
    the row, ``2`` is the second, etc. (i.e. one more than
    ``alt_allele_index`` from the VariantCollection metadata).
    """
    return any(
        a == alt_index
        for a in self.alleles
        if a is not None
    )

copies_of_alt(alt_index: int) -> int

Number of haplotypes carrying the given alt.

Source code in varcode/genotype.py
def copies_of_alt(self, alt_index: int) -> int:
    """Number of haplotypes carrying the given alt."""
    return sum(
        1 for a in self.alleles
        if a is not None and a == alt_index
    )

zygosity_for_alt(alt_index: int) -> Zygosity

Classify the sample's zygosity relative to one alt allele.

Multi-allelic aware: GT=1/2 queried for alt 1 returns HETEROZYGOUS (one copy of this alt, one of a different alt); queried for alt 3 it returns ABSENT.

Source code in varcode/genotype.py
def zygosity_for_alt(self, alt_index: int) -> Zygosity:
    """Classify the sample's zygosity relative to one alt allele.

    Multi-allelic aware: ``GT=1/2`` queried for alt ``1`` returns
    ``HETEROZYGOUS`` (one copy of this alt, one of a different
    alt); queried for alt ``3`` it returns ``ABSENT``.
    """
    called = [a for a in self.alleles if a is not None]
    if len(called) == 0:
        return Zygosity.MISSING
    n_copies = sum(1 for a in called if a == alt_index)
    if n_copies == 0:
        return Zygosity.ABSENT
    if n_copies == len(called):
        return Zygosity.HOMOZYGOUS
    return Zygosity.HETEROZYGOUS
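
Restating the logic above with plain tuples and strings (standing in for the Zygosity enum) makes the multi-allelic cases easy to check by hand:

```python
def zygosity_for_alt(alleles, alt_index):
    # Same algorithm as Genotype.zygosity_for_alt, over plain tuples.
    called = [a for a in alleles if a is not None]
    if not called:
        return "MISSING"
    n_copies = sum(1 for a in called if a == alt_index)
    if n_copies == 0:
        return "ABSENT"
    if n_copies == len(called):
        return "HOMOZYGOUS"
    return "HETEROZYGOUS"
```

So for GT=1/2 at a multi-allelic site: querying alt 1 gives heterozygous (one copy of this alt, one of another), while querying alt 3 gives absent.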

depth_for_alt(alt_index: int) -> Optional[int]

Per-allele read depth for a given alt, from the AD field.

AD is indexed with ref at position 0 and alt #1 at position 1, etc., so alt_index should use GT encoding (1 = first alt).

Source code in varcode/genotype.py
def depth_for_alt(self, alt_index: int) -> Optional[int]:
    """Per-allele read depth for a given alt, from the ``AD`` field.

    ``AD`` is indexed with ref at position 0 and alt #1 at
    position 1, etc., so ``alt_index`` should use GT encoding
    (``1`` = first alt).
    """
    if self.allele_depths is None:
        return None
    if alt_index >= len(self.allele_depths):
        return None
    return self.allele_depths[alt_index]

varcode.Zygosity

varcode.Zygosity

Bases: Enum

Zygosity of a sample's genotype relative to a specific alt allele.

ABSENT is distinct from MISSING: ABSENT means the call exists but doesn't include the alt in question (e.g. the sample is ref-ref, or carries a different alt at a multi-allelic site). MISSING means the call itself is ./. or the sample wasn't called.

Effects

varcode.MutationEffect

varcode.MutationEffect(variant)

Bases: Serializable

Base class for mutation effects.

Source code in varcode/effects/effect_classes.py
def __init__(self, variant):
    self.variant = variant

short_description property

A short, human-readable description of the effect. Defaults to the class name for most non-coding effects, but is more informative for coding ones.

original_protein_sequence property

Amino acid sequence of a coding transcript (without the nucleotide variant/mutation)

__lt__(other)

Effects are ordered by their associated variants, which implement comparison in terms of their chromosomal locations.

Source code in varcode/effects/effect_classes.py
def __lt__(self, other):
    """
    Effects are ordered by their associated variants, which implement
    comparison in terms of their chromosomal locations.
    """
    return self.variant < other.variant

varcode.NonsilentCodingMutation

varcode.NonsilentCodingMutation(variant, transcript, aa_mutation_start_offset, aa_mutation_end_offset, aa_ref)

Bases: CodingMutation

All coding mutations other than silent codon substitutions

variant : Variant

transcript : Transcript

aa_mutation_start_offset : int
    Offset of first modified amino acid in protein (starting from 0)

aa_mutation_end_offset : int
    Offset after last mutated amino acid (half-open coordinates)

aa_ref : str
    Amino acid string of what used to be at aa_mutation_start_offset in the wildtype (unmutated) protein.

Source code in varcode/effects/effect_classes.py
def __init__(
        self,
        variant,
        transcript,
        aa_mutation_start_offset,
        aa_mutation_end_offset,
        aa_ref):
    """
    variant : Variant

    transcript : Transcript

    aa_mutation_start_offset : int
        Offset of first modified amino acid in protein (starting from 0)

    aa_mutation_end_offset : int
        Offset after last mutated amino acid (half-open coordinates)

    aa_ref : str
        Amino acid string of what used to be at aa_mutation_start_offset
        in the wildtype (unmutated) protein.
    """
    CodingMutation.__init__(
        self,
        variant=variant,
        transcript=transcript)
    self.aa_mutation_start_offset = aa_mutation_start_offset
    self.aa_mutation_end_offset = aa_mutation_end_offset
    self.aa_ref = bio_seq_to_str(aa_ref)

varcode.MultiOutcomeEffect

varcode.MultiOutcomeEffect(variant)

Bases: MutationEffect

Marker base class for effects that represent a set of plausible outcomes rather than a single deterministic effect.

Subclasses must expose:

  • candidates — sequence of :class:MutationEffect instances, sorted most-plausible-first. (Kept for back-compat with 2.x callers.)
  • most_likely — the top candidate (i.e. candidates[0]).
  • priority_class — effect class whose priority this set adopts (read by :func:varcode.effects.effect_priority).

Harmonized interface (#299): new code should read :attr:outcomes instead of candidates. Each entry is an :class:~varcode.outcomes.Outcome carrying the effect plus provenance (probability, source, evidence dict). The default implementation wraps candidates with source="varcode" and no probability — external scorers (SpliceAI, Pangolin), RNA-evidence callers (Isovar), and long-read assembly tools override to attach their own scores without subclassing.

Downstream consumers filter for multi-outcome results with isinstance(effect, MultiOutcomeEffect), so new wrappers (RNA evidence #259, germline-aware #268, SV-at-breakpoint) implement the same protocol without downstream code churn.

Source code in varcode/effects/effect_classes.py
def __init__(self, variant):
    self.variant = variant
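A minimal sketch of the consumer protocol described above, using stand-in classes rather than varcode's real ones (the real classes live in varcode.effects.effect_classes):

```python
# Stand-in classes sketching the MultiOutcomeEffect marker protocol.
class MutationEffect:
    pass

class MultiOutcomeEffect(MutationEffect):
    def __init__(self, candidates):
        # candidates are kept sorted most-plausible-first
        self.candidates = tuple(candidates)

    @property
    def most_likely(self):
        return self.candidates[0]

class PredictedExonSkip(MutationEffect):
    pass

effects = [
    MutationEffect(),                                          # deterministic
    MultiOutcomeEffect([PredictedExonSkip(), MutationEffect()]),  # ambiguous
]

# Downstream consumers collapse ambiguity by branching on the marker base:
resolved = [
    e.most_likely if isinstance(e, MultiOutcomeEffect) else e
    for e in effects
]
```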

outcomes property

Tuple of :class:~varcode.outcomes.Outcome objects, most-plausible-first. Default implementation wraps :attr:candidates under source="varcode"; subclasses (or external integrations) override to attach probabilities and evidence.

External integrations (RNA evidence, SpliceAI scoring, etc.) attach extra outcomes via :meth:_with_extra_outcomes — subclasses overriding this property must call that helper on their derived tuple so the plug-in path remains uniform.

varcode.EffectCollection

varcode.EffectCollection(effects, distinct=False, sort_key=None, sources=set([]), annotator=None, annotator_version=None, annotated_at=None)

Bases: Collection

Collection of MutationEffect objects and helpers for grouping or filtering them.

PARAMETER DESCRIPTION
effects

Collection of any class which is compatible with the sort key

TYPE: list

distinct

Only keep distinct entries or allow duplicates.

TYPE: bool DEFAULT: False

sort_key

Function which maps each element to a sorting criterion. If None (the default), effects are sorted by priority with the most severe effects first. Pass an explicit sort_key to override this behaviour, or False to disable sorting.

TYPE: fn DEFAULT: None

sources

Set of files from which this collection was generated.

TYPE: set DEFAULT: set([])

annotator

Name of the :class:EffectAnnotator that produced these effects. Populated automatically by :func:predict_variant_effects; None for collections built by hand. See openvax/varcode#271.

TYPE: str or None DEFAULT: None

annotator_version

Version string of the annotator (typically the varcode version for built-in annotators). None when annotator is None.

TYPE: str or None DEFAULT: None

annotated_at

ISO-8601 UTC timestamp recording when the annotation ran. Populated by :func:predict_variant_effects.

TYPE: str or None DEFAULT: None

Source code in varcode/effects/effect_collection.py
def __init__(
        self,
        effects,
        distinct=False,
        sort_key=None,
        sources=set([]),
        annotator=None,
        annotator_version=None,
        annotated_at=None):
    """
    Parameters
    ----------
    effects : list
        Collection of any class which is compatible with the sort key


    distinct : bool
        Only keep distinct entries or allow duplicates.

    sort_key : fn
        Function which maps each element to a sorting criterion.
        If None (the default), effects are sorted by priority with
        the most severe effects first. Pass an explicit sort_key to
        override this behaviour, or `False` to disable sorting.

    sources : set
        Set of files from which this collection was generated.

    annotator : str or None
        Name of the :class:`EffectAnnotator` that produced these
        effects. Populated automatically by
        :func:`predict_variant_effects`; ``None`` for collections
        built by hand. See openvax/varcode#271.

    annotator_version : str or None
        Version string of the annotator (typically the varcode
        version for built-in annotators). ``None`` when
        ``annotator`` is None.

    annotated_at : str or None
        ISO-8601 UTC timestamp recording when the annotation ran.
        Populated by :func:`predict_variant_effects`.
    """
    if sort_key is None:
        sort_key = _default_effect_sort_key
    elif sort_key is False:
        sort_key = None
    Collection.__init__(
        self,
        elements=effects,
        distinct=distinct,
        sort_key=sort_key,
        sources=sources)
    # Keep self.effects in sync with the Collection's post-sort
    # elements so that iterating and reading `.effects` produce
    # the same order.  See openvax/varcode#220.
    self.effects = self.elements
    self.annotator = annotator
    self.annotator_version = annotator_version
    self.annotated_at = annotated_at

gene_counts()

Returns number of elements overlapping each gene name. Expects the derived class (VariantCollection or EffectCollection) to have an implementation of groupby_gene_name.

Source code in varcode/effects/effect_collection.py
def gene_counts(self):
    """
    Returns number of elements overlapping each gene name. Expects the
    derived class (VariantCollection or EffectCollection) to have
    an implementation of groupby_gene_name.
    """
    return {
        gene_name: len(group)
        for (gene_name, group)
        in self.groupby_gene_name().items()
    }
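The dict-comprehension pattern above can be exercised on a toy groupby result (stand-in data, not a real EffectCollection):

```python
# Toy stand-in for the output of groupby_gene_name():
# gene name -> list of effects overlapping that gene.
grouped = {
    "TP53": ["effect_a", "effect_b"],
    "KRAS": ["effect_c"],
}

# Same shape as EffectCollection.gene_counts(): gene name -> count.
gene_counts = {gene_name: len(group) for gene_name, group in grouped.items()}
```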

filter_by_transcript_expression(transcript_expression_dict, min_expression_value=0.0)

Filters effects to those which have an associated transcript whose expression value in the transcript_expression_dict argument is greater than min_expression_value.

PARAMETER DESCRIPTION
transcript_expression_dict

Dictionary mapping Ensembl transcript IDs to expression estimates (either FPKM or TPM)

TYPE: dict

min_expression_value

Threshold above which we'll keep an effect in the result collection

TYPE: float DEFAULT: 0.0

Source code in varcode/effects/effect_collection.py
def filter_by_transcript_expression(
        self,
        transcript_expression_dict,
        min_expression_value=0.0):
    """
    Filters effects to those which have an associated transcript whose
    expression value in the transcript_expression_dict argument is greater
    than min_expression_value.

    Parameters
    ----------
    transcript_expression_dict : dict
        Dictionary mapping Ensembl transcript IDs to expression estimates
        (either FPKM or TPM)

    min_expression_value : float
        Threshold above which we'll keep an effect in the result collection
    """
    return self.filter_above_threshold(
        key_fn=lambda effect: effect.transcript_id,
        value_dict=transcript_expression_dict,
        threshold=min_expression_value)
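The threshold-filter semantics can be sketched with plain data, assuming (as a sketch, not a guarantee about filter_above_threshold's internals) that transcript IDs missing from the expression dict count as unexpressed:

```python
# Sketch of the expression-threshold filter with the default
# min_expression_value of 0.0.
transcript_expression = {"ENST0001": 5.0, "ENST0002": 0.0}

effects = [
    {"transcript_id": "ENST0001"},  # expressed above threshold -> kept
    {"transcript_id": "ENST0002"},  # expression == 0.0 -> dropped
    {"transcript_id": "ENST0003"},  # absent from the dict -> dropped
]

kept = [
    e for e in effects
    if transcript_expression.get(e["transcript_id"], 0.0) > 0.0
]
```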

filter_by_gene_expression(gene_expression_dict, min_expression_value=0.0)

Filters effects to those which have an associated gene whose expression value in the gene_expression_dict argument is greater than min_expression_value.

PARAMETER DESCRIPTION
gene_expression_dict

Dictionary mapping Ensembl gene IDs to expression estimates (either FPKM or TPM)

TYPE: dict

min_expression_value

Threshold above which we'll keep an effect in the result collection

TYPE: float DEFAULT: 0.0

Source code in varcode/effects/effect_collection.py
def filter_by_gene_expression(
        self,
        gene_expression_dict,
        min_expression_value=0.0):
    """
    Filters effects to those which have an associated gene whose
    expression value in the gene_expression_dict argument is greater
    than min_expression_value.

    Parameters
    ----------
    gene_expression_dict : dict
        Dictionary mapping Ensembl gene IDs to expression estimates
        (either FPKM or TPM)

    min_expression_value : float
        Threshold above which we'll keep an effect in the result collection
    """
    return self.filter_above_threshold(
        key_fn=lambda effect: effect.gene_id,
        value_dict=gene_expression_dict,
        threshold=min_expression_value)

filter_by_effect_priority(min_priority_class)

Create a new EffectCollection containing only effects whose priority is at or above that of the given class.

Source code in varcode/effects/effect_collection.py
def filter_by_effect_priority(self, min_priority_class):
    """
    Create a new EffectCollection containing only effects whose priority
    is at or above that of the given class.
    """
    min_priority = transcript_effect_priority_dict[min_priority_class]
    return self.filter(
        lambda effect: effect_priority(effect) >= min_priority)
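A sketch of the priority-threshold filter with a toy priority table (class names and values below are illustrative; the real table is transcript_effect_priority_dict in effect_ordering.py):

```python
# Toy stand-ins for effect classes and their priorities.
class Silent: pass
class Missense: pass
class FrameShift: pass

priority_dict = {Silent: 1, Missense: 5, FrameShift: 10}

def effect_priority(effect):
    return priority_dict.get(effect.__class__, -1)

effects = [Silent(), Missense(), FrameShift()]
min_priority = priority_dict[Missense]

# Keep effects at or above the threshold class's priority.
kept = [e for e in effects if effect_priority(e) >= min_priority]
```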

drop_silent_and_noncoding()

Create a new EffectCollection containing only non-silent coding effects

Source code in varcode/effects/effect_collection.py
def drop_silent_and_noncoding(self):
    """
    Create a new EffectCollection containing only non-silent coding effects
    """
    return self.filter(lambda effect: effect.modifies_protein_sequence)

detailed_string()

Create a long string with all transcript effects for each mutation, grouped by gene (if a mutation affects multiple genes).

Source code in varcode/effects/effect_collection.py
def detailed_string(self):
    """
    Create a long string with all transcript effects for each mutation,
    grouped by gene (if a mutation affects multiple genes).
    """
    lines = []
    # TODO: annoying to always write `groupby_result.items()`,
    # consider making a GroupBy class which iterates over pairs
    # and also common helper methods like `map_values`.
    for variant, variant_effects in self.groupby_variant().items():
        lines.append("\n%s" % variant)

        gene_effects_groups = variant_effects.groupby_gene_id()
        for (gene_id, gene_effects) in gene_effects_groups.items():
            if gene_id:
                gene_name = variant.ensembl.gene_name_of_gene_id(gene_id)
                lines.append("  Gene: %s (%s)" % (gene_name, gene_id))
            # place transcript effects with more significant impact
            # on top (e.g. FrameShift should go before NoncodingTranscript)
            for effect in sorted(
                    gene_effects,
                    key=effect_priority,
                    reverse=True):
                lines.append("  -- %s" % effect)

        # if we only printed one effect for this gene then
        # it's redundant to print it again as the highest priority effect
        if len(variant_effects) > 1:
            best = variant_effects.top_priority_effect()
            lines.append("  Highest Priority Effect: %s" % best)
    return "\n".join(lines)

top_priority_effect()

Highest priority MutationEffect of all genes/transcripts overlapped by this variant. If this variant doesn't overlap anything, then this method will return an Intergenic effect.

If multiple effects have the same priority, then return the one which is associated with the longest transcript.

Source code in varcode/effects/effect_collection.py
def top_priority_effect(self):
    """Highest priority MutationEffect of all genes/transcripts overlapped
    by this variant. If this variant doesn't overlap anything, then
    this method will return an Intergenic effect.

    If multiple effects have the same priority, then return the one
    which is associated with the longest transcript.
    """
    return top_priority_effect(self.elements)

top_priority_effect_per_variant()

Highest priority effect for each unique variant

Source code in varcode/effects/effect_collection.py
def top_priority_effect_per_variant(self):
    """Highest priority effect for each unique variant"""
    return OrderedDict(
        (variant, top_priority_effect(variant_effects))
        for (variant, variant_effects)
        in self.groupby_variant().items())

top_priority_effect_per_transcript_id()

Highest priority effect for each unique transcript ID

Source code in varcode/effects/effect_collection.py
def top_priority_effect_per_transcript_id(self):
    """Highest priority effect for each unique transcript ID"""
    return OrderedDict(
        (transcript_id, top_priority_effect(variant_effects))
        for (transcript_id, variant_effects)
        in self.groupby_transcript_id().items())

top_priority_effect_per_gene_id()

Highest priority effect for each unique gene ID

Source code in varcode/effects/effect_collection.py
def top_priority_effect_per_gene_id(self):
    """Highest priority effect for each unique gene ID"""
    return OrderedDict(
        (gene_id, top_priority_effect(variant_effects))
        for (gene_id, variant_effects)
        in self.groupby_gene_id().items())
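The three top_priority_effect_per_* methods above share one "top entry per group" pattern, which can be sketched on toy data (IDs and priorities below are illustrative):

```python
from collections import OrderedDict

# Toy groupby result: gene_id -> effects as (name, priority) pairs.
grouped = OrderedDict([
    ("ENSG01", [("silent", 1), ("frameshift", 10)]),
    ("ENSG02", [("missense", 5)]),
])

def top_priority_effect(effects):
    # Stand-in: pick the highest-priority entry in the group.
    return max(effects, key=lambda pair: pair[1])

# Same shape as top_priority_effect_per_gene_id(): ordered dict
# preserving the grouping order.
top_per_gene = OrderedDict(
    (gene_id, top_priority_effect(effects))
    for gene_id, effects in grouped.items()
)
```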

effect_expression(expression_levels)

PARAMETER DESCRIPTION
expression_levels

Dictionary mapping transcript IDs to length-normalized expression levels (either FPKM or TPM)

TYPE: dict

RETURNS DESCRIPTION
OrderedDict

Mapping from each transcript effect to an expression quantity. Effects that don't have an associated transcript (e.g. Intergenic) are excluded.

Source code in varcode/effects/effect_collection.py
def effect_expression(self, expression_levels):
    """
    Parameters
    ----------
    expression_levels : dict
        Dictionary mapping transcript IDs to length-normalized expression
        levels (either FPKM or TPM)

    Returns
    -------
    OrderedDict
        Mapping from each transcript effect to an expression quantity.
        Effects that don't have an associated transcript (e.g. Intergenic)
        are excluded.
    """
    return OrderedDict(
        (effect, expression_levels.get(effect.transcript.id, 0.0))
        for effect in self
        if effect.transcript is not None)

top_expression_effect(expression_levels)

Return the effect whose transcript has the highest expression level. If none of the effects are expressed or have associated transcripts, return None. Ties are broken lexicographically by effect priority and transcript length.

Source code in varcode/effects/effect_collection.py
def top_expression_effect(self, expression_levels):
    """
    Return effect whose transcript has the highest expression level.
    If none of the effects are expressed or have associated transcripts,
    then return None. Ties are broken lexicographically by
    effect priority and transcript length.
    """
    effect_expression_dict = self.effect_expression(expression_levels)

    if len(effect_expression_dict) == 0:
        return None

    def key_fn(effect_fpkm_pair):
        """
        Sort effects primarily by their expression level
        and secondarily by the priority logic used in
        `top_priority_effect`.
        """
        (effect, fpkm) = effect_fpkm_pair
        return (fpkm, multi_gene_effect_sort_key(effect))

    return max(effect_expression_dict.items(), key=key_fn)[0]
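The tie-breaking max above works because Python compares tuples lexicographically, so expression dominates and priority only decides ties. A toy sketch (names and numbers illustrative):

```python
# effect name -> (fpkm, priority); stand-in for the pairs produced by
# effect_expression() plus the priority sort key.
effect_expression = {
    "effectA": (3.0, 5),   # tied on expression, lower priority
    "effectB": (3.0, 10),  # tied on expression, higher priority -> wins
    "effectC": (1.0, 99),  # low expression loses despite high priority
}

def key_fn(item):
    name, (fpkm, priority) = item
    # Tuple comparison: fpkm first, then priority as tie-breaker.
    return (fpkm, priority)

top = max(effect_expression.items(), key=key_fn)[0]
```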

to_dataframe()

Build a dataframe from the effect collection.

Source code in varcode/effects/effect_collection.py
def to_dataframe(self):
    """Build a dataframe from the effect collection."""
    # list of properties to extract from Variant objects if they're
    # not None
    variant_properties = [
        "contig",
        "start",
        "ref",
        "alt",
        "is_snv",
        "is_transversion",
        "is_transition"
    ]

    def row_from_effect(effect):
        row = OrderedDict()

        row['variant'] = str(effect.variant.short_description)
        for field_name in variant_properties:
            # if effect.variant is None then this column value will be None
            row[field_name] = getattr(effect.variant, field_name, None)
        row['gene_id'] = effect.gene_id
        row['gene_name'] = effect.gene_name
        row['transcript_id'] = effect.transcript_id
        row['transcript_name'] = effect.transcript_name

        row['effect_type'] = effect.__class__.__name__
        row['effect'] = effect.short_description
        return row
    # Always emit the same column set even for empty collections so
    # CSV round-trip doesn't have to special-case len == 0.
    return pd.DataFrame.from_records(
        [row_from_effect(effect) for effect in self],
        columns=self._DATAFRAME_COLUMNS,
    )

to_csv(path, include_header=True)

Write this collection to CSV.

PARAMETER DESCRIPTION
path

Output path.

TYPE: str

include_header

If True (default), prepend # key=value metadata lines with varcode version and reference genome so the file can be read back via from_csv without supplying a genome explicitly. Pass include_header=False for fast consumers that don't tolerate comment lines.

TYPE: bool DEFAULT: True

Source code in varcode/effects/effect_collection.py
def to_csv(self, path, include_header=True):
    """Write this collection to CSV.

    Parameters
    ----------
    path : str
        Output path.

    include_header : bool
        If True (default), prepend ``# key=value`` metadata lines
        with varcode version and reference genome so the file can
        be read back via ``from_csv`` without supplying a genome
        explicitly. Pass ``include_header=False`` for fast
        consumers that don't tolerate comment lines.
    """
    df = self.to_dataframe()
    if not include_header:
        df.to_csv(path, index=False)
        return

    metadata = OrderedDict()
    metadata["varcode_version"] = _varcode_version
    metadata["reference_name"] = self._serialized_reference_name()
    # Annotator provenance (#271 stage 3b). Each field is skipped
    # when None by write_metadata_header.
    metadata["annotator"] = self.annotator
    metadata["annotator_version"] = self.annotator_version
    metadata["annotated_at"] = self.annotated_at
    with open(path, "w") as f:
        write_metadata_header(f, metadata)
        df.to_csv(f, index=False)

from_csv(path, genome=None) classmethod

Rebuild an EffectCollection from a CSV previously written by EffectCollection.to_csv().

The current CSV format records (contig, start, ref, alt, transcript_id) but not enough per-effect state to reconstruct effects byte-for-byte. This method takes the pragmatic semantic round-trip path: rebuild each Variant, re-annotate against the recorded transcript, and emit the resulting effect. The resulting collection should match the original whenever annotation is deterministic for a given (variant, transcript) pair.

Prefer from_json for byte-for-byte round-trip or for larger collections (≳10k effects); per-row re-annotation makes CSV loading significantly slower than JSON. Emits a warning when the CSV header reports a different major varcode version than the one currently installed — annotation logic can change across major versions and the reconstructed effects may differ from the ones that were written.

PARAMETER DESCRIPTION
path

Path to the CSV file. Lines starting with '#' are treated as comments and parsed as key=value metadata.

TYPE: str

genome

Reference genome to associate with the loaded variants and to look up transcripts by ID. If None, the reference is read from the CSV's metadata header (# reference_name=...). If neither is available, raises ValueError.

TYPE: pyensembl.Genome, str, int, or None DEFAULT: None

RETURNS DESCRIPTION
EffectCollection
Source code in varcode/effects/effect_collection.py
@classmethod
def from_csv(cls, path, genome=None):
    """Rebuild an EffectCollection from a CSV previously written by
    ``EffectCollection.to_csv()``.

    The current CSV format records (contig, start, ref, alt,
    transcript_id) but not enough per-effect state to reconstruct
    effects byte-for-byte. This method takes the pragmatic semantic
    round-trip path: rebuild each Variant, re-annotate against the
    recorded transcript, and emit the resulting effect. The
    resulting collection should match the original whenever
    annotation is deterministic for a given (variant, transcript)
    pair.

    **Prefer ``from_json`` for byte-for-byte round-trip** or for
    larger collections (≳10k effects); per-row re-annotation makes
    CSV loading significantly slower than JSON. Emits a warning
    when the CSV header reports a different *major* varcode version
    than the one currently installed — annotation logic can change
    across major versions and the reconstructed effects may differ
    from the ones that were written.

    Parameters
    ----------
    path : str
        Path to the CSV file. Lines starting with '#' are treated as
        comments and parsed as ``key=value`` metadata.

    genome : pyensembl.Genome, str, int, or None
        Reference genome to associate with the loaded variants and
        to look up transcripts by ID. If ``None``, the reference is
        read from the CSV's metadata header
        (``# reference_name=...``). If neither is available, raises
        ``ValueError``.

    Returns
    -------
    EffectCollection
    """
    # Import here to avoid a circular import at module load time.
    import warnings
    from ..variant import Variant

    header = read_metadata_header(path)
    warn_on_version_drift(header, _varcode_version, path)
    if genome is None:
        genome = header.get("reference_name")
    if genome is None:
        raise ValueError(
            "from_csv needs a reference genome: pass the `genome` "
            "argument explicitly, or write the CSV with "
            "`to_csv(include_header=True)` so `# reference_name=...` "
            "is recorded in the header. Neither was found at %s." % path)

    # Accept either "contig" or "chr" as the contig column so
    # CSVs are interchangeable between VariantCollection and
    # EffectCollection (openvax/varcode#274). Declaring both in
    # dtype is a no-op for whichever column is absent.
    df = pd.read_csv(
        path, comment="#", dtype={"chr": str, "contig": str})
    contig_col = resolve_contig_column(df.columns)
    if contig_col is None:
        raise ValueError(
            "CSV at %s is missing a contig column: expected one of %s."
            % (path, list(CONTIG_COLUMN_ALIASES)))
    required = {"start", "ref", "alt", "transcript_id", "effect_type"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(
            "CSV at %s is missing required columns: %s" % (
                path, sorted(missing)))

    # Extract columns by name and coerce types up front; zip is
    # robust against column reordering and extra columns, unlike
    # itertuples which depends on attribute access to
    # valid-identifier column names in fixed positions.
    contigs = df[contig_col].astype(str)
    starts = df["start"].astype(int)
    refs = df["ref"].fillna("")
    alts = df["alt"].fillna("")
    transcript_ids = df["transcript_id"]
    effect_types = df["effect_type"]

    effects = []
    resolved_genome = None
    for contig, start, ref, alt, transcript_id, effect_type in zip(
            contigs, starts, refs, alts, transcript_ids, effect_types):
        variant = Variant(
            contig=contig,
            start=start,
            ref=ref,
            alt=alt,
            genome=genome,
        )
        # Cache the resolved pyensembl.Genome from the first variant
        # so subsequent transcript lookups don't pay the inference cost.
        if resolved_genome is None:
            resolved_genome = variant.ensembl
        if pd.isna(transcript_id) or transcript_id == "":
            # Row is for an intergenic / intragenic effect — no
            # transcript context. Rebuild by running the full effect
            # set on the variant and taking the non-transcript match.
            effects_for_variant = variant.effects()
            matched = False
            for e in effects_for_variant:
                if e.transcript is None and e.__class__.__name__ == effect_type:
                    effects.append(e)
                    matched = True
                    break
            if not matched:
                warnings.warn(
                    "Could not recover effect of type %r for variant %s "
                    "with no transcript_id in %s; row dropped from "
                    "reconstructed collection." % (
                        effect_type, variant, path)
                )
            continue
        transcript = resolved_genome.transcript_by_id(str(transcript_id))
        effects.append(variant.effect_on_transcript(transcript))
    return cls(
        effects=effects,
        annotator=header.get("annotator"),
        annotator_version=header.get("annotator_version"),
        annotated_at=header.get("annotated_at"),
    )
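The `# key=value` metadata-header convention shared by to_csv and from_csv can be sketched with the standard library; the helper bodies below are illustrative stand-ins, not varcode's actual write_metadata_header / read_metadata_header:

```python
import io

def write_metadata_header(f, metadata):
    # One '# key=value' comment line per non-None entry, matching the
    # "skipped when None" behavior documented for to_csv.
    for key, value in metadata.items():
        if value is not None:
            f.write("# %s=%s\n" % (key, value))

def read_metadata_header(f):
    # Collect leading comment lines into a dict. (A real reader would
    # peek rather than consume the first CSV body line.)
    header = {}
    for line in f:
        if not line.startswith("#"):
            break
        key, _, value = line[1:].strip().partition("=")
        header[key] = value
    return header

buf = io.StringIO()
write_metadata_header(buf, {"reference_name": "GRCh38", "annotator": None})
buf.write("contig,start,ref,alt\n1,100,A,T\n")
buf.seek(0)
header = read_metadata_header(buf)
```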

varcode.Outcome

varcode.Outcome(effect: Any, probability: Optional[float] = None, source: str = 'varcode', evidence: Mapping[str, Any] = dict(), description: Optional[str] = None) dataclass

Bases: DataclassSerializable

One plausible consequence of a variant.

PARAMETER DESCRIPTION
effect

The effect this outcome represents. Guaranteed to be a :class:~varcode.effects.MutationEffect instance — producers that can't compute a full coding effect use placeholder subclasses (e.g. :class:~varcode.effects.effect_classes.PredictedIntronRetention) so consumers can iterate outcome.effect.short_description and outcome.effect.mutant_protein_sequence uniformly across SV, splice, and point-variant outcomes (#339).

TYPE: MutationEffect

probability

Estimated likelihood this outcome actually happens, in [0, 1]. None means "not scored" — the outcome is in the candidate set but no tool has assigned a probability. Callers that treat None as "unknown" rather than "zero" will be robust to new outcomes added over time.

TYPE: float or None DEFAULT: None

source

Name of the tool or annotator that produced this outcome. Defaults to "varcode" for built-in classifications. External integrations set their own ("spliceai", "isovar", "longread_assembly", etc.) so downstream callers can filter by source.

TYPE: str DEFAULT: 'varcode'

evidence

Open-ended provenance dict. Shape is source-specific; the convention is that keys match the source's native field names (e.g. SpliceAI scores under {"ds_ag": 0.12, "ds_al": ...}, RNA read counts under {"junction_reads": 42}). Consumers that need a particular shape should type-check at the call site rather than rely on a rigid schema here.

TYPE: Mapping[str, Any] DEFAULT: dict()

description

Optional human-readable sentence describing this specific outcome ("Exon 7 is skipped (in-frame, 15 aa removed)"). Distinct from effect.short_description, which is the effect's HGVS-style label. Producers that want a richer narrative attach it here rather than nesting it in evidence. Declared last so positional calls of the form Outcome(effect, probability, source, evidence) continue to route the dict to evidence.

TYPE: str or None DEFAULT: None

short_description: str property

Convenience passthrough to the wrapped effect's short_description. Lets callers build tables of outcomes without unpacking outcome.effect.short_description everywhere.
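A stand-in dataclass mirroring the fields documented above shows the "None means not scored, not zero" convention in practice (the real class is varcode.Outcome; effect values below are placeholder strings):

```python
from dataclasses import dataclass, field
from typing import Any, Mapping, Optional

@dataclass
class Outcome:
    # Stand-in with the same field order as the documented signature.
    effect: Any
    probability: Optional[float] = None
    source: str = "varcode"
    evidence: Mapping[str, Any] = field(default_factory=dict)
    description: Optional[str] = None

scored = Outcome(
    effect="exon_skip",
    probability=0.8,
    source="spliceai",
    evidence={"ds_ag": 0.12})
unscored = Outcome(effect="intron_retention")  # None == "not scored"

# Rank scored outcomes first, treating None as unknown rather than zero:
ranked = sorted(
    [unscored, scored],
    key=lambda o: -1.0 if o.probability is None else o.probability,
    reverse=True)
```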

Priority ordering

varcode.effect_priority(effect)

Returns the integer priority for a given transcript effect.

Effects may opt out of class-based priority lookup by exposing a priority_class attribute — used by wrapper classes like :class:varcode.splice_outcomes.SpliceOutcomeSet to delegate to the wrapped effect's class.

Source code in varcode/effects/effect_ordering.py
def effect_priority(effect):
    """
    Returns the integer priority for a given transcript effect.

    Effects may opt out of class-based priority lookup by exposing a
    ``priority_class`` attribute — used by wrapper classes like
    :class:`varcode.splice_outcomes.SpliceOutcomeSet` to delegate to
    the wrapped effect's class.
    """
    cls = getattr(effect, "priority_class", None) or effect.__class__
    return transcript_effect_priority_dict.get(cls, -1)
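The priority_class escape hatch can be demonstrated with a toy priority table: a wrapper class absent from the table adopts the wrapped effect's priority (class names below are illustrative):

```python
# Toy table standing in for transcript_effect_priority_dict.
class FrameShift: pass

priority_dict = {FrameShift: 10}

def effect_priority(effect):
    # Same lookup logic as the source above: prefer priority_class,
    # fall back to the effect's own class, default to -1.
    cls = getattr(effect, "priority_class", None) or effect.__class__
    return priority_dict.get(cls, -1)

class SpliceOutcomeWrapper:
    priority_class = FrameShift   # delegate to the wrapped effect's class

wrapper_priority = effect_priority(SpliceOutcomeWrapper())
unknown_priority = effect_priority(object())   # not in the table
```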

varcode.top_priority_effect(effects)

Given a collection of variant transcript effects, return the top priority object. ExonicSpliceSite variants require special treatment since they actually represent two effects -- the splicing modification and whatever else would happen to the exonic sequence if nothing else gets changed. In cases where multiple transcripts give rise to multiple effects, use a variety of filtering and sorting heuristics to pick the canonical transcript.

Source code in varcode/effects/effect_ordering.py
def top_priority_effect(effects):
    """
    Given a collection of variant transcript effects,
    return the top priority object. ExonicSpliceSite variants require special
    treatment since they actually represent two effects -- the splicing modification
    and whatever else would happen to the exonic sequence if nothing else gets
    changed. In cases where multiple transcripts give rise to multiple
    effects, use a variety of filtering and sorting heuristics to pick
    the canonical transcript.
    """
    if len(effects) == 0:
        raise ValueError("List of effects cannot be empty")

    effects = map(
        select_between_exonic_splice_site_and_alternate_effect,
        effects)

    effects_grouped_by_gene = apply_groupby(
        effects, fn=gene_id_of_associated_transcript, skip_none=False)

    if None in effects_grouped_by_gene:
        effects_without_genes = effects_grouped_by_gene.pop(None)
    else:
        effects_without_genes = []

    # if we had any effects associated with genes then choose one of those
    if len(effects_grouped_by_gene) > 0:
        effects_with_genes = [
            top_priority_effect_for_single_gene(gene_effects)
            for gene_effects in effects_grouped_by_gene.values()
        ]
        return max(effects_with_genes, key=multi_gene_effect_sort_key)
    else:
        # if all effects were without genes then choose the best among those
        assert len(effects_without_genes) > 0
        return max(effects_without_genes, key=multi_gene_effect_sort_key)

Mutant transcripts

varcode.MutantTranscript

varcode.MutantTranscript(reference_transcript: Optional[object] = None, edits: Tuple[TranscriptEdit, ...] = tuple(), reference_segments: Optional[Tuple[ReferenceSegment, ...]] = None, cdna_sequence: Optional[str] = None, mutant_protein_sequence: Optional[str] = None, annotator_name: str = 'unknown', evidence: Optional[dict] = None) dataclass

Bases: DataclassSerializable

A reference transcript (or assembled set of reference segments) with zero or more variant-derived edits applied, optionally carrying the mutated cDNA and protein sequences.

Producers (the protein-diff annotator, RNA-evidence importers, the splice-outcomes rewrite, germline-aware annotation, structural-variant annotators) construct this once per (transcript-or-segments, variant-set, context) and hand it to downstream consumers. Each consumer reads the fields it cares about — edits for provenance, cdna_sequence / mutant_protein_sequence for protein-level analysis.

Two shapes:

  • Point-variant shape (reference_transcript is set, reference_segments is None) — the mutant is derived from a single reference transcript by applying zero or more :class:TranscriptEdit objects. This is the shape used by the protein-diff annotator for SNVs, MNVs, and simple indels.

  • Structural-variant shape (reference_segments is set, reference_transcript may be None or the primary / 5'-partner transcript) — the mutant is assembled by concatenating :class:ReferenceSegment slices in order. A gene fusion is two segments from two transcripts; a translocation to intergenic is one transcript segment plus a genomic-interval segment; an inversion is three forward/ reverse/forward segments. edits may still be populated for point-variant edits layered on top of the assembled segments, but typically an SV carries no extra edits.

Sequence fields are Optional[str] because not every producer computes them eagerly. Callers that require the protein check or compute it themselves; the protein-diff annotator guarantees it for point variants.

Forward-looking hooks (not implemented here; documented so new integrations know where to plug in):

  • Personalized / full-genome reference — pass a :class:ReferenceSegment whose source is a patient-specific contig object. varcode's translation logic reads source.sequence; it doesn't care whether that's GRCh38 or a custom assembly.
  • Long-read resolution — when an SV has an :attr:alt_assembly on the :class:StructuralVariant, the SV annotator can wrap that sequence as a single synthetic segment.
  • SV outcomes ambiguity — for a translocation producing many candidate ORFs that only RNA can resolve, the annotator should return List[MutantTranscript] or wrap them in a :class:MultiOutcomeEffect per #299, each with its own evidence dict capturing the disambiguator.
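The segment-concatenation rule for the SV shape can be sketched with plain strings. SegmentSketch, revcomp, and assemble_cdna below are illustrative stand-ins, not varcode classes:

```python
from dataclasses import dataclass

# Hypothetical stand-in for ReferenceSegment: just enough fields to
# show the concatenation rule. The real class carries coordinates in
# each segment's own reference system.
@dataclass(frozen=True)
class SegmentSketch:
    sequence: str   # already sliced from the source transcript/contig
    strand: str     # "+" or "-"

def revcomp(seq):
    # Reverse-complement a DNA string.
    return seq.translate(str.maketrans("ACGT", "TGCA"))[::-1]

def assemble_cdna(segments):
    """Concatenate segments in order, reverse-complementing '-' strand
    slices: the rule the SV shape documents for building mutant cDNA."""
    return "".join(
        revcomp(s.sequence) if s.strand == "-" else s.sequence
        for s in segments)

# A fusion is two '+' segments; an inversion is forward/reverse/forward.
fusion = assemble_cdna([
    SegmentSketch("ATGGCC", "+"),   # 5' partner slice
    SegmentSketch("TTTAAA", "+"),   # 3' partner slice
])
inversion = assemble_cdna([
    SegmentSketch("ATG", "+"),
    SegmentSketch("GCA", "-"),      # contributes its revcomp, "TGC"
    SegmentSketch("TAA", "+"),
])
```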

reference_transcript: Optional[object] = None class-attribute instance-attribute

The :class:pyensembl.Transcript this mutant is derived from (point-variant shape), or the primary / 5'-partner transcript (SV shape). None when the SV has no canonical primary transcript (e.g. intergenic-to-intergenic BNDs). Not typed tightly here so :mod:pyensembl isn't a hard import dependency for anyone who just wants the dataclass.

edits: Tuple[TranscriptEdit, ...] = field(default_factory=tuple) class-attribute instance-attribute

Edits applied to produce this mutant, sorted by :attr:TranscriptEdit.cdna_start. Empty tuple means the mutant is identical to the reference (or, for SV shape, the assembled segments carry the rearrangement directly without further point-level edits).

reference_segments: Optional[Tuple[ReferenceSegment, ...]] = None class-attribute instance-attribute

Ordered tuple of :class:ReferenceSegment objects that, when concatenated in order (applying reverse-complement to - strand segments), produce the mutant cDNA. None for the point-variant shape; a fusion's segments would be (5p_partner_segment, 3p_partner_segment). Coordinates are in each segment's own reference system.

cdna_sequence: Optional[str] = None class-attribute instance-attribute

The mutated spliced mRNA, when computed. None if the producer hasn't materialized it yet.

mutant_protein_sequence: Optional[str] = None class-attribute instance-attribute

The translated mutant protein, stopping at the first stop codon. None if not yet translated, or if the edit set doesn't produce a coherent ORF (e.g. start-codon loss). Callers that need a guaranteed-present protein should use the protein-diff annotator once it lands.

annotator_name: str = 'unknown' class-attribute instance-attribute

Name of the :class:EffectAnnotator (or other producer) that created this MutantTranscript. Used as provenance in serialization and for A/B comparisons.

evidence: Optional[dict] = None class-attribute instance-attribute

Optional producer-specific evidence (RNA read counts, Isovar fragment ids, SpliceAI scores, long-read assembly metadata). Shape is annotator-specific and not part of the stable contract; consumers that care about a particular evidence shape should type-check it at the call site.

is_identical_to_reference: bool property

True if no edits were applied AND there are no reference-rearranging segments. Does NOT check cdna_sequence / mutant_protein_sequence — a producer can legitimately carry an identical sequence with zero edits and a single identity segment.

is_structural: bool property

True when this mutant was assembled from :attr:reference_segments (SV shape) rather than applying :attr:edits to a single reference transcript.

total_length_delta: int property

Sum of :attr:TranscriptEdit.length_delta across all edits — how much longer or shorter the mutant cDNA is than the reference (point-variant shape). For SV shape, returns 0; the length of an assembled cDNA is the sum of segment lengths, not a delta against a single reference.

varcode.apply_variant_to_transcript

varcode.apply_variant_to_transcript(variant, transcript)

Construct a :class:MutantTranscript by applying variant to transcript's spliced cDNA.

Returns a :class:MutantTranscript whose cdna_sequence is populated, plus mutant_protein_sequence when the variant lies after the start codon (so translation from the canonical start is well-defined). The codon table is selected from the transcript's contig — mitochondrial transcripts use NCBI table 2 automatically (see :func:varcode.effects.codon_tables.codon_table_for_transcript).

Returns None when the variant can't be cleanly applied:

  • Transcript is not protein-coding or is incomplete.
  • Variant doesn't overlap the transcript at all.
  • Variant spans more than one exon (splice-junction-crossing variants need the splice-aware path; not handled here).
  • Reference allele doesn't match the transcript's cDNA at the computed offset.

Callers that get None should fall back to the fast :class:EffectAnnotator. The forthcoming protein-diff annotator layers effect classification on top of this builder.
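The core edit arithmetic, including the ref-mismatch guard that produces a None return, can be sketched without pyensembl. apply_edit_sketch is a hypothetical stand-in, not part of varcode:

```python
def apply_edit_sketch(cdna, cdna_start, ref_bases, alt_bases):
    """Splice alt_bases into a spliced-cDNA string, but only when the
    reference bases actually match at the computed offset; otherwise
    return None, mirroring apply_variant_to_transcript's bail-out."""
    cdna_end = cdna_start + len(ref_bases)
    if cdna[cdna_start:cdna_end] != ref_bases:
        return None  # ref mismatch: caller falls back to the fast path
    return cdna[:cdna_start] + alt_bases + cdna[cdna_end:]

# SNV at offset 3: C -> T
assert apply_edit_sketch("ATGCGA", 3, "C", "T") == "ATGTGA"
# Ref mismatch: this cDNA doesn't carry "G" at offset 3
assert apply_edit_sketch("ATGCGA", 3, "G", "T") is None
```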

Source code in varcode/mutant_transcript.py
def apply_variant_to_transcript(variant, transcript):
    """Construct a :class:`MutantTranscript` by applying ``variant``
    to ``transcript``'s spliced cDNA.

    Returns a :class:`MutantTranscript` whose ``cdna_sequence`` is
    populated, plus ``mutant_protein_sequence`` when the variant
    lies after the start codon (so translation from the canonical
    start is well-defined). The codon table is selected from the
    transcript's contig — mitochondrial transcripts use NCBI table
    2 automatically (see :func:`varcode.effects.codon_tables.codon_table_for_transcript`).

    Returns ``None`` when the variant can't be cleanly applied:

    * Transcript is not protein-coding or is incomplete.
    * Variant doesn't overlap the transcript at all.
    * Variant spans more than one exon (splice-junction-crossing
      variants need the splice-aware path; not handled here).
    * Reference allele doesn't match the transcript's cDNA at the
      computed offset.

    Callers that get ``None`` should fall back to the fast
    :class:`EffectAnnotator`. The forthcoming protein-diff annotator
    layers effect classification on top of this builder.
    """
    from pyensembl import Transcript

    if not isinstance(transcript, Transcript):
        return None
    if not transcript.is_protein_coding or not transcript.complete:
        return None

    full_sequence = str(transcript.sequence)
    resolved = _resolve_variant_edit(variant, transcript, full_sequence)
    if resolved is None:
        return None
    edit, cdna_offset = resolved
    mutant_cdna = (
        full_sequence[:edit.cdna_start]
        + edit.alt_bases
        + full_sequence[edit.cdna_end:]
    )
    mutant_protein = None
    cds_start = min(transcript.start_codon_spliced_offsets)
    if cdna_offset >= cds_start:
        mutant_protein = _translate_from_cds(mutant_cdna, transcript)
    return MutantTranscript(
        reference_transcript=transcript,
        edits=(edit,),
        cdna_sequence=mutant_cdna,
        mutant_protein_sequence=mutant_protein,
        annotator_name="protein_diff",
    )

varcode.apply_variants_to_transcript

varcode.apply_variants_to_transcript(variants, transcript)

Apply a list of variants to a single transcript, yielding one :class:MutantTranscript that carries all the resulting edits (#269). Used for haplotype-aware joint effect prediction — cis variants on the same transcript become one combined mutant rather than N independent per-variant mutants.

Edits are applied in cDNA-coordinate order (highest offset first, so earlier offsets aren't shifted) to transcript's spliced cDNA. Returns None when any of the usual single-variant preconditions fail (non-coding, incomplete, etc.), or when the provided variants conflict — i.e. claim to edit overlapping cDNA ranges. The caller is responsible for falling back to per-variant effects in that case.

mutant_protein_sequence is populated when at least one edit lands after the canonical CDS start; the joint cDNA is translated from there to the first stop.

Order of variants doesn't matter — edits are sorted by cDNA offset internally.
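Why edits are applied from the highest cDNA offset downward can be demonstrated with a small stand-alone sketch. apply_edits_high_to_low is illustrative, not the varcode implementation:

```python
def apply_edits_high_to_low(cdna, edits):
    """edits: (start, end, alt) triples in the ORIGINAL cDNA's
    coordinates. Applying from the highest offset down means an
    upstream insertion or deletion can't shift a downstream edit's
    coordinates before that edit is applied."""
    for start, end, alt in sorted(edits, reverse=True):
        cdna = cdna[:start] + alt + cdna[end:]
    return cdna

# An insertion at offset 1 and an SNV at offset 4, both addressed in
# original coordinates. High-to-low gives the same answer regardless
# of the order the caller listed them in.
edits = [(1, 1, "GG"), (4, 5, "T")]
result = apply_edits_high_to_low("ACACA", edits)
assert result == apply_edits_high_to_low("ACACA", list(reversed(edits)))
```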

Source code in varcode/mutant_transcript.py
def apply_variants_to_transcript(variants, transcript):
    """Apply a list of variants to a single transcript, yielding one
    :class:`MutantTranscript` that carries all the resulting edits
    (#269). Used for haplotype-aware joint effect prediction — cis
    variants on the same transcript become one combined mutant
    rather than N independent per-variant mutants.

    Edits are applied in cDNA-coordinate order (highest offset
    first, so earlier offsets aren't shifted) to ``transcript``'s
    spliced cDNA. Returns ``None`` when any of the usual
    single-variant preconditions fail (non-coding, incomplete, etc.),
    or when the provided variants conflict — i.e. claim to edit
    overlapping cDNA ranges. The caller is responsible for falling
    back to per-variant effects in that case.

    ``mutant_protein_sequence`` is populated when at least one edit
    lands after the canonical CDS start; the joint cDNA is translated
    from there to the first stop.

    Order of ``variants`` doesn't matter — edits are sorted by cDNA
    offset internally.
    """
    from pyensembl import Transcript

    if not isinstance(transcript, Transcript):
        return None
    if not transcript.is_protein_coding or not transcript.complete:
        return None

    full_sequence = str(transcript.sequence)
    resolved = []
    for variant in variants:
        r = _resolve_variant_edit(variant, transcript, full_sequence)
        if r is None:
            return None
        resolved.append(r)
    # Sort by cDNA start so overlap detection is a simple linear scan
    # and we can apply high-to-low without shifting earlier edits.
    resolved.sort(key=lambda pair: pair[0].cdna_start)
    for i in range(len(resolved) - 1):
        e1 = resolved[i][0]
        e2 = resolved[i + 1][0]
        # Insertions at the same offset don't "overlap" per se, but
        # their order is ambiguous — treat as a conflict.
        if e1.cdna_end > e2.cdna_start or (
                e1.cdna_start == e2.cdna_start == e1.cdna_end == e2.cdna_end):
            return None
    # Apply edits from the highest cDNA offset down so earlier offsets
    # aren't affected by upstream edits' length changes.
    mutant_cdna = full_sequence
    for edit, _ in sorted(
            resolved, key=lambda pair: pair[0].cdna_start, reverse=True):
        mutant_cdna = (
            mutant_cdna[:edit.cdna_start]
            + edit.alt_bases
            + mutant_cdna[edit.cdna_end:]
        )
    mutant_protein = None
    cds_start = min(transcript.start_codon_spliced_offsets)
    if any(anchor >= cds_start for _, anchor in resolved):
        mutant_protein = _translate_from_cds(mutant_cdna, transcript)
    return MutantTranscript(
        reference_transcript=transcript,
        edits=tuple(edit for edit, _ in resolved),
        cdna_sequence=mutant_cdna,
        mutant_protein_sequence=mutant_protein,
        annotator_name="protein_diff",
    )

Annotators

varcode.EffectAnnotator

varcode.EffectAnnotator

Bases: Protocol

Protocol for an object that annotates variant effects on transcripts.

Conforming objects expose:

  • name — short identifier (e.g. "fast") used in the registry and in serialized provenance.
  • supports — set of variant-kind tags the annotator can handle (e.g. {"snv", "indel"}). Callers that hand the annotator a variant outside this set get a clear :class:UnsupportedVariantError rather than silently wrong output.
  • :meth:annotate_on_transcript — the per-transcript entry point.

Optionally exposes version (string) — used in CSV provenance headers so readers can detect when a serialized collection came from a different annotator version. Built-in annotators track varcode's version; third-party annotators expose their own.

The protocol is intentionally narrow at this stage — additional methods (annotate_collection, annotate_with_context) will be added as downstream work needs them. The contract is duck-typed (@runtime_checkable) so third-party annotators don't need to inherit from varcode just to register.
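A minimal conforming annotator therefore needs no varcode base class. The sketch below re-declares the documented surface as a local Protocol; EffectAnnotatorSketch and EchoAnnotator are hypothetical names, not varcode's:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class EffectAnnotatorSketch(Protocol):
    """Local sketch of the documented protocol surface."""
    name: str
    supports: frozenset

    def annotate_on_transcript(self, variant, transcript): ...

class EchoAnnotator:
    """Hypothetical third-party annotator: no varcode inheritance."""
    name = "echo"
    supports = frozenset({"snv"})
    version = "0.1.0"  # optional; read by CSV provenance headers

    def annotate_on_transcript(self, variant, transcript):
        # Toy behaviour: just echo what was asked.
        return (self.name, variant, transcript)

# Structural typing: EchoAnnotator conforms without subclassing.
assert isinstance(EchoAnnotator(), EffectAnnotatorSketch)
```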

varcode.FastEffectAnnotator

varcode.FastEffectAnnotator

Wraps :func:varcode.effects.predict_variant_effect_on_transcript.

version = _varcode_version class-attribute instance-attribute

Built-in annotators track varcode's own version. Third-party annotators (isovar's plugin, exacto's plugin) expose their own version string here; CSV provenance headers and round-trip warnings read from this field. See #271.

supports = frozenset({'snv', 'indel', 'mnv'}) class-attribute instance-attribute

Variant kinds this annotator handles. Splice-possibility sets, structural variants, and phased haplotypes fall outside the fast offset-based path and will be handled by the protein-diff annotator.

annotate_on_transcript(variant, transcript)

Delegate to the existing per-transcript prediction.

No fast-path / slow-path dispatch at this stage; that lives on the protein-diff annotator once it exists.

Source code in varcode/annotators/fast.py
def annotate_on_transcript(self, variant, transcript):
    """Delegate to the existing per-transcript prediction.

    No fast-path / slow-path dispatch at this stage; that lives
    on the protein-diff annotator once it exists.
    """
    # Lazy import avoids a circular dep at package import time.
    from ..effects import predict_variant_effect_on_transcript
    return predict_variant_effect_on_transcript(variant, transcript)

varcode.ProteinDiffEffectAnnotator

varcode.ProteinDiffEffectAnnotator

Classify effects by diffing translated mutant protein against the reference protein.

Produces byte-for-byte identical output to :class:FastEffectAnnotator for the common cases (trivial SNVs and simple indels) because both flow through the same :func:classify_from_protein_diff classifier. It diverges where protein-diff's approach is provably more accurate (boundary codons, frameshift realignment); any divergence must appear in the parity harness EXPECTED_DIFFS with an issue link.
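The protein-diff idea itself reduces to comparing translated proteins. classify_sketch below is a toy, illustrative only; the real classify_from_protein_diff also handles frameshift realignment and boundary codons:

```python
def classify_sketch(ref_protein, mut_protein):
    """Toy protein-diff classifier: find the first differing residue
    and name the change. Equal lengths only; indels and frameshifts
    need the realignment logic the real classifier carries."""
    if ref_protein == mut_protein:
        return ("Silent", None)
    if len(ref_protein) != len(mut_protein):
        return ("LengthChange", len(mut_protein) - len(ref_protein))
    for pos, (r, m) in enumerate(zip(ref_protein, mut_protein)):
        if r != m:
            if m == "*":
                return ("PrematureStop", pos)
            return ("Substitution", (r, pos, m))
    return ("Silent", None)

assert classify_sketch("MKTA", "MKTA") == ("Silent", None)
assert classify_sketch("MKTA", "MRTA") == ("Substitution", ("K", 1, "R"))
assert classify_sketch("MKTA", "MK*A") == ("PrematureStop", 2)
```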

annotate_on_transcript(variant, transcript)

Classify the effect of variant on transcript.

Runs fast first to detect splice-adjacent variants (which stay fast-classified); for everything else, builds a :class:MutantTranscript and diffs the translated protein.

Source code in varcode/annotators/protein_diff.py
def annotate_on_transcript(self, variant, transcript):
    """Classify the effect of ``variant`` on ``transcript``.

    Runs fast first to detect splice-adjacent variants (which
    stay fast-classified); for everything else, builds a
    :class:`MutantTranscript` and diffs the translated protein.
    """
    from pyensembl import Transcript
    if not isinstance(transcript, Transcript):
        raise TypeError(
            "Expected %s : %s to have type Transcript" % (
                transcript, type(transcript)))

    if not transcript.is_protein_coding:
        return NoncodingTranscript(variant, transcript)

    if not transcript.complete:
        return IncompleteTranscript(variant, transcript)

    # Run fast to get the splice classification.
    fast_effect = FastEffectAnnotator().annotate_on_transcript(
        variant, transcript)

    # Pure-intronic splice effects: fast only — no protein-
    # level diff to compute.
    if isinstance(fast_effect, (SpliceDonor, SpliceAcceptor)):
        return fast_effect
    if (isinstance(fast_effect, IntronicSpliceSite)
            and not isinstance(fast_effect, (SpliceDonor, SpliceAcceptor))):
        return fast_effect

    # Non-splice location-based classes (UTR, deep intronic): fast's
    # position-based classification is authoritative. The protein may
    # be unchanged but "outside the CDS" carries location semantics a
    # whole-protein diff doesn't. Closes #318.
    if isinstance(fast_effect, (ThreePrimeUTR, FivePrimeUTR, Intronic)):
        return fast_effect

    # ExonicSpliceSite: dual-dispatch. Fast provides the
    # splice class; protein-diff provides the alternate_effect
    # via protein diff.
    if isinstance(fast_effect, ExonicSpliceSite):
        mt = apply_variant_to_transcript(variant, transcript)
        if mt is not None and mt.mutant_protein_sequence is not None:
            alt = classify_from_protein_diff(
                variant=variant,
                transcript=transcript,
                ref_protein=str(transcript.protein_sequence),
                mut_protein=mt.mutant_protein_sequence,
                length_delta=mt.total_length_delta,
                mutant_transcript=mt)
            return ExonicSpliceSite(
                variant=variant,
                transcript=transcript,
                exon=fast_effect.exon,
                alternate_effect=alt)
        return fast_effect

    # Non-splice: protein-diff slow path.
    mt = apply_variant_to_transcript(variant, transcript)
    if mt is None or mt.mutant_protein_sequence is None:
        # UTR, ref-mismatch, splice-junction-spanning, etc.
        return fast_effect

    ref_protein = str(transcript.protein_sequence)
    mut_protein = mt.mutant_protein_sequence

    # Alternate start codon rewrite: if the first codon changed to
    # another recognised start codon in the transcript's codon
    # table (e.g. ATG→CTG/GTG/TTG, or MT ATG→GTG under table 2),
    # the initiator tRNA still loads Met regardless of what the
    # codon would decode to internally. Rewrite the mutant
    # protein's first residue to 'M' so the shared diff classifier
    # sees the biologically correct protein. Closes #320.
    cds_start = min(transcript.start_codon_spliced_offsets)
    ref_first_codon = str(
        transcript.sequence)[cds_start:cds_start + 3]
    mut_first_codon = mt.cdna_sequence[
        cds_start:cds_start + 3].upper()
    if (mut_first_codon != ref_first_codon
            and mut_protein
            and mut_protein[0] != "M"
            and ref_protein
            and ref_protein[0] == "M"):
        codon_table = codon_table_for_transcript(transcript)
        if mut_first_codon in codon_table.start_codons:
            mut_protein = "M" + mut_protein[1:]

    # Proteins match → Silent or AlternateStartCodon. Handle
    # both here because the shared classifier doesn't have
    # access to the cDNA edit offset for the correct aa_pos.
    if ref_protein == mut_protein:
        if ref_first_codon != mut_first_codon:
            codon_table = codon_table_for_transcript(transcript)
            if mut_first_codon in codon_table.start_codons:
                return AlternateStartCodon(
                    variant=variant,
                    transcript=transcript,
                    ref_codon=ref_first_codon,
                    alt_codon=mut_first_codon)
        from ..effects.effect_classes import Silent
        edit = mt.edits[0] if mt.edits else None
        aa_pos = (edit.cdna_start - cds_start) // 3 if edit else 0
        aa_ref = (
            ref_protein[aa_pos]
            if 0 <= aa_pos < len(ref_protein)
            else "")
        return Silent(
            variant=variant,
            transcript=transcript,
            aa_pos=aa_pos,
            aa_ref=aa_ref)

    return classify_from_protein_diff(
        variant=variant,
        transcript=transcript,
        ref_protein=ref_protein,
        mut_protein=mut_protein,
        length_delta=mt.total_length_delta,
        mutant_transcript=mt)

varcode.StructuralVariantAnnotator

varcode.StructuralVariantAnnotator

Classify :class:~varcode.StructuralVariant consequences on a single transcript.

supports advertises the SV kinds the annotator handles. The :class:~varcode.FastEffectAnnotator and :class:~varcode.annotators.protein_diff.ProteinDiffEffectAnnotator advertise {"snv", "indel", "mnv"}; this one advertises the SV-type tokens used on :attr:StructuralVariant.sv_type.

annotate_on_transcript(variant, transcript)

Classify variant on transcript. Returns a single effect (typically a MultiOutcomeEffect subclass); consume effect.outcomes for the full outcome set.

Source code in varcode/annotators/structural_variant.py
def annotate_on_transcript(self, variant, transcript):
    """Classify ``variant`` on ``transcript``. Returns a single
    effect (typically a ``MultiOutcomeEffect`` subclass); consume
    ``effect.outcomes`` for the full outcome set.
    """
    from pyensembl import Transcript
    if not isinstance(transcript, Transcript):
        raise TypeError(
            "Expected pyensembl.Transcript, got %s" % type(transcript))

    if not transcript.is_protein_coding:
        return NoncodingTranscript(variant, transcript)

    sv_type = getattr(variant, "sv_type", None)

    if sv_type == "DEL":
        effect = self._annotate_deletion(variant, transcript)
    elif sv_type == "DUP":
        effect = self._annotate_duplication(variant, transcript)
    elif sv_type == "INV":
        effect = self._annotate_inversion(variant, transcript)
    elif sv_type in ("CNV",):
        # CNVs are ambiguous at the protein level (gain vs loss
        # depends on direction). Treat as duplication in the CNV-
        # gain case; the CN0 subtype is routed to deletion logic
        # at parse time (see sv_allele_parser).
        effect = self._annotate_duplication(variant, transcript)
    elif sv_type == "BND":
        effect = self._annotate_breakend(variant, transcript)
    elif sv_type == "INS":
        # A large symbolic insertion is *structurally* an SV but
        # functionally resembles an in-frame-or-frameshift
        # insertion if the sequence is known. For now we defer
        # to the deletion shape (reporting the anchor codon) —
        # callers with ``alt_assembly`` populated get a richer
        # result once the SV annotator materializes segments.
        effect = self._annotate_insertion(variant, transcript)
    else:
        # Unknown SV type — fall back to intergenic so the call
        # graph doesn't crash.
        return Intergenic(variant)

    # Attach cryptic-exon candidates enumerated from flanking
    # sequence / long-read assembly (#337). They show up as
    # additional Outcomes with source="varcode_motif".
    self._enumerate_and_attach_cryptics(variant, effect)
    # Attach splice-outcome candidates when any SV breakpoint
    # lands in a canonical splice window on the transcript
    # (#341). They show up as additional Outcomes with
    # source="varcode_splice".
    self._enumerate_and_attach_splice_outcomes(variant, transcript, effect)
    return effect

Registry

varcode.register_annotator(annotator)

Add an annotator to the process-global registry, keyed by its .name. Re-registering under the same name overrides the previous entry — this is deliberate so callers can swap implementations in tests.

Source code in varcode/annotators/registry.py
def register_annotator(annotator):
    """Add an annotator to the process-global registry, keyed by its
    ``.name``. Re-registering under the same name overrides the
    previous entry — this is deliberate so callers can swap
    implementations in tests.
    """
    name = getattr(annotator, "name", None)
    if not name:
        raise ValueError(
            "Annotator %r has no .name attribute; cannot register." % annotator)
    _REGISTRY[name] = annotator
    return annotator

varcode.get_annotator(name)

Look up a registered annotator by name. Raises KeyError if no annotator is registered under that name.

Source code in varcode/annotators/registry.py
def get_annotator(name):
    """Look up a registered annotator by name. Raises ``KeyError``
    if no annotator is registered under that name.
    """
    return _REGISTRY[name]

varcode.get_default_annotator()

Return the annotator currently configured as the default.

Current default is "protein_diff" (#322–#327 closed the last known correctness bugs between the two). "fast" stays available as an opt-in.

Source code in varcode/annotators/registry.py
def get_default_annotator():
    """Return the annotator currently configured as the default.

    Current default is ``"protein_diff"`` (#322–#327 closed the last
    known correctness bugs between the two). ``"fast"`` stays
    available as an opt-in.
    """
    return _REGISTRY[_DEFAULT_NAME]

varcode.set_default_annotator(name)

Swap the process-wide default annotator. name must refer to a registered annotator.

Source code in varcode/annotators/registry.py
def set_default_annotator(name):
    """Swap the process-wide default annotator. ``name`` must refer
    to a registered annotator.
    """
    global _DEFAULT_NAME
    if name not in _REGISTRY:
        raise KeyError(
            "No annotator registered under %r — call register_annotator() "
            "first or pick from %r." % (name, sorted(_REGISTRY)))
    _DEFAULT_NAME = name

varcode.use_annotator(name_or_instance)

Context manager that temporarily swaps the default annotator.

Useful for A/B comparisons and scoped overrides without mutating global state across the codebase::

with varcode.use_annotator("protein_diff"):
    effects = variant_collection.effects()

Accepts the same argument shape as the annotator= kwarg: a registered-name string, or an annotator instance. Passing an instance registers it temporarily under its .name so that name-based lookups inside the block find it; on exit the previous default and any previously-registered annotator under that name are restored.

Source code in varcode/annotators/registry.py
@contextmanager
def use_annotator(name_or_instance):
    """Context manager that temporarily swaps the default annotator.

    Useful for A/B comparisons and scoped overrides without mutating
    global state across the codebase::

        with varcode.use_annotator("protein_diff"):
            effects = variant_collection.effects()

    Accepts the same argument shape as the ``annotator=`` kwarg:
    a registered-name string, or an annotator instance. Passing an
    instance registers it temporarily under its ``.name`` so that
    name-based lookups inside the block find it; on exit the
    previous default and any previously-registered annotator under
    that name are restored.
    """
    global _DEFAULT_NAME
    prior_default = _DEFAULT_NAME

    if isinstance(name_or_instance, str):
        if name_or_instance not in _REGISTRY:
            raise KeyError(
                "No annotator registered under %r — register one first or "
                "pass an instance." % name_or_instance)
        _DEFAULT_NAME = name_or_instance
        prior_registration = None
    else:
        name = getattr(name_or_instance, "name", None)
        if not name:
            raise ValueError(
                "Annotator instance has no .name attribute; cannot scope.")
        prior_registration = _REGISTRY.get(name)
        _REGISTRY[name] = name_or_instance
        _DEFAULT_NAME = name

    try:
        yield
    finally:
        _DEFAULT_NAME = prior_default
        if not isinstance(name_or_instance, str):
            if prior_registration is None:
                _REGISTRY.pop(name, None)
            else:
                _REGISTRY[name] = prior_registration

Phasing

varcode.IsovarPhaseResolver

varcode.IsovarPhaseResolver(provider: IsovarAssemblyProvider)

Phase resolver backed by an :class:IsovarAssemblyProvider (#269, #259).

Two variants are cis if they appear on the same assembled contig. That's direct molecular evidence — not a probabilistic call.

Usage::

isovar_results = run_isovar(bam, vcf)
resolver = IsovarPhaseResolver(isovar_results)
effects = variants.effects(phase_resolver=resolver)

Any effect whose (variant, transcript) is covered by an assembled contig gets its :attr:~MutationEffect.mutant_transcript populated with the contig-derived :class:MutantTranscript — the protein attached to the effect is the protein actually observed in RNA, not one inferred from the reference.

Source code in varcode/phasing.py
def __init__(self, provider: IsovarAssemblyProvider):
    self.provider = provider

has_contig(variant, transcript) -> bool

Convenience passthrough to the provider.

Source code in varcode/phasing.py
def has_contig(self, variant, transcript) -> bool:
    """Convenience passthrough to the provider."""
    return self.provider.has_contig(variant, transcript)

mutant_transcript(variant, transcript)

Return the assembled :class:MutantTranscript, or None when this provider has no contig for (variant, transcript).

Source code in varcode/phasing.py
def mutant_transcript(self, variant, transcript):
    """Return the assembled :class:`MutantTranscript`, or ``None``
    when this provider has no contig for ``(variant, transcript)``.
    """
    if not self.provider.has_contig(variant, transcript):
        return None
    return self.provider.mutant_transcript(variant, transcript)

in_cis(v1, v2, transcript=None) -> Optional[bool]

Return True if v1 and v2 appear on the same Isovar contig, False when at least one variant has a contig that the other does not appear on (the assembled molecule doesn't carry both alts), None when neither variant has a contig (no evidence).

transcript is required because assemblies may be isoform-specific; this resolver returns None (no evidence) when transcript is None, even though the protocol allows isoform-agnostic providers to accept it.

Source code in varcode/phasing.py
def in_cis(self, v1, v2, transcript=None) -> Optional[bool]:
    """Return ``True`` if ``v1`` and ``v2`` appear on the same
    Isovar contig, ``False`` when at least one variant has a
    contig that the other does not appear on (the assembled
    molecule doesn't carry both alts), ``None`` when neither
    variant has a contig (no evidence).

    ``transcript`` is required because assemblies may be
    isoform-specific; this resolver returns ``None`` (no evidence)
    when ``transcript`` is ``None``, even though the protocol
    allows isoform-agnostic providers to accept it.
    """
    if transcript is None:
        return None
    v1_has = self.provider.has_contig(v1, transcript)
    v2_has = self.provider.has_contig(v2, transcript)
    if not v1_has and not v2_has:
        return None
    if v1_has:
        return v2 in self.provider.variants_in_contig(v1, transcript)
    return v1 in self.provider.variants_in_contig(v2, transcript)

phased_partners(variant, transcript) -> Sequence

Variants observed on the same contig as variant on transcript — i.e. the cis set. Empty if no contig.

Source code in varcode/phasing.py
def phased_partners(self, variant, transcript) -> Sequence:
    """Variants observed on the same contig as ``variant`` on
    ``transcript`` — i.e. the cis set. Empty if no contig."""
    if not self.provider.has_contig(variant, transcript):
        return ()
    return tuple(self.provider.variants_in_contig(variant, transcript))

varcode.VCFPhaseResolver

varcode.VCFPhaseResolver(variant_collection, sample)

Phase resolver backed by VCF GT + PS FORMAT fields.

Reads the phase data that varcode's VCF loader already parses into :class:~varcode.Genotype (via #267): whether the GT delimiter was | (phased) or / (unphased), the PS phase-set identifier, and the per-haplotype allele indices in :attr:Genotype.alleles.

Two variants are cis when they sit in the same phase set on the same haplotype slot, trans when they sit in the same phase set on different slots; the resolver returns None ("no evidence") when the variants aren't both phased, don't share a phase set, or lack called alleles.

Compatible with any tool that writes standard-shaped VCF: WhatsHap, HapCUT2, DeepVariant, GATK HaplotypeCaller, long-read callers (PEPPER-DeepVariant, Clair3), population phasers (SHAPEIT5, Eagle2). varcode doesn't care which one wrote the file — it only reads GT and PS.

Multi-allelic sites are handled: varcode splits those rows into one :class:~varcode.Variant per ALT, each with an alt_allele_index preserved on the :class:~varcode.VariantCollection metadata. The resolver maps each variant to its GT-encoded index and asks "which haplotype slot carries this specific alt?".

Single-sample by construction. Phase is per-sample; multi-sample VCFs need one resolver per sample.

Currently supplies the cis/trans query but does not attach a :class:~varcode.MutantTranscript — DNA phasing alone doesn't produce an assembled contig. The natural next step is a HaplotypeEffect / multi-variant apply_variants_to_transcript helper that, when two or more cis variants overlap the same transcript, builds a single joint :class:MutantTranscript applying all edits at once. That's a separate PR — this resolver already has the inputs it needs (in_cis) to drive the grouping.
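The cis/trans decision itself reduces to comparing (phase set, haplotype slot) pairs. The sketch below works over plain tuples; in_cis_sketch and its triple shape are hypothetical flattenings of the GT/PS fields, not varcode's Genotype:

```python
def in_cis_sketch(g1, g2):
    """g1/g2: (phased, phase_set, slot) triples. slot is which
    haplotype carries the alt (0 or 1), or None if uncalled.
    Returns True/False/None with the same meaning as the resolver."""
    phased1, ps1, slot1 = g1
    phased2, ps2, slot2 = g2
    if not (phased1 and phased2):
        return None            # unphased GT: no evidence
    if ps1 != ps2:
        return None            # different phase sets: no evidence
    if slot1 is None or slot2 is None:
        return None            # uncalled allele
    return slot1 == slot2      # same slot -> cis, different -> trans

assert in_cis_sketch((True, 77, 0), (True, 77, 0)) is True    # cis
assert in_cis_sketch((True, 77, 0), (True, 77, 1)) is False   # trans
assert in_cis_sketch((True, 77, 0), (True, 88, 1)) is None    # no evidence
assert in_cis_sketch((False, 77, 0), (True, 77, 0)) is None   # unphased
```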

Source code in varcode/phasing.py
def __init__(self, variant_collection, sample):
    self._collection = variant_collection
    self._sample = sample

in_cis(v1, v2, transcript=None) -> Optional[bool]

Return True if v1 and v2 are on the same haplotype in the same phase set, False if they're on different haplotypes in the same phase set, None when the phase relationship can't be determined (unphased GT, different phase sets, uncalled alleles).

transcript is accepted for interface symmetry with :class:IsovarPhaseResolver.in_cis but isn't consulted — DNA-level phase is isoform-agnostic.

Source code in varcode/phasing.py
def in_cis(self, v1, v2, transcript=None) -> Optional[bool]:
    """Return ``True`` if ``v1`` and ``v2`` are on the same
    haplotype in the same phase set, ``False`` if they're on
    different haplotypes in the same phase set, ``None`` when
    the phase relationship can't be determined (unphased GT,
    different phase sets, uncalled alleles).

    ``transcript`` is accepted for interface symmetry with
    :class:`IsovarPhaseResolver.in_cis` but isn't consulted —
    DNA-level phase is isoform-agnostic.
    """
    g1 = self._genotype(v1)
    g2 = self._genotype(v2)
    if g1 is None or g2 is None:
        return None
    alt1 = self._collection._alt_index_for(v1)
    alt2 = self._collection._alt_index_for(v2)
    # Homozygous-alt on either side makes phase deterministic:
    # every haplotype carries this alt, so it's cis with anything
    # the OTHER variant sits on. Shortcut before requiring
    # phased=True.
    hom1 = self._is_homozygous_alt(g1, alt1)
    hom2 = self._is_homozygous_alt(g2, alt2)
    if hom1 and hom2:
        return True
    if hom1:
        # v1 is on every haplotype → cis with v2 iff v2 is called.
        return self._haplotype_slot(g2, v2) is not None
    if hom2:
        return self._haplotype_slot(g1, v1) is not None
    # Otherwise both must be phased and in the same phase set.
    if not g1.phased or not g2.phased:
        return None
    if g1.phase_set != g2.phase_set or g1.phase_set is None:
        return None
    s1 = self._haplotype_slot(g1, v1)
    s2 = self._haplotype_slot(g2, v2)
    if s1 is None or s2 is None:
        return None
    return s1 == s2

phased_partners(variant, transcript=None)

Variants in the collection that are cis with variant under this resolver — i.e. sit in the same phase set on the same haplotype slot. Empty when variant isn't phased or has no called alt in the sample.

Source code in varcode/phasing.py
def phased_partners(self, variant, transcript=None):
    """Variants in the collection that are cis with ``variant``
    under this resolver — i.e. sit in the same phase set on the
    same haplotype slot. Empty when ``variant`` isn't phased or
    has no called alt in the sample.
    """
    partners = []
    for other in self._collection:
        if other == variant:
            continue
        if self.in_cis(variant, other) is True:
            partners.append(other)
    return tuple(partners)

varcode.apply_phase_resolver_to_effects

varcode.apply_phase_resolver_to_effects(effects, phase_resolver)

Post-process an :class:EffectCollection (or any iterable of :class:MutationEffect) to attach contig-derived :class:MutantTranscript objects when the resolver has evidence.

Mutates each effect in place by setting effect.mutant_transcript. Effects whose transcript isn't resolvable or whose (variant, transcript) has no contig are left untouched — so this is safe to call on a mixed collection where only some variants have RNA evidence.
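The in-place attachment pattern can be shown with duck-typed stand-ins (the stub classes below are illustrative, not varcode classes; only the guard-then-mutate structure mirrors the real helper):

```python
class StubEffect:
    def __init__(self, variant, transcript):
        self.variant = variant
        self.transcript = transcript
        self.mutant_transcript = None  # DNA-only default

class StubResolver:
    """Resolver exposing a mutant_transcript(variant, transcript) hook."""
    def __init__(self, table):
        self._table = table  # (variant, transcript) -> assembled contig

    def mutant_transcript(self, variant, transcript):
        return self._table.get((variant, transcript))

def apply_phase_resolver_to_effects(effects, phase_resolver):
    # Same guard structure as the varcode helper: no-op when the
    # resolver is absent or doesn't expose the hook.
    if phase_resolver is None or not hasattr(phase_resolver, "mutant_transcript"):
        return effects
    for e in effects:
        mt = phase_resolver.mutant_transcript(e.variant, e.transcript)
        if mt is not None:
            e.mutant_transcript = mt  # higher-confidence evidence wins
    return effects
```

Effects without a contig keep whatever mutant_transcript they already had, which is what makes the helper safe on mixed collections.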

Source code in varcode/phasing.py
def apply_phase_resolver_to_effects(effects, phase_resolver):
    """Post-process an :class:`EffectCollection` (or any iterable of
    :class:`MutationEffect`) to attach contig-derived
    :class:`MutantTranscript` objects when the resolver has evidence.

    Mutates each effect in place by setting
    ``effect.mutant_transcript``. Effects whose transcript isn't
    resolvable or whose ``(variant, transcript)`` has no contig are
    left untouched — so this is safe to call on a mixed collection
    where only some variants have RNA evidence.
    """
    if phase_resolver is None:
        return effects
    if not hasattr(phase_resolver, "mutant_transcript"):
        return effects
    for e in effects:
        transcript = getattr(e, "transcript", None)
        variant = getattr(e, "variant", None)
        if variant is None or transcript is None:
            continue
        mt = phase_resolver.mutant_transcript(variant, transcript)
        if mt is not None:
            # Intentional mutation: the effect's mutant_transcript
            # slot was either None (point variants, cryptic stubs,
            # etc.) or populated from DNA-only inference. Isovar's
            # assembly is higher-confidence evidence, so it wins.
            e.mutant_transcript = mt
    return effects

RNA evidence

varcode.RNAEvidenceResolver

varcode.RNAEvidenceResolver

Bases: Protocol

Source of RNA-observed outcomes for a (variant, transcript) pair.

Implementers return zero or more :class:~varcode.outcomes.Outcome objects describing isoforms, fusions, or RNA-level events that were actually observed in reads. An empty sequence means "no evidence for this pair" — the existing DNA-predicted outcomes are left alone.

Returned outcomes should set source to a producer-specific string ("isovar", "exacto", "longread_assembly", ...) and populate evidence with whatever shape that producer natively emits (transcript model IDs, junction read counts, etc.). See :func:make_rna_outcome for a convenience factory that fills the common fields.
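A dict-backed implementation of the protocol might look like the following. The Outcome dataclass here is a simplified stand-in for :class:~varcode.outcomes.Outcome, not its real signature:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Sequence, Tuple

@dataclass
class Outcome:  # simplified stand-in for varcode.outcomes.Outcome
    source: str
    evidence: Dict[str, Any] = field(default_factory=dict)

class DictRNAEvidenceResolver:
    """Maps (variant, transcript) keys to pre-computed outcomes."""
    def __init__(self, table: Dict[Tuple[Any, Any], Sequence[Outcome]]):
        self._table = table

    def observed_outcomes(self, variant, transcript) -> Sequence[Outcome]:
        # Never raises on unknown pairs — an empty sequence means
        # "no evidence", per the protocol contract.
        return tuple(self._table.get((variant, transcript), ()))
```

Because the protocol is structural, any object with a compatible observed_outcomes method satisfies it; no registration or subclassing is needed.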

observed_outcomes(variant, transcript) -> Sequence[Outcome]

Return RNA-observed outcomes for variant on transcript, or an empty sequence when no evidence is available. Must not raise on unknown (variant, transcript) pairs — return an empty sequence instead.

Source code in varcode/rna_evidence.py
def observed_outcomes(self, variant, transcript) -> Sequence[Outcome]:
    """Return RNA-observed outcomes for ``variant`` on
    ``transcript``, or an empty sequence when no evidence is
    available. Must not raise on unknown ``(variant, transcript)``
    pairs — return an empty sequence instead."""
    ...

varcode.NullRNAEvidenceResolver

varcode.NullRNAEvidenceResolver

No-op resolver that always reports "no evidence".

Useful as a default in pipelines where an RNA resolver is optional and as a baseline in tests. apply_rna_evidence_to_effects is safe to call with this resolver — it's a no-op walk.

varcode.apply_rna_evidence_to_effects

varcode.apply_rna_evidence_to_effects(effects: Iterable, resolver) -> Iterable

Attach RNA-observed outcomes from resolver to each effect in place.

Walks effects and, for any effect with a resolvable (variant, transcript), asks resolver.observed_outcomes for any RNA-observed outcomes and stashes them on the effect's _extra_outcomes slot. The :attr:MultiOutcomeEffect.outcomes property (and its overrides) consult that slot via :meth:MultiOutcomeEffect._with_extra_outcomes so callers see DNA-predicted outcomes followed by RNA-observed ones.

Single-outcome effects (Missense, FrameShift, etc.) are left untouched even when the resolver has evidence — those classes don't expose an outcomes view, and replacing them with a multi-outcome wrapper would break downstream isinstance checks. Producers that need to surface RNA observations on point variants should report them as a separate :class:MultiOutcomeEffect rather than mutating an existing single-outcome one. (The point-variant diff is generally already correct from DNA, so this is rarely an issue in practice.)

Mirrors :func:varcode.phasing.apply_phase_resolver_to_effects: in-place mutation, safe to call on a mixed collection where only some variants have RNA evidence, no-op when resolver is None or doesn't implement the protocol.

Returns effects for chaining convenience.

Source code in varcode/rna_evidence.py
def apply_rna_evidence_to_effects(effects: Iterable, resolver) -> Iterable:
    """Attach RNA-observed outcomes from ``resolver`` to each effect
    in place.

    Walks ``effects`` and, for any effect with a resolvable
    ``(variant, transcript)``, asks ``resolver.observed_outcomes``
    for any RNA-observed outcomes and stashes them on the effect's
    ``_extra_outcomes`` slot. The :attr:`MultiOutcomeEffect.outcomes`
    property (and its overrides) consult that slot via
    :meth:`MultiOutcomeEffect._with_extra_outcomes` so callers see
    DNA-predicted outcomes followed by RNA-observed ones.

    Single-outcome effects (Missense, FrameShift, etc.) are left
    untouched even when the resolver has evidence — those classes
    don't expose an ``outcomes`` view, and replacing them with a
    multi-outcome wrapper would break downstream ``isinstance`` checks.
    Producers that need to surface RNA observations on point variants
    should report them as a separate :class:`MultiOutcomeEffect` rather
    than mutating an existing single-outcome one. (The point-variant
    diff is generally already correct from DNA, so this is rarely an
    issue in practice.)

    Mirrors :func:`varcode.phasing.apply_phase_resolver_to_effects`:
    in-place mutation, safe to call on a mixed collection where only
    some variants have RNA evidence, no-op when ``resolver`` is None
    or doesn't implement the protocol.

    Returns ``effects`` for chaining convenience.
    """
    if resolver is None:
        return effects
    if not hasattr(resolver, "observed_outcomes"):
        return effects

    # Lazy import to avoid an import cycle (effect_classes imports
    # from varcode.outcomes, which sits below us).
    from .effects.effect_classes import MultiOutcomeEffect

    for effect in effects:
        if not isinstance(effect, MultiOutcomeEffect):
            continue
        variant = getattr(effect, "variant", None)
        transcript = getattr(effect, "transcript", None)
        if variant is None or transcript is None:
            continue
        observed = resolver.observed_outcomes(variant, transcript)
        # Coerce to tuple eagerly so we don't keep a generator that
        # would silently exhaust on a second outcomes() read. The
        # emptiness check happens after coercion, since a generator
        # is always truthy.
        observed_tuple = tuple(observed)
        if not observed_tuple:
            continue
        existing = getattr(effect, "_extra_outcomes", ())
        effect._extra_outcomes = tuple(existing) + observed_tuple
    return effects

varcode.make_rna_outcome

varcode.make_rna_outcome(effect, *, probability: Optional[float] = None, source: str = 'rna', transcript_model_id: Optional[str] = None, read_count: Optional[int] = None, description: Optional[str] = None, extra_evidence: Optional[Mapping[str, Any]] = None) -> Outcome

Construct an :class:~varcode.outcomes.Outcome carrying RNA-derived provenance.

Convenience factory for the common fields a reads-based or long-read assembly tool wants on each observed outcome — keeps consumers from hand-rolling the evidence dict shape and lets downstream code rely on a small set of well-known keys.

PARAMETER DESCRIPTION
effect

The effect this RNA-observed outcome represents.

TYPE: MutationEffect

probability

Estimated frequency of this isoform (e.g. expression-supported fraction). None means "not scored".

TYPE: float or None DEFAULT: None

source

Producer name; defaults to "rna". Set to "isovar" / "exacto" / etc. for tool-specific filtering.

TYPE: str DEFAULT: 'rna'

transcript_model_id

Stable ID of the observed transcript model from the producer. Stored under evidence["transcript_model_id"].

TYPE: str or None DEFAULT: None

read_count

Supporting read count. Stored under evidence["read_count"].

TYPE: int or None DEFAULT: None

description

Human-readable label, passed through to :attr:Outcome.description.

TYPE: str or None DEFAULT: None

extra_evidence

Producer-specific extra fields, merged into the evidence dict on top of the well-known keys above. Allows tool-native fields (e.g. "tpm", "junction_id") to ride along without forcing a schema here.

TYPE: Mapping or None DEFAULT: None
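The merge order matters: extra_evidence is applied last, so a producer-specific key overrides a well-known one on collision. A stand-alone mirror of the evidence-dict assembly (hypothetical helper name, extracted here for illustration):

```python
from typing import Any, Dict, Mapping, Optional

def build_rna_evidence(
        transcript_model_id: Optional[str] = None,
        read_count: Optional[int] = None,
        extra_evidence: Optional[Mapping[str, Any]] = None) -> Dict[str, Any]:
    """Assemble the evidence dict the way make_rna_outcome does."""
    evidence: Dict[str, Any] = {}
    if transcript_model_id is not None:
        evidence["transcript_model_id"] = transcript_model_id
    if read_count is not None:
        evidence["read_count"] = read_count
    if extra_evidence:
        # Merged last: producer-specific keys win on collision.
        evidence.update(extra_evidence)
    return evidence
```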

Source code in varcode/rna_evidence.py
def make_rna_outcome(
        effect,
        *,
        probability: Optional[float] = None,
        source: str = "rna",
        transcript_model_id: Optional[str] = None,
        read_count: Optional[int] = None,
        description: Optional[str] = None,
        extra_evidence: Optional[Mapping[str, Any]] = None) -> Outcome:
    """Construct an :class:`~varcode.outcomes.Outcome` carrying
    RNA-derived provenance.

    Convenience factory for the common fields a reads-based or
    long-read assembly tool wants on each observed outcome — keeps
    consumers from hand-rolling the ``evidence`` dict shape and lets
    downstream code rely on a small set of well-known keys.

    Parameters
    ----------
    effect : MutationEffect
        The effect this RNA-observed outcome represents.
    probability : float or None
        Estimated frequency of this isoform (e.g. expression-supported
        fraction). ``None`` means "not scored".
    source : str
        Producer name; defaults to ``"rna"``. Set to ``"isovar"`` /
        ``"exacto"`` / etc. for tool-specific filtering.
    transcript_model_id : str or None
        Stable ID of the observed transcript model from the producer.
        Stored under ``evidence["transcript_model_id"]``.
    read_count : int or None
        Supporting read count. Stored under ``evidence["read_count"]``.
    description : str or None
        Human-readable label, passed through to
        :attr:`Outcome.description`.
    extra_evidence : Mapping or None
        Producer-specific extra fields, merged into the evidence dict
        on top of the well-known keys above. Allows tool-native fields
        (e.g. ``"tpm"``, ``"junction_id"``) to ride along without
        forcing a schema here.
    """
    evidence: dict = {}
    if transcript_model_id is not None:
        evidence["transcript_model_id"] = transcript_model_id
    if read_count is not None:
        evidence["read_count"] = read_count
    if extra_evidence:
        evidence.update(extra_evidence)
    return Outcome(
        effect=effect,
        probability=probability,
        source=source,
        evidence=evidence,
        description=description,
    )

Germline-aware annotation

varcode.GermlineContext

varcode.GermlineContext(variants: 'VariantCollection', completeness: Completeness = Completeness.COMPLETE, reference_name: Optional[str] = None, metadata: Mapping[str, Any] = dict()) dataclass

The patient's germline, packaged with completeness metadata and reference-build info for cross-VCF validation.

Construct via the from_* classmethods rather than instantiating directly; the constructors apply the input-shape-specific validation each route needs.

ATTRIBUTE DESCRIPTION
variants

The germline variants as a :class:~varcode.VariantCollection. Empty for :meth:empty contexts.

TYPE: 'VariantCollection'

completeness

How to interpret absence-of-a-call (see :class:Completeness).

TYPE: Completeness

reference_name

The genome reference these variants were called against — "GRCh37", "GRCh38", "hg19", etc. None when the context is empty or the reference is unknown. Used by :meth:validate_against to fail fast on cross-VCF build mismatches.

TYPE: Optional[str]

metadata

Open-ended dict for caller-supplied annotations (source caller name, sample identifier, normalization tool, etc.). Not interpreted by varcode; rides along for downstream consumers and serialization.

TYPE: Mapping[str, Any]

Examples:

Route 1 — full germline call set::

ctx = GermlineContext.from_germline_vcf("normal.vcf")

Route 2 — multi-sample VCF, extract a column. The user must declare completeness explicitly because absence from a multi-sample column rarely means ref/ref::

ctx = GermlineContext.from_multi_sample_vcf(
    "merged.vcf", sample="NORMAL", completeness=Completeness.SPARSE)

Direct construction (tests, custom pipelines)::

ctx = GermlineContext.from_variants(
    germline_variants, completeness=Completeness.COMPLETE,
    reference_name="GRCh38")

Explicit empty context — opt-in to reference-relative fallback::

ctx = GermlineContext.empty()

from_germline_vcf(path: str, *, completeness: Completeness = Completeness.COMPLETE, metadata: Optional[Mapping[str, Any]] = None, **load_vcf_kwargs) -> 'GermlineContext' classmethod

Load a full germline VCF into a context.

load_vcf_kwargs are passed through to :func:varcode.load_vcf — for example genome= or only_passing=False. The returned context defaults to Completeness.COMPLETE; pass completeness= only if the VCF is something other than a real germline call set.

Source code in varcode/germline.py
@classmethod
def from_germline_vcf(
        cls,
        path: str,
        *,
        completeness: Completeness = Completeness.COMPLETE,
        metadata: Optional[Mapping[str, Any]] = None,
        **load_vcf_kwargs) -> "GermlineContext":
    """Load a full germline VCF into a context.

    ``load_vcf_kwargs`` are passed through to
    :func:`varcode.load_vcf` — for example ``genome=`` or
    ``only_passing=False``. The returned context defaults to
    ``Completeness.COMPLETE``; pass ``completeness=`` only if the
    VCF is something other than a real germline call set.
    """
    # Lazy import to avoid pulling vcf.py into the import graph
    # for callers who don't need it.
    from .vcf import load_vcf
    vc = load_vcf(path, **load_vcf_kwargs)
    if len(vc) == 0:
        warnings.warn(
            "Loaded germline VCF %r contains zero variants. This "
            "is almost always a wrong-file error; effect "
            "prediction will silently fall through to "
            "reference-relative on every somatic variant." % path)
    return cls(
        variants=vc,
        completeness=completeness,
        reference_name=cls._reference_name_of(vc),
        metadata=dict(metadata or {}),
    )

from_multi_sample_vcf(path: str, sample: str, *, completeness: Completeness, metadata: Optional[Mapping[str, Any]] = None, **load_vcf_kwargs) -> 'GermlineContext' classmethod

Load a multi-sample VCF and extract one sample's calls as the germline.

completeness is required (no default) — multi-sample VCFs from somatic callers (e.g. Mutect2's NORMAL column) are almost always sparse, but pure-germline multi-sample VCFs (1000G, gnomAD batch genotyping) are complete. Forcing the caller to declare prevents subtle correctness bugs from treating a sparse column as if absence implied ref/ref.

The sample is filtered post-load. If you need per-sample zygosity information, pass include_info=True (the default) and consult vc.metadata[variant]["sample_info"][sample] downstream.

Source code in varcode/germline.py
@classmethod
def from_multi_sample_vcf(
        cls,
        path: str,
        sample: str,
        *,
        completeness: Completeness,
        metadata: Optional[Mapping[str, Any]] = None,
        **load_vcf_kwargs) -> "GermlineContext":
    """Load a multi-sample VCF and extract one sample's calls as
    the germline.

    ``completeness`` is required (no default) — multi-sample VCFs
    from somatic callers (e.g. Mutect2's ``NORMAL`` column) are
    almost always sparse, but pure-germline multi-sample VCFs
    (1000G, gnomAD batch genotyping) are complete. Forcing the
    caller to declare prevents subtle correctness bugs from
    treating a sparse column as if absence implied ref/ref.

    The sample is filtered post-load. If you need per-sample
    zygosity information, pass ``include_info=True`` (the default)
    and consult ``vc.metadata[variant]["sample_info"][sample]``
    downstream.
    """
    from .vcf import load_vcf
    vc = load_vcf(path, **load_vcf_kwargs)
    if sample not in cls._samples_of(vc):
        from .errors import SampleNotFoundError
        raise SampleNotFoundError(
            "Sample %r not present in %s. Available samples: %s" % (
                sample, path, sorted(cls._samples_of(vc))))
    # Filter to variants where the named sample has a non-ref call.
    # The base VariantCollection isn't intrinsically sample-aware;
    # this is a conservative subset that the caller can refine.
    filtered = cls._variants_called_in_sample(vc, sample)
    meta = dict(metadata or {})
    meta.setdefault("source_path", path)
    meta.setdefault("sample", sample)
    return cls(
        variants=filtered,
        completeness=completeness,
        reference_name=cls._reference_name_of(vc),
        metadata=meta,
    )

from_variants(variants, *, completeness: Completeness = Completeness.COMPLETE, reference_name: Optional[str] = None, metadata: Optional[Mapping[str, Any]] = None) -> 'GermlineContext' classmethod

Construct from an already-built :class:VariantCollection or any iterable of :class:Variant objects.

Useful for tests, hand-built pipelines, and downstream tools that already have variants in memory and don't need to re-parse a VCF. reference_name should be passed explicitly when not carried by the variants themselves; otherwise cross-VCF validation will be a no-op.

Source code in varcode/germline.py
@classmethod
def from_variants(
        cls,
        variants,
        *,
        completeness: Completeness = Completeness.COMPLETE,
        reference_name: Optional[str] = None,
        metadata: Optional[Mapping[str, Any]] = None) -> "GermlineContext":
    """Construct from an already-built :class:`VariantCollection`
    or any iterable of :class:`Variant` objects.

    Useful for tests, hand-built pipelines, and downstream tools
    that already have variants in memory and don't need to re-parse
    a VCF. ``reference_name`` should be passed explicitly when not
    carried by the variants themselves; otherwise cross-VCF
    validation will be a no-op.
    """
    from .variant_collection import VariantCollection
    if isinstance(variants, VariantCollection):
        vc = variants
    else:
        vc = VariantCollection(list(variants))
    if reference_name is None:
        reference_name = cls._reference_name_of(vc)
    return cls(
        variants=vc,
        completeness=completeness,
        reference_name=reference_name,
        metadata=dict(metadata or {}),
    )

empty() -> 'GermlineContext' classmethod

Explicit no-germline context. Use this in pipelines where germline= is structurally required but the caller has no germline data — it documents intent better than passing None, and downstream code can rely on the kwarg always being a :class:GermlineContext.

Effect prediction with an empty context falls through to reference-relative annotation (no patient transcript construction), with no warnings — the caller has explicitly opted in to the fallback.

Source code in varcode/germline.py
@classmethod
def empty(cls) -> "GermlineContext":
    """Explicit no-germline context. Use this in pipelines where
    ``germline=`` is structurally required but the caller has no
    germline data — it documents intent better than passing
    ``None``, and downstream code can rely on the kwarg always
    being a :class:`GermlineContext`.

    Effect prediction with an empty context falls through to
    reference-relative annotation (no patient transcript
    construction), with no warnings — the caller has explicitly
    opted in to the fallback.
    """
    from .variant_collection import VariantCollection
    return cls(
        variants=VariantCollection([]),
        completeness=Completeness.EMPTY,
        reference_name=None,
        metadata={},
    )

__bool__() -> bool

Truthy when there's something to apply. EMPTY contexts are falsy so if germline_context: reads idiomatically.

Source code in varcode/germline.py
def __bool__(self) -> bool:
    """Truthy when there's something to apply. ``EMPTY`` contexts
    are falsy so ``if germline_context:`` reads idiomatically."""
    return self.completeness is not Completeness.EMPTY and len(self.variants) > 0

validate_against(somatic, *, validate_reference: bool = True) -> None

Cross-validate this context with a somatic :class:VariantCollection. Hard error on reference-build mismatch unless validate_reference=False; warn on suspicious shapes (empty germline, sparse coverage with no overlap with somatic, etc.).

Called automatically by :meth:Variant.effects / :meth:VariantCollection.effects when a context is supplied; callers running validation manually can do so up front to fail fast before annotation.

Source code in varcode/germline.py
def validate_against(
        self,
        somatic,
        *,
        validate_reference: bool = True) -> None:
    """Cross-validate this context with a somatic
    :class:`VariantCollection`. Hard error on reference-build
    mismatch unless ``validate_reference=False``; warn on
    suspicious shapes (empty germline, sparse coverage with no
    overlap with somatic, etc.).

    Called automatically by :meth:`Variant.effects` /
    :meth:`VariantCollection.effects` when a context is supplied;
    callers running validation manually can do so up front to fail
    fast before annotation.
    """
    if not isinstance(self, GermlineContext):
        raise TypeError(
            "validate_against expects self to be a GermlineContext")
    somatic_ref = self._reference_name_of(somatic)
    if validate_reference and self.reference_name and somatic_ref:
        if self.reference_name != somatic_ref:
            raise GenomeBuildMismatchError(
                somatic_reference=somatic_ref,
                germline_reference=self.reference_name)
    if (self.completeness is not Completeness.EMPTY
            and len(self.variants) == 0):
        warnings.warn(
            "GermlineContext is non-empty by completeness flag (%s) "
            "but holds zero variants — likely a wrong-file or "
            "filter-too-aggressive error." % self.completeness.value)

variants_in_window(contig: str, start: int, end: int) -> Tuple

Germline variants overlapping [start, end] on contig (inclusive on both ends).

Used by the window-based lookup machinery (slice 2 of #268). Lazy interval index is built on first call and cached on the instance — subsequent calls are O(log N) per contig.

Returns a tuple (immutable) so callers can safely cache the result without worrying about the underlying index mutating.
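The start-sorted lookup it describes can be sketched in a few lines. Toy variants are (start, end) tuples rather than varcode objects, but the bisect-then-filter shape is the same:

```python
import bisect
from typing import List, Tuple

def variants_in_window(starts: List[int],
                       variants: List[Tuple[int, int]],
                       start: int,
                       end: int) -> Tuple[Tuple[int, int], ...]:
    """starts is sorted ascending; variants[i] spans starts[i]..end_i inclusive."""
    # Candidates: everything that begins at or before the window end...
    cutoff = bisect.bisect_right(starts, end)
    # ...then keep only spans that reach back into the window.
    return tuple(v for v in variants[:cutoff] if v[1] >= start)
```

The bisect cut is the O(log N) step; the linear filter over candidates stays cheap because most germline variants are SNVs or short indels, as the docstring notes.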

Source code in varcode/germline.py
def variants_in_window(
        self,
        contig: str,
        start: int,
        end: int) -> Tuple:
    """Germline variants overlapping ``[start, end]`` on
    ``contig`` (inclusive on both ends).

    Used by the window-based lookup machinery (slice 2 of #268).
    Lazy interval index is built on first call and cached on the
    instance — subsequent calls are O(log N) per contig.

    Returns a tuple (immutable) so callers can safely cache the
    result without worrying about the underlying index mutating.
    """
    index = self._index()
    starts, variants = index.get(contig, ((), ()))
    if not starts:
        return ()
    # Variants are indexed by start. Find candidates whose
    # start <= end, then filter by their actual end span. Most
    # germline variants are SNVs / short indels so the candidate
    # window is small.
    cutoff = bisect.bisect_right(starts, end)
    result = []
    for i in range(cutoff):
        v = variants[i]
        v_end = getattr(v, "end", None) or v.start
        if v_end >= start:
            result.append(v)
    return tuple(result)

varcode.Completeness

varcode.Completeness

Bases: Enum

How exhaustive the germline call set is — the load-bearing flag that pins what absence of a call at a position means.

The same data structure ("a list of germline variants") can come from very different pipelines, and downstream effect prediction cannot make the right call without knowing which:

  • If a position is absent from a real germline VCF emitted by a germline caller that examined the entire normal BAM, the patient is ref/ref there. Effect prediction proceeds reference-relative at that codon.
  • If a position is absent from the NORMAL column of a somatic-caller VCF, it likely means the somatic caller didn't emit a row — not that the position is ref/ref. The patient's germline state at that codon is unknown. The honest output is a possibility set including "unknown germline."
  • If a position is absent from a panel-of-normals filter list, it definitely doesn't imply ref/ref — the file only lists curated hotspots.

Mis-treating "absent" as "ref/ref" silently produces wrong germline-aware effects on somatic variants in long stretches of the genome the somatic caller never touched. The flag exists so that mistake fails loud (or at least produces an honest possibility set) instead of silently corrupting clinical annotation.

Values

  • :attr:COMPLETE — typical origin: germline caller (DeepVariant, HaplotypeCaller, Strelka2 germline) run on the normal BAM. Absence at a position ⇒ ref/ref.
  • :attr:SPARSE — typical origin: NORMAL column of a somatic tumor-vs-normal VCF (Mutect2, Strelka2 somatic, VarScan2 somatic). Absence at a position ⇒ unknown (probably ref/ref but not queried); the honest output is a possibility set.
  • :attr:HOTSPOTS_ONLY — typical origin: panel-of-normals filter list, ClinVar pathogenic list, single-hotspot allowlists. Absence at a position ⇒ definitely unknown; strictly weaker evidence than SPARSE.
  • :attr:EMPTY — "I have no germline data": explicit fallback so users opt into reference-relative annotation rather than getting it by accident from a missing kwarg. No germline-aware logic runs; equivalent to not passing germline= at all.

What downstream slices do with this

Slice 3 of #268 wires germline= through annotator dispatch. When a somatic variant lands in a transcript window that has no germline calls, the annotator reads this flag to decide between:

  • COMPLETE → patient is ref/ref in this window; emit a single reference-relative effect.
  • SPARSE / HOTSPOTS_ONLY → patient's germline is unknown in this window; emit a possibility set including the reference-relative effect plus "germline-unknown" outcomes so the user sees the uncertainty.
  • EMPTY → no germline-aware logic; reference-relative.
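A sketch of that dispatch. The enum values mirror :class:Completeness; the mode strings are illustrative labels, not varcode's actual return types:

```python
from enum import Enum

class Completeness(Enum):
    COMPLETE = "complete"
    SPARSE = "sparse"
    HOTSPOTS_ONLY = "hotspots_only"
    EMPTY = "empty"

def annotation_mode(completeness: Completeness,
                    has_germline_in_window: bool) -> str:
    """What a germline-aware annotator does for one transcript window."""
    if has_germline_in_window:
        return "classify-against-patient-haplotype"
    if completeness is Completeness.COMPLETE:
        # Absence from a complete call set means ref/ref here.
        return "single-reference-relative-effect"
    if completeness in (Completeness.SPARSE, Completeness.HOTSPOTS_ONLY):
        # Absence is not evidence of ref/ref: surface the uncertainty.
        return "possibility-set-with-germline-unknown"
    return "reference-relative"  # EMPTY: no germline-aware logic runs
```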
Constructors and defaults

:meth:GermlineContext.from_germline_vcf defaults to COMPLETE because that's almost always what a real germline VCF is.

:meth:GermlineContext.from_multi_sample_vcf requires the caller to declare completeness explicitly (no default) — a multi-sample VCF could be either, and silently defaulting either direction is a correctness bug waiting to happen.

:meth:GermlineContext.empty always sets EMPTY.

varcode.predict_germline_aware_effect

varcode.predict_germline_aware_effect(somatic_variant, transcript, germline_ctx: GermlineContext, annotator, phase_resolver=None, window_fn=default_germline_window, max_hypotheses: int = 8)

Predict the effect of somatic_variant on transcript against the patient's germline-applied transcript.

Single entry point for germline-aware effect prediction. :func:varcode.effects.predict_variant_effects calls this whenever a non-empty :class:GermlineContext is supplied; otherwise it bypasses the germline path entirely and the existing annotator dispatch produces today's reference-relative output unchanged.

Behaviour by case:

  • No germline in the somatic's window — patient transcript ≡ reference transcript at this locus; delegate to annotator directly. SPARSE / HOTSPOTS_ONLY contexts mark the result with effect.germline_unknown = True so consumers can see the uncertainty.
  • Germline in window, phase known (resolver answers, or hemizygous, or all-cis-by-zygosity) — single patient haplotype; classify against it via :func:_classify_against_patient_baseline.
  • Germline in window, phase unknown — enumerate hypotheses (capped via max_hypotheses), classify each, wrap in :class:~varcode.effects.effect_classes.PhaseAmbiguousEffect.

LOH (somatic matches germline at position+alt with het zygosity) sets effect.is_loh = True regardless of which branch ran.

window_fn is the pluggable window selector — defaults to :func:default_germline_window (codon-level, with splice-signal expansion when the somatic is splice-adjacent). Callers that need different windows pass their own.
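The three-way branch above can be reduced to a sketch. The string tags are illustrative; the real function returns effect objects, not labels:

```python
def germline_aware_branch(n_germline_in_window, n_hypotheses):
    # Mirrors the case analysis: no germline in window -> reference-relative;
    # one phase hypothesis -> single patient baseline; several -> possibility set.
    if n_germline_in_window == 0:
        return "reference-relative"
    if n_hypotheses == 1:
        return "single-baseline"
    return "phase-ambiguous possibility set"
```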

Source code in varcode/germline.py
def predict_germline_aware_effect(
        somatic_variant,
        transcript,
        germline_ctx: GermlineContext,
        annotator,
        phase_resolver=None,
        window_fn=default_germline_window,
        max_hypotheses: int = 8):
    """Predict the effect of ``somatic_variant`` on ``transcript``
    against the patient's germline-applied transcript.

    Single entry point for germline-aware effect prediction.
    :func:`varcode.effects.predict_variant_effects` calls this whenever
    a non-empty :class:`GermlineContext` is supplied; otherwise it
    bypasses the germline path entirely and the existing annotator
    dispatch produces today's reference-relative output unchanged.

    Behaviour by case:

    * **No germline in the somatic's window** — patient transcript ≡
      reference transcript at this locus; delegate to ``annotator``
      directly. SPARSE / HOTSPOTS_ONLY contexts mark the result with
      ``effect.germline_unknown = True`` so consumers can see the
      uncertainty.
    * **Germline in window, phase known** (resolver answers, or
      hemizygous, or all-cis-by-zygosity) — single patient haplotype;
      classify against it via :func:`_classify_against_patient_baseline`.
    * **Germline in window, phase unknown** — enumerate hypotheses
      (capped via ``max_hypotheses``), classify each, wrap in
      :class:`~varcode.effects.effect_classes.PhaseAmbiguousEffect`.

    LOH (``somatic`` matches germline at position+alt with het zygosity)
    sets ``effect.is_loh = True`` regardless of which branch ran.

    ``window_fn`` is the pluggable window selector — defaults to
    :func:`default_germline_window` (codon-level, with splice-signal
    expansion when the somatic is splice-adjacent). Callers that
    need different windows pass their own.
    """
    from pyensembl import Transcript
    if not isinstance(transcript, Transcript):
        # Mirrors annotator entry: SVs and other non-Transcript
        # consumers don't go through germline-aware prediction.
        return annotator.annotate_on_transcript(somatic_variant, transcript)

    contig, start, end = window_fn(somatic_variant, transcript)
    germline_in_window = germline_ctx.variants_in_window(contig, start, end)
    is_loh = detect_loh(somatic_variant, germline_in_window)

    if not germline_in_window:
        effect = annotator.annotate_on_transcript(
            somatic_variant, transcript)
        if germline_ctx.completeness in (
                Completeness.SPARSE, Completeness.HOTSPOTS_ONLY):
            effect.germline_unknown = True
        if is_loh:
            effect.is_loh = True
        return effect

    hypotheses = enumerate_phase_hypotheses(
        somatic_variant,
        germline_in_window,
        phase_resolver=phase_resolver,
        max_hypotheses=max_hypotheses)

    if len(hypotheses) == 1:
        effect = _classify_against_patient_baseline(
            somatic_variant, transcript, hypotheses[0])
        # Stash the phase metadata on the effect so consumers /
        # serializers can recover it. Single-hypothesis effects don't
        # need a possibility set, but the evidence is still useful.
        effect.germline_phase_state = hypotheses[0].phase_state
        effect.germline_variants_in_window = tuple(germline_in_window)
        if is_loh:
            effect.is_loh = True
        return effect

    # Multiple hypotheses → possibility set.
    candidates = tuple(
        _classify_against_patient_baseline(
            somatic_variant, transcript, h)
        for h in hypotheses)
    from .effects.effect_classes import PhaseAmbiguousEffect
    effect = PhaseAmbiguousEffect(
        variant=somatic_variant,
        transcript=transcript,
        candidates=candidates,
        hypotheses=hypotheses,
        germline_variants=tuple(germline_in_window))
    if is_loh:
        effect.is_loh = True
    return effect

varcode.apply_germline_to_transcript

varcode.apply_germline_to_transcript(transcript, germline_ctx, somatic_variant=None)

Apply germline variants from germline_ctx to transcript, returning the patient's baseline :class:MutantTranscript.

Lower-level entry point for callers that want the patient transcript directly without going through full effect prediction. Used internally by :func:predict_germline_aware_effect; exposed publicly for downstream tools (Isovar, Exacto) that want to compute a custom analysis on the patient haplotype.

Behaviour:

  • If germline_ctx is empty, returns None.
  • If somatic_variant is provided, restricts germline to the somatic's window (per :func:default_germline_window); else applies all germline variants overlapping any exon of the transcript.
  • If germline edits conflict (overlapping cDNA ranges) or land outside the CDS, returns None and the caller falls back.

The returned object is the same shape that :func:varcode.mutant_transcript.apply_variants_to_transcript produces: a :class:MutantTranscript carrying the germline edits with mutant_protein_sequence populated when the edits land after the CDS start.
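The whole-transcript branch gathers germline variants exon by exon. A minimal sketch with stand-in types (`Exon` and the tuple-shaped variant records are illustrative, not varcode classes):

```python
from collections import namedtuple

Exon = namedtuple("Exon", "contig start end")

def germline_overlapping_exons(exons, germline):
    """Collect (contig, pos, ref, alt) germline records inside any exon."""
    hits = []
    for exon in exons:
        hits.extend(
            v for v in germline
            if v[0] == exon.contig and exon.start <= v[1] <= exon.end)
    return hits

exons = [Exon("7", 100, 200), Exon("7", 400, 500)]
germline = [("7", 150, "A", "G"), ("7", 300, "C", "T"), ("7", 450, "G", "A")]
# The intronic record at position 300 falls in no exon and is excluded.
```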

Source code in varcode/germline.py
def apply_germline_to_transcript(transcript, germline_ctx, somatic_variant=None):
    """Apply germline variants from ``germline_ctx`` to ``transcript``,
    returning the patient's baseline :class:`MutantTranscript`.

    Lower-level entry point for callers that want the patient
    transcript directly without going through full effect prediction.
    Used internally by :func:`predict_germline_aware_effect`; exposed
    publicly for downstream tools (Isovar, Exacto) that want to
    compute a custom analysis on the patient haplotype.

    Behaviour:

    * If ``germline_ctx`` is empty, returns ``None``.
    * If ``somatic_variant`` is provided, restricts germline to the
      somatic's window (per :func:`default_germline_window`); else
      applies all germline variants overlapping any exon of the
      transcript.
    * If germline edits conflict (overlapping cDNA ranges) or land
      outside the CDS, returns ``None`` and the caller falls back.

    The returned object is the same shape that
    :func:`varcode.mutant_transcript.apply_variants_to_transcript`
    produces: a :class:`MutantTranscript` carrying the germline
    edits with ``mutant_protein_sequence`` populated when the edits
    land after the CDS start.
    """
    if not germline_ctx:
        return None
    from .mutant_transcript import apply_variants_to_transcript
    if somatic_variant is not None:
        contig, start, end = default_germline_window(
            somatic_variant, transcript)
        germline_in_window = germline_ctx.variants_in_window(
            contig, start, end)
    else:
        # Whole-transcript window: cover every exon. Useful for
        # callers building a single patient transcript independent of
        # any specific somatic — e.g., to translate the patient
        # protein for cohort-level analysis.
        germline_in_window = []
        try:
            for exon in transcript.exons:
                germline_in_window.extend(
                    germline_ctx.variants_in_window(
                        exon.contig, exon.start, exon.end))
        except Exception:
            return None
    if not germline_in_window:
        return None
    return apply_variants_to_transcript(
        list(germline_in_window), transcript)

varcode.enumerate_phase_hypotheses

varcode.enumerate_phase_hypotheses(somatic_variant, germline_in_window, phase_resolver=None, max_hypotheses: int = 8) -> Tuple[PhaseHypothesis, ...]

Enumerate plausible phase configurations of somatic_variant relative to germline_in_window.

Three regimes:

  • Hemizygous chromosome (chrX/Y/M, male X) — single haplotype; all germline-in-window is implicitly cis. One hypothesis.
  • Resolver answers for every pair (phase_resolver.in_cis returns True/False for each (somatic, germline_v)) — a single deterministic hypothesis with cis/trans assigned per the resolver. phase_state="phased".
  • Phase unknown — enumerate all 2^n cis/trans assignments across n germline variants. Cap at max_hypotheses; emit a single "unknown" placeholder when the cap is exceeded (consumers see a TooManyHypotheses evidence flag).

The cap is configurable so downstream pipelines that tolerate more hypotheses (long-read with rich phasing, manual analyses) can raise it. Default 8 = up to 3 germline variants in a window fully unphased.
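The cap arithmetic is easy to check in isolation; this sketch counts hypotheses only, without constructing them:

```python
def hypothesis_count(n_germline, max_hypotheses=8):
    # Zero germline variants -> one trivial hypothesis; otherwise 2^n
    # cis/trans assignments, collapsed to one placeholder past the cap.
    if n_germline == 0:
        return 1
    if 2 ** n_germline > max_hypotheses:
        return 1
    return 2 ** n_germline
```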

Source code in varcode/germline.py
def enumerate_phase_hypotheses(
        somatic_variant,
        germline_in_window,
        phase_resolver=None,
        max_hypotheses: int = 8) -> Tuple[PhaseHypothesis, ...]:
    """Enumerate plausible phase configurations of ``somatic_variant``
    relative to ``germline_in_window``.

    Three regimes:

    * **Hemizygous chromosome** (chrX/Y/M, male X) — single haplotype;
      all germline-in-window is implicitly cis. One hypothesis.
    * **Resolver answers for every pair** (``phase_resolver.in_cis``
      returns True/False for each ``(somatic, germline_v)``) — a
      single deterministic hypothesis with cis/trans assigned per
      the resolver. ``phase_state="phased"``.
    * **Phase unknown** — enumerate all 2^n cis/trans assignments
      across n germline variants. Cap at ``max_hypotheses``; emit a
      single ``"unknown"`` placeholder when the cap is exceeded
      (consumers see a ``TooManyHypotheses`` evidence flag).

    The cap is configurable so downstream pipelines that tolerate more
    hypotheses (long-read with rich phasing, manual analyses) can
    raise it. Default 8 = up to 3 germline variants in a window
    fully unphased.
    """
    # Hemizygous: chrX (in males), chrY, chrM. Detection is
    # heuristic since varcode doesn't carry sex info — the X
    # chromosome detection conservatively requires the genome to
    # claim hemizygosity. For v1 we treat chrM (mitochondrial) and
    # chrY as definitely hemizygous; chrX is treated as diploid
    # by default (the female case; the male case undergenerates
    # but doesn't misgenerate).
    # NB: str.lstrip("chr") would strip any of the characters c/h/r
    # rather than the "chr" prefix, so remove the prefix explicitly.
    contig = str(somatic_variant.contig).upper()
    if contig.startswith("CHR"):
        contig = contig[3:]
    if contig in ("M", "MT", "Y"):
        return (PhaseHypothesis(
            cis=tuple(germline_in_window),
            trans=(),
            haplotype="A",
            phase_state="implicit"),)

    # Resolver-based phase: ask the resolver for each germline
    # variant. If the resolver answers for all of them, single
    # deterministic hypothesis.
    if (phase_resolver is not None
            and hasattr(phase_resolver, "in_cis")
            and germline_in_window):
        cis_list = []
        trans_list = []
        all_answered = True
        for g in germline_in_window:
            try:
                answer = phase_resolver.in_cis(somatic_variant, g)
            except Exception:
                all_answered = False
                break
            if answer is True:
                cis_list.append(g)
            elif answer is False:
                trans_list.append(g)
            else:
                all_answered = False
                break
        if all_answered:
            return (PhaseHypothesis(
                cis=tuple(cis_list),
                trans=tuple(trans_list),
                haplotype="A",
                phase_state="phased"),)

    # Phase unknown: enumerate 2^n cis/trans assignments across the
    # n germline variants. Cap to avoid blow-up.
    n = len(germline_in_window)
    if n == 0:
        # No germline in window — single trivial hypothesis (no
        # germline edits). Caller usually short-circuits before
        # reaching this branch, but it's a safe default.
        return (PhaseHypothesis(
            cis=(),
            trans=(),
            haplotype="A",
            phase_state="phased"),)

    if 2 ** n > max_hypotheses:
        # Bail out cleanly — emit a single "too many" hypothesis with
        # all germline marked cis (the conservative case where
        # somatic effect is most strongly germline-modified).
        return (PhaseHypothesis(
            cis=tuple(germline_in_window),
            trans=(),
            haplotype="unknown",
            phase_state="too_many_hypotheses"),)

    hypotheses: List[PhaseHypothesis] = []
    germline_tuple = tuple(germline_in_window)
    for mask in range(2 ** n):
        cis = []
        trans = []
        for i, g in enumerate(germline_tuple):
            if mask & (1 << i):
                cis.append(g)
            else:
                trans.append(g)
        # Label haplotype "A" for the all-cis case, "B" for all-trans,
        # "mixed" otherwise. These are opaque tags consumers use for
        # cross-axis matching with RNA evidence.
        if not trans:
            hap = "A"
        elif not cis:
            hap = "B"
        else:
            hap = "A_mixed_%d" % mask
        hypotheses.append(PhaseHypothesis(
            cis=tuple(cis),
            trans=tuple(trans),
            haplotype=hap,
            phase_state="unknown"))
    return tuple(hypotheses)

varcode.detect_loh

varcode.detect_loh(somatic_variant, germline_in_window) -> bool

True when somatic_variant is identical at (position, alt) to a germline variant in the window.

LOH is the most common "looks somatic but isn't really" case — the patient was germline het at this position, and the tumor lost the reference allele, so the variant call says "alt" in tumor and "het" in normal but the alt itself is the germline allele. We flag the resulting effect with is_loh=True so consumers can distinguish a true somatic mutation from a zygosity change.

Only same-position-and-alt counts. A position where germline and somatic disagree on alt is a different mutation, not LOH.
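A minimal illustration with stand-in variant objects (any object exposing `contig`/`start`/`ref`/`alt` works; coordinates are arbitrary):

```python
from collections import namedtuple

V = namedtuple("V", "contig start ref alt")

def is_loh(somatic, germline_in_window):
    # LOH only when contig, position, ref, and alt all match exactly.
    return any(
        g.contig == somatic.contig and g.start == somatic.start
        and g.ref == somatic.ref and g.alt == somatic.alt
        for g in germline_in_window)

germline = [V("17", 7578406, "C", "T")]   # het germline call
```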

Source code in varcode/germline.py
def detect_loh(somatic_variant, germline_in_window) -> bool:
    """True when ``somatic_variant`` is identical at (position, alt)
    to a germline variant in the window.

    LOH is the most common "looks somatic but isn't really" case —
    the patient was germline het at this position, and the tumor lost
    the reference allele, so the variant call says "alt" in tumor and
    "het" in normal but the alt itself is the germline allele. We
    flag the resulting effect with ``is_loh=True`` so consumers can
    distinguish a true somatic mutation from a zygosity change.

    Only same-position-and-alt counts. A position where germline and
    somatic disagree on alt is a different mutation, not LOH.
    """
    for g in germline_in_window:
        if (g.contig == somatic_variant.contig
                and g.start == somatic_variant.start
                and g.ref == somatic_variant.ref
                and g.alt == somatic_variant.alt):
            return True
    return False

varcode.default_germline_window

varcode.default_germline_window(somatic_variant, transcript) -> Tuple[str, int, int]

Default window for looking up germline variants relevant to a somatic variant on a transcript.

Returns (contig, start, end) covering the codon containing the somatic variant — three reference bases on each side of somatic_variant.start. This is the window from #268's table for in-exon coding variants.

Larger windows (splice signal region for splice-adjacent variants, same exon for frameshift candidates) are useful refinements but don't change the API. Callers that need them pass a custom window_fn to :func:predict_germline_aware_effect.

Splice-adjacent: when the somatic is within 6bp of an exon-intron boundary, expand to a 12bp window centered on the boundary so germline edits to the donor / acceptor signal show up in the lookup. This catches the "germline broke the splice site" case without forcing the caller to wire up a separate window function.
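The window arithmetic can be sketched independently of transcript objects. Flank sizes follow the description above; `window` is an illustrative helper, not the library function:

```python
def window(pos, exon_boundaries, flank=3, splice_dist=6):
    # Codon-level window: three bases each side of the somatic position,
    # widened to cover the splice signal when an exon boundary is within
    # splice_dist bp. Positions are clamped to >= 1.
    start, end = pos - flank, pos + flank
    for boundary in exon_boundaries:
        if abs(boundary - pos) <= splice_dist:
            start = min(start, boundary - splice_dist)
            end = max(end, boundary + splice_dist)
    return max(1, start), end
```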

Source code in varcode/germline.py
def default_germline_window(somatic_variant, transcript) -> Tuple[str, int, int]:
    """Default window for looking up germline variants relevant to a
    somatic variant on a transcript.

    Returns ``(contig, start, end)`` covering the codon containing the
    somatic variant — three reference bases on each side of
    ``somatic_variant.start``. This is the window from #268's table
    for in-exon coding variants.

    Larger windows (splice signal region for splice-adjacent variants,
    same exon for frameshift candidates) are useful refinements but
    don't change the API. Callers that need them pass a custom
    ``window_fn`` to :func:`predict_germline_aware_effect`.

    Splice-adjacent: when the somatic is within 6bp of an exon-intron
    boundary, expand to a 12bp window centered on the boundary so
    germline edits to the donor / acceptor signal show up in the
    lookup. This catches the "germline broke the splice site" case
    without forcing the caller to wire up a separate window function.
    """
    contig = somatic_variant.contig
    pos = somatic_variant.start
    # Default: the codon containing the somatic. Three bases on either
    # side — slightly wider than strictly necessary so overlapping
    # frame-aware codon membership is conservative.
    start = pos - 3
    end = (getattr(somatic_variant, "end", None) or pos) + 3
    # Splice-adjacent expansion: if any of this transcript's exon
    # boundaries is within 6bp of the somatic, widen to capture the
    # canonical splice signal region (MAG | GURAGU and YAG | R, ~12bp).
    try:
        for exon in transcript.exons:
            for boundary in (exon.start, exon.end):
                if abs(boundary - pos) <= 6:
                    start = min(start, boundary - 6)
                    end = max(end, boundary + 6)
    except Exception:
        # Hand-built transcripts in tests may not have exons; the
        # default codon window is a fine fallback.
        pass
    return contig, max(1, start), end

VariantCollection transforms

varcode.transforms.pair_breakends

varcode.transforms.pair_breakends(vc)

Merge MATEID-paired BND rows into a single combined StructuralVariant per rearrangement event. (reduces)

For each pair of :class:~varcode.StructuralVariant rows where row A's MATEID references row B's VCF ID (and vice versa), emit one combined row carrying both endpoints. The combined variant's source_variants attribute holds the two originals.

Non-BND variants, single-row TRA, and single-ended BNDs (no MATEID) pass through unchanged with source_variants=().

Pairing rules:

  • Primary key: MATEID field on each variant's info dict against the VCF row ID stored in the collection's source metadata.
  • Alias: PARID (used by older GRIDSS) is treated as MATEID.
  • Symmetric: row A's MATEID must equal row B's ID and row B's MATEID must equal row A's ID. Asymmetric references are warned and left unpaired.
  • If a MATEID points to an ID not present in this collection (filtered out, chunked load), a warning is emitted and the variant passes through unpaired.
  • If three or more rows share a MATEID group, the whole group is left unpaired with a warning (pairing is ambiguous).
  • Already-paired input (source_variants non-empty) passes through unchanged — :func:pair_breakends is idempotent.
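The pairing rules reduce to a symmetry-plus-uniqueness check on the mate-reference graph. A sketch over plain ID dicts (names are illustrative):

```python
def clean_pairs(mate_of):
    """mate_of maps row ID -> claimed MATEID. Return symmetric 1:1 pairs."""
    in_degree = {}
    for mid in mate_of.values():
        in_degree[mid] = in_degree.get(mid, 0) + 1
    pairs = set()
    for rid, mid in mate_of.items():
        if (mate_of.get(mid) == rid              # mate points back at us
                and in_degree.get(rid, 0) == 1   # nothing else points at us
                and in_degree[mid] == 1):        # ...or at the mate
            pairs.add(frozenset((rid, mid)))
    return pairs
```

A third row referencing either half makes the whole group ambiguous, matching the "left unpaired with a warning" rule above.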

Metadata merge for paired rows:

  • Genotype: both halves must agree on the per-sample GT. The combined variant inherits the shared genotype. Disagreement raises :class:ValueError.
  • alt_assembly: if exactly one half carries an assembled sequence, the combined variant inherits it. If both differ, A's wins (deterministic via lex source-ID ordering) with a warning.
  • filter: union of both halves' FILTER tokens; PASS is dropped if any non-PASS label is present (stricter wins).
  • qual: minimum of the two halves' quality scores.
  • Other INFO fields: prefer A's; the other half is reachable via combined.source_variants.
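The FILTER and QUAL rules in a testable sketch (helper names are illustrative):

```python
def merge_filters(filters_a, filters_b):
    # Union of FILTER tokens; PASS survives only when it is the sole label.
    merged = set(filters_a) | set(filters_b)
    if merged != {"PASS"}:
        merged.discard("PASS")
    return sorted(merged)

def merge_qual(qual_a, qual_b):
    # Stricter wins: the combined quality is the minimum of the halves.
    return min(qual_a, qual_b)
```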
PARAMETER DESCRIPTION
vc

Input collection. May contain a mix of structural and non-structural variants.

TYPE: VariantCollection

RETURNS DESCRIPTION
VariantCollection

A new collection. SV rows that were halves of paired BNDs are replaced by combined rows; everything else (including SNVs/indels/MNVs and unpaired SVs) passes through.

Source code in varcode/transforms/__init__.py
def pair_breakends(vc):
    """Merge MATEID-paired BND rows into a single combined
    ``StructuralVariant`` per rearrangement event. (reduces)

    For each pair of :class:`~varcode.StructuralVariant` rows where
    row A's ``MATEID`` references row B's VCF ID (and vice versa),
    emit one combined row carrying both endpoints. The combined
    variant's ``source_variants`` attribute holds the two originals.

    Non-BND variants, single-row TRA, and single-ended BNDs (no
    ``MATEID``) pass through unchanged with ``source_variants=()``.

    Pairing rules:

    * Primary key: ``MATEID`` field on each variant's ``info`` dict
      against the VCF row ID stored in the collection's source
      metadata.
    * Alias: ``PARID`` (used by older GRIDSS) is treated as ``MATEID``.
    * Symmetric: row A's ``MATEID`` must equal row B's ID and row B's
      ``MATEID`` must equal row A's ID. Asymmetric references are
      warned and left unpaired.
    * If a ``MATEID`` points to an ID not present in this collection
      (filtered out, chunked load), a warning is emitted and the
      variant passes through unpaired.
    * If three or more rows share a ``MATEID`` group, the whole group
      is left unpaired with a warning (pairing is ambiguous).
    * Already-paired input (``source_variants`` non-empty) passes
      through unchanged — :func:`pair_breakends` is idempotent.

    Metadata merge for paired rows:

    * Genotype: both halves must agree on the per-sample ``GT``. The
      combined variant inherits the shared genotype. Disagreement
      raises :class:`ValueError`.
    * ``alt_assembly``: if exactly one half carries an assembled
      sequence, the combined variant inherits it. If both differ,
      A's wins (deterministic via lex source-ID ordering) with a
      warning.
    * ``filter``: union of both halves' FILTER tokens; ``PASS`` is
      dropped if any non-PASS label is present (stricter wins).
    * ``qual``: minimum of the two halves' quality scores.
    * Other INFO fields: prefer A's; the other half is reachable
      via ``combined.source_variants``.

    Parameters
    ----------
    vc : VariantCollection
        Input collection. May contain a mix of structural and
        non-structural variants.

    Returns
    -------
    VariantCollection
        A new collection. SV rows that were halves of paired BNDs
        are replaced by combined rows; everything else (including
        SNVs/indels/MNVs and unpaired SVs) passes through.
    """
    variant_to_id, id_to_variants = _build_id_indices(vc)

    # For each BND variant, what does it claim as its mate?
    bnd_to_mateid = {}
    for variant in vc:
        if not _is_breakend(variant):
            continue
        if getattr(variant, "source_variants", ()):
            continue
        mate_id = _mate_reference(variant)
        if mate_id is None:
            continue
        bnd_to_mateid[variant] = mate_id

    # How many rows reference each ID as their mate? An ID with
    # in-degree > 1 indicates ambiguity — a clean pair requires the
    # MATEID pointer to be 1:1.
    mate_in_degree = {}
    for mate_id in bnd_to_mateid.values():
        mate_in_degree[mate_id] = mate_in_degree.get(mate_id, 0) + 1

    # Walk BND variants and resolve each to one of: paired (with a
    # specific mate variant), ambiguous (skip with warning naming the
    # connected component), missing-mate (warn), asymmetric (warn).
    replacement = {}     # original variant -> combined variant
    processed = set()    # variants whose pairing decision is final
    ambiguity_warned = set()   # connected-component ids already warned about

    def _component_ids(seed_id):
        """Connected component of mate references reachable from
        ``seed_id``. Used to produce one warning per ambiguous group
        rather than one per row."""
        visited = set()
        stack = [seed_id]
        while stack:
            node = stack.pop()
            if node in visited:
                continue
            visited.add(node)
            # Outgoing: variants whose ID is `node` -- find their mateids.
            for v in id_to_variants.get(node, ()):
                mid = bnd_to_mateid.get(v)
                if mid is not None and mid not in visited:
                    stack.append(mid)
            # Incoming: any mateid pointing AT `node`.
            for v, mid in bnd_to_mateid.items():
                if mid == node:
                    own = variant_to_id.get(v)
                    if own is not None and own not in visited:
                        stack.append(own)
        return visited

    for variant in vc:
        if not _is_breakend(variant):
            continue
        if variant in processed:
            continue
        if getattr(variant, "source_variants", ()):
            continue
        own_id = variant_to_id.get(variant)
        if own_id is None:
            continue
        mate_id = bnd_to_mateid.get(variant)
        if mate_id is None:
            continue
        if mate_id not in id_to_variants:
            warnings.warn(
                "pair_breakends: BND %r references MATEID/PARID %r "
                "which is not in this collection; left unpaired. "
                "This typically means the mate row was filter-dropped "
                "or split across chunked loads."
                % (own_id, mate_id),
                stacklevel=2)
            processed.add(variant)
            continue
        # Ambiguity: this variant or its claimed mate has incoming
        # mate-degree > 1, meaning at least one other row also points
        # at the same target. Whole connected component is unsafe.
        if (mate_in_degree.get(own_id, 0) > 1
                or mate_in_degree.get(mate_id, 0) > 1):
            component = _component_ids(own_id)
            component_key = frozenset(component)
            if component_key not in ambiguity_warned:
                ambiguity_warned.add(component_key)
                warnings.warn(
                    "pair_breakends: BND ID group %r has ambiguous "
                    "MATEID/PARID references (at least one ID is "
                    "referenced by more than one row); left unpaired. "
                    "Each rearrangement should produce exactly two BND "
                    "rows with 1:1 mate pointers."
                    % sorted(component),
                    stacklevel=2)
            # Mark every variant in the component as processed so we
            # don't re-warn from a different vantage point.
            for cid in component:
                for v in id_to_variants.get(cid, ()):
                    processed.add(v)
            continue
        # Identify the specific mate variant. With in-degree 1 there's
        # exactly one BND row at `mate_id`.
        mate_candidates = [
            v for v in id_to_variants[mate_id]
            if _is_breakend(v)
        ]
        if not mate_candidates:
            warnings.warn(
                "pair_breakends: BND %r references MATEID %r but that "
                "ID is held by a non-BND variant; left unpaired."
                % (own_id, mate_id),
                stacklevel=2)
            processed.add(variant)
            continue
        mate = mate_candidates[0]
        # Symmetric check: mate must point back at us.
        if bnd_to_mateid.get(mate) != own_id:
            warnings.warn(
                "pair_breakends: BND %r mate reference is asymmetric "
                "(this row -> %r but mate -> %r); left unpaired."
                % (own_id, mate_id, bnd_to_mateid.get(mate)),
                stacklevel=2)
            processed.add(variant)
            processed.add(mate)
            continue
        # Clean pair.
        a, b = sorted([variant, mate], key=lambda v: variant_to_id[v])
        a_id = variant_to_id[a]
        b_id = variant_to_id[b]
        combined = _build_combined(a, b, a_id, b_id)
        replacement[a] = combined
        replacement[b] = combined
        processed.add(a)
        processed.add(b)

    # Build output VC. Pass-through variants land first-occurrence; paired
    # variants are replaced by their combined variant the first time either
    # half is encountered, then the second half is skipped to preserve
    # ordering and cardinality.
    out_variants = []
    emitted_combined = set()
    for variant in vc:
        combined = replacement.get(variant)
        if combined is None:
            out_variants.append(variant)
            continue
        if id(combined) in emitted_combined:
            continue
        emitted_combined.add(id(combined))
        out_variants.append(combined)

    # Build output metadata. Pass-through variants reuse the existing
    # metadata entries by reference; combined variants get a freshly
    # merged entry written to every source path that contained either
    # half.
    out_metadata = {path: {} for path in vc.source_to_metadata_dict}
    for path, by_variant in vc.source_to_metadata_dict.items():
        out_by_variant = out_metadata[path]
        for variant, meta in by_variant.items():
            combined = replacement.get(variant)
            if combined is None:
                out_by_variant[variant] = meta
            elif combined not in out_by_variant:
                a, b = combined.source_variants
                a_meta = by_variant.get(a)
                b_meta = by_variant.get(b)
                a_id = variant_to_id[a]
                b_id = variant_to_id[b]
                out_by_variant[combined] = _merge_metadata(
                    a_meta, b_meta, a_id, b_id)

    return VariantCollection(
        variants=out_variants,
        sources=vc.sources,
        source_to_metadata_dict=out_metadata,
    )

varcode.transforms.left_align_indels

varcode.transforms.left_align_indels(vc)

Shift indels to their canonical leftmost equivalent position. (preserves)

Indels in homopolymer or short-tandem-repeat regions can be represented at any of several equivalent positions — CTT->T inside a CT-repeat means the same biological event as CT->_ two positions to the left. Tools that compare variants by (contig, start, ref, alt) see those representations as distinct calls. Left-alignment normalizes to a single canonical representation per indel: the leftmost equivalent position.

The algorithm is the standard variant-normalization left-shift used by bcftools norm and GATK LeftAlignAndTrimVariants, applied as an opt-in VariantCollection -> VariantCollection transform rather than baked into VCF load.

Reference sequence is read via the genome the variants carry — no explicit reference parameter. Coverage tiers (see :mod:varcode.genome_sequence):

  • Chromosome FASTA attached (via :class:varcode.Genome's fasta slot): indels everywhere shift to canonical positions, including in introns and intergenic regions.
  • No FASTA (default pyensembl install): indels fully within an exon shift via the transcript cDNA fallback. Intronic and intergenic indels pass through unchanged. Indels that start exonic but would shift across an exon boundary stop at the boundary and carry info["left_align_partial"] = True.
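The core left-shift is small enough to sketch directly. This uses 0-based string coordinates on a raw sequence; the real transform works on Variant objects against genome sequence, so this is illustrative only:

```python
def left_shift_deletion(seq, pos, length):
    """Leftmost equivalent start for deleting seq[pos:pos+length]."""
    # Shifting left by one is allowed when the base entering the deleted
    # window on the left equals the base leaving it on the right.
    while pos > 0 and seq[pos - 1] == seq[pos + length - 1]:
        pos -= 1
    return pos

# Deleting "CT" at position 3 of GCTCTCTA yields the same sequence as
# deleting "CT" at position 1 — the canonical leftmost representation.
```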
PARAMETER DESCRIPTION
vc

Input collection. May contain a mix of SNVs, MNVs, indels, complex variants, and SVs — only pure indels (length-different REF/ALT with one side empty after suffix trimming) are considered for shifting. Everything else passes through.

TYPE: VariantCollection

RETURNS DESCRIPTION
VariantCollection

A new collection with indels at their canonical leftmost positions. Variants that shifted carry source_variants=(original,); everything else passes through with source_variants=().
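The "pure indel" criterion above (one side empty after trimming shared bases) can be sketched as a standalone predicate; `is_pure_indel` is a hypothetical helper for illustration, not part of the varcode API:

```python
def is_pure_indel(ref, alt):
    """True when REF/ALT differ by a pure insertion or deletion:
    after trimming the shared suffix and the shared (anchor) prefix,
    exactly one allele is empty."""
    while ref and alt and ref[-1] == alt[-1]:   # shared suffix
        ref, alt = ref[:-1], alt[:-1]
    while ref and alt and ref[0] == alt[0]:     # shared prefix / anchor
        ref, alt = ref[1:], alt[1:]
    return (ref == "") != (alt == "")
```

SNVs, MNVs, and complex substitutions leave both sides non-empty after trimming, so they fail the test and pass through the transform untouched.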

See :doc:`/transforms` for the behavior table covering all six (location × FASTA-attached) combinations and the metadata fields the transform writes.

Examples:

>>> from varcode.transforms import left_align_indels
>>> normalized = left_align_indels(vc)
Source code in varcode/transforms/__init__.py
def left_align_indels(vc):
    """Shift indels to their canonical leftmost equivalent position. (preserves)

    Indels in homopolymer or short-tandem-repeat regions can be
    represented at any of several equivalent positions — ``CTT->T``
    inside a ``CT``-repeat means the same biological event as
    ``CT->_`` two positions to the left. Tools that compare variants
    by ``(contig, start, ref, alt)`` see those representations as
    distinct calls. Left-alignment normalizes to a single canonical
    representation per indel: the leftmost equivalent position.

    The algorithm is the standard variant-normalization left-shift
    used by ``bcftools norm`` and GATK ``LeftAlignAndTrimVariants``,
    applied as an opt-in ``VariantCollection -> VariantCollection``
    transform rather than baked into VCF load.

    Reference sequence is read via the genome the variants carry —
    no explicit ``reference`` parameter. Coverage tiers (see
    :mod:`varcode.genome_sequence`):

    * **Chromosome FASTA attached** (via :class:`varcode.Genome`'s
      ``fasta`` slot): indels everywhere shift to canonical
      positions, including in introns and intergenic regions.
    * **No FASTA** (default pyensembl install): indels fully within
      an exon shift via the transcript cDNA fallback. Intronic and
      intergenic indels pass through unchanged. Indels that *start*
      exonic but would shift across an exon boundary stop at the
      boundary and carry ``info["left_align_partial"] = True``.

    Parameters
    ----------
    vc : VariantCollection
        Input collection. May contain a mix of SNVs, MNVs, indels,
        complex variants, and SVs — only pure indels (length-different
        REF/ALT with one side empty after suffix trimming) are
        considered for shifting. Everything else passes through.

    Returns
    -------
    VariantCollection
        A new collection with indels at their canonical leftmost
        positions. Variants that shifted carry
        ``source_variants=(original,)``; everything else passes through
        with ``source_variants=()``.

    See :doc:`/transforms` for the behavior table covering all
    six (location × FASTA-attached) combinations and the metadata
    fields the transform writes.

    Examples
    --------

    >>> from varcode.transforms import left_align_indels
    >>> normalized = left_align_indels(vc)  # doctest: +SKIP
    """
    # original variant -> (shifted variant, bounded_by_coverage flag).
    # The flag rides with the shifted variant so we don't need a
    # second data structure keyed on id() — the lifecycle of the
    # flag and the lifecycle of the shifted variant are identical.
    replacement = {}

    for variant in vc:
        if not variant.is_indel:
            continue
        shifted, partial = _left_align_one(variant, _reference_range)
        if shifted is variant:
            continue
        replacement[variant] = (shifted, partial)

    if not replacement:
        return vc

    out_variants = [
        replacement[v][0] if v in replacement else v
        for v in vc
    ]

    out_metadata = {}
    for path, by_variant in vc.source_to_metadata_dict.items():
        out_by_variant = {}
        for variant, meta in by_variant.items():
            entry = replacement.get(variant)
            if entry is None:
                out_by_variant[variant] = meta
            else:
                shifted, partial = entry
                new_meta = dict(meta) if meta else {}
                info = dict(new_meta.get("info") or {})
                info["original_start"] = variant.start
                if partial:
                    info["left_align_partial"] = True
                new_meta["info"] = info
                out_by_variant[shifted] = new_meta
        out_metadata[path] = out_by_variant

    return VariantCollection(
        variants=out_variants,
        sources=vc.sources,
        source_to_metadata_dict=out_metadata,
    )

File loading

varcode.load_vcf

varcode.vcf.load_vcf(path, genome=None, reference_vcf_key='reference', only_passing=True, allow_extended_nucleotides=False, include_info=True, chunk_size=10 ** 5, max_variants=None, sort_key=variant_ascending_position_sort_key, distinct=True, normalize_contig_names=True, convert_ucsc_contig_names=True, parse_structural_variants=False, genome_fasta=None)

Load reference name and Variant objects from the given VCF filename.

Local files are parsed directly. HTTP/HTTPS URLs are downloaded to a temporary file and load_vcf recurses on the local copy; pandas doesn't reliably stream gzipped HTTP responses, so we materialize first.

PARAMETER DESCRIPTION
path

Path to VCF (.vcf) or compressed VCF (.vcf.gz).

TYPE: str

genome

Optionally pass in a PyEnsembl Genome object, name of reference, or PyEnsembl release version to specify the reference associated with a VCF (otherwise infer reference from VCF using reference_vcf_key)

TYPE: pyensembl.Genome, reference name, or Ensembl version int DEFAULT: None

reference_vcf_key

Name of metadata field which contains path to reference FASTA file (default = 'reference')

TYPE: str DEFAULT: 'reference'

only_passing

If True, any entry whose FILTER field is not one of "." or "PASS" is dropped.

TYPE: bool DEFAULT: True

allow_extended_nucleotides

Allow characters other than A,C,T,G in the ref and alt strings.

TYPE: bool DEFAULT: False

include_info

Whether to parse the INFO and per-sample columns. If you don't need these, set to False for faster parsing.

TYPE: bool DEFAULT: True

chunk_size

Number of records to load in memory at once.

TYPE: int DEFAULT: 10 ** 5

max_variants

If specified, return only the first max_variants variants.

TYPE: int DEFAULT: None

sort_key

Function which maps each element to a sorting criterion. Set to None to leave the variants unsorted.

TYPE: fn DEFAULT: variant_ascending_position_sort_key

distinct

Don't keep repeated variants

TYPE: bool DEFAULT: True

normalize_contig_names

By default contig names will be normalized by converting integers to strings (e.g. 1 -> "1"), and converting any letters after "chr" to uppercase (e.g. "chrx" -> "chrX"). If you don't want this behavior then pass normalize_contig_names=False.

TYPE: bool DEFAULT: True

convert_ucsc_contig_names

Convert chromosome names from hg19 (e.g. "chr1") to equivalent names for GRCh37 (e.g. "1"). Defaults to True. If None, conversion is enabled only when the VCF's genome is a UCSC reference.

TYPE: bool DEFAULT: True

genome_fasta

Optionally attach a chromosome FASTA to the resolved genome before parsing. Equivalent to wrapping with varcode.Genome(genome, fasta=genome_fasta) and passing the wrapper; see :class:varcode.Genome for the accepted types and verification behavior. Without this, features that need raw genomic bases (cryptic-exon scoring, indel left-alignment, sequence-aware splice prediction) fall back to transcript-cDNA coverage only.

TYPE: str, path-like, or FASTA object DEFAULT: None
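The two contig-name parameters describe pure string transforms, which can be sketched independently of varcode. These are hypothetical helpers for illustration; in particular the chrM -> MT special case is an assumption about mitochondrial naming, not something this page confirms:

```python
def normalize_contig_name(contig):
    """Sketch of normalize_contig_names=True: integers become
    strings, and letters after a 'chr' prefix are uppercased."""
    contig = str(contig)                      # 1 -> "1"
    if contig.lower().startswith("chr"):
        contig = "chr" + contig[3:].upper()   # "chrx" -> "chrX"
    return contig

def convert_ucsc_contig_name(contig):
    """Sketch of convert_ucsc_contig_names=True: strip the UCSC
    'chr' prefix ("chr1" -> "1"). chrM -> MT is an assumed mapping."""
    if contig == "chrM":
        return "MT"
    if contig.startswith("chr"):
        return contig[3:]
    return contig

print(normalize_contig_name("chrx"))       # chrX
print(convert_ucsc_contig_name("chr1"))    # 1
```

Normalization runs regardless of the source naming scheme; UCSC conversion only fires when the names (or the inferred genome) are UCSC-style.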

Source code in varcode/vcf.py
def load_vcf(
        path,
        genome=None,
        reference_vcf_key="reference",
        only_passing=True,
        allow_extended_nucleotides=False,
        include_info=True,
        chunk_size=10 ** 5,
        max_variants=None,
        sort_key=variant_ascending_position_sort_key,
        distinct=True,
        normalize_contig_names=True,
        convert_ucsc_contig_names=True,
        parse_structural_variants=False,
        genome_fasta=None):
    """
    Load reference name and Variant objects from the given VCF filename.

    Local files are parsed directly. HTTP/HTTPS URLs are downloaded to a
    temporary file and ``load_vcf`` recurses on the local copy; pandas
    doesn't reliably stream gzipped HTTP responses, so we materialize first.

    Parameters
    ----------

    path : str
        Path to VCF (*.vcf) or compressed VCF (*.vcf.gz).

    genome : {pyensembl.Genome, reference name, Ensembl version int}, optional
        Optionally pass in a PyEnsembl Genome object, name of reference, or
        PyEnsembl release version to specify the reference associated with a
        VCF (otherwise infer reference from VCF using reference_vcf_key)

    reference_vcf_key : str, optional
        Name of metadata field which contains path to reference FASTA
        file (default = 'reference')

    only_passing : bool, optional
        If True, any entry whose FILTER field is not one of "." or "PASS" is
        dropped.

    allow_extended_nucleotides : bool, default False
        Allow characters other than A,C,T,G in the ref and alt strings.

    include_info : bool, default True
        Whether to parse the INFO and per-sample columns. If you don't need
        these, set to False for faster parsing.

    chunk_size : int, optional
        Number of records to load in memory at once.

    max_variants : int, optional
        If specified, return only the first max_variants variants.

    sort_key : fn
        Function which maps each element to a sorting criterion.
        Set to None to leave the variants unsorted.

    distinct : bool, default True
        Don't keep repeated variants

    normalize_contig_names : bool, default True
        By default contig names will be normalized by converting integers
        to strings (e.g. 1 -> "1"), and converting any letters after "chr"
        to uppercase (e.g. "chrx" -> "chrX"). If you don't want
        this behavior then pass normalize_contig_names=False.

    convert_ucsc_contig_names : bool, default True
        Convert chromosome names from hg19 (e.g. "chr1") to equivalent names
        for GRCh37 (e.g. "1"). Defaults to True. If None, conversion is
        enabled only when the VCF's genome is a UCSC reference.

    genome_fasta : str, path-like, or FASTA object, optional
        Optionally attach a chromosome FASTA to the resolved genome
        before parsing. Equivalent to wrapping with
        ``varcode.Genome(genome, fasta=genome_fasta)`` and passing the
        wrapper; see :class:`varcode.Genome` for the accepted types
        and verification behavior. Without this, features that need
        raw genomic bases (cryptic-exon scoring, indel
        left-alignment, sequence-aware splice prediction) fall back
        to transcript-cDNA coverage only.
    """

    require_string(path, "Path or URL to VCF")
    parsed_path = parse_url_or_path(path)

    if parsed_path.scheme and parsed_path.scheme.lower() != "file":
        # pandas.read_table nominally supports HTTP, but it tends to crash on
        # large files and does not support gzip. Switching to the python-based
        # implementation of read_table (with engine="python") helps with some
        # issues but introduces a new set of problems (e.g. the dtype parameter
        # is not accepted). For these reasons, we're currently not attempting
        # to load VCFs over HTTP with pandas directly, and instead download it
        # to a temporary file and open that.

        (filename, headers) = urllib.request.urlretrieve(path)
        try:
            # The downloaded file has no file extension, which confuses pyvcf
            # for gzipped files in Python 3. We rename it to have the correct
            # file extension.
            new_filename = "%s.%s" % (
                filename, parsed_path.path.split(".")[-1])
            os.rename(filename, new_filename)
            filename = new_filename
            return load_vcf(
                filename,
                genome=genome,
                reference_vcf_key=reference_vcf_key,
                only_passing=only_passing,
                allow_extended_nucleotides=allow_extended_nucleotides,
                include_info=include_info,
                genome_fasta=genome_fasta,
                chunk_size=chunk_size,
                max_variants=max_variants,
                sort_key=sort_key,
                distinct=distinct,
                normalize_contig_names=normalize_contig_names,
                convert_ucsc_contig_names=convert_ucsc_contig_names,
                parse_structural_variants=parse_structural_variants)
        finally:
            logger.info("Removing temporary file: %s", filename)
            os.unlink(filename)

    # Loading a local file.
    # The file will be opened twice: first to parse the header, then by
    # pandas to read the data. Normalize away any file:// scheme so the
    # opener (and pandas, below) get a plain filesystem path.
    if parsed_path.scheme and parsed_path.scheme.lower() == "file":
        path = parsed_path.path
    header = VCFHeader.from_path(path)

    ####
    # The following code looks a bit crazy because it's motivated by the
    # desire to preserve UCSC reference names even though the Variant
    # objects we're creating will convert them to EnsemblRelease genomes
    # with different reference names.
    #
    # For example, if a VCF is aligned against 'hg19' then we want to create a
    # variant which has 'hg19' as its genome argument, so that serialization
    # back to VCF will put the correct reference genome in the generated
    # header.
    if genome is None:
        if reference_vcf_key not in header.metadata:
            raise ValueError("Unable to infer reference genome for %s" % (path,))
        genome = header.metadata[reference_vcf_key]

    genome, genome_was_ucsc = infer_genome(genome)
    if genome_was_ucsc:
        genome = ensembl_to_ucsc_reference_names[genome.reference_name]

    if convert_ucsc_contig_names is None:
        convert_ucsc_contig_names = genome_was_ucsc

    if genome_fasta is not None:
        # Wrap the resolved genome in varcode.Genome so the FASTA is
        # part of the genome object rather than a side-channel
        # attachment. Late import keeps the cost on callers who opt in.
        from .genome import Genome as VarcodeGenome
        genome = VarcodeGenome(genome, fasta=genome_fasta)

    df_iterator = read_vcf_into_dataframe(
        path,
        include_info=include_info,
        sample_names=header.samples if include_info else None,
        chunk_size=chunk_size)

    if include_info:
        sample_info_parser = header.parse_samples
    else:
        sample_info_parser = None

    variant_kwargs = {
        'genome': genome,
        'allow_extended_nucleotides': allow_extended_nucleotides,
        'normalize_contig_names': normalize_contig_names,
        'convert_ucsc_contig_names': convert_ucsc_contig_names,
    }

    variant_collection_kwargs = {
        'sort_key': sort_key,
        'distinct': distinct
    }

    # TODO: drop chrMT variants from hg19 and warn user about it

    return dataframes_to_variant_collection(
        df_iterator,
        source_path=path,
        info_parser=header.parse_info if include_info else None,
        only_passing=only_passing,
        max_variants=max_variants,
        sample_names=header.samples if include_info else None,
        sample_info_parser=sample_info_parser,
        variant_kwargs=variant_kwargs,
        variant_collection_kwargs=variant_collection_kwargs,
        parse_structural_variants=parse_structural_variants)

varcode.load_maf

varcode.load_maf(path, optional_cols=[], sort_key=variant_ascending_position_sort_key, distinct=True, raise_on_error=True, encoding=None, nrows=None)

Load reference name and Variant objects from MAF filename.

PARAMETER DESCRIPTION
path

Path to MAF (*.maf).

TYPE: str

optional_cols

A list of MAF columns to include as metadata if they are present in the MAF. Does not result in an error if those columns are not present.

TYPE: list DEFAULT: []

sort_key

Function which maps each element to a sorting criterion. Set to None to leave the variants unsorted.

TYPE: fn DEFAULT: variant_ascending_position_sort_key

distinct

Don't keep repeated variants

TYPE: bool DEFAULT: True

raise_on_error

Raise an exception upon encountering an error or just log a warning.

TYPE: bool DEFAULT: True

encoding

Encoding to use for UTF when reading MAF file.

TYPE: str DEFAULT: None

nrows

Limit on the number of rows loaded.

TYPE: int DEFAULT: None

Source code in varcode/maf.py
def load_maf(
        path,
        optional_cols=[],
        sort_key=variant_ascending_position_sort_key,
        distinct=True,
        raise_on_error=True,
        encoding=None,
        nrows=None):
    """
    Load reference name and Variant objects from MAF filename.

    Parameters
    ----------

    path : str
        Path to MAF (*.maf).

    optional_cols : list, optional
        A list of MAF columns to include as metadata if they are present in the MAF.
        Does not result in an error if those columns are not present.

    sort_key : fn
        Function which maps each element to a sorting criterion.
        Set to None to leave the variants unsorted.

    distinct : bool
        Don't keep repeated variants

    raise_on_error : bool
        Raise an exception upon encountering an error or just log a warning.

    encoding : str, optional
        Encoding to use for UTF when reading MAF file.

    nrows : int, optional
        Limit to number of rows loaded
    """
    # pylint: disable=no-member
    # pylint gets confused by read_csv inside load_maf_dataframe
    maf_df = load_maf_dataframe(
        path,
        nrows=nrows,
        raise_on_error=raise_on_error,
        encoding=encoding)

    if len(maf_df) == 0 and raise_on_error:
        raise ValueError("Empty MAF file %s" % path)

    ensembl_objects = {}
    variants = []
    metadata = {}
    for _, x in maf_df.iterrows():
        contig = x.Chromosome
        if isnull(contig):
            error_message = "Invalid contig name: %s" % (contig,)
            if raise_on_error:
                raise ValueError(error_message)
            else:
                logging.warning(error_message)
                continue

        start_pos = x.Start_Position
        ref = x.Reference_Allele

        # it's possible in a MAF file to have multiple Ensembl releases
        # mixed in a single MAF file (the genome assembly is
        # specified by the NCBI_Build column)
        ncbi_build = x.NCBI_Build
        if ncbi_build in ensembl_objects:
            genome = ensembl_objects[ncbi_build]
        else:
            if isinstance(ncbi_build, int):
                reference_name = "B%d" % ncbi_build
            else:
                reference_name = str(ncbi_build)
            genome, _ = infer_genome(reference_name)
            ensembl_objects[ncbi_build] = genome

        # have to try both Tumor_Seq_Allele1 and Tumor_Seq_Allele2
        # to figure out which is different from the reference allele
        if x.Tumor_Seq_Allele1 != ref:
            alt = x.Tumor_Seq_Allele1
        else:
            if x.Tumor_Seq_Allele2 == ref:
                error_message = (
                    "Both tumor alleles agree with reference %s: %s" % (
                        ref, x,))
                if raise_on_error:
                    raise ValueError(error_message)
                else:
                    logging.warning(error_message)
                    continue
            alt = x.Tumor_Seq_Allele2

        variant = Variant(
            contig,
            start_pos,
            str(ref),
            str(alt),
            genome)

        # keep metadata about the variant and its TCGA annotation
        metadata[variant] = {
            'Hugo_Symbol': x.Hugo_Symbol,
            'Center': x.Center,
            'Strand': x.Strand,
            'Variant_Classification': x.Variant_Classification,
            'Variant_Type': x.Variant_Type,
            'dbSNP_RS': x.dbSNP_RS,
            'dbSNP_Val_Status': x.dbSNP_Val_Status,
            'Tumor_Sample_Barcode': x.Tumor_Sample_Barcode,
            'Matched_Norm_Sample_Barcode': x.Matched_Norm_Sample_Barcode,
        }
        for optional_col in optional_cols:
            if optional_col in x:
                metadata[variant][optional_col] = x[optional_col]

        variants.append(variant)

    return VariantCollection(
        variants=variants,
        source_to_metadata_dict={path: metadata},
        sort_key=sort_key,
        distinct=distinct)

varcode.load_maf_dataframe

varcode.load_maf_dataframe(path, nrows=None, raise_on_error=True, encoding=None)

Load the guaranteed columns of a TCGA MAF file into a DataFrame

PARAMETER DESCRIPTION
path

Path to MAF file

TYPE: str

nrows

Optional limit on the number of rows loaded.

TYPE: int DEFAULT: None

raise_on_error

Raise an exception upon encountering an error or log an error

TYPE: bool DEFAULT: True

encoding

Encoding to use for UTF when reading MAF file.

TYPE: str DEFAULT: None

Source code in varcode/maf.py
def load_maf_dataframe(path, nrows=None, raise_on_error=True, encoding=None):
    """
    Load the guaranteed columns of a TCGA MAF file into a DataFrame

    Parameters
    ----------
    path : str
        Path to MAF file

    nrows : int
        Optional limit to number of rows loaded

    raise_on_error : bool
        Raise an exception upon encountering an error or log an error

    encoding : str, optional
        Encoding to use for UTF when reading MAF file.
    """
    require_string(path, "Path to MAF")

    n_basic_columns = len(MAF_COLUMN_NAMES)

    # pylint: disable=no-member
    # pylint gets confused by read_csv
    df = pandas.read_csv(
        path,
        comment="#",
        sep="\t",
        low_memory=False,
        skip_blank_lines=True,
        header=0,
        nrows=nrows,
        encoding=encoding)

    if len(df.columns) < n_basic_columns:
        error_message = (
            "Too few columns in MAF file %s, expected %d but got %d: %s" % (
                path, n_basic_columns, len(df.columns), df.columns))
        if raise_on_error:
            raise ValueError(error_message)
        else:
            logging.warning(error_message)

    # check each pair of expected/actual column names to make sure they match
    for expected, actual in zip(MAF_COLUMN_NAMES, df.columns):
        if expected != actual:
            # MAFs in the wild have capitalization differences in their
            # column names, normalize them to always use the names above
            if expected.lower() == actual.lower():
                # using DataFrame.rename in Python 2.7.x doesn't seem to
                # work for some files, possibly because Pandas treats
                # unicode vs. str columns as different?
                df[expected] = df[actual]
                del df[actual]
            else:
                error_message = (
                    "Expected column %s but got %s" % (expected, actual))
                if raise_on_error:
                    raise ValueError(error_message)
                else:
                    logging.warning(error_message)

    return df

Exceptions

varcode.ReferenceMismatchError

varcode.ReferenceMismatchError(variant, transcript, expected_ref, observed_ref, transcript_offset=None, genome_start=None, genome_end=None)

Bases: ValueError

Raised when a variant's reported ref allele does not match the reference genome at the variant's position.

This most often means one of:

  • The variant was called against a different reference build than the one being used for annotation (e.g. GRCh37 vs GRCh38).
  • The variant's ref field was populated with the patient's germline allele rather than the canonical reference. VCF requires the ref field to match the reference genome; germline variants at the same position should be encoded as separate variants.
  • Strand confusion: the variant is specified on the negative strand but varcode expects positive-strand coordinates.

Callers who would rather continue past this error can pass raise_on_error=False to :meth:Variant.effects to receive Failure effects instead.
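At its core the check is a substring comparison between the reported REF allele and the reference bases at the variant's position. A minimal sketch, assuming a 1-based start and a plain-string reference (hypothetical helper, raising a bare ValueError rather than ReferenceMismatchError):

```python
def check_reference_allele(seq, start, ref):
    # Compare the variant's reported REF allele against the reference
    # bases at its 1-based start position. A mismatch typically means
    # the variant was called against a different genome build.
    observed = seq[start - 1 : start - 1 + len(ref)]
    if observed != ref:
        raise ValueError(
            "variant reports ref=%r but the reference genome "
            "has %r at this position" % (ref, observed))

check_reference_allele("GATTACA", 2, "ATT")  # positions 2-4 match: no error
```

A mismatch on a handful of variants suggests bad calls; a mismatch on most variants suggests the wrong build was supplied.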

Source code in varcode/errors.py
def __init__(self, variant, transcript, expected_ref, observed_ref,
             transcript_offset=None, genome_start=None, genome_end=None):
    self.variant = variant
    self.transcript = transcript
    self.expected_ref = expected_ref
    self.observed_ref = observed_ref
    self.transcript_offset = transcript_offset
    self.genome_start = genome_start
    self.genome_end = genome_end

    location = ""
    if transcript_offset is not None:
        location = " at transcript offset %d" % transcript_offset
    if genome_start is not None and genome_end is not None:
        location += " (chromosome positions %d:%d)" % (
            genome_start, genome_end)

    message = (
        "Reference allele mismatch for %s on %s%s: variant reports "
        "ref=%r but the reference genome has %r at this position.\n"
        "This usually means the variant was called against a "
        "different genome build, the ref field was filled in with "
        "the patient's germline allele rather than the reference, "
        "or the variant is on the wrong strand. Pass "
        "raise_on_error=False to .effects() to receive a Failure "
        "effect instead of raising." % (
            variant, transcript, location, observed_ref, expected_ref)
    )
    super().__init__(message)

varcode.SampleNotFoundError

varcode.SampleNotFoundError

Bases: KeyError

Raised when genotype info is requested for a sample that isn't present in the VariantCollection's source VCF(s).

varcode.GenomeBuildMismatchError

varcode.GenomeBuildMismatchError(somatic_reference, germline_reference)

Bases: ValueError

Raised when a germline VCF and a somatic VCF were called against different reference genome builds (e.g. GRCh37 vs GRCh38). Effect coordinates from the two VCFs cannot be meaningfully composed.

Subclasses :class:ValueError so callers that already catch ValueError for ReferenceMismatchError continue to work. Set validate_reference=False on the call site if the user has explicitly lifted over one VCF into the other build and knows what they're doing.

Source code in varcode/germline.py
def __init__(self, somatic_reference, germline_reference):
    self.somatic_reference = somatic_reference
    self.germline_reference = germline_reference
    super().__init__(
        "Genome build mismatch: somatic VCF uses %r, germline "
        "context uses %r. Effect coordinates from the two cannot "
        "be composed without lift-over. If you've already lifted "
        "over one VCF into the other build, pass "
        "validate_reference=False to skip this check." % (
            somatic_reference, germline_reference))

varcode.UnsupportedVariantError

varcode.UnsupportedVariantError

Bases: ValueError

Raised when an :class:EffectAnnotator is asked to handle a variant kind outside its declared supports set.

Prefer this over silent mis-annotation — the whole point of the pluggable-annotator design is that callers can see exactly which annotator handles which variant kinds.