Threads:
The main rule here is that only the main R thread can call a function from the R API. The civetweb callbacks run on the civetweb threads, so they cannot call any R API. Currently we only use the begin_request() callback, and this is called from the request threads. So we need to synchronize between the request threads and the main R thread.
We essentially have a producer-consumer problem, with a single consumer, the main R thread, and multiple producers, the request threads. Having a single consumer means that the queue to store the jobs is of length one. This is a good guide on how to solve such a problem: https://docs.oracle.com/cd/E36784_01/html/E36868/sync-31.html
We need two conditions, to signal that 1) there is something to work on, and that 2) new work may come in. We also need a mutex to be able to wait on these conditions. These are stored in the user_data of the civetweb server instance:
    struct server_user_data {
      ...
      pthread_cond_t process_more;     /* there is something to process */
      pthread_cond_t process_less;     /* we can process something */
      pthread_mutex_t process_lock;
      struct mg_connection *nextconn;
      ...
    };
nextconn is the queue; it is used to pass the request from a request thread to the main R thread. When a request thread comes in, it has to make sure that nextconn is NULL, so it waits on process_less. Once given the green light, it sets nextconn to the civetweb connection object, and then waits on its own finish_cond condition, which is stored in the connection-specific user data:
    struct connection_user_data {
      pthread_cond_t finish_cond;      /* can finish callback? */
      pthread_mutex_t finish_lock;
      int main_todo;                   /* what should the main thread do? */
      int req_todo;                    /* what should the request thread do? */
      double secs;                     /* how much should we wait? */
      SEXP req;
      ...
    };
The main R thread can use the user_data of nextconn to access all information about the connection and the request.
Once the main R thread is done with processing the request, it sets the connection's req_todo field to non-zero, and signals the connection's finish_cond condition to allow the request thread to continue. Then it also signals the process_less condition of the server, to let other request threads in.
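The handoff described above can be sketched with plain pthreads. This is a minimal illustration, not the webfakes implementation: the field names follow the structs shown above, but the functions (server_init(), connection_init(), submit_request(), poll_request(), finish_request(), request_thread_main()), the TODO_* values and the id field are hypothetical, and where exactly the slot is cleared is an assumption.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical values; the text only says req_todo becomes non-zero. */
enum { TODO_NONE = 0, TODO_DONE = 1 };

struct connection {
  pthread_cond_t finish_cond;   /* can the request thread continue? */
  pthread_mutex_t finish_lock;
  int req_todo;                 /* set by the main thread when done */
  int id;                       /* illustration only */
};

struct server {
  pthread_cond_t process_more;  /* there is something to process */
  pthread_cond_t process_less;  /* the slot is free again */
  pthread_mutex_t process_lock;
  struct connection *nextconn;  /* the queue of length one */
};

void server_init(struct server *srv) {
  int rc = pthread_cond_init(&srv->process_more, NULL);
  rc |= pthread_cond_init(&srv->process_less, NULL);
  rc |= pthread_mutex_init(&srv->process_lock, NULL);
  assert(rc == 0);
  (void) rc;
  srv->nextconn = NULL;
}

void connection_init(struct connection *conn, int id) {
  int rc = pthread_cond_init(&conn->finish_cond, NULL);
  rc |= pthread_mutex_init(&conn->finish_lock, NULL);
  assert(rc == 0);
  (void) rc;
  conn->req_todo = TODO_NONE;
  conn->id = id;
}

/* Request thread: wait for the free slot, fill it, wait for the verdict. */
void submit_request(struct server *srv, struct connection *conn) {
  pthread_mutex_lock(&srv->process_lock);
  while (srv->nextconn != NULL) {
    pthread_cond_wait(&srv->process_less, &srv->process_lock);
  }
  srv->nextconn = conn;
  pthread_cond_signal(&srv->process_more);
  pthread_mutex_unlock(&srv->process_lock);

  pthread_mutex_lock(&conn->finish_lock);
  while (conn->req_todo == TODO_NONE) {
    pthread_cond_wait(&conn->finish_cond, &conn->finish_lock);
  }
  pthread_mutex_unlock(&conn->finish_lock);
}

/* Main thread: block until a request thread has filled the slot. */
struct connection *poll_request(struct server *srv) {
  pthread_mutex_lock(&srv->process_lock);
  while (srv->nextconn == NULL) {
    pthread_cond_wait(&srv->process_more, &srv->process_lock);
  }
  struct connection *conn = srv->nextconn;
  pthread_mutex_unlock(&srv->process_lock);
  return conn;
}

/* Main thread: release the request thread, then free the slot. */
void finish_request(struct server *srv, struct connection *conn) {
  pthread_mutex_lock(&conn->finish_lock);
  conn->req_todo = TODO_DONE;
  pthread_cond_signal(&conn->finish_cond);
  pthread_mutex_unlock(&conn->finish_lock);

  pthread_mutex_lock(&srv->process_lock);
  srv->nextconn = NULL;
  pthread_cond_signal(&srv->process_less);
  pthread_mutex_unlock(&srv->process_lock);
}

/* Thread entry point used to try the sketch out. */
struct request_args { struct server *srv; struct connection *conn; };

void *request_thread_main(void *arg) {
  struct request_args *args = arg;
  submit_request(args->srv, args->conn);
  return NULL;
}
```

Both waits sit in a while loop that re-checks the predicate, so a spurious wakeup or a signal that arrives before the wait starts does not break the handoff.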
Currently the main R thread can set req_todo to two different values:
- WEBFAKES_DONE means that the request was processed, and the request thread can quit. Most requests are like this.
- WEBFAKES_WAIT means that the request thread still needs to stay around and sleep for the specified number of secs. After sleeping for the specified amount of time, the request thread will signal process_more again, notifying the main R thread, but it also sets main_todo to WEBFAKES_WAIT, so the main R thread knows that this is not a new request. The main R thread can just take the stored request from the req field of the connection user data in this case.
While the server is running, all errors must be handled and the server must keep running.
Errors that happen while the server is starting are caught and re-thrown, with the civetweb error log added. The error log typically contains more information. E.g. the most common failure is that the specified port is not free, and the error log has a meaningful error message in this case.
Errors that happen in the R request handler functions are caught and the server will send an HTTP 500 response, with the R error message:
    while (TRUE) {
      req <- server_poll(srv)
      tryCatch(
        self$.process_request(req),
        error = function(err) {
          cat(as.character(err), file = stderr())
          response_send_error(req, as.character(err), 500L)
        }
      )
    }
For a response that is sent out in multiple pieces, this is not possible if the status code and the headers have been sent out already. In this case we just send out the R error message.
Errors that happen in the C code while processing the request or the response are different, because most probably we cannot send anything meaningful to the client. E.g. the most frequent such error happens when the connection breaks or the client closes the connection. These errors are caught in the server_poll() and response_*() R functions, and printed to the screen (see server.R). If they originate from civetweb, then they are also logged in the civetweb error log. These errors invalidate the request, and finish the processing callback. This is implemented by the server.R functions (re)throwing a webfakes_error, which is caught and then silently ignored by the processing loop.
See 'Resource cleanup' below for how resources are cleaned up on error.
See the 'Multithreading' section above as well.
We create a req object for an incoming request, before passing it to R from C. This object is an environment and it is kept until the response to the request is completely sent out (or the connection is closed for some reason). The req object is also added to the connection user data of civetweb. Additionally, the server keeps a list (environment) of all request objects. The latter makes sure that the request object is not garbage collected, so we don't need to worry about that.
When a response is delayed, the app makes a note about the position of the handler function in the handler stack (in .stackptr), so that this handler function can be called again, after the delay. Then it calls response_write() which sends a WEBFAKES_WAIT message to the request thread. Then the main R thread can continue processing and potentially serving other requests, assuming the server has been started with at least two threads.
After the wait, the request thread sends a message to the main R thread again, and the app's poll call will get the same request object for the second (etc.) time. The app starts calling the handler functions from the recorded .stackptr position.
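The idea of resuming the handler stack from a recorded position can be sketched as follows. In webfakes this logic lives in R code; the C version below only illustrates the technique. Apart from stackptr, every name in it (the HANDLER_* codes, struct request, the three handlers, handler_stack, process_request()) is hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical result codes, not webfakes constants. */
enum { HANDLER_NEXT, HANDLER_DONE, HANDLER_DELAY };

struct request {
  size_t stackptr;  /* handler to start from on the next call */
  int waited;       /* has the delay already happened? */
  int calls[3];     /* how often each handler ran (illustration only) */
};

typedef int (*handler_fn)(struct request *req);

/* Does some work, then passes the request on. */
int log_handler(struct request *req) {
  req->calls[0]++;
  return HANDLER_NEXT;
}

/* Asks for a delay on the first call, responds on the second. */
int slow_handler(struct request *req) {
  req->calls[1]++;
  if (!req->waited) {
    req->waited = 1;
    return HANDLER_DELAY;
  }
  return HANDLER_DONE;
}

/* Only reached if no earlier handler responds. */
int fallback_handler(struct request *req) {
  req->calls[2]++;
  return HANDLER_DONE;
}

handler_fn handler_stack[] = { log_handler, slow_handler, fallback_handler };

/* Run the handlers starting at req->stackptr. On a delay, remember the
   position so that the same handler is called again after the wait. */
int process_request(struct request *req, handler_fn *stack, size_t n) {
  assert(req->stackptr <= n);
  for (size_t i = req->stackptr; i < n; i++) {
    int res = stack[i](req);
    if (res == HANDLER_DELAY) {
      req->stackptr = i;
      return HANDLER_DELAY;
    }
    if (res == HANDLER_DONE) {
      return HANDLER_DONE;
    }
  }
  return HANDLER_NEXT;
}
```

When the same request comes back after the wait, a second process_request() call skips log_handler() and starts again at slow_handler().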
The server runs until it is interrupted (from the console or remotely via processx::process$interrupt()). We need to make sure that the server can be interrupted while waiting for new requests (i.e. the main R thread waiting on the process_more condition, see 'Multithreading' above). pthread_cond_wait() is interrupted by SIGINT on Unix, seemingly, but not on Windows, so we need to use pthread_cond_timedwait(). We currently check for interrupts every 50ms.
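A polling wait of this kind might look like the sketch below. It is an illustration under assumptions, not the webfakes code: struct work_queue, work_queue_init(), wait_for_work() and the callback type are hypothetical, and the callback stands in for whatever interrupt check the real code performs. The lock is released before the callback runs, on the assumption that a real interrupt check may not return normally.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <time.h>

#define POLL_INTERVAL_MS 50

struct work_queue {
  pthread_mutex_t lock;
  pthread_cond_t more;   /* signaled when has_work becomes non-zero */
  int has_work;
};

/* Hypothetical callback: returns non-zero if an interrupt is pending. */
typedef int (*interrupt_check_fn)(void *data);

void work_queue_init(struct work_queue *q) {
  int rc = pthread_mutex_init(&q->lock, NULL);
  rc |= pthread_cond_init(&q->more, NULL);
  assert(rc == 0);
  (void) rc;
  q->has_work = 0;
}

/* Example callback for trying the sketch: reports an interrupt on the
   third check. data points to an int call counter. */
int interrupt_after_three_checks(void *data) {
  int *calls = data;
  return ++(*calls) >= 3;
}

/* Wait for work, waking up every POLL_INTERVAL_MS to check for interrupts.
   Returns 1 if work arrived (and consumes it), 0 if interrupted. */
int wait_for_work(struct work_queue *q, interrupt_check_fn interrupted,
                  void *data) {
  pthread_mutex_lock(&q->lock);
  while (!q->has_work) {
    /* pthread_cond_timedwait() takes an absolute deadline. */
    struct timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_nsec += POLL_INTERVAL_MS * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {
      deadline.tv_sec += 1;
      deadline.tv_nsec -= 1000000000L;
    }
    int rc = pthread_cond_timedwait(&q->more, &q->lock, &deadline);
    if (rc == ETIMEDOUT && !q->has_work) {
      /* Do not hold the lock while running the interrupt check. */
      pthread_mutex_unlock(&q->lock);
      if (interrupted(data)) {
        return 0;
      }
      pthread_mutex_lock(&q->lock);
    }
  }
  q->has_work = 0;
  pthread_mutex_unlock(&q->lock);
  return 1;
}
```

The timeout only bounds how long an interrupt can go unnoticed. Work that arrives is still picked up immediately through the condition variable.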
If the server is interrupted at this point, no cleanup is needed, as it does not hold any resources. In fact the functions in server.R will keep the server intact, with all delayed responses, and it is possible to call server_poll() again. But the app$listen() method does clean up the server in this case. Maybe this will change in the future.
In theory the C code cannot be interrupted at any other point. On the other hand, the R API functions might error at any time, so we do need proper cleanup everywhere; see the 'Resource cleanup' section below.
If the R code is interrupted, the server.R functions do not need any cleanup. (In theory some error messages might get lost if the timing is extremely unfortunate and a server.R function is handling an error when the interrupt happens.) The app$listen() method cleans up the server in on.exit().
See also 'Interruption' just above.
There are some points in the C code where R errors happen while the code is holding resources. We use (a copy of) the cleancall package to take care of resource cleanup here.
The first such place is the server_poll() C function, after a request has come in from a request thread. Creating an R object for the request involves a lot of R API calls, and if one of them fails, we need to clean up the resources associated with this connection. (The error will be logged and the R server_poll() function will continue polling.) The cleanup in this case involves:
- Sending a WEBFAKES_DONE message to the request thread, so it will quit.
- Signaling the process_less condition, to let other threads know that we are again ready to process requests.
Other functions that need cleanup are the C functions that work on the response: response_delay(), response_send_headers(), response_send() and response_write(). These cleanups are very similar to the one for server_poll().
The finalizer of the server object takes care of cleaning up all resources associated with a server, including all request objects and request threads. This is also called by the server_stop() R function, which is in turn called by the on.exit() handler of the listen() method.
The finalizer uses the list of requests in the tag of the xptr object to walk over all requests, and finish all request threads. Threads that are sleeping because of a delayed response frequently check a server-wide shutdown flag, which the finalizer also sets, so these threads quit as well.
Then the finalizer calls the civetweb function mg_stop(). mg_stop() has its own shutdown flag and waits for all request and worker threads to quit. Given that we just cleaned up all of them, there shouldn't be too many, and if there were any just coming in, they'll also observe our shutdown flag and quit quickly.
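For the request threads that sleep because of a delayed response, one way to check a shutdown flag frequently is to split the sleep into short slices. The sketch below assumes a C11 atomic flag and a 50ms slice. Both are assumptions made for illustration, as is the function name interruptible_sleep(); the source does not say how webfakes implements the flag or how often it is checked.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <time.h>

#define SLICE_MS 50  /* assumed check interval */

/* Sleep for secs seconds, checking the shutdown flag before every slice.
   Returns 1 if the full time was slept, 0 if a shutdown was observed. */
int interruptible_sleep(double secs, atomic_int *shutdown) {
  assert(secs >= 0);
  const double slice_secs = SLICE_MS / 1000.0;
  double remaining = secs;
  while (remaining > 0) {
    if (atomic_load(shutdown)) {
      return 0;
    }
    struct timespec slice = { 0, SLICE_MS * 1000000L };
    if (remaining < slice_secs) {
      /* Last, shorter slice. */
      slice.tv_nsec = (long) (remaining * 1e9);
    }
    nanosleep(&slice, NULL);
    remaining -= slice_secs;
  }
  return !atomic_load(shutdown);
}
```

With this shape, a thread that is in the middle of a long delay notices the flag within one slice, which keeps the wait in mg_stop() short.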