查询
query = db.session.query(
CollectionCase.id, CollectionCase.label, CollectionCase.remark,
CollectionCase.case_time, VehiclePilotFile.file_type,
VehiclePilotFile.begin_time, VehiclePilotFile.end_time,
VehiclePilotFile.vehicle_id).join(VehiclePilotFile).join(User).filter(
User.username == username).slice((page - 1) * per_page,
page * per_page).all()
listen event
from sqlalchemy.event import listen
assoc_version_vehicle = Table(
'version_vehicle',
Model.metadata,
Column('id',
Integer,
Sequence("version_vehicle_aid_seq", start=1, increment=1),
primary_key=True),
Column('version_id', Integer, ForeignKey('version.id')),
Column('vehicle_id', Integer, ForeignKey('vehicle.id')),
)
class Vehicle(Model):
"""车辆
"""
id = Column(Integer,
Sequence("vehicle_aid_seq", start=1, increment=1),
primary_key=True)
...
class Version(Model):
"""车辆自动驾驶版本
"""
id = Column(Integer,
Sequence("version_aid_seq", start=1, increment=1),
primary_key=True)
...
vehicles = relationship('Vehicle',
secondary=assoc_version_vehicle,
backref='version')
def push_version_after_version_create_or_update_listener(
mapper, connection, target):
"""push version after create version
"""
from app.utils import request_car_drive_mode
version_name = target.name
for car in target.vehicles:
if request_car_drive_mode(car.carid) == "ready":
threading.Thread(target=push_version_by_name_and_carid,
args=(version_name, car.carid)).start()
def push_version_after_vesion_append_vehicle_listener(mapper, connection,
target):
"""push version after create assoc_version_vehicle
"""
print(target)
from app.tasks import push_version_by_name_and_carid
version_name = mapper.name
car = connection
threading.Thread(target=push_version_by_name_and_carid,
args=(version_name, car.carid, True)).start()
# listen Version 的 after_insert 和 after_update 事件
listen(Version, 'after_insert',
push_version_after_version_create_or_update_listener)
listen(Version, 'after_update',
push_version_after_version_create_or_update_listener)
# listen Version 添加 vehicle 事件
listen(Version.vehicles, 'append',
push_version_after_vesion_append_vehicle_listener)
问题汇总
in_查询批量更新时报错
参考资料:https://stackoverflow.com/questions/33703070/using-sqlalchemy-result-set-for-update
代码示例:
session.query(cls).filter(cls.id.in_(ids)).update({"password": ******})
查询时采用in_
时,update 会报错,错误信息如下:
InvalidRequestError: Could not evaluate current criteria in Python. Specify 'fetch' or False for the synchronize_session parameter.
设置 synchronize_session 的值为 False 或者 fetch 即可
session.query(cls).filter_by(id=id).update({"password": ******}, synchronize_session=False)
server_default 和 default
看这个代码:
from sqlalchemy.sql import func
class Version(Model):
...
time_created = Column(DateTime(timezone=True), server_default=func.now())
time_updated = Column(DateTime(timezone=True), default=func.now(), onupdate=func.now())
为什么 time_created 用 server_default,time_updated 用 default?
server_default 会更新已有数据, default 不会