Librerías¶

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
import numpy as np
from lib.LCWavelet import *
import os
from tqdm import tqdm
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
import optuna

Arquitectura del modelo¶

In [2]:
class ShallueModel(nn.Module):
    """Two-branch 1D CNN over a (global view, local view) pair.

    Each branch is a stack of ``Conv1d -> ReLU -> Conv1d -> ReLU -> MaxPool1d``
    stages. The flattened branch outputs are concatenated and passed through a
    four-layer fully connected head.

    Args:
        global_size: Length of the global-view input along its last axis.
        local_size: Length of the local-view input along its last axis.
        num_classes: Width of the output layer. ``forward`` applies softmax
            when it is greater than 1 and sigmoid otherwise.
        wavelet_levels: When non-empty, the branches take
            ``2 * len(wavelet_levels)`` input channels; otherwise 2.
    """

    def __init__(self, global_size=2001, local_size=201, num_classes=2, wavelet_levels=[]):
        super(ShallueModel, self).__init__()
        self.global_size = global_size
        self.local_size = local_size
        self.num_classes = num_classes
        if num_classes == 1:
            print("Binary classification, sigmoid activation will be used.")
        elif num_classes > 1:
            print("Multi-class classification, softmax activation will be used.")

        # Two channels by default; two per wavelet level otherwise.
        c_in = len(wavelet_levels) * 2 if len(wavelet_levels) > 0 else 2

        self.conv_global = self._conv_tower(c_in, (16, 32, 64, 128, 256), pool_kernel=5)
        self.conv_local = self._conv_tower(c_in, (16, 32), pool_kernel=7)

        # Push zero tensors through both towers to learn the flattened width
        # that the first Linear layer must accept.
        with torch.no_grad():
            probe_global = self.conv_global(torch.zeros(1, c_in, self.global_size))
            probe_local = self.conv_local(torch.zeros(1, c_in, self.local_size))
            num_features = probe_global.view(1, -1).size(1) + probe_local.view(1, -1).size(1)

        print("Número de features concatenados:", num_features)

        self.fc = nn.Sequential(
            nn.Linear(num_features, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, num_classes)
        )

    @staticmethod
    def _conv_tower(in_channels, widths, pool_kernel):
        """Build one branch: a (conv, relu, conv, relu, pool) stage per width."""
        layers = []
        previous = in_channels
        for width in widths:
            layers.extend([
                nn.Conv1d(previous, width, 5),
                nn.ReLU(),
                nn.Conv1d(width, width, 5),
                nn.ReLU(),
                nn.MaxPool1d(kernel_size=pool_kernel, stride=2),
            ])
            previous = width
        return nn.Sequential(*layers)

    def forward(self, inputs):
        """Return class probabilities for ``inputs = (global_view, local_view)``."""
        global_feat = self.conv_global(inputs[0])
        local_feat = self.conv_local(inputs[1])

        # Flatten each branch per sample, then join them along the feature axis.
        merged = torch.cat(
            (
                global_feat.view(global_feat.size(0), -1),
                local_feat.view(local_feat.size(0), -1),
            ),
            dim=1,
        )
        scores = self.fc(merged)
        if self.num_classes > 1:
            return F.softmax(scores, dim=1)
        return torch.sigmoid(scores)

class ShallueModel2D(nn.Module):
    """Two-branch 2D CNN that treats each view as a one-channel image.

    ``forward`` adds a channel axis to each input and runs it through a stack
    of ``Conv2d -> ReLU -> Conv2d -> ReLU -> MaxPool2d`` stages. In the first
    two stages of each branch the first convolution has kernel height
    ``c_in // 2``; every other kernel and all pooling act along the width only.

    Args:
        global_size: Width of the global-view input.
        local_size: Width of the local-view input.
        num_classes: Width of the output layer. ``forward`` applies softmax
            when it is greater than 1 and sigmoid otherwise.
        wavelet_levels: When non-empty, the expected number of input rows is
            ``2 * len(wavelet_levels)``; otherwise 2.
    """

    def __init__(self, global_size=2001, local_size=201, num_classes=2, wavelet_levels=[]):
        super().__init__()
        self.global_size = global_size
        self.local_size = local_size
        self.num_classes = num_classes

        if num_classes == 1:
            print("Binary classification → sigmoid output")
        else:
            print("Multi-class classification → softmax output")
        if len(wavelet_levels) > 0:
            print(f"Wavelet levels: {wavelet_levels}")
            c_in = len(wavelet_levels) * 2
        else:
            c_in = 2

        tall = c_in // 2  # kernel height of the row-mixing convolutions

        # Global branch: five stages, pooling window of 5.
        self.conv_global = nn.Sequential(
            *self._stage(1, 16, tall, 5),
            *self._stage(16, 32, tall, 5),
            *self._stage(32, 64, 1, 5),
            *self._stage(64, 128, 1, 5),
            *self._stage(128, 256, 1, 5),
        )

        # Local branch: two stages, pooling window of 7.
        self.conv_local = nn.Sequential(
            *self._stage(1, 16, tall, 7),
            *self._stage(16, 32, tall, 7),
        )

        # Probe both branches with zeros shaped [1, 1, c_in, width] to learn
        # the flattened feature count for the first Linear layer.
        with torch.no_grad():
            probe_global = self.conv_global(torch.zeros(1, 1, c_in, self.global_size))
            probe_local = self.conv_local(torch.zeros(1, 1, c_in, self.local_size))
            num_features = probe_global.view(1, -1).size(1) + probe_local.view(1, -1).size(1)

        print(f"Número de features concatenados: {num_features}")

        self.fc = nn.Sequential(
            nn.Linear(num_features, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, self.num_classes)
        )

    @staticmethod
    def _stage(in_channels, out_channels, first_kernel_rows, pool_width):
        """Return the five layers of one stage, in execution order."""
        return [
            nn.Conv2d(in_channels, out_channels, kernel_size=(first_kernel_rows, 5)),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=(1, 5)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(1, pool_width), stride=(1, 2)),
        ]

    def forward(self, inputs):
        """Return class probabilities for ``inputs = (global_view, local_view)``."""
        global_view, local_view = inputs

        # Insert the single image channel expected by Conv2d.
        global_feat = self.conv_global(global_view.unsqueeze(1))
        local_feat = self.conv_local(local_view.unsqueeze(1))

        # Flatten per sample and concatenate both branches.
        merged = torch.cat(
            (
                global_feat.view(global_feat.size(0), -1),
                local_feat.view(local_feat.size(0), -1),
            ),
            dim=1,
        )
        scores = self.fc(merged)

        if self.num_classes > 1:
            return F.softmax(scores, dim=1)
        # Single output unit: one probability per sample, shape [B, 1].
        return torch.sigmoid(scores)

class ShallueModel2DNorm(nn.Module):
    """Two-branch 2D CNN with batch normalisation after every activation.

    Each view is treated as a one-channel image. Every stage is
    ``Conv2d -> ReLU -> BatchNorm2d -> Conv2d -> ReLU -> BatchNorm2d -> MaxPool2d``.
    Only the very first convolution of each branch has kernel height 2; all
    other kernels and all pooling act along the width only.

    Args:
        global_size: Width of the global-view input.
        local_size: Width of the local-view input.
        num_classes: Width of the output layer. ``forward`` applies softmax
            when it is greater than 1 and sigmoid otherwise.
        wavelet_levels: When non-empty, the expected number of input rows is
            ``2 * len(wavelet_levels)``; otherwise 2.
    """

    def __init__(self, global_size=2001, local_size=201, num_classes=2, wavelet_levels=[]):
        super().__init__()
        self.global_size = global_size
        self.local_size = local_size
        self.num_classes = num_classes

        if num_classes == 1:
            print("Binary classification → sigmoid output")
        else:
            print("Multi-class classification → softmax output")
        if len(wavelet_levels) > 0:
            print(f"Wavelet levels: {wavelet_levels}")
            c_in = len(wavelet_levels) * 2
        else:
            c_in = 2

        # Global branch: five stages, pooling window of 5.
        self.conv_global = nn.Sequential(
            *self._stage(1, 16, first_kernel_rows=2, pool_width=5),
            *self._stage(16, 32, first_kernel_rows=1, pool_width=5),
            *self._stage(32, 64, first_kernel_rows=1, pool_width=5),
            *self._stage(64, 128, first_kernel_rows=1, pool_width=5),
            *self._stage(128, 256, first_kernel_rows=1, pool_width=5),
        )

        # Local branch: two stages, pooling window of 7.
        self.conv_local = nn.Sequential(
            *self._stage(1, 16, first_kernel_rows=2, pool_width=7),
            *self._stage(16, 32, first_kernel_rows=1, pool_width=7),
        )

        # Probe both branches with zeros shaped [1, 1, c_in, width] to learn the
        # flattened feature count. The probe runs in eval mode: in training mode
        # BatchNorm would fold this artificial all-zero batch into its running
        # statistics (and bump num_batches_tracked) before any real data is seen.
        was_training = self.training
        self.eval()
        with torch.no_grad():
            probe_global = self.conv_global(torch.zeros(1, 1, c_in, self.global_size))
            probe_local = self.conv_local(torch.zeros(1, 1, c_in, self.local_size))
            num_features = probe_global.view(1, -1).size(1) + probe_local.view(1, -1).size(1)
        self.train(was_training)

        print(f"Número de features concatenados: {num_features}")

        self.fc = nn.Sequential(
            nn.Linear(num_features, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, self.num_classes)
        )

    @staticmethod
    def _stage(in_channels, out_channels, first_kernel_rows, pool_width):
        """Return the seven layers of one stage, in execution order."""
        return [
            nn.Conv2d(in_channels, out_channels, kernel_size=(first_kernel_rows, 5)),
            nn.ReLU(),
            nn.BatchNorm2d(out_channels),
            nn.Conv2d(out_channels, out_channels, kernel_size=(1, 5)),
            nn.ReLU(),
            nn.BatchNorm2d(out_channels),
            nn.MaxPool2d(kernel_size=(1, pool_width), stride=(1, 2)),
        ]

    def forward(self, inputs):
        """Return class probabilities for ``inputs = (global_view, local_view)``."""
        g, l = inputs

        # Insert the single image channel expected by Conv2d.
        g = g.unsqueeze(1)
        l = l.unsqueeze(1)

        g_feat = self.conv_global(g)
        l_feat = self.conv_local(l)

        # Flatten per sample and concatenate both branches.
        g_feat = g_feat.view(g_feat.size(0), -1)
        l_feat = l_feat.view(l_feat.size(0), -1)
        x = torch.cat((g_feat, l_feat), dim=1)

        x = self.fc(x)

        if self.num_classes > 1:
            return F.softmax(x, dim=1)
        # Single output unit: one probability per sample, shape [B, 1].
        return torch.sigmoid(x)
In [3]:
class SkipLayer(nn.Module):
    """Residual-style block: a conv stack plus a pooled 1x1 shortcut.

    The main branch chains ``conv_size`` (Conv2d, ReLU) pairs followed by a
    MaxPool2d. The shortcut applies the same pooling to the input and then a
    1x1 convolution to match the channel count. ``forward`` returns
    ``relu(main + shortcut)``.

    NOTE(review): the shortcut never reduces the height of its input, whereas
    the main branch does when ``kernel_size[0] > 1`` with ``padding[0] == 0``.
    In that case the sum only works through tensor broadcasting (main-branch
    height 1), and the result keeps the input height. Confirm this is intended.

    Args:
        in_channels: Channels of the incoming tensor.
        out_channels: Channels produced by both branches.
        kernel_size: Kernel of the first convolution. Later convolutions keep
            only its width, with height 1.
        stride: Stride shared by every convolution of the main branch.
        padding: Padding of the first convolution. Later convolutions keep only
            its width component, with height padding 0.
        conv_size: Number of (Conv2d, ReLU) pairs before the pooling layer.
        pool_kernel: MaxPool2d kernel used by both branches.
        pool_stride: MaxPool2d stride used by both branches.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size=(1,5),
                 stride=1,
                 padding=(0,2),
                 conv_size=2,
                 pool_kernel=(1,5),
                 pool_stride=(1,2)):
        super().__init__()

        # Main branch. Batch normalisation is intentionally left out.
        stages = []
        for position in range(conv_size):
            is_first = position == 0
            stages.append(
                nn.Conv2d(
                    in_channels if is_first else out_channels,
                    out_channels,
                    kernel_size=kernel_size if is_first else (1, kernel_size[1]),
                    stride=stride,
                    padding=padding if is_first else (0, padding[1]),
                )
            )
            stages.append(nn.ReLU())
        stages.append(nn.MaxPool2d(kernel_size=pool_kernel, stride=pool_stride))
        self.conv_branch = nn.Sequential(*stages)

        # Shortcut: same pooling, then a 1x1 convolution to adjust channels.
        self.id_branch = nn.Sequential(
            nn.MaxPool2d(kernel_size=pool_kernel, stride=pool_stride),
            nn.Conv2d(in_channels, out_channels, kernel_size=1),
        )

    def forward(self, x):
        """Return ``relu(conv_branch(x) + id_branch(x))``."""
        main = self.conv_branch(x)
        shortcut = self.id_branch(x)
        return F.relu(main + shortcut)

    
class ShallueModel2DSkip(nn.Module):
    """Two-branch 2D CNN built from ``SkipLayer`` blocks.

    Each view is treated as a one-channel image. The global branch chains five
    ``SkipLayer`` blocks (pooling window 5) and the local branch two (pooling
    window 7). Only the first block of each branch uses a kernel spanning all
    ``c_in`` rows. The flattened branch outputs are concatenated and passed
    through a 512-256-64 fully connected head.

    Args:
        global_size: Width of the global-view input.
        local_size: Width of the local-view input.
        num_classes: Width of the output layer. ``forward`` applies softmax
            when it is greater than 1 and sigmoid otherwise.
        wavelet_levels: When non-empty, the expected number of input rows is
            ``2 * len(wavelet_levels)``; otherwise 2.
    """

    def __init__(self, global_size=2001, local_size=201,
                 num_classes=2, wavelet_levels=[]):
        super().__init__()
        c_in = len(wavelet_levels)*2 if wavelet_levels else 2
        self.num_classes = num_classes
        if num_classes == 1:
            print("Binary classification → sigmoid output")
        else:
            print("Multi-class classification → softmax output")
        print(f"Wavelet levels: {wavelet_levels}")
        print(f"c_in: {c_in}")

        # Global branch. Only the first block mixes the c_in rows.
        global_channels = [(1, 16), (16, 32), (32, 64), (64, 128), (128, 256)]
        self.conv_global = nn.Sequential(*[
            SkipLayer(c_prev, c_next,
                      kernel_size=(c_in if position == 0 else 1, 5),
                      padding=(0, 2),
                      conv_size=2,
                      pool_kernel=(1, 5),
                      pool_stride=(1, 2))
            for position, (c_prev, c_next) in enumerate(global_channels)
        ])

        # Local branch: same layout, shallower, wider pooling window.
        local_channels = [(1, 16), (16, 32)]
        self.conv_local = nn.Sequential(*[
            SkipLayer(c_prev, c_next,
                      kernel_size=(c_in if position == 0 else 1, 5),
                      padding=(0, 2),
                      conv_size=2,
                      pool_kernel=(1, 7),
                      pool_stride=(1, 2))
            for position, (c_prev, c_next) in enumerate(local_channels)
        ])

        # Probe both branches with zeros to learn the flattened feature count.
        # torch.no_grad() already keeps autograd out of the probe, so there is
        # no need to toggle requires_grad on the parameters around it.
        with torch.no_grad():
            probe_global = self.conv_global(torch.zeros(1, 1, c_in, global_size))
            probe_local = self.conv_local(torch.zeros(1, 1, c_in, local_size))
            num_features = probe_global.view(1, -1).size(1) + probe_local.view(1, -1).size(1)

        print(f"Número de features concatenados: {num_features}")
        self.fc = nn.Sequential(
            nn.Linear(num_features, 512), nn.ReLU(),
            nn.Linear(512, 256),          nn.ReLU(),
            nn.Linear(256,  64),          nn.ReLU(),
            nn.Linear(64, num_classes)
        )

    def forward(self, inputs):
        """Return class probabilities for ``inputs = (global_view, local_view)``."""
        g, l = inputs
        # Insert the single image channel expected by Conv2d.
        g = g.unsqueeze(1)
        l = l.unsqueeze(1)
        g_feat = self.conv_global(g)
        l_feat = self.conv_local(l)
        # Flatten per sample and concatenate both branches.
        g_feat = g_feat.view(g_feat.size(0), -1)
        l_feat = l_feat.view(l_feat.size(0), -1)
        x = torch.cat((g_feat, l_feat), dim=1)
        x = self.fc(x)
        if self.num_classes > 1:
            return F.softmax(x, dim=1)
        else:
            return torch.sigmoid(x)

Carga de datos¶

In [4]:
def load_data(path, file_name):
    """Load one pickled light-curve collection and make sure it has ``levels``.

    Args:
        path: Directory containing the file, with or without a trailing separator.
        file_name: Name of the pickle file inside ``path``.

    Returns:
        The object returned by
        ``LightCurveWaveletGlobalLocalCollection.from_pickle``. If it has no
        ``levels`` attribute, one is added with the value ``[1, 2, 3, 4]``.
    """
    # NOTE(review): from_pickle presumably unpickles the file; unpickling can
    # execute arbitrary code, so only load files from a trusted source.
    lc = LightCurveWaveletGlobalLocalCollection.from_pickle(os.path.join(path, file_name))
    if not hasattr(lc, 'levels'):
        lc.levels = [1, 2, 3, 4]
    return lc
        
path = 'all_data_2025_dmey/'
files = os.listdir(path)
# os.listdir returns entries in arbitrary order. Sorting keeps the sample order
# stable across machines, so the seeded train/val/test split further down is
# reproducible.
kepler_files = sorted(f for f in files if f.endswith('.pickle'))

light_curves = [
    load_data(path, file)
    for file in tqdm(kepler_files, desc='Loading data')
]
Loading data: 100%|██████████| 9211/9211 [01:27<00:00, 105.29it/s]

Separar entre los confirmados y candidatos¶

In [5]:
# Light curves still awaiting a disposition; they carry no training label.
candidates = [
    curve for curve in light_curves
    if curve.headers['class'] == 'CANDIDATE'
]
print("Número de candidatos:", len(candidates))

# Labelled light curves. Despite the variable name, this list holds both
# confirmed planets and false positives.
confirmed = [
    curve for curve in light_curves
    if curve.headers['class'] in ('CONFIRMED', 'FALSE POSITIVE')
]
print("Número de confirmados:", len(confirmed))

# Integer label assigned to each disposition.
classes = {'FALSE POSITIVE': 0, 'CONFIRMED': 1}
print("Clases:", classes)
Número de candidatos: 1955
Número de confirmados: 7256
Clases: {'FALSE POSITIVE': 0, 'CONFIRMED': 1}
In [6]:
# Parallel lists: index i of every list refers to the same light curve.
global_odd, global_even = [], []
local_odd, local_even = [], []
wl_series = []
labels = []

for lc in tqdm(confirmed, desc='Processing light curves'):
    # Folded flux of the odd/even folds, for the global and the local view.
    global_odd.append(lc.pliegue_impar_global._light_curve.flux.value)
    global_even.append(lc.pliegue_par_global._light_curve.flux.value)
    local_odd.append(lc.pliegue_impar_local._light_curve.flux.value)
    local_even.append(lc.pliegue_par_local._light_curve.flux.value)

    # Wavelet collections of the same four series, keyed by view and parity.
    wl_series.append(dict(
        global_odd=lc.pliegue_impar_global._lc_w_collection,
        global_even=lc.pliegue_par_global._lc_w_collection,
        local_odd=lc.pliegue_impar_local._lc_w_collection,
        local_even=lc.pliegue_par_local._lc_w_collection,
    ))

    # Map the disposition string to its integer label.
    labels.append(classes[lc.headers['class']])

print("Número de datos:", len(global_odd))
print('Elementos de cada clase:', {label: labels.count(label) for label in set(labels)})
Processing light curves: 100%|██████████| 7256/7256 [00:00<00:00, 105092.23it/s]
Número de datos: 7256
Elementos de cada clase: {0: 4517, 1: 2739}

In [9]:
# First entry of the wavelet collection for the even-fold global view of
# sample 2. The label and title now name the series that is actually plotted
# (they previously said "Odd" while the data came from 'global_even').
array = wl_series[2]['global_even'][0][0]
plt.figure(figsize=(10, 5))
plt.plot(array, label='Global Even')
plt.title('Wavelet Transform - Global Even')
plt.xlabel('Time')
plt.ylabel('Flux')
plt.legend()  # the label passed to plot() is only shown through a legend
plt.show()
Out[9]:
Text(0, 0.5, 'Flux')
No description has been provided for this image

Separar las muestras en train y test¶

In [10]:
def create_2D(levels):
    """Stack the selected series of every sample into 2D arrays.

    For each sample in the module-level ``wl_series`` list, two rows are added
    per requested level (odd fold first, then even fold) to both the global
    and the local stack. Level 0 takes the rows from ``global_odd`` /
    ``global_even`` / ``local_odd`` / ``local_even``; any other level ``n``
    takes element ``[n - 1][0]`` of the matching wavelet collection.

    Args:
        levels (list): Levels to include, in row order. 0 selects the
            original signal.

    Returns:
        list: One dict per sample with keys ``'global'`` and ``'local'``
        (NumPy arrays built from the stacked rows) and ``'label'`` (taken from
        the module-level ``labels`` list).
    """
    samples = []
    for idx, coeffs in enumerate(wl_series):
        global_rows, local_rows = [], []
        for level in levels:
            if level == 0:
                # Original folded signal.
                global_rows += [global_odd[idx], global_even[idx]]
                local_rows += [local_odd[idx], local_even[idx]]
                continue
            band = level - 1  # levels are 1-based, the collections 0-based
            global_rows += [coeffs['global_odd'][band][0], coeffs['global_even'][band][0]]
            local_rows += [coeffs['local_odd'][band][0], coeffs['local_even'][band][0]]

        samples.append({
            'global': np.array(global_rows),
            'local': np.array(local_rows),
            'label': labels[idx],
        })
    return samples
In [11]:
wavelet_levels = [1, 2, 3, 4]  # add 0 to also include the original signal
items = create_2D(wavelet_levels)
print(items[0]['global'].shape)
print(items[0]['local'].shape)

# 80 % train, then the remaining 20 % split evenly into validation and test.
train, test = train_test_split(items, test_size=0.2, random_state=33)
val, test = train_test_split(test, test_size=0.5, random_state=33)


def _split_to_tensors(split):
    """Stack one split into (global, local, labels) tensors.

    The per-sample arrays are first merged into a single ndarray, because
    building a tensor straight from a list of ndarrays is the slow path that
    PyTorch warns about. Signals are stored as float32, the dtype of the model
    weights; labels keep PyTorch's default integer dtype.
    """
    global_t = torch.as_tensor(np.array([item['global'] for item in split]), dtype=torch.float32)
    local_t = torch.as_tensor(np.array([item['local'] for item in split]), dtype=torch.float32)
    labels_t = torch.tensor([item['label'] for item in split])
    return global_t, local_t, labels_t


train_global, train_local, train_labels = _split_to_tensors(train)
val_global, val_local, val_labels = _split_to_tensors(val)
test_global, test_local, test_labels = _split_to_tensors(test)

train_dataset = torch.utils.data.TensorDataset(train_global, train_local, train_labels)
val_dataset = torch.utils.data.TensorDataset(val_global, val_local, val_labels)
test_dataset = torch.utils.data.TensorDataset(test_global, test_local, test_labels)

print("Tamaño del conjunto de entrenamiento:", len(train_dataset))
print("Tamaño del conjunto de validación:", len(val_dataset))
print("Tamaño del conjunto de prueba:", len(test_dataset))
(8, 2001)
(8, 201)
d:\Diego\Astrofisica\TFM\ExoPlanet-Detection\.venv\lib\site-packages\ipykernel_launcher.py:10: UserWarning: Creating a tensor from a list of numpy.ndarrays is extremely slow. Please consider converting the list to a single numpy.ndarray with numpy.array() before converting to a tensor. (Triggered internally at C:\actions-runner\_work\pytorch\pytorch\builder\windows\pytorch\torch\csrc\utils\tensor_new.cpp:233.)
  # Remove the CWD from sys.path while we load stuff.
Tamaño del conjunto de entrenamiento: 5804
Tamaño del conjunto de validación: 726
Tamaño del conjunto de prueba: 726
In [13]:
# Show the stacked global and local arrays of sample 2 as grayscale images,
# one panel per view.
fig, ax = plt.subplots(2, 1, figsize=(10, 5))
for panel, key, heading in zip(ax, ('global', 'local'), ('Global', 'Local')):
    panel.imshow(items[2][key], aspect='auto', cmap='gray')
    panel.set_title(heading)
    panel.set_xlabel('Time')
    panel.set_ylabel('Flux')
plt.tight_layout()
plt.show()
No description has been provided for this image

Entrenar el modelo¶

Definir device en caso de usar GPU o CPU¶

In [14]:
# Prefer DirectML when the optional torch_directml package is installed and
# reports an available device; otherwise use CUDA if present, else the CPU.
try:
    import torch_directml
except ImportError:
    use_directml = False
else:
    use_directml = torch_directml.is_available()

if use_directml:
    device = torch_directml.device()
else:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Dispositivo:", device)
Dispositivo: privateuseone:0
In [15]:
class FocalLoss(nn.Module):
    """Focal-style loss on top of binary cross-entropy with logits.

    The per-element loss is ``alpha * (1 - pt) ** gamma * bce`` with
    ``pt = exp(-bce)``, averaged over all elements. ``alpha`` is a single
    scale factor applied to every element.

    Args:
        gamma: Exponent of the ``(1 - pt)`` modulating factor.
        alpha: Constant multiplier of the loss.
    """

    def __init__(self, gamma=2.0, alpha=0.25):
        super(FocalLoss, self).__init__()
        self.gamma, self.alpha = gamma, alpha

    def forward(self, inputs, targets):
        """Return the mean focal loss of logits ``inputs`` against ``targets``."""
        per_element_bce = F.binary_cross_entropy_with_logits(
            inputs, targets.float(), reduction='none'
        )
        # exp(-bce) is the probability the model assigns to the true class.
        prob_true = torch.exp(-per_element_bce)
        weighted = self.alpha * (1 - prob_true) ** self.gamma * per_element_bce
        return weighted.mean()
In [ ]:
def optuna_search(trial):
    """Optuna objective: train a ``ShallueModel2D`` and score it on validation data.

    Hyperparameters sampled from the trial: batch size, learning rate, number
    of epochs, weight decay and the loss function (0: weighted
    CrossEntropyLoss with two output units, 1: BCEWithLogitsLoss with one
    output unit; 2: FocalLoss is supported but not currently sampled).

    Reads the module-level ``device``, ``wavelet_levels``, ``train_dataset``
    and ``val_dataset``.

    Args:
        trial: The Optuna trial used to sample hyperparameters.

    Returns:
        tuple: ``(accuracy, f1, precision, recall, mean_loss)`` on the
        validation split. F1, precision and recall use weighted averaging.

    Raises:
        ValueError: If the sampled loss identifier is not 0, 1 or 2.
    """
    batch_size = trial.suggest_categorical('batch_size', [32, 64])
    learning_rate = trial.suggest_categorical('learning_rate', [1e-5, 1e-4, 1e-3, 1e-2])
    num_epochs = trial.suggest_categorical('num_epochs', [50, 65, 80, 100])
    weight_decay = trial.suggest_categorical('weight_decay', [1e-6, 1e-5, 1e-4, 1e-3])
    loss_choice = trial.suggest_categorical('loss_fn', [0, 1])

    # NOTE(review): ShallueModel2D.forward already applies softmax/sigmoid,
    # while these losses apply the activation again internally. The model
    # should ideally hand raw logits to the loss; that needs a coordinated
    # change in the model and in the 0.5 threshold below.
    if loss_choice == 0:
        criterion = nn.CrossEntropyLoss(weight=torch.tensor([1, 1.74]).to(device))
        num_classes = 2
    elif loss_choice == 1:
        criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor([1.74]).to(device))
        num_classes = 1
    elif loss_choice == 2:
        criterion = FocalLoss(gamma=2.0, alpha=0.25).to(device)
        num_classes = 1
    else:
        raise ValueError(f"Unknown loss_fn choice: {loss_choice}")
    binary_output = num_classes == 1

    model = ShallueModel2D(num_classes=num_classes, wavelet_levels=wavelet_levels).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Hyperparameters are selected on the validation split, so the test split
    # stays untouched for the final evaluation.
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    def _prepare(batch):
        """Move a batch to the device, or return None if it must be skipped."""
        global_s, local_s, targets = batch
        if global_s.numel() == 0 or local_s.numel() == 0:
            return None
        if torch.isnan(global_s).any() or torch.isnan(local_s).any():
            return None
        # Cast to float32 (the dtype of the model weights) before the transfer.
        global_s = global_s.float().to(device)
        local_s = local_s.float().to(device)
        targets = targets.to(device)
        if binary_output:
            targets = targets.float()
        return global_s, local_s, targets

    def _forward(global_s, local_s):
        """Run the model; drop the trailing unit axis for single-output models."""
        outputs = model((global_s, local_s))
        return outputs.squeeze(1) if binary_output else outputs

    # Training: honour the sampled number of epochs.
    for _ in range(num_epochs):
        model.train()
        for batch in train_loader:
            prepared = _prepare(batch)
            if prepared is None:
                continue
            global_s, local_s, targets = prepared

            optimizer.zero_grad()
            loss = criterion(_forward(global_s, local_s), targets)
            loss.backward()
            optimizer.step()

    # Evaluation.
    model.eval()
    total_loss = 0.0
    evaluated_batches = 0
    all_labels = []
    all_predictions = []

    with torch.no_grad():
        for batch in val_loader:
            prepared = _prepare(batch)
            if prepared is None:
                continue
            global_s, local_s, targets = prepared

            outputs = _forward(global_s, local_s)
            total_loss += criterion(outputs, targets).item()
            evaluated_batches += 1

            if binary_output:
                predicted = (outputs > 0.5).float()
            else:
                predicted = outputs.argmax(dim=1)

            all_labels.extend(targets.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())

    # Metrics only cover the batches that were actually evaluated.
    accuracy = accuracy_score(all_labels, all_predictions)
    val_loss = total_loss / max(evaluated_batches, 1)

    f1 = f1_score(all_labels, all_predictions, average='weighted')
    precision = precision_score(all_labels, all_predictions, average='weighted', zero_division=0)
    recall = recall_score(all_labels, all_predictions, average='weighted')

    return accuracy, f1, precision, recall, val_loss

# Run the multi-objective hyperparameter search. Results are persisted in a
# SQLite database, so re-running the cell resumes the existing study.
study = optuna.create_study(directions=["maximize", "maximize", "maximize", "maximize", "minimize"], storage="sqlite:///optuna_study.db", study_name="shallue_model6", load_if_exists=True)
study.optimize(optuna_search, n_trials=40, show_progress_bar=True, n_jobs=1)

# Report one trial from the Pareto front.
print("Mejores hiperparámetros encontrados:")
pareto_front = study.best_trials
if not pareto_front:
    raise RuntimeError("The study has no completed trials to report.")
# Trials on the Pareto front never dominate one another, so a "strictly better
# on every objective" comparison can never pick anything but the first one.
# Select explicitly instead: highest F1 score, ties broken by lowest loss.
best = max(pareto_front, key=lambda t: (t.values[1], -t.values[4]))

print(f"Batch size: {best.params['batch_size']}")
print(f"Learning rate: {best.params['learning_rate']}")
print(f"Num epochs: {best.params['num_epochs']}")
print(f"Weight decay: {best.params['weight_decay']}")
print(f"Loss function: {best.params['loss_fn']}")
print(f"Accuracy: {best.values[0]}")
print(f"F1 Score: {best.values[1]}")
print(f"Precision: {best.values[2]}")
print(f"Recall: {best.values[3]}")
print(f"Validation loss: {best.values[4]}")
In [16]:
# --- Training hyperparameters ---
batch_size = 64
num_epochs = 100
learning_rate = 0.0001
l2_penalty = 1e-4  # alternative value tried: 0.003380
dropout = 0.006516

# Weight of the positive class for BCEWithLogitsLoss.
fraction = 1.74
print("Fracción:", fraction)
fraction = torch.tensor(fraction, device=device)

# 1 -> single sigmoid output; >1 -> softmax over n_classes outputs.
n_classes = 1
loss_fn = (
    nn.CrossEntropyLoss()
    if n_classes > 1
    else nn.BCEWithLogitsLoss(pos_weight=fraction)
)

model = ShallueModel2DSkip(
    global_size=2001,
    local_size=201,
    num_classes=n_classes,
    wavelet_levels=wavelet_levels,
).to(device)

# Adam with L2 regularisation; the learning rate drops tenfold after 10 epochs
# without improvement of the monitored value.
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=l2_penalty)
schedule = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10, verbose=True)

# Only the training loader is shuffled.
train_loader, val_loader, test_loader = (
    DataLoader(dataset, batch_size=batch_size, shuffle=shuffled)
    for dataset, shuffled in (
        (train_dataset, True),
        (val_dataset, False),
        (test_dataset, False),
    )
)
Fracción: 1.74
Binary classification → sigmoid output
Wavelet levels: [1, 2, 3, 4]
c_in: 8
Número de features concatenados: 132608
In [17]:
def train_fn(model, train_loader, optimizer, loss_fn):
    """Train ``model`` for one epoch.

    Batches whose global or local tensor is empty or contains NaN are skipped.
    The returned averages are computed over the batches/samples that were
    actually used, so skipped batches no longer drag the metrics down.

    Args:
        model: module called as ``model((global_s, local_s))``.
        train_loader: iterable yielding ``(global_s, local_s, labels)`` batches.
        optimizer: optimizer already bound to ``model``'s parameters.
        loss_fn: ``nn.BCEWithLogitsLoss`` / ``FocalLoss`` select the binary
            path (outputs squeezed on dim 1, float labels); any other loss
            selects the multi-class path (arg-max over dim 1).

    Returns:
        ``(train_loss, accuracy)``: mean loss per processed batch and fraction
        of correctly classified processed samples. ``(nan, 0.0)`` when every
        batch was skipped.
    """
    model.train()
    is_binary = isinstance(loss_fn, (nn.BCEWithLogitsLoss, FocalLoss))
    total_loss = 0.0
    correct = 0
    seen_samples = 0
    seen_batches = 0

    for global_s, local_s, labels in train_loader:
        # Skip unusable batches: empty tensors or any NaN in the inputs.
        if global_s.numel() == 0 or local_s.numel() == 0:
            continue
        if torch.isnan(global_s).any() or torch.isnan(local_s).any():
            continue

        global_s = global_s.to(device)
        local_s = local_s.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model((global_s, local_s))

        if is_binary:
            # Drop the trailing singleton dimension so outputs match the labels.
            outputs = outputs.squeeze(1)
            labels = labels.float()

        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        with torch.no_grad():
            if is_binary:
                # Raw model output is compared against 0.5.
                # NOTE(review): this is a probability threshold only if the
                # model already applies a sigmoid — confirm.
                predicted = (outputs > 0.5).float()
            else:
                predicted = outputs.argmax(dim=1)

        correct += (predicted == labels).sum().item()
        seen_samples += labels.size(0)
        seen_batches += 1

    if seen_batches == 0:
        return float('nan'), 0.0

    train_loss = total_loss / seen_batches
    accuracy = correct / seen_samples
    return train_loss, accuracy


def val_fn(model, val_loader, loss_fn):
    """Evaluate ``model`` on ``val_loader`` without updating its weights.

    Batches whose global or local tensor is empty or contains NaN are skipped.
    Loss and accuracy are averaged over the batches/samples actually
    evaluated instead of the full loader size.

    Args:
        model: module called as ``model((global_s, local_s))``.
        val_loader: iterable yielding ``(global_s, local_s, labels)`` batches.
        loss_fn: ``nn.BCEWithLogitsLoss`` / ``FocalLoss`` select the binary
            path; any other loss selects the multi-class (arg-max) path.

    Returns:
        ``(val_loss, accuracy, f1, precision, recall)``. The last three are
        weighted averages from scikit-learn. ``(nan, 0.0, 0.0, 0.0, 0.0)`` is
        returned when every batch was skipped.
    """
    model.eval()
    is_binary = isinstance(loss_fn, (nn.BCEWithLogitsLoss, FocalLoss))
    total_loss = 0.0
    correct = 0
    seen_samples = 0
    seen_batches = 0
    all_labels = []
    all_predictions = []

    with torch.no_grad():
        for global_s, local_s, labels in val_loader:
            # Skip unusable batches: empty tensors or any NaN in the inputs.
            if global_s.numel() == 0 or local_s.numel() == 0:
                continue
            if torch.isnan(global_s).any() or torch.isnan(local_s).any():
                continue

            global_s = global_s.to(device)
            local_s = local_s.to(device)
            labels = labels.to(device)

            outputs = model((global_s, local_s))
            if is_binary:
                # Drop the trailing singleton dimension so outputs match the labels.
                outputs = outputs.squeeze(1)
                labels = labels.float()

            total_loss += loss_fn(outputs, labels).item()

            if is_binary:
                # NOTE(review): 0.5 is applied to the raw model output; this is
                # a probability threshold only if the model applies a sigmoid.
                predicted = (outputs > 0.5).float()
            else:
                predicted = outputs.argmax(dim=1)

            correct += (predicted == labels).sum().item()
            seen_samples += labels.size(0)
            seen_batches += 1

            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())

    if seen_batches == 0:
        # Nothing was evaluated; scikit-learn cannot score empty inputs.
        return float('nan'), 0.0, 0.0, 0.0, 0.0

    accuracy = correct / seen_samples
    val_loss = total_loss / seen_batches

    # zero_division=0 on all three metrics keeps them consistent and silent
    # when a class is never predicted.
    f1 = f1_score(all_labels, all_predictions, average='weighted', zero_division=0)
    precision = precision_score(all_labels, all_predictions, average='weighted', zero_division=0)
    recall = recall_score(all_labels, all_predictions, average='weighted', zero_division=0)

    return val_loss, accuracy, f1, precision, recall
In [18]:
%matplotlib inline
prev_val_loss = float('inf')
patience = 10
early_stopping_counter = 0
verbose = False
loss_train_list = []
loss_val_list = []
precision_list = []
f1_list = []
recall_list = []

for epoch in tqdm(range(num_epochs)):
    train_loss, train_accuracy = train_fn(model, train_loader, optimizer, loss_fn)

    val_loss, val_accuracy, f1, precision, recall = val_fn(model, val_loader, loss_fn)

    schedule.step(val_loss)
    
    if verbose:
        print(f'Epoch {epoch}/{num_epochs}')
        print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
        print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
        print(f'F1 Score: {f1:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}')

    # early stopping
    if val_loss > prev_val_loss:
        early_stopping_counter += 1
        if early_stopping_counter >= patience:
            print("Early stopping")
            break
    else:
        early_stopping_counter = 0
        # Save the model if validation loss decreases
        # torch.save(model.state_dict(), 'models/Shallue_model.pth')
        # print("Modelo guardado")
    
    prev_val_loss = val_loss
    loss_train_list.append(train_loss)
    loss_val_list.append(val_loss)
    precision_list.append(precision)
    f1_list.append(f1)
    recall_list.append(recall)


print("Entrenamiento y validación completados.", '------'*20)
print(f'Epoch {epoch+1}/{num_epochs}')
print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
print(f'F1 Score: {f1:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}')
 56%|█████▌    | 56/100 [05:52<04:37,  6.30s/it]
Epoch 00056: reducing learning rate of group 0 to 1.0000e-05.
 67%|██████▋   | 67/100 [07:01<03:27,  6.29s/it]
Epoch 00067: reducing learning rate of group 0 to 1.0000e-06.
 78%|███████▊  | 78/100 [08:10<02:18,  6.29s/it]
Epoch 00078: reducing learning rate of group 0 to 1.0000e-07.
 87%|████████▋ | 87/100 [09:13<01:22,  6.37s/it]
Early stopping
Entrenamiento y validación completados. ------------------------------------------------------------------------------------------------------------------------
Epoch 88/100
Train Loss: 0.6553, Train Accuracy: 0.9280
Validation Loss: 0.7276, Validation Accuracy: 0.8554
F1 Score: 0.8561, Precision: 0.8577, Recall: 0.8554

In [19]:
# Training history: one stacked panel each for loss, precision, F1 and recall.
fig, axs = plt.subplots(4, 1, figsize=(10, 15))

# (panel name, [(series, plot kwargs), ...]); the panel name doubles as the
# title and the y-axis label.
panels = [
    ('Loss', [
        (loss_train_list, {'label': 'Train Loss'}),
        (loss_val_list, {'label': 'Validation Loss', 'color': 'purple'}),
    ]),
    ('Precision', [(precision_list, {'label': 'Precision', 'color': 'orange'})]),
    ('F1 Score', [(f1_list, {'label': 'F1 Score', 'color': 'green'})]),
    ('Recall', [(recall_list, {'label': 'Recall', 'color': 'red'})]),
]

for panel_ax, (panel_name, curves) in zip(axs, panels):
    for series, plot_kwargs in curves:
        panel_ax.plot(series, **plot_kwargs)
    panel_ax.set_title(panel_name)
    panel_ax.set_xlabel('Epochs')
    panel_ax.set_ylabel(panel_name)
    panel_ax.legend()

plt.tight_layout()
plt.show()
No description has been provided for this image
In [60]:
# Persist the current weights. torch.save does not create missing parent
# folders, so make sure the target directory exists first.
os.makedirs('models', exist_ok=True)
torch.save(model.state_dict(), 'models/Shallue_model_93-76-81.pth')
In [15]:
# Rebuild the architecture and load a stored checkpoint.
# NOTE(review): this replaces the in-memory `model`; every cell below evaluates
# the checkpoint on disk, not necessarily the weights trained above.
model = ShallueModel2DSkip(global_size=2001, local_size=201, num_classes=n_classes, wavelet_levels=wavelet_levels).to(device)
# map_location lets a checkpoint saved on another device (e.g. a GPU) load on
# whatever `device` is in use here.
model.load_state_dict(torch.load('models/ShallueSkip_best.pth', map_location=device))
Binary classification → sigmoid output
Wavelet levels: [1, 2, 3, 4]
c_in: 8
Número de features concatenados: 132608
Out[15]:
<All keys matched successfully>
In [20]:
def predict(model, data_loader, loss_fn):
    """Run ``model`` over ``data_loader`` and collect per-sample results.

    Batches whose global or local tensor is empty or contains NaN are not fed
    to the model; they are represented by ``-1`` placeholders (one per label in
    the batch) and a ``nan`` loss so the outputs stay aligned with the loader.

    Args:
        model: module called as ``model((global_s, local_s))``.
        data_loader: iterable yielding ``(global_s, local_s, label)`` batches.
        loss_fn: ``nn.BCEWithLogitsLoss`` / ``FocalLoss`` select the binary
            path (outputs squeezed on dim 1, thresholded at 0.5); any other
            loss selects the multi-class path (arg-max over dim 1).

    Returns:
        ``(predicted, labels, probs, loss_test)``: three numpy arrays
        concatenated over all batches plus a list with one loss per batch.
        ``probs`` holds the raw model outputs; no extra activation is applied
        here. With an empty loader the three arrays are empty.
    """
    is_binary = isinstance(loss_fn, (nn.BCEWithLogitsLoss, FocalLoss))
    predicted = []
    probs = []
    labels = []
    loss_test = []

    model.eval()

    with torch.no_grad():
        for global_s, local_s, label in data_loader:
            is_empty = global_s.numel() == 0 or local_s.numel() == 0
            if is_empty or torch.isnan(global_s).any() or torch.isnan(local_s).any():
                # Same placeholders for both skip reasons, including the loss,
                # so loss_test always has one entry per batch.
                # NOTE(review): placeholders are 1-D; in the multi-class path
                # they would not concatenate with 2-D model outputs.
                label_np = label.cpu().numpy()
                predicted.append(np.full_like(label_np, -1))
                labels.append(np.full_like(label_np, -1))
                probs.append(np.full_like(label_np, -1))
                loss_test.append(np.nan)
                continue

            global_s = global_s.to(device)
            local_s = local_s.to(device)
            label = label.to(device)

            output = model((global_s, local_s))
            if is_binary:
                # Drop the trailing singleton dimension so output matches label.
                output = output.squeeze(1)
                label = label.float()

            loss_test.append(loss_fn(output, label).item())

            if is_binary:
                # Stored as numpy, like every other entry in these lists.
                predicted.append((output > 0.5).float().cpu().numpy())
            else:
                predicted.append(output.argmax(dim=1).cpu().numpy())

            labels.append(label.cpu().numpy())
            probs.append(output.cpu().numpy())

    if not predicted:
        # np.concatenate rejects an empty list; return empty arrays instead.
        return np.array([]), np.array([]), np.array([]), loss_test

    predicted = np.concatenate(predicted)
    labels = np.concatenate(labels)
    probs = np.concatenate(probs)
    return predicted, labels, probs, loss_test
In [21]:
def plot_confusion_matrix(y_true, y_pred, classes, title = 'Confusion Matrix'):
    """Draw the confusion matrix as an annotated heatmap with summary metrics.

    Args:
        y_true: ground-truth labels.
        y_pred: predicted labels.
        classes: tick labels used on both heatmap axes.
        title: figure title.

    Accuracy and the weighted F1, precision and recall are written as a text
    box at a fixed data position next to the heatmap.
    """
    from sklearn.metrics import confusion_matrix
    import seaborn as sns
    import matplotlib.pyplot as plt

    matrix = confusion_matrix(y_true, y_pred)

    plt.figure(figsize=(8, 6))
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=classes,
        yticklabels=classes,
    )
    plt.title(title)
    plt.xlabel('Predicted label')
    plt.ylabel('True label')

    # Summary statistics shown beside the matrix.
    acc_value = accuracy_score(y_true, y_pred)
    f1_value = f1_score(y_true, y_pred, average='weighted')
    precision_value = precision_score(y_true, y_pred, average='weighted', zero_division=0)
    recall_value = recall_score(y_true, y_pred, average='weighted')
    summary = (
        f'Accuracy: {acc_value:.4f}\nF1: {f1_value:.4f}\n'
        f'Precision: {precision_value:.4f}\nRecall: {recall_value:.4f}'
    )
    plt.text(
        2.7,
        0.5,
        summary,
        horizontalalignment='center',
        verticalalignment='center',
        fontsize=12,
        color='black',
    )
    plt.show()
    
# plot the ROC curve
def plot_roc_curve(y_true, y_pred, title='ROC Curve'):
    """Plot the ROC curve for ``y_pred`` against ``y_true`` with its AUC.

    Args:
        y_true: ground-truth binary labels.
        y_pred: scores or predictions forwarded to ``sklearn.metrics.roc_curve``.
        title: figure title.

    The red dashed diagonal marks the chance level.
    """
    from sklearn.metrics import roc_curve, auc

    false_pos_rate, true_pos_rate, _ = roc_curve(y_true, y_pred)
    area = auc(false_pos_rate, true_pos_rate)

    plt.figure(figsize=(8, 6))
    plt.plot(
        false_pos_rate,
        true_pos_rate,
        color='blue',
        lw=2,
        label=f'ROC curve (area = {area:.2f})',
    )
    plt.plot([0, 1], [0, 1], color='red', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.title(title)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.legend(loc='lower right')
    plt.show()
    
def plot_loss(loss):
    """Plot a sequence of loss values as a single line chart.

    Args:
        loss: sequence of loss values, drawn against their position index.
    """
    plt.figure(figsize=(10, 5))
    plt.plot(loss)
    plt.ylabel('Loss')
    plt.xlabel('Epochs')
    plt.title('Loss')
    plt.show()
In [22]:
# Evaluate the model on the held-out test set.
predicted, labels, probs, loss_serie  = predict(model, test_loader, loss_fn)
classes = ['FALSE POSITIVE', 'CONFIRMED']
# predict() marks samples from skipped batches with -1. Leave them out so they
# do not appear as a third class in a matrix that only has two tick labels.
evaluated = labels != -1
plot_confusion_matrix(labels[evaluated], predicted[evaluated], classes, title='Test Confusion Matrix')
# Optional diagnostics:
# plot_loss(loss_serie)
# plot_roc_curve(labels, predicted, title='Test ROC Curve')
No description has been provided for this image

Probar el modelo con los datos de CANDIDATOS¶

In [23]:
# Build the inference dataset for the CANDIDATE light curves: collect the
# odd/even folded views (global and local) plus their wavelet collections, then
# stack the channels selected by `wavelet_levels` for every candidate.
global_odd = []
global_even = []
local_odd = []
local_even = []
wl_candidates = []

for lc in tqdm(candidates, desc='Processing light curves'):
    global_odd.append(lc.pliegue_impar_global._light_curve.flux.value)
    global_even.append(lc.pliegue_par_global._light_curve.flux.value)
    local_odd.append(lc.pliegue_impar_local._light_curve.flux.value)
    local_even.append(lc.pliegue_par_local._light_curve.flux.value)
    # Wavelet decompositions of the same four views.
    wl_candidates.append({
        'global_odd': lc.pliegue_impar_global._lc_w_collection,
        'global_even': lc.pliegue_par_global._lc_w_collection,
        'local_odd': lc.pliegue_impar_local._lc_w_collection,
        'local_even': lc.pliegue_par_local._lc_w_collection
    })

items_cand = []
for i in tqdm(range(len(wl_candidates)), desc='Creating items'):
    global_series = []
    local_series = []
    for level in wavelet_levels:
        if level == 0:
            # Level 0 selects the raw folded flux.
            global_series.append(global_odd[i])
            global_series.append(global_even[i])
            local_series.append(local_odd[i])
            local_series.append(local_even[i])
        else:
            # Levels are 1-based; element [0] of each level entry is used.
            global_series.append(wl_candidates[i]['global_odd'][level-1][0])
            global_series.append(wl_candidates[i]['global_even'][level-1][0])
            local_series.append(wl_candidates[i]['local_odd'][level-1][0])
            local_series.append(wl_candidates[i]['local_even'][level-1][0])

    global_series = np.array(global_series)
    local_series = np.array(local_series)
    item = {
        'global': global_series,
        'local': local_series,
        'label': 0  # dummy label: candidates have no ground truth
    }
    items_cand.append(item)

# Stack into one ndarray before handing the data to torch: building a tensor
# from a Python list of ndarrays is the slow path PyTorch warns about.
candidates_global = torch.tensor(np.array([item['global'] for item in items_cand]))
candidates_local = torch.tensor(np.array([item['local'] for item in items_cand]))
candidates_labels = torch.tensor([item['label'] for item in items_cand])
candidates_dataset = torch.utils.data.TensorDataset(candidates_global, candidates_local, candidates_labels)

print("Tamaño del conjunto de candidatos:", len(candidates_dataset))

# No shuffling, so predictions stay aligned with the order of `candidates`.
candidates_loader = DataLoader(candidates_dataset, batch_size=batch_size, shuffle=False)
print("Número de candidatos:", len(candidates_loader.dataset))
Processing light curves: 100%|██████████| 1955/1955 [00:00<00:00, 99997.13it/s]
Creating items: 100%|██████████| 1955/1955 [00:00<00:00, 29579.97it/s]
Tamaño del conjunto de candidatos: 1955
Número de candidatos: 1955
In [24]:
# Run the model on the candidate set. The candidate labels are dummy zeros, so
# the returned labels and per-batch losses are discarded.
# NOTE: this rebinds `probs`, which held the test-set outputs until now.
predicted_candidates, _, probs, _ = predict(model, candidates_loader, loss_fn)
print(len(predicted_candidates))
1955
In [25]:
# Count candidates per predicted class. Each class is counted explicitly, so
# -1 placeholders from skipped batches cannot distort either number and the
# counts are printed as integers.
n_positive = int((predicted_candidates == 1).sum())
n_negative = int((predicted_candidates == 0).sum())
print("Exoplanetas confirmados:", n_positive)
print("Falso positivo:", n_negative)
plt.bar_label(plt.bar(['Exoplanetas confirmados', 'Falso positivo'], [n_positive, n_negative]))
plt.title('Predicciones de candidatos') 
plt.show()
# Distribution of the model output for the candidates predicted as confirmed.
probs_confirmed = probs[predicted_candidates == 1]
plt.figure(figsize=(10, 5))
plt.hist(probs_confirmed, bins=50, color='blue', alpha=0.7)
plt.title('Distribución de las probabilidades de los candidatos')
plt.xlabel('Probabilidad')
plt.ylabel('Frecuencia')
# plt.axvline(x=0.5, color='red', linestyle='--', label='Umbral 0.5')
# No legend: nothing in this figure carries a label, and calling plt.legend()
# only produced a "No artists with labels found" warning.
plt.show()
Exoplanetas confirmados: 689.0
Falso positivo: 1266.0
No description has been provided for this image
No artists with labels found to put in legend.  Note that artists whose label start with an underscore are ignored when legend() is called with no argument.
No description has been provided for this image
In [51]:
# Collect the candidate predictions in a DataFrame and write them to CSV.
import pandas as pd
print("Guardando resultados en un DataFrame...")
print("Número de candidatos:", len(predicted_candidates))
print("candidates_dataset:", len(candidates_dataset))
# Every column must describe the same candidates in the same order; fail with a
# clear message instead of a generic pandas length error.
assert len(candidates) == len(predicted_candidates) == len(probs), \
    "candidates, predictions and probabilities are not aligned"
df_results = pd.DataFrame({
    'id': [f'candidate_{i+1}' for i in range(len(predicted_candidates))],
    'kepler_name': [c.headers['Kepler_name'] for c in candidates],
    'predicted': predicted_candidates,
    'probability': probs,
})

# DataFrame.to_csv does not create missing folders.
os.makedirs('results', exist_ok=True)
df_results.to_csv('results/candidates_predictions.csv', index=False)
print("Resultados guardados en 'results/candidates_predictions.csv'.")
Guardando resultados en un DataFrame...
Número de candidatos: 1955
candidates_dataset: 1955
Resultados guardados en 'results/candidates_predictions.csv'.
In [26]:
# Compare the input series of the most confidently "confirmed" candidate with
# those of the most confident "false positive".
# The extremes of the model output are used instead of exact equality with 1
# and 0, which raises IndexError whenever no output is exactly 0.0 or 1.0.
# Samples marked -1 by predict() (skipped batches) are excluded.
valid = predicted_candidates != -1
confirmed_index = int(np.argmax(np.where(valid, probs, -np.inf)))
false_positive_index = int(np.argmin(np.where(valid, probs, np.inf)))
print("Índice del candidato confirmado:", confirmed_index)
print("Índice del falso positivo:", false_positive_index)

confirmed_global = items_cand[confirmed_index]['global']
confirmed_local = items_cand[confirmed_index]['local']
false_positive_global = items_cand[false_positive_index]['global']
false_positive_local = items_cand[false_positive_index]['local']

fig, ax = plt.subplots(2, 2, figsize=(15, 10))
# One image per (candidate, view): rows are the stacked channels, columns the
# samples of each series.
panels = [
    (ax[0, 0], confirmed_global, 'Global Confirmed'),
    (ax[0, 1], confirmed_local, 'Local Confirmed'),
    (ax[1, 0], false_positive_global, 'Global False Positive'),
    (ax[1, 1], false_positive_local, 'Local False Positive'),
]
for panel_ax, series, panel_title in panels:
    panel_ax.imshow(series, aspect='auto', cmap='gray')
    panel_ax.set_title(panel_title)
    panel_ax.set_xlabel('Time')
    panel_ax.set_ylabel('Flux')
plt.tight_layout()
plt.show()
Índice del candidato confirmado: 14
Índice del falso positivo: 790
No description has been provided for this image