foundationdb/layers/compressedColumn/compressedColumn.py

407 lines
15 KiB
Python

#
# compressedColumn.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import fdb
import fdb.tuple
import struct
_unpackedPrefix = 'unpacked'
_packedPrefix = 'packed'
fdb.api_version(16)
class _MergedData:
def __init__(self):
self.results = []
self.finishedPack = False
self.finishedUnpack = False
self.packedIndex = 0
pass
class Column:
def __init__(self, columnName):
self.columnName = columnName
self.packFetchCount = 10
self.targetChunkSize = 5000
self.maxChunkSize = 10000
# self.mergeChunkSize = 2500
def _getSubKeyTuple(self, key):
return fdb.tuple.unpack(key)[2:]
def _getSubKey(self, key):
return fdb.tuple.unpack(key)[2]
def _isPackedKey(self, key):
return str(key).startswith(fdb.tuple.pack((self.columnName, _packedPrefix)))
# This results in slight inefficiencies when the key being searched for comes before the first packed segment with strictRange=False
def _getPackedData(self, key, packedRange, requireKey=True, strictRange=True):
found = False
keyRange = None
packedKeyRange = None
packedData = None
for k, v in packedRange:
# print 'Searching ' + k + ' for ' + key
if self._isPackedKey(k):
if found:
endRange = self._getSubKeyTuple(k)
keyRange = (keyRange[0], endRange[0])
break
keyRange = self._getSubKeyTuple(k)
packedKeyRange = keyRange
# print str(keyRange)
if (not requireKey or key >= keyRange[0]) and key <= keyRange[1]:
if strictRange:
packedData = _PackedData(v)
break
else:
found = True
keyRange = (keyRange[0], keyRange[1] + chr(0))
packedData = _PackedData(v)
elif found:
keyRange = (keyRange[0], '\xff')
if strictRange:
return [keyRange, packedData]
else:
return [packedKeyRange, keyRange, packedData]
def _getPackedRange(self, tr, key):
return tr.get_range(fdb.KeySelector.last_less_than(fdb.tuple.pack((self.columnName, _packedPrefix, key + chr(0)))),
fdb.tuple.pack((self.columnName, _packedPrefix + chr(0))), 2)
def _getUnpackedData(self, tr, key):
return tr[fdb.tuple.pack((self.columnName, _unpackedPrefix, key))]
def _getUnpackedRange(self, tr, keyBegin, keyEnd, limit):
return tr.get_range(fdb.tuple.pack((self.columnName, _unpackedPrefix, keyBegin)),
fdb.tuple.pack((self.columnName, _unpackedPrefix, keyEnd)), limit)
def _mergeResults(self, packed, unpacked, totalUnpacked, packedIndex=0, minPackedKey='', maxKey=None):
data = _MergedData()
if packed is None:
# print 'No merge necessary'
data.finishedUnpack = True
data.finishedPack = True
data.packedIndex = 0
if maxKey is None:
data.results = [fdb.KeyValue(self._getSubKey(k), v) for k, v in unpacked]
else:
for k, v in unpacked:
if k < maxKey:
data.results.append(fdb.KeyValue(self._getSubKey(k), v))
else:
data.finishedUnpack = False
break
else:
# print 'Merging packed'
unpackedCount = 0
for k, v in unpacked:
subKey = self._getSubKey(k)
# print 'Unpacked: ' + subKey
if maxKey is not None and subKey >= maxKey:
# print 'subKey >= maxKey %s, %s' % (subKey, maxKey)
break
exactMatch = False
while packedIndex < len(packed.rows) \
and packed.rows[packedIndex].key <= subKey \
and (maxKey is None or packed.rows[packedIndex].key < maxKey):
exactMatch = packed.rows[packedIndex].key == subKey
if packed.rows[packedIndex].key < subKey and packed.rows[packedIndex].key >= minPackedKey:
data.results.append(packed.rows[packedIndex])
packedIndex += 1
if maxKey is None and packedIndex == len(packed.rows):
if exactMatch:
data.results.append(fdb.KeyValue(self._getSubKey(k), v))
# print 'packedIndex == len(packed.rows)'
break
data.results.append(fdb.KeyValue(self._getSubKey(k), v))
unpackedCount += 1
# print "Packed index: %d, Unpacked: %d, total: %d" % (packedIndex, unpackedCount, totalUnpacked)
if unpackedCount < totalUnpacked:
while packedIndex < len(packed.rows) and (maxKey is None or packed.rows[packedIndex].key < maxKey):
if packed.rows[packedIndex].key >= minPackedKey:
data.results.append(packed.rows[packedIndex])
packedIndex += 1
data.finishedPack = packedIndex == len(packed.rows)
data.finishedUnpack = unpackedCount == totalUnpacked
data.packedIndex = packedIndex
# print str(data.results)
# print 'Num Results: %d' % len(data.results)
return data
@fdb.transactional
def setRow(self, tr, rowName, value):
tr[fdb.tuple.pack((self.columnName, _unpackedPrefix, rowName))] = value
@fdb.transactional
def getRow(self, tr, rowName):
unpacked = self._getUnpackedData(tr, rowName)
packedRange = self._getPackedRange(tr, rowName)
if unpacked.present():
return unpacked
else:
packedData = self._getPackedData(rowName, packedRange)[1]
if packedData is None:
return None
return packedData.getRow(rowName)
return None
@fdb.transactional
def delete(self, tr):
tr.clear_range_startswith(self.columnName)
def getColumnStream(self, db, startRow=''):
return _ColumnStream(db, self, startRow)
# This function is not fully transactional. Each compressed block will be created in a transaction
def pack(self, db, startRow='', endRow='\xff'):
currentRow = startRow
numFetched = self.packFetchCount
while numFetched == self.packFetchCount:
# print 'outer: \'' + repr(currentRow) + '\''
try:
tr = db.create_transaction()
packedIndex = 0
packedData = None
lastRow = currentRow
newPack = _PackedData()
oldRows = []
while True:
# print 'inner: \'' + repr(currentRow) + '\''
unpacked = list(self._getUnpackedRange(tr, lastRow, endRow, self.packFetchCount))
unpackedCount = len(unpacked)
if len(unpacked) == 0:
break
if packedData is None:
subKey = self._getSubKey(unpacked[0].key)
packedRange = self._getPackedRange(tr, subKey)
[packedKeyRange, keyRange, packedData] = self._getPackedData(subKey, packedRange, False, False)
if packedKeyRange is not None:
# print 'Deleting old rows'
oldRows.append(fdb.tuple.pack((self.columnName, _packedPrefix, packedKeyRange[0], packedKeyRange[1])))
maxKey = None
if keyRange is not None:
maxKey = keyRange[1]
merged = self._mergeResults(packedData, unpacked, self.packFetchCount, packedIndex, lastRow, maxKey)
for row in merged.results:
oldRows.append(fdb.tuple.pack((self.columnName, _unpackedPrefix, row.key)))
newPack.addRow(row)
lastRow = row.key
# print 'Set lastRow = \'' + repr(lastRow) + '\''
lastRow = lastRow + chr(0)
if (maxKey is not None and merged.finishedPack) or (maxKey is None and newPack.bytes > self.targetChunkSize):
break
# print 'Deleting rows'
for row in oldRows:
# print 'Deleting row ' + repr(row)
del tr[row]
for k, v in newPack.getPackedKeyValues(self, self.targetChunkSize, self.maxChunkSize):
tr[k] = v
tr.commit().wait()
currentRow = lastRow
numFetched = unpackedCount
except fdb.FDBError as e:
if e.code == 1007: # transaction_too_old
pass
# FIXME: Unpack the overlapping packed block and try again
tr.on_error(e.code).wait()
class _ColumnStream:
def __init__(self, db, column, startKey):
self.column = column
self.db = db
self.currentKey = startKey
self.results = []
self.resultsIndex = 0
self.firstRead = True
self.fetchCount = 5
self.packedData = None
self.packedIndex = 0
def __iter__(self):
return self
def next(self):
value = self._readNextRow(self.db)
if value is None:
raise StopIteration
else:
return value
def _readNextRow(self, db):
# print 'Reading next row'
if self.resultsIndex >= len(self.results):
# print 'Fetching rows'
self._fetchRows(db)
if self.resultsIndex >= len(self.results):
# print 'Finished iterating: (%d/%d)' % (self.resultsIndex, len(self.results))
return None
else:
self.currentKey = self.results[self.resultsIndex].key
value = self.results[self.resultsIndex].value
self.resultsIndex += 1
# print 'Returning value (%s, %s)' % (self.currentKey, value)
return (self.currentKey, value)
@fdb.transactional
def _fetchRows(self, tr):
if self.firstRead:
# print 'First fetch'
startKey = self.currentKey
else:
# print 'Subsequent fetch %s' % self.currentKey
startKey = self.currentKey + chr(0)
# print 'Using start key %s' % startKey
# Read next packed and unpacked entries
# FIXME: Should we read unpacked after getting the result of the packed data? If we do, then we can more accurately limit the number
# of results that we get back
unpacked = self.column._getUnpackedRange(tr, startKey, '\xff', self.fetchCount)
if self.packedData is None:
packedRange = self.column._getPackedRange(tr, startKey)
[keyRange, self.packedData] = self.column._getPackedData(startKey, packedRange, False)
merged = self.column._mergeResults(self.packedData, unpacked, self.fetchCount, self.packedIndex, startKey)
if merged.finishedPack:
# print 'reset packed'
self.packedData = None
self.packedIndex = 0
else:
# print 'more packed %d' % merged.packedIndex
self.packedIndex = merged.packedIndex
self.results = merged.results
# print 'Getting range %s - %s (%d)' % (startKey, fdb.tuple.pack((self.column.columnName + chr(0))), self.fetchCount)
self.resultsIndex = 0
self.firstRead = False
class _PackedData:
def __init__(self, packedValue=None):
self.rows = []
self.bytes = 0
if packedValue is not None:
self._unpack(packedValue)
def addRow(self, row):
# print 'adding row %s' % row.key
self.rows.append(row)
self.bytes += len(row.key) + len(row.value) + 12
def getRow(self, rowName):
for row in self.rows:
if row.key == rowName:
return row.value
return None
def getPackedKeyValues(self, column, targetSize, maxSize):
rowIndex = 0
currentByte = 0
results = []
while currentByte < self.bytes:
headerItems = []
size = targetSize
if self.bytes - currentByte < maxSize:
size = maxSize
startRowIndex = rowIndex
startKey = None
endKey = None
bodyLength = 0
packBytes = 0
while rowIndex < len(self.rows) and packBytes < size:
row = self.rows[rowIndex]
headerItems.append(struct.pack('iii', len(row.key), len(row.value), bodyLength) + row.key)
bodyLength += len(row.value)
packBytes += len(row.key) + len(row.value) + 12
rowIndex += 1
if startKey is None:
startKey = row.key
endKey = row.key
header = ''.join(headerItems)
body = ''.join(row.value for row in self.rows[startRowIndex:rowIndex])
results.append(fdb.KeyValue(fdb.tuple.pack((column.columnName, _packedPrefix, startKey, endKey)),
struct.pack('i', len(header)) + header + body))
currentByte += packBytes
return results
def _unpack(self, str):
self.bytes = len(str)
headerLength = struct.unpack('i', str[0:4])[0]
header = str[4:4 + headerLength]
body = str[4 + headerLength:]
index = 0
while index < headerLength:
# print 'header length: %d, %d' % (len(self.header), index)
(keyLength, valueLength, valueOffset) = struct.unpack('iii', header[index:index + 12])
key = header[index + 12:index + 12 + keyLength]
index = index + 12 + keyLength
value = body[valueOffset:valueOffset + valueLength]
self.rows.append(fdb.KeyValue(key, value))