Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 100 additions & 6 deletions cmd/e2e/alert/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,63 @@ type AlertRequest struct {
}

type AlertPayload struct {
Topic string `json:"topic"`
Timestamp time.Time `json:"timestamp"`
Data json.RawMessage `json:"data"`
}

// ConsecutiveFailureAlert is a parsed alert for "alert.consecutive_failure"
type ConsecutiveFailureAlert struct {
Topic string `json:"topic"`
Timestamp time.Time `json:"timestamp"`
Data ConsecutiveFailureData `json:"data"`
}

// DestinationDisabledAlert is a parsed alert for "alert.destination.disabled"
type DestinationDisabledAlert struct {
Topic string `json:"topic"`
Timestamp time.Time `json:"timestamp"`
Data DestinationDisabledData `json:"data"`
}

// AlertDestination matches internal/alert.AlertDestination
type AlertDestination struct {
ID string `json:"id"`
TenantID string `json:"tenant_id"`
Type string `json:"type"`
Topics []string `json:"topics"`
Filter map[string]any `json:"filter,omitempty"`
Config map[string]string `json:"config"`
Metadata map[string]string `json:"metadata,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
DisabledAt *time.Time `json:"disabled_at"`
}

// ConsecutiveFailures represents the nested consecutive failure state
type ConsecutiveFailures struct {
Current int `json:"current"`
Max int `json:"max"`
Threshold int `json:"threshold"`
}

// ConsecutiveFailureData matches internal/alert.ConsecutiveFailureData
type ConsecutiveFailureData struct {
MaxConsecutiveFailures int `json:"max_consecutive_failures"`
ConsecutiveFailures int `json:"consecutive_failures"`
WillDisable bool `json:"will_disable"`
Destination *models.Destination `json:"destination"`
Data map[string]interface{} `json:"data"`
TenantID string `json:"tenant_id"`
Attempt *models.Attempt `json:"attempt"`
Event *models.Event `json:"event"`
Destination *AlertDestination `json:"destination"`
ConsecutiveFailures ConsecutiveFailures `json:"consecutive_failures"`
}

// DestinationDisabledData matches the expected payload for "alert.destination.disabled"
type DestinationDisabledData struct {
TenantID string `json:"tenant_id"`
Destination *AlertDestination `json:"destination"`
DisabledAt time.Time `json:"disabled_at"`
Reason string `json:"reason"`
Attempt *models.Attempt `json:"attempt,omitempty"`
Event *models.Event `json:"event,omitempty"`
}

type AlertMockServer struct {
Expand Down Expand Up @@ -117,14 +163,23 @@ func (s *AlertMockServer) handleAlert(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

// alertDataWithDestination is used to extract destination from any alert type
type alertDataWithDestination struct {
Destination *AlertDestination `json:"destination"`
}

// Helper methods for assertions
func (s *AlertMockServer) GetAlertsForDestination(destinationID string) []AlertRequest {
s.mu.RLock()
defer s.mu.RUnlock()

var filtered []AlertRequest
for _, alert := range s.alerts {
if alert.Alert.Data.Destination != nil && alert.Alert.Data.Destination.ID == destinationID {
var data alertDataWithDestination
if err := json.Unmarshal(alert.Alert.Data, &data); err != nil {
continue
}
if data.Destination != nil && data.Destination.ID == destinationID {
filtered = append(filtered, alert)
}
}
Expand All @@ -141,3 +196,42 @@ func (s *AlertMockServer) GetLastAlert() *AlertRequest {
alert := s.alerts[len(s.alerts)-1]
return &alert
}

// GetAlertsForDestinationByTopic returns alerts filtered by destination ID and topic
func (s *AlertMockServer) GetAlertsForDestinationByTopic(destinationID, topic string) []AlertRequest {
s.mu.RLock()
defer s.mu.RUnlock()

var filtered []AlertRequest
for _, alert := range s.alerts {
if alert.Alert.Topic != topic {
continue
}
var data alertDataWithDestination
if err := json.Unmarshal(alert.Alert.Data, &data); err != nil {
continue
}
if data.Destination != nil && data.Destination.ID == destinationID {
filtered = append(filtered, alert)
}
}
return filtered
}

// ParseConsecutiveFailureData parses the Data field as ConsecutiveFailureData
func (a *AlertRequest) ParseConsecutiveFailureData() (*ConsecutiveFailureData, error) {
var data ConsecutiveFailureData
if err := json.Unmarshal(a.Alert.Data, &data); err != nil {
return nil, err
}
return &data, nil
}

// ParseDestinationDisabledData parses the Data field as DestinationDisabledData
func (a *AlertRequest) ParseDestinationDisabledData() (*DestinationDisabledData, error) {
var data DestinationDisabledData
if err := json.Unmarshal(a.Alert.Data, &data); err != nil {
return nil, err
}
return &data, nil
}
141 changes: 134 additions & 7 deletions cmd/e2e/alerts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() {
dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret))

// Publish 20 failing events
for i := 0; i < 20; i++ {
for i := range 20 {
s.publish(tenant.ID, "user.created", map[string]any{
"index": i,
}, withPublishMetadata(map[string]string{"should_err": "true"}))
Expand All @@ -22,14 +22,46 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() {

// Wait for 4 alert callbacks to be processed
s.waitForAlerts(dest.ID, 4)
alerts := s.alertServer.GetAlertsForDestination(dest.ID)
alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.destination.consecutive_failure")
s.Require().Len(alerts, 4, "should have 4 alerts")

expectedCounts := []int{10, 14, 18, 20}
for i, alert := range alerts {
// Parse alert data
data, err := alert.ParseConsecutiveFailureData()
s.Require().NoError(err, "failed to parse consecutive failure data")

// Auth header assertion
s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match")
s.Equal(expectedCounts[i], alert.Alert.Data.ConsecutiveFailures,

// Topic assertion
s.Equal("alert.destination.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.consecutive_failure")

// TenantID assertion
s.NotEmpty(data.TenantID, "alert should have tenant_id")
s.Equal(tenant.ID, data.TenantID, "alert tenant_id should match")

// Destination assertions
s.Require().NotNil(data.Destination, "alert should have destination")
s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match")
s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match")
s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook")

// Event assertions
s.NotEmpty(data.Event.ID, "alert event should have ID")
s.Equal("user.created", data.Event.Topic, "alert event topic should match")
s.NotNil(data.Event.Data, "alert event should have data")

// ConsecutiveFailures assertions
s.Equal(expectedCounts[i], data.ConsecutiveFailures.Current,
"alert %d should have %d consecutive failures", i, expectedCounts[i])
s.Equal(20, data.ConsecutiveFailures.Max, "max consecutive failures should be 20")
s.Greater(data.ConsecutiveFailures.Threshold, 0, "threshold should be > 0")

// Attempt assertion
s.Require().NotNil(data.Attempt, "alert should have attempt")
s.NotEmpty(data.Attempt.ID, "attempt should have ID")
s.NotEmpty(data.Attempt.Status, "attempt should have status")
}
}

Expand All @@ -38,7 +70,7 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() {
dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret))

// First batch: 14 failures
for i := 0; i < 14; i++ {
for i := range 14 {
s.publish(tenant.ID, "user.created", map[string]any{
"index": i,
}, withPublishMetadata(map[string]string{"should_err": "true"}))
Expand All @@ -56,7 +88,7 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() {
s.waitForNewMockServerEvents(dest.mockID, 15)

// Second batch: 14 more failures
for i := 0; i < 14; i++ {
for i := range 14 {
s.publish(tenant.ID, "user.created", map[string]any{
"index": i,
}, withPublishMetadata(map[string]string{"should_err": "true"}))
Expand All @@ -71,13 +103,108 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() {

// Wait for 4 alert callbacks: [10, 14] from first batch, [10, 14] from second batch
s.waitForAlerts(dest.ID, 4)
alerts := s.alertServer.GetAlertsForDestination(dest.ID)
alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.destination.consecutive_failure")
s.Require().Len(alerts, 4, "should have 4 alerts")

expectedCounts := []int{10, 14, 10, 14}
for i, alert := range alerts {
// Parse alert data
data, err := alert.ParseConsecutiveFailureData()
s.Require().NoError(err, "failed to parse consecutive failure data")

// Auth header assertion
s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match")
s.Equal(expectedCounts[i], alert.Alert.Data.ConsecutiveFailures,

// Topic assertion
s.Equal("alert.destination.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.consecutive_failure")

// TenantID assertion
s.NotEmpty(data.TenantID, "alert should have tenant_id")
s.Equal(tenant.ID, data.TenantID, "alert tenant_id should match")

// Destination assertions
s.Require().NotNil(data.Destination, "alert should have destination")
s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match")
s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match")
s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook")

// Event assertions
s.NotEmpty(data.Event.ID, "alert event should have ID")
s.Equal("user.created", data.Event.Topic, "alert event topic should match")
s.NotNil(data.Event.Data, "alert event should have data")

// ConsecutiveFailures assertions
s.Equal(expectedCounts[i], data.ConsecutiveFailures.Current,
"alert %d should have %d consecutive failures", i, expectedCounts[i])
s.Equal(20, data.ConsecutiveFailures.Max, "max consecutive failures should be 20")
s.Greater(data.ConsecutiveFailures.Threshold, 0, "threshold should be > 0")
s.Less(data.ConsecutiveFailures.Threshold, 100, "threshold should be < 100 (counter resets)")

// Attempt assertion
s.Require().NotNil(data.Attempt, "alert should have attempt")
s.NotEmpty(data.Attempt.ID, "attempt should have ID")
}
}

func (s *basicSuite) TestAlerts_DestinationDisabledCallback() {
tenant := s.createTenant()
dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret))

// Publish 20 failing events to trigger auto-disable
for i := range 20 {
s.publish(tenant.ID, "user.created", map[string]any{
"index": i,
}, withPublishMetadata(map[string]string{"should_err": "true"}))
}

// Wait for destination to be disabled (sync point for all 20 deliveries)
s.waitForNewDestinationDisabled(tenant.ID, dest.ID)

// Verify destination is disabled
got := s.getDestination(tenant.ID, dest.ID)
s.NotNil(got.DisabledAt, "destination should be disabled")

// Wait for the destination.disabled alert callback
s.waitForAlertsByTopic(dest.ID, "alert.destination.disabled", 1)
alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.destination.disabled")
s.Require().Len(alerts, 1, "should have 1 destination.disabled alert")

alert := alerts[0]
data, err := alert.ParseDestinationDisabledData()
s.Require().NoError(err, "failed to parse destination disabled data")

// Auth header assertion
s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match")

// Topic assertion
s.Equal("alert.destination.disabled", alert.Alert.Topic, "alert topic should be alert.destination.disabled")

// TenantID assertion
s.NotEmpty(data.TenantID, "alert should have tenant_id")
s.Equal(tenant.ID, data.TenantID, "alert tenant_id should match")

// Destination assertions
s.Require().NotNil(data.Destination, "alert should have destination")
s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match")
s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match")
s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook")
s.NotNil(data.Destination.DisabledAt, "destination should have disabled_at set")

// DisabledAt assertion
s.False(data.DisabledAt.IsZero(), "disabled_at should not be zero")

// Event assertions (optional but expected)
if data.Event != nil {
s.NotEmpty(data.Event.ID, "event should have ID")
s.Equal("user.created", data.Event.Topic, "event topic should match")
}

// Attempt assertions (optional but expected)
if data.Attempt != nil {
s.NotEmpty(data.Attempt.ID, "attempt should have ID")
s.NotEmpty(data.Attempt.Status, "attempt should have status")
}

// Reason assertion
s.Equal("consecutive_failure", data.Reason, "reason should be consecutive_failure")
}
17 changes: 17 additions & 0 deletions cmd/e2e/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,23 @@ func (s *basicSuite) waitForAlerts(destID string, count int) {
s.Require().FailNowf("timeout", "timed out waiting for %d alerts for %s (got %d)", count, destID, lastCount)
}

// waitForAlertsByTopic polls until at least count alerts with the specific topic exist for the destination.
func (s *basicSuite) waitForAlertsByTopic(destID, topic string, count int) {
s.T().Helper()
timeout := alertPollTimeout
deadline := time.Now().Add(timeout)
var lastCount int

for time.Now().Before(deadline) {
lastCount = len(s.alertServer.GetAlertsForDestinationByTopic(destID, topic))
if lastCount >= count {
return
}
time.Sleep(100 * time.Millisecond)
}
s.Require().FailNowf("timeout", "timed out waiting for %d %s alerts for %s (got %d)", count, topic, destID, lastCount)
}

// =============================================================================
// Absence assertion
// =============================================================================
Expand Down
Loading
Loading