Saturday, January 12, 2013

NetRexx, Raspberry Pi and MQTT

It is more affordable, easier and quicker than ever to acquire, install and use a system that would have cost a lot of time and money only a few years ago. A $35 Raspberry Pi, a card-size computer (bring your own keyboard and screen, until the distributions enable ssh by default), with a message broker running on it can do impressive things. I am toying around with some security cameras. Seeing the ease with which they can be compromised, I do not expose them to the internet; I use a Raspberry Pi to tunnel their http over ssh, plus some software to control them, capture events and send those to a message broker that runs overseas, so I have a good near-realtime view of what happens.

As an indication of how quickly things develop: I have already had to revise a chapter in the NetRexx Programmer's Guide, written after the 3.01 release, twice for the impending 3.02 release. The version of Linux running on the Raspberry, aptly named 'Raspbian' because it is a Debian for the Raspberry, needed to be compiled with the soft-float ABI because the Oracle Embedded Java version required it. With the preview of version 8 this is no longer the case. This prompted me to refresh the install of one of my two Raspberries. I am happy to be able to state here that it took less time than ever to refresh an OS and my whole development environment. This is due to at least three facts:

1) The OS images are ready to run and only have to be copied to an SDHC card - I use SanDisk Ultra 16GB cards, which are cheap and run well with the Raspi.

2) The instructions provided by nice people on their blogs are excellent. Oracle's VM still runs rings around the others, speed-wise, so this was an easy choice. I used this one: http://www.savagehomeautomation.com/projects/raspberry-pi-installing-oracle-java-se-8-with-javafx-develop.html which is good and to the point.
You can skip the GUI instructions for writing the SD card and just use dd - but I presume you already did that if you are at this stage. The JDK is installed like this, after you scp the archive to the raspi:
sudo mkdir -p -v /opt/java
tar xvzf ~/jdk-8-ea-b36e-linux-arm-hflt-29_nov_2012.tar.gz   # the filename changed
sudo mv -v ~/jdk1.8.0 /opt/java
sudo update-alternatives --install "/usr/bin/java" "java" "/opt/java/jdk1.8.0/bin/java" 1
sudo update-alternatives --set java /opt/java/jdk1.8.0/bin/java
Actually, Java runs if you only do the tar step, but the added tidiness of telling the Debian alternatives system which Java you are running is excellent.

3) I have all my stuff in Git, so I can easily take it wherever I am.

Installing mosquitto

For mosquitto, the open source mqtt broker (at http://mosquitto.org), there is also a Debian package (only for the hard-float Raspbian, another reason to move to it, having previously run Java 7 on the soft-float Wheezy distribution). This boils down to (on the raspi):
wget http://repo.mosquitto.org/debian/mosquitto-repo.gpg.key
sudo apt-key add mosquitto-repo.gpg.key
cd /etc/apt/sources.list.d/
sudo wget http://repo.mosquitto.org/debian/mosquitto-repo.list
sudo apt-get update
sudo apt-get install mosquitto
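A quick way to check that the freshly installed broker answers is to push one message through it with the command-line clients. The sketch below assumes that the mosquitto_pub and mosquitto_sub tools are installed (on Debian they come in the separate mosquitto-clients package) and that the broker listens on the default port 1883 on localhost; the topic name test/smoke is made up for the occasion.

```shell
#!/bin/sh
# Sketch: send one message through the broker and watch it come back.
# Host, port and topic are assumptions; override them via the environment.

BROKER_HOST="${BROKER_HOST:-localhost}"
BROKER_PORT="${BROKER_PORT:-1883}"
TEST_TOPIC="${TEST_TOPIC:-test/smoke}"

smoke_test() {
    # Start a subscriber in the background; -v prints the topic
    # in front of each payload.
    mosquitto_sub -h "$BROKER_HOST" -p "$BROKER_PORT" -t "$TEST_TOPIC" -v &
    sub_pid=$!
    sleep 1   # give the subscriber time to connect

    mosquitto_pub -h "$BROKER_HOST" -p "$BROKER_PORT" \
        -t "$TEST_TOPIC" -m "hello" -q 1
    status=$?

    sleep 1   # let the message arrive before stopping the subscriber
    kill "$sub_pid" 2>/dev/null
    return "$status"
}

# Only run the test when both client tools are actually present.
if command -v mosquitto_pub >/dev/null 2>&1 &&
   command -v mosquitto_sub >/dev/null 2>&1; then
    smoke_test
else
    echo "mosquitto_pub/mosquitto_sub not found; install mosquitto-clients" >&2
fi
```

If the subscriber prints a line with the topic followed by the payload, the broker is doing its job.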
That is it - a message broker, programmable in NetRexx (on the Java Runtime), for $35, an SD card and half an hour of work. The new NetRexx version 3.02 has examples of how to use mqtt (and also WebSphere MQ, to which it connects, by the way). The examples are short, so I can publish them here:
import java.sql.Timestamp
import org.eclipse.paho.client.mqttv3.

class Publish implements MqttCallback
  
  method Publish()
    conOpt    = MqttConnectOptions()
    conOpt.setCleanSession(0)
    tmpDir    = System.getProperty("java.io.tmpdir")
    dataStore = MqttDefaultFilePersistence(tmpDir)
    clientId  = MqttClient.generateClientId()
    topicName = "/world"
    payload   = "hello".toString().getBytes()
    qos        = 2    

    do
      broker  = "localhost"
      port  = "1883"
      brokerUrl  = "tcp://"broker":"port
      client  = MqttClient(brokerUrl,clientId, dataStore)
      client.setCallback(this)
    catch e=MqttException
      say e.getMessage()
      e.printStackTrace()
    end -- do

    -- Pass the connect options, otherwise the setting above is never used
    client.connect(conOpt)
    log("Connected to "brokerUrl" with client ID "client.getClientId())

    -- Get an instance of the topic
    topic = client.getTopic(topicName)
    
    message = MqttMessage(payload)
    message.setQos(qos)
  
    -- Publish the message
    time = Timestamp(System.currentTimeMillis()).toString()
    log('Publishing at: 'time' to topic "'topicName'" with qos 'qos)
    token = topic.publish(message)
    
    -- Wait until the message has been delivered to the server
    token.waitForCompletion()
    
    -- Disconnect the client
    client.disconnect()
    log("Disconnected")
  

  method log(line)
    say line
    
  method messageArrived(t=MqttTopic,m=MqttMessage)
    log("Message Arrived: " t m)
    
  method deliveryComplete(t=MqttDeliveryToken)
    log("Delivery Complete: " t)

  method connectionLost(t=Throwable)
    log("Connection Lost:" t.getMessage())

  method main(args=String[]) static
    Publish()
And the corresponding Subscriber is like this:
import java.sql.Timestamp
import org.eclipse.paho.client.mqttv3.

class Subscribe implements MqttCallback

  properties
  client = MqttClient
  conOpt    = MqttConnectOptions()
  tmpDir    = System.getProperty("java.io.tmpdir")
  clientId  = MqttClient.generateClientId()
  topicName = "/world"
  qos        = 2    
  
  method Subscribe()
    do
      connectAndSubscribe()
    catch mqx=MqttException
      log(mqx.getMessage())
    end
    -- Block until Enter is pressed
    log("Press <Enter> to exit")
    do
      System.in.read()
    catch IOException
    end
    
    -- Disconnect the client
    client.disconnect()
    log("Disconnected")

  method connectAndSubscribe() signals MqttSecurityException,MqttException,MqttPersistenceException
    conOpt.setCleanSession(1)
    dataStore = MqttDefaultFilePersistence(tmpDir)
    do
      broker    = "localhost"
      port    = "1883"
      brokerUrl    = "tcp://"broker":"port
      client = MqttClient(brokerUrl,clientId, dataStore)
      client.setCallback(this)
    catch e=MqttException
      say e.getMessage()
      e.printStackTrace()
    end -- do
    
    -- Pass the connect options, otherwise the setting above is never used
    this.client.connect(conOpt)
    log("Connected to "brokerUrl" with client ID "client.getClientId())
    
    -- Subscribe to the topic
    log('Subscribing to topic "'topicName'" qos 'qos)
    this.client.subscribe(topicName, qos)

  method log(line)
    say line
    
  method messageArrived(t=MqttTopic,m=MqttMessage)
    log("Message Arrived: " t m)
    
  method deliveryComplete(t=MqttDeliveryToken)
    log("Delivery Complete: " t)

  method connectionLost(t=Throwable)
    do
      connectAndSubscribe()
    catch mqx=MqttException
      log(mqx.getMessage())
    end
    
  method main(args=String[]) static
    Subscribe()
Both work with the current Paho Java client library, available at: http://www.eclipse.org/paho/
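For completeness, compiling and running the two examples from the shell could look like the sketch below. The jar locations are assumptions (the defaults are hypothetical paths, not something the NetRexx or Paho installers create), and the translator is started through its Java class org.netrexx.process.NetRexxC; adjust both to your own install.

```shell
#!/bin/sh
# Sketch: compile the two examples and show how to start them.
# NETREXX_JAR and PAHO_JAR default to hypothetical locations; set them to
# wherever NetRexxC.jar and the Paho client jar live on your system.

NETREXX_JAR="${NETREXX_JAR:-$HOME/netrexx/lib/NetRexxC.jar}"
PAHO_JAR="${PAHO_JAR:-$HOME/paho/org.eclipse.paho.client.mqttv3.jar}"
CP="$NETREXX_JAR:$PAHO_JAR:."

compile_examples() {
    # Translate and compile each source; the .class files end up in the
    # current directory, which is why "." is on the classpath.
    for src in Publish.nrx Subscribe.nrx; do
        java -cp "$CP" org.netrexx.process.NetRexxC "$src" || return 1
    done
}

# Only compile when the jars and both sources can be found.
if [ -f "$NETREXX_JAR" ] && [ -f "$PAHO_JAR" ] &&
   [ -f Publish.nrx ] && [ -f Subscribe.nrx ]; then
    compile_examples &&
        echo "Now run in two terminals:" &&
        echo "  java -cp \"$CP\" Subscribe" &&
        echo "  java -cp \"$CP\" Publish"
else
    echo "Set NETREXX_JAR and PAHO_JAR, and run this next to the .nrx files" >&2
fi
```

Start the subscriber first, so that it is already listening on /world when the publisher sends its message.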