Ở các bài trước, mình đã cùng bạn xây dựng Agent, Task, Tool và ghép chúng thành Crew hoàn chỉnh. Crew xử lý tốt những pipeline đơn giản, nhưng khi bạn cần điều phối nhiều Crew cùng lúc, rẽ nhánh theo điều kiện, hay chia sẻ dữ liệu giữa các bước thì sao? Đó là lúc Flow phát huy tác dụng.
Flow cho phép bạn thiết kế workflow phức tạp theo mô hình sự kiện (event-driven), kết hợp code Python thuần với Crew một cách linh hoạt. Bài này mình sẽ đi từ khái niệm cơ bản đến ví dụ thực tế, giúp bạn nắm được cách dùng Flow trong dự án thật.
Flow là gì, khác Crew thế nào

Nếu Crew giống một nhóm làm việc với danh sách task cố định, thì Flow giống một bản vẽ quy trình (flowchart) mà bạn tự thiết kế từng bước. Crew chạy tuần tự hoặc song song các task trong một nhóm agent. Flow ở tầng cao hơn, nó điều phối nhiều Crew, nhiều đoạn code Python, và cho phép bạn kiểm soát luồng chạy chi tiết.
Vài điểm khác biệt chính:
- Crew: tập hợp agent + task, chạy theo process (sequential hoặc hierarchical). Phù hợp khi bạn có một nhóm agent làm một việc cụ thể.
- Flow: chuỗi các bước (method) nối với nhau bằng sự kiện. Mỗi bước có thể là code Python thuần, gọi LLM trực tiếp, hoặc chạy cả một Crew. Phù hợp khi bạn cần workflow nhiều giai đoạn, có rẽ nhánh, có logic xử lý giữa các bước.
Nôm na, Crew xử lý “ai làm gì”, còn Flow xử lý “làm theo thứ tự nào, rẽ nhánh ở đâu, dữ liệu truyền ra sao”.
ℹ️ Flow không thay thế Crew. Hai khái niệm này bổ trợ nhau. Trong thực tế, bạn sẽ thường dùng Flow để điều phối nhiều Crew chạy theo quy trình phức tạp.
Một ví dụ dễ hình dung: bạn cần xây hệ thống tự động viết bài blog. Bước 1 là nghiên cứu (giao cho Crew A), bước 2 là viết nháp (giao cho Crew B), bước 3 là kiểm tra chất lượng bằng code Python, bước 4 là quyết định xuất bản hay yêu cầu sửa. Toàn bộ quy trình này cần Flow để điều phối, vì mỗi Crew chỉ lo phần việc của nó, còn Flow lo “kịch bản tổng thể”.
Tạo Flow cơ bản với @start và @listen

Flow trong CrewAI xây dựng trên hai decorator chính: @start() đánh dấu điểm bắt đầu, và @listen() đánh dấu method sẽ chạy khi một method khác hoàn thành. Cách hoạt động giống kiểu “khi A xong thì chạy B”.
Ví dụ đơn giản: tạo Flow sinh ra một thành phố ngẫu nhiên, rồi từ đó tìm một fun fact:
from crewai.flow.flow import Flow, listen, start
from litellm import completion
class CityFactFlow(Flow):
model = "gpt-4o-mini"
@start()
def generate_city(self):
# Gọi LLM để lấy tên thành phố ngẫu nhiên
response = completion(
model=self.model,
messages=[{"role": "user", "content": "Trả về tên một thành phố ngẫu nhiên trên thế giới."}],
)
city = response["choices"][0]["message"]["content"]
print(f"Thành phố: {city}")
return city
@listen(generate_city)
def generate_fun_fact(self, city):
# Khi generate_city xong, method này tự động chạy
response = completion(
model=self.model,
messages=[{"role": "user", "content": f"Kể một điều thú vị về {city}"}],
)
return response["choices"][0]["message"]["content"]
# Chạy Flow
flow = CityFactFlow()
result = flow.kickoff()
print(f"Kết quả: {result}")
Chỗ này có mấy điểm đáng chú ý:
@start()có thể khai báo nhiều lần trong một Flow. Tất cả các method có@start()sẽ chạy song song khi Flow khởi động.@listen(generate_city)nhận output củagenerate_citylàm tham số. Bạn có thể truyền tên method trực tiếp hoặc dùng string@listen("generate_city").kickoff()trả về output của method cuối cùng hoàn thành trong Flow.
💡 Bạn có thể gọi flow.plot() trước kickoff() để tạo file HTML trực quan hóa flow. Rất tiện để debug workflow phức tạp.
Một điểm nữa: Flow hỗ trợ cả async. Nếu bạn cần gọi API bên ngoài hoặc chạy nhiều task song song mà không muốn block, dùng async def cho method và gọi flow.kickoff_async() thay vì kickoff(). Cách này đặc biệt hữu ích khi Flow có nhiều bước I/O-bound (gọi API, đọc file, query database).
State management trong Flow

Một trong những điểm mạnh của Flow là quản lý trạng thái (state) xuyên suốt các bước. Thay vì truyền dữ liệu qua return value, bạn có thể lưu vào self.state để bất kỳ method nào trong Flow cũng truy cập được.
CrewAI hỗ trợ hai kiểu state:
Unstructured state (dạng dict)
Kiểu mặc định, linh hoạt, bạn muốn thêm key gì cũng được:
class SimpleFlow(Flow):
@start()
def step_one(self):
self.state["counter"] = 0
self.state["data"] = "khởi tạo xong"
@listen(step_one)
def step_two(self):
self.state["counter"] += 1
print(self.state)
# {'id': 'uuid-abc...', 'counter': 1, 'data': 'khởi tạo xong'}
Mỗi Flow tự động sinh một id (UUID) trong state, dùng để theo dõi từng lần chạy.
Structured state (dùng Pydantic)
Khi workflow phức tạp hơn, bạn nên định nghĩa state bằng Pydantic model để có type checking và auto-complete trong IDE:
from pydantic import BaseModel
class ResearchState(BaseModel):
topic: str = ""
outline: str = ""
draft: str = ""
review_score: int = 0
class ResearchFlow(Flow[ResearchState]):
@start()
def pick_topic(self):
self.state.topic = "CrewAI Flow"
@listen(pick_topic)
def create_outline(self):
# Truy cập state qua attribute thay vì dict key
self.state.outline = f"Outline cho chủ đề: {self.state.topic}"
return self.state.outline
Structured state giúp bạn tránh lỗi typo khi gõ tên key, và IDE sẽ gợi ý được các field có sẵn. Với project nhỏ thì dùng dict cho nhanh, còn project lớn nên dùng Pydantic.
💡 CrewAI còn hỗ trợ @persist decorator để lưu state vào SQLite, giúp Flow có thể phục hồi state sau khi restart. Hữu ích cho workflow chạy dài.
Khi nào dùng kiểu nào? Nếu Flow của bạn chỉ có vài bước đơn giản, state ít field thì dùng dict cho gọn. Khi flow bắt đầu phức tạp (10+ method, state có nhiều field liên quan), chuyển sang Pydantic sẽ giúp bạn bắt lỗi sớm hơn và code dễ bảo trì hơn. Mình thường bắt đầu bằng dict, rồi refactor sang Pydantic khi flow “phình” ra.
Kết hợp Flow với Crew


Cái hay nhất của Flow là khả năng kết hợp code Python thuần với Crew. Bạn có thể dùng Flow để chuẩn bị dữ liệu, gọi Crew xử lý, rồi lấy kết quả về xử lý tiếp bằng code.
Ví dụ một Flow điều phối hai Crew: một crew nghiên cứu, một crew viết bài:
from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ContentState(BaseModel):
topic: str = ""
research: str = ""
article: str = ""
class ContentPipelineFlow(Flow[ContentState]):
@start()
def pick_topic(self):
self.state.topic = "Ứng dụng AI trong quản lý server"
@listen(pick_topic)
def research_phase(self):
# Crew 1: Nghiên cứu
researcher = Agent(
role="Chuyên gia nghiên cứu",
goal=f"Nghiên cứu chuyên sâu về: {self.state.topic}",
backstory="Bạn là chuyên gia phân tích công nghệ."
)
research_task = Task(
description=f"Tìm hiểu và tổng hợp thông tin về {self.state.topic}",
expected_output="Báo cáo nghiên cứu chi tiết",
agent=researcher
)
crew = Crew(
agents=[researcher],
tasks=[research_task],
process=Process.sequential
)
result = crew.kickoff()
self.state.research = result.raw
return result.raw
@listen(research_phase)
def writing_phase(self, research_data):
# Crew 2: Viết bài dựa trên nghiên cứu
writer = Agent(
role="Content Writer",
goal="Viết bài blog chất lượng từ dữ liệu nghiên cứu",
backstory="Bạn là cây viết kỹ thuật giàu kinh nghiệm."
)
write_task = Task(
description=f"Viết bài blog dựa trên nghiên cứu:\n{research_data}",
expected_output="Bài blog hoàn chỉnh, 1000+ từ",
agent=writer
)
crew = Crew(
agents=[writer],
tasks=[write_task],
process=Process.sequential
)
result = crew.kickoff()
self.state.article = result.raw
return result.raw
# Chạy pipeline
flow = ContentPipelineFlow()
article = flow.kickoff()
print(article)
Ở đây Flow đóng vai trò “nhạc trưởng”: bước 1 chọn chủ đề, bước 2 giao cho crew nghiên cứu, bước 3 giao cho crew viết bài. Giữa các bước, bạn hoàn toàn có thể xen thêm logic Python (lọc dữ liệu, validate, gọi API bên ngoài) mà không cần tạo agent riêng.
ℹ️ Bạn cũng có thể dùng Agent đơn lẻ trong Flow (không cần tạo Crew) bằng cách gọi agent.kickoff_async() trực tiếp. Cách này nhẹ hơn khi chỉ cần một agent xử lý một việc nhỏ.
Pattern “Flow + nhiều Crew” rất phổ biến trong thực tế. Ví dụ xây chatbot hỗ trợ khách hàng: Crew 1 phân loại câu hỏi, Crew 2 tìm kiếm tài liệu, Crew 3 soạn câu trả lời, và Flow điều phối luồng chạy giữa ba crew dựa trên loại câu hỏi. Mỗi Crew có thể dùng agent và tool riêng, không cần biết đến nhau, Flow lo phần kết nối.
Rẽ nhánh có điều kiện và chạy song song

Flow hỗ trợ ba cơ chế điều khiển luồng chạy khá mạnh:
Router: rẽ nhánh theo điều kiện
Decorator @router() cho phép method trả về một string, và các method khác “lắng nghe” string đó để quyết định có chạy hay không:
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
class QAState(BaseModel):
score: int = 0
class QualityCheckFlow(Flow[QAState]):
@start()
def evaluate(self):
# Giả sử đánh giá chất lượng bài viết
self.state.score = 75
@router(evaluate)
def check_quality(self):
if self.state.score >= 80:
return "approved"
else:
return "needs_revision"
@listen("approved")
def publish(self):
print("Bài viết đạt chuẩn, tiến hành xuất bản!")
@listen("needs_revision")
def revise(self):
print(f"Điểm {self.state.score}/100, cần chỉnh sửa lại.")
Khi chạy Flow trên, vì score = 75 nên chỉ method revise() được gọi. Nếu score >= 80 thì publish() sẽ chạy thay.
or_ và and_: chạy song song và đợi nhiều nguồn
CrewAI cung cấp hai hàm or_() và and_() để xử lý trường hợp nhiều method chạy song song:
or_(method_a, method_b): listener sẽ chạy khi bất kỳ method nào trong danh sách hoàn thành.and_(method_a, method_b): listener chỉ chạy khi tất cả method trong danh sách đều đã hoàn thành.
from crewai.flow.flow import Flow, listen, or_, and_, start
class ParallelFlow(Flow):
@start()
def fetch_data_a(self):
return "Dữ liệu từ nguồn A"
@start()
def fetch_data_b(self):
return "Dữ liệu từ nguồn B"
@listen(and_(fetch_data_a, fetch_data_b))
def merge_results(self):
# Chỉ chạy khi CẢ HAI nguồn đã trả dữ liệu
print("Gộp dữ liệu từ cả hai nguồn")
print(self.state)
Vì cả fetch_data_a và fetch_data_b đều có @start(), chúng sẽ chạy song song khi Flow khởi động. Method merge_results chỉ chạy sau khi cả hai đã xong.
⚠️ Khi dùng and_(), các method phải lưu kết quả vào self.state thay vì truyền qua return value, vì listener không nhận được output riêng của từng method.
Human in the Loop
Từ CrewAI 1.8.0 trở lên, Flow có thêm decorator @human_feedback cho phép dừng flow để chờ phản hồi từ người dùng. Bạn định nghĩa các outcome có thể (ví dụ: “approved”, “rejected”, “needs_revision”), và LLM sẽ phân loại feedback của người dùng vào một trong các outcome đó, rồi flow tiếp tục chạy theo nhánh tương ứng.
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback
class ReviewFlow(Flow):
@start()
@human_feedback(
message="Bạn có duyệt nội dung này không?",
emit=["approved", "rejected"],
default_outcome="rejected"
)
def generate_content(self):
return "Nội dung cần review..."
@listen("approved")
def on_approve(self, result):
print(f"Đã duyệt! Feedback: {result.feedback}")
@listen("rejected")
def on_reject(self, result):
print(f"Từ chối. Lý do: {result.feedback}")
Tính năng này rất hữu ích cho các workflow cần bước phê duyệt thủ công, ví dụ: duyệt nội dung trước khi xuất bản, xác nhận đơn hàng, hay review code trước khi merge.
Ví dụ thực tế: Flow tạo learning guide từ tài liệu

Để bạn hình dung rõ hơn, mình sẽ xây một Flow hoàn chỉnh: nhận một chủ đề, nghiên cứu tài liệu, tạo outline, viết nội dung, rồi kiểm tra chất lượng trước khi xuất ra.
from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
class GuideState(BaseModel):
topic: str = ""
research_data: str = ""
outline: str = ""
content: str = ""
quality_score: int = 0
class LearningGuideFlow(Flow[GuideState]):
@start()
def gather_requirements(self):
self.state.topic = "Docker cho người mới bắt đầu"
print(f"Chủ đề: {self.state.topic}")
@listen(gather_requirements)
def research(self):
"""Crew nghiên cứu tài liệu"""
researcher = Agent(
role="Researcher",
goal=f"Thu thập tài liệu về {self.state.topic}",
backstory="Chuyên gia tìm và tổng hợp thông tin kỹ thuật."
)
task = Task(
description=f"Nghiên cứu và tổng hợp tài liệu về {self.state.topic}",
expected_output="Tóm tắt các điểm chính cần có trong learning guide",
agent=researcher
)
crew = Crew(agents=[researcher], tasks=[task], process=Process.sequential)
result = crew.kickoff()
self.state.research_data = result.raw
return result.raw
@listen(research)
def create_outline(self, research_data):
"""Tạo outline bằng code Python (không cần agent)"""
sections = [
"Giới thiệu",
"Cài đặt môi trường",
"Các khái niệm cốt lõi",
"Thực hành cơ bản",
"Bài tập nâng cao",
"Tổng kết"
]
self.state.outline = "\n".join(f"- {s}" for s in sections)
return self.state.outline
@listen(create_outline)
def write_content(self, outline):
"""Crew viết nội dung dựa trên outline và research"""
writer = Agent(
role="Technical Writer",
goal="Viết learning guide dễ hiểu, có ví dụ minh họa",
backstory="Cây viết kỹ thuật, chuyên tạo tài liệu hướng dẫn."
)
task = Task(
description=f"""Viết learning guide với:
- Chủ đề: {self.state.topic}
- Outline: {outline}
- Tài liệu tham khảo: {self.state.research_data[:500]}""",
expected_output="Learning guide hoàn chỉnh, 2000+ từ",
agent=writer
)
crew = Crew(agents=[writer], tasks=[task], process=Process.sequential)
result = crew.kickoff()
self.state.content = result.raw
self.state.quality_score = 85 # Giả lập điểm chất lượng
return result.raw
@router(write_content)
def quality_gate(self):
if self.state.quality_score >= 80:
return "pass"
return "fail"
@listen("pass")
def finalize(self):
print(f"Learning guide hoàn thành! ({len(self.state.content)} ký tự)")
return self.state.content
@listen("fail")
def request_revision(self):
print("Chất lượng chưa đạt, cần chỉnh sửa.")
# Chạy
flow = LearningGuideFlow()
flow.plot("learning_guide_flow") # Tạo sơ đồ trực quan
result = flow.kickoff()
Flow trên minh họa một pipeline thực tế: thu thập yêu cầu ➜ nghiên cứu (Crew) ➜ tạo outline (code) ➜ viết nội dung (Crew) ➜ kiểm tra chất lượng (router) ➜ xuất bản hoặc yêu cầu sửa. Mỗi bước độc lập, dễ test riêng, và bạn có thể thay thế bất kỳ bước nào mà không ảnh hưởng toàn bộ flow.
💡 Với những dự án cần server mạnh để chạy các mô hình AI hoặc workflow phức tạp, bạn có thể tham khảo Pro VPS tại AZDIGI để có hiệu năng ổn định.
Câu hỏi thường gặp
Flow là bước tiến quan trọng khi bạn cần xây dựng ứng dụng AI phức tạp hơn mức một Crew đơn lẻ. Kết hợp với Memory và Knowledge, bạn có thể tạo ra những hệ thống AI vừa thông minh vừa có khả năng ghi nhớ và học hỏi. Nếu bạn chưa đọc các bài trước trong serie, hãy bắt đầu từ bài tổng quan về CrewAI để có nền tảng vững.
Có thể bạn cần xem thêm
- Task trong CrewAI: giao việc cho AI agent
- CrewAI là gì? Hướng dẫn xây dựng hệ thống Multi-Agent AI với Python
- Tool trong CrewAI: mở rộng khả năng agent
- Hướng dẫn dùng Subagents trong Claude để tạo trợ lý AI thông minh
- Deploy ứng dụng Node.js / Python với Docker Compose
- Tối ưu chi phí OpenClaw: Cách giảm 70% token usage mà vẫn giữ chất lượng
Về tác giả
Trần Thắng
Chuyên gia tại AZDIGI với nhiều năm kinh nghiệm trong lĩnh vực web hosting và quản trị hệ thống.