Expand source code
from typing import List
import numpy as np
from ...model import Model
from ...mutation import TreeMutation, GrowMutation, PruneMutation
from ...node import LeafNode, TreeNode
from ...samplers.treemutation import TreeMutationLikelihoodRatio
from ...sigma import Sigma
from ...tree import Tree
def log_grow_ratio(combined_node: LeafNode, left_node: LeafNode, right_node: LeafNode, sigma: Sigma, sigma_mu: float):
# deviation: https: // www.cs.ubc.ca / ~murphyk / Papers / bayesGauss.pdf
var = np.power(sigma.current_value(), 2)
var_mu = np.power(sigma_mu, 2)
n = combined_node.data.X.n_obsv
n_l = left_node.data.X.n_obsv
n_r = right_node.data.X.n_obsv
# first_term = (var * (var + n * sigma_mu)) / ((var + n_l * var_mu) * (var + n_r * var_mu))
# first_term = np.log(np.sqrt(first_term))
combined_y_sum = combined_node.data.y.summed_y()
left_y_sum = left_node.data.y.summed_y()
right_y_sum = right_node.data.y.summed_y()
left_resp_contribution = np.square(left_y_sum) / (var + n_l * sigma_mu)
right_resp_contribution = np.square(right_y_sum) / (var + n_r * sigma_mu)
combined_resp_contribution = np.square(combined_y_sum) / (var + n * sigma_mu)
# resp_contribution = left_resp_contribution + right_resp_contribution - combined_resp_contribution
numerator_first = np.log(np.sqrt(((var + n_l * var_mu) * (var + n_r * var_mu))))
numerator_second = (var_mu / (2 * var)) * (left_resp_contribution + right_resp_contribution)
big_model_l = numerator_first + numerator_second
denominator_first = np.log(np.sqrt((var * (var + n * var_mu))))
denominator_second = (var_mu / (2 * var)) * combined_resp_contribution
small_model_l = denominator_first + denominator_second
return big_model_l, small_model_l
# return first_term + ((var_mu / (2 * var)) * resp_contribution)
class UniformTreeMutationLikelihoodRatio(TreeMutationLikelihoodRatio):
def __init__(self,
prob_method: List[float] = None):
if prob_method is None:
prob_method = [0.5, 0.5]
self.prob_method = prob_method
def log_transition_ratio(self, tree: Tree, mutation: TreeMutation):
if mutation.kind == "prune":
mutation: PruneMutation = mutation
return self.log_prune_transition_ratio(tree, mutation)
if mutation.kind == "grow":
mutation: GrowMutation = mutation
return self.log_grow_transition_ratio(tree, mutation)
else:
raise NotImplementedError("kind {} not supported".format(mutation.kind))
def log_tree_ratio(self, model: Model, tree: Tree, mutation: TreeMutation):
if mutation.kind == "grow":
mutation: GrowMutation = mutation
return self.log_tree_ratio_grow(model, tree, mutation)
if mutation.kind == "prune":
mutation: PruneMutation = mutation
return self.log_tree_ratio_prune(model, mutation)
def log_likelihood_ratio(self, model: Model, tree: Tree, proposal: TreeMutation):
if proposal.kind == "grow":
proposal: GrowMutation = proposal
return self.log_likelihood_ratio_grow(model, proposal)
if proposal.kind == "prune":
proposal: PruneMutation = proposal
return self.log_likelihood_ratio_prune(model, proposal)
else:
raise NotImplementedError("Only prune and grow mutations supported")
@staticmethod
def log_likelihood_ratio_grow(model: Model, proposal: TreeMutation):
new_model_l, old_model_l = log_grow_ratio(proposal.existing_node, proposal.updated_node.left_child,
proposal.updated_node.right_child, model.sigma, model.sigma_m)
return (new_model_l - old_model_l), (new_model_l, old_model_l)
@staticmethod
def log_likelihood_ratio_prune(model: Model, proposal: TreeMutation):
old_model_l, new_model_l = log_grow_ratio(proposal.updated_node, proposal.existing_node.left_child,
proposal.existing_node.right_child, model.sigma, model.sigma_m)
return (new_model_l - old_model_l), (new_model_l, old_model_l)
def log_grow_transition_ratio(self, tree: Tree, mutation: GrowMutation):
prob_prune_selected = - np.log(n_prunable_decision_nodes(tree) + 1)
prob_grow_selected = log_probability_split_within_tree(tree, mutation)
prob_selection_ratio = prob_prune_selected - prob_grow_selected
prune_grow_ratio = np.log(self.prob_method[1] / self.prob_method[0])
numerator = prob_prune_selected
denominator = prob_grow_selected
# return prune_grow_ratio + prob_selection_ratio
return numerator - denominator, (numerator, denominator)
def log_prune_transition_ratio(self, tree: Tree, mutation: PruneMutation):
if n_splittable_leaf_nodes(tree) == 1:
prob_grow_node_selected = - np.inf # Infinitely unlikely to be able to prune a null tree
else:
prob_grow_node_selected = - np.log(n_splittable_leaf_nodes(tree) - 1)
prob_split = log_probability_split_within_node(GrowMutation(mutation.updated_node, mutation.existing_node))
prob_grow_selected = prob_grow_node_selected + prob_split
prob_prune_selected = - np.log(n_prunable_decision_nodes(tree))
prob_selection_ratio = prob_grow_selected - prob_prune_selected
grow_prune_ratio = np.log(self.prob_method[0] / self.prob_method[1])
numerator = prob_grow_selected
denominator = prob_prune_selected
# return grow_prune_ratio + prob_selection_ratio
return numerator - denominator, (numerator, denominator)
@staticmethod
def log_tree_ratio_grow(model: Model, tree: Tree, proposal: GrowMutation):
denominator = log_probability_node_not_split(model, proposal.existing_node)
prob_left_not_split = log_probability_node_not_split(model, proposal.updated_node.left_child)
prob_right_not_split = log_probability_node_not_split(model, proposal.updated_node.right_child)
prob_updated_node_split = log_probability_node_split(model, proposal.updated_node)
prob_chosen_split = log_probability_split_within_tree(tree, proposal)
numerator = prob_left_not_split + prob_right_not_split + prob_updated_node_split + prob_chosen_split
return numerator - denominator, (numerator, denominator)
@staticmethod
def log_tree_ratio_prune(model: Model, proposal: PruneMutation):
numerator = log_probability_node_not_split(model, proposal.updated_node)
prob_left_not_split = log_probability_node_not_split(model, proposal.existing_node.left_child)
prob_right_not_split = log_probability_node_not_split(model, proposal.existing_node.left_child)
prob_updated_node_split = log_probability_node_split(model, proposal.existing_node)
prob_chosen_split = log_probability_split_within_node(
GrowMutation(proposal.updated_node, proposal.existing_node))
denominator = prob_left_not_split + prob_right_not_split + prob_updated_node_split + prob_chosen_split
return numerator - denominator, (numerator, denominator)
def n_prunable_decision_nodes(tree: Tree) -> int:
"""
The number of prunable decision nodes
i.e. how many decision nodes have two leaf children
"""
return len(tree.prunable_decision_nodes)
def n_splittable_leaf_nodes(tree: Tree) -> int:
"""
The number of splittable leaf nodes
i.e. how many leaf nodes have more than one distinct values in their covariate matrix
"""
return len(tree.splittable_leaf_nodes)
def log_probability_split_within_tree(tree: Tree, mutation: GrowMutation) -> float:
"""
The log probability of the particular grow mutation being selected conditional on growing a given tree
i.e.
log(P(mutation | node)P(node| tree)
"""
prob_node_chosen_to_split_on = - np.log(n_splittable_leaf_nodes(tree))
prob_split_chosen = log_probability_split_within_node(mutation)
return prob_node_chosen_to_split_on + prob_split_chosen
def log_probability_split_within_node(mutation: GrowMutation) -> float:
"""
The log probability of the particular grow mutation being selected conditional on growing a given node
i.e.
log(P(splitting_value | splitting_variable, node, grow) * P(splitting_variable | node, grow))
"""
prob_splitting_variable_selected = - np.log(mutation.existing_node.data.X.n_splittable_variables)
splitting_variable = mutation.updated_node.most_recent_split_condition().splitting_variable
splitting_value = mutation.updated_node.most_recent_split_condition().splitting_value
prob_value_selected_within_variable = np.log(
mutation.existing_node.data.X.proportion_of_value_in_variable(splitting_variable, splitting_value))
return prob_splitting_variable_selected + prob_value_selected_within_variable
def log_probability_node_split(model: Model, node: TreeNode):
return np.log(model.alpha * np.power(1 + node.depth, -model.beta))
def log_probability_node_not_split(model: Model, node: TreeNode):
return np.log(1. - model.alpha * np.power(1 + node.depth, -model.beta))
Functions
def log_grow_ratio(combined_node: LeafNode, left_node: LeafNode, right_node: LeafNode, sigma: Sigma, sigma_mu: float)
-
Expand source code
def log_grow_ratio(combined_node: LeafNode, left_node: LeafNode, right_node: LeafNode, sigma: Sigma, sigma_mu: float): # deviation: https: // www.cs.ubc.ca / ~murphyk / Papers / bayesGauss.pdf var = np.power(sigma.current_value(), 2) var_mu = np.power(sigma_mu, 2) n = combined_node.data.X.n_obsv n_l = left_node.data.X.n_obsv n_r = right_node.data.X.n_obsv # first_term = (var * (var + n * sigma_mu)) / ((var + n_l * var_mu) * (var + n_r * var_mu)) # first_term = np.log(np.sqrt(first_term)) combined_y_sum = combined_node.data.y.summed_y() left_y_sum = left_node.data.y.summed_y() right_y_sum = right_node.data.y.summed_y() left_resp_contribution = np.square(left_y_sum) / (var + n_l * sigma_mu) right_resp_contribution = np.square(right_y_sum) / (var + n_r * sigma_mu) combined_resp_contribution = np.square(combined_y_sum) / (var + n * sigma_mu) # resp_contribution = left_resp_contribution + right_resp_contribution - combined_resp_contribution numerator_first = np.log(np.sqrt(((var + n_l * var_mu) * (var + n_r * var_mu)))) numerator_second = (var_mu / (2 * var)) * (left_resp_contribution + right_resp_contribution) big_model_l = numerator_first + numerator_second denominator_first = np.log(np.sqrt((var * (var + n * var_mu)))) denominator_second = (var_mu / (2 * var)) * combined_resp_contribution small_model_l = denominator_first + denominator_second return big_model_l, small_model_l # return first_term + ((var_mu / (2 * var)) * resp_contribution)
def log_probability_node_not_split(model: Model, node: TreeNode)
-
Expand source code
def log_probability_node_not_split(model: Model, node: TreeNode): return np.log(1. - model.alpha * np.power(1 + node.depth, -model.beta))
def log_probability_node_split(model: Model, node: TreeNode)
-
Expand source code
def log_probability_node_split(model: Model, node: TreeNode): return np.log(model.alpha * np.power(1 + node.depth, -model.beta))
def log_probability_split_within_node(mutation: GrowMutation) ‑> float
-
The log probability of the particular grow mutation being selected conditional on growing a given node
i.e. log(P(splitting_value | splitting_variable, node, grow) * P(splitting_variable | node, grow))
Expand source code
def log_probability_split_within_node(mutation: GrowMutation) -> float: """ The log probability of the particular grow mutation being selected conditional on growing a given node i.e. log(P(splitting_value | splitting_variable, node, grow) * P(splitting_variable | node, grow)) """ prob_splitting_variable_selected = - np.log(mutation.existing_node.data.X.n_splittable_variables) splitting_variable = mutation.updated_node.most_recent_split_condition().splitting_variable splitting_value = mutation.updated_node.most_recent_split_condition().splitting_value prob_value_selected_within_variable = np.log( mutation.existing_node.data.X.proportion_of_value_in_variable(splitting_variable, splitting_value)) return prob_splitting_variable_selected + prob_value_selected_within_variable
def log_probability_split_within_tree(tree: Tree, mutation: GrowMutation) ‑> float
-
The log probability of the particular grow mutation being selected conditional on growing a given tree i.e. log(P(mutation | node)P(node| tree)
Expand source code
def log_probability_split_within_tree(tree: Tree, mutation: GrowMutation) -> float: """ The log probability of the particular grow mutation being selected conditional on growing a given tree i.e. log(P(mutation | node)P(node| tree) """ prob_node_chosen_to_split_on = - np.log(n_splittable_leaf_nodes(tree)) prob_split_chosen = log_probability_split_within_node(mutation) return prob_node_chosen_to_split_on + prob_split_chosen
def n_prunable_decision_nodes(tree: Tree) ‑> int
-
The number of prunable decision nodes i.e. how many decision nodes have two leaf children
Expand source code
def n_prunable_decision_nodes(tree: Tree) -> int: """ The number of prunable decision nodes i.e. how many decision nodes have two leaf children """ return len(tree.prunable_decision_nodes)
def n_splittable_leaf_nodes(tree: Tree) ‑> int
-
The number of splittable leaf nodes i.e. how many leaf nodes have more than one distinct values in their covariate matrix
Expand source code
def n_splittable_leaf_nodes(tree: Tree) -> int: """ The number of splittable leaf nodes i.e. how many leaf nodes have more than one distinct values in their covariate matrix """ return len(tree.splittable_leaf_nodes)
Classes
class UniformTreeMutationLikelihoodRatio (prob_method: List[float] = None)
-
Responsible for evaluating the ratio of mutations to the reverse movement
Expand source code
class UniformTreeMutationLikelihoodRatio(TreeMutationLikelihoodRatio): def __init__(self, prob_method: List[float] = None): if prob_method is None: prob_method = [0.5, 0.5] self.prob_method = prob_method def log_transition_ratio(self, tree: Tree, mutation: TreeMutation): if mutation.kind == "prune": mutation: PruneMutation = mutation return self.log_prune_transition_ratio(tree, mutation) if mutation.kind == "grow": mutation: GrowMutation = mutation return self.log_grow_transition_ratio(tree, mutation) else: raise NotImplementedError("kind {} not supported".format(mutation.kind)) def log_tree_ratio(self, model: Model, tree: Tree, mutation: TreeMutation): if mutation.kind == "grow": mutation: GrowMutation = mutation return self.log_tree_ratio_grow(model, tree, mutation) if mutation.kind == "prune": mutation: PruneMutation = mutation return self.log_tree_ratio_prune(model, mutation) def log_likelihood_ratio(self, model: Model, tree: Tree, proposal: TreeMutation): if proposal.kind == "grow": proposal: GrowMutation = proposal return self.log_likelihood_ratio_grow(model, proposal) if proposal.kind == "prune": proposal: PruneMutation = proposal return self.log_likelihood_ratio_prune(model, proposal) else: raise NotImplementedError("Only prune and grow mutations supported") @staticmethod def log_likelihood_ratio_grow(model: Model, proposal: TreeMutation): new_model_l, old_model_l = log_grow_ratio(proposal.existing_node, proposal.updated_node.left_child, proposal.updated_node.right_child, model.sigma, model.sigma_m) return (new_model_l - old_model_l), (new_model_l, old_model_l) @staticmethod def log_likelihood_ratio_prune(model: Model, proposal: TreeMutation): old_model_l, new_model_l = log_grow_ratio(proposal.updated_node, proposal.existing_node.left_child, proposal.existing_node.right_child, model.sigma, model.sigma_m) return (new_model_l - old_model_l), (new_model_l, old_model_l) def log_grow_transition_ratio(self, tree: Tree, mutation: GrowMutation): prob_prune_selected = - np.log(n_prunable_decision_nodes(tree) + 1) prob_grow_selected = log_probability_split_within_tree(tree, mutation) prob_selection_ratio = prob_prune_selected - prob_grow_selected prune_grow_ratio = np.log(self.prob_method[1] / self.prob_method[0]) numerator = prob_prune_selected denominator = prob_grow_selected # return prune_grow_ratio + prob_selection_ratio return numerator - denominator, (numerator, denominator) def log_prune_transition_ratio(self, tree: Tree, mutation: PruneMutation): if n_splittable_leaf_nodes(tree) == 1: prob_grow_node_selected = - np.inf # Infinitely unlikely to be able to prune a null tree else: prob_grow_node_selected = - np.log(n_splittable_leaf_nodes(tree) - 1) prob_split = log_probability_split_within_node(GrowMutation(mutation.updated_node, mutation.existing_node)) prob_grow_selected = prob_grow_node_selected + prob_split prob_prune_selected = - np.log(n_prunable_decision_nodes(tree)) prob_selection_ratio = prob_grow_selected - prob_prune_selected grow_prune_ratio = np.log(self.prob_method[0] / self.prob_method[1]) numerator = prob_grow_selected denominator = prob_prune_selected # return grow_prune_ratio + prob_selection_ratio return numerator - denominator, (numerator, denominator) @staticmethod def log_tree_ratio_grow(model: Model, tree: Tree, proposal: GrowMutation): denominator = log_probability_node_not_split(model, proposal.existing_node) prob_left_not_split = log_probability_node_not_split(model, proposal.updated_node.left_child) prob_right_not_split = log_probability_node_not_split(model, proposal.updated_node.right_child) prob_updated_node_split = log_probability_node_split(model, proposal.updated_node) prob_chosen_split = log_probability_split_within_tree(tree, proposal) numerator = prob_left_not_split + prob_right_not_split + prob_updated_node_split + prob_chosen_split return numerator - denominator, (numerator, denominator) @staticmethod def log_tree_ratio_prune(model: Model, proposal: PruneMutation): numerator = log_probability_node_not_split(model, proposal.updated_node) prob_left_not_split = log_probability_node_not_split(model, proposal.existing_node.left_child) prob_right_not_split = log_probability_node_not_split(model, proposal.existing_node.left_child) prob_updated_node_split = log_probability_node_split(model, proposal.existing_node) prob_chosen_split = log_probability_split_within_node( GrowMutation(proposal.updated_node, proposal.existing_node)) denominator = prob_left_not_split + prob_right_not_split + prob_updated_node_split + prob_chosen_split return numerator - denominator, (numerator, denominator)
Ancestors
- TreeMutationLikelihoodRatio
- abc.ABC
Static methods
def log_likelihood_ratio_grow(model: Model, proposal: TreeMutation)
-
Expand source code
@staticmethod def log_likelihood_ratio_grow(model: Model, proposal: TreeMutation): new_model_l, old_model_l = log_grow_ratio(proposal.existing_node, proposal.updated_node.left_child, proposal.updated_node.right_child, model.sigma, model.sigma_m) return (new_model_l - old_model_l), (new_model_l, old_model_l)
def log_likelihood_ratio_prune(model: Model, proposal: TreeMutation)
-
Expand source code
@staticmethod def log_likelihood_ratio_prune(model: Model, proposal: TreeMutation): old_model_l, new_model_l = log_grow_ratio(proposal.updated_node, proposal.existing_node.left_child, proposal.existing_node.right_child, model.sigma, model.sigma_m) return (new_model_l - old_model_l), (new_model_l, old_model_l)
def log_tree_ratio_grow(model: Model, tree: Tree, proposal: GrowMutation)
-
Expand source code
@staticmethod def log_tree_ratio_grow(model: Model, tree: Tree, proposal: GrowMutation): denominator = log_probability_node_not_split(model, proposal.existing_node) prob_left_not_split = log_probability_node_not_split(model, proposal.updated_node.left_child) prob_right_not_split = log_probability_node_not_split(model, proposal.updated_node.right_child) prob_updated_node_split = log_probability_node_split(model, proposal.updated_node) prob_chosen_split = log_probability_split_within_tree(tree, proposal) numerator = prob_left_not_split + prob_right_not_split + prob_updated_node_split + prob_chosen_split return numerator - denominator, (numerator, denominator)
def log_tree_ratio_prune(model: Model, proposal: PruneMutation)
-
Expand source code
@staticmethod def log_tree_ratio_prune(model: Model, proposal: PruneMutation): numerator = log_probability_node_not_split(model, proposal.updated_node) prob_left_not_split = log_probability_node_not_split(model, proposal.existing_node.left_child) prob_right_not_split = log_probability_node_not_split(model, proposal.existing_node.left_child) prob_updated_node_split = log_probability_node_split(model, proposal.existing_node) prob_chosen_split = log_probability_split_within_node( GrowMutation(proposal.updated_node, proposal.existing_node)) denominator = prob_left_not_split + prob_right_not_split + prob_updated_node_split + prob_chosen_split return numerator - denominator, (numerator, denominator)
Methods
def log_grow_transition_ratio(self, tree: Tree, mutation: GrowMutation)
-
Expand source code
def log_grow_transition_ratio(self, tree: Tree, mutation: GrowMutation): prob_prune_selected = - np.log(n_prunable_decision_nodes(tree) + 1) prob_grow_selected = log_probability_split_within_tree(tree, mutation) prob_selection_ratio = prob_prune_selected - prob_grow_selected prune_grow_ratio = np.log(self.prob_method[1] / self.prob_method[0]) numerator = prob_prune_selected denominator = prob_grow_selected # return prune_grow_ratio + prob_selection_ratio return numerator - denominator, (numerator, denominator)
def log_prune_transition_ratio(self, tree: Tree, mutation: PruneMutation)
-
Expand source code
def log_prune_transition_ratio(self, tree: Tree, mutation: PruneMutation): if n_splittable_leaf_nodes(tree) == 1: prob_grow_node_selected = - np.inf # Infinitely unlikely to be able to prune a null tree else: prob_grow_node_selected = - np.log(n_splittable_leaf_nodes(tree) - 1) prob_split = log_probability_split_within_node(GrowMutation(mutation.updated_node, mutation.existing_node)) prob_grow_selected = prob_grow_node_selected + prob_split prob_prune_selected = - np.log(n_prunable_decision_nodes(tree)) prob_selection_ratio = prob_grow_selected - prob_prune_selected grow_prune_ratio = np.log(self.prob_method[0] / self.prob_method[1]) numerator = prob_grow_selected denominator = prob_prune_selected # return grow_prune_ratio + prob_selection_ratio return numerator - denominator, (numerator, denominator)
Inherited members