OPENSSL源码阅读
前言
原先写的几个都只是写了自动机,今天来看看多线程和异步。我们今天重点就是看看OPENSSL怎么做,这么做的好处与问题,锁的粒度等问题。实际上服务端s_server并不支持多线程,libcrypto支持多线程,所以对于S_SERVER我们重点看异步的做法,看libcrypto的多线程到底做了些什么。
前置知识
OPENSSL的ASYNC JOB就是使用协程+per thread局部变量的概念实现的,我们需要理解协程的基本概念,协程是个什么东西?
##
- 多线程是怎么竞争的,竞争哪些内容?
- 异步是怎么做的?我们以S_SERVER为例,启动的选项为-nbio_test和-nbio。nbio的socket type为SOCK_STREAM
- 任务队列是怎么做的?(因为异步就是一个事情发现没法直接做完,那就先去做别的事情。这就是任务队列)
OPENSSL大量使用per-thread内部局部变量,即调用函数phread_key_create
创建pthread_key_t
类型数值,每个线程再调用pthread_setspcific()
和pthread_getspecific()
去初始化线程内局部变量。OPENSSL调用ossl_init_get_thread_local
函数去获取per线程局部变量,根据参数决定是不是分配。
ossl_init_thread_start
函数调用ossl_init_get_thread_local
初始化per thread的数据,初始化成功返回1,
说说OPENSSL的异步
OPENSSL的异步需要调用函数ASYNC_start_job
,去注册一个函数做异步操作。这里的异步操作需要返回“per 线程局部变量”ctxkey
,每个线程运行时都会有自己的任务(工作),分配器和判阻塞。每个线程调用async_fibre_swapcontext
去切换当前的任务到设置好的函数,使得函数继续。ASYNC_start_job
的分析我们等会再说。
struct async_ctx_st {
async_fibre dispatcher;
ASYNC_JOB *currjob;
unsigned int blocked;
};
struct async_job_st {
async_fibre fibrectx;
int (*func) (void *);
void *funcargs;
int ret;
int status;
ASYNC_WAIT_CTX *waitctx;
};
async_ctx *async_get_ctx(void)
{
return (async_ctx *)CRYPTO_THREAD_get_local(&ctxkey);
}
函数ASYNC_init_thread
分配POOL,并初始化线程的局部变量。
函数ossl_init_thread_start
初始化并分配当前线程的per thread的本地状态thread_local_inits_st
数据结构地址和空间。
函数ASYNC_start_job
首先获取当前线程的async_ctx
结构,如果SSL当前有JOB,那就初始化当前ctx的运行JOB为SSL的JOB,然后判断当前JOB的状态,结束ASYNC_JOB_STOPPING
,正在暂停ASYNC_JOB_PAUSING
还是已暂停ASYNC_JOB_PAUSED
,为什么区分这三种状态?如果当前为已暂停,往往意味着刚才是因为等待网络IO/卡的计算,再次进来是因为网络IO/卡的计算结果有数据了,要继续向下运行了。如果当前为正在暂停,意味着是某个流程调用ASYNC_pause_job
来暂停当前进行的函数,比方说送卡请求,等着结果了。而结束往往是指JOB已经结束了,没必要再维持了,已经拿到结果了。
好,继续说,判断完JOB的三种状态,就调用async_fibre_swapcontext
去切换协议栈了,我们这里一般是切换到async_start_func
函数,去调用我们在ASYNC_start_job
传入的函数指针。这里注意啊ctx->currjob->func = func;
传入的函数并不直接被调用,而是在函数async_start_func
被交换回来。
int ASYNC_start_job(ASYNC_JOB **job, ASYNC_WAIT_CTX *wctx, int *ret,
int (*func)(void *), void *args, size_t size)
{
async_ctx *ctx;
if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
return ASYNC_ERR;
ctx = async_get_ctx();
if (ctx == NULL)
ctx = async_ctx_new();
if (ctx == NULL)
return ASYNC_ERR;
if (*job)
ctx->currjob = *job;
for (;;) {
if (ctx->currjob != NULL) {
if (ctx->currjob->status == ASYNC_JOB_STOPPING) {
*ret = ctx->currjob->ret;
ctx->currjob->waitctx = NULL;
async_release_job(ctx->currjob);
ctx->currjob = NULL;
*job = NULL;
return ASYNC_FINISH;
}
if (ctx->currjob->status == ASYNC_JOB_PAUSING) {
*job = ctx->currjob;
ctx->currjob->status = ASYNC_JOB_PAUSED;
ctx->currjob = NULL;
return ASYNC_PAUSE;
}
if (ctx->currjob->status == ASYNC_JOB_PAUSED) {
ctx->currjob = *job;
/* Resume previous job */
if (!async_fibre_swapcontext(&ctx->dispatcher,
&ctx->currjob->fibrectx, 1)) {
ASYNCerr(ASYNC_F_ASYNC_START_JOB,
ASYNC_R_FAILED_TO_SWAP_CONTEXT);
goto err;
}
continue;
}
/* Should not happen */
ASYNCerr(ASYNC_F_ASYNC_START_JOB, ERR_R_INTERNAL_ERROR);
async_release_job(ctx->currjob);
ctx->currjob = NULL;
*job = NULL;
return ASYNC_ERR;
}
/* Start a new job */
if ((ctx->currjob = async_get_pool_job()) == NULL)
return ASYNC_NO_JOBS;
if (args != NULL) {
ctx->currjob->funcargs = OPENSSL_malloc(size);
if (ctx->currjob->funcargs == NULL) {
ASYNCerr(ASYNC_F_ASYNC_START_JOB, ERR_R_MALLOC_FAILURE);
async_release_job(ctx->currjob);
ctx->currjob = NULL;
return ASYNC_ERR;
}
memcpy(ctx->currjob->funcargs, args, size);
} else {
ctx->currjob->funcargs = NULL;
}
ctx->currjob->func = func;
ctx->currjob->waitctx = wctx;
if (!async_fibre_swapcontext(&ctx->dispatcher,
&ctx->currjob->fibrectx, 1)) {
ASYNCerr(ASYNC_F_ASYNC_START_JOB, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
goto err;
}
}
err:
async_release_job(ctx->currjob);
ctx->currjob = NULL;
*job = NULL;
return ASYNC_ERR;
}
函数async_ctx_new
初始化并分配当前线程的per thread的本地状态,调用ossl_init_thread_start
初始化thread_local_inits_st
数据结构地址和空间。之后初始化per thread的async_ctx_st
结构,每个ctx的结构。
函数async_get_pool_job
从pool里获取一个job,获取本地pool,如果pool为空还要再分配per thread的工作pool。从pool中pop出最后进栈的job,如果没有job,就调用async_job_new
分配一个新工作,然后调用async_fibre_makecontext
初始化协程的调用栈,这里重点注意async_fibre_makecontext
函数,该函数先分配一个堆栈出来,该堆栈长度为32768字节(小吧,省事省资源啊),然后调用makecontext
函数切换调用栈到函数async_start_func
中,async_start_func
的功能下面再说。
static ASYNC_JOB *async_get_pool_job(void) {
ASYNC_JOB *job;
async_pool *pool;
pool = (async_pool *)CRYPTO_THREAD_get_local(&poolkey);
if (pool == NULL) {
/*
* Pool has not been initialised, so init with the defaults, i.e.
* no max size and no pre-created jobs
*/
if (ASYNC_init_thread(0, 0) == 0)
return NULL;
pool = (async_pool *)CRYPTO_THREAD_get_local(&poolkey);
}
job = sk_ASYNC_JOB_pop(pool->jobs);
if (job == NULL) {
/* Pool is empty */
if ((pool->max_size != 0) && (pool->curr_size >= pool->max_size))
return NULL;
job = async_job_new();
if (job != NULL) {
if (! async_fibre_makecontext(&job->fibrectx)) {
async_job_free(job);
return NULL;
}
pool->curr_size++;
}
}
return job;
}
int async_fibre_makecontext(async_fibre *fibre)
{
fibre->env_init = 0;
if (getcontext(&fibre->fibre) == 0) {
fibre->fibre.uc_stack.ss_sp = OPENSSL_malloc(STACKSIZE);
if (fibre->fibre.uc_stack.ss_sp != NULL) {
fibre->fibre.uc_stack.ss_size = STACKSIZE;
fibre->fibre.uc_link = NULL;
makecontext(&fibre->fibre, async_start_func, 0);
return 1;
}
} else {
fibre->fibre.uc_stack.ss_sp = NULL;
}
return 0;
}
函数async_start_func
该函数负责调用传入的JOB所指向的函数,函数终止以后调用async_fibre_swapcontext
来交出控制栈并返回到ctx->dispatcher
中
void async_start_func(void)
{
ASYNC_JOB *job;
async_ctx *ctx = async_get_ctx();
while (1) {
/* Run the job */
job = ctx->currjob;
job->ret = job->func(job->funcargs);
/* Stop the job */
job->status = ASYNC_JOB_STOPPING;
if (!async_fibre_swapcontext(&job->fibrectx,
&ctx->dispatcher, 1)) {
/*
* Should not happen. Getting here will close the thread...can't do
* much about it
*/
ASYNCerr(ASYNC_F_ASYNC_START_FUNC, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
}
}
}
函数SSL_get_all_async_fds
负责获取当前SSL* s
的wait_ctx
所关联的所有文件描述符,获取之后我们就可以对其调用epoll或者select函数来监控通信。对于每个S->wait_ctx
,需要调用函数ASYNC_WAIT_CTX_set_wait_fd
去给S->wait_ctx
注册上这个描述符。一般来说只有异步的时候这个函数才会被调用,函数wait_for_async
会调用它,而sv_body
函数会调用wait_for_async
等待网络异步IO事件。这里再赘述两局,OPENSSL的异步事件需要调用底层的ENGINE实现,底层的ENGINE有很多种,比方说AF_ALG,DUMMY,GM等等,这些ENGINE由我们实现并提供。
函数async_fibre_swapcontext
切换线程栈,注意这里面的o->env_init = 1
,第一次从o到n的时候调用setcontext
函数,此时还没有调用过n,所以用setcontext
函数。从n回到o的时候调用_longjmp
。下次再从o到n的时候,就还是用_longjmp了。
static ossl_inline int async_fibre_swapcontext(async_fibre *o, async_fibre *n, int r)
{
o->env_init = 1;
if (!r || !_setjmp(o->env)) {
if (n->env_init)
_longjmp(n->env, 1);
else
setcontext(&n->fibre);
}
return 1;
}
函数dummy_pause_job
这个函数非常有趣,我们这里要注意这个dummy_pause_job
只是个假的异步唤醒函数。该函数先对异步ctxS->wait_ctx
注册了一个管道pipefd,并把writefd注册到了wait_ctx中,注册完了向管道写入一个字符”x”来表示唤醒,然后调用ASYNC_pause_job
函数继续执行从而切换协程。然后自己再从管道当中把那个X读出来,相当于假装通知+假装唤醒。
static void dummy_pause_job(void) {
ASYNC_JOB *job;
ASYNC_WAIT_CTX *waitctx;
OSSL_ASYNC_FD pipefds[2] = {0, 0};
OSSL_ASYNC_FD *writefd;
#if defined(ASYNC_WIN)
DWORD numwritten, numread;
char buf = DUMMY_CHAR;
#elif defined(ASYNC_POSIX)
char buf = DUMMY_CHAR;
#endif
if ((job = ASYNC_get_current_job()) == NULL)
return;
waitctx = ASYNC_get_wait_ctx(job);
if (ASYNC_WAIT_CTX_get_fd(waitctx, engine_dasync_id, &pipefds[0],
(void **)&writefd)) {
pipefds[1] = *writefd;
} else {
writefd = OPENSSL_malloc(sizeof(*writefd));
if (writefd == NULL)
return;
#if defined(ASYNC_WIN)
if (CreatePipe(&pipefds[0], &pipefds[1], NULL, 256) == 0) {
OPENSSL_free(writefd);
return;
}
#elif defined(ASYNC_POSIX)
if (pipe(pipefds) != 0) {
OPENSSL_free(writefd);
return;
}
#endif
*writefd = pipefds[1];
if (!ASYNC_WAIT_CTX_set_wait_fd(waitctx, engine_dasync_id, pipefds[0],
writefd, wait_cleanup)) {
wait_cleanup(waitctx, engine_dasync_id, pipefds[0], writefd);
return;
}
}
/*
* In the Dummy async engine we are cheating. We signal that the job
* is complete by waking it before the call to ASYNC_pause_job(). A real
* async engine would only wake when the job was actually complete
*/
#if defined(ASYNC_WIN)
WriteFile(pipefds[1], &buf, 1, &numwritten, NULL);
#elif defined(ASYNC_POSIX)
if (write(pipefds[1], &buf, 1) < 0)
return;
#endif
/* Ignore errors - we carry on anyway */
ASYNC_pause_job();
/* Clear the wake signal */
#if defined(ASYNC_WIN)
ReadFile(pipefds[0], &buf, 1, &numread, NULL);
#elif defined(ASYNC_POSIX)
if (read(pipefds[0], &buf, 1) < 0)
return;
#endif
}
从锁说起
我们重点关注对openssl对linux系统下pthread的支持,具体的文件为threads_pthread.c,该文件位于crypto文件夹中,使用的锁为读写锁。初始化锁的函数为CRYPTO_THREAD_lock_new
,实现非常简单,就是分配并初始化读写锁。我们重点是要看哪里使用了读写锁。异步的操作文件为async.c
CRYPTO_RWLOCK *CRYPTO_THREAD_lock_new(void)
{
# ifdef USE_RWLOCK
CRYPTO_RWLOCK *lock;
if ((lock = OPENSSL_zalloc(sizeof(pthread_rwlock_t))) == NULL) {
/* Don't set error, to avoid recursion blowup. */
return NULL;
}
if (pthread_rwlock_init(lock, NULL) != 0) {
OPENSSL_free(lock);
return NULL;
}
...
return lock;
}
- 在函数BIO_new会调用
CRYPTO_THREAD_lock_new
,初始化bio->lock - 函数
BN_BLINDING_new
中,生成一个BN_BLINDING
,初始化其->lock - 函数
context_init
中,会初始化OPENSSL_CTX
的lock
,oncelock
和ctx->index_locks[OPENSSL_CTX_MAX_INDEXES]
- 函数
dh_new_intern
中会初始化DH* ret->lock
- 函数
evp_md_new
- 函数各种key,evp_cipher_new等等``
- 函数
CRYPTO_secure_malloc_init
- 函数
ssl_cert_new
和ssl_cert_dup
- 函数
SSL_CTX_new_with_xxx
和SSL_new
,查询session的时候,可以看到会对SSL_CTX加锁保护,查找session会进行检索。 - 函数
X509_STORE_new
和一些其他的函数
我们可以看到对证书,BIO,SSL_CTX,SSL_st,DH/各种曲线/CIPHER都分配了锁结构。
我们重点关注下session是怎么存储的,OPENSSL查找session的函数为ssl_get_prev_session
,核心的查找函数是lookup_sess_in_cache
,该函数的几个查找函数直接看是看不到的,这几个函数都是通过宏定义DEFINE_LHASH_OF
获得的。所以我们实际上是对每个LHASH_OF(SSL_SESSION) s->session_ctx->sessions
进行查找,调用的核心函数实际上是OPENSSL_LH_retrieve
。此时使用读锁来获取SSL_CTX,因为指向的SESSION CTX都是一个,这样子操作实际上比较糟糕。
void *OPENSSL_LH_retrieve(OPENSSL_LHASH *lh, const void *data)
{
unsigned long hash;
OPENSSL_LH_NODE **rn;
void *ret;
tsan_store((TSAN_QUALIFIER int *)&lh->error, 0);
rn = getrn(lh, data, &hash);
if (*rn == NULL) {
tsan_counter(&lh->num_retrieve_miss);
return NULL;
} else {
ret = (*rn)->data;
tsan_counter(&lh->num_retrieve);
}
return ret;
}
static OPENSSL_LH_NODE **getrn(OPENSSL_LHASH *lh,
const void *data, unsigned long *rhash)
{
OPENSSL_LH_NODE **ret, *n1;
unsigned long hash, nn;
OPENSSL_LH_COMPFUNC cf;
hash = (*(lh->hash)) (data);
tsan_counter(&lh->num_hash_calls);
*rhash = hash;
nn = hash % lh->pmax;
if (nn < lh->p)
nn = hash % lh->num_alloc_nodes;
cf = lh->comp;
ret = &(lh->b[(int)nn]);
for (n1 = *ret; n1 != NULL; n1 = n1->next) {
tsan_counter(&lh->num_hash_comps);
if (n1->hash != hash) {
ret = &(n1->next);
continue;
}
tsan_counter(&lh->num_comp_calls);
if (cf(n1->data, data) == 0)
break;
ret = &(n1->next);
}
return ret;
}
结束
结尾的闲言碎语
写到这里差不多就可以结束了。TLS这块还有啥不明白的直接告诉我就成了