Skip to content

Lab: Data Pipeline

คุณได้รับ raw data จากระบบลงทะเบียน — ข้อมูลยังไม่สะอาด, type ยังไม่ถูก, ต้อง clean → JOIN → สรุป ให้เสร็จใน query เดียว

เวลาที่ใช้: ~30 นาที

ระบบลงทะเบียนส่ง raw data มาเป็นตาราง raw_enrollments ที่ทุกคอลัมน์เป็น TEXT:

student_idstudent_namecourse_idcourse_namecreditsscoreenrolled_date
1สมชายCS101Database385.52025-08-15
1สมชายCS102Web Dev372.02025-08-16
2สมหญิงCS101Database392.02025-08-15
2สมหญิงCS103Analytics488.52025-08-17
3วิชัยNULLNULLNULLNULLNULL
  1. สร้าง raw table และ INSERT ข้อมูลทั้งหมด

  2. Query 1: Clean & CAST — แปลง credits เป็น INTEGER, score เป็น NUMERIC, enrolled_date เป็น DATE

  3. Query 2: สรุปต่อนักเรียน — ใช้ CTE จาก Query 1 แล้ว GROUP BY เพื่อหา: จำนวนวิชา, เกรดเฉลี่ย, หน่วยกิตรวม

  4. Query 3: เพิ่มนักเรียนใหม่ — INSERT STD004 (มาลี) ที่ยังไม่ลงวิชา แล้วรัน Query 2 ใหม่ ดูว่ามาลีปรากฏไหม

  5. Query 4: UPDATE คะแนน — แก้คะแนน สมชาย CS101 จาก 85.5 เป็น 90.0

  6. Query 5: Full Pipeline — รวมทุกขั้นเป็น CTE เดียว ที่ clean → JOIN → aggregate → rank

CREATE TABLE raw_enrollments (
student_id TEXT,
student_name TEXT,
course_id TEXT,
course_name TEXT,
credits TEXT,
score TEXT,
enrolled_date TEXT
);
INSERT INTO raw_enrollments VALUES ('1','สมชาย','CS101','Database','3','85.5','2025-08-15');
INSERT INTO raw_enrollments VALUES ('1','สมชาย','CS102','Web Dev','3','72.0','2025-08-16');
INSERT INTO raw_enrollments VALUES ('2','สมหญิง','CS101','Database','3','92.0','2025-08-15');
INSERT INTO raw_enrollments VALUES ('2','สมหญิง','CS103','Analytics','4','88.5','2025-08-17');
INSERT INTO raw_enrollments VALUES ('3','วิชัย',NULL,NULL,NULL,NULL,NULL);
-- เริ่มเขียน query ด้านล่าง
ดูเฉลย

Query 1: Clean & CAST

SELECT
CAST(student_id AS INTEGER) AS student_id,
student_name,
course_id,
course_name,
CAST(credits AS INTEGER) AS credits,
CAST(score AS NUMERIC(5,2)) AS score,
CAST(enrolled_date AS DATE) AS enrolled_date
FROM raw_enrollments
WHERE course_id IS NOT NULL;

Query 2: สรุปต่อนักเรียน (CTE)

WITH cleaned AS (
SELECT
CAST(student_id AS INTEGER) AS student_id,
student_name,
CAST(credits AS INTEGER) AS credits,
CAST(score AS NUMERIC(5,2)) AS score
FROM raw_enrollments
WHERE course_id IS NOT NULL
)
SELECT
student_name,
COUNT(*) AS num_courses,
ROUND(AVG(score), 2) AS avg_score,
SUM(credits) AS total_credits
FROM cleaned
GROUP BY student_id, student_name
ORDER BY avg_score DESC;

ผลลัพธ์: สมหญิง (2 วิชา, avg 90.25, 7 credits), สมชาย (2 วิชา, avg 78.75, 6 credits)

Query 3: INSERT + LEFT JOIN

INSERT INTO raw_enrollments VALUES ('4','มาลี',NULL,NULL,NULL,NULL,NULL);
-- ใช้ LEFT JOIN เพื่อรวมคนที่ยังไม่ลง
WITH cleaned AS (
SELECT DISTINCT
CAST(student_id AS INTEGER) AS student_id,
student_name
FROM raw_enrollments
),
with_courses AS (
SELECT
c.student_id, c.student_name,
CAST(r.credits AS INTEGER) AS credits,
CAST(r.score AS NUMERIC(5,2)) AS score
FROM cleaned c
LEFT JOIN raw_enrollments r
ON CAST(r.student_id AS INTEGER) = c.student_id
AND r.course_id IS NOT NULL
)
SELECT
student_name,
COUNT(score) AS num_courses,
COALESCE(ROUND(AVG(score), 2), 0) AS avg_score,
COALESCE(SUM(credits), 0) AS total_credits
FROM with_courses
GROUP BY student_id, student_name
ORDER BY avg_score DESC;

Query 4: UPDATE

UPDATE raw_enrollments
SET score = '90.0'
WHERE student_id = '1' AND course_id = 'CS101';

Query 5: Full Pipeline

WITH
-- ขั้น 1: Clean data
clean_data AS (
SELECT
CAST(student_id AS INTEGER) AS sid,
student_name AS name,
course_name,
CAST(credits AS INTEGER) AS credits,
CAST(score AS NUMERIC(5,2)) AS score
FROM raw_enrollments
WHERE course_id IS NOT NULL
),
-- ขั้น 2: Aggregate
summary AS (
SELECT
name,
COUNT(*) AS courses,
ROUND(AVG(score), 2) AS avg_score,
SUM(credits) AS total_credits
FROM clean_data
GROUP BY sid, name
)
-- ขั้น 3: Rank
SELECT
name, courses, avg_score, total_credits,
RANK() OVER (ORDER BY avg_score DESC) AS rank
FROM summary;

บทเรียนรวม:

  • Raw data มักเป็น TEXT ทั้งหมด — ต้อง CAST ก่อนใช้
  • CTE ทำให้ pipeline อ่านง่ายจากบนลงล่าง
  • Safety first: SELECT ก่อน UPDATE/DELETE เสมอ