12. Practicas AWS
12.1. Practica ETL en AWS
- En un bucket S3 (datos_etl) crear dos directorios
- ciclistas
- ciclistas_procesados
- Con AWS Glue
- Utilizando el Data Catalog, crear:
- Base de datos (datos_ciclistas)
- Tabla (ciclistas)
- Crear un job (Lab Role) S3 Originales --> Eliminar Duplicados --> Filtrar filas --> S3 Procesados
- RedShift (Lab Role)
- Crear cluster admin Qwe_1234
- Crear Base de Datos
- Crear Tabla y Definir campos
- Cargar datos en la Tabla (IGNOREHEADER AS 1)
- Crear una función Lamda que haga de trigger (lanzador) del Job
12.2. ETL con jobs en AWS Glue
Servicios AWS: - AWS S3 - AWS Glue Studio - AWS Glue Data Catalog - AWS Glue Job - AWS RedShift - AWS Lambda
La práctica consiste en hacer una ETL con AWS Glue utilizando un Job Visual. Los datos de partida son un fichero en formato csv con datos de ciclistas. Cargaremos los datos en un bucket S3 que tendrá dos carpetas, una para los datos de entrada y otra para los datos procesados. Crearemos un job que elimine los duplicados y haga un filtro y seleccione solo las filas con el campo Severity = "Grave". Luego cargaremos los datos en un cluster RedShift y finalmente prepararemos una función lambda que lance el job. Revisaremos cómo se monitorizan los jobs y las funciones lambda.
Los campos del fichero son:
Pasos a seguir: - En un bucket S3 (datos_etl) crear dos directorios - ciclistas - ciclistas_procesados - Con AWS Glue - Utilizando el Data Catalog, crear: - Base de datos (datos_ciclistas) - Tabla (ciclistas) - Crear un job (Lab Role) S3 Originales --> Eliminar Duplicados --> Filtrar filas --> S3 Procesados - RedShift (Lab Role) - Crear cluster admin Qwe_1234 - Crear Base de Datos - Crear Tabla y Definir campos - Cargar datos en la Tabla (IGNOREHEADER AS 1) - Crear una función Lambda que haga de trigger (lanzador) del Job
12.3. ETL con PySpark
Servicios AWS: - AWS S3 - AWS Glue Studio - AWS Glue Data Catalog - AWS Glue Job - AWS Glue Crawler - AWS Glue Workflow
La práctica consiste en hacer una ETL con AWS Glue utilizando código PySpark. Los datos de partida son dos ficheros en formato csv con datos de clientes y ventas. Cargaremos los datos en un bucket S3 que también tendrá una carpeta para el script de PySpark y otra para los resultados de salida. Con estos datos queremos un fichero con las ventas totales por cliente en formato JSON.
Los campos de los ficheros son: - customers: {CUSTOMERID, CUSTOMERNAME, EMAIL, CITY, COUNTRY, TERRITORY, CONTACTFIRSTNAME, CONTACTLASTNAME} - sales: {ORDERNUMBER, QUANTITYORDERED, PRICEEACH, ORDERLINENUMBER, SALES, ORDERDATE, STATUS, QTR_ID, MONTH_ID, YEAR_ID, PRODUCTLINE, MSRP, PRODUCTCODE, DEALSIZE, CUSTOMERID}
Pasos a seguir: - Crear un bucket (lago de datos) S3 con la siguiente estructura de carpetas y ficheros: - datos - clientes - customers.csv - ventas - sales.csv - Crear un crawler con AWS Glue que rastree la carpeta datos y los introduzca en una BD (ventas) - Crear un job con Glue de Spark Script Editor - El script debe: - Guardarse en la carpeta scripts del bucket - Debe seleccionar las ventas totales por cliente y guardarlas en un fichero en formato JSON en la carpeta de salida - Crear un flujo de trabajo que lo haga todo. - Añadir Workflow - Añadir Trigger - Inicio Rastreador - Añadir Nodo - Seleccionar Crawler - Añadir Trigger - Type: Evento - Start after ANY watched event - Añadir Job/Crawler - Seleccionar Crawler - Evento: Éxito
Ejemplo de código:
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession\
.builder\
.appName("SparkETL")\
.getOrCreate()
spark.catalog.setCurrentDatabase("ventas")
df = spark.sql("select * from clientes")
# df.show()
df = df.select("customername", "email")
# df.show()
df.write.format("json").mode("overwrite").save("s3://lagodatos/salida/")
12.4. Streaming con Kinesis Data Streams y Kinesis Data Firehose
Servicios AWS: - AWS S3 - AWS Glue Data Catalog - AWS Cloud9 - AWS Kinesis Data Streams - AWS Kinesis Data Firehose
La práctica consiste en crear un escenario en el que Amazon Kinesis Delivery Stream convierte los datos de origen con formato JSON en datos de destino con formato Apache Parquet mediante el esquema de tabla de catálogo de Glue y los almacena en un S3.
Pasos a seguir: - Crear un bucket S3 - Crear una base de datos y una tabla (Glue Data Catalog) con el siguiente esquema: - firstname: string - lastname: string - age: int - Configurar el data stream (Kinesis Data Streams) - Crear el delivery stream (Kinesis Firehose) - Configurar el entorno de desarrollo - Crear el código Python que hace la función de productor
import boto3
import random
import time
client = boto3.client('kinesis')
partitionkey = random.randint(10, 100);
for a in range(1, 10, 1):
edad = random.randint(1, 90);
mydata = '{ "firstname": "John", "lastname": "Smith", "age": ' + str(edad) + ' }'
print(mydata)
response = client.put_record(StreamName="flujo_luis", Data=mydata, PartitionKey=str(partitionkey))
print(response)
time.sleep(
5)
12.5. Streaming con Kinesis Data Firehose y Kinesis Data Analytics
Servicios AWS: - AWS S3 - AWS Kinesis Data Firehose - AWS Kinesis Data Analytics
La práctica consiste en crear un flujo de datos de bolsa con Amazon Kinesis Delivery Stream y con Kinesis Data Analytics guardarlos en un Bucket S3 tanto en crudo tal cual llegan, como ligeramente procesados. Para ello, utilizaremos 4 carpetas en el bucket, dos para los datos limpios (éxito y errores) y dos para los datos procesados (éxito y errores): - datos_brutos - errores_datos_brutos - datos_limpios - errores_datos_limpios
Vamos a necesitar también dos flujos de Kinesis Firehose.
Pasos a seguir:
- Crear el primer flujo (Firehose) con una entrada directa de datos (DIRECT PUT). Le asignamos las carpetas para el destino que se crearán automáticamente.
- Datos del flujo: 1 Mb o 60 seg
- Rol: LabRole
- Realizar el test del Data Stream Delivery
- Comprobar que se crean los datos en bruto en la carpeta correspondiente
- Limpieza con Kinesis Data Analytics (con aplicaciones SQL)
- Crear aplicación
- Descubrir el esquema. Confirmar que se están generando los datos de prueba
- Configurar la plantilla. Función de agregación en una ventana deslizante de tiempo
- Probar y ejecutar
- Crear el segundo flujo de destino
- Ejecutar
Ayuda: Enlace a video de ayuda
Ejemplos de Análisis de Flujo Continuo:
-- Continuous Filter
-- Performs a continuous filter based on a WHERE condition.
-- .----------. .----------. .----------.
-- | SOURCE | | INSERT | | DESTIN. |
-- Source-->| STREAM |-->| & SELECT |-->| STREAM |-->Destination
-- | | | (PUMP) | | |
-- '----------' '----------' '----------'
-- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE
-- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM, and INSERT SQL results into an output STREAM
-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), sector VARCHAR(12), change REAL, price REAL);
-- Create pump to insert into output
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Select all columns from source stream
SELECT STREAM ticker_symbol, sector, change, price
FROM "SOURCE_SQL_STREAM_001"
-- LIKE compares a string to a string pattern (_ matches all char, % matches substring)
-- SIMILAR TO compares string to a regex, may use ESCAPE
WHERE sector SIMILAR TO '%TECH%';
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("eventType" VARCHAR(16), "ses_timestamp" timestamp, "messageId" VARCHAR(64), "ses_to" VARCHAR(64), "ses_configuration_set" VARCHAR(32));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "eventType", "ses_timestamp", "messageId", "ses_to", "ses_configuration_set"
FROM "SOURCE_SQL_STREAM_001"
WHERE "eventType" = 'Send'
12.6. Visualización de datos con QuickSight
Para todos los ejercicios prácticos, utilizaremos el archivo de datos SaaS-Sales.csv proporcionado aquí.
Este conjunto de datos representa datos de ventas de una empresa ficticia de SaaS (Software como Servicio) que vende software de ventas y marketing a otras empresas (B2B). Cada fila de datos es una transacción/pedido.
Servicios AWS: - AWS QuickSight
Pasos a seguir: - Acceder a AWS QuickSight - Registrarse (gratis 30 días) - Crear un nuevo análisis - Cargar un dataset - Empezar a añadir visualizaciones - Publicar el informe
Ayuda: Enlace al tutorial de ayuda