Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions mssql_python/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, connection) -> None:
Args:
connection: Database connection object.
"""
self.connection = connection
self._connection = connection # Store as private attribute
# self.connection.autocommit = False
self.hstmt = None
self._initialize_cursor()
Expand Down Expand Up @@ -426,7 +426,7 @@ def _allocate_statement_handle(self):
"""
Allocate the DDBC statement handle.
"""
self.hstmt = self.connection._conn.alloc_statement_handle()
self.hstmt = self._connection._conn.alloc_statement_handle()

def _reset_cursor(self) -> None:
"""
Expand Down Expand Up @@ -564,6 +564,24 @@ def rownumber(self):

return self._rownumber

@property
def connection(self):
"""
DB-API 2.0 attribute: Connection object that created this cursor.

This is a read-only reference to the Connection object that was used to create
this cursor. This attribute is useful for polymorphic code that needs access
to connection-level functionality.

Returns:
Connection: The connection object that created this cursor.

Note:
This attribute is read-only as specified by DB-API 2.0. Attempting to
assign to this attribute will raise an AttributeError.
"""
return self._connection

def _reset_rownumber(self):
"""Reset the rownumber tracking when starting a new result set."""
self._rownumber = 0
Expand Down
237 changes: 237 additions & 0 deletions tests/test_004_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2045,3 +2045,240 @@ def test_close(db_connection):
pytest.fail(f"Cursor close test failed: {e}")
finally:
cursor = db_connection.cursor()

def test_cursor_connection_attribute_exists(cursor, db_connection):
"""Test that cursor.connection attribute exists and returns the correct connection"""
assert hasattr(cursor, 'connection'), "Cursor should have connection attribute"
assert cursor.connection is db_connection, "Cursor.connection should return the same connection object used to create it"
assert id(cursor.connection) == id(db_connection), "Cursor.connection should be the exact same object reference"

def test_cursor_connection_attribute_readonly(cursor, db_connection):
"""Test that cursor.connection is read-only"""
# Test that we can read the connection
conn = cursor.connection
assert conn is db_connection, "Should be able to read cursor.connection"

# Test that we cannot write to the connection attribute
try:
cursor.connection = None
pytest.fail("Should not be able to assign to cursor.connection (read-only attribute)")
except AttributeError as e:
assert "can't set attribute" in str(e).lower() or "has no setter" in str(e).lower(), "Should raise AttributeError for read-only property"

# Verify the connection is still intact after failed assignment
assert cursor.connection is db_connection, "Connection should remain unchanged after failed assignment"

def test_cursor_connection_multiple_cursors(db_connection):
"""Test cursor.connection with multiple cursors from same connection"""
# Create multiple cursors from the same connection
cursor1 = db_connection.cursor()
cursor2 = db_connection.cursor()
cursor3 = db_connection.cursor()

try:
# All cursors should reference the same connection
assert cursor1.connection is db_connection, "First cursor should reference the connection"
assert cursor2.connection is db_connection, "Second cursor should reference the connection"
assert cursor3.connection is db_connection, "Third cursor should reference the connection"

# All cursors should reference the exact same connection object
assert cursor1.connection is cursor2.connection, "All cursors should reference the same connection"
assert cursor2.connection is cursor3.connection, "All cursors should reference the same connection"
assert cursor1.connection is cursor3.connection, "All cursors should reference the same connection"

# Test that they can all be used independently
cursor1.execute("SELECT 1 as test1")
result1 = cursor1.fetchone() # Fetch immediately to free connection

cursor2.execute("SELECT 2 as test2")
result2 = cursor2.fetchone() # Fetch immediately to free connection

cursor3.execute("SELECT 3 as test3")
result3 = cursor3.fetchone() # Fetch immediately to free connection

assert result1[0] == 1, "First cursor should work independently"
assert result2[0] == 2, "Second cursor should work independently"
assert result3[0] == 3, "Third cursor should work independently"

finally:
# Clean up cursors
for cursor in [cursor1, cursor2, cursor3]:
try:
if not cursor.closed:
cursor.close()
except:
pass

def test_cursor_connection_multi_connection_environment(conn_str):
"""Test cursor.connection in multi-connection environment"""
from mssql_python import connect

# Create multiple connections
conn1 = connect(conn_str)
conn2 = connect(conn_str)

try:
# Create cursors from different connections
cursor1 = conn1.cursor()
cursor2 = conn2.cursor()

# Each cursor should reference its own connection
assert cursor1.connection is conn1, "First cursor should reference first connection"
assert cursor2.connection is conn2, "Second cursor should reference second connection"

# Cursors should reference different connections
assert cursor1.connection is not cursor2.connection, "Cursors from different connections should reference different connections"

# Test polymorphic code example - function that works with any cursor
def get_connection_info(cursor):
"""Example of polymorphic code using cursor.connection"""
return {
'cursor_id': id(cursor),
'connection_id': id(cursor.connection),
'autocommit': cursor.connection.autocommit
}

info1 = get_connection_info(cursor1)
info2 = get_connection_info(cursor2)

assert info1['connection_id'] != info2['connection_id'], "Should have different connection IDs"
assert info1['cursor_id'] != info2['cursor_id'], "Should have different cursor IDs"

# Test that both cursors work with their respective connections
cursor1.execute("SELECT 'conn1' as source")
cursor2.execute("SELECT 'conn2' as source")

result1 = cursor1.fetchone()
result2 = cursor2.fetchone()

assert result1[0] == 'conn1', "First cursor should work with first connection"
assert result2[0] == 'conn2', "Second cursor should work with second connection"

finally:
# Clean up
try:
conn1.close()
except:
pass
try:
conn2.close()
except:
pass

def test_cursor_connection_after_cursor_close(db_connection):
"""Test that cursor.connection is still accessible after cursor is closed"""
cursor = db_connection.cursor()

# Verify connection works before closing cursor
assert cursor.connection is db_connection, "Connection should be accessible before close"

# Close the cursor
cursor.close()
assert cursor.closed, "Cursor should be marked as closed"

# Connection should still be accessible even after cursor is closed
assert cursor.connection is db_connection, "Connection should still be accessible after cursor close"

# Should still be the same object reference
assert id(cursor.connection) == id(db_connection), "Should still be the same connection object"

def test_cursor_connection_after_connection_close(conn_str):
"""Test cursor.connection behavior after connection is closed"""
from mssql_python import connect

temp_conn = connect(conn_str)
cursor = temp_conn.cursor()

# Verify connection works initially
assert cursor.connection is temp_conn, "Connection should be accessible initially"

# Close the connection
temp_conn.close()

# Connection reference should still exist (it's just a reference)
assert cursor.connection is temp_conn, "Connection reference should still exist"

# But the connection itself should be closed
assert temp_conn._closed, "Connection should be marked as closed"

def test_cursor_connection_polymorphic_code_examples(db_connection):
"""Test real-world examples of polymorphic code using cursor.connection"""

def execute_with_autocommit_control(cursor, sql, autocommit_mode=True):
"""Example function that uses cursor.connection for autocommit control"""
original_autocommit = cursor.connection.autocommit
try:
cursor.connection.autocommit = autocommit_mode
cursor.execute(sql)
return cursor.fetchall() if sql.strip().upper().startswith('SELECT') else cursor.rowcount
finally:
cursor.connection.autocommit = original_autocommit

def get_cursor_metadata(cursor):
"""Example function that extracts metadata using cursor.connection"""
return {
'connection_closed': getattr(cursor.connection, '_closed', False),
'autocommit': cursor.connection.autocommit,
'cursor_closed': cursor.closed,
'has_exception_attributes': all(hasattr(cursor.connection, attr) for attr in
['Error', 'InterfaceError', 'DatabaseError'])
}

# Test the polymorphic functions
cursor = db_connection.cursor()

try:
# Test autocommit control function
result = execute_with_autocommit_control(cursor, "SELECT 1 as test", autocommit_mode=True)
assert len(result) == 1, "Should return one row"
assert result[0][0] == 1, "Should return correct value"

# Test metadata extraction function
metadata = get_cursor_metadata(cursor)
assert metadata['connection_closed'] == False, "Connection should not be closed"
assert isinstance(metadata['autocommit'], bool), "Autocommit should be boolean"
assert metadata['cursor_closed'] == False, "Cursor should not be closed"

finally:
cursor.close()

def test_cursor_connection_transaction_control(db_connection):
"""Test using cursor.connection for transaction control"""
cursor = db_connection.cursor()

try:
# Create a test table
cursor.execute("CREATE TABLE #test_cursor_connection (id INT, value VARCHAR(50))")
db_connection.commit()

# Test transaction control through cursor.connection
original_autocommit = cursor.connection.autocommit
cursor.connection.autocommit = False

try:
# Insert data in transaction
cursor.execute("INSERT INTO #test_cursor_connection VALUES (1, 'test')")

# Verify data exists before commit
cursor.execute("SELECT COUNT(*) FROM #test_cursor_connection")
count = cursor.fetchone()[0]
assert count == 1, "Data should exist before commit"

# Rollback the transaction using cursor.connection
cursor.connection.rollback()

# Verify data was rolled back
cursor.execute("SELECT COUNT(*) FROM #test_cursor_connection")
count = cursor.fetchone()[0]
assert count == 0, "Data should be rolled back"

finally:
cursor.connection.autocommit = original_autocommit

finally:
try:
cursor.execute("DROP TABLE #test_cursor_connection")
db_connection.commit()
except:
pass
cursor.close()