Skip to main content

Microservice with django

Project description

Tutorial

Install

pip install bgtasks

Configuration

settings.py
AMQP = {
    # Broker endpoint (localhost:5672, root virtual host).
    'HOST': 'localhost',
    'PORT': 5672,
    'VHOST': '/',
    # Broker credentials.
    'USERNAME': 'guest',
    'PASSWORD': 'guest',
    # NOTE(review): presumably the polling interval, in seconds, used while
    # waiting for an RPC reply -- confirm against the bgtasks implementation.
    'RPC_SLEEP_TIME': 0.005,
}

RPC methods

app1 views.py
import json
from django.http import HttpResponse
from bgtasks import RPCClient

def add_user(request):
    """Create a user through the 'user.add' RPC task and report its id.

    Reads ``username`` and ``password`` from the POST data, sends them to the
    'user.add' task via ``RPCClient.call`` and returns the result as the
    ``user_id`` field of a JSON-encoded response body.
    """
    payload = {
        'username': request.POST.get('username'),
        'password': request.POST.get('password'),
    }
    client = RPCClient()
    # Create user and get id
    user_id = client.call('user.add', payload)
    # Python's boolean literal is ``True``; a bare ``true`` raises NameError.
    content = json.dumps({"success": True, "user_id": user_id})
    return HttpResponse(content)
app2 tasks.py
from django.contrib.auth.models import User
from bgtasks import rpc_tasks


@rpc_tasks('user.add')
def add_user(data):
    """Handle the 'user.add' RPC call: create a user and return its id.

    ``data`` must contain the keys ``'username'`` and ``'password'``.
    """
    # create_user() hashes the password and saves the row in one step;
    # calling set_password() without a following save() would leave the
    # stored user without the password.
    user = User.objects.create_user(
        username=data['username'],
        password=data['password'],
    )
    return user.id

To start listening to RabbitMQ, run:

python manage.py tasks

Job methods

app1 views.py
import json

from django.http import HttpResponse
from bgtasks import JobClient


def send_email(request):
    """Submit a 'user.send_mail' job for the posted e-mail address.

    Reads ``email`` from the POST data, passes it to the 'user.send_mail'
    task via ``JobClient.call`` and returns a JSON-encoded success body.
    The value returned by the call is not used.
    """
    payload = {
        'email': request.POST.get('email'),
    }
    client = JobClient()
    client.call('user.send_mail', payload)
    # Python's boolean literal is ``True``; a bare ``true`` raises NameError.
    content = json.dumps({"success": True})
    return HttpResponse(content)
app2 tasks.py
from django.contrib.auth.models import User
from django.core import mail
from bgtasks import job_tasks


@job_tasks('user.send_mail')
def send_mail(data):
    """Handle the 'user.send_mail' job: send a greeting to ``data['email']``."""
    # Call Django's mailer through the ``mail`` module. This task is itself
    # named ``send_mail``, so a bare ``send_mail(...)`` here would resolve to
    # the task rather than to django.core.mail.send_mail.
    mail.send_mail(
        'Hi!',
        'Hello World.',
        'from@example.com',
        [data['email']],
        fail_silently=False,
    )

Testing

Add ENVIRONMENT = 'test' to settings.py:

import json
from django.test import TestCase
from django.test import Client
from bgtasks import rpc_tasks


@rpc_tasks('user.add')
def add_user(data):
    """Stand-in 'user.add' handler for tests: ignore ``data``, return a fixed id."""
    fake_user_id = 1
    return fake_user_id

class RPCTestCase(TestCase):
    """Exercise the ``/user/add/`` endpoint through Django's test client."""

    def test_add_user(self):
        """POST a username/password pair; expect HTTP 200 and ``user_id`` 1.

        The expected id matches the value returned by the module-level
        ``add_user`` stub registered for 'user.add'.
        """
        form = {'username': 'john', 'password': 'smith'}
        http_client = Client()
        response = http_client.post('/user/add/', form)
        body = json.loads(response.content)
        self.assertEqual(response.status_code, 200)
        self.assertEqual(body['user_id'], 1)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for bgtasks, version 0.0.15
Filename, size File type Python version Upload date Hashes
Filename, size bgtasks-0.0.15-py3-none-any.whl (9.5 kB) File type Wheel Python version py3 Upload date Hashes View
Filename, size bgtasks-0.0.15.tar.gz (6.0 kB) File type Source Python version None Upload date Hashes View

Supported by

Pingdom Pingdom Monitoring Google Google Object Storage and Download Analytics Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN DigiCert DigiCert EV certificate StatusPage StatusPage Status page