# -*- coding: utf-8 -*-
"""Tests that drive ``requests`` against the local test ``Server``."""

import os
import pytest
import threading
import requests

from tests.testserver.server import Server

from .utils import override_environ


def test_chunked_upload():
    """An iterator passed as ``data`` is uploaded with chunked encoding.

    Starts a basic-response test server, POSTs an iterator of byte strings
    to it and checks that the request succeeded and that it was sent with
    ``Transfer-Encoding: chunked``.
    """
    close_server = threading.Event()
    server = Server.basic_response_server(wait_to_close_event=close_server)
    data = iter([b'a', b'b', b'c'])

    with server as (host, port):
        url = 'http://{0}:{1}/'.format(host, port)
        try:
            r = requests.post(url, data=data, stream=True)
        finally:
            # Release the server block even when the request raises, so the
            # failure surfaces immediately instead of depending on the
            # server's own cleanup. ``Event.set()`` is idempotent.
            close_server.set()

    assert r.status_code == 200
    assert r.request.headers['Transfer-Encoding'] == 'chunked'


# Maps the prefix of a ``<prefix>_proxy`` environment variable to the URL
# schemes that the variable is expected to apply to.
_schemes_by_var_prefix = [
    ('http', ['http']),
    ('https', ['https']),
    ('all', ['http', 'https']),
]

# Build every (environment variable name, URL scheme) pair to test.
_proxy_combos = []
for prefix, schemes in _schemes_by_var_prefix:
    for scheme in schemes:
        _proxy_combos.append(("{0}_proxy".format(prefix), scheme))

# Also cover the upper-case spelling of each variable (e.g. ``HTTP_PROXY``).
# The comprehension is fully evaluated before the list is extended.
_proxy_combos += [(var.upper(), scheme) for var, scheme in _proxy_combos]


@pytest.mark.parametrize("var,scheme", _proxy_combos)
def test_use_proxy_from_environment(httpbin, var, scheme):
    """A proxy named by environment variable ``var`` is used for ``scheme``.

    :param httpbin: pytest fixture requested by the test; not referenced in
        the body.
    :param var: name of the proxy environment variable to set.
    :param scheme: URL scheme of the request that should go via the proxy.
    """
    url = "{0}://httpbin.org".format(scheme)
    fake_proxy = Server()  # do nothing with the requests; just close the socket
    with fake_proxy as (host, port):
        proxy_url = "socks5://{0}:{1}".format(host, port)
        kwargs = {var: proxy_url}
        with override_environ(**kwargs):
            # fake proxy's lack of response will cause a ConnectionError
            with pytest.raises(requests.exceptions.ConnectionError):
                requests.get(url)

        # the fake proxy received a request
        assert len(fake_proxy.handler_results) == 1

        # it had actual content (not checking for SOCKS protocol for now)
        assert len(fake_proxy.handler_results[0]) > 0