Package scrapfly

Sub-modules

scrapfly.api_config
scrapfly.api_response
scrapfly.client
scrapfly.errors
scrapfly.extraction_config
scrapfly.frozen_dict
scrapfly.polyfill
scrapfly.reporter
scrapfly.scrape_config
scrapfly.scrapy
scrapfly.screenshot_config
scrapfly.webhook

Classes

class ApiHttpClientError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ApiHttpClientError(HttpError):
    """Generic error for a bad HTTP response from the API.

    ``ErrorFactory.create`` falls back to this class when neither the HTTP
    status nor the error code's resource maps to a more specific error.
    It adds no behavior on top of ``HttpError``.
    """

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

  • ApiHttpServerError
  • scrapfly.errors.BadApiKeyError
  • scrapfly.errors.PaymentRequired
  • scrapfly.errors.TooManyRequest
class ApiHttpServerError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ApiHttpServerError(ApiHttpClientError):
    """Error for a bad HTTP response from the API with a status of 500 or above.

    It adds no behavior on top of ``ApiHttpClientError``.
    """

Ancestors

class EncoderError (content: str)

Common base class for all exceptions

Expand source code
class EncoderError(BaseException):
    """Carries the text of a payload that was reported as invalid.

    The class derives from ``BaseException``, so ``except Exception``
    handlers do not catch it.
    """

    def __init__(self, content: str):
        # The base class is initialised without arguments, so ``args`` stays empty
        # and the payload is only reachable through ``content``.
        super().__init__()
        self.content = content

    def __repr__(self):
        """Return the payload prefixed with a short explanation."""
        return "Invalid payload: %s" % self.content

    def __str__(self) -> str:
        """Return the raw payload text unchanged."""
        return self.content

Ancestors

  • builtins.BaseException
class ErrorFactory
Expand source code
class ErrorFactory:
    """Builds the exception instance that matches an errored ``ScrapeApiResponse``."""

    # Resource segment of an error code (the middle part of
    # ``ERR::<RESOURCE>::<NAME>``) -> dedicated exception class.
    RESOURCE_TO_ERROR = {
        ScrapflyError.RESOURCE_SCRAPE: ScrapflyScrapeError,
        ScrapflyError.RESOURCE_WEBHOOK: ScrapflyWebhookError,
        ScrapflyError.RESOURCE_PROXY: ScrapflyProxyError,
        ScrapflyError.RESOURCE_SCHEDULE: ScrapflyScheduleError,
        ScrapflyError.RESOURCE_ASP: ScrapflyAspError,
        ScrapflyError.RESOURCE_SESSION: ScrapflySessionError
    }

    # Notable HTTP statuses get their own class for convenience.
    # Only used for generic API errors, i.e. when the error code's resource
    # is not one of RESOURCE_TO_ERROR.
    HTTP_STATUS_TO_ERROR = {
        401: BadApiKeyError,
        402: PaymentRequired,
        429: TooManyRequest
    }

    @staticmethod
    def _get_resource(code: str) -> Optional[str]:
        """Return the resource segment of an ``A::RESOURCE::B`` error code.

        Returns ``None`` when ``code`` is not a string or has no ``::``
        separator. Codes with more or fewer than three segments no longer
        raise; the second segment is returned.
        """
        if isinstance(code, str) and '::' in code:
            # '::' is present, so the split always yields at least two parts.
            return code.split('::')[1]

        return None

    @staticmethod
    def create(api_response: 'ScrapeApiResponse'):
        """Create the error object describing ``api_response.error``.

        The class is chosen from the response kind (bad HTTP response vs.
        Scrapfly error), the HTTP status code and the resource encoded in the
        error code. The message aggregates status, code, message, optional
        description and, for retryable errors, the retry delay.

        :param api_response: response whose ``error`` mapping holds at least
            ``code`` and ``message``.
        :return: an instance of one of the error classes referenced by this
            factory.
        """
        is_retryable = False
        kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR
        http_code = api_response.status_code
        retry_delay = 5
        retry_times = 3
        description = None
        error_url = 'https://scrapfly.io/docs/scrape-api/errors#api'
        code = api_response.error['code']

        if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
            # The relevant status is the one returned by the scraped website.
            http_code = api_response.scrape_result['status_code']

        if 'description' in api_response.error:
            description = api_response.error['description']

        message = '%s %s %s' % (str(http_code), code, api_response.error['message'])

        if 'doc_url' in api_response.error:
            error_url = api_response.error['doc_url']

        if 'retryable' in api_response.error:
            is_retryable = api_response.error['retryable']

        resource = ErrorFactory._get_resource(code=code)

        if is_retryable is True and 'X-Retry' in api_response.headers:
            # Keep the default delay when Retry-After is absent or is not a
            # plain number of seconds (the header may also carry an HTTP date).
            if 'Retry-After' in api_response.headers:
                try:
                    retry_delay = int(api_response.headers['Retry-After'])
                except (TypeError, ValueError):
                    pass

        message = '%s: %s' % (message, description) if description else message

        if is_retryable is True:
            message = '%s. Retry delay : %s seconds' % (message, str(retry_delay))

        args = {
            'message': message,
            'code': code,
            'http_status_code': http_code,
            'is_retryable': is_retryable,
            'api_response': api_response,
            'resource': resource,
            'retry_delay': retry_delay,
            'retry_times': retry_times,
            'documentation_url': error_url,
            'request': api_response.request,
            'response': api_response.response
        }

        if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE:
            if http_code >= 500:
                return ApiHttpServerError(**args)

            is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR

            if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error:
                return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args)

            if is_scraper_api_error:
                return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

            return ApiHttpClientError(**args)

        elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR:
            if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
                if http_code >= 500:
                    return UpstreamHttpServerError(**args)

                if http_code >= 400:
                    return UpstreamHttpClientError(**args)

            if resource in ErrorFactory.RESOURCE_TO_ERROR:
                return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

            return ScrapflyError(**args)

Class variables

var HTTP_STATUS_TO_ERROR
var RESOURCE_TO_ERROR

Static methods

def create(api_response: ScrapeApiResponse)
class ExtractionAPIError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ExtractionAPIError(HttpError):
    """Default error class used by ``ExtractionApiResponse.raise_for_result``.

    It adds no behavior on top of ``HttpError``.
    """

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ExtractionApiResponse (request: requests.models.Request, response: requests.models.Response, extraction_config: ExtractionConfig, api_result: Optional[bytes] = None)
Expand source code
class ExtractionApiResponse(ApiResponse):
    """API response wrapper for an extraction call.

    ``result`` holds either the error payload returned by the API (detected by
    its ``error_id`` key) or ``{'result': <api_result>}`` on success.
    """

    def __init__(self, request: Request, response: Response, extraction_config: ExtractionConfig, api_result: Optional[bytes] = None):
        super().__init__(request, response)
        self.extraction_config = extraction_config
        self.result = self.handle_api_result(api_result)

    @property
    def extraction_result(self) -> Optional[Dict]:
        """Extraction payload, or a ``None``-filled placeholder when it is empty or missing."""
        extraction_result = self.result.get('result', None)
        if not extraction_result:  # handle empty extraction responses
            return {'data': None, 'content_type': None}
        else:
            return extraction_result

    @property
    def data(self) -> Optional[Union[Dict, List, str]]:  # depends on the LLM prompt
        """Extracted data, or ``None`` when the API reported an error."""
        if self.error is None:
            return self.extraction_result['data']

        return None

    @property
    def content_type(self) -> Optional[str]:
        """Content type of the extracted data, or ``None`` when the API reported an error."""
        if self.error is None:
            return self.extraction_result['content_type']

        return None

    @property
    def extraction_success(self) -> bool:
        """``True`` when the extraction result carries non-``None`` data."""
        extraction_result = self.extraction_result
        if extraction_result is None or extraction_result['data'] is None:
            return False

        return True

    @property
    def error(self) -> Optional[Dict]:
        """Error payload returned by the API, or ``None`` when there is none.

        ``extraction_result`` never returns ``None`` (it substitutes a
        placeholder), so testing it could never detect an error. The check
        uses the same ``error_id`` marker as ``_is_api_error``.
        """
        if 'error_id' in self.result:
            return self.result

        return None

    def _is_api_error(self, api_result: Dict) -> bool:
        """Tell whether ``api_result`` is missing or carries an ``error_id`` key."""
        if api_result is None:
            return True

        return 'error_id' in api_result

    def handle_api_result(self, api_result: bytes) -> FrozenDict:
        """Wrap the raw API result into the frozen ``result`` mapping.

        Error payloads are stored as-is; successful payloads are nested under
        the ``result`` key.
        """
        # NOTE(review): a ``None`` api_result is classified as an error and is
        # passed to FrozenDict unchanged - confirm FrozenDict accepts None.
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)

        return FrozenDict({'result': api_result})

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError):
        """Delegate to the parent implementation, defaulting to ``ExtractionAPIError``."""
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

Ancestors

Instance variables

prop content_type : Optional[str]
Expand source code
@property
def content_type(self) -> Optional[str]:
    """Content type of the extraction result, or ``None`` when ``error`` is set."""
    if self.error is not None:
        return None

    return self.extraction_result['content_type']
prop data : Union[Dict, List, str]
Expand source code
@property
def data(self) -> Union[Dict, List, str]:  # depends on the LLM prompt
    """Extracted data, or ``None`` when ``error`` is set."""
    return self.extraction_result['data'] if self.error is None else None
prop error : Optional[Dict]
Expand source code
@property
def error(self) -> Optional[Dict]:
    """Error payload returned by the API, or ``None`` when there is none.

    ``extraction_result`` substitutes a placeholder instead of returning
    ``None``, so testing it could never detect an error. An error payload is
    recognised by its ``error_id`` key instead.
    """
    if 'error_id' in self.result:
        return self.result

    return None
prop extraction_result : Optional[Dict]
Expand source code
@property
def extraction_result(self) -> Optional[Dict]:
    """Value stored under ``result``, or a ``None``-filled placeholder when it is empty or missing."""
    found = self.result.get('result', None)
    # Empty extraction responses are normalised to the placeholder.
    return found if found else {'data': None, 'content_type': None}
prop extraction_success : bool
Expand source code
@property
def extraction_success(self) -> bool:
    """``True`` when an extraction result exists and its ``data`` is not ``None``."""
    payload = self.extraction_result
    return payload is not None and payload['data'] is not None

Methods

def handle_api_result(self, api_result: bytes) ‑> FrozenDict
def raise_for_result(self, raise_on_upstream_error=True, error_class=scrapfly.errors.ExtractionAPIError)

Inherited members

class ExtractionConfig (body: Union[str, bytes], content_type: str, url: Optional[str] = None, charset: Optional[str] = None, extraction_template: Optional[str] = None, extraction_ephemeral_template: Optional[Dict] = None, extraction_prompt: Optional[str] = None, extraction_model: Optional[str] = None, is_document_compressed: Optional[bool] = None, document_compression_format: Optional[CompressionFormat] = None, webhook: Optional[str] = None, raise_on_upstream_error: bool = True, template: Optional[str] = None, ephemeral_template: Optional[Dict] = None)
Expand source code
class ExtractionConfig(BaseApiConfig):
    """Configuration of an extraction API call.

    Holds the document (``body``) with its content type, the extraction method
    (saved template, ephemeral template or prompt/model) and the compression
    settings of the document.
    """

    body: Union[str, bytes]
    content_type: str
    url: Optional[str] = None
    charset: Optional[str] = None
    extraction_template: Optional[str] = None  # a saved template name
    extraction_ephemeral_template: Optional[Dict] = None  # ephemerally declared json template
    extraction_prompt: Optional[str] = None
    extraction_model: Optional[str] = None
    is_document_compressed: Optional[bool] = None
    document_compression_format: Optional[CompressionFormat] = None
    webhook: Optional[str] = None
    raise_on_upstream_error: bool = True

    # deprecated options
    template: Optional[str] = None
    ephemeral_template: Optional[Dict] = None

    def __init__(
        self,
        body: Union[str, bytes],
        content_type: str,
        url: Optional[str] = None,
        charset: Optional[str] = None,
        extraction_template: Optional[str] = None,  # a saved template name
        extraction_ephemeral_template: Optional[Dict] = None,  # ephemerally declared json template
        extraction_prompt: Optional[str] = None,
        extraction_model: Optional[str] = None,
        is_document_compressed: Optional[bool] = None,
        document_compression_format: Optional[CompressionFormat] = None,
        webhook: Optional[str] = None,
        raise_on_upstream_error: bool = True,

        # deprecated options
        template: Optional[str] = None,
        ephemeral_template: Optional[Dict] = None
    ):
        """Store the options and normalise the document compression state.

        When ``body`` is bytes or a compression format is declared, the
        compression format of ``body`` is detected. A detected format must
        match the declared one. When nothing is detected but a format is
        declared, ``body`` is compressed with that format.

        :raises ExtractionConfigError: when the detected and declared
            compression formats differ, or when zstd compression is requested
            but ``zstandard`` is not installed.
        """
        if template:
            # stacklevel=2 attributes the warning to the caller's line.
            warnings.warn(
                "Deprecation warning: 'template' is deprecated. Use 'extraction_template' instead.",
                stacklevel=2
            )
            extraction_template = template

        if ephemeral_template:
            warnings.warn(
                "Deprecation warning: 'ephemeral_template' is deprecated. Use 'extraction_ephemeral_template' instead.",
                stacklevel=2
            )
            extraction_ephemeral_template = ephemeral_template

        self.key = None
        self.body = body
        self.content_type = content_type
        self.url = url
        self.charset = charset
        self.extraction_template = extraction_template
        self.extraction_ephemeral_template = extraction_ephemeral_template
        self.extraction_prompt = extraction_prompt
        self.extraction_model = extraction_model
        self.is_document_compressed = is_document_compressed
        self.document_compression_format = CompressionFormat(document_compression_format) if document_compression_format else None
        self.webhook = webhook
        self.raise_on_upstream_error = raise_on_upstream_error

        if isinstance(body, bytes) or document_compression_format:
            compression_format = detect_compression_format(body)

            if compression_format is not None:
                self.is_document_compressed = True

                if self.document_compression_format and compression_format != self.document_compression_format:
                    raise ExtractionConfigError(
                        f'The detected compression format `{compression_format}` does not match declared format `{self.document_compression_format}`. '
                        f'You must pass the compression format or disable compression.'
                    )

                self.document_compression_format = compression_format

            else:
                self.is_document_compressed = False

            if self.is_document_compressed is False:
                # Nothing was detected: compress the body with the declared format, if any.
                declared_format = CompressionFormat(self.document_compression_format) if self.document_compression_format else None

                if isinstance(self.body, str) and declared_format:
                    self.body = self.body.encode('utf-8')

                if declared_format == CompressionFormat.GZIP:
                    import gzip
                    self.body = gzip.compress(self.body)

                elif declared_format == CompressionFormat.ZSTD:
                    try:
                        import zstandard as zstd
                    except ImportError as error:
                        raise ExtractionConfigError(
                            'zstandard is not installed. You must run pip install zstandard'
                            ' to auto compress into zstd or use compression formats.'
                        ) from error
                    self.body = zstd.compress(self.body)

                elif declared_format == CompressionFormat.DEFLATE:
                    import zlib
                    compressor = zlib.compressobj(wbits=-zlib.MAX_WBITS)  # raw deflate compression
                    self.body = compressor.compress(self.body) + compressor.flush()

    def to_api_params(self, key: str) -> Dict:
        """Build the query parameters of the API call.

        :param key: API key used when the config does not carry its own.
        :raises ExtractionConfigError: when both a saved and an ephemeral
            template are set.
        """
        params = {
            'key': self.key or key,
            'content_type': self.content_type
        }

        if self.url:
            params['url'] = self.url

        if self.charset:
            params['charset'] = self.charset

        if self.extraction_template and self.extraction_ephemeral_template:
            raise ExtractionConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

        if self.extraction_template:
            params['extraction_template'] = self.extraction_template

        if self.extraction_ephemeral_template:
            # Serialise into a local: overwriting the attribute would make a
            # second call double-encode the template and leak the JSON string
            # through to_dict().
            serialized_template = json.dumps(self.extraction_ephemeral_template)
            params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(serialized_template.encode('utf-8')).decode('utf-8')

        if self.extraction_prompt:
            params['extraction_prompt'] = quote_plus(self.extraction_prompt)

        if self.extraction_model:
            params['extraction_model'] = self.extraction_model

        if self.webhook:
            params['webhook_name'] = self.webhook

        return params

    def to_dict(self) -> Dict:
        """
        Export the ExtractionConfig instance to a plain dictionary.

        Side effect: when the document is flagged as compressed, ``self.body``
        is decompressed and decoded to ``str`` in place and
        ``is_document_compressed`` is reset to ``False``.
        """

        if self.is_document_compressed is True:
            compression_format = CompressionFormat(self.document_compression_format) if self.document_compression_format else None

            if compression_format == CompressionFormat.GZIP:
                import gzip
                self.body = gzip.decompress(self.body)

            elif compression_format == CompressionFormat.ZSTD:
                import zstandard as zstd
                self.body = zstd.decompress(self.body)

            elif compression_format == CompressionFormat.DEFLATE:
                import zlib
                decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
                self.body = decompressor.decompress(self.body) + decompressor.flush()

            if isinstance(self.body, bytes):
                self.body = self.body.decode('utf-8')
                self.is_document_compressed = False

        return {
            'body': self.body,
            'content_type': self.content_type,
            'url': self.url,
            'charset': self.charset,
            'extraction_template': self.extraction_template,
            'extraction_ephemeral_template': self.extraction_ephemeral_template,
            'extraction_prompt': self.extraction_prompt,
            'extraction_model': self.extraction_model,
            'is_document_compressed': self.is_document_compressed,
            'document_compression_format': CompressionFormat(self.document_compression_format).value if self.document_compression_format else None,
            'webhook': self.webhook,
            'raise_on_upstream_error': self.raise_on_upstream_error,
        }

    @staticmethod
    def from_dict(extraction_config_dict: Dict) -> 'ExtractionConfig':
        """Create an ExtractionConfig instance from a dictionary."""
        body = extraction_config_dict.get('body', None)
        content_type = extraction_config_dict.get('content_type', None)
        url = extraction_config_dict.get('url', None)
        charset = extraction_config_dict.get('charset', None)
        extraction_template = extraction_config_dict.get('extraction_template', None)
        extraction_ephemeral_template = extraction_config_dict.get('extraction_ephemeral_template', None)
        extraction_prompt = extraction_config_dict.get('extraction_prompt', None)
        extraction_model = extraction_config_dict.get('extraction_model', None)
        is_document_compressed = extraction_config_dict.get('is_document_compressed', None)

        document_compression_format = extraction_config_dict.get('document_compression_format', None)
        document_compression_format = CompressionFormat(document_compression_format) if document_compression_format else None

        webhook = extraction_config_dict.get('webhook', None)
        raise_on_upstream_error = extraction_config_dict.get('raise_on_upstream_error', True)

        return ExtractionConfig(
            body=body,
            content_type=content_type,
            url=url,
            charset=charset,
            extraction_template=extraction_template,
            extraction_ephemeral_template=extraction_ephemeral_template,
            extraction_prompt=extraction_prompt,
            extraction_model=extraction_model,
            is_document_compressed=is_document_compressed,
            document_compression_format=document_compression_format,
            webhook=webhook,
            raise_on_upstream_error=raise_on_upstream_error
        )

Ancestors

Class variables

var body : Union[str, bytes]
var charset : Optional[str]
var content_type : str
var document_compression_format : Optional[CompressionFormat]
var ephemeral_template : Optional[Dict]
var extraction_ephemeral_template : Optional[Dict]
var extraction_model : Optional[str]
var extraction_prompt : Optional[str]
var extraction_template : Optional[str]
var is_document_compressed : Optional[bool]
var raise_on_upstream_error : bool
var template : Optional[str]
var url : Optional[str]
var webhook : Optional[str]

Static methods

def from_dict(extraction_config_dict: Dict) ‑> ExtractionConfig

Create an ExtractionConfig instance from a dictionary.

Methods

def to_api_params(self, key: str) ‑> Dict
def to_dict(self) ‑> Dict

Export the ExtractionConfig instance to a plain dictionary.

class HttpError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class HttpError(ScrapflyError):
    """Error tied to an HTTP exchange; keeps the request and the optional response."""

    def __init__(self, request: Request, response: Optional[Response] = None, **kwargs):
        """Store the HTTP objects and forward the remaining keyword arguments to the parent."""
        self.request = request
        self.response = response
        super().__init__(**kwargs)

    def __str__(self) -> str:
        """Describe the error from the most specific source available.

        Order: the scraped website's status for upstream errors, then the API
        response's error message, then the raw HTTP status line of
        ``response``.
        """
        if isinstance(self, UpstreamHttpError):
            return f"Target website responded with {self.api_response.scrape_result['status_code']} - {self.api_response.scrape_result['reason']}"

        if self.api_response is not None:
            return self.api_response.error_message

        if self.response is None:
            # ``response`` is optional: without it there is no status line to
            # report, so defer to the parent's string form instead of failing
            # with AttributeError on None.
            return super().__str__()

        text = f"{self.response.status_code} - {self.response.reason}"

        if isinstance(self, (ApiHttpClientError, ApiHttpServerError)):
            text += " - " + self.message

        return text

Ancestors

Subclasses

  • ApiHttpClientError
  • scrapfly.errors.ExtractionAPIError
  • scrapfly.errors.QuotaLimitReached
  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.ScreenshotAPIError
  • scrapfly.errors.TooManyConcurrentRequest
  • scrapfly.errors.UpstreamHttpError
class ResponseBodyHandler (use_brotli: bool = False, signing_secrets: Optional[Tuple[str]] = None)
Expand source code
class ResponseBodyHandler:
    """Decodes response bodies and advertises the supported encodings.

    On construction it computes the ``content_encoding`` value from the
    available compression modules and picks msgpack or JSON as the exchange
    format. ``read`` decompresses, optionally verifies a signature and parses
    a payload; calling the instance parses an already-decompressed payload.
    """

    # NOTE: this list is shared by every instance and extended in place by __init__.
    SUPPORTED_COMPRESSION = ['gzip', 'deflate']
    SUPPORTED_CONTENT_TYPES = ['application/msgpack', 'application/json']

    class JSONDateTimeDecoder(JSONDecoder):
        """JSON decoder that runs ``_date_parser`` as object hook."""

        def __init__(self, *args, **kargs):
            JSONDecoder.__init__(self, *args, object_hook=_date_parser, **kargs)

    # Brotli underperforms at the same level as gzip, and higher levels are too
    # CPU-hungry, so the trade-off is not worth it for most usages: opt-in only.
    def __init__(self, use_brotli: bool = False, signing_secrets: Optional[Tuple[str]] = None):
        """Detect optional codecs and prepare the signing secrets.

        :param use_brotli: advertise ``br`` when a brotli module is importable.
        :param signing_secrets: hex-encoded secrets accepted by ``verify``.
        """
        if use_brotli is True and 'br' not in self.SUPPORTED_COMPRESSION:
            try:
                try:
                    import brotlicffi as brotli  # noqa: F401
                except ImportError:
                    import brotli  # noqa: F401
            except ImportError:
                pass
            else:
                self.SUPPORTED_COMPRESSION.insert(0, 'br')

        # The list is class-level: without this guard every new instance would
        # append another 'zstd' entry.
        if 'zstd' not in self.SUPPORTED_COMPRESSION:
            try:
                import zstd  # noqa: F401
            except ImportError:
                pass
            else:
                self.SUPPORTED_COMPRESSION.append('zstd')

        self.content_encoding: str = ', '.join(self.SUPPORTED_COMPRESSION)
        # Holds the secrets decoded from hex (bytes), or None when none were given.
        self._signing_secret: Optional[Tuple[bytes, ...]] = None

        if signing_secrets:
            # A set drops duplicated secrets.
            self._signing_secret = tuple({binascii.unhexlify(signing_secret) for signing_secret in signing_secrets})

        try:  # automatically use msgpack if available https://msgpack.org/
            import msgpack
            self.accept = 'application/msgpack;charset=utf-8'
            self.content_type = 'application/msgpack;charset=utf-8'
            self.content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)
        except ImportError:
            self.accept = 'application/json;charset=utf-8'
            self.content_type = 'application/json;charset=utf-8'
            self.content_loader = partial(loads, cls=self.JSONDateTimeDecoder)

    def support(self, headers: Dict) -> bool:
        """Tell whether the ``content-type`` header names a supported content type."""
        if 'content-type' not in headers:
            return False

        return any(headers['content-type'].find(content_type) != -1 for content_type in self.SUPPORTED_CONTENT_TYPES)

    def verify(self, message: bytes, signature: str) -> bool:
        """Check ``signature`` against the upper-case hex HMAC-SHA256 of ``message``.

        Returns ``True`` when any configured secret matches, ``False``
        otherwise, including when no secret is configured or ``signature`` is
        not a string.
        """
        if not self._signing_secret or not isinstance(signature, str):
            return False

        received = signature.encode('utf-8')

        for signing_secret in self._signing_secret:
            expected = hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper()

            # Constant-time comparison avoids leaking the digest through timing.
            if hmac.compare_digest(expected.encode('utf-8'), received):
                return True

        return False

    def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
        """Decompress, verify and parse a payload.

        :param content: raw body.
        :param content_encoding: ``gzip``/``gz``, ``deflate``, ``brotli``/``br``
            or ``zstd``; any other value leaves the body untouched.
        :param content_type: JSON and msgpack are parsed; other types are
            returned as the decompressed body.
        :param signature: checked against the decompressed body when secrets
            are configured and a signature is given.
        :raises WebhookSignatureMissMatch: when the signature check fails.
        """
        if content_encoding in ('gzip', 'gz'):
            import gzip
            content = gzip.decompress(content)
        elif content_encoding == 'deflate':
            import zlib
            content = zlib.decompress(content)
        elif content_encoding in ('brotli', 'br'):
            # Same module preference as __init__.
            try:
                import brotlicffi as brotli
            except ImportError:
                import brotli
            content = brotli.decompress(content)
        elif content_encoding == 'zstd':
            import zstd
            content = zstd.decompress(content)

        if self._signing_secret is not None and signature is not None:
            if not self.verify(content, signature):
                raise WebhookSignatureMissMatch()

        if content_type.startswith('application/json'):
            content = loads(content, cls=self.JSONDateTimeDecoder)
        elif content_type.startswith('application/msgpack'):
            import msgpack
            content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)

        return content

    def __call__(self, content: bytes, content_type: str) -> Union[str, Dict]:
        """Parse ``content`` according to ``content_type`` (JSON or msgpack).

        :raises Exception: when the content type is not supported.
        :raises EncoderError: when parsing fails; it carries the payload as
            text, or base64-encoded when it is not valid UTF-8.
        """
        content_loader = None

        if content_type.find('application/json') != -1:
            content_loader = partial(loads, cls=self.JSONDateTimeDecoder)
        elif content_type.find('application/msgpack') != -1:
            import msgpack
            content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)

        if content_loader is None:
            raise Exception('Unsupported content type')

        try:
            return content_loader(content)
        except Exception as e:
            try:
                raise EncoderError(content=content.decode('utf-8')) from e
            except UnicodeError:
                raise EncoderError(content=base64.b64encode(content).decode('utf-8')) from e

Class variables

var JSONDateTimeDecoder

Simple JSON <https://json.org> decoder

Performs the following translations in decoding by default:

+---------------+-------------------+
| JSON          | Python            |
+===============+===================+
| object        | dict              |
+---------------+-------------------+
| array         | list              |
+---------------+-------------------+
| string        | str               |
+---------------+-------------------+
| number (int)  | int               |
+---------------+-------------------+
| number (real) | float             |
+---------------+-------------------+
| true          | True              |
+---------------+-------------------+
| false         | False             |
+---------------+-------------------+
| null          | None              |
+---------------+-------------------+

It also understands NaN, Infinity, and -Infinity as their corresponding float values, which is outside the JSON spec.

var SUPPORTED_COMPRESSION
var SUPPORTED_CONTENT_TYPES

Methods

def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) ‑> Dict
def support(self, headers: Dict) ‑> bool
def verify(self, message: bytes, signature: str) ‑> bool
class ScrapeApiResponse (request: requests.models.Request, response: requests.models.Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None, large_object_handler: Optional[Callable] = None)
Expand source code
class ScrapeApiResponse(ApiResponse):
    """
    Response of a scrape API call.

    Holds the HTTP request/response pair (handled by ApiResponse), the
    ScrapeConfig that produced the call and the decoded API payload, which is
    exposed as a FrozenDict through ``self.result``.
    """
    scrape_config:ScrapeConfig
    large_object_handler:Callable

    def __init__(self, request: Request, response: Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None, large_object_handler:Optional[Callable]=None):
        """
        :param request: HTTP request sent to the API (forwarded to ApiResponse)
        :param response: HTTP response received from the API (forwarded to ApiResponse)
        :param scrape_config: configuration used for this scrape
        :param api_result: decoded API payload; it is rebuilt from the HTTP response when the scrape method is HEAD
        :param large_object_handler: optional callable used by handle_api_result to resolve 'clob'/'blob' content
        :raises HttpError: when api_result is a plain string instead of a decoded payload
        """
        super().__init__(request, response)
        self.scrape_config = scrape_config
        self.large_object_handler = large_object_handler

        if self.scrape_config.method == 'HEAD':
            # For HEAD scrapes the result is synthesised from the HTTP response itself
            api_result = {
                'result': {
                    'request_headers': {},
                    'status': 'DONE',
                    # 2xx range check (the previous `200 >= status` was only true for exactly 200)
                    'success': 200 <= self.response.status_code < 300,
                    'response_headers': self.response.headers,
                    'status_code': self.response.status_code,
                    'reason': self.response.reason,
                    'format': 'text',
                    'content': ''
                },
                'context': {},
                'config': self.scrape_config.__dict__
            }

            if 'X-Scrapfly-Reject-Code' in self.response.headers:
                api_result['result']['error'] = {
                    'code': self.response.headers['X-Scrapfly-Reject-Code'],
                    'http_code': int(self.response.headers['X-Scrapfly-Reject-Http-Code']),
                    'message': self.response.headers['X-Scrapfly-Reject-Description'],
                    'error_id': self.response.headers['X-Scrapfly-Reject-ID'],
                    'retryable': self.response.headers['X-Scrapfly-Reject-Retryable'] == 'yes',
                    'doc_url': '',
                    'links': {}
                }

                if 'X-Scrapfly-Reject-Doc' in self.response.headers:
                    api_result['result']['error']['doc_url'] = self.response.headers['X-Scrapfly-Reject-Doc']
                    api_result['result']['error']['links']['Related Docs'] = self.response.headers['X-Scrapfly-Reject-Doc']

        # A raw string here means the payload was not decoded into a dict: report it as a retryable 502
        if isinstance(api_result, str):
            raise HttpError(
                request=request,
                response=response,
                message='Bad gateway',
                code=502,
                http_status_code=502,
                is_retryable=True
            )

        self.result = self.handle_api_result(api_result=api_result)

    @property
    def scrape_result(self) -> Optional[Dict]:
        """
            The 'result' section of the API payload, or None when the payload has none
        """
        return self.result.get('result', None)

    @property
    def config(self) -> Optional[Dict]:
        """
            The 'config' section of the API payload, or None when there is no scrape result
        """
        if self.scrape_result is None:
            return None

        return self.result['config']

    @property
    def context(self) -> Optional[Dict]:
        """
            The 'context' section of the API payload, or None when there is no scrape result
        """
        if self.scrape_result is None:
            return None

        return self.result['context']

    @property
    def content(self) -> str:
        """
            The scraped content, or an empty string when there is no scrape result
        """
        if self.scrape_result is None:
            return ''

        return self.scrape_result['content']

    @property
    def success(self) -> bool:
        """
            Success means Scrapfly api reply correctly to the call, but the scrape can be unsuccessful if the upstream reply with error status code
        """
        # Any 2xx status counts (the previous `200 >= status` was only true for exactly 200)
        return 200 <= self.response.status_code <= 299

    @property
    def scrape_success(self) -> bool:
        """
            The 'success' flag of the scrape result; False when the scrape result is missing or empty
        """
        scrape_result = self.scrape_result

        if not scrape_result:
            return False

        return self.scrape_result['success']

    @property
    def error(self) -> Optional[Dict]:
        """
            The 'error' section of the scrape result when the scrape failed, otherwise None
        """
        if self.scrape_result is None:
            return None

        if self.scrape_success is False:
            return self.scrape_result['error']

        return None

    @property
    def upstream_status_code(self) -> Optional[int]:
        """
            The 'status_code' of the scrape result, or None when it is not available
        """
        if self.scrape_result is None:
            return None

        if 'status_code' in self.scrape_result:
            return self.scrape_result['status_code']

        return None

    @cached_property
    def soup(self) -> 'BeautifulSoup':
        """
            Content parsed with BeautifulSoup (lxml parser), computed once per response

            :raises ContentError: when the result format is not 'text'
            :raises ImportError: when bs4 is not installed
        """
        if self.scrape_result['format'] != 'text':
            raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")

        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(self.content, "lxml")
            return soup
        except ImportError:
            logger.error('You must install scrapfly[parser] to enable this feature')
            # Re-raise like `selector` does; swallowing the error would cache None as the soup
            raise

    @cached_property
    def selector(self) -> 'Selector':
        """
            Content wrapped in a parsel Selector, computed once per response

            :raises ContentError: when the result format is not 'text'
            :raises ImportError: when parsel is not installed
        """
        if self.scrape_result['format'] != 'text':
            raise ContentError("Unable to cast into selector, the format of data is binary - must be text content")

        try:
            from parsel import Selector
            return Selector(text=self.content)
        except ImportError as e:
            logger.error('You must install parsel or scrapy package to enable this feature')
            raise e

    def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
        """
            Normalise the decoded API payload and freeze it

            Error payloads are frozen untouched. Otherwise a list-typed config headers value is replaced
            by an empty dict, request/response headers become case-insensitive and, when a
            large_object_handler is set and the content is not empty, 'clob'/'blob' content is resolved
            through the handler and 'binary' content is base64-decoded into a BytesIO.
        """
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)

        try:
            if isinstance(api_result['config']['headers'], list):
                api_result['config']['headers'] = {}
        except TypeError:
            logger.info(api_result)
            raise

        with suppress(KeyError):
            api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
            api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])

        if self.large_object_handler is not None and api_result['result']['content']:
            content_format = api_result['result']['format']

            if content_format in ['clob', 'blob']:
                # For large objects the content field is passed to the handler as callback_url
                api_result['result']['content'], api_result['result']['format'] = self.large_object_handler(callback_url=api_result['result']['content'], format=content_format)
            elif content_format == 'binary':
                base64_payload = api_result['result']['content']

                if isinstance(base64_payload, bytes):
                    base64_payload = base64_payload.decode('utf-8')

                api_result['result']['content'] = BytesIO(b64decode(base64_payload))

        return FrozenDict(api_result)

    def _is_api_error(self, api_result: Dict) -> bool:
        """
            Tell whether the payload is an API error rather than a scrape result
        """
        if self.scrape_config.method == 'HEAD':
            # NOTE(review): __init__ looks at 'X-Scrapfly-Reject-*' headers while this checks
            # 'X-Reject-Reason' - confirm which header the API really sends
            if 'X-Reject-Reason' in self.response.headers:
                return True
            return False

        if api_result is None:
            return True

        return 'error_id' in api_result

    def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
        """
            Rebuild a requests Response that mirrors the upstream (scraped) response

            :param _class: must be requests' Response, nothing else is supported
            :return: the rebuilt response, or None when there is no result or the API call status is not 200
            :raises RuntimeError: when _class is not requests' Response
        """
        if _class != Response:
            raise RuntimeError('only Response from requests package is supported at the moment')

        if self.result is None:
            return None

        if self.response.status_code != 200:
            return None

        response = Response()
        response.status_code = self.scrape_result['status_code']
        response.reason = self.scrape_result['reason']

        if self.scrape_result['content']:
            if isinstance(self.scrape_result['content'], BytesIO):
                response._content = self.scrape_result['content'].getvalue()
            elif isinstance(self.scrape_result['content'], bytes):
                response._content = self.scrape_result['content']
            elif isinstance(self.scrape_result['content'], str):
                response._content = self.scrape_result['content'].encode('utf-8')
        else:
            response._content = None

        response.headers.update(self.scrape_result['response_headers'])
        response.url = self.scrape_result['url']

        response.request = Request(
            method=self.config['method'],
            url=self.config['url'],
            headers=self.scrape_result['request_headers'],
            data=self.config['body'] if self.config['body'] else None
        )

        if 'set-cookie' in response.headers:
            raw_cookies = response.headers['set-cookie']

            # A single header given as a plain string must not be iterated character by character
            if isinstance(raw_cookies, str):
                raw_cookies = [raw_cookies]

            for raw_cookie in raw_cookies:
                for name, cookie in SimpleCookie(raw_cookie).items():
                    expires = cookie.get('expires')

                    if expires == '':
                        expires = None

                    if expires:
                        # From here expires is either a timestamp or None, never a string
                        try:
                            expires = parse(expires).timestamp()
                        except ValueError:
                            expires = None

                    response.cookies.set_cookie(Cookie(
                        version=cookie.get('version') if cookie.get('version') else None,
                        name=name,
                        value=cookie.value,
                        path=cookie.get('path', ''),
                        expires=expires,
                        comment=cookie.get('comment'),
                        domain=cookie.get('domain', ''),
                        secure=cookie.get('secure'),
                        port=None,
                        port_specified=False,
                        domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
                        domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
                        path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
                        discard=False,
                        comment_url=None,
                        rest={
                            'httponly': cookie.get('httponly'),
                            'samesite': cookie.get('samesite'),
                            'max-age': cookie.get('max-age')
                        }
                    ))

        return response

    def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None):
        """
            Write the scraped content (or the given content) to a file

            :param path: optional directory prefix, joined to the file name with '/'
            :param name: optional file name; its extension, when present, is used as the file extension
            :param file: optional already opened file object; when given, no path is computed and it is closed after writing
            :param content: optional content written instead of the scrape result content
        """
        file_content = content or self.scrape_result['content']
        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                file_extension = name_parts[-1]

        if not file:
            if file_extension is None:
                # No extension in the name: derive one from the upstream content-type
                try:
                    mime_type = self.scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                name = self.config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            file_path = path + '/' + name if path is not None else name

            # The path is only an extension (empty name): build the name from the whole url instead
            if file_path == file_extension:
                url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')

                if url[-1] == '-':
                    url = url[:-1]

                url += file_extension

                file_path = url

            file = open(file_path, 'wb')

        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)
        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError):
        """
            Raise the error matching this response, if any

            Runs the ApiResponse checks first, then, for a scrape with status 'DONE' that did not
            succeed, raises the error built by ErrorFactory. An UpstreamHttpError is only raised
            when raise_on_upstream_error is True.
        """
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
        if self.result['result']['status'] == 'DONE' and self.scrape_success is False:
            error = ErrorFactory.create(api_response=self)
            if error:
                if isinstance(error, UpstreamHttpError):
                    if raise_on_upstream_error is True:
                        raise error
                else:
                    raise error

Ancestors

Class variables

var large_object_handler : Callable
var scrape_config : ScrapeConfig

Instance variables

prop config : Optional[Dict]
Expand source code
@property
def config(self) -> Optional[Dict]:
    """Return the 'config' section of the API result, or None when there is no scrape result."""
    has_scrape_result = self.scrape_result is not None
    return self.result['config'] if has_scrape_result else None
prop content : str
Expand source code
@property
def content(self) -> str:
    """Return the scraped content, or an empty string when there is no scrape result."""
    scrape_result = self.scrape_result
    return '' if scrape_result is None else scrape_result['content']
prop context : Optional[Dict]
Expand source code
@property
def context(self) -> Optional[Dict]:
    """Return the 'context' section of the API result, or None when there is no scrape result."""
    has_scrape_result = self.scrape_result is not None
    return self.result['context'] if has_scrape_result else None
prop error : Optional[Dict]
Expand source code
@property
def error(self) -> Optional[Dict]:
    """Return the 'error' section of the scrape result when the scrape failed, otherwise None."""
    scrape_result = self.scrape_result

    # scrape_success is only consulted once a scrape result is known to exist
    if scrape_result is not None and self.scrape_success is False:
        return scrape_result['error']

    return None
prop scrape_result : Optional[Dict]
Expand source code
@property
def scrape_result(self) -> Optional[Dict]:
    """Return the 'result' section of the API payload, or None when the payload has none."""
    api_payload = self.result
    return api_payload.get('result', None)
prop scrape_success : bool
Expand source code
@property
def scrape_success(self) -> bool:
    """Return the 'success' flag of the scrape result; False when the result is missing or empty."""
    outcome = self.scrape_result
    return self.scrape_result['success'] if outcome else False
var selector
Expand source code
def __get__(self, instance, owner=None):
    """Return the cached value for *instance*, computing and storing it on first access.

    Accessed on the class (instance is None) the descriptor itself is returned.
    Raises TypeError when the descriptor has no attribute name, when the
    instance has no __dict__, or when that __dict__ rejects item assignment.
    """
    if instance is None:
        return self

    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")

    try:
        store = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        raise TypeError(
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        ) from None

    cached = store.get(self.attrname, _NOT_FOUND)
    if cached is not _NOT_FOUND:
        return cached

    # First access: run the wrapped function and remember its result
    computed = self.func(instance)
    try:
        store[self.attrname] = computed
    except TypeError:
        raise TypeError(
            f"The '__dict__' attribute on {type(instance).__name__!r} instance "
            f"does not support item assignment for caching {self.attrname!r} property."
        ) from None

    return computed
var soup
Expand source code
def __get__(self, instance, owner=None):
    """Return the cached value for *instance*, computing and storing it on first access.

    Accessed on the class (instance is None) the descriptor itself is returned.
    Raises TypeError when the descriptor has no attribute name, when the
    instance has no __dict__, or when that __dict__ rejects item assignment.
    """
    if instance is None:
        return self

    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")

    try:
        store = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        raise TypeError(
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        ) from None

    cached = store.get(self.attrname, _NOT_FOUND)
    if cached is not _NOT_FOUND:
        return cached

    # First access: run the wrapped function and remember its result
    computed = self.func(instance)
    try:
        store[self.attrname] = computed
    except TypeError:
        raise TypeError(
            f"The '__dict__' attribute on {type(instance).__name__!r} instance "
            f"does not support item assignment for caching {self.attrname!r} property."
        ) from None

    return computed
prop success : bool

Success means the Scrapfly API replied correctly to the call; the scrape itself can still be unsuccessful if the upstream replied with an error status code.

Expand source code
@property
def success(self) -> bool:
    """
        Success means Scrapfly api reply correctly to the call, but the scrape can be unsuccessful if the upstream reply with error status code
    """
    # Any 2xx status counts; the previous `200 >= status` bound was only true for exactly 200
    return 200 <= self.response.status_code <= 299
prop upstream_status_code : Optional[int]
Expand source code
@property
def upstream_status_code(self) -> Optional[int]:
    """Return the 'status_code' of the scrape result, or None when it is not available."""
    scrape_result = self.scrape_result

    if scrape_result is None or 'status_code' not in scrape_result:
        return None

    return scrape_result['status_code']

Methods

def handle_api_result(self, api_result: Dict) ‑> Optional[FrozenDict]
def raise_for_result(self, raise_on_upstream_error=True, error_class=scrapfly.errors.ApiHttpClientError)
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Union[TextIO, _io.BytesIO, ForwardRef(None)] = None, content: Union[str, bytes, ForwardRef(None)] = None)
def upstream_result_into_response(self) ‑> Optional[requests.models.Response]

Inherited members

class ScrapeConfig (url: str, retry: bool = True, method: str = 'GET', country: Optional[str] = None, render_js: bool = False, cache: bool = False, cache_clear: bool = False, ssl: bool = False, dns: bool = False, asp: bool = False, debug: bool = False, raise_on_upstream_error: bool = True, cache_ttl: Optional[int] = None, proxy_pool: Optional[str] = None, session: Optional[str] = None, tags: Union[List[str], Set[str], ForwardRef(None)] = None, format: Optional[Format] = None, format_options: Optional[List[FormatOption]] = None, extraction_template: Optional[str] = None, extraction_ephemeral_template: Optional[Dict] = None, extraction_prompt: Optional[str] = None, extraction_model: Optional[str] = None, correlation_id: Optional[str] = None, cookies: Optional[requests.structures.CaseInsensitiveDict] = None, body: Optional[str] = None, data: Optional[Dict] = None, headers: Union[requests.structures.CaseInsensitiveDict, Dict[str, str], ForwardRef(None)] = None, js: str = None, rendering_wait: int = None, rendering_stage: Literal['complete', 'domcontentloaded'] = 'complete', wait_for_selector: Optional[str] = None, screenshots: Optional[Dict] = None, screenshot_flags: Optional[List[ScreenshotFlag]] = None, session_sticky_proxy: Optional[bool] = None, webhook: Optional[str] = None, timeout: Optional[int] = None, js_scenario: Optional[List] = None, extract: Optional[Dict] = None, os: Optional[str] = None, lang: Optional[List[str]] = None, auto_scroll: Optional[bool] = None, cost_budget: Optional[int] = None)
Expand source code
class ScrapeConfig(BaseApiConfig):
    """
    Configuration of a single scrape API call.

    The class-level declarations below mirror the __init__ parameters; every
    instance value is assigned in __init__.
    """

    PUBLIC_DATACENTER_POOL = 'public_datacenter_pool'
    PUBLIC_RESIDENTIAL_POOL = 'public_residential_pool'

    url: str
    retry: bool = True
    method: str = 'GET'
    country: Optional[str] = None
    render_js: bool = False
    cache: bool = False
    cache_clear:bool = False
    ssl:bool = False
    dns:bool = False
    asp:bool = False
    debug: bool = False
    raise_on_upstream_error:bool = True
    cache_ttl:Optional[int] = None
    proxy_pool:Optional[str] = None
    session: Optional[str] = None
    tags: Optional[List[str]] = None
    # No trailing comma here: `= None,` would make the default the tuple (None,)
    format: Optional[Format] = None  # raw(unchanged)
    format_options: Optional[List[FormatOption]] = None
    extraction_template: Optional[str] = None  # a saved template name
    extraction_ephemeral_template: Optional[Dict] = None  # ephemeraly declared json template
    extraction_prompt: Optional[str] = None
    extraction_model: Optional[str] = None
    correlation_id: Optional[str] = None
    cookies: Optional[CaseInsensitiveDict] = None
    body: Optional[str] = None
    data: Optional[Dict] = None
    headers: Optional[CaseInsensitiveDict] = None
    js: Optional[str] = None
    rendering_wait: Optional[int] = None
    rendering_stage: Literal["complete", "domcontentloaded"] = "complete"
    wait_for_selector: Optional[str] = None
    # Same default as the __init__ parameter (None)
    session_sticky_proxy:Optional[bool] = None
    screenshots:Optional[Dict]=None
    screenshot_flags: Optional[List[ScreenshotFlag]] = None
    webhook:Optional[str]=None
    timeout:Optional[int]=None # in milliseconds
    js_scenario: Optional[List] = None
    extract: Optional[Dict] = None
    lang:Optional[List[str]] = None
    os:Optional[str] = None
    auto_scroll:Optional[bool] = None
    cost_budget:Optional[int] = None

    def __init__(
        self,
        url: str,
        retry: bool = True,
        method: str = 'GET',
        country: Optional[str] = None,
        render_js: bool = False,
        cache: bool = False,
        cache_clear:bool = False,
        ssl:bool = False,
        dns:bool = False,
        asp:bool = False,
        debug: bool = False,
        raise_on_upstream_error:bool = True,
        cache_ttl:Optional[int] = None,
        proxy_pool:Optional[str] = None,
        session: Optional[str] = None,
        tags: Optional[Union[List[str], Set[str]]] = None,
        format: Optional[Format] = None, # raw(unchanged)
        format_options: Optional[List[FormatOption]] = None, # raw(unchanged)
        extraction_template: Optional[str] = None,  # a saved template name
        extraction_ephemeral_template: Optional[Dict] = None,  # ephemeraly declared json template
        extraction_prompt: Optional[str] = None,
        extraction_model: Optional[str] = None,        
        correlation_id: Optional[str] = None,
        cookies: Optional[CaseInsensitiveDict] = None,
        body: Optional[str] = None,
        data: Optional[Dict] = None,
        headers: Optional[Union[CaseInsensitiveDict, Dict[str, str]]] = None,
        js: str = None,
        rendering_wait: int = None,
        rendering_stage: Literal["complete", "domcontentloaded"] = "complete",
        wait_for_selector: Optional[str] = None,
        screenshots:Optional[Dict]=None,
        screenshot_flags: Optional[List[ScreenshotFlag]] = None,
        session_sticky_proxy:Optional[bool] = None,
        webhook:Optional[str] = None,
        timeout:Optional[int] = None, # in milliseconds
        js_scenario:Optional[List] = None,
        extract:Optional[Dict] = None,
        os:Optional[str] = None,
        lang:Optional[List[str]] = None,
        auto_scroll:Optional[bool] = None,
        cost_budget:Optional[int] = None
    ):
        """
        Store the scrape options and prepare the cookie header and the request body.

        Every parameter is stored on the instance under the same name. In addition:
        tags are stored as a set, cookies are appended to the 'cookie' header, and for
        POST/PUT/PATCH the body is encoded from data according to the 'content-type'
        header (form-urlencoded when no content type is given).

        :raises ScrapeConfigError: when both body and data are given, or when data is given
            with a content type other than JSON or form-urlencoded
        """
        assert(type(url) is str)

        if isinstance(tags, list):
            tags = set(tags)

        cookies = cookies or {}
        headers = headers or {}

        self.cookies = CaseInsensitiveDict(cookies)
        self.headers = CaseInsensitiveDict(headers)
        self.url = url
        self.retry = retry
        self.method = method
        self.country = country
        self.session_sticky_proxy = session_sticky_proxy
        self.render_js = render_js
        self.cache = cache
        self.cache_clear = cache_clear
        self.asp = asp
        self.webhook = webhook
        self.session = session
        self.debug = debug
        self.cache_ttl = cache_ttl
        self.proxy_pool = proxy_pool
        self.tags = tags or set()
        self.format = format
        self.format_options = format_options
        self.extraction_template = extraction_template
        self.extraction_ephemeral_template = extraction_ephemeral_template
        self.extraction_prompt = extraction_prompt
        self.extraction_model = extraction_model
        self.correlation_id = correlation_id
        self.wait_for_selector = wait_for_selector
        self.body = body
        self.data = data
        self.js = js
        self.rendering_wait = rendering_wait
        self.rendering_stage = rendering_stage
        self.raise_on_upstream_error = raise_on_upstream_error
        self.screenshots = screenshots
        self.screenshot_flags = screenshot_flags
        self.key = None
        self.dns = dns
        self.ssl = ssl
        self.js_scenario = js_scenario
        self.timeout = timeout
        self.extract = extract
        self.lang = lang
        self.os = os
        self.auto_scroll = auto_scroll
        self.cost_budget = cost_budget

        if cookies:
            cookie_pairs = [name + '=' + value for name, value in cookies.items()]

            # An empty or missing 'cookie' header is treated the same way; indexing [-1]
            # on an empty header value used to raise IndexError
            existing_cookie = self.headers.get('cookie') or ''

            if existing_cookie and not existing_cookie.endswith(';'):
                existing_cookie += ';'

            self.headers['cookie'] = existing_cookie + '; '.join(cookie_pairs)

        if self.body and self.data:
            raise ScrapeConfigError('You cannot pass both parameters body and data. You must choose')

        if method in ['POST', 'PUT', 'PATCH']:
            if self.body is None and self.data is not None:
                if 'content-type' not in self.headers:
                    # No content type given: data is sent form-urlencoded
                    self.headers['content-type'] = 'application/x-www-form-urlencoded'
                    self.body = urlencode(data)
                else:
                    if self.headers['content-type'].find('application/json') != -1:
                        self.body = json.dumps(data)
                    elif self.headers['content-type'].find('application/x-www-form-urlencoded') != -1:
                        self.body = urlencode(data)
                    else:
                        raise ScrapeConfigError('Content-Type "%s" not supported, use body parameter to pass pre encoded body according to your content type' % self.headers['content-type'])
            elif self.body is None and self.data is None:
                self.headers['content-type'] = 'text/plain'

    def to_api_params(self, key:str) -> Dict:
        """
        Build the query parameters of the scrape API call from this configuration.

        Options that depend on another option (render_js, cache, session, screenshots)
        are only sent when that option is enabled; otherwise a warning is logged and
        the option is left out.

        :param key: API key used when the configuration has no key of its own
        :return: dictionary of API parameters
        :raises ScrapeConfigError: when both extraction_template and extraction_ephemeral_template are set
        """
        params = {
            'key': self.key or key,
            'url': self.url
        }

        if self.country is not None:
            params['country'] = self.country

        for name, value in self.headers.items():
            params['headers[%s]' % name] = value

        if self.webhook is not None:
            params['webhook_name'] = self.webhook

        if self.timeout is not None:
            params['timeout'] = self.timeout

        if self.extract is not None:
            params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8')

        if self.cost_budget is not None:
            params['cost_budget'] = self.cost_budget

        if self.render_js is True:
            params['render_js'] = self._bool_to_http(self.render_js)

            if self.wait_for_selector is not None:
                params['wait_for_selector'] = self.wait_for_selector

            if self.js:
                params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

            if self.js_scenario:
                params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8')

            if self.rendering_wait:
                params['rendering_wait'] = self.rendering_wait

            if self.rendering_stage:
                params['rendering_stage'] = self.rendering_stage

            if self.screenshots:
                for name, element in self.screenshots.items():
                    params['screenshots[%s]' % name] = element

                if self.screenshot_flags is not None:
                    # Flags may be given as raw values; they are normalised to ScreenshotFlag members
                    self.screenshot_flags = [ScreenshotFlag(flag) for flag in self.screenshot_flags]
                    params["screenshot_flags"] = ",".join(flag.value for flag in self.screenshot_flags)
            elif self.screenshot_flags:
                # This warning used to sit in an else branch that could never run
                logging.warning('Params "screenshot_flags" is ignored. Works only if screenshots is enabled')

            if self.auto_scroll is True:
                params['auto_scroll'] = self._bool_to_http(self.auto_scroll)
        else:
            if self.wait_for_selector is not None:
                logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled')

            if self.screenshots:
                logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled')

            if self.js_scenario:
                logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled')

            if self.js:
                logging.warning('Params "js" is ignored. Works only if render_js is enabled')

            if self.rendering_wait:
                logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled')

        if self.asp is True:
            params['asp'] = self._bool_to_http(self.asp)

        if self.retry is False:
            params['retry'] = self._bool_to_http(self.retry)

        if self.cache is True:
            params['cache'] = self._bool_to_http(self.cache)

            if self.cache_clear is True:
                params['cache_clear'] = self._bool_to_http(self.cache_clear)

            if self.cache_ttl is not None:
                params['cache_ttl'] = self.cache_ttl
        else:
            if self.cache_clear is True:
                logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

            if self.cache_ttl is not None:
                logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

        if self.dns is True:
            params['dns'] = self._bool_to_http(self.dns)

        if self.ssl is True:
            params['ssl'] = self._bool_to_http(self.ssl)

        if self.tags:
            params['tags'] = ','.join(self.tags)

        if self.format:
            params['format'] = Format(self.format).value
            if self.format_options:
                params['format'] += ':' + ','.join(FormatOption(option).value for option in self.format_options)

        if self.extraction_template and self.extraction_ephemeral_template:
            raise ScrapeConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

        if self.extraction_template:
            params['extraction_template'] = self.extraction_template

        if self.extraction_ephemeral_template:
            # Encode into a local: overwriting the attribute with its JSON string made a
            # second call encode the template twice
            ephemeral_template_json = json.dumps(self.extraction_ephemeral_template)
            params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(ephemeral_template_json.encode('utf-8')).decode('utf-8')

        if self.extraction_prompt:
            # NOTE(review): the prompt is url-quoted here - confirm the HTTP layer does not quote it again
            params['extraction_prompt'] = quote_plus(self.extraction_prompt)

        if self.extraction_model:
            params['extraction_model'] = self.extraction_model

        if self.correlation_id:
            params['correlation_id'] = self.correlation_id

        if self.session:
            params['session'] = self.session

            if self.session_sticky_proxy is True: # false by default
                params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy)
        else:
            if self.session_sticky_proxy:
                logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled')

        if self.debug is True:
            params['debug'] = self._bool_to_http(self.debug)

        if self.proxy_pool is not None:
            params['proxy_pool'] = self.proxy_pool

        if self.lang is not None:
            params['lang'] = ','.join(self.lang)

        if self.os is not None:
            params['os'] = self.os

        return params

    @staticmethod
    def from_exported_config(config:str) -> 'ScrapeConfig':
        """
        Build a ScrapeConfig from an exported configuration.

        :param config: base64 encoded msgpack document describing the configuration
        :return: the rebuilt ScrapeConfig
        :raises ImportError: when the msgpack package is not installed
        """
        try:
            from msgpack import loads as msgpack_loads
        except ImportError:
            print('You must install msgpack package - run: pip install "scrapfly-sdk[speedups]" or pip install msgpack')
            raise

        data = msgpack_loads(base64.b64decode(config))

        headers = {}

        for name, value in data['headers'].items():
            # A plain string is iterable too: joining it would turn 'abc' into 'a; b; c',
            # so only real collections of values are joined
            if isinstance(value, (str, bytes)) or not isinstance(value, Iterable):
                headers[name] = value
            else:
                headers[name] = '; '.join(value)

        return ScrapeConfig(
            url=data['url'],
            retry=data['retry'],
            headers=headers,
            session=data['session'],
            session_sticky_proxy=data['session_sticky_proxy'],
            cache=data['cache'],
            cache_ttl=data['cache_ttl'],
            cache_clear=data['cache_clear'],
            render_js=data['render_js'],
            method=data['method'],
            asp=data['asp'],
            body=data['body'],
            ssl=data['ssl'],
            dns=data['dns'],
            country=data['country'],
            debug=data['debug'],
            correlation_id=data['correlation_id'],
            tags=data['tags'],
            format=data['format'],
            js=data['js'],
            rendering_wait=data['rendering_wait'],
            screenshots=data['screenshots'] or {},
            screenshot_flags=data['screenshot_flags'],
            proxy_pool=data['proxy_pool'],
            auto_scroll=data['auto_scroll'],
            cost_budget=data['cost_budget']
        )

    def to_dict(self) -> Dict:
        """
        Serialize this ScrapeConfig into a plain dictionary.

        Enum-backed settings (format, format_options, screenshot_flags) are exported
        as their raw values, and 'data' is exported as None whenever a body is set.
        The result can be fed back to ScrapeConfig.from_dict.
        """

        def raw_values(enum_type, members):
            # Lists of enum members are exported as raw values; empty or None becomes None
            return [enum_type(member).value for member in members] if members else None

        return dict(
            url=self.url,
            retry=self.retry,
            method=self.method,
            country=self.country,
            render_js=self.render_js,
            cache=self.cache,
            cache_clear=self.cache_clear,
            ssl=self.ssl,
            dns=self.dns,
            asp=self.asp,
            debug=self.debug,
            raise_on_upstream_error=self.raise_on_upstream_error,
            cache_ttl=self.cache_ttl,
            proxy_pool=self.proxy_pool,
            session=self.session,
            tags=list(self.tags),
            format=Format(self.format).value if self.format else None,
            format_options=raw_values(FormatOption, self.format_options),
            extraction_template=self.extraction_template,
            extraction_ephemeral_template=self.extraction_ephemeral_template,
            extraction_prompt=self.extraction_prompt,
            extraction_model=self.extraction_model,
            correlation_id=self.correlation_id,
            cookies=CaseInsensitiveDict(self.cookies),
            body=self.body,
            data=None if self.body else self.data,
            headers=CaseInsensitiveDict(self.headers),
            js=self.js,
            rendering_wait=self.rendering_wait,
            wait_for_selector=self.wait_for_selector,
            session_sticky_proxy=self.session_sticky_proxy,
            screenshots=self.screenshots,
            screenshot_flags=raw_values(ScreenshotFlag, self.screenshot_flags),
            webhook=self.webhook,
            timeout=self.timeout,
            js_scenario=self.js_scenario,
            extract=self.extract,
            lang=self.lang,
            os=self.os,
            auto_scroll=self.auto_scroll,
            cost_budget=self.cost_budget,
        )

    @staticmethod
    def from_dict(scrape_config_dict: Dict) -> 'ScrapeConfig':
        """
        Build a ScrapeConfig from a dictionary such as the one produced by to_dict.

        Missing keys fall back to the defaults listed below; raw 'format',
        'format_options' and 'screenshot_flags' values are converted back to enums.
        """
        # Fallback per supported key. Built on every call so the mutable
        # fallbacks ([] / {}) are never shared between configurations.
        fallbacks = {
            'url': None,
            'retry': False,
            'method': 'GET',
            'country': None,
            'render_js': False,
            'cache': False,
            'cache_clear': False,
            'ssl': False,
            'dns': False,
            'asp': False,
            'debug': False,
            'raise_on_upstream_error': True,
            'cache_ttl': None,
            'proxy_pool': None,
            'session': None,
            'tags': [],
            'format': None,
            'format_options': None,
            'extraction_template': None,
            'extraction_ephemeral_template': None,
            'extraction_prompt': None,
            'extraction_model': None,
            'correlation_id': None,
            'cookies': {},
            'body': None,
            'data': None,
            'headers': {},
            'js': None,
            'rendering_wait': None,
            'wait_for_selector': None,
            'screenshots': [],
            'screenshot_flags': [],
            'session_sticky_proxy': False,
            'webhook': None,
            'timeout': None,
            'js_scenario': None,
            'extract': None,
            'os': None,
            'lang': None,
            'auto_scroll': None,
            'cost_budget': None,
        }

        options = {
            key: scrape_config_dict.get(key, fallback)
            for key, fallback in fallbacks.items()
        }

        # Enum-backed settings travel as raw values; falsy entries collapse to None
        raw_format = options['format']
        options['format'] = Format(raw_format) if raw_format else None

        raw_format_options = options['format_options']
        options['format_options'] = (
            [FormatOption(option) for option in raw_format_options] if raw_format_options else None
        )

        raw_flags = options['screenshot_flags']
        options['screenshot_flags'] = (
            [ScreenshotFlag(flag) for flag in raw_flags] if raw_flags else None
        )

        return ScrapeConfig(**options)

Ancestors

Class variables

var PUBLIC_DATACENTER_POOL
var PUBLIC_RESIDENTIAL_POOL
var asp : bool
var auto_scroll : Optional[bool]
var body : Optional[str]
var cache : bool
var cache_clear : bool
var cache_ttl : Optional[int]
var cookies : Optional[requests.structures.CaseInsensitiveDict]
var correlation_id : Optional[str]
var cost_budget : Optional[int]
var country : Optional[str]
var data : Optional[Dict]
var debug : bool
var dns : bool
var extract : Dict
var extraction_ephemeral_template : Optional[Dict]
var extraction_model : Optional[str]
var extraction_prompt : Optional[str]
var extraction_template : Optional[str]
var format : Optional[Format]
var format_options : Optional[List[FormatOption]]
var headers : Optional[requests.structures.CaseInsensitiveDict]
var js : str
var js_scenario : Dict
var lang : Optional[List[str]]
var method : str
var os : Optional[str]
var proxy_pool : Optional[str]
var raise_on_upstream_error : bool
var render_js : bool
var rendering_stage : Literal['complete', 'domcontentloaded']
var rendering_wait : int
var retry : bool
var screenshot_flags : Optional[List[ScreenshotFlag]]
var screenshots : Optional[Dict]
var session : Optional[str]
var session_sticky_proxy : bool
var ssl : bool
var tags : Optional[List[str]]
var timeout : Optional[int]
var url : str
var wait_for_selector : Optional[str]
var webhook : Optional[str]

Static methods

def from_dict(scrape_config_dict: Dict) ‑> ScrapeConfig

Create a ScrapeConfig instance from a dictionary.

def from_exported_config(config: str) ‑> ScrapeConfig

Methods

def to_api_params(self, key: str) ‑> Dict
def to_dict(self) ‑> Dict

Export the ScrapeConfig instance to a plain dictionary. Useful for JSON-serialization or other external storage.

class ScraperAPI
Expand source code
class ScraperAPI:
    """
    Namespace of string constants accepted by the monitoring endpoints.

    ScrapflyClient.get_monitoring_metrics uses the data format constants as its
    `format` default and ScrapflyClient.get_monitoring_target_metrics uses the
    period constants as its `period` default.
    """

    # Output format of the monitoring metrics
    MONITORING_DATA_FORMAT_STRUCTURED = 'structured'
    MONITORING_DATA_FORMAT_PROMETHEUS = 'prometheus'

    # Time window covered by the metrics
    MONITORING_PERIOD_SUBSCRIPTION = 'subscription'
    MONITORING_PERIOD_LAST_7D = 'last7d'
    MONITORING_PERIOD_LAST_24H = 'last24h'
    MONITORING_PERIOD_LAST_1H = 'last1h'
    MONITORING_PERIOD_LAST_5m = 'last5m'

    # Aggregation levels (sent comma-joined as the `aggregation` parameter)
    MONITORING_ACCOUNT_AGGREGATION = 'account'
    MONITORING_PROJECT_AGGREGATION = 'project'
    MONITORING_TARGET_AGGREGATION = 'target'

Class variables

var MONITORING_ACCOUNT_AGGREGATION
var MONITORING_DATA_FORMAT_PROMETHEUS
var MONITORING_DATA_FORMAT_STRUCTURED
var MONITORING_PERIOD_LAST_1H
var MONITORING_PERIOD_LAST_24H
var MONITORING_PERIOD_LAST_5m
var MONITORING_PERIOD_LAST_7D
var MONITORING_PERIOD_SUBSCRIPTION
var MONITORING_PROJECT_AGGREGATION
var MONITORING_TARGET_AGGREGATION
class ScrapflyAspError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyAspError(ScraperAPIError):
    """Scraper API error mapped to ScrapflyError.RESOURCE_ASP by ErrorFactory.RESOURCE_TO_ERROR."""
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyClient (key: str, host: Optional[str] = 'https://api.scrapfly.io', verify=True, debug: bool = False, max_concurrency: int = 1, connect_timeout: int = 30, web_scraping_api_read_timeout: int = 160, extraction_api_read_timeout: int = 35, screenshot_api_read_timeout: int = 60, read_timeout: int = 30, default_read_timeout: int = 30, reporter: Optional[Callable] = None, **kwargs)
Expand source code
class ScrapflyClient:
    """
    HTTP client for the Scrapfly API.

    Exposes the scrape, screenshot, extraction, account and monitoring endpoints,
    with synchronous and asyncio-friendly entry points. Can be used as a context
    manager to share a single HTTP session across calls.
    """

    # Default API endpoint and timeouts (seconds), used as __init__ defaults
    HOST = 'https://api.scrapfly.io'
    DEFAULT_CONNECT_TIMEOUT = 30
    DEFAULT_READ_TIMEOUT = 30

    # Per-API read timeouts; the trailing comments give the "real" value noted by the authors
    DEFAULT_WEBSCRAPING_API_READ_TIMEOUT = 160 # 155 real
    DEFAULT_SCREENSHOT_API_READ_TIMEOUT = 60  # 30 real
    DEFAULT_EXTRACTION_API_READ_TIMEOUT = 35 # 30 real

    # Instance attributes (assigned in __init__ unless noted)
    host:str
    key:str
    max_concurrency:int
    verify:bool
    debug:bool
    # declared here but not assigned by __init__ (the kwarg only triggers a deprecation warning)
    distributed_mode:bool
    connect_timeout:int
    web_scraping_api_read_timeout:int
    screenshot_api_read_timeout:int
    extraction_api_read_timeout:int
    monitoring_api_read_timeout:int
    default_read_timeout:int
    # declared here but not assigned by __init__ (the kwarg only triggers a deprecation warning)
    brotli: bool
    reporter:Reporter
    version:str

    # @deprecated
    read_timeout:int

    CONCURRENCY_AUTO = 'auto' # retrieve the allowed concurrency from your account
    # strftime format used for the start/end parameters of the target metrics endpoint
    DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(
        self,
        key: str,
        host: Optional[str] = HOST,
        verify=True,
        debug: bool = False,
        max_concurrency:int=1,
        connect_timeout:int = DEFAULT_CONNECT_TIMEOUT,
        web_scraping_api_read_timeout: int = DEFAULT_WEBSCRAPING_API_READ_TIMEOUT,
        extraction_api_read_timeout: int = DEFAULT_EXTRACTION_API_READ_TIMEOUT,
        screenshot_api_read_timeout: int = DEFAULT_SCREENSHOT_API_READ_TIMEOUT,

        # @deprecated
        read_timeout:int = DEFAULT_READ_TIMEOUT,
        default_read_timeout:int = DEFAULT_READ_TIMEOUT,
        reporter:Optional[Callable]=None,
        **kwargs
    ):
        """
        Configure the client.

        :param key: str - API key sent with every request
        :param host: Optional[str] - API base url; one trailing '/' is removed, None or '' falls back to HOST
        :param verify: passed as `verify` to the HTTP layer
        :param debug: bool - when True, sets http.client.HTTPConnection.debuglevel (process-wide)
        :param max_concurrency: int - default concurrency of concurrent_scrape
        :param connect_timeout: int - connect timeout in seconds
        :param web_scraping_api_read_timeout: int - read timeout of the scrape API
        :param extraction_api_read_timeout: int - read timeout of the extraction API
        :param screenshot_api_read_timeout: int - read timeout of the screenshot API
        :param read_timeout: int - deprecated; accepted but not used, see default_read_timeout
        :param default_read_timeout: int - read timeout of the other calls (monitoring, session default)
        :param reporter: Optional[Callable] - wrapped in Reporter; defaults to NoopReporter
        :param kwargs: only 'distributed_mode' and 'brotli' are inspected, to emit deprecation warnings
        """
        # `host` is Optional: indexing host[-1] would fail on None or an empty string
        if not host:
            host = self.HOST

        if host.endswith('/'):  # remove last '/' if exists
            host = host[:-1]

        if 'distributed_mode' in kwargs:
            warnings.warn("distributed mode is deprecated and will be remove the next version -"
              " user should handle themself the session name based on the concurrency",
              DeprecationWarning,
              stacklevel=2
            )

        if 'brotli' in kwargs:
            warnings.warn("brotli arg is deprecated and will be remove the next version - "
                "brotli is disabled by default",
                DeprecationWarning,
                stacklevel=2
            )

        self.version = __version__
        self.host = host
        self.key = key
        self.verify = verify
        self.debug = debug
        self.connect_timeout = connect_timeout
        self.web_scraping_api_read_timeout = web_scraping_api_read_timeout
        self.screenshot_api_read_timeout = screenshot_api_read_timeout
        self.extraction_api_read_timeout = extraction_api_read_timeout
        self.monitoring_api_read_timeout = default_read_timeout
        self.default_read_timeout = default_read_timeout

        # @deprecated - mirrors default_read_timeout; the `read_timeout` argument itself is ignored
        self.read_timeout = default_read_timeout

        self.max_concurrency = max_concurrency
        self.body_handler = ResponseBodyHandler(use_brotli=False)
        # executor backing the async_* wrappers
        self.async_executor = ThreadPoolExecutor()
        # created lazily by open() / the context manager
        self.http_session = None

        # NOTE(review): this checks the class constant HOST, not the configured host - confirm intent
        if not self.verify and not self.HOST.endswith('.local'):
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        if self.debug is True:
            http.client.HTTPConnection.debuglevel = 5

        if reporter is None:
            from .reporter import NoopReporter

            reporter = NoopReporter()

        self.reporter = Reporter(reporter)

    @property
    def ua(self) -> str:
        return 'ScrapflySDK/%s (Python %s, %s, %s)' % (
            self.version,
            platform.python_version(),
            platform.uname().system,
            platform.uname().machine
        )

    @cached_property
    def _http_handler(self):
        return partial(self.http_session.request if self.http_session else requests.request)

    @property
    def http(self):
        """Public accessor returning the HTTP request callable exposed by `_http_handler`."""
        return self._http_handler

    def _scrape_request(self, scrape_config:ScrapeConfig):
        """
        Build the keyword arguments of the HTTP call sent to the /scrape endpoint.

        :param scrape_config: ScrapeConfig
        :return: dict usable as **kwargs of the HTTP handler
        """
        # Requests carrying a body forward the caller's content type, others use the body handler's one
        if scrape_config.method in ['POST', 'PUT', 'PATCH']:
            content_type = scrape_config.headers['content-type']
        else:
            content_type = self.body_handler.content_type

        request_headers = {
            'content-type': content_type,
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        }

        read_timeout = self.web_scraping_api_read_timeout

        return {
            'method': scrape_config.method,
            'url': self.host + '/scrape',
            'data': scrape_config.body,
            'verify': self.verify,
            'timeout': (self.connect_timeout, read_timeout),
            'headers': request_headers,
            'params': scrape_config.to_api_params(key=self.key)
        }
    
    def _screenshot_request(self, screenshot_config:ScreenshotConfig):
        return {
            'method': 'GET',
            'url': self.host + '/screenshot',
            'timeout': (self.connect_timeout, self.screenshot_api_read_timeout),
            'headers': {
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },            
            'params': screenshot_config.to_api_params(key=self.key)
        }        

    def _extraction_request(self, extraction_config:ExtractionConfig):
        headers = {
                'content-type': extraction_config.content_type,
                'accept-encoding': self.body_handler.content_encoding,
                'content-encoding': extraction_config.document_compression_format if extraction_config.document_compression_format else None,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
        }

        if extraction_config.document_compression_format:
            headers['content-encoding'] = extraction_config.document_compression_format.value

        return {
            'method': 'POST',
            'url': self.host + '/extraction',
            'data': extraction_config.body,
            'timeout': (self.connect_timeout, self.extraction_api_read_timeout),
            'headers': headers,
            'params': extraction_config.to_api_params(key=self.key)
        }


    def account(self) -> Union[str, Dict]:
        response = self._http_handler(
            method='GET',
            url=self.host + '/account',
            params={'key': self.key},
            verify=self.verify,
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
        )

        response.raise_for_status()

        if self.body_handler.support(response.headers):
            return self.body_handler(response.content, response.headers['content-type'])

        return response.content.decode('utf-8')

    def get_monitoring_metrics(self, format:str=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period:Optional[str]=None, aggregation:Optional[List[MonitoringAggregation]]=None):
        """
        Fetch metrics from the /scrape/monitoring/metrics endpoint.

        :param format: str - output format, see the ScraperAPI.MONITORING_DATA_FORMAT_* constants
        :param period: Optional[str] - sent as `period` when provided
        :param aggregation: Optional[List] - sent comma-joined as `aggregation` when provided
        :return: the body decoded by the body handler when it supports the response
            headers, otherwise the raw body decoded as utf-8 text
        """
        query = {'key': self.key, 'format': format}

        # optional filters are only sent when explicitly provided
        if period is not None:
            query['period'] = period

        if aggregation is not None:
            query['aggregation'] = ','.join(aggregation)

        request_headers = {
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        }

        response = self._http_handler(
            method='GET',
            url=self.host + '/scrape/monitoring/metrics',
            params=query,
            timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
            verify=self.verify,
            headers=request_headers,
        )

        response.raise_for_status()

        if not self.body_handler.support(response.headers):
            return response.content.decode('utf-8')

        return self.body_handler(response.content, response.headers['content-type'])

    def get_monitoring_target_metrics(
            self,
            domain:str,
            group_subdomain:bool=False,
            period:Optional[MonitoringTargetPeriod]=ScraperAPI.MONITORING_PERIOD_LAST_24H,
            start:Optional[datetime.datetime]=None,
            end:Optional[datetime.datetime]=None,
    ):
        """
        Fetch the metrics of one target from the /scrape/monitoring/metrics/target endpoint.

        :param domain: str - target domain
        :param group_subdomain: bool - sent as `group_subdomain`
        :param period: predefined period; replaced by None when start and end are given
        :param start: Optional[datetime.datetime] - start of a custom range, requires `end`
        :param end: Optional[datetime.datetime] - end of a custom range, requires `start`
        :return: the body decoded by the body handler when it supports the response
            headers, otherwise the raw body decoded as utf-8 text
        :raises ValueError: when only one of start / end is provided
        """
        # a custom range needs both bounds
        if (start is None) != (end is None):
            raise ValueError('You must provide both start and end date')

        query = {
            'key': self.key,
            'domain': domain,
            'group_subdomain': group_subdomain
        }

        if start is not None:
            # both bounds are set here; the explicit range takes over from the predefined period
            query['start'] = start.strftime(self.DATETIME_FORMAT)
            query['end'] = end.strftime(self.DATETIME_FORMAT)
            period = None

        query['period'] = period

        request_headers = {
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        }

        response = self._http_handler(
            method='GET',
            url=self.host + '/scrape/monitoring/metrics/target',
            timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
            params=query,
            verify=self.verify,
            headers=request_headers,
        )

        response.raise_for_status()

        if not self.body_handler.support(response.headers):
            return response.content.decode('utf-8')

        return self.body_handler(response.content, response.headers['content-type'])


    def resilient_scrape(
        self,
        scrape_config:ScrapeConfig,
        retry_on_errors:Set[Exception]={ScrapflyError},
        retry_on_status_code:Optional[List[int]]=None,
        tries: int = 5,
        delay: int = 20,
    ) -> ScrapeApiResponse:
        """
        Scrape with retries driven by backoff.on_exception (exponential wait).

        :param scrape_config: ScrapeConfig
        :param retry_on_errors: set of exception classes that trigger a retry
        :param retry_on_status_code: upstream status codes worth retrying; when set, an upstream
            error with another status code returns its api response instead of raising
        :param tries: int - passed to backoff as max_tries
        :param delay: int - passed to backoff as max_time
        :return: ScrapeApiResponse
        """
        assert retry_on_errors is not None, 'Retry on error is None'
        assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

        retry_policy = backoff.on_exception(
            backoff.expo,
            exception=tuple(retry_on_errors),
            max_tries=tries,
            max_time=delay
        )

        @retry_policy
        def attempt() -> ScrapeApiResponse:
            try:
                return self.scrape(scrape_config=scrape_config)
            except (UpstreamHttpClientError, UpstreamHttpServerError) as e:
                # no status filter, or no api response to inspect: let the retry policy decide
                if retry_on_status_code is None or not e.api_response:
                    raise e

                if e.api_response.upstream_status_code in retry_on_status_code:
                    raise e

                # upstream status not listed as retryable: hand the response back as is
                return e.api_response

        return attempt()

    def open(self):
        if self.http_session is None:
            self.http_session = Session()
            self.http_session.verify = self.verify
            self.http_session.timeout = (self.connect_timeout, self.default_read_timeout)
            self.http_session.params['key'] = self.key
            self.http_session.headers['accept-encoding'] = self.body_handler.content_encoding
            self.http_session.headers['accept'] = self.body_handler.accept
            self.http_session.headers['user-agent'] = self.ua

    def close(self):
        self.http_session.close()
        self.http_session = None

    def __enter__(self) -> 'ScrapflyClient':
        """Open the shared HTTP session and return the client itself for use in a `with` block."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the shared HTTP session when leaving the `with` block; exceptions are not suppressed."""
        self.close()

    async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
        """
        Run `scrape` in the client's thread pool executor and await its result.

        :param scrape_config: ScrapeConfig
        :param loop: event loop to use; defaults to the running loop
        :return: ScrapeApiResponse
        """
        event_loop = asyncio.get_running_loop() if loop is None else loop
        pending = event_loop.run_in_executor(self.async_executor, self.scrape, scrape_config)

        return await pending

    async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
        """
        Scrape several configurations concurrently, yielding each outcome as it becomes available.

        :param scrape_configs: List[ScrapeConfig] - consumed by this generator (popped from the end);
            `raise_on_upstream_error` is forced to False on every config
        :param concurrency: maximum number of scrapes in flight; None uses `max_concurrency`,
            CONCURRENCY_AUTO reads the limit from the account
        :return: async generator yielding either the scrape result or the exception raised by a scrape
        """
        if concurrency is None:
            concurrency = self.max_concurrency
        elif concurrency == self.CONCURRENCY_AUTO:
            concurrency = self.account()['subscription']['max_concurrency']

        loop = asyncio.get_running_loop()
        processing_tasks = []
        results = []
        processed_tasks = 0
        expected_tasks = len(scrape_configs)

        def scrape_done_callback(task:Task):
            nonlocal processed_tasks

            try:
                if task.cancelled() is True:
                    return

                error = task.exception()

                # errors are collected like regular results and yielded to the caller
                if error is not None:
                    results.append(error)
                else:
                    results.append(task.result())
            finally:
                processing_tasks.remove(task)
                processed_tasks += 1

        while scrape_configs or results or processing_tasks:
            logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

            # @todo handle backpressure
            # fill the free slots, up to `concurrency` scrapes in flight
            while scrape_configs and len(processing_tasks) < concurrency:
                scrape_config = scrape_configs.pop()
                scrape_config.raise_on_upstream_error = False
                task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
                processing_tasks.append(task)
                task.add_done_callback(scrape_done_callback)

            # drain everything that is ready; iterating the list while popping from it
            # would skip about half of the available results on each pass
            while results:
                yield results.pop()

            await asyncio.sleep(.5)

        logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def scrape(self, scrape_config:ScrapeConfig, no_raise:bool=False) -> ScrapeApiResponse:
        """
        Scrape a web page through the scrape API.

        The call is retried (exponential backoff, up to 5 tries) on NetworkError.
        Both the response and any raised error are forwarded to the reporter.

        :param scrape_config: ScrapeConfig
        :param no_raise: bool - if True, a ScrapflyError carrying an api response is not raised:
            that api response is returned instead, for seamless integration
        :return: ScrapeApiResponse

        With no_raise=True, check api_response.scrape_result.error to handle the error.
        When the error is not None, it looks like

        'error': {
            'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED',
            'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds',
            'retryable': False,
            'http_code': 422,
            'links': {
                'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate',
                'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED'
            }
        }
        """

        try:
            logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url))
            http_response = self._http_handler(**self._scrape_request(scrape_config=scrape_config))
            api_response = self._handle_response(response=http_response, scrape_config=scrape_config)

            self.reporter.report(scrape_api_response=api_response)

            return api_response
        except BaseException as e:
            self.reporter.report(error=e)

            # only API errors that carry a response can be handed back instead of raised
            recoverable = no_raise and isinstance(e, ScrapflyError) and e.api_response is not None

            if recoverable:
                return e.api_response

            raise e

    async def async_screenshot(self, screenshot_config:ScreenshotConfig, loop:Optional[AbstractEventLoop]=None) -> ScreenshotApiResponse:
        """
        Run `screenshot` in the client's thread pool executor and await its result.

        :param screenshot_config: ScreenshotConfig
        :param loop: event loop to use; defaults to the running loop
        :return: ScreenshotApiResponse
        """
        event_loop = asyncio.get_running_loop() if loop is None else loop
        pending = event_loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config)

        return await pending

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def screenshot(self, screenshot_config:ScreenshotConfig, no_raise:bool=False) -> ScreenshotApiResponse:
        """
        Take a screenshot through the screenshot API.

        The call is retried (exponential backoff, up to 5 tries) on NetworkError.
        Raised errors are forwarded to the reporter.

        :param screenshot_config: ScreenshotConfig
        :param no_raise: bool - if True, a ScrapflyError carrying an api response is not raised:
            that api response is returned instead, for seamless integration
        :return: ScreenshotApiResponse

        With no_raise=True, check screenshot_api_response.error to handle the error.
        When the error is not None, it looks like

        'error': {
            'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT',
            'message': 'For some reason we were unable to take the screenshot',
            'http_code': 422,
            'links': {
                'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT'
            }
        }
        """

        try:
            logger.debug('--> %s Screenshoting' % (screenshot_config.url))
            http_response = self._http_handler(**self._screenshot_request(screenshot_config=screenshot_config))

            return self._handle_screenshot_response(response=http_response, screenshot_config=screenshot_config)
        except BaseException as e:
            self.reporter.report(error=e)

            # only API errors that carry a response can be handed back instead of raised
            recoverable = no_raise and isinstance(e, ScrapflyError) and e.api_response is not None

            if recoverable:
                return e.api_response

            raise e

    async def async_extraction(self, extraction_config:ExtractionConfig, loop:Optional[AbstractEventLoop]=None) -> ExtractionApiResponse:
        """
        Run `extract` in the client's thread pool executor and await its result.

        :param extraction_config: ExtractionConfig
        :param loop: event loop to use; defaults to the running loop
        :return: ExtractionApiResponse
        """
        event_loop = asyncio.get_running_loop() if loop is None else loop
        pending = event_loop.run_in_executor(self.async_executor, self.extract, extraction_config)

        return await pending

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def extract(self, extraction_config:ExtractionConfig, no_raise:bool=False) -> ExtractionApiResponse:
        """
        Extract structured data from a document through the extraction API.

        The call is retried (exponential backoff, up to 5 tries) on NetworkError.
        Raised errors are forwarded to the reporter.

        :param extraction_config: ExtractionConfig
        :param no_raise: bool - if True, a ScrapflyError carrying an api response is not raised:
            that api response is returned instead, for seamless integration
        :return: ExtractionApiResponse

        With no_raise=True, check extraction_api_response.error to handle the error.
        When the error is not None, it looks like

        'error': {
            'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED',
            'message': 'The content type of the response is not supported for extraction',
            'http_code': 422,
            'links': {
                'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED'
            }
        }
        """

        try:
            logger.debug('--> %s Extracting data from' % (extraction_config.content_type))
            http_response = self._http_handler(**self._extraction_request(extraction_config=extraction_config))

            return self._handle_extraction_response(response=http_response, extraction_config=extraction_config)
        except BaseException as e:
            self.reporter.report(error=e)

            # only API errors that carry a response can be handed back instead of raised
            recoverable = no_raise and isinstance(e, ScrapflyError) and e.api_response is not None

            if recoverable:
                return e.api_response

            raise e

    def _handle_response(self, response:Response, scrape_config:ScrapeConfig) -> ScrapeApiResponse:
        """
        Turn the raw HTTP response of a scrape into a ScrapeApiResponse, logging the outcome.

        :param response: Response - raw HTTP response of the scrape API
        :param scrape_config: ScrapeConfig - configuration the response belongs to
        :return: ScrapeApiResponse
        :raises: UpstreamHttpError, HttpError or ScrapflyError raised while handling the
            response are logged at critical level, then re-raised unchanged
        """
        try:
            api_response = self._handle_api_response(
                response=response,
                scrape_config=scrape_config,
                raise_on_upstream_error=scrape_config.raise_on_upstream_error
            )

            if scrape_config.method == 'HEAD':
                # HEAD scrapes are logged from the HTTP response itself, with a duration of 0
                logger.debug('<-- [%s %s] %s | %ss' % (
                    api_response.response.status_code,
                    api_response.response.reason,
                    api_response.response.request.url,
                    0
                ))
            else:
                # other methods are logged from the decoded API result
                logger.debug('<-- [%s %s] %s | %ss' % (
                    api_response.result['result']['status_code'],
                    api_response.result['result']['reason'],
                    api_response.result['config']['url'],
                    api_response.result['result']['duration'])
                )

                logger.debug('Log url: %s' % api_response.result['result']['log_url'])

            return api_response
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            # an HttpError may come without an api response; fall back to its own message
            if e.api_response is not None:
                logger.critical(e.api_response.error_message)
            else:
                logger.critical(e.message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def _handle_screenshot_response(self, response:Response, screenshot_config:ScreenshotConfig) -> ScreenshotApiResponse:
        """
        Turn a raw HTTP response into a ScreenshotApiResponse, logging any Scrapfly error
        :param response: Response - raw HTTP response to process
        :param screenshot_config: ScreenshotConfig - its raise_on_upstream_error flag is forwarded
        :return: ScreenshotApiResponse
        :raises ScrapflyError: any ScrapflyError (HttpError and UpstreamHttpError included) is logged as critical and re-raised unchanged
        """
        try:
            return self._handle_screenshot_api_response(
                response=response,
                screenshot_config=screenshot_config,
                raise_on_upstream_error=screenshot_config.raise_on_upstream_error
            )
        except UpstreamHttpError as upstream_error:
            # Must stay ahead of the HttpError handler: UpstreamHttpError is an HttpError subclass
            logger.critical(upstream_error.api_response.error_message)
            raise
        except HttpError as http_error:
            failed_response = http_error.api_response
            logger.critical(http_error.message if failed_response is None else failed_response.error_message)
            raise
        except ScrapflyError as sdk_error:
            logger.critical('<-- %s | Docs: %s' % (str(sdk_error), sdk_error.documentation_url))
            raise

    def _handle_extraction_response(self, response:Response, extraction_config:ExtractionConfig) -> ExtractionApiResponse:
        """
        Turn a raw HTTP response into an ExtractionApiResponse, logging any Scrapfly error
        :param response: Response - raw HTTP response to process
        :param extraction_config: ExtractionConfig - its raise_on_upstream_error flag is forwarded
        :return: ExtractionApiResponse
        :raises ScrapflyError: any ScrapflyError (HttpError and UpstreamHttpError included) is logged as critical and re-raised unchanged
        """
        try:
            return self._handle_extraction_api_response(
                response=response,
                extraction_config=extraction_config,
                raise_on_upstream_error=extraction_config.raise_on_upstream_error
            )
        except UpstreamHttpError as upstream_error:
            # Must stay ahead of the HttpError handler: UpstreamHttpError is an HttpError subclass
            logger.critical(upstream_error.api_response.error_message)
            raise
        except HttpError as http_error:
            failed_response = http_error.api_response
            logger.critical(http_error.message if failed_response is None else failed_response.error_message)
            raise
        except ScrapflyError as sdk_error:
            logger.critical('<-- %s | Docs: %s' % (str(sdk_error), sdk_error.documentation_url))
            raise

    def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None):
        """
        Write the image held by a screenshot API response to disk
        :param screenshot_api_response: ScreenshotApiResponse - response carrying the image to save
        :param name: str - file name without extension; the extension is taken from the response metadata
        :param path: Optional[str] - target directory, created when missing; without it the file name is used as a relative path
        :raises RuntimeError: when the screenshot was not successful or the response carries no image
        """

        if screenshot_api_response.screenshot_success is not True:
            raise RuntimeError('Screenshot was not successful')

        image = screenshot_api_response.image
        if not image:
            raise RuntimeError('Screenshot binary does not exist')

        extension_name = screenshot_api_response.metadata['extension_name']
        file_name = f'{name}.{extension_name}'

        if path:
            os.makedirs(path, exist_ok=True)
            file_path = os.path.join(path, file_name)
        else:
            file_path = file_name

        # Raw bytes are wrapped so they can be streamed like any other file-like object
        source = BytesIO(image) if isinstance(image, bytes) else image

        with open(file_path, 'wb') as target:
            shutil.copyfileobj(source, target, length=131072)

    def save_scrape_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):
        """
        Download a screenshot referenced by a scrape result and store it through api_response.sink
        :param api_response: ScrapeApiResponse - scrape response whose result lists the screenshots
        :param name: str - name of the screenshot given in the scrape config; '.jpg' is appended to the file name when missing
        :param path: Optional[str] - forwarded to api_response.sink
        :raises RuntimeError: when the scrape result has no screenshots or none is registered under `name`
        """

        screenshots = api_response.scrape_result['screenshots']

        if not screenshots:
            raise RuntimeError('Screenshot %s do no exists' % name)

        try:
            screenshot = screenshots[name]
        except KeyError:
            raise RuntimeError('Screenshot %s do no exists' % name)

        screenshot_response = self._http_handler(
            method='GET',
            url=screenshot['url'],
            params={'key': self.key},
            verify=self.verify
        )

        screenshot_response.raise_for_status()

        file_name = name if name.endswith('.jpg') else name + '.jpg'

        api_response.sink(path=path, name=file_name, content=screenshot_response.content)

    def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
        """
        Write the scraped content (or the explicitly given content) to a file
        :param api_response: ScrapeApiResponse - its result provides the default content, the content-type header and the url
        :param content: Optional[Union[str, bytes]] - content to write instead of the scraped one; str is encoded as utf-8
        :param path: Optional[str] - directory prepended to the file name (it is not created here)
        :param name: Optional[str] - file name; defaults to the last segment of the scraped url
        :param file: Optional[Union[TextIO, BytesIO]] - already opened target; when given, no path is computed and the object is closed after the copy
        :return: str - path of the written file; None when `file` was supplied by the caller
        """
        scrape_result = api_response.result['result']
        scrape_config = api_response.result['config']

        file_content = content or scrape_result['content']
        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                # Taken from the name, so without a leading dot (unlike the mime-derived one below)
                file_extension = name_parts[-1]

        must_open_file = not file

        if must_open_file:
            if file_extension is None:
                try:
                    mime_type = scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                name = scrape_config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            file_path = path + '/' + name if path else name

            if file_path == file_extension:
                # The url ended with '/', so the derived name is only the extension: build a name from the whole url
                url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

                if url[-1] == '-':
                    url = url[:-1]

                url += file_extension

                file_path = url

        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)

        # Open the target only once the content is ready: a failure above must not
        # leak an open handle nor leave an empty file behind
        if must_open_file:
            file = open(file_path, 'wb')

        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)
        return file_path

    def _handle_scrape_large_objects(
        self,
        callback_url:str,
        format: Literal['clob', 'blob']
    ) -> Tuple[Union[BytesIO, str], str]:
        """
        Fetch a large object from its callback url and return it with its kind
        :param callback_url: str - url the large object is downloaded from
        :param format: Literal['clob', 'blob'] - 'clob' yields utf-8 decoded text, 'blob' yields a BytesIO
        :return: Tuple[Union[BytesIO, str], str] - (content, 'text') for clob, (content, 'binary') for blob
        :raises ContentError: when format is neither 'clob' nor 'blob'
        """
        if format not in ('clob', 'blob'):
            raise ContentError('Large objects handle can handles format format [blob, clob], given: %s' % format)

        response = self._http_handler(
            method='GET',
            url=callback_url,
            verify=self.verify,
            timeout=(self.connect_timeout, self.default_read_timeout),
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
            params={'key': self.key}
        )

        # Let the body handler process the payload when it supports the response headers
        if self.body_handler.support(headers=response.headers):
            payload = self.body_handler(content=response.content, content_type=response.headers['content-type'])
        else:
            payload = response.content

        if format == 'clob':
            return payload.decode('utf-8'), 'text'

        return BytesIO(payload), 'binary'

    def _handle_api_response(
        self,
        response: Response,
        scrape_config:ScrapeConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ScrapeApiResponse:
        """
        Decode the body of a scrape API response and wrap it into a ScrapeApiResponse
        :param response: Response - raw HTTP response
        :param scrape_config: ScrapeConfig - config of the scrape
        :param raise_on_upstream_error: Optional[bool] - forwarded to ScrapeApiResponse.raise_for_result
        :return: ScrapeApiResponse
        """

        if scrape_config.method == 'HEAD':
            # The body is not read for HEAD scrapes
            decoded_body = None
        elif self.body_handler.support(headers=response.headers):
            decoded_body = self.body_handler(content=response.content, content_type=response.headers['content-type'])
        else:
            # Anything the body handler does not support is decoded as utf-8 text
            decoded_body = response.content.decode('utf-8')

        scrape_api_response = ScrapeApiResponse(
            response=response,
            request=response.request,
            api_result=decoded_body,
            scrape_config=scrape_config,
            large_object_handler=self._handle_scrape_large_objects
        )

        scrape_api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return scrape_api_response

    def _handle_screenshot_api_response(
        self,
        response: Response,
        screenshot_config:ScreenshotConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ScreenshotApiResponse:
        """
        Decode the body of a screenshot API response and wrap it into a ScreenshotApiResponse
        :param response: Response - raw HTTP response
        :param screenshot_config: ScreenshotConfig - config of the screenshot
        :param raise_on_upstream_error: Optional[bool] - forwarded to ScreenshotApiResponse.raise_for_result
        :return: ScreenshotApiResponse
        """

        handler = self.body_handler

        if handler.support(headers=response.headers):
            api_result = handler(content=response.content, content_type=response.headers['content-type'])
        else:
            # Unsupported content is kept raw under the 'result' key
            api_result = {'result': response.content}

        screenshot_api_response = ScreenshotApiResponse(
            response=response,
            request=response.request,
            api_result=api_result,
            screenshot_config=screenshot_config
        )

        screenshot_api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return screenshot_api_response

    def _handle_extraction_api_response(
        self,
        response: Response,
        extraction_config:ExtractionConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ExtractionApiResponse:
        """
        Decode the body of an extraction API response and wrap it into an ExtractionApiResponse
        :param response: Response - raw HTTP response
        :param extraction_config: ExtractionConfig - config of the extraction
        :param raise_on_upstream_error: Optional[bool] - forwarded to ExtractionApiResponse.raise_for_result
        :return: ExtractionApiResponse
        """

        handler = self.body_handler

        if handler.support(headers=response.headers):
            api_result = handler(content=response.content, content_type=response.headers['content-type'])
        else:
            # Anything the body handler does not support is decoded as utf-8 text
            api_result = response.content.decode('utf-8')

        extraction_api_response = ExtractionApiResponse(
            response=response,
            request=response.request,
            api_result=api_result,
            extraction_config=extraction_config
        )

        extraction_api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return extraction_api_response

Class variables

var CONCURRENCY_AUTO
var DATETIME_FORMAT
var DEFAULT_CONNECT_TIMEOUT
var DEFAULT_EXTRACTION_API_READ_TIMEOUT
var DEFAULT_READ_TIMEOUT
var DEFAULT_SCREENSHOT_API_READ_TIMEOUT
var DEFAULT_WEBSCRAPING_API_READ_TIMEOUT
var HOST
var brotli : bool
var connect_timeout : int
var debug : bool
var default_read_timeout : int
var distributed_mode : bool
var extraction_api_read_timeout : int
var host : str
var key : str
var max_concurrency : int
var monitoring_api_read_timeout : int
var read_timeout : int
var reporter : scrapfly.reporter.Reporter
var screenshot_api_read_timeout : int
var verify : bool
var version : str
var web_scraping_api_read_timeout : int

Instance variables

prop http
Expand source code
@property
def http(self):
    """Expose the HTTP handler (self._http_handler) the client sends its requests with."""
    return self._http_handler
prop ua : str
Expand source code
@property
def ua(self) -> str:
    """Build the User-Agent string: SDK version, Python version, operating system and machine type."""
    host = platform.uname()

    return 'ScrapflySDK/%s (Python %s, %s, %s)' % (
        self.version,
        platform.python_version(),
        host.system,
        host.machine
    )

Methods

def account(self) ‑> Union[str, Dict]
async def async_extraction(self, extraction_config: ExtractionConfig, loop: Optional[asyncio.events.AbstractEventLoop] = None) ‑> ExtractionApiResponse
async def async_scrape(self, scrape_config: ScrapeConfig, loop: Optional[asyncio.events.AbstractEventLoop] = None) ‑> ScrapeApiResponse
async def async_screenshot(self, screenshot_config: ScreenshotConfig, loop: Optional[asyncio.events.AbstractEventLoop] = None) ‑> ScreenshotApiResponse
def close(self)
async def concurrent_scrape(self, scrape_configs: List[ScrapeConfig], concurrency: Optional[int] = None)
def extract(self, extraction_config: ExtractionConfig, no_raise: bool = False) ‑> ExtractionApiResponse

Extract structured data from text content :param extraction_config: ExtractionConfig :param no_raise: bool - if True, do not raise an exception when the extraction API response is a ScrapflyError, for seamless integration :return: ExtractionApiResponse

If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error. If the error is not none, you will get the following structure for example

'error': { 'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED', 'message': 'The content type of the response is not supported for extraction', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED' } }

def get_monitoring_metrics(self, format: str = 'structured', period: Optional[str] = None, aggregation: Optional[List[Literal['account', 'project', 'target']]] = None)
def get_monitoring_target_metrics(self, domain: str, group_subdomain: bool = False, period: Optional[Literal['subscription', 'last7d', 'last24h', 'last1h', 'last5m']] = 'last24h', start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None)
def open(self)
def resilient_scrape(self, scrape_config: ScrapeConfig, retry_on_errors: Set[Exception] = {<class 'scrapfly.errors.ScrapflyError'>}, retry_on_status_code: Optional[List[int]] = None, tries: int = 5, delay: int = 20) ‑> ScrapeApiResponse
def save_scrape_screenshot(self, api_response: ScrapeApiResponse, name: str, path: Optional[str] = None)

Save a screenshot from a scrape result :param api_response: ScrapeApiResponse :param name: str - name of the screenshot given in the scrape config :param path: Optional[str]

def save_screenshot(self, screenshot_api_response: ScreenshotApiResponse, name: str, path: Optional[str] = None)

Save a screenshot from a screenshot API response :param screenshot_api_response: ScreenshotApiResponse :param name: str - name of the screenshot to save as :param path: Optional[str]

def scrape(self, scrape_config: ScrapeConfig, no_raise: bool = False) ‑> ScrapeApiResponse

Scrape a website :param scrape_config: ScrapeConfig :param no_raise: bool - if True, do not raise exception on error while the api response is a ScrapflyError for seamless integration :return: ScrapeApiResponse

If you use no_raise=True, make sure to check the api_response.scrape_result.error attribute to handle the error. If the error is not none, you will get the following structure for example

'error': { 'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED', 'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds', 'retryable': False, 'http_code': 422, 'links': { 'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED' } }

def screenshot(self, screenshot_config: ScreenshotConfig, no_raise: bool = False) ‑> ScreenshotApiResponse

Take a screenshot :param screenshot_config: ScreenshotConfig :param no_raise: bool - if True, do not raise an exception when the screenshot API response is a ScrapflyError, for seamless integration :return: ScreenshotApiResponse

If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error. If the error is not none, you will get the following structure for example

'error': { 'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT', 'message': 'For some reason we were unable to take the screenshot', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT' } }

def sink(self, api_response: ScrapeApiResponse, content: Union[str, bytes, ForwardRef(None)] = None, path: Optional[str] = None, name: Optional[str] = None, file: Union[TextIO, _io.BytesIO, ForwardRef(None)] = None) ‑> str
class ScrapflyError (message: str, code: str, http_status_code: int, resource: Optional[str] = None, is_retryable: bool = False, retry_delay: Optional[int] = None, retry_times: Optional[int] = None, documentation_url: Optional[str] = None, api_response: Optional[ForwardRef('ApiResponse')] = None)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyError(Exception):
    """
    Base exception of the SDK, carrying the error details as attributes.

    The exception args are (message, str(code)); str() of the exception is the
    message, followed by a link to the documentation when one is set.
    """

    KIND_HTTP_BAD_RESPONSE = 'HTTP_BAD_RESPONSE'
    KIND_SCRAPFLY_ERROR = 'SCRAPFLY_ERROR'

    RESOURCE_PROXY = 'PROXY'
    RESOURCE_THROTTLE = 'THROTTLE'
    RESOURCE_SCRAPE = 'SCRAPE'
    RESOURCE_ASP = 'ASP'
    RESOURCE_SCHEDULE = 'SCHEDULE'
    RESOURCE_WEBHOOK = 'WEBHOOK'
    RESOURCE_SESSION = 'SESSION'

    def __init__(
        self,
        message: str,
        code: str,
        http_status_code: int,
        resource: Optional[str]=None,
        is_retryable: bool = False,
        retry_delay: Optional[int] = None,
        retry_times: Optional[int] = None,
        documentation_url: Optional[str] = None,
        api_response: Optional['ApiResponse'] = None
    ):
        """
        :param message: str - human readable description of the error
        :param code: str - error code; stored as given, stringified in the exception args
        :param http_status_code: int - HTTP status code attached to the error
        :param resource: Optional[str] - resource the error relates to (see the RESOURCE_* constants)
        :param is_retryable: bool - retry flag, stored as given
        :param retry_delay: Optional[int] - retry delay, stored as given
        :param retry_times: Optional[int] - retry count, stored as given
        :param documentation_url: Optional[str] - link appended to the string form of the error
        :param api_response: Optional[ApiResponse] - API response the error originates from, if any
        """
        # What went wrong
        self.message = message
        self.code = code
        self.http_status_code = http_status_code
        self.resource = resource

        # Retry hints
        self.is_retryable = is_retryable
        self.retry_delay = retry_delay
        self.retry_times = retry_times

        # Context
        self.documentation_url = documentation_url
        self.api_response = api_response

        super().__init__(self.message, str(self.code))

    def __str__(self):
        """Return the message, followed by the documentation link when there is one."""
        if self.documentation_url is None:
            return self.message

        return self.message + '. Learn more: %s' % self.documentation_url

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

  • scrapfly.errors.ExtraUsageForbidden
  • scrapfly.errors.HttpError

Class variables

var KIND_HTTP_BAD_RESPONSE
var KIND_SCRAPFLY_ERROR
var RESOURCE_ASP
var RESOURCE_PROXY
var RESOURCE_SCHEDULE
var RESOURCE_SCRAPE
var RESOURCE_SESSION
var RESOURCE_THROTTLE
var RESOURCE_WEBHOOK
class ScrapflyProxyError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyProxyError(ScraperAPIError):
    """Error of the PROXY resource: ErrorFactory.RESOURCE_TO_ERROR maps ScrapflyError.RESOURCE_PROXY to this class."""
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyScheduleError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyScheduleError(ScraperAPIError):
    """Error of the SCHEDULE resource: ErrorFactory.RESOURCE_TO_ERROR maps ScrapflyError.RESOURCE_SCHEDULE to this class."""
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyScrapeError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyScrapeError(ScraperAPIError):
    """Error of the SCRAPE resource: ErrorFactory.RESOURCE_TO_ERROR maps ScrapflyError.RESOURCE_SCRAPE to this class."""
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflySessionError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflySessionError(ScraperAPIError):
    """Error of the SESSION resource: ErrorFactory.RESOURCE_TO_ERROR maps ScrapflyError.RESOURCE_SESSION to this class."""
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyThrottleError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyThrottleError(ScraperAPIError):
    """
    ScraperAPIError subtype for throttling errors.

    NOTE(review): presumably tied to ScrapflyError.RESOURCE_THROTTLE, but the visible
    part of ErrorFactory.RESOURCE_TO_ERROR has no entry for it - confirm where it is raised.
    """
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyWebhookError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyWebhookError(ScraperAPIError):
    """Error of the WEBHOOK resource: ErrorFactory.RESOURCE_TO_ERROR maps ScrapflyError.RESOURCE_WEBHOOK to this class."""
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScreenshotAPIError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScreenshotAPIError(HttpError):
    """HttpError subtype used as the default error_class of ScreenshotApiResponse.raise_for_result."""
    pass

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScreenshotApiResponse (request: requests.models.Request, response: requests.models.Response, screenshot_config: ScreenshotConfig, api_result: Optional[bytes] = None)
Expand source code
class ScreenshotApiResponse(ApiResponse):
    """API response of a screenshot call: gives access to the image, its metadata and the error payload."""

    def __init__(self, request: Request, response: Response, screenshot_config: ScreenshotConfig, api_result: Optional[bytes] = None):
        """
        :param request: Request - HTTP request that was sent
        :param response: Response - HTTP response that was received
        :param screenshot_config: ScreenshotConfig - config the screenshot was requested with
        :param api_result: Optional[bytes] - decoded API result, normalised through handle_api_result
        """
        super().__init__(request, response)
        self.screenshot_config = screenshot_config
        self.result = self.handle_api_result(api_result)

    @property
    def image(self) -> Optional[str]:
        """Return the value stored under result['result'], or '' when it is missing or None."""
        binary = self.result.get('result', None)
        return '' if binary is None else binary

    @property
    def metadata(self) -> Optional[Dict]:
        """Return the image extension and upstream details read from the response headers, {} when there is no image."""
        if not self.image:
            return {}

        headers = self.response.headers
        content_type = headers.get('content-type')

        # Keep what follows the first '/' (the whole value when there is none), up to any ';' parameter
        subtype_start = content_type.find('/') + 1
        extension_name = content_type[subtype_start:].split(';')[0]

        return {
            'extension_name': extension_name,
            'upstream-status-code': headers.get('X-Scrapfly-Upstream-Http-Code'),
            'upstream-url': headers.get('X-Scrapfly-Upstream-Url')
        }

    @property
    def screenshot_success(self) -> bool:
        """Return True when the response carries an image."""
        return bool(self.image)

    @property
    def error(self) -> Optional[Dict]:
        """Return the raw API result when no image was produced, None otherwise."""
        if self.image:
            return None

        return self.result if self.screenshot_success is False else None

    def _is_api_error(self, api_result: Dict) -> bool:
        """Tell whether the API result is an error: it is None or it contains an 'error_id' entry."""
        return api_result is None or 'error_id' in api_result

    def handle_api_result(self, api_result: bytes) -> FrozenDict:
        """Wrap error results into a FrozenDict; any other result is returned as is."""
        is_error = self._is_api_error(api_result=api_result)

        return FrozenDict(api_result) if is_error is True else api_result

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError):
        """Delegate to the base class implementation, with ScreenshotAPIError as the default error class."""
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

Ancestors

Instance variables

prop error : Optional[Dict]
Expand source code
@property
def error(self) -> Optional[Dict]:
    """Return the raw API result when no image was produced, None otherwise."""
    if self.image:
        return None

    return self.result if self.screenshot_success is False else None
prop image : Optional[str]
Expand source code
@property
def image(self) -> Optional[str]:
    """Return the value stored under result['result'], or '' when it is missing or None."""
    binary = self.result.get('result', None)
    return '' if binary is None else binary
prop metadata : Optional[Dict]
Expand source code
@property
def metadata(self) -> Optional[Dict]:
    """Return the image extension and upstream details read from the response headers, {} when there is no image."""
    if not self.image:
        return {}

    headers = self.response.headers
    content_type = headers.get('content-type')

    # Keep what follows the first '/' (the whole value when there is none), up to any ';' parameter
    subtype_start = content_type.find('/') + 1
    extension_name = content_type[subtype_start:].split(';')[0]

    return {
        'extension_name': extension_name,
        'upstream-status-code': headers.get('X-Scrapfly-Upstream-Http-Code'),
        'upstream-url': headers.get('X-Scrapfly-Upstream-Url')
    }
prop screenshot_success : bool
Expand source code
@property
def screenshot_success(self) -> bool:
    """Return True when the response carries an image."""
    return bool(self.image)

Methods

def handle_api_result(self, api_result: bytes) ‑> FrozenDict
def raise_for_result(self, raise_on_upstream_error=True, error_class=scrapfly.errors.ScreenshotAPIError)

Inherited members

class ScreenshotConfig (url: str, format: Optional[Format] = None, capture: Optional[str] = None, resolution: Optional[str] = None, country: Optional[str] = None, timeout: Optional[int] = None, rendering_wait: Optional[int] = None, wait_for_selector: Optional[str] = None, options: Optional[List[Options]] = None, auto_scroll: Optional[bool] = None, js: Optional[str] = None, cache: Optional[bool] = None, cache_ttl: Optional[bool] = None, cache_clear: Optional[bool] = None, vision_deficiency: Optional[VisionDeficiency] = None, webhook: Optional[str] = None, raise_on_upstream_error: bool = True)
Expand source code
class ScreenshotConfig(BaseApiConfig):
    """
    Configuration of a screenshot API call.

    The instance can be turned into API query parameters (to_api_params) and
    exported to / rebuilt from a plain dictionary (to_dict / from_dict).
    """

    url: str
    format: Optional[Format] = None
    capture: Optional[str] = None
    resolution: Optional[str] = None
    country: Optional[str] = None
    timeout: Optional[int] = None # in milliseconds
    rendering_wait: Optional[int] = None # in milliseconds
    wait_for_selector: Optional[str] = None
    options: Optional[List[Options]] = None
    auto_scroll: Optional[bool] = None
    js: Optional[str] = None
    cache: Optional[bool] = None
    cache_ttl: Optional[bool] = None
    cache_clear: Optional[bool] = None
    vision_deficiency: Optional[VisionDeficiency] = None
    webhook: Optional[str] = None
    raise_on_upstream_error: bool = True

    def __init__(
        self,
        url: str,
        format: Optional[Format] = None,
        capture: Optional[str] = None,
        resolution: Optional[str] = None,
        country: Optional[str] = None,
        timeout: Optional[int] = None, # in milliseconds
        rendering_wait: Optional[int] = None, # in milliseconds
        wait_for_selector: Optional[str] = None,
        options: Optional[List[Options]] = None,
        auto_scroll: Optional[bool] = None,
        js: Optional[str] = None,
        cache: Optional[bool] = None,
        cache_ttl: Optional[bool] = None,
        cache_clear: Optional[bool] = None,
        vision_deficiency: Optional[VisionDeficiency] = None,
        webhook: Optional[str] = None,
        raise_on_upstream_error: bool = True
    ):
        """
        :param url: str - url to capture; must be a str
        :param format: Optional[Format] - sent as the 'format' parameter
        :param capture: Optional[str] - sent as the 'capture' parameter
        :param resolution: Optional[str] - sent as the 'resolution' parameter
        :param country: Optional[str] - sent as the 'country' parameter
        :param timeout: Optional[int] - in milliseconds
        :param rendering_wait: Optional[int] - in milliseconds
        :param wait_for_selector: Optional[str] - sent as the 'wait_for_selector' parameter
        :param options: Optional[List[Options]] - each entry is normalised to an Options member
        :param auto_scroll: Optional[bool] - sent as the 'auto_scroll' parameter
        :param js: Optional[str] - javascript source, sent url-safe base64 encoded
        :param cache: Optional[bool] - cache_ttl and cache_clear are only sent when this is set
        :param cache_ttl: Optional[bool] - only sent when cache is set
        :param cache_clear: Optional[bool] - only sent when cache is set
        :param vision_deficiency: Optional[VisionDeficiency] - sent as the 'vision_deficiency' parameter
        :param webhook: Optional[str] - sent as the 'webhook_name' parameter
        :param raise_on_upstream_error: bool - read by the client when handling the response
        """
        assert(type(url) is str)

        self.url = url
        self.key = None
        self.format = format
        self.capture = capture
        self.resolution = resolution
        self.country = country
        self.timeout = timeout
        self.rendering_wait = rendering_wait
        self.wait_for_selector = wait_for_selector
        self.options = [Options(flag) for flag in options] if options else None
        self.auto_scroll = auto_scroll
        self.js = js
        self.cache = cache
        self.cache_ttl = cache_ttl
        self.cache_clear = cache_clear
        self.vision_deficiency = vision_deficiency
        self.webhook = webhook
        self.raise_on_upstream_error = raise_on_upstream_error

    def to_api_params(self, key:str) -> Dict:
        """
        Build the query parameters of the screenshot API call
        :param key: str - API key used when the config does not carry its own key
        :return: Dict - only the options that are set are included
        """
        params = {
            'key': self.key or key,
            'url': self.url
        }

        if self.format:
            params['format'] = Format(self.format).value

        if self.capture:
            params['capture'] = self.capture

        if self.resolution:
            params['resolution'] = self.resolution

        if self.country is not None:
            params['country'] = self.country

        if self.timeout is not None:
            params['timeout'] = self.timeout

        if self.rendering_wait is not None:
            params['rendering_wait'] = self.rendering_wait

        if self.wait_for_selector is not None:
            params['wait_for_selector'] = self.wait_for_selector

        if self.options is not None:
            params["options"] = ",".join(flag.value for flag in self.options)

        if self.auto_scroll is not None:
            params['auto_scroll'] = self._bool_to_http(self.auto_scroll)

        if self.js:
            params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

        if self.cache is not None:
            params['cache'] = self._bool_to_http(self.cache)

            if self.cache_ttl is not None:
                # NOTE(review): cache_ttl is typed and serialised as a boolean here;
                # confirm against the Screenshot API whether a duration is expected
                params['cache_ttl'] = self._bool_to_http(self.cache_ttl)

            if self.cache_clear is not None:
                params['cache_clear'] = self._bool_to_http(self.cache_clear)

        else:
            if self.cache_ttl is not None:
                logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

            if self.cache_clear is not None:
                logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

        if self.vision_deficiency is not None:
            # Accept the enum member as well as its raw value, like `format` above
            params['vision_deficiency'] = VisionDeficiency(self.vision_deficiency).value

        if self.webhook is not None:
            params['webhook_name'] = self.webhook

        return params

    def to_dict(self) -> Dict:
        """
        Export the ScreenshotConfig instance to a plain dictionary.
        Enum based options (format, options, vision_deficiency) are exported as their raw values.
        """
        return {
            'url': self.url,
            'format': Format(self.format).value if self.format else None,
            'capture': self.capture,
            'resolution': self.resolution,
            'country': self.country,
            'timeout': self.timeout,
            'rendering_wait': self.rendering_wait,
            'wait_for_selector': self.wait_for_selector,
            'options': [Options(option).value for option in self.options] if self.options else None,
            'auto_scroll': self.auto_scroll,
            'js': self.js,
            'cache': self.cache,
            'cache_ttl': self.cache_ttl,
            'cache_clear': self.cache_clear,
            'vision_deficiency': VisionDeficiency(self.vision_deficiency).value if self.vision_deficiency else None,
            'webhook': self.webhook,
            'raise_on_upstream_error': self.raise_on_upstream_error
        }

    @staticmethod
    def from_dict(screenshot_config_dict: Dict) -> 'ScreenshotConfig':
        """
        Create a ScreenshotConfig instance from a dictionary.
        Raw values of enum based options (format, options, vision_deficiency) are
        converted back to their enum members, so a to_dict() export can be reloaded.
        """
        url = screenshot_config_dict.get('url', None)

        format = screenshot_config_dict.get('format', None)
        format = Format(format) if format else None

        capture = screenshot_config_dict.get('capture', None)
        resolution = screenshot_config_dict.get('resolution', None)
        country = screenshot_config_dict.get('country', None)
        timeout = screenshot_config_dict.get('timeout', None)
        rendering_wait = screenshot_config_dict.get('rendering_wait', None)
        wait_for_selector = screenshot_config_dict.get('wait_for_selector', None)

        options = screenshot_config_dict.get('options', None)
        options = [Options(option) for option in options] if options else None

        auto_scroll = screenshot_config_dict.get('auto_scroll', None)
        js = screenshot_config_dict.get('js', None)
        cache = screenshot_config_dict.get('cache', None)
        cache_ttl = screenshot_config_dict.get('cache_ttl', None)
        cache_clear = screenshot_config_dict.get('cache_clear', None)

        # to_dict() exports the raw value: rebuild the enum member so that
        # to_api_params() works on the reloaded config
        vision_deficiency = screenshot_config_dict.get('vision_deficiency', None)
        vision_deficiency = VisionDeficiency(vision_deficiency) if vision_deficiency else None

        webhook = screenshot_config_dict.get('webhook', None)
        raise_on_upstream_error = screenshot_config_dict.get('raise_on_upstream_error', True)

        return ScreenshotConfig(
            url=url,
            format=format,
            capture=capture,
            resolution=resolution,
            country=country,
            timeout=timeout,
            rendering_wait=rendering_wait,
            wait_for_selector=wait_for_selector,
            options=options,
            auto_scroll=auto_scroll,
            js=js,
            cache=cache,
            cache_ttl=cache_ttl,
            cache_clear=cache_clear,
            vision_deficiency=vision_deficiency,
            webhook=webhook,
            raise_on_upstream_error=raise_on_upstream_error
        )

Ancestors

Class variables

var auto_scroll : Optional[bool]
var cache : Optional[bool]
var cache_clear : Optional[bool]
var cache_ttl : Optional[bool]
var capture : Optional[str]
var country : Optional[str]
var format : Optional[Format]
var js : Optional[str]
var options : Optional[List[Options]]
var raise_on_upstream_error : bool
var rendering_wait : Optional[int]
var resolution : Optional[str]
var timeout : Optional[int]
var url : str
var wait_for_selector : Optional[str]
var webhook : Optional[str]

Static methods

def from_dict(screenshot_config_dict: Dict) ‑> ScreenshotConfig

Create a ScreenshotConfig instance from a dictionary.

Methods

def to_api_params(self, key: str) ‑> Dict
def to_dict(self) ‑> Dict

Export the ScreenshotConfig instance to a plain dictionary.

class UpstreamHttpClientError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class UpstreamHttpClientError(UpstreamHttpError):
    """
    UpstreamHttpError subtype; UpstreamHttpServerError derives from it, so catching
    this class also catches upstream server errors.

    NOTE(review): presumably raised for upstream client-side (4xx) statuses - confirm against the raising code.
    """
    pass

Ancestors

  • scrapfly.errors.UpstreamHttpError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

class UpstreamHttpError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class UpstreamHttpError(HttpError):
    """
    HttpError subtype for upstream failures; the client's response handlers catch it
    ahead of the generic HttpError.

    NOTE(review): presumably means the scraped website itself answered with an error status - confirm against the raising code.
    """
    pass

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

class UpstreamHttpServerError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class UpstreamHttpServerError(UpstreamHttpClientError):
    """
    Upstream error subtype; it derives from UpstreamHttpClientError, so handlers of
    that class catch it too.

    NOTE(review): presumably raised for upstream server-side (5xx) statuses - confirm against the raising code.
    """
    pass

Ancestors