Merge pull request #8871 from LukasMoll/flake8-bindings-python-tests

bindings/python/tests Applied flake8
This commit is contained in:
A.J. Beamon 2022-11-21 09:22:41 -08:00 committed by GitHub
commit 4a6b29deff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 31 additions and 21 deletions

View File

@ -518,7 +518,7 @@ def test_timeouts(db):
for i in range(2):
tr.options.set_timeout(1500)
tr.set_read_version(0x7ffffffffffffff0)
x = tr[b'foo']
_ = tr[b'foo']
try:
tr.commit().wait()
tr.reset()
@ -557,7 +557,7 @@ def test_db_timeouts(db):
tr[b'foo'] = b'bar'
tr.on_error(err).wait() # should not throw
time.sleep(1)
tr[b'foo']
_ = tr[b'foo']
try:
tr.commit().wait() # should throw
raise TestError("(2) Timeout didn't fire.")
@ -574,7 +574,7 @@ def test_db_timeouts(db):
time.sleep(0.75)
tr[b'foo'] = b'bar'
tr.on_error(err).wait() # should not throw
tr[b'foo']
_ = tr[b'foo']
time.sleep(0.75)
try:
tr.commit().wait() # should throw
@ -615,7 +615,7 @@ def test_db_timeouts(db):
tr.reset()
tr[b'foo'] = b'bar'
time.sleep(0.2)
tr.on_error(err).wait() #should not throw
tr.on_error(err).wait() # should not throw
tr[b'foo'] = b'bar'
time.sleep(0.8)
try:

View File

@ -24,15 +24,18 @@ import sys
if __name__ == '__main__':
fdb.api_version(720)
@fdb.transactional
def setValue(tr, key, value):
tr[key] = value
@fdb.transactional
def setValueWithLimit(tr, key, value, limit):
tr.options.set_size_limit(limit)
tr[key] = value
def test_size_limit_option(db):
value = b'a' * 1024
@ -69,6 +72,7 @@ def test_size_limit_option(db):
# Reset the size limit for future tests
db.options.set_transaction_size_limit(10000000)
@fdb.transactional
def test_get_approximate_size(tr):
tr[b'key1'] = b'value1'
@ -90,6 +94,7 @@ def test_get_approximate_size(tr):
s5 = tr.get_approximate_size().wait()
assert(s4 < s5)
# Expect a cluster file as input. This test will write to the FDB cluster, so
# be aware of potential side effects.
if __name__ == '__main__':

View File

@ -27,24 +27,26 @@ from fdb.tuple import pack
if __name__ == '__main__':
fdb.api_version(720)
def cleanup_tenant(db, tenant_name):
try:
tenant = db.open_tenant(tenant_name)
del tenant[:]
fdb.tenant_management.delete_tenant(db, tenant_name)
except fdb.FDBError as e:
if e.code == 2131: # tenant not found
if e.code == 2131: # tenant not found
pass
else:
raise
def test_tenant_tuple_name(db):
tuplename=(b'test', b'level', b'hierarchy', 3, 1.24, 'str')
tuplename = (b'test', b'level', b'hierarchy', 3, 1.24, 'str')
cleanup_tenant(db, tuplename)
fdb.tenant_management.create_tenant(db, tuplename)
tenant=db.open_tenant(tuplename)
tenant = db.open_tenant(tuplename)
tenant[b'foo'] = b'bar'
assert tenant[b'foo'] == b'bar'
@ -100,7 +102,7 @@ def test_tenant_operations(db):
del tr1[:]
tr1.commit().wait()
except fdb.FDBError as e:
tr.on_error(e).wait()
tr1.on_error(e).wait()
assert tenant1[b'tenant_test_key'] == None
assert db[prefix1 + b'tenant_test_key'] == None
@ -113,7 +115,7 @@ def test_tenant_operations(db):
tenant1[b'tenant_test_key']
assert False
except fdb.FDBError as e:
assert e.code == 2131 # tenant not found
assert e.code == 2131 # tenant not found
del tenant2[:]
fdb.tenant_management.delete_tenant(db, b'tenant2')
@ -126,6 +128,7 @@ def test_tenant_operations(db):
assert db[b'tenant_test_key'] == None
def test_tenant_operation_retries(db):
cleanup_tenant(db, b'tenant1')
cleanup_tenant(db, b'tenant2')
@ -138,7 +141,7 @@ def test_tenant_operation_retries(db):
fdb.tenant_management.create_tenant(db, b'tenant1')
assert False
except fdb.FDBError as e:
assert e.code == 2132 # tenant already exists
assert e.code == 2132 # tenant already exists
# Using a transaction skips the existence check
tr = db.create_transaction()
@ -166,7 +169,7 @@ def test_tenant_operation_retries(db):
fdb.tenant_management.delete_tenant(db, b'tenant1')
assert False
except fdb.FDBError as e:
assert e.code == 2131 # tenant not found
assert e.code == 2131 # tenant not found
# Using a transaction skips the existence check
tr = db.create_transaction()
@ -186,11 +189,13 @@ def test_tenant_operation_retries(db):
except fdb.FDBError as e:
tr.on_error(e).wait()
def test_tenants(db):
test_tenant_tuple_name(db)
test_tenant_operations(db)
test_tenant_operation_retries(db)
# Expect a cluster file as input. This test will write to the FDB cluster, so
# be aware of potential side effects.
if __name__ == '__main__':

View File

@ -26,7 +26,6 @@ import sys
import os
import struct
import threading
import time
import random
import time
import traceback
@ -136,7 +135,7 @@ def test_fdb_transactional_generator(db):
def function_that_yields(tr):
yield 0
assert fdb.get_api_version() < 630, "Pre-6.3, a decorator may wrap a function that yields"
except ValueError as e:
except ValueError:
assert fdb.get_api_version() >= 630, "Post-6.3, a decorator should throw if wrapped function yields"
@ -144,12 +143,13 @@ def test_fdb_transactional_returns_generator(db):
try:
def function_that_yields(tr):
yield 0
@fdb.transactional
def function_that_returns(tr):
return function_that_yields(tr)
function_that_returns()
assert fdb.get_api_version() < 630, "Pre-6.3, returning a generator is allowed"
except ValueError as e:
except ValueError:
assert fdb.get_api_version() >= 630, "Post-6.3, returning a generator should throw"
@ -400,11 +400,11 @@ class Tester:
inst.push(f)
elif inst.op == six.u("GET_ESTIMATED_RANGE_SIZE"):
begin, end = inst.pop(2)
estimatedSize = obj.get_estimated_range_size_bytes(begin, end).wait()
obj.get_estimated_range_size_bytes(begin, end).wait()
inst.push(b"GOT_ESTIMATED_RANGE_SIZE")
elif inst.op == six.u("GET_RANGE_SPLIT_POINTS"):
begin, end, chunkSize = inst.pop(3)
estimatedSize = obj.get_range_split_points(begin, end, chunkSize).wait()
obj.get_range_split_points(begin, end, chunkSize).wait()
inst.push(b"GOT_RANGE_SPLIT_POINTS")
elif inst.op == six.u("GET_KEY"):
key, or_equal, offset, prefix = inst.pop(4)
@ -522,7 +522,7 @@ class Tester:
self.last_version = inst.tr.get_committed_version()
inst.push(b"GOT_COMMITTED_VERSION")
elif inst.op == six.u("GET_APPROXIMATE_SIZE"):
approximate_size = inst.tr.get_approximate_size().wait()
inst.tr.get_approximate_size().wait()
inst.push(b"GOT_APPROXIMATE_SIZE")
elif inst.op == six.u("GET_VERSIONSTAMP"):
inst.push(inst.tr.get_versionstamp())
@ -613,9 +613,9 @@ class Tester:
result += [tenant.key]
try:
metadata = json.loads(tenant.value)
id = metadata["id"]
prefix = metadata["prefix"]
except (json.decoder.JSONDecodeError, KeyError) as e:
_ = metadata["id"]
_ = metadata["prefix"]
except (json.decoder.JSONDecodeError, KeyError):
assert False, "Invalid Tenant Metadata"
inst.push(fdb.tuple.pack(tuple(result)))
elif inst.op == six.u("UNIT_TESTS"):

View File

@ -173,7 +173,7 @@ def tupleTest(N=10000):
print("Prefix not before prefixed:\n Tuple: %s\n Bytes: %s\n Other: %s\n Bytes: %s" % (t, repr(pack(t)), t2, repr(pack(t2))))
return False
print ("Tuple check %d OK" % N)
print("Tuple check %d OK" % N)
return True
# test: