본문 바로가기
Personal Projects/Model

[생성 신경망] CGAN 모델 구현

by muns91 2024. 4. 22.
Conditional GAN (CGAN) 실습

 

 이전 글에서는 CGAN에 대한 이론을 살펴보았습니다. 그럼 이론을 살펴보았으니, 코드를 보면서 구현 실습을 하도록 하겠습니다. 코드에 대한 참고는 글 맨 하단에서 참고하실 수 있으며, 실습을 원하시는 분들은 제가 실습한 자료가 있는 깃 허브를 참고하시길 바랍니다. 

 

 

만들면서 배우는 생성 AI

딥러닝 기초부터 최신 생성 AI 모델까지 설명합니다. 텐서플로와 케라스를 사용해 변이형 오토인코더(VAE), 생성적 적대 신경망(GAN), 트랜스포머, 노멀라이징 플로 모델, 에너지 기반 모델, 잡음

www.aladin.co.kr

 


 

라이브러리 & 모듈 import

import os
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models, metrics, optimizers
from tensorflow.keras.datasets import mnist

# MNIST 데이터 로드 및 전처리
def load_mnist_data():
    (X_train, y_train), (_, _) = mnist.load_data()
    X_train = X_train.reshape(X_train.shape[0], 28, 28, 1).astype('float32')
    X_train = (X_train - 127.5) / 127.5  # 이미지를 [-1, 1] 범위로 정규화
    y_train = tf.keras.utils.to_categorical(y_train, 10)  # One-hot encoding
    return X_train, y_train

X_train, y_train = load_mnist_data()

 

 해당 코드에서는 학습에 필요한 라이브러리를 불러오고 MNIST 데이터 세트를 로드하며 전처리하는 과정을 포함합니다. 

 

  • 함수 정의: load_mnist_data라는 함수를 정의하여 MNIST 데이터를 로드하고 전처리하는 작업을 수행합니다. 함수는 X_train (훈련 이미지)과 y_train (훈련 레이블)을 반환합니다.
  • 데이터 로드: mnist.load_data()를 통해 MNIST 데이터셋을 로드합니다. 이 데이터셋은 손으로 쓴 숫자 이미지(0-9)를 포함하고 있습니다.
  • 이미지 Reshape 및 정규화
    • X_train.reshape(X_train.shape[0], 28, 28, 1): 각 이미지를 28x28 픽셀 크기의 1채널(흑백) 이미지로 변환합니다.
    • X_train.astype('float32'): 이미지의 데이터 타입을 32비트 부동 소수점으로 변경합니다.
    • (X_train - 127.5) / 127.5: 이미지 픽셀 값을 [-1, 1] 범위로 정규화합니다. 이는 일반적으로 신경망의 성능을 향상시키는 데 도움을 줍니다.
  • 레이블 원-핫 인코딩:
    tf.keras.utils.to_categorical(y_train, 10): 숫자 레이블을 원-핫 인코딩 형식으로 변환합니다. 이는 각 레이블을 길이가 10인 벡터로 변환하며, 해당하는 위치만 1이고 나머지는 0입니다. 예를 들어, 숫자 '3'은 [0, 0, 0, 1, 0, 0, 0, 0, 0, 0]로 표현됩니다. 

 

하이퍼 파라미터

class Hyperparameters:
    def __init__(self):
        self.image_size = (28, 28, 1)
        self.classes = 10
        self.batch_size = 32
        self.z_dim = 100
        self.learning_rate = 0.00005
        self.adam_beta_1 = 0.5
        self.adam_beta_2 = 0.999
        self.epochs = 30
        self.critic_steps = 5
        self.gp_weight = 10.0

 

 하이퍼 파라미터라는 클래스를 정의하여 기계 학습 모델에서 사용될 하이퍼 파라미터를 캡슐화하였습니다. 

  • image_size: 모델에 입력될 이미지의 크기를 지정합니다. 여기서는 MNIST 데이터셋의 이미지 크기인 28x28 픽셀, 그리고 흑백 이미지이므로 채널 수는 1입니다.
  • classes: 분류할 클래스의 수입니다. MNIST 데이터셋은 0부터 9까지의 10개 숫자를 포함하므로 클래스 수는 10입니다.
  • batch_size: 훈련 데이터를 몇 개씩 묶어 처리할지 결정합니다. 이 값은 메모리 사용량과 훈련 속도에 영향을 미치며, 일반적으로 더 큰 배치 크기는 하드웨어가 지원하는 한도 내에서 더 빠른 학습을 가능하게 합니다.
  • z_dim: 잠재 공간 벡터의 차원으로, 생성자에 입력될 노이즈 벡터의 크기를 의미합니다. 이 차원 수는 생성된 이미지의 다양성에 영향을 미칩니다.
  • learning_rate: 학습률은 최적화 알고리즘에서 파라미터 업데이트 시 사용하는 스텝 크기를 결정합니다. 너무 높으면 학습이 불안정해지고, 너무 낮으면 학습 속도가 느려집니다.
  • adam_beta_1 및 adam_beta_2: 이 두 매개변수는 Adam 최적화 알고리즘에서 각각 일차 모멘트 및 이차 모멘트 추정의 감쇠율을 결정합니다. 일반적으로 beta_1은 0.9, beta_2는 0.999가 기본값입니다.
  • epochs: 전체 훈련 데이터셋을 몇 번 반복해서 학습할지 지정합니다. 에폭 수가 많을수록 모델은 더 많은 데이터를 보고 학습할 수 있지만, 오버피팅의 위험도 증가할 수 있습니다.
  • critic_steps: WGAN-GP에서 사용되는 용어로, 생성자를 한 번 업데이트하기 전에 판별자를 몇 번 업데이트할지 결정합니다. 이 값은 판별자와 생성자 간의 균형을 맞추는 데 중요합니다.
  • gp_weight: 그래디언트 패널티의 가중치로, 판별자의 그래디언트를 제한함으로써 학습 과정의 안정성을 도모합니다.

 

비평자

def make_discriminator_model(input_shape=(28, 28, 1), num_classes=10):
    image = layers.Input(shape=input_shape)
    label = layers.Input(shape=(num_classes,))
    label_img = layers.Dense(input_shape[0]*input_shape[1], activation='linear')(label)
    label_img = layers.Reshape((input_shape[0], input_shape[1], 1))(label_img)
    x = layers.Concatenate()([image, label_img])
    x = layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same')(x)
    x = layers.LeakyReLU()(x)
    x = layers.Dropout(0.3)(x)
    x = layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same')(x)
    x = layers.LeakyReLU()(x)
    x = layers.Dropout(0.3)(x)
    x = layers.Flatten()(x)
    x = layers.Dense(1)(x)
    model = models.Model([image, label], x)
    model.summary()
    return model

 

 조건부 GAN의 비평자 모델을 구성하는 함수를 정의하였습니다. 이 판별자는 이미지와 연관된 레이블을 입력으로 받아 이미지가 진짜일 확률을 추정합니다. 

 

 

생성자

def make_generator_model(z_dim, num_classes=10):
    noise = layers.Input(shape=(z_dim,))
    label = layers.Input(shape=(num_classes,))
    x = layers.Concatenate()([noise, label])
    x = layers.Dense(7*7*256, use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU()(x)
    x = layers.Reshape((7, 7, 256))(x)
    x = layers.Conv2DTranspose(128, (5, 5), strides=(1, 1), padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU()(x)
    x = layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU()(x)
    x = layers.Conv2DTranspose(1, (5, 5), strides=(2, 2), padding='same', use_bias=False, activation='tanh')(x)
    model = models.Model([noise, label], x)
    model.summary()
    return model

 

 해당 코드는 CGAN의 생성자 모델을 정의하는 함수입니다. 생성자는 노이즈와 레이블을 입력 받아, 이를 통해 새로운 이미지를 새성합니다. 

 

 

조건부 WGAN

class ConditionalWGAN(models.Model):
    def __init__(self, generator, discriminator, latent_dim=100, critic_steps=5, gp_weight=10.0):
        super(ConditionalWGAN, self).__init__()
        self.generator = generator
        self.discriminator = discriminator
        self.latent_dim = latent_dim
        self.critic_steps = critic_steps
        self.gp_weight = gp_weight
        self.d_optimizer = optimizers.Adam(1e-4)
        self.g_optimizer = optimizers.Adam(1e-4)

    def compile(self):
        super(ConditionalWGAN, self).compile()
        self.d_loss_metric = metrics.Mean(name="d_loss")
        self.g_loss_metric = metrics.Mean(name="g_loss")

    @property
    def metrics(self):
        return [self.d_loss_metric, self.g_loss_metric]

    def gradient_penalty(self, batch_size, real_images, fake_images, labels):
        """Calculates the gradient penalty."""
        alpha = tf.random.normal([batch_size, 1, 1, 1], 0.0, 1.0)
        diff = fake_images - real_images
        interpolated = real_images + alpha * diff

        with tf.GradientTape() as gp_tape:
            gp_tape.watch(interpolated)
            pred = self.discriminator([interpolated, labels], training=True)

        grads = gp_tape.gradient(pred, [interpolated])[0]
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp

    def train_step(self, data):
        real_images, labels = data
        batch_size = tf.shape(real_images)[0]

        for _ in range(self.critic_steps):
            random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
            with tf.GradientTape() as tape:
                fake_images = self.generator([random_latent_vectors, labels], training=True)
                fake_logits = self.discriminator([fake_images, labels], training=True)
                real_logits = self.discriminator([real_images, labels], training=True)
                d_cost = tf.reduce_mean(fake_logits) - tf.reduce_mean(real_logits)
                gp = self.gradient_penalty(batch_size, real_images, fake_images, labels)
                d_loss = d_cost + gp * self.gp_weight

            d_gradient = tape.gradient(d_loss, self.discriminator.trainable_variables)
            self.d_optimizer.apply_gradients(zip(d_gradient, self.discriminator.trainable_variables))

        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        with tf.GradientTape() as tape:
            generated_images = self.generator([random_latent_vectors, labels], training=True)
            generated_logits = self.discriminator([generated_images, labels], training=True)
            g_loss = -tf.reduce_mean(generated_logits)

        g_gradient = tape.gradient(g_loss, self.generator.trainable_variables)
        self.g_optimizer.apply_gradients(zip(g_gradient, self.generator.trainable_variables))

        self.d_loss_metric.update_state(d_loss)
        self.g_loss_metric.update_state(g_loss)

        return {"d_loss": self.d_loss_metric.result(), "g_loss": self.g_loss_metric.result()}

 

  1. 초기화 함수 (__init__):
    • 인자 설명:
      • generator: 이미지를 생성하는 신경망 모델입니다.
      • discriminator: 생성된 이미지와 실제 이미지를 구분하는 신경망 모델입니다.
      • latent_dim: 생성자에 입력되는 잡음 벡터의 차원입니다.
      • critic_steps: 각 생성자 업데이트 전에 수행되는 판별자(critic)의 업데이트 횟수입니다.
      • gp_weight: 그래디언트 패널티의 가중치로, 판별자의 그래디언트를 제어하여 훈련을 안정화합니다.
    • 속성 설정:
      • 모델 내부에서 사용할 변수들(생성자, 판별자, 최적화기, 손실 측정 지표)을 초기화합니다.
  2. 컴파일 함수 (compile):
    • super().compile(): 상위 클래스의 컴파일 함수를 호출하여 기본 설정을 수행합니다.
    • 손실 지표(d_loss_metric, g_loss_metric)를 초기화하여 훈련 중 손실을 추적합니다.
  3. 메트릭 속성 (metrics):
    • 훈련 중 계산할 손실 지표를 리스트로 반환합니다. 이는 TensorFlow가 훈련 과정에서 손실 값을 자동으로 계산하고 출력할 수 있게 합니다.
  4. 그래디언트 패널티 함수 (gradient_penalty):
    • 판별자의 그래디언트를 계산하여 1에서 벗어난 정도에 대한 패널티를 적용합니다. 이를 통해 훈련 과정에서 판별자가 너무 강력해져 생성자가 학습할 기회를 상실하는 것을 방지합니다.
  5. 훈련 단계 정의 (train_step):
    • 실제 훈련 과정에서 실행되는 단계로, 여기서 실제 이미지와 레이블을 사용해 판별자와 생성자의 업데이트를 수행합니다.
    • 판별자 업데이트: critic_steps만큼 반복하며 판별자의 손실(d_loss)을 계산하고 최적화합니다.
      생성자 업데이트: 생성자로부터 이미지를 생성하고, 이를 판별자가 평가한 결과를 통해 생성자의 손실(g_loss)을 계산하고 최적화합니다.
    • 마지막으로, 손실 값을 업데이트하고 훈련 단계의 결과로 손실 값을 반환합니다.

 

데이터 셋 준비 및 모델 훈련

if not os.path.exists('output'):
    os.makedirs('output')
    
    
params = Hyperparameters()
generator = make_generator_model(params.z_dim)
discriminator = make_discriminator_model(params.image_size)
cgan = ConditionalWGAN(generator, discriminator, params.z_dim, params.critic_steps, params.gp_weight)
cgan.compile()

# 데이터셋 준비
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(1000).batch(params.batch_size)
# save_images_callback = SaveImagesCallback(generator, params.z_dim)

# 모델 훈련
cgan.fit(train_dataset, epochs=params.epochs)

 

  1. 출력 디렉토리 확인 및 생성
    • 이 부분은 훈련 중 생성된 이미지나 다른 출력 파일을 저장할 디렉토리를 확인하고, 해당 디렉토리가 존재하지 않을 경우 새로 생성합니다. os.path.exists 함수는 주어진 경로의 디렉토리나 파일이 존재하는지 확인하고, os.makedirs는 새 디렉토리를 생성합니다.
  2. 하이퍼 파라미터, 모델 생성 및 컴파일
    • Hyperparameters 클래스 인스턴스를 생성하여 모델의 설정 값들을 초기화합니다.
    • make_generator_model과 make_discriminator_model 함수를 사용해 각각 생성자와 판별자 모델을 생성합니다. 이들은 params에서 정의된 설정을 사용합니다.
    • ConditionalWGAN 클래스를 이용하여 CGAN 모델을 초기화하고, compile() 메소드를 호출하여 모델을 컴파일합니다. 이 과정에서 손실 함수, 최적화 알고리즘 등이 설정됩니다.
  3. 데이터 셋 준비
    • tf.data.Dataset.from_tensor_slices 메소드를 사용하여 훈련 데이터(X_train, y_train)로부터 TensorFlow 데이터셋 객체를 생성합니다.
    • shuffle(1000)은 데이터셋의 샘플을 무작위로 섞어주며, 여기서 1000은 버퍼 크기를 의미합니다. 이는 데이터를 랜덤하게 섞어 과적합을 방지하는 데 도움을 줍니다.
    • batch(params.batch_size)는 지정된 배치 크기로 데이터를 분할하여 모델 훈련 시 각 스텝에서 처리할 데이터 양을 정의합니다.
  4. 모델 훈련
    • cgan.fit 메소드를 사용하여 준비된 데이터셋(train_dataset)에 대해 훈련을 수행합니다. 
    • epochs=params.epochs는 훈련을 몇 번 반복할지 지정합니다.

 

이미지 확인

import numpy as np
import matplotlib.pyplot as plt

# 하이퍼파라미터 설정
z_dim = 100  # 또는 Hyperparameters 클래스에서 정의된 값을 사용하세요.

# 그리드 차원 설정
image_grid_rows = 10
image_grid_columns = 5

# 랜덤한 잡음 샘플링
z = np.random.normal(0, 1, (image_grid_rows * image_grid_columns, z_dim))

# 생성할 이미지 레이블 준비 (0-9까지 각 숫자를 5번씩 생성)
labels_to_generate = np.array([i for i in range(10) for _ in range(5)])
labels_to_generate = tf.keras.utils.to_categorical(labels_to_generate, 10)  # 원-핫 인코딩

# 랜덤한 잡음에서 이미지 생성
gen_imgs = generator.predict([z, labels_to_generate])

# 이미지 픽셀 값 [0, 1] 사이로 스케일 변환
gen_imgs = 0.5 * gen_imgs + 0.5

# 이미지 그리드 설정 및 출력
fig, axs = plt.subplots(image_grid_rows, image_grid_columns, figsize=(10, 20), sharey=True, sharex=True)

cnt = 0
for i in range(image_grid_rows):
    for j in range(image_grid_columns):
        axs[i, j].imshow(gen_imgs[cnt, :, :, 0], cmap='gray')
        axs[i, j].axis('off')
        axs[i, j].set_title(f"Digit: {np.argmax(labels_to_generate[cnt])}")  # 각 이미지의 레이블을 타이틀로 표시
        cnt += 1

plt.show()

 

 

 

 


 

마무리 

 여기까지 CGAN에 대한 실습 코드를 살펴보았습니다. GAN을 시작으로 하여 CGAN까지 실습을 수행하면서 기본적인 MNIST 데이터를 사용함에도 파라미터 조정과 모델을 구축하는 과정에서 시행착오가 많았습니다. 그리고 CGAN 외에도 다양한 GAN이 있으니, 틈틈히 모델들을 조사해보고 구현해보기 위해서 계속해서 공부를 수행해야겠습니다. 앞으로도 더 많은 모델들을 공부하면서 상황에 따라서 적합한 모델을 찾아 문제를 해결할 수 있는 사람이 되도록 노력해야겠습니다. 그럼, 오늘의 실습은 여기서 마무리하도록 하겠습니다. 

 

참고

1. 논문 : https://arxiv.org/abs/1411.1784

2. 만들면서 배우는 생성 AI : https://www.aladin.co.kr/shop/wproduct.aspx?ItemId=324278784

 

만들면서 배우는 생성 AI

딥러닝 기초부터 최신 생성 AI 모델까지 설명합니다. 텐서플로와 케라스를 사용해 변이형 오토인코더(VAE), 생성적 적대 신경망(GAN), 트랜스포머, 노멀라이징 플로 모델, 에너지 기반 모델, 잡음

www.aladin.co.kr

 

반응형