Source code for tuna.zeromq.zmq_proxy

"""
This module's scope is to mediate communication between ZeroMQ clients.
"""

import time
import zmq

[docs]class zmq_proxy ( ): """ This class' responsibility is to setup a ZeroMQ proxy and mediate message passing between its clients. It binds to port 5000. """ def __init__ ( self ): self.__zmq_context = zmq.Context ( ) self.__zmq_socket_rep = self.__zmq_context.socket ( zmq.REP ) self.__lock = False self.__zmq_poller = zmq.Poller ( ) self.__zmq_poller.register ( self.__zmq_socket_rep, zmq.POLLIN ) def __call_log ( self, msg ): """ This method's goal is to dispatch the input message to a log server. Parameters: * msg : string """ self.__zmq_socket_req = self.__zmq_context.socket ( zmq.REQ ) self.__zmq_socket_req.setsockopt ( zmq.LINGER, 0 ) self.__zmq_socket_req.connect ( "tcp://127.0.0.1:5001" ) self.__zmq_socket_req.send ( msg.encode("utf-8") ) answer = self.__zmq_socket_req.recv ( ) if answer.decode ( "utf-8" ) != 'ACK': print ( u'Something is fishy!' ) print ( u'Received: "%s" from %s.' % answer.decode("utf-8"), msg_destination ) print ( u"Expected: 'ACK'" ) self.__zmq_socket_req.close ( ) def __call_print ( self, msg ): """ This method's goal is to print a received message. It is meant as a fallback in case the log server is unavailable. Parameters: * msg : string """ print ( "zmq_proxy received the message '%s'." % msg )
[docs] def check_ACK ( self, ack_msg ): """ This method's goal is to verify that the input ZeroMQ message contains the string "ACK". Parameters: * ack_msg : (byte) string """ if ack_msg.decode ( "utf-8" ) != 'ACK': print ( u'Something is fishy!' ) print ( u'Received: "%s" from %s.' % answer.decode("utf-8"), msg_destination ) print ( u"Expected: 'ACK'" )
[docs] def close ( self ): """ This method's goal is to gracefully shutdown the ZeroMQ proxy. """ print ( "Shutting down zmq_proxy." ) self.__lock = False
def __del__ ( self ): self.__lock = False self.__zmq_socket_req = self.__zmq_context.socket ( zmq.REQ ) self.__zmq_socket_req.setsockopt ( zmq.LINGER, 0 ) self.__zmq_socket_req.connect ( "tcp://127.0.0.1:5000" ) self.__zmq_socket_req.send ( b'info: Shutting down zmq_proxy.' ) answer = self.__zmq_socket_req.recv ( ) self.check_ACK ( answer ) self.__zmq_socket_req.close ( )
[docs] def run ( self ): """ This method's goal is to orchestrate incoming messages. It will run in loop, listening to messages and dispatching them as appropriated. Note to developers: destination_call_table is a dictionary associating target strings with the functions to be run. The services responsible for a given target can be changed here without changing the clients. """ started = False first_try = True while ( not started ): try: started = True self.__zmq_socket_rep.bind ( "tcp://127.0.0.1:5000" ) except zmq.ZMQError as error_message: if ( first_try ): #print ( 'ZMQError: %s' % error_message ) #print ( "Is zmq_proxy already running? Will silently retry every 10 seconds." ) first_try = False time.sleep ( 10 ) started = False self.__lock = True destination_call_table = { 'info' : self.__call_print, 'log' : self.__call_log, 'test' : self.__call_print, } while self.__lock == True: zmq_buffer = dict ( self.__zmq_poller.poll ( 5 ) ) if self.__zmq_socket_rep in zmq_buffer and zmq_buffer [ self.__zmq_socket_rep ] == zmq.POLLIN: msg = self.__zmq_socket_rep.recv ( ) msg_partition = str ( msg, ( "utf-8" ) ).partition ( ": " ) msg_destination = msg_partition[0] msg_contents = msg_partition[2] destination_call_table [ msg_destination ] ( msg_contents ) self.__zmq_socket_rep.send ( b'ACK' )