Flow tasks

Start

class viewflow.flow.Start(*args, **kwargs)

Start process event

Example:

start = flow.Start(StartView, fields=["some_process_field"]) \
    .Available(lambda user: user.is_super_user) \
    .Activate(this.first_start)

In case of function based view:

start = flow.Start(start_process)

@flow_start_view()
def start_process(request, activation):
     if not activation.has_perm(request.user):
         raise PermissionDenied

     activation.prepare(request.POST or None)
     form = SomeForm(request.POST or None)

     if form.is_valid():
          form.save()
          activation.done()
          return redirect('/')
     return render(request, {'activation': activation, 'form': form})

Ensure to include {{ activation.management_form }} inside template, to proper track when task was started and other task performance statistics:

<form method="POST">
     {{ form }}
     {{ activation.management_form }}
     <button type="submit"/>
</form>
Available(owner=None, **owner_kwargs)

Make process start action available for the User accepts user lookup kwargs or callable predicate :: User -> bool:

.Available(username='employee')
.Available(lambda user: user.is_super_user)
Permission(permission=None, auto_create=False, obj=None, help_text=None)

Make task available for users with specific permission, aceps permissions name of callable :: Process -> permission_name:

.Permission('my_app.can_approve')
.Permission(lambda process: 'my_app.department_manager_{}'.format(process.depratment.pk))

Task specific permission could be auto created during migration:

# Creates `processcls_app.can_do_task_processcls` permission
do_task = View().Permission(auto_create=True)

# You can specify permission codename and description right here
# The following creates `processcls_app.can_execure_task` permission
do_task = View().Permission('can_execute_task', help_text='Custom text', auto_create=True)

View

View task represents user task performed by interaction with django view.

class viewflow.flow.View(*args, **kwargs)

View task

Example:

task = flow.View(some_view) \
    .Permission('my_app.can_do_task') \
    .Next(this.next_task)

In case of function based view:

task = flow.Task(task)

@flow_start_view()
def task(request, activation):
     if not activation.flow_task.has_perm(request.user):
         raise PermissionDenied

     activation.prepare(request.POST or None)
     form = SomeForm(request.POST or None)

     if form.is_valid():
          form.save()
          activation.done()
          return redirect('/')
     return render(request, {'activation': activation, 'form': form})

Ensure to include {{ activation.management_form }} inside template, to proper track when task was started and other task performance statistics:

<form method="POST">
     {{ form }}
     {{ activation.management_form }}
     <button type="submit"/>
</form>
Assign(owner=None, **owner_kwargs)

Assign task to the User immediately on activation, accepts user lookup kwargs or callable :: Process -> User:

.Assign(username='employee')
.Assign(lambda process: process.created_by)
Permission(permission=None, auto_create=False, obj=None, help_text=None)

Make task available for users with specific permission, aceps permissions name of callable :: Process -> permission_name:

.Permission('my_app.can_approve')
.Permission(lambda process: 'my_app.department_manager_{}'.format(process.depratment.pk))

Task specific permission could be auto created during migration:

# Creates `processcls_app.can_do_task_processcls` permission
do_task = View().Permission(auto_create=True)

# You can specify permission codename and description right here
# The following creates `processcls_app.can_execure_task` permission
do_task = View().Permission('can_execute_task', help_text='Custom text', auto_create=True)
viewflow.flow.flow_view(**lock_args)

Decorator that locks and runs the flow view in transaction.

Expects view with the signature (request, activation, **kwargs) or CBV view that implements TaskActivation, in this case, dispatch with would be called with (request, **kwargs)

Returns (request, flow_task, process_pk, task_pk, **kwargs)

Views decorated with flow_view decorator executed in transaction. If an error happens in view or during nexttask activation, database rollback will be performed and no changes will be stored.

Job

Job task represents user task performed in background by celery

viewflow.flow.flow_job(**lock_args)

Decorator that prepares celery task for execution

Makes celery job function with the following signature (flow_task-strref, process_pk, task_pk, **kwargs)

Expects actual celery job function which has the following signature (activation, **kwargs) If celery task class implements activation interface, job function is called without activation instance (**kwargs)

Process instance is locked only before and after the function execution. Please avoid any process state modification during the celery job.

If any error will happens during job execution task would be moved to error state, and available for administrator desision in admin interface. If error will happens on next task ativation, for example, error raised on If conditions, job task will be commited and marked as done, but the failed for activation task would be created in error state.

If

class viewflow.flow.If(cond)

Activates one of paths based on condition

Example:

check_decision = flow.If(lambda p: p.approved) \
    .OnTrue(this.approved) \
    .OnFalse(this.end)

Switch

class viewflow.flow.Switch

Activates first path with matched condition

Split

class viewflow.flow.Split

Activates outgoing path in-parallel depends on per-path condition.

Example:

split_on_decision = flow.Split() \
    .Next(check_post, cond=lambda p: p,is_check_post_required) \
    .Next(this.perform_task_always)

Join

class viewflow.flow.Join(wait_all=True)

Waits for one or all incoming links and activates next path.

Join should be connected to one split task only.

Example:

join_on_warehouse = flow.Join() \
    .Next(this.next_task)

Function

class viewflow.flow.Function(func=None, **kwargs)

Node that can be executed within you code.

Example:

def my_function(activation, **kwargs):
    activation.prepare()
    # Your custom code
    activation.done()

class MyFlow(Flow):
    my_task = flow.Function(my_function)

MyFlow.my_task.run(**kwargs)

Note

Any kwarg you pass of the run call will be passed to the function.

Handler

class viewflow.flow.Handler(handler=None, **kwargs)

Node that can be executed automatically after task was created.

In difference to Function a Handler is not explicitly called in code, but executes automatically.

Example:

def my_handler(activation):
    # Your custom code
    pass

class MyFlow(Flow):
    previous_task = flow.View(ProcessView)                 .Next(this.my_task)

    my_task = flow.Function(my_handler)                 .Next(this.End)

    end = flow.End()

Note

You don’t need to call prepare() or done() on the activation in you handler callback.

Signal

class viewflow.flow.Signal(signal, receiver, sender=None, **kwargs)

Node that connects to a django signal.

Example:

create_model = flow.Signal(post_create, my_receiver, sender=MyModelCls)

Note

Other than the StartSignal you will need to provide activation for your receiver yourself. This can be done using the flow_signal() decorator.

End

class viewflow.flow.End

Ends process event.