Data Engineering Zoomcamp FAQ
The exact day and hour of the course start is 15th Jan 2023 at 18:00. The course will start with
the first “Office Hours” live session.
● Subscribe to the course's public Google Calendar (it works from Desktop only).
● Register for the course before it starts using this link.
● Join the course Telegram channel with announcements.
● Don’t forget to register in DataTalks.Club's Slack and join the #course-data-engineering
channel.
What is the video/zoom link to the stream for the first “Office
Hour”?
It should be posted in the announcements channel on Telegram before it begins. Also, you will
see it live on the DataTalksClub YouTube Channel.
Look over the prerequisites and syllabus to see if you are comfortable with these subjects.
🙂
For 2023, the main difference is the orchestration tool — we will use Prefect and not Airflow.
There are also new homeworks.
● We will again have a different orchestrator for the 2nd module - it'll be Mage instead
of Prefect.
● Terraform videos might be re-recorded.
Note that to sign up for a free GCP account, you need to have a valid credit card.
The GCP and other cloud providers are not available in some
countries. Is it possible to provide a guide to installing a home
lab?
You can do most of the course without a cloud. Almost everything we use (excluding BigQuery)
can be run locally. We won’t be able to provide guidelines for some things, but most of the
materials are runnable without GCP.
For everything in the course, there’s a local alternative. You could even do the whole course
locally.
Are we still using the NYC Trip data for January 2021 or are we
using the 2022 data?
We will use the same data, as the project will essentially remain the same as last year's. The
data is available here.
・Workflow orchestration
・Data Warehousing
・Analytical engineering
・Batch processing
・Stream processing
To complete the course, you'll build your data engineering pipeline
from scratch using the knowledge acquired during the course.
Yes, you can use any tool you want for your project.
The course covers 2 alternative data stacks, one using GCP and one using local installation of
everything. You can use either one of those, or use your tool of choice.
You do need to take into consideration that we can't support you if you choose to use a different
stack; you would also need to explain your choice of tools during the peer review of your
capstone project.
https://docs.google.com/document/d/1Bfp-K2hIovkETjeGsJOKl8Zo2dVyHY6SXIHyV5rkE0w
1. What does the error say? There will often be a description of the error or instructions on
what is needed, I have even seen a link to the solution. Does it reference a specific line
of your code?
2. Restart the application or server/pc.
3. Google it. It is going to be rare that you are the first to have the problem, someone out
there has posted the issue and likely the solution. Search using: <technology> <problem
statement>. Example: pgcli error column c.relhasoids does not exist.
a. There are often different solutions for the same problem due to variation in
environments.
4. Check the tech's documentation. Use its search if available, or use the browser's search
function.
5. Try uninstall (this may remove the bad actor) and reinstall of application or
reimplementation of action. Don’t forget to restart the server/pc for reinstalls.
a. Sometimes reinstalling fails to resolve the issue but works if you uninstall first.
6. Post your question to Stackoverflow. Be sure to read the Stackoverflow guide on posting
good questions.
a. https://stackoverflow.com/help/how-to-ask
b. This will be your real life ask an expert in the future (in addition to coworkers).
7. Ask in Slack
a. Before asking a question, check the FAQ (this document)
b. DO NOT use screenshots, especially don’t take pictures from a phone.
c. DO NOT tag instructors; it may discourage others from helping you. Copy and
paste errors as text; if it's long, just post it in a reply to your thread.
i. Use ``` for formatting your code.
d. Use the same thread for the conversation (that means reply to your own thread).
i. DO NOT create multiple posts to discuss the issue.
ii. You may create a new post if the issue reemerges down the road. Be
sure to describe what has changed in the environment.
e. Provide additional information in the same thread of the steps you have taken for
resolution.
8. Take a break and come back to it later. You will be amazed at how often you figure out
the solution after letting your brain rest. Get some fresh air, workout, play a video game,
watch a tv show, whatever allows your brain to not think about it for a little while or even
until the next day.
9. Remember technology issues in real life sometimes take days or even weeks to resolve.
10. If somebody helped you with your problem and it's not in the FAQ, please add it there.
It will help other students.
Week 1
Taxi Data - Yellow Taxi Trip Records downloading error, Error 403
or XML error webpage
When you try to download the 2021 data from the TLC website, you get this error (an XML error
page) if you click on the link, and "ERROR 403: Forbidden" in the terminal.
Note: Make sure to unzip the “gz” file (no, the “unzip” command won’t work for this.)
“gzip -d file.gz”
Taxi Data - How to handle taxi data files, now that the files are
available as *.csv.gz?
The pandas read_csv function can read csv.gz files directly. So no need to change anything in
the script.
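For example, a minimal sketch (the URL below is illustrative; point it at the actual csv.gz file you need):
```python
import pandas as pd

# pandas infers gzip compression from the .csv.gz extension, so the file can be
# read directly; pass chunksize=... if the file is too large to fit in memory.
url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"
df = pd.read_csv(url, compression="gzip")
print(df.shape)
```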
Green Trips:
https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_green.pdf
Having this local repository on your computer will make it easy for you to access the instructors’
code and make pull requests (if you want to add your own notes or make changes to the course
content).
You will probably also create your own repositories to host your notes and your versions of the
files. Here is a great tutorial that shows you how to do this:
https://www.atlassian.com/git/tutorials/setting-up-a-repository
Remember to ignore large database, .csv, and .gz files, as well as other files that should not be
saved to a repository. Use .gitignore for this:
https://www.atlassian.com/git/tutorials/saving-changes/gitignore NEVER store passwords or
keys in a git repo (even if that repo is set to private).
On Ubuntu, run:
* If you downloaded the zip, extract all (if windows built in zip utility gives an error, use [7-zip]
(https://7-zip.org/)).
Alternatively, you can use a Python wget library, but instead of simply using “wget” you’ll need to
use
python -m wget
Alternatively, you can just paste the file URL into your web browser and download the file
normally that way. You’ll want to move the resulting file into your working directory.
It is also recommended to have a look at the Python requests library for loading the gz file:
https://pypi.org/project/requests
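As a minimal sketch with requests (the URL and file name below are just examples), you can stream the archive to disk so large files don't need to fit in memory:
```python
import requests

url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2021-01.csv.gz"
local_file = "green_tripdata_2021-01.csv.gz"

# Stream the response in 1 MB chunks and write them straight to disk
with requests.get(url, stream=True, timeout=60) as resp:
    resp.raise_for_status()
    with open(local_file, "wb") as f:
        for chunk in resp.iter_content(chunk_size=1024 * 1024):
            f.write(chunk)
```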
1. Using the Python library wget you installed with pip, try python -m wget <url>
2. Write the usual command and add --no-check-certificate at the end. So it should
be:
And don't forget to update WSL; in PowerShell the command is wsl --update
Docker - Error during connect: In the default daemon
configuration on Windows, the docker client must be run with
elevated privileges to connect.: Post
"http://%2F%2F.%2Fpipe%2Fdocker_engine/v1.24/containers/create":
open //./pipe/docker_engine: The system cannot find the file
specified
As the official Docker for Windows documentation says, the Docker engine can either use the
Hyper-V or WSL2 as its backend. However, a few constraints might apply
In order to use Hyper-V as its back-end, you MUST have it enabled first, which you can
do by following the tutorial: Enable Hyper-V Option on Windows 10 / 11
On the other hand, users of the 'Home' version do NOT have the Hyper-V
option enabled, which means you can only get Docker up and running using WSL2
(Windows Subsystem for Linux).
You can find the detailed instructions to do so here:
https://pureinfotech.com/install-wsl-windows-11/
In case you run into another issue while trying to install WSL2
(WslRegisterDistribution failed with error: 0x800701bc), make sure you update the
WSL2 Linux kernel, following the guidelines here:
https://github.com/microsoft/WSL/issues/5393
I had to do this to make it work; got it from the readme files on the repo.
IF the repository is public, the fetch and download happens without any issue whatsoever.
For instance:
● docker pull postgres:13
● docker pull dpage/pgadmin4
BE ADVISED:
The Docker Images we'll be using throughout the Data Engineering Zoomcamp are all public
(except when or if explicitly said otherwise by the instructors or co-instructors).
Meaning: you are NOT required to perform a docker login to fetch them.
So if you get the message above saying "denied: requested access to the
resource is denied", that is most likely due to a typo in your image name:
For instance:
● Will throw that exception telling you "repository does not exist or may require 'docker
login'
Error response from daemon: pull access denied for dbpage/pgadmin4, repository does not exist or
may require 'docker login': denied: requested access to the resource is denied
● But that actually happened because the actual image is dpage/pgadmin4 and NOT
dbpage/pgadmin4
Digest: sha256:79b2d8da14e537129c28469035524a9be7cfe9107764cc96781a166c8374da1f
Status: Downloaded newer image for dpage/pgadmin4:latest
docker.io/dpage/pgadmin4:latest
EXTRA NOTES:
In the real world, occasionally, when you're working for a company or closed organisation, the
Docker image you're trying to fetch might be under a private repo that your DockerHub
Username was granted access to.
● -v /e/zoomcamp/...:/var/lib/postgresql/data
● -v /c:/.../ny_taxi_postgres_data:/var/lib/postgresql/data\ (leading
slash in front of c:)
Docker - Docker won't start or is stuck in settings (Windows 10 /
11)
- First off, make sure you're running the latest version of Docker for Windows, which you
can download from here. Sometimes using the menu to "Upgrade" doesn't work (which
is another clear indicator for you to uninstall, and reinstall with the latest version)
- If docker is stuck on starting, first try to switch containers by right clicking the docker
symbol from the running programs and switch the containers from windows to linux or
vice versa
- [Windows 10 / 11 Pro Edition] The Pro Edition of Windows can run Docker either by
using Hyper-V or WSL2 as its backend (Docker Engine)
- In order to use Hyper-V as its back-end, you MUST have it enabled first, which
you can do by following the tutorial: Enable Hyper-V Option on Windows 10 / 11
- If you opt-in for WSL2, you can follow the same steps as detailed in the tutorial
here
- [Windows 10 / 11 Home Edition] If you're running a Home Edition, you can still make
it work with WSL2 (Windows Subsystem for Linux) by following the tutorial here
If even after making sure your WSL2 (or Hyper-V) is set up accordingly, Docker remains stuck,
you can try the option to Reset to Factory Defaults or do a fresh install.
Docker - The input device is not a TTY (Docker run for Windows)
You may have this error:
the input device is not a TTY. If you are using mintty, try prefixing the
command with 'winpty''
Solution:
OR
From researching, it seems this method might be out of date; since Docker changed their
licensing model, the above is a bit hit and miss. What worked for me was to just go to the Docker
website and download their dmg. I haven't had an issue with that method.
Docker - Could not change permissions of directory
"/var/lib/postgresql/data": Operation not permitted
$ docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="admin" \
-e POSTGRES_DB="ny_taxi" \
-v "/mnt/path/to/ny_taxi_postgres_data":"/var/lib/postgresql/data" \
-p 5432:5432 \
postgres:13
The files belonging to this database system will be owned by user "postgres".
The database cluster will be initialized with locale "en_US.utf8".
The default database encoding has accordingly been set to "UTF8".
The default text search configuration will be set to "english".
One way to solve this issue is to create a local docker volume and map it to postgres data
directory /var/lib/postgresql/data
First, if you have spaces in the path, move your data to some folder without spaces. E.g. if your
code is in “C:/Users/Alexey Grigorev/git/…”, move it to “C:/git/…”
Try replacing the “-v” part with one of the following options:
● -v /c:/some/path/ny_taxi_postgres_data:/var/lib/postgresql/data
● -v //c:/some/path/ny_taxi_postgres_data:/var/lib/postgresql/data
● -v /c/some/path/ny_taxi_postgres_data:/var/lib/postgresql/data
● -v //c/some/path/ny_taxi_postgres_data:/var/lib/postgresql/data
● --volume //driveletter/path/ny_taxi_postgres_data/:/var/lib/postgresql/data
● -v "/c:/some/path/ny_taxi_postgres_data:/var/lib/postgresql/data"
● -v "//c:/some/path/ny_taxi_postgres_data:/var/lib/postgresql/data"
● -v "/c/some/path/ny_taxi_postgres_data:/var/lib/postgresql/data"
● -v "//c/some/path/ny_taxi_postgres_data:/var/lib/postgresql/data"
● -v "c:\some\path\ny_taxi_postgres_data":/var/lib/postgresql/data
If none of these options work, you can use a volume name instead of the path:
● -v ny_taxi_postgres_data:/var/lib/postgresql/data
For Mac: you can wrap $(pwd) in quotes, as highlighted.
Source: https://stackoverflow.com/questions/48522615/docker-error-invalid-reference-format-repository-name-must-be-lowercase
When feeding the database with data, the user id of the directory ny_taxi_postgres_data was
changed to 999, so my user couldn't access it when running the above command. Even though
this is not the root problem here, it helped to raise the error due to the permission issue.
Since at this point we only need the files Dockerfile and ingest_data.py, one way to fix this error is to
run the docker build command in a different directory (containing only these two files).
✅Solution:
Just add permission for everyone to the corresponding folder.
Example:
A folder is created to host the Docker files. When the build command is executed again to
rebuild the pipeline or create a new one, the error is raised because there are no permissions on this
new folder. Grant permissions by running chmod -R 755 on that folder, or use 777 if you still see
problems (755 grants write access only to the owner).
Docker - Docker network name (solution for mac) ?
When running docker-compose up -d, check which network is created and use that for the
ingestion script instead of pg-network, and check the name of the database service to use instead of
pgdatabase.
E.g.:
- pg-network becomes 2docker_default
- pgdatabase becomes 2docker-pgdatabase-1
Docker - Cannot install docker on MacOS/Windows 11 VM
running on top of Linux (due to Nested virtualization).
It will work even if your Docker runs on WSL2, as VS Code can easily connect with your Linux.
Docker - How to stop a container?
Use `docker stop <container_id>`; you can find the container ID with `docker ps`.
- If you have used the previous answer (just before this) and created a
local docker volume, then you need to tell the compose file about the
named volume:

volumes:
  dtc_postgres_volume_local: # Define the named volume here

# services mentioned in the compose file automatically become part of the same network!
services:
  # your remaining code here . . .
(data-engineering-zoomcamp) hw % docker ps
2022-01-25 05:58:45.948 UTC [1] LOG: starting PostgreSQL 13.5 (Debian 13.5-1.pgdg110+1) on
aarch64-unknown-linux-gnu, compiled by gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit
2022-01-25 05:58:45.948 UTC [1] LOG: listening on IPv4 address "0.0.0.0", port 5432
2022-01-25 05:58:45.948 UTC [1] LOG: listening on IPv6 address "::", port 5432
2022-01-25 05:58:45.954 UTC [1] LOG: listening on Unix socket
"/var/run/postgresql/.s.PGSQL.5432"
2022-01-25 05:58:45.984 UTC [28] LOG: database system was interrupted; last known up at
2022-01-24 17:48:35 UTC
2022-01-25 05:58:48.581 UTC [28] LOG: database system was not properly shut down; automatic
recovery in
progress
2022-01-25 05:58:48.602 UTC [28] LOG: redo starts at 0/872A5910
2022-01-25 05:59:33.726 UTC [28] LOG: invalid record length at 0/98A3C160: wanted 24, got 0
2022-01-25 05:59:33.726 UTC [28
Docker Compose creates its own default network when one is no longer specified in a docker
execution command or file, and it will emit the new network name to its logs. Check the
logs after executing `docker compose up` to find the network name, then change the network
name argument in your ingestion script.
On the localhost:8080 server → Unable to connect to server: could not translate host name
'pg-database' to address: Name does not resolve
In docker-compose.yml, you should specify a docker network and put both containers on that
same network:
services:
  pgdatabase:
    image: postgres:13
    environment:
      - POSTGRES_USER=root
      - POSTGRES_PASSWORD=root
      - POSTGRES_DB=ny_taxi
    volumes:
      - "./ny_taxi_postgres_data:/var/lib/postgresql/data:rw"
    ports:
      - "5431:5432"
    networks:
      - pg-network
  pgadmin:
    image: dpage/pgadmin4
    environment:
      - [email protected]
      - PGADMIN_DEFAULT_PASSWORD=root
    ports:
      - "8080:80"
    networks:
      - pg-network
networks:
  pg-network:
    name: pg-network
Docker-Compose - Persist PGAdmin docker contents on GCP
A common issue when you run docker-compose on GCP is that pgAdmin won't persist its data
to the mounted path. For example:
services:
  ...
  pgadmin:
    ...
    volumes:
      - "./pgadmin:/var/lib/pgadmin:rw"
might not work, so in this case you can use a Docker volume to make it persist, by simply changing it to:
services:
  ...
  pgadmin:
    ...
    volumes:
      - pgadmin:/var/lib/pgadmin
volumes:
  pgadmin:
And then press Ctrl+D to log out and log in again.
pgAdmin: Maintain state so that it remembers your previous connection
If you are tired of having to setup your database connection each time that you fire up the
containers, all you have to do is create a volume for pgAdmin:
In your docker-compose.yaml file, enter the following into your pgAdmin declaration:
    volumes:
      - type: volume
        source: pgadmin_data
        target: /var/lib/pgadmin

volumes:
  pgadmin_data:
In order to make it work, you need to include the volume in your docker-compose file. Just add
the following:
volumes:
dtc_postgres_volume_local:
</> docker-compose.yaml
services:
  postgres:
    image: postgres:15-alpine
    container_name: postgres
    user: "0:0"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=ny_taxi
    volumes:
      - "pg-data:/var/lib/postgresql/data"
    ports:
      - "5432:5432"
    networks:
      - pg-network
  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin
    user: "${UID}:${GID}"
    environment:
      - [email protected]
      - PGADMIN_DEFAULT_PASSWORD=pgadmin
    volumes:
      - "pg-admin:/var/lib/pgadmin"
    ports:
      - "8080:80"
    networks:
      - pg-network
networks:
  pg-network:
    name: pg-network
volumes:
  pg-data:
    name: ingest_pgdata
  pg-admin:
    name: ingest_pgadmin
Solution
- for updating Windows terminal which worked for me:
1. Go to Microsoft Store.
2. Go to the library of apps installed in your system.
3. Search for Windows terminal.
4. Update the app and restart your system to see the changes.
Upon restarting, the same issue appears. It happens out of the blue on Windows.
Solution 1: Fixing DNS Issue (credit: reddit) this worked for me personally
PGCLI - connection failed: (::1), port 5432 failed: could not receive
data from server: Connection refused; could not send SSL
negotiation packet: Connection refused
Change
So no, you don't need to run it inside another container; your local system will do.
Observation: do not forget the folder that was created, ny_taxi_postgres_data.
This can happen if you already have Postgres installed on your computer. If that's the case, use a
different port, e.g. 5431:
-p 5431:5432
pgcli:
● You can use the lsof command to find out which application is using a specific port on
your local machine: `lsof -i :5432`
● Or list the running postgres services on your local machine with launchctl.
To unload the running service on your local machine (on a MacOS):
● unload the launch agent for the PostgreSQL service, which will stop the service and
free up the port
`launchctl unload -w
~/Library/LaunchAgents/homebrew.mxcl.postgresql.plist`
● this one to start it again
`launchctl load -w
~/Library/LaunchAgents/homebrew.mxcl.postgresql.plist`
os.makedirs(config_dir)
File "/opt/anaconda3/lib/python3.9/os.py", line 225, in makedirs
    mkdir(name, mode)
PermissionError: [Errno 13] Permission denied: '/Users/vray/.config/pgcli'
The recommended approach is to use conda/anaconda to make sure your system python is not
affected.
Solution:
And the reason for that is we have had cases of 'psycopg2-binary' failing to install because of an
old version of Python (3.7.3).
2. Next, you should be able to install the lib for postgres like this:
```
$ e
```
3. Finally, make sure you're also installing pgcli, but use conda for that:
```
$ pgcli -h localhost -U root -d ny_taxi
```
For details:
Reference: https://stackoverflow.com/a/68233660
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
This can happen when Postgres is already installed on your computer. Changing the port can
resolve that (e.g. from 5432 to 5431).
To check whether there even is a root user with the ability to login:
Based on this:
https://stackoverflow.com/questions/60193781/postgres-with-docker-compose-gives-fatal-role-ro
ot-does-not-exist-error
Also `docker compose down`, removing folder that had postgres volume, running `docker
compose up` again.
Make sure postgres is running. You can check that by running `docker ps`
✅Solution: If you have postgres software installed on your computer before now, build your
instance on a different port like 8080 instead of 5432
Solution:
● If you are getting the "ModuleNotFoundError: No module named 'psycopg2'" error even
after the above installation, then try updating conda using the command conda update -n
base -c defaults conda. Or, if you are using pip, try updating it before installing the
psycopg packages, i.e.:
○ First uninstall the psycopg package
○ Then update conda or pip
○ Then install psycopg again using pip.
● If you are still facing an error with psycopg2 and it shows pg_config not found, then you will
have to install PostgreSQL; on Mac it is brew install postgresql.
✅
A query throws an error saying "column does not exist".
Solution: if we enclose the column names in double quotes then it will work.
✅
The issue seems to arise from sqlite3.dll being missing from the path ".\Anaconda\Dlls\".
I solved it by simply copying that .dll file from \Anaconda3\Library\bin and putting it under the
path mentioned above (if you are using Anaconda).
top to bottom, and the logic is tidied up in a later step when it is instead inserted into a .py file for
the pipeline
import pandas as pd
df = pd.read_csv('path/to/file.csv.gz', compression='gzip')
If you prefer to keep the uncompressed csv (easier to preview in VS Code and similar), gzip files can
be unzipped using gunzip (but not unzip). On an Ubuntu local or virtual machine, you may need
to install gunzip via apt-get first.
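If you'd rather stay in Python, the standard library can do the same job as gunzip; a small sketch (file names are illustrative):
```python
import gzip
import shutil

# Decompress file.csv.gz into file.csv without any external tools
with gzip.open("file.csv.gz", "rb") as src, open("file.csv", "wb") as dst:
    shutil.copyfileobj(src, dst)
```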
Contrary to pandas' read_csv method, there's no such easy way to iterate through and set a
chunksize for parquet files. We can use PyArrow (the Apache Arrow Python bindings) to resolve that.
import pyarrow.parquet as pq
from time import time
from sqlalchemy import create_engine

# Note: you may need to download this file locally first and point output_name
# at the local path, since ParquetFile does not read plain http(s) URLs directly.
output_name = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet"

parquet_file = pq.ParquetFile(output_name)
parquet_size = parquet_file.metadata.num_rows

# user, password, host, port and db must be defined earlier in the script
engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')
table_name = "yellow_taxi_schema"

index = 0
for i in parquet_file.iter_batches(use_threads=True):
    t_start = time()
    print(f'Ingesting {index} out of {parquet_size} rows ({index / parquet_size:.0%})')
    i.to_pandas().to_sql(name=table_name, con=engine, if_exists='append')
    index += 65536  # default batch size of iter_batches
    t_end = time()
    print('\t- it took %.1f seconds' % (t_end - t_start))
"error": {
"code": 409,
"message": "Requested entity alreadytpep_pickup_datetime exists",
"status": "ALREADY_EXISTS"
From Stackoverflow:
https://stackoverflow.com/questions/52561383/gcloud-cli-cannot-create-project-the-project-id-yo
u-specified-is-already-in-us?rq=1
Project IDs are unique across all projects. That means if any user ever had a project with that
ID, you cannot use it. testproject is pretty common, so it's not surprising it's already taken.
The value you enter here will be unique to each student. You can find this value on your GCP
Dashboard when you login.
Ashish Agrawal
Another possibility is that you have not linked your billing account to your current project
GCP -> Select project with your instance -> IAM & Admin -> Service Accounts Keys tab -> add
key, JSON as key type, then click create
Note: Once you go into Service Accounts Keys tab, click the email, then you can see the
“KEYS” tab where you can add key as a JSON as its key type
GCP - Do I need to delete my instance in Google Cloud?
In this lecture, Alexey deleted his instance in Google Cloud. Do I have to do it?
Nope. Do not delete your instance in the Google Cloud platform. Otherwise, you will have to do this
twice for the week 1 readings.
These credentials will be used by any library that requests Application Default Credentials
(ADC).
WARNING:
Cannot find a quota project to add to ADC. You might receive a "quota exceeded" or "API not
enabled" error. Run $ gcloud auth application-default set-quota-project to add a quota
project.
For me:
https://github.com/DataTalksClub/data-engineering-zoomcamp/blob/main/week_1_basics_n
_setup/1_terraform_gcp/windows.md
And the advantage of using your own environment is that if you are working in a Github repo
where you can commit, you will be able to commit the changes that you do. In the VM the repo
is cloned via HTTPS so it is not possible to directly commit, even if you are the owner of the
repo.
User1@DESKTOP-PD6UM8A MINGW64 /
$ mkdir .ssh
Local. But it seems you're trying to do it in the root folder (/); it should be your home directory (~).
You need to change the owner of the files you are trying to edit via VS Code; you can change the
ownership with the chown command.
✅Answer: Start your VM. Once the VM is running, copy its External IP and paste that into your
config file within the ~/.ssh folder.
cd ~/.ssh
code config ← this opens the config file in VSCode
(reference:
https://serverfault.com/questions/953290/google-compute-engine-ssh-connect-to-host-ip-port-22-operation-timed-out)
Go to edit your VM:
1. Go to the Automation section
2. Add a Startup script:
```
#!/bin/bash
sudo ufw allow ssh
```
4. Stop and Start VM.
You can easily forward the ports of pgAdmin, postgres and Jupyter Notebook using the built-in
tools in Ubuntu and without any additional client:
1. First, in the VM machine, launch docker-compose up -d and jupyter notebook
in the correct folder.
2. From the local machine, execute: ssh -i ~/.ssh/gcp -L
5432:localhost:5432 username@external_ip_of_vm
3. Execute the same command but with ports 8080 and 8888.
4. Now you can access pgAdmin on local machine in browser typing localhost:8080
5. For Jupyter Notebook, type localhost:8888 in the browser of your local machine. If
you have problems with the credentials, it is possible that you have to copy the link with
the access token provided in the logs of the terminal of the VM machine when you
launched the jupyter notebook command.
6. To forward both pgAdmin and postgres use, ssh -i ~/.ssh/gcp -L 5432:localhost:5432 -L
8080:localhost:8080 [email protected]
Terraform - Install for WSL
https://techcommunity.microsoft.com/t5/azure-developer-community-blog/configuring-terraform-
on-windows-10-linux-sub-system/ba-p/393845
One service account is enough for all the services/resources you'll use in this course. After you
get the file with your credentials and set your environment variable, you should be good to go.
Terraform - Where can I find the Terraform 1.1.3 Linux (AMD 64)?
Here: https://releases.hashicorp.com/terraform/1.1.3/terraform_1.1.3_linux_amd64.zip
Terraform - Terraform initialized in an empty directory! The
directory has no Terraform configuration files. You may begin
working with Terraform immediately by creating Terraform
configuration files.
You get this error because you ran the command terraform init outside the working directory,
which is wrong.
You need to first navigate to the working directory that contains the terraform configuration files,
and then run the command.
The error:
Error: googleapi: Error 403: Access denied., forbidden
│
and
│ Error: Error creating Dataset: googleapi: Error 403: Request had
insufficient authentication scopes.
Solution:
You have to set again the GOOGLE_APPLICATION_CREDENTIALS as Alexey did in the
environment set-up video in week1:
export GOOGLE_APPLICATION_CREDENTIALS="<path/to/your/service-account-authkeys>.json"
- In addition to the above point: for me, there is no 'Astoria Zone', only 'Astoria' exists in
the dataset.
SELECT * FROM zones AS z WHERE z."Zone" = 'Astoria';
Week 2
Conda Env: Why and for what do we need to work with Conda
environments exactly? What does this environment give us in
terms of ability?
Python and many other languages that advise on a virtual env suffer from a major flaw called
dependency hell. In a nutshell, that means:
It's common sense that different projects have different requirements, right? That could mean
different python versions, different libraries, frameworks, etc
So imagine you have a single Python installation (the one that comes with the operating system,
for example), and you're invited to work in a Project A and in a Project B
Project A Dependencies:
- A1
- A2
- A3
Project B Dependencies:
- B1
- B2
- B3
Now, a dependency, at the end of the day, also makes use of other dependencies. (These
"implicit" dependencies of the libraries/frameworks you are using in your project are called:
transient dependencies)
Now, with that in mind:
A1 depends on X version 1.0
Meaning "X", v1.0, is a transient dependency of A1
B1 depends on X version 2.0
● Meaning "X", v2.0, is a transient dependency of B1
When you are working on the same "python environment", the rule that applies is: Always keep
the most updated version of a library. So when you ask your system to install the dependency
"A1", it will also bring "X" at version 1.0
But then you switch to another project, called B, which also depends on X - but at version 2.0
That means that this Python environment will upgrade "X" from v1 to v2 to make it work for B1.
But the thing is: when you hop between major versions of a library, things often break.
Meaning: B1 will work just fine, because its transient dependency (X v2.0) is satisfied. But A1
may stop working if v2.0 is incompatible with its previous version (v1.0).
The scenario I just said is quite simple, but in the real world, you're going to be working with
different versions of Python and other sets of constraints. But instead we had many ppl in here
with Python at 3.7 unable to install the project dependencies for psycopg2, and once they
updated to Python 3.9, everything started to work fine.
But what if you DO have another project that is running in production on Python 3.7 and starts
bugging out ? The best you can do to reproduce/replicate said environment is to make yourself
an equivalent environment (w/ 3.7 instead of 3.9)... and the list goes on and on and on
Besides, you NEVER EVER want to mess with the Python environment that comes with your
Operating System (macOS / Linux) -> many system tools today use Python, and if you
break the one that comes bundled with the OS, you're sure gonna have a lot of headaches (as
in: unexpected behaviors) putting the pieces back together.
Hence why, once again, you use virtualenvs to provide you with isolation. Not only between your
projects, but also, between the projects and the underlying infrastructure from the OS
That's because the containers are not only meant to be disposable if/when they break, but they
also run in an isolated workspace of their own (but this is new and entire different discussion)
Why does Jeff have the same line twice in video 2.2.3 and 2.2.4,
etl_web_to_gcs.py and parameterized_flow.py?
df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
df["tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"])
Repo is updated. Thank you jralduaveuthey and Valentine Zaretsky for catching!
@task(log_prints=True)
def clean(df: pd.DataFrame) -> pd.DataFrame:
not
@task(log_prints=True)
def clean(df=pd.DataFrame) -> pd.DataFrame:
Copied json key for credentials incorrectly or the service account doesn't have the necessary
permissions
The solution for me was to set the timeout parameter of the function to 120 (seconds).
gcs_block.upload_from_path(from_path=path, to_path=path, timeout=120)
The default timeout is 60 seconds. Timeout may vary depending on your internet speed.
Bug with prefect-gcp 0.2.3 on Windows only. Couldn’t upload the file into a folder as in
the video.
✅SOLUTION: Use prefect-gcp 0.2.4 You can specify the new version in requirements.txt
before installing or pip install -U prefect-gcp to upgrade in an existing environment.
Alternatively, you can set the cache_expiration to a short period of time - say a minute - then
rerun outside Docker and then in Docker, and it won't try to find the cached result.
1. ✅Deleted the part that uses the cache from the fetch task and the flow runs; I'll provide
further details if I find out why it was trying to pull from the local cache.
# @task(retries=3,cache_key_fn=task_input_hash,
cache_expiration=timedelta(days=1))
@task(retries=3)
def fetch(dataset_url: str) -> pd.DataFrame:
If using Docker with a function that says to look for a cached run, but you have cache
stored locally outside Docker from a previous run, Docker can’t access the cached file, so it
throws an error.
2. Also, make sure the Prefect Block you have for the GCS creds is having the service
account JSON data directly instead of the path to the service account JSON file
3. made sure I removed the CACHE settings on the @task and I re-ran every
command (building docker image, pushing, updating deployment) works!!!
For more info, you can also refer this slack thread for the same error -
https://datatalks-club.slack.com/archives/C01FABYF2RG/p1674928505612379
If it inserted correctly, then you should see the following masked by the stars (with a formatting
of key/value per line):
{ "type": "********", "auth_uri": "********", "client_id":
"********", "token_uri": "********", "project_id": "********",
"private_key": "********", "client_email": "********",
"private_key_id": "********", "client_x509_cert_url": "********",
"auth_provider_x509_cert_url": "********"}
Solution:
Ensure you copy the parameterized_flow.py file to the /opt/prefect/flows/ directory, giving it the
same name as the file parameterized_flow.py, e.g.:
01_start/parameterized_flow.py → /opt/prefect/flows/parameterized_flow.py
Prefect Deployment Encountered Exception - prefect deployment build
./parameterized_flow.py:etl_parent_flow -n "Parameterized ETL" fails with: Script at
'./parameterized_flow.py' encountered an exception: TypeError("'type' object is not
subscriptable")
$ python parameterized_flow.py
✅Solution:
With this syntax you can pass multiple parameters:
prefect deployment run etl-parameters/docker-flow --params='{"color": "yellow",
"month": [1, 2], "year": 2021}'
When using the GitHub block, you have to run the prefect deployment build command in
the same local folder as the root folder in the GitHub repository.
In the build command, you have to provide the path to the python file:
path/to/my_file.py:my_flow
--path is used as the upload path, which doesn’t apply for GitHub repository-based storage.
OR
I added 2 lines of code to the write_local function:
if not path.parent.is_dir():
path.parent.mkdir(parents=True)
I suggest using the 2nd fix; the first can cause issues when running through Prefect in section
2.2.5.
Previous If statement didn’t work for me. I added following line to the write_local function, which
did the trick:
path.parent.mkdir(parents=True, exist_ok=True)
ALTERNATIVE (this assumes import os and from pathlib import Path at the top of the script):
outdir = f"./data/{color}"
if not os.path.exists(outdir):
    os.makedirs(outdir)
path = Path(f"{outdir}/{dataset_file}.parquet")
df.to_parquet(path, compression='gzip')
return path
The first block checks if the path already exists, and if it doesn’t, creates it first.
Make sure you don't use the "Reference" field as a description of the block, because otherwise it will
look endlessly for a branch or name tag in your GitHub repository that clearly doesn't exist.
Prefect flow: etl-parent-flow/docker-flow Crashed with
ConnectionRefusedError: [Errno 111] Connect call failed
('127.0.0.1', 4200)
This error occurs when you run your prefect on WSL 2 on Windows 10. The error looks like this:
UI
The error above occurs because we run Prefect locally on our machine at localhost:4200; when we
run Docker without specifying its network, the container calls localhost:4200 inside the container
and not localhost:4200 on our machine.
To solve this, set the network mode of the DockerContainer block to host and save the block:
docker_block.save("zoom", overwrite=True)
Prefect flow: Why are we writing the .csv locally in
etl_web_to_gcs.py?
Problem: When triggering the flows of the Prefect DockerContainer deployment, it was unable to
connect to the other docker containers.
Cause:
The DockerContainer deployment is not running on the same network as the rest of the containers.
Fix:
Create a docker network
```
if [ ! "$(docker network ls | grep dtc-network)" ]; then
echo "\n**Create docker network"
docker network create -d bridge dtc-network
else
echo "dtc-network already exists."
fi
```
Add the networks to each container within docker-compose.yml
```
Container:
networks:
- dtc-network
networks:
dtc-network:
driver: bridge
name: dtc-network
```
Python lib: Problem with prefect on macOS M1 (arm) with poetry as
the package manager; I got the below message:
I got this problem while trying to run `prefect orion start` to start the prefect UI on my
laptop.
For the people who got the same problem with poetry, I solved this issue by adding `greenlet` to
`pyproject.toml`.
The instructor uses port 5433 for week 2. If you're using the previous containers, you should set port
5432 and user/pass root/root.
Error Message:
httpx.HTTPStatusError: Client error '404 Not Found' for url
'http://ephemeral-orion/api/flow_runs/6aed1011-1599-4fba-afad-b8d999cf
9073'
For more information check: https://httpstatuses.com/404
Solution: ✅
reference
[method-1]
Input the below command on local PC.
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
[method-2]
Add the below lines in the Dockerfile
ENV PREFECT_API_URL=http://127.0.0.1:4200/api
RUN prefect config set PREFECT_API_URL="${PREFECT_API_URL}"
(if you need, you can set the Env in Prefect Orion, like below. And you can change the value
depending on your demand, such as connection to Prefect Cloud)
Uploading to GCS Does not put the file inside a folder but instead
the file is named ‘data\green\...’ (Affects Windows)
Additionally, in my case I needed to update the prefect_gcp package to the version 0.2.6.
slack_webhook_block = SlackWebhook.load("slack-block-name")
slack_webhook_block.notify("Hello!")
WARNING | apprise - Failed to send to Slack: Page not found.,
error=404
1. While logged in to the Temporary Data Engineering Zoomcamp slack (check the week 2
homework for an updated link), visit the Slack API Apps page to create a new app.
2. Using the splash or via the green “Create a new app” button, create an app “From an
app manifest”
3. Select the workspace “Temporary DE Zoomcamp Prefect Notifications”
10. The Webhook URL provided below is your new webhook- use it in place of the webhook
supplied by the homework.
Or you can use an existing one configuration:
1. Go to
https://temp-notify.slack.com/apps/A0F7XDUAZ-incoming-webhooks?tab=settings&next
_id=0
2. Click “edit” on any existing configuration
Step 2: Go back to the UI and refresh and look at the blocks again, the GCP Bucket Block will
now be available for use.
Solution: Remove the `task_input_hash` from the decorator of your task. This is due to
Prefect "remembering" the absolute directory instead of the relative one.
File path does not exist after building Prefect deployment using
GitHub Bucket
Error: OSError: Cannot save file into a non-existent directory: 'data\green'
see:
https://datatalks-club.slack.com/archives/C01FABYF2RG/p1675774214591809?thread_ts=1675
768839.028879&cid=C01FABYF2RG
Git won’t push an empty folder to GitHub, so if you put a file in that folder and then push, then
you should be good to go.
Or - in your code- make the folder if it doesn’t exist using Pathlib as shown here:
https://stackoverflow.com/a/273227/4590385.
For some reason, when using github storage, the relative path for writing locally no longer
works. Try using two separate paths, one full path for the local write, and the original relative
path for GCS bucket upload.
Likely associated with how WSL handles IPv4 addresses, as in this issue: WSL by default will
bind to an IPv6 address, which Prefect does not appear to handle.
A temporary fix can be had by using the subsystem IP assigned to the WSL instance. This
should work for Debian-based distros on WSL 2, including Ubuntu. Steps as follows:
1. In a WSL terminal, run the command ip addr to find the subsystem IP. You’re looking
for a connection with a valid inet address: in my example below, this is
172.22.53.106
eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP group
default qlen 1000
link/ether 00:15:5d:f5:0c:25 brd ff:ff:ff:ff:ff:ff
inet 172.22.53.106/20 brd 172.22.63.255 scope global eth0
valid_lft forever preferred_lft forever
inet6 fe80::215:5dff:fef5:c25/64 scope link
valid_lft forever preferred_lft forever
2. While in WSL, start Prefect Orion with that IP address as host:
prefect orion start --host 172.22.53.106
3. While in WSL, set the Prefect API to that IP address. The exact code for this should be
displayed when you start Prefect Orion in the step above.
prefect config set PREFECT_API_URL=http://172.22.53.106:4200/api
4. You should be able to run Prefect as set in the course videos from this point.
A couple of caveats:
● This is only a temporary fix. You'll likely need to copy the steps above for every new
instance of WSL- i.e., if you restart, look for the subsystem IP again and plug it into the
right places.
● I haven't completely ironed out the issues on my end, but I am getting new error
messages that suggest the docker deployment is running and connected (i.e.,
prefect.exceptions.ScriptError: Script at 'parameterized_flow.py'
encountered an exception: FileNotFoundError(2, 'No such file or
directory') ) Your mileage may vary.
Although I haven’t tested this, you may also be able to resolve the issue using docker-ce in
place of Docker Desktop for Windows.
Do not forget that the green dataset contains lpep_pickup_datetime while the yellow contains
tpep_pickup_datetime. Modify the script(s) accordingly based on the color of dataset when
needed.
pd.read_csv
The data needs to be appended to the parquet file using the fastparquet engine
df.to_parquet(path, compression="gzip",
engine='fastparquet', append=True)
Killed
Solution: You probably are running out of memory on your VM and need to add more. For
example, if you have 8 gigs of RAM on your VM, you may want to expand that to 16 gigs.
@task()
def write_local(df: pd.DataFrame, color: str, dataset_file: str) -> Path:
"""Write DataFrame out locally as parquet file"""
path_file = f"{dataset_file}.parquet"
path_dir = Path(f"data/{color}")
path_dir.mkdir(parents=True, exist_ok=True)
df.to_parquet(path_dir / path_file, compression="gzip")
return path_dir / path_file
Question 4 Hint
Read the docs here first: you execute code using Python to clone the code to your local machine, then
build the deployment. Note: you can also build the GitHub bucket from the UI, therefore you can
skip the first part; make sure to provide a token if your repo is private.
Solution: The issue turned out to be that the code had caching enabled. Once all of the caching
code was removed from the script, it worked flawlessly.
Solution:
1. prefect orion start --host 192.168.178.90
Where the IP is the internal IP of my MAC
2. prefect config set PREFECT_API_URL=http://192.168.178.90:4200/api
3. Declaring the docker container block in Prefect with Network Mode = host
pip install -r requirements.txt on Mac:
fatal error: Python.h: No such file or directory; compilation
terminated.
Solution: That function creates a task. You can’t run a task inside a task in Prefect. The
solution is to call that function from the flow itself.
To fix this error I had to run prefect orion database restart. After the restart, the blocks that
were not appearing in the UI appeared (no extra pip installs or block registering should be
required). Please note that this will delete all current blocks you have saved, such as your
GCP credentials so note down how to create them before doing this step.
After playing around with prefect for a while this can happen.
Ssh to your VM and run sudo du -h --block-size=G | sort -n -r | head -n 30 to see which
directory needs the most space.
Most likely it will be …/.prefect/storage, where your cached flows are stored. You can delete
older flows from there. You also have to delete the corresponding flow in the UI, otherwise it will
throw you an error, when you try to run your next flow.
Error Found in Python Script Already Deployed as a Flow
Initially, I would have to run the entire process: edit the script, then build and apply the new
deployment. However, I found this to be best:
Solution:
Docker: edit the python script, rebuild your docker image, push it to DockerHub using the same
name and tag, and run your deployment.
Github: edit the python script, commit your changes, push it to GitHub and run your deployment.
Solution:
Take a clone of your repo to a local directory:
- change directory to the cloned repo ($ cd de-zoom-camp)
- initialize git with the command git init
- open your terminal in VS Code and activate your virtual environment (e.g. conda):
  conda activate zoomcamp
- install prefect-github with pip:
  pip install prefect-github
- register the block:
  prefect block register -m prefect_github
It worked successfully.
In Q3 there was a task to run the ETL script from web to GCS. The problem was, it wasn't really
an ETL straight from web to GCS; it was actually a web to local storage to local memory to
GCS-over-network ETL. Yellow data is about 100 MB per month compressed and ~700
MB after being uncompressed in memory.
This leads to a problem where I either got a network-type error (because of my not-so-good 3rd
world internet) or my WSL2 crashed/hanged because of an out-of-memory error and/or a 100%
resource usage hang.
Solution:
If you have a lot of time at hand, try compressing it to parquet and writing it to GCS with the
timeout argument set to a really high number (the default is 60 seconds).
The yellow taxi data for Feb 2019 is about 100 MB as a parquet file.
gcp_cloud_storage_bucket_block.upload_from_path(
from_path=f"{path}",
to_path=path,
timeout=600
)
Check your GCS Bucket block in Prefect UI @ http://127.0.0.1:4200 - you may have added
blank spaces at the beginning/ending of the bucket name.
https://discourse.prefect.io/t/installation-error-prefect-version-cmd-errors/1784/5
Week 3
Suggestion:
For question 2, I feel the question is not correct: it's either the count of the entire dataset for both
tables (in which case the answer is there), or the answer is not there.
1) Add the -y flag, so that apt-get automatically agrees to install additional packages.
2) Use the Python zipfile package, which is included in all modern Python distributions (see the
sketch below).
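A minimal sketch of option 2, assuming the archive name below (adjust it to the file you actually downloaded):
```python
import zipfile

# Extract everything from the downloaded archive into ./data/
with zipfile.ZipFile("downloaded_archive.zip") as zf:
    zf.extractall("data/")
```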
If you’re having problems loading the FHV_2021 data from the
github repo into GCS and then into BQ (input file not of type
parquet), you need to do two things. First, append the URL
Template link with ‘?raw=true’ like so:
Second, make sure the URL_PREFIX is set to the following value:
URL_PREFIX =
"https://github.com/alexeygrigorev/datasets/blob/master/nyc-tlc/fhv"
It is critical that you use this link with the keyword blob. If your link has ‘tree’ here, replace it.
Everything else can stay the same, including the curl -sSLf command.
https://github.com/sebastian2296/data-engineering-zoomcamp/blob/main/week_4_analytics_en
gineering/web_to_gcs.py
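For illustration, a hedged sketch of how such a URL can be built; the file-name pattern is an assumption, so adjust it to the dataset you need:
```python
# Build a direct-download URL for a file stored in a GitHub repo:
# the 'blob' link only serves the raw file when '?raw=true' is appended.
URL_PREFIX = "https://github.com/alexeygrigorev/datasets/blob/master/nyc-tlc/fhv"
file_name = "fhv_tripdata_2019-01.csv.gz"  # assumed naming pattern
url = f"{URL_PREFIX}/{file_name}?raw=true"
print(url)
```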
Same ERROR - When running dbt run for fact_trips.sql, the task failed with error:
“Parquet column 'ehail_fee' has type DOUBLE which does not match the target
cpp_type INT64”
Reason: Parquet files have their own schema. Some parquet files for green data have records
with decimals in ehail_fee column.
- Drop the ehail_fee column since it is not really used, for instance when creating a partitioned
table from the external table in BigQuery.
- Modify the Airflow DAG to make the conversion and avoid the error:
import pyarrow.csv as pv

pv.read_csv(src_file,
    convert_options=pv.ConvertOptions(column_types={'ehail_fee': 'float64'}))
Same type of ERROR - parquet files with different data types - Fix it with pandas
You can specify the dtypes when importing the file from csv to a dataframe with pandas:
pd.read_csv(..., dtype=type_dict)
One obstacle is that the regular int64 pandas uses (this comes from the numpy library)
does not accept null values (NaN, not a number). But you can use the pandas Int64 instead,
notice the capital 'I'. The type_dict is a python dictionary mapping the column names to the
dtypes.
Sources:
https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
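A short sketch of such a type_dict; the column names are the usual FHV ones and are assumptions, so adjust them to your file:
```python
import pandas as pd

# Nullable pandas dtypes: capital-I Int64 tolerates missing values (NaN)
type_dict = {
    "PUlocationID": "Int64",
    "DOlocationID": "Int64",
    "SR_Flag": "Int64",
    "dispatching_base_num": "string",
}

df = pd.read_csv(
    "fhv_tripdata_2019-01.csv.gz",  # illustrative file name
    dtype=type_dict,
    parse_dates=["pickup_datetime", "dropOff_datetime"],
)
print(df.dtypes)
```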
I searched for the arm version of greenlet but couldn't find it, so I found the following solution
instead.
I used the instructions from this site How to Manage Conda Environments on an Apple Silicon
M1 Mac to create a conda x86 environment.
This solution works! And I can now continue to prefect on my arm computer!
Ultimately, when trying to ingest data into a BigQuery table, all files within a given directory must
have the same schema.
When dealing, for example, with the FHV datasets from 2019, one
can see that the files for '2019-05' and '2019-06' have the columns "PUlocationID" and
"DOlocationID" as INTEGER, while for the period of '2019-01' through '2019-04' the same columns
are defined as FLOAT.
So while importing these files as parquet to BigQuery, the first one will be used to define the
schema of the table, while all files following that will be used to append data on the existing
table. Which means, they must all follow the very same schema of the file that created the table.
So, in order to prevent errors like that, make sure to enforce the data types for the columns on
the DataFrame before you serialize/upload them to BigQuery. Like this:
pd.read_csv("path_or_url").astype({
"col1_name": "datatype",
"col2_name": "datatype",
...
"colN_name": "datatype"
})
Encoding error when writing data from web to GCS:
Make sure to use nullable data types, such as Int64, where applicable.
These won't work; you need to make sure you use Int64:
Incorrect:
df['DOlocationID'] = pd.to_numeric(df['DOlocationID'], downcast=integer) or
df['DOlocationID'] = df['DOlocationID'].astype(int)
Correct:
df['DOlocationID'] = df['DOlocationID'].astype('Int64')
What does it mean to 'Stop with loading the files into a bucket' in the homework?
What they mean is that they don't want you to do anything more than that. You should load
the files into the bucket and create an external table based on those files (but nothing like
cleaning the data and putting it in parquet format)
Solution:
Step 1: When reading the data from the web into the pandas dataframe mention the encoding
as follows:
pd.read_csv(dataset_url, low_memory=False, encoding='latin1')
Step 2: When writing the dataframe from the local system to GCS as a csv mention the
encoding as follows:
df.to_csv(path_on_gsc, compression="gzip", encoding='utf-8')
Alternatively, you can copy paste your queries into an .sql file in your preferred editor
(Notepad++, VS Code, etc.). Using the .sql extension will provide convenient color formatting.
import pyarrow.parquet as pq
import pandas as pd

table = pq.read_table('taxi.parquet')
datetimes = ['list of datetime column names']
df_dts = pd.DataFrame()
for col in datetimes:
    df_dts[col] = pd.to_datetime(table.column(col), errors='coerce')
The `errors='coerce'` parameter will convert the out-of-bounds timestamps into either the
max or the min.
3. Use pyarrow.compute.filter to remove the offending rows:
import pyarrow as pa
import pyarrow.compute as pc

table = pq.read_table('taxi.parquet')
df = table.filter(
    pc.less_equal(table["dropOff_datetime"], pa.scalar(pd.Timestamp.max))
).to_pandas()
Unable to load data from external tables into a materialized table
in BigQuery due to an invalid timestamp error that are added
while appending data to the file in Google Cloud Storage
This error is caused by invalid data in the timestamp column. A way to identify the problem is to
define the schema of the external table using the string datatype. This enables the queries to
work, at which point we can filter out the invalid rows from the import to the materialized table and
insert the fields with the timestamp data type.
I verified the BQ dataset and GCS bucket are in the same region (us-west1). Not sure how it
gets location US; I couldn't find the solution yet.
Solution: Please enter the correct project_id and gcs_bucket folder address. My gcs_bucket
folder address is gs://dtc_data_lake_optimum-airfoil-376815/tip_model
Remove ```cache_key_fn=task_input_hash``` from the arguments of your task & run your
flow again.
Note: the cache key is beneficial if you happen to run the code multiple times; it won't repeat the
process which you have finished running in the previous run. That means, if you have this
```cache_key_fn``` in your initial run, this might cause the error.
2. Now in BigQuery, click on the three-dot icon near your project name and select Create
dataset.
3. In the region field, choose the same region as you saw for your Google Cloud bucket (a
Python-client equivalent is sketched below).
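If you prefer doing this from code instead of the UI, here is a hedged sketch with the BigQuery Python client; the project, dataset and region names are placeholders:
```python
from google.cloud import bigquery

client = bigquery.Client()

# The dataset location must match the region of your GCS bucket,
# otherwise loads from the bucket can fail with a location error.
dataset = bigquery.Dataset("your-project-id.trips_data_all")
dataset.location = "europe-west6"  # placeholder: use your bucket's region

client.create_dataset(dataset, exists_ok=True)
```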
Tip: Using Cloud Function to read csv.gz files from github directly
to BigQuery in Google Cloud:
There are multiple benefits of using Cloud Functions to automate tasks in Google Cloud.
Use below Cloud Function python script to load files directly to BigQuery. Use your project id,
dataset id & table id as defined by you.
import tempfile
import requests
import logging
from google.cloud import bigquery

def hello_world(request):
    # Note: this is an excerpt - client (a bigquery.Client()), job_config, the
    # downloaded file handle f and the month variable are defined in the full script.
    # table_id = <project_id.dataset_id.table_id>
    table_id = 'de-zoomcap-project.dezoomcamp.fhv-2019'
    job = client.load_table_from_file(
        f,
        table_id,
        location="US",
        job_config=job_config,
    )
    job.result()
    logging.info("Data for month %d successfully loaded into table %s.", month, table_id)
import requests
from prefect import flow, task

@task
def download_file(url: str, file_path: str):
    response = requests.get(url)
    open(file_path, "wb").write(response.content)
    return file_path

@flow
def extract_from_web() -> None:
    # url and filename are placeholders for your dataset location and name
    file_path = download_file(url=f'{url}/{filename}.csv.gz', file_path=f'{filename}.csv.gz')
How to handle a type error between BigQuery and parquet data?
Problem: When you ingest data into GCS using Pandas, there is a chance that some datasets
have missing values in DOlocationID and PUlocationID. Pandas by default will cast these
columns as float, causing an inconsistent data type between the parquet in GCS and the schema
defined in BigQuery. You will see something like this:
error: Error while reading table: trips_data_all.external_fhv_tripdata,
error message: Parquet column 'DOlocationID' has type INT64 which does not
match the target cpp_type DOUBLE.
Solution:
- Fix the data type issue in the data pipeline
- Before ingesting data into GCS, use astype and Int64 (which is different from int64 and
accepts both missing values and integers in the column) to cast the columns.
Something like:
df["PUlocationID"] = df.PUlocationID.astype("Int64")
df["DOlocationID"] = df.DOlocationID.astype("Int64")
NOTE: It is best to define the data types of all the columns in the
Transformation section of the ETL pipeline before loading to BigQuery
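For instance, a minimal sketch of casting everything up front in the transform step (the column names are assumptions based on the FHV trip data; adjust them to your dataset):
```python
import pandas as pd

# Column names assumed from the FHV trip data; adapt to your dataset
dtypes = {
    "dispatching_base_num": "string",
    "PUlocationID": "Int64",   # nullable integer, tolerates missing values
    "DOlocationID": "Int64",
    "SR_Flag": "Int64",
}

df = pd.read_csv("fhv_tripdata_2019-01.csv.gz", dtype=dtypes)
df["pickup_datetime"] = pd.to_datetime(df["pickup_datetime"])
df["dropOff_datetime"] = pd.to_datetime(df["dropOff_datetime"])
```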
This problem occurs when content is misplaced after the FROM clause in a BigQuery SQL query.
Check for and remove any extra spaces or other symbols; use only lowercase letters, digits and
dashes.
Error Message:
PARTITION BY expression must be DATE(<timestamp_column>),
DATE(<datetime_column>), DATETIME_TRUNC(<datetime_column>,
DAY/HOUR/MONTH/YEAR), a DATE column,
TIMESTAMP_TRUNC(<timestamp_column>, DAY/HOUR/MONTH/YEAR),
DATE_TRUNC(<date_column>, MONTH/YEAR), or RANGE_BUCKET(<int64_column>,
GENERATE_ARRAY(<int64_value>, <int64_value>[, <int64_value>]))
Solution:
Convert the column to datetime first.
df["pickup_datetime"] = pd.to_datetime(df["pickup_datetime"])
df["dropOff_datetime"] = pd.to_datetime(df["dropOff_datetime"])
Native tables vs External tables in BigQuery?
Native tables are tables where the data is stored in BigQuery. External tables store the data
outside BigQuery, with BigQuery storing metadata about that external table.
Resources:
● https://cloud.google.com/bigquery/docs/external-tables
● https://cloud.google.com/bigquery/docs/tables-intro
Week 4
When attempting to use the provided quick script to load trip data
into GCS, you receive error Access Denied from the S3 bucket
If the provided URL isn’t working for you (https://nyc-tlc.s3.amazonaws.com/trip+data/):
We can use the GitHub CLI to easily download the needed trip data from
https://github.com/DataTalksClub/nyc-tlc-data, and manually upload to a GCS bucket.
Commands to use:
gh auth login
etc.
Now you can upload the files to a GCS bucket using the GUI.
All of a sudden, ssh stopped working for my VM after my last restart
One common cause is a lack of disk space after running Prefect several times. When
running Prefect, check the folder ‘.prefect/storage’ and delete the logs now and then to
avoid the problem.
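If you prefer to script that cleanup, here is a small sketch (it assumes the default storage location under your home directory):
```python
import shutil
from pathlib import Path

# Assumed default location of Prefect's local result storage
storage = Path.home() / ".prefect" / "storage"

# Delete everything inside the storage folder to free disk space
for entry in storage.iterdir():
    if entry.is_dir():
        shutil.rmtree(entry)
    else:
        entry.unlink()
```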
2. The schema you’re trying to write to (the name should be dbt_<first initial><last name>
if you didn’t change the default settings at the end when setting up your project)
Likely, your source data will be in your region, but the write location will be a multi-regional
location (US in this example). Delete these datasets, and recreate them with your specified
region and the correct naming format.
Alternatively, instead of removing datasets, you can specify the single-region location you are
using. E.g. instead of ‘location: US’, specify the region, so ‘location: us-east1’. See
this Github comment for more detail. Additionally, please see this post by Sandy
In dbt Cloud you can actually specify the location using the following steps:
1. Go to your profile page (top right drop-down --> Profile)
2. Then go to Credentials --> Analytics (you may have customised this name)
3. Hit Edit
4. Update your location; you may need to re-upload your service account JSON to re-fetch
your private key, and save. (NOTE: be sure to exactly copy the region BigQuery
specifies your dataset is in.)
When executing dbt run after installing the latest dbt-utils version (i.e.
1.0.0), a warning is generated
Error: `dbt_utils.surrogate_key` has been replaced by `dbt_utils.generate_surrogate_key`
Fix:
1. Fixed by adding the Storage Object Viewer role to the service account in use in BigQuery.
To this line:
Fixed by opening a bash shell in the container executing the DAG and manually running a
command that deletes all \n not preceded by \r.
R: Staging, as the name suggests, is like an intermediate between the raw datasets and the fact
and dim tables, which are the finished product, so to speak. You'll notice that the datasets in
staging are materialised as views and not tables.
Vic didn't use it for the project, you just need to create production and dbt_name + trips_data_all
that you had already.
If you're following video DE Zoomcamp 4.3.1 - Building the First DBT Models, you
may have encountered an issue at 14:25 where the Lineage graph isn't displayed
and a Compilation Error occurs, as shown in the attached image. Don't worry - a
quick fix for this is to simply save your schema.yml file. Once you've done this, you
should be able to view your Lineage graph without any further issues.
Compilation Error in test
accepted_values_stg_green_tripdata_Payment_type__False___v
ar_payment_type_values_ (models/staging/schema.yml)
vars:
payment_type_values: [1, 2, 3, 4, 5, 6]
Troubleshooting in dbt:
The dbt error log contains a link to BigQuery. When you follow it you will see your query and the
problematic line will be highlighted.
Make sure that you create a pull request from your Development branch to the Production
branch (main by default). After that, check in your ‘seeds’ folder if the seed file is inside it.
Another thing to check is your .gitignore file. Make sure that the .csv extension is not included.
Solution:
- One way to solve this problem is to specify/cast the data type Int64 during the data
transformation stage.
- However, you may not want to type out all the int columns. If that is the case, you can simply
use convert_dtypes to infer the data types:
# Make pandas infer the correct data types (pandas parses ints with
# missing values as float by default)
df.fillna(-999999, inplace=True)
df = df.convert_dtypes()
df = df.replace(-999999, None)
My specific error:
Runtime Error in rpc request (from remote system.sql) 404 Not found: Table
dtc-de-0315:trips_data_all.green_tripdata_partitioned was not found in location europe-west6
Location: europe-west6 Job ID: 168ee9bd-07cd-4ca4-9ee0-4f6b0f33897c
Make sure all of your datasets have the correct region and not a generalised one:
europe-west6 as opposed to EU
1. On Profile Settings > Linked Accounts, connect your Github account with your dbt project,
allowing the permissions asked. More info at
https://docs.getdbt.com/docs/collaborate/git/connect-gith
2. Disconnect your current Github configuration from Account Settings > Projects
(analytics) > Github connection. At the bottom left appears the button Disconnect;
press it.
3. Once we have confirmed the change, we can configure it again. This time, choose
Github and it will show all the repositories which you have allowed to work with dbt.
Select your repository and it’s ready.
4. Go to the Deploy > job configuration page and scroll down to Triggers; now you
can see the option Run on Pull Requests:
Data type errors when ingesting with parquet files
The easiest way to avoid these errors is by ingesting the relevant data in a .csv.gz file
type. Then, do:
CREATE OR REPLACE EXTERNAL TABLE `dtc-de.trips_data_all.fhv_tripdata`
OPTIONS (
format = 'CSV',
uris = ['gs://dtc_data_lake_dtc-de-updated/data/fhv_all/fhv_tripdata_2019-*.csv.gz']
);
As an example. You should no longer have any data type issues for week 4.
Solution: add an order by to the partition by window in both staging files. Keep adding columns
to the order by until the number of rows in the fact_trips table is consistent when re-running the
fact_trips model.
We partition by vendor id and pickup_datetime and choose the first row (rn=1) from each of these
partitions. These partitions are not ordered, so every time we run this, the first row might be a
different one. Since the first row is different between runs, it might or might not contain an
unknown borough. Then, in the fact_trips model, we will discard a different number of rows when
we discard all values with an unknown borough.
with dim_zones as (
select * from `engaged-cosine-374921`.`dbt_victoria_mola`.`dim_zones`
where borough != 'Unknown'
),
fhv as (
select * from `engaged-cosine-374921`.`dbt_victoria_mola`.`stg_fhv_tripdata`
)
select * from fhv
inner join dim_zones as pickup_zone
on fhv.PUlocationID = pickup_zone.locationid
inner join dim_zones as dropoff_zone
on fhv.DOlocationID = dropoff_zone.locationid
);
Some ehail fees are null, and casting them to integer gives a "Bad int64 value: 0.0" error.
Solution:
Using safe_cast returns NULL instead of throwing an error. So use safe_cast from dbt_utils
function in the jinja code for casting into integer as follows:
{{ dbt_utils.safe_cast('ehail_fee', api.Column.translate_type("integer"))}} as
ehail_fee,
You don't need to change the environment type. If you are following the videos, you are
creating a Production Deployment, so the only available option is the correct one.
Could not parse the dbt project. Please check that the
repository contains a valid dbt project
Running the Environment on the master branch causes this error; you must activate the “Only run
on a custom branch” checkbox and specify the branch you are working on when the Environment is
set up.
Made changes to your modelling files and committed them to your
development branch, but the Job still runs on the old files?
Change to the main branch and make a pull request from the development branch.
Note: this will take you to GitHub.
Approve the merge and rerun your job; it will work as planned now.
Use the syntax below instead if the code in the tutorial is not working.
dbt run --select stg_green_tripdata --vars '{"is_test_run": false}'
Week 5
java.lang.IllegalAccessError: class
org.apache.spark.storage.StorageUtils$ (in unnamed module @0x3c947bc5)
cannot access class sun.nio.ch.DirectBuffer (in module java.base)
because module java.base does not export sun.nio.ch to unnamed
module @0x3c947bc5
Solution: Java 17 or 19 is not supported by this version of Spark; Spark 3.x requires Java 8/11/16. Install Java
11 from the website provided in the windows.md setup file.
Python was not found; run without arguments to install from the
Microsoft Store, or disable this shortcut from Settings > Manage
App Execution Aliases.
I found this error while executing the user defined function in Spark (crazy_stuff_udf). I am
working on Windows and using conda. After following the setup instructions, I found that the
PYSPARK_PYTHON environment variable was not set correctly, given that conda has different
python paths for each environment.
Solution:
import findspark
findspark.init()
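If findspark alone does not help, another option (a sketch, not the official setup) is to point PySpark explicitly at the interpreter of the currently active conda environment before building the SparkSession:
```python
import os
import sys

# Make the driver and workers use the Python of the active (conda) environment
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()
```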
Easy setup with a miniconda env (worked on MacOS)
If anyone is a Pythonista or becoming one (which you will essentially become along this journey),
and wants to have all Python dependencies under the same virtual environment (e.g. conda) as
done with Prefect and previous exercises, simply follow these steps.
All default Spark commands will also be available in a shell session under the activated environment.
As of the current latest Spark version (3.3.2), it supports JDK 8 / 11 / 17, all of which can be easily
installed with SDKMan!:
More importantly, Python 3.11 is not yet stable for PySpark, so make sure you're setting up your
virtualenv with either Python 3.9 or Python 3.10.
After installing all including pyspark (and it is successfully imported), but then running this script
on the jupyter notebook
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[*]") \
.appName('test') \
.getOrCreate()
df = spark.read \
.option("header", "true") \
.csv('taxi+_zone_lookup.csv')
df.show()
RuntimeError: Java gateway process exited before sending its port number
import findspark
findspark.init()
- Once everything is installed, skip to 7:14 to set up environment variables. This allows for
the environment variables to be set permanently.
import findspark
findspark.init()
None of the solutions above worked for me till I ran !pip3 install pyspark
instead of !pip install pyspark.
Additionally, you can check the ‘py4j’ version of the Spark you’re using from here and
update as mentioned above.
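A quick way to see which py4j version ships with your Spark installation (assuming SPARK_HOME is set), so you can put the matching zip on PYTHONPATH:
```python
import glob
import os

# Prints something like ['/path/to/spark/python/lib/py4j-0.10.9.5-src.zip']
print(glob.glob(os.path.join(os.environ["SPARK_HOME"], "python", "lib", "py4j-*-src.zip")))
```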
This is because Python 3.11 has some inconsistencies with such an old version of Spark. The
solution is a downgrade in the Python version. Python 3.9 using a conda environment takes
care of it.
Full steps:
2. Install Python:
b. cd notebook
c. virtualenv jupyterenv
d. source jupyterenv/bin/activate
a. jupyter notebook
Error java.io.FileNotFoundException
Code executed:
df = spark.read.parquet(pq_path)
… some operations on df …
df.write.parquet(pq_path, mode="overwrite")
java.io.FileNotFoundException: File
file:/home/xxx/code/data/pq/fhvhv/2021/02/part-00021-523f9ad5-14af-4332-9434-bdcb0831
f2b7-c000.snappy.parquet does not exist
The problem is that Spark performs lazy evaluation, so the actual action that triggers the
job is df.write, which deletes the parquet files it is still trying to read (mode="overwrite").
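One possible workaround (a sketch, not from the course materials) is to write the result to a different temporary path first, and only then overwrite the original location:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()

pq_path = "data/pq/fhvhv/2021/02"       # original location
tmp_path = "data/pq/fhvhv_tmp/2021/02"  # any path different from pq_path

df = spark.read.parquet(pq_path)
# ... transformations on df ...
df.write.parquet(tmp_path, mode="overwrite")

# The original files are no longer needed, so overwriting them is now safe
spark.read.parquet(tmp_path).write.parquet(pq_path, mode="overwrite")
```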
SELECT [attributes]
FROM [table]
WHERE [filter]
What differs the most between SQL providers is the set of built-in functions.
https://databricks.com/glossary/what-is-spark-sql#:~:text=Spark%20SQL%20is%20a%20Spark,
on%20existing%20deployments%20and%20data.
The Spark UI on localhost:4040 was not showing the
current run
✅Solution: I had two notebooks running, and the one I wanted to look at had opened a port on
localhost:4041.
If the port is in use, Spark uses the next one (it can even be 4044). You can run
spark.sparkContext.uiWebUrl to check which UI URL your session is using.
java.lang.NoSuchMethodError:
sun.nio.ch.DirectBuffer.cleaner()Lsun/misc/Cleaner Error
during repartition call (conda pyspark installation)
✅Solution: replace Java Developer Kit 11 with Java Developer Kit 8.
RuntimeError: Java gateway process exited before sending
its port number
Shows java_home is not set on the notebook log
https://sparkbyexamples.com/pyspark/pyspark-exception-java-gateway-process-exited-before-s
ending-the-driver-its-port-number/
spark = SparkSession.builder.master('local[*]') \
    .appName('spark-read-from-bigquery') \
    .config('BigQueryProjectId', 'razor-project-xxxxxxx') \
    .config('BigQueryDatasetLocation', 'de_final_data') \
    .config('parentProject', 'razor-project-xxxxxxx') \
    .config("google.cloud.auth.service.account.enable", "true") \
    .config("credentialsFile", "google_credentials.json") \
    .config("GcpJsonKeyFile", "google_credentials.json") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.memory.offHeap.enabled", True) \
    .config("spark.memory.offHeap.size", "5g") \
    .config('google.cloud.auth.service.account.json.keyfile', "google_credentials.json") \
    .config("fs.gs.project.id", "razor-project-xxxxxxx") \
    .config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .getOrCreate()
spark = SparkSession.builder.master('local') \
    .appName('bq') \
    .config("spark.jars.packages",
            "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.23.2") \
    .getOrCreate()
This automatically downloads the required dependency jars and configures the connector, removing
the need to manage this dependency manually. More details available here
Has anyone figured out how to read from the GCP data lake instead
of downloading all the taxi data again?
There are a few extra steps to read from GCS with PySpark:
1.) IMPORTANT: Download the Cloud Storage connector for Hadoop here:
https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage#clusters
As the name implies, this .jar file is what essentially connects PySpark with your GCS
2.) Move the .jar file to your Spark file directory. I installed Spark using homebrew on my MacOS
machine and I had to create a /jars directory under "/opt/homebrew/Cellar/apache-spark/3.2.1/"
(where my spark dir is located)
3.) In your Python script, there are a few extra classes you’ll have to import:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
4.) You must set up your configurations before building your SparkSession. Here’s my code
snippet:
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars",
         "/opt/homebrew/Cellar/apache-spark/3.2.1/jars/gcs-connector-hadoop3-latest.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile",
         "path/to/google_credentials.json")

sc = SparkContext(conf=conf)
sc._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.gs.impl",
                                  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
sc._jsc.hadoopConfiguration().set("fs.gs.impl",
                                  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.json.keyfile",
                                  "path/to/google_credentials.json")
sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.enable", "true")
5.) Once you run that, build your SparkSession with the new parameters we’d just instantiated in
the previous step:
spark = SparkSession.builder \
.config(conf=sc.getConf()) \
.getOrCreate()
6.) Finally, you’re able to read your files straight from GCS!
df_green = spark.read.parquet("gs://{BUCKET}/green/202*/")
How can I read a small number of rows from the parquet file
directly?
from pyarrow.parquet import ParquetFile
pf = ParquetFile('fhvhv_tripdata_2021-01.parquet')
#pyarrow builds tables, not dataframes
tbl_small = next(pf.iter_batches(batch_size = 1000))
#this function converts the table to a dataframe of manageable size
df = tbl_small.to_pandas()
df = spark.read.parquet('fhvhv_tripdata_2021-01.parquet')
df1 = df.sort('DOLocationID').limit(1000)
pdf = df1.select("*").toPandas()
When defining the schema, PULocationID and DOLocationID are defined as IntegerType. This
will cause an error because the Parquet file stores them as INT64, and you’ll get an error like:
Parquet column cannot be converted in file [...] Column [...] Expected: int, Found:
INT64
Change the schema definition from IntegerType to LongType and it should work.
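For example, a minimal sketch of such a schema (only the two ID columns are shown, the path is assumed, and an existing SparkSession `spark` is assumed):
```python
from pyspark.sql import types

schema = types.StructType([
    # LongType matches the INT64 columns in the parquet files
    types.StructField("PULocationID", types.LongType(), True),
    types.StructField("DOLocationID", types.LongType(), True),
    # ... remaining columns ...
])

df = spark.read.schema(schema).parquet("data/pq/fhvhv/2021/01/")
```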
Add the following to your .bashrc file:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH"
This error comes up in the Spark video 5.3.1 - First Look at Spark/PySpark,
because at the time the video was created, the most recent data (2021) was distributed as csv
files, but it is now distributed as parquet.
So when you run the command spark.createDataFrame(df1_pandas).show(),
you get the AttributeError. This is caused by pandas version 2.0.0, which seems
incompatible with Spark 3.3.2, so to fix it you have to downgrade pandas to 1.5.3 using the
command pip install -U pandas==1.5.3
Another option, if you do not want to downgrade the pandas version, is adding the following after
importing pandas (source):
pd.DataFrame.iteritems = pd.DataFrame.items
The homework says we will be loading data from June 2021 HVFHV Data.
This is very basic but it will save some time.
You can either type the export command every time you run a new session, add it to your
.bashrc file (which you can find in your home directory), or run this command at the beginning of your
jupyter notebook:
import findspark
findspark.init()
Compressed file ended before the end-of-stream marker was
reached
I solved this issue by unzipping the file with:
!gzip -d fhvhv_tripdata_2021-01.csv.gz
In the code-along from video 5.3.3, Alexey downloads the CSV files from the NYC TLC website and
gzips them in his bash script. If we now (2023) follow along but download the data from the
GitHub course repo, it will already be zipped as csv.gz files. Therefore we zip it again if we follow
the code from the video exactly. This then leads to gibberish output when we try to cat
the contents or count the lines with zcat, because the file is zipped twice and zcat only unzips it
once.
✅Solution: do not gzip the files downloaded from the course repo. Just wget them and save
them as they are, as csv.gz files. Then the zcat command and the showSchema command will
also work.
URL="${URL_PREFIX}/${TAXI_TYPE}/${TAXI_TYPE}_tripdata_${YEAR}-${FMONTH}.csv.gz"
LOCAL_PREFIX="data/raw/${TAXI_TYPE}/${YEAR}/${FMONTH}"
LOCAL_FILE="${TAXI_TYPE}_tripdata_${YEAR}_${FMONTH}.csv.gz"
LOCAL_PATH="${LOCAL_PREFIX}/${LOCAL_FILE}"
reference: https://github.com/bitnami/containers/issues/13409
c. Build the docker image by navigating to the directory above and running the docker build
command:
cd bitnami/spark/3.3/debian-11/
docker build -t spark:3.3-java-17 .
2. Run docker compose
using the following file:
```yaml docker-compose.yml
version: '2'
services:
spark:
image: spark:3.3-java-17
environment:
- SPARK_MODE=master
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
volumes:
- "./:/home/jovyan/work:rw"
ports:
- '8080:8080'
- '7077:7077'
spark-worker:
image: spark:3.3-java-17
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark:7077
- SPARK_WORKER_MEMORY=1G
- SPARK_WORKER_CORES=1
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
volumes:
- "./:/home/jovyan/work:rw"
ports:
- '8081:8081'
spark-nb:
image: jupyter/pyspark-notebook:java-17.0.5
environment:
- SPARK_MASTER_URL=spark://spark:7077
volumes:
- "./:/home/jovyan/work:rw"
ports:
- '8888:8888'
- '4040:4040'
```
Run this command to deploy the docker compose stack:
docker-compose up
Access the Jupyter notebook using the link logged in the docker compose logs.
The Spark master URL is spark://spark:7077
How do you read data stored in GCS with pandas on your local
computer?
To do this:
pip install gcsfs
Thereafter, copy the URI path to the file and use:
df = pandas.read_csv("gs://path")
Solution:
Affiliated_base_number is a mix of letters and numbers (you can check this with a preview of the
table), so it cannot be set to DoubleType (only for double-precision numbers). The suitable type
would be StringType. Spark inferSchema is more accurate than Pandas infer type method in this
case. You can set it to true while reading the csv, so you don’t have to take out any data from
your dataset. Something like this can help:
df = spark.read \
    .options(
        header="true",
        inferSchema="true",
    ) \
    .csv('path/to/your/csv/file/')
Solution B:
It's because some rows in the Affiliated_base_number column are null and are therefore assigned the
datatype String, which cannot be converted to type Double. So if you really want to convert
this pandas df to a pyspark df, only take the rows from the pandas df that are not null in the
'Affiliated_base_number' column. Then you will be able to apply the pyspark function
createDataFrame.
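A small sketch of that filtering step (assuming `df_pandas` is your pandas DataFrame and `spark` an existing SparkSession):
```python
# Keep only rows where Affiliated_base_number is not null before handing the frame to Spark
df_pandas = df_pandas[df_pandas["Affiliated_base_number"].notna()]
df_spark = spark.createDataFrame(df_pandas)
```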
Solution:
Increase the memory of the executor when creating the Spark session like this:
spark = SparkSession.builder \
.master("local[*]") \
.appName('test') \
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory", "4g") \
.getOrCreate()
Remember to restart the Jupyter session (ie. close the Spark session) or the config won’t take
effect.
Starting up a cluster:
Instead of configuring paths in ~/.bashrc, I created .env file in the root of my workspace:
JAVA_HOME="${HOME}/app/java/jdk-11.0.2"
PATH="${JAVA_HOME}/bin:${PATH}"
SPARK_HOME="${HOME}/app/spark/spark-3.3.2-bin-hadoop3"
PATH="${SPARK_HOME}/bin:${PATH}"
PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.5-src.zip:$PYTHONPATH"
PYTHONPATH="${SPARK_HOME}/python/lib/pyspark.zip:$PYTHONPATH"
How to port forward outside VS Code
I don’t use Visual Studio Code, so I did it the old-fashioned way: ssh -L 8888:localhost:8888 <my
user>@<VM IP> (replace the user and IP with the ones used by the GCP VM, e.g. ssh -L
8888:localhost:8888 [email protected])
If you are doing wc -l fhvhv_tripdata_2021-01.csv.gz with the gzip file as the file
argument, you will get a different result. Unzip the file and then do wc -l
fhvhv_tripdata_2021-01.csv to get the right results.
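Alternatively, a short Python sketch counts the rows without unzipping the file on disk (wc -l on the .gz counts lines of the compressed bytes, which is why the numbers differ):
```python
import gzip

# Count rows on the decompressed stream, leaving the .gz file untouched on disk
with gzip.open("fhvhv_tripdata_2021-01.csv.gz", "rt") as f:
    print(sum(1 for _ in f))
```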
`spark-submit` errors
when trying to:
URL="spark://$HOSTNAME:7077"
spark-submit \
--master="{$URL}" \
06_spark_sql.py \
--input_green=data/pq/green/2021/*/ \
--input_yellow=data/pq/yellow/2021/*/ \
--output=data/report-2021
and you get errors like the following (SUMMARIZED):
WARN Utils: Your hostname, <HOSTNAME> resolves to a loopback address..
WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address Setting
default log level to "WARN".
Exception in thread "main" org.apache.spark.SparkException: Master must either be
yarn or start with spark, mesos, k8s, or local at …
1. Go to %SPARK_HOME%\bin
2. Run spark-class org.apache.spark.deploy.master.Master to run the
master. This will give you a URL of the form spark://ip:port
3. Run spark-class org.apache.spark.deploy.worker.Worker
spark://ip:port to run the worker. Make sure you use the URL you obtained in
step 2.
4. Create a new Jupyter notebook:
spark = SparkSession.builder \
.master("spark://192.168.0.38:7077") \
.appName('test') \
.getOrCreate()
5. Check on Spark UI the master, worker and app.
This occurs because you are not logged in (“gcloud auth login”) and maybe the project id is not
set. In a terminal, type:
gcloud auth login
This will open a tab in the browser; accept the terms, and after that close the tab if you want. Then
set the project like:
gcloud config set project <YOUR_PROJECT_ID>
Then you can run the command to upload the pq dir to a GCS bucket, for example:
gsutil -m cp -r pq/ gs://<your-bucket>/pq
When submitting a job, it might throw a Java error in the log panel within Dataproc. I had changed
the Versioning Control when I created the cluster, so I deleted the cluster and created
a new one; instead of choosing Debian-Hadoop-Spark, I switched to Ubuntu
20.04-Hadoop3.3-Spark3.3 for the Versioning Control feature. The main reason to choose this is
that I have the same Ubuntu version on my laptop. I tried to find documentation to support
this but unfortunately I couldn't; nevertheless, it works for me.
Week 6
Could not start docker image “control-center” from the
docker-compose.yaml file.
On Mac OSX 12.2.1 (Monterey) I could not start the Kafka Control Center. I opened Docker
Desktop and saw containers still running from week 4, which I did not see when I typed
“docker ps”. I deleted them in Docker Desktop and then had no problem starting up the Kafka
environment.
source env/bin/activate
To deactivate it:
deactivate
This works on MacOS, Linux and Windows - but for Windows the path is slightly different
(it's env/Scripts/activate)
Also the virtual environment should be created only to run the python file. Docker images should
first all be up and running.
... you may have to load librdkafka-5d2e2910.dll in the code. Add this before importing
avro:
It seems that the error may occur depending on the OS and python version installed.
ALTERNATIVE:
Source: https://githubhot.com/repo/confluentinc/confluent-kafka-python/issues/1186?page=2
● https://github.com/confluentinc/confluent-kafka-python/issues/590
● https://github.com/confluentinc/confluent-kafka-python/issues/1221
● https://stackoverflow.com/questions/69085157/cannot-import-producer-from-confluent-ka
fka
Got this error because the docker container memory was exhausted. The data file was up to
800MB, but my docker container did not have enough memory to handle that.
The solution was to load the file in chunks with Pandas and then create multiple parquet files for each
data file I was processing. This worked smoothly and the issue was resolved.
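A minimal sketch of that chunked approach (the file name and chunk size are assumptions; pyarrow or fastparquet must be installed for to_parquet):
```python
import pandas as pd

# Read the large file in chunks and write each chunk out as its own parquet file
for i, chunk in enumerate(pd.read_csv("rides.csv.gz", chunksize=100_000)):
    chunk.to_parquet(f"rides_part_{i:03d}.parquet", index=False)
```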
data-engineering-zoomcamp/week_6_stream_processing/python/
resources/rides.csv is missing
Copy the file found in the Java example:
data-engineering-zoomcamp/week_6_stream_processing/java/kafka_examples/src/main/resour
ces/rides.csv
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
If you have this error, it is most likely that your Kafka broker docker container is not running.
Use docker ps to confirm.
Then, in the folder with the docker compose yaml file, run docker compose up -d to start all the
instances.
PipeRider workshop
Step 1:
Start off by copying the files from your GCS bucket to a local directory on your computer with
gsutil. For instance:
gsutil cp -r "gs://iobruno_datalake_raw/dtc_ny_taxi_tripdata" .
datasets/
├── fhv
│ ├── fhv_tripdata_2019-01.parquet.snappy
│ ├── fhv_tripdata_2019-02.parquet.snappy
│ ├── fhv_tripdata_2019-03.parquet.snappy
│ ├── fhv_tripdata_2019-04.parquet.snappy
│ ├── fhv_tripdata_2019-05.parquet.snappy
│ ├── fhv_tripdata_2019-06.parquet.snappy
│ ├── fhv_tripdata_2019-07.parquet.snappy
│ ├── fhv_tripdata_2019-08.parquet.snappy
│ ├── fhv_tripdata_2019-09.parquet.snappy
│ ├── fhv_tripdata_2019-10.parquet.snappy
│ ├── fhv_tripdata_2019-11.parquet.snappy
│ └── fhv_tripdata_2019-12.parquet.snappy
├── green
│ ├── green_tripdata_2019-01.parquet.snappy
│ ├── green_tripdata_2019-02.parquet.snappy
│ ├── green_tripdata_2019-03.parquet.snappy
│ ├── green_tripdata_2019-04.parquet.snappy
│ ├── green_tripdata_2019-05.parquet.snappy
│ ├── green_tripdata_2019-06.parquet.snappy
│ ├── green_tripdata_2019-07.parquet.snappy
│ ├── green_tripdata_2019-08.parquet.snappy
│ ├── green_tripdata_2019-09.parquet.snappy
│ ├── green_tripdata_2019-10.parquet.snappy
│ ├── green_tripdata_2019-11.parquet.snappy
│ ├── green_tripdata_2019-12.parquet.snappy
│ ├── green_tripdata_2020-01.parquet.snappy
│ ├── green_tripdata_2020-02.parquet.snappy
│ ├── green_tripdata_2020-03.parquet.snappy
│ ├── green_tripdata_2020-04.parquet.snappy
│ ├── green_tripdata_2020-05.parquet.snappy
│ ├── green_tripdata_2020-06.parquet.snappy
│ ├── green_tripdata_2020-07.parquet.snappy
│ ├── green_tripdata_2020-08.parquet.snappy
│ ├── green_tripdata_2020-09.parquet.snappy
│ ├── green_tripdata_2020-10.parquet.snappy
│ ├── green_tripdata_2020-11.parquet.snappy
│ └── green_tripdata_2020-12.parquet.snappy
├── yellow
│ ├── yellow_tripdata_2019-01.parquet.snappy
│ ├── yellow_tripdata_2019-02.parquet.snappy
│ ├── yellow_tripdata_2019-03.parquet.snappy
│ ├── yellow_tripdata_2019-04.parquet.snappy
│ ├── yellow_tripdata_2019-05.parquet.snappy
│ ├── yellow_tripdata_2019-06.parquet.snappy
│ ├── yellow_tripdata_2019-07.parquet.snappy
│ ├── yellow_tripdata_2019-08.parquet.snappy
│ ├── yellow_tripdata_2019-09.parquet.snappy
│ ├── yellow_tripdata_2019-10.parquet.snappy
│ ├── yellow_tripdata_2019-11.parquet.snappy
│ ├── yellow_tripdata_2019-12.parquet.snappy
│ ├── yellow_tripdata_2020-01.parquet.snappy
│ ├── yellow_tripdata_2020-02.parquet.snappy
│ ├── yellow_tripdata_2020-03.parquet.snappy
│ ├── yellow_tripdata_2020-04.parquet.snappy
│ ├── yellow_tripdata_2020-05.parquet.snappy
│ ├── yellow_tripdata_2020-06.parquet.snappy
│ ├── yellow_tripdata_2020-07.parquet.snappy
│ ├── yellow_tripdata_2020-08.parquet.snappy
│ ├── yellow_tripdata_2020-09.parquet.snappy
│ ├── yellow_tripdata_2020-10.parquet.snappy
│ ├── yellow_tripdata_2020-11.parquet.snappy
│ └── yellow_tripdata_2020-12.parquet.snappy
Step 2:
meta:
external_location:
"read_parquet('/path/to/datasets/where/you/downloaded/from/gcs/{name}/*')":
Note the {name} expression above. It will be replaced by the names of the tables defined in
the block below. That means it will attempt to fetch the parquet files for the yellow_tripdata dataset in:
/path/to/datasets/where/you/downloaded/from/gcs/yellow/
version: 2
sources:
- name: parquet
meta:
external_location:
"read_parquet('/Users/iobruno/Vault/datasets/{name}/*.parquet.snappy')"
tables:
- name: fhv
- name: green
- name: yellow
- name: zone_lookup
With that, you can start fresh with DuckDB, with no tables loaded on it whatsoever.
Pretty neat, huh!?
Solution: Just open a “java project” in your visual studio code, and the dependencies etc. seem
to be loaded correctly.
Project
● Each submitted project will be evaluated by 3 (three) randomly assigned students that
have also submitted the project.
● You will also be responsible for grading the projects from 3 fellow students yourself.
Please be aware that not complying with this rule also means failing to achieve the
Certificate at the end of the course.
● The final grade you get will be the median score of the grades you get from the peer
reviewers.
● And of course, the peer review criteria for evaluating or being evaluated must follow the
guidelines defined here.
spark = (SparkSession
    .builder
    .appName(app_name)
    .master(master=master)
    .getOrCreate())
spark.streams.resetTerminated()
query1 = (spark
    .readStream
    …
    …
    .load())
query2 = (spark
    .readStream
    …
    …
    .load())
query3 = (spark
    .readStream
    …
    …
    .load())
query1.start()
query2.start()
query3.start()
https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_modules/airflow/provide
rs/google/cloud/operators/dataproc.html
- DataProc Administrator
dataproc_jars = ["gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.24.0.jar"]
The question is that sometimes, even if you put plenty of effort into documenting every single step,
you can't be sure the person doing the peer review will be able to follow it, so how will this
criterion be evaluated?
Alex clarifies: “Ideally yes, you should try to re-run everything. But I understand that not
everyone has time to do it, so if you check the code by looking at it and try to spot errors, places
with missing instructions and so on - then it's already great”
Add here any post links where you share some thoughts and insights you’ve got during an
assessment of other participants’ projects.
Change the path of the google_credentials mount in the docker-compose file to an absolute
one. For example, on Ubuntu, use /home/<user>/.google/credentials/google_credentials.json
instead of ~/.google/credentials/google_credentials.json.
*** Failed to fetch log file from worker. Request URL missing either an 'http://' or
'https://' protocol.
✅I resolved it by running:
docker-compose down -v --rmi all --remove-orphans
docker-compose up
_PIP_ADDITIONAL_REQUIREMENTS:
build:
context: .
dockerfile: ./Dockerfile
environment:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
E.g.
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- pyspark}
See documentation:
https://airflow.apache.org/docs/docker-stack/entrypoint.html#installing-additional-requireme
nts
Make sure that you update your Airflow image to a more recent one. Inside your Dockerfile,
modify the FROM apache/airflow:2.2.3 to any of the more recent images available in the
official Airflow Docker repository, available at https://hub.docker.com/r/apache/airflow/tags
Airflow web login issue on docker:
I was unable to log into my Linux instance of Airflow with the web password until I modified the
config in docker-compose.yaml from:
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
to:
_AIRFLOW_WWW_USER_CREATE=True
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD=airflow
google.api_core.exceptions.NotFound:
If you stuck with this problem - check this - https://github.com/mozilla/bigquery-etl/issues/1409
Make sure first that you put the credentials file into the directory. Then proceed.
How did I solve it? Those are the changes I made on the docker-compose.yaml:
volumes:
- ~/.google/credentials/<credentials_file_name>.json:/.google/credentials/google_credentials.json:ro
environment:
AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT:
'google-cloud-platform://?key_path=%2F.google%2Fcredentials%2Fgoogle_credentials.json&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform&project=<project_id>&num_retries=5'
At this stage, you might get another error associated with directory permissions. Just grant the
required permissions on the credentials directory.
Case:
Airflow DAG tries to create external table in BigQuery, which refers to Google cloud storage
parquet file, and fails with the error: "CsvOptions can only be specified if storage format is CSV."
bigquery_external_table_task = BigQueryCreateExternalTableOperator(
task_id="bigquery_external_table_task",
table_resource={
"tableReference": {
"projectId": PROJECT_ID,
"datasetId": BIGQUERY_DATASET,
"tableId": "external_table",
},
"externalDataConfiguration": {
"sourceFormat": "PARQUET",
"sourceUris": [f"gs://{BUCKET}/raw/{parquet_file}"],
},
},
)
Solution:
Add an underscore sign in the names of the two parameters of
BigQueryCreateExternalTableOperator:
"source_format"
"source_uris"
bigquery_external_table_task = BigQueryCreateExternalTableOperator(
task_id="bigquery_external_table_task",
table_resource={
"tableReference": {
"projectId": PROJECT_ID,
"datasetId": BIGQUERY_DATASET,
"tableId": "external_table",
},
"externalDataConfiguration": {
"source_format": "PARQUET",
"source_uris": [f"gs://{BUCKET}/raw/{parquet_file}"],
"autodetect": True,
},
},
)
The full class description with the correct names can be found in the following source file:
https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/bigquery.p
y
Assigning the unprivileged Postgres user as the owner of the Postgres data directory
# Limits VM memory (here 6 GB); this can be set as whole numbers using GB or MB
memory=6GB
This is due to not being able to connect to the container port. In this case, we can point to the
localhost port.
Command:
python3 ingest_data.py \
--user=root \
--password=root \
--host=localhost \
--port=5432 \
--db=ny_taxi \
--table_name=green_taxi_trips \
--url="https://github.com/DataTalksClub/nyc-tlc-data/releasexecute
binaryes/download/green/green_tripdata_2019-01.csv.gz"
Make sure you run the command below before pushing the image to Docker Hub:
docker login
# Copy the list of env packages needed. Don't forget what's on the
list
COPY docker_env_req.txt .
✅Solution:
From Jeff Hale: This looks like a Docker COPY error. Maybe you can't copy a file to a directory that is
being created in the same statement. So change the COPY command from the .py file to the
name of the folder. For example, copy the local flows folder into the /opt/prefect/flows folder.
Code
Then I reverted to
COPY parametric_web_to_gcs.py /opt/prefect/flows
But got an error:
Script at 'parametric_web_to_gcs.py' encountered an exception:
FileNotFoundError(2, 'No such file or directory')
Build the image and push it again, then deploy. And it works! The
mystery of the universe.
URL:
https://datatalks-club.slack.com/archives/C01FABYF2RG/p1674964665817089?thread_ts=1674570724.323079&cid=C01FABYF2RG
Alternative solution:
Managed to make this work by adding a / to the copy lines in Dockerfile, like so:
✅THE SOLUTION:
Add the nameserver line below into /etc/resolv.conf
- sudo nano /etc/resolv.conf
```
nameserver 8.8.8.8
```
UPDATE: I noticed that this can happen if you are logged into Prefect Cloud, even in another
terminal. Log out of Prefect Cloud with `prefect cloud logout`.
The only regions available in Europe for Data Transfer are europe-north1, europe-west2
and europe-west6 [Source]
Even if the source and destination locations are the same, BigQuery can still cause errors;
apparently it's an internal bug.
For the Americas, one of the “bug free” regions is us-east4, where the source/destination error
should not occur.
I had to re-create my BigQuery dataset, re-run the Airflow DAGs only for creating the external and
partitioned tables (I didn’t have to change the location of GCS), and then dbt run worked.