Hands-on PyZMQ for Async Tasks

SanjayKhanSSK
3 min read · May 15, 2020

This tutorial explains how to use PyZMQ for Async tasks


First published on dev.to (a community for programmers):

https://dev.to/sanjaykhanssk/the-best-way-to-use-pyzmq-pub-sub-push-pull-543k/

Hello world! Let’s get into the topic: the best way to use PyZMQ.
A few days ago I started using PyZMQ by following a blog tutorial on dev.to. I also found some Flask-optimized code online and reused it, and it worked fine. Later, while coding and testing other functions, I found that the `PUB-SUB` pattern in PyZMQ is not suitable for my task, so the code below uses `PUSH-PULL` instead. I explain the task at the bottom of this post.

client.py

# Import ZMQ
import zmq

# Create a context
context = zmq.Context()

# Create a PUSH socket and connect it to the server
socket = context.socket(zmq.PUSH)
socket.connect('tcp://localhost:5555')


def send_data(data):
    """Send data to the server."""
    # We're sending a string to the server for now.
    socket.send_string(f'{data}')
    # socket.send_json({"data": [1, 2, 3, 4]})
    # You can send many other things; check the official ZMQ docs.


def close_connection():
    """Close the connection (optional, an extra)."""
    socket.close()
    context.term()


try:
    while True:
        # Get data from the user to send to the server
        send_data(input("Enter data to send: "))
except (KeyboardInterrupt, EOFError):
    # Ctrl+C or end of input ends the loop so the cleanup below can run
    pass
finally:
    close_connection()
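The commented-out `send_json` line in the client hints that you can send more than plain strings. Here is a minimal sketch of a JSON round trip over PUSH/PULL, run inside a single process so it can be tried without starting the server. The function name `json_round_trip` and the `inproc://json-demo` endpoint are assumptions made for illustration; they are not part of the original client.

```python
import zmq


def json_round_trip(payload, endpoint="inproc://json-demo"):
    """Push one JSON-serializable object and pull it back in the same process.

    Hypothetical helper for illustration only.
    """
    context = zmq.Context()
    receiver = context.socket(zmq.PULL)
    sender = context.socket(zmq.PUSH)
    try:
        receiver.bind(endpoint)    # bind first so the inproc endpoint exists
        sender.connect(endpoint)
        sender.send_json(payload)  # pyzmq serializes the object to JSON
        # Wait up to one second instead of blocking forever.
        if receiver.poll(1000):
            return receiver.recv_json()  # parsed back into Python objects
        return None
    finally:
        sender.close(linger=0)
        receiver.close(linger=0)
        context.term()


if __name__ == "__main__":
    print(json_round_trip({"data": [1, 2, 3, 4]}))
```

In the real client and server you would keep the `tcp://` addresses and simply swap `send_string`/`recv_string` for `send_json`/`recv_json` on both sides.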

server.py

import time

# Import ZMQ
import zmq

# Create a context
context = zmq.Context()

# Create a PULL socket and bind it to the port
socket = context.socket(zmq.PULL)
socket.bind('tcp://*:5555')

# Create a poller
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

while True:
    socks = dict(poller.poll())
    # If the poller reports a new message, PULL it from the socket.
    if socket in socks:
        message = socket.recv_string()
        # message = socket.recv_json()
        # You can receive many other things; check the official ZMQ docs.
        print("sleeping 20 seconds")
        time.sleep(20)  # or call some other function to process the message
        print(message)
        # If you're using SQLAlchemy:
        # session.commit() or db.session.commit()
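The reason this pattern works for slow tasks is that PUSH queues messages while the PULL side is busy. The sketch below tries to show that in one process: every task is pushed before the worker reads anything, and the worker still receives all of them in order. The name `run_demo`, the `inproc://tasks-demo` endpoint, and the short sleep standing in for the 20-second task are assumptions for illustration, not the original server.

```python
import time

import zmq


def run_demo(task_count=5, work_seconds=0.05, endpoint="inproc://tasks-demo"):
    """Push every task up front, then pull them slowly; return what arrived.

    Hypothetical demo helper, not part of the original tutorial code.
    """
    context = zmq.Context()
    pull = context.socket(zmq.PULL)
    push = context.socket(zmq.PUSH)
    processed = []
    try:
        pull.bind(endpoint)
        push.connect(endpoint)

        # The producer sends everything before the worker reads a single task.
        for i in range(task_count):
            push.send_string(f"task-{i}")  # queued, not dropped

        poller = zmq.Poller()
        poller.register(pull, zmq.POLLIN)
        while len(processed) < task_count:
            events = dict(poller.poll(timeout=1000))  # timeout in milliseconds
            if pull not in events:
                break  # nothing arrived within one second; stop waiting
            message = pull.recv_string()
            time.sleep(work_seconds)  # stand-in for the slow task
            processed.append(message)
    finally:
        push.close(linger=0)
        pull.close(linger=0)
        context.term()
    return processed


if __name__ == "__main__":
    print(run_demo())
```

Passing a timeout to `poller.poll()` is optional; the server above blocks forever, which is fine for a long-running worker, while a timeout makes it easier to add a clean shutdown check.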

Check out sharely.in (for saving and sharing links online).

The tutorial ends here. You can stop reading here if you want to.

Why and when I used a message queue:

I’m working on a project that transfers a task to like-minded people. It fetches all the like-minded users with some filters and sends the task to those targeted users.
We decided to do this with ZMQ, and I implemented it. But I soon found that while one task was being processed, the tasks triggered during that time were rejected or not captured. I was using the `PUB-SUB` model in ZMQ.
The code for that one follows (correct me if I’m wrong).

sender.py

import time
import zmq

HOST = '127.0.0.1'
PORT = '6666'

_context = zmq.Context()
_publisher = _context.socket(zmq.PUB)
url = 'tcp://{}:{}'.format(HOST, PORT)
# Bind once at startup; calling bind() again on the same address
# raises an error (address already in use).
_publisher.bind(url)

# socket = _context.socket(zmq.REQ)
# socket.connect('tcp://{}:{}'.format(HOST, PORT))


def publish_message(message):
    try:
        # time.sleep(1)
        # print(url, message)
        _publisher.send_string(message)
        # socket.send_string(message)
    except Exception as e:
        print("error {}".format(e))
    # finally:
    #     _publisher.unbind(url)


while True:
    publish_message('response..')

`server.py`

import logging
import time

import zmq

HOST = '127.0.0.1'
PORT = '6666'

logging.basicConfig(filename='subscriber.log', level=logging.INFO)


class ZClient(object):

    def __init__(self, host=HOST, port=PORT):
        self.host = host
        self.port = port
        self._context = zmq.Context()
        self._subscriber = self._context.socket(zmq.SUB)
        # print("Client Initiated")

    def receive_message(self):
        """Start receiving messages"""
        self._subscriber.connect('tcp://{}:{}'.format(self.host, self.port))
        self._subscriber.setsockopt(zmq.SUBSCRIBE, b"")
        while True:
            print("LISTENING:-")
            # print('listening on tcp://{}:{}'.format(self.host, self.port))
            self.message = self._subscriber.recv_string()
            print(self.message)
            logging.info(
                '{} - {}'.format(self.message, time.strftime("%Y-%m-%d %H:%M")))
            # time.sleep(1)
            self.result()

    def result(self):
        time.sleep(10)
        print(self.message)


if __name__ == '__main__':
    zc = ZClient()
    zc.receive_message()
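One likely reason tasks went missing with this model is that a PUB socket does not queue messages for subscribers that are not connected and subscribed yet; it simply discards them. The sketch below tries to show that behaviour in one process. The function name `late_subscriber_demo`, the message texts, and the `inproc://pubsub-demo` endpoint are assumptions for illustration, and this is not a diagnosis of the exact project code.

```python
import zmq


def late_subscriber_demo(endpoint="inproc://pubsub-demo"):
    """Publish before and after a subscriber joins; return what it received.

    Hypothetical demo helper, not part of the original project.
    """
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    sub = context.socket(zmq.SUB)
    received = []
    try:
        pub.bind(endpoint)
        # No subscriber is connected yet, so PUB discards this message.
        pub.send_string("early task")

        sub.setsockopt(zmq.SUBSCRIBE, b"")  # subscribe to everything
        sub.connect(endpoint)

        # The subscription takes a moment to reach the publisher, so resend
        # until one copy arrives (up to about 5 seconds in total).
        for _ in range(50):
            pub.send_string("late task")
            if sub.poll(100):  # wait up to 100 ms for a message
                received.append(sub.recv_string())
                break
    finally:
        pub.close(linger=0)
        sub.close(linger=0)
        context.term()
    return received


if __name__ == "__main__":
    print(late_subscriber_demo())
```

The early message never shows up on the subscriber side, which is fine for broadcasts like live updates but not for tasks that must each be processed once. That is the gap the PUSH-PULL client and server at the top of this post fill.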
