Հոսքերի սինխրոնիզացիա Python-ում

multitrading in python

Քանզի սինխրոնիզացիան ներդաշնակություն է

Ինձ համար թերևս մի կախարդական պահ, անսպասելի ինսայթ էր, երբ առաջին անգամ լսեցի մուլտիթրեդինգի (բազմահոսք) մասին: Տպավորվել էի գործողությունների զուգահեռ կատարման հնարավորությունից (չնայած, կարևոր է նշել, որ մեկ միջուկային պրոցեսոր ունեցող համակարգչում հաշվարկները չեն կատարվում խստորեն զուգահեռաբար, ընդ որում, Python-ում հաշվարկները մասամբ զուգահեռացվում են GIL կենցեպցիայի (հայեցակարգի) առկայության շնորհիվ: GIL (Global Interpreter Lock) – Ինտերպրետատորի գլոբալ արգելափակումը հոսքերի սինխրոնիզացման եղանակ է, որը հիմնականում օգտագործվում է Python-ում և Ruby-ում: Մուլտիթրեդինգը, անկասկած, հաշվողական աշխատանքների օպտիմալացման և կատարելագործման համար նոր հնարավորություններ է ընձեռնում, բայց, ինչպես ասում են, հնարավորություններն առաջ են բերում պատասխանատվություններ:

Կան մի շարք խնդիրներ, որոնք կարող են առաջանալ մուլտիթրեդինգի օգտագործման արդյունքում: Օրինակ, երբ բազմաթիվ հոսքեր փորձեն տվյալների միևնույն ֆրագմենտին հասանելիություն ստանան, կարող է որոշ դեպքերում առաջանալ անհամատեղելության խնդիր, կամ ինֆորմացիան ստացվի աղավաղված (կոնսոլում HWeolrldo` Hello World-ի փոխարեն): Նմանատիպ խնդիրներ առաջանում են, երբ համակարգում նշված չէ հոսքերի կառավարման եղանակը, կազմակերպման միջոցները:

Ուստի մեր առաջ խնդիր է ձևավորվում, նախ համակարգչին ստիպել կարգավորել հոսքերն ու սինխրոնիզացնել որոշակի կոնկրետ ալգորիթմով: Այս նպատակով օգտագործվում են սինխրոնիզացման պրիմիտիվներ – պարզագույն ծրագրային մեխանիզմներ, որոնք ապահովում են հոսքերի ներդաշնակ փոխազդեցությունը միմյանց միջև:

Այս գրառման մեջ ներկայացված են Python-ում սինխրոնացման առավել հայտնի պրիմիտիվները, որոնք սահմանված են ստանդարտ threading.py մոդուլում: Այս պրիմիտիվների արգելափակման մեթոդների մեծ մասը (այսինքն` այն մեթոդները, որոնք արգելափակում են որոշակի թրեդի կատարումը, քանի դեռ պայմանը բավարարում է), տրամադրում են թայմաութի լրացուցիչ ֆունկցիաներ, բայց պարզության համար դրանք այստեղ չեն նկարագրվի, ինչպես նաև, նկարագրված են այդ օբյեկտների միայն հիմնական ֆունկցիաները: Նյութը նախատեսված է մուլտիթրեդինգի բազային գիտելիքներ և պատկերացումներ ունեցող ընթերցողի համար: Մուլտիթրեդինգին առավել բազային կանդրադառնանք հաջորդիվ գրառումներում:

Եվ այսպես, կուսումնասիրենք Locks, RLocks, Semaphores, Events, Conditions և Barriers պրիմիտիվները:
Իհարկե, դուք կարող եք ստեղծել ձեր սեփական սինխրոնիզացման պրիմիտիվները՝ օգտագործելով վերը նշվածները որպես ենթակլասներ: Սկսենք Locks-ից` որպես ամենապարզ պրիմիտիվ, և աստիճանաբար անցնենք ավելի բարդերին:

Locks

Lock պրիմիտիվները, հավանաբար, պարզագույնն են Python-ում: Locks– ի համար հնարավոր է միայն երկու վիճակ՝ կողպված (արգելափակված) և բացված (ապաարգելափակված): Պրիմիտիվը ստեղծվում է բացված վիճակում և պարունակում է երկու մեթոդ` acquire() և release(): acquire() մեթոդը արգելափակում է Lock-ն ու բլոկի շարունակույան կատարումը մինչև որ մեկ այլ բլոկից release() մեթոդը այն չի բացել: Հետո այն նորից կողպում է Lock-ը և վերադառձնում True արժեք: release() մեթոդը կանչվում է միայն կողպված վիճակում – սահմանում է ապաարգելափակման վիճակը և անմիջապես վերադառձնում ղեկավարումը: Բացված վիճակում release() մեթոդի կանչը հանգեցնում է RunTimeError-ի:

Ստորև ներկայացված է կոդի օրինակ, որն օգտագործում է Lock պրիմիտիվը ընդհանուր փոփոխականին անվտանգ հասանելիության համար.

#lock_tut.py
from threading import Lock, Thread
lock = Lock()
g = 0

def add_one():
   """
   Just used for demonstration. It’s bad to use the ‘global’
   statement in general.
   """
   
   global g
   lock.acquire()
   g += 1
   lock.release()

def add_two():
   global g
   lock.acquire()
   g += 2
   lock.release()

threads = []
for func in [add_one, add_two]:
   threads.append(Thread(target=func))
   threads[-1].start()

for thread in threads:
   """
   Waits for threads to complete before moving on with the main
   script.
   """
   thread.join()

print(g)
from threading import Lock, Thread
lock = Lock()
g = 0

def add_one():
   """
   Just used for demonstration. It’s bad to use the ‘global’
   statement in general.
   """
   
   global g
   lock.acquire()
   g += 1
   lock.release()

def add_two():
   global g
   lock.acquire()
   g += 2
   lock.release()

threads = []
for func in [add_one, add_two]:
   threads.append(Thread(target=func))
   threads[-1].start()

for thread in threads:
   """
   Waits for threads to complete before moving on with the main
   script.
   """
   thread.join()

print(g)

Այս կոդը պարզապես տալիս է արդյունքը՝ 3 թվի տեսքով, բայց հիմա մենք համոզված ենք, որ երկու ֆունկցիաներ միաժամանակ չեն փոխում գլոբալ g փոփոխականի արժեքը, չնայած որ դրանք աշխատում են երկու տարբեր թրեդերում: Այսպիսով, Lock-ը կարող է օգտագործվել արդյունքի անհամապատասխանությունը կանխելու համար` թույլ տալով յուրաքանչյուր անգամ միայն մեկ թրեդից տվյալների փոփոխման հնարավորություն:

RLocks

Ստանդարտ Lock-ը չի կարգավորում, թէ որ հոսքն է ժամանակի տվյալ պահին արգելափակվում: Եթե արգելափակումը հիշվում է, արգելափակվում է ցանկացած հոսք, որը փորձում է հասանելիություն ստանալ, նույնիսկ եթե դա հենց նույն հոսքն է, որը պահում է արգելափակումը: Ստացվում է, որ արգելափակման կարգավորումը Lock-ի եղանակով անհնար է, քանզի ապաարգելափակող հոսքը հասանելիությունից զրկված է: Հենց նմանատիպ դեպքերի համար էլ օգտագործվում է RLock – կրկնվող մուտքի արգելափակումը: Դուք կարող եք ընդլայնել կոդը հետևյալ ֆրագմենտում, ավելացնելով ելքային գործիքներ, ցուցադրելու համար RLock-ի անցանկալի արգելափակումները չեզոքացնող հնարավորությունը:

#rlock_tut.py
import threading

num = 0
lock = Threading.Lock()

lock.acquire()
num += 1
lock.acquire() # This will block.
num += 2
lock.release()


# With RLock, that problem doesn’t happen.
lock = Threading.RLock()

lock.acquire()
num += 3
lock.acquire() # This won’t block.
num += 4
lock.release()
lock.release() # You need to call release once for each call to acquire.

RLock-ը հնարավոր է նաև օգտագործել ռեկուրսիվ, երբ ծնող ֆունկցիայի կանչն արգելափակում է նրա մեջ ներառված կանչերը: Այս կերպ, RLock-ն օգտագործվում է ընդհանուր ռեսուրսների ներդրված(nested) հասանելիության համար:

Semaphore

Սեմաֆորները պարզապես լրացուցիչ հաշվիչներ են: acquire()-ի կանչը սեմաֆորի կողմից արգելափակվում է միայն acquire()-ի գործարկված հոսքերի՝ որոշակի քանակի գերազանցման պարագայում: Համապատասխան հաշվիչի արժեքը փոքրանում է acquire()-ի, և մեծանում՝ release()-ի յուրաքանչյուր կանչի ժամանակ: ValueError արժեք կձևավորվի այն ժամանակ, երբ release()-ի կանչը կփորձի մեծացնել հաշվիչի արժեքը, երբ հաշվիչն արդեն հասել է տրված առավելագույն արժեքին: Ստորև բերված կոդը պատկերում է սեմաֆորի կիրառությունը՝ արտադրող-սպառող պարզագույն խնդրի համար:

#semaphores_tut.py
import random, time
from threading import BoundedSemaphore, Thread
max_items = 5
"""
Consider 'container' as a container, of course, with a capacity of 5
items. Defaults to 1 item if 'max_items' is passed.
"""
container = BoundedSemaphore(max_items)
def producer(nloops):
    for i in range(nloops):
        time.sleep(random.randrange(2, 5))
        print(time.ctime(), end=": ")
        try:
            container.release()
            print("Produced an item.")
        except ValueError:
            print("Full, skipping.")
def consumer(nloops):
    for i in range(nloops):
        time.sleep(random.randrange(2, 5))
        print(time.ctime(), end=": ")
        """
        In the following if statement we disable the default
        blocking behaviour by passing False for the blocking flag.
        """
        if container.acquire(False):
            print("Consumed an item.")
        else:
            print("Empty, skipping.")
threads = []
nloops = random.randrange(3, 6)
print("Starting with %s items." % max_items)
threads.append(Thread(target=producer, args=(nloops,)))
threads.append(Thread(target=consumer, args=(random.randrange(nloops, nloops+max_items+2),)))
for thread in threads:  # Starts all the threads.
    thread.start()
for thread in threads:  # Waits for threads to complete before moving on with the main script.
    thread.join()
print("All done.")

Bash code Semafore

threading մոդուլն իր մեջ արդեն իսկ ներառում է պարզագույն Semaphore կլաս, որը տրամադրում է հաշվիչ, որն էլ իր հերթին հնարավորություն է տալիս կանչել release()-ը կամայական քանակությամբ: Սակայն, ծրագրավորման ընթացքում սխալներից խուսափելու համար ցանկալի է օգտագործել BoundedSemaphore կլասը, որը սխալ է առաջացնում, եթե release()-ի կանչը փորձում է մեծացնել հաշվիչի արժեքը սահմանված առավելագույնից:

Սեմաֆորները սովորաբար օգտագործվում են ռեսուրսները սահմանափակելու համար, օրինակ` սերվեր մուտք գործելու սահմանափակում, թույլ տալով միաժամանակ մշակել/սպասարկել միայն 10 հաճախորդի: Այս դեպքում կապի մի քանի հոսք մրցակցում են սահմանափակ ռեսուրսի, մեր օրինակում՝ սերվերի համար:

Events

Սինխրոնիզացման Event(գործողություն) պրիմիտիվն աշխատում է հոսքերի միջև սովորական կոմունիկատորի դերում: Այն օգտագործում է ներքին ֆլագ, որոնք հոսքերը կարող են սահմանել՝ set(), կամ զրոյացնել՝ clear(): Մնացած հոսքերը կարող են սպասել wait() ներքին ֆլագի set() սահմանմանը: wait() մեթոդը արգելափակում է իրականացնում, քանի դեռ ֆլագը չի դառնում ճիշտ: Հաջորդիվ ներկայացող կոդը ցույց է տալիս, թէ ինչպես Event-ն օգտագործել որևէ գործողության գործարկամ համար:

#event_tut.py
import random, time
from threading import Event, Thread

event = Event()

def waiter(event, nloops):
    for i in range(nloops):
    print(“%s. Waiting for the flag to be set.” % (i+1))
    event.wait() # Blocks until the flag becomes true.
    print(“Wait complete at:”, time.ctime())
    event.clear() # Resets the flag.
    print()

def setter(event, nloops):
    for i in range(nloops):
    time.sleep(random.randrange(2, 5)) # Sleeps for some time.
    event.set()

threads = []
nloops = random.randrange(3, 6)

threads.append(Thread(target=waiter, args=(event, nloops)))
threads[-1].start()
threads.append(Thread(target=setter, args=(event, nloops)))
threads[-1].start()

for thread in threads:
    thread.join()

print(“All done.”)

Events example

Conditions

Condition(պայման) օբյեկտն իրենից ներկայացնում է Event-ի կատարելագործված տարբերակը: Այն ևս հանդես է գալիս որպես կոմունիկատոր-հաղորդակից հոսքերի միջև և կարող է օգտագործվել ծրագրի վիճակի փոփոխության մասին այլ հոսքերին notify() ծանուցելու համար: Օրինակ, այն կարող է օգտագործվել որպես ռեսուրսի հասանելիության ազդանշանային համակարգ: Մյուս հոսքերը նույնպես պետք է ստանան acquire() պայմանը (հետևաբար նաև դրա հետ կապված արգելափակումը) նախքան սպասելը wait()՝ պայմանը բավարարելու համար: Բացի այդ, հոսքը պետք է ազատի release()-ը՝ ըստ Condition-ի պայմանի՝ դրա հետ կապված գործողություններն ավարտելուց հետո, որպեսզի այլ հոսքեր ևս կարողանան պայմանը ստանալ իրենց սեփական նպատակների համար: Հետևյալ կոդը ցույց է տալիս, թե ինչպես կարելի է իրականացնել արտադրող-սպառող մեկ այլ պարզ խնդիր` օգտագործելով Condition օբյեկտ:

#condition_tut.py
import random, time
from threading import Condition, Thread
"""
'condition' variable will be used to represent the availability of a produced
item.
"""
condition = Condition()
box = []
def producer(box, nitems):
    for i in range(nitems):
        time.sleep(random.randrange(2, 5))  # Sleeps for some time.
        condition.acquire()
        num = random.randint(1, 10)
        box.append(num)  # Puts an item into box for consumption.
        condition.notify()  # Notifies the consumer about the availability.
        print("Produced:", num)
        condition.release()
def consumer(box, nitems):
    for i in range(nitems):
        condition.acquire()
        condition.wait()  # Blocks until an item is available for consumption.
        print("%s: Acquired: %s" % (time.ctime(), box.pop()))
        condition.release()
threads = []
"""
'nloops' is the number of times an item will be produced and
consumed.
"""
nloops = random.randrange(3, 6)
for func in [producer, consumer]:
    threads.append(Thread(target=func, args=(box, nloops)))
    threads[-1].start()  # Starts the thread.
for thread in threads:
    """Waits for the threads to complete before moving on
       with the main script.
    """
    thread.join()
print("All done.")

Conditions bash example

Condition-ը շատ հաճախ կիրառվում է Streaming API մշակելիս, որը հաճախորդին տեղեկացնում է տվյալները հասանելիության ժամկետի սկզբի վերաբերյալ:

Barriers

Բարիերները (պատնեշ, խոչընդոտ) պարզունակ պրիմիտիվներն օգտագործվում են հոսքերի կողմից միմյանց սպասելու համար: Յուրաքանչյուր հոսք փորձում է փոխանցել բարիերը wait() մեթոդի կանչի միջոցով, որը կարգելափակի, քանի դեռ մնացած հոսքեր չեն ստեղծի այս կանչը: Իսկ հենց դա տեղի ունենա՝ հոսքերը միաժամանակ կգործարկվեն: Հետևյալ հատվածը ցույց է տալիս Barrier-ի օգտագործման օրինակ:

#barrier_tut.py
from random import randrange
from threading import Barrier, Thread
from time import ctime, sleep

num = 4
# 4 threads will need to pass this barrier to get released.
b = Barrier(num)
names = [“Harsh”, “Lokesh”, “George”, “Iqbal”]

def player():
    name = names.pop()
    sleep(randrange(2, 5))
    print(“%s reached the barrier at: %s” % (name, ctime()))
    b.wait()
    
threads = []
print(“Race starts now…”)

for i in range(num):
    threads.append(Thread(target=player))
    threads[-1].start()
"""
Following loop enables waiting for the threads to complete before moving on with the main script.
"""
for thread in threads:
    thread.join()
print()
print(“Race over!”)

Barriers bash example

Բարիերների համար շատ գործածություններ կան, որոնցից մեկը կարող է լինել սերվերի և հաճախորդի աշխատանքների սինխրոնացումը, քանի որ ինիցիալիզացիայից հետո սերվերը հաճախ ստիպված է լինում սպասել հաճախորդին:

Սրանով ավարտենք Python-ում սինխրոնիզացման պրիմիտիվներին նվիրված վերլուծությունը:

Սույն հոդվածում անդրադարձ է կատարվում Ուեսլի Չանի “Հավելվածների ծրագրավորում Python-ի միջուկի հիման վրա” գրքում նկարագրված որոշ առաջադրանքների: Առաջադրանքների լուծումների տարբերակներ կարող եք գտնել GitHub-ի այս էջում:

Սկզբնաղբյուր՝ Let’s Synchronize Threads in Python