Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make FFI callbacks thread safe #12823

Draft
wants to merge 21 commits into
base: master
Choose a base branch
from
Draft
Changes from 8 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 95 additions & 5 deletions ext/ffi/ffi.c
Original file line number Diff line number Diff line change
@@ -935,9 +935,24 @@ static void zend_ffi_callback_hash_dtor(zval *zv) /* {{{ */
}
/* }}} */

static void zend_ffi_callback_trampoline(ffi_cif* cif, void* ret, void** args, void* data) /* {{{ */
{
zend_ffi_callback_data *callback_data = (zend_ffi_callback_data*)data;
static void (*orig_interrupt_function)(zend_execute_data *execute_data);

static void zend_ffi_dispatch_callback_end(void){ /* {{{ */
zend_atomic_bool_store_ex(&FFI_G(callback_in_progress), false);
pthread_cond_broadcast(&FFI_G(vm_ack));
pthread_mutex_unlock(&FFI_G(vm_lock));
}
/* }}} */

static void zend_ffi_dispatch_callback(void){ /* {{{ */
// this function must always run on the main thread
assert(pthread_self() == FFI_G(main_tid));

if (!zend_atomic_bool_load_ex(&FFI_G(callback_in_progress))) {
return;
}

zend_ffi_callback_data *callback_data = FFI_G(callback_data).data;
zend_fcall_info fci;
zend_ffi_type *ret_type;
zval retval;
@@ -951,13 +966,14 @@ static void zend_ffi_callback_trampoline(ffi_cif* cif, void* ret, void** args, v
fci.param_count = callback_data->arg_count;
fci.named_params = NULL;


if (callback_data->type->func.args) {
int n = 0;
zend_ffi_type *arg_type;

ZEND_HASH_PACKED_FOREACH_PTR(callback_data->type->func.args, arg_type) {
arg_type = ZEND_FFI_TYPE(arg_type);
zend_ffi_cdata_to_zval(NULL, args[n], arg_type, BP_VAR_R, &fci.params[n], (zend_ffi_flags)(arg_type->attr & ZEND_FFI_ATTR_CONST), 0, 0);
zend_ffi_cdata_to_zval(NULL, FFI_G(callback_data).args[n], arg_type, BP_VAR_R, &fci.params[n], (zend_ffi_flags)(arg_type->attr & ZEND_FFI_ATTR_CONST), 0, 0);
n++;
} ZEND_HASH_FOREACH_END();
}
@@ -977,15 +993,77 @@ static void zend_ffi_callback_trampoline(ffi_cif* cif, void* ret, void** args, v
free_alloca(fci.params, use_heap);

if (EG(exception)) {
zend_ffi_dispatch_callback_end();
zend_error_noreturn(E_ERROR, "Throwing from FFI callbacks is not allowed");
}

ret_type = ZEND_FFI_TYPE(callback_data->type->func.ret_type);
if (ret_type->kind != ZEND_FFI_TYPE_VOID) {
zend_ffi_zval_to_cdata(ret, ret_type, &retval);
zend_ffi_zval_to_cdata(FFI_G(callback_data).ret, ret_type, &retval);
}

zval_ptr_dtor(&retval);
zend_ffi_dispatch_callback_end();
}
/* }}} */

static void zend_ffi_interrupt_function(zend_execute_data *execute_data){ /* {{{ */
zend_ffi_dispatch_callback();

if (orig_interrupt_function) {
orig_interrupt_function(execute_data);
}
}
/* }}} */

static void zend_ffi_wait_request_barrier(bool release){ /* {{{ */
// get lock, first
pthread_mutex_lock(&FFI_G(vm_lock));

while(zend_atomic_bool_load_ex(&FFI_G(callback_in_progress))){
// we acquired the lock before the request could be serviced
// unlock it and wait for the flag
pthread_cond_wait(&FFI_G(vm_ack), &FFI_G(vm_lock));
}

if(release){
pthread_mutex_unlock(&FFI_G(vm_lock));
}
}
/* }}} */

static void zend_ffi_callback_trampoline(ffi_cif* cif, void* ret, void** args, void* data) /* {{{ */
{
// wait for a previously initiated request to complete
zend_ffi_wait_request_barrier(false);
Comment on lines +1031 to +1034
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may a regular in-main-thread callback, that doesn't require any locks.
You should check FFI_G(callback_tid) == FFI_G(main_tid) first.

Ideally, we should make distinct between regular and "thread" callback

$libc->pthread_create(
		FFI::addr($tid), NULL,
		FFI::thread_callback($thread_func), FFI::addr($arg)
	);


// mutex is now locked, and request is not pending.
// start a new one
zend_atomic_bool_store_ex(&FFI_G(callback_in_progress), true);

zend_ffi_call_data call_data = {
.cif = cif,
.ret = ret,
.args = args,
.data = (zend_ffi_callback_data *)data
};
FFI_G(callback_data) = call_data;

bool is_main_thread = pthread_self() == FFI_G(main_tid);

if(is_main_thread){
// dispatch the callback directly
zend_ffi_dispatch_callback();
} else {
// post interrupt request to acquire the main thread
zend_atomic_bool_store_ex(&EG(vm_interrupt), true);
pthread_mutex_unlock(&FFI_G(vm_lock));
}

if(!is_main_thread){
// wait for the request to complete before returning
zend_ffi_wait_request_barrier(true);
}
}
/* }}} */

@@ -5518,13 +5596,24 @@ ZEND_MINIT_FUNCTION(ffi)
return zend_ffi_preload(FFI_G(preload));
}

FFI_G(main_tid) = pthread_self();

zend_atomic_bool_store_ex(&FFI_G(callback_in_progress), false);
orig_interrupt_function = zend_interrupt_function;
zend_interrupt_function = zend_ffi_interrupt_function;

pthread_mutex_init(&FFI_G(vm_lock), NULL);
pthread_cond_init(&FFI_G(vm_ack), NULL);

return SUCCESS;
}
/* }}} */

/* {{{ ZEND_RSHUTDOWN_FUNCTION */
ZEND_RSHUTDOWN_FUNCTION(ffi)
{
zend_ffi_wait_request_barrier(true);

if (FFI_G(callbacks)) {
zend_hash_destroy(FFI_G(callbacks));
efree(FFI_G(callbacks));
@@ -5636,6 +5725,7 @@ static ZEND_GINIT_FUNCTION(ffi)
/* {{{ ZEND_GINIT_FUNCTION */
static ZEND_GSHUTDOWN_FUNCTION(ffi)
{
zend_ffi_wait_request_barrier(true);
if (ffi_globals->scopes) {
zend_hash_destroy(ffi_globals->scopes);
free(ffi_globals->scopes);
18 changes: 18 additions & 0 deletions ext/ffi/php_ffi.h
Original file line number Diff line number Diff line change
@@ -17,6 +17,9 @@
#ifndef PHP_FFI_H
#define PHP_FFI_H

#include <ffi.h>
#include <pthread.h>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Windows build is broken. There are no <pthread.h> there.


extern zend_module_entry ffi_module_entry;
#define phpext_ffi_ptr &ffi_module_entry

@@ -27,6 +30,15 @@ typedef enum _zend_ffi_api_restriction {
} zend_ffi_api_restriction;

typedef struct _zend_ffi_type zend_ffi_type;
typedef struct _zend_ffi_callback_data zend_ffi_callback_data;


typedef struct _zend_ffi_call_data {
ffi_cif* cif;
void* ret;
void** args;
zend_ffi_callback_data* data;
} zend_ffi_call_data;

ZEND_BEGIN_MODULE_GLOBALS(ffi)
zend_ffi_api_restriction restriction;
@@ -35,6 +47,12 @@ ZEND_BEGIN_MODULE_GLOBALS(ffi)
/* predefined ffi_types */
HashTable types;

zend_atomic_bool callback_in_progress;
pthread_mutex_t vm_lock;
pthread_cond_t vm_ack;
pthread_t main_tid;
zend_ffi_call_data callback_data;

/* preloading */
char *preload;
HashTable *scopes; /* list of preloaded scopes */
59 changes: 59 additions & 0 deletions ext/ffi/tests/callback_threads.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
--TEST--
FFI Thread safe callbacks
--EXTENSIONS--
ffi
--SKIPIF--
<?php
try {
$libc = FFI::cdef('
int pthread_create(void *restrict thread,
const void *restrict attr,
void *(*start_routine)(void *),
void *arg);
', 'libc.so.6');
} catch(Throwable $_){
die('skip libc.so.6 not available');
}
?>
--INI--
ffi.enable=1
--FILE--
<?php
$libc = FFI::cdef('
typedef uint64_t pthread_t;
int pthread_create(pthread_t *thread,
const void* attr,
void *(*start_routine)(void *),
void *arg);
int pthread_detach(pthread_t thread);
', 'libc.so.6');

$tid = $libc->new($libc->type('pthread_t'));
$accum = 0;
$thread_func = function($arg) use($libc, &$accum){
//$v = $libc->cast('int *', $arg)[0];
FFI::free($arg);
usleep(10 * 1000);
$accum++;
//print(".");
};

for($i=0; $i<100; $i++){
$arg = $libc->new('int', false);
$arg->cdata = $i;

$libc->pthread_create(
FFI::addr($tid), NULL,
$thread_func, FFI::addr($arg)
);
$libc->pthread_detach($tid->cdata);
}

while($accum != 100){
//print("w");
usleep(1000);
}
print($accum);
?>
--EXPECT--
100