mirror of
https://github.com/kennethreitz/requests.git
synced 2026-06-05 22:50:18 +00:00
remove all old tests
This commit is contained in:
+2
-2
@@ -4,7 +4,7 @@ python:
|
||||
- 2.7
|
||||
- 3.2
|
||||
env: HTTPBIN_URL=http://httpbin.org/
|
||||
script: make travis
|
||||
script: make test
|
||||
install:
|
||||
- pip install nose
|
||||
- make test-deps
|
||||
- pip install . --use-mirrors
|
||||
|
||||
@@ -1,34 +1,10 @@
|
||||
SHELL := /bin/bash
|
||||
|
||||
# test_requests_ext.py depends on external services, and async doesn't work under Python 3
|
||||
# Travis/Jenkins should be ensuring that all other tests pass on all supported versions
|
||||
CI_TESTS=$(shell find tests/ -name "*.py" ! -name "test_requests_ext.py" ! -name "test_requests_async.py")
|
||||
|
||||
iter:
|
||||
python test_iteration.py
|
||||
|
||||
init:
|
||||
python setup.py develop
|
||||
pip install -r requirements.txt
|
||||
|
||||
test:
|
||||
nosetests ./tests/*
|
||||
py.test
|
||||
|
||||
lazy:
|
||||
nosetests --with-color tests/test_requests.py
|
||||
|
||||
simple:
|
||||
nosetests tests/test_requests.py
|
||||
|
||||
citests:
|
||||
nosetests ${CI_TESTS} --with-xunit --xunit-file=junit-report.xml
|
||||
|
||||
ci: citests cipyflakes
|
||||
|
||||
travis: citests
|
||||
|
||||
server:
|
||||
gunicorn httpbin:app --bind=0.0.0.0:7077 &
|
||||
test-deps:
|
||||
pip install py.test
|
||||
|
||||
deps: urllib3 certs charade
|
||||
|
||||
@@ -47,6 +23,4 @@ charade:
|
||||
rm -fr charade
|
||||
|
||||
certs:
|
||||
cd requests && curl -O https://raw.github.com/kennethreitz/certifi/master/certifi/cacert.pem
|
||||
|
||||
docs: site
|
||||
cd requests && curl -O https://raw.github.com/kennethreitz/certifi/master/certifi/cacert.pem
|
||||
+2
-2
@@ -1,2 +1,2 @@
|
||||
nose
|
||||
rudolf2
|
||||
py.test
|
||||
sphinx
|
||||
@@ -1,29 +0,0 @@
|
||||
import requests
|
||||
import unittest
|
||||
|
||||
|
||||
class IterationTestCase(unittest.TestCase):
|
||||
|
||||
def test_assertion(self):
|
||||
assert 1
|
||||
|
||||
# def test_dzubia(self):
|
||||
# s = requests.Session()
|
||||
# r = requests.Request(method='GET', url='http://github.com/')
|
||||
|
||||
# # r = s.send(r)
|
||||
|
||||
def test_prepared_request(self):
|
||||
s = requests.Session()
|
||||
r = requests.Request(method='GET', url='http://github.com/')
|
||||
r = r.prepare()
|
||||
|
||||
r = s.send(r)
|
||||
print r
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -1,46 +0,0 @@
|
||||
import asyncore
|
||||
import threading
|
||||
import socket
|
||||
|
||||
class HttpServer(threading.Thread):
|
||||
def __init__(self, port):
|
||||
threading.Thread.__init__(self)
|
||||
self.dispatcher = HttpServerDispatcher(port)
|
||||
|
||||
def run(self):
|
||||
asyncore.loop()
|
||||
|
||||
@property
|
||||
def connection_count(self):
|
||||
return self.dispatcher.connection_count
|
||||
|
||||
def close(self):
|
||||
asyncore.close_all()
|
||||
|
||||
class HttpServerDispatcher(asyncore.dispatcher):
|
||||
def __init__(self, port):
|
||||
asyncore.dispatcher.__init__(self)
|
||||
self.connected = False
|
||||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.bind(('127.0.0.1', port))
|
||||
self.listen(1)
|
||||
self.connection_count = 0
|
||||
|
||||
def handle_accept(self):
|
||||
self.connection_count += 1
|
||||
self.handler = RequestHandler(self.accept()[0])
|
||||
|
||||
def handle_close(self):
|
||||
self.close()
|
||||
|
||||
|
||||
class RequestHandler(asyncore.dispatcher_with_send):
|
||||
def __init__(self, sock):
|
||||
asyncore.dispatcher_with_send.__init__(self, sock)
|
||||
self.response = ("HTTP/1.1 200 OK\r\n"
|
||||
"Connection: keep-alive\r\n"
|
||||
"Content-Length: 0\r\n\r\n")
|
||||
|
||||
def handle_read(self):
|
||||
self.recv(1024)
|
||||
self.send(self.response)
|
||||
@@ -1,31 +0,0 @@
|
||||
"""
|
||||
This is an informal test originally written by Bluehorn;
|
||||
it verifies that Requests does not leak connections when
|
||||
the body of the request is not read.
|
||||
"""
|
||||
|
||||
import gc
|
||||
import os
|
||||
import requests
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
def main():
|
||||
gc.disable()
|
||||
|
||||
for x in range(20):
|
||||
requests.head("http://www.google.com/")
|
||||
|
||||
print("Open sockets after 20 head requests:")
|
||||
pid = os.getpid()
|
||||
subprocess.call("lsof -p%d -a -iTCP" % (pid,), shell=True)
|
||||
|
||||
gcresult = gc.collect()
|
||||
print("Garbage collection result: %s" % (gcresult,))
|
||||
|
||||
print("Open sockets after garbage collection:")
|
||||
subprocess.call("lsof -p%d -a -iTCP" % (pid,), shell=True)
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
||||
@@ -1,244 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
# Path hack.
|
||||
sys.path.insert(0, os.path.abspath('..'))
|
||||
import requests
|
||||
from requests.compat import cookielib
|
||||
|
||||
# More hacks
|
||||
sys.path.append('.')
|
||||
from test_requests import httpbin, TestBaseMixin
|
||||
|
||||
|
||||
class CookieTests(TestBaseMixin, unittest.TestCase):
|
||||
|
||||
def test_cookies_from_response(self):
|
||||
"""Basic test that we correctly parse received cookies in the Response object."""
|
||||
r = requests.get(httpbin('cookies', 'set', 'myname', 'myvalue'))
|
||||
|
||||
# test deprecated dictionary interface
|
||||
self.assertEqual(r.cookies['myname'], 'myvalue')
|
||||
self.assertEqual(r.cookies.get('myname'), 'myvalue')
|
||||
# test CookieJar interface
|
||||
jar = r.cookies
|
||||
self.assertEqual(len(jar), 1)
|
||||
cookie_from_jar = list(jar)[0]
|
||||
self.assertCookieHas(cookie_from_jar, name='myname', value='myvalue')
|
||||
|
||||
q = requests.get(httpbin('cookies'), cookies=jar)
|
||||
self.assertEqual(json.loads(q.text)['cookies'], {'myname': 'myvalue'})
|
||||
|
||||
def test_crossdomain_cookies(self):
|
||||
"""Cookies should not be sent to domains they didn't originate from."""
|
||||
r = requests.get("http://github.com")
|
||||
c = r.cookies
|
||||
# github should send us cookies
|
||||
self.assertTrue(len(c) >= 1)
|
||||
|
||||
# github cookies should not be sent to httpbin.org:
|
||||
r2 = requests.get(httpbin('cookies'), cookies=c)
|
||||
self.assertEqual(json.loads(r2.text)['cookies'], {})
|
||||
|
||||
# let's do this again using the session object
|
||||
s = requests.session()
|
||||
s.get("http://github.com")
|
||||
self.assertTrue(len(s.cookies) >= 1)
|
||||
r = s.get(httpbin('cookies'))
|
||||
self.assertEqual(json.loads(r.text)['cookies'], {})
|
||||
# we can set a cookie and get exactly that same-domain cookie back:
|
||||
r = s.get(httpbin('cookies', 'set', 'myname', 'myvalue'))
|
||||
self.assertEqual(json.loads(r.text)['cookies'], {'myname': 'myvalue'})
|
||||
|
||||
def test_overwrite(self):
|
||||
"""Cookies should get overwritten when appropriate."""
|
||||
r = requests.get(httpbin('cookies', 'set', 'shimon', 'yochai'))
|
||||
cookies = r.cookies
|
||||
requests.get(httpbin('cookies', 'set', 'elazar', 'shimon'), cookies=cookies)
|
||||
r = requests.get(httpbin('cookies'), cookies=cookies)
|
||||
self.assertEqual(json.loads(r.text)['cookies'],
|
||||
{'shimon': 'yochai', 'elazar': 'shimon'})
|
||||
# overwrite the value of 'shimon'
|
||||
r = requests.get(httpbin('cookies', 'set', 'shimon', 'gamaliel'), cookies=cookies)
|
||||
self.assertEqual(len(cookies), 2)
|
||||
r = requests.get(httpbin('cookies'), cookies=cookies)
|
||||
self.assertEqual(json.loads(r.text)['cookies'],
|
||||
{'shimon': 'gamaliel', 'elazar': 'shimon'})
|
||||
|
||||
def test_redirects(self):
|
||||
"""Test that cookies set by a 302 page are correctly processed."""
|
||||
r = requests.get(httpbin('cookies', 'set', 'redirects', 'work'))
|
||||
self.assertEqual(r.history[0].status_code, 302)
|
||||
expected_cookies = {'redirects': 'work'}
|
||||
self.assertEqual(json.loads(r.text)['cookies'], expected_cookies)
|
||||
|
||||
r2 = requests.get(httpbin('cookies', 'set', 'very', 'well'), cookies=r.cookies)
|
||||
expected_cookies = {'redirects': 'work', 'very': 'well'}
|
||||
self.assertEqual(json.loads(r2.text)['cookies'], expected_cookies)
|
||||
self.assertTrue(r.cookies is r2.cookies)
|
||||
|
||||
def test_none_cookie(self):
|
||||
"""Regression test: don't send a Cookie header with a string value of 'None'!"""
|
||||
page = json.loads(requests.get(httpbin('headers')).text)
|
||||
self.assertTrue('Cookie' not in page['headers'])
|
||||
|
||||
def test_secure_cookies(self):
|
||||
"""Test that secure cookies can only be sent via https."""
|
||||
header = "Set-Cookie: ThisIsA=SecureCookie; Path=/; Secure; HttpOnly"
|
||||
url = 'https://httpbin.org/response-headers?%s' % (requests.utils.quote(header),)
|
||||
cookies = requests.get(url, verify=False).cookies
|
||||
self.assertEqual(len(cookies), 1)
|
||||
self.assertEqual(list(cookies)[0].secure, True)
|
||||
|
||||
secure_resp = requests.get('https://httpbin.org/cookies', cookies=cookies, verify=False)
|
||||
secure_cookies_sent = json.loads(secure_resp.text)['cookies']
|
||||
self.assertEqual(secure_cookies_sent, {'ThisIsA': 'SecureCookie'})
|
||||
|
||||
insecure_resp = requests.get('http://httpbin.org/cookies', cookies=cookies)
|
||||
insecure_cookies_sent = json.loads(insecure_resp.text)['cookies']
|
||||
self.assertEqual(insecure_cookies_sent, {})
|
||||
|
||||
def test_jar_utility_functions(self):
|
||||
"""Test utility functions such as list_domains, list_paths, multiple_domains."""
|
||||
r = requests.get("http://github.com")
|
||||
c = r.cookies
|
||||
# github should send us cookies
|
||||
self.assertTrue(len(c) >= 1)
|
||||
self.assertEqual(len(c), len(r.cookies.keys()))
|
||||
self.assertEqual(len(c), len(r.cookies.values()))
|
||||
self.assertEqual(len(c), len(r.cookies.items()))
|
||||
|
||||
# domain and path utility functions
|
||||
domain = r.cookies.list_domains()[0]
|
||||
path = r.cookies.list_paths()[0]
|
||||
self.assertEqual(dict(r.cookies), r.cookies.get_dict(domain=domain, path=path))
|
||||
self.assertEqual(len(r.cookies.list_domains()), 1)
|
||||
self.assertEqual(len(r.cookies.list_paths()), 1)
|
||||
self.assertFalse(r.cookies.multiple_domains())
|
||||
|
||||
def test_convert_jar_to_dict(self):
|
||||
"""Test that keys, values, and items are defined and that we can convert
|
||||
cookie jars to plain old Python dicts."""
|
||||
r = requests.get(httpbin('cookies', 'set', 'myname', 'myvalue'))
|
||||
|
||||
# test keys, values, and items
|
||||
self.assertEqual(r.cookies.keys(), ['myname'])
|
||||
self.assertEqual(r.cookies.values(), ['myvalue'])
|
||||
self.assertEqual(r.cookies.items(), [('myname', 'myvalue')])
|
||||
|
||||
# test if we can convert jar to dict
|
||||
dictOfCookies = dict(r.cookies)
|
||||
self.assertEqual(dictOfCookies, {'myname': 'myvalue'})
|
||||
self.assertEqual(dictOfCookies, r.cookies.get_dict())
|
||||
|
||||
|
||||
class LWPCookieJarTest(TestBaseMixin, unittest.TestCase):
|
||||
"""Check store/load of cookies to FileCookieJar's, specifically LWPCookieJar's."""
|
||||
|
||||
COOKIEJAR_CLASS = cookielib.LWPCookieJar
|
||||
|
||||
def setUp(self):
|
||||
# blank the file
|
||||
self.cookiejar_file = tempfile.NamedTemporaryFile()
|
||||
self.cookiejar_filename = self.cookiejar_file.name
|
||||
cookiejar = self.COOKIEJAR_CLASS(self.cookiejar_filename)
|
||||
cookiejar.save()
|
||||
|
||||
def tearDown(self):
|
||||
try:
|
||||
self.cookiejar_file.close()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def test_cookiejar_persistence(self):
|
||||
"""Test that we can save cookies to a FileCookieJar."""
|
||||
cookiejar = self.COOKIEJAR_CLASS(self.cookiejar_filename)
|
||||
cookiejar.load()
|
||||
# initially should be blank
|
||||
self.assertEqual(len(cookiejar), 0)
|
||||
|
||||
response = requests.get(httpbin('cookies', 'set', 'key', 'value'), cookies=cookiejar)
|
||||
self.assertEqual(len(cookiejar), 1)
|
||||
cookie = list(cookiejar)[0]
|
||||
self.assertEqual(json.loads(response.text)['cookies'], {'key': 'value'})
|
||||
self.assertCookieHas(cookie, name='key', value='value')
|
||||
|
||||
# save and reload the cookies from the file:
|
||||
cookiejar.save(ignore_discard=True)
|
||||
cookiejar_2 = self.COOKIEJAR_CLASS(self.cookiejar_filename)
|
||||
cookiejar_2.load(ignore_discard=True)
|
||||
self.assertEqual(len(cookiejar_2), 1)
|
||||
cookie_2 = list(cookiejar_2)[0]
|
||||
# this cookie should have been saved with the correct domain restriction:
|
||||
self.assertCookieHas(cookie_2, name='key', value='value',
|
||||
domain='httpbin.org', path='/')
|
||||
|
||||
# httpbin sets session cookies, so if we don't ignore the discard attribute,
|
||||
# there should be no cookie:
|
||||
cookiejar_3 = self.COOKIEJAR_CLASS(self.cookiejar_filename)
|
||||
cookiejar_3.load()
|
||||
self.assertEqual(len(cookiejar_3), 0)
|
||||
|
||||
def test_crossdomain(self):
|
||||
"""Test persistence of the domains associated with the cookies."""
|
||||
cookiejar = self.COOKIEJAR_CLASS(self.cookiejar_filename)
|
||||
cookiejar.load()
|
||||
self.assertEqual(len(cookiejar), 0)
|
||||
|
||||
# github sets a cookie
|
||||
requests.get("http://github.com", cookies=cookiejar)
|
||||
num_github_cookies = len(cookiejar)
|
||||
self.assertTrue(num_github_cookies >= 1)
|
||||
# httpbin sets another
|
||||
requests.get(httpbin('cookies', 'set', 'key', 'value'), cookies=cookiejar)
|
||||
num_total_cookies = len(cookiejar)
|
||||
self.assertTrue(num_total_cookies >= 2)
|
||||
self.assertTrue(num_total_cookies > num_github_cookies)
|
||||
|
||||
# save and load
|
||||
cookiejar.save(ignore_discard=True)
|
||||
cookiejar_2 = self.COOKIEJAR_CLASS(self.cookiejar_filename)
|
||||
cookiejar_2.load(ignore_discard=True)
|
||||
self.assertEqual(len(cookiejar_2), num_total_cookies)
|
||||
r = requests.get(httpbin('cookies'), cookies=cookiejar_2)
|
||||
self.assertEqual(json.loads(r.text)['cookies'], {'key': 'value'})
|
||||
|
||||
def test_persistent_cookies(self):
|
||||
"""Test that we correctly interpret persistent cookies."""
|
||||
# httpbin's normal cookie methods don't send persistent cookies,
|
||||
# so cook up the appropriate header and force it to send
|
||||
header = "Set-Cookie: Persistent=CookiesAreScary; expires=Sun, 04-May-2032 04:56:50 GMT; path=/"
|
||||
url = httpbin('response-headers?%s' % (requests.utils.quote(header),))
|
||||
cookiejar = self.COOKIEJAR_CLASS(self.cookiejar_filename)
|
||||
|
||||
requests.get(url, cookies=cookiejar)
|
||||
self.assertEqual(len(cookiejar), 1)
|
||||
self.assertCookieHas(list(cookiejar)[0], name='Persistent', value='CookiesAreScary')
|
||||
|
||||
requests.get(httpbin('cookies', 'set', 'ThisCookieIs', 'SessionOnly'), cookies=cookiejar)
|
||||
self.assertEqual(len(cookiejar), 2)
|
||||
self.assertEqual(len([c for c in cookiejar if c.name == 'Persistent']), 1)
|
||||
self.assertEqual(len([c for c in cookiejar if c.name == 'ThisCookieIs']), 1)
|
||||
|
||||
# save and load
|
||||
cookiejar.save()
|
||||
cookiejar_2 = self.COOKIEJAR_CLASS(self.cookiejar_filename)
|
||||
cookiejar_2.load()
|
||||
# we should only load the persistent cookie
|
||||
self.assertEqual(len(cookiejar_2), 1)
|
||||
self.assertCookieHas(list(cookiejar_2)[0], name='Persistent', value='CookiesAreScary')
|
||||
|
||||
|
||||
class MozCookieJarTest(LWPCookieJarTest):
|
||||
"""Same test, but substitute MozillaCookieJar."""
|
||||
|
||||
COOKIEJAR_CLASS = cookielib.MozillaCookieJar
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -1,65 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
# Path hack.
|
||||
sys.path.insert(0, os.path.abspath('..'))
|
||||
import requests
|
||||
import dummy_server
|
||||
|
||||
class KeepAliveTests(unittest.TestCase):
|
||||
server_and_proxy_port = 1234
|
||||
request_count = 2
|
||||
url = 'http://localhost:{0}'.format(server_and_proxy_port)
|
||||
proxies={'http': url}
|
||||
|
||||
def setUp(self):
|
||||
self.session = requests.session()
|
||||
self.proxy_server = dummy_server.HttpServer(self.server_and_proxy_port)
|
||||
self.proxy_server.start()
|
||||
|
||||
def tearDown(self):
|
||||
self.proxy_server.close()
|
||||
|
||||
def test_keep_alive_with_direct_connection(self):
|
||||
self.make_requests()
|
||||
self.check_each_request_are_in_same_connection()
|
||||
|
||||
def test_no_keep_alive_with_direct_connection(self):
|
||||
self.disable_keep_alive_in_session()
|
||||
self.make_requests()
|
||||
self.check_each_request_are_in_different_connection()
|
||||
|
||||
def test_keep_alive_with_proxy_connection(self):
|
||||
self.make_proxy_requests()
|
||||
self.check_each_request_are_in_same_connection()
|
||||
|
||||
def test_no_keep_alive_with_proxy_connection(self):
|
||||
self.disable_keep_alive_in_session()
|
||||
self.make_proxy_requests()
|
||||
self.check_each_request_are_in_different_connection()
|
||||
|
||||
def make_proxy_requests(self):
|
||||
self.make_requests(self.proxies)
|
||||
|
||||
def make_requests(self, proxies=None):
|
||||
for _ in range(self.request_count):
|
||||
self.session.get(self.url, proxies=proxies).text
|
||||
|
||||
def check_each_request_are_in_same_connection(self):
|
||||
"""Keep-alive requests open a single connection to the server."""
|
||||
self.assertEqual(self.proxy_server.connection_count, 1)
|
||||
|
||||
def check_each_request_are_in_different_connection(self):
|
||||
"""Keep-alive requests open a single connection to the server."""
|
||||
self.assertEqual(self.proxy_server.connection_count, self.request_count)
|
||||
|
||||
def disable_keep_alive_in_session(self):
|
||||
self.session.config['keep_alive'] = False
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -1,29 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
# Path hack.
|
||||
sys.path.insert(0, os.path.abspath('..'))
|
||||
import requests
|
||||
|
||||
|
||||
class HTTPSProxyTest(unittest.TestCase):
|
||||
"""Smoke test for https functionality."""
|
||||
|
||||
smoke_url = "https://github.com"
|
||||
|
||||
def test_empty_https_proxy(self):
|
||||
proxy = {"https": ""}
|
||||
result = requests.get(self.smoke_url, verify=False, proxies=proxy)
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
def test_empty_http_proxy(self):
|
||||
proxy = {"http": ""}
|
||||
result = requests.get(self.smoke_url, proxies=proxy)
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,124 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Path hack.
|
||||
import os
|
||||
import sys
|
||||
sys.path.insert(0, os.path.abspath('..'))
|
||||
|
||||
import unittest
|
||||
|
||||
import requests
|
||||
from requests.compat import is_py2, is_py3
|
||||
|
||||
try:
|
||||
import omnijson as json
|
||||
except ImportError:
|
||||
import json
|
||||
|
||||
|
||||
class RequestsTestSuite(unittest.TestCase):
|
||||
"""Requests test cases."""
|
||||
|
||||
# It goes to eleven.
|
||||
_multiprocess_can_split_ = True
|
||||
|
||||
def test_addition(self):
|
||||
self.assertEqual(2, 1 + 1)
|
||||
|
||||
def test_ssl_hostname_ok(self):
|
||||
requests.get('https://github.com', verify=True)
|
||||
|
||||
def test_ssl_hostname_not_ok(self):
|
||||
requests.get('https://kennethreitz.com', verify=False)
|
||||
|
||||
self.assertRaises(requests.exceptions.SSLError, requests.get, 'https://kennethreitz.com')
|
||||
|
||||
def test_ssl_hostname_session_not_ok(self):
|
||||
|
||||
s = requests.session()
|
||||
|
||||
self.assertRaises(requests.exceptions.SSLError, s.get, 'https://kennethreitz.com')
|
||||
|
||||
s.get('https://kennethreitz.com', verify=False)
|
||||
|
||||
def test_binary_post(self):
|
||||
'''We need to be careful how we build the utf-8 string since
|
||||
unicode literals are a syntax error in python3
|
||||
'''
|
||||
|
||||
if is_py2:
|
||||
# Blasphemy!
|
||||
utf8_string = eval("u'Smörgås'.encode('utf-8')")
|
||||
elif is_py3:
|
||||
utf8_string = 'Smörgås'.encode('utf-8')
|
||||
else:
|
||||
raise EnvironmentError('Flesh out this test for your environment.')
|
||||
requests.post('http://www.google.com/', data=utf8_string)
|
||||
|
||||
def test_unicode_error(self):
|
||||
url = 'http://blip.fm/~1abvfu'
|
||||
requests.get(url)
|
||||
|
||||
def test_chunked_head_redirect(self):
|
||||
url = "http://t.co/NFrx0zLG"
|
||||
r = requests.head(url, allow_redirects=True)
|
||||
self.assertEqual(r.status_code, 200)
|
||||
|
||||
def test_unicode_redirect(self):
|
||||
'''This url redirects to a location that has a nonstandard
|
||||
character in it, that breaks requests in python2.7
|
||||
|
||||
After some research, the cause was identified as an unintended
|
||||
sideeffect of overriding of str with unicode.
|
||||
|
||||
In the case that the redirected url is actually a malformed
|
||||
"bytes" object, i.e. a string with character c where
|
||||
ord(c) > 127,
|
||||
then unicode(url) breaks.
|
||||
'''
|
||||
r = requests.get('http://www.marketwire.com/mw/release.' +
|
||||
'do?id=1628202&sourceType=3')
|
||||
self.assertTrue(r.ok)
|
||||
|
||||
def test_unicode_url_outright(self):
|
||||
'''This url visits in my browser'''
|
||||
r = requests.get('http://www.marketwire.com/press-release/' +
|
||||
'jp-morgan-behauptet-sich-der-spitze-euro' +
|
||||
'p%C3%A4ischer-anleihe-analysten-laut-umf' +
|
||||
'rageergebnissen-1628202.htm')
|
||||
self.assertTrue(r.ok)
|
||||
|
||||
def test_redirect_encoding(self):
|
||||
'''This url redirects to
|
||||
http://www.dealipedia.com/deal_view_investment.php?r=20012'''
|
||||
|
||||
r = requests.get('http://feedproxy.google.com/~r/Dealipedia' +
|
||||
'News/~3/BQtUJRJeZlo/deal_view_investment.' +
|
||||
'php')
|
||||
self.assertTrue(r.ok)
|
||||
|
||||
def test_cookies_on_redirects(self):
|
||||
"""Test interaction between cookie handling and redirection."""
|
||||
# get a cookie for tinyurl.com ONLY
|
||||
s = requests.session()
|
||||
s.get(url='http://tinyurl.com/preview.php?disable=1')
|
||||
# we should have set a cookie for tinyurl: preview=0
|
||||
self.assertTrue('preview' in s.cookies)
|
||||
self.assertEqual(s.cookies['preview'], '0')
|
||||
self.assertEqual(list(s.cookies)[0].name, 'preview')
|
||||
self.assertEqual(list(s.cookies)[0].domain, 'tinyurl.com')
|
||||
|
||||
# get cookies on another domain
|
||||
r2 = s.get(url='http://httpbin.org/cookies')
|
||||
# the cookie is not there
|
||||
self.assertTrue('preview' not in json.loads(r2.text)['cookies'])
|
||||
|
||||
# this redirects to another domain, httpbin.org
|
||||
# cookies of the first domain should NOT be sent to the next one
|
||||
r3 = s.get(url='http://tinyurl.com/7zp3jnr')
|
||||
self.assertEqual(r3.url, 'http://httpbin.org/cookies')
|
||||
self.assertTrue('preview' not in json.loads(r3.text)['cookies'])
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -1,32 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
# Path hack.
|
||||
sys.path.insert(0, os.path.abspath('..'))
|
||||
import requests
|
||||
|
||||
|
||||
class HTTPSTest(unittest.TestCase):
|
||||
"""Smoke test for https functionality."""
|
||||
|
||||
smoke_url = "https://github.com"
|
||||
|
||||
def perform_smoke_test(self, verify=False):
|
||||
result = requests.get(self.smoke_url, verify=verify)
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
def test_smoke(self):
|
||||
"""Smoke test without verification."""
|
||||
self.perform_smoke_test(verify=False)
|
||||
|
||||
def test_smoke_verified(self):
|
||||
"""Smoke test with SSL verification."""
|
||||
self.perform_smoke_test(verify=True)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -1,122 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import codecs
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
import random
|
||||
|
||||
# Path hack.
|
||||
sys.path.insert(0, os.path.abspath('..'))
|
||||
from requests.utils import get_environ_proxies
|
||||
import requests.utils
|
||||
from requests.compat import is_py3, bytes
|
||||
|
||||
|
||||
if is_py3:
|
||||
unichr = chr
|
||||
byteschr = lambda c: bytes([c])
|
||||
else:
|
||||
byteschr = chr
|
||||
|
||||
|
||||
class UtilityTests(unittest.TestCase):
|
||||
"""Tests for the JSON UTF encoding guessing code."""
|
||||
|
||||
codecs = (
|
||||
'utf-8', 'utf-8-sig',
|
||||
'utf-16', 'utf-16-le', 'utf-16-be',
|
||||
'utf-32', 'utf-32-le', 'utf-32-be'
|
||||
)
|
||||
|
||||
def test_guess_encoding(self):
|
||||
# Throw 4-character ASCII strings (encoded to a UTF encoding)
|
||||
# at the guess routine; it should correctly guess all codecs.
|
||||
guess = requests.utils.guess_json_utf
|
||||
for c in range(33, 127): # printable only
|
||||
sample = unichr(c) * 4
|
||||
for codec in self.codecs:
|
||||
res = guess(sample.encode(codec))
|
||||
self.assertEqual(res, codec)
|
||||
|
||||
def test_smoke_encoding(self):
|
||||
# Throw random 4-byte strings at the guess function.
|
||||
# Any guess for a UTF encoding is verified, a decode exception
|
||||
# is a test failure.
|
||||
guess = requests.utils.guess_json_utf
|
||||
for i in range(1000):
|
||||
sample = bytes().join(
|
||||
[byteschr(random.randrange(256)) for _ in range(4)])
|
||||
res = guess(sample)
|
||||
if res is not None:
|
||||
# This should decode without errors if this is *really*
|
||||
# something in this encoding. However, UTF-8 is a lot
|
||||
# more picky, so we expect errors there. UTF-16 surrogate
|
||||
# pairs also fail
|
||||
try:
|
||||
sample.decode(res)
|
||||
except UnicodeDecodeError as e:
|
||||
self.assertEqual(e.args[0].replace('-', '').lower(),
|
||||
res.replace('-', '').lower())
|
||||
if res == 'utf-8':
|
||||
self.assertTrue(e.args[-1], (
|
||||
'invalid continuation byte',
|
||||
'invalid start byte'))
|
||||
continue
|
||||
if res == 'utf-16':
|
||||
self.assertEqual(e.args[-1], 'unexpected end of data')
|
||||
self.assertTrue(sample[:2] in (
|
||||
codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE))
|
||||
# the second two bytes are in the range \ud800-\udfff
|
||||
# if someone wants to add tests for that as well. I don't
|
||||
# see the need; we are not testing UTF decoding here.
|
||||
continue
|
||||
raise
|
||||
|
||||
def test_get_environ_proxies_respects_no_proxy(self):
|
||||
'''This test confirms that the no_proxy environment setting is
|
||||
respected by get_environ_proxies().'''
|
||||
|
||||
# Store the current environment settings.
|
||||
try:
|
||||
old_http_proxy = os.environ['http_proxy']
|
||||
except KeyError:
|
||||
old_http_proxy = None
|
||||
|
||||
try:
|
||||
old_no_proxy = os.environ['no_proxy']
|
||||
except KeyError:
|
||||
old_no_proxy = None
|
||||
|
||||
# Set up some example environment settings.
|
||||
os.environ['http_proxy'] = 'http://www.example.com/'
|
||||
os.environ['no_proxy'] = r'localhost,.0.0.1:8080'
|
||||
|
||||
# Set up expected proxy return values.
|
||||
proxy_yes = {'http': 'http://www.example.com/'}
|
||||
proxy_no = {}
|
||||
|
||||
# Check that we get the right things back.
|
||||
self.assertEqual(proxy_yes,
|
||||
get_environ_proxies('http://www.google.com/'))
|
||||
self.assertEqual(proxy_no,
|
||||
get_environ_proxies('http://localhost/test'))
|
||||
self.assertEqual(proxy_no,
|
||||
get_environ_proxies('http://127.0.0.1:8080/'))
|
||||
self.assertEqual(proxy_yes,
|
||||
get_environ_proxies('http://127.0.0.1:8081/'))
|
||||
|
||||
# Return the settings to what they were.
|
||||
if old_http_proxy:
|
||||
os.environ['http_proxy'] = old_http_proxy
|
||||
else:
|
||||
del os.environ['http_proxy']
|
||||
|
||||
if old_no_proxy:
|
||||
os.environ['no_proxy'] = old_no_proxy
|
||||
else:
|
||||
del os.environ['no_proxy']
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user