import os
import re
import sys
from pathlib import Path
import click
import cv2
import h5py
import numpy as np
import toml
from astropy.convolution import Gaussian2DKernel
from numpy.typing import ArrayLike
from scipy.signal import fftconvolve
from radiosim.logging import setup_logger
__all__ = [
"add_noise",
"adjust_outpath",
"cart2pol",
"check_outpath",
"create_grid",
"load_data",
"pol2cart",
"read_config",
"relativistic_boosting",
"save_sky_distribution_bundle",
"zoom_on_source",
"zoom_out",
]
LOGGER = setup_logger(namespace=__name__, tracebacks_suppress=[click])
[docs]
def create_grid(pixel, bundle_size):
"""
Creates a square 2d grid.
Parameters
----------
pixel: int
number of pixel in x and y
bundle_size: int
number of images in each bundle
Returns
-------
grid: ndarray
2d grid with 1e-10 pixels, X meshgrid, Y meshgrid
"""
x = np.linspace(0, pixel - 1, num=pixel)
y = np.linspace(0, pixel - 1, num=pixel)
X, Y = np.meshgrid(x, y)
grid = np.array([np.zeros(X.shape) + 1e-10, X, Y])
grid = np.repeat(
grid[None, :, :, :],
bundle_size,
axis=0,
)
return grid
[docs]
def relativistic_boosting(theta, beta):
"""
Calculate relativistic boosting factor for a jet.
Parameters
----------
theta: float
angle of the jet in relation to the observer
beta: float
velocity of the jet components
Returns
-------
boost_app: float
boosting factor for the approaching jet
boost_rec: float
boosting factor for the receding jet
"""
gamma = 1 / np.sqrt(1 - beta**2) # Lorentz factor
mu = np.cos(theta)
boost_app = 1 / (gamma * (1 - beta * mu))
boost_rec = 1 / (gamma * (1 + beta * mu))
return boost_app, boost_rec
[docs]
def zoom_on_source(img, comp=None, max_amp=0.01):
"""
Zoom on source to cut out irrelevant area. Shape will stay equal.
Parameters
----------
img: 2D array
Image of the sky used to zoom on
comp: 3D array (n, (img))
Images of the components, same zooming as on img
max_amp: float
Maximal amplitude which will be at the edge of the image
Returns
-------
zoomed_img: ndarray
Image after zooming
zoom_factor: float
Zooming factor
"""
# find farest outside column or row with amplitude > max_amp
mask = img > max_amp
mask_flip = np.flip(mask)
idx_left = np.argmax(np.sum(mask, axis=0) > 0)
idx_right = np.argmax(np.sum(mask_flip, axis=0) > 0)
idx_bottom = np.argmax(np.sum(mask, axis=1) > 0)
idx_top = np.argmax(np.sum(mask_flip, axis=1) > 0)
# print(idx_left, idx_right, idx_bottom, idx_top)
idx = np.min([idx_left, idx_right, idx_bottom, idx_top])
size = img.shape[0]
zoom_factor = size / (size - 2 * idx)
# crop the source
cropped_img = img[idx : size - idx, idx : size - idx]
zoomed_img = cv2.resize(
cropped_img, dsize=(size, size), interpolation=cv2.INTER_LINEAR
)
if comp is not None:
cropped_comp = comp[:, idx : size - idx, idx : size - idx]
zoomed_comp = np.empty_like(comp)
for i, component in enumerate(cropped_comp):
zoomed_comp[i] = cv2.resize(
component, dsize=(size, size), interpolation=cv2.INTER_LINEAR
)
return zoomed_img, zoomed_comp, zoom_factor
return zoomed_img, zoom_factor
[docs]
def zoom_out(img, comp=None, pad_value=0):
"""
Zoom out of an image. Padding edges with zeros.
Parameters
----------
img: 2D array
Image of the sky
comp: 3D array (n, (img))
Images of the components
pad_value: int
Number of pixels to pad around the source
Returns
-------
zoomed_img: ndarray
Image after zooming
zoomed_comp: ndarray
Componets after zooming
"""
if not isinstance(pad_value, int):
pad_value = np.int64(pad_value)
size = img.shape[0]
img = cv2.resize(
np.pad(img, pad_value), dsize=(size, size), interpolation=cv2.INTER_LINEAR
)
if comp is not None:
for component in comp:
component = cv2.resize(
np.pad(component, pad_value),
dsize=(size, size),
interpolation=cv2.INTER_LINEAR,
)
return img, comp
return img
[docs]
def check_outpath(outpath, verbose=True):
"""
Check if outpath exists. Check for existing source_bundle files.
Ask to overwrite or reuse existing files.
Parameters
----------
outpath : str
path to out directory
verbose : bool, optional
Activate or deactive verbose mode. Automatically overwrites
existing files if set to False. Default: True
Returns
-------
sim_sources : bool
flag to enable/disable source simulation routine
"""
path = Path(outpath)
exists = path.exists()
if exists is True:
source = {p for p in path.rglob("*data*.h5") if p.is_file()}
if source:
LOGGER.warning("Found existing source simulations!")
if not verbose or click.confirm(
"Do you really want to overwrite existing files?", abort=False
):
LOGGER.warning("Overwriting existing source simulations!")
[p.unlink() for p in source]
sim_sources = True
return sim_sources
else:
LOGGER.info("Keeping existing source simulations!")
sim_sources = False
sys.exit()
else:
sim_sources = True
else:
Path(path).mkdir(parents=True, exist_ok=False)
sim_sources = True
return sim_sources
[docs]
def read_config(config):
"""
Unpacking of the config file to print the config parameters
Parameters
----------
config: toml-file
toml configuration file with all parameters
Returns
-------
sim_comf: dictionary
unpacked configurations
"""
sim_conf = {}
sim_conf["seed"] = config["general"]["seed"]
sim_conf["mode"] = config["general"]["mode"]
sim_conf["threads"] = config["general"]["threads"]
sim_conf["outpath"] = config["paths"]["outpath"]
sim_conf["training_type"] = config["jet"]["training_type"]
sim_conf["num_jet_components"] = config["jet"]["num_jet_components"]
sim_conf["scaling"] = config["jet"]["scaling"]
sim_conf["num_sources"] = config["survey"]["num_sources"]
sim_conf["class_distribution"] = config["survey"]["class_distribution"]
sim_conf["scale_sources"] = config["survey"]["scale_sources"]
sim_conf["class_ratio"] = config["mojave"]["class_ratio"]
sim_conf["bundles_train"] = config["image_options"]["bundles_train"]
sim_conf["bundles_valid"] = config["image_options"]["bundles_valid"]
sim_conf["bundles_test"] = config["image_options"]["bundles_test"]
sim_conf["bundle_size"] = config["image_options"]["bundle_size"]
sim_conf["img_size"] = config["image_options"]["img_size"]
sim_conf["noise"] = config["image_options"]["noise"]
sim_conf["noise_level"] = config["image_options"]["noise_level"]
return sim_conf
[docs]
def add_noise(image, noise_level):
"""
Used for adding noise.
Parameters
----------
image: 4darray
bundle of images of shape [n, 1, size, size]
noise_level: int
maximum intensity of noise in percent
Returns
-------
image_noised: 4darray
bundle of noised images
"""
def noise_small(kernel, mean=0, std=1):
"""
Create the noise of different kernel sizes
"""
max_noise = np.random.uniform(0, 1, img_shape[0])
noise = (
np.random.normal(mean, std, size=img_shape) * max_noise[:, None, None, None]
)
g_kernel = Gaussian2DKernel(kernel / 2).array[None, None, :]
return fftconvolve(noise, g_kernel, mode="same")
def call_noise(kernels, strengths):
"""
Loop through lists and normalize
"""
noise_out = np.zeros(shape=img_shape)
for kernel, strength in zip(kernels, strengths):
if kernel == 1:
noise = np.random.normal(size=img_shape)
else:
noise = noise_small(kernel)
noise /= np.abs(noise).max() / strength
noise_out += noise
return noise_out
img_shape = image.shape
kernels = [1, img_shape[-1] / 32, img_shape[-1] / 8]
strengths = [0.2, 0.3, 0.5] # have to add up to 1
noise = call_noise(kernels, strengths)
noise_level = (
np.random.uniform(*noise_level, size=image.shape[0])[:, None, None, None] / 100
)
noise /= noise.max(axis=(1, 2, 3))[:, None, None, None]
noise *= image.max(axis=(1, 2, 3))[:, None, None, None] * noise_level
image_noised = noise + image
return image_noised
[docs]
def adjust_outpath(path, option, form="h5"):
"""
Add number to out path when filename already exists.
Parameters
----------
path: str
path to save directory
option: str
additional keyword to add to path
form: str
file extension
Returns
-------
out: str
adjusted path
"""
counter = 0
filename = str(path) + (option + "_{}." + form)
while os.path.isfile(filename.format(counter)):
counter += 1
out = filename.format(counter)
return out
[docs]
def save_sky_distribution_bundle(path, x, y, name_x="x", name_y="y"):
"""
Write images created in analysis to h5 file.
Parameters
----------
path: str
path to save file
x: ndarray
image of the full jet, sum over all components
y: ndarray
images of components or list, depends on train_type
name_x: str
name of the x-data
name_y: str
name of the y-data
"""
with h5py.File(path, "w") as hf:
hf.create_dataset(name_x, data=x)
hf.create_dataset(name_y, data=y)
hf.close()
def _save_mojave_bundle(path: str, data: tuple, data_name: tuple) -> None:
"""
Write MOJAVE simulations created in analysis to h5 file.
Parameters
----------
path: str
path to save file
data : tuple
data to store, e.g. galaxies, classes, coordinates, ...
data_name : tuple
name of the data columns
"""
assert len(data) == len(data_name)
with h5py.File(path, "w") as hf:
for dat, name in zip(data, data_name):
hf.create_dataset(name, data=dat)
def _gen_vlba_obs_position(rng, size: int) -> tuple[float, float]:
ra = rng.uniform(0, 360, size=size)
dec = rng.uniform(-30, 87, size=size)
return ra, dec
def _gen_date(rng, start_date: str, size: int) -> ArrayLike:
from datetime import datetime
from h5py import string_dtype
# define fixed string length for h5py
utf8_type = string_dtype("utf-8", 10)
start_date = np.datetime64(start_date)
now = np.datetime64(datetime.now().date())
delta = now - start_date
dates = start_date + rng.integers(delta.astype(int), size=size)
return dates.astype(utf8_type)
[docs]
def cart2pol(x: float, y: float):
"""
Transforms cartesian to polar coordinates.
Parameters
----------
x: float
x-coordinate
y: float
y-coordinate
Returns
-------
r: float
radius, euclidean distance between (0,0) and (x,y)
phi: float
angle in radian
"""
r = np.sqrt(x**2 + y**2)
phi = np.arctan2(y, x)
return (r, phi)
[docs]
def pol2cart(r: float, phi: float):
"""
Transforms polar to cartesian coordinates.
Parameters
----------
r: float
radius, euclidean distance between (0,0) and (x,y)
phi: float
angle in radian
Returns
-------
x: float
x-coordinate
y: float
y-coordinate
"""
x = r * np.cos(phi)
y = r * np.sin(phi)
return (x, y)
[docs]
def load_data(conf_path, data_type="train", key="x"):
config = toml.load(conf_path)
path = Path(config["paths"]["outpath"])
bundle_paths = np.array([x for x in path.iterdir()])
paths = [
path for path in bundle_paths if re.findall("data*" + data_type, path.name)
]
data = []
for path_test in paths:
df = h5py.File(path_test, "r")
data.extend(df[key])
return data