Automate REST APIs Using Python
Automate REST APIs Using Python
REST APIs can be automated in Python with Requests, a third-party HTTP library (not part of the standard library) that is installed from the Python Package Index (PyPI):
$ pip install requests
For more detailed documentation, refer to:
http://docs.python-requests.org/en/master/
import requests
import json
import time
import threading
import smtplib
from time import strftime
import random
class Tenant(threading.Thread):
    """Thread that drives the tenant-creation sanity flow over the REST API.

    The flow is three chained POST calls:

    1. ``/client/verify`` registers the tenant and yields a request id.
    2. ``/client/updateTenantVerificationStatus`` marks the phone as verified.
    3. ``/client/verificationStatus`` is polled until the tenant status is
       no longer ``INITIATED``.

    Every API-level outcome (success or failure) is reported by e-mail via
    :meth:`sendemail`. Transport failures (connection errors, timeouts) are
    only printed, as in the original script.

    The code is written to run unchanged on Python 2.6+ and Python 3.
    """

    # Seconds to wait for a single HTTP call before giving up; without a
    # timeout a stalled server would block the thread forever.
    REQUEST_TIMEOUT = 30
    # Seconds to pause between two status polls so the server is not hammered.
    POLL_INTERVAL = 2

    def __init__(self, email, mobile, accessurl):
        """Store the tenant data and the e-mail reporting configuration.

        :param email: e-mail address of the tenant to create.
        :param mobile: mobile number of the tenant to create.
        :param accessurl: tenant sub-domain; ``.testapp.com`` is appended.
        """
        threading.Thread.__init__(self)
        self.email = email
        self.mobile = mobile
        self.accessurl = accessurl + '.testapp.com'
        self.headers = {'Content-type': 'application/json'}
        # Both are placeholders until the first API call fills them in.
        self.request_id = {}
        self.status = {}
        # NOTE(review): credentials are hard-coded in source; move them to
        # configuration or environment before using this outside a demo.
        self.emailLogin = 'ashish@gmail.com'
        self.emailPassword = '123@123'
        self.to_addr_list = ['piyush@gmail.com', 'sunny@gmail.com', 'ashish@gmail.com']
        self.cc_addr_list = ['ashish@gmail.com']
        self.subject = 'Tenant Creation Sanity Daily Status'
        self.message = {}
        self.from_addr = 'ashish@gmail.com'
        self.tenanttime = 0

    def run(self):
        """Thread entry point: run the whole flow starting at step 1."""
        print("Starting" + self.accessurl)
        self.createClientverifyApi()
        print("Exiting " + self.accessurl)

    def _post(self, url, body):
        """POST *body* as JSON to *url*; return the response or ``None``.

        ``None`` is returned (after printing the error) when the request
        fails at transport level, e.g. connection refused or timeout.
        """
        try:
            return requests.post(url, data=json.dumps(body),
                                 headers=self.headers,
                                 timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as e:
            print("HTTP error %s occurred" % e)
            return None

    @staticmethod
    def _decode(response):
        """Return the response body as a dict, or ``{}`` if it is not one."""
        try:
            payload = json.loads(response.text)
        except ValueError:
            return {}
        return payload if isinstance(payload, dict) else {}

    @staticmethod
    def _status_dto(payload):
        """Return the ``tenantStatusDTO`` part of *payload* (``{}`` if absent)."""
        return payload.get("tenantStatusDTO") or {}

    def createClientverifyApi(self):
        """Step 1: register the tenant and remember request id and status.

        On success the flow continues with
        :meth:`createUpdateTenantverificationStatusApi`; on any API-level
        failure the raw response is mailed out.
        """
        response = self._post("http://www.unicommerce.info/client/verify",
                              {"email": self.email, "mobile": self.mobile,
                               "accessUrl": self.accessurl})
        if response is None:
            return
        if response.status_code != 200:
            print("+++some internal server error in Tenant Create Step 1 api+++")
            print(response.text)
            self.sendemail(response.text)
            return
        payload = self._decode(response)
        if payload.get('successful') is not True:
            print("+++ API Giving False response  %s" % response.text)
            self.sendemail(response.text)
            return
        print("+++ Tenant Create Step 1 Passed +++ %s" % response.text)
        dto = self._status_dto(payload)
        self.request_id = dto.get("requestId")
        self.status = dto.get("status")
        self.createUpdateTenantverificationStatusApi()

    def createUpdateTenantverificationStatusApi(self):
        """Step 2: mark the tenant's phone number as verified.

        Does nothing but print a message when no request id is available
        (``None`` or the empty placeholder set by ``__init__``).
        """
        if not self.request_id:
            print("request id is null")
            return
        response = self._post(
            "http://www.unicommerce.info/client/updateTenantVerificationStatus",
            {"requestId": self.request_id, "status": "success",
             "mobile": self.mobile, "description": "",
             "date": "", "keypress": 1})
        if response is None:
            return
        if response.status_code != 200:
            print("+++some error - Phone No verification api Failed++++")
            print(response.text)
            self.sendemail(response.text)
            return
        if self._decode(response).get('successful') is not True:
            print("+++Phone Verify Api giving false response +++ %s" % response.text)
            self.sendemail(response.text)
            return
        print("+++Phone No verified+++")
        print(response.text)
        self.createTenantverify()

    def createTenantverify(self):
        """Step 3: poll the verification status until it leaves ``INITIATED``.

        The elapsed polling time is stored in ``self.tenanttime`` and the
        final response is mailed out.
        """
        url = "http://www.unicommerce.info/client/verificationStatus"
        body = {"requestId": self.request_id}
        response = self._post(url, body)
        if response is None:
            return
        if response.status_code != 200:
            print("++++ Tenant Creation Final Status Failed +++ %s" % response.text)
            self.sendemail(response.text)
            return
        payload = self._decode(response)
        if payload.get('successful') is not True:
            print("+++Verification Status Api giving false response +++ %s" % response.text)
            self.sendemail(response.text)
            return
        t0 = time.time()
        self.status = self._status_dto(payload).get("status")
        while self.status == "INITIATED":
            # Pause between polls instead of busy-looping against the API.
            time.sleep(self.POLL_INTERVAL)
            response = self._post(url, body)
            if response is None:
                return
            self.status = self._status_dto(self._decode(response)).get("status")
            print(self.status)
        self.tenanttime = time.time() - t0
        print("++++ Tenant Creation Final Status ++++  %s %s seconds took for tenantcreate"
              % (response.text, self.tenanttime))
        self.sendemail(response.text)

    def _recipients(self):
        """Return To + Cc addresses, de-duplicated, in their original order."""
        seen = set()
        ordered = []
        for addr in self.to_addr_list + self.cc_addr_list:
            if addr not in seen:
                seen.add(addr)
                ordered.append(addr)
        return ordered

    def _build_message(self, res):
        """Return the full mail text (headers + body) for response *res*."""
        body = ' Hello All\n\n Total time taken in Tenant Creation : %d sec\n\n Tenant Name : %s \n Tenant Status: %s ' \
               '\n\n Tenant creation Response Logs\n\n %s' % (self.tenanttime, self.accessurl, self.status, res)
        header = 'From: %s\n' % self.from_addr
        header += 'To: %s\n' % ','.join(self.to_addr_list)
        header += 'Cc: %s\n' % ','.join(self.cc_addr_list)
        header += 'Subject: %s\n\n' % self.subject
        return header + body

    def sendemail(self, res):
        """Mail the status report containing the API response text *res*.

        The built message is kept in ``self.message``. SMTP errors raised
        while connecting, logging in or sending propagate to the caller.
        """
        smtpserver = 'smtp.gmail.com:587'
        self.message = self._build_message(res)
        server = smtplib.SMTP(smtpserver)
        try:
            server.starttls()
            server.login(self.emailLogin, self.emailPassword)
            # Cc addresses must be envelope recipients too; the Cc header
            # alone does not deliver anything.
            problems = server.sendmail(self.from_addr, self._recipients(), self.message)
            print('mail sent')
            if problems:
                print("mail refused for: %s" % problems)
        finally:
            # Always release the connection, even if sending failed.
            try:
                server.quit()
            except smtplib.SMTPException:
                server.close()
if __name__ == '__main__':
    # All inputs are generated so the script can run unattended. To enter
    # them by hand instead, read email / accessUrl / mobile with raw_input().
    # A timestamp keeps the e-mail address unique per run.
    email = "testautomate%s@test.com" % strftime("%d%b%H%M%S")
    # Fixed tenant sub-domain; a timestamped name such as
    # 'TenantAutomate' + strftime("%d%b%H%M%S") could be used instead.
    accessUrl = "2a5350-020-staging1"
    # Random 10-digit mobile number.
    mobile = random.randrange(1000000000, 7999999999)

    print(" Tenant Getting Created Please be patient!!")
    t = Tenant(email, mobile, accessUrl)
    t.start()
Installation
Installing Requests via pip is fairly simple; just run this in your terminal:
$ pip install requests
For more detailed documentation, refer to:
http://docs.python-requests.org/en/master/
Sample Code
import requests
# Sample: push one forward ("FWD") task to the last-mile trips endpoint.
url = "http://vulcan-olpstaging.unicommerce.info:8089/trips/lastmile"
headers = {
    'content-type': "application/json",
    'cache-control': "no-cache",
}
# Request body: a "pyMessage" wrapper holding a list of tasks.
payload = {
    "pyMessage": {
        "Tasks": [
            {
                "Task": {
                    "CONSIGNE_STATE": "Delhi",
                    "CONSIGNE_NAME": "sonu sharma",
                    "WIDTH": 10.0,
                    "CREW_CODE": "OLP9876",
                    "TRIP_TYPE": "FWD",
                    "TASK_ACTION": "ADD",
                    "DATE": "2016-06-23 13:13:41",
                    "VOL_WEIGHT": 500.0,
                    "TRIP_NO": "Trip1001",
                    "COLLECTIBLE_AMOUNT": 1.0,
                    "AWB_NUMBER": "Ship1900",
                    "REFERENCE_NO": "Ship1900",
                    "LENGTH": 10.0,
                    "QTY": 1,
                    "SHIPMENT_TYPE": "cod",
                    "TRIP_STATUS": "PLD",
                    "ITEM_DESCRIPTION": "Nova 1045 Trimmer - White",
                    "WEIGHT": 500.0,
                    "HEIGHT": 10.0,
                    "CONSIGNE_PINCODE": 122001,
                    "CONSIGNE_ADDRESS2": "near solanki market",
                    "CONSIGNE_ADDRESS3": "",
                    "CONSIGNE_ADDRESS1": "d-100 madhu vihar near solanki market uttam shop name friends computer zone",
                    "CREW_NAME": "Sonu Goyal",
                    "TASK_TYPE": "FWD",
                    "MOBILE": 9999872129,
                    "DELIVERY_CENTRE_CODE": "GGN",
                    "CONSIGNE_CITY": "Delhi",
                    "ORG": "vul",
                },
            },
        ],
    },
}
# `json=` lets Requests serialise the payload. The timeout (seconds) stops
# the script from hanging forever if the server never answers.
response = requests.post(url, headers=headers, json=payload, timeout=30)
print(response.text)
{"status":true,"token":"","Message":"","Result":"OK","TotalRecordCount":1,"Records":[{"STATUS":"TRUE","TASK_TYPE":"FWD","AWB_NUMBER":"Ship1900","REASON":""}]}
Automation API Flow: Sample Code Using Python
__author__ = "Ashish ~ _self_name ='marcus'"
import requests
import json
import time
import threading
import smtplib
from time import strftime
import random
class Tenant(threading.Thread):
    """Thread that drives the tenant-creation sanity flow over the REST API.

    The flow is three chained POST calls:

    1. ``/client/verify`` registers the tenant and yields a request id.
    2. ``/client/updateTenantVerificationStatus`` marks the phone as verified.
    3. ``/client/verificationStatus`` is polled until the tenant status is
       no longer ``INITIATED``.

    Every API-level outcome (success or failure) is reported by e-mail via
    :meth:`sendemail`. Transport failures (connection errors, timeouts) are
    only printed, as in the original script.

    The code is written to run unchanged on Python 2.6+ and Python 3.
    """

    # Seconds to wait for a single HTTP call before giving up; without a
    # timeout a stalled server would block the thread forever.
    REQUEST_TIMEOUT = 30
    # Seconds to pause between two status polls so the server is not hammered.
    POLL_INTERVAL = 2

    def __init__(self, email, mobile, accessurl):
        """Store the tenant data and the e-mail reporting configuration.

        :param email: e-mail address of the tenant to create.
        :param mobile: mobile number of the tenant to create.
        :param accessurl: tenant sub-domain; ``.testapp.com`` is appended.
        """
        threading.Thread.__init__(self)
        self.email = email
        self.mobile = mobile
        self.accessurl = accessurl + '.testapp.com'
        self.headers = {'Content-type': 'application/json'}
        # Both are placeholders until the first API call fills them in.
        self.request_id = {}
        self.status = {}
        # NOTE(review): credentials are hard-coded in source; move them to
        # configuration or environment before using this outside a demo.
        self.emailLogin = 'ashish@gmail.com'
        self.emailPassword = '123@123'
        self.to_addr_list = ['piyush@gmail.com', 'sunny@gmail.com', 'ashish@gmail.com']
        self.cc_addr_list = ['ashish@gmail.com']
        self.subject = 'Tenant Creation Sanity Daily Status'
        self.message = {}
        self.from_addr = 'ashish@gmail.com'
        self.tenanttime = 0

    def run(self):
        """Thread entry point: run the whole flow starting at step 1."""
        print("Starting" + self.accessurl)
        self.createClientverifyApi()
        print("Exiting " + self.accessurl)

    def _post(self, url, body):
        """POST *body* as JSON to *url*; return the response or ``None``.

        ``None`` is returned (after printing the error) when the request
        fails at transport level, e.g. connection refused or timeout.
        """
        try:
            return requests.post(url, data=json.dumps(body),
                                 headers=self.headers,
                                 timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as e:
            print("HTTP error %s occurred" % e)
            return None

    @staticmethod
    def _decode(response):
        """Return the response body as a dict, or ``{}`` if it is not one."""
        try:
            payload = json.loads(response.text)
        except ValueError:
            return {}
        return payload if isinstance(payload, dict) else {}

    @staticmethod
    def _status_dto(payload):
        """Return the ``tenantStatusDTO`` part of *payload* (``{}`` if absent)."""
        return payload.get("tenantStatusDTO") or {}

    def createClientverifyApi(self):
        """Step 1: register the tenant and remember request id and status.

        On success the flow continues with
        :meth:`createUpdateTenantverificationStatusApi`; on any API-level
        failure the raw response is mailed out.
        """
        response = self._post("http://www.unicommerce.info/client/verify",
                              {"email": self.email, "mobile": self.mobile,
                               "accessUrl": self.accessurl})
        if response is None:
            return
        if response.status_code != 200:
            print("+++some internal server error in Tenant Create Step 1 api+++")
            print(response.text)
            self.sendemail(response.text)
            return
        payload = self._decode(response)
        if payload.get('successful') is not True:
            print("+++ API Giving False response  %s" % response.text)
            self.sendemail(response.text)
            return
        print("+++ Tenant Create Step 1 Passed +++ %s" % response.text)
        dto = self._status_dto(payload)
        self.request_id = dto.get("requestId")
        self.status = dto.get("status")
        self.createUpdateTenantverificationStatusApi()

    def createUpdateTenantverificationStatusApi(self):
        """Step 2: mark the tenant's phone number as verified.

        Does nothing but print a message when no request id is available
        (``None`` or the empty placeholder set by ``__init__``).
        """
        if not self.request_id:
            print("request id is null")
            return
        response = self._post(
            "http://www.unicommerce.info/client/updateTenantVerificationStatus",
            {"requestId": self.request_id, "status": "success",
             "mobile": self.mobile, "description": "",
             "date": "", "keypress": 1})
        if response is None:
            return
        if response.status_code != 200:
            print("+++some error - Phone No verification api Failed++++")
            print(response.text)
            self.sendemail(response.text)
            return
        if self._decode(response).get('successful') is not True:
            print("+++Phone Verify Api giving false response +++ %s" % response.text)
            self.sendemail(response.text)
            return
        print("+++Phone No verified+++")
        print(response.text)
        self.createTenantverify()

    def createTenantverify(self):
        """Step 3: poll the verification status until it leaves ``INITIATED``.

        The elapsed polling time is stored in ``self.tenanttime`` and the
        final response is mailed out.
        """
        url = "http://www.unicommerce.info/client/verificationStatus"
        body = {"requestId": self.request_id}
        response = self._post(url, body)
        if response is None:
            return
        if response.status_code != 200:
            print("++++ Tenant Creation Final Status Failed +++ %s" % response.text)
            self.sendemail(response.text)
            return
        payload = self._decode(response)
        if payload.get('successful') is not True:
            print("+++Verification Status Api giving false response +++ %s" % response.text)
            self.sendemail(response.text)
            return
        t0 = time.time()
        self.status = self._status_dto(payload).get("status")
        while self.status == "INITIATED":
            # Pause between polls instead of busy-looping against the API.
            time.sleep(self.POLL_INTERVAL)
            response = self._post(url, body)
            if response is None:
                return
            self.status = self._status_dto(self._decode(response)).get("status")
            print(self.status)
        self.tenanttime = time.time() - t0
        print("++++ Tenant Creation Final Status ++++  %s %s seconds took for tenantcreate"
              % (response.text, self.tenanttime))
        self.sendemail(response.text)

    def _recipients(self):
        """Return To + Cc addresses, de-duplicated, in their original order."""
        seen = set()
        ordered = []
        for addr in self.to_addr_list + self.cc_addr_list:
            if addr not in seen:
                seen.add(addr)
                ordered.append(addr)
        return ordered

    def _build_message(self, res):
        """Return the full mail text (headers + body) for response *res*."""
        body = ' Hello All\n\n Total time taken in Tenant Creation : %d sec\n\n Tenant Name : %s \n Tenant Status: %s ' \
               '\n\n Tenant creation Response Logs\n\n %s' % (self.tenanttime, self.accessurl, self.status, res)
        header = 'From: %s\n' % self.from_addr
        header += 'To: %s\n' % ','.join(self.to_addr_list)
        header += 'Cc: %s\n' % ','.join(self.cc_addr_list)
        header += 'Subject: %s\n\n' % self.subject
        return header + body

    def sendemail(self, res):
        """Mail the status report containing the API response text *res*.

        The built message is kept in ``self.message``. SMTP errors raised
        while connecting, logging in or sending propagate to the caller.
        """
        smtpserver = 'smtp.gmail.com:587'
        self.message = self._build_message(res)
        server = smtplib.SMTP(smtpserver)
        try:
            server.starttls()
            server.login(self.emailLogin, self.emailPassword)
            # Cc addresses must be envelope recipients too; the Cc header
            # alone does not deliver anything.
            problems = server.sendmail(self.from_addr, self._recipients(), self.message)
            print('mail sent')
            if problems:
                print("mail refused for: %s" % problems)
        finally:
            # Always release the connection, even if sending failed.
            try:
                server.quit()
            except smtplib.SMTPException:
                server.close()
if __name__ == '__main__':
    # All inputs are generated so the script can run unattended. To enter
    # them by hand instead, read email / accessUrl / mobile with raw_input().
    # A timestamp keeps the e-mail address unique per run.
    email = "testautomate%s@test.com" % strftime("%d%b%H%M%S")
    # Fixed tenant sub-domain; a timestamped name such as
    # 'TenantAutomate' + strftime("%d%b%H%M%S") could be used instead.
    accessUrl = "2a5350-020-staging1"
    # Random 10-digit mobile number.
    mobile = random.randrange(1000000000, 7999999999)

    print(" Tenant Getting Created Please be patient!!")
    t = Tenant(email, mobile, accessUrl)
    t.start()
Comments
Post a Comment