Skip to main content

dedupe

Deduplicates messages by storing a key value in a cache using the add operator. If the message already exists within the cache it is dropped.

# Config fields, showing default values
label: ""
dedupe:
cache: ""
key: ""
drop_on_err: true

Caches must be configured as resources, for more information check out the cache documentation here.

When using this processor with an output target that might fail you should always wrap the output within an indefinite retry block. This ensures that during outages your messages aren't reprocessed after failures, which would result in messages being dropped.

Delivery Guarantees​

Performing deduplication on a stream using a distributed cache voids any at-least-once guarantees that it previously had. This is because the cache will preserve message signatures even if the message fails to leave the Benthos pipeline, which would cause message loss in the event of an outage at the output sink followed by a restart of the Benthos instance (or a server crash, etc).

This problem can be mitigated by using an in-memory cache and distributing messages to horizontally scaled Benthos pipelines partitioned by the deduplication key. However, in situations where at-least-once delivery guarantees are important it is worth avoiding deduplication in favour of implement idempotent behaviour at the edge of your stream pipelines.

Fields​

cache​

The cache resource to target with this processor.

Type: string
Default: ""

key​

An interpolated string yielding the key to deduplicate by for each message. This field supports interpolation functions.

Type: string
Default: ""

# Examples
key: ${! meta("kafka_key") }
key: ${! content().hash("xxhash64") }

drop_on_err​

Whether messages should be dropped when the cache returns a general error such as a network issue.

Type: bool
Default: true

Examples​

The following configuration demonstrates a pipeline that deduplicates messages based on the Kafka key.

pipeline:
processors:
- dedupe:
cache: keycache
key: ${! meta("kafka_key") }
cache_resources:
- label: keycache
memory:
default_ttl: 60s