100个机器学习代码深度剖析(1)股票预测

import matplotlib.pyplot as plt

import numpy as np

import pandas as pd

import torch

import torch.nn as nn

import tushare as ts

from sklearn.preprocessing import StandardScaler

from torch.utils.data import TensorDataset

from tqdm import tqdm

timestep = 1  # 时间步长,就是利用多少时间窗口

# 在 LSTM 中,“时间步长”(time step 或 sequence length)是指输入序列中每个样本包含的时间点数量。

# 它是时间序列的一个维度,决定了模型一次处理多少个连续的时间点。

# 时间步长是你希望模型一次性“记住”的时间长度,它取决于数据的时间特性、任务目标、计算资源和模型设计。

# 如果你的数据是时间序列(如股票、传感器、视频帧),时间步长通常对应于你希望模型一次看到的时间长度。

# 例如:股票数据:你可能希望模型每次看到过去 30 天的数据 → 时间步长 = 30

batch_size = 16  # 批次大小

# 1. 提高训练效率

# 一次处理多个样本(比如 16 个)可以充分利用 GPU 并行计算能力。

# 比起每次只处理一个样本(即 batch_size = 1),速度更快。

# 2. 稳定梯度更新

# 每次训练会计算一个“平均梯度”,这样可以减少训练过程中的波动。

# 如果 batch 太小,梯度可能不稳定;太大则可能过于平滑,学习变慢。

input_dim = 4  # 每个步长对应的特征数量,就是使用每天的4个特征,最高、最低、开盘、落盘

hidden_dim = 64  # 隐层大小

# 第一层处理原始输入(比如 [open, high, low, close])。

# 第二层处理第一层的输出。

# 第三层处理第二层的输出。

# 每层的隐藏状态维度都是 hidden_dim(64)。

output_dim = 1  # 由于是回归任务,最终输出层大小为1

# 如果是回归任务(预测一个连续值),通常输出维度就是 1。

# 如果是分类任务(比如预测涨跌),输出维度可能是类别数(比如 2 或更多)。

# 目标变量(你要预测的东西):

# 如果目标是 连续值(可以无限取值,比如价格、温度、长度) → 回归任务线性回归用于回归

# 如果目标是 离散类别(有限个标签,比如涨跌、猫狗、人群分类) → 分类任务逻辑回归

num_layers = 3  # LSTM的层数

# 单层 LSTM 可以捕捉时间序列的基本模式,但可能不足以学习复杂的长期依赖。

# 多层 LSTM(堆叠 LSTM):

# 第一层提取低级时间特征(比如短期波动)。

# 第二层提取更抽象的模式(比如趋势)。

# 第三层进一步整合信息,捕捉更复杂的关系。

# 类似于 CNN 堆叠卷积层,深度增加可以学习更复杂的特征。

# 经验值:

# 1~3 层是常见选择,超过 3 层需要更多数据和正则化,否则容易过拟合。

epochs = 10

best_loss = 0

# 每一轮(epoch)模型会遍历所有训练数据,更新参数。

# 一次遍历通常不足以让模型收敛(学到规律),所以要多次迭代。

# 每次迭代,模型通过 梯度下降 调整参数,使损失函数逐渐减小。

# 1 个 epoch = 所有训练样本都被用来训练一次。

# 如果 batch_size = 16,训练集有 1600 个样本:

# 每个 epoch 会分成 1600 / 16 = 100 个 batch。

# 每个 batch 更新一次参数。

# 欠拟合:模型太简单或训练不够 → 学不到规律

# 过拟合:模型太复杂或训练太久 → 记住训练集,泛化能力差

model_name = ‘LSTM’

save_path = ‘./{}.pth’.format(model_name)#定义模型名称并设置模型保存路径

# 1.加载股票数据

pro = ts.pro_api(‘f89befcaababe24c69c6e4cfb0614e45b43e2c1e95709f9ad6bb2598’)

df = pro.daily(ts_code=’000001.SZ’, start_date=’20130711′, end_date=’20220711′)

# 调用 TuShare 的 日线行情接口,获取股票每天的交易数据。

# 参数说明:

# ts_code=’000001.SZ’:股票代码,这里是 平安银行(深圳交易所)。

# start_date=’20130711’:开始日期(2013年7月11日)。

# end_date=’20220711’:结束日期(2022年7月11日)。

# 返回值:一个 DataFrame,包含每天的开盘价、收盘价、最高价、最低价、成交量等。

pro.daily()

df.index = pd.to_datetime(df.trade_date)  # 索引转为日期

df = df.iloc[::-1]  # 由于获取的数据是倒序的,需要将其调整为正序

# 把 trade_date(原本是字符串)转换成 datetime 类型,并设为索引,方便后续按时间处理。

# TuShare 返回的数据默认是 最新日期在前(倒序),为了训练模型,通常需要 时间正序(最早日期在前),所以用 iloc[::-1] 翻转。

# 2.将数据进行标准化

scaler = StandardScaler()

scaler_model = StandardScaler()

data = scaler_model.fit_transform(np.array(df[[‘open’, ‘high’, ‘low’, ‘close’]]).reshape(-1, 4))

scaler.fit_transform(np.array(df[‘close’]).reshape(-1, 1))

# 创建一个 StandardScaler 对象,用于标准化 目标变量(收盘价)。

# StandardScaler 会把数据转换为均值 0、方差 1 的分布,公式:

# x′=x−μ​/σ

# 其中 μ 是均值,σ是标准差。

# 再创建一个 StandardScaler 对象,用于标准化 输入特征(open、high、low、close)。

# 为什么要两个?

# 一个用于输入特征,一个用于输出目标,方便后续反向转换(inverse_transform)。

# 从 df 中取出四个特征列:open、high、low、close。

# 转换成 NumPy 数组,并 reshape 成 (样本数, 4)。

# 用 scaler_model 标准化这些特征。

# data 是一个标准化后的二维数组,每行对应一天的四个特征。

# 从 df 中取出 close(收盘价)。

# 转换成二维数组 (样本数, 1)。

# 用 scaler 标准化收盘价。

# 为什么要单独处理收盘价?

# 因为它是预测目标,后续绘图或评估时需要反标准化(恢复原始价格)。

# 形成训练数据,例如12345变成12-3,23-4,34-5

def split_data(data, timestep):

    dataX = []  # 保存X

    dataY = []  # 保存Y

    # dataX:保存每个样本的输入序列(例如过去几天的特征)

    # dataY:保存每个样本对应的预测目标(例如下一天的收盘价)

    # 将整个窗口的数据保存到X中,将未来一天保存到Y中

    for index in range(len(data) – timestep):

        dataX.append(data[index: index + timestep])

        dataY.append(data[index + timestep][3])

        data[index: index + timestep]

# 取出一个时间窗口的数据(例如过去1天、过去30天)

# data[index + timestep][3]:取出时间窗口之后一天的第4个特征(即收盘价)

    dataX = np.array(dataX)

    dataY = np.array(dataY)

# 把 Python 的列表转换成 NumPy 数组

    # 获取训练集大小 80%训练集 round换成整数 结果int整数

    train_size = int(np.round(0.8 * dataX.shape[0]))

    # 划分训练集、测试集

    x_train = dataX[: train_size, :].reshape(-1, timestep, 4)

    y_train = dataY[: train_size]

    x_test = dataX[train_size:, :].reshape(-1, timestep, 4)

    y_test = dataY[train_size:]

    return [x_train, y_train, x_test, y_test]

# dataX[: train_size, :]:取前 train_size 个样本作为训练集的输入数据。

# .reshape(-1, timestep, 4):把数据重新调整成 LSTM 所需的三维格式:

# -1 表示自动计算样本数量;

# timestep 是时间步长(1);

# 4 是每个时间步的特征数量(open、high、low、close)。

# # 3.获取训练数据   x_train: 1700,1,4

x_train, y_train, x_test, y_test = split_data(data, timestep)

# 4.将数据转为tensor

x_train_tensor = torch.from_numpy(x_train).to(torch.float32)

y_train_tensor = torch.from_numpy(y_train).to(torch.float32)

x_test_tensor = torch.from_numpy(x_test).to(torch.float32)

y_test_tensor = torch.from_numpy(y_test).to(torch.float32)

# 1. torch.from_numpy(…)

# 把 NumPy 数组(如 x_train)转换成 PyTorch 的张量。

# NumPy 是 Python 中处理数据的标准工具,但 PyTorch 模型不能直接用 NumPy 数据。

# 2. .to(torch.float32)

# 把张量的数据类型转换成 float32(32位浮点数),这是深度学习中最常用的数据类型。

# 如果不转换,有可能会报错或导致模型训练不稳定。

# 5.形成训练数据集

train_data = TensorDataset(x_train_tensor, y_train_tensor)

test_data = TensorDataset(x_test_tensor, y_test_tensor)

# 6.将数据加载成迭代器

train_loader = torch.utils.data.DataLoader(train_data,

                                           batch_size,

                                           True)

test_loader = torch.utils.data.DataLoader(test_data,

                                          batch_size,

                                          False)

# train_data:你之前用 TensorDataset 封装好的训练数据(包含输入和标签)。

# batch_size:每次训练处理多少个样本(你设置的是 16)。

# True:表示在每个 epoch 中 打乱数据顺序(shuffle),有助于提高模型泛化能力。

# DataLoader

# 自动分批处理数据:不用手动写循环来分割数据。

# 支持打乱数据顺序:训练时打乱数据可以防止模型记住数据顺序,提升泛化能力。

# 提高效率:可以结合 GPU 并行处理,提升训练速度。

# 统一接口:训练和测试都可以用相同的方式迭代数据

# 7.定义LSTM网络

class LSTM(nn.Module):

    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):

        super(LSTM, self).__init__()

        self.hidden_dim = hidden_dim  # 隐层大小

        self.num_layers = num_layers  # LSTM层数

        # input_dim为特征维度,就是每个时间点对应的特征数量,这里为4

        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)#batch_first=True 表示输入的形状是 (batch_size, time_step, input_dim)。

        self.fc = nn.Linear(hidden_dim, output_dim)

#创建一个全连接层,用于将 LSTM 的输出映射到最终的预测值

    def forward(self, x):

        output, (h_n, c_n) = self.lstm(x)  # output为所有时间片的输出,形状为:16,1,4

        # print(output.shape) torch.Size([16, 1, 64]) batch_size,timestep,hidden_dim

        # print(h_n.shape) torch.Size([3, 16, 64]) num_layers,batch_size,hidden_dim

        # print(c_n.shape) torch.Size([3, 16, 64]) num_layers,batch_size,hidden_dim

        batch_size, timestep, hidden_dim = output.shape

        # 将output变成 batch_size * timestep, hidden_dim

        output = output.reshape(-1, hidden_dim)

        output = self.fc(output)  # 形状为batch_size * timestep, 1

        output = output.reshape(timestep, batch_size, -1)

        return output[-1]  # 返回最后一个时间片的输出

# 把 LSTM 的输出 reshape 成适合全连接层的格式。

# 用 fc 层得到预测结果。

# 最后返回最后一个时间步的输出作为预测值。

model = LSTM(input_dim, hidden_dim, num_layers, output_dim)  # 定义LSTM网络

loss_function = nn.MSELoss()  # 定义损失函数

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)  # 定义优化器

# 8.模型训练

for epoch in range(epochs):

# epochs 是你设置的训练轮数(比如 10),每一轮都会遍历整个训练集一次。

# 多轮训练可以让模型逐渐学习数据中的规律。

    model.train()

    running_loss = 0

    train_bar = tqdm(train_loader)  # 形成进度条

    for data in train_bar:

        x_train, y_train = data  # 解包迭代器中的X和Y

        optimizer.zero_grad()

        y_train_pred = model(x_train)

        loss = loss_function(y_train_pred, y_train.reshape(-1, 1))

        loss.backward()

        optimizer.step()

        running_loss += loss.item()

        train_bar.desc = “train epoch[{}/{}] loss:{:.3f}”.format(epoch + 1,

                                                                 epochs,

                                                                 loss)

# x_train, y_train = data:从 DataLoader 中取出一个 batch 的数据。

# optimizer.zero_grad():清除上一次的梯度。

# model(x_train):前向传播,得到预测值。

# loss_function(…):计算预测值与真实值之间的误差(MSE)。

# loss.backward():反向传播,计算梯度。

# optimizer.step():更新模型参数。

    # 模型验证

    model.eval()

    test_loss = 0

    with torch.no_grad():

        test_bar = tqdm(test_loader)

        for data in test_bar:

            x_test, y_test = data

            y_test_pred = model(x_test)

            test_loss = loss_function(y_test_pred, y_test.reshape(-1, 1))

    if test_loss < best_loss:

        best_loss = test_loss

        torch.save(model.state_dict(), save_path)

# 如果当前验证损失比之前的最好结果还低,就保存模型参数。

# model.state_dict() 是模型的参数字典。

# save_path 是你设置的保存路径。

#训练集用于“学习”模型参数,而 测试集用于“评估”模型效果,不参与学习过程。

print(‘Finished Training’)

# 9.绘制结果

plt.figure(figsize=(12, 8))

plt.plot(scaler.inverse_transform((model(x_train_tensor).detach().numpy()).reshape(-1, 1)), “b”)

plt.plot(scaler.inverse_transform(y_train_tensor.detach().numpy().reshape(-1, 1)), “r”)

plt.legend()

plt.show()

y_test_pred = model(x_test_tensor)

plt.figure(figsize=(12, 8))

plt.plot(scaler.inverse_transform(y_test_pred.detach().numpy()), “b”)

plt.plot(scaler.inverse_transform(y_test_tensor.detach().numpy().reshape(-1, 1)), “r”)

plt.legend()

plt.show()

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇