Merge pull request #3979 from schlamar/bypass-proxy

improve proxy bypass on Windows
This commit is contained in:
Cory Benfield
2017-05-04 15:18:30 +01:00
committed by GitHub
6 changed files with 113 additions and 177 deletions
+2
View File
@@ -15,6 +15,8 @@ Release History
resolution on Windows.
- Added ``win_inet_pton`` as conditional dependency for the ``[socks]`` extra
on Windows with Python 2.7.
- Changed the proxy bypass implementation on Windows: the proxy bypass
check doesn't use forward and reverse DNS requests anymore
2.13.0 (2017-01-24)
+++++++++++++++++++
+4 -2
View File
@@ -37,7 +37,9 @@ except (ImportError, SyntaxError):
# ---------
if is_py2:
from urllib import quote, unquote, quote_plus, unquote_plus, urlencode, getproxies, proxy_bypass
from urllib import (
quote, unquote, quote_plus, unquote_plus, urlencode, getproxies,
proxy_bypass, proxy_bypass_environment, getproxies_environment)
from urlparse import urlparse, urlunparse, urljoin, urlsplit, urldefrag
from urllib2 import parse_http_list
import cookielib
@@ -54,7 +56,7 @@ if is_py2:
elif is_py3:
from urllib.parse import urlparse, urlunparse, urljoin, urlsplit, urlencode, quote, unquote, quote_plus, unquote_plus, urldefrag
from urllib.request import parse_http_list, getproxies, proxy_bypass
from urllib.request import parse_http_list, getproxies, proxy_bypass, proxy_bypass_environment, getproxies_environment
from http import cookiejar as cookielib
from http.cookies import Morsel
from io import StringIO
-89
View File
@@ -8,12 +8,9 @@ Data structures that power Requests.
"""
import collections
import time
from .compat import OrderedDict
current_time = getattr(time, 'monotonic', time.time)
class CaseInsensitiveDict(collections.MutableMapping):
"""A case-insensitive ``dict``-like object.
@@ -106,89 +103,3 @@ class LookupDict(dict):
def get(self, key, default=None):
return self.__dict__.get(key, default)
class TimedCacheManaged(object):
"""
Wrap a function call in a timed cache
"""
def __init__(self, fnc):
self.fnc = fnc
self.cache = TimedCache()
def __call__(self, *args, **kwargs):
key = args[0]
found = None
try:
found = self.cache[key]
except KeyError:
found = self.fnc(key, **kwargs)
self.cache[key] = found
return found
class TimedCache(collections.MutableMapping):
"""
Evicts entries after expiration_secs. If none are expired and maxlen is hit,
will evict the oldest cached entry
"""
def __init__(self, maxlen=32, expiration_secs=60):
"""
:param maxlen: most number of entries to hold on to
:param expiration_secs: the number of seconds to hold on
to entries
"""
self.maxlen = maxlen
self.expiration_secs = expiration_secs
self._dict = OrderedDict()
def __repr__(self):
return '<TimedCache maxlen:%d len:%d expiration_secs:%d>' % \
(self.maxlen, len(self._dict), self.expiration_secs)
def __iter__(self):
return ((key, value[1]) for key, value in self._dict.items())
def __delitem__(self, item):
del self._dict[item]
def __getitem__(self, key):
"""
Look up an item in the cache. If the item
has already expired, it will be invalidated and not returned
:param key: which entry to look up
:return: the value in the cache, or None
"""
occurred, value = self._dict[key]
now = int(current_time())
if now - occurred > self.expiration_secs:
del self._dict[key]
raise KeyError(key)
else:
return value
def __setitem__(self, key, value):
"""
Locates the value at lookup key, if cache is full, will evict the
oldest entry
:param key: the key to search the cache for
:param value: the value to be added to the cache
"""
now = int(current_time())
while len(self._dict) >= self.maxlen:
self._dict.popitem(last=False)
self._dict[key] = (now, value)
def __len__(self):
""":return: the length of the cache"""
return len(self._dict)
def clear(self):
"""Clears the cache"""
return self._dict.clear()
+53 -13
View File
@@ -14,6 +14,7 @@ import collections
import contextlib
import io
import os
import platform
import re
import socket
import struct
@@ -26,9 +27,10 @@ from ._internal_utils import to_native_string
from .compat import parse_http_list as _parse_list_header
from .compat import (
quote, urlparse, bytes, str, OrderedDict, unquote, getproxies,
proxy_bypass, urlunparse, basestring, integer_types)
proxy_bypass, urlunparse, basestring, integer_types, is_py3,
proxy_bypass_environment, getproxies_environment)
from .cookies import cookiejar_from_dict
from .structures import CaseInsensitiveDict, TimedCacheManaged
from .structures import CaseInsensitiveDict
from .exceptions import (
InvalidURL, InvalidHeader, FileModeWarning, UnrewindableBodyError)
@@ -37,6 +39,54 @@ NETRC_FILES = ('.netrc', '_netrc')
DEFAULT_CA_BUNDLE_PATH = certs.where()
if platform.system() == 'Windows':
# provide a proxy_bypass version on Windows without DNS lookups
def proxy_bypass_registry(host):
if is_py3:
import winreg
else:
import _winreg as winreg
try:
internetSettings = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r'Software\Microsoft\Windows\CurrentVersion\Internet Settings')
proxyEnable = winreg.QueryValueEx(internetSettings,
'ProxyEnable')[0]
proxyOverride = winreg.QueryValueEx(internetSettings,
'ProxyOverride')[0]
except OSError:
return False
if not proxyEnable or not proxyOverride:
return False
# make a check value list from the registry entry: replace the
# '<local>' string by the localhost entry and the corresponding
# canonical entry.
proxyOverride = proxyOverride.split(';')
# now check if we match one of the registry values.
for test in proxyOverride:
if test == '<local>':
if '.' not in host:
return True
test = test.replace(".", r"\.") # mask dots
test = test.replace("*", r".*") # change glob sequence
test = test.replace("?", r".") # change glob char
if re.match(test, host, re.I):
return True
return False
def proxy_bypass(host): # noqa
"""Return True, if the host should be bypassed.
Checks proxy settings gathered from the environment, if specified,
or the registry.
"""
if getproxies_environment():
return proxy_bypass_environment(host)
else:
return proxy_bypass_registry(host)
def dict_to_sequence(d):
"""Returns an internal sequence dictionary update."""
@@ -577,16 +627,6 @@ def set_environ(env_name, value):
os.environ[env_name] = old_value
@TimedCacheManaged
def _proxy_bypass_cached(netloc):
"""
Looks for netloc in the cache, if not found, will call proxy_bypass
for the netloc and store its result in the cache
:rtype: bool
"""
return proxy_bypass(netloc)
def should_bypass_proxies(url, no_proxy):
"""
Returns whether we should bypass proxies or not.
@@ -634,7 +674,7 @@ def should_bypass_proxies(url, no_proxy):
# legitimate problems.
with set_environ('no_proxy', no_proxy_arg):
try:
bypass = _proxy_bypass_cached(netloc)
bypass = proxy_bypass(netloc)
except (TypeError, socket.gaierror):
bypass = False
+1 -73
View File
@@ -2,7 +2,7 @@
import pytest
from requests.structures import CaseInsensitiveDict, LookupDict, TimedCache, TimedCacheManaged
from requests.structures import CaseInsensitiveDict, LookupDict
class TestCaseInsensitiveDict:
@@ -74,75 +74,3 @@ class TestLookupDict:
@get_item_parameters
def test_get(self, key, value):
assert self.lookup_dict.get(key) == value
class TestTimedCache(object):
@pytest.fixture(autouse=True)
def setup(self):
self.any_value = 'some value'
self.expiration_secs = 60
self.cache = TimedCache(expiration_secs=self.expiration_secs)
yield
self.cache.clear()
def test_get(self):
self.cache['a'] = self.any_value
assert self.cache['a'] is self.any_value
def test_repr(self):
repr = str(self.cache)
assert repr == '<TimedCache maxlen:32 len:0 expiration_secs:60>'
def test_get_expired_item(self, mocker):
self.cache = TimedCache(maxlen=1, expiration_secs=self.expiration_secs)
mocker.patch('requests.structures.current_time', lambda: 0)
self.cache['a'] = self.any_value
mocker.patch('requests.structures.current_time', lambda: self.expiration_secs + 1)
assert self.cache.get('a') is None
def test_evict_first_entry_when_full(self, mocker):
self.cache = TimedCache(maxlen=2, expiration_secs=2)
mocker.patch('requests.structures.current_time', lambda: 0)
self.cache['a'] = self.any_value
mocker.patch('requests.structures.current_time', lambda: 1)
self.cache['b'] = self.any_value
mocker.patch('requests.structures.current_time', lambda: 3)
self.cache['c'] = self.any_value
assert len(self.cache) is 2
with pytest.raises(KeyError, message='Expected key not found'):
self.cache['a']
assert self.cache['b'] is self.any_value
assert self.cache['c'] is self.any_value
def test_delete_item_removes_item(self):
self.cache['a'] = self.any_value
del self.cache['a']
with pytest.raises(KeyError, message='Expected key not found'):
self.cache['a']
def test_iterating_hides_timestamps(self):
self.cache['a'] = 1
self.cache['b'] = 2
expected = [('a', 1), ('b', 2)]
actual = [(key, val) for key, val in self.cache]
assert expected == actual
class TestTimedCacheManagedDecorator(object):
def test_caches_repeated_calls(self, mocker):
mocker.patch('requests.structures.current_time', lambda: 0)
nonlocals = {'value': 0}
@TimedCacheManaged
def some_method(x):
nonlocals['value'] = nonlocals['value'] + x
return nonlocals['value']
first_result = some_method(1)
assert first_result is 1
second_result = some_method(1)
assert second_result is 1
third_result = some_method(2)
assert third_result is 3
+53
View File
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import os
from io import BytesIO
import pytest
@@ -599,3 +600,55 @@ def test_should_bypass_proxies_no_proxy(
no_proxy = '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1'
# Test 'no_proxy' argument
assert should_bypass_proxies(url, no_proxy=no_proxy) == expected
@pytest.mark.skipif(os.name != 'nt', reason='Test only on Windows')
@pytest.mark.parametrize(
'url, expected, override', (
('http://192.168.0.1:5000/', True, None),
('http://192.168.0.1/', True, None),
('http://172.16.1.1/', True, None),
('http://172.16.1.1:5000/', True, None),
('http://localhost.localdomain:5000/v1.0/', True, None),
('http://172.16.1.22/', False, None),
('http://172.16.1.22:5000/', False, None),
('http://google.com:5000/v1.0/', False, None),
('http://mylocalhostname:5000/v1.0/', True, '<local>'),
('http://192.168.0.1/', False, ''),
))
def test_should_bypass_proxies_win_registry(url, expected, override,
monkeypatch):
"""Tests for function should_bypass_proxies to check if proxy
can be bypassed or not with Windows registry settings
"""
if override is None:
override = '192.168.*;127.0.0.1;localhost.localdomain;172.16.1.1'
if compat.is_py3:
import winreg
else:
import _winreg as winreg
class RegHandle:
def Close(self):
pass
ie_settings = RegHandle()
def OpenKey(key, subkey):
return ie_settings
def QueryValueEx(key, value_name):
if key is ie_settings:
if value_name == 'ProxyEnable':
return [1]
elif value_name == 'ProxyOverride':
return [override]
monkeypatch.setenv('http_proxy', '')
monkeypatch.setenv('https_proxy', '')
monkeypatch.setenv('ftp_proxy', '')
monkeypatch.setenv('no_proxy', '')
monkeypatch.setenv('NO_PROXY', '')
monkeypatch.setattr(winreg, 'OpenKey', OpenKey)
monkeypatch.setattr(winreg, 'QueryValueEx', QueryValueEx)
assert should_bypass_proxies(url, no_proxy=None) == expected