You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

193 lines
5.8 KiB
Python

"""Scratchpad for Defcon's Hardware Hacking Village's Rube Goldberg Machine.
For local testing you can use `socat` to spin up two virtual RS-232 devices.
socat -d -d pty,rawer,echo=0,link=/tmp/ttyV0,b9600 pty,rawer,echo=0,link=/tmp/ttyV1,b9600
The human signal will come from a switch connected to an Arduino's GPIO pin. The
Arduino will capture the signal at 4 baud and send the bit over its serial/USB
connection to the computer at 9600 baud.
"""
import queue
import random
import threading
import time
import matplotlib.pyplot as plt
import serial
# This in queue will contain bytes from DCE. It's the very start of our chain.
dce_inq = queue.Queue()
# The out queue will contain bits that we can pull at a very low baud rate and
# display to the meat bag.
dce_outq = queue.Queue()
dte_outq = queue.Queue()
class TransformerQueue:
"""Transforms values from an input queue and puts the transformation in an output queue.
`transform` needs to be a function that returns a list, even if you're
transforming a single value to a single value.
This class is useful as both a buffer to throttle the incoming messages
and a way to convert bytes to 1's and 0's so that we can display those 1's
and 0's on a graph.
If we want to terminate the manual work that we're doing of transcribing
the 1's and 0's, then we still have the backing queue of messages that
we can pass along by automated digital means, so we don't lose any
messages and thus break the Rube Goldberg machine.
"""
def __init__(self, q1, q2, transform):
self.q1 = q1
self.q2 = q2
self.transform = transform
def empty(self):
return self.q2.empty() and self.q1.empty()
def put(self, *args, **kwargs):
return self.q1.put(*args, **kwargs)
def get(self, *args, **kwargs):
if self.empty():
raise queue.Empty()
if self.q2.empty():
for v in self.transform(self.q1.get(*args, **kwargs)):
self.q2.put(v)
return self.q2.get(*args, **kwargs)
def byte_to_bits(byte):
"""Convert a byte to a list of 8 1's and 0's."""
assert byte <= 255, "Can't send more than 8 bits in a single RS-232 message."
out = []
for _ in range(8):
out.append(byte & 0x01)
byte >>= 1
return reversed(out)
dce_transformer = TransformerQueue(dce_inq, dce_outq, byte_to_bits)
# This in queue will contain the bits that the meat bag signaled.
# We'll probably receive each bit by way of an Arduino talking to us over RS-232
# where each RS-232 byte will contain a single human-entered bit.
human_signal_inq = queue.Queue() # Bits
# This out queue will be the bytes we get when we combine those bits.
# (Maybe we could just write the byte directly to serial. But I'm going with
# this for now.)
human_signal_outq = queue.Queue() # Bytes
# Pretend we're getting input from a DCE terminal.
# For testing purposes.
# Create a device like this with:
# socat -d -d pty,rawer,echo=0,link=/tmp/ttyV0,b9600 pty,rawer,echo=0,link=/tmp/ttyV1,b9600
with serial.Serial("/dev/pts/6", 9600) as dce_out:
dce_out.write(b"Hello, world!")
def dce_inq_listen():
"""
Read one byte at a time from DCE and put in a queue.
The queue is a "transformer" queue that will transform
the byte values into a sequence of 0's and 1's.
"""
with serial.Serial("/dev/pts/7", 9600) as dce_in:
while True:
dce_transformer.put(ord(dce_in.read()))
def speak():
"""Background thread to populate the in queue."""
t = threading.Thread(target=dce_inq_listen)
t.start()
def bits_to_byte(bits):
"""Convert list of 8 1's and 0's to a single int value."""
out = 0
for bit in bits:
out = (out << 1) | bit
return out
def display_dce():
"""
Display a graph of the input signal, slow enough for a human to transcribe.
This is probably the most finicky part of the process right now. It's not a very
clean display. The framerate isn't great. It's a bit choppy.
"""
SAMPLE_RATE = 500 # In milliseconds
FRAME_RATE = 60
FRAME_SIZE = 1000
SLEEP = 1 / FRAME_RATE
fig, ax = plt.subplots()
ax.set_xlim(0, 1000, auto=False)
ax.spines["left"].set_position("center")
ax.spines["bottom"].set_position("center")
ax.spines["right"].set_color("none")
ax.spines["top"].set_color("none")
points = []
prev = time.time_ns()
plt.ion()
plt.show()
while True:
# Advance all points
for point in points:
point[0] += 1/80 * FRAME_SIZE
# If we've passed 250ms, take a sample.
if time.time_ns() - prev > SAMPLE_RATE * 1e6:
prev += SAMPLE_RATE * 1e6
new_sample = [0, dce_transformer.get()]
if points and new_sample[1] != points[-1][1]:
points.append([points[-1][0], new_sample[1]]) # prev point's x, new sample's y.
points.append(new_sample)
filtered = []
for point in points:
if point[0] < FRAME_SIZE:
filtered.append(point)
points = filtered
lines = ax.get_lines()
for line in lines:
line.remove()
ax.plot(*zip(*points), color="black")
plt.draw()
plt.pause(SLEEP)
def sample_signal():
"""Get a signal from the human-managed switch."""
# The signal will come from an Arduino or something.
# This fakes it until that's setup.
return [0, random.randint(0, 1)]
def human_signal_listen():
"""Read one byte (which represents 1 bit) at a time from human input."""
bits = [0] * 8
i = 0
with serial.Serial("/dev/pts/11", 9600) as human_signal_in:
while True:
bits[i] = human_signal_in.put(human_signal_inq.read())
i += 1
if i == 8:
i = 0
human_signal_outq.put()