aboutsummaryrefslogtreecommitdiff
path: root/decode/packet.py
diff options
context:
space:
mode:
authorDan Williams <dcbw@redhat.com>2012-01-21 12:55:16 -0600
committerDan Williams <dcbw@redhat.com>2012-01-21 12:55:16 -0600
commitc37fdf5f94711ffbdf966f2ab366bbd047dfc147 (patch)
tree6fdac73a3c57f1415dddb7002d018e7f27a06e25 /decode/packet.py
parenta9d6b5a8b62fc0fc07d7c3fab99b25c67474c68d (diff)
decode: updates all around
Rewrite packet handling so packets can span multiple USB URBs (which sometimes happens with WMC) and also add a bunch more WMC decoding stuff.
Diffstat (limited to 'decode/packet.py')
-rw-r--r--decode/packet.py130
1 files changed, 77 insertions, 53 deletions
diff --git a/decode/packet.py b/decode/packet.py
index a139021a..bc702faa 100644
--- a/decode/packet.py
+++ b/decode/packet.py
@@ -53,8 +53,9 @@ def get_urb_info(l):
if idx >= 0:
direction = defs.TO_HOST
else:
- raise Exception("Invalid packet start line")
+ return (defs.TO_UNKNOWN, -1)
+ # Yay, valid packet, grab URB number
numstr = ""
for c in l[idx + 9:]:
if c.isdigit():
@@ -68,66 +69,90 @@ def get_urb_info(l):
return (direction, int(numstr))
class Packet:
- def __init__(self, lines, control_prot, transfer_prot):
+ def __init__(self, line, control_prot, transfer_prot):
self.direction = defs.TO_UNKNOWN
self.func = URBF_UNKNOWN
+ self.control_prot = control_prot
+ self.transfer_prot = transfer_prot
self.extra = []
self.data = None
self.urbnum = 0
self.protocol = None
self.has_data = False
self.typecode = None
-
- # Parse the packet
-
- (self.direction, self.urbnum) = get_urb_info(lines[0])
-
- try:
- (self.func, self.has_data, self.typecode) = funcs[lines[1].strip()]
- except KeyError:
- raise KeyError("URB function %s not handled" % lines[1].strip())
-
- if self.func == URBF_TRANSFER:
- self.protocol = transfer_prot
- elif self.func == URBF_CONTROL:
- self.protocol = control_prot
-
- # Parse transfer buffer data
- in_data = False
- data = ""
- for i in range(2, len(lines)):
- l = lines[i].strip()
- if self.has_data:
- if l.startswith("TransferBufferMDL"):
- if in_data == True:
- raise Exception("Already in data")
- in_data = True
- elif l.startswith("UrbLink"):
- in_data = False
- elif in_data and len(l) and not "no data supplied" in l:
- d = l[l.index(": ") + 2:] # get data alone
- data += d.replace(" ", "")
+ self.lines = []
+ self.in_data = False
+ self.data_complete = False
+ self.tmpdata = ""
+ self.fcomplete = None
+ self.funpack = None
+ self.fshow = None
+
+ # Check if this is actually a packet
+ self.lines.append(line)
+ (self.direction, self.urbnum) = get_urb_info(line)
+
+ def add_line(self, line):
+ line = line.strip()
+ if not len(line):
+ return
+ self.lines.append(line)
+
+ if line[0] == '[':
+ # Usually the end of a packet, but if we need data from the next
+ # packet keep going
+ if self.has_data and not self.data_complete:
+ return False
+ return True
+
+ if not self.typecode:
+ # haven't gotten our "-- URB_FUNCTION_xxxx" line yet
+ if line.find("-- URB_FUNCTION_") >= 0:
+ try:
+ (self.func, self.has_data, self.typecode) = funcs[line]
+ except KeyError:
+ raise KeyError("URB function %s not handled" % line)
+
+ if self.func == URBF_TRANSFER:
+ self.protocol = self.transfer_prot
+ elif self.func == URBF_CONTROL:
+ self.protocol = self.control_prot
+
+ if self.protocol:
+ exec "from %s import get_funcs" % self.protocol
+ (self.fcomplete, self.funpack, self.fshow) = get_funcs()
else:
- self.extra.append(l)
-
- if len(data) > 0:
- self.parse_data(data)
-
- def get_funcs(self):
- if self.protocol:
- exec "from %s import get_funcs" % self.protocol
- return get_funcs()
- return (None, None)
+ return False # not done; need more lines
+
+ if line.find("TransferBufferMDL = ") >= 0 and self.has_data:
+ self.in_data = True
+ return False # not done; need more lines
+
+ if line.find("UrbLink = ") >= 0:
+ if self.in_data:
+ self.in_data = False
+
+ # special case: zero-length data means complete
+ if len(self.tmpdata) == 0:
+ self.data_complete = True
+ return True
+
+ if self.fcomplete:
+ self.data_complete = self.fcomplete(self.tmpdata, self.direction)
+ if self.data_complete:
+ self.data = self.funpack(self.tmpdata, self.direction)
+ return self.data_complete
+ else:
+ self.data = binascii.unhexlify(self.tmpdata)
+ self.data_complete = True
+ return False # not done; need more lines
- def parse_data(self, data):
- if not self.has_data:
- raise Exception("Data only valid for URBF_TRANSFER or URBF_CONTROL")
+ if self.in_data:
+ if len(line) and not "no data supplied" in line:
+ d = line[line.index(": ") + 2:] # get data alone
+ self.tmpdata += d.replace(" ", "")
- (unpack, show) = self.get_funcs()
- if unpack:
- self.data = unpack(data, self.direction)
- else:
- self.data = binascii.unhexlify(data)
+ return False # not done; need more lines
def add_ascii(self, line, items):
if len(line) < 53:
@@ -188,7 +213,6 @@ class Packet:
print prefix + output
print ""
- (unpack, show) = self.get_funcs()
- if show:
- show(self.data, " " * 8, self.direction)
+ if self.fshow:
+ self.fshow(self.data, " " * 8, self.direction)