cached.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929
  1. from __future__ import annotations
  2. import inspect
  3. import logging
  4. import os
  5. import tempfile
  6. import time
  7. import weakref
  8. from shutil import rmtree
  9. from typing import TYPE_CHECKING, Any, Callable, ClassVar
  10. from fsspec import AbstractFileSystem, filesystem
  11. from fsspec.callbacks import DEFAULT_CALLBACK
  12. from fsspec.compression import compr
  13. from fsspec.core import BaseCache, MMapCache
  14. from fsspec.exceptions import BlocksizeMismatchError
  15. from fsspec.implementations.cache_mapper import create_cache_mapper
  16. from fsspec.implementations.cache_metadata import CacheMetadata
  17. from fsspec.spec import AbstractBufferedFile
  18. from fsspec.transaction import Transaction
  19. from fsspec.utils import infer_compression
  20. if TYPE_CHECKING:
  21. from fsspec.implementations.cache_mapper import AbstractCacheMapper
  22. logger = logging.getLogger("fsspec.cached")
  23. class WriteCachedTransaction(Transaction):
  24. def complete(self, commit=True):
  25. rpaths = [f.path for f in self.files]
  26. lpaths = [f.fn for f in self.files]
  27. if commit:
  28. self.fs.put(lpaths, rpaths)
  29. self.files.clear()
  30. self.fs._intrans = False
  31. self.fs._transaction = None
  32. self.fs = None # break cycle
  33. class CachingFileSystem(AbstractFileSystem):
  34. """Locally caching filesystem, layer over any other FS
  35. This class implements chunk-wise local storage of remote files, for quick
  36. access after the initial download. The files are stored in a given
  37. directory with hashes of URLs for the filenames. If no directory is given,
  38. a temporary one is used, which should be cleaned up by the OS after the
  39. process ends. The files themselves are sparse (as implemented in
  40. :class:`~fsspec.caching.MMapCache`), so only the data which is accessed
  41. takes up space.
  42. Restrictions:
  43. - the block-size must be the same for each access of a given file, unless
  44. all blocks of the file have already been read
  45. - caching can only be applied to file-systems which produce files
  46. derived from fsspec.spec.AbstractBufferedFile ; LocalFileSystem is also
  47. allowed, for testing
  48. """
  49. protocol: ClassVar[str | tuple[str, ...]] = ("blockcache", "cached")
  50. def __init__(
  51. self,
  52. target_protocol=None,
  53. cache_storage="TMP",
  54. cache_check=10,
  55. check_files=False,
  56. expiry_time=604800,
  57. target_options=None,
  58. fs=None,
  59. same_names: bool | None = None,
  60. compression=None,
  61. cache_mapper: AbstractCacheMapper | None = None,
  62. **kwargs,
  63. ):
  64. """
  65. Parameters
  66. ----------
  67. target_protocol: str (optional)
  68. Target filesystem protocol. Provide either this or ``fs``.
  69. cache_storage: str or list(str)
  70. Location to store files. If "TMP", this is a temporary directory,
  71. and will be cleaned up by the OS when this process ends (or later).
  72. If a list, each location will be tried in the order given, but
  73. only the last will be considered writable.
  74. cache_check: int
  75. Number of seconds between reload of cache metadata
  76. check_files: bool
  77. Whether to explicitly see if the UID of the remote file matches
  78. the stored one before using. Warning: some file systems such as
  79. HTTP cannot reliably give a unique hash of the contents of some
  80. path, so be sure to set this option to False.
  81. expiry_time: int
  82. The time in seconds after which a local copy is considered useless.
  83. Set to falsy to prevent expiry. The default is equivalent to one
  84. week.
  85. target_options: dict or None
  86. Passed to the instantiation of the FS, if fs is None.
  87. fs: filesystem instance
  88. The target filesystem to run against. Provide this or ``protocol``.
  89. same_names: bool (optional)
  90. By default, target URLs are hashed using a ``HashCacheMapper`` so
  91. that files from different backends with the same basename do not
  92. conflict. If this argument is ``true``, a ``BasenameCacheMapper``
  93. is used instead. Other cache mapper options are available by using
  94. the ``cache_mapper`` keyword argument. Only one of this and
  95. ``cache_mapper`` should be specified.
  96. compression: str (optional)
  97. To decompress on download. Can be 'infer' (guess from the URL name),
  98. one of the entries in ``fsspec.compression.compr``, or None for no
  99. decompression.
  100. cache_mapper: AbstractCacheMapper (optional)
  101. The object use to map from original filenames to cached filenames.
  102. Only one of this and ``same_names`` should be specified.
  103. """
  104. super().__init__(**kwargs)
  105. if fs is None and target_protocol is None:
  106. raise ValueError(
  107. "Please provide filesystem instance(fs) or target_protocol"
  108. )
  109. if not (fs is None) ^ (target_protocol is None):
  110. raise ValueError(
  111. "Both filesystems (fs) and target_protocol may not be both given."
  112. )
  113. if cache_storage == "TMP":
  114. tempdir = tempfile.mkdtemp()
  115. storage = [tempdir]
  116. weakref.finalize(self, self._remove_tempdir, tempdir)
  117. else:
  118. if isinstance(cache_storage, str):
  119. storage = [cache_storage]
  120. else:
  121. storage = cache_storage
  122. os.makedirs(storage[-1], exist_ok=True)
  123. self.storage = storage
  124. self.kwargs = target_options or {}
  125. self.cache_check = cache_check
  126. self.check_files = check_files
  127. self.expiry = expiry_time
  128. self.compression = compression
  129. # Size of cache in bytes. If None then the size is unknown and will be
  130. # recalculated the next time cache_size() is called. On writes to the
  131. # cache this is reset to None.
  132. self._cache_size = None
  133. if same_names is not None and cache_mapper is not None:
  134. raise ValueError(
  135. "Cannot specify both same_names and cache_mapper in "
  136. "CachingFileSystem.__init__"
  137. )
  138. if cache_mapper is not None:
  139. self._mapper = cache_mapper
  140. else:
  141. self._mapper = create_cache_mapper(
  142. same_names if same_names is not None else False
  143. )
  144. self.target_protocol = (
  145. target_protocol
  146. if isinstance(target_protocol, str)
  147. else (fs.protocol if isinstance(fs.protocol, str) else fs.protocol[0])
  148. )
  149. self._metadata = CacheMetadata(self.storage)
  150. self.load_cache()
  151. self.fs = fs if fs is not None else filesystem(target_protocol, **self.kwargs)
  152. def _strip_protocol(path):
  153. # acts as a method, since each instance has a difference target
  154. return self.fs._strip_protocol(type(self)._strip_protocol(path))
  155. self._strip_protocol: Callable = _strip_protocol
  156. @staticmethod
  157. def _remove_tempdir(tempdir):
  158. try:
  159. rmtree(tempdir)
  160. except Exception:
  161. pass
  162. def _mkcache(self):
  163. os.makedirs(self.storage[-1], exist_ok=True)
  164. def cache_size(self):
  165. """Return size of cache in bytes.
  166. If more than one cache directory is in use, only the size of the last
  167. one (the writable cache directory) is returned.
  168. """
  169. if self._cache_size is None:
  170. cache_dir = self.storage[-1]
  171. self._cache_size = filesystem("file").du(cache_dir, withdirs=True)
  172. return self._cache_size
  173. def load_cache(self):
  174. """Read set of stored blocks from file"""
  175. self._metadata.load()
  176. self._mkcache()
  177. self.last_cache = time.time()
  178. def save_cache(self):
  179. """Save set of stored blocks from file"""
  180. self._mkcache()
  181. self._metadata.save()
  182. self.last_cache = time.time()
  183. self._cache_size = None
  184. def _check_cache(self):
  185. """Reload caches if time elapsed or any disappeared"""
  186. self._mkcache()
  187. if not self.cache_check:
  188. # explicitly told not to bother checking
  189. return
  190. timecond = time.time() - self.last_cache > self.cache_check
  191. existcond = all(os.path.exists(storage) for storage in self.storage)
  192. if timecond or not existcond:
  193. self.load_cache()
  194. def _check_file(self, path):
  195. """Is path in cache and still valid"""
  196. path = self._strip_protocol(path)
  197. self._check_cache()
  198. return self._metadata.check_file(path, self)
  199. def clear_cache(self):
  200. """Remove all files and metadata from the cache
  201. In the case of multiple cache locations, this clears only the last one,
  202. which is assumed to be the read/write one.
  203. """
  204. rmtree(self.storage[-1])
  205. self.load_cache()
  206. self._cache_size = None
  207. def clear_expired_cache(self, expiry_time=None):
  208. """Remove all expired files and metadata from the cache
  209. In the case of multiple cache locations, this clears only the last one,
  210. which is assumed to be the read/write one.
  211. Parameters
  212. ----------
  213. expiry_time: int
  214. The time in seconds after which a local copy is considered useless.
  215. If not defined the default is equivalent to the attribute from the
  216. file caching instantiation.
  217. """
  218. if not expiry_time:
  219. expiry_time = self.expiry
  220. self._check_cache()
  221. expired_files, writable_cache_empty = self._metadata.clear_expired(expiry_time)
  222. for fn in expired_files:
  223. if os.path.exists(fn):
  224. os.remove(fn)
  225. if writable_cache_empty:
  226. rmtree(self.storage[-1])
  227. self.load_cache()
  228. self._cache_size = None
  229. def pop_from_cache(self, path):
  230. """Remove cached version of given file
  231. Deletes local copy of the given (remote) path. If it is found in a cache
  232. location which is not the last, it is assumed to be read-only, and
  233. raises PermissionError
  234. """
  235. path = self._strip_protocol(path)
  236. fn = self._metadata.pop_file(path)
  237. if fn is not None:
  238. os.remove(fn)
  239. self._cache_size = None
  240. def _open(
  241. self,
  242. path,
  243. mode="rb",
  244. block_size=None,
  245. autocommit=True,
  246. cache_options=None,
  247. **kwargs,
  248. ):
  249. """Wrap the target _open
  250. If the whole file exists in the cache, just open it locally and
  251. return that.
  252. Otherwise, open the file on the target FS, and make it have a mmap
  253. cache pointing to the location which we determine, in our cache.
  254. The ``blocks`` instance is shared, so as the mmap cache instance
  255. updates, so does the entry in our ``cached_files`` attribute.
  256. We monkey-patch this file, so that when it closes, we call
  257. ``close_and_update`` to save the state of the blocks.
  258. """
  259. path = self._strip_protocol(path)
  260. path = self.fs._strip_protocol(path)
  261. if "r" not in mode:
  262. return self.fs._open(
  263. path,
  264. mode=mode,
  265. block_size=block_size,
  266. autocommit=autocommit,
  267. cache_options=cache_options,
  268. **kwargs,
  269. )
  270. detail = self._check_file(path)
  271. if detail:
  272. # file is in cache
  273. detail, fn = detail
  274. hash, blocks = detail["fn"], detail["blocks"]
  275. if blocks is True:
  276. # stored file is complete
  277. logger.debug("Opening local copy of %s", path)
  278. return open(fn, mode)
  279. # TODO: action where partial file exists in read-only cache
  280. logger.debug("Opening partially cached copy of %s", path)
  281. else:
  282. hash = self._mapper(path)
  283. fn = os.path.join(self.storage[-1], hash)
  284. blocks = set()
  285. detail = {
  286. "original": path,
  287. "fn": hash,
  288. "blocks": blocks,
  289. "time": time.time(),
  290. "uid": self.fs.ukey(path),
  291. }
  292. self._metadata.update_file(path, detail)
  293. logger.debug("Creating local sparse file for %s", path)
  294. # call target filesystems open
  295. self._mkcache()
  296. f = self.fs._open(
  297. path,
  298. mode=mode,
  299. block_size=block_size,
  300. autocommit=autocommit,
  301. cache_options=cache_options,
  302. cache_type="none",
  303. **kwargs,
  304. )
  305. if self.compression:
  306. comp = (
  307. infer_compression(path)
  308. if self.compression == "infer"
  309. else self.compression
  310. )
  311. f = compr[comp](f, mode="rb")
  312. if "blocksize" in detail:
  313. if detail["blocksize"] != f.blocksize:
  314. raise BlocksizeMismatchError(
  315. f"Cached file must be reopened with same block"
  316. f" size as original (old: {detail['blocksize']},"
  317. f" new {f.blocksize})"
  318. )
  319. else:
  320. detail["blocksize"] = f.blocksize
  321. f.cache = MMapCache(f.blocksize, f._fetch_range, f.size, fn, blocks)
  322. close = f.close
  323. f.close = lambda: self.close_and_update(f, close)
  324. self.save_cache()
  325. return f
  326. def _parent(self, path):
  327. return self.fs._parent(path)
  328. def hash_name(self, path: str, *args: Any) -> str:
  329. # Kept for backward compatibility with downstream libraries.
  330. # Ignores extra arguments, previously same_name boolean.
  331. return self._mapper(path)
  332. def close_and_update(self, f, close):
  333. """Called when a file is closing, so store the set of blocks"""
  334. if f.closed:
  335. return
  336. path = self._strip_protocol(f.path)
  337. self._metadata.on_close_cached_file(f, path)
  338. try:
  339. logger.debug("going to save")
  340. self.save_cache()
  341. logger.debug("saved")
  342. except OSError:
  343. logger.debug("Cache saving failed while closing file")
  344. except NameError:
  345. logger.debug("Cache save failed due to interpreter shutdown")
  346. close()
  347. f.closed = True
  348. def ls(self, path, detail=True):
  349. return self.fs.ls(path, detail)
  350. def __getattribute__(self, item):
  351. if item in {
  352. "load_cache",
  353. "_open",
  354. "save_cache",
  355. "close_and_update",
  356. "__init__",
  357. "__getattribute__",
  358. "__reduce__",
  359. "_make_local_details",
  360. "open",
  361. "cat",
  362. "cat_file",
  363. "cat_ranges",
  364. "get",
  365. "read_block",
  366. "tail",
  367. "head",
  368. "info",
  369. "ls",
  370. "exists",
  371. "isfile",
  372. "isdir",
  373. "_check_file",
  374. "_check_cache",
  375. "_mkcache",
  376. "clear_cache",
  377. "clear_expired_cache",
  378. "pop_from_cache",
  379. "local_file",
  380. "_paths_from_path",
  381. "get_mapper",
  382. "open_many",
  383. "commit_many",
  384. "hash_name",
  385. "__hash__",
  386. "__eq__",
  387. "to_json",
  388. "to_dict",
  389. "cache_size",
  390. "pipe_file",
  391. "pipe",
  392. "start_transaction",
  393. "end_transaction",
  394. }:
  395. # all the methods defined in this class. Note `open` here, since
  396. # it calls `_open`, but is actually in superclass
  397. return lambda *args, **kw: getattr(type(self), item).__get__(self)(
  398. *args, **kw
  399. )
  400. if item in ["__reduce_ex__"]:
  401. raise AttributeError
  402. if item in ["transaction"]:
  403. # property
  404. return type(self).transaction.__get__(self)
  405. if item in ["_cache", "transaction_type"]:
  406. # class attributes
  407. return getattr(type(self), item)
  408. if item == "__class__":
  409. return type(self)
  410. d = object.__getattribute__(self, "__dict__")
  411. fs = d.get("fs", None) # fs is not immediately defined
  412. if item in d:
  413. return d[item]
  414. elif fs is not None:
  415. if item in fs.__dict__:
  416. # attribute of instance
  417. return fs.__dict__[item]
  418. # attributed belonging to the target filesystem
  419. cls = type(fs)
  420. m = getattr(cls, item)
  421. if (inspect.isfunction(m) or inspect.isdatadescriptor(m)) and (
  422. not hasattr(m, "__self__") or m.__self__ is None
  423. ):
  424. # instance method
  425. return m.__get__(fs, cls)
  426. return m # class method or attribute
  427. else:
  428. # attributes of the superclass, while target is being set up
  429. return super().__getattribute__(item)
  430. def __eq__(self, other):
  431. """Test for equality."""
  432. if self is other:
  433. return True
  434. if not isinstance(other, type(self)):
  435. return False
  436. return (
  437. self.storage == other.storage
  438. and self.kwargs == other.kwargs
  439. and self.cache_check == other.cache_check
  440. and self.check_files == other.check_files
  441. and self.expiry == other.expiry
  442. and self.compression == other.compression
  443. and self._mapper == other._mapper
  444. and self.target_protocol == other.target_protocol
  445. )
  446. def __hash__(self):
  447. """Calculate hash."""
  448. return (
  449. hash(tuple(self.storage))
  450. ^ hash(str(self.kwargs))
  451. ^ hash(self.cache_check)
  452. ^ hash(self.check_files)
  453. ^ hash(self.expiry)
  454. ^ hash(self.compression)
  455. ^ hash(self._mapper)
  456. ^ hash(self.target_protocol)
  457. )
  458. class WholeFileCacheFileSystem(CachingFileSystem):
  459. """Caches whole remote files on first access
  460. This class is intended as a layer over any other file system, and
  461. will make a local copy of each file accessed, so that all subsequent
  462. reads are local. This is similar to ``CachingFileSystem``, but without
  463. the block-wise functionality and so can work even when sparse files
  464. are not allowed. See its docstring for definition of the init
  465. arguments.
  466. The class still needs access to the remote store for listing files,
  467. and may refresh cached files.
  468. """
  469. protocol = "filecache"
  470. local_file = True
  471. def open_many(self, open_files, **kwargs):
  472. paths = [of.path for of in open_files]
  473. if "r" in open_files.mode:
  474. self._mkcache()
  475. else:
  476. return [
  477. LocalTempFile(
  478. self.fs,
  479. path,
  480. mode=open_files.mode,
  481. fn=os.path.join(self.storage[-1], self._mapper(path)),
  482. **kwargs,
  483. )
  484. for path in paths
  485. ]
  486. if self.compression:
  487. raise NotImplementedError
  488. details = [self._check_file(sp) for sp in paths]
  489. downpath = [p for p, d in zip(paths, details) if not d]
  490. downfn0 = [
  491. os.path.join(self.storage[-1], self._mapper(p))
  492. for p, d in zip(paths, details)
  493. ] # keep these path names for opening later
  494. downfn = [fn for fn, d in zip(downfn0, details) if not d]
  495. if downpath:
  496. # skip if all files are already cached and up to date
  497. self.fs.get(downpath, downfn)
  498. # update metadata - only happens when downloads are successful
  499. newdetail = [
  500. {
  501. "original": path,
  502. "fn": self._mapper(path),
  503. "blocks": True,
  504. "time": time.time(),
  505. "uid": self.fs.ukey(path),
  506. }
  507. for path in downpath
  508. ]
  509. for path, detail in zip(downpath, newdetail):
  510. self._metadata.update_file(path, detail)
  511. self.save_cache()
  512. def firstpart(fn):
  513. # helper to adapt both whole-file and simple-cache
  514. return fn[1] if isinstance(fn, tuple) else fn
  515. return [
  516. open(firstpart(fn0) if fn0 else fn1, mode=open_files.mode)
  517. for fn0, fn1 in zip(details, downfn0)
  518. ]
  519. def commit_many(self, open_files):
  520. self.fs.put([f.fn for f in open_files], [f.path for f in open_files])
  521. [f.close() for f in open_files]
  522. for f in open_files:
  523. # in case autocommit is off, and so close did not already delete
  524. try:
  525. os.remove(f.name)
  526. except FileNotFoundError:
  527. pass
  528. self._cache_size = None
  529. def _make_local_details(self, path):
  530. hash = self._mapper(path)
  531. fn = os.path.join(self.storage[-1], hash)
  532. detail = {
  533. "original": path,
  534. "fn": hash,
  535. "blocks": True,
  536. "time": time.time(),
  537. "uid": self.fs.ukey(path),
  538. }
  539. self._metadata.update_file(path, detail)
  540. logger.debug("Copying %s to local cache", path)
  541. return fn
  542. def cat(
  543. self,
  544. path,
  545. recursive=False,
  546. on_error="raise",
  547. callback=DEFAULT_CALLBACK,
  548. **kwargs,
  549. ):
  550. paths = self.expand_path(
  551. path, recursive=recursive, maxdepth=kwargs.get("maxdepth")
  552. )
  553. getpaths = []
  554. storepaths = []
  555. fns = []
  556. out = {}
  557. for p in paths.copy():
  558. try:
  559. detail = self._check_file(p)
  560. if not detail:
  561. fn = self._make_local_details(p)
  562. getpaths.append(p)
  563. storepaths.append(fn)
  564. else:
  565. detail, fn = detail if isinstance(detail, tuple) else (None, detail)
  566. fns.append(fn)
  567. except Exception as e:
  568. if on_error == "raise":
  569. raise
  570. if on_error == "return":
  571. out[p] = e
  572. paths.remove(p)
  573. if getpaths:
  574. self.fs.get(getpaths, storepaths)
  575. self.save_cache()
  576. callback.set_size(len(paths))
  577. for p, fn in zip(paths, fns):
  578. with open(fn, "rb") as f:
  579. out[p] = f.read()
  580. callback.relative_update(1)
  581. if isinstance(path, str) and len(paths) == 1 and recursive is False:
  582. out = out[paths[0]]
  583. return out
  584. def _open(self, path, mode="rb", **kwargs):
  585. path = self._strip_protocol(path)
  586. if "r" not in mode:
  587. hash = self._mapper(path)
  588. fn = os.path.join(self.storage[-1], hash)
  589. user_specified_kwargs = {
  590. k: v
  591. for k, v in kwargs.items()
  592. # those kwargs were added by open(), we don't want them
  593. if k not in ["autocommit", "block_size", "cache_options"]
  594. }
  595. return LocalTempFile(self, path, mode=mode, fn=fn, **user_specified_kwargs)
  596. detail = self._check_file(path)
  597. if detail:
  598. detail, fn = detail
  599. _, blocks = detail["fn"], detail["blocks"]
  600. if blocks is True:
  601. logger.debug("Opening local copy of %s", path)
  602. # In order to support downstream filesystems to be able to
  603. # infer the compression from the original filename, like
  604. # the `TarFileSystem`, let's extend the `io.BufferedReader`
  605. # fileobject protocol by adding a dedicated attribute
  606. # `original`.
  607. f = open(fn, mode)
  608. f.original = detail.get("original")
  609. return f
  610. else:
  611. raise ValueError(
  612. f"Attempt to open partially cached file {path}"
  613. f" as a wholly cached file"
  614. )
  615. else:
  616. fn = self._make_local_details(path)
  617. kwargs["mode"] = mode
  618. # call target filesystems open
  619. self._mkcache()
  620. if self.compression:
  621. with self.fs._open(path, **kwargs) as f, open(fn, "wb") as f2:
  622. if isinstance(f, AbstractBufferedFile):
  623. # want no type of caching if just downloading whole thing
  624. f.cache = BaseCache(0, f.cache.fetcher, f.size)
  625. comp = (
  626. infer_compression(path)
  627. if self.compression == "infer"
  628. else self.compression
  629. )
  630. f = compr[comp](f, mode="rb")
  631. data = True
  632. while data:
  633. block = getattr(f, "blocksize", 5 * 2**20)
  634. data = f.read(block)
  635. f2.write(data)
  636. else:
  637. self.fs.get_file(path, fn)
  638. self.save_cache()
  639. return self._open(path, mode)
  640. class SimpleCacheFileSystem(WholeFileCacheFileSystem):
  641. """Caches whole remote files on first access
  642. This class is intended as a layer over any other file system, and
  643. will make a local copy of each file accessed, so that all subsequent
  644. reads are local. This implementation only copies whole files, and
  645. does not keep any metadata about the download time or file details.
  646. It is therefore safer to use in multi-threaded/concurrent situations.
  647. This is the only of the caching filesystems that supports write: you will
  648. be given a real local open file, and upon close and commit, it will be
  649. uploaded to the target filesystem; the writability or the target URL is
  650. not checked until that time.
  651. """
  652. protocol = "simplecache"
  653. local_file = True
  654. transaction_type = WriteCachedTransaction
  655. def __init__(self, **kwargs):
  656. kw = kwargs.copy()
  657. for key in ["cache_check", "expiry_time", "check_files"]:
  658. kw[key] = False
  659. super().__init__(**kw)
  660. for storage in self.storage:
  661. if not os.path.exists(storage):
  662. os.makedirs(storage, exist_ok=True)
  663. def _check_file(self, path):
  664. self._check_cache()
  665. sha = self._mapper(path)
  666. for storage in self.storage:
  667. fn = os.path.join(storage, sha)
  668. if os.path.exists(fn):
  669. return fn
  670. def save_cache(self):
  671. pass
  672. def load_cache(self):
  673. pass
  674. def pipe_file(self, path, value=None, **kwargs):
  675. if self._intrans:
  676. with self.open(path, "wb") as f:
  677. f.write(value)
  678. else:
  679. super().pipe_file(path, value)
  680. def ls(self, path, detail=True, **kwargs):
  681. path = self._strip_protocol(path)
  682. details = []
  683. try:
  684. details = self.fs.ls(
  685. path, detail=True, **kwargs
  686. ).copy() # don't edit original!
  687. except FileNotFoundError as e:
  688. ex = e
  689. else:
  690. ex = None
  691. if self._intrans:
  692. path1 = path.rstrip("/") + "/"
  693. for f in self.transaction.files:
  694. if f.path == path:
  695. details.append(
  696. {"name": path, "size": f.size or f.tell(), "type": "file"}
  697. )
  698. elif f.path.startswith(path1):
  699. if f.path.count("/") == path1.count("/"):
  700. details.append(
  701. {"name": f.path, "size": f.size or f.tell(), "type": "file"}
  702. )
  703. else:
  704. dname = "/".join(f.path.split("/")[: path1.count("/") + 1])
  705. details.append({"name": dname, "size": 0, "type": "directory"})
  706. if ex is not None and not details:
  707. raise ex
  708. if detail:
  709. return details
  710. return sorted(_["name"] for _ in details)
  711. def info(self, path, **kwargs):
  712. path = self._strip_protocol(path)
  713. if self._intrans:
  714. f = [_ for _ in self.transaction.files if _.path == path]
  715. if f:
  716. size = os.path.getsize(f[0].fn) if f[0].closed else f[0].tell()
  717. return {"name": path, "size": size, "type": "file"}
  718. f = any(_.path.startswith(path + "/") for _ in self.transaction.files)
  719. if f:
  720. return {"name": path, "size": 0, "type": "directory"}
  721. return self.fs.info(path, **kwargs)
  722. def pipe(self, path, value=None, **kwargs):
  723. if isinstance(path, str):
  724. self.pipe_file(self._strip_protocol(path), value, **kwargs)
  725. elif isinstance(path, dict):
  726. for k, v in path.items():
  727. self.pipe_file(self._strip_protocol(k), v, **kwargs)
  728. else:
  729. raise ValueError("path must be str or dict")
  730. def cat_ranges(
  731. self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
  732. ):
  733. lpaths = [self._check_file(p) for p in paths]
  734. rpaths = [p for l, p in zip(lpaths, paths) if l is False]
  735. lpaths = [l for l, p in zip(lpaths, paths) if l is False]
  736. self.fs.get(rpaths, lpaths)
  737. return super().cat_ranges(
  738. paths, starts, ends, max_gap=max_gap, on_error=on_error, **kwargs
  739. )
  740. def _open(self, path, mode="rb", **kwargs):
  741. path = self._strip_protocol(path)
  742. sha = self._mapper(path)
  743. if "r" not in mode:
  744. fn = os.path.join(self.storage[-1], sha)
  745. user_specified_kwargs = {
  746. k: v
  747. for k, v in kwargs.items()
  748. if k not in ["autocommit", "block_size", "cache_options"]
  749. } # those were added by open()
  750. return LocalTempFile(
  751. self,
  752. path,
  753. mode=mode,
  754. autocommit=not self._intrans,
  755. fn=fn,
  756. **user_specified_kwargs,
  757. )
  758. fn = self._check_file(path)
  759. if fn:
  760. return open(fn, mode)
  761. fn = os.path.join(self.storage[-1], sha)
  762. logger.debug("Copying %s to local cache", path)
  763. kwargs["mode"] = mode
  764. self._mkcache()
  765. self._cache_size = None
  766. if self.compression:
  767. with self.fs._open(path, **kwargs) as f, open(fn, "wb") as f2:
  768. if isinstance(f, AbstractBufferedFile):
  769. # want no type of caching if just downloading whole thing
  770. f.cache = BaseCache(0, f.cache.fetcher, f.size)
  771. comp = (
  772. infer_compression(path)
  773. if self.compression == "infer"
  774. else self.compression
  775. )
  776. f = compr[comp](f, mode="rb")
  777. data = True
  778. while data:
  779. block = getattr(f, "blocksize", 5 * 2**20)
  780. data = f.read(block)
  781. f2.write(data)
  782. else:
  783. self.fs.get_file(path, fn)
  784. return self._open(path, mode)
  785. class LocalTempFile:
  786. """A temporary local file, which will be uploaded on commit"""
  787. def __init__(self, fs, path, fn, mode="wb", autocommit=True, seek=0, **kwargs):
  788. self.fn = fn
  789. self.fh = open(fn, mode)
  790. self.mode = mode
  791. if seek:
  792. self.fh.seek(seek)
  793. self.path = path
  794. self.size = None
  795. self.fs = fs
  796. self.closed = False
  797. self.autocommit = autocommit
  798. self.kwargs = kwargs
  799. def __reduce__(self):
  800. # always open in r+b to allow continuing writing at a location
  801. return (
  802. LocalTempFile,
  803. (self.fs, self.path, self.fn, "r+b", self.autocommit, self.tell()),
  804. )
  805. def __enter__(self):
  806. return self.fh
  807. def __exit__(self, exc_type, exc_val, exc_tb):
  808. self.close()
  809. def close(self):
  810. # self.size = self.fh.tell()
  811. if self.closed:
  812. return
  813. self.fh.close()
  814. self.closed = True
  815. if self.autocommit:
  816. self.commit()
  817. def discard(self):
  818. self.fh.close()
  819. os.remove(self.fn)
  820. def commit(self):
  821. self.fs.put(self.fn, self.path, **self.kwargs)
  822. # we do not delete local copy - it's still in the cache
  823. @property
  824. def name(self):
  825. return self.fn
  826. def __repr__(self) -> str:
  827. return f"LocalTempFile: {self.path}"
  828. def __getattr__(self, item):
  829. return getattr(self.fh, item)