源码分析
memcached本身是一个集中式的内存缓存系统,对于分布式的支持服务端并没有实现,只有通过客户端实现;再者,memcached是基于TCP/UDP进行通信,只要客户端语言支持TCP/UDP即可实现客户端,并且可以根据需要进行功能扩展。memchaced-client-forjava 既是使用java语言实现的客户端,并且实现了自己的功能扩展,下面这张类图描述了其主要类之间的关系。
几个重要类的说明:
MemcachedCacheManager: 管理类,负责缓存服务端,客户端,以及相关资源池的初始化工作,获取客户端等等
MemcachedCache:memcached缓存实体类,实现了所有的缓存API,实际上也会调用MemcachedClient进行操作
MemcachedClient:memcached缓存客户端,一个逻辑概念,负责与服务端实例的实际交互,通过调用sockiopool中的socket
SockIOPool:socket连接资源池,负责与memcached服务端进行交互
ClusterProcessor:集群内数据异步操作工具类
客户端可配置化
MemcachedCacheManager是入口,其start方法读取配置文件memcached.xml,初始化各个组建,包括memcached客户端,socket连接池以及集群节点。
memcached客户端是个逻辑概念,并不是和memcached服务端实例一一对应的,可以认为其是一个逻辑环上的某个节点(后面会讲到hash一致性算法时涉及),该配置文件中,可配置一个或多个客户端,每个客户端可配置一个socketPool连接池,如下:
代码如下 | 复制代码 | ||||
代码如下 |
复制代码 |
|
代码如下 | 复制代码 |
HASH环映射的初始化的代码位于SocketIOPool.populateConsistentBuckets方法中,主要代码如下:
代码如下 | 复制代码 |
private void populateConsistentBuckets() { ……... for (int i = 0; i < servers.length; i++) { int thisWeight = 1; if (this.weights != null && this.weights[i] != null) thisWeight = this.weights[i]; double factor = Math .floor(((double) (40 * this.servers.length * thisWeight)) / (double ) this.totalWeight); for (long j = 0; j < factor; j++) { byte[] d = md5.digest((servers[i] + "-" + j).getBytes()); for (int h = 0; h < 4; h++) { // k 的值使用MD5hash算法计算获得 Long k = ((long) (d[3 + h * 4] & 0xFF) << 24) | ((long) (d[2 + h * 4] & 0xFF) << 16) | ((long) (d[1 + h * 4] & 0xFF) << 8) | ((long) (d[0 + h * 4] & 0xFF)); // 用treemap来存储memcached实例所在的ip地址, // 也就是将每个缓存实例所在的ip地址映射到由k组成的hash环上 consistentBuckets.put(k, servers[i]); if (log.isDebugEnabled()) log.debug("++++ added " + servers[i] + " to server bucket"); } } ……... } } |
代码如下 | 复制代码 |
public SockIO getSock(String key, Integer hashCode){ …………. // from here on, we are working w/ multiple servers // keep trying different servers until we find one // making sure we only try each server one time Set // get initial bucket // 通过key值计算hash值,使用的是基于MD5的算法 long bucket = getBucket(key, hashCode); String server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets .get(bucket) : buckets.g et((int) bucket); …………... } private long getBucket(String key, Integer hashCode) { / / 通过key值计算hash值,使用的是基于MD5的算法 long hc = getHash(key, hashCode); if (this.hashingAlg == CONSISTENT_HASH) { return findPointFor(hc); } else { long bucket = hc % buckets.size(); if (bucket < 0) bucket *= -1; return bucket; } } /** * Gets the first available key equal or above the given one, if none found, * returns the first k in the bucket * * @param k * key * @return */ private Long findPointFor(Long hv) { // this works in java 6, but still want to release support for java5 // Long k = this.consistentBuckets.ceilingKey( hv ); // return ( k == null ) ? this.consistentBuckets.firstKey() : k; // 该consistentBuckets中存储的是HASH结构初始化时,存入的所有memcahced实例节点,也就是整个hash环 // tailMap方法是取出大于等于hv的所有节点,并且是递增有序的 SortedMap // 如果tmap为空,就默认返回hash环上的第一个值,否则就返回最接近hv值的那个节点 return (tmap.isEmpty()) ? this.consistentBuckets.firstKey() : tmap .firstKey(); } /** * Returns a bucket to check for a given key. * * @param key * String key cache is stored under * @return int bucket */ private long getHash(String key, Integer hashCode) { if (hashCode != null) { if (hashingAlg == CONSISTENT_HASH) return hashCode.longValue() & 0xffffffffL; else return hashCode.longValue(); } else { switch (hashingAlg) { case NATIVE_HASH: return (long) key.hashCode(); case OLD_COMPAT_HASH: return origCompatHashingAlg(key); case NEW_COMPAT_HASH: return newCompatHashingAlg(key); case CONSISTENT_HASH: return md5HashingAlg(key); default: // use the native hash as a default hashingAlg = NATIVE_HASH; return (long) key.hashCode(); } } } /** * Internal private hashing method. * * MD5 based hash algorithm for use in the consistent hashing approach. * * @param key * @return */ private static long md5HashingAlg(String key) { / /通过key值计算hash值,使用的是基于MD5的算法 MessageDigest md5 = MD5.get(); md5.reset(); md5.update(key.getBytes()); byte[] bKey = md5.digest(); long res = ((long) (bKey[3] & 0xFF) << 24) | ((long) (bKey[2] & 0xFF) << 16) | ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF); return res; } |
代码如下 | 复制代码 |
// log that we tried // 先删除定位失败的实例 tryServers.remove(server); if (tryServers.isEmpty()) break; // if we failed to get a socket from this server // then we try again by adding an incrementer to the // current key and then rehashing int rehashTries = 0; while (!tryServers.contains(server)) { // 重新计算key值 String newKey = new StringBuilder().append(rehashTries).append(key).toString(); // String.format( "%s%s", rehashTries, key ); if (log.isDebugEnabled()) log.debug("rehashing with: " + newKey); // 去HASH环上定位实例节点 bucket = getBucket(newKey, null); server=(this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets.get(bucket) : buckets.get((int) bucket); rehashTries++; } |
代码如下 | 复制代码 |
public Object get(String key) { Object result = null; boolean isError = false; ……....... if (result == null && helper.hasCluster()) if (isError || helper.getClusterMode().equals(MemcachedClientClusterConfig.CLUSTER_MODE_ACTIVE)) { List for(MemCachedClient cache : caches) { if (getCacheClient(key).equals(cache)) continue; try{ try { result = cache.get(key); } catch(MemcachedException ex) { Logger.error(new StringBuilder(helper.getCacheName()) .append(" cluster get error"),ex); continue; } //仅仅判断另一台备份机器,不多次判断,防止效率低下 if (helper.getClusterMode().equals(MemcachedClientClusterConfig.CLUSTER_MODE_ACTIVE ) && result != null) { Object[] commands = new Object[]{CacheCommand.RECOVER,key,result}; // 加入队列,异步执行复制数据 addCommandToQueue(commands); } break; } catch(Exception e) { Logger.error(new StringBuilder(helper.getCacheName()) .append(" cluster get error"),e); } } } return result; } |
代码如下 | 复制代码 |
public boolean add(String key, Object value) { boolean result = getCacheClient(key).add(key,value); if (helper.hasCluster()) { Object[] commands = new Object[]{CacheCommand.ADD,key,value}; // 加入队列,异步执行 addCommandToQueue(commands); } return result; } |
代码如下 | 复制代码 |
public Object remove(String key) { Object result = getCacheClient(key).delete(key); //异步删除由于集群会导致无法被删除,因此需要一次性全部清除 if (helper.hasCluster()) { List for(MemCachedClient cache : caches) { if (getCacheClient(key).equals(cache)) continue; try { cache.delete(key); } catch(Exception ex) { Logger.error(new StringBuilder(helper.getCacheName()) .append(" cluster remove error"),ex); } } } return result; } |
代码如下 | 复制代码 |
public MemcachedCache(MemCachedClientHelper helper,int statisticsInterval) { this.helper = helper; dataQueue = new LinkedBlockingQueue |
代码如下 | 复制代码 |
public Object get(String key, int localTTL) { Object result = null; // 本地缓存中查找 result = localCache.get(key); if (result == null) { result = get(key); if (result != null) { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.SECOND, localTTL); // 放入本地缓存 localCache.put(key, result,calendar.getTime()); } } return result; } |
代码如下 | 复制代码 |
public Object put(String key, Object value, Date expiry) { boolean result = getCacheClient(key).set(key,value,expiry); //移除本地缓存的内容 if (result) localCache.remove(key); …….. return value; } |
代码如下 | 复制代码 |
//populate cluster node to hash consistent Buckets MessageDigest md5 = MD5.get(); // 使用cluster的名称计算HASH数值空间 byte[] d = md5.digest((node.getName()).getBytes()); for (int h = 0; h < 4; h++) { Long k = ((long) (d[3 + h * 4] & 0xFF) << 24) | ((long) (d[2 + h * 4] & 0xFF) << 16) | ((long) (d[1 + h * 4] & 0xFF) << 8) | ((long) (d[0 + h * 4] & 0xFF)); consistentClusterBuckets.put(k, node.getName()); if (log.isDebugEnabled()) log.debug("++++ added " + node.getName() + " to cluster bucket"); } |
野比大雄的涅槃 最新版v0.8-625
野比大雄的涅槃是一款非常好玩的从端游移植而来的精品哆啦A梦同
汽车模拟器2内置菜单全车解锁版2024 最新版v1.54.2
汽车模拟器2内置涂装版是游戏的破解版本,在该版本中为玩家提供
快递到了亲内置菜单 安卓版v0.6.2
快递到了亲内置菜单是一款非常好玩的模拟经营类手游,内部有功能
我的世界为时已晚模组整合包 手机版v皮神木马
我的世界为时已晚模组整合包是一款剧情向的恐怖游戏像素风格沙盒
纸牌农庄内购版 v1.12.77
纸牌农庄无限道具版是一款将纸牌与模拟经营相结合的游戏,为玩家