diff options
Diffstat (limited to 'testing/cffi1/test_function_args.py')
-rw-r--r-- | testing/cffi1/test_function_args.py | 208 |
1 files changed, 208 insertions, 0 deletions
diff --git a/testing/cffi1/test_function_args.py b/testing/cffi1/test_function_args.py new file mode 100644 index 0000000..30c6fed --- /dev/null +++ b/testing/cffi1/test_function_args.py @@ -0,0 +1,208 @@ +import pytest, sys +try: + # comment out the following line to run this test. + # the latest on x86-64 linux: https://github.com/libffi/libffi/issues/574 + if sys.platform != 'win32': + raise ImportError("this test is skipped because it keeps finding " + "failures in libffi, instead of cffi") + + from hypothesis import given, settings, example + from hypothesis import strategies as st +except ImportError as e: + e1 = e + def test_types(): + pytest.skip(str(e1)) +else: + + from cffi import FFI + import sys, random + from .test_recompiler import verify + + ALL_PRIMITIVES = [ + 'unsigned char', + 'short', + 'int', + 'long', + 'long long', + 'float', + 'double', + #'long double', --- on x86 it can give libffi crashes + ] + def _make_struct(s): + return st.lists(s, min_size=1) + types = st.one_of(st.sampled_from(ALL_PRIMITIVES), + st.lists(st.sampled_from(ALL_PRIMITIVES), min_size=1)) + # NB. 'types' could be st.recursive instead, but it doesn't + # really seem useful + + def draw_primitive(ffi, typename): + value = random.random() * 2**40 + if typename != 'long double': + return ffi.cast(typename, value) + else: + return value + + TEST_RUN_COUNTER = 0 + + + @given(st.lists(types), types) + @settings(max_examples=100, deadline=5000) # 5000ms + def test_types(tp_args, tp_result): + global TEST_RUN_COUNTER + print(tp_args, tp_result) + cdefs = [] + structs = {} + + def build_type(tp): + if type(tp) is list: + field_types = [build_type(tp1) for tp1 in tp] + fields = ['%s f%d;' % (ftp, j) + for (j, ftp) in enumerate(field_types)] + fields = '\n '.join(fields) + name = 's%d' % len(cdefs) + cdefs.append("typedef struct {\n %s\n} %s;" % (fields, name)) + structs[name] = field_types + return name + else: + return tp + + args = [build_type(tp) for tp in tp_args] + result = build_type(tp_result) + + TEST_RUN_COUNTER += 1 + signature = "%s testfargs(%s)" % (result, + ', '.join(['%s a%d' % (arg, i) for (i, arg) in enumerate(args)]) + or 'void') + + source = list(cdefs) + + cdefs.append("%s;" % signature) + cdefs.append("extern %s testfargs_result;" % result) + for i, arg in enumerate(args): + cdefs.append("extern %s testfargs_arg%d;" % (arg, i)) + source.append("%s testfargs_result;" % result) + for i, arg in enumerate(args): + source.append("%s testfargs_arg%d;" % (arg, i)) + source.append(signature) + source.append("{") + for i, arg in enumerate(args): + source.append(" testfargs_arg%d = a%d;" % (i, i)) + source.append(" return testfargs_result;") + source.append("}") + + typedef_line = "typedef %s;" % (signature.replace('testfargs', + '(*mycallback_t)'),) + assert signature.endswith(')') + sig_callback = "%s testfcallback(mycallback_t callback)" % result + cdefs.append(typedef_line) + cdefs.append("%s;" % sig_callback) + source.append(typedef_line) + source.append(sig_callback) + source.append("{") + source.append(" return callback(%s);" % + ', '.join(["testfargs_arg%d" % i for i in range(len(args))])) + source.append("}") + + ffi = FFI() + ffi.cdef("\n".join(cdefs)) + lib = verify(ffi, 'test_function_args_%d' % TEST_RUN_COUNTER, + "\n".join(source), no_cpp=True) + + # when getting segfaults, enable this: + if False: + from testing.udir import udir + import subprocess + f = open(str(udir.join('run1.py')), 'w') + f.write('import sys; sys.path = %r\n' % (sys.path,)) + f.write('from _CFFI_test_function_args_%d import ffi, lib\n' % + TEST_RUN_COUNTER) + for i in range(len(args)): + f.write('a%d = ffi.new("%s *")\n' % (i, args[i])) + aliststr = ', '.join(['a%d[0]' % i for i in range(len(args))]) + f.write('lib.testfargs(%s)\n' % aliststr) + f.write('ffi.addressof(lib, "testfargs")(%s)\n' % aliststr) + f.close() + print("checking for segfault for direct call...") + rc = subprocess.call([sys.executable, 'run1.py'], cwd=str(udir)) + assert rc == 0, rc + + def make_arg(tp): + if tp in structs: + return [make_arg(tp1) for tp1 in structs[tp]] + else: + return draw_primitive(ffi, tp) + + passed_args = [make_arg(arg) for arg in args] + returned_value = make_arg(result) + + def write(p, v): + if type(v) is list: + for i, v1 in enumerate(v): + write(ffi.addressof(p, 'f%d' % i), v1) + else: + p[0] = v + + write(ffi.addressof(lib, 'testfargs_result'), returned_value) + + ## CALL forcing libffi + print("CALL forcing libffi") + received_return = ffi.addressof(lib, 'testfargs')(*passed_args) + ## + + _tp_long_double = ffi.typeof("long double") + def check(p, v): + if type(v) is list: + for i, v1 in enumerate(v): + check(ffi.addressof(p, 'f%d' % i), v1) + else: + if ffi.typeof(p).item is _tp_long_double: + assert ffi.cast("double", p[0]) == v + else: + assert p[0] == v + + for i, arg in enumerate(passed_args): + check(ffi.addressof(lib, 'testfargs_arg%d' % i), arg) + ret = ffi.new(result + "*", received_return) + check(ret, returned_value) + + ## CALLBACK + def expand(value): + if isinstance(value, ffi.CData): + t = ffi.typeof(value) + if t is _tp_long_double: + return float(ffi.cast("double", value)) + return [expand(getattr(value, 'f%d' % i)) + for i in range(len(t.fields))] + else: + return value + + # when getting segfaults, enable this: + if False: + from testing.udir import udir + import subprocess + f = open(str(udir.join('run1.py')), 'w') + f.write('import sys; sys.path = %r\n' % (sys.path,)) + f.write('from _CFFI_test_function_args_%d import ffi, lib\n' % + TEST_RUN_COUNTER) + f.write('def callback(*args): return ffi.new("%s *")[0]\n' % result) + f.write('fptr = ffi.callback("%s(%s)", callback)\n' % (result, + ','.join(args))) + f.write('print(lib.testfcallback(fptr))\n') + f.close() + print("checking for segfault for callback...") + rc = subprocess.call([sys.executable, 'run1.py'], cwd=str(udir)) + assert rc == 0, rc + + seen_args = [] + def callback(*args): + seen_args.append([expand(arg) for arg in args]) + return returned_value + + fptr = ffi.callback("%s(%s)" % (result, ','.join(args)), callback) + print("CALL with callback") + received_return = lib.testfcallback(fptr) + + assert len(seen_args) == 1 + assert passed_args == seen_args[0] + ret = ffi.new(result + "*", received_return) + check(ret, returned_value) |