Hands-on PyZMQ for Async Tasks
This tutorial explains how to use PyZMQ for async tasks.
First published on dev.to (a community for programmers).
Hello world! Let's get into the topic: the best way to use PyZMQ.
A few days ago I started using PyZMQ, following a blog tutorial on dev.to. I also found some Flask-optimized code online that I thought I could reuse. I reused it and it worked fine. Later, while testing other functions I had started coding, I found that PyZMQ's `PUB-SUB` pattern was not suitable for my task; I explain the task at the bottom. The client and server below use `PUSH-PULL` instead.
# Client: pushes data to the server
import zmq

# Create a context
context = zmq.Context()

# Create a PUSH socket and connect it to the server
socket = context.socket(zmq.PUSH)
socket.connect('tcp://localhost:5555')

# Send data to the server
def send_data(data):
    # We're sending a string to the server for now
    socket.send_string(f'{data}')  # or: socket.send_json({"data": [1, 2, 3, 4]})
    # You can send many other types; check the official ZMQ docs

# Close the connection (extra; not needed for this example)
def close():
    socket.close()
    context.term()

while True:
    # Get data from the user to send to the server
    send_data(input("Enter data to send: "))
# Server: pulls messages and processes them one at a time
import time
import zmq

# Create a context
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind('tcp://*:5555')

# Create a poller
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

while True:
    socks = dict(poller.poll())
    # If the poller reports a new message, pull it from the socket
    if socket in socks:
        message = socket.recv_string()  # or: message = socket.recv_json()
        # You can receive many other types; check the official ZMQ docs
        print("sleeping 20 seconds")
        time.sleep(20)  # or call some other function to process the message
        print(message)
        # If you're using SQLAlchemy:
        # session.commit() or db.session.commit()
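To try the PUSH-PULL behaviour without opening two terminals, here is a minimal sketch that runs both sides in one process. It assumes an in-process endpoint (`inproc://tasks-demo`), a hypothetical `run_demo` helper, and a short sleep standing in for the 20-second job; none of these come from the code above.

```python
import threading
import time

import zmq


def run_demo(n_tasks=3, work_seconds=0.05):
    """Push n_tasks JSON messages to a slow worker; return what it processed."""
    context = zmq.Context()
    endpoint = "inproc://tasks-demo"  # hypothetical endpoint, demo only
    processed = []
    ready = threading.Event()

    def worker():
        # The PULL socket is created and used in the worker thread only.
        receiver = context.socket(zmq.PULL)
        receiver.bind(endpoint)
        ready.set()
        for _ in range(n_tasks):
            task = receiver.recv_json()
            time.sleep(work_seconds)  # stand-in for slow work
            processed.append(task)
        receiver.close()

    thread = threading.Thread(target=worker)
    thread.start()
    ready.wait()  # connect only after the worker has bound

    sender = context.socket(zmq.PUSH)
    sender.connect(endpoint)
    # All tasks are sent right away, faster than the worker can handle them.
    for i in range(n_tasks):
        sender.send_json({"task_id": i})

    thread.join()
    sender.close()
    context.term()
    return processed
```

Calling `run_demo()` should return the task dictionaries in the order they were sent, even though the worker is slower than the sender: the messages wait in the queue until the PULL side gets to them.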
Check out sharely.in (for saving and sharing links online).
The tutorial ends here. You can stop here if you want to.
Why and when I used a messaging queue:
I'm working on a project that hands a task over to like-minded people: it fetches all the like-minded users through some filters and sends the task to those targeted users.
We decided to do this with ZMQ, and I implemented it. But I soon found that while it was handling one task, it rejected (did not capture) the other tasks triggered during that time. I was using the `PUB-SUB` model in ZMQ.
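One PUB-SUB behaviour that can produce this kind of loss: a PUB socket does not queue messages for subscribers that are not connected yet, it simply discards them. The sketch below illustrates that with an in-process endpoint. The endpoint name and the function name are made up for the demo, and it is only an assumption that this was part of the cause in the project described here.

```python
import zmq


def first_message_seen_by_late_subscriber(endpoint="inproc://pub-demo"):
    """Publish once before anyone subscribes, then return the first
    message a late subscriber actually receives."""
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.bind(endpoint)

    # Nobody is subscribed yet, so PUB discards this message.
    publisher.send_string("early task")

    subscriber = context.socket(zmq.SUB)
    subscriber.setsockopt(zmq.SUBSCRIBE, b"")  # subscribe to everything
    subscriber.connect(endpoint)

    first = None
    # Re-send until the subscription has reached the publisher (about 1 s max).
    for _ in range(20):
        publisher.send_string("late task")
        if subscriber.poll(timeout=50):  # wait up to 50 ms for a message
            first = subscriber.recv_string()
            break

    publisher.close(linger=0)
    subscriber.close(linger=0)
    context.term()
    return first
```

The function should return `"late task"`: the early message never reaches the subscriber. With PUSH-PULL, a message sent to a connected but busy worker stays queued instead.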
The code for that is below (correct me if I'm wrong):
# import time
# import zmq
#
# HOST = ''
# PORT = '6666'
#
# _context = zmq.Context()
# _publisher = _context.socket(zmq.PUB)
# url = 'tcp://{}:{}'.format(HOST, PORT)
#
# # socket = _context.socket(zmq.REQ)
# # socket.connect('tcp://{}:{}'.format(HOST, PORT))
#
# def publish_message(message):
#     try:
#         _publisher.bind(url)
#         # time.sleep(1)
#         # print(url, message)
#         _publisher.send_string(message)
#         # socket.send_string(message)
#     except Exception as e:
#         print("error {}".format(e))
#     # finally:
#     #     _publisher.unbind(url)
#
# while 1:
# import sys
# import time
# import logging
# import os
# import zmq
# import time
#
# HOST = ''
# PORT = '6666'
#
# logging.basicConfig(filename='subscriber.log', level=logging.INFO)
#
# class ZClient(object):
#
#     def __init__(self, host=HOST, port=PORT):
#         self.host = host
#         self.port = port
#         self._context = zmq.Context()
#         self._subscriber = self._context.socket(zmq.SUB)
#         # print("Client Initiated")
#
#     def receive_message(self):
#         """Start receiving messages"""
#         self._subscriber.connect('tcp://{}:{}'.format(self.host, self.port))
#         self._subscriber.setsockopt(zmq.SUBSCRIBE, b"")
#
#         while True:
#             print("LISTENING:-")
#             # print('listening on tcp://{}:{}'.format(self.host, self.port))
#             self.message = self._subscriber.recv_string()
#             print(self.message)
#             logging.info(
#                 '{} - {}'.format(self.message, time.strftime("%Y-%m-%d %H:%M")))
#             # time.sleep(1)
#             self.result()
#
#     def result(self):
#         time.sleep(10)
#         print(self.message)
#
# if __name__ == '__main__':
#     zc = ZClient()
#     zc.receive_message()
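Looking back at the publisher code, `bind` is called inside `publish_message`, so from the second call on it will most likely raise an "address already in use" error, and the `except` branch then skips `send_string`. That alone could explain some of the missed tasks. Here is a minimal sketch of a publisher that binds only once. The `Publisher` class and the explicit `127.0.0.1` host are assumptions made for illustration (the original `HOST` is empty).

```python
import zmq


class Publisher:
    """Binds a PUB socket once and reuses it for every message."""

    def __init__(self, url="tcp://127.0.0.1:6666"):
        # Context.instance() returns a shared, process-wide context.
        self._context = zmq.Context.instance()
        self._socket = self._context.socket(zmq.PUB)
        self._socket.bind(url)  # bind exactly once, not on every publish

    def publish(self, message):
        # No bind here: sending is all this method does.
        self._socket.send_string(message)

    def close(self):
        self._socket.close(linger=0)
```

Usage would be `pub = Publisher()` once at startup, then `pub.publish("some task")` wherever a task is triggered. Even with this change, a subscriber that connects after a message was published will not receive it, which is one reason PUSH-PULL fits a task queue better.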