|
| 1 | +import io |
| 2 | +from urllib.parse import urlencode |
| 3 | + |
| 4 | +from multidict import MultiDict, MultiDictProxy |
| 5 | + |
| 6 | +from . import hdrs, multipart, payload |
| 7 | +from .helpers import guess_filename |
| 8 | + |
| 9 | +__all__ = ('FormData',) |
| 10 | + |
| 11 | + |
| 12 | +class FormData: |
| 13 | + """Helper class for multipart/form-data and |
| 14 | + application/x-www-form-urlencoded body generation.""" |
| 15 | + |
| 16 | + def __init__(self, fields=(), quote_fields=True): |
| 17 | + self._writer = multipart.MultipartWriter('form-data') |
| 18 | + self._fields = [] |
| 19 | + self._is_multipart = False |
| 20 | + self._quote_fields = quote_fields |
| 21 | + |
| 22 | + if isinstance(fields, dict): |
| 23 | + fields = list(fields.items()) |
| 24 | + elif not isinstance(fields, (list, tuple)): |
| 25 | + fields = (fields,) |
| 26 | + self.add_fields(*fields) |
| 27 | + |
| 28 | + def add_field(self, name, value, *, content_type=None, filename=None, |
| 29 | + content_transfer_encoding=None): |
| 30 | + |
| 31 | + if isinstance(value, io.IOBase): |
| 32 | + self._is_multipart = True |
| 33 | + elif isinstance(value, (bytes, bytearray, memoryview)): |
| 34 | + if filename is None and content_transfer_encoding is None: |
| 35 | + filename = name |
| 36 | + |
| 37 | + type_options = MultiDict({'name': name}) |
| 38 | + if filename is not None and not isinstance(filename, str): |
| 39 | + raise TypeError('filename must be an instance of str. ' |
| 40 | + 'Got: %s' % filename) |
| 41 | + if filename is None and isinstance(value, io.IOBase): |
| 42 | + filename = guess_filename(value, name) |
| 43 | + if filename is not None: |
| 44 | + type_options['filename'] = filename |
| 45 | + self._is_multipart = True |
| 46 | + |
| 47 | + headers = {} |
| 48 | + if content_type is not None: |
| 49 | + if not isinstance(content_type, str): |
| 50 | + raise TypeError('content_type must be an instance of str. ' |
| 51 | + 'Got: %s' % content_type) |
| 52 | + headers[hdrs.CONTENT_TYPE] = content_type |
| 53 | + self._is_multipart = True |
| 54 | + if content_transfer_encoding is not None: |
| 55 | + if not isinstance(content_transfer_encoding, str): |
| 56 | + raise TypeError('content_transfer_encoding must be an instance' |
| 57 | + ' of str. Got: %s' % content_transfer_encoding) |
| 58 | + headers[hdrs.CONTENT_TRANSFER_ENCODING] = content_transfer_encoding |
| 59 | + self._is_multipart = True |
| 60 | + |
| 61 | + self._fields.append((type_options, headers, value)) |
| 62 | + |
| 63 | + def add_fields(self, *fields): |
| 64 | + to_add = list(fields) |
| 65 | + |
| 66 | + while to_add: |
| 67 | + rec = to_add.pop(0) |
| 68 | + |
| 69 | + if isinstance(rec, io.IOBase): |
| 70 | + k = guess_filename(rec, 'unknown') |
| 71 | + self.add_field(k, rec) |
| 72 | + |
| 73 | + elif isinstance(rec, (MultiDictProxy, MultiDict)): |
| 74 | + to_add.extend(rec.items()) |
| 75 | + |
| 76 | + elif isinstance(rec, (list, tuple)) and len(rec) == 2: |
| 77 | + k, fp = rec |
| 78 | + self.add_field(k, fp) |
| 79 | + |
| 80 | + else: |
| 81 | + raise TypeError('Only io.IOBase, multidict and (name, file) ' |
| 82 | + 'pairs allowed, use .add_field() for passing ' |
| 83 | + 'more complex parameters, got {!r}' |
| 84 | + .format(rec)) |
| 85 | + |
| 86 | + def _gen_form_urlencoded(self, encoding): |
| 87 | + # form data (x-www-form-urlencoded) |
| 88 | + data = [] |
| 89 | + for type_options, _, value in self._fields: |
| 90 | + data.append((type_options['name'], value)) |
| 91 | + |
| 92 | + return payload.BytesPayload( |
| 93 | + urlencode(data, doseq=True).encode(encoding), |
| 94 | + content_type='application/x-www-form-urlencoded') |
| 95 | + |
| 96 | + def _gen_form_data(self, encoding): |
| 97 | + """Encode a list of fields using the multipart/form-data MIME format""" |
| 98 | + for dispparams, headers, value in self._fields: |
| 99 | + if hdrs.CONTENT_TYPE in headers: |
| 100 | + part = payload.get_payload( |
| 101 | + value, content_type=headers[hdrs.CONTENT_TYPE], |
| 102 | + headers=headers, encoding=encoding) |
| 103 | + else: |
| 104 | + part = payload.get_payload( |
| 105 | + value, headers=headers, encoding=encoding) |
| 106 | + if dispparams: |
| 107 | + part.set_content_disposition( |
| 108 | + 'form-data', quote_fields=self._quote_fields, **dispparams |
| 109 | + ) |
| 110 | + # FIXME cgi.FieldStorage doesn't likes body parts with |
| 111 | + # Content-Length which were sent via chunked transfer encoding |
| 112 | + part.headers.pop(hdrs.CONTENT_LENGTH, None) |
| 113 | + |
| 114 | + self._writer.append_payload(part) |
| 115 | + |
| 116 | + return self._writer |
| 117 | + |
| 118 | + def __call__(self, encoding): |
| 119 | + if self._is_multipart: |
| 120 | + return self._gen_form_data(encoding) |
| 121 | + else: |
| 122 | + return self._gen_form_urlencoded(encoding) |
0 commit comments