gen_transactional.js

  1. // Modifications copyright 2020 Caf.js Labs and contributors
  2. /*!
  3. Copyright 2013 Hewlett-Packard Development Company, L.P.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. 'use strict';
  15. /**
  16. * A transactional plug to manage internal state safely.
  17. *
  18. * CAF goal is to create and manage billions of long running, active, stateful
  19. * proxies, i.e., CAs, where "long running" means years. In order to tolerate
  20. * -without human intervention- failures, upgrades, migration, or other errors,
  21. * the CA state has to be managed carefully.
  22. *
  23. * A complication is that CA state is not only application state, but state
  24. * associated with the plugins that the application uses.
  25. *
  26. * Also, CAF always ensures that CA state is consistent with
  27. * the externalized view of that state, i.e., CA actions that affect the
  28. * external world.
  29. *
  30. * For example, if a CA sends a message, and it keeps track of
  31. * sent messages in its internal state, even in the presence of failures
  32. * it should never forget that it sent that message.
  33. *
  34. * CAF approach assumes:
  35. *
  36. * 1. Operations that affect the external world are mediated by a
  37. * plugin that makes them idempotent.
  38. *
  39. * 2. Plugins defer these operations until commit time, ensuring that, before
  40. * execution, a persistent record of them has been checkpointed to an external
  41. * service.
  42. *
  43. * 3. All plugins and application code coordinate using a two-phase commit
  44. * protocol, so that disaggregated state is managed consistently.
  45. *
  46. * 4. During recovery the last committed state is reloaded, and deferred
  47. * operations in the checkpoint are retried until they succeed.
  48. *
  49. * Methods `__ca_setLogActionsTarget__` and `__ca_lazyApply__` defer methods.
  50. *
  51. * Methods `__ca_init__` and `__ca_resume__` handle initialization and
  52. * recovery.
  53. *
  54. * Methods `__ca_begin__`, `__ca_prepare__`, `__ca_commit__` and `__ca_abort__`
  55. * implement the two-phase commit protocol.
  56. *
  57. * Variable `that.state` is a JSON-serializable representation of this plugin
  58. * state. The contents of this variable are always checkpointed before
  59. * any state externalization, unless it is `null`.
  60. *
  61. * @module caf_components/gen_transactional
  62. * @augments module:caf_components/gen_container
  63. */
  64. // @ts-ignore: augments not attached to a class
  65. const genContainer = require('./gen_container');
  66. const myUtils = require('./myUtils');
  67. const async = require('async');
  68. /**
  69. * Helper constructor method for a transactional component.
  70. *
  71. * Description of types in file `types.js`.
  72. *
  73. * @param {ctxType} $ A context containing references to other components.
  74. * @param {specType} spec Configuration data for this component.
  75. * @return {Object} A new generic component.
  76. *
  77. * @throws {Error} If inputs are invalid.
  78. */
  79. exports.create = function($, spec) {
  80. const that = genContainer.create($, spec);
  /**
   * JSON-serializable representation of the private state of a plugin.
   *
   * The contents of this variable are always checkpointed before
   * any state externalization, unless it is `null`.
   *
   * @type {Object}
   * @memberof! module:caf_components/gen_transactional#
   * @alias state
   */
  that.state = null;

  // Serialized snapshot of `that.state` taken by `__ca_begin__`, so that
  // `__ca_abort__` can undo changes made while handling a message.
  let stateBackup = '';

  // Deferred operations, i.e., `{method, args}` entries queued by
  // `__ca_lazyApply__` and executed in order by `replayLog`.
  let logActions = [];

  // Receiver implementing the deferred methods; defaults to this plug.
  let logActionsTarget = that;

  // Specs of the children of this container, as reported by the container.
  const childrenSpec = that.__ca_getChildrenSpec__();

  // Map from child name to spec for the transactional children; it is
  // recomputed every time a transactional method is propagated.
  let transChildrenObj = null;
  /*
   * Builds an iterator that invokes `method` on a child component.
   *
   * The iterator takes a `{obj, name}` descriptor plus a callback, and
   * calls `obj[method]` with `args` followed by that callback. Exceptions
   * thrown synchronously are propagated in the callback.
   */
  const applyFun = function(method, args) {
    const baseArgs = args || [];
    return function(child, done) {
      try {
        const callArgs = baseArgs.concat([done]);
        const target = child.obj;
        const wrapped = myUtils.wrapAsyncFunction(target[method], target);
        wrapped.apply(target, callArgs);
      } catch (err) {
        done(err);
      }
    };
  };
  /*
   * Applies `f`, in series, to the transactional children of this plug.
   *
   * Every child listed in `childrenSpec` must be present in `that.$`;
   * otherwise an error is thrown synchronously, before `f` is ever called.
   *
   * `f` receives a `{obj, name}` descriptor and a callback. On success `cb`
   * gets an object mapping child names to the results of `f`; on failure it
   * gets the error and whatever partial results `async.mapSeries` provides.
   */
  const childrenSeriesTransF = function(f, cb) {
    const transByName = {};
    childrenSpec.forEach(function(childSpec) {
      const child = that.$[childSpec.name];
      if (!child) {
        throw new Error('Missing child ' + childSpec.name);
      }
      if (child.__ca_isTransactional__) {
        transByName[childSpec.name] = childSpec;
      }
    });
    // Only updated after the scan found no missing children.
    transChildrenObj = transByName;

    const targets = [];
    childrenSpec.forEach(function(childSpec) {
      const child = that.$[childSpec.name];
      if (transByName[childSpec.name] && child) {
        targets.push({obj: child, name: childSpec.name});
      }
    });

    async.mapSeries(targets, f, function(err, res) {
      if (err) {
        cb(err, res);
        return;
      }
      const byName = {};
      targets.forEach(function(target, i) {
        byName[target.name] = res[i];
      });
      cb(err, byName);
    });
  };
  /**
   * Run-time type information: marks this component as transactional.
   *
   * A parent component checks this flag on its children to decide which
   * ones should receive the transactional methods.
   *
   * @type {boolean}
   * @memberof! module:caf_components/gen_transactional#
   * @alias __ca_isTransactional__
   */
  that.__ca_isTransactional__ = true;
  /*
   * Executes, one at a time and in order, the deferred operations queued
   * in `logActions`, delegating each one to `logActionsTarget`.
   *
   * The queue is emptied only when every operation succeeds; after a
   * failure it is left untouched, and the error, annotated with the
   * offending `method` and `args`, is returned in the callback.
   */
  const replayLog = function(cb) {
    async.eachSeries(logActions, function(action, cb0) {
      try {
        // Work on a copy: the queued entry must not capture the callback.
        const args = (action.args && action.args.slice(0)) || [];
        args.push(cb0);
        const wrappedMethod = myUtils.wrapAsyncFunction(
          logActionsTarget[action.method],
          logActionsTarget
        );
        wrappedMethod.apply(logActionsTarget, args);
      } catch (ex) {
        // Anything can be thrown: annotating a primitive or `null` would
        // throw again in strict mode, and `cb0` would never be called.
        const canAnnotate = (ex !== null) &&
          ((typeof ex === 'object') || (typeof ex === 'function'));
        const error = canAnnotate ? ex : new Error(String(ex));
        error.method = action.method;
        error.args = action.args;
        cb0(error);
      }
    }, function(err, data) {
      if (err) {
        cb(err);
      } else {
        logActions = [];
        cb(err, data);
      }
    });
  };
  /**
   * Chooses the object that implements the deferred methods.
   *
   * Operations queued with `__ca_lazyApply__` are invoked on this receiver
   * when they are replayed. The default receiver is this plug.
   *
   * @param {Object} obj A receiver for delayed methods.
   *
   * @memberof! module:caf_components/gen_transactional#
   * @alias __ca_setLogActionsTarget__
   */
  that.__ca_setLogActionsTarget__ = function(obj) {
    logActionsTarget = obj;
  };
  /**
   * Defers an operation until commit time.
   *
   * The operation is appended to an internal queue. When the queue is
   * replayed, operations run serially, each one receiving an extra
   * callback argument, and their implementation is delegated to the
   * `logActionsTarget` object.
   *
   * Operations are assumed to be idempotent.
   *
   * @param {string} method A method name to execute.
   * @param {Array.<any>} args An array of method arguments.
   *
   * @memberof! module:caf_components/gen_transactional#
   * @alias __ca_lazyApply__
   */
  that.__ca_lazyApply__ = function(method, args) {
    const action = {method, args};
    logActions.push(action);
  };
  /**
   * Initializes the state of this plug from scratch.
   *
   * This method is called only once, when the plug is created. It drops
   * any deferred operations and then initializes, in series, the
   * transactional children.
   *
   * @param {cbType} cb A callback to continue after initialization.
   *
   * @memberof! module:caf_components/gen_transactional#
   * @alias __ca_init__
   */
  that.__ca_init__ = function(cb) {
    try {
      logActions = [];
      const initChild = applyFun('__ca_init__', []);
      childrenSeriesTransF(initChild, cb);
    } catch (err) {
      cb(err);
    }
  };
  /**
   * Reloads the state of this plug from a previous checkpoint.
   *
   * The transactional children are resumed first, in series, each one
   * with the entry of the checkpoint stored under its name. Then, the
   * state and deferred operations of this plug are restored, and those
   * operations are retried.
   *
   * This method is called many times, for example, after
   * recovering from a failure or restarting after migration.
   *
   * @param {Object} cp The last checkpoint of the state of this plug.
   * @param {cbType} cb A callback to continue after resuming.
   *
   * @memberof! module:caf_components/gen_transactional#
   * @alias __ca_resume__
   */
  that.__ca_resume__ = function(cp, cb) {
    try {
      // A plugin added by a hot code change has no checkpoint yet.
      const checkpoint = cp || {};

      const resumeChild = function(child, done) {
        try {
          const resume = myUtils.wrapAsyncFunction(
            child.obj.__ca_resume__,
            child.obj
          );
          resume.call(child.obj, checkpoint[child.name], done);
        } catch (err) {
          done(err);
        }
      };

      const afterChildren = function(err, data) {
        if (err) {
          cb(err, data);
          return;
        }
        that.state = checkpoint.state || that.state;
        logActions = checkpoint.logActions || [];
        replayLog(cb);
      };

      childrenSeriesTransF(resumeChild, afterChildren);
    } catch (err) {
      cb(err);
    }
  };
  /**
   * Begins a two phase commit transaction.
   *
   * CAF calls this method before the application handler processes
   * a message. A read-only copy of the message is passed as an argument
   * to facilitate configuration.
   *
   * It snapshots the current state, so that it can be restored if the
   * transaction aborts, drops any pending deferred operations, and then
   * propagates the call, in series, to the transactional children.
   *
   * @param {Object} msg The message to be processed.
   * @param {cbType} cb A callback to continue the transaction.
   *
   * @memberof! module:caf_components/gen_transactional#
   * @alias __ca_begin__
   *
   */
  that.__ca_begin__ = function(msg, cb) {
    try {
      // Always refresh the backup: keeping a snapshot from an earlier
      // transaction would let `__ca_abort__` roll back to stale state.
      stateBackup = that.state ? JSON.stringify(that.state) : '';
      logActions = [];
      childrenSeriesTransF(applyFun('__ca_begin__', [msg]), cb);
    } catch (err) {
      cb(err);
    }
  };
  /**
   * Prepares to commit the transaction.
   *
   * CAF calls this method after the handler has successfully
   * processed the message.
   *
   * If ready to commit, it returns in the callback a JSON
   * serializable data structure reflecting the new state after
   * processing the message: the results of preparing the transactional
   * children, keyed by name, plus `state` and `logActions` entries for
   * this plug when they are not empty.
   *
   * Then, CAF checkpoints this data structure using a remote service.
   *
   * To abort the transaction we return an error in the (node.js) callback.
   * This will abort all the transactional plugs associated with the CA.
   *
   * @param {cbType} cb A callback to continue or abort the transaction.
   *
   * @memberof! module:caf_components/gen_transactional#
   * @alias __ca_prepare__
   */
  that.__ca_prepare__ = function(cb) {
    const addOwnState = function(err, data) {
      if (!err) {
        if (that.state) {
          data.state = that.state;
        }
        if (logActions.length > 0) {
          data.logActions = logActions;
        }
      }
      cb(err, data);
    };

    try {
      const prepareChild = applyFun('__ca_prepare__', []);
      childrenSeriesTransF(prepareChild, addOwnState);
    } catch (err) {
      cb(err);
    }
  };
  /**
   * Commits the transaction.
   *
   * Called by CAF when all the `prepare` calls to transactional
   * plugs were successful, and the new state of those plugs has been
   * checkpointed using an external service.
   *
   * The transactional children commit first, in series, and then the
   * deferred operations of this plug are executed.
   *
   * An error during commit will shutdown the CA since we cannot abort
   * committed transactions. See `__ca_resume__` for the recovery strategy.
   *
   * @param {cbType} cb A callback to continue after committing.
   *
   * @memberof! module:caf_components/gen_transactional#
   * @alias __ca_commit__
   */
  that.__ca_commit__ = function(cb) {
    try {
      const commitChild = applyFun('__ca_commit__', []);
      childrenSeriesTransF(commitChild, function(err, data) {
        if (err) {
          cb(err, data);
          return;
        }
        replayLog(cb);
      });
    } catch (err) {
      cb(err);
    }
  };
  /**
   * Aborts the transaction.
   *
   * CAF calls this method when an error was returned
   * by the handler, or any transactional plugs propagated an error during
   * `prepare`.
   *
   * Note that an error during remote checkpointing cannot
   * guarantee that the checkpoint was not made durable, and we need to
   * assume that it did. This means that we need to shutdown the CA.
   *
   * This implementation restores the state from the snapshot taken when
   * the transaction began, if both the state and the snapshot are not
   * empty. It also drops the deferred operations, and then aborts, in
   * series, the transactional children.
   *
   * @param {cbType} cb A callback to continue after aborting.
   *
   * @memberof! module:caf_components/gen_transactional#
   * @alias __ca_abort__
   */
  that.__ca_abort__ = function(cb) {
    try {
      const canRollback = Boolean(that.state && stateBackup);
      if (canRollback) {
        that.state = JSON.parse(stateBackup);
      }
      logActions = [];
      const abortChild = applyFun('__ca_abort__', []);
      childrenSeriesTransF(abortChild, cb);
    } catch (err) {
      cb(err);
    }
  };
  394. return that;
  395. };