Yet another, super lightweight MySQL ORM in Python.
LightORM 💡🧺🗄️
It is what the name says: light, yet it gets (almost) everything done.
Yet another super lightweight MySQL ORM (Object-Relational Mapping) in Python.
💾 How to Install
pip install lightorm
If you find this project interesting, you can always star it. :D
📷 A Background for LightORM
The Dall-E prompt was: light yagami from death note is writing code as darth vader yakuza style
🔌 Connecting and Querying
LightORM is designed to reduce overhead and focus on doing things in a very light way. First, connect to the database:
import os
from dotenv import load_dotenv
load_dotenv()
from lightorm import Database, Field, _AND
table_name = 'person_table'
db = Database(
db_name=os.environ.get('DB_NAME'),
user=os.environ.get('DB_USER'),
password=os.environ.get('DB_PASSWORD'),
host=os.environ.get('DB_HOST'),
port=int(os.environ.get('DB_PORT', '3306'))  # fall back to MySQL's default port if DB_PORT is unset
)
Here, the database configuration is read from the .env file, and db is instantiated.
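A missing or malformed entry in .env is easier to diagnose before the connection is attempted. The following is a minimal sketch and not part of LightORM: read_db_config is a hypothetical helper, and the fallback to port 3306 (MySQL's default) is an assumption about what you want when DB_PORT is unset.

```python
import os

REQUIRED_KEYS = ("DB_NAME", "DB_USER", "DB_PASSWORD", "DB_HOST")


def read_db_config(env=None):
    """Collect connection settings from a mapping (defaults to os.environ)."""
    env = os.environ if env is None else env
    # Report every missing key at once instead of failing on the first one.
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise KeyError("missing database settings: " + ", ".join(missing))
    return {
        "db_name": env["DB_NAME"],
        "user": env["DB_USER"],
        "password": env["DB_PASSWORD"],
        "host": env["DB_HOST"],
        # Assumption: use MySQL's default port when DB_PORT is not set.
        "port": int(env.get("DB_PORT") or 3306),
    }
```

The returned keys match the keyword arguments used above, so the result could be unpacked as Database(**read_db_config()).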
💉🗄️ Insert a Record
Inserting a record is quite simple.
person = {
'name': 'John Doe',
'age': 23,
'address': 'LA',
'hobby': 'writing'
}
row_id = db.table(table_name).insert(**person)
If successful, insert(**person) returns the row ID of the new record.
🗃️🗄️ Insert Multiple Records
To insert multiple records, pass the insert_many method a list of dictionaries, one per record to be inserted.
persons = [
{
'name': 'John Doe',
'age': 23,
'address': 'LA',
'hobby': 'writing'
},
{
'name': 'Jane Doe',
'age': 27,
'address': 'Kentucky',
'hobby': 'sleeping'
}
]
record_inserted = db.table(table_name).insert_many(rows=persons)
Upon successful insertion, insert_many returns the number of rows inserted.
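For long lists it may be preferable to send the rows in smaller batches. This is a sketch under assumptions: chunked is a hypothetical helper, not part of LightORM, and any batch size you pick is arbitrary.

```python
def chunked(rows, size):
    """Yield successive slices of `rows`, each holding at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


# 1200 sample records split into batches of at most 500.
people = [{"name": f"Person {i}", "age": 20 + i % 40} for i in range(1200)]
print([len(batch) for batch in chunked(people, 500)])  # [500, 500, 200]
```

Each batch could then be passed to db.table(table_name).insert_many(rows=batch), adding up the returned counts.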
🗄️🧩 Getting Data Back (SELECT)
To get all the data back, simply call:
users = db.table(table_name).select().execute()
or use an empty where clause (not recommended, but it works):
users = db.table(table_name).select().where().execute()
Note the extra execute method, which is required to run the query.
🥣🗄️ Filtering
LightORM has been tested with several kinds of filters; filtering is simply a matter of chaining filter clauses. Let's look at some examples.
🥣🗄️ Filtering users by age and hobby
from lightorm import Field, _AND
...
...
users = db.table(table_name).select().where([
Field('age').eq(33), _AND,
Field('hobby').eq('programming')
]).execute()
🥣🗄️ Filtering users where age is less than 33
users = db.table(table_name).select().where([
    Field('age').lt(33)
]).execute()
print('users:', users)
🥣🗄️ Filtering users where address is in ['Dhaka', 'Khulna']
users = db.table(table_name).select().where([
Field('address').find_in(['Dhaka', 'Khulna'])
]).execute()
print('users:', users)
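To make the chaining concrete, the sketch below shows one way a list of conditions and connectors could be turned into a parameterized WHERE clause. It is an illustration only and not LightORM's actual implementation: ToyField, AND, and render_where are hypothetical names, and the %s placeholder style is assumed from common MySQL drivers.

```python
AND = "AND"  # hypothetical connector token, standing in for lightorm's _AND


class ToyField:
    """Builds (sql_fragment, params) pairs for a single column."""

    def __init__(self, name):
        self.name = name

    def eq(self, value):
        return (f"`{self.name}` = %s", [value])

    def lt(self, value):
        return (f"`{self.name}` < %s", [value])

    def find_in(self, values):
        if not values:
            raise ValueError("find_in needs at least one value")
        placeholders = ", ".join(["%s"] * len(values))
        return (f"`{self.name}` IN ({placeholders})", list(values))


def render_where(clauses):
    """Join condition tuples and connector strings into one WHERE clause."""
    parts, params = [], []
    for clause in clauses:
        if isinstance(clause, str):  # a connector such as AND
            parts.append(clause)
        else:
            sql, values = clause
            parts.append(sql)
            params.extend(values)
    return ("WHERE " + " ".join(parts) if parts else ""), params


sql, params = render_where([
    ToyField("age").lt(33), AND,
    ToyField("address").find_in(["Dhaka", "Khulna"]),
])
print(sql)     # WHERE `age` < %s AND `address` IN (%s, %s)
print(params)  # [33, 'Dhaka', 'Khulna']
```

Keeping the values in a separate parameter list, rather than formatting them into the SQL string, leaves quoting and escaping to the database driver.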
🚥🗄️ Updating the Records
The update() method receives the fields to be changed as key-value pairs (an unpacked dict). For example:
v_set = {
'age': 65,
'hobby': 'sleeping'
}
user_count = db.table(table_name).update(**v_set).where([
Field('address').eq('Dhaka')
]).execute()
print('Affected Row:', user_count)
v_set is the dict whose key-value pairs follow the SET keyword in the SQL query.
After a successful query, execute() returns the number of rows affected.
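The mapping from v_set to the SET part of the statement can be pictured with the following sketch. It is not LightORM's code: build_set_clause is a hypothetical helper, and it assumes %s-style placeholders.

```python
def build_set_clause(values):
    """Return a parameterized SET clause and its parameter list for a dict."""
    if not values:
        raise ValueError("at least one column is required")
    # Dicts keep insertion order, so columns and parameters stay aligned.
    assignments = ", ".join(f"`{column}` = %s" for column in values)
    return "SET " + assignments, list(values.values())


clause, params = build_set_clause({"age": 65, "hobby": "sleeping"})
print(clause)  # SET `age` = %s, `hobby` = %s
print(params)  # [65, 'sleeping']
```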
✏️🗄️ Deleting Records
delete() works just like the select() method. It returns the boolean True if the query is executed successfully.
delete_flag = db.table(table_name).delete().where([
Field('hobby').eq('sleeping')
]).execute()
print('Delete-Flag:', delete_flag)
📜 Almost Full Example
import os
import random
import unittest

from dotenv import load_dotenv
from lightorm import Database, Field, _AND

load_dotenv()


class TestTinyOrm(unittest.TestCase):
    # Table name and connection settings come from the .env file.
    table_name = os.environ.get('TABLE_NAME')
    db = Database(
        db_name=os.environ.get('DB_NAME'),
        user=os.environ.get('DB_USER'),
        password=os.environ.get('DB_PASSWORD'),
        host=os.environ.get('DB_HOST'),
        port=int(os.environ.get('DB_PORT', '3306'))  # MySQL's default port if unset
    )
    # Pools of sample values used to build random person records.
    first_name = ['John', 'Jane', 'Jason', 'Guido', 'Martin', 'Rob']
    last_name = ['Doe', 'Dee', 'Mraz', 'Van Rossum', 'Fowler', 'Pike']
    addresses = ['Dhaka', 'LA', 'Kentucky', 'Madrid', 'Khulna', 'Sylhet']
    hobbies = ['singing', 'art', 'gaming', 'programming', 'writing', 'sleeping']

    def get_name(self):
        name = '{} {}'.format(random.choice(self.first_name),
                              random.choice(self.last_name))
        return name

    def get_age(self):
        # Random age between 25 and 59, inclusive.
        return random.randint(25, 59)

    def get_address(self):
        return random.choice(self.addresses)

    def get_hobby(self):
        return random.choice(self.hobbies)

    def test_insert(self):
        person = {
            'name': self.get_name(),
            'age': self.get_age(),
            'address': self.get_address(),
            'hobby': self.get_hobby()
        }
        row_id = self.db.table(self.table_name).insert(**person)
        print('row-id:', row_id)

    def test_insert_many(self):
        persons = []
        # Builds 49 random records.
        for _ in range(1, 50):
            person = {
                'name': self.get_name(),
                'age': self.get_age(),
                'address': self.get_address(),
                'hobby': self.get_hobby()
            }
            persons.append(person)
        count = self.db.table(self.table_name).insert_many(rows=persons)
        print('records created:', count)

    def test_get_users(self):
        users = self.db.table(self.table_name).select().where().execute()
        print('users:', users)

    def test_get_user_by_age_and_hobby(self):
        users = self.db.table(self.table_name).select().where([
            Field('age').eq(33), _AND,
            Field('hobby').eq('art')
        ]).execute()
        print('users:', users)

    def test_get_users_where_age_lt_33(self):
        users = self.db.table(self.table_name).select().where([
            Field('age').lt(33)
        ]).execute()
        print('users:', users)

    def test_get_users_where_age_is_in_list_33(self):
        users = self.db.table(self.table_name).select().where([
            Field('age').find_in([33])
        ]).execute()
        print('users:', users)

    def test_get_user_where_address_is_in_dhaka_or_khulna(self):
        users = self.db.table(self.table_name).select().where([
            Field('address').find_in(['Dhaka', 'Khulna'])
        ]).execute()
        print('users:', users)

    def test_update_users_age_to_65_if_address_is_dhaka(self):
        v_set = {
            'age': 65,
            'hobby': 'sleeping'
        }
        user_count = self.db.table(self.table_name).update(**v_set).where([
            Field('address').eq('Dhaka')
        ]).execute()
        print('Affected Row:', user_count)

    def test_delete_users_where_hobby_eq_sleeping(self):
        delete_flag = self.db.table(self.table_name).delete().where([
            Field('hobby').eq('sleeping')
        ]).execute()
        print('Delete-Flag:', delete_flag)

    def test_find_not_in(self):
        users = self.db.table(self.table_name).select().where([
            Field('age').find_not_in([49, 39, 28])
        ]).execute()
        print('users:', len(users))

    def test_paginated_query(self):
        size = 10
        users = self.db.table(self.table_name).select().where(
            [
                Field('age').eq(65)
            ]
        ).paginate(0, size).execute()
        print('len(users):', len(users))
        print('users:', users)


if __name__ == '__main__':
    unittest.main()
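The example calls paginate(0, size) without spelling out what the first argument means. Assuming it is a zero-based page index (it could equally be a row offset; the source does not say), the corresponding LIMIT/OFFSET arithmetic would look like the sketch below. page_to_limit_offset is a hypothetical helper, not part of LightORM.

```python
def page_to_limit_offset(page, size):
    """Translate a zero-based page index into a (limit, offset) pair."""
    if page < 0 or size < 1:
        raise ValueError("page must be >= 0 and size must be >= 1")
    return size, page * size


for page in range(3):
    limit, offset = page_to_limit_offset(page, 10)
    print(f"page {page}: LIMIT {limit} OFFSET {offset}")
# page 0: LIMIT 10 OFFSET 0
# page 1: LIMIT 10 OFFSET 10
# page 2: LIMIT 10 OFFSET 20
```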
🔮 Upcoming Features
- Raw SQL execution.
- Adding pagination and sorting.
- Adding proper logging and debugging messages.
- Adding aggregate functions to the ORM.
🧚 Inspiration
Peewee, SQLAlchemy, Django ORM, and all the other ORMs out there that make developers' lives easier.
Change logs
0.0.6
- Added the find_not_in() method.
0.0.7
- Added the paginate() method.