def ridge_fit(x, y, penalty=1.0):
z = add_intercept(x)
gram = z.T @ z
ridge = np.eye(gram.shape[0]) * penalty
ridge[0, 0] = 0.0
return np.linalg.solve(gram + ridge, z.T @ y)
def ridge_predict(x, beta):
return add_intercept(x) @ beta
def logistic_ridge_fit(x, y, penalty=1.0, max_iter=40):
z = add_intercept(x)
beta = np.zeros(z.shape[1])
ridge = np.eye(z.shape[1]) * penalty
ridge[0, 0] = 0.0
for _ in range(max_iter):
eta = z @ beta
p = expit(eta)
w = np.clip(p * (1.0 - p), 1e-5, None)
grad = z.T @ (p - y) + ridge @ beta
hess = z.T @ (z * w[:, None]) + ridge
step = np.linalg.solve(hess, grad)
beta -= step
if np.max(np.abs(step)) < 1e-6:
break
return beta
def logistic_predict(x, beta, clip=0.02):
return np.clip(expit(add_intercept(x) @ beta), clip, 1.0 - clip)
def crossfit_predict(x, y, fit_fn, pred_fn, penalty, k, rng, subset=None):
n = x.shape[0]
pred = np.zeros(n)
if subset is None:
subset = np.arange(n)
subset = np.asarray(subset)
folds = make_folds(subset.size, k, rng)
for fold in folds:
test = subset[fold]
train = np.setdiff1d(subset, test, assume_unique=False)
beta = fit_fn(x[train], y[train], penalty)
pred[test] = pred_fn(x[test], beta)
return pred
def two_period_did(d, y1, y0):
return (y1[d == 1].mean() - y0[d == 1].mean()) - (y1[d == 0].mean() - y0[d == 0].mean())
def om_did(x, d, y1, y0, xfit=False, penalty=1.0, rng=None):
rng = np.random.default_rng(0) if rng is None else rng
treated = np.flatnonzero(d == 1.0)
control = np.flatnonzero(d == 0.0)
preds = []
for y, group in [(y1, treated), (y1, control), (y0, treated), (y0, control)]:
beta = ridge_fit(x[group], y[group], penalty)
pred = ridge_predict(x, beta)
if xfit:
pred[group] = crossfit_predict(x, y, ridge_fit, ridge_predict, penalty, 3, rng, subset=group)[group]
preds.append(pred)
m1, m2, m3, m4 = preds
return float(np.mean((m1 - m2) - (m3 - m4)))
def propensity_hat(x, d, xfit=False, penalty=1.0, rng=None):
if not xfit:
beta = logistic_ridge_fit(x, d, penalty)
return logistic_predict(x, beta)
rng = np.random.default_rng(0) if rng is None else rng
return crossfit_predict(x, d, logistic_ridge_fit, logistic_predict, penalty, 3, rng)
def ipw_did(x, d, y1, y0, xfit=False, penalty=1.0, rng=None):
ehat = propensity_hat(x, d, xfit=xfit, penalty=penalty, rng=rng)
delta = y1 - y0
score = (d - ehat) / (d.mean() * (1.0 - ehat))
return float(np.mean(score * delta))
def aipw_did(x, d, y1, y0, xfit=False, penalty=1.0, rng=None):
rng = np.random.default_rng(0) if rng is None else rng
ehat = propensity_hat(x, d, xfit=xfit, penalty=penalty, rng=rng)
delta = y1 - y0
control = np.flatnonzero(d == 0.0)
beta = ridge_fit(x[control], delta[control], penalty)
mhat = ridge_predict(x, beta)
if xfit:
mhat[control] = crossfit_predict(x, delta, ridge_fit, ridge_predict, penalty, 3, rng, subset=control)[control]
score = (d - ehat) / (d.mean() * (1.0 - ehat))
return float(np.mean(score * (delta - mhat)))
def estimate_all(draw, penalty=25.0, rng=None):
y1, y0, d, x = draw["y1"], draw["y0"], draw["d"], draw["x"]
rng = np.random.default_rng(0) if rng is None else rng
return {
"naive": float(y1[d == 1].mean() - y1[d == 0].mean()),
"did": two_period_did(d, y1, y0),
"om": om_did(x, d, y1, y0, xfit=False, penalty=penalty, rng=rng),
"omXF": om_did(x, d, y1, y0, xfit=True, penalty=penalty, rng=rng),
"ipw": ipw_did(x, d, y1, y0, xfit=False, penalty=penalty, rng=rng),
"ipwXF": ipw_did(x, d, y1, y0, xfit=True, penalty=penalty, rng=rng),
"aipw": aipw_did(x, d, y1, y0, xfit=False, penalty=penalty, rng=rng),
"aipwXF": aipw_did(x, d, y1, y0, xfit=True, penalty=penalty, rng=rng),
}
trial = estimate_all(semipar_did_dgp(rho=0.0, rng=np.random.default_rng(7)), rng=np.random.default_rng(8))
display(HTML(html_table(["Estimator", "Estimate"], [[k, f"{v:.3f}"] for k, v in trial.items()])))