#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
from math import pi, factorial
from pathos.multiprocessing import Pool
from mpmath import fp
from pathlib import Path
import importlib.util
import sys
import glob
from ..ANTARESS_general.constant_data import c_light
##################################################################################################
#%%% Redefinitions and various functions
##################################################################################################
np_append=np.append
np_arange=np.arange
np_arctan2=np.arctan2
np_array=np.array
np_column_stack=np.column_stack
np_cos=np.cos
np_empty=np.empty
np_exp=np.exp
np_interp=np.interp
np_invert=np.invert
np_isnan=np.isnan
np_log=np.log
np_mean=np.mean
np_nan=np.nan
np_ones=np.ones
np_outer=np.outer
np_power=np.power
np_repeat=np.repeat
np_reshape=np.reshape
np_rint=np.rint
np_savetxt=np.savetxt
np_sqrt=np.sqrt
np_sum=np.sum
np_tile=np.tile
np_unique=np.unique
np_vstack=np.vstack
np_zeros=np.zeros
np_poly = np.polynomial.Polynomial
def npint(array):
r'''**Integer conversion**
Converts an array to integer type.
Args:
TBD
Returns:
TBD
'''
return array.astype(int)
def default_func(*x):
r"""**Unity function.**
Returns 1 for any set of input arguments (used as default function).
Args:
x (various) : input variables
Returns:
gen_output (array) : unity array
"""
gen_output = np.array([1.])
return gen_output
def nantest(varname,var):
r"""**Nan/Inf test.**
Checks whether input variable is nan or inf.
Args:
TBD
Returns:
TBD
"""
if np.isnan(var).any():
raise Exception("NaN error: %s contains NaNs." % varname)
if np.isinf(var).any():
raise Exception("Finite error: %s contains in-finite values." % varname)
def stop(message=None):
r'''**Stop routine**
Stops process with optional message.
Args:
message (str): stop message
Returns:
None
'''
str_message=' : '+message if message is not None else ''
print('Stop'+str_message)
raise SystemExit
return None
def find_between(s, start, end):
r'''**In-between string search**
Finds the substring between two substrings of a string.
The two substrings must be unique.
Args:
TBD
Returns:
TBD
'''
return (s.split(start))[1].split(end)[0]
def dicval(dic,key,def_val):
r'''**Missing dictionary key**
Returns the value associated with an existing key in a dictionary, or a user-defined value for missing keys.
Args:
dic (dict) : input dictionary
key (float) : input key
def_val (any) : default value for missing keys
Returns:
out_val (any) : value associated with `key` or default value
'''
out_val = dic.get(key) if dic.get(key) is not None else def_val
return out_val
def get_time():
r"""**Current time.**
Returns current time.
Args:
None
Returns:
start_time (float) : current time
"""
import time
start_time = (time.time())
return start_time
def end(start_time,id_time=''):
r"""**Time stop.**
Returns elapsed time.
Args:
start_time (float) : start time
id_time (str) : timer identifier used in the printed message
Returns:
delta_time (float) : elapsed time
"""
import time
end=time.time()
delta_time = end-start_time
print('Elapsed %s: %s' % ('on timer '+id_time,delta_time))
return delta_time
def get_pw10(x):
r"""**Find power of ten.**
For a given positive `x`, finds `n` so that :math:`x = y \, 10^n`, with :math:`1 \leq y < 10`.
Args:
x (float) : input value
Returns:
:math:`10^n` (n integer) : power of ten associated with `x`
"""
n = 10**(np.floor(np.log10(x)))
return n
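#Illustrative usage of get_pw10() (hypothetical helper added for documentation; not part of the original module)
def _example_get_pw10():
    r"""Minimal sketch: 345.6 = 3.456 x 10^2 and 7 = 7 x 10^0, so the associated powers of ten are 100 and 1."""
    assert get_pw10(345.6) == 100.
    assert get_pw10(7.) == 1.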
def dup_edges(x,xblue=None,xred=None):
r"""**Duplicate edges.**
Assigns edge values to the undefined sides of an input table.
Args:
x (arr, float) : input array with possibly undefined (NaN) values
xblue (float, optional) : value assigned to the undefined lower (blue) edge; defaults to the first defined value
xred (float, optional) : value assigned to the undefined upper (red) edge; defaults to the last defined value
Returns:
x (arr, float) : input array with fully defined values
"""
idx_def = np_where1D(~np.isnan(x))
if idx_def[0]>0:
if xblue is None:x[0:idx_def[0]] = x[idx_def[0]]
else:x[0:idx_def[0]] = xblue
if idx_def[-1]<len(x)-1:
if xred is None:x[idx_def[-1]+1:] = x[idx_def[-1]]
else:x[idx_def[-1]+1::] = xred
return x
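#Illustrative usage of dup_edges() (hypothetical helper added for documentation; not part of the original module)
def _example_dup_edges():
    r"""Minimal sketch: undefined (NaN) edges are replaced by the nearest defined value, or by `xblue`/`xred` when provided."""
    x = np_array([np_nan, np_nan, 1., 2., 3., np_nan])
    assert np.allclose(dup_edges(x), [1., 1., 1., 2., 3., 3.])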
##################################################################################################
#%%% Array operations
##################################################################################################
def np_where1D(cond):
r"""**1D where.**
Simplified numpy `where` function in 1D.
Args:
cond (conditional statement) : condition on 1D array
Returns:
idx_cond (array) : array of indexes validating condition
"""
idx_cond = np.where(cond)[0]
return idx_cond
def step_range(start,stop,step):
r"""**Range with step.**
Returns all points `x` between :math:`x_\mathrm{start}` and :math:`x_\mathrm{stop}` separated by :math:`x_\mathrm{step}`
.. math::
x_i = x_\mathrm{start} + i \, x_\mathrm{step} \leq x_\mathrm{stop}
Args:
TBD
Returns:
TBD
"""
nsteps=int(round((stop-start)/step))+1
x_grid = start+np.arange(nsteps)*step
return x_grid
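#Illustrative usage of step_range() (hypothetical helper added for documentation; not part of the original module)
def _example_step_range():
    r"""Minimal sketch: a regular grid from 0 to 1 with step 0.25, both boundaries included."""
    assert np.allclose(step_range(0., 1., 0.25), [0., 0.25, 0.5, 0.75, 1.])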
def closest(x_tab,x_val):
r"""**Closest value : 1D.**
Returns index of closest value in a 1D array.
Args:
x_tab (array, float) : searched 1D array
x_val (float) : input value
Returns:
idx (int) : index of value closest to `x_val` in `x_tab`
"""
idx = (np.abs(x_tab-x_val)).argmin()
return idx
def closest_Ndim(x_tab,x_val):
r"""**Closest value : ND.**
Returns the multi-dimensional index of the value closest to the input value in an array.
Args:
x_tab (array, float) : searched multi-dimensional array
x_val (float) : input value
Returns:
idx (list, int) : multi-dimensional index of the value closest to `x_val` in `x_tab`
"""
idx = list(np.unravel_index(np.argmin(np.abs(x_tab-x_val), axis=None), x_tab.shape))
return idx
def closest_arr(x_tab, x_val):
r"""**Closest array.**
Returns index of closest values from an array in another array.
Args:
x_tab (array, float) : searched multi-dimensional array
x_val (array, float) : input array values (must be sorted)
Returns:
idx (array, int) : indexes of values closest to `x_val` within `x_tab`
"""
x_tab = np.asarray(x_tab)
#Returns index 0 for single value array
if (len(x_tab)==1):
idx=np.repeat(0,len(x_val))
else:
#Return indexes in 'array' that would maintain order if 'in_value' values were inserted into 'array' after the 'array' value corresponding to index
#This means that idx-1 corresponds to the value in array immediately lower than the 'in_value' value, except if the value is lower than the min of array (then idx=0)
idx = x_tab.searchsorted(x_val)
#Set indexes equal to 0 (ie values lower than the min in array) to 1
#Set indexes equal to len(array) (ie values higher than the max in array) to len(array)-1
idx = np.clip(idx, 1, len(x_tab)-1)
#Return 'array' values immediately lower/higher than each 'in_value' values, if they are within 'array' min/max values
# if value is lower than the min, idx has been set to 1 and left returns the min (and first) value of array (right the second value in array)
# if value is higher than the max, idx has been set to len(array)-1 and right returns the max (and last) value of array (left the next to last value in array)
left = x_tab[idx-1]
right = x_tab[idx]
#Return True (=1) if value is closer to its lower limit in array (left), otherwise return False (=0)
# in the 1st case, idx-1 corresponds to the value in array immediately lower than the 'in_value' value
# in the 2nd case, idx corresponds to the value in array immediately higher than the 'in_value' value
#
# if value is lower than the min, (in_value - left) is <0 and always lower than (right - in_value) positive
# thus idx=1 is reduced by 1 and returns the min of the array
# if value is higher than the max, (in_value - left) is >0 and always higher than (right - in_value) negative
# thus idx=len(array)-1 is left untouched and returns the max of the array
idx -= x_val - left < right - x_val
return idx
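#Illustrative usage of closest_arr() (hypothetical helper added for documentation; not part of the original module)
def _example_closest_arr():
    r"""Minimal sketch: values falling outside the searched array are clipped to its first/last index."""
    idx = closest_arr(np_array([0., 1., 2., 3.]), np_array([-5., 0.4, 2.6, 10.]))
    assert list(idx) == [0, 0, 3, 3]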
def findgen(n,int=False):
r"""**IDL findgen.**
Returns an array of the requested type with values increasing from 0 to n-1 by steps of 1 (`n` values).
Args:
TBD
Returns:
TBD
"""
if int:
#The 'int' keyword shadows the built-in type, so the dtype is passed as a string
return np.linspace(0,n-1,n).astype('int')
else:
return np.linspace(0,n-1,n)
def upsample(x, sample=2):
r"""**Upsampling.**
Upsamples array by integer factor.
Array values do not have to be equidistant.
Args:
TBD
Returns:
TBD
"""
d=np.diff(x)/float(sample)
r = d.repeat(sample)
s=np.concatenate([[x[0]],x[0]+np.cumsum(r)])
return s
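#Illustrative usage of upsample() (hypothetical helper added for documentation; not part of the original module)
def _example_upsample():
    r"""Minimal sketch: each interval of a (possibly irregular) grid is split into `sample` sub-intervals."""
    assert np.allclose(upsample(np_array([0., 1., 3.]), sample=2), [0., 0.5, 1., 2., 3.])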
def def_edge_tab(cen_bins,dim = 2):
r"""**Bins edge definition**
Defines the edges of bins from their input centers.
Args:
TBD
Returns:
TBD
"""
if dim==0:
mid_bins = 0.5*(cen_bins[0:-1]+cen_bins[1::])
low_bins_st =cen_bins[0] - (mid_bins[0] - cen_bins[0])
high_bins_end = cen_bins[-1] + (cen_bins[-1]-mid_bins[-1])
edge_bins = np.concatenate(([low_bins_st], mid_bins,[high_bins_end]))
elif dim==2:
mid_bins = 0.5*(cen_bins[:,:,0:-1]+cen_bins[:,:,1::])
low_bins_st =cen_bins[:,:,0] - (mid_bins[:,:,0] - cen_bins[:,:,0])
high_bins_end = cen_bins[:,:,-1] + (cen_bins[:,:,-1]-mid_bins[:,:,-1])
edge_bins = np.concatenate((low_bins_st[:,:,None] , mid_bins,high_bins_end[:,:,None]),axis=2)
else:stop('Upgrade def_edge_tab()')
return edge_bins
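#Illustrative usage of def_edge_tab() (hypothetical helper added for documentation; not part of the original module)
def _example_def_edge_tab():
    r"""Minimal sketch (1D case, dim=0): bin centers [1,2,3] yield edges [0.5,1.5,2.5,3.5]."""
    assert np.allclose(def_edge_tab(np_array([1., 2., 3.]), dim=0), [0.5, 1.5, 2.5, 3.5])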
##################################################################################################
#%%% Data manipulation
##################################################################################################
def check_data(file_paths,vis=None,silent=False):
r"""**Data check**
Checks that ANTARESS data is saved on disk.
Args:
TBD
Returns:
TBD
"""
check_flag = True
suff = '' if vis is None else ' for '+vis
for key in file_paths:
check_flag&=(Path(file_paths[key]+'.npz').is_file())
if check_flag:
if not silent:print(' Retrieving data'+suff)
else:
print(' Data not available'+suff)
stop()
return check_flag
def dataload_npz(path):
r"""**Npz retrieval.**
Simplified call to function loading an `npz` file with a `data` field.
Args:
TBD
Returns:
data (any) : content of the `data` field of the `npz` file
"""
data_path = glob.glob(path_nonpz(path)+'.npz')
if len(data_path)==0:stop('ERROR : file "'+path_nonpz(path)+'.npz'+'" does not exist')
return np.load(data_path[0],allow_pickle=True)['data'].item()
def datasave_npz(path,data):
r"""**Npz saving.**
Simplified call to function saving `npz` file with `data` field.
Args:
TBD
Returns:
None
"""
#Object arrays are pickled by default by np.savez_compressed(); additional keyword arguments would be stored as extra arrays rather than acting as flags
return np.savez_compressed(path_nonpz(path),data=data)
def path_nonpz(path):
r"""**npz path.**
Ensures that the path used by dataload_npz() or datasave_npz() has no .npz extension
Args:
path (str): path to a .npz
Returns:
path (str): path to a .npz
"""
return path.split('.npz')[0]
def path(path):
r"""**Slash path.**
Ensures that a string that points to a folder (as a filepath) has a slash at the end.
Args:
path (str): path to a folder
Returns:
path (str): path to a folder
"""
if path[-1] != '/':
path+='/'
return path
def path_singslash(path):
r"""**Single slash path.**
Ensures that a string that points to a folder (as a filepath) has only single slashes.
Args:
path (str): path to a folder
Returns:
path (str): path to a folder
"""
return path.replace('//','/')
def import_module(mod_path):
r"""**Import python module.**
Reads file at mod_path and loads it as a python module.
Args:
mod_path (str): absolute path or name of the python file (if within current directory) to import, ending with .py
Returns:
module (): loaded module
"""
spec = importlib.util.spec_from_file_location('imported_modules', mod_path)
module = importlib.util.module_from_spec(spec)
sys.modules['imported_modules'] = module
spec.loader.exec_module(module)
return module
##################################################################################################
#%%% Math functions
##################################################################################################
def is_odd(x):
r"""**Odd check.**
Checks whether a variable is odd.
Args:
x (array, float) : input variable
Returns:
cond_odd (array, bool) : True if `x` is odd
"""
cond_odd = np.array(x % 2,dtype=bool)
return cond_odd
def factrial(x):
r"""**Factorial.**
Simplified call to the factorial of the input variable.
Args:
x (int) : input variable.
Returns:
fact_x (int) : factorial of `x`
"""
#np.math is not available in recent numpy versions; use the standard library factorial
fact_x = factorial(x)
return fact_x
def closest_mod(x,p,y):
r"""**Closest modulo.**
Given `x`, `p`, `y`, returns the value of :math:`x \pm n \times p` closest to `y`.
This is equivalent to taking :math:`n = \mathrm{round}((x - y)/p)`, so that the remainder :math:`r = x - n p` is closest to `y`.
Args:
x (float or array) : dividend.
p (float or array) : divisor.
y (float or array) : target.
Returns:
r (float or array) : remainder
"""
r = x - np.round((x-y)/p)*p
return r
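#Illustrative usage of closest_mod() (hypothetical helper added for documentation; not part of the original module)
def _example_closest_mod():
    r"""Minimal sketch: 26 folded modulo 7 as close as possible to 0 gives -2, since 26 - 4*7 = -2."""
    assert closest_mod(26., 7., 0.) == -2.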
def conv_log(var_tab,log_mod):
r'''**Log conversion**
Returns either the log10 of the input values or the original values, depending on `log_mod`.
Args:
TBD
Returns:
TBD
'''
if log_mod:out_val= np.log10(var_tab)
else:out_val = var_tab
return out_val
def np_hyp2f2(z):
r'''**Voigt integration**
Returns the integral of a Voigt profile, with broadcasting (the original function cannot be used with arrays), based on the mpmath hypergeometric function.
Args:
TBD
Returns:
TBD
'''
np_hyp2f2_temp = np.frompyfunc(fp.hyp2f2, 5, 1)
out_val = np_hyp2f2_temp(1.,1.,2.,3./2.,-z*z)
return out_val
def dichotomy(low, high, f, x, tol=1e-10):
r'''**Dichotomy**
Returns the value `v` between `low` and `high` such that :math:`f(v)=x`, using a dichotomic search.
The search stops when :math:`f(v)=x` exactly or when the width of the search interval falls below `tol`.
If `x` is not in `[f(low), f(high)]`, returns `-1`.
Assumes that `f` is an increasing function (i.e. :math:`f(a)>f(b)` for :math:`a>b`).
If `f` is a decreasing function, use `-f`.
Args:
TBD
Returns:
TBD
'''
if f(low) > x or f(high) < x: return -1
mid = .5*(high + low)
if (f(mid) == x) or (high - low <= tol):
return mid
elif f(mid) > x:
return dichotomy(low, mid, f, x, tol)
else:
return dichotomy(mid, high, f, x, tol)
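#Illustrative usage of dichotomy() (hypothetical helper added for documentation; not part of the original module)
def _example_dichotomy():
    r"""Minimal sketch: the (increasing) square function reaches 4 at v = 2 within [0, 8]."""
    assert dichotomy(0., 8., lambda v: v*v, 4.) == 2.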
def convol_contin(x_grid,dx_grid,y_tab,kern,hw_kern):
r"""**Convolution with tabulated kernel.**
Convolves a tabulated profile `F` with a tabulated (continuously-defined) kernel `k`.
The convolved profile is expressed as
.. math::
F_\mathrm{conv}(x) &= \int_{t}( F(x-t) k_\mathrm{norm}(t) dt) \\
\mathrm{where \,} &k_\mathrm{norm}(t) = \frac{k(t)}{\int_{u}(k(u) du)}
With :math:`F_\mathrm{conv}` and `F` in the same units (thus :math:`k_\mathrm{norm}` in units of :math:`[dt^{-1}]`), and
:math:`k_\mathrm{norm}` normalized to an integral unity.
Expressed in the discrete domain, the convolution writes as (see `<http://www.exelisvis.com/docs/CONVOL.html>`_)
.. math::
F_\mathrm{conv}(x) &= \sum_{i=0}^{m-1}( F( x+i-hm ) k(i) ) \mathrm{\, if \,} hm <= x <= n-hm-1 \\
&= F(0) \sum_{i=0}^{hm-x-1} ( k(i) ) + \sum_{i=hm-x:m-1} ( F( x+i-hm ) k(i) ) \mathrm{\, if \,} x < hm \\
&= \sum_{i=0}^{n-1+hm-x} ( F( x+i-hm ) k(i) ) + F(n-1) \sum_{i=n+hm-x,m-1} ( k(i) ) \mathrm{\, if \,} x > n-hm-1
With `F` the function to be convolved (dimension `n`), and `m` the kernel (dimension `k`). This can also be written with :math:`(j=x+i-hm -> i=j-x+hm)` and :math:`(m - hm = hm)` as:
.. math::
F_\mathrm{conv}(x) &= \sum_{j=x-hm}^{x+hm} ( F( j ) k(j-x+hm) ) \mathrm{\, if \,} hm <= x <= n-hm-1 \\
&= F(0) \sum_{j=x-hm}^{-1} ( k(j-x+hm) ) + \sum_{j=0}^{x+hm} ( F( j ) k(j-x+hm) ) \mathrm{\, if \,} x < hm \\
&= \sum_{j=x-hm}^{n-1} ( F( j ) k(j-x+hm) ) + F(n-1) \sum_{j=n}^{x+hm-1} ( k(j-x+hm) ) \mathrm{\, if \,} x > n-hm-1
Here we use extended edges, ie we repeat the values of `F` at the edge to compute their convolved values.
We take into account the (possibly variable) spectral sampling of `F`, and use a function for the kernel to be able to calculate its value at any position. We define
+ :math:`k[\lambda_\mathrm{cen}](\lambda)` as the value of the kernel centered on :math:`\lambda_\mathrm{cen}`, at wavelength :math:`\lambda`.
+ :math:`\mathrm{hw}_\mathrm{kern}` (ie `hm`) the interval between the kernel center and the wavelength point where the kernel falls below 1\% of its max, ie where its
contribution to the convolution becomes negligible (:math:`\sim` 0.1\% difference with the full kernel range)
+ `dw(j)` the width of the pixel at index `j` in the table of `F`
We separate again three different cases
+ regular values (:math:`\lambda(0)+\mathrm{hw}_\mathrm{kern} <= \lambda(x) <= \lambda(n-1)-\mathrm{hw}_\mathrm{kern}`)
.. math::
F_\mathrm{conv}(x) &= \sum_{ j=j_s}^{j_e }( F( j ) k[\lambda(j)]( \lambda(x) ) dw(j) ) \\
& j_s \mathrm{lowest \, index \,that\, gives} \lambda(j_s) >= \lambda(x) - \mathrm{hw}_\mathrm{kern} \\
& j_e \mathrm{highest \, index \,that \,gives} \lambda(j_e) <= \lambda(x) + \mathrm{hw}_\mathrm{kern}
We cannot define :math:`(j_s,j_e)` in terms of indexes because the resolution of `F` can be variable
+ lower edge (:math:`\lambda(x) < \lambda(0)+\mathrm{hw}_\mathrm{kern}`)
.. math::
F_\mathrm{conv}(x) &= F(0) dw(0) \sum_{j= -1 - int(\mathrm{hw}_\mathrm{kern}/dw(0))}^{-1} ( k[\lambda(0)+dw(0) j]( \lambda(x) ) ) + \sum_{ j=0}^{j_e} ( F( j ) k[\lambda(j)]( \lambda(x) ) dw(j) )\\
& j_e \mathrm{highest \, index \,that \,gives} \lambda(j_e) <= \lambda(x) + \mathrm{hw}_\mathrm{kern}
In that case, because :math:`\mathrm{hw}_\mathrm{kern}/dw(0) -1 < int(\mathrm{hw}_\mathrm{kern}/dw(0)) <= \mathrm{hw}_\mathrm{kern}/dw(0)` the first pixel on which the kernel is centered on at
:math:`\lambda_{j_\mathrm{min}} = \lambda(0) - dw(0) - dw(0) int( \mathrm{hw}_\mathrm{kern}/dw(0) )` verifies :math:`\mathrm{hw}_\mathrm{kern}<\lambda(0)-\lambda_{j_\mathrm{min}}<=\mathrm{hw}_\mathrm{kern}+dw`
thus this is the farthest phantom pixel, when extending the table with pixels at the resolution of the first pixel, for which the kernel
contributes significantly to the convolution (since the next extended pixel would be at more than :math:`\mathrm{hw}_\mathrm{kern}` from the first pixel at :math:`\lambda(0)`).
+ upper edge (:math:`\lambda(x) > \lambda(n-1)-\mathrm{hw}_\mathrm{kern}`)
.. math::
F_\mathrm{conv}(x) = \sum_{j=j_s}^{n-1}( F( j ) k[\lambda(j)]( \lambda(x) ) dw(j) ) + F(n-1) dw(n-1) \sum_{j=1}^{1+int(\mathrm{hw}_\mathrm{kern}/dw(n-1))}( k[\lambda(n-1)+dw(n-1) j]( \lambda(x) ) )
With :math:`j_s` lowest index that gives :math:`\lambda(j_s) >= \lambda(x) - \mathrm{hw}_\mathrm{kern}`.
In that case, because :math:`\mathrm{hw}_\mathrm{kern}/dw(n-1) -1 < int(\mathrm{hw}_\mathrm{kern}/dw(n-1)) <= \mathrm{hw}_\mathrm{kern}/dw(n-1)` the last pixel on which the kernel is centered on at
:math:`\lambda_{j_\mathrm{max}} = \lambda(n-1) + dw(n-1) + dw(n-1) int( \mathrm{hw}_\mathrm{kern}/dw(n-1) )` verifies :math:`\mathrm{hw}_\mathrm{kern} < \lambda_{j_\mathrm{max}} - \lambda(n-1) <= \mathrm{hw}_\mathrm{kern} +dw(n-1)`.
Args:
TBD
Returns:
TBD
"""
#First/last wavelength pixels
n_pix=len(y_tab)
wav0=x_grid[0]
dwav0=dx_grid[0]
dwavf=dx_grid[n_pix-1]
wavf=x_grid[n_pix-1]
if 2.*hw_kern>(wavf-wav0):stop('Kernel too large compared to spectrum')
#The convolved spectrum is defined on the same wavelength table as the original spectrum
F_conv=np.zeros(n_pix)
#We use directly the product spectrum x resolution 'F_dw'
F_dw=y_tab*dx_grid
#--------------------------------------------------------------------------------------------
#Regular values: w(0)+hw_kern <= w(x) <= w(n-1)-hw_kern
# F_conv(x) = sum( j=js,je ; F( j ) * k[w(j)]( w(x) ) * dw(j) )
# with js lowest index that gives w(js) >= w(x) - hw_kern
# je highest index that gives w(je) <= w(x) + hw_kern
id_reg=np.where( (x_grid >= wav0 + hw_kern) & (x_grid <= wavf - hw_kern))[0]
for ix in id_reg:
wavx=x_grid[ix]
j_tab_reg=np.arange(( np.where(x_grid >= wavx - hw_kern)[0] )[0],( np.where(x_grid <= wavx + hw_kern)[0] )[-1]+1)
F_conv[ix]+=np.sum(F_dw[j_tab_reg]*kern(wavx,x_grid[j_tab_reg]))
#--------------------------------------------------------------------------------------------
#Lower edge: w(x) < w(0)+hw_kern
# F_conv(x) = F(0) * dw(0) * sum(j= -1 - int(hw_kern/dw(0)),-1 ; k[w(0)+dw(0)*j]( w(x) ) ) +
# sum( j=0:je ; F( j ) * k[w(j)]( w(x) ) * dw(j) )
# with je highest index that gives w(je) <= w(x) + hw_kern
id_low=np.where( (x_grid < wav0 + hw_kern))[0]
for ix in id_low:
wavx=x_grid[ix]
j_tab_low=np.arange(-1-int(hw_kern/dwav0),0)
j_tab_reg=np.arange(0,( np.where(x_grid <= wavx + hw_kern)[0] )[-1]+1)
F_conv[ix]+=F_dw[0]*np.sum(kern(wavx,wav0+dwav0*j_tab_low))+\
np.sum(F_dw[j_tab_reg]*kern(wavx,x_grid[j_tab_reg]))
#--------------------------------------------------------------------------------------------
#Upper edge: w(x) > w(n-1)-hw_kern
# F_conv(x) = sum( j=js:n-1 ; F( j ) * k[w(j)]( w(x) ) * dw(j) )
# + F(n-1) * dw(n-1) * sum(j=1,1+int(hw_kern/dw(n-1)) ; k[w(n-1)+dw(n-1)*j]( w(x) ) )
# with js lowest index that gives w(js) >= w(x) - hw_kern
id_high=np.where( (x_grid > wavf - hw_kern))[0]
for ix in id_high:
wavx=x_grid[ix]
j_tab_reg=np.arange(( np.where(x_grid >= wavx - hw_kern)[0] )[0],n_pix)
j_tab_high=np.arange(1,1+int(hw_kern/dwavf)+1)
F_conv[ix]+=np.sum(F_dw[j_tab_reg]*kern(wavx,x_grid[j_tab_reg]))+\
F_dw[n_pix-1]*np.sum(kern(wavx,wavf+dwavf*j_tab_high))
return F_conv
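#Illustrative usage of convol_contin() (hypothetical example added for documentation; not part of the original module).
#The Gaussian kernel and its width are assumptions chosen for the sketch; the kernel is normalized so that convolving a flat profile leaves it unchanged.
def _example_convol_contin():
    r"""Minimal sketch: a flat profile convolved with a normalized Gaussian kernel stays flat to within ~1%."""
    x_grid = np.arange(0., 20., 0.1)
    dx_grid = np.repeat(0.1, len(x_grid))
    y_tab = np.ones(len(x_grid))
    sig = 0.3
    gauss_kern = lambda x, cen: np.exp(-0.5*((x-cen)/sig)**2.)/(sig*np.sqrt(2.*pi))
    hw_kern = 3.*sig   #beyond ~3 sigma the kernel falls to ~1% of its maximum
    F_conv = convol_contin(x_grid, dx_grid, y_tab, gauss_kern, hw_kern)
    assert np.allclose(F_conv, 1., rtol=1e-2)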
##################################################################################################
#%%% Physics functions
##################################################################################################
def planck(wav_tab,Teff):
r'''**Black body spectrum**
Calculates black body at a given temperature, over an input wavelength table.
To get the black body flux at distance `d` from the star, do::
>>> planck(wav_tab,Teff) x (Rstar/d)^2
Args:
wav_tab (array, float) : wavelength table in A
Teff (float) : temperature in K
Returns:
bbflux (array, float) : black body flux in erg/cm2/s/A at the radius of the star
bbint_w_A (array, float) : black body specific intensity in erg/cm2/s/A/steradian
'''
#Constants (cgs units)
h_planck=6.62606957e-27 #(g cm2 s-1 = erg s) Planck constant
k_boltz=1.3806488*1e-16 #(g cm2 s-2 K-1 = erg K-1) Boltzmann constant
c_light_cm=29979245800. #(cm s-1) speed of light
#Wavelengths in cm
wav_tab_cm=wav_tab*1e-8
#Coefficients
a = 2.0*h_planck*(c_light_cm**2) #in g cm4 s-3 = erg cm2 s-1
b = h_planck*c_light_cm/(k_boltz*wav_tab_cm*Teff) #in nothing
#Black body intensity
# - bbint_nu = (2*h*nu^3/c^2)*(1/(exp(h*nu/(k*T))-1)
# in erg s-1 cm-2 Hz-1 steradian-1
# - bbint_w = bbint_nu*(dnu/dlambda)
# = bbint_nu*(c*dlambda/lambda^2)/dlambda
# = bbint_nu*(c/lambda^2)
# = (2*h*c^2/lambda^5)*(1/(exp(h*c/(lambda*k*T))-1)
# in erg s-1 cm-3 steradian-1
# - bbint_w_A in erg s-1 cm-2 A-1 steradian-1
bbint_w = a/((np.power(wav_tab_cm,5.))*(np.exp(b)-1.0))
bbint_w_A = bbint_w*1e-8
#Black body flux
# - integral of intensity over all outgoing angles from the star surface, projected along the LOS visible by the observer
# - flux_w = int(phi in 0:2pi, theta in 0:pi/2 , bbint_w * cos(theta) * d_om)
# flux_w = pi*bbint_w
# in erg s-1 cm-2 cm-1, at the star radius
# thus flux_w*1e-8 is in erg s-1 cm-2 A-1
bbflux=pi*bbint_w_A
return bbflux,bbint_w_A
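#Illustrative usage of planck() (hypothetical helper added for documentation; not part of the original module)
def _example_planck():
    r"""Minimal sketch: a 5800 K black body peaks near 5000 A, so its flux there exceeds the flux at 3000 A."""
    bbflux, bbint = planck(np_array([3000., 5000.]), 5800.)
    assert bbflux[1] > bbflux[0]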
def air_index(l, t=15., p=760.):
r"""**Air refraction index.**
Computes the refraction index `n` of the air (author: C. Lovis)
.. math::
& \lambda_\mathrm{air} = \lambda_\mathrm{vacuum}/n \\
& n_\mathrm{vacuum}=1.
Args:
l (float, array): Wavelength in Angstrom
t (float, array): Air temperature in Celsius
p (float, array): Air pressure in millimeter of mercury
Returns:
n (float, array): Refraction index.
"""
n = 1e-6 * p * (1 + (1.049-0.0157*t)*1e-6*p) / 720.883 / (1 + 0.003661*t) * (64.328 + 29498.1/(146-(1e4/l)**2) + 255.4/(41-(1e4/l)**2))
n = n + 1
return n
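#Illustrative usage of air_index() (hypothetical helper added for documentation; not part of the original module)
def _example_air_index():
    r"""Minimal sketch: at 5000 A the air refraction index is slightly above 1, so the air wavelength is slightly below the vacuum one."""
    n = air_index(5000.)
    assert 1. < n < 1.001
    assert 5000./n < 5000.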
def gen_specdopshift(rv_s2r , v_s = 0.):
r"""**General Doppler effect.**
Calculates the Doppler shift for the case where the inertial motions of the source and receiver are at any specified angle.
The Doppler shift for a source moving at speed :math:`v_s` at an angle :math:`\theta_r` measured in the frame of a stationary receiver is expressed in frequency and wavelength as
.. math::
\nu_\mathrm{r} &= \frac{ \nu_\mathrm{s} }{ \gamma ( 1 + \beta \cos{\theta_r} ) } \\
\lambda_\mathrm{r} &= \lambda_\mathrm{s} \gamma (1+\beta \cos{\theta_r}) \\
\mathrm{where \,} &\beta = \frac{v}{c}
The radial component of the source's motion along the line of sight is equal to :math:`rv = v \cos{\theta_r}`, positive if the source is moving away from the receiver, so that
.. math::
\lambda_\mathrm{r} &= \lambda_\mathrm{s} f_\mathrm{Dopp} \\
\mathrm{where \,} &f_\mathrm{Dopp} = \gamma (1+ \beta_\mathrm{rv}) \\
&\beta_\mathrm{rv} = \frac{rv[s/r]}{c} \\
&\gamma = \frac{1}{\sqrt{1 - \beta^2}}
The Lorentz factor :math:`\gamma = 1/\sqrt{1 - (v/c)^2} \sim 1` in most cases but is accounted for in the Doppler shift of the orbital velocity, as it can yield variations on the order of 1-10 m/s for close-in planets on eccentric orbits.
Typically we convert wavelengths from the receiver to the source frame (to align measurements in their frame of emission), so one needs to divide :math:`\lambda_\mathrm{r}` by :math:`f_\mathrm{Dopp}`.
Args:
rv_s2r (array, float) : radial velocity of source with respect to receiver (km/s)
v_s (array, float) : absolute velocity of source (km/s)
Returns:
f_dopp (array, float) : spectral Doppler shift
"""
gamma = 1./np.sqrt(1. - (v_s/c_light)**2.)
f_dopp = gamma*(1. + (rv_s2r/c_light))
return f_dopp
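#Illustrative usage of gen_specdopshift() (hypothetical helper added for documentation; not part of the original module)
def _example_gen_specdopshift():
    r"""Minimal sketch: a positive radial velocity yields f_dopp > 1 (shift toward longer wavelengths); dividing by the same factor recovers the source-frame wavelength."""
    f_dopp = gen_specdopshift(30.)
    w_receiver = 5000.*f_dopp
    assert (f_dopp > 1.) and (abs(w_receiver/f_dopp - 5000.) < 1e-8)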
def rel_lon_specdopshift(rv_s2r):
r"""**Longitudinal Doppler effect.**
Calculates the Doppler shift for the case where the inertial motions of the source and receiver are along the axis from one to the other.
Args:
rv_s2r (array, float) : radial velocity of source with respect to receiver (km/s)
Returns:
f_dopp (array, float) : spectral Doppler shift
"""
f_dopp = np.sqrt(1. - (rv_s2r/c_light) )/np.sqrt(1. + (rv_s2r/c_light) )
return f_dopp
def conv_flux2ph(w_grid):
r"""**Flux to photons conversion.**
Returns the conversion factor from flux density in erg/cm2/s/A to ph/cm2/s/A at a given wavelength.
The energy of a photon is E[erg] = h[erg s] c[cm s-1] / lambda[cm] = 6.62606957e-27*2.99792458e10/(1e-8 lambda[A]) = 1.986445683269303e-08/lambda[A]
Thus one erg yields (1/1.986445683269303e-08)*lambda[A] photons
Args:
w_grid (array, float) : input wavelength grid (A)
Returns:
conv_fact (array, float) : conversion factor ( flux_ph = conv_fact*flux_erg)
"""
conv_fact = 5.03411250E+07 * w_grid
return conv_fact
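#Illustrative usage of conv_flux2ph() (hypothetical helper added for documentation; not part of the original module)
def _example_conv_flux2ph():
    r"""Minimal sketch: a flux density of 1e-12 erg/cm2/s/A at 5000 A corresponds to ~0.252 ph/cm2/s/A."""
    assert np.isclose(conv_flux2ph(5000.)*1e-12, 0.2517, rtol=1e-3)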
##################################################################################################
#%%% Multithreading
##################################################################################################
def MAIN_multithread(func_input,nthreads,n_elem,y_inputs,common_args,output = False):
r"""**Multithreading wrap-up.**
This routine is applicable to functions that take chunkable inputs, defined as arrays whose first dimension has the same length `n_elem`.
Specific routines must be defined for other types of inputs.
Args:
func_input (function): multi-threaded function
nthreads (int): number of threads
n_elem (int): number of elements to thread between the different workers
y_inputs (list): threadable function inputs (each element of the list is an array with first dimension of length n_elem)
common_args (tuple): common function inputs to be passed to all workers (chunkable inputs should be put in y_inputs)
output (bool): set to True to return function outputs
Returns:
y_output (None or specific to func_input): function outputs
"""
#Opening workers
# - cannot be passed through lmfit
pool_proc = Pool(processes=nthreads)
#Chunking arguments
ind_chunk_list=init_parallel_func(nthreads,n_elem)
chunked_args=[tuple(y_inputs[i][ind_chunk[0]:ind_chunk[1]] for i in range(len(y_inputs)))+common_args for ind_chunk in ind_chunk_list]
#Threading function
all_results=tuple(tab for tab in pool_proc.starmap(func_input,chunked_args))
#Regrouping outputs
if output:y_output=np.concatenate(tuple(all_results[i] for i in range(nthreads)))
else:y_output = None
#Closing workers
pool_proc.close()
pool_proc.join()
return y_output
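#Illustrative usage of MAIN_multithread() (hypothetical example added for documentation; not part of the original module).
#It assumes a threaded function (here np.sqrt) that accepts the chunked array as its single argument.
def _example_MAIN_multithread():
    r"""Minimal sketch: np.sqrt applied to a 10-element array split over 2 workers matches the direct call."""
    y_in = np.arange(10.)
    y_out = MAIN_multithread(np.sqrt, 2, len(y_in), [y_in], (), output=True)
    assert np.allclose(y_out, np.sqrt(y_in))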
def init_parallel_func(nthreads,n_elem):
r"""**Multithreading initialization.**
Prepares multi-threading to execute routine on different cores with several arguments.
Args:
nthreads (int): number of threads
n_elem (int): number of elements to thread
Returns:
ind_chunk_list (list): function arguments separated in chunks to be passed to each thread.
"""
if (nthreads<=n_elem):
#Size of chunks to be processed by each core
npercore=int(n_elem/nthreads)
#Indexes of each chunk
ind_chunk_list=[(iproc*npercore,(iproc+1)*npercore) for iproc in range(nthreads-1)]
#Last core has to process the remaining elements
ind_chunk_list+=[((nthreads-1)*npercore,int(n_elem))]
else:
stop('Too many threads ('+str(nthreads)+') for the number of input elements ('+str(n_elem)+')')
return ind_chunk_list
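#Illustrative usage of init_parallel_func() (hypothetical helper added for documentation; not part of the original module)
def _example_init_parallel_func():
    r"""Minimal sketch: 10 elements split over 3 threads yield chunks (0,3), (3,6) and (6,10), the last thread absorbing the remainder."""
    assert init_parallel_func(3, 10) == [(0, 3), (3, 6), (6, 10)]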