| """ Python 'utf-8-sig' Codec |
| This works similarly to UTF-8, with the following changes: |
| |
| * On encoding/writing a UTF-8 encoded BOM will be prepended/written as the |
| first three bytes. |
| |
| * On decoding/reading if the first three bytes are a UTF-8 encoded BOM, these |
| bytes will be skipped. |
| """ |
| import codecs |
| |
| ### Codec APIs |
| |
def encode(input, errors='strict'):
    """Stateless encoder: UTF-8 encode *input* and prepend the UTF-8 BOM.

    Returns a ``(data, length)`` tuple where *data* is the BOM followed by
    the encoded input and *length* is ``len(input)``.
    """
    payload = codecs.utf_8_encode(input, errors)[0]
    return (codecs.BOM_UTF8 + payload, len(input))
| |
def decode(input, errors='strict'):
    """Stateless decoder: decode UTF-8 *input*, skipping one leading BOM.

    Returns a ``(text, consumed)`` tuple. When the input starts with the
    UTF-8 BOM, those three bytes are dropped from the text but still counted
    in *consumed*.
    """
    has_bom = input[:3] == codecs.BOM_UTF8
    body = input[3:] if has_bom else input
    # The input is always treated as complete (final=True).
    text, used = codecs.utf_8_decode(body, errors, True)
    if has_bom:
        return (text, used + 3)
    return (text, used)
| |
class IncrementalEncoder(codecs.IncrementalEncoder):
    """Incremental UTF-8 encoder that emits the BOM before the first chunk."""

    def __init__(self, errors='strict'):
        codecs.IncrementalEncoder.__init__(self, errors)
        # 1 while the BOM still has to be written, 0 once it has been.
        self.first = 1

    def encode(self, input, final=False):
        """Encode *input*; the first call after creation/reset adds the BOM."""
        bom_pending = self.first
        if bom_pending:
            # Clear the flag before encoding, so it stays cleared even if
            # the encode call below raises.
            self.first = 0
        encoded = codecs.utf_8_encode(input, self.errors)[0]
        if bom_pending:
            return codecs.BOM_UTF8 + encoded
        return encoded

    def reset(self):
        """Return to the initial state: the next chunk gets a BOM again."""
        codecs.IncrementalEncoder.reset(self)
        self.first = 1

    def getstate(self):
        """Return the BOM-pending flag as the encoder state."""
        return self.first

    def setstate(self, state):
        """Restore a BOM-pending flag previously returned by getstate()."""
        self.first = state
| |
class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
    """Incremental UTF-8 decoder that skips one leading BOM, if present."""

    def __init__(self, errors='strict'):
        codecs.BufferedIncrementalDecoder.__init__(self, errors)
        # True until it has been decided whether the data starts with a BOM;
        # None afterwards.
        self.first = True

    def _buffer_decode(self, input, errors, final):
        """Decode buffered *input* and return ``(text, bytes_consumed)``.

        While fewer than three bytes are available and they could still be
        the start of a BOM, nothing is consumed so the decision is retried
        on the next call. If *final* is true and such a partial prefix is
        non-empty, it is passed to the UTF-8 decoder instead, so truncated
        input goes through the *errors* handler rather than being dropped.
        """
        if self.first:
            bom = codecs.BOM_UTF8
            undecided = len(input) < 3 and bom.startswith(input)
            if undecided and not (final and len(input) > 0):
                # not enough data to decide if this really is a BOM
                # => try again on the next call
                return (u"", 0)
            self.first = None
            if input[:3] == bom:
                (output, consumed) = codecs.utf_8_decode(input[3:], errors, final)
                # Count the skipped BOM bytes as consumed.
                return (output, consumed + 3)
        return codecs.utf_8_decode(input, errors, final)

    def reset(self):
        """Return to the initial state: a leading BOM is looked for again."""
        codecs.BufferedIncrementalDecoder.reset(self)
        self.first = True

    def getstate(self):
        """Return ``(buffer, flag)``; flag is 1 while the BOM check is pending."""
        state = codecs.BufferedIncrementalDecoder.getstate(self)
        # Substitute our own flag for the second item of the base class state.
        return (state[0], 1 if self.first else 0)

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        codecs.BufferedIncrementalDecoder.setstate(self, state)
        self.first = True if state[1] else None
| |
class StreamWriter(codecs.StreamWriter):
    """Stream writer whose first encode() call after creation/reset adds a BOM."""

    def reset(self):
        codecs.StreamWriter.reset(self)
        # Remove the per-instance override installed by encode(), if there is
        # one, so the class-level encode() below is used again.
        self.__dict__.pop('encode', None)

    def encode(self, input, errors='strict'):
        # Shadow this method on the instance: later calls go straight to the
        # plain UTF-8 encoder and therefore add no BOM.
        self.encode = codecs.utf_8_encode
        return encode(input, errors)
| |
class StreamReader(codecs.StreamReader):
    """Stream reader that skips one leading UTF-8 BOM, if present."""

    def reset(self):
        codecs.StreamReader.reset(self)
        # Remove the per-instance override installed by decode(), if there is
        # one, so the BOM check below runs again.
        self.__dict__.pop('decode', None)

    def decode(self, input, errors='strict'):
        bom = codecs.BOM_UTF8
        if len(input) < 3:
            if bom.startswith(input):
                # not enough data to decide if this is a BOM
                # => try again on the next call
                return (u"", 0)
            starts_with_bom = False
        else:
            starts_with_bom = input[:3] == bom
        # The decision is made: shadow this method on the instance so later
        # calls use the plain UTF-8 decoder directly.
        self.decode = codecs.utf_8_decode
        if not starts_with_bom:
            return codecs.utf_8_decode(input, errors)
        (text, used) = codecs.utf_8_decode(input[3:], errors)
        # Count the skipped BOM bytes as consumed.
        return (text, used + 3)
| |
| ### encodings module API |
| |
def getregentry():
    """Return the ``codecs.CodecInfo`` entry describing the 'utf-8-sig' codec."""
    return codecs.CodecInfo(
        encode,
        decode,
        streamreader=StreamReader,
        streamwriter=StreamWriter,
        incrementalencoder=IncrementalEncoder,
        incrementaldecoder=IncrementalDecoder,
        name='utf-8-sig',
    )