  1. """This file is largely copied from http.py"""
  2. import io
  3. import logging
  4. import re
  5. import urllib.error
  6. import urllib.parse
  7. from copy import copy
  8. from json import dumps, loads
  9. from urllib.parse import urlparse
  10. try:
  11. import yarl
  12. except (ImportError, ModuleNotFoundError, OSError):
  13. yarl = False
  14. from fsspec.callbacks import _DEFAULT_CALLBACK
  15. from fsspec.registry import register_implementation
  16. from fsspec.spec import AbstractBufferedFile, AbstractFileSystem
  17. from fsspec.utils import DEFAULT_BLOCK_SIZE, isfilelike, nullcontext, tokenize
  18. from ..caching import AllBytes
# Regex to extract the href target from HTML <a> anchor tags.
# https://stackoverflow.com/a/15926317/3821154
ex = re.compile(r"""<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P<url>[^"']+)""")
# Regex to extract bare absolute http(s) URLs appearing anywhere in page text.
ex2 = re.compile(r"""(?P<url>http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)""")
logger = logging.getLogger("fsspec.http")
  23. class JsHttpException(urllib.error.HTTPError): ...
  24. class StreamIO(io.BytesIO):
  25. # fake class, so you can set attributes on it
  26. # will eventually actually stream
  27. ...
  28. class ResponseProxy:
  29. """Looks like a requests response"""
  30. def __init__(self, req, stream=False):
  31. self.request = req
  32. self.stream = stream
  33. self._data = None
  34. self._headers = None
  35. @property
  36. def raw(self):
  37. if self._data is None:
  38. b = self.request.response.to_bytes()
  39. if self.stream:
  40. self._data = StreamIO(b)
  41. else:
  42. self._data = b
  43. return self._data
  44. def close(self):
  45. if hasattr(self, "_data"):
  46. del self._data
  47. @property
  48. def headers(self):
  49. if self._headers is None:
  50. self._headers = dict(
  51. [
  52. _.split(": ")
  53. for _ in self.request.getAllResponseHeaders().strip().split("\r\n")
  54. ]
  55. )
  56. return self._headers
  57. @property
  58. def status_code(self):
  59. return int(self.request.status)
  60. def raise_for_status(self):
  61. if not self.ok:
  62. raise JsHttpException(
  63. self.url, self.status_code, self.reason, self.headers, None
  64. )
  65. def iter_content(self, chunksize, *_, **__):
  66. while True:
  67. out = self.raw.read(chunksize)
  68. if out:
  69. yield out
  70. else:
  71. break
  72. @property
  73. def reason(self):
  74. return self.request.statusText
  75. @property
  76. def ok(self):
  77. return self.status_code < 400
  78. @property
  79. def url(self):
  80. return self.request.response.responseURL
  81. @property
  82. def text(self):
  83. # TODO: encoding from headers
  84. return self.content.decode()
  85. @property
  86. def content(self):
  87. self.stream = False
  88. return self.raw
  89. @property
  90. def json(self):
  91. return loads(self.text)
  92. class RequestsSessionShim:
  93. def __init__(self):
  94. self.headers = {}
  95. def request(
  96. self,
  97. method,
  98. url,
  99. params=None,
  100. data=None,
  101. headers=None,
  102. cookies=None,
  103. files=None,
  104. auth=None,
  105. timeout=None,
  106. allow_redirects=None,
  107. proxies=None,
  108. hooks=None,
  109. stream=None,
  110. verify=None,
  111. cert=None,
  112. json=None,
  113. ):
  114. from js import Blob, XMLHttpRequest
  115. logger.debug("JS request: %s %s", method, url)
  116. if cert or verify or proxies or files or cookies or hooks:
  117. raise NotImplementedError
  118. if data and json:
  119. raise ValueError("Use json= or data=, not both")
  120. req = XMLHttpRequest.new()
  121. extra = auth if auth else ()
  122. if params:
  123. url = f"{url}?{urllib.parse.urlencode(params)}"
  124. req.open(method, url, False, *extra)
  125. if timeout:
  126. req.timeout = timeout
  127. if headers:
  128. for k, v in headers.items():
  129. req.setRequestHeader(k, v)
  130. req.setRequestHeader("Accept", "application/octet-stream")
  131. req.responseType = "arraybuffer"
  132. if json:
  133. blob = Blob.new([dumps(data)], {type: "application/json"})
  134. req.send(blob)
  135. elif data:
  136. if isinstance(data, io.IOBase):
  137. data = data.read()
  138. blob = Blob.new([data], {type: "application/octet-stream"})
  139. req.send(blob)
  140. else:
  141. req.send(None)
  142. return ResponseProxy(req, stream=stream)
  143. def get(self, url, **kwargs):
  144. return self.request("GET", url, **kwargs)
  145. def head(self, url, **kwargs):
  146. return self.request("HEAD", url, **kwargs)
  147. def post(self, url, **kwargs):
  148. return self.request("POST}", url, **kwargs)
  149. def put(self, url, **kwargs):
  150. return self.request("PUT", url, **kwargs)
  151. def patch(self, url, **kwargs):
  152. return self.request("PATCH", url, **kwargs)
  153. def delete(self, url, **kwargs):
  154. return self.request("DELETE", url, **kwargs)
  155. class HTTPFileSystem(AbstractFileSystem):
  156. """
  157. Simple File-System for fetching data via HTTP(S)
  158. This is the BLOCKING version of the normal HTTPFileSystem. It uses
  159. requests in normal python and the JS runtime in pyodide.
  160. ***This implementation is extremely experimental, do not use unless
  161. you are testing pyodide/pyscript integration***
  162. """
  163. protocol = ("http", "https", "sync_http", "sync_https")
  164. sep = "/"
  165. def __init__(
  166. self,
  167. simple_links=True,
  168. block_size=None,
  169. same_scheme=True,
  170. cache_type="readahead",
  171. cache_options=None,
  172. client_kwargs=None,
  173. encoded=False,
  174. **storage_options,
  175. ):
  176. """
  177. Parameters
  178. ----------
  179. block_size: int
  180. Blocks to read bytes; if 0, will default to raw requests file-like
  181. objects instead of HTTPFile instances
  182. simple_links: bool
  183. If True, will consider both HTML <a> tags and anything that looks
  184. like a URL; if False, will consider only the former.
  185. same_scheme: True
  186. When doing ls/glob, if this is True, only consider paths that have
  187. http/https matching the input URLs.
  188. size_policy: this argument is deprecated
  189. client_kwargs: dict
  190. Passed to aiohttp.ClientSession, see
  191. https://docs.aiohttp.org/en/stable/client_reference.html
  192. For example, ``{'auth': aiohttp.BasicAuth('user', 'pass')}``
  193. storage_options: key-value
  194. Any other parameters passed on to requests
  195. cache_type, cache_options: defaults used in open
  196. """
  197. super().__init__(self, **storage_options)
  198. self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
  199. self.simple_links = simple_links
  200. self.same_schema = same_scheme
  201. self.cache_type = cache_type
  202. self.cache_options = cache_options
  203. self.client_kwargs = client_kwargs or {}
  204. self.encoded = encoded
  205. self.kwargs = storage_options
  206. try:
  207. import js # noqa: F401
  208. logger.debug("Starting JS session")
  209. self.session = RequestsSessionShim()
  210. self.js = True
  211. except Exception as e:
  212. import requests
  213. logger.debug("Starting cpython session because of: %s", e)
  214. self.session = requests.Session(**(client_kwargs or {}))
  215. self.js = False
  216. request_options = copy(storage_options)
  217. self.use_listings_cache = request_options.pop("use_listings_cache", False)
  218. request_options.pop("listings_expiry_time", None)
  219. request_options.pop("max_paths", None)
  220. request_options.pop("skip_instance_cache", None)
  221. self.kwargs = request_options
  222. @property
  223. def fsid(self):
  224. return "http_sync"
  225. def encode_url(self, url):
  226. if yarl:
  227. return yarl.URL(url, encoded=self.encoded)
  228. return url
  229. @classmethod
  230. def _strip_protocol(cls, path: str) -> str:
  231. """For HTTP, we always want to keep the full URL"""
  232. path = path.replace("http_sync://", "http://").replace(
  233. "https_sync://", "https://"
  234. )
  235. return path
  236. @classmethod
  237. def _parent(cls, path):
  238. # override, since _strip_protocol is different for URLs
  239. par = super()._parent(path)
  240. if len(par) > 7: # "http://..."
  241. return par
  242. return ""
  243. def _ls_real(self, url, detail=True, **kwargs):
  244. # ignoring URL-encoded arguments
  245. kw = self.kwargs.copy()
  246. kw.update(kwargs)
  247. logger.debug(url)
  248. r = self.session.get(self.encode_url(url), **self.kwargs)
  249. self._raise_not_found_for_status(r, url)
  250. text = r.text
  251. if self.simple_links:
  252. links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
  253. else:
  254. links = [u[2] for u in ex.findall(text)]
  255. out = set()
  256. parts = urlparse(url)
  257. for l in links:
  258. if isinstance(l, tuple):
  259. l = l[1]
  260. if l.startswith("/") and len(l) > 1:
  261. # absolute URL on this server
  262. l = parts.scheme + "://" + parts.netloc + l
  263. if l.startswith("http"):
  264. if self.same_schema and l.startswith(url.rstrip("/") + "/"):
  265. out.add(l)
  266. elif l.replace("https", "http").startswith(
  267. url.replace("https", "http").rstrip("/") + "/"
  268. ):
  269. # allowed to cross http <-> https
  270. out.add(l)
  271. else:
  272. if l not in ["..", "../"]:
  273. # Ignore FTP-like "parent"
  274. out.add("/".join([url.rstrip("/"), l.lstrip("/")]))
  275. if not out and url.endswith("/"):
  276. out = self._ls_real(url.rstrip("/"), detail=False)
  277. if detail:
  278. return [
  279. {
  280. "name": u,
  281. "size": None,
  282. "type": "directory" if u.endswith("/") else "file",
  283. }
  284. for u in out
  285. ]
  286. else:
  287. return sorted(out)
  288. def ls(self, url, detail=True, **kwargs):
  289. if self.use_listings_cache and url in self.dircache:
  290. out = self.dircache[url]
  291. else:
  292. out = self._ls_real(url, detail=detail, **kwargs)
  293. self.dircache[url] = out
  294. return out
  295. def _raise_not_found_for_status(self, response, url):
  296. """
  297. Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
  298. """
  299. if response.status_code == 404:
  300. raise FileNotFoundError(url)
  301. response.raise_for_status()
  302. def cat_file(self, url, start=None, end=None, **kwargs):
  303. kw = self.kwargs.copy()
  304. kw.update(kwargs)
  305. logger.debug(url)
  306. if start is not None or end is not None:
  307. if start == end:
  308. return b""
  309. headers = kw.pop("headers", {}).copy()
  310. headers["Range"] = self._process_limits(url, start, end)
  311. kw["headers"] = headers
  312. r = self.session.get(self.encode_url(url), **kw)
  313. self._raise_not_found_for_status(r, url)
  314. return r.content
  315. def get_file(
  316. self, rpath, lpath, chunk_size=5 * 2**20, callback=_DEFAULT_CALLBACK, **kwargs
  317. ):
  318. kw = self.kwargs.copy()
  319. kw.update(kwargs)
  320. logger.debug(rpath)
  321. r = self.session.get(self.encode_url(rpath), **kw)
  322. try:
  323. size = int(
  324. r.headers.get("content-length", None)
  325. or r.headers.get("Content-Length", None)
  326. )
  327. except (ValueError, KeyError, TypeError):
  328. size = None
  329. callback.set_size(size)
  330. self._raise_not_found_for_status(r, rpath)
  331. if not isfilelike(lpath):
  332. lpath = open(lpath, "wb")
  333. for chunk in r.iter_content(chunk_size, decode_unicode=False):
  334. lpath.write(chunk)
  335. callback.relative_update(len(chunk))
  336. def put_file(
  337. self,
  338. lpath,
  339. rpath,
  340. chunk_size=5 * 2**20,
  341. callback=_DEFAULT_CALLBACK,
  342. method="post",
  343. **kwargs,
  344. ):
  345. def gen_chunks():
  346. # Support passing arbitrary file-like objects
  347. # and use them instead of streams.
  348. if isinstance(lpath, io.IOBase):
  349. context = nullcontext(lpath)
  350. use_seek = False # might not support seeking
  351. else:
  352. context = open(lpath, "rb")
  353. use_seek = True
  354. with context as f:
  355. if use_seek:
  356. callback.set_size(f.seek(0, 2))
  357. f.seek(0)
  358. else:
  359. callback.set_size(getattr(f, "size", None))
  360. chunk = f.read(chunk_size)
  361. while chunk:
  362. yield chunk
  363. callback.relative_update(len(chunk))
  364. chunk = f.read(chunk_size)
  365. kw = self.kwargs.copy()
  366. kw.update(kwargs)
  367. method = method.lower()
  368. if method not in ("post", "put"):
  369. raise ValueError(
  370. f"method has to be either 'post' or 'put', not: {method!r}"
  371. )
  372. meth = getattr(self.session, method)
  373. resp = meth(rpath, data=gen_chunks(), **kw)
  374. self._raise_not_found_for_status(resp, rpath)
  375. def _process_limits(self, url, start, end):
  376. """Helper for "Range"-based _cat_file"""
  377. size = None
  378. suff = False
  379. if start is not None and start < 0:
  380. # if start is negative and end None, end is the "suffix length"
  381. if end is None:
  382. end = -start
  383. start = ""
  384. suff = True
  385. else:
  386. size = size or self.info(url)["size"]
  387. start = size + start
  388. elif start is None:
  389. start = 0
  390. if not suff:
  391. if end is not None and end < 0:
  392. if start is not None:
  393. size = size or self.info(url)["size"]
  394. end = size + end
  395. elif end is None:
  396. end = ""
  397. if isinstance(end, int):
  398. end -= 1 # bytes range is inclusive
  399. return f"bytes={start}-{end}"
  400. def exists(self, path, **kwargs):
  401. kw = self.kwargs.copy()
  402. kw.update(kwargs)
  403. try:
  404. logger.debug(path)
  405. r = self.session.get(self.encode_url(path), **kw)
  406. return r.status_code < 400
  407. except Exception:
  408. return False
  409. def isfile(self, path, **kwargs):
  410. return self.exists(path, **kwargs)
  411. def _open(
  412. self,
  413. path,
  414. mode="rb",
  415. block_size=None,
  416. autocommit=None, # XXX: This differs from the base class.
  417. cache_type=None,
  418. cache_options=None,
  419. size=None,
  420. **kwargs,
  421. ):
  422. """Make a file-like object
  423. Parameters
  424. ----------
  425. path: str
  426. Full URL with protocol
  427. mode: string
  428. must be "rb"
  429. block_size: int or None
  430. Bytes to download in one request; use instance value if None. If
  431. zero, will return a streaming Requests file-like instance.
  432. kwargs: key-value
  433. Any other parameters, passed to requests calls
  434. """
  435. if mode != "rb":
  436. raise NotImplementedError
  437. block_size = block_size if block_size is not None else self.block_size
  438. kw = self.kwargs.copy()
  439. kw.update(kwargs)
  440. size = size or self.info(path, **kwargs)["size"]
  441. if block_size and size:
  442. return HTTPFile(
  443. self,
  444. path,
  445. session=self.session,
  446. block_size=block_size,
  447. mode=mode,
  448. size=size,
  449. cache_type=cache_type or self.cache_type,
  450. cache_options=cache_options or self.cache_options,
  451. **kw,
  452. )
  453. else:
  454. return HTTPStreamFile(
  455. self,
  456. path,
  457. mode=mode,
  458. session=self.session,
  459. **kw,
  460. )
  461. def ukey(self, url):
  462. """Unique identifier; assume HTTP files are static, unchanging"""
  463. return tokenize(url, self.kwargs, self.protocol)
  464. def info(self, url, **kwargs):
  465. """Get info of URL
  466. Tries to access location via HEAD, and then GET methods, but does
  467. not fetch the data.
  468. It is possible that the server does not supply any size information, in
  469. which case size will be given as None (and certain operations on the
  470. corresponding file will not work).
  471. """
  472. info = {}
  473. for policy in ["head", "get"]:
  474. try:
  475. info.update(
  476. _file_info(
  477. self.encode_url(url),
  478. size_policy=policy,
  479. session=self.session,
  480. **self.kwargs,
  481. **kwargs,
  482. )
  483. )
  484. if info.get("size") is not None:
  485. break
  486. except Exception as exc:
  487. if policy == "get":
  488. # If get failed, then raise a FileNotFoundError
  489. raise FileNotFoundError(url) from exc
  490. logger.debug(str(exc))
  491. return {"name": url, "size": None, **info, "type": "file"}
  492. def glob(self, path, maxdepth=None, **kwargs):
  493. """
  494. Find files by glob-matching.
  495. This implementation is idntical to the one in AbstractFileSystem,
  496. but "?" is not considered as a character for globbing, because it is
  497. so common in URLs, often identifying the "query" part.
  498. """
  499. import re
  500. ends = path.endswith("/")
  501. path = self._strip_protocol(path)
  502. indstar = path.find("*") if path.find("*") >= 0 else len(path)
  503. indbrace = path.find("[") if path.find("[") >= 0 else len(path)
  504. ind = min(indstar, indbrace)
  505. detail = kwargs.pop("detail", False)
  506. if not has_magic(path):
  507. root = path
  508. depth = 1
  509. if ends:
  510. path += "/*"
  511. elif self.exists(path):
  512. if not detail:
  513. return [path]
  514. else:
  515. return {path: self.info(path)}
  516. else:
  517. if not detail:
  518. return [] # glob of non-existent returns empty
  519. else:
  520. return {}
  521. elif "/" in path[:ind]:
  522. ind2 = path[:ind].rindex("/")
  523. root = path[: ind2 + 1]
  524. depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1
  525. else:
  526. root = ""
  527. depth = None if "**" in path else path[ind + 1 :].count("/") + 1
  528. allpaths = self.find(
  529. root, maxdepth=maxdepth or depth, withdirs=True, detail=True, **kwargs
  530. )
  531. # Escape characters special to python regex, leaving our supported
  532. # special characters in place.
  533. # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html
  534. # for shell globbing details.
  535. pattern = (
  536. "^"
  537. + (
  538. path.replace("\\", r"\\")
  539. .replace(".", r"\.")
  540. .replace("+", r"\+")
  541. .replace("//", "/")
  542. .replace("(", r"\(")
  543. .replace(")", r"\)")
  544. .replace("|", r"\|")
  545. .replace("^", r"\^")
  546. .replace("$", r"\$")
  547. .replace("{", r"\{")
  548. .replace("}", r"\}")
  549. .rstrip("/")
  550. )
  551. + "$"
  552. )
  553. pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern)
  554. pattern = re.sub("[*]", "[^/]*", pattern)
  555. pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*"))
  556. out = {
  557. p: allpaths[p]
  558. for p in sorted(allpaths)
  559. if pattern.match(p.replace("//", "/").rstrip("/"))
  560. }
  561. if detail:
  562. return out
  563. else:
  564. return list(out)
  565. def isdir(self, path):
  566. # override, since all URLs are (also) files
  567. try:
  568. return bool(self.ls(path))
  569. except (FileNotFoundError, ValueError):
  570. return False
class HTTPFile(AbstractBufferedFile):
    """
    A file-like object pointing to a remote HTTP(S) resource

    Supports only reading, with read-ahead of a predetermined block-size.
    In the case that the server does not supply the filesize, only reading of
    the complete file in one go is supported.

    Parameters
    ----------
    url: str
        Full URL of the remote resource, including the protocol
    session: requests.Session or None
        All calls will be made within this session, to avoid restarting
        connections where the server allows this
    block_size: int or None
        The amount of read-ahead to do, in bytes. Default is 5MB, or the value
        configured for the FileSystem creating this file
    size: None or int
        If given, this is the size of the file in bytes, and we don't attempt
        to call the server to find the value.
    kwargs: all other key-values are passed to requests calls.
    """

    def __init__(
        self,
        fs,
        url,
        session=None,
        block_size=None,
        mode="rb",
        cache_type="bytes",
        cache_options=None,
        size=None,
        **kwargs,
    ):
        if mode != "rb":
            raise NotImplementedError("File mode not supported")
        self.url = url
        self.session = session
        # pre-seed ``details`` so the base class doesn't probe the server again
        self.details = {"name": url, "size": size, "type": "file"}
        super().__init__(
            fs=fs,
            path=url,
            mode=mode,
            block_size=block_size,
            cache_type=cache_type,
            cache_options=cache_options,
            **kwargs,
        )

    def read(self, length=-1):
        """Read bytes from file

        Parameters
        ----------
        length: int
            Read up to this many bytes. If negative, read all content to end of
            file. If the server has not supplied the filesize, attempting to
            read only part of the data will raise a ValueError.
        """
        if (
            (length < 0 and self.loc == 0)  # explicit read all
            # but not when the size is known and fits into a block anyways
            and not (self.size is not None and self.size <= self.blocksize)
        ):
            self._fetch_all()
        if self.size is None:
            if length < 0:
                self._fetch_all()
        else:
            # clamp to the known remaining bytes
            length = min(self.size - self.loc, length)
        return super().read(length)

    def _fetch_all(self):
        """Read whole file in one shot, without caching

        This is only called when position is still at zero,
        and read() is called without a byte-count.
        """
        logger.debug(f"Fetch all for {self}")
        if not isinstance(self.cache, AllBytes):
            r = self.session.get(self.fs.encode_url(self.url), **self.kwargs)
            r.raise_for_status()
            out = r.content
            # replace the cache wholesale with the full payload
            self.cache = AllBytes(size=len(out), fetcher=None, blocksize=None, data=out)
            self.size = len(out)

    def _parse_content_range(self, headers):
        """Parse the Content-Range header; return (start, end, total)."""
        s = headers.get("Content-Range", "")
        m = re.match(r"bytes (\d+-\d+|\*)/(\d+|\*)", s)
        if not m:
            return None, None, None

        if m[1] == "*":
            start = end = None
        else:
            start, end = [int(x) for x in m[1].split("-")]
        total = None if m[2] == "*" else int(m[2])
        return start, end, total

    def _fetch_range(self, start, end):
        """Download a block of data

        The expectation is that the server returns only the requested bytes,
        with HTTP code 206. If this is not the case, we first check the headers,
        and then stream the output - if the data size is bigger than we
        requested, an exception is raised.
        """
        logger.debug(f"Fetch range for {self}: {start}-{end}")
        kwargs = self.kwargs.copy()
        headers = kwargs.pop("headers", {}).copy()
        # HTTP Range is inclusive of the last byte, hence end - 1
        headers["Range"] = f"bytes={start}-{end - 1}"
        logger.debug("%s : %s", self.url, headers["Range"])
        r = self.session.get(self.fs.encode_url(self.url), headers=headers, **kwargs)
        if r.status_code == 416:
            # range request outside file
            return b""
        r.raise_for_status()

        # If the server has handled the range request, it should reply
        # with status 206 (partial content). But we'll guess that a suitable
        # Content-Range header or a Content-Length no more than the
        # requested range also mean we have got the desired range.
        cl = r.headers.get("Content-Length", r.headers.get("content-length", end + 1))
        response_is_range = (
            r.status_code == 206
            or self._parse_content_range(r.headers)[0] == start
            or int(cl) <= end - start
        )

        if response_is_range:
            # partial content, as expected
            out = r.content
        elif start > 0:
            raise ValueError(
                "The HTTP server doesn't appear to support range requests. "
                "Only reading this file from the beginning is supported. "
                "Open with block_size=0 for a streaming file interface."
            )
        else:
            # Response is not a range, but we want the start of the file,
            # so we can read the required amount anyway.
            cl = 0
            out = []
            for chunk in r.iter_content(2**20, False):
                out.append(chunk)
                cl += len(chunk)
            out = b"".join(out)[: end - start]
        return out
  709. magic_check = re.compile("([*[])")
  710. def has_magic(s):
  711. match = magic_check.search(s)
  712. return match is not None
class HTTPStreamFile(AbstractBufferedFile):
    """Streaming, non-seekable file-like object for HTTP resources.

    Used when the size is unknown or ``block_size=0``; bytes are consumed
    strictly forward from a single streaming GET.
    """

    def __init__(self, fs, url, mode="rb", session=None, **kwargs):
        self.url = url
        self.session = session
        if mode != "rb":
            raise ValueError
        # size is unknown up-front for a streaming file
        self.details = {"name": url, "size": None}
        super().__init__(fs=fs, path=url, mode=mode, cache_type="readahead", **kwargs)
        # open the connection eagerly; data is pulled through the iterator
        r = self.session.get(self.fs.encode_url(url), stream=True, **kwargs)
        self.fs._raise_not_found_for_status(r, url)

        self.it = r.iter_content(1024, False)
        self.leftover = b""

        self.r = r

    def seek(self, *args, **kwargs):
        raise ValueError("Cannot seek streaming HTTP file")

    def read(self, num=-1):
        # accumulate chunks until we have at least ``num`` bytes,
        # or everything when num < 0
        bufs = [self.leftover]
        leng = len(self.leftover)
        while leng < num or num < 0:
            try:
                out = self.it.__next__()
            except StopIteration:
                break
            if out:
                bufs.append(out)
            else:
                break
            leng += len(out)
        out = b"".join(bufs)
        if num >= 0:
            # stash any excess bytes for the next read
            self.leftover = out[num:]
            out = out[:num]
        else:
            self.leftover = b""
        self.loc += len(out)
        return out

    def close(self):
        self.r.close()
        self.closed = True
  752. def get_range(session, url, start, end, **kwargs):
  753. # explicit get a range when we know it must be safe
  754. kwargs = kwargs.copy()
  755. headers = kwargs.pop("headers", {}).copy()
  756. headers["Range"] = f"bytes={start}-{end - 1}"
  757. r = session.get(url, headers=headers, **kwargs)
  758. r.raise_for_status()
  759. return r.content
  760. def _file_info(url, session, size_policy="head", **kwargs):
  761. """Call HEAD on the server to get details about the file (size/checksum etc.)
  762. Default operation is to explicitly allow redirects and use encoding
  763. 'identity' (no compression) to get the true size of the target.
  764. """
  765. logger.debug("Retrieve file size for %s", url)
  766. kwargs = kwargs.copy()
  767. ar = kwargs.pop("allow_redirects", True)
  768. head = kwargs.get("headers", {}).copy()
  769. # TODO: not allowed in JS
  770. # head["Accept-Encoding"] = "identity"
  771. kwargs["headers"] = head
  772. info = {}
  773. if size_policy == "head":
  774. r = session.head(url, allow_redirects=ar, **kwargs)
  775. elif size_policy == "get":
  776. r = session.get(url, allow_redirects=ar, **kwargs)
  777. else:
  778. raise TypeError(f'size_policy must be "head" or "get", got {size_policy}')
  779. r.raise_for_status()
  780. # TODO:
  781. # recognise lack of 'Accept-Ranges',
  782. # or 'Accept-Ranges': 'none' (not 'bytes')
  783. # to mean streaming only, no random access => return None
  784. if "Content-Length" in r.headers:
  785. info["size"] = int(r.headers["Content-Length"])
  786. elif "Content-Range" in r.headers:
  787. info["size"] = int(r.headers["Content-Range"].split("/")[1])
  788. elif "content-length" in r.headers:
  789. info["size"] = int(r.headers["content-length"])
  790. elif "content-range" in r.headers:
  791. info["size"] = int(r.headers["content-range"].split("/")[1])
  792. for checksum_field in ["ETag", "Content-MD5", "Digest"]:
  793. if r.headers.get(checksum_field):
  794. info[checksum_field] = r.headers[checksum_field]
  795. return info
  796. # importing this is enough to register it
  797. def register():
  798. register_implementation("http", HTTPFileSystem, clobber=True)
  799. register_implementation("https", HTTPFileSystem, clobber=True)
  800. register_implementation("sync_http", HTTPFileSystem, clobber=True)
  801. register_implementation("sync_https", HTTPFileSystem, clobber=True)
  802. register()
  803. def unregister():
  804. from fsspec.implementations.http import HTTPFileSystem
  805. register_implementation("http", HTTPFileSystem, clobber=True)
  806. register_implementation("https", HTTPFileSystem, clobber=True)