Skip to content

Commit

Permalink
Run black over codebase
Browse files Browse the repository at this point in the history
  • Loading branch information
sjkingo committed Jul 9, 2020
1 parent 238baee commit c753c70
Show file tree
Hide file tree
Showing 25 changed files with 581 additions and 609 deletions.
2 changes: 1 addition & 1 deletion freshdesk/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.3.0'
__version__ = "1.3.0"
5 changes: 2 additions & 3 deletions freshdesk/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
from freshdesk.v2 import api as v2_api


_VERSIONS = {1: v1_api.API,
2: v2_api.API}
_VERSIONS = {1: v1_api.API, 2: v2_api.API}


class FreshdeskAPI(object):
Expand All @@ -21,7 +20,7 @@ def API(domain, api_key, version=2, **kwargs):

# trim the v from a 'v1' or similar
try:
version = version.lstrip('v')
version = version.lstrip("v")
except AttributeError:
pass

Expand Down
203 changes: 92 additions & 111 deletions freshdesk/v1/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,27 @@ def __init__(self, api):
self._api = api

def create_ticket(self, subject, **kwargs):
url = 'helpdesk/tickets.json'
status = kwargs.get('status', 2)
priority = kwargs.get('priority', 1)
cc_emails = ','.join(kwargs.get('cc_emails', []))
url = "helpdesk/tickets.json"
status = kwargs.get("status", 2)
priority = kwargs.get("priority", 1)
cc_emails = ",".join(kwargs.get("cc_emails", []))
ticket_data = {
'subject': subject,
'status': status,
'priority': priority,
"subject": subject,
"status": status,
"priority": priority,
}
ticket_data.update(kwargs)
data = {
'helpdesk_ticket': ticket_data,
'cc_emails': cc_emails,
"helpdesk_ticket": ticket_data,
"cc_emails": cc_emails,
}
return Ticket(**self._api._post(url, data=data)['helpdesk_ticket'])

return Ticket(**self._api._post(url, data=data)["helpdesk_ticket"])

def get_ticket(self, ticket_id):
"""Fetches the ticket for the given ticket ID"""
url = 'helpdesk/tickets/%d.json' % ticket_id
return Ticket(**self._api._get(url)['helpdesk_ticket'])
url = "helpdesk/tickets/%d.json" % ticket_id
return Ticket(**self._api._get(url)["helpdesk_ticket"])

def list_tickets(self, **kwargs):
"""List all tickets, optionally filtered by a view. Specify filters as
Expand All @@ -42,36 +42,36 @@ def list_tickets(self, **kwargs):
Multiple filters are AND'd together.
"""

filter_name = 'all_tickets'
if 'filter_name' in kwargs and kwargs['filter_name'] is not None:
filter_name = kwargs['filter_name']
del kwargs['filter_name']
filter_name = "all_tickets"
if "filter_name" in kwargs and kwargs["filter_name"] is not None:
filter_name = kwargs["filter_name"]
del kwargs["filter_name"]

url = 'helpdesk/tickets/filter/%s?format=json' % filter_name
url = "helpdesk/tickets/filter/%s?format=json" % filter_name
page = 1
tickets = []

# Skip pagination by looping over each page and adding tickets
while True:
this_page = self._api._get(url + '&page=%d' % page, kwargs)
this_page = self._api._get(url + "&page=%d" % page, kwargs)
if len(this_page) == 0:
break
tickets += this_page
page += 1

return [self.get_ticket(t['display_id']) for t in tickets]
return [self.get_ticket(t["display_id"]) for t in tickets]

def list_all_tickets(self):
"""List all tickets, closed or open."""
return self.list_tickets(filter_name='all_tickets')
return self.list_tickets(filter_name="all_tickets")

def list_open_tickets(self):
"""List all new and open tickets."""
return self.list_tickets(filter_name='new_my_open')
return self.list_tickets(filter_name="new_my_open")

def list_deleted_tickets(self):
"""Lists all deleted tickets."""
return self.list_tickets(filter_name='deleted')
return self.list_tickets(filter_name="deleted")


class ContactAPI(object):
Expand All @@ -96,48 +96,42 @@ def list_contacts(self, **kwargs):
"""

url = 'contacts.json?'
if 'query' in kwargs.keys():
filter_query = kwargs.pop('query')
url = "contacts.json?"
if "query" in kwargs.keys():
filter_query = kwargs.pop("query")
url = url + "query={}".format(filter_query)

if 'state' in kwargs.keys():
state_query = kwargs.pop('state')
url = url + "state={}".format(state_query)
if "state" in kwargs.keys():
state_query = kwargs.pop("state")
url = url + "state={}".format(state_query)

if 'letter' in kwargs.keys():
name_query = kwargs.pop('letter')
url = url + "letter={}".format(name_query)
contacts = self._api._get(url)
return [Contact(**c['user']) for c in contacts]
if "letter" in kwargs.keys():
name_query = kwargs.pop("letter")
url = url + "letter={}".format(name_query)

contacts = self._api._get(url)
return [Contact(**c["user"]) for c in contacts]

def create_contact(self, *args, **kwargs):
"""Creates a contact"""
url = 'contacts.json'
contact_data = {
'active': True,
'helpdesk_agent': False,
'description': 'Freshdesk Contact'
}
url = "contacts.json"
contact_data = {"active": True, "helpdesk_agent": False, "description": "Freshdesk Contact"}
contact_data.update(kwargs)
payload = {
'user': contact_data
}
payload = {"user": contact_data}

return Contact(**self._api._post(url, data=payload)["user"])

return Contact(**self._api._post(url, data=payload)['user'])

def make_agent(self, contact_id):
url = 'contacts/%d/make_agent.json' % contact_id
agent = self._api._put(url)['agent']
return self._api.agents.get_agent(agent['id'])
url = "contacts/%d/make_agent.json" % contact_id
agent = self._api._put(url)["agent"]
return self._api.agents.get_agent(agent["id"])

def get_contact(self, contact_id):
url = 'contacts/%d.json' % contact_id
return Contact(**self._api._get(url)['user'])
url = "contacts/%d.json" % contact_id
return Contact(**self._api._get(url)["user"])

def delete_contact(self, contact_id):
url = 'contacts/%d.json' % contact_id
url = "contacts/%d.json" % contact_id
self._api._delete(url)


Expand All @@ -162,32 +156,32 @@ def list_agents(self, **kwargs):
"""

url = 'agents.json?'
if 'query' in kwargs.keys():
filter_query = kwargs.pop('query')
url = "agents.json?"
if "query" in kwargs.keys():
filter_query = kwargs.pop("query")
url = url + "query={}".format(filter_query)

if 'state' in kwargs.keys():
state_query = kwargs.pop('state')
url = url + "state={}".format(state_query)
agents = self._api._get(url)
return [Agent(**a['agent']) for a in agents]
if "state" in kwargs.keys():
state_query = kwargs.pop("state")
url = url + "state={}".format(state_query)

agents = self._api._get(url)
return [Agent(**a["agent"]) for a in agents]

def get_agent(self, agent_id):
"""Fetches the agent for the given agent ID"""
url = 'agents/%s.json' % agent_id
return Agent(**self._api._get(url)['agent'])
"""Fetches the agent for the given agent ID"""
url = "agents/%s.json" % agent_id
return Agent(**self._api._get(url)["agent"])

def update_agent(self, agent_id, **kwargs):
"""Updates an agent"""
url = 'agents/%s.json' % agent_id
agent = self._api._put(url, data=json.dumps(kwargs))['agent']
url = "agents/%s.json" % agent_id
agent = self._api._put(url, data=json.dumps(kwargs))["agent"]
return Agent(**agent)

def delete_agent(self, agent_id):
"""Delete the agent for the given agent ID"""
url = 'agents/%d.json' % agent_id
url = "agents/%d.json" % agent_id
self._api._delete(url)


Expand All @@ -196,8 +190,8 @@ def __init__(self, api):
self._api = api

def get_customer(self, company_id):
url = 'customers/%s.json' % company_id
return Customer(**self._api._get(url)['customer'])
url = "customers/%s.json" % company_id
return Customer(**self._api._get(url)["customer"])

def get_customer_from_contact(self, contact):
return self.get_customer(contact.customer_id)
Expand All @@ -208,24 +202,24 @@ def __init__(self, api):
self._api = api

def get_all_timesheets(self, **kwargs):
url = 'helpdesk/time_sheets.json'
url = "helpdesk/time_sheets.json"
if "filter_name" in kwargs.keys() and "filter_value" in kwargs.keys():
url = url + "?{}={}".format(kwargs["filter_name"], kwargs["filter_value"])
l = []
timesheet_data = self._api._get(url)
i = 0
while (i < len(timesheet_data)):
l.append(TimeEntry(**timesheet_data[i]['time_entry']))
while i < len(timesheet_data):
l.append(TimeEntry(**timesheet_data[i]["time_entry"]))
i = i + 1
return l

def get_timesheet_by_ticket(self, ticket_id):
url = 'helpdesk/tickets/%s/time_sheets.json' % ticket_id
url = "helpdesk/tickets/%s/time_sheets.json" % ticket_id
l = []
timesheet_data = self._api._get(url)
i = 0
while (i < len(timesheet_data)):
l.append(TimeEntry(**timesheet_data[i]['time_entry']))
while i < len(timesheet_data):
l.append(TimeEntry(**timesheet_data[i]["time_entry"]))
i = i + 1
return l

Expand All @@ -242,9 +236,9 @@ def __init__(self, domain, api_key):
.tickets: the Ticket API
"""

self._api_prefix = 'https://{}/'.format(domain.rstrip('/'))
self.auth = (api_key, 'X')
self.headers = {'Content-Type': 'application/json'}
self._api_prefix = "https://{}/".format(domain.rstrip("/"))
self.auth = (api_key, "X")
self.headers = {"Content-Type": "application/json"}

self.tickets = TicketAPI(self)
self.contacts = ContactAPI(self)
Expand All @@ -254,41 +248,27 @@ def __init__(self, domain, api_key):

def _get(self, url, params={}):
"""Wrapper around request.get() to use the API prefix. Returns a JSON response."""
r = requests.get(self._api_prefix + url,
params=params,
headers=self.headers,
auth=self.auth,
)
r = requests.get(self._api_prefix + url, params=params, headers=self.headers, auth=self.auth,)
return self._action(r)

def _post(self, url, data={}):
"""Wrapper around request.post() to use the API prefix. Returns a JSON response."""
r = requests.post(self._api_prefix + url,
data=json.dumps(data),
headers=self.headers,
auth=self.auth,
allow_redirects=False,
r = requests.post(
self._api_prefix + url, data=json.dumps(data), headers=self.headers, auth=self.auth, allow_redirects=False,
)
return self._action(r)
return self._action(r)

def _put(self, url, data={}):
"""Wrapper around request.put() to use the API prefix. Returns a JSON response."""
r = requests.put(self._api_prefix + url,
data=json.dumps(data),
headers=self.headers,
auth=self.auth,
allow_redirects=False,
r = requests.put(
self._api_prefix + url, data=json.dumps(data), headers=self.headers, auth=self.auth, allow_redirects=False,
)
return self._action(r)

def _delete(self, url):
"""Wrapper around request.delete() to use the API prefix. Returns a JSON response."""
r = requests.delete(self._api_prefix + url,
headers=self.headers,
auth=self.auth,
allow_redirects=False,
)
return self._action(r)
r = requests.delete(self._api_prefix + url, headers=self.headers, auth=self.auth, allow_redirects=False,)
return self._action(r)

def _action(self, res):
"""Returns JSON response or raise exception if errors are present"""
Expand All @@ -298,16 +278,17 @@ def _action(self, res):
res.raise_for_status()
j = {}

if 'Retry-After' in res.headers:
raise HTTPError('403 Forbidden: API rate-limit has been reached until {}.'
'See http://freshdesk.com/api#ratelimit'.format(res.headers['Retry-After']))
if "Retry-After" in res.headers:
raise HTTPError(
"403 Forbidden: API rate-limit has been reached until {}."
"See http://freshdesk.com/api#ratelimit".format(res.headers["Retry-After"])
)

if 'require_login' in j:
raise HTTPError('403 Forbidden: API key is incorrect for this domain')
if "require_login" in j:
raise HTTPError("403 Forbidden: API key is incorrect for this domain")

if 'error' in j:
raise HTTPError('{}: {}'.format(j.get('description'),
j.get('errors')))
if "error" in j:
raise HTTPError("{}: {}".format(j.get("description"), j.get("errors")))

# Catch any other errors
try:
Expand Down
Loading

0 comments on commit c753c70

Please sign in to comment.