Redis 单节点迁移到Cluster集群-AI
Redis 单节点迁移到Cluster集群-AI,AI终将取代程序员。
=== Redis 迁移工具: 单节点到集群 ===
源: Redis 5.0 单节点
目标: Redis 7.2 集群
==================================================
配置:
– 工作线程数: 16
– 批处理大小: 1000
– 管道大小: 100
==================================================
正在连接源 Redis 5.0 单节点…
正在连接目标 Redis 7.2 集群…
正在创建二进制数据客户端…
在 Redis 5.0 源中发现 29416 个键。
正在扫描键…
已收集 29381 个键准备迁移。
使用 16 个并行工作线程开始迁移…
进度: 10/30 块, 10000/29416 键 (34.0%), 109.6 键/秒, 0 个错误
进度: 20/30 块, 19381/29416 键 (65.9%), 120.6 键/秒, 0 个错误
进度: 30/30 块, 29381/29416 键 (99.9%), 178.1 键/秒, 0 个错误
迁移完成,耗时 165.01 秒。
成功迁移 29381 个键,有 0 个错误。
键统计信息:
KEYA键: 460 个
KEYB键: 28921 个
其他键前缀统计:
总操作完成,耗时 165.23 秒。
成功迁移 29381 个键,有 0 个错误。
注意: 已跳过 TTL 验证步骤。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
#!/usr/bin/env python3
"""Migrate every key from a Redis 5.0 single node to a Redis 7.2 cluster.

All keys are collected via SCAN, split into chunks, and copied in parallel
worker threads.  Keys prefixed ``KEYA:`` are treated as binary payloads and
copied through non-decoding (``decode_responses=False``) clients so their
raw bytes survive the transfer.
"""

import concurrent.futures
import sys
import time
from collections import Counter

import redis
from redis.cluster import ClusterNode, RedisCluster

# Redis 5.0 (single node) source configuration.
REDIS_SINGLE = {
    "HOST": "172.18.1.8",
    "PORT": 6379,
    "PASSWORD": "12345678",
    "DATABASE": 1,
    "TIMEOUT": 7 * 24 * 60 * 60,
}

# Redis 7.2 (cluster) destination configuration.
REDIS_CLUSTER = {
    "NODES": [
        {"host": "172.18.5.2", "port": 6379},
        {"host": "172.18.5.3", "port": 6379},
        {"host": "172.18.5.4", "port": 6379},
        {"host": "172.18.5.5", "port": 6379},
        {"host": "172.18.5.6", "port": 6379},
        {"host": "172.18.5.7", "port": 6379},
    ],
    "PASSWORD": "12345678",
    "TIMEOUT": 7 * 24 * 60 * 60,
}

# Global tuning knobs.
MAX_WORKERS = 16    # number of parallel worker threads
BATCH_SIZE = 1000   # keys per SCAN page and per migration chunk
PIPELINE_SIZE = 100  # NOTE(review): reserved for pipelined writes; not used yet


def create_binary_clients():
    """Return ``(src, dst)`` clients configured with ``decode_responses=False``.

    These are used for keys holding raw binary values that must not be
    decoded to ``str`` during the copy.
    """
    src_binary = redis.Redis(
        host=REDIS_SINGLE["HOST"],
        port=REDIS_SINGLE["PORT"],
        password=REDIS_SINGLE["PASSWORD"],
        db=REDIS_SINGLE["DATABASE"],
        decode_responses=False,  # keep binary data as bytes
    )
    dst_binary = RedisCluster(
        startup_nodes=[
            ClusterNode(node["host"], node["port"])
            for node in REDIS_CLUSTER["NODES"]
        ],
        password=REDIS_CLUSTER["PASSWORD"],
        decode_responses=False,  # keep binary data as bytes
        skip_full_coverage_check=True,
    )
    return src_binary, dst_binary


def migrate_key_batch(keys, src_redis, dst_redis, src_binary, dst_binary):
    """Migrate one chunk of keys; return ``(migrated, errors, key_types)``.

    Parameters
    ----------
    keys : list[str]
        Decoded key names to copy.
    src_redis, dst_redis : redis clients with ``decode_responses=True``.
    src_binary, dst_binary : redis clients with ``decode_responses=False``,
        used for ``KEYA:``-prefixed (binary) keys.

    Errors on individual keys are logged and counted; they do not abort
    the batch.
    """
    migrated = 0
    errors = 0
    key_types = Counter()  # per-prefix key statistics

    for key in keys:
        try:
            # KEYA:* keys contain binary payloads -> use non-decoding clients.
            is_binary = key.startswith('KEYA:')

            # Tally the key prefix for the final report.
            if key.startswith('KEYA:'):
                key_types['KEYA'] += 1
            elif key.startswith('KEYB:'):
                key_types['KEYB'] += 1
            else:
                prefix = key.split(':', 1)[0] if ':' in key else 'OTHER'
                key_types[prefix] += 1

            source = src_binary if is_binary else src_redis
            destination = dst_binary if is_binary else dst_redis

            # Type and TTL are read via the decoded client (str results).
            key_type = src_redis.type(key)
            ttl = src_redis.ttl(key)
            if ttl < 0:  # -1 = no expiry, -2 = key vanished since SCAN
                ttl = None

            if key_type == "string":
                value = source.get(key)
                destination.set(key, value)
                if ttl is not None:
                    destination.expire(key, ttl)

            elif key_type == "list":
                value = source.lrange(key, 0, -1)
                if value:  # skip empty lists
                    destination.delete(key)  # clear any stale destination copy
                    destination.rpush(key, *value)
                    if ttl is not None:
                        destination.expire(key, ttl)

            elif key_type == "hash":
                value = source.hgetall(key)
                if value:
                    destination.delete(key)
                    if is_binary:
                        # Field-by-field writes keep raw bytes intact.
                        for field, val in value.items():
                            destination.hset(key, field, val)
                    else:
                        destination.hset(key, mapping=value)
                    if ttl is not None:
                        destination.expire(key, ttl)

            elif key_type == "set":
                value = source.smembers(key)
                if value:  # skip empty sets
                    destination.delete(key)
                    destination.sadd(key, *value)
                    if ttl is not None:
                        destination.expire(key, ttl)

            elif key_type == "zset":
                # Fetch all members together with their scores.
                value = source.zrange(key, 0, -1, withscores=True)
                if value:  # skip empty sorted sets
                    destination.delete(key)
                    destination.zadd(key, dict(value))
                    if ttl is not None:
                        destination.expire(key, ttl)

            migrated += 1
        except Exception as e:
            print(f"Error migrating key '{key}': {str(e)}")
            errors += 1

    return migrated, errors, key_types


def migrate_redis_data():
    """Run the full migration; return ``(migrated, errors)``.

    Connects both endpoints, scans all source keys, migrates them in
    parallel chunks, and prints progress plus a per-prefix summary.
    Exits the process if either endpoint is unreachable.
    """
    print("正在连接源 Redis 5.0 单节点...")
    try:
        src_redis = redis.Redis(
            host=REDIS_SINGLE["HOST"],
            port=REDIS_SINGLE["PORT"],
            password=REDIS_SINGLE["PASSWORD"],
            db=REDIS_SINGLE["DATABASE"],
            decode_responses=True,
        )
        src_redis.ping()  # fail fast on bad credentials / host
    except Exception as e:
        print(f"连接源 Redis 出错: {str(e)}")
        sys.exit(1)

    print("正在连接目标 Redis 7.2 集群...")
    try:
        dst_redis = RedisCluster(
            startup_nodes=[
                ClusterNode(node["host"], node["port"])
                for node in REDIS_CLUSTER["NODES"]
            ],
            password=REDIS_CLUSTER["PASSWORD"],
            decode_responses=True,
            skip_full_coverage_check=True,
        )
        dst_redis.ping()  # fail fast on bad credentials / host
    except Exception as e:
        print(f"连接目标 Redis 集群出错: {str(e)}")
        sys.exit(1)

    print("正在创建二进制数据客户端...")
    src_binary, dst_binary = create_binary_clients()

    # Total key count, used only for progress percentages.
    total_keys = src_redis.dbsize()
    print(f"在 Redis 5.0 源中发现 {total_keys} 个键。")

    migrated = 0
    errors = 0
    key_types_total = Counter()
    start_time = time.time()

    # Collect all keys up front so chunks distribute evenly across workers.
    print("正在扫描键...")
    all_keys = []
    cursor = 0
    while True:
        # SCAN returns an int cursor; a zero cursor marks the end of the scan.
        cursor, keys = src_redis.scan(cursor=cursor, count=BATCH_SIZE)
        all_keys.extend(keys)
        if cursor == 0:
            break

    print(f"已收集 {len(all_keys)} 个键准备迁移。")
    print(f"使用 {MAX_WORKERS} 个并行工作线程开始迁移...")

    # Split the key list into fixed-size chunks for parallel processing.
    key_chunks = [
        all_keys[i:i + BATCH_SIZE]
        for i in range(0, len(all_keys), BATCH_SIZE)
    ]

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_chunk = {
            executor.submit(
                migrate_key_batch,
                chunk, src_redis, dst_redis, src_binary, dst_binary,
            ): chunk
            for chunk in key_chunks
        }

        completed_chunks = 0
        for future in concurrent.futures.as_completed(future_to_chunk):
            chunk = future_to_chunk[future]
            try:
                chunk_migrated, chunk_errors, chunk_key_types = future.result()
                migrated += chunk_migrated
                errors += chunk_errors
                key_types_total.update(chunk_key_types)

                completed_chunks += 1
                # Report every 10 chunks and once at the very end.
                if completed_chunks % 10 == 0 or completed_chunks == len(key_chunks):
                    elapsed = time.time() - start_time
                    rate = migrated / elapsed if elapsed > 0 else 0
                    progress = (migrated / total_keys * 100) if total_keys > 0 else 0
                    print(f"进度: {completed_chunks}/{len(key_chunks)} 块, "
                          f"{migrated}/{total_keys} 键 ({progress:.1f}%), "
                          f"{rate:.1f} 键/秒, {errors} 个错误")
            except Exception as e:
                # The whole chunk failed before returning stats.
                print(f"处理块出错: {str(e)}")
                errors += len(chunk)

    elapsed_time = time.time() - start_time
    print(f"\n迁移完成,耗时 {elapsed_time:.2f} 秒。")
    print(f"成功迁移 {migrated} 个键,有 {errors} 个错误。")

    # BUGFIX: the counters are recorded under 'KEYA'/'KEYB' in
    # migrate_key_batch; the old code looked up 'DATAFILE'/'DATAAPI',
    # so the per-prefix totals were never printed.
    print("\n键统计信息:")
    if 'KEYA' in key_types_total:
        print(f"KEYA 键: {key_types_total['KEYA']} 个")
    if 'KEYB' in key_types_total:
        print(f"KEYB 键: {key_types_total['KEYB']} 个")
    print("其他键前缀统计:")
    other_prefixes = [
        (prefix, count)
        for prefix, count in key_types_total.items()
        if prefix not in ('KEYA', 'KEYB') and count > 10
    ]
    for prefix, count in sorted(other_prefixes, key=lambda x: x[1], reverse=True):
        print(f"  - {prefix}: {count} 个")

    return migrated, errors


if __name__ == "__main__":
    print("=== Redis 迁移工具: 单节点到集群 ===")
    print("源: Redis 5.0 单节点")
    print("目标: Redis 7.2 集群")
    print("=" * 50)
    print(f"配置:")
    print(f"  - 工作线程数: {MAX_WORKERS}")
    print(f"  - 批处理大小: {BATCH_SIZE}")
    print(f"  - 管道大小: {PIPELINE_SIZE}")
    print("=" * 50)

    start_time = time.time()
    migrated, errors = migrate_redis_data()
    elapsed_time = time.time() - start_time

    print(f"\n总操作完成,耗时 {elapsed_time:.2f} 秒。")
    print(f"成功迁移 {migrated} 个键,有 {errors} 个错误。")
    print("\n注意: 已跳过 TTL 验证步骤。")