From a6fc3a0980b87bdef09954eaab7a72de9b0673a6 Mon Sep 17 00:00:00 2001 From: Eric Ihli Date: Mon, 7 Aug 2023 23:55:35 -0700 Subject: [PATCH] Initial Python and Arduino code It's a promising start. --- arduino_sketch/arduino_sketch.ino | 71 +++++++++++ main.py | 192 ++++++++++++++++++++++++++++++ 2 files changed, 263 insertions(+) create mode 100644 arduino_sketch/arduino_sketch.ino create mode 100644 main.py diff --git a/arduino_sketch/arduino_sketch.ino b/arduino_sketch/arduino_sketch.ino new file mode 100644 index 0000000..034f6cf --- /dev/null +++ b/arduino_sketch/arduino_sketch.ino @@ -0,0 +1,71 @@ +int PIN_MEATBAG_IN = 2; +int PIN_MEATBAG_ACTIVATE = 3; + +// MSG 0 and 1 are reserved for the low/high signals +// sent when meatbags are transcribing. +int MSG_MEATBAG_DEACTIVATE = 2; +int MSG_MEATBAG_ACTIVATE = 3; +// Heartbeats every 0.5, 1.5, 2.5, etc... of cycle time. Heartbeats are when we tell the GUI to draw the rise/fall. +// Probes every 1, 2, 3, etc... of cycle time. Probes are when we tell the server what the bit was for that cycle. +int MSG_EVENT_HEARTBEAT = 4; +int MSG_EVENT_PROBE = 5; + +int MEATBAG_BAUD = 4; +int CYCLE_TIMESPAN = 1000 / MEATBAG_BAUD; +int EVENT_TIMESPAN = CYCLE_TIMESPAN / 2; + +int isMeatbagTranscribing = 0; +unsigned long previousTimestamp = 0; +unsigned long previousEvent = MSG_EVENT_PROBE; + +void setup() { + Serial.begin(9600); + pinMode(PIN_MEATBAG_IN, INPUT); + pinMode(PIN_MEATBAG_ACTIVATE, INPUT); +} + +// It could be difficult to sync the timing of reading the input from the switch and +// displaying the signal on a graph. How do we get this loop and the Python/Matplotlib +// loop to be as in-sync as possible? And how can we sample in such a way that isn't +// vulnerable to slight mistakes? +// +// I think the key is going to be to probe the switch in the middle of the signal +// So, this loop will be the metronome. We'll send a "beat" message over Serial every +// 250, 500, 750, 1000 milliseconds (for example). And we'll send a "value" message +// every 125, 375, 625, 875 milliseconds. +void loop() { + unsigned long currentTimestamp = millis(); + int timeElapsed = currentTimestamp - previousTimestamp; + + // Send the heartbeat every 125, 375, 625, etc... + if (timeElapsed > EVENT_TIMESPAN + && previousEvent == MSG_EVENT_PROBE) + { + previousTimestamp = currentTimestamp; + previousEvent = MSG_EVENT_HEARTBEAT; + Serial.print(MSG_EVENT_HEARTBEAT); + } else + + // Send the signal on every 250, 500, 750, etc... + if (timeElapsed > EVENT_TIMESPAN + && previousEvent == MSG_EVENT_HEARTBEAT + && isMeatbagTranscribing == 1 + && timeElapsed > EVENT_TIMESPAN) + { + previousTimestamp = currentTimestamp; + previousEvent = MSG_EVENT_PROBE; + int value = digitalRead(PIN_MEATBAG_IN); + Serial.print(value); + } + + // Meatbags have to eat and sleep. We don't want to drop the signal + // during their moments of weakness. When meat fails, metal resumes control. + int activate = digitalRead(PIN_MEATBAG_ACTIVATE); + if (isMeatbagTranscribing == 1 && activate == LOW) { + isMeatbagTranscribing = 0; + Serial.print(MSG_MEATBAG_DEACTIVATE); + } else if (isMeatbagTranscribing == 0 && activate == HIGH) { + isMeatbagTranscribing = 1; + Serial.print(MSG_MEATBAG_ACTIVATE); + } +} diff --git a/main.py b/main.py new file mode 100644 index 0000000..c340694 --- /dev/null +++ b/main.py @@ -0,0 +1,192 @@ +"""Scratchpad for Defcon's Hardware Hacking Village's Rube Goldberg Machine. + +For local testing you can use `socat` to spin up two virtual RS-232 devices. + socat -d -d pty,rawer,echo=0,link=/tmp/ttyV0,b9600 pty,rawer,echo=0,link=/tmp/ttyV1,b9600 + +The human signal will come from a switch connected to an Arduino's GPIO pin. The +Arduino will capture the signal at 4 baud and send the bit over its serial/USB +connection to the computer at 9600 baud. + +""" +import queue +import random +import threading +import time + +import matplotlib.pyplot as plt +import serial + +# This in queue will contain bytes from DCE. It's the very start of our chain. +dce_inq = queue.Queue() +# The out queue will contain bits that we can pull at a very low baud rate and +# display to the meat bag. +dce_outq = queue.Queue() + +dte_outq = queue.Queue() + + +class TransformerQueue: + """Transforms values from an input queue and puts the transformation in an output queue. + + `transform` needs to be a function that returns a list, even if you're + transforming a single value to a single value. + + This class is useful as both a buffer to throttle the incoming messages + and a way to convert bytes to 1's and 0's so that we can display those 1's + and 0's on a graph. + + If we want to terminate the manual work that we're doing of transcribing + the 1's and 0's, then we still have the backing queue of messages that + we can pass along by automated digital means, so we don't lose any + messages and thus break the Rube Goldberg machine. + """ + def __init__(self, q1, q2, transform): + self.q1 = q1 + self.q2 = q2 + self.transform = transform + + def empty(self): + return self.q2.empty() and self.q1.empty() + + def put(self, *args, **kwargs): + return self.q1.put(*args, **kwargs) + + def get(self, *args, **kwargs): + if self.empty(): + raise queue.Empty() + if self.q2.empty(): + for v in self.transform(self.q1.get(*args, **kwargs)): + self.q2.put(v) + return self.q2.get(*args, **kwargs) + + +def byte_to_bits(byte): + """Convert a byte to a list of 8 1's and 0's.""" + assert byte <= 255, "Can't send more than 8 bits in a single RS-232 message." + out = [] + for _ in range(8): + out.append(byte & 0x01) + byte >>= 1 + return reversed(out) + + +dce_transformer = TransformerQueue(dce_inq, dce_outq, byte_to_bits) + + +# This in queue will contain the bits that the meat bag signaled. +# We'll probably receive each bit by way of an Arduino talking to us over RS-232 +# where each RS-232 byte will contain a single human-entered bit. +human_signal_inq = queue.Queue() # Bits +# This out queue will be the bytes we get when we combine those bits. +# (Maybe we could just write the byte directly to serial. But I'm going with +# this for now.) +human_signal_outq = queue.Queue() # Bytes + + +# Pretend we're getting input from a DCE terminal. +# For testing purposes. +# Create a device like this with: +# socat -d -d pty,rawer,echo=0,link=/tmp/ttyV0,b9600 pty,rawer,echo=0,link=/tmp/ttyV1,b9600 +with serial.Serial("/dev/pts/6", 9600) as dce_out: + dce_out.write(b"Hello, world!") + + +def dce_inq_listen(): + """ + Read one byte at a time from DCE and put in a queue. + + The queue is a "transformer" queue that will transform + the byte values into a sequence of 0's and 1's. + """ + with serial.Serial("/dev/pts/7", 9600) as dce_in: + while True: + dce_transformer.put(ord(dce_in.read())) + + +def speak(): + """Background thread to populate the in queue.""" + t = threading.Thread(target=dce_inq_listen) + t.start() + + +def bits_to_byte(bits): + """Convert list of 8 1's and 0's to a single int value.""" + out = 0 + for bit in bits: + out = (out << 1) | bit + return out + + +def display_dce(): + """ + Display a graph of the input signal, slow enough for a human to transcribe. + + This is probably the most finicky part of the process right now. It's not a very + clean display. The framerate isn't great. It's a bit choppy. + """ + SAMPLE_RATE = 500 # In milliseconds + FRAME_RATE = 60 + FRAME_SIZE = 1000 + SLEEP = 1 / FRAME_RATE + + fig, ax = plt.subplots() + + ax.set_xlim(0, 1000, auto=False) + ax.spines["left"].set_position("center") + ax.spines["bottom"].set_position("center") + ax.spines["right"].set_color("none") + ax.spines["top"].set_color("none") + + points = [] + + prev = time.time_ns() + + plt.ion() + plt.show() + while True: + # Advance all points + for point in points: + point[0] += 1/80 * FRAME_SIZE + + # If we've passed 250ms, take a sample. + if time.time_ns() - prev > SAMPLE_RATE * 1e6: + prev += SAMPLE_RATE * 1e6 + new_sample = [0, dce_transformer.get()] + if points and new_sample[1] != points[-1][1]: + points.append([points[-1][0], new_sample[1]]) # prev point's x, new sample's y. + points.append(new_sample) + + filtered = [] + for point in points: + if point[0] < FRAME_SIZE: + filtered.append(point) + + points = filtered + + lines = ax.get_lines() + for line in lines: + line.remove() + + ax.plot(*zip(*points), color="black") + plt.draw() + plt.pause(SLEEP) + + +def sample_signal(): + """Get a signal from the human-managed switch.""" + # The signal will come from an Arduino or something. + # This fakes it until that's setup. + return [0, random.randint(0, 1)] + + +def human_signal_listen(): + """Read one byte (which represents 1 bit) at a time from human input.""" + bits = [0] * 8 + i = 0 + with serial.Serial("/dev/pts/11", 9600) as human_signal_in: + while True: + bits[i] = human_signal_in.put(human_signal_inq.read()) + i += 1 + if i == 8: + i = 0 + human_signal_outq.put()