txsrv: Message-based Twisted Services

I've started work on txsrv, a Python library that aims to make developing message-based services in Twisted easy.

The code is still pretty rough, but might be of interest to someone hacking on AMQP-based Twisted services.

Sample Usage

  1. Create a new service and change to the project directory.

    $ txsrv create mytest
    $ cd mytest
    
  2. Edit the mytest.conf file so that the spec_file option is valid.

    [connection:amqp]
    type = amqp
    host = localhost
    port = 5672
    vhost = /
    user = guest
    password = guest
    spec_file = /usr/share/amqp/amqp.0-8.xml
    
    [handler:hello]
    connection = amqp
    exchange = hello
    routing_key = default
    
  3. Edit the mytest.py file so that it prints the message body twice.

    import txsrv
    
    class Service(txsrv.Service):
        @txsrv.handler('hello')
        def hello(self, message):
            print message.body * 2
    
    class ServiceMaker(txsrv.ServiceMaker):
        tapname = 'mytest'
        description = 'A mytest txsrv example.'
        service_type = Service
    
  4. Start the mytest service and send a message to the hello exchange.

    $ twistd -n mytest -c mytest.conf
    2010-09-12 00:41:10-0400 [-] Log opened.
    2010-09-12 00:41:10-0400 [-] twistd 10.1.0 (/usr/bin/python 2.6.4) starting up.
    2010-09-12 00:41:10-0400 [-] reactor class: twisted.internet.selectreactor.SelectReactor.
    2010-09-12 00:41:10-0400 [-] Starting factory <txsrv.protocol.amqp.AmqpFactory instance at 0x2cce950>
    2010-09-12 00:41:10-0400 [AmqpProtocol,client] hellohello
    

Getting Started with AMQP, Qpid, Python & Fedora

The Advanced Message Queuing Protocol (AMQP) is the first open standard for Enterprise Messaging and has been adopted by Cisco, [Microsoft][microsoft], Novel, Red Hat and others.

One implementation of AMQP currently packaged by Red Hat and Fedora is Qpid. Qpid is a collection of brokers (C++, Java) and clients (C++, Java, Python, Ruby) which aims to be 100% AMQP Compliant.

Below is a step-by-step tutorial on getting started with Qpid and the Python client in Fedora.

  1. Install the Qpid broker and Python client

    yum install qpidd python-qpid
    
  2. Start the broker

    service qpidd start
    
  3. Create a message producer (producer.py)

    from qpid.connection import Connection
    from qpid.datatypes import Message, uuid4
    from qpid.util import connect
    
    # Create connection and session
    socket = connect('localhost', 5672)
    connection = Connection(sock=socket, username='guest', password='guest')
    connection.start()
    session = connection.session(str(uuid4()))
    
    # Setup queue
    session.queue_declare(queue='message_queue')
    session.exchange_bind(exchange='amq.direct', queue='message_queue', binding_key='routing_key')
    
    # Setup routing properties
    properties = session.delivery_properties(routing_key='routing_key')
    
    # Send messages
    session.message_transfer(destination='amq.direct', message=Message(properties, 'Message one'))
    session.message_transfer(destination='amq.direct', message=Message(properties, 'Message two'))
    session.message_transfer(destination='amq.direct', message=Message(properties, 'Done'))
    
    # Close session
    session.close(timeout=10)
    
  4. Create a message consumer (consumer.py)

    from qpid.connection import Connection
    from qpid.datatypes import RangedSet, uuid4
    from qpid.util import connect
    
    # Create connection and session
    socket = connect('localhost', 5672)
    connection = Connection(sock=socket, username='guest', password='guest')
    connection.start()
    session = connection.session(str(uuid4()))
    
    # Define local queue
    local_queue_name = 'my_local_queue'
    
    # Create local queue
    queue = session.incoming(local_queue_name)
    
    # Route messages from message_queue to my_local_queue
    session.message_subscribe(queue='message_queue', destination=local_queue_name)
    queue.start()
    
    content = ''
    
    while content != 'Done':
        # Get message from the local queue
        message = queue.get(timeout=10)
        # Get body of the message
        content = message.body
        # Accept message (removes it from the queue)
        session.message_accept(RangedSet(message.id))
        # Print message content
        print content
    
    # Close session
    session.close(timeout=10)
    
  5. First run the producer and then the consumer

    Message one
    Message two
    Done
    

Source: Qpid direct example