diff --git a/Jenkinsfile b/Jenkinsfile
index 46624d57d2..030d2f21bd 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -21,7 +21,7 @@ Matrix Types:
Parameters:
- EVENT_LOOP: 'LIBEV' (Default), 'GEVENT', 'EVENTLET', 'ASYNCIO', 'ASYNCORE', 'TWISTED'
+ EVENT_LOOP: 'LIBEV' (Default), 'ASYNCIO', 'ASYNCORE'
CYTHON: Default, 'True', 'False'
*/
@@ -296,8 +296,6 @@ def executeStandardTests() {
failure=0
EVENT_LOOP=${EVENT_LOOP} VERIFY_CYTHON=${CYTHON_ENABLED} JVM_EXTRA_OPTS="$JVM_EXTRA_OPTS -Xss384k" pytest -s -v --log-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --junit-xml=unit_results.xml tests/unit/ || failure=1
- EVENT_LOOP_MANAGER=eventlet VERIFY_CYTHON=${CYTHON_ENABLED} JVM_EXTRA_OPTS="$JVM_EXTRA_OPTS -Xss384k" pytest -s -v --log-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --junit-xml=unit_eventlet_results.xml tests/unit/io/test_eventletreactor.py || failure=1
- EVENT_LOOP_MANAGER=gevent VERIFY_CYTHON=${CYTHON_ENABLED} JVM_EXTRA_OPTS="$JVM_EXTRA_OPTS -Xss384k" pytest -s -v --log-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --junit-xml=unit_gevent_results.xml tests/unit/io/test_geventreactor.py || failure=1
exit $failure
'''
} catch (err) {
@@ -688,7 +686,7 @@ pipeline {
''')
choice(
name: 'EVENT_LOOP',
- choices: ['LIBEV', 'GEVENT', 'EVENTLET', 'ASYNCIO', 'ASYNCORE', 'TWISTED'],
+ choices: ['LIBEV', 'ASYNCIO', 'ASYNCORE'],
description: '''
Event loop manager to utilize for scheduled or adhoc builds
@@ -701,14 +699,6 @@ pipeline {
| LIBEV |
A full-featured and high-performance event loop that is loosely modeled after libevent, but without its limitations and bugs |
-
- | GEVENT |
- A co-routine -based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev or libuv event loop |
-
-
- | EVENTLET |
- A concurrent networking library for Python that allows you to change how you run your code, not how you write it |
-
| ASYNCIO |
A library to write concurrent code using the async/await syntax |
@@ -717,10 +707,6 @@ pipeline {
ASYNCORE |
A module provides the basic infrastructure for writing asynchronous socket service clients and servers |
-
- | TWISTED |
- An event-driven networking engine written in Python and licensed under the open source MIT license |
-
''')
choice(
name: 'CI_SCHEDULE',
diff --git a/benchmarks/base.py b/benchmarks/base.py
index 290ba28788..bdfcfa4493 100644
--- a/benchmarks/base.py
+++ b/benchmarks/base.py
@@ -67,15 +67,6 @@
except (ImportError, SyntaxError):
pass
-have_twisted = False
-try:
- from cassandra.io.twistedreactor import TwistedConnection
- have_twisted = True
- supported_reactors.append(TwistedConnection)
-except ImportError as exc:
- log.exception("Error importing twisted")
- pass
-
KEYSPACE = "testkeyspace" + str(int(time.time()))
TABLE = "testtable"
@@ -230,8 +221,6 @@ def parse_options():
help='only benchmark with asyncio connections')
parser.add_option('--libev-only', action='store_true', dest='libev_only',
help='only benchmark with libev connections')
- parser.add_option('--twisted-only', action='store_true', dest='twisted_only',
- help='only benchmark with Twisted connections')
parser.add_option('-m', '--metrics', action='store_true', dest='enable_metrics',
help='enable and print metrics for operations')
parser.add_option('-l', '--log-level', default='info',
@@ -271,11 +260,6 @@ def parse_options():
log.error("libev is not available")
sys.exit(1)
options.supported_reactors = [LibevConnection]
- elif options.twisted_only:
- if not have_twisted:
- log.error("Twisted is not available")
- sys.exit(1)
- options.supported_reactors = [TwistedConnection]
else:
options.supported_reactors = supported_reactors
if not have_libev:
diff --git a/cassandra/cluster.py b/cassandra/cluster.py
index 6b2ab4b288..9ffefe99b5 100644
--- a/cassandra/cluster.py
+++ b/cassandra/cluster.py
@@ -93,55 +93,11 @@
from cassandra.datastax.graph.query import _request_timeout_key, _GraphSONContextRowFactory
from cassandra.datastax import cloud as dscloud
-try:
- from cassandra.io.twistedreactor import TwistedConnection
-except ImportError:
- TwistedConnection = None
-
-try:
- from cassandra.io.eventletreactor import EventletConnection
-# PYTHON-1364
-#
-# At the moment eventlet initialization is chucking AttributeErrors due to its dependence on pyOpenSSL
-# and some changes in Python 3.12 which have some knock-on effects there.
-except (ImportError, AttributeError):
- EventletConnection = None
-
try:
from weakref import WeakSet
except ImportError:
from cassandra.util import WeakSet # NOQA
-def _is_gevent_monkey_patched():
- if 'gevent.monkey' not in sys.modules:
- return False
- import gevent.socket
- return socket.socket is gevent.socket.socket
-
-def _try_gevent_import():
- if _is_gevent_monkey_patched():
- from cassandra.io.geventreactor import GeventConnection
- return (GeventConnection,None)
- else:
- return (None,None)
-
-def _is_eventlet_monkey_patched():
- if 'eventlet.patcher' not in sys.modules:
- return False
- try:
- import eventlet.patcher
- return eventlet.patcher.is_monkey_patched('socket')
- # Another case related to PYTHON-1364
- except AttributeError:
- return False
-
-def _try_eventlet_import():
- if _is_eventlet_monkey_patched():
- from cassandra.io.eventletreactor import EventletConnection
- return (EventletConnection,None)
- else:
- return (None,None)
-
def _try_libev_import():
try:
from cassandra.io.libevreactor import LibevConnection
@@ -168,7 +124,7 @@ def _connection_reduce_fn(val,import_fn):
log = logging.getLogger(__name__)
-conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncore_import)
+conn_fns = (_try_libev_import, _try_asyncore_import)
(conn_class, excs) = reduce(_connection_reduce_fn, conn_fns, (None,[]))
if not conn_class:
raise DependencyException("Unable to load a default connection class", excs)
@@ -878,9 +834,6 @@ def default_retry_policy(self, policy):
* :class:`cassandra.io.asyncorereactor.AsyncoreConnection`
* :class:`cassandra.io.libevreactor.LibevConnection`
- * :class:`cassandra.io.eventletreactor.EventletConnection` (requires monkey-patching - see doc for details)
- * :class:`cassandra.io.geventreactor.GeventConnection` (requires monkey-patching - see doc for details)
- * :class:`cassandra.io.twistedreactor.TwistedConnection`
* EXPERIMENTAL: :class:`cassandra.io.asyncioreactor.AsyncioConnection`
By default, ``AsyncoreConnection`` will be used, which uses
@@ -888,9 +841,6 @@ def default_retry_policy(self, policy):
If ``libev`` is installed, ``LibevConnection`` will be used instead.
- If ``gevent`` or ``eventlet`` monkey-patching is detected, the corresponding
- connection class will be used automatically.
-
``AsyncioConnection``, which uses the ``asyncio`` module in the Python
standard library, is also available, but currently experimental. Note that
it requires ``asyncio`` features that were only introduced in the 3.4 line
@@ -1168,9 +1118,7 @@ def __init__(self,
raise ValueError("contact_points, endpoint_factory, ssl_context, and ssl_options "
"cannot be specified with a cloud configuration")
- uses_twisted = TwistedConnection and issubclass(self.connection_class, TwistedConnection)
- uses_eventlet = EventletConnection and issubclass(self.connection_class, EventletConnection)
- cloud_config = dscloud.get_cloud_config(cloud, create_pyopenssl_context=uses_twisted or uses_eventlet)
+ cloud_config = dscloud.get_cloud_config(cloud)
ssl_context = cloud_config.ssl_context
ssl_options = {'check_hostname': True}
@@ -1389,7 +1337,7 @@ def __init__(self,
HostDistance.REMOTE: DEFAULT_MAX_CONNECTIONS_PER_REMOTE_HOST
}
- self.executor = self._create_thread_pool_executor(max_workers=executor_threads)
+ self.executor = ThreadPoolExecutor(max_workers=executor_threads)
self.scheduler = _Scheduler(self.executor)
self._lock = RLock()
@@ -1411,42 +1359,6 @@ def __init__(self,
if application_version is not None:
self.application_version = application_version
- def _create_thread_pool_executor(self, **kwargs):
- """
- Create a ThreadPoolExecutor for the cluster. In most cases, the built-in
- `concurrent.futures.ThreadPoolExecutor` is used.
-
- Python 3.7+ and Eventlet cause the `concurrent.futures.ThreadPoolExecutor`
- to hang indefinitely. In that case, the user needs to have the `futurist`
- package so we can use the `futurist.GreenThreadPoolExecutor` class instead.
-
- :param kwargs: All keyword args are passed to the ThreadPoolExecutor constructor.
- :return: A ThreadPoolExecutor instance.
- """
- tpe_class = ThreadPoolExecutor
- if sys.version_info[0] >= 3 and sys.version_info[1] >= 7:
- try:
- from cassandra.io.eventletreactor import EventletConnection
- is_eventlet = issubclass(self.connection_class, EventletConnection)
- except:
- # Eventlet is not available or can't be detected
- return tpe_class(**kwargs)
-
- if is_eventlet:
- try:
- from futurist import GreenThreadPoolExecutor
- tpe_class = GreenThreadPoolExecutor
- except ImportError:
- # futurist is not available
- raise ImportError(
- ("Python 3.7+ and Eventlet cause the `concurrent.futures.ThreadPoolExecutor` "
- "to hang indefinitely. If you want to use the Eventlet reactor, you "
- "need to install the `futurist` package to allow the driver to use "
- "the GreenThreadPoolExecutor. See https://github.com/eventlet/eventlet/issues/508 "
- "for more details."))
-
- return tpe_class(**kwargs)
-
def register_user_type(self, keyspace, user_type, klass):
"""
Registers a class to use to represent a particular user-defined type.
diff --git a/cassandra/connection.py b/cassandra/connection.py
index 3ceaa08afc..48aa709821 100644
--- a/cassandra/connection.py
+++ b/cassandra/connection.py
@@ -29,10 +29,7 @@
import weakref
-if 'gevent.monkey' in sys.modules:
- from gevent.queue import Queue, Empty
-else:
- from queue import Queue, Empty # noqa
+from queue import Queue, Empty # noqa
from cassandra import ConsistencyLevel, AuthenticationFailed, OperationTimedOut, ProtocolVersion
from cassandra.marshal import int32_pack
diff --git a/cassandra/datastax/cloud/__init__.py b/cassandra/datastax/cloud/__init__.py
index e175b2928b..f5633b5e62 100644
--- a/cassandra/datastax/cloud/__init__.py
+++ b/cassandra/datastax/cloud/__init__.py
@@ -78,7 +78,7 @@ def from_dict(cls, d):
return c
-def get_cloud_config(cloud_config, create_pyopenssl_context=False):
+def get_cloud_config(cloud_config):
if not _HAS_SSL:
raise DriverException("A Python installation with SSL is required to connect to a cloud cluster.")
@@ -86,17 +86,15 @@ def get_cloud_config(cloud_config, create_pyopenssl_context=False):
raise ValueError("The cloud config doesn't have a secure_connect_bundle specified.")
try:
- config = read_cloud_config_from_zip(cloud_config, create_pyopenssl_context)
+ config = read_cloud_config_from_zip(cloud_config)
except BadZipFile:
raise ValueError("Unable to open the zip file for the cloud config. Check your secure connect bundle.")
config = read_metadata_info(config, cloud_config)
- if create_pyopenssl_context:
- config.ssl_context = config.pyopenssl_context
return config
-def read_cloud_config_from_zip(cloud_config, create_pyopenssl_context):
+def read_cloud_config_from_zip(cloud_config):
secure_bundle = cloud_config['secure_connect_bundle']
use_default_tempdir = cloud_config.get('use_default_tempdir', None)
with ZipFile(secure_bundle) as zipfile:
@@ -104,12 +102,12 @@ def read_cloud_config_from_zip(cloud_config, create_pyopenssl_context):
tmp_dir = tempfile.mkdtemp(dir=base_dir)
try:
zipfile.extractall(path=tmp_dir)
- return parse_cloud_config(os.path.join(tmp_dir, 'config.json'), cloud_config, create_pyopenssl_context)
+ return parse_cloud_config(os.path.join(tmp_dir, 'config.json'), cloud_config)
finally:
shutil.rmtree(tmp_dir)
-def parse_cloud_config(path, cloud_config, create_pyopenssl_context):
+def parse_cloud_config(path, cloud_config):
with open(path, 'r') as stream:
data = json.load(stream)
@@ -123,11 +121,7 @@ def parse_cloud_config(path, cloud_config, create_pyopenssl_context):
ca_cert_location = os.path.join(config_dir, 'ca.crt')
cert_location = os.path.join(config_dir, 'cert')
key_location = os.path.join(config_dir, 'key')
- # Regardless of if we create a pyopenssl context, we still need the builtin one
- # to connect to the metadata service
config.ssl_context = _ssl_context_from_cert(ca_cert_location, cert_location, key_location)
- if create_pyopenssl_context:
- config.pyopenssl_context = _pyopenssl_context_from_cert(ca_cert_location, cert_location, key_location)
return config
@@ -178,18 +172,3 @@ def _ssl_context_from_cert(ca_cert_location, cert_location, key_location):
return ssl_context
-
-def _pyopenssl_context_from_cert(ca_cert_location, cert_location, key_location):
- try:
- from OpenSSL import SSL
- except ImportError as e:
- raise ImportError(
- "PyOpenSSL must be installed to connect to Astra with the Eventlet or Twisted event loops")\
- .with_traceback(e.__traceback__)
- ssl_context = SSL.Context(SSL.TLSv1_METHOD)
- ssl_context.set_verify(SSL.VERIFY_PEER, callback=lambda _1, _2, _3, _4, ok: ok)
- ssl_context.use_certificate_file(cert_location)
- ssl_context.use_privatekey_file(key_location)
- ssl_context.load_verify_locations(ca_cert_location)
-
- return ssl_context
\ No newline at end of file
diff --git a/cassandra/datastax/insights/reporter.py b/cassandra/datastax/insights/reporter.py
index 607c723a1a..e3ea5a1c3a 100644
--- a/cassandra/datastax/insights/reporter.py
+++ b/cassandra/datastax/insights/reporter.py
@@ -144,11 +144,7 @@ def _get_startup_data(self):
cert_validation = None
try:
if self._session.cluster.ssl_context:
- if isinstance(self._session.cluster.ssl_context, ssl.SSLContext):
- cert_validation = self._session.cluster.ssl_context.verify_mode == ssl.CERT_REQUIRED
- else: # pyopenssl
- from OpenSSL import SSL
- cert_validation = self._session.cluster.ssl_context.get_verify_mode() != SSL.VERIFY_NONE
+ cert_validation = self._session.cluster.ssl_context.verify_mode == ssl.CERT_REQUIRED
elif self._session.cluster.ssl_options:
cert_validation = self._session.cluster.ssl_options.get('cert_reqs') == ssl.CERT_REQUIRED
except Exception as e:
diff --git a/cassandra/io/eventletreactor.py b/cassandra/io/eventletreactor.py
deleted file mode 100644
index 6be7738236..0000000000
--- a/cassandra/io/eventletreactor.py
+++ /dev/null
@@ -1,195 +0,0 @@
-# Copyright 2014 Symantec Corporation
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Originally derived from MagnetoDB source:
-# https://github.com/stackforge/magnetodb/blob/2015.1.0b1/magnetodb/common/cassandra/io/eventletreactor.py
-import eventlet
-from eventlet.green import socket
-from eventlet.queue import Queue
-from greenlet import GreenletExit
-import logging
-from threading import Event
-import time
-from deprecated import deprecated
-
-from cassandra.connection import Connection, ConnectionShutdown, Timer, TimerManager
-try:
- from eventlet.green.OpenSSL import SSL
- _PYOPENSSL = True
-except ImportError as e:
- _PYOPENSSL = False
- no_pyopenssl_error = e
-
-
-log = logging.getLogger(__name__)
-
-
-def _check_pyopenssl():
- if not _PYOPENSSL:
- raise ImportError(
- "{}, pyOpenSSL must be installed to enable "
- "SSL support with the Eventlet event loop".format(str(no_pyopenssl_error))
- )
-
-
-@deprecated(version="3.30.0", reason="The eventlet event loop is deprecated and will be removed in 3.31.0. See CASSPYTHON-12.")
-class EventletConnection(Connection):
- """
- An implementation of :class:`.Connection` that utilizes ``eventlet``.
-
- This implementation assumes all eventlet monkey patching is active. It is not tested with partial patching.
- """
-
- _read_watcher = None
- _write_watcher = None
-
- _socket_impl = eventlet.green.socket
- _ssl_impl = eventlet.green.ssl
-
- _timers = None
- _timeout_watcher = None
- _new_timer = None
-
- @classmethod
- def initialize_reactor(cls):
- eventlet.monkey_patch()
- if not cls._timers:
- cls._timers = TimerManager()
- cls._timeout_watcher = eventlet.spawn(cls.service_timeouts)
- cls._new_timer = Event()
-
- @classmethod
- def create_timer(cls, timeout, callback):
- timer = Timer(timeout, callback)
- cls._timers.add_timer(timer)
- cls._new_timer.set()
- return timer
-
- @classmethod
- def service_timeouts(cls):
- """
- cls._timeout_watcher runs in this loop forever.
- It is usually waiting for the next timeout on the cls._new_timer Event.
- When new timers are added, that event is set so that the watcher can
- wake up and possibly set an earlier timeout.
- """
- timer_manager = cls._timers
- while True:
- next_end = timer_manager.service_timeouts()
- sleep_time = max(next_end - time.time(), 0) if next_end else 10000
- cls._new_timer.wait(sleep_time)
- cls._new_timer.clear()
-
- def __init__(self, *args, **kwargs):
- Connection.__init__(self, *args, **kwargs)
- self.uses_legacy_ssl_options = self.ssl_options and not self.ssl_context
- self._write_queue = Queue()
-
- self._connect_socket()
-
- self._read_watcher = eventlet.spawn(lambda: self.handle_read())
- self._write_watcher = eventlet.spawn(lambda: self.handle_write())
- self._send_options_message()
-
- def _wrap_socket_from_context(self):
- _check_pyopenssl()
- rv = SSL.Connection(self.ssl_context, self._socket)
- rv.set_connect_state()
- if self.ssl_options and 'server_hostname' in self.ssl_options:
- # This is necessary for SNI
- rv.set_tlsext_host_name(self.ssl_options['server_hostname'].encode('ascii'))
- return rv
-
- def _initiate_connection(self, sockaddr):
- if self.uses_legacy_ssl_options:
- super(EventletConnection, self)._initiate_connection(sockaddr)
- else:
- self._socket.connect(sockaddr)
- if self.ssl_context or self.ssl_options:
- self._socket.do_handshake()
-
- def _validate_hostname(self):
- if not self.uses_legacy_ssl_options:
- cert_name = self._socket.get_peer_certificate().get_subject().commonName
- if cert_name != self.endpoint.address:
- raise Exception("Hostname verification failed! Certificate name '{}' "
- "doesn't match endpoint '{}'".format(cert_name, self.endpoint.address))
-
- def close(self):
- with self.lock:
- if self.is_closed:
- return
- self.is_closed = True
-
- log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint))
-
- cur_gthread = eventlet.getcurrent()
-
- if self._read_watcher and self._read_watcher != cur_gthread:
- self._read_watcher.kill()
- if self._write_watcher and self._write_watcher != cur_gthread:
- self._write_watcher.kill()
- if self._socket:
- self._socket.close()
- log.debug("Closed socket to %s" % (self.endpoint,))
-
- if not self.is_defunct:
- self.error_all_requests(
- ConnectionShutdown("Connection to %s was closed" % self.endpoint))
- # don't leave in-progress operations hanging
- self.connected_event.set()
-
- def handle_close(self):
- log.debug("connection closed by server")
- self.close()
-
- def handle_write(self):
- while True:
- try:
- next_msg = self._write_queue.get()
- self._socket.sendall(next_msg)
- except socket.error as err:
- log.debug("Exception during socket send for %s: %s", self, err)
- self.defunct(err)
- return # Leave the write loop
- except GreenletExit: # graceful greenthread exit
- return
-
- def handle_read(self):
- while True:
- try:
- buf = self._socket.recv(self.in_buffer_size)
- self._iobuf.write(buf)
- except socket.error as err:
- log.debug("Exception during socket recv for %s: %s",
- self, err)
- self.defunct(err)
- return # leave the read loop
- except GreenletExit: # graceful greenthread exit
- return
-
- if buf and self._iobuf.tell():
- self.process_io_buffer()
- else:
- log.debug("Connection %s closed by server", self)
- self.close()
- return
-
- def push(self, data):
- chunk_size = self.out_buffer_size
- for i in range(0, len(data), chunk_size):
- self._write_queue.put(data[i:i + chunk_size])
diff --git a/cassandra/io/geventreactor.py b/cassandra/io/geventreactor.py
deleted file mode 100644
index eb1296d6f9..0000000000
--- a/cassandra/io/geventreactor.py
+++ /dev/null
@@ -1,138 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import gevent
-import gevent.event
-from gevent.queue import Queue
-from gevent import socket
-import gevent.ssl
-
-from deprecated import deprecated
-import logging
-import time
-
-from cassandra.connection import Connection, ConnectionShutdown, Timer, TimerManager
-
-log = logging.getLogger(__name__)
-
-@deprecated(version="3.30.0", reason="The gevent event loop is deprecated and will be removed in 3.31.0. See CASSPYTHON-12.")
-class GeventConnection(Connection):
- """
- An implementation of :class:`.Connection` that utilizes ``gevent``.
-
- This implementation assumes all gevent monkey patching is active. It is not tested with partial patching.
- """
-
- _read_watcher = None
- _write_watcher = None
-
- _socket_impl = gevent.socket
- _ssl_impl = gevent.ssl
-
- _timers = None
- _timeout_watcher = None
- _new_timer = None
-
- @classmethod
- def initialize_reactor(cls):
- if not cls._timers:
- cls._timers = TimerManager()
- cls._timeout_watcher = gevent.spawn(cls.service_timeouts)
- cls._new_timer = gevent.event.Event()
-
- @classmethod
- def create_timer(cls, timeout, callback):
- timer = Timer(timeout, callback)
- cls._timers.add_timer(timer)
- cls._new_timer.set()
- return timer
-
- @classmethod
- def service_timeouts(cls):
- timer_manager = cls._timers
- timer_event = cls._new_timer
- while True:
- next_end = timer_manager.service_timeouts()
- sleep_time = max(next_end - time.time(), 0) if next_end else 10000
- timer_event.wait(sleep_time)
- timer_event.clear()
-
- def __init__(self, *args, **kwargs):
- Connection.__init__(self, *args, **kwargs)
-
- self._write_queue = Queue()
-
- self._connect_socket()
-
- self._read_watcher = gevent.spawn(self.handle_read)
- self._write_watcher = gevent.spawn(self.handle_write)
- self._send_options_message()
-
- def close(self):
- with self.lock:
- if self.is_closed:
- return
- self.is_closed = True
-
- log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint))
- if self._read_watcher:
- self._read_watcher.kill(block=False)
- if self._write_watcher:
- self._write_watcher.kill(block=False)
- if self._socket:
- self._socket.close()
- log.debug("Closed socket to %s" % (self.endpoint,))
-
- if not self.is_defunct:
- self.error_all_requests(
- ConnectionShutdown("Connection to %s was closed" % self.endpoint))
- # don't leave in-progress operations hanging
- self.connected_event.set()
-
- def handle_close(self):
- log.debug("connection closed by server")
- self.close()
-
- def handle_write(self):
- while True:
- try:
- next_msg = self._write_queue.get()
- self._socket.sendall(next_msg)
- except socket.error as err:
- log.debug("Exception in send for %s: %s", self, err)
- self.defunct(err)
- return
-
- def handle_read(self):
- while True:
- try:
- buf = self._socket.recv(self.in_buffer_size)
- self._iobuf.write(buf)
- except socket.error as err:
- log.debug("Exception in read for %s: %s", self, err)
- self.defunct(err)
- return # leave the read loop
-
- if buf and self._iobuf.tell():
- self.process_io_buffer()
- else:
- log.debug("Connection %s closed by server", self)
- self.close()
- return
-
- def push(self, data):
- chunk_size = self.out_buffer_size
- for i in range(0, len(data), chunk_size):
- self._write_queue.put(data[i:i + chunk_size])
diff --git a/cassandra/io/twistedreactor.py b/cassandra/io/twistedreactor.py
deleted file mode 100644
index 58e79e9ce9..0000000000
--- a/cassandra/io/twistedreactor.py
+++ /dev/null
@@ -1,309 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-Module that implements an event loop based on twisted
-( https://twistedmatrix.com ).
-"""
-import atexit
-from deprecated import deprecated
-import logging
-import time
-from functools import partial
-from threading import Thread, Lock
-import weakref
-
-from twisted.internet import reactor, protocol
-from twisted.internet.endpoints import connectProtocol, TCP4ClientEndpoint, SSL4ClientEndpoint
-from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
-from twisted.python.failure import Failure
-from zope.interface import implementer
-
-from cassandra.connection import Connection, ConnectionShutdown, Timer, TimerManager, ConnectionException
-
-try:
- from OpenSSL import SSL
- _HAS_SSL = True
-except ImportError as e:
- _HAS_SSL = False
- import_exception = e
-log = logging.getLogger(__name__)
-
-
-def _cleanup(cleanup_weakref):
- try:
- cleanup_weakref()._cleanup()
- except ReferenceError:
- return
-
-
-class TwistedConnectionProtocol(protocol.Protocol):
- """
- Twisted Protocol class for handling data received and connection
- made events.
- """
-
- def __init__(self, connection):
- self.connection = connection
-
- def dataReceived(self, data):
- """
- Callback function that is called when data has been received
- on the connection.
-
- Reaches back to the Connection object and queues the data for
- processing.
- """
- self.connection._iobuf.write(data)
- self.connection.handle_read()
-
- def connectionMade(self):
- """
- Callback function that is called when a connection has succeeded.
-
- Reaches back to the Connection object and confirms that the connection
- is ready.
- """
- self.connection.client_connection_made(self.transport)
-
- def connectionLost(self, reason):
- # reason is a Failure instance
- log.debug("Connect lost: %s", reason)
- self.connection.defunct(reason.value)
-
-
-class TwistedLoop(object):
-
- _lock = None
- _thread = None
- _timeout_task = None
- _timeout = None
-
- def __init__(self):
- self._lock = Lock()
- self._timers = TimerManager()
-
- def maybe_start(self):
- with self._lock:
- if not reactor.running:
- self._thread = Thread(target=reactor.run,
- name="cassandra_driver_twisted_event_loop",
- kwargs={'installSignalHandlers': False})
- self._thread.daemon = True
- self._thread.start()
- atexit.register(partial(_cleanup, weakref.ref(self)))
-
- def _reactor_stopped(self):
- return reactor._stopped
-
- def _cleanup(self):
- if self._thread:
- reactor.callFromThread(reactor.stop)
- self._thread.join(timeout=1.0)
- if self._thread.is_alive():
- log.warning("Event loop thread could not be joined, so "
- "shutdown may not be clean. Please call "
- "Cluster.shutdown() to avoid this.")
- log.debug("Event loop thread was joined")
-
- def add_timer(self, timer):
- self._timers.add_timer(timer)
- # callFromThread to schedule from the loop thread, where
- # the timeout task can safely be modified
- reactor.callFromThread(self._schedule_timeout, timer.end)
-
- def _schedule_timeout(self, next_timeout):
- if next_timeout:
- delay = max(next_timeout - time.time(), 0)
- if self._timeout_task and self._timeout_task.active():
- if next_timeout < self._timeout:
- self._timeout_task.reset(delay)
- self._timeout = next_timeout
- else:
- self._timeout_task = reactor.callLater(delay, self._on_loop_timer)
- self._timeout = next_timeout
-
- def _on_loop_timer(self):
- self._timers.service_timeouts()
- self._schedule_timeout(self._timers.next_timeout)
-
-
-@implementer(IOpenSSLClientConnectionCreator)
-class _SSLCreator(object):
- def __init__(self, endpoint, ssl_context, ssl_options, check_hostname, timeout):
- self.endpoint = endpoint
- self.ssl_options = ssl_options
- self.check_hostname = check_hostname
- self.timeout = timeout
-
- if ssl_context:
- self.context = ssl_context
- else:
- self.context = SSL.Context(SSL.TLSv1_METHOD)
- if "certfile" in self.ssl_options:
- self.context.use_certificate_file(self.ssl_options["certfile"])
- if "keyfile" in self.ssl_options:
- self.context.use_privatekey_file(self.ssl_options["keyfile"])
- if "ca_certs" in self.ssl_options:
- self.context.load_verify_locations(self.ssl_options["ca_certs"])
- if "cert_reqs" in self.ssl_options:
- self.context.set_verify(
- self.ssl_options["cert_reqs"],
- callback=self.verify_callback
- )
- self.context.set_info_callback(self.info_callback)
-
- def verify_callback(self, connection, x509, errnum, errdepth, ok):
- return ok
-
- def info_callback(self, connection, where, ret):
- if where & SSL.SSL_CB_HANDSHAKE_DONE:
- if self.check_hostname and self.endpoint.address != connection.get_peer_certificate().get_subject().commonName:
- transport = connection.get_app_data()
- transport.failVerification(Failure(ConnectionException("Hostname verification failed", self.endpoint)))
-
- def clientConnectionForTLS(self, tlsProtocol):
- connection = SSL.Connection(self.context, None)
- connection.set_app_data(tlsProtocol)
- if self.ssl_options and "server_hostname" in self.ssl_options:
- connection.set_tlsext_host_name(self.ssl_options['server_hostname'].encode('ascii'))
- return connection
-
-@deprecated(version="3.30.0", reason="The Twisted event loop is deprecated and will be removed in 3.31.0. See CASSPYTHON-12.")
-class TwistedConnection(Connection):
- """
- An implementation of :class:`.Connection` that utilizes the
- Twisted event loop.
- """
-
- _loop = None
-
- @classmethod
- def initialize_reactor(cls):
- if not cls._loop:
- cls._loop = TwistedLoop()
-
- @classmethod
- def create_timer(cls, timeout, callback):
- timer = Timer(timeout, callback)
- cls._loop.add_timer(timer)
- return timer
-
- def __init__(self, *args, **kwargs):
- """
- Initialization method.
-
- Note that we can't call reactor methods directly here because
- it's not thread-safe, so we schedule the reactor/connection
- stuff to be run from the event loop thread when it gets the
- chance.
- """
- Connection.__init__(self, *args, **kwargs)
-
- self.is_closed = True
- self.connector = None
- self.transport = None
-
- reactor.callFromThread(self.add_connection)
- self._loop.maybe_start()
-
- def _check_pyopenssl(self):
- if self.ssl_context or self.ssl_options:
- if not _HAS_SSL:
- raise ImportError(
- str(import_exception) +
- ', pyOpenSSL must be installed to enable SSL support with the Twisted event loop'
- )
-
- def add_connection(self):
- """
- Convenience function to connect and store the resulting
- connector.
- """
- host, port = self.endpoint.resolve()
- if self.ssl_context or self.ssl_options:
- # Can't use optionsForClientTLS here because it *forces* hostname verification.
- # Cool they enforce strong security, but we have to be able to turn it off
- self._check_pyopenssl()
-
- ssl_connection_creator = _SSLCreator(
- self.endpoint,
- self.ssl_context if self.ssl_context else None,
- self.ssl_options,
- self._check_hostname,
- self.connect_timeout,
- )
-
- endpoint = SSL4ClientEndpoint(
- reactor,
- host,
- port,
- sslContextFactory=ssl_connection_creator,
- timeout=self.connect_timeout,
- )
- else:
- endpoint = TCP4ClientEndpoint(
- reactor,
- host,
- port,
- timeout=self.connect_timeout
- )
- connectProtocol(endpoint, TwistedConnectionProtocol(self))
-
- def client_connection_made(self, transport):
- """
- Called by twisted protocol when a connection attempt has
- succeeded.
- """
- with self.lock:
- self.is_closed = False
- self.transport = transport
- self._send_options_message()
-
- def close(self):
- """
- Disconnect and error-out all requests.
- """
- with self.lock:
- if self.is_closed:
- return
- self.is_closed = True
-
- log.debug("Closing connection (%s) to %s", id(self), self.endpoint)
- reactor.callFromThread(self.transport.connector.disconnect)
- log.debug("Closed socket to %s", self.endpoint)
-
- if not self.is_defunct:
- self.error_all_requests(
- ConnectionShutdown("Connection to %s was closed" % self.endpoint))
- # don't leave in-progress operations hanging
- self.connected_event.set()
-
- def handle_read(self):
- """
- Process the incoming data buffer.
- """
- self.process_io_buffer()
-
- def push(self, data):
- """
- This function is called when outgoing data should be queued
- for sending.
-
- Note that we can't call transport.write() directly because
- it is not thread-safe, so we schedule it to run from within
- the event loop when it gets the chance.
- """
- reactor.callFromThread(self.transport.write, data)
diff --git a/docs/api/cassandra/io/eventletreactor.rst b/docs/api/cassandra/io/eventletreactor.rst
deleted file mode 100644
index 1ba742c7e9..0000000000
--- a/docs/api/cassandra/io/eventletreactor.rst
+++ /dev/null
@@ -1,7 +0,0 @@
-``cassandra.io.eventletreactor`` - ``eventlet``-compatible Connection
-=====================================================================
-
-.. module:: cassandra.io.eventletreactor
-
-.. autoclass:: EventletConnection
- :members:
diff --git a/docs/api/cassandra/io/geventreactor.rst b/docs/api/cassandra/io/geventreactor.rst
deleted file mode 100644
index 603affe140..0000000000
--- a/docs/api/cassandra/io/geventreactor.rst
+++ /dev/null
@@ -1,7 +0,0 @@
-``cassandra.io.geventreactor`` - ``gevent``-compatible Event Loop
-=================================================================
-
-.. module:: cassandra.io.geventreactor
-
-.. autoclass:: GeventConnection
- :members:
diff --git a/docs/api/cassandra/io/twistedreactor.rst b/docs/api/cassandra/io/twistedreactor.rst
deleted file mode 100644
index 24e93bd432..0000000000
--- a/docs/api/cassandra/io/twistedreactor.rst
+++ /dev/null
@@ -1,9 +0,0 @@
-``cassandra.io.twistedreactor`` - Twisted Event Loop
-====================================================
-
-.. module:: cassandra.io.twistedreactor
-
-.. class:: TwistedConnection
-
- An implementation of :class:`~cassandra.io.connection.Connection` that uses
- Twisted's reactor as its event loop.
diff --git a/docs/api/index.rst b/docs/api/index.rst
index 9e778d508c..c98cfd6b28 100644
--- a/docs/api/index.rst
+++ b/docs/api/index.rst
@@ -24,10 +24,7 @@ Core Driver
cassandra/timestamps
cassandra/io/asyncioreactor
cassandra/io/asyncorereactor
- cassandra/io/eventletreactor
cassandra/io/libevreactor
- cassandra/io/geventreactor
- cassandra/io/twistedreactor
.. _om_api:
diff --git a/docs/cloud.rst b/docs/cloud.rst
index 3230720ec9..46b416e53f 100644
--- a/docs/cloud.rst
+++ b/docs/cloud.rst
@@ -67,10 +67,6 @@ In most circumstances, the client code for interacting with an Astra cluster wil
Limitations
===========
-Event loops
-^^^^^^^^^^^
-Evenlet isn't yet supported for python 3.7+ due to an `issue in Eventlet `_.
-
CqlEngine
=========
diff --git a/docs/installation.rst b/docs/installation.rst
index a0a5e25dab..a81222ad0f 100644
--- a/docs/installation.rst
+++ b/docs/installation.rst
@@ -197,9 +197,6 @@ The ``asyncio`` event loop is generally functional but still somewhat experiment
for production systems. We anticipate significant improvements to this event loop (including hopefully
making this event loop the default going forward) in 3.31.0.
-The ``gevent``, ``eventlet`` and ``Twisted`` event loops have been deprecated in 3.30.0 and will be removed
-completely in 3.31.0.
-
libev support
^^^^^^^^^^^^^
If you're on Linux, you should be able to install libev
diff --git a/docs/security.rst b/docs/security.rst
index 6dd2624c24..f64bab9403 100644
--- a/docs/security.rst
+++ b/docs/security.rst
@@ -86,15 +86,6 @@ It might be also useful to learn about the different levels of identity verifica
* `Using SSL in DSE drivers `_
-SSL with Twisted or Eventlet
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-Twisted and Eventlet both use an alternative SSL implementation called pyOpenSSL, so if your `Cluster`'s connection class is
-:class:`~cassandra.io.twistedreactor.TwistedConnection` or :class:`~cassandra.io.eventletreactor.EventletConnection`, you must pass a
-`pyOpenSSL context `_ instead.
-An example is provided in these docs, and more details can be found in the
-`documentation `_.
-pyOpenSSL is not installed by the driver and must be installed separately.
-
SSL Configuration Examples
^^^^^^^^^^^^^^^^^^^^^^^^^^
Here, we'll describe the server and driver configuration necessary to set up SSL to meet various goals, such as the client verifying the server and the server verifying the client. We'll also include Python code demonstrating how to use servers and drivers configured in these ways.
@@ -267,32 +258,6 @@ The following driver code specifies that the connection should use two-way verif
The driver uses ``SSLContext`` directly to give you many other options in configuring SSL. Consider reading the `Python SSL documentation `_
for more details about ``SSLContext`` configuration.
-**Server verifies client and client verifies server using Twisted and pyOpenSSL**
-
-.. code-block:: python
-
- from OpenSSL import SSL, crypto
- from cassandra.cluster import Cluster
- from cassandra.io.twistedreactor import TwistedConnection
-
- ssl_context = SSL.Context(SSL.TLSv1_2_METHOD)
- ssl_context.set_verify(SSL.VERIFY_PEER, callback=lambda _1, _2, _3, _4, ok: ok)
- ssl_context.use_certificate_file('/path/to/client.crt_signed')
- ssl_context.use_privatekey_file('/path/to/client.key')
- ssl_context.load_verify_locations('/path/to/rootca.crt')
-
- cluster = Cluster(
- contact_points=['127.0.0.1'],
- connection_class=TwistedConnection,
- ssl_context=ssl_context,
- ssl_options={'check_hostname': True}
- )
- session = cluster.connect()
-
-
-Connecting using Eventlet would look similar except instead of importing and using ``TwistedConnection``, you would
-import and use ``EventletConnection``, including the appropriate monkey-patching.
-
Versions 3.16.0 and lower
^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -325,12 +290,6 @@ the `python ssl documentation `_.
-SSL with Twisted
-++++++++++++++++
-
-In case the twisted event loop is used pyOpenSSL must be installed or an exception will be risen. Also
-to set the ``ssl_version`` and ``cert_reqs`` in ``ssl_opts`` the appropriate constants from pyOpenSSL are expected.
-
DSE Authentication
------------------
When authenticating against DSE, the Cassandra driver provides two auth providers that work both with legacy kerberos and Cassandra authenticators,
diff --git a/test-requirements.txt b/test-requirements.txt
index 513451b496..073e166000 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -4,9 +4,6 @@ pytest
ccm>=3.1.5
pytz
pure-sasl
-twisted[tls]
-gevent
-eventlet
cython>=3.0
packaging
futurist
diff --git a/tests/__init__.py b/tests/__init__.py
index 7799b51399..9b8dd89717 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -32,28 +32,6 @@
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s [%(module)s:%(lineno)s]: %(message)s'))
log.addHandler(handler)
-
-def is_eventlet_monkey_patched():
- if 'eventlet.patcher' not in sys.modules:
- return False
- try:
- import eventlet.patcher
- return eventlet.patcher.is_monkey_patched('socket')
- # Yet another case related to PYTHON-1364
- except AttributeError:
- return False
-
-def is_gevent_monkey_patched():
- if 'gevent.monkey' not in sys.modules:
- return False
- import gevent.socket
- return socket.socket is gevent.socket.socket
-
-
-def is_monkey_patched():
- return is_gevent_monkey_patched() or is_eventlet_monkey_patched()
-
-MONKEY_PATCH_LOOP = bool(os.getenv('MONKEY_PATCH_LOOP', False))
EVENT_LOOP_MANAGER = os.getenv('EVENT_LOOP_MANAGER', "libev")
@@ -66,30 +44,9 @@ def is_monkey_patched():
thread_pool_executor_class = ThreadPoolExecutor
-if "gevent" in EVENT_LOOP_MANAGER:
- import gevent.monkey
- gevent.monkey.patch_all()
- from cassandra.io.geventreactor import GeventConnection
- connection_class = GeventConnection
-elif "eventlet" in EVENT_LOOP_MANAGER:
- from eventlet import monkey_patch
- monkey_patch()
-
- from cassandra.io.eventletreactor import EventletConnection
- connection_class = EventletConnection
-
- try:
- from futurist import GreenThreadPoolExecutor
- thread_pool_executor_class = GreenThreadPoolExecutor
- except:
- # futurist is installed only with python >=3.7
- pass
-elif "asyncore" in EVENT_LOOP_MANAGER:
+if "asyncore" in EVENT_LOOP_MANAGER:
from cassandra.io.asyncorereactor import AsyncoreConnection
connection_class = AsyncoreConnection
-elif "twisted" in EVENT_LOOP_MANAGER:
- from cassandra.io.twistedreactor import TwistedConnection
- connection_class = TwistedConnection
elif "asyncio" in EVENT_LOOP_MANAGER:
from cassandra.io.asyncioreactor import AsyncioConnection
connection_class = AsyncioConnection
diff --git a/tests/integration/long/test_ipv6.py b/tests/integration/long/test_ipv6.py
index e20f11cc9c..6666ea79c7 100644
--- a/tests/integration/long/test_ipv6.py
+++ b/tests/integration/long/test_ipv6.py
@@ -20,17 +20,12 @@
from cassandra.cluster import NoHostAvailable
from cassandra.io.asyncorereactor import AsyncoreConnection
-from tests import is_monkey_patched
from tests.integration import use_cluster, remove_cluster, TestCluster
-if is_monkey_patched():
- LibevConnection = -1
- AsyncoreConnection = -1
-else:
- try:
- from cassandra.io.libevreactor import LibevConnection
- except ImportError:
- LibevConnection = None
+try:
+ from cassandra.io.libevreactor import LibevConnection
+except ImportError:
+ LibevConnection = None
import unittest
diff --git a/tests/integration/long/test_ssl.py b/tests/integration/long/test_ssl.py
index 5d86063d3e..33f75850b9 100644
--- a/tests/integration/long/test_ssl.py
+++ b/tests/integration/long/test_ssl.py
@@ -22,8 +22,6 @@
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
-from OpenSSL import SSL, crypto
-
from tests.integration import (
get_cluster, remove_cluster, use_single_node, start_cluster_wait_for_up, EVENT_LOOP_MANAGER, TestCluster
)
@@ -50,16 +48,9 @@
DRIVER_CERTFILE = os.path.abspath("tests/integration/long/ssl/client.crt_signed")
DRIVER_CERTFILE_BAD = os.path.abspath("tests/integration/long/ssl/client_bad.key")
-USES_PYOPENSSL = "twisted" in EVENT_LOOP_MANAGER or "eventlet" in EVENT_LOOP_MANAGER
-if "twisted" in EVENT_LOOP_MANAGER:
- import OpenSSL
- ssl_version = OpenSSL.SSL.TLSv1_2_METHOD
- verify_certs = {'cert_reqs': SSL.VERIFY_PEER,
- 'check_hostname': True}
-else:
- ssl_version = ssl.PROTOCOL_TLS
- verify_certs = {'cert_reqs': ssl.CERT_REQUIRED,
- 'check_hostname': True}
+ssl_version = ssl.PROTOCOL_TLS
+verify_certs = {'cert_reqs': ssl.CERT_REQUIRED,
+ 'check_hostname': True}
def verify_callback(connection, x509, errnum, errdepth, ok):
@@ -314,10 +305,6 @@ def test_cannot_connect_with_bad_client_auth(self):
'ssl_version': ssl_version,
'keyfile': DRIVER_KEYFILE}
- if not USES_PYOPENSSL:
- # I don't set the bad certfile for pyopenssl because it hangs
- ssl_options['certfile'] = DRIVER_CERTFILE_BAD
-
cluster = TestCluster(
ssl_options={'ca_certs': CLIENT_CA_CERTS,
'ssl_version': ssl_version,
@@ -402,13 +389,9 @@ def test_can_connect_with_sslcontext_certificate(self):
@test_category connection:ssl
"""
- if USES_PYOPENSSL:
- ssl_context = SSL.Context(SSL.TLSv1_2_METHOD)
- ssl_context.load_verify_locations(CLIENT_CA_CERTS)
- else:
- ssl_context = ssl.SSLContext(ssl_version)
- ssl_context.load_verify_locations(CLIENT_CA_CERTS)
- ssl_context.verify_mode = ssl.CERT_REQUIRED
+ ssl_context = ssl.SSLContext(ssl_version)
+ ssl_context.load_verify_locations(CLIENT_CA_CERTS)
+ ssl_context.verify_mode = ssl.CERT_REQUIRED
validate_ssl_options(ssl_context=ssl_context)
def test_can_connect_with_ssl_client_auth_password_private_key(self):
@@ -426,19 +409,11 @@ def test_can_connect_with_ssl_client_auth_password_private_key(self):
abs_driver_certfile = os.path.abspath(DRIVER_CERTFILE)
ssl_options = {}
- if USES_PYOPENSSL:
- ssl_context = SSL.Context(SSL.TLSv1_2_METHOD)
- ssl_context.use_certificate_file(abs_driver_certfile)
- with open(abs_driver_keyfile) as keyfile:
- key = crypto.load_privatekey(crypto.FILETYPE_PEM, keyfile.read(), b'cassandra')
- ssl_context.use_privatekey(key)
- ssl_context.set_verify(SSL.VERIFY_NONE, verify_callback)
- else:
- ssl_context = ssl.SSLContext(ssl_version)
- ssl_context.load_cert_chain(certfile=abs_driver_certfile,
- keyfile=abs_driver_keyfile,
- password="cassandra")
- ssl_context.verify_mode = ssl.CERT_NONE
+ ssl_context = ssl.SSLContext(ssl_version)
+ ssl_context.load_cert_chain(certfile=abs_driver_certfile,
+ keyfile=abs_driver_keyfile,
+ password="cassandra")
+ ssl_context.verify_mode = ssl.CERT_NONE
validate_ssl_options(ssl_context=ssl_context, ssl_options=ssl_options)
def test_can_connect_with_ssl_context_ca_host_match(self):
@@ -447,52 +422,33 @@ def test_can_connect_with_ssl_context_ca_host_match(self):
using client auth, an encrypted keyfile, and host matching
"""
ssl_options = {}
- if USES_PYOPENSSL:
- ssl_context = SSL.Context(SSL.TLSv1_2_METHOD)
- ssl_context.use_certificate_file(DRIVER_CERTFILE)
- with open(DRIVER_KEYFILE_ENCRYPTED) as keyfile:
- key = crypto.load_privatekey(crypto.FILETYPE_PEM, keyfile.read(), b'cassandra')
- ssl_context.use_privatekey(key)
- ssl_context.load_verify_locations(CLIENT_CA_CERTS)
- ssl_options["check_hostname"] = True
- else:
- ssl_context = ssl.SSLContext(ssl_version)
- ssl_context.verify_mode = ssl.CERT_REQUIRED
- ssl_context.load_verify_locations(CLIENT_CA_CERTS)
- ssl_context.load_cert_chain(
- certfile=DRIVER_CERTFILE,
- keyfile=DRIVER_KEYFILE_ENCRYPTED,
- password="cassandra",
- )
- ssl_context.verify_mode = ssl.CERT_REQUIRED
- ssl_options["check_hostname"] = True
+ ssl_context = ssl.SSLContext(ssl_version)
+ ssl_context.verify_mode = ssl.CERT_REQUIRED
+ ssl_context.load_verify_locations(CLIENT_CA_CERTS)
+ ssl_context.load_cert_chain(
+ certfile=DRIVER_CERTFILE,
+ keyfile=DRIVER_KEYFILE_ENCRYPTED,
+ password="cassandra",
+ )
+ ssl_context.verify_mode = ssl.CERT_REQUIRED
+ ssl_options["check_hostname"] = True
validate_ssl_options(ssl_context=ssl_context, ssl_options=ssl_options)
def test_cannot_connect_ssl_context_with_invalid_hostname(self):
ssl_options = {}
- if USES_PYOPENSSL:
- ssl_context = SSL.Context(SSL.TLSv1_2_METHOD)
- ssl_context.use_certificate_file(DRIVER_CERTFILE)
- with open(DRIVER_KEYFILE_ENCRYPTED) as keyfile:
- key = crypto.load_privatekey(crypto.FILETYPE_PEM, keyfile.read(), b"cassandra")
- ssl_context.use_privatekey(key)
- ssl_context.load_verify_locations(CLIENT_CA_CERTS)
- ssl_options["check_hostname"] = True
- else:
- ssl_context = ssl.SSLContext(ssl_version)
- ssl_context.verify_mode = ssl.CERT_REQUIRED
- ssl_context.load_verify_locations(CLIENT_CA_CERTS)
- ssl_context.load_cert_chain(
- certfile=DRIVER_CERTFILE,
- keyfile=DRIVER_KEYFILE_ENCRYPTED,
- password="cassandra",
- )
- ssl_context.verify_mode = ssl.CERT_REQUIRED
- ssl_options["check_hostname"] = True
+ ssl_context = ssl.SSLContext(ssl_version)
+ ssl_context.verify_mode = ssl.CERT_REQUIRED
+ ssl_context.load_verify_locations(CLIENT_CA_CERTS)
+ ssl_context.load_cert_chain(
+ certfile=DRIVER_CERTFILE,
+ keyfile=DRIVER_KEYFILE_ENCRYPTED,
+ password="cassandra",
+ )
+ ssl_context.verify_mode = ssl.CERT_REQUIRED
+ ssl_options["check_hostname"] = True
with self.assertRaises(Exception):
validate_ssl_options(ssl_context=ssl_context, ssl_options=ssl_options, hostname="localhost")
- @unittest.skipIf(USES_PYOPENSSL, "This test is for the built-in ssl.Context")
def test_can_connect_with_sslcontext_default_context(self):
"""
Test to validate that we are able to connect to a cluster using a SSLContext created from create_default_context().
diff --git a/tests/integration/standard/test_connection.py b/tests/integration/standard/test_connection.py
index e7177d8770..88788a4ce2 100644
--- a/tests/integration/standard/test_connection.py
+++ b/tests/integration/standard/test_connection.py
@@ -31,7 +31,6 @@
from cassandra.policies import HostFilterPolicy, RoundRobinPolicy, HostStateListener
from cassandra.pool import HostConnectionPool
-from tests import is_monkey_patched
from tests.integration import use_singledc, get_node, CASSANDRA_IP, local, \
requiresmallclockgranularity, greaterthancass20, TestCluster
@@ -443,8 +442,6 @@ class AsyncoreConnectionTests(ConnectionTests, unittest.TestCase):
event_loop_name = "asyncore_cassandra_driver_event_loop"
def setUp(self):
- if is_monkey_patched():
- raise unittest.SkipTest("Can't test asyncore with monkey patching")
if AsyncoreConnection is None:
raise unittest.SkipTest('Unable to import asyncore module')
ConnectionTests.setUp(self)
@@ -460,8 +457,6 @@ class LibevConnectionTests(ConnectionTests, unittest.TestCase):
event_loop_name = "event_loop"
def setUp(self):
- if is_monkey_patched():
- raise unittest.SkipTest("Can't test libev with monkey patching")
if LibevConnection is None:
raise unittest.SkipTest(
'libev does not appear to be installed properly')
diff --git a/tests/unit/io/eventlet_utils.py b/tests/unit/io/eventlet_utils.py
deleted file mode 100644
index ef3e633ac7..0000000000
--- a/tests/unit/io/eventlet_utils.py
+++ /dev/null
@@ -1,50 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import select
-import socket
-try:
- import thread
- import Queue
- import __builtin__
- #For python3 compatibility
-except ImportError:
- import _thread as thread
- import queue as Queue
- import builtins as __builtin__
-
-import threading
-import ssl
-import time
-import eventlet
-from imp import reload
-
-def eventlet_un_patch_all():
- """
- A method to unpatch eventlet monkey patching used for the reactor tests
- """
-
- # These are the modules that are loaded by eventlet we reload them all
- modules_to_unpatch = [os, select, socket, thread, time, Queue, threading, ssl, __builtin__]
- for to_unpatch in modules_to_unpatch:
- reload(to_unpatch)
-
-def restore_saved_module(module):
- reload(module)
- del eventlet.patcher.already_patched[module.__name__]
-
diff --git a/tests/unit/io/gevent_utils.py b/tests/unit/io/gevent_utils.py
deleted file mode 100644
index b458d13170..0000000000
--- a/tests/unit/io/gevent_utils.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from gevent import monkey
-
-
-def gevent_un_patch_all():
- """
- A method to unpatch gevent libraries. These are unloaded
- in the same order that gevent monkey patch loads theirs.
- Order cannot be arbitrary. This is used in the unit tests to
- un monkey patch gevent
- """
- restore_saved_module("os")
- restore_saved_module("time")
- restore_saved_module("thread")
- restore_saved_module("threading")
- restore_saved_module("_threading_local")
- restore_saved_module("stdin")
- restore_saved_module("stdout")
- restore_saved_module("socket")
- restore_saved_module("select")
- restore_saved_module("ssl")
- restore_saved_module("subprocess")
-
-
-def restore_saved_module(module):
- """
- gevent monkey patch keeps a list of all patched modules.
- This will restore the original ones
- :param module: to unpatch
- :return:
- """
-
- # Check the saved attributes in geven monkey patch
- if not (module in monkey.saved):
- return
- _module = __import__(module)
-
- # If it exist unpatch it
- for attr in monkey.saved[module]:
- if hasattr(_module, attr):
- setattr(_module, attr, monkey.saved[module][attr])
-
diff --git a/tests/unit/io/test_asyncioreactor.py b/tests/unit/io/test_asyncioreactor.py
index 65708d41dc..96cf92f528 100644
--- a/tests/unit/io/test_asyncioreactor.py
+++ b/tests/unit/io/test_asyncioreactor.py
@@ -7,7 +7,7 @@
AsyncioConnection = None
ASYNCIO_AVAILABLE = False
-from tests import is_monkey_patched, connection_class
+from tests import connection_class
from tests.unit.io.utils import TimerCallback, TimerTestMixin
from unittest.mock import patch
@@ -15,12 +15,10 @@
import unittest
import time
-skip_me = (is_monkey_patched() or
- (not ASYNCIO_AVAILABLE) or
+skip_me = ( not ASYNCIO_AVAILABLE or
(connection_class is not AsyncioConnection))
-@unittest.skipIf(is_monkey_patched(), 'runtime is monkey patched for another reactor')
@unittest.skipIf(connection_class is not AsyncioConnection,
'not running asyncio tests; current connection_class is {}'.format(connection_class))
@unittest.skipUnless(ASYNCIO_AVAILABLE, "asyncio is not available for this runtime")
diff --git a/tests/unit/io/test_asyncorereactor.py b/tests/unit/io/test_asyncorereactor.py
index b37df83bf6..a1934453c1 100644
--- a/tests/unit/io/test_asyncorereactor.py
+++ b/tests/unit/io/test_asyncorereactor.py
@@ -27,17 +27,13 @@
except DependencyException:
AsyncoreConnection = None
-from tests import is_monkey_patched
-from tests.unit.io.utils import ReactorTestMixin, TimerTestMixin, noop_if_monkey_patched
+from tests.unit.io.utils import ReactorTestMixin, TimerTestMixin
class AsyncorePatcher(unittest.TestCase):
@classmethod
- @noop_if_monkey_patched
def setUpClass(cls):
- if is_monkey_patched():
- return
AsyncoreConnection.initialize_reactor()
socket_patcher = patch('socket.socket', spec=socket.socket)
@@ -56,7 +52,6 @@ def setUpClass(cls):
cls.patchers = (socket_patcher, channel_patcher)
@classmethod
- @noop_if_monkey_patched
def tearDownClass(cls):
for p in cls.patchers:
try:
@@ -72,8 +67,7 @@ class AsyncoreConnectionTest(ReactorTestMixin, AsyncorePatcher):
socket_attr_name = 'socket'
def setUp(self):
- if is_monkey_patched():
- raise unittest.SkipTest("Can't test asyncore with monkey patching")
+ super(AsyncoreConnectionTest, self).setUp()
@unittest.skipUnless(has_asyncore, "asyncore has been removed in Python 3.12")
@@ -89,6 +83,4 @@ def _timers(self):
return asyncorereactor._global_loop._timers
def setUp(self):
- if is_monkey_patched():
- raise unittest.SkipTest("Can't test asyncore with monkey patching")
super(TestAsyncoreTimer, self).setUp()
diff --git a/tests/unit/io/test_eventletreactor.py b/tests/unit/io/test_eventletreactor.py
deleted file mode 100644
index 8228884a4a..0000000000
--- a/tests/unit/io/test_eventletreactor.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import unittest
-
-from tests.unit.io.utils import TimerTestMixin
-from tests import notpypy, EVENT_LOOP_MANAGER
-
-from eventlet import monkey_patch
-from unittest.mock import patch
-
-try:
- from cassandra.io.eventletreactor import EventletConnection
-except ImportError:
- EventletConnection = None # noqa
-
-skip_condition = EventletConnection is None or EVENT_LOOP_MANAGER != "eventlet"
-# There are some issues with some versions of pypy and eventlet
-@notpypy
-@unittest.skipIf(skip_condition, "Skipping the eventlet tests because it's not installed")
-class EventletTimerTest(TimerTestMixin, unittest.TestCase):
-
- connection_class = EventletConnection
-
- @classmethod
- def setUpClass(cls):
- # This is run even though the class is skipped, so we need
- # to make sure no monkey patching is happening
- if skip_condition:
- return
-
- # This is being added temporarily due to a bug in eventlet:
- # https://github.com/eventlet/eventlet/issues/401
- import eventlet
- eventlet.sleep()
- monkey_patch()
- # cls.connection_class = EventletConnection
-
- EventletConnection.initialize_reactor()
- assert EventletConnection._timers is not None
-
- def setUp(self):
- socket_patcher = patch('eventlet.green.socket.socket')
- self.addCleanup(socket_patcher.stop)
- socket_patcher.start()
-
- super(EventletTimerTest, self).setUp()
-
- recv_patcher = patch.object(self.connection._socket,
- 'recv',
- return_value=b'')
- self.addCleanup(recv_patcher.stop)
- recv_patcher.start()
-
- @property
- def create_timer(self):
- return self.connection.create_timer
-
- @property
- def _timers(self):
- return self.connection._timers
-
- # There is no unpatching because there is not a clear way
- # of doing it reliably
diff --git a/tests/unit/io/test_geventreactor.py b/tests/unit/io/test_geventreactor.py
deleted file mode 100644
index 9bf0c7895f..0000000000
--- a/tests/unit/io/test_geventreactor.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from unittest.mock import patch
-
-
-from tests.unit.io.utils import TimerTestMixin
-from tests import EVENT_LOOP_MANAGER
-try:
- from cassandra.io.geventreactor import GeventConnection
- import gevent.monkey
-except ImportError:
- GeventConnection = None # noqa
-
-
-skip_condition = GeventConnection is None or EVENT_LOOP_MANAGER != "gevent"
-@unittest.skipIf(skip_condition, "Skipping the gevent tests because it's not installed")
-class GeventTimerTest(TimerTestMixin, unittest.TestCase):
-
- connection_class = GeventConnection
-
- @classmethod
- def setUpClass(cls):
- # This is run even though the class is skipped, so we need
- # to make sure no monkey patching is happening
- if skip_condition:
- return
- # There is no unpatching because there is not a clear way
- # of doing it reliably
- gevent.monkey.patch_all()
- GeventConnection.initialize_reactor()
-
- def setUp(self):
- socket_patcher = patch('gevent.socket.socket')
- self.addCleanup(socket_patcher.stop)
- socket_patcher.start()
-
- super(GeventTimerTest, self).setUp()
-
- recv_patcher = patch.object(self.connection._socket,
- 'recv',
- return_value=b'')
- self.addCleanup(recv_patcher.stop)
- recv_patcher.start()
-
- @property
- def create_timer(self):
- return self.connection.create_timer
-
- @property
- def _timers(self):
- return self.connection._timers
diff --git a/tests/unit/io/test_libevreactor.py b/tests/unit/io/test_libevreactor.py
index a4050c79c1..9a518ccd0a 100644
--- a/tests/unit/io/test_libevreactor.py
+++ b/tests/unit/io/test_libevreactor.py
@@ -26,8 +26,7 @@
except DependencyException:
LibevConnection = None # noqa
-from tests import is_monkey_patched
-from tests.unit.io.utils import ReactorTestMixin, TimerTestMixin, noop_if_monkey_patched
+from tests.unit.io.utils import ReactorTestMixin, TimerTestMixin
class LibevConnectionTest(ReactorTestMixin, unittest.TestCase):
@@ -37,8 +36,6 @@ class LibevConnectionTest(ReactorTestMixin, unittest.TestCase):
null_handle_function_args = None, 0
def setUp(self):
- if is_monkey_patched():
- raise unittest.SkipTest("Can't test libev with monkey patching")
if LibevConnection is None:
raise unittest.SkipTest('libev does not appear to be installed correctly')
LibevConnection.initialize_reactor()
@@ -94,7 +91,6 @@ def test_watchers_are_finished(self):
class LibevTimerPatcher(unittest.TestCase):
@classmethod
- @noop_if_monkey_patched
def setUpClass(cls):
if LibevConnection is None:
raise unittest.SkipTest('libev does not appear to be installed correctly')
@@ -106,7 +102,6 @@ def setUpClass(cls):
p.start()
@classmethod
- @noop_if_monkey_patched
def tearDownClass(cls):
for p in cls.patchers:
try:
@@ -134,8 +129,6 @@ def make_connection(self):
return c
def setUp(self):
- if is_monkey_patched():
- raise unittest.SkipTest("Can't test libev with monkey patching.")
if LibevConnection is None:
raise unittest.SkipTest('libev does not appear to be installed correctly')
diff --git a/tests/unit/io/test_twistedreactor.py b/tests/unit/io/test_twistedreactor.py
deleted file mode 100644
index 67c4d8eaf3..0000000000
--- a/tests/unit/io/test_twistedreactor.py
+++ /dev/null
@@ -1,192 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from unittest.mock import Mock, patch
-
-from cassandra.connection import DefaultEndPoint
-
-try:
- from twisted.test import proto_helpers
- from twisted.python.failure import Failure
- from cassandra.io import twistedreactor
- from cassandra.io.twistedreactor import TwistedConnection
-except ImportError:
- twistedreactor = TwistedConnection = None # NOQA
-
-
-from cassandra.connection import _Frame
-
-from tests.unit.io.utils import TimerTestMixin
-
-class TestTwistedTimer(TimerTestMixin, unittest.TestCase):
- """
- Simple test class that is used to validate that the TimerManager, and timer
- classes function appropriately with the twisted infrastructure
- """
-
- connection_class = TwistedConnection
-
- @property
- def create_timer(self):
- return self.connection.create_timer
-
- @property
- def _timers(self):
- return self.connection._loop._timers
-
- def setUp(self):
- if twistedreactor is None:
- raise unittest.SkipTest("Twisted libraries not available")
- twistedreactor.TwistedConnection.initialize_reactor()
- super(TestTwistedTimer, self).setUp()
-
-
-class TestTwistedProtocol(unittest.TestCase):
-
- def setUp(self):
- if twistedreactor is None:
- raise unittest.SkipTest("Twisted libraries not available")
- twistedreactor.TwistedConnection.initialize_reactor()
- self.tr = proto_helpers.StringTransportWithDisconnection()
- self.tr.connector = Mock()
- self.mock_connection = Mock()
- self.obj_ut = twistedreactor.TwistedConnectionProtocol(self.mock_connection)
- self.tr.protocol = self.obj_ut
-
- def tearDown(self):
- loop = twistedreactor.TwistedConnection._loop
- if not loop._reactor_stopped():
- loop._cleanup()
-
- def test_makeConnection(self):
- """
- Verify that the protocol class notifies the connection
- object that a successful connection was made.
- """
- self.obj_ut.makeConnection(self.tr)
- self.assertTrue(self.mock_connection.client_connection_made.called)
-
- def test_receiving_data(self):
- """
- Verify that the dataReceived() callback writes the data to
- the connection object's buffer and calls handle_read().
- """
- self.obj_ut.makeConnection(self.tr)
- self.obj_ut.dataReceived('foobar')
- self.assertTrue(self.mock_connection.handle_read.called)
- self.mock_connection._iobuf.write.assert_called_with("foobar")
-
-
-class TestTwistedConnection(unittest.TestCase):
- def setUp(self):
- if twistedreactor is None:
- raise unittest.SkipTest("Twisted libraries not available")
- twistedreactor.TwistedConnection.initialize_reactor()
- self.reactor_cft_patcher = patch(
- 'twisted.internet.reactor.callFromThread')
- self.reactor_run_patcher = patch('twisted.internet.reactor.run')
- self.mock_reactor_cft = self.reactor_cft_patcher.start()
- self.mock_reactor_run = self.reactor_run_patcher.start()
- self.obj_ut = twistedreactor.TwistedConnection(DefaultEndPoint('1.2.3.4'),
- cql_version='3.0.1')
-
- def tearDown(self):
- self.reactor_cft_patcher.stop()
- self.reactor_run_patcher.stop()
-
- def test_connection_initialization(self):
- """
- Verify that __init__() works correctly.
- """
- self.mock_reactor_cft.assert_called_with(self.obj_ut.add_connection)
- self.obj_ut._loop._cleanup()
- self.mock_reactor_run.assert_called_with(installSignalHandlers=False)
-
- def test_client_connection_made(self):
- """
- Verifiy that _send_options_message() is called in
- client_connection_made()
- """
- self.obj_ut._send_options_message = Mock()
- self.obj_ut.client_connection_made(Mock())
- self.obj_ut._send_options_message.assert_called_with()
-
- @patch('twisted.internet.reactor.connectTCP')
- def test_close(self, mock_connectTCP):
- """
- Verify that close() disconnects the connector and errors callbacks.
- """
- transport = Mock()
- self.obj_ut.error_all_requests = Mock()
- self.obj_ut.add_connection()
- self.obj_ut.client_connection_made(transport)
- self.obj_ut.is_closed = False
- self.obj_ut.close()
-
- self.assertTrue(self.obj_ut.connected_event.is_set())
- self.assertTrue(self.obj_ut.error_all_requests.called)
-
- def test_handle_read__incomplete(self):
- """
- Verify that handle_read() processes incomplete messages properly.
- """
- self.obj_ut.process_msg = Mock()
- self.assertEqual(self.obj_ut._iobuf.getvalue(), b'') # buf starts empty
- # incomplete header
- self.obj_ut._iobuf.write(b'\x84\x00\x00\x00\x00')
- self.obj_ut.handle_read()
- self.assertEqual(self.obj_ut._io_buffer.cql_frame_buffer.getvalue(), b'\x84\x00\x00\x00\x00')
-
- # full header, but incomplete body
- self.obj_ut._iobuf.write(b'\x00\x00\x00\x15')
- self.obj_ut.handle_read()
- self.assertEqual(self.obj_ut._io_buffer.cql_frame_buffer.getvalue(),
- b'\x84\x00\x00\x00\x00\x00\x00\x00\x15')
- self.assertEqual(self.obj_ut._current_frame.end_pos, 30)
-
- # verify we never attempted to process the incomplete message
- self.assertFalse(self.obj_ut.process_msg.called)
-
- def test_handle_read__fullmessage(self):
- """
- Verify that handle_read() processes complete messages properly.
- """
- self.obj_ut.process_msg = Mock()
- self.assertEqual(self.obj_ut._iobuf.getvalue(), b'') # buf starts empty
-
- # write a complete message, plus 'NEXT' (to simulate next message)
- # assumes protocol v3+ as default Connection.protocol_version
- body = b'this is the drum roll'
- extra = b'NEXT'
- self.obj_ut._iobuf.write(
- b'\x84\x01\x00\x02\x03\x00\x00\x00\x15' + body + extra)
- self.obj_ut.handle_read()
- self.assertEqual(self.obj_ut._io_buffer.cql_frame_buffer.getvalue(), extra)
- self.obj_ut.process_msg.assert_called_with(
- _Frame(version=4, flags=1, stream=2, opcode=3, body_offset=9, end_pos=9 + len(body)), body)
-
- @patch('twisted.internet.reactor.connectTCP')
- def test_push(self, mock_connectTCP):
- """
- Verifiy that push() calls transport.write(data).
- """
- self.obj_ut.add_connection()
- transport_mock = Mock()
- self.obj_ut.transport = transport_mock
- self.obj_ut.push('123 pickup')
- self.mock_reactor_cft.assert_called_with(
- transport_mock.write, '123 pickup')
diff --git a/tests/unit/io/utils.py b/tests/unit/io/utils.py
index d4483d08c7..f17a1277a6 100644
--- a/tests/unit/io/utils.py
+++ b/tests/unit/io/utils.py
@@ -22,7 +22,6 @@
write_stringmultimap, write_int, write_string, SupportedMessage, ReadyMessage, ServerError
)
from cassandra.connection import DefaultEndPoint
-from tests import is_monkey_patched
import io
import random
@@ -135,17 +134,6 @@ def submit_and_wait_for_completion(unit_test, create_timer, start, end, incremen
for callback in completed_callbacks:
unit_test.assertAlmostEqual(callback.expected_wait, callback.get_wait_time(), delta=.15)
-
-def noop_if_monkey_patched(f):
- if is_monkey_patched():
- @wraps(f)
- def noop(*args, **kwargs):
- return
- return noop
-
- return f
-
-
class TimerTestMixin(object):
connection_class = connection = None
diff --git a/tox.ini b/tox.ini
index 19e610616e..5ec4139a90 100644
--- a/tox.ini
+++ b/tox.ini
@@ -5,9 +5,6 @@ envlist = py{310,311,312,313,314},pypy
deps = pytest
packaging
cython>=3.0
- eventlet
- gevent
- twisted[tls]
pure-sasl
kerberos
futurist
@@ -23,24 +20,3 @@ setenv = LIBEV_EMBED=0
changedir = {envtmpdir}
commands = pytest -v {toxinidir}/tests/unit/
-
-[testenv:gevent_loop]
-deps = {[base]deps}
-
-setenv = LIBEV_EMBED=0
- CARES_EMBED=0
- EVENT_LOOP_MANAGER=gevent
-changedir = {envtmpdir}
-commands =
- pytest -v {toxinidir}/tests/unit/io/test_geventreactor.py
-
-
-[testenv:eventlet_loop]
-deps = {[base]deps}
-
-setenv = LIBEV_EMBED=0
- CARES_EMBED=0
- EVENT_LOOP_MANAGER=eventlet
-changedir = {envtmpdir}
-commands =
- pytest -v {toxinidir}/tests/unit/io/test_eventletreactor.py