mirror of
https://github.com/kennethreitz/requests.git
synced 2026-06-05 14:50:16 +00:00
57 lines
1.8 KiB
Python
57 lines
1.8 KiB
Python
import os
|
|
import pytest
|
|
import threading
|
|
import requests
|
|
|
|
from tests.testserver.server import Server
|
|
|
|
from .utils import override_environ
|
|
|
|
|
|
def test_chunked_upload():
    """An iterator request body is sent using chunked transfer encoding."""
    release_server = threading.Event()
    chunks = iter([b'a', b'b', b'c'])
    server = Server.basic_response_server(wait_to_close_event=release_server)

    with server as (host, port):
        response = requests.post(
            'http://{0}:{1}/'.format(host, port),
            data=chunks,
            stream=True,
        )
        # Signal the event the server was created with so it stops blocking.
        release_server.set()

    assert response.status_code == 200
    assert response.request.headers['Transfer-Encoding'] == 'chunked'
|
|
|
|
|
|
# Each proxy environment-variable prefix paired with the URL schemes
# that are exercised for it.
_schemes_by_var_prefix = [
    ('http', ['http']),
    ('https', ['https']),
    ('all', ['http', 'https']),
]

# Lower-case (variable name, scheme) pairs, e.g. ('http_proxy', 'http').
_proxy_combos = [
    ("{0}_proxy".format(prefix), scheme)
    for prefix, schemes in _schemes_by_var_prefix
    for scheme in schemes
]

# Append the upper-case spelling of every variable name. The list is built
# in full before extend() runs, so the list is never grown while iterated.
_proxy_combos.extend(
    [(var.upper(), scheme) for var, scheme in _proxy_combos]
)
|
|
|
|
|
|
@pytest.mark.parametrize("var,scheme", _proxy_combos)
def test_use_proxy_from_environment(httpbin, var, scheme):
    """A proxy URL set in the environment variable ``var`` is contacted.

    ``var`` is the environment variable name under test and ``scheme`` is
    the URL scheme of the request that is issued.
    """
    target_url = "{0}://httpbin.org".format(scheme)
    # do nothing with the requests; just close the socket
    fake_proxy = Server()

    with fake_proxy as (host, port):
        env_overrides = {var: "socks5://{0}:{1}".format(host, port)}

        # fake proxy's lack of response will cause a ConnectionError
        with override_environ(**env_overrides), \
                pytest.raises(requests.exceptions.ConnectionError):
            requests.get(target_url)

        received = fake_proxy.handler_results

        # the fake proxy received a request
        assert len(received) == 1

        # it had actual content (not checking for SOCKS protocol for now)
        assert len(received[0]) > 0
|