Homework 3#
References#
Lectures 8-12 (inclusive).
Instructions#
Type your name and email in the “Student details” section below.
Develop the code and generate the figures you need to solve the problems using this notebook.
For the answers that require a mathematical proof or derivation, you should type them using LaTeX. If you have never written LaTeX before and you find it exceedingly difficult, we will likely accept handwritten solutions.
The total homework points are 100. Please note that the problems are not weighted equally.
import matplotlib.pyplot as plt
%matplotlib inline
import matplotlib_inline
matplotlib_inline.backend_inline.set_matplotlib_formats('svg')
import seaborn as sns
sns.set_context("paper")
sns.set_style("ticks")
import scipy.integrate
import numpy as np
import scipy.stats as st
import urllib.request
import os
def download(
    url: str,
    local_filename: str = None
):
    """Download a file from a url.

    Arguments
    url            -- The url we want to download.
    local_filename -- The filename to write to. If not
                      specified, the file is saved under
                      the name of the remote file.
    """
if local_filename is None:
local_filename = os.path.basename(url)
urllib.request.urlretrieve(url, local_filename)
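For example, to fetch a file and save it under its remote name (the URL below is just a placeholder, not a real dataset):
# Hypothetical usage -- replace the placeholder URL with a real one:
# download("https://example.com/some_data.csv")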
Student details#
First Name:
Last Name:
Email:
Problem 1 - Propagating uncertainty through a differential equation#
This is a classic uncertainty propagation problem you must solve using Monte Carlo sampling. Consider the following stochastic harmonic oscillator:
\[
\ddot{y} + 2\zeta\omega(X)\dot{y} + \omega^2(X)y = 0,
\qquad
y(0) = y_0(X),
\qquad
\dot{y}(0) = v_0(X),
\]
where:
\(X = (X_1, X_2, X_3)\),
\(X_i \sim N(0, 1)\),
\(\omega(X) = 2\pi + X_1\),
\(\zeta = 0.01\),
\(y_0(X) = 1+ 0.1 X_2\), and
\(v_0(X) = 0.1 X_3\).
In other words, this stochastic harmonic oscillator has an uncertain natural frequency and uncertain initial conditions.
Our goal is to propagate uncertainty through this dynamical system, i.e., estimate the mean and variance of its solution. A solver for this dynamical system is given below:
class Solver(object):
def __init__(
self,
nt=100,
T= 5
):
"""This is the initializer of the class.
Arguments:
nt -- The number of timesteps.
T -- The final time.
"""
self.nt = nt
self.T = T
# The timesteps on which we will get the solution
self.t = np.linspace(0, T, nt)
# The number of inputs the class accepts
self.num_input = 3
# The number of outputs the class returns
self.num_output = nt
def __call__(self, x):
"""This special class method emulates a function call.
Arguments:
x -- A 1D numpy array with 3 elements.
This represents the stochastic input x = (x1, x2, x3).
Returns the solution to the differential equation evaluated
at discrete timesteps.
"""
# uncertain quantities
x1 = x[0]
x2 = x[1]
x3 = x[2]
# ODE parameters
omega = 2*np.pi + x1
y10 = 1 + 0.1*x2
y20 = 0.1*x3
# initial conditions
y0 = np.array([y10, y20])
        # damping ratio
        zeta = 0.01
        # spring constant
        k = omega**2
        # damping coefficient
        c = 2*zeta*omega
        # coefficient matrix
        C = np.array([[0, 1], [-k, -c]])
        # RHS of the ODE system
def rhs(y, t):
return np.dot(C, y)
y = scipy.integrate.odeint(rhs, y0, self.t)
return y
First, let’s demonstrate how the solver works:
solver = Solver()
x = np.random.randn(solver.num_input)
y = solver(x)
print(y)
[[ 0.86576708 -0.07081763]
 [ 0.78395341 -3.11469418]
 [ 0.56113181 -5.56953151]
 ...
 [-0.39567006  3.53165417]
 [-0.18766199  4.57702565]
 [ 0.05248931  4.78611519]]
Notice the dimensions of y:
y.shape
(100, 2)
The 100 rows correspond to timesteps. The 2 columns correspond to position and velocity.
Let’s plot a few samples:
fig1, ax1 = plt.subplots()
ax1.set_xlabel('$t$ (Time)')
ax1.set_ylabel('$y(t)$ (Position)')
fig2, ax2 = plt.subplots()
ax2.set_xlabel('$t$ (Time)')
ax2.set_ylabel(r'$\dot{y}(t)$ (Velocity)')
for i in range(2):
    x = np.random.randn(solver.num_input)
    y = solver(x)
    ax1.plot(solver.t, y[:, 0], label=f'Sample {i+1:d}')
    ax2.plot(
        solver.t, y[:, 1],
        label=f'Sample {i+1:d}'
    )
ax1.legend(loc="best", frameon=False)
ax2.legend(loc="best", frameon=False)
sns.despine(fig=fig1, trim=True)
sns.despine(fig=fig2, trim=True);
For your convenience, here is code that takes many samples of the solver at once:
def take_samples_from_solver(num_samples):
"""Takes ``num_samples`` from the ODE solver.
Returns them in an array of the form:
``num_samples x 100 x 2``
(100 timesteps, 2 states (position, velocity))
"""
    samples = np.empty((num_samples, 100, 2))
for i in range(num_samples):
samples[i, :, :] = solver(
np.random.randn(solver.num_input)
)
return samples
It works like this:
samples = take_samples_from_solver(50)
print(samples.shape)
(50, 100, 2)
Here, the first dimension corresponds to different samples. Then we have timesteps. And finally, we have either position or velocity.
As an example, the velocity of the 25th sample at the first ten timesteps is:
samples[24, :10, 1]
array([ 0.10089843, -1.40721488, -2.78063974, -3.89754534, -4.659736 ,
-5.0012779 , -4.89419329, -4.35073137, -3.42203351, -2.19333372])
Part A#
Take 100 samples of the solver output and plot the estimated mean position and velocity as a function of time along with a 95% epistemic uncertainty interval around it. This interval captures how sure you are about the mean response when using only 100 Monte Carlo samples. You need to use the central limit theorem to find it (see the lecture notes).
samples = take_samples_from_solver(100)
# Sampled positions are: samples[:, :, 0]
# Sampled velocities are: samples[:, :, 1]
# Sampled position at the 10th timestep is: samples[:, 9, 0]
# etc.
# Your code here
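In case it helps you get started, here is a minimal sketch of one possible approach for the position (the velocity is analogous). The variable names are our own, and the 1.96 factor is the standard normal 97.5% quantile used by the CLT interval:
# A sketch, not the definitive solution -- estimate the mean position
# and a CLT-based 95% epistemic uncertainty interval around it.
num_samples = samples.shape[0]
mean_pos = samples[:, :, 0].mean(axis=0)
# Standard error of the Monte Carlo estimate of the mean:
se_pos = samples[:, :, 0].std(axis=0) / np.sqrt(num_samples)
fig, ax = plt.subplots()
ax.plot(solver.t, mean_pos, label='Estimated mean position')
ax.fill_between(
    solver.t,
    mean_pos - 1.96 * se_pos,
    mean_pos + 1.96 * se_pos,
    alpha=0.25,
    label='95% epistemic interval'
)
ax.set_xlabel('$t$ (Time)')
ax.set_ylabel('$y(t)$ (Position)')
ax.legend(loc='best', frameon=False)
sns.despine(trim=True);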
Part B#
Plot the epistemic uncertainty about the mean position at \(t=5\)s as a function of the number of samples.
Solution:
# Your code here
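Hint: the final timestep is exactly \(t=5\)s (it is solver.t[-1]). A minimal sketch, reusing one large batch of samples and looking at growing prefixes (the batch size of 1,000 is our arbitrary choice):
# A sketch: CLT standard error of the mean position at t = 5 s
# as a function of the number of Monte Carlo samples.
big = take_samples_from_solver(1000)
ns = np.arange(10, 1001, 10)
eps = [big[:n, -1, 0].std() / np.sqrt(n) for n in ns]
fig, ax = plt.subplots()
ax.plot(ns, eps)
ax.set_xlabel('Number of samples $N$')
ax.set_ylabel('Epistemic uncertainty of the mean position at $t=5$s')
sns.despine(trim=True);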
Part C#
Repeat parts A and B for the squared response. That is, do the same thing as above, but consider \(y^2(t)\) and \(\dot{y}^2(t)\) instead of \(y(t)\) and \(\dot{y}(t)\). How many samples do you need to estimate the mean squared response at \(t=5\)s with negligible epistemic uncertainty?
Solution:
# Your code here
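Hint: the squared response is just the element-wise square of the samples, so the machinery from parts A and B carries over. A fragment (variable names are our own):
# A sketch: samples of the squared response.
sq_samples = samples ** 2
mean_sq_pos = sq_samples[:, :, 0].mean(axis=0)
se_sq_pos = sq_samples[:, :, 0].std(axis=0) / np.sqrt(sq_samples.shape[0])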
Part D#
Now that you know how many samples you need to estimate the mean of the response and the squared response, use the formula:
\[
\mathbb{V}[y(t)] = \mathbb{E}\left[y^2(t)\right] - \left(\mathbb{E}[y(t)]\right)^2,
\]
and similarly for \(\dot{y}(t)\), to estimate the position and velocity variance with negligible epistemic uncertainty. Plot both quantities as a function of time.
Solution:
# Your code here
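A minimal sketch for the position (the velocity is analogous). The sample size below is a placeholder; use whatever part C suggests:
# A sketch: plug Monte Carlo estimates into V[y(t)] = E[y^2(t)] - (E[y(t)])^2.
big_samples = take_samples_from_solver(10000)  # placeholder sample size
mean_pos = big_samples[:, :, 0].mean(axis=0)
var_pos = (big_samples[:, :, 0] ** 2).mean(axis=0) - mean_pos ** 2
fig, ax = plt.subplots()
ax.plot(solver.t, var_pos)
ax.set_xlabel('$t$ (Time)')
ax.set_ylabel('Position variance')
sns.despine(trim=True);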
Part E#
Put together the estimated mean and variance to plot a 95% predictive interval for the position and the velocity as functions of time.
Hint: You need to use the Central Limit Theorem. Check out the corresponding textbook example.
Solution:
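A minimal sketch for the position, reusing mean_pos and var_pos from the part D sketch and assuming (per the hint) that the response is approximately Gaussian:
# A sketch: 95% predictive interval, mean +/- 1.96 * standard deviation.
std_pos = np.sqrt(var_pos)
fig, ax = plt.subplots()
ax.plot(solver.t, mean_pos, label='Mean position')
ax.fill_between(
    solver.t,
    mean_pos - 1.96 * std_pos,
    mean_pos + 1.96 * std_pos,
    alpha=0.25,
    label='95% predictive interval'
)
ax.set_xlabel('$t$ (Time)')
ax.set_ylabel('$y(t)$ (Position)')
ax.legend(loc='best', frameon=False)
sns.despine(trim=True);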
Problem 2 - Earthquakes again#
The San Andreas fault extends through California, forming the boundary between the Pacific and the North American tectonic plates. It has caused some of the most significant earthquakes on Earth. We are going to focus on Southern California, and we would like to assess the probability of a significant earthquake, defined as an earthquake of magnitude 6.5 or greater, during the next ten years.
A. The first thing we will do is review a database of past earthquakes that have occurred in Southern California and collect the relevant data. We start at 1900 because data before that time may be unreliable. Go over each decade and count the occurrences of significant earthquakes (i.e., count the number of orange and red entries in each decade). We have done this for you.
eq_data = np.array([
0, # 1900-1909
1, # 1910-1919
2, # 1920-1929
0, # 1930-1939
3, # 1940-1949
2, # 1950-1959
1, # 1960-1969
2, # 1970-1979
1, # 1980-1989
4, # 1990-1999
0, # 2000-2009
2 # 2010-2019
])
Let’s visualize them:
fig, ax = plt.subplots()
ax.bar(
    np.arange(1900, 2020, 10),
eq_data,
width=10
)
ax.set_xlabel('Decade')
ax.set_ylabel('# of major earthquakes in Southern CA')
sns.despine(trim=True);
B. The right way to model the number of earthquakes \(X_n\) in decade \(n\) is using a Poisson distribution with unknown rate parameter \(\lambda\), i.e.,
\[
X_n | \lambda \sim \operatorname{Poisson}(\lambda).
\]
The probability mass function is:
\[
p(x_n|\lambda) = \frac{\lambda^{x_n}e^{-\lambda}}{x_n!}.
\]
Here we have \(N = 12\) observations, say \(x_{1:N} = (x_1,\dots,x_N)\) (stored in eq_data above).
Find the joint probability mass function (otherwise known as the likelihood) \(p(x_{1:N}|\lambda)\) of these random variables.
Hint: Assume that all measurements are independent. Then, their joint PMF is the product of the individual PMFs. You should be able to simplify the expression.
Answer:
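For your verification only (the details of the derivation are up to you), the simplified likelihood should be equivalent to:
\[
p(x_{1:N}|\lambda) = \prod_{n=1}^N \frac{\lambda^{x_n}e^{-\lambda}}{x_n!}
= \frac{\lambda^{\sum_{n=1}^N x_n}\,e^{-N\lambda}}{\prod_{n=1}^N x_n!}.
\]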
C. The rate parameter \(\lambda\) (number of significant earthquakes per decade) is positive. What prior distribution should we assign to it if we expect it to be around 2? A convenient choice is a Gamma (see also the scipy.stats page for the Gamma), because it results in an analytical posterior. We write:
\[
\lambda \sim \operatorname{Gamma}(\alpha, \beta),
\]
where \(\alpha\) and \(\beta\) are positive hyper-parameters that we must set to represent our prior state of knowledge. The PDF is:
\[
p(\lambda) = \frac{\beta^\alpha}{\Gamma(\alpha)}\lambda^{\alpha-1}e^{-\beta\lambda},
\]
where we are not conditioning on \(\alpha\) and \(\beta\) because they should be fixed numbers.
Use the code below to pick reasonable values for \(\alpha\) and \(\beta\).
Just enter your choice of \(\alpha\) and \(\beta\) in the code block below.
Hint: Notice that the maximum entropy distribution for a positive parameter with known expectation is the Exponential, e.g., see the Table in this wiki page. Then, notice that the Exponential is a particular case of the Gamma (set \(\alpha=1\)).
import scipy.stats as st
# You have to pick an alpha:
alpha = 1.0
# And you have to pick a beta:
beta = 1.0
# This is the prior on lambda:
lambda_prior = st.gamma(alpha, scale=1.0 / beta)
# Let's plot it:
lambdas = np.linspace(0, lambda_prior.ppf(0.99), 100)
fig, ax = plt.subplots()
ax.plot(lambdas, lambda_prior.pdf(lambdas))
ax.set_xlabel(r'$\lambda$ (# of major earthquakes per decade)')
ax.set_ylabel(r'$p(\lambda)$')
sns.despine(trim=True);
D. Show that the posterior of \(\lambda\) conditioned on \(x_{1:N}\) is also a Gamma, but with updated hyperparameters.
Hint: When you write down the posterior of \(\lambda\) you can drop any multiplicative term that does not depend on it as it will be absorbed in the normalization constant. This will simplify the notation a little bit.
Answer:
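Again, for verification only: after dropping multiplicative terms that do not depend on \(\lambda\), you should find
\[
p(\lambda|x_{1:N}) \propto \lambda^{\alpha + \sum_{n=1}^N x_n - 1}\,e^{-(\beta + N)\lambda},
\]
i.e., a \(\operatorname{Gamma}\left(\alpha + \sum_{n=1}^N x_n,\; \beta + N\right)\).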
E. Prior-likelihood pairs that result in a posterior with the same form as the prior are known as conjugate distributions. Conjugate distributions are your only hope for analytical Bayesian inference.
As a verification check, look at the Wikipedia page for conjugate priors, locate the Poisson-Gamma pair, and verify your answer above.
Nothing to report here. Just do it as a verification check.
F. Plot the prior and the posterior of \(\lambda\) on the same plot.
# Your expression for alpha posterior here:
alpha_post = 1.0
# Your expression for beta posterior here:
beta_post = 1.0
# The posterior
lambda_post = st.gamma(alpha_post, scale=1.0 / beta_post)
# Plot it
lambdas = np.linspace(0, lambda_post.ppf(0.99), 100)
fig, ax = plt.subplots()
ax.plot(lambdas, lambda_prior.pdf(lambdas), label='Prior')
ax.plot(lambdas, lambda_post.pdf(lambdas), label='Posterior')
ax.set_xlabel(r'$\lambda$ (# of major earthquakes per decade)')
ax.set_ylabel('Probability density')
ax.legend(loc='best', frameon=False)
sns.despine(trim=True);
G. Let’s determine the predictive distribution for the number of significant earthquakes during the next decade. This is something we did not do in class, but it will reappear in future lectures. Let \(X\) be the random variable corresponding to the number of significant earthquakes during the next decade. We need to calculate:
\[
p(x|x_{1:N}).
\]
How do we do this? We use the sum rule:
\[
p(x|x_{1:N}) = \int_0^\infty p(x, \lambda|x_{1:N})\,d\lambda
= \int_0^\infty p(x|\lambda, x_{1:N})\,p(\lambda|x_{1:N})\,d\lambda
= \int_0^\infty p(x|\lambda)\,p(\lambda|x_{1:N})\,d\lambda,
\]
where going from the middle step to the rightmost one, we assumed that the number of earthquakes occurring in each decade is independent. You can carry out this integration analytically (it gives a negative binomial distribution), but we are not going to bother with it.
Below, you will write code to characterize it using Monte Carlo sampling. You can take a sample from the posterior predictive by:
sampling a \(\lambda\) from its posterior \(p(\lambda|x_{1:N})\).
sampling an \(x\) from the likelihood \(p(x|\lambda)\).
This is the same procedure we used for replicated experiments.
Complete the code below:
def sample_posterior_predictive(n, lambda_post):
"""Sample from the posterior predictive.
Arguments
n -- The number of samples to take.
lambda_post -- The posterior for lambda.
    Returns n samples from the posterior predictive.
"""
samples = np.empty((n,), dtype="i")
for i in range(n):
lambda_sample = # WRITE ME (SAMPLE FROM POSTERIOR)
samples[i] = # WRITE ME (SAMPLE FROM POISSON GIVEN LAMBDA_SAMPLE)
return samples
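Hint: if you are unsure about the scipy.stats calls, the two draws could look like this (a sketch of one possibility, assuming lambda_post is the frozen Gamma posterior from part F):
# A sketch of the two draws:
lambda_sample = lambda_post.rvs()            # draw a rate from the Gamma posterior
x_sample = st.poisson(lambda_sample).rvs()   # draw a count from the Poisson likelihood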
Test your code here:
samples = sample_posterior_predictive(10, lambda_post)
samples
H. Plot the predictive distribution \(p(x|x_{1:N})\).
Hint: Draw 1,000 samples using sample_posterior_predictive and then draw a histogram.
# Your code here
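Once sample_posterior_predictive works, a sketch of the plot (the integer-centered bins are our choice):
# A sketch: histogram of 1,000 posterior predictive samples.
pp_samples = sample_posterior_predictive(1000, lambda_post)
fig, ax = plt.subplots()
ax.hist(
    pp_samples,
    bins=np.arange(pp_samples.max() + 2) - 0.5,  # center bins on the integers
    density=True,
    alpha=0.5
)
ax.set_xlabel('$x$ (# of major earthquakes in a decade)')
ax.set_ylabel('$p(x|x_{1:N})$')
sns.despine(trim=True);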
I. What is the probability that at least one major earthquake will occur during the next decade?
Hint: You may use a Monte Carlo estimate of the probability. Ignore the uncertainty in the estimate.
num_samples = 10000
samples = sample_posterior_predictive(num_samples, lambda_post)
# Count how many major earthquakes occurred:
count = 0
for i in range(num_samples):
if samples[i] >=1:
count += 1
prob_of_major_eq = # YOUR ESTIMATE HERE
print(f"p(X >= 1 | data) = {prob_of_major_eq}")
J. Find a 95% credible interval for \(\lambda\).
# Write your code here and print() your answer
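A possible sketch using the quantiles (inverse CDF) of the frozen posterior:
# A sketch: central 95% credible interval from the posterior quantiles.
lower = lambda_post.ppf(0.025)
upper = lambda_post.ppf(0.975)
print(f"95% credible interval for lambda: [{lower:.2f}, {upper:.2f}]")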
K. Find the \(\lambda\) that minimizes the absolute loss (see lecture), and call it \(\lambda^*_N\). Then, plot the fully Bayesian predictive \(p(x|x_{1:N})\) in the same figure as \(p(x|\lambda^*_N)\).
# Write your code here and print() your answer
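Hint: recall from the lecture that the absolute loss is minimized by the posterior median. A sketch:
# A sketch: the posterior median minimizes the absolute loss.
lambda_star = lambda_post.ppf(0.5)
print(f"lambda*_N = {lambda_star:.2f}")
# For the plot, compare st.poisson(lambda_star).pmf(...) against a
# histogram of sample_posterior_predictive() draws.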
L. Draw replicated data from the model and compare them to the observed data.
Hint: Complete the missing code at the places indicated below.
def replicate_experiment(post_rv, n=len(eq_data), n_rep=9):
"""Replicate the experiment.
Arguments
post_rv -- The random variable object corresponding to
the posterior from which to sample.
n -- The number of observations.
    n_rep -- The number of repetitions.
Returns:
A numpy array of size n_rep x n.
"""
x_rep = np.empty((n_rep, n), dtype="i")
for i in range(n_rep):
x_rep[i, :] = # Your code here
return x_rep
Try your code here:
x_rep = replicate_experiment(lambda_post)
x_rep
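Hint: if you are stuck on the missing line, one possibility (a sketch) is to draw a fresh rate from the posterior for every replication and then draw \(n\) Poisson counts with it, e.g., x_rep[i, :] = st.poisson(post_rv.rvs()).rvs(size=n).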
If it works, then try the following visualization:
fig, ax = plt.subplots(
5,
2,
sharex='all',
sharey='all',
figsize=(20, 20)
)
ax[0, 0].bar(
    np.arange(1900, 2020, 10),
    eq_data,
    width=10,
    color='red'
)
n_rep = x_rep.shape[0]
for i in range(1, n_rep + 1):
    ax[int(i / 2), i % 2].bar(
        np.arange(1900, 2020, 10),
        x_rep[i-1],
        width=10
    )
M. Plot the histograms and calculate the Bayesian p-values of the following test quantities:
Maximum number of consecutive decades with no earthquakes.
Maximum number of consecutive decades with earthquakes.
Hint: You may reuse the code from Posterior Predictive Checking.
def perform_diagnostics(post_rv, data, test_func, n_rep=1000):
"""Calculate Bayesian p-values.
Arguments
post_rv -- The random variable object corresponding to
the posterior from which to sample.
data -- The training data.
test_func -- The test function.
    n_rep -- The number of repetitions.
Returns a dictionary that includes the observed value of
the test function (T_obs), the Bayesian p-value (p_val),
the replicated test statistic (T_rep),
and all the replicated data (data_rep).
"""
T_obs = test_func(data)
n = data.shape[0]
    data_rep = replicate_experiment(post_rv, n=n, n_rep=n_rep)
T_rep = np.array(
tuple(
test_func(x)
for x in data_rep
)
)
    p_val = np.mean(T_rep > T_obs)
return dict(
T_obs=T_obs,
p_val=p_val,
T_rep=T_rep,
data_rep=data_rep
)
def plot_diagnostics(diagnostics):
"""Make the diagnostics plot.
Arguments:
diagnostics -- The dictionary returned by perform_diagnostics()
"""
fig, ax = plt.subplots()
tmp = ax.hist(
diagnostics["T_rep"],
density=True,
alpha=0.25,
label='Replicated test quantity'
)[0]
ax.plot(
diagnostics["T_obs"] * np.ones((50,)),
np.linspace(0, tmp.max(), 50),
'k',
label='Observed test quantity'
)
plt.legend(loc='best');
def do_diagnostics(post_rv, data, test_func, n_rep=1000):
"""Calculate Bayesian p-values and make the corresponding
diagnostic plot.
Arguments
post_rv -- The random variable object corresponding to
the posterior from which to sample.
data -- The training data.
test_func -- The test function.
    n_rep -- The number of repetitions.
Returns a dictionary that includes the observed value of
the test function (T_obs), the Bayesian p-value (p_val),
and the replicated experiment (data_rep).
"""
res = perform_diagnostics(
post_rv,
data,
test_func,
n_rep=n_rep
)
T_obs = res["T_obs"]
p_val = res["p_val"]
print(f'The observed test quantity is {T_obs}')
print(f'The Bayesian p_value is {p_val:.4f}')
plot_diagnostics(res)
# Here is the first test function for you
def T_eq_max_neq(x):
"""Return the maximum number of consecutive decades
with no earthquakes."""
count = 0
result = 0
for i in range(x.shape[0]):
if x[i] != 0:
count = 0
else:
count += 1
result = max(result, count)
return result
# Consult the textbook (Lecture 12) to figure out
# how to use do_diagnostics().
# Write your code here for the second test quantity
# (maximum number of consecutive decades with earthquakes)
# Hint: copy paste your code from the previous cell
# and make the necessary modifications
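If you get stuck, a sketch of one possible modification (tracking nonzero decades instead of zero ones):
# A sketch of the second test quantity:
def T_eq_max_eq(x):
    """Return the maximum number of consecutive decades
    with at least one earthquake."""
    count = 0
    result = 0
    for i in range(x.shape[0]):
        if x[i] == 0:
            count = 0
        else:
            count += 1
        result = max(result, count)
    return result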