1 简介

Redis(Remote Dictionary Server)是一个开源的高性能键值对存储系统,具有快速、灵活和可扩展的特性。它是一个基于内存的数据结构存储系统,可以用作数据库、缓存和消息代理等。

2 主要特点

  • **高性能:**Redis 数据存储在内存中,因此能够提供极快的读写操作。它采用单线程模型和异步 I/O,避免了多线程的竞争和阻塞,从而达到了非常高的性能。
  • **数据结构多样:**Redis 支持多种数据结构,包括字符串(String)、哈希(Hash)、列表(List)、集合(Set)和有序集合(Sorted Set)。这些数据结构提供了丰富的操作命令,使得开发者可以方便地处理各种数据需求。
  • **持久化支持:**Redis 提供了两种持久化方式,即快照(Snapshotting)和日志追加(Append-only file,AOF)。快照方式将 Redis 内存数据以二进制格式写入磁盘,而 AOF 则通过追加记录 Redis 的操作命令来实现持久化。
  • **发布/订阅:**Redis 支持发布/订阅模式,可以用作消息代理。发布者将消息发送到指定的频道,订阅者则可以接收和处理这些消息。这种模式在构建实时通信、事件驱动系统和消息队列等场景中非常有用。
  • **分布式缓存:**Redis可以通过主从复制和分片来实现数据的分布式存储和高可用性。主从复制可以将数据复制到多个从节点,实现读写分离和数据备份。而分片则可以将数据分布在多个Redis节点上,实现横向扩展和负载均衡。
  • **事务支持:**Redis 支持事务,开发者可以将多个操作组合成一个原子性的操作序列,保证这些操作要么全部执行成功,要么全部不执行。
  • **功能丰富:**Redis不仅仅是一个简单的缓存,它还提供了许多其他功能,如事务支持、Lua脚本执行、定时任务、原子操作等。这使得开发者可以在Redis中实现更复杂的应用逻辑。

3 spring-data-redis

3.1 yml配置

3.1.1 通用配置

spring:
  data:
    redis:
      client-name: RName # 设置到CLIENT SETNAME的名称。Redis 的 CLIENT SETNAME 命令用于为当前连接分配一个名字。 这个名字会显示在 CLIENT LIST 命令的结果中, 用于识别当前正在与服务器进行连接的客户端
      client-type: lettuce  # 客户端类型
      #url: redis://username:password@host:port # 使用url来代替 host, port, username, and password的配置
      username: admin  # 用户名
      password: 123456 # 密码
      database: 0 # 数据库
      connect-timeout: 1000  # 连接超时。单位:ms
      timeout: 1000  # 读取超时。单位:ms
      repositories:
          enabled: true # 是否启用Redis存储库

      ssl:
        bundle: # SSL bundle name.
        enabled: true # 是否启用ssl

      jedis:   # 使用jedis客户端
        pool:
          enabled: true # 是否启用连接池
          max-active: 8  # 最大连接数。负数表示不限制
          max-idle: 8  # 最大空闲数。负数表示不限制
          min-idle: 0  # 最小空闲数。必须为正数
          max-wait: -1ms  # 当池耗尽时,连接分配在引发异常之前应阻塞的最长时间,负数表示不限制
          time-between-eviction-runs: # 空闲对象逐出器线程运行之间的时间。 当为正时,空闲对象逐出线程启动,否则不执行空闲对象逐出

      lettuce: # 使用lettuce客户端
        pool:
          enabled: true # 是否启用连接池
          max-active: 8  # 最大连接数。负数表示不限制
          max-idle: 8  # 最大空闲数。负数表示不限制
          min-idle: 0  # 最小空闲数。必须为正数
          max-wait: -1ms  # 当池耗尽时,连接分配在引发异常之前应阻塞的最长时间,负数表示不限制
          time-between-eviction-runs: # 空闲对象逐出器线程运行之间的时间。 当为正时,空闲对象逐出线程启动,否则不执行空闲对象逐出
        shutdown-timeout: 100ms # 关闭连接超时时间

3.1.2 专有配置

3.1.2.1 单机模式

spring:
  data:
    redis:
      host: 192.168.1.11  # 地址
      port: 6379 # 端口

3.1.2.2 主从复制模式(哨兵模式)

spring:
  data:
    redis:
      sentinel:
        master: master # 哨兵的sentinel.conf配置文件中的主节点名称
        nodes: 192.168.1.11:26379, 192.168.1.12:26379, 192.168.1.13:26379 # 哨兵节点
        username: admin # 哨兵用户名
        password: 123456 # 哨兵密码

3.1.2.3 集群模式

spring:
  data:
    redis:
      cluster:
        nodes: 192.168.1.11:6379, 192.168.1.12:6379, 192.168.1.13:6379, 192.168.1.11:6380, 192.168.1.12:6380, 192.168.1.13:6380
        max-redirects: 3 # 同重向的最大次数。第一台服务不可用,会尝试第二台,以此类推

      lettuce:
        cluster:
          refresh:
            adaptive: true # 定时刷新Redis Cluster集群缓存,动态改变客户端的节点情况,完成故障转移
            dynamic-refresh-sources: true # 是否动态刷新拓扑源
            period: 1000 # 集群模式!需要设置群集拓扑刷新周期(毫秒)

3.2 RedisTemplate和StringRedisTemplate

/** 操作通用类型的对象 */
@Autowired
private RedisTemplate<String, Object> redisTemplate;

/** 操作String类型的对象 */
@Autowired
private StringRedisTemplate stringRedisTemplate;

RedisTemplate可以通过opsForXxx生成对应的类型的操作对象

// RedisTemplate 可以获取对应数据类型的 XxxOperations
ValueOperations<String, Object> objectValueOperations = redisTemplate.opsForValue();
HashOperations<String, String, Object> objectHashOperations = redisTemplate.opsForHash();
ListOperations<String, Object> objectListOperations = redisTemplate.opsForList();
SetOperations<String, Object> objectSetOperations = redisTemplate.opsForSet();
ZSetOperations<String, Object> objectZSetOperations = redisTemplate.opsForZSet();

同时,可以使用@Resource注解获取xxxOperations对象,且获取时可以将Object对象根据值类型换成具体的对象。注意,值类型为String时,name需要使用stringRedisTemplate

@Autowired
private StringRedisTemplate stringRedisTemplate;
private ValueOperations<String, String> stringValueOperations = stringRedisTemplate.opsForValue();

        ||

@Autowired
private StringRedisTemplate stringRedisTemplate;
@Resource(name = "stringRedisTemplate")
private ValueOperations<String, String> stringValueOperations;
@Autowired
private RedisTemplate<String, User> redisTemplate; // 注意:这里值类型必须为 User,不可以为 Object
private ValueOperations<String, User> userValueOperations = redisTemplate.opsForValue();

        ||

@Autowired
private RedisTemplate<String, Object> redisTemplate; // 注意:这里值类型可以为 Object
@Resource(name = "redisTemplate")
private ValueOperations<String, User> userValueOperations; // 注意:这里值类型为 User,变量名称为userXXX(user与类型User同名)
@Autowired
private RedisTemplate<String, Integer> redisTemplate; // 注意:这里值类型必须为 Integer,不可以为 
private ValueOperations<String, Integer> userValueOperations = redisTemplate.opsForValue();

        ||

@Autowired
private RedisTemplate<String, Object> redisTemplate; // 注意:这里值类型可以为 Object
@Resource(name = "redisTemplate")
private ValueOperations<String, Integer> integerValueOperations; // 注意:这里值类型为 Integer,变量名称为integerXXX(integer与类型Integer同名)

3.3 RedisTemplate主要public方法

public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperations<K, V>, BeanClassLoaderAware {
	// 
    public void setBeanClassLoader(ClassLoader classLoader);
    public void afterPropertiesSet(); // 初始化RedisTemplate的一些参数设置
    public void setExposeConnection(boolean exposeConnection);
    public void setEnableTransactionSupport(boolean enableTransactionSupport);
    public boolean isExposeConnection();

    public Cursor<K> scan(ScanOptions options);
    public List<RedisClientInfo> getClientList();
    public void killClient(String host, int port);
    public void replicaOf(String host, int port);
    public void replicaOfNoOne();

    // Serializer
    public boolean isEnableDefaultSerializer();
    public void setEnableDefaultSerializer(boolean enableDefaultSerializer);
    public void setDefaultSerializer(RedisSerializer<?> serializer);
    public RedisSerializer<?> getDefaultSerializer();

    public void setKeySerializer(RedisSerializer<?> serializer);
    public RedisSerializer<?> getKeySerializer();

    public void setStringSerializer(RedisSerializer<String> stringSerializer);
    public RedisSerializer<String> getStringSerializer();

    public void setValueSerializer(RedisSerializer<?> serializer);
    public RedisSerializer<?> getValueSerializer();

    public void setHashKeySerializer(RedisSerializer<?> hashKeySerializer);
    public RedisSerializer<?> getHashKeySerializer();
    public void setHashValueSerializer(RedisSerializer<?> hashValueSerializer);
    public RedisSerializer<?> getHashValueSerializer();

    // Operations
    public ValueOperations<K, V> opsForValue();
    public <HK, HV> HashOperations<K, HK, HV> opsForHash();
    public ListOperations<K, V> opsForList();
    public SetOperations<K, V> opsForSet();
    public ZSetOperations<K, V> opsForZSet();
    public ClusterOperations<K, V> opsForCluster();
    public GeoOperations<K, V> opsForGeo();
    public HyperLogLogOperations<K, V> opsForHyperLogLog();
    public <HK, HV> StreamOperations<K, HK, HV> opsForStream();
    public <HK, HV> StreamOperations<K, HK, HV> opsForStream(HashMapper<? super K, ? super HK, ? super HV> hashMapper);

    // boundXXXOps
    public BoundValueOperations<K, V> boundValueOps(K key);
    public <HK, HV> BoundHashOperations<K, HK, HV> boundHashOps(K key);
    public BoundListOperations<K, V> boundListOps(K key);
    public BoundSetOperations<K, V> boundSetOps(K key);
    public BoundZSetOperations<K, V> boundZSetOps(K key);
    public BoundGeoOperations<K, V> boundGeoOps(K key);
    public <HK, HV> BoundStreamOperations<K, HK, HV> boundStreamOps(K key);

    // key
    public Boolean hasKey(K key);
    public Long countExistingKeys(Collection<K> keys);
    public Set<K> keys(K pattern);
    public DataType type(K key);

    public Boolean delete(K key);
    public Long delete(Collection<K> keys);
    public Boolean unlink(K key);
    public Long unlink(Collection<K> keys);

    public void rename(K oldKey, K newKey);
    public Boolean renameIfAbsent(K oldKey, K newKey);
	
    public K randomKey();

    // expire
    public Boolean expire(K key, final long timeout, final TimeUnit unit);
    public Boolean expireAt(K key, final Date date);
    public Long getExpire(K key);
    public Long getExpire(K key, TimeUnit timeUnit);
    public Boolean persist(K key);

	// 数据迁移、备份和恢复
	public Boolean copy(K source, K target, boolean replace);
    public Boolean move(K key, final int dbIndex); // 将key移动到其他数据库dbIndex中

    public byte[] dump(K key);
    public void restore(K key, byte[] value, long timeToLive, TimeUnit unit, boolean replace);

    // sort
    public List<V> sort(SortQuery<K> query);
    public <T> List<T> sort(SortQuery<K> query, @Nullable RedisSerializer<T> resultSerializer);
    public <T> List<T> sort(SortQuery<K> query, BulkMapper<T, V> bulkMapper);
    public <T, S> List<T> sort(SortQuery<K> query, BulkMapper<T, S> bulkMapper, @Nullable RedisSerializer<S> resultSerializer);
    public Long sort(SortQuery<K> query, K storeKey);
    
	// transaction
    public void watch(K key);
    public void watch(Collection<K> keys);
    public void unwatch();
    public void multi();
    public List<Object> exec();
    public List<Object> exec(RedisSerializer<?> valueSerializer);

    // execute。执行原生/底层实现
    public void setScriptExecutor(ScriptExecutor<K> scriptExecutor);
    public <T> T execute(RedisCallback<T> action);
    public <T> T execute(RedisCallback<T> action, boolean exposeConnection);
    public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline);
    public <T> T execute(SessionCallback<T> session);
    public List<Object> executePipelined(SessionCallback<?> session);
    public List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer);
    public List<Object> executePipelined(RedisCallback<?> action);
    public List<Object> executePipelined(RedisCallback<?> action, @Nullable RedisSerializer<?> resultSerializer);
    public <T> T execute(RedisScript<T> script, List<K> keys, Object... args);
    public <T> T execute(RedisScript<T> script, RedisSerializer<?> argsSerializer, RedisSerializer<T> resultSerializer, List<K> keys, Object... args);
    public <T extends Closeable> T executeWithStickyConnection(RedisCallback<T> callback);

    //
    public void discard();
    public Long convertAndSend(String channel, Object message);
}

3.4 示例

3.4.1 pom

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.15.3</version>
</dependency>

3.4.2 yml

spring:
  data:
    redis:
      client-type: lettuce  # 客户端类型
      host: 192.168.1.10  # 地址
      port: 6379 # 端口
#      username: admin  # 用户名
#      password: 123456 # 密码
      database: 0 # 数据库
      connect-timeout: 1000  # 连接超时。单位:ms
      timeout: 1000  # 读取超时。单位:ms
      repositories:
        enabled: true # 是否启用Redis存储库

      lettuce: # 使用lettuce客户端
        pool:
          enabled: true # 是否启用连接池
          max-active: 8  # 最大连接数。负数表示不限制
          max-idle: 8  # 最大空闲数。负数表示不限制
          min-idle: 0  # 最小空闲数。必须为正数
          max-wait: -1ms  # 当池耗尽时,连接分配在引发异常之前应阻塞的最长时间,负数表示不限制
          time-between-eviction-runs: # 空闲对象逐出器线程运行之间的时间。 当为正时,空闲对象逐出线程启动,否则不执行空闲对象逐出
        shutdown-timeout: 100ms # 关闭连接超时时间

3.6.1 bean配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.nio.charset.StandardCharsets;

@Configuration
public class RedisBeanConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);// 连接工厂
        template.setKeySerializer(new StringRedisSerializer(StandardCharsets.UTF_8)); // key序列化
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); // value序列化
        template.setHashKeySerializer(new StringRedisSerializer(StandardCharsets.UTF_8)); // hash序列化
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer()); // hash序列化
        template.setEnableTransactionSupport(true); // 启用事务支持
        template.afterPropertiesSet(); // 初始化RedisTemplate的一些参数设置。如果不执行此方法,可能会报一些莫名其妙的错误,那应该就是部分参数没有初始化造成的
        return template;
    }
}

3.6.2 redisTemplate基本操作

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.core.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.*;

@SpringBootTest
public class BaseOperationsTest {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;


    //region key-value
    String keyPrefix = "string-";
    String noExistsKey = "abc";
    String key1 = keyPrefix + "key1";
    String key2 = keyPrefix + "key2";
    String key3 = keyPrefix + "key3";
    String key4 = keyPrefix + "key4";
    String key1Value = "STRING KEY 1";
    String key2Value = "STRING KEY 2";
    String key3Value = "STRING KEY 3";
    String key4Value = "STRING KEY 4";
    //endregion

    void reset() {
        this.stringRedisTemplate.delete(Arrays.asList(key1, key2, key3, key4));

        ValueOperations<String, String> operations = this.stringRedisTemplate.opsForValue();
        operations.set(key1, key1Value);
        operations.set(key2, key2Value);
        operations.set(key3, key3Value);
        operations.set(key4, key4Value);
    }

    @Test
    void test1() {
        assertNull(this.stringRedisTemplate.keys(keyPrefix + "*"));

        this.reset();

        assertTrue(this.stringRedisTemplate.hasKey(key1));
        assertTrue(!this.stringRedisTemplate.hasKey(noExistsKey));

        assertEquals(2, this.stringRedisTemplate.countExistingKeys(Arrays.asList(key1, key2)));
        assertEquals(1, this.stringRedisTemplate.countExistingKeys(Arrays.asList(key1, noExistsKey)));

        assertEquals(4, this.stringRedisTemplate.keys(keyPrefix + "*").size());

        assertEquals(DataType.STRING, stringRedisTemplate.type(key1));
    }

    /**
     * redis删除键的方式包含:del和unlink
     * <p>
     * |------------------|-----------------|
     * |       del        |      unlink     |
     * |------------------|-----------------|
     * |       同步        |       异步      |
     * |------------------|-----------------|
     * | 不会释放已分配内存  |  会释放已分配内存  |
     * |------------------|-----------------|
     */
    @Test
    void testDelete() {
        this.reset();

        // delete
        assertTrue(this.stringRedisTemplate.delete(key1));
        assertTrue(!this.stringRedisTemplate.delete(noExistsKey));

        assertEquals(2, this.stringRedisTemplate.delete(Arrays.asList(key2, key3, noExistsKey)));

        // unlink
        this.reset();
        assertTrue(this.stringRedisTemplate.unlink(key1));
        assertTrue(!this.stringRedisTemplate.unlink(noExistsKey));

        assertEquals(2, this.stringRedisTemplate.unlink(Arrays.asList(key2, key3, noExistsKey)));
    }

    @Test
    void testRename() {
        this.reset();

        String tmpKey1 = "string-tmp-key1";
        String tmpKey2 = "string-tmp-key2";
        String tmpKey3 = "string-tmp-key3";
        this.stringRedisTemplate.rename(noExistsKey, tmpKey1);
        this.stringRedisTemplate.rename(key1, tmpKey2);
        assertTrue(!this.stringRedisTemplate.hasKey(key1));
        assertTrue(this.stringRedisTemplate.hasKey(tmpKey2));

        assertTrue(!this.stringRedisTemplate.renameIfAbsent(tmpKey2, tmpKey3));
        assertTrue(this.stringRedisTemplate.renameIfAbsent(tmpKey3, key1));

        assertEquals(1, this.stringRedisTemplate.delete(Arrays.asList(key1, noExistsKey)));
    }

    @Test
    void testExpire() {
        this.reset();

        assertEquals(-1, this.stringRedisTemplate.getExpire(key1));

        this.stringRedisTemplate.expire(key1, 10, TimeUnit.SECONDS);
        assertEquals(10, this.stringRedisTemplate.getExpire(key1));

        this.stringRedisTemplate.expire(key1, Duration.ofSeconds(20));
        assertEquals(20, this.stringRedisTemplate.getExpire(key1));

        this.stringRedisTemplate.persist(key1); // 设置永不过期
        assertEquals(-1, this.stringRedisTemplate.getExpire(key1));
    }
}

3.6.3 事务

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.ValueOperations;
import java.util.List;

@SpringBootTest
public class TransactionsTest {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Resource(name = "redisTemplate")
    private ValueOperations<String, Integer> valueOperations;

    @Test
    void test() {
        valueOperations.set("a", 100);
        valueOperations.set("b", 100);
        List<Long> execute = redisTemplate.execute(new SessionCallback<>() {
            @Override
            public List<Long> execute(RedisOperations operations) throws DataAccessException {
                operations.multi(); // Mark the start of a transaction block.

                //operations.watch("a"); // 乐观锁。watch某个key,当该key被其它客户端改变时,则中断当前的操作

                ValueOperations<String, Long> valueOperations = operations.opsForValue();
                valueOperations.increment("a", 10);
                valueOperations.decrement("b", 10);
                return operations.exec(); // Executes all queued commands in a transaction started with multi().
            }
        });
        Assertions.assertEquals(110, execute.get(0));
        Assertions.assertEquals(90, execute.get(1));
    }
}

3.6.4 pub/sub

  • RedisBeanConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.nio.charset.StandardCharsets;

@Configuration
public class RedisBeanConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);// 连接工厂
        template.setKeySerializer(new StringRedisSerializer(StandardCharsets.UTF_8)); // key序列化
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); // value序列化
        template.setHashKeySerializer(new StringRedisSerializer(StandardCharsets.UTF_8)); // hash序列化
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer()); // hash序列化
        template.setEnableTransactionSupport(true); // 启用事务支持
        template.afterPropertiesSet(); // 初始化RedisTemplate的一些参数设置。如果不执行此方法,可能会报一些莫名其妙的错误,那应该就是部分参数没有初始化造成的
        return template;
    }

    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new RedisMessageSubscriber());
    }

    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(new MessageListenerAdapter(messageListener(), "handleMessage"), ChannelTopic.of("channal_a"));
        return container;
    }
}
  • RedisMessageSubscriber
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@Service
public class RedisMessageSubscriber implements MessageListener {
    public static List<Message> ALL_MESSAGES = new ArrayList<>();

    @Override
    public void onMessage(Message message, byte[] pattern) {
        ALL_MESSAGES.add(message);
    }
}
  • MQTest
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;

@SpringBootTest
public class MQTest {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private RedisMessageSubscriber redisMessageSubscriber;

    @Test
    public void testSend() {
        String channel = "channal_a";
        String message = "MESSAGE";
        long numberOfClients = redisTemplate.convertAndSend(channel, message);
        System.out.println(String.format("numberOfClients = %s", numberOfClients));

        Message msg = null;
        long start = System.currentTimeMillis();
        long end = start;
        boolean exit = false;
        while (!exit) {
            try {
                Thread.sleep(1000);
                if (RedisMessageSubscriber.ALL_MESSAGES.size() > 0) {
                    msg = RedisMessageSubscriber.ALL_MESSAGES.remove(0);
                    System.out.println(String.format("==================================\nReceived Message: \nchannel=%s, \nbody=%s",
                             new String(msg.getChannel(), "utf-8"), new String(msg.getBody(), "utf-8")));
                    exit = true;
                }

                end = System.currentTimeMillis();
                if ((end - start) / 1000 >= 30) {
                    exit = true;
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}