import numpy as np
import astropy.units as u
from astropy.io import fits
from scipy.interpolate import BSpline, splder
import sys
import json
import traceback

#==============================================================================#
"""
Python code connected to the Xspec implementation of the xscatXY model.

TODO:
- Speed up the code?
- Is it better to include dxs/dr in the FITS file and then interpolate?
- Are we doing (xext,yext) wrong? I think we are assuming that the rectangular
  region is centered on the source, but the source is not centered on the
  detector, so `xext` is not symmetric (nor is `yext`).
"""
#==============================================================================#

def read_xscat(fdat):
    """
    Return a dictionary containing data from the XSCAT FITS file (or text file).

    Parameters:
        fdat : Path to an XSCAT '.fits' file (collection of xscat txt outputs)
               or to a single xscat '.dat' output file.

    Returns:
        dict with keys 'rext' [arcsec], 'xpos' [dimensionless], 'enrg' [keV],
        'sigma' [cm^2]; plus, when present in the FITS file, 'sigmafit' and
        'dsigmadr' (best-fit cross section and its radial derivative) and
        'knots', 'coeffs', 'degree' (B-spline parameters).

    Raises:
        ValueError : If `fdat` does not end in '.fits' or '.dat'.
    """
    # Get the `fdat` file extension
    fext = fdat.split('.')[-1]

    # `fdat` is a .fits file (collection of xscat txt outputs)
    if fext == 'fits':
        with fits.open(fdat, mode='readonly') as hdul:
            # Standard XSCAT output
            data = hdul['XSCAT'].data
            rext = data['RadExtr'] * u.arcsec
            xpos = data['DustPos'] * u.dimensionless_unscaled
            enrg = data['Energy'] * u.keV
            sigma = data['Sigma'] * u.cm**2
            nr = len(np.unique(rext))
            nx = len(np.unique(xpos))
            nE = len(np.unique(enrg))
            # Columns are flattened (r,x,E) grids (Fortran order); recover axes
            rext = rext.reshape((nr, nx, nE), order='F')[:, 0, 0]
            xpos = xpos.reshape((nr, nx, nE), order='F')[0, :, 0]
            enrg = enrg.reshape((nr, nx, nE), order='F')[0, 0, :]
            sigma = sigma.reshape((nr, nx, nE), order='F')
            # BUG FIX: result variable renamed from `dict` (shadowed builtin)
            out = {'rext': rext, 'xpos': xpos, 'enrg': enrg, 'sigma': sigma}

            # Best-fit cross section and its radial derivative on (r,x,E) grid
            if ('SIGMAFIT' in data.names) and ('DSIGMADR' in data.names):
                sigmafit = data['SIGMAFIT'] * u.cm**2
                dsigmadr = data['DSIGMADR'] * u.cm**2 / u.arcsec
                sigmafit = sigmafit.reshape((nr, nx, nE), order='F')
                dsigmadr = dsigmadr.reshape((nr, nx, nE), order='F')
                out.update({'sigmafit': sigmafit, 'dsigmadr': dsigmadr})

            # B-spline parameters (knots, coefficients, degree)
            # NOTE: Saved `knots` and `coeffs` as FITS arrays, so need the [0]
            if 'BSPLINE' in hdul:
                data = hdul['BSPLINE'].data
                head = hdul['BSPLINE'].header
                nt = head['NKNOTS']
                nc = head['NCOEFFS']
                degree = head['DEGREE']
                knots = data['KNOTS'][0].reshape((nt, nx, nE), order='F')
                coeffs = data['COEFFS'][0].reshape((nc, nx, nE), order='F')
                out.update({'knots': knots, 'coeffs': coeffs, 'degree': degree})

    # `fdat` is a .dat file (output by xscat code, with `r` & `x` as #comments)
    elif fext == 'dat':
        data = np.loadtxt(fdat, comments='#')
        enrg = data[:, 0] * u.keV
        sigma = data[:, -1] * u.cm**2
        with open(fdat, 'r') as f:
            # Header lines look like '# x = <val>' and '# r = <val>'.
            # BUG FIX: parse the value after '=' -- the old
            # `.strip('# x = ')` treated its argument as a character SET,
            # which only worked by accident and breaks for other headers.
            xpos = float(f.readline().split('=')[-1]) * u.dimensionless_unscaled
            rext = float(f.readline().split('=')[-1]) * u.arcsec
        out = {'rext': rext, 'xpos': xpos, 'enrg': enrg, 'sigma': sigma}

    # `fdat` is something else
    else:
        # BUG FIX: message said '.txt' although the accepted extension is '.dat'
        raise ValueError(f"`fdat` file extension {fext} is not .fits or .dat")

    return out

#==============================================================================#

def scale_data(y, y0=None, ypos=True, yuni=True, ylog=True):
    """
    Scale (or unscale) data by applying ypos -> yuni -> ylog (or the reverse).

    Parameters:
        y    : Array of data values to be scaled (or unscaled)
        y0   : If unscaling, you must provide the array of original data values
        ypos : Scale (or unscale) the data to a minimum value of 2*|min(y0)|
        yuni : Scale (or unscale) the data to a maximum value of 1
        ylog : Scale (or unscale) the data logarithmically (ln/exp)

    Returns:
        Array of scaled (or unscaled) data values

    Example:
        # Prepare data y0 for fitting by scaling to (0,1] then logarithmically
        >> y = scale_data(y=y0, y0=None, ypos=True, yuni=True, ylog=True)
        # Unscale the best-fit model zfit for comparison to the un-scaled y0
        >> z = scale_data(y=zfit, y0=y0, ypos=True, yuni=True, ylog=True)
    """
    # Scale the data (we are working with the original data)
    if y0 is None:
        # Make all of the data positive by adding a constant offset (if needed)
        if ypos:
            # Some y <= 0, so we add a constant to make all y > 0
            if np.min(y) <= 0:
                c = 2 * np.abs(np.min(y))
                y = y + c
        # Make the max value unity
        if yuni:
            y = y / np.max(y)
        # Take the natural log of the data
        if ylog:
            y = np.log(y)
    # Unscale the data (we want to recover the original data)
    else:
        # Recover the constant offset that scaling applied (0 if none was)
        if ypos and (np.min(y0) <= 0):
            c = 2 * np.abs(np.min(y0))
        else:
            c = 0
        # Undo the transforms in reverse order: ln -> uni -> pos
        if ylog:
            y = np.exp(y)
        if yuni:
            y = y * np.max(y0 + c)
        if ypos:
            y = y - c

    # Return the scaled (or unscaled) data
    return y

#==============================================================================#

def unscale_deriv(dy_dx, y, y0, ypos=True, yuni=True, ylog=True):
    """
    Unscale the derivative `dy_dx`, given the scaled data `y` and raw data `y0`.

    Scaling relations (logarithmic, constant divisor, additive constant):
        z = ln(y)          --> y = exp(z)           | Log-unscaling is different
        dz/dx = 1/y dy/dx  --> dy/dx = exp(z) dz/dx | for data and derivative.
        z = 1/(f+c) y         --> y = (f+c) z          | Multiplicative constant
        dz/dx = 1/(f+c) dy/dx --> dy/dx = (f+c) dz/dx  | unscaling is the same.
        z = y + c        --> y = z - c     | Additive constant unscaling applies
        dz/dx = dy/dx    --> dy/dx = dz/dx | to the data, not its derivative.
    """
    # Recover the constant offset that scaling applied (0 if none was)
    if ypos and (np.min(y0) <= 0):
        c = 2 * np.abs(np.min(y0))
    else:
        c = 0
    # Un-log the derivative (chain rule: dy/dx = exp(z) dz/dx)
    if ylog:
        dy_dx = np.exp(y) * dy_dx
    # Un-uni the derivative (multiplicative constant is the same for derivs)
    if yuni:
        dy_dx = dy_dx * np.max(y0 + c)
    # NOTE: no `ypos` step -- an additive constant drops out of the derivative
    return dy_dx

#==============================================================================#

def eval_Bspline(tck, x):
    """
    Evaluate B-spline at positions `x`, given `tck` tuple (knots, coeffs, deg).
    """
    # Evaluate B-spline basis functions at the x-values
    t, c, k = tck
    B = BSpline(t=t, c=c, k=k)
    y = B(x)
    return y

#==============================================================================#

def eval_Bderiv(tck, x):
    """
    Evaluate B-spline derivative at `x`, given `tck` tuple (knots, coeffs, deg).
    """
    t, c, k = tck
    B = BSpline(t=t, c=c, k=k)
    Bderiv = splder(B, n=1)
    dy_dx = Bderiv(x)
    return dy_dx

#==============================================================================#

def halo_flux_profile_xy(xs, r, tck, xext, yext, xoff, yoff, nH,
                         F0=1, dx=1, dy=1):
    """
    Return the halo flux on the (x,y) grid and the grids themselves.
    """
    # Create the (x,y) grid to exactly match the rectangular integration region
    xl, xr = xoff - xext, xoff + xext
    yb, yt = yoff - yext, yoff + yext
    nx, ny = int((xr - xl)/dx) + 1, int((yt - yb)/dy) + 1
    x = np.linspace(xl, xr, nx)
    y = np.linspace(yb, yt, ny)
    xx, yy = np.meshgrid(x, y, indexing='ij')
    rr = np.sqrt(xx**2 + yy**2)

    # Evaluate the cross-section and its derivative on the (x,y) grid
    # NOTE: Using best-fit cross-section going forward (easier to interpolate)
    z = eval_Bspline(tck=tck, x=rr)                  # Best-fit `z`
    dz_dr = eval_Bderiv(tck=tck, x=rr)               # Derivative
    dxs_dr = unscale_deriv(dy_dx=dz_dr, y=z, y0=xs)  # Unscale the derivative
    xs_fit = scale_data(y=z, y0=xs)                  # Unscale the best fit

    # Evaluate the halo flux on the (x,y) grid.
    # BUG FIX: suppress the divide-by-zero warning at rr=0; that zone is
    # explicitly zeroed out immediately below.
    with np.errstate(divide='ignore', invalid='ignore'):
        fh_xy = -dxs_dr * nH * np.exp(-xs_fit * nH) / (2 * np.pi * rr)

    # If there is a grid zone (x,y)=(0,0), set fh_xy=0 on physical grounds
    ir0 = np.argwhere(rr == 0)
    if ir0.size > 0:
        # BUG FIX: reuse `ir0` (was recomputed with a second np.argwhere call)
        i0, j0 = ir0[0]
        fh_xy[i0, j0] = 0

    # TODO: Is it better to just return x,y instead of xx,yy?
    # Return the halo flux on the (x,y) grid and the grids themselves
    return fh_xy, xx, yy

#==============================================================================#

def halo_flux_enclosed_xy(xs, r, tck, xext, yext, xoff, yoff, nH,
                          F0=1, dx=1, dy=1):
    """
    Return the halo flux enclosed by the rectangular region.
    """
    # Halo flux xy-profile
    fh_xy, xx, yy = halo_flux_profile_xy(xs=xs, r=r, tck=tck,
                                         xext=xext, yext=yext,
                                         xoff=xoff, yoff=yoff,
                                         nH=nH, F0=F0, dx=dx, dy=dy)
    # (x,y) grids (1D)
    x = xx[:, 0]
    y = yy[0, :]
    # Integrate the halo flux distribution over the rectangular region
    # (np.trapezoid is the numpy>=2.0 name for np.trapz)
    Fh = np.trapezoid(y=np.trapezoid(y=fh_xy, x=x, axis=0), x=y, axis=0)
    return Fh

#==============================================================================#

def attenuated_flux(xs, nH, F0=1):
    """
    Return the observed specific flux (or flux attenuation factor if `F0=1`).

    NOTE: To return the attenuated intrinsic flux `FX` due to scattering out of
    our direct line-of-sight to the source, provide `xs` @ `rext=0`.
    """
    F = F0 * np.exp(-xs * nH)
    return F

#==============================================================================#

def fmult(xs, r, tck, xext, yext, xoff, yoff, nH, F0=1, dx=1, dy=1):
    """
    Return the multiplicative flux attenuation factor for a rectangular region,
    which can be offset from the source.

    Raises:
        ValueError : If the region contains the source but r[0] != 0 (the
                     attenuated direct flux needs the cross section at rext=0).
    """
    # Calculate the attenuated source flux (if the region contains the source)
    xl, xr = xoff - xext, xoff + xext
    yb, yt = yoff - yext, yoff + yext
    if (xl <= 0 <= xr) and (yb <= 0 <= yt):
        if r[0] == 0:
            FX = attenuated_flux(xs=xs[0], nH=nH, F0=F0)
        else:
            raise ValueError(f"The first entry r[0]={r[0]:.1f}, but it must be 0.")
    else:
        FX = 0

    # Integrate the halo flux distribution over the rectangular region
    Fh = halo_flux_enclosed_xy(xs=xs, r=r, tck=tck, xext=xext, yext=yext,
                               xoff=xoff, yoff=yoff, nH=nH, F0=F0, dx=dx, dy=dy)

    # Multiplicative flux attenuation factor = direct (attenuated) + halo
    # NOTE: local no longer named `fmult` (shadowed this function)
    return FX + Fh

#==============================================================================#

def xscatxy(fdat, xext, yext, xoff, yoff, zpos, nH, F0=1, dx=1, dy=1):
    """
    Return the xscatXY flux multiplier as a function of photon energy.

    Parameters:
        fdat : xscat FITS file (e.g., 'xscat_test.fits')
        xext : x half-width [arcsec]
        yext : y half-height [arcsec]
        xoff : x offset [arcsec]
        yoff : y offset [arcsec]
        zpos : Relative position of the dust screen [0,1]
        nH   : Equivalent neutral hydrogen column density [1e22 cm^-2]
        F0   : Specific flux intrinsic to the source (same units as ret. flux)
        dx   : x resolution for recovering halo flux xy-profile [arcsec]
        dy   : y resolution for recovering halo flux xy-profile [arcsec]

    Notes:
        - Swift/XRT spatial resolution is 2.357 [arcsec/pixel]
        - Units of `nH` are [1e22 cm^-2] here only; other funcs it is [cm^-2]
        - We want `F0=1` for relative measurements (e.g., fmult)
    """
    # Convert `nH` to [cm^-2]
    nH *= 1e22

    # Collect the data
    data = read_xscat(fdat)
    rext = data['rext'].to(u.arcsec).value                  # [nr]
    xpos = data['xpos'].to(u.dimensionless_unscaled).value  # [nz]
    enrg = data['enrg'].to(u.keV).value                     # [nE]
    sigma = data['sigma'].to(u.cm**2).value                 # [nr,nz,nE]
    knots = data['knots']                                   # [nt,nz,nE]
    coeffs = data['coeffs']                                 # [nc,nz,nE]
    degree = data['degree']                                 # int

    # Number of grid zones
    nr = len(rext)
    nE = len(enrg)
    nt = len(knots)
    nc = len(coeffs)

    # Interpolate the cross section and B-spline parameters to the dust-screen
    # position `zpos`.
    # BUG FIX: hoisted out of the energy loop below -- these arrays do not
    # depend on the requested output energy, so the old code recomputed the
    # identical nr*nE (+ nt*nE + nc*nE) interpolations once per output energy.
    # TODO: Interpolate `zpos` (np.interp clamps outside the tabulated range)
    Sval = np.zeros([nr, nE])
    knotval = np.zeros([nt, nE])
    coeffval = np.zeros([nc, nE])
    for iE in range(nE):
        for ir in range(nr):
            Sval[ir, iE] = np.interp(zpos, xpos, sigma[ir, :, iE])
        for it in range(nt):
            knotval[it, iE] = np.interp(zpos, xpos, knots[it, :, iE])
        for ic in range(nc):
            coeffval[ic, iE] = np.interp(zpos, xpos, coeffs[ic, :, iE])

    # Calculate `fmult` for the rectangular region as a func of photon energy
    Ex = [5*10**(-6), 0.005, 0.01, 0.015, 0.02, 0.025, 0.03, 0.035, 0.04,
          0.045, 0.05, 0.055, 0.06, 0.065, 0.07, 0.075]
    nEx = len(Ex)
    fmult_xy = np.zeros(nEx)
    Emax, Emin = np.max(enrg), np.min(enrg)

    # Loop through energy
    for iEx in range(nEx):
        en = Ex[iEx]
        if en > Emax:
            # Above the tabulated range: scale sigma as (E/Emax)^-2 while
            # keeping the B-spline shape from the highest tabulated energy
            Svalr = Sval[:, -1] * (en/Emax)**(-2.0)
            knotvalt = knotval[:, -1]
            coeffvalc = coeffval[:, -1]
        elif en < Emin:
            # Below the tabulated range: clamp to the lowest tabulated energy
            Svalr = Sval[:, 0]
            knotvalt = knotval[:, 0]
            coeffvalc = coeffval[:, 0]
        else:
            # Inside the tabulated range: interpolate in energy
            Svalr = np.zeros(nr)
            knotvalt = np.zeros(nt)
            coeffvalc = np.zeros(nc)
            for ir in range(nr):
                Svalr[ir] = np.interp(en, enrg, Sval[ir, :])
            for it in range(nt):
                knotvalt[it] = np.interp(en, enrg, knotval[it, :])
            for ic in range(nc):
                coeffvalc[ic] = np.interp(en, enrg, coeffval[ic, :])

        # Rectangular region `fmult` at this energy
        tck = (knotvalt, coeffvalc, degree)
        fmult_xy[iEx] = fmult(xs=Svalr, r=rext, tck=tck, xext=xext, yext=yext,
                              xoff=xoff, yoff=yoff, nH=nH, F0=F0, dx=dx, dy=dy)

    # TODO: Extrapolate `fmult` to higher energies...
    return fmult_xy

#==============================================================================#

def main():
    """
    JSON line-protocol server: read one JSON object per stdin line, reply on
    stdout.

    Protocol:
        {"cmd": "quit"}                                         -> exit
        {"cmd": "calc", "xext": ..., "yext": ...,
         "Xpos": ..., "NH": ...}    -> {"result": [...]} or {"error": "..."}
    """
    fdat = '/home/tplohr/proj/SF25-26/xspec/heasoft-6.36/xscatXY/xspec_greg/xscat_test.fits'
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        try:
            payload = json.loads(line)
            cmd = payload.get("cmd")
            if cmd == "quit":
                return
            if cmd != "calc":
                reply = {"error": "unknown command"}
            else:
                result = xscatxy(
                    fdat,
                    payload["xext"],
                    payload["yext"],
                    0,  # xoff
                    0,  # yoff
                    payload["Xpos"],
                    payload["NH"],
                )
                # BUG FIX: a numpy ndarray is not JSON serializable, so
                # json.dumps raised TypeError and every calc replied with an
                # error; convert to a plain list first.
                reply = {"result": result.tolist()}
        except Exception as e:
            reply = {"error": str(e)}
        sys.stdout.write(json.dumps(reply) + "\n")
        sys.stdout.flush()

if __name__ == "__main__":
    #main()
    fdat = '/home/tplohr/proj/SF25-26/xspec/heasoft-6.36/xscatXY/xspec_greg/xscat_test.fits'
    # NOTE: result renamed from `fmult` (shadowed the fmult() function); the
    # unused `Ex = np.logspace(-1, 1, 100)` was removed (xscatxy uses its own
    # internal energy grid).
    fm = xscatxy(fdat, xext=30*2.357, yext=300*2.357, xoff=0, yoff=0,
                 zpos=0.5, nH=1, F0=1, dx=1, dy=1)
    print(fm)

#==============================================================================#