Aggregation framework ช่วยให้เราเขียน query เพื่อ transform document ให้อยู่ในรูปแบบที่เราต้องการได้ หรือแม้แต่สร้างข้อมูลใหม่ๆ ที่ไม่ได้มีอยู่แล้วใน document ไหนๆ เลยขึ้นมาได้อย่างง่ายดาย ยกตัวอย่างเช่น เราสามารถใช้มันพิจารณายอดขายในแต่ละเดือน ยอดขายของสินค้าแต่ละชนิด หรือยอดสั่งซื้อรวมในช่วงเวลาที่กำหนดได้ ถ้าเทียบกับฐานข้อมูลประเภท Relational Database แล้วก็จะคล้ายๆ กับ GROUP BY
ในภาษา SQL
นั่นเอง
ถึงแม้ว่าเราสามารถเขียนโค้ดจัดการข้อมูลที่ได้จากการ query ออกมาเพื่อออกรายงานสรุป หรือ ค้นหาค่าบางอย่าง หรือ transform document ให้อยู่ในรูปแบบที่ต้องการได้ แต่การใช้ aggregation framework นั้นทำให้เราทำได้ง่าย และมีประสิทธิภาพดีกว่ากันมาก ซึ่งเราสามารถนิยามขั้นตอนต่างๆ จากนั้นส่งไปรันได้ด้วยคำสั่งเดียวก็ได้ผลลัพธ์ออกมาแล้ว
Introduction
Aggregation Framework เป็นเครื่องมือช่วยให้เราประมวลผลข้อมูลโดยใช้แนวคิด data processing pipeline ซึ่งนั่นคือสายการทำงาน (pipeline) ที่ต่อกันด้วย stage โดยข้อมูลจะถูกส่งเข้าไปใน stage แรกสุดจากนั้นประมวลผลแล้วส่งไปยัง stage ถัดไปเรื่อยๆ จนถึง stage สุดท้ายจึงจบการทำงานของ pipeline ซึ่งนั่นแปลว่าสิ่งที่เราต้องสร้างและออกแบบก็คือ pipeline นั่นเอง
เวลาเราจะใช้ aggregation framework เราจะใช้มันผ่านฟังก์ชั่น aggregate()
ที่รับ array ของ stage เข้าไป ยกตัวอย่างเช่น
db.orders.aggregate([
{ $match: { status: "A" } }, // Stage 1
{ $group: { _id: "$cust_id", total: { $sum: "$amount" } } } // Stage 2
])
ประกอบด้วย 2 stage (บรรทัดที่ 1 และ บรรทัดที่ 2)
- stage ที่ 1 ใช้คำสั่ง
$match
เพื่อหา document ที่ fieldstatus
มีค่าA
จากนั้นส่ง document ที่หาเจอไปยัง stage ถัดไป - stage ที่ 2 ใช้คำสั่ง
$group
เพื่อจัดกลุ่ม document ที่มี field$cust_id
เหมือนกันเข้าด้วยกัน จากนั้นใช้ฟังก์ชั่น$sum
เพื่อรวมค่าที่เก็บไว้ใน field$amount
ของแต่ละ document ในกลุ่มเดียวกัน เก็บไว้ใน field ใหม่ที่ชื่อtotal
Operators
aggregation operators แบ่งเป็น 2 กลุ่มใหญ่ ได้แก่
Pipeline Stage
Pipeline Stage คือ operator สำหรับบอกว่า stage ไหนให้ทำอะไร เช่น บอกให้ค้นหา document ที่ match กับค่าที่ต้องการ, บอกให้จัดกลุ่มตาม field ที่กำหนด, บอกให้เรียงลำดับจากน้อยไปหามาก, บอกให้ดึงมาเฉพาะ field ที่ต้องการ เป็นต้น
Pipeline Operators
Pipeline Operators คือ operator ที่ใช้กับ field ต่างๆ ของ document ที่เข้ามาในแต่ละ stage โดยมากใช้สำหรับการ reshape document ให้อยู่ในรูปแบบที่ต้องการ เช่น การรวมค่าของ 2 field เข้าด้วยกัน เป็นต้น ให้มองว่า pipeline operators เป็นฟังก์ชั่นที่รับ arguments เข้ามาแล้ว return ค่ากลับ ซึ่งรูปแบบทั่วไปจะใช้งานแบบนี้
{ <operator>: [ <argument1>, <argument2> ... ] }
Operators แบ่งเป็น 2 กลุ่ม ได้แก่
- ใช้กำหนดหน้าที่ของ Stage
- ใช้เป็น Operators ในแต่ละ stage
เวลาออกแบบ pipeline เราจะเริ่มจากคิดว่า stage ไหนต้องทำอะไรบ้าง และแต่ละ stage จะต้อง transform document อย่างไรบ้าง เพื่อส่งต่อไปอีก stage
Pipeline Stages
pipeline ประกอบด้วย stage หลายๆ stage มาอยู่รวมกัน ทำงานต่อกันตั้งแต่ stage แรกไปจนถึง stage สุดท้าย เมื่อเราสร้างแต่ละ stage ขึ้นมา เราจำเป็นต้องระบุว่าแต่ละ stage ต้องทำอะไรโดยใช้ Pipeline Operators
ปัจจุบัน (MongoDB 4.0) มี pipeline operators ให้ใช้งานมากมาย ในบทความนี้จะยกตัวอย่าง ให้ดูบาง operators ที่มักถูกใช้อยู่เป็นประจำ ดังนี้
$match
ใช้ filter document มีมีเงื่อนไขตรงกับที่เรากำหนดไว้ ก่อนส่ง document ที่ match ไปยัง stage ถัดไป ซึ่ง $match
มักถูกใช้ในช่วงต้นๆ ของ pipeline เพื่อลดจำนวน document ให้เหลือน้อยที่สุดเท่าที่ทำได้ ส่งผลให้ประสิทธิภาพของ pipeline ดีขึ้น เนื่องจากเหลือข้อมูลให้ process น้อยลง
$project
ใช้สำหรับเลือก field ที่จะส่งไปยัง stage ถัดไป คล้ายกับที่คำสั่ง $match ที่เราใช้เลือก document ที่จะส่งไปยัง stage ถัดไป โดยการที่เราเลือก document หรือเลือก field ก่อนที่จะส่งไปยัง stage ถัดไปนั้นส่งผลให้ขนาดของแต่ละ document ลดลงไปด้วย ทำให้ stage ถัดไป process document เฉพาะในส่วนที่ต้องใช้งานจริงๆ เท่านั้น และนี่ส่งผลดีต่อ performance ของระบบ
$group
เรามักเห็น operator $group
อยู่เสมอ เมื่อเรา aggregate ข้อมูล เนื่องจาก operator นี้ใช้สำหรับจัดกลุ่ม document เข้าด้วยกัน แล้วนำมาใช้ร่วมกับฟังก์ชั่นบางอย่างเพื่อหวังผลทางสถิติ เช่น
- $min, $max หาค่าของที่น้อยที่สุด หรือมากที่สุดของ field ที่ระบุ
- $avg หาค่าเฉลี่ยของ field ที่ระบุ
- $addToSet จับ field ที่ระบุมารวมกันเพื่อสร้างเป็น array ขึ้นมาใหม่ โดยข้อมูลใน array นี้จะไม่ซ้ำกัน
- $first, $last กรณีข้อมูลเรียงมาก่อนหน้นานี้ สามารถหา document ตัวแรกสุด หรือตัวสุดท้ายมาใช้งานได้
- $push, จับ field ที่ระบุมารวมกันเพื่อสร้างเป็น array ใหม่โดยไม่คัดตัวที่ซ้ำกันออกไป
- $sum, หาผลรวมของข้อมูลในกลุ่ม (ตั้งแต่ MongoDB 3.2
$sum
สามารถนำมาใช้ใน$project
stage ได้ด้วย)
และยังมีฟังก์ชั่นอื่นๆ อีกมากมาย อ่านเพิ่มได้ที่ Aggregation Pipeline Operators
ตัวอย่างการใช้ $group
โดยการจัดกลุ่มข้อมูลจาก เดือน วัน ปี และใช้ฟังก์ชั่นต่างๆ เพื่อหาข้อมูลทางสถิติ ดังนี้
- คำนวนยอดขายจากการการนำราคาขาย (price) คูณกับจำนวนที่ขายได้ (quantity) โดยใช้ฟังก์ชั่น
$multiply
ช่วยคูณให้ - หาค่าเฉลี่ยของจำนวนสินค้าที่ขายได้ โดยใช้ฟังก์ชั่น
$avg
- นับจำนวน document ที่ถูกจัดกลุ่ม โดยใช้ฟังก์ชั่น
$sum
db.sales.aggregate([
{
$group : {
_id : { month: { $month: "$date" }, day: { $dayOfMonth: "$date" }, year: { $year: "$date" } },
totalPrice: { $sum: { $multiply: [ "$price", "$quantity" ] } },
averageQuantity: { $avg: "$quantity" },
count: { $sum: 1 }
}
}
])
$unwind
ใช้สำหรับคลี่ Array ออกมาแล้วจับคู่เข้ากับ document ที่เป็น owner เป็นรายตัวไป ยกตัวอย่างเช่น ถ้าเรามีข้อมูลแบบนี้
{ "_id" : 1, "item" : "ABC1", sizes: [ "S", "M", "L"] }
แล้วเราลอง $unwind
แบบนี้
db.inventory.aggregate( [ { $unwind : "$sizes" } ] )
จะได้ผลลัพธ์ดังนี้
{ "_id" : 1, "item" : "ABC1", "sizes" : "S" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "M" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "L" }
ถ้า stage ถัดไปเรา group by size และ count เราก็จะทราบจำนวนว่าแต่ละ size นั้นมี item กี่ชิ้น ดังนี้
db.inventory.aggregate([
{ $unwind: '$sizes' },
{ $group: { _id: '$sizes', count: { $sum:1 } } }
])
$out
ใช้เมื่อเราต้องการบันทึกผลลัพธ์จาก pipeline ไปยัง collection ที่ต้องการ ถ้ายังไม่มี collection ปลายทางคำสั่งนี้จะสร้าง collection ปลายทางขึ้นมาใหม่ แต่หากมีแล้วก็จะบันทึกทับไปเลย และหากการเขียนข้อมูลลงไปใน collection ปลายทางไม่สำเร็จ ข้อมูลใน collection จะยังอยู่เหมือนเดิมไม่ถูกทับด้วยคำสั่ง $out
ตัวอย่างการใช้งารน
db.reviews.aggregate([
{ $group: { _id: '$product_id', count: { $sum:1 } } },
{ $out: 'resultCollection' }
])
นอกจาก operators ดังที่กล่าวมาข้างต้น ก็ยังมี operators ที่น่าสนใจอื่นๆ อีก เช่น
- $limit – จำกัดจำนวน document ที่จะถูกส่งต่อไปยัง step ถัดไป
- $skip – skip document
- $sort – เรียง document
- $geoNear – เลือก document ที่อยู่ใกล้กับ location ที่ระบุ
- $out – เขียนผลลัพธ์จาก pipeline ลงใน collection
อย่างไรก็ตาม เราควรเข้าไปทำความรู้จักกับ pipeline stage operators อื่นๆ เพิ่มเติมว่า MongoDB มีอะไรมาให้เราได้ใช้บ้าง ซึ่งจะทำให้เราสามารถใช้ Aggregation Framework ได้อย่างเต็มประสิทธิภาพได้ที่นี่ Aggregation Pipeline Stages
Reshaping documents
มีฟังก์ชั่นให้เราใช้ reshape document เยอะมากจนจำไม่หมด เวลาจะทำอะไรสักอย่างอันดับแรกให้คิดไว้ก่อนเลยว่า Aggregation Framework นั้นมีฟังก์ชั่นที่ทำให้เราได้เลยหรือไม่ โดยเริ่มจากการเข้ามาดูที่คู่มืออย่างเป็นทางการของ MongoDB
การ reshape document อย่างง่ายที่สุดที่มักถูกใช้ก็คือการเปลี่ยนชื่อ field หรือสร้าง field ใหม่ ยกตัวอย่างเช่น เราต้องการ สร้าง field ชื่อ name
และ field นี้มี อีก 2 field ข้างในคือ first
, last
เราสามารถทำได้ดังนี้
db.users.aggregate([
{ $match: { username: 'khomkrit' } },
{ $project: { name: { first: '$first_name',
last: '$last_name'}}
}
])
String functions
String Expression Operators คือกลุ่มฟังก์ชั่นที่ใช้จัดการกับ string ซึ่งโดยส่วนใหญ่แล้วจะเป็นพังก์ชั่นที่เราคุ้นเคยดีอยู่แล้ว เช่น $split
, $concat
, $dateToString
, $substr
, $trim
เป็นต้น
หากเราต้องการนำ first_name
รวมกับ last_name
มารวมกันแล้วสร้างเป็น field ใหม่ชื่อ name
เราสามารถทำได้ดังนี้
db.users.aggregate([
{
$project: { name: { $concat: ['$first_name', ' ', '$last_name'] } }
}
])
จากตัวอย่างด้านบนใช้ Pipeline Operators ชื่อ $concat เพื่อรวม string เข้าด้วยกัน ทำให้ได้ผลลัพธ์ออกมาแบบนี้
{
"_id" : ObjectId("5d4fe3bea6cbfb4102f49f74"),
"name" : "khomkrit sriwichai"
}
คราวนี้เรามาลองใช้ฟังก์ชั่นให้ซับซ้อนยิ่งขึ้นไปอีก โดยเราต้องการนำตัวอักษรตัวแรกของ first_name
และ last_name
มารวมกัน แล้วทำให้มันเป็นตัวพิมพ์ใหญ่ เราสามารถทำได้ดังนี้
db.users.aggregate([
{
$project: { name: { $concat: ['$first_name', ' ', '$last_name'] },
abbreviate: {
$toUpper: {
$concat: [
{ $substr: ['$first_name', 0, 1] },
{ $substr: ['$last_name', 0, 1] }
]
}
}
}
}
])
เราก็จะได้ผลลัพธ์ออกมาแบบนี้
{
"_id" : ObjectId("5d4fe3bea6cbfb4102f49f74"),
"name" : "khomkrit sriwichai",
"abbreviate" : "KS"
}
นอกจาก String functions ที่ยกมาดังกล่าวแล้ว MongoDB ยังเตรียมฟังก์ชั่นกลุ่มต่างๆ ไว้ให้เราได้ใช้อีกเป็นจำนวนมาก และที่มักถูกใช้เป็นประจำ ซึ่งผมคิดว่าเราควรเริ่มทำความรู้จักกับฟังก์ชั่นกลุ่มเหล่านี้ก่อนเพราะมีโอกาสใช้บ่อย ได้แก่
- Arithemetic เกี่ยวกับการคำนวนทางคณิตศาสตร์
- Array จัดการข้อมูลชนิด array
- Date จัดการวันที่
- Set จัดการ array แบบ set เช่น เทียบว่า array 2 ชุดนี้เหมือนกันหรือไม่ หรือ array 2 ชุดนี้ มีอะไรบ้างที่ไม่อยู่ใน array อีกชุด หรือมีอะไรบ้างที่มีเหมือนกันทั้ง 2 array เป็นต้น
- Logical เป็น boolean operator สำหรับกำหนดเงื่อนไขว่าให้ทำ หรือไม่ทำสิ่งใดในแต่ละ stage
- Miscellaneous ฟังก์ชั่นอำนวยความสะดวกต่างๆ
และอื่นๆ อีกอีกมากมาย สามารถอ่านเอกสารอย่างเป็นทางการเพิ่มเติมได้ที่นี่
Pipeline performance
มีบางสิ่งที่เราต้องคำนึงอยู่เสมอเมื่อเราออกแบบ pipeline เพื่อประสิทธิภาพการทำงานที่ดี ได้แก่
- พยายามลดจำนวน document ก่อนนำไปคำนวน หรือก่อนส่งต่อไปยัง stage ถัดไป ยิ่งเหลือ document จำนวนน้อยเท่าไหร่ การทำงานใน stage ถัดไปยิ่งเร็วขึ้นเท่านั้น
- ใช้ index กับ
$match
และ$sort
- ถ้าเราทำ sharding
$match
และ$project
จะรันแยก shard กันของใครของมัน แต่สำหรับ operator อื่นๆ จะรันบน primary shard
Options
ฟังก์ชั่น aggregate()
สามารถรับ parameter ตัวที่สองได้ (ตัวแรกคือ array ของ stage) ซึ่งก็คือ options ที่เราสามารถกำหนดได้ 3 ค่า ได้แก่ explain
, allowDiskUse
และ cursor
explain
ผลลัพธ์จะคล้ายกับฟังก์ชั่น .explain()
ตอนใช้คำสั่ง .find().explain()
เมื่อต้องการใช้ให้กำหนดค่าเป็น true
ดังนี้
db.reviews.aggregate([
{ $match: { ... } },
{ $group: { ... }}
], { explain: true })
allowDiskUse
เมื่อเราต้องเข้าไป query ข้อมูลขนาดใหญ่ๆ แล้วเราพบ error ประมาณนี้
assert: command failed: {
"errmsg": "exception: Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in.",
"code": 16945,
"ok", 0
} : aggregate failed
โดยทั่วไปแล้ว error นี้จะเกิดขึ้นหลังจากมีการรอที่นานเกินไปในขณะที่ aggregation pipeline process document จำนวนมาก ในกรณีนี้ pipeline ใช้ memory มากเกินไปจนเกิดปัญหา ซึ่งเราสามารถแก้ได้โดยใช้ option allowDiskUse:true
db.reviews.aggregate([
{ $match: { ... } },
{ $group: { ... }}
], { allowDiskUse: true })
โดยทั่วไปแล้วการใช้ allowDiskUse
มักทำให้ pipeline ทำงานช้าลง ดังนั้นเราควรใช้มันเท่าที่จำเป็นเท่านั้น โดยอาจเริ่มจากการพยายาม optimize pipeline ให้เหลือ document ที่เราต้องทำงานด้วยน้อยที่สุดเท่าที่เราทำได้ก่อนโดยใช้ $match
และ $project
เข้าช่วย
cursor
ตั้งแต่ MongoDB 2.6 เป็นต้นมา result ของ pipeline จะ return กลับออกมาเป็น cursor (ถ้าเรา access ผ่าน Mongo shell) ซึ่ง cursor ที่เราได้กลับมา เราสามารถเรียกใช้ method ต่างๆ ได้ เช่น .hasNext()
, .next()
, toArray()
, .pretty()
และอื่นๆ อีกมากมาย
จุดประสงค์ของ cursor
ก็คือ ทำให้เราสามารถ stream data จำนวนมากๆ ได้ ทำให้เราสามารถ process data จำนวนมากๆ ได้โดยสนใจข้อมูลชุดหนึ่ง ณ เวลาหนึ่งๆ โดยไม่จำเป็นต้องโหลดข้อมูลเข้ามา process พร้อม กันทีเดียวทั้งหมดทำให้เราไม่ต้องใช้ memory จำนวนมากจนเกินไป แต่เมื่อไรก็ตามที่เราใช้ method toArray()
และ pretty()
result document ที่เราได้ทุกอันจะถูกอ่านจาก memory ทันที
itcount()
จะอ่าน document ทุกอันเพื่อนับจำนวน document จากนั้นส่งจำนวนกลับไปยัง client และทิ้ง document เหล่านั้นไป ดังนั้นถ้า application ของเราอยากรู้จำนวนของ document เราสามารถใช้ $group
แล้ว count
แทนได้ ซึ่งมีประสิทธิภาพที่ดีกว่าการอ่านจาก itcount()
Other capabilities
Aggregation pipeline นั้นนับเป็นทางที่เราควรเลือกใช้ก่อนเสมอเมื่อต้องทำ aggregation ใน MongoDB แต่ก็มีบางทางเลือกที่ดูดีกว่าและสั้นกว่าการใช้ aggregation pipeline เช่น .count()
และ .distinct()
ที่เป็นฟังก์ชั่นสำเร็จรูปที่ MongoDB เตรียมมาให้ใช้
db.users.count()
db.users.distinct('first_name')
map-reduce
ทำให้เราสามารถใช้ JavaScript ในการนิยาม process ทั้งหมดเองได้ มีความยืดหยุ่นค่อนข้างสูง เพราะเราเป็นคนเขียน logic ต่างๆ เองทั้งหมด แต่ก็แลกมากับการทำงานที่อาจช้าลง การอ่านที่ค่อนข้างเข้าใจยากเพราะต้องไล่โค้ด JavaScript เพื่อทำความเข้าใจ ไม่ใช่ฟังก์ชั่นสำเร็จรูปเหมือนที่ MongoDB เตรียมไว้ให้
aggregation operation โดยส่วนมากแล้ว MongoDB ได้เตรียม operations ต่างๆ ไว้ให้ค่อนข้างพร้อมใช้ในแทบทุกสถานการณ์ทั่วไป ดังนั้นก่อนคิดจะเขียน map-reduce ฟังก์ชั่นเองก็ควรมองหาจาก operator ต่างๆ ที่มีอยู่แล้วก่อน แต่อย่างไรก็ตาม map-reduce ก็อาจยังจำเป็นอยู่บ้างในกรณีที่ไม่มี operators ใดๆ สามารถทำงานได้อย่างที่เราต้องการ
สรุป
- operator แบ่งออกเป็น 2 กลุ่ม สำหรับใช้กำหนดหน้าที่ให้ stage และใช้สำหรับ transform document
- ควรใช้
$match
และ$project
ตั้งแต่ต้นๆ pipeline เพื่อลดจำนวนข้อมูลที่ต้อง process ใน pipeline ลง - เราสามารถสร้าง aggregation operator โดยใช้ JavaScript เองได้ผ่าน
mapReduce
แต่ก่อนใช้ควรดูก่อนว่า MongoDB นั้นสามารถทำได้อยู่แล้วหรือไม่