Source code for linna.main

import numpy as np
import pyDOE2
import sample_generator as sg
from copy import deepcopy
import os
import glob
import pickle
import sys
import emcee
from linna.nn import *
from scipy.special import erf
from scipy.stats import chi2
import io
import gc
import torch
from torch.utils.data import Dataset, DataLoader
from torch.utils import mkldnn as mkldnn_utils
from linna.util import *
import tempfile


[docs]def ml_sampler(outdir, theory, priors, data, cov, init, pool, nwalkers, gpunode, omegab2cut=None, nepoch=4500, method="zeus", nbest=None, chisqcut=None, loglikelihoodfunc=None): """ LINNA main function with hyperparameters set to values described in To et al. 2022 Args: outdir (string): output directory theory (function): theory model priors (dict of str: [float, float]): string can be either flat or gauss. If the string is 'flat', [a,b] indicates the lower and upper limits of the prior. If the string is 'gauss', [a,b] indicates the mean and sigma. data (1d array): float array, data vector cov (2d array): float array, covariance matrix init (ndarray): initial guess of mcmc, pool (mpi pool, optional): a mpi pool instance that can do pool.map(function, iterables). nwalkers (int) number of mcmc walkers gpunode (string): name of gpu node omegab2cut (list of int): 2 elements containing the lower and upper limits of omegab*h^2 nepoch (int, optional): maximum number of epoch for the neural network training method (string, optional): Samplers. LINNA supports `emcee` and `zeus`(default) nbest (int or list of int): number of points to include in the training set per iteration according to the optimizer chisqcut (float, optional): cut the training data if there chisq is greater than this value loglikelihoodfunc (callable, optional): function of model, data , inverse of covariance matrix and return the log liklihood value. If None, then use gaussian likelihood Returns: nd array: MCMC chain 1d array: log probability of MCMC chain """ ntrainArr = [10000, 10000, 10000, 10000] nvalArr = [500, 500, 500, 500] if method=="emcee": nkeepArr = [2, 2, 5, 4] ntimesArr = [5, 5, 10, 15] ntautolArr = [0.03, 0.03, 0.02, 0.01] temperatureArr = [4.0, 2.0, 1.0, 1.0] meanshiftArr = [0.2, 0.2, 0.2, 0.2] stdshiftArr = [0.15,0.15,0.15,0.15] elif method=="zeus": nkeepArr = [2, 2, 5, 5] ntimesArr = [5, 5, 10, 50] ntautolArr = [0.03, 0.03, 0.02, 0.01] temperatureArr = [4.0, 2.0, 1.0, 1.0] meanshiftArr = [0.2, 0.2, 0.2, 0.2] stdshiftArr = [0.15,0.15,0.15,0.15] else: raise NotImplementedError(method) dolog10index = None ypositive = False device = "cuda" docuda=False tsize=1 nnmodel_in = ChtoModelv2 params = {} params["trainingoption"] = 1 params["num_epochs"] = nepoch params["batch_size"] = 500 return ml_sampler_core(ntrainArr, nvalArr, nkeepArr, ntimesArr, ntautolArr, meanshiftArr, stdshiftArr, outdir, theory, priors, data, cov, init, pool, nwalkers, device, dolog10index, ypositive, temperatureArr, omegab2cut, docuda, tsize, gpunode, nnmodel_in, params, method, nbest=nbest, chisqcut=chisqcut, loglikelihoodfunc=loglikelihoodfunc)
[docs]def ml_sampler_core(ntrainArr, nvalArr, nkeepArr, ntimesArr, ntautolArr, meanshiftArr, stdshiftArr, outdir, theory, priors, data, cov, init, pool, nwalkers, device, dolog10index, ypositive, temperatureArr, omegab2cut=None, docuda=False, tsize=1, gpunode=None, nnmodel_in=None, params=None, method="emcee", nbest=None, chisqcut=None, loglikelihoodfunc=None, nsigma=3): """ LINNA main function Args: ntrainArr (int array): number of training data per iteration nvalArr (int array): number of validation data per iteration nkeepArr (int array): number of autocorrelation time to be kept ntimesArr (int array): number of autocorrelation time to stop mcmc ntautolArr (float array): error limit of autocorrelation time meanshiftArr (float array): limit on mean shift of parameter estimation from the first and second half of the chain stdshiftArr (float array): limit on std shift of parameter estimation from the first and second half of the chain outdir (string): output directory theory (function): theory model priors (dict of str: [float, float]): string can be either flat or gauss. If the string is 'flat', [a,b] indicates the lower and upper limits of the prior. If the string is 'gauss', [a,b] indicates the mean and sigma. data (1d array): float array, data vector cov (2d array): float array, covariance matrix init (ndarray): initial guess of mcmc, pool (object): mpi4py pool instance nwalkers (int) number of mcmc walkers device (string): cpu or gpu dolog10index (int array): index of parameters to do log10 ypositive (bool): whether the data vector is expected to be all positive temperatureArr (float array): temperature parameters for each iteration omegab2cut (list of int): 2 elements containing the lower and upper limits of omegab*h^2 docuda (bool): whether do gpu for evaluation tsize (int, optional): number of cores for training gpunode (string): name of gpu node nnmodel_in (string): instance of neural network model params (dictionary): dictionary of parameters method (string): sampling method nbest (int or list of int): number of points to include in the training set per iteration according to the optimizer chisqcut (float, optional): cut the training data if there chisq is greater than this value loglikelihoodfunc (callable): function of model, data , inverse of covariance matrix and return the log liklihood value nsigma (float): the training point in the first iteration will be generated within nsigma of the gaussian prior Returns: nd array: MCMC chain 1d array: log probability of MCMC chain """ ndim = len(init) sigma = np.sqrt(np.diag(cov)) inv_cov = np.linalg.inv(cov) prior_range = [] for item in priors: if item['dist'] == 'flat': prior_range.append([item['arg1'], item['arg2']]) elif item['dist'] == 'gauss': prior_range.append([item['arg1']-5*item['arg2'], item['arg1']+5*item['arg2']]) else: print("not implement dist : {0}".format(item['dist']), flush=True) assert(0) transform = Transform(priors) invtransform = invTransform(priors) init = invtransform(init) if method=="emcee": filename = "chemcee_256.h5" elif method == "zeus": filename = "zeus_256.h5" else: raise NotImplementedError(method) for i, (nt, nv, nk, ntimes, tautol, temperature, meanshift, stdshift) in enumerate(zip(ntrainArr, nvalArr, nkeepArr, ntimesArr, ntautolArr, temperatureArr, meanshiftArr, stdshiftArr)): if isinstance(nbest, list): nbest_in = nbest[i] if nbest_in <=0: nbest_in = None else: nbest_in = nbest if nbest_in is not None: tempdir = tempfile.TemporaryDirectory() def negloglike(x): d = data-theory([-1,x], tempdir) return d.dot(inv_cov.dot(d)) else: negloglike=None temperature = temperature**2 print("#"*100) print("iteration: {0}".format(i), flush=True) print("#"*100) outdir_in = os.path.join(outdir, "iter_{0}/".format(i)) if i==0: chain=None else: chain_name = os.path.join(os.path.join(outdir, "iter_{0}/".format(i-1)), filename[:-3]) if os.path.isfile(chain_name+".h5"): chain_name = chain_name+".h5" chain, _temp, _temp2= read_chain_and_cut(chain_name.format(i-1), nk, ntimes, method=method) else: chain_name = chain_name+".txt" chain = np.loadtxt(chain_name)[-100000:,:-1] #Generate training ntrain = nt nval = nv nnsampler = NN_samplerv1(outdir_in, prior_range) if "trainingoption" in params: options = params['trainingoption'] else: options = 0 generate_training_point(theory, nnsampler, pool, outdir_in, ntrain, nval, data, inv_cov, chain, nsigma=nsigma, omegab2cut=omegab2cut, options=options, negloglike= negloglike, nbest_in=nbest_in, chisqcut=chisqcut) chain = None del chain if i!=0: try: del _temp del _temp2 except: pass gc.collect() if (pool is None) or pool.is_master(): outdir_list = [os.path.join(outdir, "iter_{0}/".format(m)) for m in range(int(i+1))] f = open(outdir_list[-1]+"/model_pickle.pkl", 'wb') pickle.dump(train_NN, f) f.close() f = open(outdir_list[-1]+"/model_args.pkl", 'wb') if gpunode is not None: docuda=True else: docuda=torch.cuda.is_available() pickle.dump([nnsampler, cov, inv_cov, sigma, outdir_in, outdir_list, data, dolog10index, ypositive, False, 2, temperature, docuda, None, 1, nnmodel_in, params, nbest_in is not None], f) f.close() if not os.path.isfile(outdir_list[-1] + "/finish.pkl"): if gpunode == 'automaticgpu': while(True): gpufile = os.path.join(outdir, "gpunodeinfo.pkl") try: if os.path.isfile(gpufile): with open(gpufile, 'rb') as f: gpuinfo = pickle.load(f) gpunode = gpuinfo["nodename"] break except: pass if gpunode is not None: print("running gpu on {0}".format(gpunode), flush=True) os.system("cat {2}/train_gpu.py | ssh {0} python - {1} {3}".format(gpunode, outdir_list[-1], os.path.dirname(os.path.abspath(__file__)), "cuda")) while(1): if os.path.isfile(outdir_list[-1] + "/finish.pkl"): break else: os.system("python {1}/train_gpu.py {0} {2}".format(outdir_list[-1], os.path.dirname(os.path.abspath(__file__)), "nocuda")) while(1): if os.path.isfile(outdir_list[-1] + "/finish.pkl"): break #Retrieve model model, y_invtransform_data = retrieve_model(outdir_in, len(init), len(data), nnmodel_in) if not docuda: model.model = model.model.to(memory_format=torch.channels_last) model.MKLDNN=True #Do MCMC if os.path.isfile(os.path.join(outdir_in, filename)): continue invcov_new = torch.from_numpy(inv_cov.astype(np.float32)).to('cpu').detach().clone().requires_grad_() data_new = torch.from_numpy(data.astype(np.float32)).to('cpu').detach().clone().requires_grad_() if loglikelihoodfunc is None: loglikelihoodfunc = gaussianlogliklihood log_prob = Log_prob(data_new, invcov_new, model, y_invtransform_data, transform, temperature, nograd=True, loglikelihoodfunc=loglikelihoodfunc) dlnp = None ddlnp = None if pool is not None: pool.noduplicate=True run_mcmc(nnsampler, outdir_in, method, ndim, nwalkers, init, log_prob, dlnp=dlnp, ddlnp=ddlnp, pool=pool, transform=transform, ntimes=ntimes, tautol=tautol, meanshift=meanshift, stdshift=stdshift, nk=nk) if pool is not None: pool.noduplicate_close() chain_name = os.path.join(os.path.join(outdir, "iter_{0}/".format(len(ntrainArr)-1)), filename[:-3]) if os.path.isfile(chain_name+".h5"): chain_name = chain_name+".h5" chain, log_prob_samples_x, reader = read_chain_and_cut(chain_name.format(len(ntrainArr)-1), nk, ntimes, method=method) log_prob_samples_x = reader.get_log_prob(discard=0, flat=True, thin=1) else: chain_name = chain_name+".txt" chain = np.loadtxt(chain_name)[-100000:,:-1] log_prob_samples_x = np.loadtxt(chain_name)[-100000:,-1] #Optional importance sampling if 'nimp' in params.keys(): if not os.path.isfile(outdir+"/samples_im.npy"): chain_name = os.path.join(os.path.join(outdir, "iter_{0}/".format(len(ntrainArr)-1)), filename[:-3]) if os.path.isfile(chain_name+".h5"): chain_name = chain_name+".h5" chain, log_prob_samples_x, reader = read_chain_and_cut(chain_name.format(len(ntrainArr)-1), nk, ntimes, method=method, flat=True) else: chain_name = chain_name+".txt" chain = np.loadtxt(chain_name)[-100000:,:-1] log_prob_samples_x = np.loadtxt(chain_name)[-100000:,-1] select = np.random.randint(0, len(chain), params['nimp']) chain = chain[select] log_prob_samples_x = log_prob_samples_x[select] np.save(outdir+"/samples_im.npy", chain) np.save(outdir+"/log_prob_samples_x.npy", log_prob_samples_x) else: chain = np.load(outdir+"/samples_im.npy") log_prob_samples_x = np.load(outdir+"/log_prob_samples_x.npy") outimp = os.path.join(outdir, 'imp/') nnsampler = NN_samplerv1(outimp, prior_range) if not os.path.isdir(outimp): os.makedirs(outimp) if not os.path.isfile(outdir+"/theory.npy"): theory = nnsampler.generate_training_data(zip(range(len(chain)), chain), theory, pool=pool, args=[outimp]) np.save(outdir+"/theory.npy", theory) else: theory = np.load(outdir+"/theory.npy") logprior=LogPrior(priors) log_prob_samples_x = log_prob_samples_x.flatten() logp = logp_theory_data(chain, theory, data, inv_cov, logprior) w = np.exp(logp-log_prob_samples_x) w[np.abs(np.log(w)-np.mean(np.log(w)))>2*np.std(np.log(w))]=0 w = w/np.sum(w) np.save(outdir+"/weight_im.npy", [log_prob_samples_x.flatten(), logp, w]) return chain, log_prob_samples_x