수달이네 기술 블로그

6. RNN활용 KOSPI예측(시퀀스데이터) 본문

AI공부/자연어처리

6. RNN활용 KOSPI예측(시퀀스데이터)

슬픈 수달이 2026. 2. 23. 22:48

이번엔 RNN을 활용하여 코스피 데이터같은 시계열 데이터를 예측해 볼 것이다.

Date Open High Low Close Adj Close Volume
04/01/2000 1028.329956 1066.180054 1016.590027 1059.040039 1059.040039 19589800
05/01/2000 1006.869995 1026.52002 984.049988 986.309998 986.309998 25769600
06/01/2000 1013.950012 1014.900024 953.5 960.789978 960.789978 20352300

Date: 날짜 Open: 개장가 High: 최고가 Low: 최저가 Close: 종가 Adj Close: 종가 Volume: 거래량

  • 연속적인 과거 20일을 학습하여 다음날 종가를 예측하는 모델을 만들 것이다.
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim

from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
seed = 2026
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
  • 시드 설정: 시드는 아무렇게나 해도 됨
    • np: numpy
    • torch: cpu
    • torch.cuda: gpu
df = pd.read_csv("./dataset/kospi.csv")
df
  • csv를 읽어옴
if 'Date' in df.columns:
    df['Date'] = pd.to_datetime(df['Date'], errors = 'coerce') # 날짜 형식으로 변환
    df = df.sort_values('Date').reset_index(drop=True) # 날짜 타입으로 변환
  • 컬럼들 중 날짜 데이터를 변환해줄 것이다.
    • errors= ‘corece’ 만약 날짜로 변환할 수 없는 값의 경우 에러를 내지 말고, NaT로 변환
    • NaT: Not a Time(NaN과 비슷한데 시간에 적용)
  Date Open High Low Close Adj Close Volume
0 2000-01-02 955.440002 959.309998 923.520020 928.750000 928.750000 30614300.0
1 2000-01-06 730.549988 746.700012 722.239990 738.489990 738.489990 31433500.0
2 2000-01-08 710.049988 727.530029 700.830017 727.099976 727.099976 29793900.0

형식이 변환되었고, 날짜 별로 정렬된 것을 확인할 수 있다.

df = df.ffill() #bfill()
  • ffill(): 비어있는 값이 발견되면 앞의 값으로 처리(front fill)
  • bfill(): 비어있는 값이 있으면 뒤의 값으로 처리(behind fill)

데이터를 보면 일부 비어있는 날들이 몇 개 보이는데 그건 장이 열지 않는 휴장일이기 때문이다.

def set_seed(seed=2026):
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

def make_seq(X, y, L):
    Xs, ys = [], []
    for i in range(len(X) - L): # L = 20일을 줄것임.
        Xs.append(X[i:i+L]) # 예측하는데에 필요한 기간
        ys.append(y[i+L]) # 이후 예측
    Xs = torch.tensor(np.array(Xs), dtype=torch.float32)
    ys = torch.tensor(np.array(ys), dtype=torch.float32).view(-1, 1)
def fill_missing_after_split(train_df, test_df, cols): # train과 test로 나눈 후에 결측치를 채우는 함수
    train_df = train_df.copy()
    test_df = test_df.copy()
    train_df[cols] = train_df[cols].ffill() # 과거값으로 ffill
    bridge = pd.concat([train_df.tail(1), test_df], axis = 0) # train과 test의 경계값을 연결
    # train 마지막행 + test를 이어붙여 ffill후 test만 분리
    bridge[cols] = bridge[cols].ffill() # 연결된 데이터에서 ffill
    test_df[cols] = bridge.iloc[1:][cols].values # test_df에 채워진 값 적용
    return train_df, test_df
  • 결측치가 있으면 이전 값으로 연결하도록 함.

Early Stopping

학습에 더 이상 진전이 없으면 추가 학습하지 않고 종료

class EarlyStopping:
    def __init__(self, patience = 30, min_delta = 1e-6):
        self.patience = patience
        self.min_delta = min_delta
        self.count = 0
        self.best = None
        self.best_state = None
    def step(self, val_loss, model):
        if self.best is None or val_loss < self.best - self.min_delta:
            self.best = val_loss
            self.count = 0
            self.best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
            return False
        self.count += 1
        return self.count >= self.patience

    def load_best(self, model, device):
        model.load_state_dict({k: v.to(device) for k, v in self.best_state.items()})

patience: 30번이상 좋아지지 않으면 min_delta = 좋아지는 기준 best = 가장 좋은 값 저장 count = 현재 카운트 best_state = 가장 좋은 상태를 저장

  • step에서 best값을 저장하는 사이클을 짜줌.
  • load_best 로 best값을 가져옴
class RNNRegressor(nn.module):
    def __init__(self, input_size, hidden_size = 64, num_layers = 2, dropout = 0.2):
        super().__init__()
        self.rnn = nn.RNN(input_size = input_size, hidden_size = hidden_size, num_layers = num_layers
                          , batch_first = True, nonlinearity= 'tanh', dropout = dropout if num_layers > 1 else 0.0)
        self.head = nn.Sequential(nn.LayerNorm(hidden_size), nn.Linear(hidden_size, 64), nn.ReLU(),
                                  nn.Dropout(dropout), nn.Linear(64, 1))
    def forward(self, x):
        B = x.size(0)
        h0 = torch.zeros(self.rnn.num_layers, B, self.rnn.hidden_size, device = x.device)
        out, _ = self.rnn(x, h0) #out : (B, L, hidden_size)
        last = out[:, -1, :]
        return self.head(last)
  • RNN모델 설계

데이터 분리 / 정규화

test_ratio = 0.3
test_start = int(len(df) * (1 - test_ratio))
trainval_df = df.iloc[:test_start].copy()
test_df = df.iloc[test_start:].copy()

val_ratio = 0.2
val_start = int(len(trainval_df) * (1 - val_ratio))
train_df = trainval_df.iloc[:val_start].copy()
val_df = trainval_df.iloc[val_start:].copy()
train_df, val_df = fill_missing_after_split(train_df, val_df,  cols = FEATS)
trainval_df, test_df = fill_missing_after_split(trainval_df, test_df, cols = FEATS)
  • test와 val, train 데이터를 분리하고 결측치 처리
scaler_x = MinMaxScaler()
scaler_y = MinMaxScaler()

X_train = scaler_x.fit_transform(train_df[FEATS].values)
X_val = scaler_x.transform(val_df[FEATS].values)
X_test = scaler_x.transform(test_df[FEATS].values)
X_train
# array([[0.23348791, 0.2330902 , 0.22130596, 0.22017413],
#        [0.12607878, 0.13132982, 0.12446595, 0.12910621],
#        [0.11628783, 0.1221546 , 0.11416516, 0.12365438],
#        ...,
#        [0.21824269, 0.21755404, 0.2196509 , 0.21625878],
#        [0.21816626, 0.21748705, 0.21687001, 0.21420059],
#        [0.21197648, 0.21184884, 0.21360321, 0.2108979 ]], shape=(2527, 4))
  • MinMaxScaler를 통해 정규화 시켜주었다.
y_train = scaler_y.fit_transform(train_df[TARGET].values)
y_val = scaler_y.transform(val_df[TARGET].values)
y_test = scaler_y.transform(test_df[TARGET].values)
  • y도 만들어줌

make_seq

L = 20
X_train_seq, y_train_seq = make_seq(X_train, y_train, L)
X_val_seq, y_val_seq = make_seq(X_val, y_val, L)
X_test_seq, y_test_seq = make_seq(X_test, y_test, L)
print(X_train_seq.shape, y_train_seq.shape)
print(X_val_seq.shape, y_val_seq.shape)
print(X_test_seq.shape, y_test_seq.shape)
# torch.Size([2507, 20, 4]) torch.Size([2507, 1])
# torch.Size([612, 20, 4]) torch.Size([612, 1])
# torch.Size([1334, 20, 4]) torch.Size([1334, 1])
  • make_seq함수를 이용해 sequence를 만들어준다.
    • 행의 개수 = 2507
    • 컬럼수 20 = 20개씩 끊어서 가져옴
def make_seq(X, y, L):
    Xs, ys = [], []
    for i in range(len(X) - L): # L = 20일을 줄것임.
        Xs.append(X[i:i+L]) # 예측하는데에 필요한 기간
        ys.append(y[i+L]) # 이후 예측
    Xs = torch.tensor(np.array(Xs), dtype=torch.float32)
    ys = torch.tensor(np.array(ys), dtype=torch.float32).view(-1, 1)
  • 이전에 만들었던 make_seq를 보면
  • 20개의 단위로 끊어서 sequence를 저장해 준다.
  • y는 그 이후의 값을 학습한다.

데이터 로더

batch_size = 64

train_loader = DataLoader(TensorDataset(X_train_seq, y_train_seq), batch_size = batch_size, shuffle = True)
val_loader = DataLoader(TensorDataset(X_val_seq, y_val_seq), batch_size = batch_size, shuffle = False)
test_loader = DataLoader(TensorDataset(X_test_seq, y_test_seq), batch_size = batch_size, shuffle = False)
  • train과 test는 shuffle해줄 필요 없다.

모델 생성/학습

model = RNNRegressor(input_size = X_train_seq.size(2), hidden_size=64, num_layers = 2, dropout = 0.1).to(device)  
model

# RNNRegressor(
#   (rnn): RNN(4, 64, num_layers=2, batch_first=True, dropout=0.1)
#   (head): Sequential(
#     (0): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
#     (1): Linear(in_features=64, out_features=64, bias=True)
#     (2): ReLU()
#     (3): Dropout(p=0.1, inplace=False)
#     (4): Linear(in_features=64, out_features=1, bias=True)
#   )
# )
  • 위에서 정의한 모델을 생성한다.
criterion = nn.MSELoss()
optimizer = optim.AdamW(model.parameters(), lr = 2e-3, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode = 'min', factor = 0.5, patience = 10)

손실함수: MSE를 사용

옵티마이저: AdamW(Adam 보다 lr을 더 개별적으로 조정할 수 있다.

스케쥴러: 검증 성능이 더이상 좋아지지 않으면 학습률을 자동으로 낮춰주는 방법

  • mode = ‘min’: 어떤게 감소해야 좋은 것인가를 설정(여기선 min(MSE에서 최솟값이 감소해야 좋음)
  • factor: 50%씩 학습률을 감소시킴
  • patience = 10: 10에폭동안 성능이 개선되지 않으면 LR을 감소시킨다는 으미
def run_epoch(loader, train=True):
    model.train(train)
    total = 0.0
    for xb, yb in loader:
        xb, yb = xb.to(device), yb.to(device)
        if train:
            optimizer.zero_grad()
        pred = model(xb)
        loss = criterion(pred, yb)
        if train:
            loss.backward()
            #clip_norm: 모든 파라미터의 기울기를 모아서 크기가 max_norm을 넘으면 강제로 줄임
            #L2 규제로 현재 norm = 5.0이면 max_norm = 1.0 일때 모든 grad를 0.2배로 줄임
            # 0.5~5.0사이가 안정적
            torch.nn.utils.clip_grad_norm_(model.parameters(), clip_norm)
            optimizer.step()

clip_grad_norm_

  • clip_norm: 모든 파라미터의 기울기를 모아서 크기가 max_norm을 넘으면 강제로 줄임
  • L2 규제로 현재 norm = 5.0이면 max_norm = 1.0 일때 모든 grad를 0.2배로 줄임
  • 0.5~5.0사이가 안정적

참고로, 만약 끝에 ‘_’ 기호가 있으면 반환값이 없음(inplace연산), 이건 반환값이 존재

for epoch in range(1, epochs + 1):
    tr_loss = run_epoch(train_loader, train=True)
    va_loss = run_epoch(val_loader, train=False)
    scheduler.step(va_loss)
    if epoch == 1 or epoch % 20 == 0:
        lr = optimizer.param_groups[0]['lr']
        print(f"[{epoch:03d}/{epochs}] train MSE: {tr_loss:.6f} | val MSE: {va_loss:.6f} | lr: {lr:.2e}")
    if early.step(va_loss, model):
        print(f'Early stop at epoch {epoch}. BEST val MSE: {early.best:.6f}')
        break
early.load_best(model, device)

# [001/300] train MSE: 0.022115 | val MSE: 0.011054 | lr: 2.00e-03
# [020/300] train MSE: 0.003933 | val MSE: 0.014996 | lr: 1.00e-03
# Early stop at epoch 31. BEST val MSE: 0.011054

에폭을 돌리면 위와 같다.

평가

model.eval()
pred_list, true_list = [], []
with torch.no_grad():
    for xb, yb in test_loader:
        xb, yb = xb.to(device), yb.to(device)
        pred = model(xb)
        pred_list.append(pred.cpu().numpy())
        true_list.append(yb.cpu().numpy())
pred_scaled = np.vstack(pred_list) # 배치 단위로 예측한 결과를 하나의 배열로 합침
true_scaled = np.vstack(true_list)

pred_unscaled = scaler_y.inverse_transform(pred_scaled) # 예측값을 원래 스케일로 복원
true_unscaled = scaler_y.inverse_transform(true_scaled)
mse = mean_squared_error(true_unscaled, pred_unscaled)
rmse = np.sqrt(mse)
mae = mean_absolute_error(true_unscaled, pred_unscaled)
r2 = r2_score(true_unscaled, pred_unscaled)
print(f"Test MSE: {mse:.6f} | RMSE: {rmse:.6f} | MAE: {mae:.6f} | R2: {r2:.6f}")

# Test MSE: 19748.472656 | RMSE: 140.529259 | MAE: 127.858833 | R2: 0.571647
  • 4개의 평가지표로 평가해보았다.

이제 실제 차트와 예측 차트를 만들어서 서로 비교해보는 시각화를 해볼 것이다.

  • 하늘색: 실제 데이터 , 노란색 실제 값, 녹색 예측값
  • 녹색과 노란색이 거의 비슷한것으로 보아 예측 성능이 나쁘지 않음을 알 수 있었다.

'AI공부 > 자연어처리' 카테고리의 다른 글

8. GRU(Gated Recurrent Unit)  (0) 2026.02.28
7. LSTM(Long Short Term Memory)  (0) 2026.02.27
5. RNN(Recurrent Neural Network)기초  (1) 2026.02.22
4. FastText  (0) 2026.02.21
3. NSMC를 활용한 단어 분류  (1) 2026.02.20