import numpy as np
from .globals import FRAME_WIDTH, FRAME_HEIGHT, initializer
from .cython_utils import project_midplane_c
from .camera_funcs import CameraModel
import spiceypy as spice
import matplotlib.pyplot as plt
from skimage import io
import os
import sys
import tqdm
import multiprocessing
[docs]
class FrameletData:
"""Holds information and methods pertaining to all the framelets in the image"""
def __init__(self, metadata: dict, imgfolder: str):
"""Initialize the structure by reading in the metadata and separating the framelets in the image
:param metadata: input image metadata
:param imgfolder: path to the folder where the image is stored
"""
self.nframelets = int(metadata["LINES"] / FRAME_HEIGHT)
# number of RGB frames
self.nframes = int(self.nframelets / 3)
print(f"Found {self.nframes} RGB frames")
start_utc = metadata["START_TIME"]
self.start_et = spice.str2et(start_utc)
fname = metadata["FILE_NAME"].replace("-raw.png", "")
fullimg = plt.imread(os.path.join(imgfolder, f"{fname}-raw.png"))
# ignore color channels in the image
if len(fullimg.shape) == 3:
fullimg = fullimg.mean(axis=-1)
self.sclat = float(metadata["SUB_SPACECRAFT_LATITUDE"])
self.sclon = float(metadata["SUB_SPACECRAFT_LONGITUDE"])
# get the exposure and convert to seconds
self.exposure = (
float(metadata["EXPOSURE_DURATION"].replace("<ms>", "").strip()) / 1.0e3
)
intframe_delay = metadata["INTERFRAME_DELAY"].split(" ")
self.frame_delay = float(intframe_delay[0])
self.load_flatfield()
self.framelets = []
for n in tqdm.tqdm(range(self.nframes), desc="Decompanding"):
for c in range(3):
startrow = 3 * FRAME_HEIGHT * n + c * FRAME_HEIGHT
endrow = 3 * FRAME_HEIGHT * n + (c + 1) * FRAME_HEIGHT
flati = self.flatfield[(c * FRAME_HEIGHT) : ((c + 1) * FRAME_HEIGHT), :]
img = fullimg[startrow:endrow, :]
framei = decompand(img) / 16384
framei = framei / (flati * self.exposure)
self.framelets.append(
Framelet(self.start_et, self.frame_delay, n, c, framei)
)
self.fullimg = np.concatenate(
[frame.rawimg for frame in self.framelets], axis=0
)
[docs]
@classmethod
def from_file(
cls,
start_et: float,
sclat: float,
sclon: float,
frame_delay: float,
exposure: float,
rawimg: np.ndarray,
latitude: np.ndarray,
longitude: np.ndarray,
incidence: np.ndarray,
emission: np.ndarray,
fluxcal: np.ndarray,
coords: np.ndarray,
):
"""Load the frame data from input array
:param start_et: the start of the observation in spacecraft ET
:param sclat: the sub-spacecraft latitude
:param sclon: the sub-spacecraft longitude
:param frame_delay: the inter-frame delay in seconds
:param exposure: the exposure time in seconds
:param rawimg: the raw, decompanded image (shape: nframes, 3, 1648, 128)
:param latitude: the planetographic latitude for each pixel (degrees, shape: nframes, 3, 1648, 128)
:param longitude: the Sys III longitude for each pixel (degrees, shape: nframes, 3, 1648, 128)
:param incidence: the solar incidence angle (radians, shape: nframes, 3, 1648, 128)
:param emission: the surface emission angle wrt the spacecraft (radians, shape: nframes, 3, 1648, 128)
:param fluxcal: the geometric calibration for the flux for each pixel (radians, shape: nframes, 3, 1648, 128)
:param coords: the coordinate of the pixel in the mid-plane frame (radians, shape: nframes, 3, 1648, 128, 2)
"""
self = cls.__new__(cls)
self.start_et = start_et
self.frame_delay = frame_delay
self.sclat = sclat
self.sclon = sclon
self.exposure = exposure
self.nframes = rawimg.shape[0]
self.load_flatfield()
self.framelets = []
for n in range(self.nframes):
for c in range(3):
framelet = Framelet(self.start_et, self.frame_delay, n, c, rawimg[n, c])
framelet.lat = latitude[n, c]
framelet.lon = longitude[n, c]
framelet.incidence = incidence[n, c]
framelet.emission = emission[n, c]
framelet.fluxcal = fluxcal[n, c]
framelet.coords = coords[n, c]
framelet.image = rawimg[n, c] / fluxcal[n, c]
self.framelets.append(framelet)
return self
[docs]
def load_flatfield(self):
# flatfield and gain from Brian Swift's GitHub
# (https://github.com/BrianSwift/JunoCam/tree/master/Juno3D)
self.flatfield = np.array(
io.imread(os.path.dirname(__file__) + "/cal/flatFieldsSmooth12to16.tiff")
)
self.flatfield[self.flatfield == 0] = 1.0
[docs]
def get_backplane(self, num_procs: int) -> None:
"""Retrieve backplane information for all framelets (i.e., get pixel coordinates in the mid-plane frame and also incidence/emission angles)
:param num_procs: number of processors to use for multi-threaded processing
"""
tmid = self.tmid
if num_procs > 1:
inpargs = [(frame, tmid) for frame in self.framelets]
with multiprocessing.Pool(
processes=num_procs, initializer=initializer
) as pool:
try:
with tqdm.tqdm(inpargs, desc="Projecting framelets") as pbar:
self.framelets = pool.starmap(
Framelet.project_to_midplane, pbar
)
pool.close()
except KeyboardInterrupt:
pool.terminate()
sys.exit()
finally:
pool.join()
else:
for frame in tqdm.tqdm(self.framelets):
frame.project_to_midplane(tmid)
[docs]
def update_jitter(self, jitter: float) -> None:
"""Update the jitter in the spacecraft clock for each framelet
:param jitter: the new jitter value in seconds
"""
self.jitter = jitter
for framelet in self.framelets:
framelet.jitter = jitter
@property
def tmid(self) -> float:
"""The mid-plane clock time"""
return np.mean([frame.et for frame in self.framelets])
@property
def rawimage(self) -> np.ndarray:
"""The raw, decompanded image in all the framelets"""
return np.stack([frame.rawimg for frame in self.framelets], axis=0).reshape(
(self.nframes, 3, FRAME_HEIGHT, FRAME_WIDTH)
)
@property
def image(self) -> np.ndarray:
"""The concatenated illumination-corrected image in all the framelets"""
return np.stack([frame.image for frame in self.framelets], axis=0).reshape(
(self.nframes, 3, FRAME_HEIGHT, FRAME_WIDTH)
)
@property
def coords(self) -> np.ndarray:
"""The concatenated coordinate of each pixel in the mid-plane frame for each pixel in the camera frame"""
return np.stack([frame.coords for frame in self.framelets], axis=0).reshape(
(self.nframes, 3, FRAME_HEIGHT, FRAME_WIDTH, 2)
)
@property
def emission(self) -> np.ndarray:
"""The concatenated emission angles for each pixel in the mid-plane frame"""
return np.stack([frame.emission for frame in self.framelets], axis=0).reshape(
(self.nframes, 3, FRAME_HEIGHT, FRAME_WIDTH)
)
@property
def incidence(self) -> np.ndarray:
"""The concatenated incidence angles for each pixel in the mid-plane frame"""
return np.stack([frame.incidence for frame in self.framelets], axis=0).reshape(
(self.nframes, 3, FRAME_HEIGHT, FRAME_WIDTH)
)
@property
def longitude(self) -> np.ndarray:
"""The concatenated Sys III longitude values for each pixel in the mid-plane frame"""
return np.stack([frame.lon for frame in self.framelets], axis=0).reshape(
(self.nframes, 3, FRAME_HEIGHT, FRAME_WIDTH)
)
@property
def latitude(self) -> np.ndarray:
"""The concatenated planetographic latitude value for each pixel in the mid-plane frame"""
return np.stack([frame.lat for frame in self.framelets], axis=0).reshape(
(self.nframes, 3, FRAME_HEIGHT, FRAME_WIDTH)
)
[docs]
class Framelet:
"""Holds information and methods for a single framelet in the JunoCam image"""
def __init__(
self,
start_et: float,
frame_delay: float,
frame_no: int,
color: int,
img: np.ndarray,
):
"""Initialize the framelet with relevant spacecraft data and camera information
:param start_et: the spacecraft clock time for the start of the exposure in ET
:param frame_delay: the inter-frame delay for this camera
:param frame_no: the index of the 3-color subframe (i.e., the 4th framelet is blue frame #2)
:param color: the filter for this framelet (i.e., 0 for blue, 1 for green and 2 for red)
"""
self.start_et = start_et
self.frame_delay = frame_delay
self.frame_no = frame_no
self.color = color
self.rawimg = img
self.jitter = None
self.camera = CameraModel(color)
@property
def et(self) -> float:
"""Get the observation time for this framelet"""
jitter = 0 if self.jitter is None else self.jitter
return (
self.start_et
+ self.camera.time_bias
+ jitter
+ (self.frame_delay + self.camera.iframe_delay) * self.frame_no
)
[docs]
def project_to_midplane(self, tmid: float):
"""Get the backplane information at the mid-plane frame for this framelet. Returns self for multi-processing
:param tmid: the spacecraft clock at the mid-plane frame
"""
coords = np.nan * np.zeros((FRAME_HEIGHT, FRAME_WIDTH, 2))
lat = np.nan * np.zeros((FRAME_HEIGHT, FRAME_WIDTH))
lon = np.nan * np.zeros((FRAME_HEIGHT, FRAME_WIDTH))
incidence = np.nan * np.zeros((FRAME_HEIGHT, FRAME_WIDTH))
emission = np.nan * np.zeros((FRAME_HEIGHT, FRAME_WIDTH))
fluxcal = np.ones((FRAME_HEIGHT, FRAME_WIDTH))
project_midplane_c(
self.et, self.color, tmid, lon, lat, incidence, emission, coords, fluxcal
)
# geometry correction
self.fluxcal = fluxcal
self.coords = coords
self.image = self.rawimg / self.fluxcal
self.lat = lat
self.lon = lon
# emission and incidence angles for lightning correction
self.emission = emission
self.incidence = incidence
return self
# for decompanding -- taken from Kevin Gill's github page
SQROOT = np.load(os.path.join(os.path.dirname(__file__), 'decompanding_table.npy'))
[docs]
def decompand(image: np.ndarray) -> np.ndarray:
"""Decompands the image from the 8-bit in the public release
to the original 12-bit shot by JunoCam
:param image: 8-bit input image
:return: Original 12-bit image
"""
data = np.array(255 * image, dtype=np.uint8)
ny, nx = data.shape
def get_sqrt(x: float) -> float:
return SQROOT[x]
v_get_sqrt = np.vectorize(get_sqrt)
data2 = v_get_sqrt(data)
return data2