Batchable telemetry #1272
IBatchableTelem adds two methods to the ITelem interface. These methods allow a telemetry object to manage batches of telemetry entries, rather than just one.
Adds an implementation as a mixin of the two methods specified by IBatchableTelem.
This allows encryption attempt telemetries to be batched into one telemetry object so they can be sent to the island in batches.
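The interface and its mixin can be sketched as follows. This is a minimal illustration, not the PR's code: the method names get_telemetry_batch() and add_telemetry_to_batch(), the reduced ITelem interface, and the FileEncryptionTelem class are all assumptions made for the example.

```python
import abc
from typing import Iterable, List


class ITelem(metaclass=abc.ABCMeta):
    """Reduced stand-in for the agent's ITelem interface (assumed shape)."""

    @property
    @abc.abstractmethod
    def telem_category(self) -> str:
        ...

    @abc.abstractmethod
    def get_data(self) -> dict:
        ...


class IBatchableTelem(ITelem, metaclass=abc.ABCMeta):
    """Adds two batching methods on top of ITelem (hypothetical names)."""

    @abc.abstractmethod
    def get_telemetry_batch(self) -> Iterable:
        ...

    @abc.abstractmethod
    def add_telemetry_to_batch(self, telemetry: "IBatchableTelem") -> None:
        ...


class BatchableTelemMixin:
    """Implements both IBatchableTelem methods; subclasses must set self._telemetry_entries."""

    def get_telemetry_batch(self) -> Iterable:
        return self._telemetry_entries

    def add_telemetry_to_batch(self, telemetry: "IBatchableTelem") -> None:
        # Absorb the other telemetry's entries into this object's batch.
        self._telemetry_entries.extend(telemetry.get_telemetry_batch())


class FileEncryptionTelem(BatchableTelemMixin, IBatchableTelem):
    """Hypothetical telemetry describing a single encryption attempt."""

    def __init__(self, path: str, success: bool):
        self._telemetry_entries: List[dict] = [{"path": path, "success": success}]

    @property
    def telem_category(self) -> str:
        return "file_encryption"

    def get_data(self) -> dict:
        return {"files": list(self._telemetry_entries)}


# Two attempts collapse into one telemetry object that can be sent once.
batch = FileEncryptionTelem("a.txt", True)
batch.add_telemetry_to_batch(FileEncryptionTelem("b.txt", False))
```

Because the mixin appears before IBatchableTelem in the base-class list, its concrete methods satisfy the abstract ones, so FileEncryptionTelem can be instantiated while IBatchableTelem itself cannot.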
The term "wrapper" is sometimes used as a synonym for the decorator pattern, whereas this class is a textbook adapter. Use the term "adapter" instead of "wrapper" and rename "TelemetryMessengerWrapper" to "LegacyTelemetryMessengerAdapter", as this class serves as an adapter between the new ITelemetryMessenger interface and the (soon to be) legacy way of sending telemetry.
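A minimal sketch of the adapter, assuming that ITelemetryMessenger exposes a single send_telemetry() method and that legacy telemetry objects send themselves through a send() method; LegacyTelem is a hypothetical stand-in. The adapter adds no behaviour of its own and only translates one interface into the other, which is what distinguishes it from a decorator.

```python
import abc


class ITelemetryMessenger(metaclass=abc.ABCMeta):
    """New messenger interface (assumed to expose only send_telemetry())."""

    @abc.abstractmethod
    def send_telemetry(self, telemetry) -> None:
        ...


class LegacyTelemetryMessengerAdapter(ITelemetryMessenger):
    """Adapts ITelemetryMessenger to the legacy style, where a telemetry sends itself."""

    def send_telemetry(self, telemetry) -> None:
        # Assumption: legacy telemetry objects provide a send() method.
        telemetry.send()


class LegacyTelem:
    """Hypothetical legacy telemetry that records whether send() was called."""

    def __init__(self):
        self.sent = False

    def send(self) -> None:
        self.sent = True


messenger: ITelemetryMessenger = LegacyTelemetryMessengerAdapter()
telem = LegacyTelem()
messenger.send_telemetry(telem)
```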
This telemetry messenger is a decorator that aggregates batchable telemetries and sends them to the island periodically.
We don't want the ransomware payload to encrypt all files and then send telemetry to the island. This could lead to a long period of time where the user has no insight into what the monkey is doing on a node. We also don't want to flood the island with telemetries. By using the BatchingTelemetryMessenger, ransomware encryption telemetries are batched together and periodically sent to the island.
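The batching step itself can be illustrated as a pure function: telemetries that belong together are merged into one object, and each merged object is sent once. This sketch is hypothetical; it groups by an assumed category attribute, reuses the first telemetry of each group as the batch, and uses a recording stand-in for the wrapped messenger instead of a real connection to the island.

```python
from typing import Dict, List


class BatchableTelem:
    """Hypothetical batchable telemetry: a category plus a list of entries."""

    def __init__(self, category: str, entries: List[str]):
        self.category = category
        self.entries = list(entries)

    def add_telemetry_to_batch(self, other: "BatchableTelem") -> None:
        self.entries.extend(other.entries)


class RecordingMessenger:
    """Stand-in for the wrapped ITelemetryMessenger; records what it 'sends'."""

    def __init__(self):
        self.sent: List[BatchableTelem] = []

    def send_telemetry(self, telemetry) -> None:
        self.sent.append(telemetry)


def send_batches(pending: List[BatchableTelem], messenger) -> None:
    """Merge telemetries of the same category into one object each, then send them."""
    batches: Dict[str, BatchableTelem] = {}
    for telemetry in pending:
        if telemetry.category in batches:
            batches[telemetry.category].add_telemetry_to_batch(telemetry)
        else:
            # The first telemetry of a category becomes that category's batch.
            batches[telemetry.category] = telemetry
    for batch in batches.values():
        messenger.send_telemetry(batch)


messenger = RecordingMessenger()
send_batches(
    [
        BatchableTelem("encryption", ["a.txt"]),
        BatchableTelem("encryption", ["b.txt"]),
        BatchableTelem("encryption", ["c.txt"]),
        BatchableTelem("state", ["running"]),
    ],
    messenger,
)
```

Four queued telemetries result in two messages to the island: one carrying all three encryption entries and one carrying the state entry.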
My original plan was to start a thread in __init__() and stop the thread when __del__() was called. Since the running thread (object) contains a reference to the BatchingTelemetryMessenger object that launched it, the destructor will not be called until the thread is stopped. Therefore, a stop() method was added to allow the BatchingTelemetryMessenger to be stopped. Since it has an explicit stop, it should also have an explicit start, rather than starting the thread in the constructor.
My original plan was to start a thread in __init__() and stop the thread when __del__() was called. Since the running thread (object) contains a reference to the BatchingTelemetryMessenger object that launched it, the destructor will not be called until the thread is stopped. This resulted in adding a stop() method (fadd978) followed by adding a start() method (1d066c8). By using an inner class to run the thread, we enable the class to be used as originally intended, reducing the burden on the user of this class. The thread is now started on construction and stopped on destruction. The user can remain blissfully unaware that anything resembling threading is going on, and can use the BatchingTelemetryMessenger just like any other ITelemetryMessenger.
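The inner-class approach can be sketched like this. The worker thread's target is a method of a private inner object, so the thread keeps only that inner object alive; once the last reference to the outer BatchingTelemetryMessenger disappears, CPython's reference counting can call __del__(), which stops the thread. Everything here is an assumption for illustration: the inner class's name, threading.Event as the stop signal, and the flush logic, which forwards telemetries one by one and omits batching to keep the focus on the thread's lifecycle.

```python
import queue
import threading


class BatchingTelemetryMessenger:
    """Sketch: the worker thread references only the inner object, never `self`."""

    def __init__(self, telemetry_messenger, period: float = 5.0):
        self._impl = self._BatchingTelemetryMessenger(telemetry_messenger, period)
        self._impl.start()

    def __del__(self):
        # Runs once nothing (including the worker thread) references the outer object.
        impl = getattr(self, "_impl", None)
        if impl is not None:
            impl.stop()

    def send_telemetry(self, telemetry) -> None:
        self._impl.add_telemetry_to_queue(telemetry)

    class _BatchingTelemetryMessenger:
        """Hypothetical inner class that owns the queue and the worker thread."""

        def __init__(self, telemetry_messenger, period: float):
            self._queue: "queue.Queue" = queue.Queue()
            self._telemetry_messenger = telemetry_messenger
            self._period = period
            self._stop_event = threading.Event()
            self._thread = None

        def start(self) -> None:
            self._thread = threading.Thread(target=self._run, daemon=True)
            self._thread.start()

        def stop(self) -> None:
            self._stop_event.set()
            self._thread.join()

        def add_telemetry_to_queue(self, telemetry) -> None:
            self._queue.put(telemetry)

        def _run(self) -> None:
            # Forward queued telemetry once per period until asked to stop,
            # then flush whatever is still queued so nothing is lost.
            while not self._stop_event.wait(self._period):
                self._flush()
            self._flush()

        def _flush(self) -> None:
            while True:
                try:
                    telemetry = self._queue.get_nowait()
                except queue.Empty:
                    return
                self._telemetry_messenger.send_telemetry(telemetry)
```

The caller never calls start() or stop(); deleting the last reference is enough. Relying on __del__() this way depends on CPython's reference counting, and the Python language does not guarantee when (or whether) finalizers run on other implementations.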
Codecov Report
Coverage diff, develop vs. #1272:

            develop    #1272      +/-
Coverage    30.13%     30.65%     +0.51%
Files       444        449        +5
Lines       13323      13457      +134
Hits        4015       4125       +110
Misses      9308       9332       +24
WAKES_PER_PERIOD = 4


class BatchingTelemetryMessenger(ITelemetryMessenger):
Since the responsibility of this class is not to send telemetries but to batch them, I wonder if we should make a distinction in the name. Right now a decorator has exactly the same naming convention as a messenger, even though the responsibility and creation of the object are different.
We could name it BatchingTelemetryMessengerDecorator.
What's the purpose of implementing the ITelemetryMessenger interface? We already receive an ITelemetryMessenger object, so this class will know its interface. Can't we do something like telemetry_messenger.send_telemetry = self.add_telemetry_to_queue in the constructor? This would also solve the issue that the send_telemetry method is not sending anything; in fact, it's just adding the telemetry to a queue.
What's the purpose of implementing the ITelemetryMessenger interface?
In implementing ITelemetryMessenger, we're stating explicitly that this object must conform to that interface.
We already receive an ITelemetryMessenger object, so this class will know its interface.
Just because this class "has-a" ITelemetryMessenger doesn't mean it implements ITelemetryMessenger.
Can't we do something like telemetry_messenger.send_telemetry = self.add_telemetry_to_queue in the constructor?
I think this would make the code harder to read. It implements ITelemetryMessenger, but where is its send_telemetry() method? I have to read the constructor to find out how it implements ITelemetryMessenger.
This would also solve the issue that send_telemetry method is not sending anything, in fact, it's just adding the telemetry to a queue.
I'm not sure I see what problem needs to be solved. For anyone who uses the class, all they care about is that telemetry is sent. The purpose of ITelemetryMessenger is that they don't need to care how it is sent.
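The benefit of stating the interface explicitly can be made concrete with abc: a class that declares ITelemetryMessenger as a base but forgets send_telemetry() cannot even be instantiated, so the mistake surfaces immediately. The interface shape and both classes below are simplified, hypothetical stand-ins.

```python
import abc


class ITelemetryMessenger(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def send_telemetry(self, telemetry) -> None:
        ...


class ForgetfulMessenger(ITelemetryMessenger):
    """Declares the interface but (by mistake) never defines send_telemetry()."""

    def add_telemetry_to_queue(self, telemetry) -> None:
        pass


class QueueingMessenger(ITelemetryMessenger):
    """Declares the interface and defines send_telemetry() where readers expect it."""

    def __init__(self):
        self.queued = []

    def send_telemetry(self, telemetry) -> None:
        self.queued.append(telemetry)


try:
    ForgetfulMessenger()
except TypeError as err:
    # abc refuses to instantiate a class with unimplemented abstract methods.
    print(f"refused: {err}")
```

By contrast, the assignment telemetry_messenger.send_telemetry = self.add_telemetry_to_queue would rebind the method on the wrapped messenger instance itself, so any other code holding that messenger would also start queueing instead of sending.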
        self._telemetry_messenger = telemetry_messenger
        self._period = period

        self._should_run_batch_thread = True
Why is this variable required? It's only used in _manage_telemetry_batches, and it's False only if the thread should stop. But if the thread should stop, it's joined and _manage_telemetry_batches no longer runs.
Joining a thread doesn't stop it. It waits for the thread to stop before execution continues. A mechanism is required to signal the thread to stop. In this case, it's the state of self._should_run_batch_thread.
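The difference between signalling a thread and joining it can be demonstrated in a few lines. This sketch uses threading.Event as the stop signal instead of the plain boolean attribute used in the PR; both play the same role here, and an Event additionally lets the worker sleep in wait() and wake as soon as it is set.

```python
import threading
import time


def worker(stop_event: threading.Event, ticks: list) -> None:
    # Keep working until another thread signals us to stop.
    while not stop_event.is_set():
        ticks.append(time.monotonic())
        stop_event.wait(0.01)


stop_event = threading.Event()
ticks: list = []
thread = threading.Thread(target=worker, args=(stop_event, ticks), daemon=True)
thread.start()

# join() with a timeout only waits; the thread is still running afterwards.
thread.join(timeout=0.05)
still_running_after_join = thread.is_alive()

# Setting the signal is what makes the loop exit; join() then returns promptly.
stop_event.set()
thread.join()
stopped = not thread.is_alive()
```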
    def start(self):
        self._should_run_batch_thread = True
        self._manage_telemetry_batches_thread = threading.Thread(
Why isn't the thread created in __init__()?
Threads can only be started once, so the thread needs to be created in start(). I've added self._manage_telemetry_batches_thread = None to __init__().
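The one-start rule is easy to check, and it is why a start() method that may be called more than once must build a fresh Thread each time. The Restartable class below is a hypothetical illustration of that pattern, with the thread attribute initialized to None in __init__() as described above; its stop() simply waits for the short-lived run to finish.

```python
import threading

# A Thread object can be started only once, even after it has finished.
thread = threading.Thread(target=lambda: None)
thread.start()
thread.join()
try:
    thread.start()
    restart_error = None
except RuntimeError as err:
    restart_error = str(err)


class Restartable:
    """Hypothetical worker that creates a new Thread on every start()."""

    def __init__(self):
        self.runs = 0
        self._thread = None  # no thread exists until start() is called

    def start(self) -> None:
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def stop(self) -> None:
        self._thread.join()

    def _run(self) -> None:
        self.runs += 1


worker = Restartable()
for _ in range(2):
    worker.start()
    worker.stop()
```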
    def _manage_telemetry_batches(self):
        self._reset()

        while self._should_run_batch_thread or not self._queue.empty():
The purpose of the condition self._should_run_batch_thread or not self._queue.empty() seems to be to send the last batch of telemetries before stopping the thread, so that no telemetries go missing. That's not readable. Is it possible to explicitly send all telemetries in the queue before stopping the thread?
        while self._should_run_batch_thread or not self._queue.empty():
            try:
                telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD)
Perhaps we should add a comment explaining why we chose these parameters?
Don't the names of the parameters explain what they do?
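For readers who do want the reasoning spelled out, the arithmetic is small: with WAKES_PER_PERIOD = 4, a blocking get() gives up after a quarter of the period (1.25 s for a 5 s period), so even with an empty queue the loop regains control several times per period, for example to check whether a batch is due or whether the thread should stop. That interpretation is inferred from the names rather than stated in the PR; the helper below is a hypothetical illustration of how queue.Queue.get() behaves with block=True and a timeout.

```python
import queue
import time

WAKES_PER_PERIOD = 4


def wait_for_telemetry(q: "queue.Queue", period: float):
    """Block for at most period / WAKES_PER_PERIOD seconds waiting for an item.

    Returns the item, or None if the timeout expired while the queue was empty.
    """
    try:
        return q.get(block=True, timeout=period / WAKES_PER_PERIOD)
    except queue.Empty:
        return None


q: "queue.Queue" = queue.Queue()

start = time.monotonic()
result = wait_for_telemetry(q, period=0.2)  # empty queue: gives up after ~0.05 s
elapsed = time.monotonic() - start

q.put("telemetry")
item = wait_for_telemetry(q, period=0.2)  # item available: returns immediately
```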
            try:
                telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD)

                if isinstance(telemetry, IBatchableTelem):
What happens if this is the last iteration of the loop and the thread pauses here? If another thread adds telemetries to the queue in the meantime, will they never get sent?
This comment explains it: #1272 (comment)
The loop condition ensures the loop will only exit if the queue is empty. Regardless, I've refactored the code to more clearly state its intent.
Let's discuss my comments via Zoom.
We need to ensure when a BatchingTelemetryMessenger stops, all remaining telemetries in its queue are sent. The existing logic does this, but this commit improves the readability and intent of the code, as well as adds a test for this condition.
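One way to make the drain-on-stop intent explicit is sketched below: the loop watches only the stop flag, and sending whatever is still queued becomes a separately named step after the loop exits. The class and method names are hypothetical; the flag, the WAKES_PER_PERIOD timeout, and the period mirror the code under review, but this is not the refactored code from the commit.

```python
import queue
import threading
import time

WAKES_PER_PERIOD = 4


class BatchSender:
    """Hypothetical batching loop that flushes its queue explicitly on stop."""

    def __init__(self, send_batch, period: float):
        self._send_batch = send_batch  # callable taking a list of telemetries
        self._period = period
        self._queue: "queue.Queue" = queue.Queue()
        self._should_run_batch_thread = True
        self._thread = threading.Thread(target=self._manage_telemetry_batches, daemon=True)
        self._thread.start()

    def add_telemetry_to_queue(self, telemetry) -> None:
        self._queue.put(telemetry)

    def stop(self) -> None:
        self._should_run_batch_thread = False  # signal the loop to exit...
        self._thread.join()                    # ...then wait until it has

    def _manage_telemetry_batches(self) -> None:
        batch = []
        last_sent = time.monotonic()
        while self._should_run_batch_thread:
            try:
                batch.append(self._queue.get(timeout=self._period / WAKES_PER_PERIOD))
            except queue.Empty:
                pass
            if time.monotonic() - last_sent >= self._period:
                self._send(batch)
                batch, last_sent = [], time.monotonic()
        # The loop only watches the stop flag; draining is a separate, explicit step.
        self._send_remaining_telemetries(batch)

    def _send_remaining_telemetries(self, batch) -> None:
        while True:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        self._send(batch)

    def _send(self, batch) -> None:
        if batch:
            self._send_batch(list(batch))
```

Even if stop() is called before the worker has picked the last telemetry off the queue, the final drain step still sends it.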
What does this PR do?
Allows ransomware telemetry to be sent in batches to minimize network traffic but also provide the user with periodic feedback.
This PR makes some compromises. Issue #1268 was opened to document the reasons for some of this PR's shortcomings and suggest a path forward.
PR Checklist
Was the CHANGELOG.md updated to reflect the changes?
Was the documentation framework updated to reflect the changes?
Testing Checklist
Screenshots