Tags

redhat
employment
ripple
interfaces
ncurses
ruby
refs
filesystems
retro gaming
raspberry pi
sinatra
3d printing
nethack
gcc
compiler
fedora
virtfs
project
gaming
vim
grep
sed
aikido
philosophy
splix
android
lvm
storage
bitcoin
projects
sig315
miq
db
polisher
meditation
hopex
conferences
omega
simulator
bundler_ext
rubygems
book review
google code in
isitfedoraruby
svn
gsoc
design patters
jsonrpc
rjr
aeolus
ohiolinuxfest
rome
europe
travel
brno
gtk
python
puppet
conference
fudcon
snap
html5
tips
ssh
linux
hardware
libvirt
virtualization
engineering expo
cloud
rpm
yum
rake
redmine
plugins
screencasting
jruby
fosscon
pidgin
gnome-shell
distros
notacon
presentation
rails
deltacloud
apache
qmf
passenger
syrlug
hackerspace
music
massive attack
crypto
backups
vnc
xsd
rxsd
x3d
mercurial
ovirt
qpid
webdev
haikus
poetry
legaleese
jquery
selenium
testing
xpath
git
sshfs
svg
ldap
autotools
pygtk
xmlrpc
slackware

Oct 27 2009 apache qpid

Getting Started With Apache Qpid

Apache Qpid is an implementation of AMQP - Advanced Message Queuing Protocol

AMQP is a cross platform / cross language messaging protocol

A central broker is established which manages exchanges and queues.

Note while exchanges are currently a big part of AMQP, they are going away w/ the upcoming AMQP/1.0 spec. Regardless as they are currently integral to using Qpid, I'm going to discuss them here.

Exchanges are the named target entities which messages are sent, they get mapped to queues through bindings. Queues are also named and they queue up messages so that clients can subscribe and/or pop messages off. Both Exchanges and Queues have optional properties which if set alter the interal operations of the entity.

Messages which get passed to exchanges and accumulate in queues consist of a header, containing a number of optional properties, and a body containing the actual data.

A host connecting to the broker first establishes the underlying tcp connection on top of which 'channels' or logical connections to the broker are established. A channel should be established for every thread / concurrent entity a client uses to communicate with the broker.

Qpid is The Apache Foundation's implementation of AMQP. It's aiming to be 100% AMQP standard complaint. It is licensed under the Apache License, eg it is open source, but not copyleft, meaning you can interface and use it via any software of any license of your chooising.

qmf is a remoting framework built ontop of qpid that allows you to call methods on remote objects. I'm not going to go into qmf in this article.

To setup what you need to use Qpid on Fedora, follow these steps:

  1. Install the broker: yum install qpidd
  2. sudo service qpidd start # there seems to be several ways to configure qpidd See this, be warned though that some means will vary w/ the qpidd that is packaged w/ Fedora ('man qpidd' and see the init script and sysconf for more information)
  3. To start qpidd manually, first stop the service via 'service qpidd stop' and then run (as root) 'sudo -u qpidd /usr/sbin/qpidd --pid-dir /var/run/qpidd --data-dir /var/lib/qpidd -t'. The "-t" results in _alot_ of output being written to stdout.
  4. Install library bindings for whatever language you want to use: yum install qpidc-devel python-qpid ruby-qpid
  5. At this point your ready to write your software to interface w/ qpid, obviously how you do this depends on the api for particular language. I'm going to implement an example in ruby but the terminology should be the same for the most part regardless of your language of choice

In this ruby example, we will setup a traditional request-response model, eg with a server listening for requests and returning responses. Note this is only one of many exchange types which to follow once you start playing around with the api. (much of this example comes from a ruby and a C++ example)

All you need to do to run this is download the server.rb and client.rb files below (also attached to this article). Run the server first and then the client and view the STDOUT output for each.

server.rb

# setup a 'server' qpid endpoint

require "qpid"
require "socket"

################################## establish the connection to the broker 

broker = "localhost"
port = 5672

conn = Qpid::Connection.new(TCPSocket.new(broker, port))
conn.start(10)


################################## setup a session / queue / exchange

ssn = conn.session("server")

# create the server request exchange
ssn.exchange_declare("request-exchange", :type => "direct")

# create the server request queue
ssn.queue_declare("request-queue")

# create the server request exchange
ssn.exchange_bind(:exchange => "request-exchange", 
                  :queue => "request-queue")
                  

# subscribe to messages coming in on the queue.
# this tells the session to place any messages in request-queue 
# into a local buffer named 'messages' which we can later retrieve.
ssn.message_subscribe(:destination => "messages", :queue => "request-queue",
                      :accept_mode => ssn.message_accept_mode.none)

# we grab a handle to the 'messages' buffer here
incoming = ssn.incoming("messages")

################################## receive messages

# start incoming message flow
incoming.start()

puts "receiving messages"

# probably should have some exit condition
while true

# grab a message from the queue 
request = incoming.get(10)

# grab request body / extract what we need
s = request.body
puts "Message received " + s
s.slice!(0..3) # slice the 'syn ' out (eg handle the message however you want to)

################################## send the response message

# get the reply_to field the message specifies 
# note often you won't need / care about the reply_to field and it won't be set
if request.get(:message_properties).reply_to.nil?
  raise RuntimeError("received message doesn't specify reply field")
end

# need this to send the response
routing_key = request.get(:message_properties).reply_to.routing_key
puts "!!!" + routing_key.to_s

# construct response message and send it
response_body= 'ack ' + s
dp = ssn.delivery_properties(:routing_key => routing_key)
mp = ssn.message_properties(:content_type => "text/plain")
response = Qpid::Message.new(dp, mp, response_body)
puts "sending response " + response_body
ssn.message_transfer(:message => response,
                     :destination => "request-exchange")
                     

end

################################## terminate operations

# cancel the subscription and close the session and connection
ssn.message_cancel(:destination => "messages")
ssn.close()
conn.close()

puts "server finished"


client.rb

# setup a 'client' qpid endpoint

require "qpid"
require "socket"

# generate a random id for the client
client_id = rand(100).to_s
#exchange  = "client" + client_id + "-exchange"
exchange="request-exchange"
queue     = "client" + client_id + "-queue"

puts "client " + client_id + " started"

################################## establish the connection to the broker 

broker = "localhost"
port = 5672

conn = Qpid::Connection.new(TCPSocket.new(broker, port))
conn.start(10)

################################## setup a session / queue / exchange

ssn = conn.session("client")

# create an queue / exchange which to receive replies
#ssn.exchange_declare(exchange, :type => "direct")
ssn.queue_declare(queue)
ssn.exchange_bind(:exchange => exchange, 
                  :queue => queue,
                  :binding_key => queue)

# subscribe to messages coming in the response queue
ssn.message_subscribe(:destination => "client-messages", :queue => queue,
                      :accept_mode => ssn.message_accept_mode.none)

# handle message received event asynchronously
incoming = ssn.incoming("client-messages")
incoming.start
incoming.listen { |msg|
  puts "Response received " + msg.body
}

################################## send request message

# create request message
dp = ssn.delivery_properties(:routing_key => "request-queue")
mp = ssn.message_properties( :content_type => "text/plain")
rp = ssn.message_properties( :reply_to => 
                              ssn.reply_to(exchange, queue))
msg = Qpid::Message.new(dp, mp, rp, "syn " + client_id)

# send it
ssn.message_transfer(:message => msg)

################################## terminate operations

# wait a little time for a response
# FIXME do this via a lock
sleep(5)

# cancel the subscription and close the session and connection
ssn.message_cancel(:destination => "messages")
ssn.close()
conn.close()

puts "client finished"

Disclaimer, I'm pretty new to all this myself so there might be better / more efficient ways of doing things. But these examples should work and demonstrate the basic functionality of qpid. From here, all you need is to deteremine the model which you want to communicate between endpoints and leverage the qpid API to do that (obviously one of the most powerful features of AMQP / QPID is its language-agnosticity, different endpoints can be written in different languages, even a javascript client was recently written).

Hope this helps anyone getting started with AMQP / Qpid like I am. Look for more posts on the subject as I start learning the intrices of the API.

Useful links: AMQP Terminology (a MUST read if you intend to use Qpid) Official Qpid Documentation Good ruby/qpid example Good informantion C++ QPID API / and examples Python QPID API / and examples Ruby QPID API which I'm hosting locally since no official version is on the Apache Qpid site (abliet it's a bit lacking in comments) / and examples