LiveQueryClient.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. /**
  2. * Copyright (c) 2015-present, Parse, LLC.
  3. * All rights reserved.
  4. *
  5. * This source code is licensed under the BSD-style license found in the
  6. * LICENSE file in the root directory of this source tree. An additional grant
  7. * of patent rights can be found in the PATENTS file in the same directory.
  8. *
  9. */
  10. /* global WebSocket */
  11. import CoreManager from './CoreManager';
  12. import EventEmitter from './EventEmitter';
  13. import ParseObject from './ParseObject';
  14. import LiveQuerySubscription from './LiveQuerySubscription';
  15. import { resolvingPromise } from './promiseUtils'; // The LiveQuery client inner state
  /**
   * Internal connection states of the LiveQuery client.
   * Frozen so the shared lookup table cannot be mutated by accident.
   */
  const CLIENT_STATE = Object.freeze({
    INITIALIZED: 'initialized',
    CONNECTING: 'connecting',
    CONNECTED: 'connected',
    CLOSED: 'closed',
    RECONNECTING: 'reconnecting',
    DISCONNECTED: 'disconnected'
  });

  /**
   * Operation types the LiveQuery client sends to the server.
   */
  const OP_TYPES = Object.freeze({
    CONNECT: 'connect',
    SUBSCRIBE: 'subscribe',
    UNSUBSCRIBE: 'unsubscribe',
    ERROR: 'error'
  });

  /**
   * Operation names received back from the LiveQuery server.
   */
  const OP_EVENTS = Object.freeze({
    CONNECTED: 'connected',
    SUBSCRIBED: 'subscribed',
    UNSUBSCRIBED: 'unsubscribed',
    ERROR: 'error',
    CREATE: 'create',
    UPDATE: 'update',
    ENTER: 'enter',
    LEAVE: 'leave',
    DELETE: 'delete'
  });

  /**
   * Events emitted by the LiveQuery client itself.
   */
  const CLIENT_EMMITER_TYPES = Object.freeze({
    CLOSE: 'close',
    ERROR: 'error',
    OPEN: 'open'
  });

  /**
   * Events emitted by a LiveQuery subscription.
   */
  const SUBSCRIPTION_EMMITER_TYPES = Object.freeze({
    OPEN: 'open',
    CLOSE: 'close',
    ERROR: 'error',
    CREATE: 'create',
    UPDATE: 'update',
    ENTER: 'enter',
    LEAVE: 'leave',
    DELETE: 'delete'
  });
  /**
   * Computes a randomized delay for reconnect attempt `k`.
   *
   * The upper bound grows as 2^k - 1 seconds and is capped at 30 seconds;
   * the result is a uniformly random fraction of that bound, in milliseconds.
   *
   * @param {number} k - the attempt counter
   * @return {number} delay in milliseconds
   */
  const generateInterval = k => {
    // Draw the random factor first, exactly as the single-expression form did.
    const randomFactor = Math.random();
    const ceilingSeconds = Math.min(30, Math.pow(2, k) - 1);
    const delaySeconds = randomFactor * ceilingSeconds;
    return delaySeconds * 1000;
  };
  /**
   * Creates a new LiveQueryClient.
   * Extends EventEmitter (see the
   * <a href="https://nodejs.org/api/events.html#events_class_eventemitter">Node.js EventEmitter documentation</a>).
   *
   * A wrapper of a standard WebSocket client. We add several useful methods to
   * help you connect/disconnect to LiveQueryServer, subscribe/unsubscribe a ParseQuery easily.
   *
   * javascriptKey and masterKey are used for verifying the LiveQueryClient when it tries
   * to connect to the LiveQuery server
   *
   * We expose three events to help you monitor the status of the LiveQueryClient.
   *
   * <pre>
   * let Parse = require('parse/node');
   * let LiveQueryClient = Parse.LiveQueryClient;
   * let client = new LiveQueryClient({
   *   applicationId: '',
   *   serverURL: '',
   *   javascriptKey: '',
   *   masterKey: ''
   * });
   * </pre>
   *
   * Open - When we establish the WebSocket connection to the LiveQuery server, you'll get this event.
   * <pre>
   * client.on('open', () => {
   *
   * });</pre>
   *
   * Close - When we lose the WebSocket connection to the LiveQuery server, you'll get this event.
   * <pre>
   * client.on('close', () => {
   *
   * });</pre>
   *
   * Error - When some network error or LiveQuery server error happens, you'll get this event.
   * <pre>
   * client.on('error', (error) => {
   *
   * });</pre>
   * @alias Parse.LiveQueryClient
   */
  class LiveQueryClient extends EventEmitter {
    /*:: attempts: number;*/
    /*:: id: number;*/
    /*:: requestId: number;*/
    /*:: applicationId: string;*/
    /*:: serverURL: string;*/
    /*:: javascriptKey: ?string;*/
    /*:: masterKey: ?string;*/
    /*:: sessionToken: ?string;*/
    /*:: connectPromise: Promise;*/
    /*:: subscriptions: Map;*/
    /*:: socket: any;*/
    /*:: state: string;*/
    /*:: reconnectHandle: any;*/

    /**
     * @param {Object} options
     * @param {string} options.applicationId - applicationId of your Parse app
     * @param {string} options.serverURL - <b>the URL of your LiveQuery server</b>
     * @param {string} options.javascriptKey (optional)
     * @param {string} options.masterKey (optional) Your Parse Master Key. (Node.js only!)
     * @param {string} options.sessionToken (optional)
     * @throws {Error} when serverURL is missing or does not start with 'ws'
     */
    constructor({
      applicationId,
      serverURL,
      javascriptKey,
      masterKey,
      sessionToken
    }) {
      super();
      if (!serverURL || serverURL.indexOf('ws') !== 0) {
        throw new Error('You need to set a proper Parse LiveQuery server url before using LiveQueryClient');
      }
      // Handle of the pending reconnect timer, or null when none is scheduled.
      this.reconnectHandle = null;
      this.attempts = 1;
      this.id = 0;
      this.requestId = 1;
      this.serverURL = serverURL;
      this.applicationId = applicationId;
      this.javascriptKey = javascriptKey;
      this.masterKey = masterKey;
      this.sessionToken = sessionToken;
      // Resolved once the server acknowledges the connect request; outgoing
      // subscribe/unsubscribe messages are chained on it.
      this.connectPromise = resolvingPromise();
      this.subscriptions = new Map();
      this.state = CLIENT_STATE.INITIALIZED;
    }

    /**
     * Tells whether the client has never been opened or has been explicitly closed.
     *
     * @return {boolean} true when the state is INITIALIZED or DISCONNECTED
     */
    shouldOpen() /*: any*/ {
      return this.state === CLIENT_STATE.INITIALIZED || this.state === CLIENT_STATE.DISCONNECTED;
    }

    /**
     * Builds the wire-format subscribe request for a query.
     * Shared by subscribe() and resubscribe() so both send the same payload shape.
     *
     * @param {number} requestId - id under which the subscription is tracked
     * @param {Object} query - the ParseQuery being subscribed to
     * @param {string} sessionToken (optional) only included in the request when truthy
     * @return {Object} the subscribe request, ready to be JSON-serialized
     */
    _buildSubscribeRequest(requestId /*: number*/, query /*: Object*/, sessionToken /*: ?string*/) /*: Object*/ {
      const className = query.className;
      const queryJSON = query.toJSON();
      const where = queryJSON.where;
      // `keys` is a comma-separated string; the request carries it as an array.
      const fields = queryJSON.keys ? queryJSON.keys.split(',') : undefined;
      const subscribeRequest = {
        op: OP_TYPES.SUBSCRIBE,
        requestId,
        query: {
          className,
          where,
          fields
        }
      };
      if (sessionToken) {
        subscribeRequest.sessionToken = sessionToken;
      }
      return subscribeRequest;
    }

    /**
     * Subscribes to a ParseQuery
     *
     * If you provide the sessionToken, when the LiveQuery server gets ParseObject's
     * updates from parse server, it'll try to check whether the sessionToken fulfills
     * the ParseObject's ACL. The LiveQuery server will only send updates to clients whose
     * sessionToken is fit for the ParseObject's ACL. You can check the LiveQuery protocol
     * <a href="https://github.com/parse-community/parse-server/wiki/Parse-LiveQuery-Protocol-Specification">here</a> for more details. The subscription you get is the same subscription you get
     * from our Standard API.
     *
     * @param {Object} query - the ParseQuery you want to subscribe to
     * @param {string} sessionToken (optional)
     * @return {Object} subscription, or undefined when no query is given
     */
    subscribe(query /*: Object*/, sessionToken /*: ?string*/) /*: Object*/ {
      if (!query) {
        return;
      }
      const requestId = this.requestId;
      const subscribeRequest = this._buildSubscribeRequest(requestId, query, sessionToken);
      const subscription = new LiveQuerySubscription(requestId, query, sessionToken);
      this.subscriptions.set(requestId, subscription);
      this.requestId += 1;
      // The request is only sent once the connection has been acknowledged.
      this.connectPromise.then(() => {
        this.socket.send(JSON.stringify(subscribeRequest));
      });
      return subscription;
    }

    /**
     * After calling unsubscribe you'll stop receiving events from the subscription object.
     *
     * @param {Object} subscription - subscription you would like to unsubscribe from.
     */
    unsubscribe(subscription /*: Object*/) {
      if (!subscription) {
        return;
      }
      // Drop the subscription locally right away; the server is told once connected.
      this.subscriptions.delete(subscription.id);
      const unsubscribeRequest = {
        op: OP_TYPES.UNSUBSCRIBE,
        requestId: subscription.id
      };
      this.connectPromise.then(() => {
        this.socket.send(JSON.stringify(unsubscribeRequest));
      });
    }

    /**
     * After open is called, the LiveQueryClient will try to send a connect request
     * to the LiveQuery server.
     *
     * Emits an 'error' event and returns without connecting when no WebSocket
     * implementation is registered on CoreManager.
     */
    open() {
      const WebSocketImplementation = CoreManager.getWebSocketController();
      if (!WebSocketImplementation) {
        this.emit(CLIENT_EMMITER_TYPES.ERROR, 'Can not find WebSocket implementation');
        return;
      }
      // Keep RECONNECTING so the CONNECTED handler knows it has to resubscribe.
      if (this.state !== CLIENT_STATE.RECONNECTING) {
        this.state = CLIENT_STATE.CONNECTING;
      }
      this.socket = new WebSocketImplementation(this.serverURL);

      // Bind WebSocket callbacks
      this.socket.onopen = () => {
        this._handleWebSocketOpen();
      };
      this.socket.onmessage = event => {
        this._handleWebSocketMessage(event);
      };
      this.socket.onclose = () => {
        this._handleWebSocketClose();
      };
      this.socket.onerror = error => {
        this._handleWebSocketError(error);
      };
    }

    /**
     * Re-sends a subscribe request for every tracked subscription, reusing each
     * subscription's original request id. Called after a reconnect.
     */
    resubscribe() {
      this.subscriptions.forEach((subscription, requestId) => {
        const subscribeRequest = this._buildSubscribeRequest(requestId, subscription.query, subscription.sessionToken);
        this.connectPromise.then(() => {
          this.socket.send(JSON.stringify(subscribeRequest));
        });
      });
    }

    /**
     * This method will close the WebSocket connection to this LiveQueryClient,
     * cancel the auto reconnect and drop all subscriptions based on it.
     *
     * Does nothing when the client was never opened or is already closed.
     */
    close() {
      if (this.state === CLIENT_STATE.INITIALIZED || this.state === CLIENT_STATE.DISCONNECTED) {
        return;
      }
      this.state = CLIENT_STATE.DISCONNECTED;
      // Cancel a pending reconnect; otherwise its timer would call open() again
      // after the caller explicitly closed the client.
      if (this.reconnectHandle) {
        clearTimeout(this.reconnectHandle);
        this.reconnectHandle = null;
      }
      if (this.socket) {
        this.socket.close();
      }
      // Notify each subscription about the close
      for (const subscription of this.subscriptions.values()) {
        subscription.emit(SUBSCRIPTION_EMMITER_TYPES.CLOSE);
      }
      this._handleReset();
      this.emit(CLIENT_EMMITER_TYPES.CLOSE);
    }

    /**
     * Restores counters, the connect promise and the subscription map to their
     * initial values, so the client starts from a valid state if it is opened
     * again after close().
     */
    _handleReset() {
      this.attempts = 1;
      this.id = 0;
      this.requestId = 1;
      this.connectPromise = resolvingPromise();
      this.subscriptions = new Map();
    }

    /**
     * WebSocket `onopen` handler: resets the attempt counter and sends the
     * connect request carrying the client's credentials.
     */
    _handleWebSocketOpen() {
      this.attempts = 1;
      const connectRequest = {
        op: OP_TYPES.CONNECT,
        applicationId: this.applicationId,
        javascriptKey: this.javascriptKey,
        masterKey: this.masterKey,
        sessionToken: this.sessionToken
      };
      this.socket.send(JSON.stringify(connectRequest));
    }

    /**
     * WebSocket `onmessage` handler: decodes a server message and dispatches it
     * to the client or to the subscription identified by `requestId`.
     *
     * @param {Object} event - message event; `event.data` is a JSON string or an
     * already-parsed object
     */
    _handleWebSocketMessage(event /*: any*/) {
      let data = event.data;
      if (typeof data === 'string') {
        data = JSON.parse(data);
      }
      let subscription = null;
      if (data.requestId) {
        subscription = this.subscriptions.get(data.requestId);
      }
      switch (data.op) {
        case OP_EVENTS.CONNECTED:
          // resubscribe() only queues its sends on connectPromise, so they go
          // out after the promise is resolved below.
          if (this.state === CLIENT_STATE.RECONNECTING) {
            this.resubscribe();
          }
          this.emit(CLIENT_EMMITER_TYPES.OPEN);
          this.id = data.clientId;
          this.connectPromise.resolve();
          this.state = CLIENT_STATE.CONNECTED;
          break;
        case OP_EVENTS.SUBSCRIBED:
          if (subscription) {
            subscription.emit(SUBSCRIPTION_EMMITER_TYPES.OPEN);
          }
          break;
        case OP_EVENTS.ERROR:
          // Errors carrying a requestId belong to one subscription; all others
          // are reported on the client.
          if (data.requestId) {
            if (subscription) {
              subscription.emit(SUBSCRIPTION_EMMITER_TYPES.ERROR, data.error);
            }
          } else {
            this.emit(CLIENT_EMMITER_TYPES.ERROR, data.error);
          }
          break;
        case OP_EVENTS.UNSUBSCRIBED:
          // We have already deleted subscription in unsubscribe(), do nothing here
          break;
        default:
          {
            // create, update, enter, leave, delete cases
            if (!subscription) {
              break;
            }
            let override = false;
            if (data.original) {
              override = true;
              delete data.original.__type;
              // Check for removed fields: a field present on the original but
              // missing from the new object is set to undefined explicitly.
              for (const field in data.original) {
                if (!(field in data.object)) {
                  data.object[field] = undefined;
                }
              }
              data.original = ParseObject.fromJSON(data.original, false);
            }
            delete data.object.__type;
            const parseObject = ParseObject.fromJSON(data.object, override);
            subscription.emit(data.op, parseObject, data.original);
            const localDatastore = CoreManager.getLocalDatastore();
            if (override && localDatastore.isEnabled) {
              localDatastore._updateObjectIfPinned(parseObject).then(() => {});
            }
          }
      }
    }

    /**
     * WebSocket `onclose` handler: unless the client was closed explicitly,
     * reports the close to the client and its subscriptions and schedules a
     * reconnect.
     */
    _handleWebSocketClose() {
      if (this.state === CLIENT_STATE.DISCONNECTED) {
        return;
      }
      this.state = CLIENT_STATE.CLOSED;
      this.emit(CLIENT_EMMITER_TYPES.CLOSE);
      // Notify each subscription about the close
      for (const subscription of this.subscriptions.values()) {
        subscription.emit(SUBSCRIPTION_EMMITER_TYPES.CLOSE);
      }
      this._handleReconnect();
    }

    /**
     * WebSocket `onerror` handler: reports the error to the client and to every
     * subscription, then schedules a reconnect.
     *
     * @param {Object} error - the error reported by the WebSocket
     */
    _handleWebSocketError(error /*: any*/) {
      this.emit(CLIENT_EMMITER_TYPES.ERROR, error);
      for (const subscription of this.subscriptions.values()) {
        // Pass the error along so subscription listeners can inspect it.
        subscription.emit(SUBSCRIPTION_EMMITER_TYPES.ERROR, error);
      }
      this._handleReconnect();
    }

    /**
     * Schedules a reconnect attempt after a randomized, growing delay.
     * Does nothing once the client has been explicitly closed.
     */
    _handleReconnect() {
      // After an explicit close() (DISCONNECTED) we stop attempting to reconnect
      if (this.state === CLIENT_STATE.DISCONNECTED) {
        return;
      }
      this.state = CLIENT_STATE.RECONNECTING;
      const time = generateInterval(this.attempts);

      // Close and error can both fire for the same failure, and server side ws
      // and browser WebSocket differ in when they trigger each of them. We cannot
      // tell the cases apart, so both paths reconnect; replacing any pending
      // timer ensures only one reconnect is ever scheduled.
      if (this.reconnectHandle) {
        clearTimeout(this.reconnectHandle);
      }
      this.reconnectHandle = setTimeout(() => {
        this.reconnectHandle = null;
        this.attempts++;
        this.connectPromise = resolvingPromise();
        this.open();
      }, time);
    }
  }
  // Register the platform's global WebSocket as the default implementation.
  // The typeof guard avoids a ReferenceError at module load in environments
  // that do not define a global WebSocket; in that case no controller is
  // registered here and LiveQueryClient#open() reports the missing
  // implementation through its 'error' event.
  if (typeof WebSocket !== 'undefined') {
    CoreManager.setWebSocketController(WebSocket);
  }
  export default LiveQueryClient;