src/server/daemon/resourcepool.c

Wed, 27 Nov 2024 23:00:07 +0100

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Wed, 27 Nov 2024 23:00:07 +0100
changeset 563
6ca97c99173e
parent 490
d218607f5a7e
permissions
-rw-r--r--

add TODO to use a future ucx feature

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 *
 * Copyright 2022 Olaf Wintermann. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   1. Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above copyright
 *      notice, this list of conditions and the following disclaimer in the
 *      documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "resourcepool.h"
#include "request.h"
#include "session.h"
#include "../public/nsapi.h"
#include "../util/atomic.h"

#define RESOURCE_POOL_MAX_DEFAULT 32

#define RESOURCE_POOL_MAX_ALLOC 268435455

static CxMap *resource_pool_types;

int init_resource_pools(void) {
    resource_pool_types = cxHashMapCreate(cxDefaultAllocator, CX_STORE_POINTERS, 4);
    return resource_pool_types ? 0 : 1;
}

int resourcepool_register_type(const char *type_name, ResourceType *type_info) {
    if(cxMapPut(resource_pool_types, cx_hash_key_str(type_name), type_info)) {
        log_ereport(LOG_CATASTROPHE, "resourcepool_register_type: OOM");
        return 1;
    }
    return 0;
}



int resourcepool_new(ServerConfiguration *cfg, cxstring type, cxstring name, ConfigNode *node) {
    ResourceType *restype = cxMapGet(resource_pool_types, cx_hash_key_bytes((unsigned const char*)type.ptr, type.length));
    if(!restype) {
        log_ereport(LOG_MISCONFIG, "unknown resource pool type: %s", type.ptr);
        return 1;
    }
    
    char *respool_name = cx_strdup_a(cfg->a, name).ptr;
    if(!respool_name) {
        log_ereport(LOG_FAILURE, "resourcepool_new: OOM");
        return 1;
    }
    
    // convert ConfigNode to pblock
    // no sub-objects allowed for this specific ConfigNode, therefore
    // it can be represented as key-value-pairs
    pblock *param = config_obj2pblock(cfg->pool, node);
    if(!param) {
        log_ereport(LOG_FAILURE, "resourcepool_new: OOM");
        return 1;
    }
    
    ResourcePool *respool = pool_malloc(cfg->pool, sizeof(ResourcePool));
    if(!respool) {
        log_ereport(LOG_FAILURE, "resourcepool_new: OOM");
        return 1;
    }
    respool->pool = cfg->pool;
    
    void *respool_data = restype->init(cfg->pool, name.ptr, param);
    if(!respool_data) {
        log_ereport(LOG_FAILURE, "Cannot create resource pool data: pool: %s type: %s", name.ptr, type.ptr);
        return 1;
    }
    
    respool->name = respool_name;
    respool->type = restype;
    respool->data = respool_data;
    respool->min = 4; // TODO: get from node
    respool->max = RESOURCE_POOL_MAX_DEFAULT; // TODO: get from node
    
    respool->numcreated = 0;
    respool->numresources = 0;
    
    
    // don't allow too large resource pools
    // this prevents the need to check malloc integer overflows
    if(respool->max > RESOURCE_POOL_MAX_ALLOC) {
        respool->max = RESOURCE_POOL_MAX_ALLOC;
        log_ereport(LOG_WARN, "Resource pool %s: limit max to %d", name.ptr, respool->max);
    }
    
    respool->resalloc = respool->max;
    respool->resources = pool_malloc(cfg->pool, respool->resalloc * sizeof(ResourceDataPrivate*));
    
    if(!respool->resources || cxMapPut(cfg->resources, cx_hash_key_bytes((unsigned const char*)name.ptr, name.length), respool)) {
        log_ereport(LOG_FAILURE, "Cannot add resource pool: OOM");
        // the only cleanup we have to do
        restype->destroy(respool_data);
        return 1;
    }
    
    pthread_mutex_init(&respool->lock, NULL);
    pthread_cond_init(&respool->available, NULL);
    
    return 0;
}

static ResourceData* s_resourcepool_lookup(ServerConfiguration *cfg, Request *opt_rq, Session *opt_sn, const char *name, int flags) {
    NSAPIRequest *request = (NSAPIRequest*)opt_rq;
    NSAPISession *session = (NSAPISession*)opt_sn;
    ResourceDataPrivate *resource = NULL;
    
    // was this resource already used by this request?
    if(request && request->resources) {
        resource = cxMapGet(request->resources, cx_hash_key_str(name));
        if(resource) {
            return &resource->data;
        }
    }
    
    ResourcePool *respool = cxMapGet(cfg->resources, cx_hash_key_str(name));
    if(!respool) return NULL;

    
    pthread_mutex_lock(&respool->lock);
    WSBool createResource = FALSE;
    if(respool->numcreated < respool->min) {
        createResource = TRUE;
    }
    
    if(createResource) {
        // create a new resource and store it in the resourcepool
        log_ereport(LOG_DEBUG, "resourcepool %s: create resource", name);
        
        void *resourceData = respool->type->createresource(respool->data);
        if(resourceData) {
            respool->numcreated++;
            
            resource = pool_malloc(respool->pool, sizeof(ResourceDataPrivate));
            if(resource) {
                resource->data.data = respool->type->getresourcedata(resourceData);
                resource->data.resourcepool = respool;
                resource->resdata = resourceData;
            } else {
                respool->type->freeresource(respool->data, resourceData);
                log_ereport(LOG_CATASTROPHE, "resourcepool_lookup: OOM");
            }
        }
        // else: respool->type->createresource does logging in case of errors
    } else if(respool->numresources > 0) {
        resource = respool->resources[--respool->numresources];
    } else {
        // wait for free resource
        pthread_cond_wait(&respool->available, &respool->lock);
        if(respool->numresources > 0) {
            resource = respool->resources[--respool->numresources];
        }        
    }
    
    // save the resource in the request object, for caching and also
    // for cleanup later
    int err = 0;
    if(resource) {
        if(request && session) {
            if(!request->resources) {
                request->resources = cxHashMapCreate(pool_allocator(session->sn.pool), CX_STORE_POINTERS, 8);
            }

            if(request->resources) {
                if(cxMapPut(request->resources, cx_hash_key_str(name), resource)) {
                    err = 1;
                }
            } else {
                err = 1;
            }
        } // else: lookup is outside of any request context
        
        if(respool->type->prepare(respool->data, resource->resdata)) {
            err = -1;
        }
    }
    
    if(err) {
        // err == 1 caused by OOM
        log_ereport(LOG_FAILURE, "resourcepool_lookup: OOM");
        // cleanup
        resourcepool_destroy_resource(resource);
        resource = NULL;
    }
    
    pthread_mutex_unlock(&respool->lock);
    
    return (ResourceData*)resource;
}

ResourceData* resourcepool_cfg_lookup(ServerConfiguration *cfg, const char *name, int flags) {
    return s_resourcepool_lookup(cfg, NULL, NULL, name, flags);
}

ResourceData* resourcepool_lookup(Session *sn, Request *rq, const char *name, int flags) {
    NSAPISession *session = (NSAPISession*)sn;
    ServerConfiguration *cfg = session->config;
    return s_resourcepool_lookup(cfg, rq, sn, name, flags);
}

void resourcepool_free(Session *sn, Request *rq, ResourceData *resource) {
    NSAPIRequest *nsapi_rq = (NSAPIRequest *)rq;
    ResourceDataPrivate *res = (ResourceDataPrivate*)resource;
    ResourcePool *respool = resource->resourcepool;
    
    if(nsapi_rq && !nsapi_rq->finished) {
        // request processing still ongoing and SAFs will be executed
        // TODO: future ucx cxMapRemove returns, if it actually removed something, therefore use just cxMapRemove
        if(!cxMapRemoveAndGet(nsapi_rq->resources, cx_hash_key_str(respool->name))) {
            log_ereport(LOG_FAILURE, "resourcepool_free: cannot remove resource from request: potential double free");
        }
    }
    // else: safe to ignore nsapi_rq->resources
    
    if(respool->type->finish(respool->data, res->resdata)) {
        log_ereport(LOG_FAILURE, "resourcepool_free: finish failed");
    }
    
    pthread_mutex_lock(&respool->lock);
    
    if(respool->numresources >= respool->resalloc) {
        // actually respool->resalloc == respool->max
        // and numresources should always be smaller
        // however just be extra safe here
        respool->resalloc += 8;
        ResourceDataPrivate **new_res_array = pool_realloc(
                respool->pool,
                respool->resources,
                respool->resalloc * sizeof(ResourceDataPrivate*));
        if(new_res_array) {
            respool->resources = new_res_array;
        } else {
            log_ereport(LOG_FAILURE, "resourcepool_free: OOM");
            resourcepool_destroy_resource(res);
            pthread_mutex_unlock(&respool->lock);
            return;
        }
    }
    
    respool->resources[respool->numresources++] = res;
    
    pthread_cond_signal(&respool->available);
    pthread_mutex_unlock(&respool->lock);
}

void resourcepool_destroy_resource(ResourceDataPrivate *res) {
    res->data.resourcepool->numcreated--;
    res->data.resourcepool->type->freeresource(res->data.resourcepool->data, res->resdata);
    pool_free(res->data.resourcepool->pool, res);
}

void resourcepool_destroy(ResourcePool *respool) {
    // TODO
}

mercurial