Hands-on PYZMQ for Async tasks
This tutorial explains how to use PyZMQ for Async tasks
First published on dev.to(A community for programmers)
https://dev.to/sanjaykhanssk/the-best-way-to-use-pyzmq-pub-sub-push-pull-543k/
Hello world, Let’s get into the topic, The Best way to Use pyZMQ.
A few days ago I started using PyZMQ from the blog tutorial in dev.to,
and also I found some code online with flask optimized I thought of reusing, I have reused the code and it worked fine, after that, I started coding for other functions when testing I found that the method `PUB-SUB` in PYZMQ is not suitable for that task, I will explain about the task in the bottom.
client.py
#importing ZMQ
import zmq
#Creating a context
context = zmq.Context()#creating and connceting to a socket.
socket = context.socket(zmq.PUSH)
socket.connect('tcp://localhost:5555')#function that sends data to the server
def send_data(data):
# we're sending string to the server as of now..
socket.send_string(f'{data}') #socket.send_json({"data":[1,2,3,4]})
#you can send lot of things, check out ZMQ official docs#Closing connection(We don't need this one, EXTRA).
def exit():
socket.close()
context.term()while 1:
#getting data from user to send to there server
send_data(input("Enter data to send: "))
exit()
server.py
import time
#importing
import zmq#creating contextcontext = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind('tcp://*:5555')#Creating poller
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)while True:
socks = dict(poller.poll())
#if any new message in poller it will PULL that message form the poller.
if socket in socks:
message = socket.recv_string()#message = socket.recv_json()
#you can receive lot of things, check out ZMQ official docs
print("sleeping 20 seconds")
time.sleep(20)#or Call some other function to change or modify
print(message) #If you're using SQLAlachemy
#session.commit() or db.session.commit()
checkout sharely.in (for saving and sharing links online..)
The Tutorial ends here. You can stop here, if you want to.
Why and when I used messaging Queue:-
I’m working in a project which is transferring a task to the like-minded people, It fetches all the like-minded users with some filters and sends this task to those targeted users,
So we decided to do this in with ZMQ I did that, but suddenly I found that when doing one task it rejects/not capturing all the triggering tasks at that particular time. I have the `PUB-SUB` model in ZMQ.
The code for that one is(Correct me if I’m wrong)
sender.py
# import time
# import zmq# HOST = '127.0.0.1'
# PORT = '6666'# _context = zmq.Context()
# _publisher = _context.socket(zmq.PUB)
# url = 'tcp://{}:{}'.format(HOST, PORT)# # socket =_context.socket(zmq.REQ)
# # socket.connect('tcp://{}:{}'.format(HOST, PORT))# def publish_message(message):
# try:
# _publisher.bind(url)
# # time.sleep(1)
# # print(url , message)
# _publisher.send_string(message)
# # socket.send_string(message)# except Exception as e:
# print ("error {}".format(e))# # finally:
# # _publisher.unbind(url)
#while 1:
#publish_message('response..')
`server.py`
# import sys
# import time
# import logging
# import os
# import zmq
# import time# HOST = '127.0.0.1'
# PORT = '6666'# logging.basicConfig(filename='subscriber.log', level=logging.INFO)# class ZClient(object):# def __init__(self, host=HOST, port=PORT):# self.host = host
# self.port = port
# self._context = zmq.Context()
# self._subscriber = self._context.socket(zmq.SUB)
# # print ("Client Initiated")# def receive_message(self):
# """Start receiving messages"""
# self._subscriber.connect('tcp://{}:{}'.format(self.host, self.port))
# self._subscriber.setsockopt(zmq.SUBSCRIBE, b"")# while True:
# print("LISTENING:-")
# # print( 'listening on tcp://{}:{}'.format(self.host, self.port))
# self.message = self._subscriber.recv_string()
# print(self.message)
# logging.info(
# '{} - {}'.format(self.message, time.strftime("%Y-%m-%d %H:%M")))
# # time.sleep(1)
# self.result()# def result(self):
# time.sleep(10)
# print(self.message)# if __name__ == '__main__':
# zc = ZClient()
# zc.receive_message()