WeightedLayerPooling.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. from __future__ import annotations
  2. import json
  3. import os
  4. import torch
  5. from safetensors.torch import load_model as load_safetensors_model
  6. from safetensors.torch import save_model as save_safetensors_model
  7. from torch import Tensor, nn
  8. class WeightedLayerPooling(nn.Module):
  9. """Token embeddings are weighted mean of their different hidden layer representations"""
  10. def __init__(
  11. self, word_embedding_dimension, num_hidden_layers: int = 12, layer_start: int = 4, layer_weights=None
  12. ):
  13. super().__init__()
  14. self.config_keys = ["word_embedding_dimension", "layer_start", "num_hidden_layers"]
  15. self.word_embedding_dimension = word_embedding_dimension
  16. self.layer_start = layer_start
  17. self.num_hidden_layers = num_hidden_layers
  18. self.layer_weights = (
  19. layer_weights
  20. if layer_weights is not None
  21. else nn.Parameter(torch.tensor([1] * (num_hidden_layers + 1 - layer_start), dtype=torch.float))
  22. )
  23. def forward(self, features: dict[str, Tensor]):
  24. ft_all_layers = features["all_layer_embeddings"]
  25. all_layer_embedding = torch.stack(ft_all_layers)
  26. all_layer_embedding = all_layer_embedding[self.layer_start :, :, :, :] # Start from 4th layers output
  27. weight_factor = self.layer_weights.unsqueeze(-1).unsqueeze(-1).unsqueeze(-1).expand(all_layer_embedding.size())
  28. weighted_average = (weight_factor * all_layer_embedding).sum(dim=0) / self.layer_weights.sum()
  29. features.update({"token_embeddings": weighted_average})
  30. return features
  31. def get_word_embedding_dimension(self):
  32. return self.word_embedding_dimension
  33. def get_config_dict(self):
  34. return {key: self.__dict__[key] for key in self.config_keys}
  35. def save(self, output_path: str, safe_serialization: bool = True):
  36. with open(os.path.join(output_path, "config.json"), "w") as fOut:
  37. json.dump(self.get_config_dict(), fOut, indent=2)
  38. if safe_serialization:
  39. save_safetensors_model(self, os.path.join(output_path, "model.safetensors"))
  40. else:
  41. torch.save(self.state_dict(), os.path.join(output_path, "pytorch_model.bin"))
  42. @staticmethod
  43. def load(input_path):
  44. with open(os.path.join(input_path, "config.json")) as fIn:
  45. config = json.load(fIn)
  46. model = WeightedLayerPooling(**config)
  47. if os.path.exists(os.path.join(input_path, "model.safetensors")):
  48. load_safetensors_model(model, os.path.join(input_path, "model.safetensors"))
  49. else:
  50. model.load_state_dict(
  51. torch.load(
  52. os.path.join(input_path, "pytorch_model.bin"), map_location=torch.device("cpu"), weights_only=True
  53. )
  54. )
  55. return model