import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt

# Build the path to the CSV file from this script's absolute location
# (the directory that contains main.py).
current_dir = os.path.dirname(os.path.abspath(__file__))

# Relative path to the CSV file (relative to main.py)
csv_relative_path = os.path.join('..', '..', 'huinongbao-app', 'src', 'assets', '慧农宝_final.csv')

# Join the script directory with the relative path and normalize it
full_name = os.path.normpath(os.path.join(current_dir, csv_relative_path))
print(f"Full path to CSV: {full_name}")

# Read the CSV file (the date column is parsed and set as the index further below)
df = pd.read_csv(full_name, sep=',', encoding='utf-8', engine='python')

# Sanity check: the top row is expected to be 2024-12-18,脐橙,8.5,101.74
expected_top_row = ['2024-12-18', '脐橙', '8.5', '101.74']
if not df.empty and df.iloc[0].astype(str).tolist() != expected_top_row:
    # If the top row does not match, warn and stop
    print("当前CSV文件的顶部数据不符合预期。请检查文件。")
    exit()

# Prompt the user for new entries
num_entries = int(input("请输入要添加的条目数量: "))
user_data = {
    "数值": [],
    "指数": []
}

for i in range(num_entries):
    # Cast the inputs to float so the new rows match the numeric CSV columns
    value = float(input(f"请输入第 {i + 1} 个条目的数值: "))
    index = float(input(f"请输入第 {i + 1} 个条目的指数: "))
    user_data["数值"].append(value)
    user_data["指数"].append(index)

# Build a new DataFrame from the user input
new_data = pd.DataFrame({
    '日期': pd.date_range(start='2024-12-19', periods=num_entries, freq='D').date,  # keep only the date part
    '品种': ['脐橙'] * num_entries,
    '数值': user_data["数值"],
    '指数': user_data["指数"],
})

# Prepend the new rows to the existing data
df = pd.concat([new_data, df], ignore_index=True)

# Save the updated DataFrame back to the CSV file
df.to_csv(full_name, index=False, encoding='utf-8')
print("CSV文件已更新。")

df['日期'] = pd.to_datetime(df['日期'])
df.set_index('日期', inplace=True)
print(df.head(1))

# Keep only the 脐橙 (navel orange) rows, sorted by date in ascending order
orange_df = df[df['品种'] == '脐橙'].sort_index(ascending=True)

# Containers for the sliding-window samples
dataX = []  # features
dataY = []  # labels
history_days = 200
future_days = 18

# Slide a window over the series: each sample pairs history_days of history
# with the following future_days of targets
for i in range(len(orange_df) - history_days - future_days + 1):
    tempX = orange_df[['数值', '指数']].iloc[i:(i + history_days)].values.tolist()
    tempY = orange_df[['数值', '指数']].iloc[(i + history_days):(i + history_days + future_days)].values.tolist()
    dataX.append(tempX)
    dataY.append(tempY)

# Convert to numpy arrays
dataX = np.array(dataX)
dataY = np.array(dataY)
print("dataX shape:", dataX.shape)
print("dataY shape:", dataY.shape)

# Min-max scaling of features and targets
scaler_X = MinMaxScaler(feature_range=(0, 1))
scaler_Y = MinMaxScaler(feature_range=(0, 1))
dataX_scaled = scaler_X.fit_transform(dataX.reshape(-1, 2)).reshape(dataX.shape)
dataY_scaled = scaler_Y.fit_transform(dataY.reshape(-1, 2)).reshape(dataY.shape)

# Convert to tensors; labels are flattened to match the model output shape
dataX_tensor = torch.tensor(dataX_scaled, dtype=torch.float32)
dataY_tensor = torch.tensor(dataY_scaled, dtype=torch.float32).view(dataY.shape[0], -1)

# Dataset and loaders
dataset = TensorDataset(dataX_tensor, dataY_tensor)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=64,
    shuffle=True
)
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=64,
    shuffle=False
)


# Model definition (an LSTM followed by a fully connected output layer)
class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
        super(LSTMModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        self.lstm = nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).to(x.device)
        c0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        # Use the last time step's hidden state to predict all future values at once
        out = self.fc(out[:, -1, :])
        return out
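
# A minimal, optional shape check (an illustration added here, not part of the
# original pipeline): with batch_first=True the LSTM expects input of shape
# (batch, history_days, 2) and the head returns (batch, 2 * future_days).
# with torch.no_grad():
#     _dummy = torch.zeros(4, history_days, 2)
#     _out = LSTMModel(2, hidden_dim=50, layer_dim=2, output_dim=2 * future_days)(_dummy)
#     assert _out.shape == (4, 2 * future_days)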
input_dim = 2
hidden_dim = 50
layer_dim = 2
output_dim = 2 * future_days  # two features for each of the future_days predicted days

model = LSTMModel(input_dim, hidden_dim, layer_dim, output_dim)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Loss history
lossList = []
lossListTest = []

num_epochs = 100
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    # Training
    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    avg_train_loss = running_loss / len(train_loader)
    lossList.append(avg_train_loss)

    # Evaluation on the test split
    model.eval()
    test_loss = 0.0
    with torch.no_grad():
        predictions, actuals = [], []
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss += loss.item()
            predictions.append(outputs.cpu().numpy())
            actuals.append(labels.cpu().numpy())

    avg_test_loss = test_loss / len(test_loader)
    lossListTest.append(avg_test_loss)  # record the test loss for this epoch

    print(f'Epoch [{epoch + 1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Test Loss: {avg_test_loss:.4f}')

# Plot the loss curves
# plt.figure(figsize=(10, 6))
# plt.plot(list(range(1, num_epochs + 1)), lossList, label='Train')
# plt.plot(list(range(1, num_epochs + 1)), lossListTest, label='Test')
# plt.legend()
# plt.xlabel('epoch')
# plt.ylabel('loss')
# plt.title('Loss over epochs')
# plt.show()

# Take the most recent history_days window as the forecasting input
latest_data = orange_df[['数值', '指数']].iloc[-history_days:].values

# Scale the latest window with the fitted feature scaler
latest_data_scaled = scaler_X.transform(latest_data)

# Convert to a tensor and add a batch dimension
latest_data_tensor = torch.tensor(latest_data_scaled, dtype=torch.float32).unsqueeze(0).to(device)

# Inference
model.eval()
with torch.no_grad():
    future_predictions = model(latest_data_tensor)

# Invert the scaling of the predictions
future_predictions_unscaled = scaler_Y.inverse_transform(future_predictions.cpu().numpy().reshape(-1, 2))

print("预测的未来18天的数值 (价格) 和指数:")
for i in range(future_days):
    print(f"Day {i + 1}: 数值 (价格) = {future_predictions_unscaled[i, 0]:.2f}, 指数 = {future_predictions_unscaled[i, 1]:.2f}")

# Plot the forecast
# plt.figure(figsize=(14, 7))
# days = list(range(1, future_days + 1))
# plt.plot(days, future_predictions_unscaled[:, 0], label='预测值 (数值)', marker='o')
# plt.plot(days, future_predictions_unscaled[:, 1], label='预测值 (指数)', marker='x')
# plt.title('预测的未来18天的数值 (价格) 和指数')
# plt.xlabel('天数')
# plt.ylabel('值')
# plt.legend()
# plt.show()
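
# Note (an assumption, not something configured in the original script): the
# commented-out figures above and below use Chinese labels and titles, which
# render as empty boxes unless matplotlib is pointed at a CJK-capable font.
# A minimal sketch, assuming a font such as SimHei is installed on the system:
# plt.rcParams['font.sans-serif'] = ['SimHei']
# plt.rcParams['axes.unicode_minus'] = False  # keep the minus sign rendering correctly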

# Optional: evaluate on the test set and compare predictions with ground truth
# model.eval()
# with torch.no_grad():
#     predictions, actuals = [], []
#     for inputs, labels in test_loader:
#         inputs, labels = inputs.to(device), labels.to(device).view(labels.size(0), -1)
#         outputs = model(inputs)
#         predictions.append(outputs.cpu().numpy())
#         actuals.append(labels.cpu().numpy())
#
# predictions = np.concatenate(predictions)
# actuals = np.concatenate(actuals)
#
# # Invert the scaling
# predictions_unscaled = scaler_Y.inverse_transform(predictions.reshape(-1, 2))
# actuals_unscaled = scaler_Y.inverse_transform(actuals.reshape(-1, 2))
#
# # Visualize the results
# plt.figure(figsize=(14, 18))
# plt.plot(actuals_unscaled[:, 0], label='真实值 (数值)')
# plt.plot(predictions_unscaled[:, 0], label='预测值 (数值)', linestyle='--')
# plt.plot(actuals_unscaled[:, 1], label='真实值 (指数)')
# plt.plot(predictions_unscaled[:, 1], label='预测值 (指数)', linestyle='--')
#
# plt.title('脐橙 数值 和 指数 预测 vs 真实值')
# plt.xlabel('样本编号')
# plt.ylabel('值')
# plt.legend()
# plt.show()
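
# A minimal sketch for persisting the trained model and the fitted scalers so the
# forecast can be reproduced without retraining. This is an optional addition, not
# part of the original pipeline; the file names 'lstm_orange.pt' and 'scalers.pkl'
# are assumptions.
# import pickle
# torch.save(model.state_dict(), os.path.join(current_dir, 'lstm_orange.pt'))
# with open(os.path.join(current_dir, 'scalers.pkl'), 'wb') as f:
#     pickle.dump({'scaler_X': scaler_X, 'scaler_Y': scaler_Y}, f)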