>From 2beea92c9d87c4ff1f9189470d3338d0173f21b0 Mon Sep 17 00:00:00 2001 From: Tomash Brechko Date: Sat, 19 Jul 2008 11:32:16 +0400 Subject: [PATCH] Add memcached_hash module. --- memcached_hash/Changes | 35 ++ memcached_hash/README | 216 ++++++++ memcached_hash/config | 6 + .../ngx_http_upstream_memcached_hash_module.c | 548 ++++++++++++++++++++ server/src/core/ngx_string.h | 1 + server/src/http/ngx_http_upstream.c | 1 + server/src/http/ngx_http_upstream.h | 1 + 7 files changed, 808 insertions(+), 0 deletions(-) create mode 100644 memcached_hash/Changes create mode 100644 memcached_hash/README create mode 100644 memcached_hash/config create mode 100644 memcached_hash/ngx_http_upstream_memcached_hash_module.c diff --git a/memcached_hash/Changes b/memcached_hash/Changes new file mode 100644 index 0000000..579cda4 --- /dev/null +++ b/memcached_hash/Changes @@ -0,0 +1,35 @@ +Revision history of ngx_http_upstream_memcached_hash_module. + +0.03 2008-05-01 + - bugfix release. + + Fix key distribution bug in compatible mode. Because of an + accumulated rounding error some keys were mapped to a + different server than with Cache::Memcached. + + +0.02 2008-02-19 + - add support for the $memcached_namespace variable. + + If Cache::Memcached::Fast uses + + namespace => 'prefix', + + then the nginx configuration file should have + + set $memcached_namespace "prefix"; + + This is not the same as prepending "prefix" to $memcached_key: + the namespace prefix should not be hashed. + + +0.01 2008-01-27 + - first official release. + + The hashing is fully compatible with Cache::Memcached::Fast + and Cache::Memcached, and thus with any other client that is + compatible with C::M. + + +0.00 2007-12-24 + - development started.
diff --git a/memcached_hash/README b/memcached_hash/README new file mode 100644 index 0000000..13266dc --- /dev/null +++ b/memcached_hash/README @@ -0,0 +1,216 @@ +ngx_http_upstream_memcached_hash_module 0.03 +============================================ + +This module is a load balancer for the nginx server that is meant to be +used together with the Cache::Memcached::Fast Perl module (or its ancestor +Cache::Memcached). It distributes requests among several memcached +servers in a way consistent with the named Perl modules. I.e. unlike +other load balancers that try the servers one after another, this +module calculates a certain hash function of the request URI, and then +delegates the request to the "right" memcached server, the same one that +would be used by the Perl module. This enables the setup where the +data is uploaded to memcached servers by the Perl script (possibly a +CGI script), and then served by nginx from there. + + +INSTALLATION + +The latest release and Git repository are available from + + http://openhack.ru/nginx-patched + +If you install both this module and nginx 0.5.x from the +memcached_hash branch of the Git repo, you don't have to apply any +patches as described below. + + +All nginx modules are meant to be statically compiled into the nginx +server, so first you have to obtain the nginx server source code: + + http://nginx.net/ + +Unpack the server and module archives into some temporary location, and cd +to the _server_ directory. From there do + + cat /path/to/ngx_http_upstream_memcached_hash_module/nginx-patches/* \ + | patch -N -p2 + +This will apply minor patches to the server code. The patches do not +change nginx functionality; they only add some utility functions that +are used in the module. The patches should apply cleanly against +the nginx 0.5 stable branch (hunks applied with an offset or fuzz are OK).
One of the patches is +actually a backport of some functionality from 0.6, so when applied to +the 0.6 development branch the following warning will be given: + + Reversed (or previously applied) patch detected! Skipping patch. + +This is OK for the files src/core/nginx.c, src/core/ngx_crc32.c, +src/core/ngx_crc32.h, and this is what the -N argument above is for. + +After applying the patches do + + ./configure \ + --add-module=/path/to/ngx_http_upstream_memcached_hash_module/ + +If you already have nginx installed, you'd probably want to also +specify the configure parameters you have used before. To see what +arguments have been given to configure for the currently installed nginx +server, run 'nginx -V'. Finally, do + + make + make install + +This will install the nginx server. Do not forget to restart the server +if it was running, but before starting the new one you will have to +update the configuration file. See the next section. + + +USAGE + +As a quick start, you'll add something like + + upstream memcached_cluster { + memcached_hash ketama_points=150 weight_scale=10; + + server backend1.example.com:11211 weight=15; + server 127.0.0.1:11211 weight=10 max_fails=3 fail_timeout=30s; + + server unix:/tmp/memcached.sock weight=10 down; + } + +into the nginx server configuration file, and then you'll use +memcached_cluster in the memcached_pass directive: + + server { + location / { + set $memcached_key "$uri$is_args$args"; + set $memcached_namespace "prefix"; + memcached_pass memcached_cluster; + error_page 404 502 504 = @fallback; + } + + location @fallback { + proxy_pass http://backend; + } + } + + +In the upstream block the essential directive is memcached_hash. +There are two different hashing modes: basic and Ketama. Basic mode +is compatible with both Cache::Memcached::Fast and Cache::Memcached. +It is enabled by specifying memcached_hash without any parameters, +i.e. + + upstream memcached_cluster { + memcached_hash; + ...
+ } + +In this mode you specify the same servers and their weights as you did +in the Perl script, _in the same order_. For instance, if the script +has + + use Cache::Memcached::Fast; + + my $memd = new Cache::Memcached::Fast({ + servers => [ { address => 'localhost:11211', weight => 2 }, + '192.168.254.2:11211', + { address => '/path/to/unix.sock', weight => 4 } ], + ... + }); + +in the nginx configuration file you'd write + + upstream memcached_cluster { + memcached_hash; + + server localhost:11211 weight=2; + server 192.168.254.2:11211; + server unix:/path/to/unix.sock weight=4; + } + +Note that the server order is the same, and weight=1 is the default in +both configurations. + +Ketama mode uses the Ketama consistent hashing algorithm that is +compatible with Cache::Memcached::Fast (see the Perl module documentation +for further explanation and references). It is enabled by specifying +a positive ketama_points argument to memcached_hash, and possibly +weight_scale, since nginx's weights are always integers. For instance, +if you have this in your Perl script + + use Cache::Memcached::Fast; + + my $memd = new Cache::Memcached::Fast({ + servers => [ { address => 'localhost:11211', weight => 2.5 }, + '192.168.254.2:11211', + { address => '/path/to/unix.sock', weight => 4 } ], + ketama_points => 150, + ... + }); + +(note the rational server weight of 2.5, and ketama_points), your +nginx configuration will have + + upstream memcached_cluster { + memcached_hash ketama_points=150 weight_scale=10; + + server localhost:11211 weight=25; + server 192.168.254.2:11211 weight=10; + server unix:/path/to/unix.sock weight=40; + } + +Note that 192.168.254.2:11211 has the default weight of 1, but since +we scale all weights by 10, we have to explicitly specify weight=10.
+ +You may actually use rational server weights without enabling the +Ketama algorithm by using weight_scale while omitting ketama_points +(or setting it to zero) in both the Cache::Memcached::Fast constructor and +the nginx configuration file. As of this writing Cache::Memcached +supports only integer weights and does not support consistent hashing. + +If the client uses a namespace, i.e. the constructor has + + namespace => 'prefix', + +then you have to set the $memcached_namespace variable in the nginx +configuration file: + + set $memcached_namespace "prefix"; + +Note that this is not the same as prepending the prefix to $memcached_key: +the namespace prefix is not hashed when the key is hashed to decide which +memcached server to talk to. + +Also note that nginx escapes a URI key before sending the request to +memcached. As of this writing the transformation is equivalent to the +Perl code + + use bytes; + $uri =~ s/[\x00-\x1f %]/ sprintf "%%%02x", ord $& /ge; + +I.e. percent sign ('%'), space (' '), and control characters with the +codes in the range 0x00--0x1f are replaced with a percent sign and two +digit hexadecimal character code (with a-f in lowercase). You have to +escape URI keys the same way before uploading the data to the memcached +server, otherwise nginx won't find them. + + +SUPPORT + +http://openhack.ru/nginx-patched - project home. + +Send bug reports and feature requests to . + + +ACKNOWLEDGEMENTS + +Development of this module is sponsored by Monashev Co. Ltd. + + +COPYRIGHT AND LICENCE + +Copyright (C) 2007-2008 Tomash Brechko. All rights reserved. + +This module is distributed on the same terms as the rest of nginx +source code. diff --git a/memcached_hash/config b/memcached_hash/config new file mode 100644 index 0000000..b823f17 --- /dev/null +++ b/memcached_hash/config @@ -0,0 +1,6 @@ +# ngx_http_upstream_memcached_hash_module config.
+ +ngx_addon_name=ngx_http_upstream_memcached_hash_module +HTTP_MODULES="$HTTP_MODULES ngx_http_upstream_memcached_hash_module" +NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ + $ngx_addon_dir/ngx_http_upstream_memcached_hash_module.c" diff --git a/memcached_hash/ngx_http_upstream_memcached_hash_module.c b/memcached_hash/ngx_http_upstream_memcached_hash_module.c new file mode 100644 index 0000000..e0d5a5f --- /dev/null +++ b/memcached_hash/ngx_http_upstream_memcached_hash_module.c @@ -0,0 +1,593 @@ +/* + Copyright (C) 2007-2008 Tomash Brechko. All rights reserved. + + Development of this module was sponsored by Monashev Co. Ltd. + + This file is distributed on the same terms as the rest of nginx + source code. + + Version 0.03. +*/ + +#include <ngx_config.h> +#include <ngx_core.h> +#include <ngx_http.h> + + +#define CONTINUUM_MAX_POINT 0xffffffffU + + +static ngx_str_t memcached_ns = ngx_string("memcached_namespace"); + + +struct memcached_hash_continuum +{ + unsigned int point; + unsigned int index; +}; + + +struct memcached_hash_peer +{ + ngx_http_upstream_server_t *server; + unsigned int addr_index; + time_t accessed; + unsigned int fails; +}; + + +struct memcached_hash +{ + struct memcached_hash_continuum *buckets; + struct memcached_hash_peer *peers; + unsigned int buckets_count; + unsigned int peer_count; + unsigned int total_weight; + unsigned int ketama_points; + unsigned int scale; + ngx_int_t ns_index; +}; + + +struct memcached_hash_find_ctx +{ + struct memcached_hash *memd; + ngx_http_upstream_server_t *server; + ngx_http_request_t *request; +}; + + +/* + Return the index of the first bucket whose point is >= POINT, + or 0 when there is none (the continuum wraps around). +*/ +static +unsigned int +memcached_hash_find_bucket(struct memcached_hash *memd, unsigned int point) +{ + struct memcached_hash_continuum *left, *right; + + left = memd->buckets; + right = memd->buckets + memd->buckets_count; + + while (left < right) + { + struct memcached_hash_continuum *middle = left + (right - left) / 2; + if (middle->point < point) + { + left = middle + 1; + } + else if (middle->point > point) + { + right = middle; + } + else + { + /* Find the
first point for this value. */ + while (middle != memd->buckets && (middle - 1)->point == point) + --middle; + + return (middle - memd->buckets); + } + } + + /* Wrap around. */ + if (left == memd->buckets + memd->buckets_count) + left = memd->buckets; + + return (left - memd->buckets); +} + + +/* + peer.get callback for the chosen server: return its current + address, or NGX_BUSY (making this the last try) when the server + is marked down or is still within fail_timeout after max_fails. +*/ +static +ngx_int_t +memcached_hash_get_peer(ngx_peer_connection_t *pc, void *data) +{ + struct memcached_hash_peer *peer = data; + ngx_peer_addr_t *addr; + + if (peer->server->down) + goto fail; + + if (peer->server->max_fails > 0 && peer->fails >= peer->server->max_fails) + { + time_t now = ngx_time(); + if (now - peer->accessed <= peer->server->fail_timeout) + goto fail; + else + peer->fails = 0; + } + + addr = &peer->server->addrs[peer->addr_index]; + + pc->sockaddr = addr->sockaddr; + pc->socklen = addr->socklen; + pc->name = &addr->name; + + return NGX_OK; + +fail: + /* This is the last try. */ + pc->tries = 1; + + return NGX_BUSY; +} + + +/* + Initial peer.get callback: hash the key as it appears in the + request buffer (namespace prefix excluded) to choose the server, + then delegate to memcached_hash_get_peer(). +*/ +static +ngx_int_t +memcached_hash_find_peer(ngx_peer_connection_t *pc, void *data) +{ + struct memcached_hash_find_ctx *find_ctx = data; + struct memcached_hash *memd = find_ctx->memd; + u_char *key; + size_t len; + unsigned int point, bucket, index; + + if (memd->peer_count == 1) + { + index = 0; + } + else + { + ngx_chain_t *request_bufs = find_ctx->request->upstream->request_bufs; + ngx_http_variable_value_t *ns_vv = + ngx_http_get_indexed_variable(find_ctx->request, memd->ns_index); + + /* + We take the key directly from request_buf, because there it is + in the escaped form that will be seen by memcached server. + */ + key = request_bufs->buf->start + (sizeof("get ") - 1); + if (ns_vv && !
ns_vv->not_found && ns_vv->len != 0) + { + key += ns_vv->len + 2 * ngx_escape_uri(NULL, ns_vv->data, ns_vv->len, + NGX_ESCAPE_MEMCACHED); + } + + len = request_bufs->buf->last - key - (sizeof("\r\n") - 1); + + point = ngx_crc32_long(key, len); + + if (memd->ketama_points == 0) + { + unsigned int scaled_total_weight = + (memd->total_weight + memd->scale / 2) / memd->scale; + point = ((point >> 16) & 0x00007fffU); + point = point % scaled_total_weight; + point = ((uint64_t) point * CONTINUUM_MAX_POINT + + scaled_total_weight / 2) / scaled_total_weight; + /* + Shift point one step forward to possibly get from the + border point which belongs to the previous bucket. + */ + point += 1; + } + + bucket = memcached_hash_find_bucket(memd, point); + index = memd->buckets[bucket].index; + } + + pc->data = &memd->peers[index]; + pc->get = memcached_hash_get_peer; + pc->tries = find_ctx->server[index].naddrs; + + return memcached_hash_get_peer(pc, pc->data); +} + + +/* + peer.free callback: on failure update max_fails/fail_timeout + accounting and move to the server's next address; on + NGX_PEER_NEXT give up without retrying. +*/ +static +void +memcached_hash_free_peer(ngx_peer_connection_t *pc, void *data, + ngx_uint_t state) +{ + struct memcached_hash_peer *peer = data; + + if (state & NGX_PEER_FAILED) + { + if (peer->server->max_fails > 0) + { + time_t now = ngx_time(); + if (now - peer->accessed > peer->server->fail_timeout) + peer->fails = 0; + ++peer->fails; + if (peer->fails == 1 || peer->fails == peer->server->max_fails) + peer->accessed = now; + } + + if (--pc->tries > 0) + { + if (++peer->addr_index == peer->server->naddrs) + peer->addr_index = 0; + } + } + else if (state & NGX_PEER_NEXT) + { + /* + If memcached gave negative (NOT_FOUND) reply, there's no need + to try the same cache through a different address. + */ + pc->tries = 0; + } +} + + +/* Per-request init: memcached_hash_find_peer() will pick the server. */ +static +ngx_int_t +memcached_hash_init_peer(ngx_http_request_t *r, + ngx_http_upstream_srv_conf_t *us) +{ + struct memcached_hash *memd = us->peer.data; + struct memcached_hash_find_ctx *find_ctx; + + find_ctx = ngx_palloc(r->pool, sizeof(*find_ctx)); + if (!
find_ctx) + return NGX_ERROR; + find_ctx->memd = memd; + find_ctx->request = r; + find_ctx->server = us->servers->elts; + + r->upstream->peer.free = memcached_hash_free_peer; + + /* + The following values will be replaced by + memcached_hash_find_peer(). + */ + r->upstream->peer.get = memcached_hash_find_peer; + r->upstream->peer.data = find_ctx; + r->upstream->peer.tries = 1; + + return NGX_OK; +} + + +/* + peer.init_upstream callback: build the peer table and the sorted + continuum of bucket points (one bucket per server in compatible + mode, weight-proportional points per server in Ketama mode). +*/ +static +ngx_int_t +memcached_init_hash(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us) +{ + struct memcached_hash *memd = us->peer.data; + ngx_http_upstream_server_t *server; + unsigned int buckets_count, i; + + if (! us->servers) + return NGX_ERROR; + + server = us->servers->elts; + + us->peer.init = memcached_hash_init_peer; + + memd->peers = ngx_palloc(cf->pool, + sizeof(*memd->peers) * us->servers->nelts); + if (! memd->peers) + return NGX_ERROR; + + memd->total_weight = 0; + + for (i = 0; i < us->servers->nelts; ++i) + { + memd->total_weight += server[i].weight; + ngx_memzero(&memd->peers[i], sizeof(memd->peers[i])); + memd->peers[i].server = &server[i]; + } + memd->peer_count = us->servers->nelts; + + if (memd->ketama_points == 0) + { + buckets_count = us->servers->nelts; + } + else + { + buckets_count = 0; + for (i = 0; i < us->servers->nelts; ++i) + buckets_count += (memd->ketama_points * server[i].weight + + memd->scale / 2) / memd->scale; + } + + /* + With more than one server memcached_hash_find_peer() always + hashes into the continuum: compatible mode divides by the scaled + total weight and Ketama mode indexes the bucket array. Reject + weight_scale/ketama_points values that round either to zero. + */ + if (memd->peer_count > 1 + && (memd->ketama_points == 0 + ? (memd->total_weight + memd->scale / 2) / memd->scale == 0 + : buckets_count == 0)) + { + ngx_log_error(NGX_LOG_EMERG, cf->log, 0, + "memcached_hash: server weights in upstream \"%V\" " + "round to zero with weight_scale=%ui", + &us->host, (ngx_uint_t) memd->scale); + return NGX_ERROR; + } + + memd->buckets = ngx_palloc(cf->pool, sizeof(*memd->buckets) * buckets_count); + if (!
memd->buckets) + return NGX_ERROR; + + if (memd->ketama_points == 0) + { + unsigned int total_weight = 0; + for (i = 0; i < us->servers->nelts; ++i) + { + unsigned int j; + + total_weight += server[i].weight; + for (j = 0; j < i; ++j) + { + memd->buckets[j].point -= + (uint64_t) memd->buckets[j].point * server[i].weight + / total_weight; + } + + memd->buckets[i].point = CONTINUUM_MAX_POINT; + memd->buckets[i].index = i; + } + memd->buckets_count = buckets_count; + } + else + { + memd->buckets_count = 0; + for (i = 0; i < us->servers->nelts; ++i) + { + static const char delim = '\0'; + u_char *host, *port; + size_t len, port_len = 0; + unsigned int crc32, count, j; + + host = server[i].name.data; + len = server[i].name.len; + +#if NGX_HAVE_UNIX_DOMAIN + if (ngx_strncasecmp(host, (u_char *) "unix:", 5) == 0) + { + host += 5; + len -= 5; + } +#endif /* NGX_HAVE_UNIX_DOMAIN */ + + /* Split "host:port"; name is an ngx_str_t, so bound the scan by len. */ + port = host; + while (port < host + len) + { + if (*port++ == ':') + { + port_len = len - (port - host); + len = (port - host) - 1; + break; + } + } + + ngx_crc32_init(crc32); + ngx_crc32_update(&crc32, host, len); + ngx_crc32_update(&crc32, (u_char *) &delim, 1); + ngx_crc32_update(&crc32, port, port_len); + + count = (memd->ketama_points * server[i].weight + + memd->scale / 2) / memd->scale; + for (j = 0; j < count; ++j) + { + u_char buf[4]; + unsigned int point = crc32, bucket; + + /* + We want the same result on all platforms, so we + hardcode size of int as 4 8-bit bytes. + */ + buf[0] = j & 0xff; + buf[1] = (j >> 8) & 0xff; + buf[2] = (j >> 16) & 0xff; + buf[3] = (j >> 24) & 0xff; + + ngx_crc32_update(&point, buf, 4); + ngx_crc32_final(point); + + if (memd->buckets_count > 0) + { + bucket = memcached_hash_find_bucket(memd, point); + + /* + Check if we wrapped around but actually have new + max point.
+ */ + if (bucket == 0 && point > memd->buckets[0].point) + { + bucket = memd->buckets_count; + } + else + { + /* + Even if there's a server for the same point + already, we have to add ours, because the + first one may be removed later. But we add + ours after the first server so as not to change + key distribution. + */ + while (bucket != memd->buckets_count + && memd->buckets[bucket].point == point) + ++bucket; + + /* Move the tail one position forward. */ + if (bucket != memd->buckets_count) + { + ngx_memmove(memd->buckets + bucket + 1, + memd->buckets + bucket, + (memd->buckets_count - bucket) + * sizeof(*memd->buckets)); + } + } + } + else + { + bucket = 0; + } + + memd->buckets[bucket].point = point; + memd->buckets[bucket].index = i; + + ++memd->buckets_count; + } + } + } + + return NGX_OK; +} + + +/* Directive handler: memcached_hash [ketama_points=N] [weight_scale=M]; */ +static +char * +memcached_hash(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_str_t *value = cf->args->elts; + ngx_http_upstream_srv_conf_t *uscf; + struct memcached_hash *memd; + int ketama_points, scale; + unsigned int i; + + ketama_points = 0; + scale = 1; + + uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module); + + for (i = 1; i < cf->args->nelts; ++i) + { + if (ngx_strncmp(value[i].data, "ketama_points=", 14) == 0) + { + ketama_points = ngx_atoi(&value[i].data[14], value[i].len - 14); + + if (ketama_points == NGX_ERROR || ketama_points < 0) + goto invalid; + + continue; + } + + if (ngx_strncmp(value[i].data, "weight_scale=", 13) == 0) + { + scale = ngx_atoi(&value[i].data[13], value[i].len - 13); + + if (scale == NGX_ERROR || scale <= 0) + goto invalid; + + continue; + } + + goto invalid; + } + + memd = ngx_palloc(cf->pool, sizeof(*memd)); + if (!
memd) + return "not enough memory"; + + memd->ketama_points = ketama_points; + memd->scale = scale; + memd->ns_index = ngx_http_get_variable_index(cf, &memcached_ns); + + if (memd->ns_index == NGX_ERROR) { + return NGX_CONF_ERROR; + } + + uscf->peer.data = memd; + + uscf->peer.init_upstream = memcached_init_hash; + + uscf->flags = (NGX_HTTP_UPSTREAM_CREATE + | NGX_HTTP_UPSTREAM_WEIGHT + | NGX_HTTP_UPSTREAM_MAX_FAILS + | NGX_HTTP_UPSTREAM_FAIL_TIMEOUT + | NGX_HTTP_UPSTREAM_DOWN); + + return NGX_CONF_OK; + +invalid: + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid parameter \"%V\"", &value[i]); + + return NGX_CONF_ERROR; +} + + +static ngx_command_t memcached_hash_commands[] = { + { + ngx_string("memcached_hash"), + NGX_HTTP_UPS_CONF | NGX_CONF_NOARGS | NGX_CONF_TAKE12, /* 0, 1 or 2 params. */ + memcached_hash, + 0, + 0, + NULL + }, + + ngx_null_command +}; + + +static ngx_http_module_t memcached_hash_module_ctx = { + NULL, /* preconfiguration */ + NULL, /* postconfiguration */ + + NULL, /* create main configuration */ + NULL, /* init main configuration */ + + NULL, /* create server configuration */ + NULL, /* merge server configuration */ + + NULL, /* create location configuration */ + NULL /* merge location configuration */ +}; + + +ngx_module_t ngx_http_upstream_memcached_hash_module = { + NGX_MODULE_V1, + &memcached_hash_module_ctx, /* module context */ + memcached_hash_commands, /* module directives */ + NGX_HTTP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; diff --git a/server/src/core/ngx_string.h b/server/src/core/ngx_string.h index 3514e52..b78d5e7 100644 --- a/server/src/core/ngx_string.h +++ b/server/src/core/ngx_string.h @@ -64,6 +64,7 @@ typedef struct { #define ngx_memzero(buf, n) (void) memset(buf, 0, n) #define ngx_memset(buf, c, n) (void) memset(buf, c, n) +#define
ngx_memmove(dst, src, n) (void) memmove(dst, src, n) #if (NGX_MEMCPY_LIMIT) diff --git a/server/src/http/ngx_http_upstream.c b/server/src/http/ngx_http_upstream.c index 7dd99ba..a71b3c7 100644 --- a/server/src/http/ngx_http_upstream.c +++ b/server/src/http/ngx_http_upstream.c @@ -3289,6 +3289,7 @@ ngx_http_upstream_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) goto invalid; } + us->name = u.url; us->addrs = u.addrs; us->naddrs = u.naddrs; us->weight = weight; diff --git a/server/src/http/ngx_http_upstream.h b/server/src/http/ngx_http_upstream.h index 6754d65..96554b1 100644 --- a/server/src/http/ngx_http_upstream.h +++ b/server/src/http/ngx_http_upstream.h @@ -68,6 +68,7 @@ typedef struct { typedef struct { + ngx_str_t name; ngx_peer_addr_t *addrs; ngx_uint_t naddrs; ngx_uint_t weight; -- 1.5.5.1.116.ge4b9c