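1 Introduction
Redis (Remote Dictionary Server) is an open-source, high-performance key-value store that is fast, flexible, and scalable. It is an in-memory data structure store that can serve as a database, a cache, or a message broker.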
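2 Key Features
- **High performance:** Redis keeps its data in memory, so reads and writes are very fast. Commands are executed on a single thread driven by non-blocking, multiplexed I/O, which avoids lock contention and context switching between threads.
- **Rich data structures:** Redis supports several data structures, including String, Hash, List, Set, and Sorted Set. Each comes with its own set of commands, so developers can handle a wide range of data needs.
- **Persistence:** Redis offers two persistence mechanisms: snapshotting (RDB) and the append-only file (AOF). A snapshot writes the in-memory data to disk in a binary format, while AOF persists data by appending each write command to a log.
- **Publish/subscribe:** Redis supports the publish/subscribe pattern and can act as a message broker. Publishers send messages to a named channel, and subscribers of that channel receive and process them. This pattern is useful for real-time communication, event-driven systems, and message queues.
- **Distributed caching:** Redis achieves distributed storage and high availability through master-replica replication and sharding. Replication copies data to one or more replica nodes, which enables read/write splitting and data backup. Sharding spreads data across multiple Redis nodes, which enables horizontal scaling and load balancing.
- **Transactions:** Redis supports transactions: several commands can be grouped with MULTI/EXEC and are then executed sequentially as one isolated unit, with no commands from other clients interleaved. Redis does not roll back: if a queued command fails at run time, the remaining commands still execute.
- **Feature-rich:** Redis is more than a simple cache. It also provides transactions, Lua scripting, key expiration, and atomic operations, so developers can implement more complex application logic inside Redis.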
3 spring-data-redis
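3.1 YAML configuration
3.1.1 Common configuration
spring:
  data:
    redis:
      client-name: RName # Name set via CLIENT SETNAME. It appears in the output of CLIENT LIST and identifies this client's connections on the server
      client-type: lettuce # Client type
      #url: redis://username:password@host:port # A URL that replaces the host, port, username, and password settings
      username: admin # Username
      password: 123456 # Password
      database: 0 # Database index
      connect-timeout: 1000 # Connection timeout, in ms
      timeout: 1000 # Read timeout, in ms
      repositories:
        enabled: true # Whether to enable Redis repositories
      ssl:
        bundle: # SSL bundle name
        enabled: true # Whether to enable SSL
      jedis: # Jedis client settings (used only when client-type is jedis)
        pool:
          enabled: true # Whether to enable the connection pool
          max-active: 8 # Maximum number of connections. A negative value means no limit
          max-idle: 8 # Maximum number of idle connections. A negative value means no limit
          min-idle: 0 # Target minimum number of idle connections. Only takes effect if both it and time-between-eviction-runs are positive
          max-wait: -1ms # Maximum time a connection allocation blocks before throwing an exception when the pool is exhausted. A negative value blocks indefinitely
          time-between-eviction-runs: # Time between runs of the idle-object evictor thread. The evictor starts only when this value is positive
      lettuce: # Lettuce client settings (used only when client-type is lettuce)
        pool:
          enabled: true # Whether to enable the connection pool
          max-active: 8 # Maximum number of connections. A negative value means no limit
          max-idle: 8 # Maximum number of idle connections. A negative value means no limit
          min-idle: 0 # Target minimum number of idle connections. Only takes effect if both it and time-between-eviction-runs are positive
          max-wait: -1ms # Maximum time a connection allocation blocks before throwing an exception when the pool is exhausted. A negative value blocks indefinitely
          time-between-eviction-runs: # Time between runs of the idle-object evictor thread. The evictor starts only when this value is positive
        shutdown-timeout: 100ms # Shutdown timeout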
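The commented-out url property above packs host, port, username, and password into a single string. As a rough illustration of which parts such a URL carries, the sketch below splits one with java.net.URI. The class name RedisUrlParts is hypothetical, and treating user-info without a colon as a password only is an assumption of this sketch, not a statement about how Spring Boot parses the property.

```java
import java.net.URI;

/** Hypothetical helper: splits a Redis URL into the parts the separate properties would carry. */
final class RedisUrlParts {
    final String host;
    final int port;
    final String username; // null when the URL carries no username
    final String password; // null when the URL carries no user-info at all

    RedisUrlParts(String url) {
        URI uri = URI.create(url);
        this.host = uri.getHost();
        // Fall back to the conventional Redis port when the URL omits it.
        this.port = uri.getPort() == -1 ? 6379 : uri.getPort();
        String userInfo = uri.getUserInfo();
        if (userInfo == null) {
            this.username = null;
            this.password = null;
        } else {
            int colon = userInfo.indexOf(':');
            // Assumption of this sketch: user-info without ':' is a password only.
            this.username = colon < 0 ? null : userInfo.substring(0, colon);
            this.password = colon < 0 ? userInfo : userInfo.substring(colon + 1);
        }
    }

    public static void main(String[] args) {
        RedisUrlParts parts = new RedisUrlParts("redis://admin:123456@192.168.1.11:6379");
        System.out.println(parts.username + "@" + parts.host + ":" + parts.port);
    }
}
```

When a URL is configured, it stands in for the individual host, port, username, and password entries, so only one of the two styles should be used.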
3.1.2 Mode-specific configuration
3.1.2.1 Standalone mode
spring:
  data:
    redis:
      host: 192.168.1.11 # Host
      port: 6379 # Port
3.1.2.2 Master-replica mode (Sentinel)
spring:
  data:
    redis:
      sentinel:
        master: master # Name of the master as declared in the sentinel.conf file
        nodes: 192.168.1.11:26379, 192.168.1.12:26379, 192.168.1.13:26379 # Sentinel nodes
        username: admin # Sentinel username
        password: 123456 # Sentinel password
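3.1.2.3 Cluster mode
spring:
  data:
    redis:
      cluster:
        nodes: 192.168.1.11:6379, 192.168.1.12:6379, 192.168.1.13:6379, 192.168.1.11:6380, 192.168.1.12:6380, 192.168.1.13:6380
        max-redirects: 3 # Maximum number of redirects (MOVED/ASK) to follow when executing a command across the cluster
      lettuce:
        cluster:
          refresh:
            adaptive: true # Refresh the cached cluster topology in response to events such as MOVED/ASK redirects or reconnects, so the client can follow a failover
            dynamic-refresh-sources: true # Whether to query all discovered cluster nodes for the topology (true) or only the configured seed nodes (false)
            period: 1000 # Cluster mode only: interval of the periodic topology refresh, in ms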
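In cluster mode every key belongs to one of 16384 hash slots, computed as CRC16(key) mod 16384, and a MOVED redirect is what a client receives when it addresses a node that does not own the key's slot. The following is a minimal sketch of that slot computation, assuming the CRC16 variant with polynomial 0x1021 and initial value 0 described in the Redis Cluster specification. The class name KeySlot is hypothetical, and hash tags (the {...} part of a key) are deliberately ignored here.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: maps a key to a Redis Cluster hash slot (hash tags are not handled). */
final class KeySlot {
    static final int SLOT_COUNT = 16384;

    /** Bitwise CRC16 with polynomial 0x1021 and initial value 0 (the XMODEM variant). */
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF; // keep the register at 16 bits
            }
        }
        return crc;
    }

    static int slot(String key) {
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % SLOT_COUNT;
    }

    public static void main(String[] args) {
        System.out.println("slot(foo) = " + slot("foo"));
    }
}
```

A cluster-aware client such as Lettuce caches the slot-to-node map, which is why the topology refresh settings above matter: after a failover or resharding the cached map is stale until it is refreshed.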
3.2 RedisTemplate and StringRedisTemplate
/** Operates on values of arbitrary object types */
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/** Operates on String values */
@Autowired
private StringRedisTemplate stringRedisTemplate;

RedisTemplate exposes an opsForXxx method for each data type, which returns the matching operations object:

// RedisTemplate returns the XxxOperations for each data type
ValueOperations<String, Object> objectValueOperations = redisTemplate.opsForValue();
HashOperations<String, String, Object> objectHashOperations = redisTemplate.opsForHash();
ListOperations<String, Object> objectListOperations = redisTemplate.opsForList();
SetOperations<String, Object> objectSetOperations = redisTemplate.opsForSet();
ZSetOperations<String, Object> objectZSetOperations = redisTemplate.opsForZSet();
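An XxxOperations object can also be injected directly with the @Resource annotation, and in that case the Object value type can be narrowed to the concrete value type. Note that when the value type is String, name must refer to stringRedisTemplate. In each pair below, the two forms separated by || are alternatives.

@Autowired
private StringRedisTemplate stringRedisTemplate;
// Call this inside a method or a @PostConstruct callback: a field initializer would run before injection, while stringRedisTemplate is still null
ValueOperations<String, String> stringValueOperations = stringRedisTemplate.opsForValue();
||
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Resource(name = "stringRedisTemplate")
private ValueOperations<String, String> stringValueOperations;

@Autowired
private RedisTemplate<String, User> redisTemplate; // Note: the value type must be User here, not Object
// Call this inside a method or a @PostConstruct callback, after injection
ValueOperations<String, User> userValueOperations = redisTemplate.opsForValue();
||
@Autowired
private RedisTemplate<String, Object> redisTemplate; // Note: the value type can be Object here
@Resource(name = "redisTemplate")
private ValueOperations<String, User> userValueOperations; // Note: the value type is User. The field is named userXxx to mirror the type; with @Resource(name = ...) the field name itself does not affect injection

@Autowired
private RedisTemplate<String, Integer> redisTemplate; // Note: the value type must be Integer here, not Object
// Call this inside a method or a @PostConstruct callback, after injection
ValueOperations<String, Integer> integerValueOperations = redisTemplate.opsForValue();
||
@Autowired
private RedisTemplate<String, Object> redisTemplate; // Note: the value type can be Object here
@Resource(name = "redisTemplate")
private ValueOperations<String, Integer> integerValueOperations; // Note: the value type is Integer. The field is named integerXxx to mirror the type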
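Field injection happens after an object has been constructed, so an operations object cannot be derived from an injected template in a field initializer. The plain-Java sketch below imitates that ordering without Spring. FakeTemplate, EagerHolder, LazyHolder, and InjectionOrderDemo are hypothetical stand-ins, and the init() method only plays the role that a @PostConstruct callback would play in a real bean.

```java
/** Hypothetical stand-in for an injected template; not a Spring class. */
final class FakeTemplate {
    String opsForValue() {
        return "value-ops";
    }
}

/** Derives the operations object in a field initializer, which runs during construction. */
final class EagerHolder {
    FakeTemplate template;               // a container would set this only after construction
    String ops = template.opsForValue(); // template is still null here
}

/** Derives the operations object only after the dependency has been set. */
final class LazyHolder {
    FakeTemplate template;
    String ops;

    void init() { // plays the role of a @PostConstruct method
        ops = template.opsForValue();
    }
}

final class InjectionOrderDemo {
    /** Returns true when constructing EagerHolder fails with a NullPointerException. */
    static boolean eagerFails() {
        try {
            new EagerHolder();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    static String lazyWorks() {
        LazyHolder holder = new LazyHolder();
        holder.template = new FakeTemplate(); // simulated field injection
        holder.init();
        return holder.ops;
    }

    public static void main(String[] args) {
        System.out.println("eager initializer fails: " + eagerFails());
        System.out.println("lazy init yields: " + lazyWorks());
    }
}
```

Injecting the operations object with @Resource, or using constructor injection, sidesteps the ordering problem altogether.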
3.3 Main public methods of RedisTemplate
public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperations<K, V>, BeanClassLoaderAware {
    // General
    public void setBeanClassLoader(ClassLoader classLoader);
    public void afterPropertiesSet(); // Initializes the template and applies default settings
    public void setExposeConnection(boolean exposeConnection);
    public void setEnableTransactionSupport(boolean enableTransactionSupport);
    public boolean isExposeConnection();
    public Cursor<K> scan(ScanOptions options);
    public List<RedisClientInfo> getClientList();
    public void killClient(String host, int port);
    public void replicaOf(String host, int port);
    public void replicaOfNoOne();

    // Serializer
    public boolean isEnableDefaultSerializer();
    public void setEnableDefaultSerializer(boolean enableDefaultSerializer);
    public void setDefaultSerializer(RedisSerializer<?> serializer);
    public RedisSerializer<?> getDefaultSerializer();
    public void setKeySerializer(RedisSerializer<?> serializer);
    public RedisSerializer<?> getKeySerializer();
    public void setStringSerializer(RedisSerializer<String> stringSerializer);
    public RedisSerializer<String> getStringSerializer();
    public void setValueSerializer(RedisSerializer<?> serializer);
    public RedisSerializer<?> getValueSerializer();
    public void setHashKeySerializer(RedisSerializer<?> hashKeySerializer);
    public RedisSerializer<?> getHashKeySerializer();
    public void setHashValueSerializer(RedisSerializer<?> hashValueSerializer);
    public RedisSerializer<?> getHashValueSerializer();

    // Operations
    public ValueOperations<K, V> opsForValue();
    public <HK, HV> HashOperations<K, HK, HV> opsForHash();
    public ListOperations<K, V> opsForList();
    public SetOperations<K, V> opsForSet();
    public ZSetOperations<K, V> opsForZSet();
    public ClusterOperations<K, V> opsForCluster();
    public GeoOperations<K, V> opsForGeo();
    public HyperLogLogOperations<K, V> opsForHyperLogLog();
    public <HK, HV> StreamOperations<K, HK, HV> opsForStream();
    public <HK, HV> StreamOperations<K, HK, HV> opsForStream(HashMapper<? super K, ? super HK, ? super HV> hashMapper);

    // boundXxxOps
    public BoundValueOperations<K, V> boundValueOps(K key);
    public <HK, HV> BoundHashOperations<K, HK, HV> boundHashOps(K key);
    public BoundListOperations<K, V> boundListOps(K key);
    public BoundSetOperations<K, V> boundSetOps(K key);
    public BoundZSetOperations<K, V> boundZSetOps(K key);
    public BoundGeoOperations<K, V> boundGeoOps(K key);
    public <HK, HV> BoundStreamOperations<K, HK, HV> boundStreamOps(K key);

    // key
    public Boolean hasKey(K key);
    public Long countExistingKeys(Collection<K> keys);
    public Set<K> keys(K pattern);
    public DataType type(K key);
    public Boolean delete(K key);
    public Long delete(Collection<K> keys);
    public Boolean unlink(K key);
    public Long unlink(Collection<K> keys);
    public void rename(K oldKey, K newKey);
    public Boolean renameIfAbsent(K oldKey, K newKey);
    public K randomKey();

    // expire
    public Boolean expire(K key, final long timeout, final TimeUnit unit);
    public Boolean expireAt(K key, final Date date);
    public Long getExpire(K key);
    public Long getExpire(K key, TimeUnit timeUnit);
    public Boolean persist(K key);

    // Data migration, backup, and restore
    public Boolean copy(K source, K target, boolean replace);
    public Boolean move(K key, final int dbIndex); // Moves the key to the database with index dbIndex
    public byte[] dump(K key);
    public void restore(K key, byte[] value, long timeToLive, TimeUnit unit, boolean replace);

    // sort
    public List<V> sort(SortQuery<K> query);
    public <T> List<T> sort(SortQuery<K> query, @Nullable RedisSerializer<T> resultSerializer);
    public <T> List<T> sort(SortQuery<K> query, BulkMapper<T, V> bulkMapper);
    public <T, S> List<T> sort(SortQuery<K> query, BulkMapper<T, S> bulkMapper, @Nullable RedisSerializer<S> resultSerializer);
    public Long sort(SortQuery<K> query, K storeKey);

    // transaction
    public void watch(K key);
    public void watch(Collection<K> keys);
    public void unwatch();
    public void multi();
    public List<Object> exec();
    public List<Object> exec(RedisSerializer<?> valueSerializer);

    // execute: runs callbacks and scripts against the native, low-level connection
    public void setScriptExecutor(ScriptExecutor<K> scriptExecutor);
    public <T> T execute(RedisCallback<T> action);
    public <T> T execute(RedisCallback<T> action, boolean exposeConnection);
    public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline);
    public <T> T execute(SessionCallback<T> session);
    public List<Object> executePipelined(SessionCallback<?> session);
    public List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer);
    public List<Object> executePipelined(RedisCallback<?> action);
    public List<Object> executePipelined(RedisCallback<?> action, @Nullable RedisSerializer<?> resultSerializer);
    public <T> T execute(RedisScript<T> script, List<K> keys, Object... args);
    public <T> T execute(RedisScript<T> script, RedisSerializer<?> argsSerializer, RedisSerializer<T> resultSerializer, List<K> keys, Object... args);
    public <T extends Closeable> T executeWithStickyConnection(RedisCallback<T> callback);

    // Transaction discard and pub/sub
    public void discard();
    public Long convertAndSend(String channel, Object message);
}
3.4 Examples
3.4.1 pom
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.15.3</version>
</dependency>
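3.4.2 yml
spring:
  data:
    redis:
      client-type: lettuce # Client type
      host: 192.168.1.10 # Host
      port: 6379 # Port
      # username: admin # Username
      # password: 123456 # Password
      database: 0 # Database index
      connect-timeout: 1000 # Connection timeout, in ms
      timeout: 1000 # Read timeout, in ms
      repositories:
        enabled: true # Whether to enable Redis repositories
      lettuce: # Lettuce client settings
        pool:
          enabled: true # Whether to enable the connection pool
          max-active: 8 # Maximum number of connections. A negative value means no limit
          max-idle: 8 # Maximum number of idle connections. A negative value means no limit
          min-idle: 0 # Target minimum number of idle connections. Only takes effect if both it and time-between-eviction-runs are positive
          max-wait: -1ms # Maximum time a connection allocation blocks before throwing an exception when the pool is exhausted. A negative value blocks indefinitely
          time-between-eviction-runs: # Time between runs of the idle-object evictor thread. The evictor starts only when this value is positive
        shutdown-timeout: 100ms # Shutdown timeout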
3.4.3 Bean configuration
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.nio.charset.StandardCharsets;

@Configuration
public class RedisBeanConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory); // connection factory
        template.setKeySerializer(new StringRedisSerializer(StandardCharsets.UTF_8)); // key serializer
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); // value serializer
        template.setHashKeySerializer(new StringRedisSerializer(StandardCharsets.UTF_8)); // hash key serializer
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer()); // hash value serializer
        template.setEnableTransactionSupport(true); // enable transaction support
        // Initializes the template and fills in unset defaults. Spring calls this automatically for a @Bean,
        // so the explicit call is harmless here; it is required when the template is created outside the container.
        template.afterPropertiesSet();
        return template;
    }
}
3.4.4 Basic redisTemplate operations
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.core.*;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.*;

@SpringBootTest
public class BaseOperationsTest {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    //region key-value
    String keyPrefix = "string-";
    String noExistsKey = "abc";
    String key1 = keyPrefix + "key1";
    String key2 = keyPrefix + "key2";
    String key3 = keyPrefix + "key3";
    String key4 = keyPrefix + "key4";
    String key1Value = "STRING KEY 1";
    String key2Value = "STRING KEY 2";
    String key3Value = "STRING KEY 3";
    String key4Value = "STRING KEY 4";
    //endregion

    /** Removes every key under keyPrefix (including leftovers from other tests), then recreates key1..key4. */
    void reset() {
        this.stringRedisTemplate.delete(this.stringRedisTemplate.keys(keyPrefix + "*"));
        ValueOperations<String, String> operations = this.stringRedisTemplate.opsForValue();
        operations.set(key1, key1Value);
        operations.set(key2, key2Value);
        operations.set(key3, key3Value);
        operations.set(key4, key4Value);
    }

    @Test
    void test1() {
        this.stringRedisTemplate.delete(this.stringRedisTemplate.keys(keyPrefix + "*"));
        // keys() returns an empty set, not null, when nothing matches
        assertTrue(this.stringRedisTemplate.keys(keyPrefix + "*").isEmpty());
        this.reset();
        assertTrue(this.stringRedisTemplate.hasKey(key1));
        assertFalse(this.stringRedisTemplate.hasKey(noExistsKey));
        assertEquals(2, this.stringRedisTemplate.countExistingKeys(Arrays.asList(key1, key2)));
        assertEquals(1, this.stringRedisTemplate.countExistingKeys(Arrays.asList(key1, noExistsKey)));
        assertEquals(4, this.stringRedisTemplate.keys(keyPrefix + "*").size());
        assertEquals(DataType.STRING, stringRedisTemplate.type(key1));
    }

    /**
     * Redis can remove a key with either DEL or UNLINK.
     * <p>
     * |-----------------------------------------|--------------------------------------------------|
     * | DEL                                     | UNLINK                                           |
     * |-----------------------------------------|--------------------------------------------------|
     * | Synchronous (blocking)                  | Asynchronous (non-blocking)                      |
     * |-----------------------------------------|--------------------------------------------------|
     * | Memory is reclaimed before it returns   | Key is unlinked at once; memory is reclaimed     |
     * |                                         | later by a background thread                     |
     * |-----------------------------------------|--------------------------------------------------|
     */
    @Test
    void testDelete() {
        this.reset();
        // delete
        assertTrue(this.stringRedisTemplate.delete(key1));
        assertFalse(this.stringRedisTemplate.delete(noExistsKey));
        assertEquals(2, this.stringRedisTemplate.delete(Arrays.asList(key2, key3, noExistsKey)));
        // unlink
        this.reset();
        assertTrue(this.stringRedisTemplate.unlink(key1));
        assertFalse(this.stringRedisTemplate.unlink(noExistsKey));
        assertEquals(2, this.stringRedisTemplate.unlink(Arrays.asList(key2, key3, noExistsKey)));
    }

    @Test
    void testRename() {
        this.reset();
        String tmpKey1 = "string-tmp-key1";
        String tmpKey2 = "string-tmp-key2";
        String tmpKey3 = "string-tmp-key3";
        // RENAME returns an error when the source key does not exist
        assertThrows(RuntimeException.class, () -> this.stringRedisTemplate.rename(noExistsKey, tmpKey1));
        this.stringRedisTemplate.rename(key1, tmpKey2);
        assertFalse(this.stringRedisTemplate.hasKey(key1));
        assertTrue(this.stringRedisTemplate.hasKey(tmpKey2));
        assertFalse(this.stringRedisTemplate.renameIfAbsent(tmpKey2, key2)); // target exists: nothing is renamed
        assertTrue(this.stringRedisTemplate.renameIfAbsent(tmpKey2, tmpKey3)); // target absent: renamed
        assertTrue(this.stringRedisTemplate.renameIfAbsent(tmpKey3, key1));
        assertEquals(1, this.stringRedisTemplate.delete(Arrays.asList(key1, noExistsKey)));
    }

    @Test
    void testExpire() {
        this.reset();
        assertEquals(-1, this.stringRedisTemplate.getExpire(key1)); // -1: the key has no expiry
        this.stringRedisTemplate.expire(key1, 10, TimeUnit.SECONDS);
        assertEquals(10, this.stringRedisTemplate.getExpire(key1)); // TTL in whole seconds, read right after setting it
        this.stringRedisTemplate.expire(key1, Duration.ofSeconds(20));
        assertEquals(20, this.stringRedisTemplate.getExpire(key1));
        this.stringRedisTemplate.persist(key1); // remove the expiry so the key never expires
        assertEquals(-1, this.stringRedisTemplate.getExpire(key1));
    }
}
3.4.5 Transactions
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.ValueOperations;

import java.util.List;

@SpringBootTest
public class TransactionsTest {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Resource(name = "redisTemplate")
    private ValueOperations<String, Integer> valueOperations;

    @Test
    void test() {
        valueOperations.set("a", 100);
        valueOperations.set("b", 100);
        // SessionCallback runs all commands on the same connection, which MULTI/EXEC requires
        List<Long> execute = redisTemplate.execute(new SessionCallback<>() {
            @Override
            public List<Long> execute(RedisOperations operations) throws DataAccessException {
                //operations.watch("a"); // Optimistic lock. WATCH must be issued before MULTI; if a watched key is changed by another client before EXEC, the transaction is aborted
                operations.multi(); // Mark the start of a transaction block.
                ValueOperations<String, Long> valueOperations = operations.opsForValue();
                valueOperations.increment("a", 10); // only queued here; the call returns null inside the transaction
                valueOperations.decrement("b", 10);
                return operations.exec(); // Executes all queued commands in a transaction started with multi().
            }
        });
        // exec() returns one result per queued command, in order
        Assertions.assertEquals(110, execute.get(0));
        Assertions.assertEquals(90, execute.get(1));
    }
}
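The test above relies on two properties of MULTI/EXEC: commands issued inside the block are only queued, and EXEC runs them in order and returns one result per command. The toy in-memory model below illustrates just that queuing behaviour. ToyTransaction is a hypothetical class written for this illustration; it is not how Redis or Spring Data Redis is implemented, and it ignores WATCH, errors, and concurrency.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Toy model of MULTI/EXEC queuing over an in-memory map of counters. */
final class ToyTransaction {
    private final Map<String, Long> store = new HashMap<>();
    private List<Supplier<Long>> queue; // non-null only between multi() and exec()

    void set(String key, long value) {
        store.put(key, value);
    }

    /** Starts queuing commands instead of running them. */
    void multi() {
        queue = new ArrayList<>();
    }

    /** Returns the new value immediately, or null when the command was only queued. */
    Long incrementBy(String key, long delta) {
        Supplier<Long> command = () -> store.merge(key, delta, Long::sum);
        if (queue == null) {
            return command.get();
        }
        queue.add(command);
        return null;
    }

    /** Runs the queued commands in order and returns one result per command. */
    List<Long> exec() {
        List<Long> results = new ArrayList<>();
        for (Supplier<Long> command : queue) {
            results.add(command.get());
        }
        queue = null;
        return results;
    }

    public static void main(String[] args) {
        ToyTransaction tx = new ToyTransaction();
        tx.set("a", 100);
        tx.set("b", 100);
        tx.multi();
        tx.incrementBy("a", 10);
        tx.incrementBy("b", -10);
        System.out.println(tx.exec());
    }
}
```

This is why the return values of increment and decrement are ignored inside the callback: the real results only become available from exec().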
3.4.6 pub/sub
- RedisBeanConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.nio.charset.StandardCharsets;

@Configuration
public class RedisBeanConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory); // connection factory
        template.setKeySerializer(new StringRedisSerializer(StandardCharsets.UTF_8)); // key serializer
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); // value serializer
        template.setHashKeySerializer(new StringRedisSerializer(StandardCharsets.UTF_8)); // hash key serializer
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer()); // hash value serializer
        template.setEnableTransactionSupport(true); // enable transaction support
        // Initializes the template and fills in unset defaults. Spring calls this automatically for a @Bean,
        // so the explicit call is harmless here; it is required when the template is created outside the container.
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    MessageListenerAdapter messageListener(RedisMessageSubscriber subscriber) {
        // Wrap the Spring-managed subscriber. It implements MessageListener, so the adapter calls onMessage directly
        return new MessageListenerAdapter(subscriber);
    }

    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
                                                                MessageListenerAdapter messageListener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Subscribe the listener to the channel; the adapter is registered once, without wrapping it a second time
        container.addMessageListener(messageListener, ChannelTopic.of("channal_a"));
        return container;
    }
}
- RedisMessageSubscriber
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@Service
public class RedisMessageSubscriber implements MessageListener {
    // Written by the listener container's thread and read by the test thread, so a thread-safe list is used
    public static final List<Message> ALL_MESSAGES = new CopyOnWriteArrayList<>();

    @Override
    public void onMessage(Message message, byte[] pattern) {
        ALL_MESSAGES.add(message);
    }
}
- MQTest
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;

import java.nio.charset.StandardCharsets;

import static org.junit.jupiter.api.Assertions.assertNotNull;

@SpringBootTest
public class MQTest {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private RedisMessageSubscriber redisMessageSubscriber;

    @Test
    public void testSend() throws InterruptedException {
        String channel = "channal_a";
        String message = "MESSAGE";
        // convertAndSend returns the number of clients that received the message
        long numberOfClients = redisTemplate.convertAndSend(channel, message);
        System.out.println(String.format("numberOfClients = %s", numberOfClients));

        // Poll once per second, for at most 30 seconds, until the subscriber has received the message
        Message msg = null;
        long start = System.currentTimeMillis();
        while (msg == null && System.currentTimeMillis() - start < 30_000) {
            Thread.sleep(1000);
            if (!RedisMessageSubscriber.ALL_MESSAGES.isEmpty()) {
                msg = RedisMessageSubscriber.ALL_MESSAGES.remove(0);
            }
        }
        assertNotNull(msg, "no message received within 30 seconds");
        // The body is the payload as written by the template's value serializer (JSON in this configuration)
        System.out.println(String.format("==================================\nReceived Message: \nchannel=%s, \nbody=%s",
                new String(msg.getChannel(), StandardCharsets.UTF_8), new String(msg.getBody(), StandardCharsets.UTF_8)));
    }
}
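The test above waits for the message by sleeping and polling a shared list. One possible alternative, sketched below with the JDK only, is to hand messages over through a BlockingQueue so the waiting thread wakes up as soon as a message arrives or the timeout elapses. MessageInbox is a hypothetical class, and the String body stands in for the Message object a real listener would receive.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical hand-off point between a listener thread and a waiting test thread. */
final class MessageInbox {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Called from the listener thread, for example from an onMessage callback. */
    void deliver(String body) {
        queue.offer(body);
    }

    /** Blocks until a message arrives or the timeout elapses; returns null on timeout or interruption. */
    String await(long timeout, TimeUnit unit) {
        try {
            return queue.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        MessageInbox inbox = new MessageInbox();
        new Thread(() -> inbox.deliver("MESSAGE")).start(); // simulated listener thread
        System.out.println("received: " + inbox.await(2, TimeUnit.SECONDS));
    }
}
```

With this shape the test needs no sleep loop: it publishes, calls await with the desired timeout, and asserts that the result is not null.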