Source code for radiosim.mojave.mojave

import numpy as np
from joblib import Parallel, delayed
from scipy.stats import expon, skewnorm
from skimage.transform import rotate, swirl

from radiosim.utils import setup_logger
from radiosim.utils.gauss import skewed_gauss, twodgaussian
from radiosim.utils.utils import _gen_date, _gen_vlba_obs_position

__all__ = [
    "add_swirl",
    "create_mojave",
    "gen_compact",
    "gen_jet",
    "gen_one_jet",
    "gen_shocks",
    "gen_two_jet",
]

LOGGER = setup_logger(namespace=__name__)


[docs] def create_mojave(conf, rng): """ Create MOJAVE like sources. Parameters: ----------- conf : loaded conf file Returns: -------- glx : array_like generated sources """ threads = conf.general.threads if not threads: threads = 1 elif not isinstance(threads, int) or threads <= 0: raise ValueError("threads has to be int > 0 or None") size = conf.dataset.img_size # calculate amount of each class to generate per bundle bundle_size = conf.dataset.bundle_size ratio = np.array(conf.mojave.class_ratio) # amount = np.array([n_comact, n_one_jet, n_two_jet]]) amount = np.array((ratio / ratio.sum()) * bundle_size).astype(int) while amount.sum() < bundle_size: amount[amount.argmax()] += 1 jets = [] compact = Parallel(n_jobs=threads)( delayed(gen_compact)(rng=child, size=size) for child in rng.spawn(amount[0]) ) one_jet = Parallel(n_jobs=threads)( delayed(gen_one_jet)(rng=child, size=size) for child in rng.spawn(amount[1]) ) two_jet = Parallel(n_jobs=threads)( delayed(gen_two_jet)(rng=child, size=size) for child in rng.spawn(amount[2]) ) jets = np.array([*compact, *one_jet, *two_jet]) glx_class = [] for glx_type, amt in zip([0, 1, 2], amount): glx_class.extend([glx_type] * amt) glx_class = np.array(glx_class) shuffler = np.arange(bundle_size) rng.shuffle(shuffler) ra, dec = _gen_vlba_obs_position(rng=rng, size=bundle_size) obs_dates = _gen_date(rng=rng, start_date="1995-01-01", size=bundle_size) data = [jets[shuffler], glx_class[shuffler], ra[shuffler], dec[shuffler], obs_dates] data_name = ["galaxies", "galaxy_classes", "RA", "DEC", "date"] return data, data_name
[docs] def gen_jet( size: int, amplitude: float, width: float, length: float, a: float ) -> np.ndarray: """ Generate jet from skewed 2d normal distributiuon. Parameters ---------- size : int length of the square image amplitude : float maximal amplitude of the distribution width : float width of the distribution (perpendicular to the skewed function) length : float length of the distribution a : float skewness parameter Returns ------- jet : array_like jet for source """ jet = skewed_gauss( size=size, x=size / 2, y=size / 2, amp=amplitude, width=width, length=length, a=a, ) jet[np.isclose(jet, 0)] = 0 return jet
[docs] def gen_shocks( jet: np.ndarray, shock_comp: int, length: float, width: float, sx: float, rng: np.random.Generator, ) -> tuple[np.ndarray, tuple, int]: """ Generate equally spaced shocks in jet. Parameters ---------- jet : array_like generated jet without shock shock_comp : int max amount of shocks length : float length of jet width : float width of jet sx : float sx component of main gaussian rng : :class:`~numpy.random.Generator` numpy random generator Returns ------- jet : array_like jet with shock components printout_information : tuple information about each component skipped : int amount of skipped shock components """ ps_orientation = [] ps_amp = [] ps_x = [] ps_y = [] ps_sx = [] ps_sy = [] ps_rot = [] ps_dist = [] ps_a = [] mask = np.copy(jet) mask[np.isclose(mask, 0)] = 0 mask[mask > 0] = 1 mask = mask.astype(bool) size = len(jet) s_length = 2 * length - sx s_dist = s_length / shock_comp s_dist0 = s_dist skipped = 0 for _ in range(shock_comp): # skip randomly if rng.uniform(0, 1) > 2 / 3: skipped += 1 continue s_x = size / 2 + s_dist s_y = size / 2 try: s_amp = jet[int(s_y), int(s_x)] * rng.uniform(0.03, 0.6) except IndexError: # catch if image is too small - must fix properly later continue s_sx = rng.uniform(*np.sort([2, width])) s_sy = rng.uniform(*np.sort([s_sx, length / shock_comp])) s_rot = 0 s_a = rng.uniform(5, 9) jet += skewed_gauss(size, int(s_x), int(s_y), s_amp, s_sx, s_sy, s_a) ps_amp.append(s_amp) ps_x.append(int(s_x)) ps_y.append(int(s_y)) ps_sx.append(s_sx) ps_sy.append(s_sy) ps_rot.append(s_rot) ps_dist.append(s_dist) ps_a.append(s_a) ps_orientation.append("r") s_dist += s_dist0 return ( jet, (ps_orientation, ps_amp, ps_x, ps_y, ps_sx, ps_sy, ps_rot, ps_dist, ps_a), skipped, )
[docs] def add_swirl( jet: np.ndarray, rng: np.random.Generator, first_jet_params: tuple | None = None, ) -> tuple[np.ndarray, tuple]: """ Add swirl distortion to the jet. Parameters ---------- jet : array_like generated jet rng : :class:`~numpy.random.Generator` numpy random generator first_jet_params : tuple | None, default: None Use when generate two jet sources. Tuple of parameters to generate similar swirl for second jet. Contains the returned parameters from the first jet. Returns ------- swirled_jet : array_like swirl distorted input jet parameters : tuple parameters of the aplied swirl distortion """ size = len(jet) if not first_jet_params: strength = rng.normal(loc=0, scale=0.2) radius = rng.uniform(50, 200) center = np.array([size / 2, size / 2]) else: deviation = 1 + rng.normal(loc=0, scale=0.1, size=2) strength = first_jet_params[0] * deviation[0] radius = first_jet_params[1] * deviation[1] center = first_jet_params[2] swirled_jet = swirl(jet, center=center, strength=strength, radius=radius) return swirled_jet, (strength, radius, center)
[docs] def gen_two_jet( rng: int, size: int = 1024, printout: bool = False, ) -> np.ndarray: """ Generate a two jet galaxy. Parameters ---------- rng : :class:`~numpy.random.Generator` | int numpy random generator or seed to generate random number generator size : int, default: 1024 size of the galaxy to be genrated printout : bool, default: False print out log information Returns ------- glx : array_like simulated two jet source """ if isinstance(rng, int): rng = np.random.default_rng(seed=rng) amp_params = (1.715e-4, 1.985e-2) width_params = (4.37, 22.71, 16.85) length_params = (8.62, 69.60, 54.73) left_deviation = 1 + rng.normal( loc=0, scale=0.1, size=5 ) # amp, length, width, strength, radius amp = expon.rvs(*amp_params, random_state=rng) r_width = skewnorm.rvs(*width_params, random_state=rng) / 10 # 10 r_length = skewnorm.rvs(*length_params, random_state=rng) / 4 # 10 r_jet_amp = amp * rng.power(0.7) l_width = r_width * left_deviation[2] l_length = r_length * left_deviation[1] l_jet_amp = r_jet_amp * left_deviation[0] dimensions = np.sort([r_width / 2, r_length / 5, l_width / 2, l_length / 5]) sx = rng.uniform(dimensions.min(), dimensions.max()) * 1.5 # too large sy = rng.uniform(dimensions.min(), dimensions.max()) * 1.5 # redraw if gaussian is too elliptical while np.any([sx / sy < 1 / 2, sx / sy > 2]): sy = rng.uniform(dimensions.min(), dimensions.max()) * 1.5 rot = rng.uniform(0, 2 * np.pi) a = 4 r_jet = gen_jet(size, r_jet_amp, r_width, r_length, a) l_jet = gen_jet(size, l_jet_amp, l_width, l_length, a) shock_comp_r = int(np.round(rng.power(0.5), decimals=1) * 10) shock_comp_l = int(np.round(rng.power(0.5), decimals=1) * 10) if shock_comp_r > 0: r_jet, r_printout, r_skipped = gen_shocks( r_jet, shock_comp_r, r_length, r_width, sx, rng ) if shock_comp_l > 0: l_jet, l_printout, l_skipped = gen_shocks( l_jet, shock_comp_l, l_length, l_width, sx, rng ) r_jet, r_swirl_comp = add_swirl(r_jet, rng=rng) l_jet, l_swirl_comp = add_swirl(l_jet, rng=rng, first_jet_params=r_swirl_comp) glx = r_jet + np.flip(l_jet) x = size / 2 y = size / 2 gauss = twodgaussian([amp, size / 2, size / 2, sx, sy, rot], size) glx += gauss angle = rng.uniform(0, 360) glx = rotate(glx, angle=angle) glx[np.isclose(glx, 0)] = 0 if printout: print("left Jet:") print( f"width = {l_width:.3f}, length = {l_length:.3f}, \ jet_amp = {l_jet_amp:.3f}, a = {a:.3f}" ) print("right Jet:") print( f"width = {r_width:.3f}, length = {r_length:.3f}, \ jet_amp = {r_jet_amp:.3f}, a = {a:.3f}" ) print("Source:") print( f"amp = {amp:.3f}, x = {x:.0f}, y = {y:.0f}, sx = {sx:.3f}, \ sy = {sy:.3f}, rot = {rot:.3f}" ) if shock_comp_r > 0: print( f"Right Shock, {shock_comp_r} Components, \ {r_skipped} skipped:" ) for _, amp, x, y, sx, sy, rot, dist, _ in zip(*r_printout): print( f"amp = {amp:.6f}, x = {x:.0f}, y = {y:.0f}, \ sx = {sx:.3f}, sy = {sy:.3f}, rot = {rot:.3f}, \ dist = {dist:.3f}" ) if shock_comp_l > 0: print( f"Left Shock, {shock_comp_l} Components, \ {l_skipped} skipped:" ) for _, amp, x, y, sx, sy, rot, dist, _ in zip(*l_printout): print( f"amp = {amp:.6f}, x = {x:.0f}, y = {y:.0f}, \ sx = {sx:.3f}, sy = {sy:.3f}, rot = {rot:.3f}, \ dist = {dist:.3f}" ) print("Swirl:") print( f"Right: strength = {r_swirl_comp[0]:.3f}, \ radius = {r_swirl_comp[1]:.3f}, center = {r_swirl_comp[2]}" ) print( f"Left: strength = {l_swirl_comp[0]:.3f}, \ radius = {l_swirl_comp[1]:.3f}, center = {l_swirl_comp[2]}" ) return glx
[docs] def gen_one_jet(rng: int, size: int = 1024, printout: bool = False) -> np.ndarray: """ Generate a one jet galaxy. Parameters ---------- rng : :class:`~numpy.random.Generator` | int numpy generator or seed to generate random number generator size : int, default: 1024 size of the galaxy to be genrated printout : bool, default: False print out debug information Returns ------- glx : ndarray simulated one jet source """ if isinstance(rng, int): rng = np.random.default_rng(seed=rng) amp_params = (1.715e-4, 1.985e-2) width_params = (4.37, 22.71, 16.85) length_params = (8.62, 69.60, 54.73) amp = expon.rvs(*amp_params, random_state=rng) width = skewnorm.rvs(*width_params, random_state=rng) / 12 # 10 length = skewnorm.rvs(*length_params, random_state=rng) / 6.5 # 10 jet_amp = amp * rng.power(0.7) dimensions = np.sort([width / 2, length / 5]) sx = rng.uniform(*dimensions) * 1.5 # too large sy = rng.uniform(*dimensions) * 1.5 # redraw if gaussian is too elliptical while np.any([sx / sy < 1 / 2, sx / sy > 2]): sy = rng.uniform(dimensions.min(), dimensions.max()) * 1.5 rot = rng.uniform(0, 2 * np.pi) a = 4 jet = gen_jet(size, jet_amp, width, length, a) shock_comp = int(np.round(rng.power(0.5), decimals=1) * 10) if shock_comp > 0: jet, shock_printout, skipped = gen_shocks( jet, shock_comp, length, width, sx, rng ) jet, swirl_comp = add_swirl(jet, rng=rng) x = size / 2 y = size / 2 gauss = twodgaussian([amp, size / 2, size / 2, sx, sy, rot], size) glx = jet + gauss angle = rng.uniform(0, 360) glx = rotate(glx, angle=angle) glx[np.isclose(glx, 0)] = 0 if printout: print("Skewed dist:") print( f"width = {width:.3f}, length = {length:.3f}, \ jet_amp = {jet_amp:.3f}, a = {a:.3f}" ) print("Source:") print( f"amp = {amp:.3f}, x = {x:.0f}, y = {y:.0f}, sx = {sx:.3f}, \ sy = {sy:.3f}, rot = {rot:.3f}" ) if shock_comp > 0: print(f"{shock_comp} shock components, {skipped} skipped:") for _, amp, x, y, sx, sy, rot, dist, _ in zip(shock_printout): print( f"amp = {amp:.6f}, x = {x:.0f}, y = {y:.0f}, \ sx = {sx:.3f}, sy = {sy:.3f}, rot = {rot:.3f}, \ dist = {dist:.3f}" ) print("Swirl:") print( f"strength = {swirl_comp[0]:.3f}, \ radius = {swirl_comp[1]:.3f}, center = {swirl_comp[2]}" ) return glx
[docs] def gen_compact(rng: int, size: int = 1024, printout: bool = False) -> np.ndarray: """Generate a compact galaxy. Parameters ---------- rng : :class:`~numpy.random.Generator` | int numpy generator or seed to generate random number generator. size : int, default: 1024 size of the galaxy to be genrated printout : bool, default: False print out debug information. Returns ------- glx : ndarray Simulated compact source. """ if isinstance(rng, int): rng = np.random.default_rng(seed=rng) amp_params = (5.54e-05, 0.02) width_params = (8.41, 3.04, 3.46) length_params = (6.04, 3.02, 2.59) ncomps = rng.integers(1, 4) # draw amount of components outflow = rng.uniform() > 0.5 # draw if outflow of the jet is generated amp = expon.rvs(*amp_params, random_state=rng) jet_amp = amp * rng.normal(2 / 3, 0.05) y = np.arange(size) width = skewnorm.rvs(*width_params, random_state=rng) * 0.8 length = skewnorm.rvs(*length_params, random_state=rng) * 1 if outflow: a = 4 glx = gen_jet(size, jet_amp, width, length, a) glx, swirl_comp = add_swirl(glx, rng=rng) else: glx = np.zeros((size, size)) dimensions = np.sort([width / 2, length / 4]) dimensions[0] *= 1.2 dimensions = np.sort(dimensions) i = 0 comp_amp = [] comp_x = [] comp_y = [] comp_sx = [] comp_sy = [] comp_rot = [] vertical = True while i < ncomps: deviation = 1 + rng.normal(loc=0, scale=0.2, size=2) if i != 0: x = rng.uniform(-5, 5) y = rng.uniform(-5, 5) else: x = 0 y = 0 if i == 0: sx = rng.uniform(*dimensions) * 1.7 # too large j = 0 while sx > 15: if j > 10: sx /= 5 break sx = rng.uniform(*dimensions) * 1.7 j += 1 sy = rng.uniform(*dimensions) * 1.7 j = 0 while sy > 15: if j > 10: sy /= 5 break sy = rng.uniform(*dimensions) * 1.7 j += 1 j = 0 while np.any([sx / sy < 0.33, sx / sy > 3]) or np.all( [sx / sy > 0.75, sx / sy < 1.33] ): if j > 10: sy = sx * rng.choice([rng.uniform(0.4, 0.8), rng.uniform(1.2, 2.5)]) if sy > 15: sy /= 5 break sy = rng.uniform(*dimensions) * 1.7 k = 0 while sy > 15: if k > 10: sy /= 5 break sy = rng.uniform(*dimensions) * 1.7 k += 1 j += 1 else: sx = comp_sx[-1] * deviation[0] sy = comp_sy[-1] * deviation[1] if i == 0: vertical = sx > sy try: rot = rng.uniform(comp_rot[-1] - 0.1, comp_rot[-1] + 0.1) except IndexError: if vertical: rot = rng.uniform(-np.pi / 4, np.pi / 4) else: rot = rng.uniform(-np.pi / 4 + np.pi / 2, np.pi / 4 + np.pi / 2) c_amp = amp * rng.uniform(0.5, 1.2) gauss = twodgaussian([c_amp, size / 2 - x, size / 2 - y, sx, sy, rot], size) glx += gauss comp_amp.append(c_amp) comp_x.append(size / 2 - x) comp_y.append(size / 2 - y) comp_sx.append(sx) comp_sy.append(sy) comp_rot.append(rot) i += 1 angle = rng.uniform(0, 360) glx = rotate(glx, angle=angle) shift = size // 2 - np.argwhere(glx == glx.max())[0] glx = np.roll(glx, shift=shift, axis=[0, 1]) glx[np.isclose(glx, 0)] = 0 glx /= glx.max() glx *= amp if printout: if outflow: print("Skewed dist:") print( f"width = {width:.3f}, " "length = {length:.3f}, " "jet_amp = {jet_amp:.3f}, a = {a:.3f}" ) print("Swirl:") print( f"strength = {swirl_comp[0]:.3f}," "radius = {swirl_comp[1]:.3f}, " "center = {swirl_comp[2]}" ) print("Sources:") for cx, cy, csx, csy, cr in zip(comp_x, comp_y, comp_sx, comp_sy, comp_rot): print( f"amp = {amp:.3f}," f"x = {cx:.0f}, " f"y = {cy:.0f}, " f"sx = {csx:.3f}, " f"sy = {csy:.3f}, " f"rot = {cr:.3f}" ) return glx