Source code for bulkdata.parse
from .error import EmptyLineError
[docs]class BDFParser:
BEGINBULK = "BEGIN BULK"
ENDDATA = "ENDDATA"
MAXLINELENGTH = 80
FIELDWIDTH = 8
FIELDSPERLINE = 10
FIELDSPERBODY = 8
def __init__(self, bdf_str):
# bdf_str = self.expand_tabs(bdf_str)
self.bds = self.ignore_enddata(bdf_str)
self.lines = self.bds.split("\n")
self.remove_comments()
self.line_idx = 0
self.cards = []
[docs] def current_line(self):
return self.lines[self.line_idx]
[docs] def increment_line(self, i=1):
self.line_idx += i
return self.current_line()
[docs] def ignore_enddata(self, bdf_str):
try:
enddata_i = bdf_str.index(self.ENDDATA)
except ValueError:
return bdf_str
else:
return bdf_str[:enddata_i]
[docs] def is_line_free(self, line):
return "," in line
[docs] def parse_fields(self, fields):
fieldsperline = self.FIELDSPERLINE
fieldsperbody = self.FIELDSPERBODY
numfields = len(fields)
head, body, tail = None, [], None
if numfields == 0:
raise EmptyLineError()
if numfields > 0:
head = fields[0]
if numfields > 1:
body = fields[1:]
if numfields == fieldsperline:
tail = body.pop(-1)
# missing fields are blank fields
nummissing = fieldsperbody - len(body)
if nummissing > 0:
body.extend(["" for _ in range(nummissing)])
return head, body, tail
[docs] def parse_line_free(self, line):
fields = line.rstrip(",").split(",")
return self.parse_fields(fields)
[docs] def parse_line_fixed(self, line):
fieldwidth = self.FIELDWIDTH
length = len(line)
fields = [line[i:i+fieldwidth]
for i in range(0, length, fieldwidth)]
return self.parse_fields(fields)
[docs] def parse_line(self, line):
if self.is_line_free(line):
return self.parse_line_free(line)
else:
return self.parse_line_fixed(line)
[docs] def parse_card(self):
line = self.current_line()
name, fields, tail = self.parse_line(line)
while True:
try:
next_line = self.increment_line()
except IndexError:
break
next_head, next_fields, next_tail = self.parse_line(next_line)
if not tail:
next_head = next_head.strip()
if next_head and not ("+" in next_head):
break
tail = next_tail
fields.extend(next_fields)
# pop trailing blank fields
for i in reversed(range(len(fields))):
if not fields[i].strip():
del fields[i]
else:
break
return name, fields
[docs] def endofbdf(self):
return self.line_idx == len(self.lines)
[docs] def parse(self):
header = self.parse_header()
cards = []
while not self.endofbdf():
card = self.parse_card()
cards.append(card)
return header, cards