Skip to content

Commit

Permalink
Add ProcessGuarantee to WindowConfig (#792)
Browse files Browse the repository at this point in the history
* Add `ProcessGuarantee` to WindowConfig

* Fix error

* Fix ci
  • Loading branch information
jiangpengcheng authored Nov 22, 2024
1 parent 3c92372 commit f8172da
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 12 deletions.
14 changes: 13 additions & 1 deletion .ci/helm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,22 @@ function ci::verify_crypto_function() {
function ci::send_test_data() {
inputtopic=$1
inputmessage=$2
kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client produce -m "${inputmessage}" -n 100 "${inputtopic}"
count=$3
kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client produce -m "${inputmessage}" -n $count "${inputtopic}"
return 0
}

function ci::verify_backlog() {
topic=$1
sub=$2
expected=$3
BACKLOG=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin topics stats $topic | grep msgBacklog)
if [[ "$BACKLOG" == *"\"msgBacklog\" : $expected"* ]]; then
return 0
fi
return 1
}

function ci::verify_exclamation_function() {
inputtopic=$1
outputtopic=$2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ spec:
windowConfig:
windowLengthCount: 10
slidingIntervalCount: 5
processingGuarantee: ATLEAST_ONCE
# the processingGuarantee should be manual for window function
# see: https://github.com/apache/pulsar/pull/16279/files#diff-c77c024ccb31c94a7aa80cb8e96d7e370709157bdc104a1be7867fb6c7aa0586R318-R319
processingGuarantee: manual
subscriptionPosition: earliest
---
apiVersion: v1
Expand Down
33 changes: 31 additions & 2 deletions .ci/tests/integration/cases/logging-window-function/verify.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,46 @@ if [ $? -ne 0 ]; then
exit 1
fi

verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 2>&1)
verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 3 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_java_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi

sleep 3

# the 3 messages will not be processed, so backlog should be 3
verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic" "public/default/window-function-sample" 3 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_backlog_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi

# it will fire the window with first 5 messages when get the 5th message, and then fire again with 10 messages when get 10th message
verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 7 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_java_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi

# there is a bug in upstream that messages don't get ack if the function return null
# should be fixed by: https://github.com/apache/pulsar/pull/23618
#sleep 3
#
#verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic" "public/default/window-function-sample" 0 2>&1)
#if [ $? -ne 0 ]; then
# echo "$verify_backlog_result"
# kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
# exit 1
#fi

verify_log_result=$(kubectl logs -l compute.functionmesh.io/name=window-function-sample --tail=-1 | grep -e "-window-log" | wc -l)
if [ $verify_log_result -ne 0 ]; then
sub_name=$(echo $RANDOM | md5sum | head -c 20; echo;)
verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 10 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l)
verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 15 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l)
if [ $verify_log_topic_result -ne 0 ]; then
echo "e2e-test: ok" | yq eval -
else
Expand Down
23 changes: 14 additions & 9 deletions api/compute/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,10 @@ const (
Manual ProcessGuarantee = "manual"
)

// WindowProcessGuarantee enum type
// +kubebuilder:validation:Enum=ATLEAST_ONCE;ATMOST_ONCE
type WindowProcessGuarantee string

// LogTopicAgent enum type
// +kubebuilder:validation:Enum=runtime;sidecar
type LogTopicAgent string
Expand Down Expand Up @@ -533,15 +537,16 @@ type LogConfig struct {
}

type WindowConfig struct {
ActualWindowFunctionClassName string `json:"actualWindowFunctionClassName"`
WindowLengthCount *int32 `json:"windowLengthCount,omitempty"`
WindowLengthDurationMs *int64 `json:"windowLengthDurationMs,omitempty"`
SlidingIntervalCount *int32 `json:"slidingIntervalCount,omitempty"`
SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs,omitempty"`
LateDataTopic string `json:"lateDataTopic,omitempty"`
MaxLagMs *int64 `json:"maxLagMs,omitempty"`
WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs,omitempty"`
TimestampExtractorClassName *string `json:"timestampExtractorClassName,omitempty"`
ActualWindowFunctionClassName string `json:"actualWindowFunctionClassName"`
WindowLengthCount *int32 `json:"windowLengthCount,omitempty"`
WindowLengthDurationMs *int64 `json:"windowLengthDurationMs,omitempty"`
SlidingIntervalCount *int32 `json:"slidingIntervalCount,omitempty"`
SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs,omitempty"`
LateDataTopic string `json:"lateDataTopic,omitempty"`
MaxLagMs *int64 `json:"maxLagMs,omitempty"`
WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs,omitempty"`
TimestampExtractorClassName *string `json:"timestampExtractorClassName,omitempty"`
ProcessingGuarantee WindowProcessGuarantee `json:"processingGuarantee,omitempty"`
}

type VPASpec struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3786,6 +3786,11 @@ spec:
maxLagMs:
format: int64
type: integer
processingGuarantee:
enum:
- ATLEAST_ONCE
- ATMOST_ONCE
type: string
slidingIntervalCount:
format: int32
type: integer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3805,6 +3805,11 @@ spec:
maxLagMs:
format: int64
type: integer
processingGuarantee:
enum:
- ATLEAST_ONCE
- ATMOST_ONCE
type: string
slidingIntervalCount:
format: int32
type: integer
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/compute.functionmesh.io_functionmeshes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3786,6 +3786,11 @@ spec:
maxLagMs:
format: int64
type: integer
processingGuarantee:
enum:
- ATLEAST_ONCE
- ATMOST_ONCE
type: string
slidingIntervalCount:
format: int32
type: integer
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/compute.functionmesh.io_functions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3783,6 +3783,11 @@ spec:
maxLagMs:
format: int64
type: integer
processingGuarantee:
enum:
- ATLEAST_ONCE
- ATMOST_ONCE
type: string
slidingIntervalCount:
format: int32
type: integer
Expand Down

0 comments on commit f8172da

Please sign in to comment.