Automate REST APIs using Python

Automate REST APIs using Python


REST APIs can be automated in Python with Requests, an HTTP library. Requests is not part of the standard library; it is a third-party package installed from the Python Package Index (PyPI).

Installation

Installing Requests via pip is fairly simple: just run this in your terminal.

$ pip install requests

For more detailed documentation, refer to:
http://docs.python-requests.org/en/master/

Sample Code


import requests

# Sample: POST one trip task to the last-mile endpoint and print the raw reply.
url = "http://vulcan-olpstaging.unicommerce.info:8089/trips/lastmile"

headers = {
    'content-type': "application/json",
    'cache-control': "no-cache",
}

# Request body: a "pyMessage" envelope holding a list of tasks. This sample
# sends a single task whose TASK_ACTION is "ADD".
payload = {
    "pyMessage": {
        "Tasks": [
            {
                "Task": {
                    "CONSIGNE_STATE": "Delhi",
                    "CONSIGNE_NAME": "sonu sharma",
                    "WIDTH": 10.0,
                    "CREW_CODE": "OLP9876",
                    "TRIP_TYPE": "FWD",
                    "TASK_ACTION": "ADD",
                    "DATE": "2016-06-23 13:13:41",
                    "VOL_WEIGHT": 500.0,
                    "TRIP_NO": "Trip1001",
                    "COLLECTIBLE_AMOUNT": 1.0,
                    "AWB_NUMBER": "Ship1900",
                    "REFERENCE_NO": "Ship1900",
                    "LENGTH": 10.0,
                    "QTY": 1,
                    "SHIPMENT_TYPE": "cod",
                    "TRIP_STATUS": "PLD",
                    "ITEM_DESCRIPTION": "Nova 1045 Trimmer - White",
                    "WEIGHT": 500.0,
                    "HEIGHT": 10.0,
                    "CONSIGNE_PINCODE": 122001,
                    "CONSIGNE_ADDRESS2": "near solanki market",
                    "CONSIGNE_ADDRESS3": "",
                    "CONSIGNE_ADDRESS1": "d-100 madhu vihar near solanki market uttam                                            shop name friends computer zone",
                    "CREW_NAME": "Sonu Goyal",
                    "TASK_TYPE": "FWD",
                    "MOBILE": 9999872129,
                    "DELIVERY_CENTRE_CODE": "GGN",
                    "CONSIGNE_CITY": "Delhi",
                    "ORG": "vul",
                }
            }
        ]
    }
}

# json= makes Requests serialize the dict to a JSON body.
# timeout= is required in practice: without it Requests waits indefinitely
# for an unresponsive server and the script never finishes.
response = requests.post(url, headers=headers, json=payload, timeout=30)

print(response.text)
Sample output:

{"status":true,"token":"","Message":"","Result":"OK","TotalRecordCount":1,"Records":[{"STATUS":"TRUE","TASK_TYPE":"FWD","AWB_NUMBER":"Ship1900","REASON":""}]}

API Automation Flow - Sample Code Using Python

__author__ = "Ashish ~ _self_name ='marcus'"
import requests
import json
import time
import threading
import smtplib
from time import strftime
import random

class Tenant(threading.Thread):
    """Worker thread that runs the tenant-creation API flow and mails the result.

    The flow is three chained POST calls against ``BASE_URL``:

    1. ``createClientverifyApi`` -- ``/verify`` with email, mobile and access URL;
       stores the returned request id and status.
    2. ``createUpdateTenantverificationStatusApi`` --
       ``/updateTenantVerificationStatus`` for that request id.
    3. ``createTenantverify`` -- ``/verificationStatus``, polled while the
       reported status is ``"INITIATED"``.

    Any failed step (and the final outcome) is reported through ``sendemail``.
    The code is written in the subset of Python that runs unchanged on both
    Python 2.6+ and Python 3 (single-argument ``print(...)``, ``except ... as``).
    """

    BASE_URL = "http://www.unicommerce.info/client"
    # Seconds to wait for any single HTTP call before giving up; without a
    # timeout Requests blocks forever on an unresponsive server.
    REQUEST_TIMEOUT = 30
    # Pause between status polls, and the overall polling deadline (seconds).
    # NOTE(review): both values are guesses -- tune to how long tenant
    # creation really takes on the target environment.
    POLL_INTERVAL = 5
    POLL_TIMEOUT = 1800

    def __init__(self, email, mobile, accessurl):
        """Store the sign-up details; ``accessurl`` gets '.testapp.com' appended."""
        threading.Thread.__init__(self)
        self.email = email
        self.mobile = mobile
        self.accessurl = accessurl + '.testapp.com'
        self.headers = {'Content-type': 'application/json'}
        # None (not {}) so the "request id is null" guard in step 2 can fire.
        self.request_id = None
        self.status = None
        # NOTE(review): credentials are hard-coded in source; move them to
        # configuration or environment variables before real use.
        self.emailLogin = 'ashish@gmail.com'
        self.emailPassword = '123@123'
        self.to_addr_list = ['piyush@gmail.com', 'sunny@gmail.com', 'ashish@gmail.com']
        self.cc_addr_list = ['ashish@gmail.com']
        self.subject = 'Tenant Creation Sanity Daily Status'
        self.message = ''
        self.from_addr = 'ashish@gmail.com'
        # Seconds spent polling in step 3; reported in the status mail.
        self.tenanttime = 0

    def run(self):
        """Thread entry point: execute the whole flow starting at step 1."""
        print("Starting " + self.accessurl)
        self.createClientverifyApi()
        print("Exiting " + self.accessurl)

    # ------------------------------------------------------------------ helpers

    def _post(self, endpoint, body):
        """POST ``body`` as JSON to ``BASE_URL/endpoint`` and return the response."""
        return requests.post("%s/%s" % (self.BASE_URL, endpoint),
                             data=json.dumps(body),
                             headers=self.headers,
                             timeout=self.REQUEST_TIMEOUT)

    @staticmethod
    def _parse_json(text):
        """Return the JSON object encoded in ``text``, or ``{}`` if there is none.

        A 200 reply is not guaranteed to carry a JSON object (e.g. a proxy
        error page), so malformed bodies are treated as "no data" instead of
        raising inside the worker thread.
        """
        try:
            parsed = json.loads(text)
        except ValueError:
            return {}
        return parsed if isinstance(parsed, dict) else {}

    @staticmethod
    def _status_dto(parsed):
        """Return the ``tenantStatusDTO`` object of a parsed reply, or ``{}``."""
        dto = parsed.get("tenantStatusDTO")
        return dto if isinstance(dto, dict) else {}

    def _recipients(self):
        """Return To + Cc addresses, de-duplicated, in their original order."""
        unique = []
        for address in self.to_addr_list + self.cc_addr_list:
            if address not in unique:
                unique.append(address)
        return unique

    def _build_message(self, res):
        """Return the full status mail (headers, blank line, body) for ``res``."""
        body = ' Hello All\n\n Total time taken in Tenant Creation : %d sec\n\n Tenant Name : %s \n Tenant Status: %s ' \
               '\n\n Tenant creation Response Logs\n\n %s' % (self.tenanttime, self.accessurl, self.status, res)
        header = 'From: %s\n' % self.from_addr
        header += 'To: %s\n' % ','.join(self.to_addr_list)
        header += 'Cc: %s\n' % ','.join(self.cc_addr_list)
        header += 'Subject: %s\n\n' % self.subject
        return header + body

    # -------------------------------------------------------------- flow steps

    def createClientverifyApi(self):
        """Step 1: register the client; on success continue with step 2.

        On a non-200 status or an unsuccessful reply the response text is
        mailed out and the flow stops.
        """
        try:
            response = self._post("verify", {"email": self.email,
                                             "mobile": self.mobile,
                                             "accessUrl": self.accessurl})
        except requests.RequestException as e:
            # requests.post raises RequestException subclasses (connection
            # errors, timeouts); they carry no ``.code`` attribute.
            print("HTTP ERROR %s occurred" % e)
            return

        if response.status_code != 200:
            print("+++some internal server error in Tenant Create Step 1 api+++")
            print(response.text)
            self.sendemail(response.text)
            return

        parsed = self._parse_json(response.text)
        if parsed.get('successful') is not True:
            print("+++ API Giving False response %s" % response.text)
            self.sendemail(response.text)
            return

        print("+++ Tenant Create Step 1 Passed +++ %s" % response.text)
        dto = self._status_dto(parsed)
        self.request_id = dto.get("requestId")
        self.status = dto.get("status")
        self.createUpdateTenantverificationStatusApi()

    def createUpdateTenantverificationStatusApi(self):
        """Step 2: mark the phone verification as successful; then run step 3.

        Does nothing except log when no request id is available.
        """
        if self.request_id is None:
            print("request id is null")
            return

        try:
            response = self._post("updateTenantVerificationStatus",
                                  {"requestId": self.request_id, "status": "success",
                                   "mobile": self.mobile, "description": "",
                                   "date": "", "keypress": 1})
        except requests.RequestException as e:
            print("HTTP error %s occurred" % e)
            return

        if response.status_code != 200:
            print("+++some error - Phone No verification api Failed++++")
            print(response.text)
            self.sendemail(response.text)
            return

        if self._parse_json(response.text).get('successful') is not True:
            print("+++Phone Verify Api giving false response +++ %s" % response.text)
            self.sendemail(response.text)
            return

        print("+++Phone No verified+++")
        print(response.text)
        self.createTenantverify()

    def createTenantverify(self):
        """Step 3: poll the verification status until it leaves "INITIATED".

        Polling sleeps ``POLL_INTERVAL`` seconds between calls and stops after
        ``POLL_TIMEOUT`` seconds, so a stuck tenant cannot spin this thread
        (or hammer the server) forever. The last response is mailed out with
        the elapsed time either way.
        """
        endpoint = "verificationStatus"
        body = {"requestId": self.request_id}
        try:
            response = self._post(endpoint, body)
            if response.status_code != 200:
                print("++++ Tenant Creation Final Status Failed +++ %s" % response.text)
                self.sendemail(response.text)
                return

            parsed = self._parse_json(response.text)
            if parsed.get('successful') is not True:
                print("+++Verification Status Api giving false response +++ %s" % response.text)
                self.sendemail(response.text)
                return

            started = time.time()
            self.status = self._status_dto(parsed).get("status")
            while self.status == "INITIATED" and time.time() - started < self.POLL_TIMEOUT:
                time.sleep(self.POLL_INTERVAL)
                response = self._post(endpoint, body)
                self.status = self._status_dto(self._parse_json(response.text)).get("status")
                print(self.status)

            self.tenanttime = time.time() - started
            print("++++ Tenant Creation Final Status ++++ %s %s seconds took for tenantcreate"
                  % (response.text, self.tenanttime))
            self.sendemail(response.text)
        except requests.RequestException as e:
            print("HTTP error %s occurred" % e)

    def sendemail(self, res):
        """Mail the current status plus the response text ``res`` via Gmail SMTP.

        The envelope recipients include the Cc addresses: SMTP delivers only
        to the addresses passed to ``sendmail``, not to those merely named in
        the message headers.
        """
        smtpserver = 'smtp.gmail.com:587'
        self.message = self._build_message(res)
        server = smtplib.SMTP(smtpserver)
        try:
            server.starttls()
            server.login(self.emailLogin, self.emailPassword)
            server.sendmail(self.from_addr, self._recipients(), self.message)
            print('mail sent')
        finally:
            # Always release the connection, even when login/sending failed.
            try:
                server.quit()
            except smtplib.SMTPServerDisconnected:
                pass  # connection is already gone; nothing left to close

if __name__ == '__main__':
    # Sign-up details are generated rather than prompted for: a random
    # 10-digit mobile number, a fixed access URL prefix and a timestamped
    # e-mail address. Replace these with raw_input() prompts to enter them
    # by hand.
    mobile = random.randrange(1000000000, 7999999999)
    accessUrl = "2a5350-020-staging1"
    email = "".join(["testautomate", strftime("%d%b%H%M%S"), "@test.com"])

    print (" Tenant Getting Created Please be patient!!")

    # The whole API flow runs on the Tenant worker thread.
    t = Tenant(email, mobile, accessUrl)
    t.start()






Comments

Popular posts from this blog

ADD EMPTY HEADER IN JMETER SAMPLE REQUEST

How To Automate Rest API in Postman

Testing SOAP API using JMETER