Librerias¶
In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
import numpy as np
from lib.LCWavelet import *
import os
from tqdm import tqdm
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
import optuna
Arquitectura del modelo¶
In [2]:
class ShallueModel(nn.Module):
def __init__(self, global_size=2001, local_size=201, num_classes=2, wavelet_levels=[]):
super(ShallueModel, self).__init__()
self.global_size = global_size
self.local_size = local_size
self.num_classes = num_classes
if num_classes == 1:
print("Binary classification, sigmoid activation will be used.")
elif num_classes > 1:
print("Multi-class classification, softmax activation will be used.")
c_in = 2
if len(wavelet_levels) > 0:
c_in = len(wavelet_levels) * 2
self.conv_global = nn.Sequential(
nn.Conv1d(c_in, 16, 5),
nn.ReLU(),
nn.Conv1d(16, 16, 5),
nn.ReLU(),
nn.MaxPool1d(kernel_size=5, stride=2),
nn.Conv1d(16, 32, 5),
nn.ReLU(),
nn.Conv1d(32, 32, 5),
nn.ReLU(),
nn.MaxPool1d(kernel_size=5, stride=2),
nn.Conv1d(32, 64, 5),
nn.ReLU(),
nn.Conv1d(64, 64, 5),
nn.ReLU(),
nn.MaxPool1d(kernel_size=5, stride=2),
nn.Conv1d(64, 128, 5),
nn.ReLU(),
nn.Conv1d(128, 128, 5),
nn.ReLU(),
nn.MaxPool1d(kernel_size=5, stride=2),
nn.Conv1d(128, 256, 5),
nn.ReLU(),
nn.Conv1d(256, 256, 5),
nn.ReLU(),
nn.MaxPool1d(kernel_size=5, stride=2),
)
self.conv_local = nn.Sequential(
nn.Conv1d(c_in, 16, 5),
nn.ReLU(),
nn.Conv1d(16, 16, 5),
nn.ReLU(),
nn.MaxPool1d(kernel_size=7, stride=2),
nn.Conv1d(16, 32, 5),
nn.ReLU(),
nn.Conv1d(32, 32, 5),
nn.ReLU(),
nn.MaxPool1d(kernel_size=7, stride=2),
)
# Calcular automáticamente el número de features resultantes de la concatenación
with torch.no_grad():
dummy_global = torch.zeros(1, c_in, self.global_size)
dummy_local = torch.zeros(1, c_in, self.local_size)
out_global = self.conv_global(dummy_global)
out_local = self.conv_local(dummy_local)
# Flatten cada salida y sumar sus dimensiones
num_features = out_global.view(1, -1).size(1) + out_local.view(1, -1).size(1)
print("Número de features concatenados:", num_features)
self.fc = nn.Sequential(
nn.Linear(num_features, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, num_classes)
)
def forward(self, inputs):
global_feat = self.conv_global(inputs[0])
local_feat = self.conv_local(inputs[1])
global_feat = global_feat.view(global_feat.size(0), -1)
local_feat = local_feat.view(local_feat.size(0), -1)
# Concatenación de todas las ramas
x = torch.cat((global_feat, local_feat), dim=1)
x = self.fc(x)
if self.num_classes > 1:
return F.softmax(x, dim=1)
else:
return torch.sigmoid(x)
class ShallueModel2D(nn.Module):
def __init__(self, global_size=2001, local_size=201, num_classes=2, wavelet_levels=[]):
super().__init__()
self.global_size = global_size
self.local_size = local_size
self.num_classes = num_classes
if num_classes == 1:
print("Binary classification → sigmoid output")
else:
print("Multi-class classification → softmax output")
if len(wavelet_levels) > 0:
print(f"Wavelet levels: {wavelet_levels}")
c_in = len(wavelet_levels) * 2
else:
c_in = 2
# --------------------------------------------------
# Branch para la señal global, reformulada como "imagen" 2×L
# --------------------------------------------------
self.conv_global = nn.Sequential(
# combina las dos “filas” (par/impar) en la primera convolución
nn.Conv2d(1, 16, kernel_size=(c_in//2, 5)),
nn.ReLU(),
nn.Conv2d(16, 16, kernel_size=(1, 5)),
nn.ReLU(),
nn.MaxPool2d(kernel_size=(1, 5), stride=(1, 2)),
nn.Conv2d(16, 32, kernel_size=(c_in//2, 5)),
nn.ReLU(),
nn.Conv2d(32, 32, kernel_size=(1, 5)),
nn.ReLU(),
nn.MaxPool2d(kernel_size=(1, 5), stride=(1, 2)),
nn.Conv2d(32, 64, kernel_size=(1, 5)),
nn.ReLU(),
nn.Conv2d(64, 64, kernel_size=(1, 5)),
nn.ReLU(),
nn.MaxPool2d(kernel_size=(1, 5), stride=(1, 2)),
nn.Conv2d(64, 128, kernel_size=(1, 5)),
nn.ReLU(),
nn.Conv2d(128, 128, kernel_size=(1, 5)),
nn.ReLU(),
nn.MaxPool2d(kernel_size=(1, 5), stride=(1, 2)),
nn.Conv2d(128, 256, kernel_size=(1, 5)),
nn.ReLU(),
nn.Conv2d(256, 256, kernel_size=(1, 5)),
nn.ReLU(),
nn.MaxPool2d(kernel_size=(1, 5), stride=(1, 2)),
)
# --------------------------------------------------
# Branch para la señal local, reformulada como "imagen" 2×L
# --------------------------------------------------
self.conv_local = nn.Sequential(
nn.Conv2d(1, 16, kernel_size=(c_in//2, 5)),
nn.ReLU(),
nn.Conv2d(16, 16, kernel_size=(1, 5)),
nn.ReLU(),
nn.MaxPool2d(kernel_size=(1, 7), stride=(1, 2)),
nn.Conv2d(16, 32, kernel_size=(c_in//2, 5)),
nn.ReLU(),
nn.Conv2d(32, 32, kernel_size=(1, 5)),
nn.ReLU(),
nn.MaxPool2d(kernel_size=(1, 7), stride=(1, 2)),
)
# --------------------------------------------------
# Calcular automáticamente cuántas características salen tras conv_global + conv_local
# --------------------------------------------------
with torch.no_grad():
# Dummy de forma [batch=1, canales=1, altura=2, ancho=global_size]
dg = torch.zeros(1, 1, c_in, self.global_size)
dl = torch.zeros(1, 1, c_in, self.local_size)
og = self.conv_global(dg)
ol = self.conv_local(dl)
num_features = og.view(1, -1).size(1) + ol.view(1, -1).size(1)
print(f"Número de features concatenados: {num_features}")
# --------------------------------------------------
# Fully connected
# --------------------------------------------------
self.fc = nn.Sequential(
nn.Linear(num_features, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, self.num_classes)
)
def forward(self, inputs):
# inputs: tuple (global, local), cada uno [B, 2, L]
g, l = inputs
# convertimos a [B, 1, 2, L] para Conv2d
g = g.unsqueeze(1)
l = l.unsqueeze(1)
# ramas convolucionales
g_feat = self.conv_global(g)
l_feat = self.conv_local(l)
# aplanar y concatenar
g_feat = g_feat.view(g_feat.size(0), -1)
l_feat = l_feat.view(l_feat.size(0), -1)
x = torch.cat((g_feat, l_feat), dim=1)
# fully connected
x = self.fc(x)
# activación final
if self.num_classes > 1:
return F.softmax(x, dim=1)
else:
# para binary: devuelve [B, 1] con probabilidad
return torch.sigmoid(x)
class ShallueModel2DNorm(nn.Module):
def __init__(self, global_size=2001, local_size=201, num_classes=2, wavelet_levels=[]):
super().__init__()
self.global_size = global_size
self.local_size = local_size
self.num_classes = num_classes
if num_classes == 1:
print("Binary classification → sigmoid output")
else:
print("Multi-class classification → softmax output")
if len(wavelet_levels) > 0:
print(f"Wavelet levels: {wavelet_levels}")
c_in = len(wavelet_levels) * 2
else:
c_in = 2
# --------------------------------------------------
# Branch para la señal global, reformulada como "imagen" 2×L
# --------------------------------------------------
self.conv_global = nn.Sequential(
# combina las dos “filas” (par/impar) en la primera convolución
nn.Conv2d(1, 16, kernel_size=(2, 5)),
nn.ReLU(),
nn.BatchNorm2d(16),
nn.Conv2d(16, 16, kernel_size=(1, 5)),
nn.ReLU(),
nn.BatchNorm2d(16),
nn.MaxPool2d(kernel_size=(1, 5), stride=(1, 2)),
nn.Conv2d(16, 32, kernel_size=(1, 5)),
nn.ReLU(),
nn.BatchNorm2d(32),
nn.Conv2d(32, 32, kernel_size=(1, 5)),
nn.ReLU(),
nn.BatchNorm2d(32),
nn.MaxPool2d(kernel_size=(1, 5), stride=(1, 2)),
nn.Conv2d(32, 64, kernel_size=(1, 5)),
nn.ReLU(),
nn.BatchNorm2d(64),
nn.Conv2d(64, 64, kernel_size=(1, 5)),
nn.ReLU(),
nn.BatchNorm2d(64),
nn.MaxPool2d(kernel_size=(1, 5), stride=(1, 2)),
nn.Conv2d(64, 128, kernel_size=(1, 5)),
nn.ReLU(),
nn.BatchNorm2d(128),
nn.Conv2d(128, 128, kernel_size=(1, 5)),
nn.ReLU(),
nn.BatchNorm2d(128),
nn.MaxPool2d(kernel_size=(1, 5), stride=(1, 2)),
nn.Conv2d(128, 256, kernel_size=(1, 5)),
nn.ReLU(),
nn.BatchNorm2d(256),
nn.Conv2d(256, 256, kernel_size=(1, 5)),
nn.ReLU(),
nn.BatchNorm2d(256),
nn.MaxPool2d(kernel_size=(1, 5), stride=(1, 2)),
)
# --------------------------------------------------
# Branch para la señal local, reformulada como "imagen" 2×L
# --------------------------------------------------
self.conv_local = nn.Sequential(
nn.Conv2d(1, 16, kernel_size=(2, 5)),
nn.ReLU(),
nn.BatchNorm2d(16),
nn.Conv2d(16, 16, kernel_size=(1, 5)),
nn.ReLU(),
nn.BatchNorm2d(16),
nn.MaxPool2d(kernel_size=(1, 7), stride=(1, 2)),
nn.Conv2d(16, 32, kernel_size=(1, 5)),
nn.ReLU(),
nn.BatchNorm2d(32),
nn.Conv2d(32, 32, kernel_size=(1, 5)),
nn.ReLU(),
nn.BatchNorm2d(32),
nn.MaxPool2d(kernel_size=(1, 7), stride=(1, 2)),
)
# --------------------------------------------------
# Calcular automáticamente cuántas características salen tras conv_global + conv_local
# --------------------------------------------------
with torch.no_grad():
# Dummy de forma [batch=1, canales=1, altura=2, ancho=global_size]
dg = torch.zeros(1, 1, c_in, self.global_size)
dl = torch.zeros(1, 1, c_in, self.local_size)
og = self.conv_global(dg)
ol = self.conv_local(dl)
num_features = og.view(1, -1).size(1) + ol.view(1, -1).size(1)
print(f"Número de features concatenados: {num_features}")
# --------------------------------------------------
# Fully connected
# --------------------------------------------------
self.fc = nn.Sequential(
nn.Linear(num_features, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, self.num_classes)
)
def forward(self, inputs):
# inputs: tuple (global, local), cada uno [B, 2, L]
g, l = inputs
# convertimos a [B, 1, 2, L] para Conv2d
g = g.unsqueeze(1)
l = l.unsqueeze(1)
# ramas convolucionales
g_feat = self.conv_global(g)
l_feat = self.conv_local(l)
# aplanar y concatenar
g_feat = g_feat.view(g_feat.size(0), -1)
l_feat = l_feat.view(l_feat.size(0), -1)
x = torch.cat((g_feat, l_feat), dim=1)
# fully connected
x = self.fc(x)
# activación final
if self.num_classes > 1:
return F.softmax(x, dim=1)
else:
# para binary: devuelve [B, 1] con probabilidad
return torch.sigmoid(x)
In [3]:
class SkipLayer(nn.Module):
def __init__(self,
in_channels,
out_channels,
kernel_size=(1,5),
stride=1,
padding=(0,2),
conv_size=2,
pool_kernel=(1,5),
pool_stride=(1,2)):
"""
- in_channels/out_channels: canales antes y después de la rama conv.
- kernel_size, stride, padding: parámetros de cada Conv2d.
- conv_size: cuántas conv+BN+ReLU encadenas antes del pool.
- pool_kernel/stride: parámetros del MaxPool2d.
"""
super().__init__()
# 1) Rama convolucional
layers = []
c_in = in_channels
for i in range(conv_size):
layers += [
nn.Conv2d(c_in, out_channels,
kernel_size=kernel_size if i==0 else (1, kernel_size[1]),
stride=stride,
padding=padding if i==0 else (0, padding[1])),
# nn.BatchNorm2d(out_channels, track_running_stats=False),
nn.ReLU()
]
c_in = out_channels
layers += [
nn.MaxPool2d(kernel_size=pool_kernel, stride=pool_stride)
]
self.conv_branch = nn.Sequential(*layers)
# 2) Rama identidad: same pool + conv1x1 para ajustar canales
self.id_branch = nn.Sequential(
nn.MaxPool2d(kernel_size=pool_kernel, stride=pool_stride),
nn.Conv2d(in_channels, out_channels, kernel_size=1),
# nn.BatchNorm2d(out_channels, track_running_stats=False),
)
def forward(self, x):
out = self.conv_branch(x) # → [B, out_channels, H_out, W_out]
idt = self.id_branch(x) # → [B, out_channels, H_out, W_out]
return F.relu(out + idt)
class ShallueModel2DSkip(nn.Module):
def __init__(self, global_size=2001, local_size=201,
num_classes=2, wavelet_levels=[]):
super().__init__()
c_in = len(wavelet_levels)*2 if wavelet_levels else 2
self.num_classes = num_classes
if num_classes == 1:
print("Binary classification → sigmoid output")
else:
print("Multi-class classification → softmax output")
print(f"Wavelet levels: {wavelet_levels}")
print(f"c_in: {c_in}")
# Branch global
self.conv_global = nn.Sequential(
# kernel_height=c_in para abarcar las filas, kernel_width=5
SkipLayer(1, 16,
kernel_size=(c_in, 5),
padding=(0,2),
conv_size=2,
pool_kernel=(1,5),
pool_stride=(1,2)),
SkipLayer(16, 32,
kernel_size=(1, 5),
padding=(0,2),
conv_size=2,
pool_kernel=(1,5),
pool_stride=(1,2)),
SkipLayer(32, 64,
kernel_size=(1, 5),
padding=(0,2),
conv_size=2,
pool_kernel=(1,5),
pool_stride=(1,2)),
SkipLayer(64, 128,
kernel_size=(1, 5),
padding=(0,2),
conv_size=2,
pool_kernel=(1,5),
pool_stride=(1,2)),
SkipLayer(128, 256,
kernel_size=(1, 5),
padding=(0,2),
conv_size=2,
pool_kernel=(1,5),
pool_stride=(1,2))
)
# Branch local (idéntico pero con local_size)
self.conv_local = nn.Sequential(
SkipLayer(1, 16,
kernel_size=(c_in, 5),
padding=(0,2),
conv_size=2,
pool_kernel=(1,7),
pool_stride=(1,2)),
SkipLayer(16, 32,
kernel_size=(1, 5),
padding=(0,2),
conv_size=2,
pool_kernel=(1,7),
pool_stride=(1,2)),
)
# Calcular num_features como antes
for m in self.modules():
# desactiva gradientes en todas las capas
for p in m.parameters():
p.requires_grad_(False)
with torch.no_grad():
dg = torch.zeros(1, 1, c_in, global_size)
dl = torch.zeros(1, 1, c_in, local_size)
og = self.conv_global(dg)
ol = self.conv_local(dl)
num_features = og.view(1, -1).size(1) + ol.view(1, -1).size(1)
# ahora reactiva los gradientes
for m in self.modules():
for p in m.parameters():
p.requires_grad_(True)
print(f"Número de features concatenados: {num_features}")
self.fc = nn.Sequential(
nn.Linear(num_features, 512), nn.ReLU(),
nn.Linear(512, 256), nn.ReLU(),
nn.Linear(256, 64), nn.ReLU(),
nn.Linear(64, num_classes)
)
def forward(self, inputs):
g, l = inputs # cada uno [B, C_in, L]
g = g.unsqueeze(1) # → [B, 1, C_in, L]
l = l.unsqueeze(1)
g_feat = self.conv_global(g)
l_feat = self.conv_local(l)
# aplanar y concatenar
g_feat = g_feat.view(g_feat.size(0), -1)
l_feat = l_feat.view(l_feat.size(0), -1)
x = torch.cat((g_feat, l_feat), dim=1)
x = self.fc(x)
if self.num_classes > 1:
return F.softmax(x, dim=1)
else:
return torch.sigmoid(x)
Cargado de datos¶
In [4]:
def load_data(path, file_name):
lc = LightCurveWaveletGlobalLocalCollection.from_pickle(path + file_name)
try:
getattr(lc, 'levels')
except AttributeError:
lc.levels = [1, 2, 3, 4]
return lc
path='all_data_2025_dmey/'
files = os.listdir(path)
kepler_files = [f for f in files if f.endswith('.pickle')]
light_curves = []
for file in tqdm(kepler_files, desc='Loading data'):
light_curves.append(load_data(path, file))
Loading data: 100%|██████████| 9211/9211 [01:27<00:00, 105.29it/s]
Separar entre los confirmados y candidatos¶
In [5]:
candidates = [lc for lc in light_curves if lc.headers['class'] == 'CANDIDATE']
print("Número de candidatos:", len(candidates))
confirmed = [lc for lc in light_curves if lc.headers['class'] == 'CONFIRMED' or lc.headers['class'] == 'FALSE POSITIVE']
print("Número de confirmados:", len(confirmed))
classes = {'FALSE POSITIVE': 0, 'CONFIRMED': 1}
print("Clases:", classes)
Número de candidatos: 1955
Número de confirmados: 7256
Clases: {'FALSE POSITIVE': 0, 'CONFIRMED': 1}
In [6]:
global_odd = []
global_even = []
local_odd = []
local_even = []
wl_series = []
labels = []
for lc in tqdm(confirmed, desc='Processing light curves'):
global_odd.append(lc.pliegue_impar_global._light_curve.flux.value)
global_even.append(lc.pliegue_par_global._light_curve.flux.value)
local_odd.append(lc.pliegue_impar_local._light_curve.flux.value)
local_even.append(lc.pliegue_par_local._light_curve.flux.value)
# Extraer los niveles de wavelet
wl_series.append({
'global_odd': lc.pliegue_impar_global._lc_w_collection,
'global_even': lc.pliegue_par_global._lc_w_collection,
'local_odd': lc.pliegue_impar_local._lc_w_collection,
'local_even': lc.pliegue_par_local._lc_w_collection
})
# Convertir la clase a un número entero
labels.append(classes[lc.headers['class']])
print("Número de datos:", len(global_odd))
print('Elementos de cada clase:', {k: labels.count(k) for k in set(labels)})
Processing light curves: 100%|██████████| 7256/7256 [00:00<00:00, 105092.23it/s]
Número de datos: 7256
Elementos de cada clase: {0: 4517, 1: 2739}
In [9]:
array = wl_series[2]['global_even'][0][0]
plt.figure(figsize=(10, 5))
plt.plot(array, label='Global Odd')
plt.title('Wavelet Transform - Global Odd')
plt.xlabel('Time')
plt.ylabel('Flux')
Out[9]:
Text(0, 0.5, 'Flux')
Separar las muestras en train y test¶
In [10]:
def create_2D(levels):
"""Crea una lista de diccionarios con las series temporales globales y locales en 2D
Args:
levels (list): Lista de niveles de wavelet, si es 0 se usa la señal original
Returns:
list: Lista de diccionarios con las series temporales globales y locales
"""
items = []
for i in range(len(wl_series)): # recorre todos los datos
global_series = []
local_series = []
for l in levels:
if l == 0:
global_series.append(global_odd[i])
global_series.append(global_even[i])
local_series.append(local_odd[i])
local_series.append(local_even[i])
else:
global_series.append(wl_series[i]['global_odd'][l-1][0])
global_series.append(wl_series[i]['global_even'][l-1][0])
local_series.append(wl_series[i]['local_odd'][l-1][0])
local_series.append(wl_series[i]['local_even'][l-1][0])
global_series = np.array(global_series)
local_series = np.array(local_series)
item = {
'global': global_series,
'local': local_series,
'label': labels[i]
}
items.append(item)
return items
In [11]:
wavelet_levels = [1, 2, 3, 4] # [0, 1, 2, 3, 4]
items = create_2D(wavelet_levels)
print(items[0]['global'].shape)
print(items[0]['local'].shape)
train, test = train_test_split(items, test_size=0.2, random_state=33)
val, test = train_test_split(test, test_size=0.5, random_state=33)
# train, test = train_test_split(items, test_size=0.3)
train_global = torch.tensor([item['global'] for item in train])
train_local = torch.tensor([item['local'] for item in train])
train_labels = torch.tensor([item['label'] for item in train])
val_global = torch.tensor([item['global'] for item in val])
val_local = torch.tensor([item['local'] for item in val])
val_labels = torch.tensor([item['label'] for item in val])
test_global = torch.tensor([item['global'] for item in test])
test_local = torch.tensor([item['local'] for item in test])
test_labels = torch.tensor([item['label'] for item in test])
train_dataset = torch.utils.data.TensorDataset(train_global, train_local, train_labels)
val_dataset = torch.utils.data.TensorDataset(val_global, val_local, val_labels)
test_dataset = torch.utils.data.TensorDataset(test_global, test_local, test_labels)
print("Tamaño del conjunto de entrenamiento:", len(train_dataset))
print("Tamaño del conjunto de validación:", len(val_dataset))
print("Tamaño del conjunto de prueba:", len(test_dataset))
(8, 2001) (8, 201)
d:\Diego\Astrofisica\TFM\ExoPlanet-Detection\.venv\lib\site-packages\ipykernel_launcher.py:10: UserWarning: Creating a tensor from a list of numpy.ndarrays is extremely slow. Please consider converting the list to a single numpy.ndarray with numpy.array() before converting to a tensor. (Triggered internally at C:\actions-runner\_work\pytorch\pytorch\builder\windows\pytorch\torch\csrc\utils\tensor_new.cpp:233.) # Remove the CWD from sys.path while we load stuff.
Tamaño del conjunto de entrenamiento: 5804 Tamaño del conjunto de validación: 726 Tamaño del conjunto de prueba: 726
In [13]:
fig, ax = plt.subplots(2, 1, figsize=(10, 5))
ax[0].imshow(items[2]['global'], aspect='auto', cmap='gray')
ax[0].set_title('Global')
ax[0].set_xlabel('Time')
ax[0].set_ylabel('Flux')
ax[1].imshow(items[2]['local'], aspect='auto', cmap='gray')
ax[1].set_title('Local')
ax[1].set_xlabel('Time')
ax[1].set_ylabel('Flux')
plt.tight_layout()
plt.show()
Entrenar el modelo¶
Definir device en caso de usar GPU o CPU¶
In [14]:
try:
import torch_directml
if torch_directml.is_available():
device = torch_directml.device()
else:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Dispositivo:", device)
except ImportError:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Dispositivo:", device)
Dispositivo: privateuseone:0
In [15]:
class FocalLoss(nn.Module):
def __init__(self, gamma=2.0, alpha=0.25):
super(FocalLoss, self).__init__()
self.gamma = gamma
self.alpha = alpha
def forward(self, inputs, targets):
BCE_loss = F.binary_cross_entropy_with_logits(inputs, targets.float(), reduction='none')
pt = torch.exp(-BCE_loss)
F_loss = self.alpha * (1 - pt) ** self.gamma * BCE_loss
return F_loss.mean()
In [ ]:
def optuna_search(trial):
# Definir los hiperparámetros a optimizar
batch_size = trial.suggest_categorical('batch_size', [32, 64])
# learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-1)
learning_rate = trial.suggest_categorical('learning_rate', [1e-5, 1e-4, 1e-3, 1e-2])
# num_epochs = trial.suggest_int('num_epochs', 50, 100, step=10)
num_epochs = trial.suggest_categorical('num_epochs', [50, 65, 80, 100])
# weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-2)
weight_decay = trial.suggest_categorical('weight_decay', [1e-6, 1e-5, 1e-4, 1e-3])
criterion = trial.suggest_categorical('loss_fn', [0, 1]) # 0: CrossEntropy, 1: BCEWithLogits, 2: FocalLoss
num_classes = 1
if criterion == 0:
criterion = nn.CrossEntropyLoss(weight=torch.tensor([1, 1.74]).to(device))
num_classes = 2
elif criterion == 1:
criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor([1.74]).to(device))
elif criterion == 2:
criterion = FocalLoss(gamma=2.0, alpha=0.25).to(device)
# Crear el modelo
model = ShallueModel2D(num_classes=num_classes, wavelet_levels=wavelet_levels).to(device)
# Definir el optimizador y la función de pérdida
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
# criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor([fraction]).to(device))
# Crear los DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# Entrenar el modelo
model.train()
for batch, data in enumerate(train_loader):
global_s, local_s, labels = data
# check if any tensor is empty
if global_s.numel() == 0 or local_s.numel() == 0:
continue
# check if any tensor has nan
if torch.isnan(global_s).any() or torch.isnan(local_s).any():
continue
# Move data to device
global_s = global_s.to(device)#.unsqueeze(1).float()
local_s = local_s.to(device)#.unsqueeze(1).float()
labels = labels.to(device)
optimizer.zero_grad()
# Forward propagation
outputs = model((global_s, local_s))
if type(criterion) == nn.BCEWithLogitsLoss or type(criterion) == FocalLoss:
# elimnar la dimensión extra de outputs
outputs = outputs.squeeze(1)
labels = labels.float()
# Compute loss and backpropagation
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}")
# Evaluar el modelo
model.eval()
test_size = len(test_loader.dataset)
n_batches = len(test_loader)
total_loss = 0.0
correct = 0
all_labels = []
all_predictions = []
with torch.no_grad():
for batch, data in enumerate(test_loader):
global_s, local_s, labels = data
# check if any tensor is empty
if global_s.numel() == 0 or local_s.numel() == 0:
continue
# check if any tensor has nan
if torch.isnan(global_s).any() or torch.isnan(local_s).any():
continue
# Move data to device
global_s = global_s.to(device).float()#.unsqueeze(1).float()
local_s = local_s.to(device).float()#.unsqueeze(1).float()
labels = labels.to(device)
outputs = model((global_s, local_s))
if type(criterion) == nn.BCEWithLogitsLoss or type(criterion) == FocalLoss:
# elimnar la dimensión extra de outputs
outputs = outputs.squeeze(1)
labels = labels.float()
loss = criterion(outputs, labels)
total_loss += loss.item()
if type(criterion) == nn.BCEWithLogitsLoss or type(criterion) == FocalLoss:
predicted = (outputs > 0.5).float()
else:
_, predicted = torch.max(outputs.data, 1)
correct += (predicted == labels).sum().item()
labels = labels.cpu().numpy()
predicted = predicted.cpu().numpy()
all_labels.extend(labels)
all_predictions.extend(predicted)
accuracy = correct / test_size
val_loss = total_loss / n_batches
f1 = f1_score(all_labels, all_predictions, average='weighted')
precision = precision_score(all_labels, all_predictions, average='weighted', zero_division=0)
recall = recall_score(all_labels, all_predictions, average='weighted')
return accuracy, f1, precision, recall, val_loss
# Ejecutar la búsqueda de hiperparámetros
study = optuna.create_study(directions=["maximize", "maximize", "maximize", "maximize", "minimize"], storage="sqlite:///optuna_study.db", study_name="shallue_model6", load_if_exists=True)
# Establecer el sampler para la búsqueda de hiperparámetros
study.optimize(optuna_search, n_trials=40, show_progress_bar=True, n_jobs=1)
# Imprimir los mejores hiperparámetros y resultados
print("Mejores hiperparámetros encontrados:")
best = None
for t in study.best_trials:
if best is None or (t.values[0] > best.values[0] and t.values[1] > best.values[1] and t.values[2] > best.values[2] and t.values[3] > best.values[3] and t.values[4] < best.values[4]):
best = t
print(f"Batch size: {best.params['batch_size']}")
print(f"Learning rate: {best.params['learning_rate']}")
print(f"Num epochs: {best.params['num_epochs']}")
print(f"Weight decay: {best.params['weight_decay']}")
print(f"Loss function: {best.params['loss_fn']}")
print(f"Accuracy: {best.values[0]}")
print(f"F1 Score: {best.values[1]}")
print(f"Precision: {best.values[2]}")
print(f"Recall: {best.values[3]}")
print(f"Validation loss: {best.values[4]}")
In [16]:
batch_size = 64
num_epochs = 100
learning_rate = 0.0001
l2_penalty = 1e-4 #0.003380
dropout = 0.006516
# fraction = max_class_size / min_class_size
fraction = 1.74 #1.74
print("Fracción:", fraction)
fraction = torch.tensor(fraction, device=device) #torch.tensor(0.9342, device=device) # peso positivo para la función de pérdida BCEWithLogitsLoss
n_classes = 1 # si pones 1 usa sigmoid, si pones >1 usa softmax
if n_classes > 1:
loss_fn = nn.CrossEntropyLoss() # para clasificación multiclase
else:
loss_fn = nn.BCEWithLogitsLoss(pos_weight=fraction) # para clasificación binaria con sigmoid
# loss_fn = FocalLoss(gamma=2.0, alpha=0.25) # para clasificación binaria con sigmoid y focal loss
model = ShallueModel2DSkip(global_size=2001, local_size=201, num_classes=n_classes, wavelet_levels=wavelet_levels).to(device)
# if input("¿Quieres inicializar el modelo? (s/n): ").lower() == 's':
# try:
# model.load_state_dict(torch.load('models/Shallue_model.pth'))
# model.eval()
# except:
# print("No se pudo cargar el modelo. Se inicializa uno nuevo.")
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=l2_penalty)
schedule = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10, verbose=True)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
Fracción: 1.74 Binary classification → sigmoid output Wavelet levels: [1, 2, 3, 4] c_in: 8 Número de features concatenados: 132608
In [17]:
def train_fn(model, train_loader, optimizer, loss_fn):
model.train()
train_size = len(train_loader.dataset)
n_batches = len(train_loader)
total_loss = 0.0
correct = 0
for batch, data in enumerate(train_loader):
global_s, local_s, labels = data
# check if any tensor is empty
if global_s.numel() == 0 or local_s.numel() == 0:
continue
# check if any tensor has nan
if torch.isnan(global_s).any() or torch.isnan(local_s).any():
continue
# Move data to device
global_s = global_s.to(device)#.unsqueeze(1).float()
local_s = local_s.to(device)#.unsqueeze(1).float()
labels = labels.to(device)
optimizer.zero_grad()
# Forward propagation
outputs = model((global_s, local_s))
if type(loss_fn) == nn.BCEWithLogitsLoss or type(loss_fn) == FocalLoss:
# elimnar la dimensión extra de outputs
outputs = outputs.squeeze(1)
labels = labels.float()
# Compute loss and backpropagation
loss = loss_fn(outputs, labels)
loss.backward()
optimizer.step()
total_loss += loss.item()
if type(loss_fn) == nn.BCEWithLogitsLoss or type(loss_fn) == FocalLoss:
predicted = (outputs > 0.5).float()
else:
_, predicted = torch.max(outputs.data, 1)
correct += (predicted == labels).sum().item()
accuracy = correct / train_size
train_loss = total_loss / n_batches
return train_loss, accuracy
def val_fn(model, val_loader, loss_fn):
model.eval()
test_size = len(val_loader.dataset)
n_batches = len(val_loader)
total_loss = 0.0
correct = 0
all_labels = []
all_predictions = []
with torch.no_grad():
for batch, data in enumerate(val_loader):
global_s, local_s, labels = data
# check if any tensor is empty
if global_s.numel() == 0 or local_s.numel() == 0:
continue
# check if any tensor has nan
if torch.isnan(global_s).any() or torch.isnan(local_s).any():
continue
# Move data to device
global_s = global_s.to(device)#.unsqueeze(1).float()
local_s = local_s.to(device)#.unsqueeze(1).float()
labels = labels.to(device)
outputs = model((global_s, local_s))
if type(loss_fn) == nn.BCEWithLogitsLoss or type(loss_fn) == FocalLoss:
# elimnar la dimensión extra de outputs
outputs = outputs.squeeze(1)
labels = labels.float()
loss = loss_fn(outputs, labels)
total_loss += loss.item()
if type(loss_fn) == nn.BCEWithLogitsLoss or type(loss_fn) == FocalLoss:
predicted = (outputs > 0.5).float()
else:
_, predicted = torch.max(outputs.data, 1)
correct += (predicted == labels).sum().item()
labels = labels.cpu().numpy()
predicted = predicted.cpu().numpy()
all_labels.extend(labels)
all_predictions.extend(predicted)
accuracy = correct / test_size
val_loss = total_loss / n_batches
f1 = f1_score(all_labels, all_predictions, average='weighted')
precision = precision_score(all_labels, all_predictions, average='weighted', zero_division=0)
recall = recall_score(all_labels, all_predictions, average='weighted')
return val_loss, accuracy, f1, precision, recall
In [18]:
%matplotlib inline
prev_val_loss = float('inf')
patience = 10
early_stopping_counter = 0
verbose = False
loss_train_list = []
loss_val_list = []
precision_list = []
f1_list = []
recall_list = []
for epoch in tqdm(range(num_epochs)):
train_loss, train_accuracy = train_fn(model, train_loader, optimizer, loss_fn)
val_loss, val_accuracy, f1, precision, recall = val_fn(model, val_loader, loss_fn)
schedule.step(val_loss)
if verbose:
print(f'Epoch {epoch}/{num_epochs}')
print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
print(f'F1 Score: {f1:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}')
# early stopping
if val_loss > prev_val_loss:
early_stopping_counter += 1
if early_stopping_counter >= patience:
print("Early stopping")
break
else:
early_stopping_counter = 0
# Save the model if validation loss decreases
# torch.save(model.state_dict(), 'models/Shallue_model.pth')
# print("Modelo guardado")
prev_val_loss = val_loss
loss_train_list.append(train_loss)
loss_val_list.append(val_loss)
precision_list.append(precision)
f1_list.append(f1)
recall_list.append(recall)
print("Entrenamiento y validación completados.", '------'*20)
print(f'Epoch {epoch+1}/{num_epochs}')
print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
print(f'F1 Score: {f1:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}')
56%|█████▌ | 56/100 [05:52<04:37, 6.30s/it]
Epoch 00056: reducing learning rate of group 0 to 1.0000e-05.
67%|██████▋ | 67/100 [07:01<03:27, 6.29s/it]
Epoch 00067: reducing learning rate of group 0 to 1.0000e-06.
78%|███████▊ | 78/100 [08:10<02:18, 6.29s/it]
Epoch 00078: reducing learning rate of group 0 to 1.0000e-07.
87%|████████▋ | 87/100 [09:13<01:22, 6.37s/it]
Early stopping Entrenamiento y validación completados. ------------------------------------------------------------------------------------------------------------------------ Epoch 88/100 Train Loss: 0.6553, Train Accuracy: 0.9280 Validation Loss: 0.7276, Validation Accuracy: 0.8554 F1 Score: 0.8561, Precision: 0.8577, Recall: 0.8554
In [19]:
# plot the loss, precision, f1 and recall graphs
fig, axs = plt.subplots(4, 1, figsize=(10, 15))
axs[0].plot(loss_train_list, label='Train Loss')
axs[0].plot(loss_val_list, label='Validation Loss', color='purple')
axs[0].set_title('Loss')
axs[0].set_xlabel('Epochs')
axs[0].set_ylabel('Loss')
axs[0].legend()
axs[1].plot(precision_list, label='Precision', color='orange')
axs[1].set_title('Precision')
axs[1].set_xlabel('Epochs')
axs[1].set_ylabel('Precision')
axs[1].legend()
axs[2].plot(f1_list, label='F1 Score', color='green')
axs[2].set_title('F1 Score')
axs[2].set_xlabel('Epochs')
axs[2].set_ylabel('F1 Score')
axs[2].legend()
axs[3].plot(recall_list, label='Recall', color='red')
axs[3].set_title('Recall')
axs[3].set_xlabel('Epochs')
axs[3].set_ylabel('Recall')
axs[3].legend()
plt.tight_layout()
plt.show()
In [60]:
torch.save(model.state_dict(), 'models/Shallue_model_93-76-81.pth')
In [15]:
model = ShallueModel2DSkip(global_size=2001, local_size=201, num_classes=n_classes, wavelet_levels=wavelet_levels).to(device)
model.load_state_dict(torch.load('models/ShallueSkip_best.pth'))
Binary classification → sigmoid output Wavelet levels: [1, 2, 3, 4] c_in: 8 Número de features concatenados: 132608
Out[15]:
<All keys matched successfully>
In [20]:
def predict(model, data_loader, loss_fn):
predicted = []
probs = []
labels = []
loss_test = []
model.eval()
with torch.no_grad():
for data in data_loader:
global_s, local_s, label = data
# check if any tensor is empty
if global_s.numel() == 0 or local_s.numel() == 0:
predicted.append(np.full_like(label, -1))
labels.append(np.full_like(label, -1))
probs.append(np.full_like(label, -1))
loss_test.append(np.nan)
continue
# check if any tensor has nan
if torch.isnan(global_s).any() or torch.isnan(local_s).any():
predicted.append(np.full_like(label, -1))
labels.append(np.full_like(label, -1))
probs.append(np.full_like(label, -1))
continue
global_s = global_s.to(device)#.unsqueeze(1).float()
local_s = local_s.to(device)#.unsqueeze(1).float()
label = label.to(device)
output = model((global_s, local_s))
if type(loss_fn) == nn.BCEWithLogitsLoss or type(loss_fn) == FocalLoss:
# elimnar la dimensión extra de outputs
output = output.squeeze(1)
label = label.float()
loss = loss_fn(output, label)
loss_test.append(loss.item())
if type(loss_fn) == nn.BCEWithLogitsLoss or type(loss_fn) == FocalLoss:
predicted.append((output.cpu() > 0.5).float())
else:
_, predicted_label = torch.max(output.data, 1)
predicted.append(predicted_label.cpu().numpy())
labels.append(label.cpu().numpy())
probs.append(output.cpu().numpy())
predicted = np.concatenate(predicted)
labels = np.concatenate(labels)
probs = np.concatenate(probs)
# loss_test = np.concatenate(loss_test)
return predicted, labels, probs, loss_test
In [21]:
def plot_confusion_matrix(y_true, y_pred, classes, title = 'Confusion Matrix'):
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=classes, yticklabels=classes)
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.title(title)
# plot stats
accuracy = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred, average='weighted')
precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_true, y_pred, average='weighted')
plt.text(2.7, 0.5, f'Accuracy: {accuracy:.4f}\nF1: {f1:.4f}\nPrecision: {precision:.4f}\nRecall: {recall:.4f}',
horizontalalignment='center', verticalalignment='center', fontsize=12, color='black')
plt.show()
# plot the ROC curve
def plot_roc_curve(y_true, y_pred, title='ROC Curve'):
from sklearn.metrics import roc_curve, auc
fpr, tpr, thresholds = roc_curve(y_true, y_pred)
roc_auc = auc(fpr, tpr)
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='blue', lw=2, label='ROC curve (area = {:.2f})'.format(roc_auc))
plt.plot([0, 1], [0, 1], color='red', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title(title)
plt.legend(loc='lower right')
plt.show()
def plot_loss(loss):
plt.figure(figsize=(10, 5))
plt.plot(loss)
plt.title('Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.show()
In [22]:
predicted, labels, probs, loss_serie = predict(model, test_loader, loss_fn)
classes = ['FALSE POSITIVE', 'CONFIRMED']
plot_confusion_matrix(labels, predicted, classes, title='Test Confusion Matrix')
# plot_loss(loss_serie)
# plot_roc_curve(labels, predicted, title='Test ROC Curve')
Probar el modelo con los datos de CANDIDATOS¶
In [23]:
global_odd = []
global_even = []
local_odd = []
local_even = []
wl_candidates = []
for lc in tqdm(candidates, desc='Processing light curves'):
global_odd.append(lc.pliegue_impar_global._light_curve.flux.value)
global_even.append(lc.pliegue_par_global._light_curve.flux.value)
local_odd.append(lc.pliegue_impar_local._light_curve.flux.value)
local_even.append(lc.pliegue_par_local._light_curve.flux.value)
# Extraer los niveles de wavelet
wl_candidates.append({
'global_odd': lc.pliegue_impar_global._lc_w_collection,
'global_even': lc.pliegue_par_global._lc_w_collection,
'local_odd': lc.pliegue_impar_local._lc_w_collection,
'local_even': lc.pliegue_par_local._lc_w_collection
})
items_cand = []
for i in tqdm(range(len(wl_candidates)), desc='Creating items'):
global_series = []
local_series = []
for l in wavelet_levels:
if l == 0:
global_series.append(global_odd[i])
global_series.append(global_even[i])
local_series.append(local_odd[i])
local_series.append(local_even[i])
else:
global_series.append(wl_candidates[i]['global_odd'][l-1][0])
global_series.append(wl_candidates[i]['global_even'][l-1][0])
local_series.append(wl_candidates[i]['local_odd'][l-1][0])
local_series.append(wl_candidates[i]['local_even'][l-1][0])
global_series = np.array(global_series)
local_series = np.array(local_series)
item = {
'global': global_series,
'local': local_series,
'label': 0
}
items_cand.append(item)
candidates_global = torch.tensor([item['global'] for item in items_cand])
candidates_local = torch.tensor([item['local'] for item in items_cand])
candidates_labels = torch.tensor([item['label'] for item in items_cand])
candidates_dataset = torch.utils.data.TensorDataset(candidates_global, candidates_local, candidates_labels)
print("Tamaño del conjunto de candidatos:", len(candidates_dataset))
candidates_loader = DataLoader(candidates_dataset, batch_size=batch_size, shuffle=False)
print("Número de candidatos:", len(candidates_loader.dataset))
Processing light curves: 100%|██████████| 1955/1955 [00:00<00:00, 99997.13it/s] Creating items: 100%|██████████| 1955/1955 [00:00<00:00, 29579.97it/s]
Tamaño del conjunto de candidatos: 1955 Número de candidatos: 1955
In [24]:
predicted_candidates, _, probs, _ = predict(model, candidates_loader, loss_fn)
print(len(predicted_candidates))
1955
In [25]:
# contar los positivos y negativos
n_positive = sum(predicted_candidates)
n_negative = len(predicted_candidates) - n_positive
print("Exoplanetas confirmados:", n_positive)
print("Falso positivo:", n_negative)
plt.bar_label(plt.bar(['Exoplanetas confirmados', 'Falso positivo'], [n_positive, n_negative]))
plt.title('Predicciones de candidatos')
plt.show()
# para el caso de confirmados evaluar la distribución de los valores de probabilidad
probs_confirmed = probs[predicted_candidates == 1]
plt.figure(figsize=(10, 5))
plt.hist(probs_confirmed, bins=50, color='blue', alpha=0.7)
plt.title('Distribución de las probabilidades de los candidatos')
plt.xlabel('Probabilidad')
plt.ylabel('Frecuencia')
# plt.axvline(x=0.5, color='red', linestyle='--', label='Umbral 0.5')
plt.legend()
plt.show()
Exoplanetas confirmados: 689.0 Falso positivo: 1266.0
No artists with labels found to put in legend. Note that artists whose label start with an underscore are ignored when legend() is called with no argument.
In [51]:
# crear un DataFrame con los resultados y guardarlo en un archivo CSV
import pandas as pd
print("Guardando resultados en un DataFrame...")
print("Número de candidatos:", len(predicted_candidates))
print("candidates_dataset:", len(candidates_dataset))
df_results = pd.DataFrame({
'id': [f'candidate_{i+1}' for i in range(len(predicted_candidates))],
'kepler_name': [c.headers['Kepler_name'] for c in candidates],
'predicted': predicted_candidates,
'probability': probs,
})
df_results.to_csv('results/candidates_predictions.csv', index=False)
print("Resultados guardados en 'results/candidates_predictions.csv'.")
Guardando resultados en un DataFrame... Número de candidatos: 1955 candidates_dataset: 1955 Resultados guardados en 'results/candidates_predictions.csv'.
In [26]:
# comparar series de un condidato confirmado y uno falso positivo
# coger la posición de un candidato confirmado y uno falso positivo (1 y 0 respectivamente) en las probabilidades
confirmed_index = np.where(probs == 1)[0][0]
false_positive_index = np.where(probs == 0)[0][0]
print("Índice del candidato confirmado:", confirmed_index)
print("Índice del falso positivo:", false_positive_index)
confirmed_global = items_cand[confirmed_index]['global']
confirmed_local = items_cand[confirmed_index]['local']
false_positive_global = items_cand[false_positive_index]['global']
false_positive_local = items_cand[false_positive_index]['local']
fig, ax = plt.subplots(2, 2, figsize=(15, 10))
ax[0, 0].imshow(confirmed_global, aspect='auto', cmap='gray')
ax[0, 0].set_title('Global Confirmed')
ax[0, 0].set_xlabel('Time')
ax[0, 0].set_ylabel('Flux')
ax[0, 1].imshow(confirmed_local, aspect='auto', cmap='gray')
ax[0, 1].set_title('Local Confirmed')
ax[0, 1].set_xlabel('Time')
ax[0, 1].set_ylabel('Flux')
ax[1, 0].imshow(false_positive_global, aspect='auto', cmap='gray')
ax[1, 0].set_title('Global False Positive')
ax[1, 0].set_xlabel('Time')
ax[1, 0].set_ylabel('Flux')
ax[1, 1].imshow(false_positive_local, aspect='auto', cmap='gray')
ax[1, 1].set_title('Local False Positive')
ax[1, 1].set_xlabel('Time')
ax[1, 1].set_ylabel('Flux')
plt.tight_layout()
plt.show()
Índice del candidato confirmado: 14 Índice del falso positivo: 790