2323from databricks .sql .telemetry .circuit_breaker_manager import CircuitBreakerManager
2424
2525
def wait_for_circuit_state(circuit_breaker, expected_state, timeout=5, *, poll_interval=0.1):
    """
    Wait for circuit breaker to reach expected state with polling.

    The state is always checked at least once, even when ``timeout`` is 0,
    and the deadline is measured on the monotonic clock so wall-clock
    adjustments cannot shorten or extend the wait.

    Args:
        circuit_breaker: The circuit breaker instance to monitor; only its
            ``current_state`` attribute is read
        expected_state: The expected state (STATE_OPEN, STATE_CLOSED, STATE_HALF_OPEN)
        timeout: Maximum time to wait in seconds
        poll_interval: Seconds to sleep between checks (non-negative,
            defaults to 100ms)

    Returns:
        True if state reached, False if timeout
    """
    deadline = time.monotonic() + timeout
    while True:
        # Check before looking at the clock so a state that is already
        # correct (or became correct during the last sleep) is never missed.
        if circuit_breaker.current_state == expected_state:
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))
44+
45+
def wait_for_circuit_state_multiple(circuit_breaker, expected_states, timeout=5, *, poll_interval=0.1):
    """
    Wait for circuit breaker to reach one of multiple expected states.

    The state is always checked at least once, even when ``timeout`` is 0,
    and the deadline is measured on the monotonic clock so wall-clock
    adjustments cannot shorten or extend the wait.

    Args:
        circuit_breaker: The circuit breaker instance to monitor; only its
            ``current_state`` attribute is read
        expected_states: Container of acceptable states (tested with ``in``)
        timeout: Maximum time to wait in seconds
        poll_interval: Seconds to sleep between checks (non-negative,
            defaults to 100ms)

    Returns:
        True if any state reached, False if timeout
    """
    deadline = time.monotonic() + timeout
    while True:
        # Check before looking at the clock so a state that is already
        # acceptable (or became so during the last sleep) is never missed.
        if circuit_breaker.current_state in expected_states:
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))
64+
65+
2666@pytest .fixture (autouse = True )
2767def aggressive_circuit_breaker_config ():
2868 """
@@ -107,9 +147,13 @@ def mock_request(*args, **kwargs):
107147 time .sleep (0.5 )
108148
109149 if should_trigger :
110- # Circuit should be OPEN after 2 rate-limit failures
150+ # Wait for circuit to open (async telemetry may take time)
151+ assert wait_for_circuit_state (circuit_breaker , STATE_OPEN , timeout = 5 ), \
152+ f"Circuit didn't open within 5s, state: { circuit_breaker .current_state } "
153+
154+ # Circuit should be OPEN after rate-limit failures
111155 assert circuit_breaker .current_state == STATE_OPEN
112- assert circuit_breaker .fail_counter == 2
156+ assert circuit_breaker .fail_counter >= 2 # At least 2 failures
113157
114158 # Track requests before another query
115159 requests_before = request_count ["count" ]
@@ -197,7 +241,9 @@ def mock_conditional_request(*args, **kwargs):
197241 cursor .fetchone ()
198242 time .sleep (2 )
199243
200- assert circuit_breaker .current_state == STATE_OPEN
244+ # Wait for circuit to open
245+ assert wait_for_circuit_state (circuit_breaker , STATE_OPEN , timeout = 5 ), \
246+ f"Circuit didn't open, state: { circuit_breaker .current_state } "
201247
202248 # Wait for reset timeout (5 seconds in test)
203249 time .sleep (6 )
@@ -208,24 +254,20 @@ def mock_conditional_request(*args, **kwargs):
208254 # Execute query to trigger HALF_OPEN state
209255 cursor .execute ("SELECT 3" )
210256 cursor .fetchone ()
211- time .sleep (1 )
212257
213- # Circuit should be recovering
214- assert circuit_breaker .current_state in [
215- STATE_HALF_OPEN ,
216- STATE_CLOSED ,
217- ], f"Circuit should be recovering, but is { circuit_breaker .current_state } "
258+ # Wait for circuit to start recovering
259+ assert wait_for_circuit_state_multiple (
260+ circuit_breaker , [STATE_HALF_OPEN , STATE_CLOSED ], timeout = 5
261+ ), f"Circuit didn't recover, state: { circuit_breaker .current_state } "
218262
219263 # Execute more queries to fully recover
220264 cursor .execute ("SELECT 4" )
221265 cursor .fetchone ()
222- time .sleep (1 )
223266
224- current_state = circuit_breaker .current_state
225- assert current_state in [
226- STATE_CLOSED ,
227- STATE_HALF_OPEN ,
228- ], f"Circuit should recover to CLOSED or HALF_OPEN, got { current_state } "
267+ # Wait for full recovery
268+ assert wait_for_circuit_state_multiple (
269+ circuit_breaker , [STATE_CLOSED , STATE_HALF_OPEN ], timeout = 5
270+ ), f"Circuit didn't fully recover, state: { circuit_breaker .current_state } "
229271
230272
231273if __name__ == "__main__" :
0 commit comments