diff options
Diffstat (limited to 'contrib/unbound/cachedb/redis.c')
-rw-r--r-- | contrib/unbound/cachedb/redis.c | 144 |
1 files changed, 130 insertions, 14 deletions
diff --git a/contrib/unbound/cachedb/redis.c b/contrib/unbound/cachedb/redis.c index 3dfa95859eb8..9383f1c8576c 100644 --- a/contrib/unbound/cachedb/redis.c +++ b/contrib/unbound/cachedb/redis.c @@ -46,6 +46,8 @@ #include "cachedb/cachedb.h" #include "util/alloc.h" #include "util/config_file.h" +#include "util/locks.h" +#include "util/timeval_func.h" #include "sldns/sbuffer.h" #ifdef USE_REDIS @@ -75,6 +77,18 @@ struct redis_moddata { /* timeout for connection setup */ struct timeval connect_timeout; struct timeval replica_connect_timeout; + /* the reconnect interval time. */ + struct timeval reconnect_interval; + struct timeval replica_reconnect_interval; + /* reconnect attempts, 0 if connected, counts up failed reconnects. */ + int reconnect_attempts; + int replica_reconnect_attempts; + /* Lock on reconnect_wait time. */ + lock_basic_type wait_lock; + lock_basic_type replica_wait_lock; + /* reconnect wait time, wait until it has passed before reconnect. */ + struct timeval reconnect_wait; + struct timeval replica_reconnect_wait; /* the redis logical database to use */ int logical_db; int replica_logical_db; @@ -82,6 +96,10 @@ struct redis_moddata { int set_with_ex_available; }; +/** The limit on the number of redis connect attempts. After failure if + * the number is exceeded, the reconnects are throttled by the wait time. */ +#define REDIS_RECONNECT_ATTEMPT_LIMIT 3 + static redisReply* redis_command(struct module_env*, struct cachedb_env*, const char*, const uint8_t*, size_t, int); @@ -105,6 +123,8 @@ moddata_clean(struct redis_moddata** moddata) { } free((*moddata)->replica_ctxs); } + lock_basic_destroy(&(*moddata)->wait_lock); + lock_basic_destroy(&(*moddata)->replica_wait_lock); free(*moddata); *moddata = NULL; } @@ -113,10 +133,39 @@ static redisContext* redis_connect(const char* host, int port, const char* path, const char* password, int logical_db, const struct timeval connect_timeout, - const struct timeval command_timeout) + const struct timeval command_timeout, + const struct timeval* reconnect_interval, + int* reconnect_attempts, + struct timeval* reconnect_wait, + lock_basic_type* wait_lock, + struct timeval* now_tv, + const char* infostr) { + struct timeval now_val; redisContext* ctx; + /* See if the redis server is down, and reconnect has to wait. */ + if(*reconnect_attempts > REDIS_RECONNECT_ATTEMPT_LIMIT) { + /* Acquire lock to look at timeval, the integer has atomic + * integrity. */ + struct timeval wait_tv; + if(now_tv) { + now_val = *now_tv; + } else { + if(gettimeofday(&now_val, NULL) < 0) + log_err("redis: gettimeofday: %s", + strerror(errno)); + } + lock_basic_lock(wait_lock); + wait_tv = *reconnect_wait; + lock_basic_unlock(wait_lock); + if(timeval_smaller(&now_val, &wait_tv)) { + verbose(VERB_ALGO, "redis %sdown, reconnect wait", + infostr); + return NULL; + } + } + if(path && path[0]!=0) { ctx = redisConnectUnixWithTimeout(path, connect_timeout); } else { @@ -126,18 +175,18 @@ redis_connect(const char* host, int port, const char* path, const char *errstr = "out of memory"; if(ctx) errstr = ctx->errstr; - log_err("failed to connect to redis server: %s", errstr); + log_err("failed to connect to redis %sserver: %s", infostr, errstr); goto fail; } if(redisSetTimeout(ctx, command_timeout) != REDIS_OK) { - log_err("failed to set redis timeout, %s", ctx->errstr); + log_err("failed to set redis %stimeout, %s", infostr, ctx->errstr); goto fail; } if(password && password[0]!=0) { redisReply* rep; rep = redisCommand(ctx, "AUTH %s", password); if(!rep || rep->type == REDIS_REPLY_ERROR) { - log_err("failed to authenticate with password"); + log_err("failed to authenticate %swith password", infostr); freeReplyObject(rep); goto fail; } @@ -147,18 +196,20 @@ redis_connect(const char* host, int port, const char* path, redisReply* rep; rep = redisCommand(ctx, "SELECT %d", logical_db); if(!rep || rep->type == REDIS_REPLY_ERROR) { - log_err("failed to set logical database (%d)", - logical_db); + log_err("failed %sto set logical database (%d)", + infostr, logical_db); freeReplyObject(rep); goto fail; } freeReplyObject(rep); } + *reconnect_attempts = 0; if(verbosity >= VERB_OPS) { char port_str[6+1]; port_str[0] = ' '; (void)snprintf(port_str+1, sizeof(port_str)-1, "%d", port); - verbose(VERB_OPS, "Connection to Redis established (%s%s)", + verbose(VERB_OPS, "Connection to Redis %sestablished (%s%s)", + infostr, path&&path[0]!=0?path:host, path&&path[0]!=0?"":port_str); } @@ -167,6 +218,25 @@ redis_connect(const char* host, int port, const char* path, fail: if(ctx) redisFree(ctx); + (*reconnect_attempts)++; + if(*reconnect_attempts > REDIS_RECONNECT_ATTEMPT_LIMIT) { + /* Wait for the reconnect interval before trying again. */ + struct timeval tv; + if(now_tv) { + now_val = *now_tv; + } else { + if(gettimeofday(&now_val, NULL) < 0) + log_err("redis: gettimeofday: %s", + strerror(errno)); + } + tv = now_val; + timeval_add(&tv, reconnect_interval); + lock_basic_lock(wait_lock); + *reconnect_wait = tv; + lock_basic_unlock(wait_lock); + verbose(VERB_ALGO, "redis %sreconnect wait until %d.%6.6d", + infostr, (int)tv.tv_sec, (int)tv.tv_usec); + } return NULL; } @@ -191,6 +261,13 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env) log_err("out of memory"); goto fail; } + lock_basic_init(&moddata->wait_lock); + lock_protect(&moddata->wait_lock, &moddata->reconnect_wait, + sizeof(moddata->reconnect_wait)); + lock_basic_init(&moddata->replica_wait_lock); + lock_protect(&moddata->replica_wait_lock, + &moddata->replica_reconnect_wait, + sizeof(moddata->replica_reconnect_wait)); moddata->numctxs = env->cfg->num_threads; /* note: server_host and similar string configuration options are * shallow references to configured strings; we don't have to free them @@ -219,6 +296,8 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env) set_timeout(&moddata->replica_connect_timeout, env->cfg->redis_replica_timeout, env->cfg->redis_replica_connect_timeout); + set_timeout(&moddata->reconnect_interval, 1000, 0); + set_timeout(&moddata->replica_reconnect_interval, 1000, 0); moddata->logical_db = env->cfg->redis_logical_db; moddata->replica_logical_db = env->cfg->redis_replica_logical_db; @@ -245,7 +324,13 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env) moddata->server_password, moddata->logical_db, moddata->connect_timeout, - moddata->command_timeout); + moddata->command_timeout, + &moddata->reconnect_interval, + &moddata->reconnect_attempts, + &moddata->reconnect_wait, + &moddata->wait_lock, + env->now_tv, + ""); if(!ctx) { log_err("redis_init: failed to init redis " "(for thread %d)", i); @@ -263,7 +348,13 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env) moddata->replica_server_password, moddata->replica_logical_db, moddata->replica_connect_timeout, - moddata->replica_command_timeout); + moddata->replica_command_timeout, + &moddata->replica_reconnect_interval, + &moddata->replica_reconnect_attempts, + &moddata->replica_reconnect_wait, + &moddata->replica_wait_lock, + env->now_tv, + "replica "); if(!ctx) { log_err("redis_init: failed to init redis " "replica (for thread %d)", i); @@ -301,7 +392,7 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env) set_with_ex_fail: log_err("redis_init: failure during redis_init, the " "redis-expire-records option requires the SET with EX command " - "(redis >= 2.6.2)"); + "(redis >= 2.6.12)"); return 1; fail: moddata_clean(&moddata); @@ -364,7 +455,13 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env, d->replica_server_password, d->replica_logical_db, d->replica_connect_timeout, - d->replica_command_timeout); + d->replica_command_timeout, + &d->replica_reconnect_interval, + &d->replica_reconnect_attempts, + &d->replica_reconnect_wait, + &d->replica_wait_lock, + env->now_tv, + "replica "); } else { ctx = redis_connect( d->server_host, @@ -373,7 +470,13 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env, d->server_password, d->logical_db, d->connect_timeout, - d->command_timeout); + d->command_timeout, + &d->reconnect_interval, + &d->reconnect_attempts, + &d->reconnect_wait, + &d->wait_lock, + env->now_tv, + ""); } ctx_selector[env->alloc->thread_num] = ctx; } @@ -405,7 +508,14 @@ redis_lookup(struct module_env* env, struct cachedb_env* cachedb_env, char* key, struct sldns_buffer* result_buffer) { redisReply* rep; - char cmdbuf[4+(CACHEDB_HASHSIZE/8)*2+1]; /* "GET " + key */ + /* Supported commands: + * - "GET " + key + */ +#define REDIS_LOOKUP_MAX_BUF_LEN \ + 4 /* "GET " */ \ + +(CACHEDB_HASHSIZE/8)*2 /* key hash */ \ + + 1 /* \0 */ + char cmdbuf[REDIS_LOOKUP_MAX_BUF_LEN]; int n; int ret = 0; @@ -465,7 +575,13 @@ redis_store(struct module_env* env, struct cachedb_env* cachedb_env, * older redis 2.0.0 was "SETEX " + key + " " + ttl + " %b" * - "EXPIRE " + key + " 0" */ - char cmdbuf[6+(CACHEDB_HASHSIZE/8)*2+11+3+1]; +#define REDIS_STORE_MAX_BUF_LEN \ + 7 /* "EXPIRE " */ \ + +(CACHEDB_HASHSIZE/8)*2 /* key hash */ \ + + 7 /* " %b EX " */ \ + + 20 /* ttl (uint64_t) */ \ + + 1 /* \0 */ + char cmdbuf[REDIS_STORE_MAX_BUF_LEN]; if (!set_ttl) { verbose(VERB_ALGO, "redis_store %s (%d bytes)", key, (int)data_len); |