const AWSError = require('./aws-error')
const {
InvalidCachedModelError, InvalidFilterError, InvalidOptionsError
} = require('./errors')
const Filter = require('./filter')
const { ITEM_SOURCE, loadOptionDefaults } = require('./utils')
function mergeCondition (base, additional) {
const [condition, attrNames, attrValues] = base
condition.push(...(additional[0] ?? []))
Object.assign(attrNames, additional[1] ?? {})
Object.assign(attrValues, additional[2] ?? {})
}
/**
* Database iterator. Supports query and scan operations.
* @private
*/
class __DBIterator {
/**
* Create an iterator instance.
* @param {Object} params
* @param {Class} params.ModelCls The model class.
* @param {__WriteBatcher} params.writeBatcher An instance of __WriteBatcher
* @param {Object} params.options Iterator options
*/
constructor ({
ModelCls,
writeBatcher,
options
}) {
this.__writeBatcher = writeBatcher
this.__ModelCls = ModelCls
this.__fetchParams = {}
Object.assign(this, options)
__DBIterator.__assertValidInputs(ModelCls, options)
}
static __assertValidInputs (ModelCls, options) {
if (options.index) {
if (options.inconsistentRead === false) {
throw new InvalidOptionsError('index and inconsistent read. Reading from index needs to be inconsistent')
}
if (!ModelCls.INDEXES[options.index]) {
throw new InvalidOptionsError(`index ${options.index}`)
}
}
}
static get METHOD () {
return this.name.toLowerCase()
}
static __getKeyNames (Cls, index) {
const extractKey = (name) => {
if (index) {
return new Set(Cls.INDEXES[index][name])
}
return new Set(Object.keys(Cls[name]))
}
const partitionKeys = extractKey('KEY')
const sortKeys = extractKey('SORT_KEY')
const allKeys = new Set([...partitionKeys, ...sortKeys])
return {
partitionKeys,
sortKeys,
allKeys
}
}
__getKeyConditionExpression () {
return [[], {}, {}] // conditions, attrNames, attrValues
}
__getFilterExpression () {
return [[], {}, {}] // conditions, attrNames, attrValues
}
__addConditionExpression (params, methodName) {
const [conditions, attrNames, attrValues] = this[methodName]()
if (conditions.length > 0) {
const type = methodName.replace('__get', '')
params[type] = conditions.map(c => {
return '(' + c + ')'
}).join(' AND ')
params.ExpressionAttributeNames = params.ExpressionAttributeNames ?? {}
Object.assign(params.ExpressionAttributeNames, attrNames)
params.ExpressionAttributeValues = params.ExpressionAttributeValues ??
{}
Object.assign(params.ExpressionAttributeValues, attrValues)
}
}
__setupParams () {
if (Object.keys(this.__fetchParams).length === 0) {
// pagination and limit flags are setup else where since their values can
// dynamically change as the iterator moves on.
const params = {
TableName: this.__ModelCls.fullTableName,
ConsistentRead: !this.inconsistentRead
}
if (this.index) {
params.IndexName = this.index
}
this.__addConditionExpression(params, '__getKeyConditionExpression')
this.__addConditionExpression(params, '__getFilterExpression')
if (this.shardIndex !== undefined) {
params.Segment = this.shardIndex
params.TotalSegments = this.shardCount
}
if (this.descending !== undefined) {
params.ScanIndexForward = !this.descending
}
Object.assign(this.__fetchParams, params)
}
// Keep __fetchParams unchanged by shallow copying.
// Only top level keys are modified (ExclusiveStartKey),
// deepcopy is not necessary
return { ...this.__fetchParams }
}
/**
* Get one batch of items, by going through at most n items. Return a
* nextToken for pagination.
*
* @param {Integer} n The max number of items to check (not return). When
* filtering is done, items not passing the filter conditions will not be
* returned, but they count towards the max.
* @param {Object} [nextToken=undefined] A token for fetching the next batch.
* It is returned from a previous call to __getBatch. When nextToken is
* undefined, the function will go from the start of the DB table.
*
* @return {Tuple(Array<Model>, String)} A tuple of (items, nextToken). When
* nextToken
* is undefined, the end of the DB table has been reached.
*/
async __getBatch (n, nextToken = undefined) {
const params = this.__setupParams()
params.Limit = n
if (!nextToken) {
delete params.ExclusiveStartKey
} else {
params.ExclusiveStartKey = nextToken
}
const method = this.constructor.METHOD
const client = this.bypassCache ? this.documentClient : this.daxClient
const result = await client[method](params)
.catch(
// istanbul ignore next
e => { throw new AWSError(method, e) })
const models = result.Items?.map(item => {
const m = new this.__ModelCls(ITEM_SOURCE[method.toUpperCase()], false,
item, this.index)
if (m.__hasExpired) {
return undefined
}
if (this.cacheModels) {
const model = this.__writeBatcher.getModel(
this.__ModelCls.fullTableName,
m.__encodedKey._id,
m.__encodedKey._sk
)
if (model) {
if (!model.__src.canBeCached || model.__toBeDeleted) {
throw new InvalidCachedModelError(model)
}
return model
}
}
this.__writeBatcher.track(m)
return m
}).filter(m => !!m) || []
return [
models,
result.LastEvaluatedKey
]
}
/**
* Fetch n items from DB, return the fetched items and a token to next page.
*
* @param {Integer} n The number of items to return.
* @param {String} [nextToken=undefined] A string token for fetching the next
* batch. It is returned from a previous call to fetch. When nextToken is
* undefined, the function will go from the start of the DB table.
*
* @return {Tuple(Array<Model>, String)} A tuple of (items, nextToken). When
* nextToken is undefined, the end of the DB table has been reached.
*/
async fetch (n, nextToken = undefined) {
const ret = []
nextToken = nextToken ? JSON.parse(nextToken) : undefined
while (ret.length < n) {
const [ms, nt] = await this.__getBatch(
n - ret.length,
nextToken
)
ret.push(...ms)
nextToken = nt // Update even if nt is undefined, to terminate pagination
if (!nt) {
// no more items
break
}
}
return [ret, nextToken ? JSON.stringify(nextToken) : undefined]
}
/**
* A generator API for retrieving items from DB.
*
* @param {Integer} n The number of items to return.
*/
async * run (n) {
let fetchedCount = 0
let nextToken
while (fetchedCount < n) {
const [models, nt] = await this.__getBatch(
Math.min(n - fetchedCount, 50),
nextToken
)
for (const model of models) {
yield model
}
if (!nt) {
return
}
fetchedCount += models.length
nextToken = nt
}
}
}
/**
* Scan handle for constructing filter expressions
*/
class Scan extends __DBIterator {
/**
* Create an iterator instance.
* @param {Object} params
* @param {Class} params.ModelCls The model class.
* @param {__WriteBatcher} params.writeBatcher An instance of __WriteBatcher
* @param {Object} params.options Iterator options
* @property {Boolean} [params.options.inconsistentRead=false] When true,
* a strongly consistent read is performed.
* Note: Consistent read(inconsistentRead=false) only available when query
* or scan is performed on Model or LocalSecondaryIndex. To retrieve more
* recent results when scanning GlobalSecondaryIndex, enabling
* `bypassCache` instead.
* @property {Number} [params.options.shardIndex=undefined] Only available in
* scan. Distributed scan is supported by dynamodb, where multiple machines
* can scan non-overlapping sections of the database in parallel, boosting
* the throughput of a database scan. ShardIndex denotes the shard where
* the scan should be performed on.
* @property {Number} [params.options.shardCount=undefined] Only available in
* scan. Distributed scan is supported by dynamodb, where multiple machines
* can scan non-overlapping sections of the database in parallel, boosting
* the throughput of a database scan. ShardCount controls the number of
* shards in a distributed scan.
* @property {Boolean} [params.options.bypassCache=false] DAX stores `Scan`
* and `Query` request results in its query cache, and the cache is not
* invalidated by DynamoDB updates. Set this to true to skip the DAX cache
* for more up-to-date results.
* @property {Boolean} [params.options.cacheModels=false] Whether to cache
* models already retrieved from the database. When off, getting a model
* with the same key the second time in the same transaction results in an
* error. When on, `get`ting the same key simply returns the cached model.
* Previous modifications done to the model are reflected in the returned
* model.
*/
constructor ({
ModelCls,
writeBatcher,
options = {}
}) {
Scan.__assertValidInputs(options)
options = loadOptionDefaults(options, {
bypassCache: false,
inconsistentRead: options.index !== undefined,
shardCount: undefined,
shardIndex: undefined,
index: undefined,
cacheModels: false
})
super({ ModelCls, writeBatcher, options })
Object.freeze(this)
}
static __assertValidInputs (options) {
if ((options.shardCount === undefined) !==
(options.shardIndex === undefined)) {
throw new InvalidOptionsError('shardIndex & shardCount',
'Sharded scan requires both shardCount and shardIndex')
}
if (options.shardIndex < 0 || options.shardIndex >= options.shardCount) {
throw new InvalidOptionsError('shardIndex',
'ShardIndex must be positive and smaller than shardCount.')
}
}
}
class Query extends __DBIterator {
/**
* Create an iterator instance.
* @param {Object} params
* @param {Class} params.ModelCls The model class.
* @param {__WriteBatcher} params.writeBatcher An instance of __WriteBatcher
* @param {Object} params.options Iterator options
* @param {Boolean} [params.options.allowLazyFilter=false] Filtering on
* non-key fields in query, and any fields in scan is performed after data
* is retrieved from db server node, but before data is returned to client.
* By default, this library forbids these filters by throwing exception as
* they cause slow operations. By allowing lazy filter, the exception is
* suppressed.
* @param {Boolean} [params.options.descending=false] Only available in
* query, controls ordering based on sort key.
* @property {Boolean} [params.options.inconsistentRead=false] When true,
* a strongly consistent read is performed.
* Note: Consistent read(inconsistentRead=false) only available when query
* or scan is performed on Model or LocalSecondaryIndex. To retrieve more
* recent results when scanning GlobalSecondaryIndex, enabling
* `bypassCache` instead.
* @property {Boolean} [params.options.bypassCache=false] DAX stores `Scan`
* and `Query` request results in its query cache, and the cache is not
* invalidated by DynamoDB updates. Set this to true to skip the DAX cache
* for more up-to-date results.
* @property {Boolean} [params.options.cacheModels=false] Whether to cache
* models already retrieved from the database. When off, getting a model
* with the same key the second time in the same transaction results in an
* error. When on, `get`ting the same key simply returns the cached model.
* Previous modifications done to the model are reflected in the returned
* model.
*/
constructor ({
ModelCls,
writeBatcher,
options = {}
}) {
options = loadOptionDefaults(options, {
bypassCache: false,
allowLazyFilter: false,
descending: false,
inconsistentRead: options.index !== undefined,
index: undefined,
cacheModels: false
})
super({ ModelCls, writeBatcher, options })
this.__data = {}
let index = 0
// Use the PK, SK for the index if present
this.__KEY_NAMES = this.constructor.__getKeyNames(ModelCls, this.index)
const { partitionKeys, sortKeys } = this.__KEY_NAMES
// We want to know the PK, SK for the model for lazy filtering
const modelKeys = this.constructor.__getKeyNames(ModelCls)
for (const name of Object.keys(ModelCls.schema.objectSchemas)) {
let keyType
if (partitionKeys.has(name)) {
keyType = 'PARTITION'
} else if (sortKeys.has(name)) {
keyType = 'SORT'
}
let filterName = name
if (this.index && !keyType && modelKeys.allKeys.has(name)) {
// We are trying to filter on a field from the model's key
filterName = ModelCls.__encodeCompoundFieldName([name])
}
const handle = new Filter(this.constructor.METHOD, filterName, `${index}`,
keyType)
this.__data[name] = handle
if (!keyType && !this.allowLazyFilter) {
this[name] = () => {
throw new InvalidFilterError(`May not filter on non-key fields. You
can allow lazy filter to enable filtering non-key fields.`)
}
} else if (!keyType && filterName !== name && !ModelCls.__compoundFields.has(name)) {
this[name] = () => {
throw new InvalidFilterError(`May not filter on ${name}
if INDEX_INCLUDE_KEYS is false`)
}
} else {
const self = this
this[name] = function (operation) {
if (arguments.length === 1) {
handle.filter('==', operation)
} else {
handle.filter(...arguments)
}
return self
}
}
index++
}
Object.freeze(this)
}
__getEncodedVal (key, vals) {
let funcName
const pascalName = key[0].toUpperCase() + key.substring(1)
if (this.index) {
funcName = `__get${pascalName}ForIndex`
return this.__ModelCls[funcName](vals, this.index)
} else {
funcName = `__get${pascalName}`
return this.__ModelCls[funcName](vals)
}
}
__getAWSKeyName (key) {
if (this.index) {
return this.__ModelCls.__getKeyNamesForIndex(this.index)[`_${key}`]
}
return `_${key}`
}
__getKeyConditionExpression () {
const { partitionKeys, sortKeys } = this.__KEY_NAMES
const keys = { id: partitionKeys, sk: sortKeys }
const ret = [[], {}, {}]
for (const keyName of ['id', 'sk']) {
const awsKeyName = this.__getAWSKeyName(keyName)
const keyComponents = {}
let op
for (const key of keys[keyName]) {
const filter = this.__data[key]
if (filter.__value !== undefined) {
keyComponents[key] = filter.__value
op = filter.awsOperator
}
}
if (Object.keys(keyComponents).length === 0) {
continue
}
if (op === 'between') {
const betweenKeyComponents = [{}, {}]
for (const [key, value] of Object.entries(keyComponents)) {
for (let index = 0; index < value.length; index++) {
betweenKeyComponents[index][key] = value[index]
}
}
const [leftKeyComponents, rightKeyComponents] = betweenKeyComponents
const leftValue = this.__getEncodedVal(keyName, leftKeyComponents)
const rightValue = this.__getEncodedVal(keyName, rightKeyComponents)
const condition = `#_${keyName} BETWEEN :l_${keyName} AND :r_${keyName}`
mergeCondition(ret, [
[condition],
{ [`#_${keyName}`]: `${awsKeyName}` },
{
[`:l_${keyName}`]: leftValue,
[`:r_${keyName}`]: rightValue
}
])
} else {
const value = this.__getEncodedVal(keyName, keyComponents)
let condition = `#_${keyName}${op}:_${keyName}`
if (op === 'prefix') {
condition = `begins_with(#_${keyName},:_${keyName})`
}
mergeCondition(ret, [
[condition],
{ [`#_${keyName}`]: `${awsKeyName}` },
{ [`:_${keyName}`]: value }
])
}
}
if (ret[0].length === 0) {
throw new InvalidFilterError('Query must contain partition key filters')
}
return ret
}
__getFilterExpression () {
this.__checkKeyFilters()
const ret = [[], {}, {}] // conditions, attrNames, attrValues
const { partitionKeys, sortKeys } = this.__KEY_NAMES
for (const [name, handle] of Object.entries(this.__data)) {
if (partitionKeys.has(name)) {
continue
}
if (sortKeys.has(name)) {
continue
}
mergeCondition(ret, [
handle.conditions,
handle.attrNames,
handle.attrValues
])
}
return ret
}
__checkKeyFilters () {
const { partitionKeys, sortKeys } = this.__KEY_NAMES
for (const keys of [partitionKeys, sortKeys]) {
const operations = []
for (const key of keys) {
operations.push(this.__data[key].__operation)
}
const areOperationsSame = operations.reduce(
(prev, curr) => prev && curr === operations[0],
true
)
if (!areOperationsSame) {
throw new InvalidFilterError(
`Filter operations on keys ${Array.from(keys)} must be the same`)
}
}
}
__setupParams () {
for (const handle of Object.values(this.__data)) {
handle.lock()
}
return super.__setupParams()
}
}
module.exports = {
Query,
Scan
}