OPENSSL源码阅读

OPENSSL的多线程

Posted by 大狗 on October 15, 2020

OPENSSL源码阅读

前言

原先写的几个都只是写了自动机,今天来看看多线程和异步。我们今天重点就是看看OPENSSL怎么做,这么做的好处与问题,锁的粒度等问题。实际上服务端s_server并不支持多线程,libcrypto支持多线程,所以对于S_SERVER我们重点看异步的做法,看libcrypto的多线程到底做了些什么。

前置知识

OPENSSL的ASYNC JOB就是使用协程+per thread局部变量的概念实现的,我们需要理解协程的基本概念,协程是个什么东西?

##

  • 多线程是怎么竞争的,竞争哪些内容?
  • 异步是怎么做的?我们以S_SERVER为例,启动的选项为-nbio_test和-nbio。nbio的socket type为SOCK_STREAM
  • 任务队列是怎么做的?(因为异步就是一个事情发现没法直接做完,那就先去做别的事情。这就是任务队列)

OPENSSL大量使用per-thread内部局部变量,即调用函数phread_key_create创建pthread_key_t类型数值,每个线程再调用pthread_setspcific()pthread_getspecific()去初始化线程内局部变量。OPENSSL调用ossl_init_get_thread_local函数去获取per线程局部变量,根据参数决定是不是分配。

ossl_init_thread_start函数调用ossl_init_get_thread_local初始化per thread的数据,初始化成功返回1,

说说OPENSSL的异步

OPENSSL的异步需要调用函数ASYNC_start_job,去注册一个函数做异步操作。这里的异步操作需要返回“per 线程局部变量”ctxkey,每个线程运行时都会有自己的任务(工作),分配器和判阻塞。每个线程调用async_fibre_swapcontext去切换当前的任务到设置好的函数,使得函数继续。ASYNC_start_job的分析我们等会再说。

	struct async_ctx_st {
    async_fibre dispatcher;
    ASYNC_JOB *currjob;
    unsigned int blocked;
};

struct async_job_st {
    async_fibre fibrectx;
    int (*func) (void *);
    void *funcargs;
    int ret;
    int status;
    ASYNC_WAIT_CTX *waitctx;
};

	async_ctx *async_get_ctx(void)
{
    return (async_ctx *)CRYPTO_THREAD_get_local(&ctxkey);
}

函数ASYNC_init_thread分配POOL,并初始化线程的局部变量。

函数ossl_init_thread_start初始化并分配当前线程的per thread的本地状态thread_local_inits_st数据结构地址和空间。

函数ASYNC_start_job首先获取当前线程的async_ctx结构,如果SSL当前有JOB,那就初始化当前ctx的运行JOB为SSL的JOB,然后判断当前JOB的状态,结束ASYNC_JOB_STOPPING,正在暂停ASYNC_JOB_PAUSING还是已暂停ASYNC_JOB_PAUSED,为什么区分这三种状态?如果当前为已暂停,往往意味着刚才是因为等待网络IO/卡的计算,再次进来是因为网络IO/卡的计算结果有数据了,要继续向下运行了。如果当前为正在暂停,意味着是某个流程调用ASYNC_pause_job来暂停当前进行的函数,比方说送卡请求,等着结果了。而结束往往是指JOB已经结束了,没必要再维持了,已经拿到结果了。

好,继续说,判断完JOB的三种状态,就调用async_fibre_swapcontext去切换协议栈了,我们这里一般是切换到async_start_func函数,去调用我们在ASYNC_start_job传入的函数指针。这里注意啊ctx->currjob->func = func;传入的函数并不直接被调用,而是在函数async_start_func被交换回来。

	int ASYNC_start_job(ASYNC_JOB **job, ASYNC_WAIT_CTX *wctx, int *ret,
                    int (*func)(void *), void *args, size_t size)
{
    async_ctx *ctx;

    if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
        return ASYNC_ERR;

    ctx = async_get_ctx();
    if (ctx == NULL)
        ctx = async_ctx_new();
    if (ctx == NULL)
        return ASYNC_ERR;

    if (*job)
        ctx->currjob = *job;

    for (;;) {
        if (ctx->currjob != NULL) {
            if (ctx->currjob->status == ASYNC_JOB_STOPPING) {
                *ret = ctx->currjob->ret;
                ctx->currjob->waitctx = NULL;
                async_release_job(ctx->currjob);
                ctx->currjob = NULL;
                *job = NULL;
                return ASYNC_FINISH;
            }

            if (ctx->currjob->status == ASYNC_JOB_PAUSING) {
                *job = ctx->currjob;
                ctx->currjob->status = ASYNC_JOB_PAUSED;
                ctx->currjob = NULL;
                return ASYNC_PAUSE;
            }

            if (ctx->currjob->status == ASYNC_JOB_PAUSED) {
                ctx->currjob = *job;
                /* Resume previous job */
                if (!async_fibre_swapcontext(&ctx->dispatcher,
                        &ctx->currjob->fibrectx, 1)) {
                    ASYNCerr(ASYNC_F_ASYNC_START_JOB,
                             ASYNC_R_FAILED_TO_SWAP_CONTEXT);
                    goto err;
                }
                continue;
            }

            /* Should not happen */
            ASYNCerr(ASYNC_F_ASYNC_START_JOB, ERR_R_INTERNAL_ERROR);
            async_release_job(ctx->currjob);
            ctx->currjob = NULL;
            *job = NULL;
            return ASYNC_ERR;
        }

        /* Start a new job */
        if ((ctx->currjob = async_get_pool_job()) == NULL)
            return ASYNC_NO_JOBS;

        if (args != NULL) {
            ctx->currjob->funcargs = OPENSSL_malloc(size);
            if (ctx->currjob->funcargs == NULL) {
                ASYNCerr(ASYNC_F_ASYNC_START_JOB, ERR_R_MALLOC_FAILURE);
                async_release_job(ctx->currjob);
                ctx->currjob = NULL;
                return ASYNC_ERR;
            }
            memcpy(ctx->currjob->funcargs, args, size);
        } else {
            ctx->currjob->funcargs = NULL;
        }

        ctx->currjob->func = func;
        ctx->currjob->waitctx = wctx;
        if (!async_fibre_swapcontext(&ctx->dispatcher,
                &ctx->currjob->fibrectx, 1)) {
            ASYNCerr(ASYNC_F_ASYNC_START_JOB, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
            goto err;
        }
    }

err:
    async_release_job(ctx->currjob);
    ctx->currjob = NULL;
    *job = NULL;
    return ASYNC_ERR;
}

函数async_ctx_new初始化并分配当前线程的per thread的本地状态,调用ossl_init_thread_start初始化thread_local_inits_st数据结构地址和空间。之后初始化per thread的async_ctx_st结构,每个ctx的结构。

函数async_get_pool_job从pool里获取一个job,获取本地pool,如果pool为空还要再分配per thread的工作pool。从pool中pop出最后进栈的job,如果没有job,就调用async_job_new分配一个新工作,然后调用async_fibre_makecontext初始化协程的调用栈,这里重点注意async_fibre_makecontext函数,该函数先分配一个堆栈出来,该堆栈长度为32768字节(小吧,省事省资源啊),然后调用makecontext函数切换调用栈到函数async_start_func中,async_start_func的功能下面再说。

	static ASYNC_JOB *async_get_pool_job(void) {
    ASYNC_JOB *job;
    async_pool *pool;

    pool = (async_pool *)CRYPTO_THREAD_get_local(&poolkey);
    if (pool == NULL) {
        /*
         * Pool has not been initialised, so init with the defaults, i.e.
         * no max size and no pre-created jobs
         */
        if (ASYNC_init_thread(0, 0) == 0)
            return NULL;
        pool = (async_pool *)CRYPTO_THREAD_get_local(&poolkey);
    }

    job = sk_ASYNC_JOB_pop(pool->jobs);
    if (job == NULL) {
        /* Pool is empty */
        if ((pool->max_size != 0) && (pool->curr_size >= pool->max_size))
            return NULL;

        job = async_job_new();
        if (job != NULL) {
            if (! async_fibre_makecontext(&job->fibrectx)) {
                async_job_free(job);
                return NULL;
            }
            pool->curr_size++;
        }
    }
    return job;
}

int async_fibre_makecontext(async_fibre *fibre)
{
    fibre->env_init = 0;
    if (getcontext(&fibre->fibre) == 0) {
        fibre->fibre.uc_stack.ss_sp = OPENSSL_malloc(STACKSIZE);
        if (fibre->fibre.uc_stack.ss_sp != NULL) {
            fibre->fibre.uc_stack.ss_size = STACKSIZE;
            fibre->fibre.uc_link = NULL;
            makecontext(&fibre->fibre, async_start_func, 0);
            return 1;
        }
    } else {
        fibre->fibre.uc_stack.ss_sp = NULL;
    }
    return 0;
}

函数async_start_func该函数负责调用传入的JOB所指向的函数,函数终止以后调用async_fibre_swapcontext来交出控制栈并返回到ctx->dispatcher

	void async_start_func(void)
{
    ASYNC_JOB *job;
    async_ctx *ctx = async_get_ctx();

    while (1) {
        /* Run the job */
        job = ctx->currjob;
        job->ret = job->func(job->funcargs);

        /* Stop the job */
        job->status = ASYNC_JOB_STOPPING;
        if (!async_fibre_swapcontext(&job->fibrectx,
                                     &ctx->dispatcher, 1)) {
            /*
             * Should not happen. Getting here will close the thread...can't do
             * much about it
             */
            ASYNCerr(ASYNC_F_ASYNC_START_FUNC, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
        }
    }
}

函数SSL_get_all_async_fds负责获取当前SSL* swait_ctx所关联的所有文件描述符,获取之后我们就可以对其调用epoll或者select函数来监控通信。对于每个S->wait_ctx,需要调用函数ASYNC_WAIT_CTX_set_wait_fd去给S->wait_ctx注册上这个描述符。一般来说只有异步的时候这个函数才会被调用,函数wait_for_async会调用它,而sv_body函数会调用wait_for_async等待网络异步IO事件。这里再赘述两局,OPENSSL的异步事件需要调用底层的ENGINE实现,底层的ENGINE有很多种,比方说AF_ALG,DUMMY,GM等等,这些ENGINE由我们实现并提供。

函数async_fibre_swapcontext切换线程栈,注意这里面的o->env_init = 1,第一次从o到n的时候调用setcontext函数,此时还没有调用过n,所以用setcontext函数。从n回到o的时候调用_longjmp。下次再从o到n的时候,就还是用_longjmp了。

	static ossl_inline int async_fibre_swapcontext(async_fibre *o, async_fibre *n, int r)
{
    o->env_init = 1;

    if (!r || !_setjmp(o->env)) {
        if (n->env_init)
            _longjmp(n->env, 1);
        else
            setcontext(&n->fibre);
    }

    return 1;
}

函数dummy_pause_job这个函数非常有趣,我们这里要注意这个dummy_pause_job只是个假的异步唤醒函数。该函数先对异步ctxS->wait_ctx注册了一个管道pipefd,并把writefd注册到了wait_ctx中,注册完了向管道写入一个字符”x”来表示唤醒,然后调用ASYNC_pause_job函数继续执行从而切换协程。然后自己再从管道当中把那个X读出来,相当于假装通知+假装唤醒。

	static void dummy_pause_job(void) {
    ASYNC_JOB *job;
    ASYNC_WAIT_CTX *waitctx;
    OSSL_ASYNC_FD pipefds[2] = {0, 0};
    OSSL_ASYNC_FD *writefd;
#if defined(ASYNC_WIN)
    DWORD numwritten, numread;
    char buf = DUMMY_CHAR;
#elif defined(ASYNC_POSIX)
    char buf = DUMMY_CHAR;
#endif

    if ((job = ASYNC_get_current_job()) == NULL)
        return;

    waitctx = ASYNC_get_wait_ctx(job);

    if (ASYNC_WAIT_CTX_get_fd(waitctx, engine_dasync_id, &pipefds[0],
                              (void **)&writefd)) {
        pipefds[1] = *writefd;
    } else {
        writefd = OPENSSL_malloc(sizeof(*writefd));
        if (writefd == NULL)
            return;
#if defined(ASYNC_WIN)
        if (CreatePipe(&pipefds[0], &pipefds[1], NULL, 256) == 0) {
            OPENSSL_free(writefd);
            return;
        }
#elif defined(ASYNC_POSIX)
        if (pipe(pipefds) != 0) {
            OPENSSL_free(writefd);
            return;
        }
#endif
        *writefd = pipefds[1];

        if (!ASYNC_WAIT_CTX_set_wait_fd(waitctx, engine_dasync_id, pipefds[0],
                                        writefd, wait_cleanup)) {
            wait_cleanup(waitctx, engine_dasync_id, pipefds[0], writefd);
            return;
        }
    }
    /*
     * In the Dummy async engine we are cheating. We signal that the job
     * is complete by waking it before the call to ASYNC_pause_job(). A real
     * async engine would only wake when the job was actually complete
     */
#if defined(ASYNC_WIN)
    WriteFile(pipefds[1], &buf, 1, &numwritten, NULL);
#elif defined(ASYNC_POSIX)
    if (write(pipefds[1], &buf, 1) < 0)
        return;
#endif

    /* Ignore errors - we carry on anyway */
    ASYNC_pause_job();

    /* Clear the wake signal */
#if defined(ASYNC_WIN)
    ReadFile(pipefds[0], &buf, 1, &numread, NULL);
#elif defined(ASYNC_POSIX)
    if (read(pipefds[0], &buf, 1) < 0)
        return;
#endif
}

从锁说起

我们重点关注对openssl对linux系统下pthread的支持,具体的文件为threads_pthread.c,该文件位于crypto文件夹中,使用的锁为读写锁。初始化锁的函数为CRYPTO_THREAD_lock_new,实现非常简单,就是分配并初始化读写锁。我们重点是要看哪里使用了读写锁。异步的操作文件为async.c

	CRYPTO_RWLOCK *CRYPTO_THREAD_lock_new(void)
{
# ifdef USE_RWLOCK
    CRYPTO_RWLOCK *lock;

    if ((lock = OPENSSL_zalloc(sizeof(pthread_rwlock_t))) == NULL) {
        /* Don't set error, to avoid recursion blowup. */
        return NULL;
    }

    if (pthread_rwlock_init(lock, NULL) != 0) {
        OPENSSL_free(lock);
        return NULL;
    }
...

    return lock;
}
  • 在函数BIO_new会调用CRYPTO_THREAD_lock_new,初始化bio->lock
  • 函数BN_BLINDING_new中,生成一个BN_BLINDING,初始化其->lock
  • 函数context_init中,会初始化OPENSSL_CTXlock,oncelockctx->index_locks[OPENSSL_CTX_MAX_INDEXES]
  • 函数dh_new_intern中会初始化DH* ret->lock
  • 函数evp_md_new
  • 函数各种key,evp_cipher_new等等``
  • 函数CRYPTO_secure_malloc_init
  • 函数ssl_cert_newssl_cert_dup
  • 函数SSL_CTX_new_with_xxxSSL_new,查询session的时候,可以看到会对SSL_CTX加锁保护,查找session会进行检索。
  • 函数X509_STORE_new和一些其他的函数

我们可以看到对证书,BIO,SSL_CTX,SSL_st,DH/各种曲线/CIPHER都分配了锁结构。

我们重点关注下session是怎么存储的,OPENSSL查找session的函数为ssl_get_prev_session,核心的查找函数是lookup_sess_in_cache,该函数的几个查找函数直接看是看不到的,这几个函数都是通过宏定义DEFINE_LHASH_OF获得的。所以我们实际上是对每个LHASH_OF(SSL_SESSION) s->session_ctx->sessions进行查找,调用的核心函数实际上是OPENSSL_LH_retrieve。此时使用读锁来获取SSL_CTX,因为指向的SESSION CTX都是一个,这样子操作实际上比较糟糕。

	void *OPENSSL_LH_retrieve(OPENSSL_LHASH *lh, const void *data)
{
    unsigned long hash;
    OPENSSL_LH_NODE **rn;
    void *ret;

    tsan_store((TSAN_QUALIFIER int *)&lh->error, 0);

    rn = getrn(lh, data, &hash);

    if (*rn == NULL) {
        tsan_counter(&lh->num_retrieve_miss);
        return NULL;
    } else {
        ret = (*rn)->data;
        tsan_counter(&lh->num_retrieve);
    }

    return ret;
}

static OPENSSL_LH_NODE **getrn(OPENSSL_LHASH *lh,
                               const void *data, unsigned long *rhash)
{
    OPENSSL_LH_NODE **ret, *n1;
    unsigned long hash, nn;
    OPENSSL_LH_COMPFUNC cf;

    hash = (*(lh->hash)) (data);
    tsan_counter(&lh->num_hash_calls);
    *rhash = hash;

    nn = hash % lh->pmax;
    if (nn < lh->p)
        nn = hash % lh->num_alloc_nodes;

    cf = lh->comp;
    ret = &(lh->b[(int)nn]);
    for (n1 = *ret; n1 != NULL; n1 = n1->next) {
        tsan_counter(&lh->num_hash_comps);
        if (n1->hash != hash) {
            ret = &(n1->next);
            continue;
        }
        tsan_counter(&lh->num_comp_calls);
        if (cf(n1->data, data) == 0)
            break;
        ret = &(n1->next);
    }
    return ret;
}

结束

结尾的闲言碎语

写到这里差不多就可以结束了。TLS这块还有啥不明白的直接告诉我就成了 狗头的赞赏码.jpg