Subscribing to Redis Channels with Java, Spring Boot, and Lettuce
The source code for what follows can be found on Github.
Pub/Sub in redis allows a publisher to send things to subscribers without knowing who is actually subscribed. In a previous post, we covered a simple unit test for publishing and subscribing to lettuce, but if you want to have a subscription initialized on application startup, and respond to events, we’ll have to do a bit more, which I’ll demonstrate here.
Subscribing on Application Startup
We will want to make sure we have the right configuration to connect to redis using lettuce with something like:
@Configuration
public class RedisConfig {
@Bean("redis-primary-client")
public RedisClient redisClient(RedisPrimaryConfig redisPrimaryConfig) {
return RedisClient.create(
// adjust things like thread pool size with client resources
ClientResources.builder().build(),
"redis://" + redisPrimaryConfig.getHost() + ":" + redisPrimaryConfig.getPort()
);
}
}
Where our RedisPrimaryConfig looks like:
@Configuration
@ConfigurationProperties(prefix = "redis-primary")
public class RedisPrimaryConfig {
private String host;
private Integer port;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
}
And our application.yml has the host and port [this example is a locally redis instance]:
redis-primary:
host: 127.0.0.1
port: 6379
We can then add our RedisPubSubReactiveCommands bean to our RedisConfig configuration class:
@Bean("redis-subscription-commands")
public RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands(RedisClient redisClient) {
return redisClient.connectPubSub().reactive();
}
With the boilerplate out of the way, we can finally leverage @PostConstruct to subscribe to one or more redis channels of our choosing, just after we initialize our IoC container and just before the application finishes starting up:
@Service
public class RedisSubscriptionInitializer {
private final Logger LOG = LoggerFactory.getLogger(RedisSubscriptionInitializer.class);
private final RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands;
public RedisSubscriptionInitializer(RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands) {
this.redisPubSubReactiveCommands = redisPubSubReactiveCommands;
}
@PostConstruct
public void setupSubscriber() {
redisPubSubReactiveCommands.subscribe("channel-1").subscribe();
redisPubSubReactiveCommands.observeChannels().doOnNext(stringStringChannelMessage -> {
if ("channel-1".equals(stringStringChannelMessage.getChannel())) {
LOG.info("found message in channel 1: {}", stringStringChannelMessage.getMessage());
}
}).subscribe();
}
}
In this case, we’re just logging all the messages we get from channel-1, you could obviously introduce whatever code you want there [you could also do something other than doOnNext, for example flatMap].
If I start up this application and have my local redis instance up and running, I can:
$redis-cli publish channel-1 some-message-1
(integer) 1
$redis-cli publish channel-1 some-message-2
(integer) 1
Note that the response indicates how many subscribers the message was delivered to. I can then cross check the logs on my application:
[llEventLoop-5-2] c.n.r.s.RedisSubscriptionInitializer : found message in channel 1: some-message-1
[llEventLoop-5-2] c.n.r.s.RedisSubscriptionInitializer : found message in channel 1: some-message-2
Then, if I SIGINT the application and try sending another message, I will see that it delivers it to zero subscribers:
$ redis-cli publish channel-1 some-message-3
(integer) 0
So this should be a good starting point for you.