I am trying to set up a distributed environment for NLP processing. I use Ray and Python on GCP, with a master and several workers. Whether I run 1 worker or 8 workers, it takes the same amount of time to get 10 results; there is no parallelism at all. I've been struggling with this for hours but still have no clue.
Here is my scheduler code:
```python
import ray
from pymongo import MongoClient
import worker  # your worker.py will define the Ray actor
import logging, threading, time
from datetime import datetime


def start_orchestration():
    logging.info("--- Starting orchestration ---")
    ray.init(address="auto")  # connect to Ray cluster (or just ray.init() locally)

    cpus_per_actor = 2
    nlp_actors = []

    def scale_actors():
        resources = ray.cluster_resources()
        total_cpus = int(resources.get("CPU", 0))
        desired = total_cpus // cpus_per_actor
        print(f"Desired actors={desired}, current={len(nlp_actors)}")
        # Add actors if needed
        while len(nlp_actors) < desired:
            new_actor = worker.NLPActor.options(num_cpus=cpus_per_actor).remote(
                "gs://babyllm-data/embeddings.pkl"
            )
            nlp_actors.append(new_actor)
        # Remove actors if cluster shrank
        while len(nlp_actors) > desired:
            actor_to_remove = nlp_actors.pop()
            ray.kill(actor_to_remove)

    def scaler_loop(interval=30):
        while True:
            try:
                scale_actors()
            except Exception as e:
                logging.error(f"[Scaler] Error {e}")
            time.sleep(interval)

    # Initial scale
    scale_actors()
    # Run scaler in background
    threading.Thread(target=scaler_loop, daemon=True).start()
    logging.info("--- NLPActor started ---")

    # Mongo Connection
    mongo = MongoClient("mongodb+srv://....../")
    col = mongo["BabyLLM"]["simple_texts_wiki_v2"]

    MAX_IN_FLIGHT = len(nlp_actors) * 2
    logging.info("--- Processing Docs ---")
    active_futures = []
    cursor = col.find({"text_l1": {"$exists": False}})  # {"text_mini": {"$exists": False}})
    counter = 0
    start_time = datetime.now()

    for doc in cursor:
        # 1. Submit the task and keep moving
        actor = nlp_actors[counter % len(nlp_actors)]
        f = actor.extract_and_replace.remote(doc["_id"], doc["title"], doc["text"][:1000])
        active_futures.append(f)

        # 2. Only wait once we've filled our "pipeline" buffer
        if len(active_futures) >= MAX_IN_FLIGHT:
            # Get whichever task finishes FIRST (not necessarily the one we just sent)
            done, active_futures = ray.wait(active_futures, num_returns=1)
            for finished in done:
                try:
                    res = ray.get(finished)
                    if res:
                        # Save immediately as requested
                        # col.update_one(
                        #     {"_id": res["_id"]},
                        #     {"$set": {
                        #         "text_l1": res["text_mini_l1"]
                        #         # "text_mini": res["text_mini"],
                        #         # "text_mini_l1": res["text_mini_l1"]
                        #     }}
                        # )
                        counter += 1
                        if counter % 10 == 0:
                            print(f"Saved {counter} documents total. Took {(datetime.now() - start_time).total_seconds()} seconds.")
                            start_time = datetime.now()
                except Exception as e:
                    logging.error(f"Task failed: {e}")

    # 3. Clean up the final tasks left in the pipeline
    while active_futures:
        done, active_futures = ray.wait(active_futures, num_returns=1)
        for finished in done:
            res = ray.get(finished)
            if res:
                col.update_one({"_id": res["_id"]}, {"$set": {"text_mini": res["text_mini"]}})
                counter += 1

    logging.info("--- Job Complete ---")


if __name__ == "__main__":
    start_orchestration()
```
I think that MAX_IN_FLIGHT is calculated too early: it is computed right after the initial scale_actors() call, when not all actors may be ready yet. It would be better to compute it inside the loop:

```python
if len(active_futures) >= len(nlp_actors) * 2:
```

I also notice that counter only increments when a task completes, so multiple tasks end up going to the same actor. In this case, my friend, there is a better way to do this, and it is a simple round robin keyed on submissions instead:

```python
actor = nlp_actors[len(active_futures) % len(nlp_actors)]
```

As a final piece of advice: if you are using spaCy or transformers inside NLPActor, they may hold the GIL and cause work to run sequentially despite the parallelism setup.
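To make the round-robin point concrete, here is a minimal, Ray-free sketch of submission-indexed assignment. The string actor names and the `submitted` counter are stand-ins I made up for illustration, not your real NLPActor handles; the point is only that keying the index on submissions (which advance on every dispatch) rotates through the actors evenly, whereas keying it on completions (which lag) can pile consecutive tasks onto one actor.

```python
# Stand-ins for Ray actor handles; in the real script these would be
# the entries of nlp_actors created by scale_actors().
actors = ["actor_0", "actor_1", "actor_2"]

# Index each submission by its own counter, not by the completion
# counter: completions lag behind submissions, so keying on them can
# send several consecutive tasks to the same actor.
submitted = 0
assignments = []
for _ in range(7):  # pretend we submit 7 documents
    chosen = actors[submitted % len(actors)]
    assignments.append(chosen)
    submitted += 1

print(assignments)  # strict rotation: actor_0, actor_1, actor_2, actor_0, ...
```

In the real loop, `len(active_futures)` plays a similar role to `submitted` here, since it grows with every `.remote()` call; a dedicated submission counter is just the most explicit version of the same idea.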