Class: MqttService

Inherits:
Object
  • Object
show all
Includes:
SlackRubyBot::Loggable
Defined in:
src/services/mqtt_service.rb

Instance Method Summary collapse

Constructor Details

#initialize(mqtt_uri) ⇒ MqttService

Returns a new instance of MqttService



8
9
10
11
12
# File 'src/services/mqtt_service.rb', line 8

# Sets up the service state for the given broker URI. Nothing is connected
# here; the URI is only stored for later use.
#
# @param mqtt_uri [Object] broker URI, kept as-is in @mqtt_uri
def initialize(mqtt_uri)
  @mqtt_uri = mqtt_uri
  @topics = []
  # Default handler is a no-op. It must accept the (topic, message) pair the
  # listener passes to it; a zero-arity lambda would raise ArgumentError on
  # the first message when no handler has been registered.
  @on_message = proc { |_topic, _message| }
end

Instance Method Details

#connected? ⇒ Boolean

Returns:

  • (Boolean)


14
15
16
# File 'src/services/mqtt_service.rb', line 14

# Reports whether a broker client is currently assigned and connected.
#
# @return [Boolean] true when @client is set and its #connected? is truthy,
#   false otherwise (including when no client has been assigned yet)
def connected?
  # Read @client exactly once: the listener thread resets it to nil after a
  # connection ends, so a second read could observe nil.
  client = @client
  !!(client && client.connected?)
end

#on_message(&block) ⇒ Object



18
19
20
# File 'src/services/mqtt_service.rb', line 18

# Registers the handler that is called with (topic, message) for each
# received message. A later call replaces the previous handler.
#
# @yieldparam topic [Object] topic passed on by the listener
# @yieldparam message [Object] message passed on by the listener
# @return [Proc] the handler now in effect
def on_message(&block)
  # Without a block, fall back to a no-op instead of storing nil, which
  # would fail on #call when the next message arrives.
  @on_message = block || proc { |_topic, _message| }
end

#publish(topic, payload) ⇒ Object



22
23
24
# File 'src/services/mqtt_service.rb', line 22

# Hands +payload+ to the current client for delivery on +topic+.
#
# The call is forwarded unconditionally to @client; when @client is nil
# this raises NoMethodError.
#
# @param topic [Object] forwarded unchanged as the first argument
# @param payload [Object] forwarded unchanged as the second argument
# @return [Object] whatever the client's #publish returns
def publish(topic, payload)
  @client.publish(topic, payload)
end

#start_listening ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'src/services/mqtt_service.rb', line 26

# Starts a background thread that connects to the broker at @mqtt_uri and
# passes every received (topic, message) pair to the handler in @on_message.
# The calling thread blocks on a Promise until the first connection is up.
#
# The thread reconnects forever: whenever the connect block ends, normally
# or through a StandardError (which is logged), @client is cleared and the
# loop sleeps 5 seconds before trying again.
#
# @return [Object] whatever Promise#await returns
def start_listening
  promise = Promise.new
  first_connect_signalled = false

  Thread.new do
    # TODO: How to do graceful shutdown here?
    loop do
      begin
        MQTT::Client.connect(@mqtt_uri) do |client|
          logger.info 'Connected to MQTT broker'

          @client = client

          # Register every recorded topic with the broker on each
          # (re)connect; otherwise topics added before the connection
          # existed would never be subscribed and no messages would arrive.
          @topics.each { |topic| client.subscribe(topic) }

          # Only the first connection has a waiter on the promise; do not
          # resolve it again on reconnects.
          unless first_connect_signalled
            first_connect_signalled = true
            promise.resolve
          end

          # Use the block-local client: @client is shared mutable state.
          client.get do |topic, message|
            @on_message.call(topic, message)
          end
        end
      rescue => error
        logger.error 'MQTT Error'
        logger.error error.message
        logger.error error.backtrace
      ensure
        @client = nil
        sleep 5
      end
    end
  end

  promise.await
end

#subscribe(*topics) ⇒ Object



62
63
64
65
# File 'src/services/mqtt_service.rb', line 62

# Records the given topics and, when a connected client is available,
# subscribes to them on the broker immediately.
#
# Topics are remembered in @topics (without duplicates) regardless of the
# connection state.
#
# @param topics [Array<Object>] topics to subscribe to
# @return [Array<Object>, nil] the given topics when a connected client was
#   available, nil otherwise
def subscribe(*topics)
  # Union instead of append so repeated calls do not pile up duplicates.
  @topics |= topics

  # Read @client once; the listener thread may reset it to nil concurrently.
  client = @client
  return unless client && client.connected?

  # Skip the broker call for an empty list; there is nothing to request.
  client.subscribe(*topics) unless topics.empty?
  topics
end