123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- import os
- import numpy as np
- import pandas as pd
- import torch
- import torch.nn as nn
- from torch.utils.data import TensorDataset, DataLoader
- from sklearn.preprocessing import MinMaxScaler
- import matplotlib.pyplot as plt
# Resolve the CSV location relative to the directory that contains this
# script, so the result does not depend on the current working directory.
current_dir = os.path.dirname(os.path.abspath(__file__))
# Relative hop from this script's directory: two levels up, then into the
# app's asset folder.
csv_relative_path = os.path.join(
    os.pardir, os.pardir, 'huinongbao-app', 'src', 'assets', '慧农宝_final.csv'
)
# Join both parts and collapse the parent-directory segments.
full_name = os.path.normpath(os.path.join(current_dir, csv_relative_path))
print(f"Full path to CSV: {full_name}")
# Load the existing CSV into a DataFrame.
df = pd.read_csv(full_name, sep=',', encoding='utf-8', engine='python')
# Sanity check before any rows are added: the first data row must be exactly
# this record. An empty file skips the check and is accepted as-is.
expected_top_row = ['2024-12-18', '脐橙', '8.5', '101.74']
if not df.empty and df.iloc[0].astype(str).tolist() != expected_top_row:
    # The first row does not match: report it and stop.
    print("当前CSV文件的顶部数据不符合预期。请检查文件。")
    # Raise SystemExit directly rather than calling exit(): exit() is a helper
    # injected by the `site` module for interactive sessions and is not
    # guaranteed to exist. The non-zero status tells callers the run failed.
    raise SystemExit(1)
# Ask how many rows to add, then read one value and one index per row.
num_entries = int(input("请输入要添加的条目数量: "))
user_data = {
    "数值": [],
    "指数": []
}
for i in range(num_entries):
    # Parse to float immediately. input() returns str, and storing raw strings
    # would leave these columns with mixed str/float values once the new rows
    # are merged with the numeric data read from the CSV.
    value = float(input(f"请输入第 {i + 1} 个条目的数值: "))
    index = float(input(f"请输入第 {i + 1} 个条目的指数: "))
    user_data["数值"].append(value)
    user_data["指数"].append(index)
# Build the new rows: one consecutive calendar day per entry, starting
# 2024-12-19, all for the same variety.
new_data = pd.DataFrame({
    '日期': pd.date_range(start='2024-12-19', periods=num_entries, freq='D').date,  # keep only the date part
    '品种': ['脐橙'] * num_entries,
    '数值': user_data["数值"],
    '指数': user_data["指数"],
})
# Put the new rows above the existing ones. With zero entries there is nothing
# to merge, and skipping the concat keeps the existing column dtypes untouched.
# NOTE(review): the new rows are in ascending date order while being placed on
# top of the file; after a run that adds rows, the file's first row is no
# longer the 2024-12-18 record that the start-up check expects — confirm this
# one-shot behaviour is intended.
if num_entries > 0:
    df = pd.concat([new_data, df], ignore_index=True)
# Write the result back over the original CSV.
df.to_csv(full_name, index=False, encoding='utf-8')
print("CSV文件已更新。")
# Parse the date column and use it as the index.
df['日期'] = pd.to_datetime(df['日期'])
df = df.set_index('日期')
print(df.head(1))
# Keep only the 脐橙 rows, ordered from the oldest date to the newest.
is_orange = df['品种'] == '脐橙'
orange_df = df.loc[is_orange].sort_index()
# Sliding-window sample construction.
# Each sample pairs `history_days` consecutive rows (input) with the
# `future_days` rows that immediately follow them (target).
history_days = 200
future_days = 18
# Extract the two feature columns once; the selection does not change between
# windows, so it is hoisted out of the loop.
feature_values = orange_df[['数值', '指数']].values
num_windows = len(feature_values) - history_days - future_days + 1
if num_windows <= 0:
    # Fail here with a clear message instead of letting the scaler reject an
    # empty array further down.
    raise ValueError(
        f"Need at least {history_days + future_days} rows to build a sample, "
        f"got {len(feature_values)}."
    )
# dataX: (num_windows, history_days, 2) inputs.
dataX = np.array([
    feature_values[start:start + history_days]
    for start in range(num_windows)
])
# dataY: (num_windows, future_days, 2) targets.
dataY = np.array([
    feature_values[start + history_days:start + history_days + future_days]
    for start in range(num_windows)
])
print("dataX shape:", dataX.shape)
print("dataY shape:", dataY.shape)
# Min-max scale inputs and targets into [0, 1]. The windows are flattened to
# (rows, 2) so each of the two feature columns gets its own min/max, then the
# scaled values are put back into the original window shape.
scaler_X = MinMaxScaler(feature_range=(0, 1))
scaler_Y = MinMaxScaler(feature_range=(0, 1))
flat_X = dataX.reshape(-1, 2)
flat_Y = dataY.reshape(-1, 2)
dataX_scaled = scaler_X.fit_transform(flat_X).reshape(dataX.shape)
dataY_scaled = scaler_Y.fit_transform(flat_Y).reshape(dataY.shape)

# Convert to float32 tensors. Targets are flattened to one vector per sample
# so they line up with the model's flat output.
dataX_tensor = torch.tensor(dataX_scaled, dtype=torch.float32)
dataY_tensor = torch.tensor(dataY_scaled, dtype=torch.float32).view(dataY.shape[0], -1)

# Wrap in a dataset and split it randomly, 80% train / 20% test.
dataset = TensorDataset(dataX_tensor, dataY_tensor)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

# Batches of 64; only the training batches are shuffled.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
# Model definition (LSTM-based).
class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear layer on the last time step.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the LSTM hidden state.
        layer_dim: Number of stacked LSTM layers.
        output_dim: Length of the vector produced for each input sequence.
    """

    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        # batch_first=True: inputs are laid out as (batch, seq_len, input_dim).
        self.lstm = nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Map a batch of sequences to one output vector per sequence.

        Args:
            x: Tensor of shape (batch, seq_len, input_dim).

        Returns:
            Tensor of shape (batch, output_dim).
        """
        # No explicit (h0, c0): nn.LSTM starts from zero states when none are
        # given, and creates them with the input's device and dtype. This
        # avoids hand-built float32 zeros, which broke non-float32 models.
        out, _ = self.lstm(x)
        # Only the last time step's hidden output feeds the linear head.
        return self.fc(out[:, -1, :])
# Model hyper-parameters.
input_dim = 2
hidden_dim = 50
layer_dim = 2
# Two features for each of the `future_days` forecast steps, flattened.
output_dim = 2 * future_days

# Training configuration and per-epoch loss history.
num_epochs = 100
lossList = []
lossListTest = []

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Build the model on the chosen device, then the loss and the optimizer.
model = LSTMModel(input_dim, hidden_dim, layer_dim, output_dim)
model.to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Train for `num_epochs` epochs, evaluating on the test split after each one.
for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        # Forward pass.
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        # Backward pass and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    # Mean of the per-batch losses for this epoch.
    avg_train_loss = running_loss / len(train_loader)
    lossList.append(avg_train_loss)

    # ---- evaluation pass ----
    # Only the loss is needed here, so the model outputs are not collected;
    # keeping them every epoch copied data to the host that was never read.
    model.eval()
    test_loss = 0.0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss += loss.item()
    avg_test_loss = test_loss / len(test_loader)
    lossListTest.append(avg_test_loss)  # one test-loss entry per epoch
    print(f'Epoch [{epoch + 1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Test Loss: {avg_test_loss:.4f}')
# Plot the training and test loss curves (disabled).
# plt.figure(figsize=(10, 6))
# plt.plot(list(range(1, num_epochs + 1)), lossList, label='Train')
# plt.plot(list(range(1, num_epochs + 1)), lossListTest, label='Test')
# plt.legend()
# plt.xlabel('epoch')
# plt.ylabel('loss')
# plt.title('Loss over epochs')
# plt.show()
# Forecast from the most recent `history_days` rows.
latest_data = orange_df[['数值', '指数']].iloc[-history_days:].values
# Scale with the scaler that was fitted on the training inputs.
latest_data_scaled = scaler_X.transform(latest_data)
# Add a leading batch dimension: (history_days, 2) -> (1, history_days, 2).
latest_data_tensor = torch.tensor(latest_data_scaled, dtype=torch.float32).unsqueeze(0).to(device)
# Run inference without tracking gradients.
model.eval()
with torch.no_grad():
    future_predictions = model(latest_data_tensor)
# Reshape the flat output back to (future_days, 2) and undo the target scaling.
future_predictions_unscaled = scaler_Y.inverse_transform(future_predictions.cpu().numpy().reshape(-1, 2))
# The day count in the header comes from `future_days`, so the message stays
# correct if the forecast horizon is changed.
print(f"预测的未来{future_days}天的数值 (价格) 和指数:")
for day, (price, index_value) in enumerate(future_predictions_unscaled, start=1):
    print(f"Day {day}: 数值 (价格) = {price:.2f}, 指数 = {index_value:.2f}")
# Plot the forecast for both features (disabled).
# plt.figure(figsize=(14, 7))
# days = list(range(1, future_days + 1))
# plt.plot(days, future_predictions_unscaled[:, 0], label='预测值 (数值)', marker='o')
# plt.plot(days, future_predictions_unscaled[:, 1], label='预测值 (指数)', marker='x')
# plt.title('预测的未来18天的数值 (价格) 和指数')
# plt.xlabel('天数')
# plt.ylabel('值')
# plt.legend()
# plt.show()

# Evaluate the model on the test split (disabled).
# model.eval()
# with torch.no_grad():
#     predictions, actuals = [], []
#     for inputs, labels in test_loader:
#         inputs, labels = inputs.to(device), labels.to(device).view(labels.size(0), -1)
#         outputs = model(inputs)
#         predictions.append(outputs.cpu().numpy())
#         actuals.append(labels.cpu().numpy())
#
# predictions = np.concatenate(predictions)
# actuals = np.concatenate(actuals)
#
# # Undo the scaling.
# predictions_unscaled = scaler_Y.inverse_transform(predictions.reshape(-1, 2))
# actuals_unscaled = scaler_Y.inverse_transform(actuals.reshape(-1, 2))
# # Visualize the results.
# plt.figure(figsize=(14, 18))
# plt.plot(actuals_unscaled[:, 0], label='真实值 (数值)')
# plt.plot(predictions_unscaled[:, 0], label='预测值 (数值)', linestyle='--')
# plt.plot(actuals_unscaled[:, 1], label='真实值 (指数)')
# plt.plot(predictions_unscaled[:, 1], label='预测值 (指数)', linestyle='--')
# plt.title('脐橙 数值 和 指数 预测 vs 真实值')
# plt.xlabel('样本编号')
# plt.ylabel('值')
# plt.legend()
# plt.show()
|