const assert = require('assert')

const { detailedDiff } = require('deep-object-diff')

const AsyncEmitter = require('./async-emitter')
const AWSError = require('./aws-error')
const { Data } = require('./data')
const {
  InvalidCachedModelError,
  InvalidModelDeletionError,
  InvalidModelUpdateError,
  InvalidOptionsError,
  InvalidParameterError,
  ModelAlreadyExistsError,
  ModelDeletedTwiceError,
  TransactionFailedError,
  WriteAttemptedInReadOnlyTxError,
  ModelTrackedTwiceError
} = require('./errors')
const { Query, Scan } = require('./iterators')
const { Key } = require('./key')
const { Model, NonExistentItem } = require('./models')
const { sleep, ITEM_SOURCE, loadOptionDefaults } = require('./utils')

async function getWithArgs (args, callback) {
  if (!args || !(args instanceof Array) || args.length === 0) {
    throw new InvalidParameterError('args', 'must be a non-empty array')
  }
  const [first, ...args1] = args
  if (first && first.prototype instanceof Model) {
    if (args1.length === 1 || args1.length === 2) {
      let handle
      if (args1.length === 2 && args1[1].createIfMissing) {
        handle = first.data(args1[0])
      } else {
        handle = first.key(args1[0])
      }
      return getWithArgs([handle, ...args1.slice(1)], callback)
    } else {
      throw new InvalidParameterError('args',
        'Expecting args to have a tuple of (Model, values, optionalOpt).')
    }
  } else if (first && first instanceof Key) {
    if (args1.length > 1) {
      throw new InvalidParameterError('args',
        'Expecting args to have a tuple of (key, optionalOpt).')
    }
    return callback(first, args1.length === 1 ? args1[0] : undefined)
  } else if (first && first instanceof Array && first.length !== 0) {
    const nonKeys = first.filter(obj => !(obj instanceof Key))
    if (nonKeys.length !== 0) {
      throw new InvalidParameterError('args',
        'Expecting args to have a tuple of ([key], optionalOpt).')
    }
    if (args1.length > 1) {
      throw new InvalidParameterError('args',
        'Expecting args to have a tuple of ([key], optionalOpt).')
    }

    const params = args1.length === 1 ? args1[0] : undefined
    return callback(first, params)
  } else {
    console.log(JSON.stringify(args))
    throw new InvalidParameterError('args',
      'Expecting Model or Key or [Key] as the first argument')
  }
}

/**
 * Batches put and update (potentially could support delete) requests to
 * DynamoDB within a transaction and sent on commit.
 * @private
 * @memberof Internal
 *
 * @example
 * const batcher = new __WriteBatcher()
 * batcher.write(model)
 * batcher.write(otherModel)
 * await batcher.commit()
 */
class __WriteBatcher {
  constructor () {
    this.__allModels = []
    this.__toWrite = []
    this.__toCheck = {}
    this.resolved = false
  }

  /**
   * Gets params for the request according to method, batches the params.
   * Favors update over put for writing to DynamoDB, except for a corner case
   * where update disallows write operations without an UpdateExpression. This
   * happens when a new model is created with no fields besides keys populated
   * and written to DB.
   *
   * @param {Model} model the model to write
   * @access private
   */
  async __write (model) {
    if (!model.__isMutated()) {
      throw new Error('Attempting to write an unchanged model ' +
        model.toString())
    }

    if (!this.__toCheck[model]) {
      if (this.__toCheck[model] === false) {
        throw new Error(`Attempting to write model ${model.toString()} twice`)
      } else {
        throw new Error('Attempting to write untracked model ' +
          model.toString())
      }
    }
    await model.finalize()
    this.__toCheck[model] = false

    let action
    let params

    if (model.__toBeDeleted) {
      action = 'Delete'
      params = model.__deleteParams()
    } else {
      action = 'Update'
      params = model.__updateParams()
      if (!Object.prototype.hasOwnProperty.call(
        params,
        'UpdateExpression'
      )) {
        // When a new item with no values other than the keys are written,
        // we have to use Put, else dynamodb would throw.
        action = 'Put'
        params = model.__putParams()
      }
    }
    this.__toWrite.push({ [action]: params })
  }

  /**
   * Start tracking models in a transaction. So when the batched write commits,
   * Optimistic locking on those readonly models is automatically performed.
   * @param {Model} model A model to track.
   */
  track (model) {
    const trackedModel = this.__toCheck[model]
    if (trackedModel !== undefined) {
      /**
       * Cannot track two instances pointing to the same item in dynamodb,
       */
      if (model.__src.isDelete) {
        throw new ModelDeletedTwiceError(model)
      } else if (!(model.__src.isGet || model.__src.isCreate) ||
                 !(trackedModel instanceof NonExistentItem)) {
        throw new ModelTrackedTwiceError(model, trackedModel)
      }
    }
    this.__allModels.push(model)
    this.__toCheck[model] = model
  }

  /**
   * Return all tracked models.
   */
  get trackedModels () {
    return Object.values(this.__allModels)
  }

  /**
   * Commits batched writes by sending DynamoDB requests.
   *
   * @returns {Boolean} whether any model is written to DB.
   */
  async commit (expectWrites) {
    assert(!this.resolved, 'Already wrote models.')
    this.resolved = true

    const promises = []
    for (const model of this.__allModels) {
      if (this.__toCheck[model] && model.__isMutated(expectWrites)) {
        promises.push(this.__write(model))
      }
    }
    await Promise.all(promises)

    if (this.__toWrite.length === 0) {
      return false
    }
    if (!expectWrites) {
      throw new WriteAttemptedInReadOnlyTxError(this.__toWrite)
    }

    if (this.__allModels.length === 1 &&
        this.__toWrite.length === 1) {
      await this.__allModels[0].__write()
      return true
    }
    const toCheck = Object.values(this.__toCheck)
      .map(m => {
        if (m === false) {
          // removed from __toCheck; updateExpression created in __write
          return undefined
        }
        return m.__conditionCheckParams()
      })
      .filter(cond => !!cond)
      .map(cond => {
        return { ConditionCheck: cond }
      })
    const items = [...this.__toWrite, ...toCheck]
    const params = {
      TransactItems: items
    }
    await this.transactWrite(params)
    return true
  }

  async transactWrite (txWriteParams) {
    const request = this.daxClient.transactWrite(txWriteParams)
    await request.catch(e => {
      throw this.__extractError(txWriteParams, e)
    })
  }

  /**
   * Find a model with the same TableName and Key from a list of models
   * @param {String} tableName
   * @param {Object} key { _id: '', _sk: '' }
   */
  __getModel (tableName, key) {
    return this.getModel(tableName, key._id, key._sk)
  }

  getModel (tableName, id, sk) {
    for (const model of this.__allModels) {
      if (model.__fullTableName === tableName &&
          model._id === id &&
          model._sk === sk) {
        return !(model instanceof NonExistentItem) ? model : undefined
      }
    }
  }

  /**
   * Extract CancellationReasons from the input error, and associate each
   * reason with a model passed to transactWrite API. The returned error is
   * either a single error, or a group of errors wrapped in an error under
   * `allErrors` property.
   * @param {*} params Input to transactWrite API
   * @param {Error} error An error throw by AWS SDK DynamoDB transactWrite API
   * @returns an Error object
   */
  __extractError (params, error) {
    const reasons = error.CancellationReasons
    error = new AWSError('extract error', error)
    assert(reasons, 'error missing reasons: ' + error)

    const allErrors = []

    for (let idx = 0; idx < reasons.length; idx++) {
      const reason = reasons[idx]
      if (reason.Code === 'ConditionalCheckFailed') {
        // Items in reasons maps 1to1 to items in request, here we do a reverse
        // lookup to find the original model that triggered the error.
        const transact = params.TransactItems[idx]
        const method = Object.keys(transact)[0]
        let model
        const tableName = transact[method].TableName
        switch (method) {
          case 'Update':
          case 'ConditionCheck':
          case 'Delete':
            model = this.__getModel(
              tableName,
              transact[method].Key
            )
            break
          case 'Put':
            model = this.__getModel(
              tableName,
              transact[method].Item
            )
            break
        }
        let CustomErrorCls
        if (model.__toBeDeleted) {
          CustomErrorCls = InvalidModelDeletionError
        } else if (model.__src.isCreate) {
          CustomErrorCls = ModelAlreadyExistsError
        } else if (model.__src.isUpdate) {
          CustomErrorCls = InvalidModelUpdateError
        }
        if (CustomErrorCls) {
          const err = new CustomErrorCls(tableName, model._id, model._sk)
          allErrors.push(err)
        }
      }
    }

    if (allErrors.length === 1) {
      return allErrors[0]
    } else {
      error.allErrors = allErrors
      return error
    }
  }
}

/**
 * Transaction context.
 */
class Transaction {
  /**
   * Options for running a transaction.
   * @typedef {Object} TransactionOptions
   * @property {Boolean} [readOnly=false] whether writes are allowed
   * @property {Number} [retries=3] The number of times to retry after the
   *   initial attempt fails.
   * @property {Number} [initialBackoff=500] In milliseconds, delay
   *   after the first attempt fails and before first retry happens.
   * @property {Number} [maxBackoff=10000] In milliseconds, max delay
   *   between retries. Must be larger than 200.
   * @property {Boolean} [cacheModels=false] Whether to cache models already
   *   retrieved from the database. When off, getting a model with the same key
   *   the second time in the same transaction results in an error. When on,
   *   `get`ting the same key simply returns the cached model. Previous
   *   modifications done to the model are reflected in the returned model. If
   *   the model key was used in some API other than "get", an error will
   *   result.
   */

  /**
   * Returns the default [options]{@link TransactionOptions} for a transaction.
   */
  get defaultOptions () {
    return {
      readOnly: false,
      retries: 3,
      initialBackoff: 500,
      maxBackoff: 10000,
      cacheModels: false
    }
  }

  /**
   * @param {TransactionOptions} [options] Options for the transaction
   */
  constructor (options) {
    const defaults = this.defaultOptions
    this.options = loadOptionDefaults(options, defaults)

    if (this.options.retries < 0) {
      throw new InvalidOptionsError('retries',
        'Retry count must be non-negative')
    }
    if (this.options.initialBackoff < 1) {
      throw new InvalidOptionsError('initialBackoff',
        'Initial back off must be larger than 1ms.')
    }
    if (this.options.maxBackoff < 200) {
      // A transactWrite would take some where between 100~200ms.
      // Max of less than 200 is too aggressive.
      throw new InvalidOptionsError('maxBackoff',
        'Max back off must be larger than 200ms.')
    }
  }

  /**
   * All events transactions may emit.
   *
   * POST_COMMIT: When a transaction is committed. Do clean up,
   *              summery, post process here.
   * TX_FAILED: When a transaction failed permanently (either by failing all
   *            retries, or getting a non-retryable error). Handler has the
   *            signature of (error) => {}.
   */
  static EVENTS = {
    POST_COMMIT: 'postCommit',
    TX_FAILED: 'txFailed'
  }

  addEventHandler (event, handler, name = undefined) {
    if (!Object.values(this.constructor.EVENTS).includes(event)) {
      throw new Error(`Unsupported event ${event}`)
    }
    // istanbul ignore next
    assert(name === undefined || !name.startsWith('_'),
      'Event name must not start with "_"')
    this.__eventIndex = (this.__eventIndex ?? 0) + 1
    this.__eventEmitter.once(event, handler,
      name ?? `_generatedName${this.__eventIndex}`)
  }

  /**
   * Get an item using DynamoDB's getItem API.
   *
   * @param {Key} key A key for the item
   * @param {GetParams} params Params for how to get the item
   */
  async __getItem (key, params) {
    const getParams = key.Cls.__getParams(key.encodedKeys, params)
    const data = await this.daxClient.get(getParams)
      .catch(
        // istanbul ignore next
        e => {
          throw new AWSError('get', e)
        })
    if (!params.createIfMissing && !data.Item) {
      this.__writeBatcher.track(new NonExistentItem(key))
      return undefined
    }
    const isNew = !data.Item
    const vals = data.Item || key.vals
    let model = new key.Cls(ITEM_SOURCE.GET, isNew, vals)
    if (model.__hasExpired) {
      // DynamoDB may not have deleted the model promptly, just treat it as if
      // it's not on server.
      if (params.createIfMissing) {
        model = new key.Cls(ITEM_SOURCE.GET, true, key.vals)
      } else {
        this.__writeBatcher.track(new NonExistentItem(key))
        return undefined
      }
    }
    this.__writeBatcher.track(model)
    return model
  }

  /**
   * Gets multiple items using DynamoDB's transactGetItems API.
   * @param {Array<Key>} keys A list of keys to get.
   * @param {GetParams} params Params used to get items, all items will be
   *   fetched using the same params.
   */
  async __transactGetItems (keys, params) {
    const txItems = []
    for (const key of keys) {
      const param = key.Cls.__getParams(key.encodedKeys, params)
      delete param.ConsistentRead // Omit for transactGetItems.
      txItems.push({
        Get: param
      })
    }
    const data = await this.daxClient.transactGet({
      TransactItems: txItems
    }).catch(
      // istanbul ignore next
      e => { throw new AWSError('transactGet', e) }
    )
    const responses = data.Responses
    const models = []
    for (let idx = 0; idx < keys.length; idx++) {
      const key = keys[idx]
      const data = responses[idx]
      if ((!params || !params.createIfMissing) && !data.Item) {
        this.__writeBatcher.track(new NonExistentItem(key))
        models[idx] = undefined
        continue
      }
      let model = new key.Cls(
        ITEM_SOURCE.GET,
        !data.Item,
        data.Item || key.vals)

      if (model.__hasExpired) {
        if (params.createIfMissing) {
          model = new key.Cls(
            ITEM_SOURCE.GET,
            true,
            key.vals)
        } else {
          model = undefined
        }
      }

      models[idx] = model
      if (model) {
        this.__writeBatcher.track(model)
      } else {
        this.__writeBatcher.track(new NonExistentItem(key))
      }
    }
    return models
  }

  /**
   * Gets multiple items using DynamoDB's batchGetItems API.
   * @param {Array<Key>} keys A list of keys to get.
   * @param {GetParams} params Params used to get items, all items will be
   *   fetched using the same params.
   */
  async __batchGetItems (keys, params) {
    let reqItems = {}
    const unorderedModels = []
    const modelClsLookup = {}
    for (const key of keys) {
      modelClsLookup[key.Cls.fullTableName] = key.Cls
      const param = key.Cls.__getParams(key.encodedKeys, params)
      const getsPerTable = reqItems[param.TableName] || { Keys: [] }
      getsPerTable.Keys.push(param.Key)
      getsPerTable.ConsistentRead = param.ConsistentRead
      reqItems[param.TableName] = getsPerTable
    }

    let reqCnt = 0
    while (Object.keys(reqItems).length !== 0) {
      if (reqCnt > 10) {
        throw new Error(`Failed to get all items ${
          keys.map(k => {
            return `${k.Cls.name} ${JSON.stringify(k.compositeID)}`
          })}`)
      }
      if (reqCnt !== 0) {
        // Backoff
        const millisBackOff = Math.min(100 * reqCnt, 1000)
        const offset = Math.floor(Math.random() * millisBackOff * 0.2) -
        millisBackOff * 0.1 // +-0.1 backoff as jitter to spread out conflicts
        await sleep(millisBackOff + offset)
      }
      reqCnt++

      const data = await this.daxClient.batchGet({
        RequestItems: reqItems
      }).catch(
        // istanbul ignore next
        e => { throw new AWSError('batchGet', e) }
      )

      // Merge results
      const responses = data.Responses
      for (const [modelClsName, items] of Object.entries(responses)) {
        const Cls = modelClsLookup[modelClsName]
        for (const item of items) {
          const tempModel = new Cls(ITEM_SOURCE.GET, false, item)
          if (!tempModel.__hasExpired) {
            unorderedModels.push(tempModel)
          }
        }
      }

      // Chain into next batch
      reqItems = data.UnprocessedKeys
    }

    // Restore ordering, creat models that are not on server.
    const models = []
    for (let idx = 0; idx < keys.length; idx++) {
      const key = keys[idx]
      const addModel = () => {
        for (const model of unorderedModels) {
          if (model.__fullTableName === key.Cls.fullTableName &&
              model._id === key.encodedKeys._id &&
              model._sk === key.encodedKeys._sk) {
            models.push(model)
            return true
          }
        }
        return false
      }
      if (addModel()) {
        continue
      }
      // If we reach here, no model is found for the key.
      if (params.createIfMissing) {
        models.push(new key.Cls(ITEM_SOURCE.GET, true, key.vals))
      } else {
        models.push(undefined)
      }
    }

    // Now track models, so everything is in expected order.
    for (let index = 0; index < models.length; index++) {
      const model = models[index]
      if (model) {
        this.__writeBatcher.track(model)
      } else {
        this.__writeBatcher.track(new NonExistentItem(keys[index]))
      }
    }
    return models
  }

  /**
   * Fetches model(s) from database.
   * This method supports 3 different signatures.
   *   get(Cls, keyOrDataValues, params)
   *   get(Key|Data, params)
   *   get([Key|Data], params)
   *
   * When only one items is fetched, DynamoDB's getItem API is called. Must use
   * a Key when createIfMissing is not true, and Data otherwise.
   *
   * When a list of items is fetched:
   *   If inconsistentRead is false (the default), DynamoDB's transactGetItems
   *     API is called for a strongly consistent read. Transactional reads will
   *     be slower than batched reads.
   *   If inconsistentRead is true, DynamoDB's batchGetItems API is called.
   *     Batched fetches are more efficient than calling get with 1 key many
   *     times, since there is less HTTP request overhead. Batched fetches is
   *     faster than transactional fetches, but provides a weaker consistency.
   *
   * @param {Class} Cls a Model class.
   * @param {String|CompositeID} key Key or keyValues
   * @param {GetParams} [params]
   * @returns Model(s) associated with provided key
   */
  async get (...args) {
    return getWithArgs(args, async (arg, params) => {
      // make sure we have a Key or Data depending on createIfMissing
      params = params || {}
      const argIsArray = arg instanceof Array
      const arr = argIsArray ? arg : [arg]
      for (let i = 0; i < arr.length; i++) {
        if (params.createIfMissing) {
          if (!(arr[i] instanceof Data)) {
            throw new InvalidParameterError('args',
              'must pass a Data to tx.get() when createIfMissing is true')
          }
        } else if (arr[i] instanceof Data) {
          throw new InvalidParameterError('args',
            'must pass a Key to tx.get() when createIfMissing is not true')
        }
      }
      const cachedModels = []
      let keysOrDataToGet = []
      if (this.options.cacheModels) {
        for (const keyOrData of arr) {
          const cachedModel = this.__writeBatcher.getModel(
            keyOrData.Cls.fullTableName,
            keyOrData.encodedKeys._id,
            keyOrData.encodedKeys._sk
          )
          if (cachedModel) {
            if (!cachedModel.__src.canBeCached || cachedModel.__toBeDeleted) {
              throw new InvalidCachedModelError(cachedModel)
            }
            cachedModels.push(cachedModel)
          } else {
            keysOrDataToGet.push(keyOrData)
          }
        }
      } else {
        keysOrDataToGet = arr
      }
      // fetch the data in bulk if more than 1 item was requested
      const fetchedModels = []
      if (keysOrDataToGet.length > 0) {
        if (argIsArray) {
          if (!params.inconsistentRead) {
            fetchedModels.push(
              ...(await this.__transactGetItems(keysOrDataToGet, params)))
          } else {
            fetchedModels.push(
              ...(await this.__batchGetItems(keysOrDataToGet, params)))
          }
        } else {
          // just fetch the one item that was requested
          fetchedModels.push(await this.__getItem(keysOrDataToGet[0], params))
        }
      }

      let ret = []
      if (this.options.cacheModels) {
        const findModel = (tableName, id, sk) => {
          for (let index = 0; index < keysOrDataToGet.length; index++) {
            const toGetKeyOrData = keysOrDataToGet[index]
            if (tableName === toGetKeyOrData.Cls.fullTableName &&
              id === toGetKeyOrData.encodedKeys._id &&
              sk === toGetKeyOrData.encodedKeys._sk) {
              return fetchedModels[index]
            }
          }

          for (const model of cachedModels) {
            // istanbul ignore else
            if (tableName === model.constructor.fullTableName &&
              id === model._id &&
              sk === model._sk) {
              return model
            }
          }
        }
        for (const keyOrData of arr) {
          ret.push(findModel(
            keyOrData.Cls.fullTableName,
            keyOrData.encodedKeys._id,
            keyOrData.encodedKeys._sk
          ))
        }
      } else {
        // UnorderedModels is really ordered when cacheModels is disabled
        // don't sort to save time
        ret = fetchedModels
      }

      return argIsArray ? ret : ret[0]
    })
  }

  /**
   * Updates an item without reading from DB. If the item doesn't exist in DB,
   * ConditionCheckFailure will be thrown.
   *
   * @param {Class} Cls The model's class.
   * @param {CompositeID|Object} original A superset of CompositeID,
   *   field's values. Used as conditions for the update
   * @param {Object} updated Updated fields for the item, without CompositeID
   *   fields.
   */
  update (Cls, original, updated) {
    if (Object.values(original).filter(d => d === undefined).length !== 0) {
      // We don't check for attribute_not_exists anyway.
      throw new InvalidParameterError(
        'original',
        'original values must not be undefined')
    }
    if (!updated || Object.keys(updated).length === 0) {
      throw new InvalidParameterError(
        'updated',
        'must have values to be updated')
    }

    const data = Cls.__splitKeysAndData(original)[2] // this also checks keys
    const model = new Cls(ITEM_SOURCE.UPDATE, false, original)
    Object.keys(data).forEach(k => {
      model.getField(k).get() // Read to show in ConditionExpression
    })

    Object.keys(updated).forEach(key => {
      if (Cls._attrs[key].keyType !== undefined) {
        throw new InvalidParameterError(
          'updated', 'must not contain key fields')
      }
      model[key] = updated[key]
    })

    this.__writeBatcher.track(model)

    // Don't return model, since it should be closed to further modifications.
    // return model
  }

  /**
   * Creates or puts an item without reading from DB.
   * It differs from {@link update} in that:
   *   a) If item doesn't exists, a new item is created in DB
   *   b) If item does exists, fields present locally will overwrite values in
   *      DB, fields absent locally will be removed from DB.
   *
   * @param {Class} Cls The model's class.
   * @param {CompositeID|Object} original A superset of CompositeID,
   *   field's values. Non-key values are used for conditional locking
   * @param {Object} updated Final values for the model.
   *   Values for every field in the model must be provided. Fields with
   *   `undefined` value will be removed from DB.
   */
  createOrPut (Cls, updated, original = {}) {
    const newData = { ...updated }
    for (const key of Object.keys(original)) {
      if (Object.hasOwnProperty.call(newData, key)) {
        // cannot change a key component's value
        if (Cls.__KEY_COMPONENT_NAMES.has(key)) {
          if (newData[key] !== original[key]) {
            throw new InvalidParameterError(updated,
              'key components values in updated must match (or be omitted)')
          }
        }
      } else {
        // old values which aren't explicitly changed are kept the same
        newData[key] = original[key]
      }
    }
    // We create the item we intend to write (with newData), and then update
    // its __initialValue for any preconditions requested (with `original`).
    // Creating the model with newData validates that newData specified are
    // complete, valid item all on its own.
    const model = new Cls(ITEM_SOURCE.CREATE_OR_PUT, true, newData)
    Object.keys(original).forEach(key => {
      const field = model.getField(key)
      // we set the initial value and then mark it as read so that the write
      // batcher later generates a database update which is conditioned on the
      // the item's current value in the database for this field being
      // original[key] (if the item existed)
      field.__initialValue = original[key]
      field.get()
    })
    this.__writeBatcher.track(model)

    // Don't return model, since it should be closed to further modifications.
    // return model
  }

  /**
   * Creates a model without accessing DB. Write will make sure the item does
   * not exist.
   *
   * @param {Model} Cls A Model class.
   * @param {CompositeID|Object} data A superset of CompositeID of the model,
   *   plus any data for Fields on the Model.
   */
  create (Cls, data) {
    const model = new Cls(ITEM_SOURCE.CREATE, true, { ...data })
    this.__writeBatcher.track(model)
    return model
  }

  /**
   * Deletes model(s) from database.
   *
   * If a model is read from database, but it did not exist when deleting the
   * item, an exception is raised.
   *
   * @param {List<Key|Model>} args Keys and Models
   */
  delete (...args) {
    for (const a of args) {
      if (a instanceof Model) {
        a.__markForDeletion()
      } else if (a instanceof Key) {
        const model = new a.Cls(ITEM_SOURCE.DELETE, true,
          a.keyComponents)
        this.__writeBatcher.track(model)
      } else {
        throw new InvalidParameterError('args', 'Must be models and keys')
      }
    }
  }

  /**
   * Create a handle for applications to scan DB items.
   * @param {Model} ModelCls A model class.
   * @param {IteratorOptions} params.options Iterator options
   * @return Scan handle. See {@link __DBIterator} for details.
   */
  scan (ModelCls, options) {
    return new Scan({
      ModelCls,
      writeBatcher: this.__writeBatcher,
      options: { cacheModels: this.options.cacheModels, ...options }
    })
  }

  /**
   * Create a handle for applications to query DB items.
   * @param {Model} ModelCls A model class.
   * @param {IteratorOptions} params.options Iterator options
   * @return Query handle. See {@link __DBIterator} for details.
   */
  query (ModelCls, options) {
    return new Query({
      ModelCls,
      writeBatcher: this.__writeBatcher,
      options: { cacheModels: this.options.cacheModels, ...options }
    })
  }

  __reset () {
    this.__writeBatcher = new __WriteBatcher()
    this.__eventEmitter = new AsyncEmitter()
  }

  static __isRetryable (err) {
    const retryableErrors = {
      ConditionalCheckFailedException: true,
      TransactionCanceledException: true
    }

    if (err.retryable) {
      return true
    }

    if (retryableErrors[err.code]) {
      return true
    }
    return false
  }

  /** Marks a transaction as read-only. */
  makeReadOnly () {
    this.options.readOnly = true
  }

  /** Enables model cache */
  enableModelCache () {
    this.options.cacheModels = true
  }

  /**
   * Runs a closure in transaction.
   * @param {Function} func the closure to run
   * @access private
   */
  async __run (func) {
    if (!(func instanceof Function || typeof func === 'function')) {
      throw new InvalidParameterError('func', 'must be a function / closure')
    }

    let millisBackOff = this.options.initialBackoff
    const maxBackoff = this.options.maxBackoff
    for (let tryCnt = 0; tryCnt <= this.options.retries; tryCnt++) {
      try {
        this.__reset()
        const ret = await func(this)
        await this.__writeBatcher.commit(!this.options.readOnly)
        await this.__eventEmitter.emit(this.constructor.EVENTS.POST_COMMIT)
        return ret
      } catch (err) {
        // make sure EVERY error is retryable; allErrors is present if err
        // was thrown in __WriteBatcher.commit()'s onError handler
        const allErrors = err.allErrors || [err]
        const errorMessages = []
        for (let i = 0; i < allErrors.length; i++) {
          const anErr = allErrors[i]
          if (!this.constructor.__isRetryable(anErr)) {
            errorMessages.push(`  ${i + 1}) ${anErr.message}`)
          }
        }
        if (errorMessages.length) {
          if (allErrors.length === 1) {
            // if there was only one error, just rethrow it
            const e = allErrors[0]
            await this.__eventEmitter.emit(this.constructor.EVENTS.TX_FAILED,
              e)
            throw e
          } else {
            // if there were multiple errors, combine it into one error which
            // summarizes all of the failures
            const e = new TransactionFailedError(
              ['Multiple Non-retryable Errors: ', ...errorMessages].join('\n'),
              err)
            await this.__eventEmitter.emit(this.constructor.EVENTS.TX_FAILED,
              e)
            throw e
          }
        } else {
          console.log(`Transaction commit attempt ${tryCnt} failed with ` +
            `error ${err}.`)
        }
      }
      if (tryCnt >= this.options.retries) {
        // note: this exact message is checked and during load testing this
        // error will not be sent to Sentry; if this message changes, please
        // update make-app.js too
        const err = new TransactionFailedError('Too much contention.')
        await this.__eventEmitter.emit(this.constructor.EVENTS.TX_FAILED, err)
        throw err
      }
      const offset = Math.floor(Math.random() * millisBackOff * 0.2) -
        millisBackOff * 0.1 // +-0.1 backoff as jitter to spread out conflicts
      await sleep(millisBackOff + offset)
      millisBackOff = Math.min(maxBackoff, millisBackOff * 2)
    }
  }

  /**
   * Runs a function in transaction, using specified parameters.
   *
   * If a non-retryable error is thrown while running the transaction, it will
   * be re-raised.
   *
   * @param {TransactionOptions} [options]
   * @param {Function} func the closure to run.
   *
   * @example
   * // Can be called in 2 ways:
   * Transaction.run(async (tx) => {
   *   // Do something
   * })
   *
   * // Or
   * Transaction.run({ retryCount: 2 }, async (tx) => {
   *   // Do something
   * })
   */
  static async run (...args) {
    const opts = (args.length === 1) ? {} : args[0]
    const func = args[args.length - 1]
    if (args.length <= 0 || args.length > 2) {
      throw new InvalidParameterError('args', 'should be ([options,] func)')
    }
    return new Transaction(opts).__run(func)
  }

  /**
   * Return before and after snapshots of all relevant models.
   */
  getModelDiffs (filter = () => true) {
    const allBefore = []
    const allAfter = []
    const allDiff = []
    for (const model of this.__writeBatcher.trackedModels) {
      // istanbul ignore if
      if (!filter(model)) {
        continue
      }
      const before = model.getSnapshot({ initial: true, dbKeys: true })
      const after = model.getSnapshot({ initial: false, dbKeys: true })
      const modelName = model.key ? model.key.Cls.name : model.constructor.name
      const key = model.key ? model.key.encodedKeys : model.__encodedKey
      allBefore.push({ [modelName]: { ...key, data: before } })
      allAfter.push({ [modelName]: { ...key, data: after } })
      const diff = detailedDiff(before, after)
      allDiff.push({ [modelName]: { ...key, data: diff } })
    }
    return {
      before: allBefore,
      after: allAfter,
      diff: allDiff
    }
  }
}

module.exports = {
  __WriteBatcher,
  Transaction,
  getWithArgs
}