Source code for tuna.zeromq.zmq_client
"""
This module's scope is the ZeroMQ client.
"""
import logging
import zmq
[docs]class zmq_client ( object ):
"""
This class' responsibility is to define a ZeroMQ client that connects to a proxy to mediate communication with other clients.
Utilizes the `lazy pirate pattern <http://zguide.zeromq.org/page:all#Client-Side-Reliability-Lazy-Pirate-Pattern>`_.
"""
def __init__ ( self ):
self.log = logging.getLogger ( )
self.log.setLevel ( logging.DEBUG )
self.zmq_context = zmq.Context ( )
self.zmq_socket_req = None
self.poller = None
[docs] def close_socket ( self ):
"""
This method's goal is to gracefully close the client connection with the proxy.
"""
self.zmq_socket_req.setsockopt ( zmq.LINGER, 0 )
self.zmq_socket_req.close ( )
self.poller.unregister ( self.zmq_socket_req )
[docs] def send ( self, message ):
"""
This method's goal is to send a message to the proxy.
Parameters:
* message : string
Returns:
* string
Containing the answer, which should be "ACK".
"""
self.open_socket ( )
self.register_poller ( )
self.zmq_socket_req.send_unicode ( prefixed_msg )
retries = 0
answer = None
while answer == None:
answer = dict ( self.poller.poll ( 1000 ) )
if answer.get ( self.zmq_socket_req ) == zmq.POLLIN:
received = self.zmq_socket_req.recv ( )
answer = received.decode ( 'utf-8' )
if answer != 'ACK':
self.log ( 'Something is fishy!' )
self.log ( 'Received: "%s".' % answer.decode("utf-8") )
self.log ( "Expected: 'ACK'" )
else:
retries += 1
self.open_socket ( )
self.register_poller ( )
suffixed_message = prefixed_msg + " (message resent " + str ( retries ) + " times)"
self.zmq_socket_req.send_unicode ( suffixed_message )
self.close_socket ( )
return answer
[docs] def open_socket ( self ):
"""
This method's goal is to gracefully open the connection with the ZeroMQ proxy.
"""
self.zmq_socket_req = self.zmq_context.socket ( zmq.REQ )
self.zmq_socket_req.connect ( "tcp://127.0.0.1:5000" )
[docs] def register_poller ( self ):
"""
This method's goal is to create a polling mechanism so that message passing is non-blocking.
"""
self.poller = zmq.Poller ( )
self.poller.register ( self.zmq_socket_req,
zmq.POLLIN )