目录:
$ pip3 install redis
import redis
# 创建 Redis 连接
r = redis.Redis(
host='localhost', # Redis 服务器地址
port=6379, # 端口
db=0, # 数据库编号
password=None, # 密码
decode_responses=True # 自动解码为字符串
)
try:
r.ping()
print("成功连接到 Redis")
except redis.ConnectionError:
print("无法连接到 Redis")
import redis
# 创建 Redis 连接
r = redis.Redis(
host='localhost', # Redis 服务器地址
port=6379, # 端口
db=0, # 数据库编号
password=None, # 密码
decode_responses=True # 自动解码为字符串
)
# 设置键值对
r.set('name', 'Alice')
r.set('age', '25')
# 设置带过期时间的键(10秒后过期)
r.setex('temp_key', 10, 'temporary value')
# 获取值
name = r.get('name')
print(f"姓名: {name}") # 输出: 姓名: Alice
# 批量设置和获取
r.mset({'key1': 'value1', 'key2': 'value2'})
values = r.mget(['key1', 'key2'])
print(values) # 输出: ['value1', 'value2']
# 数值操作
r.set('counter', 10)
r.incr('counter') # 增加 1
r.incrby('counter', 5) # 增加 5
r.decr('counter') # 减少 1
print(r.get('counter')) # 输出: 15
# 检查键是否存在
if r.exists('name'):
print("键 'name' 存在")
# 删除键
r.delete('temp_key')
就是存取字典。
import redis
# 创建 Redis 连接
r = redis.Redis(
host='localhost', # Redis 服务器地址
port=6379, # 端口
db=0, # 数据库编号
password=None, # 密码
decode_responses=True # 自动解码为字符串
)
# 设置哈希字段
r.hset('user:1000', 'name', 'Bob')
r.hset('user:1000', 'email', 'bob@example.com')
r.hset('user:1000', 'age', '30')
# 获取哈希字段
name = r.hget('user:1000', 'name')
print(f"用户名: {name}")
# 获取所有字段和值
user_data = r.hgetall('user:1000')
print(user_data) # 输出: {'name': 'Bob', 'email': 'bob@example.com', 'age': '30'}
# 获取所有字段名或值
fields = r.hkeys('user:1000')
values = r.hvals('user:1000')
print(f"字段名: {fields}")
print(f"字段值: {values}")
# 字段数量
field_count = r.hlen('user:1000')
print(f"用户字段数量: {field_count}")
# 检查字段是否存在
if r.hexists('user:1000', 'email'):
print("邮箱字段存在")
import redis
# 创建 Redis 连接
r = redis.Redis(
host='localhost', # Redis 服务器地址
port=6379, # 端口
db=0, # 数据库编号
password=None, # 密码
decode_responses=True # 自动解码为字符串
)
# 从左侧添加元素
r.lpush('tasks', 'task1')
r.lpush('tasks', 'task2', 'task3')
# 从右侧添加元素
r.rpush('tasks', 'task4')
# 获取列表范围
tasks = r.lrange('tasks', 0, -1) # 获取所有元素
print(f"任务列表: {tasks}") # 输出: ['task3', 'task2', 'task1', 'task4']
# 获取列表长度
length = r.llen('tasks')
print(f"任务数量: {length}")
# 弹出元素
left_task = r.lpop('tasks') # 从左侧弹出
right_task = r.rpop('tasks') # 从右侧弹出
print(f"弹出的任务: {left_task}, {right_task}")
# 通过索引获取元素
task = r.lindex('tasks', 0)
print(f"第一个任务: {task}")
# 修剪列表
r.ltrim('tasks', 0, 2) # 只保留前3个元素
import redis
# 创建 Redis 连接
r = redis.Redis(
host='localhost', # Redis 服务器地址
port=6379, # 端口
db=0, # 数据库编号
password=None, # 密码
decode_responses=True # 自动解码为字符串
)
# 添加元素到集合
r.sadd('tags', 'python', 'redis', 'database', 'cache')
# 获取集合所有成员
all_tags = r.smembers('tags')
print(f"所有标签: {all_tags}") # {'cache', 'python', 'database', 'redis'}
# 检查成员是否存在
if r.sismember('tags', 'python'):
print("Python 标签存在")
# 集合运算
r.sadd('tags2', 'python', 'web', 'development')
intersection = r.sinter('tags', 'tags2') # 交集
union = r.sunion('tags', 'tags2') # 并集
difference = r.sdiff('tags', 'tags2') # 差集
print(f"交集: {intersection}")
print(f"并集: {union}")
print(f"差集: {difference}")
# 移除元素
r.srem('tags', 'cache')
# 随机弹出元素
random_tag = r.spop('tags')
print(f"随机弹出的标签: {random_tag}")
↶ 返回首页 ↶