I don't ever want to see the phrase "Exactly Once" without several asterisks after it. It might be exactly once from an "overall" point of view, but the client effectively needs infinitely durable, infinite memory to perform the "distributed transaction" of acting on the message and responding to the server.<p>Imagine:<p>- Server delivers message M<p>- Client processes event E entailed by message M<p>- Client tries to ack (A) the message on the server, but "packet loss"<p>- To make matters worse, let's say the client also immediately dies after this<p>How do you handle this situation?
The client must transactionally/simultaneously commit both E and A/intent-to-A. Since the server never received an acknowledgment of M, it will either redeliver the message, in which case some record of E must be kept to deduplicate against, or it will wait for the client to resend A, or some mixture of both. Note: if you say "just make E idempotent", then you don't need exactly-once delivery in the first place...<p>I suppose you could go back to some kind of lock-step processing of messages to avoid needing to record all (E, A) pairs that are in flight, but that would obviously kill the throughput of the message queue.<p>Exactly Once can only ever be At Least Once plus some out-of-the-box idempotency that may not be as cheap as the natural idempotency of your system.<p>EDIT: Recommended reading: "Life Beyond Distributed Transactions", Pat Helland - <a href="http://queue.acm.org/detail.cfm?id=3025012" rel="nofollow">http://queue.acm.org/detail.cfm?id=3025012</a>
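A minimal sketch of what "transactionally commit both E and A/intent-to-A" can look like on the client, with SQLite standing in for the client's durable store (the table and helper names here are invented for illustration):

    import sqlite3

    db = sqlite3.connect("client_state.db")
    db.execute("CREATE TABLE IF NOT EXISTS processed (msg_id TEXT PRIMARY KEY, effect TEXT)")

    def handle_delivery(msg_id, payload, ack):
        # Commit the effect E and the record needed to deduplicate/re-ack in ONE transaction.
        try:
            with db:
                db.execute("INSERT INTO processed (msg_id, effect) VALUES (?, ?)",
                           (msg_id, apply_effect(payload)))
        except sqlite3.IntegrityError:
            pass  # redelivery of M: E already happened, fall through and just re-ack
        ack(msg_id)  # if this A is lost, the server redelivers M and we hit the branch above

    def apply_effect(payload):
        # stands in for E; its result must land in the same transaction as the msg_id record
        return payload.upper()

If the client dies right after processing, the committed row is the "record of E" that survives the crash and lets the restarted client deduplicate the redelivery and resend A.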
Here's a radical solution. Instead of becoming a $200k Scala-pro Akka Streams engineer with a cluster of Kafka nodes that costs your company over $100,000 in engineering time, technical debt, opportunity cost, and server costs, just put it all in Bigtable and dedupe by ID...<p>Enough of resume-driven engineering. Why does everyone need to reinvent the wheel?
In terms of connectivity, we deal with a similar problem here at CloudWalk to process payment transactions from POS terminals, most of which rely on GPRS connections.<p>Our network issues are nearly 6 times higher (~3.5%) due to GPRS, and we solved the duplication problem with an approach involving both the client and server side.<p>Clients always ensure that all the information sent by the server was successfully received. If something goes wrong, instead of retrying (sending the payment again), the client sends just the transaction UUID to the server, and the server responds with either: A. the corresponding response for the transaction, or B. not found.<p>In scenario A, the POS terminal managed to properly send all the information to the server but failed to receive the response.<p>In scenario B, the POS terminal didn't even manage to properly send the information to the server, so the POS can safely retry.
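The server side of that is essentially an idempotent lookup keyed by the transaction UUID. A rough sketch (function names invented; CloudWalk's real system is obviously more involved):

    # Server side: every transaction is keyed by the UUID the POS generated.
    responses = {}  # uuid -> response; in reality a durable store

    def process_payment(payload):
        return {"status": "approved"}  # hypothetical stand-in for the real processing

    def handle_transaction(uuid, payload):
        if uuid in responses:
            return responses[uuid]        # already processed: just resend the stored response
        result = process_payment(payload)
        responses[uuid] = result
        return result

    def handle_status_query(uuid):
        # Scenario A: return the stored response.  Scenario B: "not found", safe to retry.
        return responses.get(uuid, "NOT_FOUND")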
So, a combination of best-effort "at least once" messaging with deduplication near the receiving edge. Fairly standard, honestly.<p>There is still potential for problems in message delivery to the endpoints (malformed messages, Kafka errors, messages not being consumed fast enough and lost), or duplication at that level (restart a listener on the Kafka stream with the wrong message ID) as well.<p>This is based on my own pains with Kinesis and Lambda (which, I know, isn't Kafka).<p>In my experience, it's better to just allow raw "at least once" messaging and perform idempotent actions based off the messages. It's not always possible (and harder when it is possible), but its tradeoffs mean you're less likely to lose messages.
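As a concrete example of the idempotent-action option: if the consumer's job is just to record the event, a primary key on the message ID makes redelivery harmless (a sketch, with SQLite standing in for whatever store you actually use):

    import sqlite3

    conn = sqlite3.connect("consumer.db")
    conn.execute("CREATE TABLE IF NOT EXISTS events (message_id TEXT PRIMARY KEY, payload TEXT)")

    def on_message(message_id, payload):
        # A redelivered message hits the primary key and becomes a no-op,
        # so raw at-least-once delivery is safe without a separate dedupe layer.
        with conn:
            conn.execute("INSERT OR IGNORE INTO events (message_id, payload) VALUES (?, ?)",
                         (message_id, payload))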
"The single requirement of all data pipelines is that they cannot lose data."<p>Unless the business value of data is derived after applying some summary statistics, than even sampling the data works, and you can lose events in an event stream, while not changing the insight gained. Originally Kafka was designed to be a high throughput data bus for analytical pipeline where losing messages was ok. More recently they are experimenting with exactly once delivery.
Having built something similar with RabbitMQ in a high-volume industry, I think there are a lot of benefits that people in this thread are glossing over in favor of debating semantics. Yes, this is not "exactly once" -- there really is no such thing in a distributed system. The best you can hope for is that your edge consumers are idempotent.<p>There is a lot of value derived from de-duping near ingress of a heavy stream such as this. You're saving downstream consumers time (money) and potential headaches. You may be in an industry where duplicates <i>can</i> be handled by a legacy system, but it takes 5-10 minutes of manual checks and corrections by support staff. That was my exact use case, and I can't count the number of times we were thankful our de-duping handled "most" cases.
"Exactly Once<i>"<p></i> Over a window of time that changes depending on the amount of ingested events.<p>Basically, they read from a kafka stream and have a deduplication layer in rocks db that produces to another kafka stream. They process about 2.2 billion events through it per day.<p>While this will reduce duplicates and get closer to Exactly Once (helping reduce the two generals problem on incoming requests and potentially work inside their data center), they still have to face the same problem again when they push data out to their partners. Some packet loss, and they will be sending out duplicate to the partner.<p>Not to downplay what they have done as we are doing a similar thing near our exit nodes to do our best to prevent duplicate events making it out of our system.
To be fair, they are upfront at the beginning about not being able to adhere to an exactly-once model:<p>"In the past three months we’ve built an entirely new de-duplication system to get as close as possible to exactly-once delivery"<p>What's annoying is that they never get precise and formal about what they want out of their new model. Also, their numbers only speak to performance, not correctness.<p>On the plus side, I think it's awesome to see bloom filters successfully used in production. That sort of thing is easy to implement, but not easy to get right for every use case.
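For the curious, the core of a bloom filter really is only a few lines; the "not easy to get right" part is sizing it and choosing the hash count for your false-positive budget, plus growth and expiry. A toy sketch:

    import hashlib

    class BloomFilter:
        def __init__(self, size_bits=1 << 20, num_hashes=5):
            self.size = size_bits
            self.num_hashes = num_hashes
            self.bits = bytearray(size_bits // 8)

        def _positions(self, key):
            for i in range(self.num_hashes):
                h = hashlib.sha256(f"{i}:{key}".encode()).digest()
                yield int.from_bytes(h[:8], "big") % self.size

        def add(self, key):
            for pos in self._positions(key):
                self.bits[pos // 8] |= 1 << (pos % 8)

        def might_contain(self, key):
            # False positives are possible, false negatives are not -- hence "might_contain".
            return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key))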
So there's a lot of talk on here about the Two Generals Problem, so I thought I'd chime in on some misconceptions about how the Two Generals Problem relates to Exactly Once Messaging (EOM). WARNING: I'm going mostly on memory with this, I could be completely wrong.<p>EOM is NOT, strictly speaking, equivalent to the Two Generals Problem, or to distributed consensus, in an unreliable network. In distributed consensus, at some given point in time, A has to know X, A has to know B knows X, A has to know B knows A knows X, and so on. Because the message broker is in some sense the arbiter of truth, the consumer(s) don't need full consensus, so in an unreliable network you can still have EOM. <a href="http://ilpubs.stanford.edu:8090/483/1/2000-7.pdf" rel="nofollow">http://ilpubs.stanford.edu:8090/483/1/2000-7.pdf</a> gives some examples of how that works.<p>HOWEVER, you can't have EOM when the consumers can fail. If a consumer fails, there's no way, in general, to tell whether the last message it was working on was completed.<p>There are a couple of edge cases where you can still have EOM. For instance, a system where you have a message broker A and a bunch of consumers that read messages x from that queue, compute f(x), and insert f(x) onto message broker B, where f(x) may be computed multiple times for the same x (i.e. if f is a pure function or you don't care about the side effects). This system can implement EOM in the presence of an unreliable network and consumer failures (I think it can handle one or both of the message brokers failing too, not 100% sure) in the sense that x will never be in broker A at the same time as f(x) is in broker B, f(x) will never be in broker B more than once for the same x, and any y in B had some x that was in A such that y = f(x).
Was thinking a 'reverse bloom filter' could be cool, to possibly avoid the RocksDB for situations like this -- turns out it already exists:
<a href="https://github.com/jmhodges/opposite_of_a_bloom_filter" rel="nofollow">https://github.com/jmhodges/opposite_of_a_bloom_filter</a><p>I love it when that happens.
Sounds very cool. A couple of questions I had:<p>1) What happens if they lose their RocksDB with all of the messageIds?<p>2) Is their Kafka at-least-once delivery? How do they guarantee that Kafka doesn't reject their write? Also, assuming they have set up their Kafka for at-least-once delivery, doesn't that make the output topic susceptible to duplicates due to retries, etc.?<p>3) >Instead of searching a central database for whether we’ve seen a key amongst hundreds of billions of messages, we’re able to narrow our search space by orders of magnitude simply by routing to the right partition.<p>Is "orders of magnitude" really correct? Aren't you really just narrowing the search space by the number of partitions in Kafka? I suppose if you have a hundred partitions, that would be two orders of magnitude, but it makes it sound like it's much more than that.
I wonder how they partition by the "messageId" to ensure that de-duplication happens on the same worker. I would imagine that this affects their ability to add more brokers in the future.<p>Perhaps they expect a 1:1 mapping of RocksDB instance, partition, and de-duplication worker.
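Presumably it's just consistent hashing of the messageId onto the topic's partitions, so every copy of a given ID lands on the same worker and the same RocksDB. A sketch of the idea (Kafka's default partitioner actually uses murmur2 on the key; this is only illustrative):

    import hashlib

    def partition_for(message_id: str, num_partitions: int) -> int:
        # Every duplicate of the same messageId maps to the same partition,
        # so the worker that owns that partition sees all of its copies.
        digest = hashlib.md5(message_id.encode()).digest()
        return int.from_bytes(digest[:8], "big") % num_partitions

Which is also why growing the partition count later is painful: the mapping changes, and duplicates that straddle the resize can land on different workers.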
tl;dr: Clickbait headline. Exactly-once delivery not even close to implemented. Typical de-duping, as you've seen and read about hundreds of times already, is what they did.
"Almost Exactly Once" doesn't have quite the same ring to it, but it is actually accurate. We've already discovered better trade-offs haven't we?
If the OP doesn't mind expanding a little on this bit, I'd be grateful.<p>> If the dedupe worker crashes for any reason or encounters an error from Kafka, when it re-starts it will first consult the “source of truth” for whether an event was published: the output topic.<p>Does this mean that, on worker crash, the worker replays the entire output topic and compares it to the RocksDB dataset?<p>Also, how do you handle scaling the number of workers/partitions up or down?
It's funny, at my company we implemented deduplication almost exactly the same way for our push notification sender.<p>The scale is smaller (about 10k rpm), but the basic idea is the same (store a message ID in a key-value store after each successful send).<p>I like the idea of invalidating records by overall size, we hadn't thought of that. We just use a fixed 24-hour TTL.
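For anyone doing this at a similar scale, the whole thing can be a single atomic set-if-absent with a TTL. A sketch using redis-py (the key prefix and the deliver() helper are made up):

    import redis

    r = redis.Redis()

    def should_send(message_id: str) -> bool:
        # SET NX succeeds only for the first writer; EX bounds memory with a 24h TTL.
        return bool(r.set(f"push:sent:{message_id}", 1, nx=True, ex=24 * 60 * 60))

    def send_push(message_id, notification):
        if should_send(message_id):
            deliver(notification)  # hypothetical sender; duplicates are simply skipped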
Would something like AWS SQS not scale for something like this? We currently push about 25k daily transactions over SQS, obviously nowhere near the scale of this; just wondering what limitations we might potentially bump into.
It's worth noting that the next major Kafka release (0.11, out soon) will include exactly-once semantics! With basically no configuration and no code changes for the user. Perhaps even more noteworthy is that this feature is built on top of a new transactions feature [0]. With this release, you'll be able to atomically write to multiple topics.<p>[0] <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging" rel="nofollow">https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E...</a>
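For a taste of what the atomic multi-topic write looks like from a client, here is a sketch using confluent-kafka's transactional Producer (exact configuration and client availability may differ; the API mirrors the Java one and needs 0.11+ brokers):

    from confluent_kafka import Producer

    producer = Producer({
        "bootstrap.servers": "localhost:9092",
        "transactional.id": "my-transactional-producer",  # required to enable transactions
    })

    producer.init_transactions()
    producer.begin_transaction()
    try:
        # Both writes become visible together or not at all -- the atomic multi-topic write.
        producer.produce("topic-a", key="k1", value="event for A")
        producer.produce("topic-b", key="k1", value="event for B")
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()
        raise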
Isn't the new Kafka feature about exactly this?<p><a href="https://issues.apache.org/jira/browse/KAFKA-4815" rel="nofollow">https://issues.apache.org/jira/browse/KAFKA-4815</a>
> [I]t’s pretty much impossible to have messages only ever be delivered once.<p>IIRC, it's provably impossible in a distributed system where processes might fail, i.e. all real systems.
Relevant to this topic: Description of the exactly-once implementation in Google Cloud Dataflow + what "exactly once" means in the context of streaming:<p><a href="https://cloud.google.com/blog/big-data/2017/05/after-lambda-exactly-once-processing-in-google-cloud-dataflow-part-1" rel="nofollow">https://cloud.google.com/blog/big-data/2017/05/after-lambda-...</a><p>(Google Cloud employee speaking)
Qubit's strategy to do this via streaming, leveraging Google Cloud Dataflow:<p><a href="https://cloud.google.com/blog/big-data/2017/06/how-qubit-deduplicates-streaming-data-at-scale-with-google-cloud-platform" rel="nofollow">https://cloud.google.com/blog/big-data/2017/06/how-qubit-ded...</a>
What is so exciting about this? There is still the possibility of duplicates. You still have to put in the engineering effort to deal with duplicates end-to-end. If the code is there to deal with duplicates end-to-end, then does it really matter whether you have 5 duplicates or 35? Or maybe they just did it to add some cool tech to their CVs?
Another possible approach:
<a href="https://cloud.google.com/blog/big-data/2017/06/how-qubit-deduplicates-streaming-data-at-scale-with-google-cloud-platform" rel="nofollow">https://cloud.google.com/blog/big-data/2017/06/how-qubit-ded...</a>
Why do I get the feeling this is repeating TCP features at the message level? There must be a protocol that can hide this exactly-once need away. TCP doesn't generally produce downloads that are bad and fail their checksum test, hence the packets that make up the file are not duplicated.
This is interesting work. But I think I'll continue relying on at-least-once and idempotency. Exactly once is impossible anyway.<p>> In Python (aka pseudo-pseudocode)<p>This annoyed me probably more than it should have.
This isn't the solution I would architect. It is much easier to de-duplicate later, when processing your analytics workload, and then you don't need to do so much work up front.
Awesome story.
What I would like to hear more about is the people side: the teams and personalities involved in coming up with this new system and the transition.
"Exactly once" model of message is theoretically impossible to do in distributed environment with nonzero possibility of failure. If you haven't received acknowledgement from the other side of communication in the specified amount of time you can only do one of two things:<p>1) do nothing, risking message loss<p>2) retransmit, risking duplication<p>But of course that's only from messaging system point of view. Deduplication at receiver end can help reduce problem, but itself can fail (there is no foolproof way of implementing that pseudocode's "has_seen(message.id)" method)